use std::collections::HashMap;
use super::types::{NodeId, StateDigest};
/// Outcome of a convergence check, as returned by [`check_convergence`].
#[derive(Debug, Clone)]
pub enum ConvergenceResult {
    /// Every precondition holds and the node digests agree (or there are no nodes).
    Converged,
    /// A precondition does not hold yet; the payload names the first one found blocking.
    Pending(ConvergencePending),
    /// All preconditions hold but the node digests disagree; the payload describes how.
    Diverged(ConvergenceDiff),
}
impl ConvergenceResult {
    /// Returns `true` only for [`ConvergenceResult::Converged`].
    pub fn is_converged(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::Converged => true,
            Self::Pending(_) | Self::Diverged(_) => false,
        }
    }

    /// Returns `true` only for [`ConvergenceResult::Pending`].
    pub fn is_pending(&self) -> bool {
        match self {
            Self::Pending(_) => true,
            Self::Converged | Self::Diverged(_) => false,
        }
    }

    /// Returns `true` only for [`ConvergenceResult::Diverged`].
    pub fn is_diverged(&self) -> bool {
        match self {
            Self::Diverged(_) => true,
            Self::Converged | Self::Pending(_) => false,
        }
    }
}
/// Explains why a convergence check reported "pending".
#[derive(Debug, Clone)]
pub struct ConvergencePending {
    /// The property that was found not to hold.
    pub blocking_property: ConvergenceProperty,
    /// Human-readable detail about what is still outstanding.
    pub reason: String,
}
/// The individual conditions that make up convergence.
///
/// Each property carries a short label (`"C1"` through `"C5"`), see [`ConvergenceProperty::id`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConvergenceProperty {
    /// C1: no messages are in flight.
    NetworkQuiescent,
    /// C2: no node has an active sync.
    AllNodesIdle,
    /// C3: no node has buffered deltas.
    NoPendingBuffers,
    /// C4: no node has pending sync timers.
    NoPendingSyncTimers,
    /// C5: all nodes report the same state digest.
    StateDigestsEqual,
}

impl ConvergenceProperty {
    /// Returns the short label of this property (`"C1"` … `"C5"`).
    pub fn id(&self) -> &'static str {
        match *self {
            ConvergenceProperty::NetworkQuiescent => "C1",
            ConvergenceProperty::AllNodesIdle => "C2",
            ConvergenceProperty::NoPendingBuffers => "C3",
            ConvergenceProperty::NoPendingSyncTimers => "C4",
            ConvergenceProperty::StateDigestsEqual => "C5",
        }
    }
}
/// Describes how node digests disagree when a convergence check reports divergence.
#[derive(Debug, Clone)]
pub struct ConvergenceDiff {
    /// The digest reported by each node, keyed by node id.
    pub digests: HashMap<NodeId, StateDigest>,
    /// Ids of the nodes whose digest is not the majority digest.
    pub differing_nodes: Vec<NodeId>,
    /// The most frequently reported digest. As computed by `check_convergence`, ties are
    /// broken in favour of the greater digest value.
    pub majority_digest: Option<StateDigest>,
}
/// Snapshot of the state that a convergence check evaluates.
#[derive(Debug, Clone)]
pub struct ConvergenceInput {
    /// Number of messages currently in flight.
    pub in_flight_messages: usize,
    /// Per-node state, one entry per node.
    pub nodes: Vec<NodeConvergenceState>,
}
/// The convergence-relevant state of a single node.
#[derive(Debug, Clone)]
pub struct NodeConvergenceState {
    /// Identifier of the node.
    pub id: NodeId,
    /// Whether the node currently has a sync active.
    pub sync_active: bool,
    /// Number of deltas the node still has buffered.
    pub buffer_size: usize,
    /// Number of sync timers the node has pending.
    pub sync_timer_count: usize,
    /// Digest of the node's current state.
    pub digest: StateDigest,
}
pub fn check_convergence(input: &ConvergenceInput) -> ConvergenceResult {
if input.in_flight_messages > 0 {
return ConvergenceResult::Pending(ConvergencePending {
blocking_property: ConvergenceProperty::NetworkQuiescent,
reason: format!("{} messages in flight", input.in_flight_messages),
});
}
for node in &input.nodes {
if node.sync_active {
return ConvergenceResult::Pending(ConvergencePending {
blocking_property: ConvergenceProperty::AllNodesIdle,
reason: format!("node {} has sync active", node.id),
});
}
}
for node in &input.nodes {
if node.buffer_size > 0 {
return ConvergenceResult::Pending(ConvergencePending {
blocking_property: ConvergenceProperty::NoPendingBuffers,
reason: format!("node {} has {} buffered deltas", node.id, node.buffer_size),
});
}
}
for node in &input.nodes {
if node.sync_timer_count > 0 {
return ConvergenceResult::Pending(ConvergencePending {
blocking_property: ConvergenceProperty::NoPendingSyncTimers,
reason: format!("node {} has {} sync timers", node.id, node.sync_timer_count),
});
}
}
if input.nodes.is_empty() {
return ConvergenceResult::Converged;
}
let first_digest = input.nodes[0].digest;
let all_equal = input.nodes.iter().all(|n| n.digest == first_digest);
if all_equal {
return ConvergenceResult::Converged;
}
let mut digests = HashMap::new();
let mut digest_counts: HashMap<StateDigest, usize> = HashMap::new();
for node in &input.nodes {
digests.insert(node.id.clone(), node.digest);
*digest_counts.entry(node.digest).or_default() += 1;
}
let majority_digest = digest_counts
.iter()
.max_by(|(d1, c1), (d2, c2)| {
c1.cmp(c2).then_with(|| {
d1.0.cmp(&d2.0)
})
})
.map(|(digest, _)| *digest);
let differing_nodes: Vec<_> = input
.nodes
.iter()
.filter(|n| Some(n.digest) != majority_digest)
.map(|n| n.id.clone())
.collect();
ConvergenceResult::Diverged(ConvergenceDiff {
digests,
differing_nodes,
majority_digest,
})
}
pub fn is_deadlocked(input: &ConvergenceInput, queue_empty: bool) -> bool {
if !queue_empty {
return false;
}
let result = check_convergence(input);
if result.is_converged() {
return false;
}
let has_active_sync = input.nodes.iter().any(|n| n.sync_active);
let has_buffered = input.nodes.iter().any(|n| n.buffer_size > 0);
let has_timers = input.nodes.iter().any(|n| n.sync_timer_count > 0);
has_active_sync || has_buffered || has_timers
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an idle node (no active sync, empty buffer, no timers) with the given digest.
    fn idle_node(name: &str, digest_bytes: [u8; 32]) -> NodeConvergenceState {
        NodeConvergenceState {
            id: NodeId::new(name),
            sync_active: false,
            buffer_size: 0,
            sync_timer_count: 0,
            digest: StateDigest::from_bytes(digest_bytes),
        }
    }

    /// Wraps `nodes` in an input that has no messages in flight.
    fn quiescent(nodes: Vec<NodeConvergenceState>) -> ConvergenceInput {
        ConvergenceInput {
            in_flight_messages: 0,
            nodes,
        }
    }

    /// Asserts that `input` is pending and blocked by exactly `expected`.
    fn assert_blocked_by(input: &ConvergenceInput, expected: ConvergenceProperty) {
        let result = check_convergence(input);
        assert!(result.is_pending());
        if let ConvergenceResult::Pending(pending) = result {
            assert_eq!(pending.blocking_property, expected);
        }
    }

    #[test]
    fn test_converged_empty() {
        let input = quiescent(Vec::new());
        assert!(check_convergence(&input).is_converged());
    }

    #[test]
    fn test_converged_single_node() {
        let input = quiescent(vec![idle_node("a", [1; 32])]);
        assert!(check_convergence(&input).is_converged());
    }

    #[test]
    fn test_converged_matching_digests() {
        let input = quiescent(vec![
            idle_node("a", [1; 32]),
            idle_node("b", [1; 32]),
            idle_node("c", [1; 32]),
        ]);
        assert!(check_convergence(&input).is_converged());
    }

    #[test]
    fn test_pending_messages_in_flight() {
        let input = ConvergenceInput {
            in_flight_messages: 5,
            ..quiescent(vec![idle_node("a", [1; 32]), idle_node("b", [1; 32])])
        };
        assert_blocked_by(&input, ConvergenceProperty::NetworkQuiescent);
    }

    #[test]
    fn test_pending_sync_active() {
        let syncing = NodeConvergenceState {
            sync_active: true,
            ..idle_node("a", [1; 32])
        };
        let input = quiescent(vec![syncing, idle_node("b", [1; 32])]);
        assert_blocked_by(&input, ConvergenceProperty::AllNodesIdle);
    }

    #[test]
    fn test_pending_buffer_not_empty() {
        let buffering = NodeConvergenceState {
            buffer_size: 3,
            ..idle_node("a", [1; 32])
        };
        let input = quiescent(vec![buffering, idle_node("b", [1; 32])]);
        assert_blocked_by(&input, ConvergenceProperty::NoPendingBuffers);
    }

    #[test]
    fn test_pending_sync_timers() {
        let waiting = NodeConvergenceState {
            sync_timer_count: 1,
            ..idle_node("a", [1; 32])
        };
        let input = quiescent(vec![waiting, idle_node("b", [1; 32])]);
        assert_blocked_by(&input, ConvergenceProperty::NoPendingSyncTimers);
    }

    #[test]
    fn test_diverged() {
        // Two nodes agree on [1; 32]; "c" is the odd one out.
        let input = quiescent(vec![
            idle_node("a", [1; 32]),
            idle_node("b", [1; 32]),
            idle_node("c", [2; 32]),
        ]);
        let result = check_convergence(&input);
        assert!(result.is_diverged());
        if let ConvergenceResult::Diverged(diff) = result {
            assert_eq!(diff.differing_nodes.len(), 1);
            assert!(diff.differing_nodes.contains(&NodeId::new("c")));
            assert_eq!(diff.majority_digest, Some(StateDigest::from_bytes([1; 32])));
        }
    }

    #[test]
    fn test_deadlock_detection() {
        // A non-empty queue is never a deadlock, even with diverged digests.
        let diverged_idle = quiescent(vec![idle_node("a", [1; 32]), idle_node("b", [2; 32])]);
        assert!(!is_deadlocked(&diverged_idle, false));

        // Converged state with an empty queue is not a deadlock.
        let converged = quiescent(vec![idle_node("a", [1; 32]), idle_node("b", [1; 32])]);
        assert!(!is_deadlocked(&converged, true));

        // Empty queue while a node is still syncing: stuck.
        let syncing = NodeConvergenceState {
            sync_active: true,
            ..idle_node("a", [1; 32])
        };
        let stuck = quiescent(vec![syncing, idle_node("b", [2; 32])]);
        assert!(is_deadlocked(&stuck, true));
    }

    #[test]
    fn test_majority_digest_tiebreaker() {
        // One vote each: the tie must resolve to the greater digest, every time.
        let input = quiescent(vec![idle_node("a", [1; 32]), idle_node("b", [2; 32])]);
        let expected = Some(StateDigest::from_bytes([2; 32]));

        match check_convergence(&input) {
            ConvergenceResult::Diverged(diff) => assert_eq!(
                diff.majority_digest, expected,
                "Majority should be deterministic on tie"
            ),
            _ => panic!("Expected diverged result"),
        }

        for _ in 0..10 {
            if let ConvergenceResult::Diverged(diff) = check_convergence(&input) {
                assert_eq!(
                    diff.majority_digest, expected,
                    "Majority should be consistent across calls"
                );
            }
        }
    }
}