Skip to main content

subc_protocol/
lib.rs

1//! subc wire contract.
2//!
3//! This crate is the single source of truth for the subc <-> module wire,
4//! shared by subc-core and AFT. It defines the **envelope** (the fixed
5//! 17-byte routing header subc splices on), the canonical subc-generated body
6//! schemas such as [`ErrorBody`], and the capability manifest. JSON-RPC request
7//! and response bodies remain module-owned opaque payloads to subc.
8//!
9//! ## The envelope (locked — see docs/subc-core-architecture.md §4.8)
10//!
11//! ```text
12//!  offset  size  field     type    purpose
13//!    0      4    len       u32     # of BODY bytes after this 17-byte header
14//!    4      1    ver       u8      envelope version
15//!    5      1    type      u8      frame kind (see FrameType)
16//!    6      1    flags     u8      bit0 BINARY · bits1-2 PRIORITY · bit3 LAST · 4-7 reserved
17//!    7      2    channel   u16     route = (component, session); 0 = subc itself
18//!    9      8    corr      u64     correlation id; CANCEL carries the target call's corr
19//!   17 -> body
20//! ```
21//!
22//! Little-endian (same-machine, native, no byte-swap on the hot path).
23//!
24//! **Frozen prefix (the versioning invariant):** `len` (u32 @ 0) and `ver`
25//! (u8 @ 4) keep fixed meaning + position in *every* future version. A reader
26//! of any version can therefore always read the first 5 bytes, learn `ver`,
27//! look up that version's header length, read the rest, and splice `len` body
28//! bytes. `decode_header` enforces this discipline.
29
30#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt, path::PathBuf};
33
34use serde::{Deserialize, Serialize};
35
36pub mod manifest;
37pub mod session;
38
39/// Per-route bind identity shared by client-facing and module-facing control.
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
41pub struct BindIdentity {
42    pub project_root: PathBuf,
43    pub harness: String,
44    pub session: String,
45}
46
47/// Explicit target for a route open/bind operation.
48///
49/// RouteTarget.kind ↔ ProviderRole mapping:
50///
51/// | RouteTarget.kind | required ProviderRole | disambiguator |
52/// |---|---|---|
53/// | `tool_provider` | `ToolProvider` | v1: ≤1 per module |
54/// | `management_surface` | `ManagementSurface` | v1: ≤1 per module |
55/// | `internal_service` | `InternalService` | `service_id` (multiple allowed) |
56///
57/// `ProviderRole::PipelineStage` is intentionally unroutable; pipeline modules
58/// are wired by an orchestrator rather than opened directly by clients.
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
60#[serde(tag = "kind", rename_all = "snake_case")]
61pub enum RouteTarget {
62    ToolProvider {
63        module_id: String,
64    },
65    ManagementSurface {
66        module_id: String,
67    },
68    InternalService {
69        module_id: String,
70        service_id: String,
71    },
72}
73
74/// Envelope protocol version this build speaks.
75pub const PROTOCOL_VERSION: u8 = 1;
76
77/// Fixed header length for `PROTOCOL_VERSION` 1.
78pub const HEADER_LEN: usize = 17;
79
80/// Bytes of the frozen prefix (`len` u32 + `ver` u8) that are stable across
81/// every envelope version. A reader needs only these to learn the version and
82/// thus the full header length.
83pub const FROZEN_PREFIX_LEN: usize = 5;
84
85/// Maximum v1 frame body accepted before allocation.
86///
87/// This 64 MiB starting cap prevents a malformed header from forcing an
88/// unbounded allocation. Future protocol versions can negotiate or encode a
89/// different cap while preserving the frozen prefix.
90pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
91
92/// Canonical JSON body for all subc-generated `ERROR` frames.
93#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
94pub struct ErrorBody {
95    pub code: String,
96    pub message: String,
97}
98
99/// Module-to-subc `HELLO` body used during module registration.
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub struct ModuleHelloBody {
102    pub manifest: manifest::ModuleManifest,
103    pub protocol_ver: u8,
104    #[serde(default)]
105    pub control_ops: Option<Vec<String>>,
106}
107
108/// subc-to-module `HELLO_ACK` body used during module registration.
109#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
110pub struct ModuleHelloAckBody {
111    pub negotiated_ver: u8,
112    pub subc_ops: Vec<String>,
113    pub subc_capabilities: Vec<String>,
114}
115
116/// Frame kind (`type` byte at offset 5).
117///
118/// `CANCEL`, `PING`, `PONG`, and `GOODBYE` are pure-header frames (`len == 0`);
119/// only `HELLO`/`HELLO_ACK` and the RPC payloads carry bodies.
120#[derive(Debug, Clone, Copy, PartialEq, Eq)]
121#[repr(u8)]
122pub enum FrameType {
123    Request = 0,
124    Response = 1,
125    Push = 2,
126    StreamData = 3,
127    StreamEnd = 4,
128    Error = 5,
129    Cancel = 6,
130    Ping = 7,
131    Pong = 8,
132    Hello = 9,
133    HelloAck = 10,
134    Goodbye = 11,
135}
136
137impl FrameType {
138    /// Map the raw `type` byte to a `FrameType`, or `None` if unknown.
139    pub fn from_u8(b: u8) -> Option<Self> {
140        Some(match b {
141            0 => Self::Request,
142            1 => Self::Response,
143            2 => Self::Push,
144            3 => Self::StreamData,
145            4 => Self::StreamEnd,
146            5 => Self::Error,
147            6 => Self::Cancel,
148            7 => Self::Ping,
149            8 => Self::Pong,
150            9 => Self::Hello,
151            10 => Self::HelloAck,
152            11 => Self::Goodbye,
153            _ => return None,
154        })
155    }
156
157    pub fn is_pure_header(self) -> bool {
158        matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
159    }
160}
161
162/// Scheduling priority carried in `flags` bits 1-2. subc schedules on this
163/// without parsing the body.
164#[derive(Debug, Clone, Copy, PartialEq, Eq)]
165#[repr(u8)]
166pub enum Priority {
167    Passive = 0,
168    Interactive = 1,
169    Background = 2,
170}
171
172impl Priority {
173    fn from_bits(bits: u8) -> Option<Self> {
174        Some(match bits {
175            0 => Self::Passive,
176            1 => Self::Interactive,
177            2 => Self::Background,
178            _ => return None,
179        })
180    }
181}
182
183const FLAG_BINARY: u8 = 0b0000_0001; // bit 0
184const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; // bits 1-2
185const FLAG_PRIORITY_SHIFT: u8 = 1;
186const FLAG_LAST: u8 = 0b0000_1000; // bit 3
187const FLAG_RESERVED_MASK: u8 = 0b1111_0000; // bits 4-7 must be zero
188
189/// The `flags` byte (offset 6): `bit0 BINARY · bits1-2 PRIORITY · bit3 LAST`.
190#[derive(Debug, Clone, Copy, PartialEq, Eq)]
191pub struct Flags(pub u8);
192
193impl Flags {
194    /// Build flags from typed components.
195    pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
196        let mut b = 0u8;
197        if binary {
198            b |= FLAG_BINARY;
199        }
200        b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
201        if last {
202            b |= FLAG_LAST;
203        }
204        Flags(b)
205    }
206
207    /// Body is raw bytes (bulk lane) rather than JSON-RPC.
208    pub fn is_binary(self) -> bool {
209        self.0 & FLAG_BINARY != 0
210    }
211
212    /// Final frame of a streamed message.
213    pub fn is_last(self) -> bool {
214        self.0 & FLAG_LAST != 0
215    }
216
217    /// Decode the priority bits, or `None` if they hold a reserved value.
218    pub fn priority(self) -> Option<Priority> {
219        Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
220    }
221
222    /// True if any reserved bit (4-7) is set — a malformed/forward frame.
223    pub fn has_reserved_bits(self) -> bool {
224        self.0 & FLAG_RESERVED_MASK != 0
225    }
226}
227
228/// A decoded envelope header. The body is the `len` bytes that follow it.
229#[derive(Debug, Clone, Copy, PartialEq, Eq)]
230pub struct EnvelopeHeader {
231    /// Number of body bytes after the header.
232    pub len: u32,
233    /// Envelope version.
234    pub ver: u8,
235    /// Frame kind.
236    pub ty: FrameType,
237    /// Flag bits.
238    pub flags: Flags,
239    /// Route = (component, session); 0 = subc itself.
240    pub channel: u16,
241    /// Correlation id.
242    pub corr: u64,
243}
244
245impl EnvelopeHeader {
246    /// Serialize the header to its fixed 17-byte little-endian form.
247    pub fn encode(&self) -> [u8; HEADER_LEN] {
248        let mut buf = [0u8; HEADER_LEN];
249        buf[0..4].copy_from_slice(&self.len.to_le_bytes());
250        buf[4] = self.ver;
251        buf[5] = self.ty as u8;
252        buf[6] = self.flags.0;
253        buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
254        buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
255        buf
256    }
257}
258
259/// Why a header could not be decoded.
260#[derive(Debug, Clone, Copy, PartialEq, Eq)]
261pub enum DecodeError {
262    /// Fewer than `FROZEN_PREFIX_LEN` bytes — cannot even read `len`/`ver`.
263    TooShortForPrefix { have: usize },
264    /// `ver` is not a version this build understands.
265    UnsupportedVersion { ver: u8 },
266    /// Version known but fewer than its header length is present.
267    TooShortForHeader { have: usize, need: usize },
268    /// `type` byte is not a known `FrameType`.
269    UnknownFrameType { byte: u8 },
270    /// A reserved flag bit (4-7) is set.
271    ReservedFlagBits { flags: u8 },
272    /// Priority bits 1-2 hold the reserved value `0b11`.
273    ReservedPriorityBits { flags: u8 },
274    /// A pure-header frame declared body bytes.
275    PureHeaderFrameWithBody { ty: FrameType, len: u32 },
276}
277
278impl fmt::Display for DecodeError {
279    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280        match self {
281            Self::TooShortForPrefix { have } => {
282                write!(f, "header shorter than frozen prefix: have {have} bytes")
283            }
284            Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
285            Self::TooShortForHeader { have, need } => {
286                write!(
287                    f,
288                    "header too short for version: have {have} bytes, need {need}"
289                )
290            }
291            Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
292            Self::ReservedFlagBits { flags } => {
293                write!(f, "reserved flag bits set in flags 0b{flags:08b}")
294            }
295            Self::ReservedPriorityBits { flags } => {
296                write!(f, "reserved priority bits set in flags 0b{flags:08b}")
297            }
298            Self::PureHeaderFrameWithBody { ty, len } => {
299                write!(
300                    f,
301                    "pure-header frame {ty:?} declared non-zero body length {len}"
302                )
303            }
304        }
305    }
306}
307
308impl Error for DecodeError {}
309
310/// How many header bytes a given envelope version occupies. Driven by the
311/// frozen prefix: read `ver`, then learn the full header length here.
312fn header_len_for_version(ver: u8) -> Option<usize> {
313    match ver {
314        1 => Some(HEADER_LEN),
315        _ => None,
316    }
317}
318
319/// Decode an envelope header from the front of `bytes`, following the
320/// frozen-prefix discipline:
321/// 1. need at least the 5-byte prefix to read `len` + `ver`;
322/// 2. dispatch the full header length on `ver`;
323/// 3. need the full header present; then parse the rest.
324///
325/// Never panics on malformed input — returns a typed [`DecodeError`].
326pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
327    if bytes.len() < FROZEN_PREFIX_LEN {
328        return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
329    }
330    let ver = bytes[4];
331    let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
332    if bytes.len() < need {
333        return Err(DecodeError::TooShortForHeader {
334            have: bytes.len(),
335            need,
336        });
337    }
338
339    let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
340    let ty =
341        FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
342    let flags = Flags(bytes[6]);
343    if flags.has_reserved_bits() {
344        return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
345    }
346    if flags.priority().is_none() {
347        return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
348    }
349    if ty.is_pure_header() && len != 0 {
350        return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
351    }
352    let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
353    let corr = u64::from_le_bytes([
354        bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
355    ]);
356
357    Ok(EnvelopeHeader {
358        len,
359        ver,
360        ty,
361        flags,
362        channel,
363        corr,
364    })
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
372        EnvelopeHeader {
373            len,
374            ver: PROTOCOL_VERSION,
375            ty,
376            flags,
377            channel,
378            corr,
379        }
380    }
381
382    #[test]
383    fn bind_identity_round_trips_json() {
384        let identity = BindIdentity {
385            project_root: PathBuf::from("/tmp/project"),
386            harness: "opencode".to_string(),
387            session: "session-1".to_string(),
388        };
389
390        let encoded = serde_json::to_vec(&identity).unwrap();
391        let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();
392
393        assert_eq!(decoded, identity);
394    }
395
396    #[test]
397    fn route_target_variants_round_trip_json() {
398        let targets = [
399            RouteTarget::ToolProvider {
400                module_id: "aft".to_string(),
401            },
402            RouteTarget::ManagementSurface {
403                module_id: "memory".to_string(),
404            },
405            RouteTarget::InternalService {
406                module_id: "bus".to_string(),
407                service_id: "dm".to_string(),
408            },
409        ];
410
411        for target in targets {
412            let encoded = serde_json::to_vec(&target).unwrap();
413            let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
414            assert_eq!(decoded, target);
415        }
416    }
417
418    #[test]
419    fn error_body_round_trips_json() {
420        let body = ErrorBody {
421            code: "config_divergence".to_string(),
422            message: "active config differs".to_string(),
423        };
424
425        let encoded = serde_json::to_vec(&body).unwrap();
426        let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();
427
428        assert_eq!(decoded, body);
429    }
430
431    #[test]
432    fn round_trip_request() {
433        let h = hdr(
434            1234,
435            FrameType::Request,
436            Flags::new(false, Priority::Interactive, false),
437            42,
438            0xDEAD_BEEF_0000_0001,
439        );
440        let decoded = decode_header(&h.encode()).unwrap();
441        assert_eq!(h, decoded);
442    }
443
444    #[test]
445    fn round_trip_all_frame_types() {
446        for b in 0u8..=11 {
447            let ty = FrameType::from_u8(b).unwrap();
448            let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
449            assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
450        }
451    }
452
453    #[test]
454    fn pure_header_frame_has_zero_len() {
455        // CANCEL carries only header (len = 0) + the target corr.
456        let h = hdr(
457            0,
458            FrameType::Cancel,
459            Flags::new(false, Priority::Passive, false),
460            7,
461            99,
462        );
463        let d = decode_header(&h.encode()).unwrap();
464        assert_eq!(d.len, 0);
465        assert_eq!(d.corr, 99);
466    }
467
468    #[test]
469    fn flags_round_trip() {
470        let f = Flags::new(true, Priority::Background, true);
471        assert!(f.is_binary());
472        assert!(f.is_last());
473        assert_eq!(f.priority(), Some(Priority::Background));
474        let h = hdr(8, FrameType::StreamData, f, 1, 1);
475        assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
476    }
477
478    #[test]
479    fn little_endian_and_frozen_prefix_layout() {
480        // len = 1 occupies byte 0; ver sits at byte 4 (the frozen prefix).
481        let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
482        let buf = h.encode();
483        assert_eq!(buf[0], 1);
484        assert_eq!(buf[1..4], [0, 0, 0]);
485        assert_eq!(buf[4], PROTOCOL_VERSION); // ver @ offset 4
486        assert_eq!(buf.len(), HEADER_LEN);
487    }
488
489    #[test]
490    fn reject_too_short_for_prefix() {
491        assert_eq!(
492            decode_header(&[0, 0, 0, 0]),
493            Err(DecodeError::TooShortForPrefix { have: 4 })
494        );
495    }
496
497    #[test]
498    fn reject_too_short_for_header() {
499        // Valid 5-byte prefix (ver = 1) but header truncated.
500        let mut b = [0u8; 10];
501        b[4] = PROTOCOL_VERSION;
502        assert_eq!(
503            decode_header(&b),
504            Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
505        );
506    }
507
508    #[test]
509    fn reject_unsupported_version() {
510        let mut b = [0u8; HEADER_LEN];
511        b[4] = 2; // no header layout known for ver 2
512        assert_eq!(
513            decode_header(&b),
514            Err(DecodeError::UnsupportedVersion { ver: 2 })
515        );
516    }
517
518    #[test]
519    fn reject_unknown_frame_type() {
520        let mut b = [0u8; HEADER_LEN];
521        b[4] = PROTOCOL_VERSION;
522        b[5] = 99;
523        assert_eq!(
524            decode_header(&b),
525            Err(DecodeError::UnknownFrameType { byte: 99 })
526        );
527    }
528
529    #[test]
530    fn reject_reserved_flag_bits() {
531        let mut b = [0u8; HEADER_LEN];
532        b[4] = PROTOCOL_VERSION;
533        b[5] = FrameType::Request as u8;
534        b[6] = 0b1000_0000; // reserved bit 7 set
535        assert_eq!(
536            decode_header(&b),
537            Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
538        );
539    }
540
541    #[test]
542    fn reject_reserved_priority_bits() {
543        let mut b = [0u8; HEADER_LEN];
544        b[4] = PROTOCOL_VERSION;
545        b[5] = FrameType::Request as u8;
546        b[6] = 0b0000_0110; // priority bits 1-2 are reserved value 0b11
547        assert_eq!(
548            decode_header(&b),
549            Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
550        );
551    }
552
553    #[test]
554    fn reject_pure_header_frame_with_body_len() {
555        let h = hdr(
556            1,
557            FrameType::Ping,
558            Flags::new(false, Priority::Passive, false),
559            0,
560            1,
561        );
562        assert_eq!(
563            decode_header(&h.encode()),
564            Err(DecodeError::PureHeaderFrameWithBody {
565                ty: FrameType::Ping,
566                len: 1
567            })
568        );
569    }
570}