use crate::merge::Merge;
use crate::state::ConstraintState;
use crate::merkle::StateHash;
use crate::delta::{ConstraintDelta, DeltaTracker};
use crate::vclock::VectorClock;
use serde::{Deserialize, Serialize};
use std::fmt;
/// Messages that gossip nodes send to one another.
///
/// Every variant carries the sender's node id and a copy of its vector clock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipMessage {
    /// Advertises the hash of the sender's state without shipping the state.
    Ping {
        node: String,
        state_hash: StateHash,
        clock: VectorClock,
    },
    /// Ships the sender's complete constraint state.
    Sync {
        node: String,
        state: ConstraintState,
        clock: VectorClock,
    },
    /// Ships a constraint delta instead of the complete state.
    Delta {
        node: String,
        delta: ConstraintDelta,
        clock: VectorClock,
    },
    /// Acknowledges a previously received message; carries no state.
    Ack {
        node: String,
        clock: VectorClock,
    },
}
/// One participant in the gossip protocol.
///
/// Holds the local constraint state together with the bookkeeping needed to
/// talk to peers: a vector clock, a delta tracker, the last state hash
/// recorded for each peer, and two activity counters.
#[derive(Debug, Clone)]
pub struct GossipNode {
    /// Identifier this node uses for itself in messages and in its clock.
    pub node_id: String,
    /// Local constraint state that gets merged with peers' states.
    pub state: ConstraintState,
    /// Vector clock, ticked on local updates and merged on received messages.
    pub clock: VectorClock,
    /// Delta tracker created alongside the node.
    pub tracker: DeltaTracker,
    /// Most recently recorded state hash per peer, keyed by peer node id.
    peer_hashes: std::collections::HashMap<String, StateHash>,
    /// Number of messages handled by `receive`.
    rounds: u64,
    /// Number of `Sync` replies sent in response to a mismatching `Ping`.
    syncs: u64,
}
/// Outcome of handling a single [`GossipMessage`].
///
/// Derives `Debug` and `Clone` so it can be logged, asserted on, and copied
/// like the other public types in this module.
#[derive(Debug, Clone)]
pub struct GossipResult {
    /// Messages to send back to the peer whose message was handled.
    pub responses: Vec<GossipMessage>,
    /// Whether handling the message altered the local constraint state.
    pub state_changed: bool,
    /// Whether handling the message found the compared state hashes equal.
    pub converged: bool,
}
impl GossipNode {
    /// Creates a node named `node_id` with a fresh constraint state, a new
    /// vector clock and delta tracker, no recorded peers, and zeroed counters.
    pub fn new(node_id: &str) -> Self {
        Self {
            node_id: node_id.to_string(),
            state: ConstraintState::new(node_id),
            clock: VectorClock::new(),
            tracker: DeltaTracker::new(),
            peer_hashes: std::collections::HashMap::new(),
            rounds: 0,
            syncs: 0,
        }
    }

    /// Returns the hash of this node's current constraint state.
    pub fn state_hash(&self) -> StateHash {
        StateHash::from_state(&self.state)
    }

    /// Increments the vector clock under this node's own id.
    fn tick(&mut self) {
        self.clock.increment(&self.node_id);
    }

    /// Builds the acknowledgement this node sends in reply to a message.
    fn ack(&self) -> GossipMessage {
        GossipMessage::Ack {
            node: self.node_id.clone(),
            clock: self.clock.clone(),
        }
    }

    /// Adds the constraint `id` to the local state and ticks the clock.
    pub fn add_constraint(&mut self, id: &str) {
        self.state.add_constraint(id);
        self.tick();
    }

    /// Records `count` satisfied checks in the local state and ticks the clock.
    pub fn record_satisfied(&mut self, count: u64) {
        self.state.record_satisfied(count);
        self.tick();
    }

    /// Records `count` violations in the local state and ticks the clock.
    pub fn record_violations(&mut self, count: u64) {
        self.state.record_violations(count);
        self.tick();
    }

    /// Builds a `Ping` advertising this node's id, state hash, and clock.
    pub fn ping(&self) -> GossipMessage {
        GossipMessage::Ping {
            node: self.node_id.clone(),
            state_hash: self.state_hash(),
            clock: self.clock.clone(),
        }
    }

    /// Handles one incoming message and returns the replies to send back.
    ///
    /// Every call increments the `rounds` counter. Per variant:
    ///
    /// * `Ping` - records the sender's advertised hash. If it equals the local
    ///   hash, replies with `Ack` and reports `converged`; otherwise increments
    ///   the `syncs` counter and replies with a `Sync` carrying the full local
    ///   state. The ping's clock is not merged, because no state came with it.
    /// * `Sync` - merges the received state and clock, records the hash of the
    ///   state the sender shipped, and replies with `Ack`. `state_changed`
    ///   tells whether the local hash differs after the merge.
    /// * `Delta` - merges only the clock and replies with `Ack`. The delta
    ///   payload is discarded, so `state_changed` is `false`.
    /// * `Ack` - merges the clock, records the sender under the local hash, and
    ///   reports `converged` when every recorded peer hash equals the local
    ///   hash. No reply is produced.
    pub fn receive(&mut self, msg: &GossipMessage) -> GossipResult {
        self.rounds += 1;
        match msg {
            GossipMessage::Ping { node, state_hash, .. } => {
                self.peer_hashes.insert(node.clone(), *state_hash);
                let in_sync = self.state_hash() == *state_hash;
                let reply = if in_sync {
                    self.ack()
                } else {
                    self.syncs += 1;
                    GossipMessage::Sync {
                        node: self.node_id.clone(),
                        state: self.state.clone(),
                        clock: self.clock.clone(),
                    }
                };
                GossipResult {
                    responses: vec![reply],
                    state_changed: false,
                    converged: in_sync,
                }
            }
            GossipMessage::Sync { node, state, clock } => {
                let old_hash = self.state_hash();
                self.state.merge(state);
                self.clock.merge(clock);
                let new_hash = self.state_hash();
                // Record what the sender actually holds, not our merged result:
                // the sender only gets an Ack back, so it does not learn any
                // entries that were present here but absent from its state.
                self.peer_hashes
                    .insert(node.clone(), StateHash::from_state(state));
                GossipResult {
                    responses: vec![self.ack()],
                    state_changed: old_hash != new_hash,
                    converged: false,
                }
            }
            GossipMessage::Delta { clock, .. } => {
                // NOTE(review): the delta payload is not applied here; only the
                // clock is merged. Confirm whether delta application belongs in
                // this method once the intended `ConstraintDelta` API is known.
                self.clock.merge(clock);
                GossipResult {
                    responses: vec![self.ack()],
                    // The constraint state is untouched, so nothing changed.
                    state_changed: false,
                    converged: false,
                }
            }
            GossipMessage::Ack { node, clock } => {
                self.clock.merge(clock);
                let my_hash = self.state_hash();
                // NOTE(review): an Ack carries no hash, so the sender is assumed
                // to hold the same state as this node - confirm this is intended.
                self.peer_hashes.insert(node.clone(), my_hash);
                let all_match = self.peer_hashes.values().all(|h| *h == my_hash);
                GossipResult {
                    responses: Vec::new(),
                    state_changed: false,
                    converged: all_match,
                }
            }
        }
    }

    /// Number of messages this node has handled via `receive`.
    pub fn rounds(&self) -> u64 {
        self.rounds
    }

    /// Number of `Sync` replies this node has sent for mismatching pings.
    pub fn syncs(&self) -> u64 {
        self.syncs
    }

    /// Number of distinct peers for which a state hash has been recorded.
    pub fn peer_count(&self) -> usize {
        self.peer_hashes.len()
    }
}
/// Runs one ping round initiated by `a` against `b`.
///
/// `a` pings `b`, and each reply `b` produces is handed back to `a`. Returns
/// `true` if handling any of those replies changed `a`'s state. Messages that
/// `a` produces while handling the replies are not forwarded to `b`.
pub fn exchange(a: &mut GossipNode, b: &mut GossipNode) -> bool {
    let replies = b.receive(&a.ping()).responses;
    // `receive` sits on the left of `||` so that every reply is delivered even
    // after a change has already been observed.
    replies
        .iter()
        .fold(false, |any_change, reply| a.receive(reply).state_changed || any_change)
}
impl fmt::Display for GossipNode {
    /// Writes a one-line summary: node id, active constraint count, number of
    /// recorded peers, handled rounds, and the current state hash.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let constraints = self.state.active_constraint_count();
        let peers = self.peer_hashes.len();
        let hash = self.state_hash();
        write!(
            f,
            "GossipNode({}, {} constraints, {} peers, {} rounds, hash={})",
            self.node_id, constraints, peers, self.rounds, hash
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs one exchange initiated by `x`, then one initiated by `y`.
    fn exchange_both_ways(x: &mut GossipNode, y: &mut GossipNode) {
        exchange(x, y);
        exchange(y, x);
    }

    /// Two nodes without any constraints already agree on the first ping.
    #[test]
    fn test_empty_nodes_converge_immediately() {
        let sender = GossipNode::new("a");
        let mut receiver = GossipNode::new("b");
        let outcome = receiver.receive(&sender.ping());
        assert!(outcome.converged);
    }

    /// Data held only by `a` reaches `b` after an exchange in each direction.
    #[test]
    fn test_sync_via_exchange() {
        let mut a = GossipNode::new("a");
        let mut b = GossipNode::new("b");
        a.add_constraint("c1");
        a.record_satisfied(100);

        exchange_both_ways(&mut a, &mut b);

        assert!(b.state.constraints.contains("c1"));
    }

    /// Each node ends up with the other's constraint as well as its own.
    #[test]
    fn test_bidirectional_sync() {
        let mut a = GossipNode::new("a");
        let mut b = GossipNode::new("b");
        a.add_constraint("c1");
        b.add_constraint("c2");

        exchange_both_ways(&mut a, &mut b);

        assert!(a.state.constraints.contains("c2"),
            "a should have c2 from b's sync");
        assert!(b.state.constraints.contains("c1"),
            "b should have c1 from a's sync");
        assert_eq!(a.state.active_constraint_count(), 2);
        assert_eq!(b.state.active_constraint_count(), 2);
    }

    /// Pairwise exchanges among three nodes spread every constraint to all.
    #[test]
    fn test_three_way_eventual_consistency() {
        let mut a = GossipNode::new("a");
        let mut b = GossipNode::new("b");
        let mut c = GossipNode::new("c");
        a.add_constraint("c1");
        b.add_constraint("c2");
        c.add_constraint("c3");

        exchange_both_ways(&mut a, &mut b);
        exchange_both_ways(&mut b, &mut c);
        exchange_both_ways(&mut a, &mut c);

        let expectations = [
            (&a, "a should have all 3"),
            (&b, "b should have all 3"),
            (&c, "c should have all 3"),
        ];
        for (node, why) in expectations.iter() {
            assert_eq!(node.state.active_constraint_count(), 3, "{}", why);
        }
    }

    /// The display form mentions the node id.
    #[test]
    fn test_display() {
        let node = GossipNode::new("forgemaster");
        let rendered = node.to_string();
        assert!(rendered.contains("forgemaster"));
    }
}