Skip to main content

nodedb_cluster/swim/wire/
message.rs

1//! Top-level SWIM datagram enum.
2//!
3//! `SwimMessage` is the single type every transport sends and receives.
4//! zerompk encodes it as a length-2 MessagePack array `[VariantName,
5//! payload]`, where `VariantName` is the Rust variant identifier
6//! verbatim (`Ping`, `PingReq`, `Ack`, `Nack`). The variant name strings
7//! are part of the wire contract — renaming them breaks compatibility.
8
9use serde::{Deserialize, Serialize};
10
11use super::probe::{Ack, Nack, Ping, PingReq};
12use crate::swim::member::record::MemberUpdate;
13
14/// The four datagram types SWIM exchanges over the wire.
15#[derive(
16    Debug,
17    Clone,
18    PartialEq,
19    Eq,
20    Serialize,
21    Deserialize,
22    zerompk::ToMessagePack,
23    zerompk::FromMessagePack,
24)]
25pub enum SwimMessage {
26    Ping(Ping),
27    PingReq(PingReq),
28    Ack(Ack),
29    Nack(Nack),
30}
31
32impl SwimMessage {
33    /// Mutable borrow of the piggyback slot, independent of variant.
34    /// Used by the dissemination queue to stamp outgoing deltas
35    /// without caring which message type it is stamping onto.
36    pub fn piggyback_mut(&mut self) -> &mut Vec<MemberUpdate> {
37        match self {
38            SwimMessage::Ping(m) => &mut m.piggyback,
39            SwimMessage::PingReq(m) => &mut m.piggyback,
40            SwimMessage::Ack(m) => &mut m.piggyback,
41            SwimMessage::Nack(m) => &mut m.piggyback,
42        }
43    }
44
45    /// Read-only borrow of the piggyback slot.
46    pub fn piggyback(&self) -> &[MemberUpdate] {
47        match self {
48            SwimMessage::Ping(m) => &m.piggyback,
49            SwimMessage::PingReq(m) => &m.piggyback,
50            SwimMessage::Ack(m) => &m.piggyback,
51            SwimMessage::Nack(m) => &m.piggyback,
52        }
53    }
54
55    /// Drop piggyback entries beyond `max`. Used before encoding to keep
56    /// a datagram below the UDP MTU — the dissemination queue will
57    /// decide which updates are highest-priority; this helper just
58    /// enforces the upper bound.
59    pub fn truncate_piggyback(&mut self, max: usize) {
60        let slot = self.piggyback_mut();
61        if slot.len() > max {
62            slot.truncate(max);
63        }
64    }
65}
66
67#[cfg(test)]
68mod tests {
69    use super::super::probe::{NackReason, ProbeId};
70    use super::*;
71    use crate::swim::incarnation::Incarnation;
72    use crate::swim::member::MemberState;
73    use nodedb_types::NodeId;
74    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
75
76    fn mk_update(id: &str) -> MemberUpdate {
77        MemberUpdate {
78            node_id: NodeId::new(id),
79            addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7000).to_string(),
80            state: MemberState::Alive,
81            incarnation: Incarnation::ZERO,
82        }
83    }
84
85    fn ping_with_piggyback(n: usize) -> SwimMessage {
86        SwimMessage::Ping(Ping {
87            probe_id: ProbeId::new(1),
88            from: NodeId::new("a"),
89            incarnation: Incarnation::new(2),
90            piggyback: (0..n).map(|i| mk_update(&format!("n{i}"))).collect(),
91        })
92    }
93
94    #[test]
95    fn piggyback_accessor_returns_variant_slot() {
96        let msg = ping_with_piggyback(3);
97        assert_eq!(msg.piggyback().len(), 3);
98    }
99
100    #[test]
101    fn truncate_bounds_piggyback() {
102        let mut msg = ping_with_piggyback(10);
103        msg.truncate_piggyback(4);
104        assert_eq!(msg.piggyback().len(), 4);
105    }
106
107    #[test]
108    fn truncate_is_noop_when_under_limit() {
109        let mut msg = ping_with_piggyback(2);
110        msg.truncate_piggyback(16);
111        assert_eq!(msg.piggyback().len(), 2);
112    }
113
114    #[test]
115    fn piggyback_mut_accessor_for_every_variant() {
116        let mut variants: Vec<SwimMessage> = vec![
117            ping_with_piggyback(0),
118            SwimMessage::PingReq(PingReq {
119                probe_id: ProbeId::ZERO,
120                from: NodeId::new("a"),
121                target: NodeId::new("b"),
122                target_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7001).to_string(),
123                piggyback: vec![],
124            }),
125            SwimMessage::Ack(Ack {
126                probe_id: ProbeId::ZERO,
127                from: NodeId::new("b"),
128                incarnation: Incarnation::ZERO,
129                piggyback: vec![],
130            }),
131            SwimMessage::Nack(Nack {
132                probe_id: ProbeId::ZERO,
133                from: NodeId::new("c"),
134                reason: NackReason::TargetUnreachable,
135                piggyback: vec![],
136            }),
137        ];
138        for m in &mut variants {
139            m.piggyback_mut().push(mk_update("extra"));
140            assert_eq!(m.piggyback().len(), 1);
141        }
142    }
143}