Skip to main content

atomr_cluster/
gossip_pdu.rs

//! Gossip dissemination PDUs.
//!
//! [`GossipPdu`] and its `Status` / `Envelope` / `Merge` variants are the
//! PDUs the leader exchanges with peers on each tick. The actual transport
//! plug-in (over atomr-remote) will be wired up once Phase 5.D ships; this
//! module contains only the typed shapes and the pure decision helpers.
7
8use atomr_core::actor::Address;
9use serde::{Deserialize, Serialize};
10
11use crate::membership::MembershipState;
12use crate::vector_clock::{VectorClock, VectorRelation};
13
14/// One gossip exchange. Sender hands a `GossipStatus` to the peer;
15/// peer responds with either a full `GossipEnvelope` (if it has
16/// newer state) or a `GossipMerge` request (if its state is older).
17#[derive(Debug, Clone, Serialize, Deserialize)]
18#[non_exhaustive]
19pub enum GossipPdu {
20    /// "Here is my version vector — do you have newer state?"
21    Status { from: String, version: VectorClock },
22    /// "Here's my whole state, merge it in."
23    Envelope { from: String, version: VectorClock, state: MembershipState },
24    /// "I'm older than you — please send me your envelope."
25    Merge { from: String, version: VectorClock },
26}
27
/// Outcome of comparing the local version vector with a remote one.
///
/// Each variant names the action the receiving node takes next.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum GossipDecision {
    /// Local state is newer: send our envelope to the peer.
    SendEnvelope,
    /// Local state is older: ask the peer for its envelope.
    RequestMerge,
    /// Neither side dominates (concurrent histories): merge both states.
    MergeBoth,
    /// Both sides are identical: no action is needed.
    Same,
}
41
42/// Pure decision function: given the local + remote version vectors,
43/// what should we do?
44pub fn decide(local: &VectorClock, remote: &VectorClock) -> GossipDecision {
45    match local.compare(remote) {
46        VectorRelation::Same => GossipDecision::Same,
47        VectorRelation::After => GossipDecision::SendEnvelope,
48        VectorRelation::Before => GossipDecision::RequestMerge,
49        VectorRelation::Concurrent => GossipDecision::MergeBoth,
50    }
51}
52
53/// Pick the next peer to gossip with, given a list of known peer
54/// addresses and a per-tick cursor. Skips `self_addr`.
55pub fn pick_gossip_target<'a>(
56    peers: &'a [Address],
57    self_addr: &Address,
58    cursor: usize,
59) -> Option<&'a Address> {
60    if peers.is_empty() {
61        return None;
62    }
63    let total = peers.len();
64    for offset in 0..total {
65        let p = &peers[(cursor + offset) % total];
66        if p != self_addr {
67            return Some(p);
68        }
69    }
70    None
71}
72
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a clock by ticking each named node the requested number of
    /// times; a count of zero never calls `tick` for that node.
    fn vc(entries: &[(&str, u64)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for (node, ticks) in entries {
            let mut remaining = *ticks;
            while remaining > 0 {
                clock.tick(node);
                remaining -= 1;
            }
        }
        clock
    }

    #[test]
    fn decide_same_when_equal() {
        let local = vc(&[("n1", 1), ("n2", 2)]);
        let remote = vc(&[("n1", 1), ("n2", 2)]);
        let decision = decide(&local, &remote);
        assert_eq!(decision, GossipDecision::Same);
    }

    #[test]
    fn decide_send_envelope_when_local_is_newer() {
        let local = vc(&[("n1", 3), ("n2", 2)]);
        let remote = vc(&[("n1", 1), ("n2", 2)]);
        let decision = decide(&local, &remote);
        assert_eq!(decision, GossipDecision::SendEnvelope);
    }

    #[test]
    fn decide_request_merge_when_local_is_older() {
        let local = vc(&[("n1", 1), ("n2", 2)]);
        let remote = vc(&[("n1", 3), ("n2", 2)]);
        let decision = decide(&local, &remote);
        assert_eq!(decision, GossipDecision::RequestMerge);
    }

    #[test]
    fn decide_merge_when_concurrent() {
        // Each side has ticks the other lacks, so neither dominates.
        let local = vc(&[("n1", 2), ("n2", 0)]);
        let remote = vc(&[("n1", 0), ("n2", 2)]);
        let decision = decide(&local, &remote);
        assert_eq!(decision, GossipDecision::MergeBoth);
    }

    #[test]
    fn pick_gossip_target_skips_self() {
        let peers = [Address::local("a"), Address::local("b"), Address::local("c")];
        let me = Address::local("b");
        // The cursor points at index 1 ("b", which is self), so the scan
        // moves on to index 2 ("c").
        let chosen = pick_gossip_target(&peers, &me, 1);
        assert_eq!(chosen, Some(&peers[2]));
    }

    #[test]
    fn pick_gossip_target_returns_none_when_only_self() {
        let peers = [Address::local("a")];
        let me = Address::local("a");
        let chosen = pick_gossip_target(&peers, &me, 0);
        assert!(chosen.is_none());
    }

    #[test]
    fn pick_gossip_target_handles_empty() {
        let me = Address::local("x");
        assert!(pick_gossip_target(&[], &me, 0).is_none());
    }

    #[test]
    fn pdus_serialize_round_trip() {
        let config = bincode::config::standard();
        let original = GossipPdu::Status { from: "node-1".into(), version: vc(&[("a", 1)]) };
        let encoded = bincode::serde::encode_to_vec(&original, config).unwrap();
        let (decoded, _): (GossipPdu, _) =
            bincode::serde::decode_from_slice(&encoded, config).unwrap();
        if let GossipPdu::Status { from, .. } = decoded {
            assert_eq!(from, "node-1");
        } else {
            panic!("expected Status");
        }
    }
}