use atomr_core::actor::Address;
use serde::{Deserialize, Serialize};
use crate::membership::MembershipState;
use crate::vector_clock::{VectorClock, VectorRelation};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub enum GossipPdu {
Status { from: String, version: VectorClock },
Envelope { from: String, version: VectorClock, state: MembershipState },
Merge { from: String, version: VectorClock },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum GossipDecision {
SendEnvelope,
RequestMerge,
MergeBoth,
Same,
}
pub fn decide(local: &VectorClock, remote: &VectorClock) -> GossipDecision {
match local.compare(remote) {
VectorRelation::Same => GossipDecision::Same,
VectorRelation::After => GossipDecision::SendEnvelope,
VectorRelation::Before => GossipDecision::RequestMerge,
VectorRelation::Concurrent => GossipDecision::MergeBoth,
}
}
pub fn pick_gossip_target<'a>(
peers: &'a [Address],
self_addr: &Address,
cursor: usize,
) -> Option<&'a Address> {
if peers.is_empty() {
return None;
}
let total = peers.len();
for offset in 0..total {
let p = &peers[(cursor + offset) % total];
if p != self_addr {
return Some(p);
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
fn vc(entries: &[(&str, u64)]) -> VectorClock {
let mut v = VectorClock::new();
for (node, n) in entries {
for _ in 0..*n {
v.tick(node);
}
}
v
}
#[test]
fn decide_same_when_equal() {
let a = vc(&[("n1", 1), ("n2", 2)]);
let b = vc(&[("n1", 1), ("n2", 2)]);
assert_eq!(decide(&a, &b), GossipDecision::Same);
}
#[test]
fn decide_send_envelope_when_local_is_newer() {
let a = vc(&[("n1", 3), ("n2", 2)]);
let b = vc(&[("n1", 1), ("n2", 2)]);
assert_eq!(decide(&a, &b), GossipDecision::SendEnvelope);
}
#[test]
fn decide_request_merge_when_local_is_older() {
let a = vc(&[("n1", 1), ("n2", 2)]);
let b = vc(&[("n1", 3), ("n2", 2)]);
assert_eq!(decide(&a, &b), GossipDecision::RequestMerge);
}
#[test]
fn decide_merge_when_concurrent() {
let a = vc(&[("n1", 2), ("n2", 0)]);
let b = vc(&[("n1", 0), ("n2", 2)]);
assert_eq!(decide(&a, &b), GossipDecision::MergeBoth);
}
#[test]
fn pick_gossip_target_skips_self() {
let peers = vec![Address::local("a"), Address::local("b"), Address::local("c")];
let self_addr = Address::local("b");
let pick = pick_gossip_target(&peers, &self_addr, 1);
assert_eq!(pick, Some(&peers[2]));
}
#[test]
fn pick_gossip_target_returns_none_when_only_self() {
let peers = vec![Address::local("a")];
let self_addr = Address::local("a");
assert!(pick_gossip_target(&peers, &self_addr, 0).is_none());
}
#[test]
fn pick_gossip_target_handles_empty() {
let pick = pick_gossip_target(&[], &Address::local("x"), 0);
assert!(pick.is_none());
}
#[test]
fn pdus_serialize_round_trip() {
let pdu = GossipPdu::Status { from: "node-1".into(), version: vc(&[("a", 1)]) };
let cfg = bincode::config::standard();
let bytes = bincode::serde::encode_to_vec(&pdu, cfg).unwrap();
let (back, _): (GossipPdu, _) = bincode::serde::decode_from_slice(&bytes, cfg).unwrap();
match back {
GossipPdu::Status { from, .. } => assert_eq!(from, "node-1"),
_ => panic!("expected Status"),
}
}
}