Skip to main content

nodedb_cluster/swim/wire/
codec.rs

1//! zerompk (MessagePack) codec for [`SwimMessage`].
2//!
3//! Thin wrapper over `zerompk::to_msgpack_vec` / `zerompk::from_msgpack`
4//! that maps codec errors into the typed [`SwimError`] so the failure
5//! detector never sees raw zerompk errors.
6//!
7//! The encode path is infallible in practice — `SwimMessage` is composed
8//! entirely of types with well-defined MessagePack representations — but
9//! the return type stays fallible so a future addition of a fallible
10//! field cannot silently panic.
11
12use super::message::SwimMessage;
13use crate::swim::error::SwimError;
14
15/// Serialize a `SwimMessage` into a zerompk byte buffer.
16pub fn encode(msg: &SwimMessage) -> Result<Vec<u8>, SwimError> {
17    zerompk::to_msgpack_vec(msg).map_err(|e| SwimError::Encode {
18        detail: e.to_string(),
19    })
20}
21
22/// Decode a zerompk byte buffer into a `SwimMessage`. Truncated or
23/// malformed input returns [`SwimError::Decode`] rather than panicking.
24pub fn decode(bytes: &[u8]) -> Result<SwimMessage, SwimError> {
25    zerompk::from_msgpack(bytes).map_err(|e| SwimError::Decode {
26        detail: e.to_string(),
27    })
28}
29
30#[cfg(test)]
31mod tests {
32    use super::super::probe::{Ack, Nack, NackReason, Ping, PingReq, ProbeId};
33    use super::*;
34    use crate::swim::incarnation::Incarnation;
35    use crate::swim::member::MemberState;
36    use crate::swim::member::record::MemberUpdate;
37    use nodedb_types::NodeId;
38    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
39
40    fn addr(port: u16) -> SocketAddr {
41        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
42    }
43
44    fn update(id: &str, port: u16) -> MemberUpdate {
45        MemberUpdate {
46            node_id: NodeId::new(id),
47            addr: addr(port).to_string(),
48            state: MemberState::Alive,
49            incarnation: Incarnation::new(1),
50        }
51    }
52
53    fn assert_roundtrip(msg: SwimMessage) {
54        let bytes = encode(&msg).expect("encode");
55        let decoded = decode(&bytes).expect("decode");
56        assert_eq!(decoded, msg);
57    }
58
59    #[test]
60    fn ping_roundtrip_empty_piggyback() {
61        assert_roundtrip(SwimMessage::Ping(Ping {
62            probe_id: ProbeId::new(5),
63            from: NodeId::new("a"),
64            incarnation: Incarnation::new(3),
65            piggyback: vec![],
66        }));
67    }
68
69    #[test]
70    fn ping_roundtrip_with_piggyback() {
71        assert_roundtrip(SwimMessage::Ping(Ping {
72            probe_id: ProbeId::new(12),
73            from: NodeId::new("sender"),
74            incarnation: Incarnation::new(7),
75            piggyback: vec![update("n1", 7001), update("n2", 7002)],
76        }));
77    }
78
79    #[test]
80    fn ping_req_roundtrip() {
81        assert_roundtrip(SwimMessage::PingReq(PingReq {
82            probe_id: ProbeId::new(9),
83            from: NodeId::new("a"),
84            target: NodeId::new("b"),
85            target_addr: addr(7003).to_string(),
86            piggyback: vec![update("helper", 7004)],
87        }));
88    }
89
90    #[test]
91    fn ack_roundtrip() {
92        assert_roundtrip(SwimMessage::Ack(Ack {
93            probe_id: ProbeId::new(1),
94            from: NodeId::new("b"),
95            incarnation: Incarnation::new(11),
96            piggyback: vec![],
97        }));
98    }
99
100    #[test]
101    fn nack_roundtrip_every_reason() {
102        for reason in [
103            NackReason::TargetUnreachable,
104            NackReason::TargetDead,
105            NackReason::RateLimited,
106        ] {
107            assert_roundtrip(SwimMessage::Nack(Nack {
108                probe_id: ProbeId::new(2),
109                from: NodeId::new("c"),
110                reason,
111                piggyback: vec![],
112            }));
113        }
114    }
115
116    #[test]
117    fn decode_rejects_garbage() {
118        let garbage = [0xff_u8; 8];
119        assert!(matches!(decode(&garbage), Err(SwimError::Decode { .. })));
120    }
121
122    #[test]
123    fn decode_rejects_truncated() {
124        let full = encode(&SwimMessage::Ping(Ping {
125            probe_id: ProbeId::new(1),
126            from: NodeId::new("a"),
127            incarnation: Incarnation::ZERO,
128            piggyback: vec![],
129        }))
130        .expect("encode");
131        let truncated = &full[..full.len() / 2];
132        assert!(matches!(decode(truncated), Err(SwimError::Decode { .. })));
133    }
134
135    #[test]
136    fn wire_tag_stability_ping() {
137        // zerompk encodes SwimMessage as [VariantName, payload]. Lock the
138        // PascalCase variant name so a rename breaks this test loudly.
139        let msg = SwimMessage::Ping(Ping {
140            probe_id: ProbeId::new(1),
141            from: NodeId::new("a"),
142            incarnation: Incarnation::ZERO,
143            piggyback: vec![],
144        });
145        let bytes = encode(&msg).expect("encode");
146        let as_str = String::from_utf8_lossy(&bytes);
147        assert!(
148            as_str.contains("Ping"),
149            "wire tag 'Ping' missing from encoded bytes: {bytes:?}"
150        );
151    }
152
153    #[test]
154    fn wire_tag_distinguishes_variants() {
155        // Locks in that the four variants encode to disjoint tag strings.
156        // We can't substring-match "ack" because msgpack length-prefixes
157        // short strings with bytes that can appear inside other fields;
158        // instead we verify that the Ack encoding does NOT contain the
159        // Ping tag (and vice versa), which is the property we actually
160        // care about for wire compatibility.
161        let ack = SwimMessage::Ack(Ack {
162            probe_id: ProbeId::new(1),
163            from: NodeId::new("sender"),
164            incarnation: Incarnation::ZERO,
165            piggyback: vec![],
166        });
167        let ping = SwimMessage::Ping(Ping {
168            probe_id: ProbeId::new(1),
169            from: NodeId::new("sender"),
170            incarnation: Incarnation::ZERO,
171            piggyback: vec![],
172        });
173        let ack_bytes = encode(&ack).expect("encode");
174        let ping_bytes = encode(&ping).expect("encode");
175        assert_ne!(
176            ack_bytes, ping_bytes,
177            "ack and ping must encode to different bytes"
178        );
179        // Round-trip type stability: decoded variants match the input.
180        assert!(matches!(decode(&ack_bytes), Ok(SwimMessage::Ack(_))));
181        assert!(matches!(decode(&ping_bytes), Ok(SwimMessage::Ping(_))));
182    }
183
184    #[test]
185    fn wire_tag_stability_ping_req() {
186        let msg = SwimMessage::PingReq(PingReq {
187            probe_id: ProbeId::new(1),
188            from: NodeId::new("a"),
189            target: NodeId::new("b"),
190            target_addr: addr(7000).to_string(),
191            piggyback: vec![],
192        });
193        let bytes = encode(&msg).expect("encode");
194        let as_str = String::from_utf8_lossy(&bytes);
195        assert!(
196            as_str.contains("PingReq"),
197            "expected 'PingReq' variant name, got: {as_str:?}"
198        );
199    }
200}