nodedb_cluster/wire_version/
envelope.rs1use super::error::WireVersionError;
24use super::types::WireVersion;
25
/// Leading byte that identifies an enveloped frame.
///
/// NOTE(review): `0xc1` is the type byte the MessagePack format lists as
/// "never used", which is presumably why it was picked as the marker — a bare
/// MessagePack payload should not begin with it. Confirm against the protocol
/// notes.
const ENVELOPE_MARKER: u8 = 0xC1;

/// Number of bytes that precede the payload in an envelope: the marker
/// (`u8`), the version (`u16`) and the payload length (`u32`).
const ENVELOPE_HEADER_LEN: usize =
    std::mem::size_of::<u8>() + std::mem::size_of::<u16>() + std::mem::size_of::<u32>();
32
33#[derive(Debug, Clone, PartialEq, Eq)]
36pub struct Versioned<T> {
37 pub version: WireVersion,
38 pub inner: T,
39}
40
41pub fn encode_versioned<T>(value: &T) -> Result<Vec<u8>, WireVersionError>
43where
44 T: zerompk::ToMessagePack,
45{
46 let inner_bytes = zerompk::to_msgpack_vec(value)
47 .map_err(|e| WireVersionError::DecodeFailure(format!("encode inner: {e}")))?;
48
49 encode_envelope(WireVersion::CURRENT.0, &inner_bytes)
50}
51
52pub fn decode_versioned<T>(bytes: &[u8]) -> Result<T, WireVersionError>
60where
61 T: zerompk::FromMessagePackOwned,
62{
63 match parse_envelope(bytes)? {
64 Some((version_raw, inner_bytes)) => {
65 if version_raw == 0 {
67 return Err(WireVersionError::DecodeFailure(
68 "v2 envelope with version 0 is invalid".to_string(),
69 ));
70 }
71 let peer_version = WireVersion(version_raw);
72 if peer_version > WireVersion::CURRENT {
73 return Err(WireVersionError::UnsupportedVersion {
74 peer_version,
75 supported_min: WireVersion::CURRENT,
76 supported_max: WireVersion::CURRENT,
77 });
78 }
79 zerompk::from_msgpack(inner_bytes).map_err(|e| {
80 WireVersionError::DecodeFailure(format!("decode inner (v{peer_version}): {e}"))
81 })
82 }
83 None => Err(WireVersionError::DecodeFailure(
84 "missing envelope marker: raw v1 frames are not accepted".to_string(),
85 )),
86 }
87}
88
89pub fn wrap_bytes_versioned(inner: &[u8]) -> Result<Vec<u8>, WireVersionError> {
91 encode_envelope(WireVersion::CURRENT.0, inner)
92}
93
94pub fn unwrap_bytes_versioned(bytes: &[u8]) -> Result<&[u8], WireVersionError> {
99 match parse_envelope(bytes)? {
100 Some((version_raw, inner)) => {
101 if version_raw == 0 {
102 return Err(WireVersionError::DecodeFailure(
103 "v2 envelope with version 0 is invalid".to_string(),
104 ));
105 }
106 let peer_version = WireVersion(version_raw);
107 if peer_version > WireVersion::CURRENT {
108 return Err(WireVersionError::UnsupportedVersion {
109 peer_version,
110 supported_min: WireVersion::CURRENT,
111 supported_max: WireVersion::CURRENT,
112 });
113 }
114 Ok(inner)
115 }
116 None => Err(WireVersionError::DecodeFailure(
117 "missing envelope marker: raw v1 frames are not accepted".to_string(),
118 )),
119 }
120}
121
122fn encode_envelope(version: u16, inner: &[u8]) -> Result<Vec<u8>, WireVersionError> {
126 if inner.len() > u32::MAX as usize {
127 return Err(WireVersionError::DecodeFailure(format!(
128 "inner payload {} bytes exceeds u32 length limit",
129 inner.len()
130 )));
131 }
132 let mut buf = Vec::with_capacity(ENVELOPE_HEADER_LEN + inner.len());
133 buf.push(ENVELOPE_MARKER);
134 buf.extend_from_slice(&version.to_be_bytes());
135 buf.extend_from_slice(&(inner.len() as u32).to_be_bytes());
136 buf.extend_from_slice(inner);
137 Ok(buf)
138}
139
140fn parse_envelope(bytes: &[u8]) -> Result<Option<(u16, &[u8])>, WireVersionError> {
150 if bytes.is_empty() || bytes[0] != ENVELOPE_MARKER {
151 return Ok(None);
152 }
153 if bytes.len() < ENVELOPE_HEADER_LEN {
154 return Err(WireVersionError::DecodeFailure(format!(
155 "v2 envelope truncated: header needs {} bytes, got {}",
156 ENVELOPE_HEADER_LEN,
157 bytes.len()
158 )));
159 }
160 let version = u16::from_be_bytes([bytes[1], bytes[2]]);
161 let inner_len = u32::from_be_bytes([bytes[3], bytes[4], bytes[5], bytes[6]]) as usize;
162 let inner_start = ENVELOPE_HEADER_LEN;
163 let inner_end = inner_start.checked_add(inner_len).ok_or_else(|| {
164 WireVersionError::DecodeFailure("v2 envelope inner length overflows usize".to_string())
165 })?;
166 if inner_end > bytes.len() {
167 return Err(WireVersionError::DecodeFailure(format!(
168 "v2 envelope truncated: declared inner_len={inner_len}, available={}",
169 bytes.len() - inner_start
170 )));
171 }
172 Ok(Some((version, &bytes[inner_start..inner_end])))
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// Small serializable fixture used as the inner message in these tests.
    #[derive(
        Debug,
        Clone,
        PartialEq,
        Eq,
        serde::Serialize,
        serde::Deserialize,
        zerompk::ToMessagePack,
        zerompk::FromMessagePack,
    )]
    struct Payload {
        value: u32,
        label: String,
    }

    /// Builds a `Payload` fixture.
    fn payload(value: u32, label: &str) -> Payload {
        Payload {
            value,
            label: label.to_string(),
        }
    }

    /// Unwraps the message of a `DecodeFailure`, panicking on any other variant.
    fn expect_decode_failure(err: WireVersionError) -> String {
        match err {
            WireVersionError::DecodeFailure(msg) => msg,
            other => panic!("expected DecodeFailure, got: {other}"),
        }
    }

    #[test]
    fn v2_roundtrip() {
        let orig = payload(42, "hello");
        let bytes = encode_versioned(&orig).unwrap();
        assert_eq!(bytes[0], ENVELOPE_MARKER);
        let decoded: Payload = decode_versioned(&bytes).unwrap();
        assert_eq!(orig, decoded);
    }

    #[test]
    fn envelope_header_layout_is_marker_version_length() {
        let bytes = encode_envelope(0x0102, b"abc").unwrap();
        assert_eq!(bytes.len(), ENVELOPE_HEADER_LEN + 3);
        assert_eq!(
            &bytes[..ENVELOPE_HEADER_LEN],
            &[ENVELOPE_MARKER, 0x01, 0x02, 0, 0, 0, 3]
        );
        assert_eq!(&bytes[ENVELOPE_HEADER_LEN..], b"abc");
    }

    #[test]
    fn decode_versioned_rejects_raw_no_marker() {
        let raw_bytes = zerompk::to_msgpack_vec(&payload(7, "raw")).unwrap();
        // Precondition: a bare MessagePack struct does not start with the marker.
        assert_ne!(raw_bytes[0], ENVELOPE_MARKER);

        let msg = expect_decode_failure(decode_versioned::<Payload>(&raw_bytes).unwrap_err());
        assert!(
            msg.contains("missing envelope marker"),
            "unexpected message: {msg}"
        );
    }

    #[test]
    fn empty_input_is_rejected_as_missing_marker() {
        let msg = expect_decode_failure(decode_versioned::<Payload>(&[]).unwrap_err());
        assert!(
            msg.contains("missing envelope marker"),
            "unexpected message: {msg}"
        );
    }

    #[test]
    fn unknown_future_version_returns_unsupported_version() {
        let fake_inner = zerompk::to_msgpack_vec(&payload(0, "")).unwrap();
        let bytes = encode_envelope(9999, &fake_inner).unwrap();

        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
        match err {
            WireVersionError::UnsupportedVersion { peer_version, .. } => {
                assert_eq!(peer_version, WireVersion(9999));
            }
            other => panic!("expected UnsupportedVersion, got: {other}"),
        }
    }

    #[test]
    fn unknown_future_version_does_not_silently_succeed() {
        let inner = zerompk::to_msgpack_vec(&payload(1, "x")).unwrap();
        let bytes = encode_envelope(65535, &inner).unwrap();
        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
        assert!(
            matches!(err, WireVersionError::UnsupportedVersion { .. }),
            "must error on future version, not silently succeed: {err}"
        );
    }

    #[test]
    fn truncated_envelope_header_is_loud_error() {
        // Marker plus version only: the 4-byte length field is missing.
        let bytes = vec![ENVELOPE_MARKER, 0x00, 0x02];
        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
        match err {
            WireVersionError::DecodeFailure(msg) => assert!(msg.contains("truncated")),
            other => panic!("expected DecodeFailure(truncated), got {other}"),
        }
    }

    #[test]
    fn truncated_envelope_body_is_loud_error() {
        // Complete header, but one payload byte fewer than it declares.
        let mut bytes = encode_versioned(&payload(3, "cut")).unwrap();
        bytes.pop();

        let msg = expect_decode_failure(decode_versioned::<Payload>(&bytes).unwrap_err());
        assert!(msg.contains("truncated"), "unexpected message: {msg}");

        let msg = expect_decode_failure(unwrap_bytes_versioned(&bytes).unwrap_err());
        assert!(msg.contains("truncated"), "unexpected message: {msg}");
    }

    #[test]
    fn envelope_with_version_zero_is_loud_error() {
        let inner = zerompk::to_msgpack_vec(&payload(0, "")).unwrap();
        let bytes = encode_envelope(0, &inner).unwrap();
        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
        assert!(
            matches!(err, WireVersionError::DecodeFailure(_)),
            "version=0 must be a loud error, got: {err}"
        );
    }

    #[test]
    fn wrap_unwrap_bytes_roundtrip() {
        for inner in [&b""[..], &b"opaque payload"[..]] {
            let framed = wrap_bytes_versioned(inner).unwrap();
            assert_eq!(framed[0], ENVELOPE_MARKER);
            assert_eq!(framed.len(), ENVELOPE_HEADER_LEN + inner.len());
            assert_eq!(unwrap_bytes_versioned(&framed).unwrap(), inner);
        }
    }

    #[test]
    fn unwrap_bytes_applies_same_checks_as_decode() {
        let err = unwrap_bytes_versioned(b"x").unwrap_err();
        assert!(
            matches!(err, WireVersionError::DecodeFailure(_)),
            "missing marker must be rejected, got: {err}"
        );

        let zero = encode_envelope(0, b"x").unwrap();
        let err = unwrap_bytes_versioned(&zero).unwrap_err();
        assert!(
            matches!(err, WireVersionError::DecodeFailure(_)),
            "version=0 must be rejected, got: {err}"
        );

        let future = encode_envelope(u16::MAX, b"x").unwrap();
        let err = unwrap_bytes_versioned(&future).unwrap_err();
        assert!(
            matches!(err, WireVersionError::UnsupportedVersion { .. }),
            "future version must be rejected, got: {err}"
        );
    }
}