Skip to main content

nodedb_cluster/swim/wire/
message.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Top-level SWIM datagram enum.
4//!
5//! `SwimMessage` is the single type every transport sends and receives.
6//! zerompk encodes it as a length-2 MessagePack array `[VariantName,
7//! payload]`, where `VariantName` is the Rust variant identifier
8//! verbatim (`Ping`, `PingReq`, `Ack`, `Nack`). The variant name strings
9//! are part of the wire contract — renaming them breaks compatibility.
10
11use serde::{Deserialize, Serialize};
12
13use super::probe::{Ack, Nack, Ping, PingReq};
14use crate::swim::member::record::MemberUpdate;
15
16/// The four datagram types SWIM exchanges over the wire.
17#[derive(
18    Debug,
19    Clone,
20    PartialEq,
21    Eq,
22    Serialize,
23    Deserialize,
24    zerompk::ToMessagePack,
25    zerompk::FromMessagePack,
26)]
27pub enum SwimMessage {
28    Ping(Ping),
29    PingReq(PingReq),
30    Ack(Ack),
31    Nack(Nack),
32}
33
34impl SwimMessage {
35    /// Mutable borrow of the piggyback slot, independent of variant.
36    /// Used by the dissemination queue to stamp outgoing deltas
37    /// without caring which message type it is stamping onto.
38    pub fn piggyback_mut(&mut self) -> &mut Vec<MemberUpdate> {
39        match self {
40            SwimMessage::Ping(m) => &mut m.piggyback,
41            SwimMessage::PingReq(m) => &mut m.piggyback,
42            SwimMessage::Ack(m) => &mut m.piggyback,
43            SwimMessage::Nack(m) => &mut m.piggyback,
44        }
45    }
46
47    /// Read-only borrow of the piggyback slot.
48    pub fn piggyback(&self) -> &[MemberUpdate] {
49        match self {
50            SwimMessage::Ping(m) => &m.piggyback,
51            SwimMessage::PingReq(m) => &m.piggyback,
52            SwimMessage::Ack(m) => &m.piggyback,
53            SwimMessage::Nack(m) => &m.piggyback,
54        }
55    }
56
57    /// Drop piggyback entries beyond `max`. Used before encoding to keep
58    /// a datagram below the UDP MTU — the dissemination queue will
59    /// decide which updates are highest-priority; this helper just
60    /// enforces the upper bound.
61    pub fn truncate_piggyback(&mut self, max: usize) {
62        let slot = self.piggyback_mut();
63        if slot.len() > max {
64            slot.truncate(max);
65        }
66    }
67}
68
#[cfg(test)]
mod tests {
    use super::super::probe::{NackReason, ProbeId};
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Build a `NodeId` for a fixture, panicking if the id is rejected.
    fn node(id: &str) -> NodeId {
        NodeId::try_new(id).expect("test fixture")
    }

    /// Loopback socket address on `port`, rendered as a string.
    fn loopback(port: u16) -> String {
        let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
        SocketAddr::new(ip, port).to_string()
    }

    /// An `Alive` member update at incarnation zero for node `id`.
    fn mk_update(id: &str) -> MemberUpdate {
        MemberUpdate {
            node_id: node(id),
            addr: loopback(7000),
            state: MemberState::Alive,
            incarnation: Incarnation::ZERO,
        }
    }

    /// A `Ping` message whose piggyback list holds `n` updates named
    /// `n0`, `n1`, ….
    fn ping_with_piggyback(n: usize) -> SwimMessage {
        let piggyback = (0..n)
            .map(|idx| format!("n{idx}"))
            .map(|id| mk_update(&id))
            .collect();
        SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(1),
            from: node("a"),
            incarnation: Incarnation::new(2),
            piggyback,
        })
    }

    #[test]
    fn piggyback_accessor_returns_variant_slot() {
        let ping = ping_with_piggyback(3);
        let carried = ping.piggyback();
        assert_eq!(carried.len(), 3);
    }

    #[test]
    fn truncate_bounds_piggyback() {
        let mut oversized = ping_with_piggyback(10);
        oversized.truncate_piggyback(4);
        assert_eq!(oversized.piggyback().len(), 4);
    }

    #[test]
    fn truncate_is_noop_when_under_limit() {
        let mut small = ping_with_piggyback(2);
        small.truncate_piggyback(16);
        assert_eq!(small.piggyback().len(), 2);
    }

    #[test]
    fn piggyback_mut_accessor_for_every_variant() {
        // One message per variant, each starting with an empty piggyback list.
        let ping_req = SwimMessage::PingReq(PingReq {
            probe_id: ProbeId::ZERO,
            from: node("a"),
            target: node("b"),
            target_addr: loopback(7001),
            piggyback: Vec::new(),
        });
        let ack = SwimMessage::Ack(Ack {
            probe_id: ProbeId::ZERO,
            from: node("b"),
            incarnation: Incarnation::ZERO,
            piggyback: Vec::new(),
        });
        let nack = SwimMessage::Nack(Nack {
            probe_id: ProbeId::ZERO,
            from: node("c"),
            reason: NackReason::TargetUnreachable,
            piggyback: Vec::new(),
        });
        let mut messages = vec![ping_with_piggyback(0), ping_req, ack, nack];

        // A push through the mutable accessor must show up in the
        // read-only accessor, regardless of variant.
        for message in messages.iter_mut() {
            message.piggyback_mut().push(mk_update("extra"));
            assert_eq!(message.piggyback().len(), 1);
        }
    }
}