atomr_cluster/
gossip_pdu.rs1use atomr_core::actor::Address;
9use serde::{Deserialize, Serialize};
10
11use crate::membership::MembershipState;
12use crate::vector_clock::{VectorClock, VectorRelation};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
18#[non_exhaustive]
19pub enum GossipPdu {
20 Status { from: String, version: VectorClock },
22 Envelope { from: String, version: VectorClock, state: MembershipState },
24 Merge { from: String, version: VectorClock },
26}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30#[non_exhaustive]
31pub enum GossipDecision {
32 SendEnvelope,
34 RequestMerge,
36 MergeBoth,
38 Same,
40}
41
42pub fn decide(local: &VectorClock, remote: &VectorClock) -> GossipDecision {
45 match local.compare(remote) {
46 VectorRelation::Same => GossipDecision::Same,
47 VectorRelation::After => GossipDecision::SendEnvelope,
48 VectorRelation::Before => GossipDecision::RequestMerge,
49 VectorRelation::Concurrent => GossipDecision::MergeBoth,
50 }
51}
52
53pub fn pick_gossip_target<'a>(
56 peers: &'a [Address],
57 self_addr: &Address,
58 cursor: usize,
59) -> Option<&'a Address> {
60 if peers.is_empty() {
61 return None;
62 }
63 let total = peers.len();
64 for offset in 0..total {
65 let p = &peers[(cursor + offset) % total];
66 if p != self_addr {
67 return Some(p);
68 }
69 }
70 None
71}
72
73#[cfg(test)]
74mod tests {
75 use super::*;
76
77 fn vc(entries: &[(&str, u64)]) -> VectorClock {
78 let mut v = VectorClock::new();
79 for (node, n) in entries {
80 for _ in 0..*n {
81 v.tick(node);
82 }
83 }
84 v
85 }
86
87 #[test]
88 fn decide_same_when_equal() {
89 let a = vc(&[("n1", 1), ("n2", 2)]);
90 let b = vc(&[("n1", 1), ("n2", 2)]);
91 assert_eq!(decide(&a, &b), GossipDecision::Same);
92 }
93
94 #[test]
95 fn decide_send_envelope_when_local_is_newer() {
96 let a = vc(&[("n1", 3), ("n2", 2)]);
97 let b = vc(&[("n1", 1), ("n2", 2)]);
98 assert_eq!(decide(&a, &b), GossipDecision::SendEnvelope);
99 }
100
101 #[test]
102 fn decide_request_merge_when_local_is_older() {
103 let a = vc(&[("n1", 1), ("n2", 2)]);
104 let b = vc(&[("n1", 3), ("n2", 2)]);
105 assert_eq!(decide(&a, &b), GossipDecision::RequestMerge);
106 }
107
108 #[test]
109 fn decide_merge_when_concurrent() {
110 let a = vc(&[("n1", 2), ("n2", 0)]);
111 let b = vc(&[("n1", 0), ("n2", 2)]);
112 assert_eq!(decide(&a, &b), GossipDecision::MergeBoth);
113 }
114
115 #[test]
116 fn pick_gossip_target_skips_self() {
117 let peers = vec![Address::local("a"), Address::local("b"), Address::local("c")];
118 let self_addr = Address::local("b");
119 let pick = pick_gossip_target(&peers, &self_addr, 1);
120 assert_eq!(pick, Some(&peers[2]));
122 }
123
124 #[test]
125 fn pick_gossip_target_returns_none_when_only_self() {
126 let peers = vec![Address::local("a")];
127 let self_addr = Address::local("a");
128 assert!(pick_gossip_target(&peers, &self_addr, 0).is_none());
129 }
130
131 #[test]
132 fn pick_gossip_target_handles_empty() {
133 let pick = pick_gossip_target(&[], &Address::local("x"), 0);
134 assert!(pick.is_none());
135 }
136
137 #[test]
138 fn pdus_serialize_round_trip() {
139 let pdu = GossipPdu::Status { from: "node-1".into(), version: vc(&[("a", 1)]) };
140 let cfg = bincode::config::standard();
141 let bytes = bincode::serde::encode_to_vec(&pdu, cfg).unwrap();
142 let (back, _): (GossipPdu, _) = bincode::serde::decode_from_slice(&bytes, cfg).unwrap();
143 match back {
144 GossipPdu::Status { from, .. } => assert_eq!(from, "node-1"),
145 _ => panic!("expected Status"),
146 }
147 }
148}