Skip to main content

nodedb_cluster/swim/wire/
codec.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! zerompk (MessagePack) codec for [`SwimMessage`].
4//!
5//! Thin wrapper over `zerompk::to_msgpack_vec` / `zerompk::from_msgpack`
6//! that maps codec errors into the typed [`SwimError`] so the failure
7//! detector never sees raw zerompk errors.
8//!
9//! The encode path is infallible in practice — `SwimMessage` is composed
10//! entirely of types with well-defined MessagePack representations — but
11//! the return type stays fallible so a future addition of a fallible
12//! field cannot silently panic.
13
14use super::message::SwimMessage;
15use crate::swim::error::SwimError;
16
17/// Serialize a `SwimMessage` into a zerompk byte buffer.
18pub fn encode(msg: &SwimMessage) -> Result<Vec<u8>, SwimError> {
19    zerompk::to_msgpack_vec(msg).map_err(|e| SwimError::Encode {
20        detail: e.to_string(),
21    })
22}
23
24/// Decode a zerompk byte buffer into a `SwimMessage`. Truncated or
25/// malformed input returns [`SwimError::Decode`] rather than panicking.
26pub fn decode(bytes: &[u8]) -> Result<SwimMessage, SwimError> {
27    zerompk::from_msgpack(bytes).map_err(|e| SwimError::Decode {
28        detail: e.to_string(),
29    })
30}
31
#[cfg(test)]
mod tests {
    use super::super::probe::{Ack, Nack, NackReason, Ping, PingReq, ProbeId};
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;
    use crate::swim::member::record::MemberUpdate;
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Localhost IPv4 socket address on `port`.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Piggyback fixture: an `Alive` member `id` at incarnation 1 whose
    /// address is the string form of `addr(port)`.
    fn update(id: &str, port: u16) -> MemberUpdate {
        MemberUpdate {
            node_id: NodeId::try_new(id).expect("test fixture"),
            addr: addr(port).to_string(),
            state: MemberState::Alive,
            incarnation: Incarnation::new(1),
        }
    }

    /// Encodes `msg`, decodes the resulting bytes, and asserts the decoded
    /// message equals the input.
    fn assert_roundtrip(msg: SwimMessage) {
        let bytes = encode(&msg).expect("encode");
        let decoded = decode(&bytes).expect("decode");
        assert_eq!(decoded, msg);
    }

    /// A `Ping` with no piggybacked updates survives a round trip.
    #[test]
    fn ping_roundtrip_empty_piggyback() {
        assert_roundtrip(SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(5),
            from: NodeId::try_new("a").expect("test fixture"),
            incarnation: Incarnation::new(3),
            piggyback: vec![],
        }));
    }

    /// A `Ping` carrying two piggybacked updates survives a round trip.
    #[test]
    fn ping_roundtrip_with_piggyback() {
        assert_roundtrip(SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(12),
            from: NodeId::try_new("sender").expect("test fixture"),
            incarnation: Incarnation::new(7),
            piggyback: vec![update("n1", 7001), update("n2", 7002)],
        }));
    }

    /// A `PingReq` with a target, target address and one piggybacked
    /// update survives a round trip.
    #[test]
    fn ping_req_roundtrip() {
        assert_roundtrip(SwimMessage::PingReq(PingReq {
            probe_id: ProbeId::new(9),
            from: NodeId::try_new("a").expect("test fixture"),
            target: NodeId::try_new("b").expect("test fixture"),
            target_addr: addr(7003).to_string(),
            piggyback: vec![update("helper", 7004)],
        }));
    }

    /// An `Ack` with no piggybacked updates survives a round trip.
    #[test]
    fn ack_roundtrip() {
        assert_roundtrip(SwimMessage::Ack(Ack {
            probe_id: ProbeId::new(1),
            from: NodeId::try_new("b").expect("test fixture"),
            incarnation: Incarnation::new(11),
            piggyback: vec![],
        }));
    }

    /// A `Nack` survives a round trip for each `NackReason` listed below.
    #[test]
    fn nack_roundtrip_every_reason() {
        for reason in [
            NackReason::TargetUnreachable,
            NackReason::TargetDead,
            NackReason::RateLimited,
        ] {
            assert_roundtrip(SwimMessage::Nack(Nack {
                probe_id: ProbeId::new(2),
                from: NodeId::try_new("c").expect("test fixture"),
                reason,
                piggyback: vec![],
            }));
        }
    }

    /// Eight `0xff` bytes must be rejected with `SwimError::Decode`.
    #[test]
    fn decode_rejects_garbage() {
        let garbage = [0xff_u8; 8];
        assert!(matches!(decode(&garbage), Err(SwimError::Decode { .. })));
    }

    /// The first half of a valid encoding must be rejected with
    /// `SwimError::Decode`.
    #[test]
    fn decode_rejects_truncated() {
        let full = encode(&SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(1),
            from: NodeId::try_new("a").expect("test fixture"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        }))
        .expect("encode");
        let truncated = &full[..full.len() / 2];
        assert!(matches!(decode(truncated), Err(SwimError::Decode { .. })));
    }

    #[test]
    fn wire_tag_stability_ping() {
        // zerompk encodes SwimMessage as [VariantName, payload]. Lock the
        // PascalCase variant name so a rename breaks this test loudly.
        // NOTE: this is a substring check, so it would also be satisfied by
        // any tag that merely contains "Ping" (e.g. "PingReq").
        let msg = SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(1),
            from: NodeId::try_new("a").expect("test fixture"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        });
        let bytes = encode(&msg).expect("encode");
        let as_str = String::from_utf8_lossy(&bytes);
        assert!(
            as_str.contains("Ping"),
            "wire tag 'Ping' missing from encoded bytes: {bytes:?}"
        );
    }

    #[test]
    fn wire_tag_distinguishes_variants() {
        // Substring-matching the tags is unreliable for this property: the
        // bytes of a short tag can also show up inside other fields, and
        // "Ping" is a prefix of "PingReq". Instead, lock in what matters
        // for wire compatibility: all four variants, built from the same
        // probe id and sender, encode to pairwise-distinct bytes and each
        // decodes back to exactly the message it was encoded from.
        let sender = || NodeId::try_new("sender").expect("test fixture");
        let cases = [
            (
                "Ping",
                SwimMessage::Ping(Ping {
                    probe_id: ProbeId::new(1),
                    from: sender(),
                    incarnation: Incarnation::ZERO,
                    piggyback: vec![],
                }),
            ),
            (
                "PingReq",
                SwimMessage::PingReq(PingReq {
                    probe_id: ProbeId::new(1),
                    from: sender(),
                    target: NodeId::try_new("target").expect("test fixture"),
                    target_addr: addr(7000).to_string(),
                    piggyback: vec![],
                }),
            ),
            (
                "Ack",
                SwimMessage::Ack(Ack {
                    probe_id: ProbeId::new(1),
                    from: sender(),
                    incarnation: Incarnation::ZERO,
                    piggyback: vec![],
                }),
            ),
            (
                "Nack",
                SwimMessage::Nack(Nack {
                    probe_id: ProbeId::new(1),
                    from: sender(),
                    reason: NackReason::TargetUnreachable,
                    piggyback: vec![],
                }),
            ),
        ];

        let encoded: Vec<(&str, Vec<u8>)> = cases
            .iter()
            .map(|(label, msg)| (*label, encode(msg).expect("encode")))
            .collect();

        // Every unordered pair of variants must differ on the wire.
        for (idx, (lhs_label, lhs_bytes)) in encoded.iter().enumerate() {
            for (rhs_label, rhs_bytes) in &encoded[idx + 1..] {
                assert_ne!(
                    lhs_bytes, rhs_bytes,
                    "{lhs_label} and {rhs_label} must encode to different bytes"
                );
            }
        }

        // Round-trip type stability: each encoding decodes to its input.
        for ((label, msg), (_, bytes)) in cases.iter().zip(&encoded) {
            let decoded = decode(bytes).expect("decode");
            assert_eq!(
                &decoded, msg,
                "{label} must decode back to the message it was encoded from"
            );
        }
    }

    /// The encoded bytes of a `PingReq` must contain the "PingReq"
    /// variant name.
    #[test]
    fn wire_tag_stability_ping_req() {
        let msg = SwimMessage::PingReq(PingReq {
            probe_id: ProbeId::new(1),
            from: NodeId::try_new("a").expect("test fixture"),
            target: NodeId::try_new("b").expect("test fixture"),
            target_addr: addr(7000).to_string(),
            piggyback: vec![],
        });
        let bytes = encode(&msg).expect("encode");
        let as_str = String::from_utf8_lossy(&bytes);
        assert!(
            as_str.contains("PingReq"),
            "expected 'PingReq' variant name, got: {as_str:?}"
        );
    }
}