nodedb_cluster/swim/wire/
codec.rs1use super::message::SwimMessage;
13use crate::swim::error::SwimError;
14
15pub fn encode(msg: &SwimMessage) -> Result<Vec<u8>, SwimError> {
17 zerompk::to_msgpack_vec(msg).map_err(|e| SwimError::Encode {
18 detail: e.to_string(),
19 })
20}
21
22pub fn decode(bytes: &[u8]) -> Result<SwimMessage, SwimError> {
25 zerompk::from_msgpack(bytes).map_err(|e| SwimError::Decode {
26 detail: e.to_string(),
27 })
28}
29
#[cfg(test)]
mod tests {
    // Round-trip, rejection, and wire-tag checks for `encode` / `decode`.

    use super::super::probe::{Ack, Nack, NackReason, Ping, PingReq, ProbeId};
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;
    use crate::swim::member::record::MemberUpdate;
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Builds an IPv4 loopback socket address on `port`.
    fn addr(port: u16) -> SocketAddr {
        let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
        SocketAddr::new(loopback, port)
    }

    /// Builds an `Alive` member update at incarnation 1 for node `id`,
    /// addressed at loopback `port`.
    fn update(id: &str, port: u16) -> MemberUpdate {
        let endpoint = addr(port).to_string();
        let node_id = NodeId::new(id);
        MemberUpdate {
            node_id,
            addr: endpoint,
            state: MemberState::Alive,
            incarnation: Incarnation::new(1),
        }
    }

    /// Encodes `msg`, decodes the resulting bytes, and asserts the decoded
    /// value equals the input.
    fn assert_roundtrip(msg: SwimMessage) {
        let wire = encode(&msg).expect("encode");
        let restored = decode(&wire).expect("decode");
        assert_eq!(restored, msg);
    }

    /// A ping carrying no piggybacked updates survives a round trip.
    #[test]
    fn ping_roundtrip_empty_piggyback() {
        let ping = Ping {
            probe_id: ProbeId::new(5),
            from: NodeId::new("a"),
            incarnation: Incarnation::new(3),
            piggyback: vec![],
        };
        assert_roundtrip(SwimMessage::Ping(ping));
    }

    /// A ping carrying two piggybacked updates survives a round trip.
    #[test]
    fn ping_roundtrip_with_piggyback() {
        let gossip = vec![update("n1", 7001), update("n2", 7002)];
        let ping = Ping {
            probe_id: ProbeId::new(12),
            from: NodeId::new("sender"),
            incarnation: Incarnation::new(7),
            piggyback: gossip,
        };
        assert_roundtrip(SwimMessage::Ping(ping));
    }

    /// A ping-req with a target, target address, and one update survives a
    /// round trip.
    #[test]
    fn ping_req_roundtrip() {
        let request = PingReq {
            probe_id: ProbeId::new(9),
            from: NodeId::new("a"),
            target: NodeId::new("b"),
            target_addr: addr(7003).to_string(),
            piggyback: vec![update("helper", 7004)],
        };
        assert_roundtrip(SwimMessage::PingReq(request));
    }

    /// An ack with no piggybacked updates survives a round trip.
    #[test]
    fn ack_roundtrip() {
        let ack = Ack {
            probe_id: ProbeId::new(1),
            from: NodeId::new("b"),
            incarnation: Incarnation::new(11),
            piggyback: vec![],
        };
        assert_roundtrip(SwimMessage::Ack(ack));
    }

    /// A nack survives a round trip for each of the three listed reasons.
    #[test]
    fn nack_roundtrip_every_reason() {
        let reasons = [
            NackReason::TargetUnreachable,
            NackReason::TargetDead,
            NackReason::RateLimited,
        ];
        for reason in reasons {
            let nack = Nack {
                probe_id: ProbeId::new(2),
                from: NodeId::new("c"),
                reason,
                piggyback: vec![],
            };
            assert_roundtrip(SwimMessage::Nack(nack));
        }
    }

    /// Eight `0xff` bytes must be rejected with `SwimError::Decode`.
    #[test]
    fn decode_rejects_garbage() {
        let garbage = [0xff_u8; 8];
        let outcome = decode(&garbage);
        assert!(matches!(outcome, Err(SwimError::Decode { .. })));
    }

    /// The first half of a valid encoding must be rejected with
    /// `SwimError::Decode`.
    #[test]
    fn decode_rejects_truncated() {
        let ping = SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(1),
            from: NodeId::new("a"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        });
        let full = encode(&ping).expect("encode");
        let cut = full.len() / 2;
        let outcome = decode(&full[..cut]);
        assert!(matches!(outcome, Err(SwimError::Decode { .. })));
    }

    /// The lossy UTF-8 view of an encoded ping must contain the text `Ping`.
    #[test]
    fn wire_tag_stability_ping() {
        let ping = Ping {
            probe_id: ProbeId::new(1),
            from: NodeId::new("a"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        };
        let bytes = encode(&SwimMessage::Ping(ping)).expect("encode");
        let as_str = String::from_utf8_lossy(&bytes);
        let tag_present = as_str.contains("Ping");
        assert!(
            tag_present,
            "wire tag 'Ping' missing from encoded bytes: {bytes:?}"
        );
    }

    /// An ack and a ping with identical field values must encode to different
    /// bytes, and each must decode back to its own variant.
    #[test]
    fn wire_tag_distinguishes_variants() {
        let ack = SwimMessage::Ack(Ack {
            probe_id: ProbeId::new(1),
            from: NodeId::new("sender"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        });
        let ping = SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(1),
            from: NodeId::new("sender"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        });

        let ack_bytes = encode(&ack).expect("encode");
        let ping_bytes = encode(&ping).expect("encode");
        assert_ne!(
            ack_bytes, ping_bytes,
            "ack and ping must encode to different bytes"
        );

        let ack_back = decode(&ack_bytes);
        assert!(matches!(ack_back, Ok(SwimMessage::Ack(_))));
        let ping_back = decode(&ping_bytes);
        assert!(matches!(ping_back, Ok(SwimMessage::Ping(_))));
    }

    /// The lossy UTF-8 view of an encoded ping-req must contain the text
    /// `PingReq`.
    #[test]
    fn wire_tag_stability_ping_req() {
        let request = PingReq {
            probe_id: ProbeId::new(1),
            from: NodeId::new("a"),
            target: NodeId::new("b"),
            target_addr: addr(7000).to_string(),
            piggyback: vec![],
        };
        let bytes = encode(&SwimMessage::PingReq(request)).expect("encode");
        let as_str = String::from_utf8_lossy(&bytes);
        let tag_present = as_str.contains("PingReq");
        assert!(
            tag_present,
            "expected 'PingReq' variant name, got: {as_str:?}"
        );
    }
}