nodedb_cluster/swim/wire/
codec.rs1use super::message::SwimMessage;
15use crate::swim::error::SwimError;
16
17pub fn encode(msg: &SwimMessage) -> Result<Vec<u8>, SwimError> {
19 zerompk::to_msgpack_vec(msg).map_err(|e| SwimError::Encode {
20 detail: e.to_string(),
21 })
22}
23
24pub fn decode(bytes: &[u8]) -> Result<SwimMessage, SwimError> {
27 zerompk::from_msgpack(bytes).map_err(|e| SwimError::Decode {
28 detail: e.to_string(),
29 })
30}
31
#[cfg(test)]
mod tests {
    use super::super::probe::{Ack, Nack, NackReason, Ping, PingReq, ProbeId};
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;
    use crate::swim::member::record::MemberUpdate;
    use nodedb_types::NodeId;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// IPv4 loopback socket address on the given `port`.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::from((IpAddr::from(Ipv4Addr::LOCALHOST), port))
    }

    /// Builds a `NodeId` fixture; panics if `id` is rejected by `try_new`.
    fn node(id: &str) -> NodeId {
        NodeId::try_new(id).expect("test fixture")
    }

    /// An `Alive` member update at incarnation 1 for `id`, addressed at
    /// loopback:`port`.
    fn update(id: &str, port: u16) -> MemberUpdate {
        let node_id = node(id);
        let socket = addr(port).to_string();
        MemberUpdate {
            node_id,
            addr: socket,
            state: MemberState::Alive,
            incarnation: Incarnation::new(1),
        }
    }

    /// Encodes `msg`, decodes the result, and asserts the decoded value
    /// equals the input.
    fn assert_roundtrip(msg: SwimMessage) {
        let wire = encode(&msg).expect("encode");
        let restored = decode(&wire).expect("decode");
        assert_eq!(restored, msg);
    }

    #[test]
    fn ping_roundtrip_empty_piggyback() {
        let ping = Ping {
            probe_id: ProbeId::new(5),
            from: node("a"),
            incarnation: Incarnation::new(3),
            piggyback: vec![],
        };
        assert_roundtrip(SwimMessage::Ping(ping));
    }

    #[test]
    fn ping_roundtrip_with_piggyback() {
        let ping = Ping {
            probe_id: ProbeId::new(12),
            from: node("sender"),
            incarnation: Incarnation::new(7),
            piggyback: vec![update("n1", 7001), update("n2", 7002)],
        };
        assert_roundtrip(SwimMessage::Ping(ping));
    }

    #[test]
    fn ping_req_roundtrip() {
        let request = PingReq {
            probe_id: ProbeId::new(9),
            from: node("a"),
            target: node("b"),
            target_addr: addr(7003).to_string(),
            piggyback: vec![update("helper", 7004)],
        };
        assert_roundtrip(SwimMessage::PingReq(request));
    }

    #[test]
    fn ack_roundtrip() {
        let ack = Ack {
            probe_id: ProbeId::new(1),
            from: node("b"),
            incarnation: Incarnation::new(11),
            piggyback: vec![],
        };
        assert_roundtrip(SwimMessage::Ack(ack));
    }

    #[test]
    fn nack_roundtrip_every_reason() {
        // Each listed reason must survive an encode/decode cycle unchanged.
        let reasons = [
            NackReason::TargetUnreachable,
            NackReason::TargetDead,
            NackReason::RateLimited,
        ];
        for reason in reasons {
            let nack = Nack {
                probe_id: ProbeId::new(2),
                from: node("c"),
                reason,
                piggyback: vec![],
            };
            assert_roundtrip(SwimMessage::Nack(nack));
        }
    }

    #[test]
    fn decode_rejects_garbage() {
        // Eight 0xff bytes must surface as a decode error.
        let outcome = decode(&[0xff_u8; 8]);
        assert!(matches!(outcome, Err(SwimError::Decode { .. })));
    }

    #[test]
    fn decode_rejects_truncated() {
        let ping = Ping {
            probe_id: ProbeId::new(1),
            from: node("a"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        };
        let full = encode(&SwimMessage::Ping(ping)).expect("encode");
        // Keep only the first half of a valid encoding.
        let (head, _) = full.split_at(full.len() / 2);
        let outcome = decode(head);
        assert!(matches!(outcome, Err(SwimError::Decode { .. })));
    }

    #[test]
    fn wire_tag_stability_ping() {
        let ping = Ping {
            probe_id: ProbeId::new(1),
            from: node("a"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        };
        let bytes = encode(&SwimMessage::Ping(ping)).expect("encode");
        // The variant name must appear as text somewhere in the encoding.
        let has_tag = String::from_utf8_lossy(&bytes).contains("Ping");
        assert!(
            has_tag,
            "wire tag 'Ping' missing from encoded bytes: {bytes:?}"
        );
    }

    #[test]
    fn wire_tag_distinguishes_variants() {
        // Ack and Ping carry identical field values here, so any difference
        // in the encoded bytes comes from the variant itself.
        let ack = SwimMessage::Ack(Ack {
            probe_id: ProbeId::new(1),
            from: node("sender"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        });
        let ping = SwimMessage::Ping(Ping {
            probe_id: ProbeId::new(1),
            from: node("sender"),
            incarnation: Incarnation::ZERO,
            piggyback: vec![],
        });
        let ack_bytes = encode(&ack).expect("encode");
        let ping_bytes = encode(&ping).expect("encode");
        assert_ne!(
            ack_bytes, ping_bytes,
            "ack and ping must encode to different bytes"
        );
        // Each encoding must decode back to its own variant.
        let ack_back = decode(&ack_bytes);
        assert!(matches!(ack_back, Ok(SwimMessage::Ack(_))));
        let ping_back = decode(&ping_bytes);
        assert!(matches!(ping_back, Ok(SwimMessage::Ping(_))));
    }

    #[test]
    fn wire_tag_stability_ping_req() {
        let request = PingReq {
            probe_id: ProbeId::new(1),
            from: node("a"),
            target: node("b"),
            target_addr: addr(7000).to_string(),
            piggyback: vec![],
        };
        let bytes = encode(&SwimMessage::PingReq(request)).expect("encode");
        let as_str = String::from_utf8_lossy(&bytes);
        // The variant name must appear as text somewhere in the encoding.
        let has_tag = as_str.contains("PingReq");
        assert!(
            has_tag,
            "expected 'PingReq' variant name, got: {as_str:?}"
        );
    }
}