Skip to main content

conduit_core/
codec.rs

1//! Binary frame format and wire encoding traits for conduit.
2//!
3//! Every conduit message is framed with an 11-byte header followed by
4//! a variable-length payload. The [`Encode`] / [`Decode`] traits
5//! provide zero-copy-friendly serialisation for primitive types, byte
6//! vectors, and strings.
7//!
8//! # Frame layout (11 bytes)
9//!
10//! | Offset | Size | Field            | Notes                                  |
11//! |--------|------|------------------|----------------------------------------|
12//! | 0      | 1    | `version`        | Always [`PROTOCOL_VERSION`] (1)        |
13//! | 1      | 1    | `reserved` | 0=protocol (reserved for future use)   |
14//! | 2      | 1    | `msg_type`       | See [`MsgType`]                        |
15//! | 3      | 4    | `sequence`       | LE u32, monotonic counter              |
16//! | 7      | 4    | `payload_len`    | LE u32, byte length of trailing data   |
17
18/// Size of the binary frame header in bytes.
19pub const FRAME_HEADER_SIZE: usize = 11;
20
21/// Current protocol version written into every frame.
22pub const PROTOCOL_VERSION: u8 = 1;
23
24/// Per-frame overhead in the drain wire format: 4 bytes for the u32 LE length prefix.
25pub const DRAIN_FRAME_OVERHEAD: usize = 4;
26
27// ---------------------------------------------------------------------------
28// MsgType
29// ---------------------------------------------------------------------------
30
31/// Message-type tag carried in the frame header.
32///
33/// Known variants cover the core protocol; user-defined types start at `0x10`.
34/// Any `u8` value is accepted on the wire via [`MsgType::Other`].
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum MsgType {
37    /// Client-to-server request (`0x00`).
38    Request,
39    /// Server-to-client response (`0x01`).
40    Response,
41    /// Server push / event (`0x02`).
42    Push,
43    /// Error frame (`0x04`).
44    Error,
45    /// Any other message type (user-defined, `0x10`+).
46    ///
47    /// **Warning:** `Other(v)` where `v` matches a known variant (0x00, 0x01,
48    /// 0x02, 0x04) will NOT roundtrip: `MsgType::from_u8(v)` returns the
49    /// named variant, not `Other(v)`. Use values `>= 0x10` for custom types
50    /// to avoid aliasing.
51    Other(u8),
52}
53
54impl MsgType {
55    /// Convert from the on-wire `u8` representation.
56    #[inline]
57    pub fn from_u8(v: u8) -> Self {
58        match v {
59            0x00 => Self::Request,
60            0x01 => Self::Response,
61            0x02 => Self::Push,
62            0x04 => Self::Error,
63            other => Self::Other(other),
64        }
65    }
66
67    /// Convert to the on-wire `u8` representation.
68    #[inline]
69    pub fn to_u8(self) -> u8 {
70        match self {
71            Self::Request => 0x00,
72            Self::Response => 0x01,
73            Self::Push => 0x02,
74            Self::Error => 0x04,
75            Self::Other(v) => v,
76        }
77    }
78}
79
80// ---------------------------------------------------------------------------
81// FrameHeader
82// ---------------------------------------------------------------------------
83
84/// Parsed representation of the 11-byte frame header.
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub struct FrameHeader {
87    /// Protocol version (always [`PROTOCOL_VERSION`]).
88    pub version: u8,
89    /// Transport identifier: 0=protocol (reserved for future use).
90    pub reserved: u8,
91    /// Message type tag.
92    pub msg_type: MsgType,
93    /// Monotonically increasing sequence number (LE).
94    pub sequence: u32,
95    /// Length of the payload that follows this header (LE).
96    pub payload_len: u32,
97}
98
99impl FrameHeader {
100    /// Serialise the header into `buf` (appends exactly [`FRAME_HEADER_SIZE`] bytes).
101    #[inline]
102    pub fn write_to(&self, buf: &mut Vec<u8>) {
103        let seq = self.sequence.to_le_bytes();
104        let plen = self.payload_len.to_le_bytes();
105        let header: [u8; FRAME_HEADER_SIZE] = [
106            self.version,
107            self.reserved,
108            self.msg_type.to_u8(),
109            seq[0],
110            seq[1],
111            seq[2],
112            seq[3],
113            plen[0],
114            plen[1],
115            plen[2],
116            plen[3],
117        ];
118        buf.extend_from_slice(&header);
119    }
120
121    /// Attempt to parse a header from the first 11 bytes of `data`.
122    ///
123    /// Returns `None` if `data` is shorter than [`FRAME_HEADER_SIZE`].
124    #[inline]
125    pub fn read_from(data: &[u8]) -> Option<Self> {
126        if data.len() < FRAME_HEADER_SIZE {
127            return None;
128        }
129        let version = data[0];
130        if version != PROTOCOL_VERSION {
131            return None;
132        }
133        let reserved = data[1];
134        let msg_type = MsgType::from_u8(data[2]);
135        let sequence = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
136        let payload_len = u32::from_le_bytes([data[7], data[8], data[9], data[10]]);
137        Some(Self {
138            version,
139            reserved,
140            msg_type,
141            sequence,
142            payload_len,
143        })
144    }
145}
146
147// ---------------------------------------------------------------------------
148// frame_pack / frame_unpack
149// ---------------------------------------------------------------------------
150
151/// Build a complete frame: header bytes followed by payload bytes.
152#[inline]
153#[must_use]
154pub fn frame_pack(header: &FrameHeader, payload: &[u8]) -> Vec<u8> {
155    assert_eq!(
156        header.payload_len as usize,
157        payload.len(),
158        "frame_pack: header.payload_len ({}) != payload.len() ({})",
159        header.payload_len,
160        payload.len(),
161    );
162    let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
163    header.write_to(&mut buf);
164    buf.extend_from_slice(payload);
165    buf
166}
167
168/// Split a complete frame into its header and payload slice.
169///
170/// Returns `None` if the data is too short for the header, or if the
171/// remaining bytes are fewer than `payload_len`.
172#[inline]
173#[must_use]
174pub fn frame_unpack(data: &[u8]) -> Option<(FrameHeader, &[u8])> {
175    let header = FrameHeader::read_from(data)?;
176    let payload_end = FRAME_HEADER_SIZE.checked_add(header.payload_len as usize)?;
177    if data.len() < payload_end {
178        return None;
179    }
180    Some((header, &data[FRAME_HEADER_SIZE..payload_end]))
181}
182
183// ---------------------------------------------------------------------------
184// Encode / Decode traits
185// ---------------------------------------------------------------------------
186
187/// Encode a value into a byte buffer in conduit's binary wire format.
188pub trait Encode {
189    /// Append the encoded representation to `buf`.
190    fn encode(&self, buf: &mut Vec<u8>);
191
192    /// The exact number of bytes that [`encode`](Encode::encode)
193    /// will append.
194    fn encode_size(&self) -> usize;
195}
196
197/// Decode a value from a byte slice in conduit's binary wire format.
198///
199/// Returns the decoded value together with the number of bytes consumed,
200/// or `None` if the data is too short or malformed.
201pub trait Decode: Sized {
202    /// Attempt to decode from the start of `data`.
203    fn decode(data: &[u8]) -> Option<(Self, usize)>;
204}
205
206// ---------------------------------------------------------------------------
207// Primitive impls
208// ---------------------------------------------------------------------------
209
210macro_rules! impl_wire_int {
211    ($($ty:ty),+) => {
212        $(
213            impl Encode for $ty {
214                fn encode(&self, buf: &mut Vec<u8>) {
215                    buf.extend_from_slice(&self.to_le_bytes());
216                }
217
218                fn encode_size(&self) -> usize {
219                    std::mem::size_of::<$ty>()
220                }
221            }
222
223            impl Decode for $ty {
224                fn decode(data: &[u8]) -> Option<(Self, usize)> {
225                    const SIZE: usize = std::mem::size_of::<$ty>();
226                    if data.len() < SIZE {
227                        return None;
228                    }
229                    let arr: [u8; SIZE] = data[..SIZE].try_into().ok()?;
230                    Some((<$ty>::from_le_bytes(arr), SIZE))
231                }
232            }
233        )+
234    };
235}
236
237impl_wire_int!(u8, u16, u32, u64, i8, i16, i32, i64, f32, f64);
238
239// bool: encoded as a single byte (0 or 1).
240impl Encode for bool {
241    fn encode(&self, buf: &mut Vec<u8>) {
242        buf.push(u8::from(*self));
243    }
244
245    fn encode_size(&self) -> usize {
246        1
247    }
248}
249
250impl Decode for bool {
251    fn decode(data: &[u8]) -> Option<(Self, usize)> {
252        if data.is_empty() {
253            return None;
254        }
255        match data[0] {
256            0 => Some((false, 1)),
257            1 => Some((true, 1)),
258            _ => None,
259        }
260    }
261}
262
263// Vec<T>: 4-byte LE element count followed by each element encoded in sequence.
264// For Vec<u8>, this produces the same wire format as a length-prefixed byte blob
265// (count + N individual bytes = count + N raw bytes).
266impl<T: Encode> Encode for Vec<T> {
267    fn encode(&self, buf: &mut Vec<u8>) {
268        let count: u32 = self.len().try_into().unwrap_or_else(|_| {
269            panic!(
270                "conduit: vec too large ({} elements exceeds u32::MAX)",
271                self.len()
272            )
273        });
274        buf.extend_from_slice(&count.to_le_bytes());
275        for item in self {
276            item.encode(buf);
277        }
278    }
279
280    fn encode_size(&self) -> usize {
281        4 + self.iter().map(|item| item.encode_size()).sum::<usize>()
282    }
283}
284
285impl<T: Decode> Decode for Vec<T> {
286    fn decode(data: &[u8]) -> Option<(Self, usize)> {
287        if data.len() < 4 {
288            return None;
289        }
290        let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
291        let mut off = 4;
292        let mut items = Vec::with_capacity(count);
293        for _ in 0..count {
294            let (item, consumed) = T::decode(&data[off..])?;
295            off += consumed;
296            items.push(item);
297        }
298        Some((items, off))
299    }
300}
301
302// String: 4-byte LE length prefix followed by UTF-8 bytes.
303impl Encode for String {
304    fn encode(&self, buf: &mut Vec<u8>) {
305        let len: u32 = self.len().try_into().unwrap_or_else(|_| {
306            panic!(
307                "conduit: payload too large ({} bytes exceeds u32::MAX)",
308                self.len()
309            )
310        });
311        buf.extend_from_slice(&len.to_le_bytes());
312        buf.extend_from_slice(self.as_bytes());
313    }
314
315    fn encode_size(&self) -> usize {
316        4 + self.len()
317    }
318}
319
320impl Decode for String {
321    fn decode(data: &[u8]) -> Option<(Self, usize)> {
322        if data.len() < 4 {
323            return None;
324        }
325        let len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
326        // Length cannot exceed remaining buffer
327        if len > data.len() - 4 {
328            return None;
329        }
330        let total = 4 + len;
331        let s = std::str::from_utf8(&data[4..total]).ok()?;
332        Some((s.to_owned(), total))
333    }
334}
335
336// ---------------------------------------------------------------------------
337// Tests
338// ---------------------------------------------------------------------------
339
340#[cfg(test)]
341mod tests {
342    use super::*;
343
344    #[test]
345    fn frame_header_roundtrip() {
346        let original = FrameHeader {
347            version: PROTOCOL_VERSION,
348            reserved: 0,
349            msg_type: MsgType::Request,
350            sequence: 42,
351            payload_len: 128,
352        };
353        let mut buf = Vec::new();
354        original.write_to(&mut buf);
355        assert_eq!(buf.len(), FRAME_HEADER_SIZE);
356        let parsed = FrameHeader::read_from(&buf).unwrap();
357        assert_eq!(original, parsed);
358    }
359
360    #[test]
361    fn frame_pack_unwrap() {
362        let header = FrameHeader {
363            version: PROTOCOL_VERSION,
364            reserved: 0,
365            msg_type: MsgType::Push,
366            sequence: 7,
367            payload_len: 5,
368        };
369        let payload = b"hello";
370        let frame = frame_pack(&header, payload);
371        assert_eq!(frame.len(), FRAME_HEADER_SIZE + 5);
372
373        let (parsed_header, parsed_payload) = frame_unpack(&frame).unwrap();
374        assert_eq!(parsed_header, header);
375        assert_eq!(parsed_payload, payload);
376    }
377
378    #[test]
379    fn frame_too_short() {
380        let short = [0u8; 5];
381        assert!(FrameHeader::read_from(&short).is_none());
382        assert!(frame_unpack(&short).is_none());
383    }
384
385    #[test]
386    fn encode_decode_primitives() {
387        // u8
388        let mut buf = Vec::new();
389        42u8.encode(&mut buf);
390        let (val, consumed) = u8::decode(&buf).unwrap();
391        assert_eq!(val, 42u8);
392        assert_eq!(consumed, 1);
393
394        // u32
395        buf.clear();
396        0xDEAD_BEEFu32.encode(&mut buf);
397        let (val, consumed) = u32::decode(&buf).unwrap();
398        assert_eq!(val, 0xDEAD_BEEFu32);
399        assert_eq!(consumed, 4);
400
401        // i64
402        buf.clear();
403        (-999_999i64).encode(&mut buf);
404        let (val, consumed) = i64::decode(&buf).unwrap();
405        assert_eq!(val, -999_999i64);
406        assert_eq!(consumed, 8);
407
408        // f64
409        buf.clear();
410        std::f64::consts::PI.encode(&mut buf);
411        let (val, consumed) = f64::decode(&buf).unwrap();
412        assert_eq!(val, std::f64::consts::PI);
413        assert_eq!(consumed, 8);
414
415        // bool
416        buf.clear();
417        true.encode(&mut buf);
418        let (val, consumed) = bool::decode(&buf).unwrap();
419        assert!(val);
420        assert_eq!(consumed, 1);
421
422        buf.clear();
423        false.encode(&mut buf);
424        let (val, consumed) = bool::decode(&buf).unwrap();
425        assert!(!val);
426        assert_eq!(consumed, 1);
427    }
428
429    #[test]
430    fn encode_decode_vec() {
431        let original: Vec<u8> = vec![0xCA, 0xFE, 0xBA, 0xBE];
432        let mut buf = Vec::new();
433        original.encode(&mut buf);
434        assert_eq!(buf.len(), 4 + 4); // 4-byte length + 4 bytes
435        let (decoded, consumed) = Vec::<u8>::decode(&buf).unwrap();
436        assert_eq!(decoded, original);
437        assert_eq!(consumed, 8);
438    }
439
440    #[test]
441    fn encode_decode_string() {
442        let original = String::from("conduit transport layer");
443        let mut buf = Vec::new();
444        original.encode(&mut buf);
445        assert_eq!(buf.len(), 4 + original.len());
446        let (decoded, consumed) = String::decode(&buf).unwrap();
447        assert_eq!(decoded, original);
448        assert_eq!(consumed, 4 + original.len());
449    }
450}