Skip to main content

subc_protocol/
lib.rs

1//! subc wire contract.
2//!
3//! This crate is the single source of truth for the subc <-> module wire,
4//! shared by subc-core and AFT. It defines the **envelope** (the fixed
5//! 17-byte routing header subc splices on), the canonical subc-generated body
6//! schemas such as [`ErrorBody`], and the capability manifest. JSON-RPC request
7//! and response bodies remain module-owned opaque payloads to subc.
8//!
9//! ## The envelope (locked — see docs/subc-core-architecture.md §4.8)
10//!
11//! ```text
12//!  offset  size  field     type    purpose
13//!    0      4    len       u32     # of BODY bytes after this 17-byte header
14//!    4      1    ver       u8      envelope version
15//!    5      1    type      u8      frame kind (see FrameType)
16//!    6      1    flags     u8      bit0 BINARY · bits1-2 PRIORITY · bit3 LAST · 4-7 reserved
17//!    7      2    channel   u16     route = (component, session); 0 = subc itself
18//!    9      8    corr      u64     correlation id; CANCEL carries the target call's corr
19//!   17 -> body
20//! ```
21//!
22//! Little-endian (same-machine, native, no byte-swap on the hot path).
23//!
24//! **Frozen prefix (the versioning invariant):** `len` (u32 @ 0) and `ver`
25//! (u8 @ 4) keep fixed meaning + position in *every* future version. A reader
26//! of any version can therefore always read the first 5 bytes, learn `ver`,
27//! look up that version's header length, read the rest, and splice `len` body
28//! bytes. `decode_header` enforces this discipline.
29
30#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt};
33
34use serde::{Deserialize, Serialize};
35
36pub mod manifest;
37pub mod session;
38
39/// Envelope protocol version this build speaks.
40pub const PROTOCOL_VERSION: u8 = 1;
41
42/// Fixed header length for `PROTOCOL_VERSION` 1.
43pub const HEADER_LEN: usize = 17;
44
45/// Bytes of the frozen prefix (`len` u32 + `ver` u8) that are stable across
46/// every envelope version. A reader needs only these to learn the version and
47/// thus the full header length.
48pub const FROZEN_PREFIX_LEN: usize = 5;
49
50/// Maximum v1 frame body accepted before allocation.
51///
52/// This 64 MiB starting cap prevents a malformed header from forcing an
53/// unbounded allocation. Future protocol versions can negotiate or encode a
54/// different cap while preserving the frozen prefix.
55pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
56
57/// Canonical JSON body for all subc-generated `ERROR` frames.
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
59pub struct ErrorBody {
60    pub code: String,
61    pub message: String,
62}
63
64/// Frame kind (`type` byte at offset 5).
65///
66/// `CANCEL`, `PING`, `PONG`, and `GOODBYE` are pure-header frames (`len == 0`);
67/// only `HELLO`/`HELLO_ACK` and the RPC payloads carry bodies.
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69#[repr(u8)]
70pub enum FrameType {
71    Request = 0,
72    Response = 1,
73    Push = 2,
74    StreamData = 3,
75    StreamEnd = 4,
76    Error = 5,
77    Cancel = 6,
78    Ping = 7,
79    Pong = 8,
80    Hello = 9,
81    HelloAck = 10,
82    Goodbye = 11,
83}
84
85impl FrameType {
86    /// Map the raw `type` byte to a `FrameType`, or `None` if unknown.
87    pub fn from_u8(b: u8) -> Option<Self> {
88        Some(match b {
89            0 => Self::Request,
90            1 => Self::Response,
91            2 => Self::Push,
92            3 => Self::StreamData,
93            4 => Self::StreamEnd,
94            5 => Self::Error,
95            6 => Self::Cancel,
96            7 => Self::Ping,
97            8 => Self::Pong,
98            9 => Self::Hello,
99            10 => Self::HelloAck,
100            11 => Self::Goodbye,
101            _ => return None,
102        })
103    }
104
105    pub fn is_pure_header(self) -> bool {
106        matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
107    }
108}
109
110/// Scheduling priority carried in `flags` bits 1-2. subc schedules on this
111/// without parsing the body.
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113#[repr(u8)]
114pub enum Priority {
115    Passive = 0,
116    Interactive = 1,
117    Background = 2,
118}
119
120impl Priority {
121    fn from_bits(bits: u8) -> Option<Self> {
122        Some(match bits {
123            0 => Self::Passive,
124            1 => Self::Interactive,
125            2 => Self::Background,
126            _ => return None,
127        })
128    }
129}
130
131const FLAG_BINARY: u8 = 0b0000_0001; // bit 0
132const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; // bits 1-2
133const FLAG_PRIORITY_SHIFT: u8 = 1;
134const FLAG_LAST: u8 = 0b0000_1000; // bit 3
135const FLAG_RESERVED_MASK: u8 = 0b1111_0000; // bits 4-7 must be zero
136
137/// The `flags` byte (offset 6): `bit0 BINARY · bits1-2 PRIORITY · bit3 LAST`.
138#[derive(Debug, Clone, Copy, PartialEq, Eq)]
139pub struct Flags(pub u8);
140
141impl Flags {
142    /// Build flags from typed components.
143    pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
144        let mut b = 0u8;
145        if binary {
146            b |= FLAG_BINARY;
147        }
148        b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
149        if last {
150            b |= FLAG_LAST;
151        }
152        Flags(b)
153    }
154
155    /// Body is raw bytes (bulk lane) rather than JSON-RPC.
156    pub fn is_binary(self) -> bool {
157        self.0 & FLAG_BINARY != 0
158    }
159
160    /// Final frame of a streamed message.
161    pub fn is_last(self) -> bool {
162        self.0 & FLAG_LAST != 0
163    }
164
165    /// Decode the priority bits, or `None` if they hold a reserved value.
166    pub fn priority(self) -> Option<Priority> {
167        Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
168    }
169
170    /// True if any reserved bit (4-7) is set — a malformed/forward frame.
171    pub fn has_reserved_bits(self) -> bool {
172        self.0 & FLAG_RESERVED_MASK != 0
173    }
174}
175
176/// A decoded envelope header. The body is the `len` bytes that follow it.
177#[derive(Debug, Clone, Copy, PartialEq, Eq)]
178pub struct EnvelopeHeader {
179    /// Number of body bytes after the header.
180    pub len: u32,
181    /// Envelope version.
182    pub ver: u8,
183    /// Frame kind.
184    pub ty: FrameType,
185    /// Flag bits.
186    pub flags: Flags,
187    /// Route = (component, session); 0 = subc itself.
188    pub channel: u16,
189    /// Correlation id.
190    pub corr: u64,
191}
192
193impl EnvelopeHeader {
194    /// Serialize the header to its fixed 17-byte little-endian form.
195    pub fn encode(&self) -> [u8; HEADER_LEN] {
196        let mut buf = [0u8; HEADER_LEN];
197        buf[0..4].copy_from_slice(&self.len.to_le_bytes());
198        buf[4] = self.ver;
199        buf[5] = self.ty as u8;
200        buf[6] = self.flags.0;
201        buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
202        buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
203        buf
204    }
205}
206
207/// Why a header could not be decoded.
208#[derive(Debug, Clone, Copy, PartialEq, Eq)]
209pub enum DecodeError {
210    /// Fewer than `FROZEN_PREFIX_LEN` bytes — cannot even read `len`/`ver`.
211    TooShortForPrefix { have: usize },
212    /// `ver` is not a version this build understands.
213    UnsupportedVersion { ver: u8 },
214    /// Version known but fewer than its header length is present.
215    TooShortForHeader { have: usize, need: usize },
216    /// `type` byte is not a known `FrameType`.
217    UnknownFrameType { byte: u8 },
218    /// A reserved flag bit (4-7) is set.
219    ReservedFlagBits { flags: u8 },
220    /// Priority bits 1-2 hold the reserved value `0b11`.
221    ReservedPriorityBits { flags: u8 },
222    /// A pure-header frame declared body bytes.
223    PureHeaderFrameWithBody { ty: FrameType, len: u32 },
224}
225
226impl fmt::Display for DecodeError {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        match self {
229            Self::TooShortForPrefix { have } => {
230                write!(f, "header shorter than frozen prefix: have {have} bytes")
231            }
232            Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
233            Self::TooShortForHeader { have, need } => {
234                write!(
235                    f,
236                    "header too short for version: have {have} bytes, need {need}"
237                )
238            }
239            Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
240            Self::ReservedFlagBits { flags } => {
241                write!(f, "reserved flag bits set in flags 0b{flags:08b}")
242            }
243            Self::ReservedPriorityBits { flags } => {
244                write!(f, "reserved priority bits set in flags 0b{flags:08b}")
245            }
246            Self::PureHeaderFrameWithBody { ty, len } => {
247                write!(
248                    f,
249                    "pure-header frame {ty:?} declared non-zero body length {len}"
250                )
251            }
252        }
253    }
254}
255
256impl Error for DecodeError {}
257
258/// How many header bytes a given envelope version occupies. Driven by the
259/// frozen prefix: read `ver`, then learn the full header length here.
260fn header_len_for_version(ver: u8) -> Option<usize> {
261    match ver {
262        1 => Some(HEADER_LEN),
263        _ => None,
264    }
265}
266
267/// Decode an envelope header from the front of `bytes`, following the
268/// frozen-prefix discipline:
269/// 1. need at least the 5-byte prefix to read `len` + `ver`;
270/// 2. dispatch the full header length on `ver`;
271/// 3. need the full header present; then parse the rest.
272///
273/// Never panics on malformed input — returns a typed [`DecodeError`].
274pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
275    if bytes.len() < FROZEN_PREFIX_LEN {
276        return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
277    }
278    let ver = bytes[4];
279    let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
280    if bytes.len() < need {
281        return Err(DecodeError::TooShortForHeader {
282            have: bytes.len(),
283            need,
284        });
285    }
286
287    let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
288    let ty =
289        FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
290    let flags = Flags(bytes[6]);
291    if flags.has_reserved_bits() {
292        return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
293    }
294    if flags.priority().is_none() {
295        return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
296    }
297    if ty.is_pure_header() && len != 0 {
298        return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
299    }
300    let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
301    let corr = u64::from_le_bytes([
302        bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
303    ]);
304
305    Ok(EnvelopeHeader {
306        len,
307        ver,
308        ty,
309        flags,
310        channel,
311        corr,
312    })
313}
314
315#[cfg(test)]
316mod tests {
317    use super::*;
318
319    fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
320        EnvelopeHeader {
321            len,
322            ver: PROTOCOL_VERSION,
323            ty,
324            flags,
325            channel,
326            corr,
327        }
328    }
329
330    #[test]
331    fn round_trip_request() {
332        let h = hdr(
333            1234,
334            FrameType::Request,
335            Flags::new(false, Priority::Interactive, false),
336            42,
337            0xDEAD_BEEF_0000_0001,
338        );
339        let decoded = decode_header(&h.encode()).unwrap();
340        assert_eq!(h, decoded);
341    }
342
343    #[test]
344    fn round_trip_all_frame_types() {
345        for b in 0u8..=11 {
346            let ty = FrameType::from_u8(b).unwrap();
347            let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
348            assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
349        }
350    }
351
352    #[test]
353    fn pure_header_frame_has_zero_len() {
354        // CANCEL carries only header (len = 0) + the target corr.
355        let h = hdr(
356            0,
357            FrameType::Cancel,
358            Flags::new(false, Priority::Passive, false),
359            7,
360            99,
361        );
362        let d = decode_header(&h.encode()).unwrap();
363        assert_eq!(d.len, 0);
364        assert_eq!(d.corr, 99);
365    }
366
367    #[test]
368    fn flags_round_trip() {
369        let f = Flags::new(true, Priority::Background, true);
370        assert!(f.is_binary());
371        assert!(f.is_last());
372        assert_eq!(f.priority(), Some(Priority::Background));
373        let h = hdr(8, FrameType::StreamData, f, 1, 1);
374        assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
375    }
376
377    #[test]
378    fn little_endian_and_frozen_prefix_layout() {
379        // len = 1 occupies byte 0; ver sits at byte 4 (the frozen prefix).
380        let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
381        let buf = h.encode();
382        assert_eq!(buf[0], 1);
383        assert_eq!(buf[1..4], [0, 0, 0]);
384        assert_eq!(buf[4], PROTOCOL_VERSION); // ver @ offset 4
385        assert_eq!(buf.len(), HEADER_LEN);
386    }
387
388    #[test]
389    fn reject_too_short_for_prefix() {
390        assert_eq!(
391            decode_header(&[0, 0, 0, 0]),
392            Err(DecodeError::TooShortForPrefix { have: 4 })
393        );
394    }
395
396    #[test]
397    fn reject_too_short_for_header() {
398        // Valid 5-byte prefix (ver = 1) but header truncated.
399        let mut b = [0u8; 10];
400        b[4] = PROTOCOL_VERSION;
401        assert_eq!(
402            decode_header(&b),
403            Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
404        );
405    }
406
407    #[test]
408    fn reject_unsupported_version() {
409        let mut b = [0u8; HEADER_LEN];
410        b[4] = 2; // no header layout known for ver 2
411        assert_eq!(
412            decode_header(&b),
413            Err(DecodeError::UnsupportedVersion { ver: 2 })
414        );
415    }
416
417    #[test]
418    fn reject_unknown_frame_type() {
419        let mut b = [0u8; HEADER_LEN];
420        b[4] = PROTOCOL_VERSION;
421        b[5] = 99;
422        assert_eq!(
423            decode_header(&b),
424            Err(DecodeError::UnknownFrameType { byte: 99 })
425        );
426    }
427
428    #[test]
429    fn reject_reserved_flag_bits() {
430        let mut b = [0u8; HEADER_LEN];
431        b[4] = PROTOCOL_VERSION;
432        b[5] = FrameType::Request as u8;
433        b[6] = 0b1000_0000; // reserved bit 7 set
434        assert_eq!(
435            decode_header(&b),
436            Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
437        );
438    }
439
440    #[test]
441    fn reject_reserved_priority_bits() {
442        let mut b = [0u8; HEADER_LEN];
443        b[4] = PROTOCOL_VERSION;
444        b[5] = FrameType::Request as u8;
445        b[6] = 0b0000_0110; // priority bits 1-2 are reserved value 0b11
446        assert_eq!(
447            decode_header(&b),
448            Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
449        );
450    }
451
452    #[test]
453    fn reject_pure_header_frame_with_body_len() {
454        let h = hdr(
455            1,
456            FrameType::Ping,
457            Flags::new(false, Priority::Passive, false),
458            0,
459            1,
460        );
461        assert_eq!(
462            decode_header(&h.encode()),
463            Err(DecodeError::PureHeaderFrameWithBody {
464                ty: FrameType::Ping,
465                len: 1
466            })
467        );
468    }
469}