Skip to main content

conduit_core/
codec.rs

1//! Binary frame format and wire encoding traits for conduit.
2//!
3//! Every conduit message is framed with an 11-byte header followed by
4//! a variable-length payload. The [`Encode`] / [`Decode`] traits
5//! provide zero-copy-friendly serialisation for primitive types, byte
6//! vectors, and strings.
7//!
8//! # Frame layout (11 bytes)
9//!
10//! | Offset | Size | Field            | Notes                                  |
11//! |--------|------|------------------|----------------------------------------|
12//! | 0      | 1    | `version`        | Always [`PROTOCOL_VERSION`] (1)        |
13//! | 1      | 1    | `reserved` | 0=protocol (reserved for future use)   |
14//! | 2      | 1    | `msg_type`       | See [`MsgType`]                        |
15//! | 3      | 4    | `sequence`       | LE u32, monotonic counter              |
16//! | 7      | 4    | `payload_len`    | LE u32, byte length of trailing data   |
17
/// Length in bytes of the fixed header that precedes every payload:
/// `version` (1) + `reserved` (1) + `msg_type` (1) + `sequence` (4) +
/// `payload_len` (4).
pub const FRAME_HEADER_SIZE: usize = 1 + 1 + 1 + 4 + 4;

/// Protocol version stamped into the first byte of every frame.
pub const PROTOCOL_VERSION: u8 = 1;

/// Bytes of overhead added per frame by the drain wire format, i.e. the
/// width of its `u32` little-endian length prefix.
pub const DRAIN_FRAME_OVERHEAD: usize = std::mem::size_of::<u32>();
26
27// ---------------------------------------------------------------------------
28// MsgType
29// ---------------------------------------------------------------------------
30
/// Tag in the frame header that says what kind of message the frame carries.
///
/// The named variants are the core protocol tags. Every other byte value is
/// representable through [`MsgType::Other`], so parsing a tag never fails.
/// User-defined types start at `0x10`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MsgType {
    /// Request sent from client to server (`0x00`).
    Request,
    /// Response sent from server to client (`0x01`).
    Response,
    /// Server-initiated push / event (`0x02`).
    Push,
    /// Error frame (`0x04`).
    Error,
    /// Any tag outside the core set (user-defined, `0x10`+).
    ///
    /// **Warning:** `Other(v)` with `v` equal to a core tag (0x00, 0x01,
    /// 0x02, 0x04) does NOT round-trip: it serialises to `v`, and
    /// `MsgType::from_u8(v)` then yields the named variant rather than
    /// `Other(v)`. Keep custom tags at `>= 0x10` to avoid this aliasing.
    Other(u8),
}

impl MsgType {
    // On-wire tag values of the core variants, shared by both conversions so
    // the two directions cannot drift apart.
    const TAG_REQUEST: u8 = 0x00;
    const TAG_RESPONSE: u8 = 0x01;
    const TAG_PUSH: u8 = 0x02;
    const TAG_ERROR: u8 = 0x04;

    /// Map an on-wire tag byte to its variant.
    ///
    /// Bytes that are not core tags are wrapped in [`MsgType::Other`].
    #[inline]
    pub fn from_u8(v: u8) -> Self {
        match v {
            Self::TAG_REQUEST => Self::Request,
            Self::TAG_RESPONSE => Self::Response,
            Self::TAG_PUSH => Self::Push,
            Self::TAG_ERROR => Self::Error,
            tag => Self::Other(tag),
        }
    }

    /// Return the tag byte written to the wire for this variant.
    #[inline]
    pub fn to_u8(self) -> u8 {
        match self {
            Self::Other(tag) => tag,
            Self::Request => Self::TAG_REQUEST,
            Self::Response => Self::TAG_RESPONSE,
            Self::Push => Self::TAG_PUSH,
            Self::Error => Self::TAG_ERROR,
        }
    }
}
79
80// ---------------------------------------------------------------------------
81// FrameHeader
82// ---------------------------------------------------------------------------
83
84/// Parsed representation of the 11-byte frame header.
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub struct FrameHeader {
87    /// Protocol version (always [`PROTOCOL_VERSION`]).
88    pub version: u8,
89    /// Transport identifier: 0=protocol (reserved for future use).
90    pub reserved: u8,
91    /// Message type tag.
92    pub msg_type: MsgType,
93    /// Monotonically increasing sequence number (LE).
94    pub sequence: u32,
95    /// Length of the payload that follows this header (LE).
96    pub payload_len: u32,
97}
98
99impl FrameHeader {
100    /// Serialise the header into `buf` (appends exactly [`FRAME_HEADER_SIZE`] bytes).
101    #[inline]
102    pub fn write_to(&self, buf: &mut Vec<u8>) {
103        let seq = self.sequence.to_le_bytes();
104        let plen = self.payload_len.to_le_bytes();
105        let header: [u8; FRAME_HEADER_SIZE] = [
106            self.version,
107            self.reserved,
108            self.msg_type.to_u8(),
109            seq[0],
110            seq[1],
111            seq[2],
112            seq[3],
113            plen[0],
114            plen[1],
115            plen[2],
116            plen[3],
117        ];
118        buf.extend_from_slice(&header);
119    }
120
121    /// Attempt to parse a header from the first 11 bytes of `data`.
122    ///
123    /// Returns `None` if `data` is shorter than [`FRAME_HEADER_SIZE`].
124    #[inline]
125    pub fn read_from(data: &[u8]) -> Option<Self> {
126        if data.len() < FRAME_HEADER_SIZE {
127            return None;
128        }
129        let version = data[0];
130        if version != PROTOCOL_VERSION {
131            return None;
132        }
133        let reserved = data[1];
134        let msg_type = MsgType::from_u8(data[2]);
135        let sequence = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
136        let payload_len = u32::from_le_bytes([data[7], data[8], data[9], data[10]]);
137        Some(Self {
138            version,
139            reserved,
140            msg_type,
141            sequence,
142            payload_len,
143        })
144    }
145}
146
147// ---------------------------------------------------------------------------
148// frame_pack / frame_unpack
149// ---------------------------------------------------------------------------
150
151/// Build a complete frame: header bytes followed by payload bytes.
152#[inline]
153#[must_use]
154pub fn frame_pack(header: &FrameHeader, payload: &[u8]) -> Vec<u8> {
155    assert_eq!(
156        header.payload_len as usize,
157        payload.len(),
158        "frame_pack: header.payload_len ({}) != payload.len() ({})",
159        header.payload_len,
160        payload.len(),
161    );
162    let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
163    header.write_to(&mut buf);
164    buf.extend_from_slice(payload);
165    buf
166}
167
168/// Split a complete frame into its header and payload slice.
169///
170/// Returns `None` if the data is too short for the header, or if the
171/// remaining bytes are fewer than `payload_len`.
172#[inline]
173#[must_use]
174pub fn frame_unpack(data: &[u8]) -> Option<(FrameHeader, &[u8])> {
175    let header = FrameHeader::read_from(data)?;
176    let payload_end = FRAME_HEADER_SIZE.checked_add(header.payload_len as usize)?;
177    if data.len() < payload_end {
178        return None;
179    }
180    Some((header, &data[FRAME_HEADER_SIZE..payload_end]))
181}
182
183// ---------------------------------------------------------------------------
184// Encode / Decode traits
185// ---------------------------------------------------------------------------
186
/// Serialise a value into conduit's binary wire format.
pub trait Encode {
    /// Write this value's wire representation onto the end of `buf`.
    fn encode(&self, buf: &mut Vec<u8>);

    /// Exact number of bytes a call to [`encode`](Encode::encode) will
    /// append.
    fn encode_size(&self) -> usize;
}
196
/// Reconstruct a value from bytes laid out in conduit's binary wire format.
///
/// A successful decode yields the value together with how many bytes of the
/// input it used; `None` signals input that is too short or malformed.
pub trait Decode: Sized {
    /// Smallest input length, in bytes, at which decoding can be attempted.
    ///
    /// Fixed-size types (primitives) set this to their exact encoded size.
    /// Variable-size types (`String`, `Vec`) set it to the size of their
    /// length prefix. Intended for an upfront bounds check in derived impls.
    /// Defaults to `0` when an implementor does not override it.
    const MIN_SIZE: usize = 0;

    /// Try to decode one value from the front of `data`.
    fn decode(data: &[u8]) -> Option<(Self, usize)>;
}
212
213// ---------------------------------------------------------------------------
214// Primitive impls
215// ---------------------------------------------------------------------------
216
217macro_rules! impl_wire_int {
218    ($($ty:ty),+) => {
219        $(
220            impl Encode for $ty {
221                fn encode(&self, buf: &mut Vec<u8>) {
222                    buf.extend_from_slice(&self.to_le_bytes());
223                }
224
225                fn encode_size(&self) -> usize {
226                    std::mem::size_of::<$ty>()
227                }
228            }
229
230            impl Decode for $ty {
231                const MIN_SIZE: usize = std::mem::size_of::<$ty>();
232
233                fn decode(data: &[u8]) -> Option<(Self, usize)> {
234                    const SIZE: usize = std::mem::size_of::<$ty>();
235                    if data.len() < SIZE {
236                        return None;
237                    }
238                    let arr: [u8; SIZE] = data[..SIZE].try_into().ok()?;
239                    Some((<$ty>::from_le_bytes(arr), SIZE))
240                }
241            }
242        )+
243    };
244}
245
246impl_wire_int!(u8, u16, u32, u64, i8, i16, i32, i64, f32, f64);
247
248// bool: encoded as a single byte (0 or 1).
249impl Encode for bool {
250    fn encode(&self, buf: &mut Vec<u8>) {
251        buf.push(u8::from(*self));
252    }
253
254    fn encode_size(&self) -> usize {
255        1
256    }
257}
258
259impl Decode for bool {
260    const MIN_SIZE: usize = 1;
261
262    fn decode(data: &[u8]) -> Option<(Self, usize)> {
263        if data.is_empty() {
264            return None;
265        }
266        match data[0] {
267            0 => Some((false, 1)),
268            1 => Some((true, 1)),
269            _ => None,
270        }
271    }
272}
273
274// Vec<T>: 4-byte LE element count followed by each element encoded in sequence.
275// For Vec<u8>, this produces the same wire format as a length-prefixed byte blob
276// (count + N individual bytes = count + N raw bytes).
277impl<T: Encode> Encode for Vec<T> {
278    fn encode(&self, buf: &mut Vec<u8>) {
279        let count: u32 = self.len().try_into().unwrap_or_else(|_| {
280            panic!(
281                "conduit: vec too large ({} elements exceeds u32::MAX)",
282                self.len()
283            )
284        });
285        buf.extend_from_slice(&count.to_le_bytes());
286        for item in self {
287            item.encode(buf);
288        }
289    }
290
291    fn encode_size(&self) -> usize {
292        4 + self.iter().map(|item| item.encode_size()).sum::<usize>()
293    }
294}
295
296impl<T: Decode> Decode for Vec<T> {
297    const MIN_SIZE: usize = 4;
298
299    fn decode(data: &[u8]) -> Option<(Self, usize)> {
300        if data.len() < 4 {
301            return None;
302        }
303        let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
304        let mut off = 4;
305        let mut items = Vec::with_capacity(count);
306        for _ in 0..count {
307            let (item, consumed) = T::decode(&data[off..])?;
308            off += consumed;
309            items.push(item);
310        }
311        Some((items, off))
312    }
313}
314
315// String: 4-byte LE length prefix followed by UTF-8 bytes.
316impl Encode for String {
317    fn encode(&self, buf: &mut Vec<u8>) {
318        let len: u32 = self.len().try_into().unwrap_or_else(|_| {
319            panic!(
320                "conduit: payload too large ({} bytes exceeds u32::MAX)",
321                self.len()
322            )
323        });
324        buf.extend_from_slice(&len.to_le_bytes());
325        buf.extend_from_slice(self.as_bytes());
326    }
327
328    fn encode_size(&self) -> usize {
329        4 + self.len()
330    }
331}
332
333impl Decode for String {
334    const MIN_SIZE: usize = 4;
335
336    fn decode(data: &[u8]) -> Option<(Self, usize)> {
337        if data.len() < 4 {
338            return None;
339        }
340        let len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
341        // Length cannot exceed remaining buffer
342        if len > data.len() - 4 {
343            return None;
344        }
345        let total = 4 + len;
346        let s = std::str::from_utf8(&data[4..total]).ok()?;
347        Some((s.to_owned(), total))
348    }
349}
350
351// ---------------------------------------------------------------------------
352// Bytes: optimized Vec<u8> wrapper with bulk encode/decode
353// ---------------------------------------------------------------------------
354
355/// A newtype wrapper around `Vec<u8>` with optimized binary Encode/Decode.
356///
357/// Unlike `Vec<u8>` which goes through the generic `Vec<T>` impl (decoding
358/// each byte individually), `Bytes` uses a single bulk copy for both encoding
359/// and decoding.
360///
361/// The wire format is identical to `Vec<u8>`: `[u32 LE count][bytes...]`.
362#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
363pub struct Bytes(pub Vec<u8>);
364
365impl From<Vec<u8>> for Bytes {
366    fn from(v: Vec<u8>) -> Self {
367        Self(v)
368    }
369}
370
371impl From<Bytes> for Vec<u8> {
372    fn from(b: Bytes) -> Self {
373        b.0
374    }
375}
376
377impl std::ops::Deref for Bytes {
378    type Target = [u8];
379    fn deref(&self) -> &[u8] {
380        &self.0
381    }
382}
383
384impl AsRef<[u8]> for Bytes {
385    fn as_ref(&self) -> &[u8] {
386        &self.0
387    }
388}
389
390impl Encode for Bytes {
391    fn encode(&self, buf: &mut Vec<u8>) {
392        let count: u32 = self.0.len().try_into().unwrap_or_else(|_| {
393            panic!(
394                "conduit: bytes too large ({} bytes exceeds u32::MAX)",
395                self.0.len()
396            )
397        });
398        buf.extend_from_slice(&count.to_le_bytes());
399        buf.extend_from_slice(&self.0);
400    }
401
402    fn encode_size(&self) -> usize {
403        4 + self.0.len()
404    }
405}
406
407impl Decode for Bytes {
408    const MIN_SIZE: usize = 4;
409
410    fn decode(data: &[u8]) -> Option<(Self, usize)> {
411        if data.len() < 4 {
412            return None;
413        }
414        let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
415        let total = 4usize.checked_add(count)?;
416        if data.len() < total {
417            return None;
418        }
419        Some((Bytes(data[4..total].to_vec()), total))
420    }
421}
422
423// ---------------------------------------------------------------------------
424// Tests
425// ---------------------------------------------------------------------------
426
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode `value` into a fresh buffer and return it.
    fn encoded<T: Encode>(value: &T) -> Vec<u8> {
        let mut buf = Vec::new();
        value.encode(&mut buf);
        buf
    }

    /// A header with the current protocol version and the given fields.
    fn header(msg_type: MsgType, sequence: u32, payload_len: u32) -> FrameHeader {
        FrameHeader {
            version: PROTOCOL_VERSION,
            reserved: 0,
            msg_type,
            sequence,
            payload_len,
        }
    }

    // -- Frames -------------------------------------------------------------

    #[test]
    fn frame_header_roundtrip() {
        let original = header(MsgType::Request, 42, 128);
        let mut buf = Vec::new();
        original.write_to(&mut buf);
        assert_eq!(buf.len(), FRAME_HEADER_SIZE);
        let parsed = FrameHeader::read_from(&buf).unwrap();
        assert_eq!(original, parsed);
    }

    #[test]
    fn frame_header_rejects_unknown_version() {
        let mut buf = Vec::new();
        header(MsgType::Request, 1, 0).write_to(&mut buf);
        buf[0] = PROTOCOL_VERSION.wrapping_add(1);
        assert!(FrameHeader::read_from(&buf).is_none());
        assert!(frame_unpack(&buf).is_none());
    }

    #[test]
    fn frame_pack_unpack() {
        let hdr = header(MsgType::Push, 7, 5);
        let payload = b"hello";
        let frame = frame_pack(&hdr, payload);
        assert_eq!(frame.len(), FRAME_HEADER_SIZE + 5);

        let (parsed_header, parsed_payload) = frame_unpack(&frame).unwrap();
        assert_eq!(parsed_header, hdr);
        assert_eq!(parsed_payload, payload);
    }

    #[test]
    fn frame_unpack_rejects_truncated_payload() {
        let frame = frame_pack(&header(MsgType::Push, 7, 5), b"hello");
        assert!(frame_unpack(&frame[..frame.len() - 1]).is_none());
    }

    #[test]
    fn frame_too_short() {
        let short = [0u8; 5];
        assert!(FrameHeader::read_from(&short).is_none());
        assert!(frame_unpack(&short).is_none());
    }

    #[test]
    fn msg_type_roundtrips_every_tag() {
        for tag in 0..=u8::MAX {
            assert_eq!(MsgType::from_u8(tag).to_u8(), tag);
        }
        assert_eq!(MsgType::from_u8(0x04), MsgType::Error);
        assert_eq!(MsgType::from_u8(0x10), MsgType::Other(0x10));
    }

    // -- Primitives ---------------------------------------------------------

    #[test]
    fn encode_decode_primitives() {
        // u8
        let buf = encoded(&42u8);
        let (val, consumed) = u8::decode(&buf).unwrap();
        assert_eq!(val, 42u8);
        assert_eq!(consumed, 1);

        // u32
        let buf = encoded(&0xDEAD_BEEFu32);
        let (val, consumed) = u32::decode(&buf).unwrap();
        assert_eq!(val, 0xDEAD_BEEFu32);
        assert_eq!(consumed, 4);

        // i64
        let buf = encoded(&-999_999i64);
        let (val, consumed) = i64::decode(&buf).unwrap();
        assert_eq!(val, -999_999i64);
        assert_eq!(consumed, 8);

        // f64
        let buf = encoded(&std::f64::consts::PI);
        let (val, consumed) = f64::decode(&buf).unwrap();
        assert_eq!(val, std::f64::consts::PI);
        assert_eq!(consumed, 8);

        // bool
        let buf = encoded(&true);
        let (val, consumed) = bool::decode(&buf).unwrap();
        assert!(val);
        assert_eq!(consumed, 1);

        let buf = encoded(&false);
        let (val, consumed) = bool::decode(&buf).unwrap();
        assert!(!val);
        assert_eq!(consumed, 1);
    }

    #[test]
    fn decode_bool_rejects_invalid_byte() {
        assert!(bool::decode(&[2]).is_none());
        assert!(bool::decode(&[]).is_none());
    }

    // -- Vec / String -------------------------------------------------------

    #[test]
    fn encode_decode_vec() {
        let original: Vec<u8> = vec![0xCA, 0xFE, 0xBA, 0xBE];
        let buf = encoded(&original);
        assert_eq!(buf.len(), 4 + 4); // 4-byte length + 4 bytes
        let (decoded, consumed) = Vec::<u8>::decode(&buf).unwrap();
        assert_eq!(decoded, original);
        assert_eq!(consumed, 8);
    }

    #[test]
    fn decode_vec_rejects_truncated() {
        // Incomplete count prefix.
        assert!(Vec::<u8>::decode(&[1, 0]).is_none());
        // Count says three elements, only two present.
        assert!(Vec::<u8>::decode(&[3, 0, 0, 0, 1, 2]).is_none());
    }

    #[test]
    fn encode_decode_string() {
        let original = String::from("conduit transport layer");
        let buf = encoded(&original);
        assert_eq!(buf.len(), 4 + original.len());
        let (decoded, consumed) = String::decode(&buf).unwrap();
        assert_eq!(decoded, original);
        assert_eq!(consumed, 4 + original.len());
    }

    #[test]
    fn decode_string_rejects_malformed() {
        // Declared length exceeds the remaining bytes.
        assert!(String::decode(&[3, 0, 0, 0, b'h', b'i']).is_none());
        // Bytes are not valid UTF-8.
        assert!(String::decode(&[1, 0, 0, 0, 0xFF]).is_none());
    }

    // -- Bytes --------------------------------------------------------------

    #[test]
    fn encode_decode_bytes() {
        let original = Bytes(vec![10, 20, 30, 40, 50]);
        let buf = encoded(&original);
        assert_eq!(original.encode_size(), buf.len());
        let (decoded, consumed) = Bytes::decode(&buf).unwrap();
        assert_eq!(decoded, original);
        assert_eq!(consumed, buf.len());
    }

    #[test]
    fn bytes_empty() {
        let original = Bytes(Vec::new());
        let buf = encoded(&original);
        assert_eq!(buf.len(), 4); // just the count
        let (decoded, consumed) = Bytes::decode(&buf).unwrap();
        assert_eq!(decoded.0.len(), 0);
        assert_eq!(consumed, 4);
    }

    #[test]
    fn decode_bytes_rejects_truncated() {
        assert!(Bytes::decode(&[1, 0, 0]).is_none());
        assert!(Bytes::decode(&[2, 0, 0, 0, 1]).is_none());
    }

    #[test]
    fn bytes_wire_compatible_with_vec_u8() {
        // Verify Bytes and Vec<u8> produce identical wire format
        let data: Vec<u8> = vec![1, 2, 3, 4, 5];
        let bytes = Bytes(data.clone());

        let buf_vec = encoded(&data);
        let buf_bytes = encoded(&bytes);

        assert_eq!(
            buf_vec, buf_bytes,
            "Bytes and Vec<u8> must produce identical wire format"
        );
    }

    // -- MIN_SIZE -----------------------------------------------------------

    #[test]
    fn min_size_primitives() {
        assert_eq!(<u8 as Decode>::MIN_SIZE, 1);
        assert_eq!(<u16 as Decode>::MIN_SIZE, 2);
        assert_eq!(<u32 as Decode>::MIN_SIZE, 4);
        assert_eq!(<u64 as Decode>::MIN_SIZE, 8);
        assert_eq!(<i8 as Decode>::MIN_SIZE, 1);
        assert_eq!(<i16 as Decode>::MIN_SIZE, 2);
        assert_eq!(<i32 as Decode>::MIN_SIZE, 4);
        assert_eq!(<i64 as Decode>::MIN_SIZE, 8);
        assert_eq!(<f32 as Decode>::MIN_SIZE, 4);
        assert_eq!(<f64 as Decode>::MIN_SIZE, 8);
        assert_eq!(<bool as Decode>::MIN_SIZE, 1);
        assert_eq!(<String as Decode>::MIN_SIZE, 4);
        assert_eq!(<Vec<u8> as Decode>::MIN_SIZE, 4);
        assert_eq!(<Bytes as Decode>::MIN_SIZE, 4);
    }
}