Skip to main content

nodedb_cluster/wire_version/
envelope.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Versioned wire envelope for cluster messages.
4//!
5//! # On-wire layout (v2 envelope)
6//!
7//! ```text
8//! [0xc1] [version u16 BE] [inner_len u32 BE] [inner bytes]
9//! ```
10//!
11//! `0xc1` is the MessagePack "reserved / never-used" marker — neither
12//! `zerompk` nor the upstream `rmp` family emit it. Any byte slice
13//! starting with `0xc1` is therefore guaranteed to be one of our
14//! versioned envelopes and never a raw `zerompk`-encoded `T`.
15//!
16//! Bytes not starting with `0xc1` are rejected — raw v1 frames are
17//! not accepted.
18//!
19//! An envelope with `version > WireVersion::CURRENT.0` is rejected
20//! with [`WireVersionError::UnsupportedVersion`] and is never silently
21//! misdecoded.
22
23use super::error::WireVersionError;
24use super::types::WireVersion;
25
26/// MessagePack reserved marker. Never emitted by valid msgpack
27/// encoders; used here as the unambiguous start byte of a v2 envelope.
28const ENVELOPE_MARKER: u8 = 0xc1;
29
30/// Length of the fixed envelope header (marker + version + inner_len).
31const ENVELOPE_HEADER_LEN: usize = 1 + 2 + 4;
32
33/// A versioned wrapper. Holds the version that was decoded alongside
34/// the inner value.
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub struct Versioned<T> {
37    pub version: WireVersion,
38    pub inner: T,
39}
40
41/// Encode `value` into a v2 versioned envelope.
42pub fn encode_versioned<T>(value: &T) -> Result<Vec<u8>, WireVersionError>
43where
44    T: zerompk::ToMessagePack,
45{
46    let inner_bytes = zerompk::to_msgpack_vec(value)
47        .map_err(|e| WireVersionError::DecodeFailure(format!("encode inner: {e}")))?;
48
49    encode_envelope(WireVersion::CURRENT.0, &inner_bytes)
50}
51
52/// Decode a versioned wire message.
53///
54/// Bytes must begin with the reserved [`ENVELOPE_MARKER`] (`0xc1`).
55/// Bytes without the marker are rejected — raw v1 frames are not accepted.
56///
57/// An envelope with `version > WireVersion::CURRENT.0` is rejected
58/// with [`WireVersionError::UnsupportedVersion`].
59pub fn decode_versioned<T>(bytes: &[u8]) -> Result<T, WireVersionError>
60where
61    T: zerompk::FromMessagePackOwned,
62{
63    match parse_envelope(bytes)? {
64        Some((version_raw, inner_bytes)) => {
65            // Version 0 inside the envelope is malformed — reject loudly.
66            if version_raw == 0 {
67                return Err(WireVersionError::DecodeFailure(
68                    "v2 envelope with version 0 is invalid".to_string(),
69                ));
70            }
71            let peer_version = WireVersion(version_raw);
72            if peer_version > WireVersion::CURRENT {
73                return Err(WireVersionError::UnsupportedVersion {
74                    peer_version,
75                    supported_min: WireVersion::CURRENT,
76                    supported_max: WireVersion::CURRENT,
77                });
78            }
79            zerompk::from_msgpack(inner_bytes).map_err(|e| {
80                WireVersionError::DecodeFailure(format!("decode inner (v{peer_version}): {e}"))
81            })
82        }
83        None => Err(WireVersionError::DecodeFailure(
84            "missing envelope marker: raw v1 frames are not accepted".to_string(),
85        )),
86    }
87}
88
89/// Wrap arbitrary pre-encoded bytes in a v2 versioned envelope.
90pub fn wrap_bytes_versioned(inner: &[u8]) -> Result<Vec<u8>, WireVersionError> {
91    encode_envelope(WireVersion::CURRENT.0, inner)
92}
93
94/// Unwrap a versioned envelope and return the inner bytes.
95///
96/// Bytes must begin with the reserved [`ENVELOPE_MARKER`] (`0xc1`).
97/// Missing marker, version 0, or `version > CURRENT` are all errors.
98pub fn unwrap_bytes_versioned(bytes: &[u8]) -> Result<&[u8], WireVersionError> {
99    match parse_envelope(bytes)? {
100        Some((version_raw, inner)) => {
101            if version_raw == 0 {
102                return Err(WireVersionError::DecodeFailure(
103                    "v2 envelope with version 0 is invalid".to_string(),
104                ));
105            }
106            let peer_version = WireVersion(version_raw);
107            if peer_version > WireVersion::CURRENT {
108                return Err(WireVersionError::UnsupportedVersion {
109                    peer_version,
110                    supported_min: WireVersion::CURRENT,
111                    supported_max: WireVersion::CURRENT,
112                });
113            }
114            Ok(inner)
115        }
116        None => Err(WireVersionError::DecodeFailure(
117            "missing envelope marker: raw v1 frames are not accepted".to_string(),
118        )),
119    }
120}
121
122// ── Envelope encoding / parsing helpers ────────────────────────────────────
123
124/// Encode a v2 envelope: `[ENVELOPE_MARKER][version u16 BE][inner_len u32 BE][inner]`.
125fn encode_envelope(version: u16, inner: &[u8]) -> Result<Vec<u8>, WireVersionError> {
126    if inner.len() > u32::MAX as usize {
127        return Err(WireVersionError::DecodeFailure(format!(
128            "inner payload {} bytes exceeds u32 length limit",
129            inner.len()
130        )));
131    }
132    let mut buf = Vec::with_capacity(ENVELOPE_HEADER_LEN + inner.len());
133    buf.push(ENVELOPE_MARKER);
134    buf.extend_from_slice(&version.to_be_bytes());
135    buf.extend_from_slice(&(inner.len() as u32).to_be_bytes());
136    buf.extend_from_slice(inner);
137    Ok(buf)
138}
139
140/// Parse a v2 envelope.
141///
142/// - `Ok(Some((version, inner)))`: well-formed envelope.
143/// - `Ok(None)`: bytes do not begin with [`ENVELOPE_MARKER`] (treat as
144///   v1 raw).
145/// - `Err(...)`: bytes begin with [`ENVELOPE_MARKER`] but the header is
146///   truncated or the declared length overruns the buffer. We reject
147///   loudly rather than fall back to v1, because a corrupted v2
148///   envelope must not be silently misinterpreted.
149fn parse_envelope(bytes: &[u8]) -> Result<Option<(u16, &[u8])>, WireVersionError> {
150    if bytes.is_empty() || bytes[0] != ENVELOPE_MARKER {
151        return Ok(None);
152    }
153    if bytes.len() < ENVELOPE_HEADER_LEN {
154        return Err(WireVersionError::DecodeFailure(format!(
155            "v2 envelope truncated: header needs {} bytes, got {}",
156            ENVELOPE_HEADER_LEN,
157            bytes.len()
158        )));
159    }
160    let version = u16::from_be_bytes([bytes[1], bytes[2]]);
161    let inner_len = u32::from_be_bytes([bytes[3], bytes[4], bytes[5], bytes[6]]) as usize;
162    let inner_start = ENVELOPE_HEADER_LEN;
163    let inner_end = inner_start.checked_add(inner_len).ok_or_else(|| {
164        WireVersionError::DecodeFailure("v2 envelope inner length overflows usize".to_string())
165    })?;
166    if inner_end > bytes.len() {
167        return Err(WireVersionError::DecodeFailure(format!(
168            "v2 envelope truncated: declared inner_len={inner_len}, available={}",
169            bytes.len() - inner_start
170        )));
171    }
172    Ok(Some((version, &bytes[inner_start..inner_end])))
173}
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178
179    #[derive(
180        Debug,
181        Clone,
182        PartialEq,
183        Eq,
184        serde::Serialize,
185        serde::Deserialize,
186        zerompk::ToMessagePack,
187        zerompk::FromMessagePack,
188    )]
189    struct Payload {
190        value: u32,
191        label: String,
192    }
193
194    #[test]
195    fn v2_roundtrip() {
196        let orig = Payload {
197            value: 42,
198            label: "hello".to_string(),
199        };
200        let bytes = encode_versioned(&orig).unwrap();
201        assert_eq!(bytes[0], ENVELOPE_MARKER);
202        let decoded: Payload = decode_versioned(&bytes).unwrap();
203        assert_eq!(orig, decoded);
204    }
205
206    #[test]
207    fn decode_versioned_rejects_raw_no_marker() {
208        let orig = Payload {
209            value: 7,
210            label: "raw".to_string(),
211        };
212        let raw_bytes = zerompk::to_msgpack_vec(&orig).unwrap();
213        assert_ne!(raw_bytes[0], ENVELOPE_MARKER);
214
215        let err = decode_versioned::<Payload>(&raw_bytes).unwrap_err();
216        match err {
217            WireVersionError::DecodeFailure(msg) => {
218                assert!(
219                    msg.contains("missing envelope marker"),
220                    "unexpected message: {msg}"
221                );
222            }
223            other => panic!("expected DecodeFailure, got: {other}"),
224        }
225    }
226
227    #[test]
228    fn unknown_future_version_returns_unsupported_version() {
229        let fake_inner = zerompk::to_msgpack_vec(&Payload {
230            value: 0,
231            label: String::new(),
232        })
233        .unwrap();
234        let bytes = encode_envelope(9999, &fake_inner).unwrap();
235
236        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
237        match err {
238            WireVersionError::UnsupportedVersion { peer_version, .. } => {
239                assert_eq!(peer_version, WireVersion(9999));
240            }
241            other => panic!("expected UnsupportedVersion, got: {other}"),
242        }
243    }
244
245    #[test]
246    fn unknown_future_version_does_not_silently_succeed() {
247        let inner = zerompk::to_msgpack_vec(&Payload {
248            value: 1,
249            label: "x".to_string(),
250        })
251        .unwrap();
252        let bytes = encode_envelope(65535, &inner).unwrap();
253        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
254        assert!(
255            matches!(err, WireVersionError::UnsupportedVersion { .. }),
256            "must error on future version, not silently succeed: {err}"
257        );
258    }
259
260    #[test]
261    fn truncated_envelope_header_is_loud_error() {
262        // Marker present but header is truncated → must NOT silently fall
263        // back to raw v1 (the bytes also can't decode as T, but the
264        // error must explicitly name the truncated envelope).
265        let bytes = vec![ENVELOPE_MARKER, 0x00, 0x02];
266        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
267        match err {
268            WireVersionError::DecodeFailure(msg) => assert!(msg.contains("truncated")),
269            other => panic!("expected DecodeFailure(truncated), got {other}"),
270        }
271    }
272
273    #[test]
274    fn envelope_with_version_zero_is_loud_error() {
275        let inner = zerompk::to_msgpack_vec(&Payload {
276            value: 0,
277            label: String::new(),
278        })
279        .unwrap();
280        let bytes = encode_envelope(0, &inner).unwrap();
281        let err = decode_versioned::<Payload>(&bytes).unwrap_err();
282        assert!(
283            matches!(err, WireVersionError::DecodeFailure(_)),
284            "version=0 must be a loud error, got: {err}"
285        );
286    }
287}