Skip to main content

subc_protocol/
lib.rs

1//! subc wire contract.
2//!
3//! This crate is the single source of truth for the subc <-> module wire,
4//! shared by subc-core and AFT. It defines the **envelope** (the fixed
5//! 17-byte routing header subc splices on), the canonical subc-generated body
6//! schemas such as [`ErrorBody`], and the capability manifest. JSON-RPC request
7//! and response bodies remain module-owned opaque payloads to subc.
8//!
9//! ## The envelope (locked — see docs/subc-core-architecture.md §4.8)
10//!
11//! ```text
12//!  offset  size  field     type    purpose
13//!    0      4    len       u32     # of BODY bytes after this 17-byte header
14//!    4      1    ver       u8      envelope version
15//!    5      1    type      u8      frame kind (see FrameType)
16//!    6      1    flags     u8      bit0 BINARY · bits1-2 PRIORITY · bit3 LAST · 4-7 reserved
17//!    7      2    channel   u16     route = (component, session); 0 = subc itself
18//!    9      8    corr      u64     correlation id; CANCEL carries the target call's corr
19//!   17 -> body
20//! ```
21//!
22//! Little-endian (same-machine, native, no byte-swap on the hot path).
23//!
24//! **Frozen prefix (the versioning invariant):** `len` (u32 @ 0) and `ver`
25//! (u8 @ 4) keep fixed meaning + position in *every* future version. A reader
26//! of any version can therefore always read the first 5 bytes, learn `ver`,
27//! look up that version's header length, read the rest, and splice `len` body
28//! bytes. `decode_header` enforces this discipline.
29
30#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt, path::PathBuf};
33
34use serde::{Deserialize, Serialize};
35
36pub mod frame;
37pub mod manifest;
38pub mod session;
39
40pub use frame::{Frame, FrameBuildError};
41
42/// Per-route bind identity shared by client-facing and module-facing control.
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44pub struct BindIdentity {
45    pub project_root: PathBuf,
46    pub harness: String,
47    pub session: String,
48}
49
50/// Explicit target for a route open/bind operation.
51///
52/// RouteTarget.kind ↔ ProviderRole mapping:
53///
54/// | RouteTarget.kind | required ProviderRole | disambiguator |
55/// |---|---|---|
56/// | `tool_provider` | `ToolProvider` | v1: ≤1 per module |
57/// | `management_surface` | `ManagementSurface` | v1: ≤1 per module |
58/// | `internal_service` | `InternalService` | `service_id` (multiple allowed) |
59///
60/// `ProviderRole::PipelineStage` is intentionally unroutable; pipeline modules
61/// are wired by an orchestrator rather than opened directly by clients.
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
63#[serde(tag = "kind", rename_all = "snake_case")]
64pub enum RouteTarget {
65    ToolProvider {
66        module_id: String,
67    },
68    ManagementSurface {
69        module_id: String,
70    },
71    InternalService {
72        module_id: String,
73        service_id: String,
74    },
75}
76
77/// Envelope protocol version this build speaks.
78pub const PROTOCOL_VERSION: u8 = 1;
79
80/// Env var subc sets on each supervised child telling it the module_id it is
81/// supervised under, so it can register under that id.
82pub const SUBC_MODULE_ID_ENV: &str = "SUBC_MODULE_ID";
83
84/// Fixed header length for `PROTOCOL_VERSION` 1.
85pub const HEADER_LEN: usize = 17;
86
87/// Bytes of the frozen prefix (`len` u32 + `ver` u8) that are stable across
88/// every envelope version. A reader needs only these to learn the version and
89/// thus the full header length.
90pub const FROZEN_PREFIX_LEN: usize = 5;
91
92/// Maximum v1 frame body accepted before allocation.
93///
94/// This 64 MiB starting cap prevents a malformed header from forcing an
95/// unbounded allocation. Future protocol versions can negotiate or encode a
96/// different cap while preserving the frozen prefix.
97pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
98
99/// Canonical JSON body for all subc-generated `ERROR` frames.
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
101pub struct ErrorBody {
102    pub code: String,
103    pub message: String,
104}
105
106/// Module-to-subc `HELLO` body used during module registration.
107#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
108pub struct ModuleHelloBody {
109    pub manifest: manifest::ModuleManifest,
110    pub protocol_ver: u8,
111    #[serde(default)]
112    pub control_ops: Option<Vec<String>>,
113}
114
115/// subc-to-module `HELLO_ACK` body used during module registration.
116#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
117pub struct ModuleHelloAckBody {
118    pub negotiated_ver: u8,
119    pub subc_ops: Vec<String>,
120    pub subc_capabilities: Vec<String>,
121}
122
123/// Frame kind (`type` byte at offset 5).
124///
125/// `CANCEL`, `PING`, `PONG`, and `GOODBYE` are pure-header frames (`len == 0`);
126/// only `HELLO`/`HELLO_ACK` and the RPC payloads carry bodies.
127#[derive(Debug, Clone, Copy, PartialEq, Eq)]
128#[repr(u8)]
129pub enum FrameType {
130    Request = 0,
131    Response = 1,
132    Push = 2,
133    StreamData = 3,
134    StreamEnd = 4,
135    Error = 5,
136    Cancel = 6,
137    Ping = 7,
138    Pong = 8,
139    Hello = 9,
140    HelloAck = 10,
141    Goodbye = 11,
142}
143
144impl FrameType {
145    /// Map the raw `type` byte to a `FrameType`, or `None` if unknown.
146    pub fn from_u8(b: u8) -> Option<Self> {
147        Some(match b {
148            0 => Self::Request,
149            1 => Self::Response,
150            2 => Self::Push,
151            3 => Self::StreamData,
152            4 => Self::StreamEnd,
153            5 => Self::Error,
154            6 => Self::Cancel,
155            7 => Self::Ping,
156            8 => Self::Pong,
157            9 => Self::Hello,
158            10 => Self::HelloAck,
159            11 => Self::Goodbye,
160            _ => return None,
161        })
162    }
163
164    pub fn is_pure_header(self) -> bool {
165        matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
166    }
167}
168
169/// Scheduling priority carried in `flags` bits 1-2. subc schedules on this
170/// without parsing the body.
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172#[repr(u8)]
173pub enum Priority {
174    Passive = 0,
175    Interactive = 1,
176    Background = 2,
177}
178
179impl Priority {
180    fn from_bits(bits: u8) -> Option<Self> {
181        Some(match bits {
182            0 => Self::Passive,
183            1 => Self::Interactive,
184            2 => Self::Background,
185            _ => return None,
186        })
187    }
188}
189
190const FLAG_BINARY: u8 = 0b0000_0001; // bit 0
191const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; // bits 1-2
192const FLAG_PRIORITY_SHIFT: u8 = 1;
193const FLAG_LAST: u8 = 0b0000_1000; // bit 3
194const FLAG_RESERVED_MASK: u8 = 0b1111_0000; // bits 4-7 must be zero
195
196/// The `flags` byte (offset 6): `bit0 BINARY · bits1-2 PRIORITY · bit3 LAST`.
197#[derive(Debug, Clone, Copy, PartialEq, Eq)]
198pub struct Flags(pub u8);
199
200impl Flags {
201    /// Build flags from typed components.
202    pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
203        let mut b = 0u8;
204        if binary {
205            b |= FLAG_BINARY;
206        }
207        b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
208        if last {
209            b |= FLAG_LAST;
210        }
211        Flags(b)
212    }
213
214    /// Body is raw bytes (bulk lane) rather than JSON-RPC.
215    pub fn is_binary(self) -> bool {
216        self.0 & FLAG_BINARY != 0
217    }
218
219    /// Final frame of a streamed message.
220    pub fn is_last(self) -> bool {
221        self.0 & FLAG_LAST != 0
222    }
223
224    /// Decode the priority bits, or `None` if they hold a reserved value.
225    pub fn priority(self) -> Option<Priority> {
226        Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
227    }
228
229    /// True if any reserved bit (4-7) is set — a malformed/forward frame.
230    pub fn has_reserved_bits(self) -> bool {
231        self.0 & FLAG_RESERVED_MASK != 0
232    }
233}
234
235/// A decoded envelope header. The body is the `len` bytes that follow it.
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub struct EnvelopeHeader {
238    /// Number of body bytes after the header.
239    pub len: u32,
240    /// Envelope version.
241    pub ver: u8,
242    /// Frame kind.
243    pub ty: FrameType,
244    /// Flag bits.
245    pub flags: Flags,
246    /// Route = (component, session); 0 = subc itself.
247    pub channel: u16,
248    /// Correlation id.
249    pub corr: u64,
250}
251
252impl EnvelopeHeader {
253    /// Serialize the header to its fixed 17-byte little-endian form.
254    pub fn encode(&self) -> [u8; HEADER_LEN] {
255        let mut buf = [0u8; HEADER_LEN];
256        buf[0..4].copy_from_slice(&self.len.to_le_bytes());
257        buf[4] = self.ver;
258        buf[5] = self.ty as u8;
259        buf[6] = self.flags.0;
260        buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
261        buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
262        buf
263    }
264}
265
266/// Why a header could not be decoded.
267#[derive(Debug, Clone, Copy, PartialEq, Eq)]
268pub enum DecodeError {
269    /// Fewer than `FROZEN_PREFIX_LEN` bytes — cannot even read `len`/`ver`.
270    TooShortForPrefix { have: usize },
271    /// `ver` is not a version this build understands.
272    UnsupportedVersion { ver: u8 },
273    /// Version known but fewer than its header length is present.
274    TooShortForHeader { have: usize, need: usize },
275    /// `type` byte is not a known `FrameType`.
276    UnknownFrameType { byte: u8 },
277    /// A reserved flag bit (4-7) is set.
278    ReservedFlagBits { flags: u8 },
279    /// Priority bits 1-2 hold the reserved value `0b11`.
280    ReservedPriorityBits { flags: u8 },
281    /// A pure-header frame declared body bytes.
282    PureHeaderFrameWithBody { ty: FrameType, len: u32 },
283}
284
285impl fmt::Display for DecodeError {
286    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287        match self {
288            Self::TooShortForPrefix { have } => {
289                write!(f, "header shorter than frozen prefix: have {have} bytes")
290            }
291            Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
292            Self::TooShortForHeader { have, need } => {
293                write!(
294                    f,
295                    "header too short for version: have {have} bytes, need {need}"
296                )
297            }
298            Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
299            Self::ReservedFlagBits { flags } => {
300                write!(f, "reserved flag bits set in flags 0b{flags:08b}")
301            }
302            Self::ReservedPriorityBits { flags } => {
303                write!(f, "reserved priority bits set in flags 0b{flags:08b}")
304            }
305            Self::PureHeaderFrameWithBody { ty, len } => {
306                write!(
307                    f,
308                    "pure-header frame {ty:?} declared non-zero body length {len}"
309                )
310            }
311        }
312    }
313}
314
315impl Error for DecodeError {}
316
317/// How many header bytes a given envelope version occupies. Driven by the
318/// frozen prefix: read `ver`, then learn the full header length here.
319fn header_len_for_version(ver: u8) -> Option<usize> {
320    match ver {
321        1 => Some(HEADER_LEN),
322        _ => None,
323    }
324}
325
326/// Decode an envelope header from the front of `bytes`, following the
327/// frozen-prefix discipline:
328/// 1. need at least the 5-byte prefix to read `len` + `ver`;
329/// 2. dispatch the full header length on `ver`;
330/// 3. need the full header present; then parse the rest.
331///
332/// Never panics on malformed input — returns a typed [`DecodeError`].
333pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
334    if bytes.len() < FROZEN_PREFIX_LEN {
335        return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
336    }
337    let ver = bytes[4];
338    let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
339    if bytes.len() < need {
340        return Err(DecodeError::TooShortForHeader {
341            have: bytes.len(),
342            need,
343        });
344    }
345
346    let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
347    let ty =
348        FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
349    let flags = Flags(bytes[6]);
350    if flags.has_reserved_bits() {
351        return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
352    }
353    if flags.priority().is_none() {
354        return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
355    }
356    if ty.is_pure_header() && len != 0 {
357        return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
358    }
359    let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
360    let corr = u64::from_le_bytes([
361        bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
362    ]);
363
364    Ok(EnvelopeHeader {
365        len,
366        ver,
367        ty,
368        flags,
369        channel,
370        corr,
371    })
372}
373
374#[cfg(test)]
375mod tests {
376    use super::*;
377
378    fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
379        EnvelopeHeader {
380            len,
381            ver: PROTOCOL_VERSION,
382            ty,
383            flags,
384            channel,
385            corr,
386        }
387    }
388
389    #[test]
390    fn bind_identity_round_trips_json() {
391        let identity = BindIdentity {
392            project_root: PathBuf::from("/tmp/project"),
393            harness: "opencode".to_string(),
394            session: "session-1".to_string(),
395        };
396
397        let encoded = serde_json::to_vec(&identity).unwrap();
398        let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();
399
400        assert_eq!(decoded, identity);
401    }
402
403    #[test]
404    fn route_target_variants_round_trip_json() {
405        let targets = [
406            RouteTarget::ToolProvider {
407                module_id: "aft".to_string(),
408            },
409            RouteTarget::ManagementSurface {
410                module_id: "memory".to_string(),
411            },
412            RouteTarget::InternalService {
413                module_id: "bus".to_string(),
414                service_id: "dm".to_string(),
415            },
416        ];
417
418        for target in targets {
419            let encoded = serde_json::to_vec(&target).unwrap();
420            let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
421            assert_eq!(decoded, target);
422        }
423    }
424
425    #[test]
426    fn error_body_round_trips_json() {
427        let body = ErrorBody {
428            code: "config_divergence".to_string(),
429            message: "active config differs".to_string(),
430        };
431
432        let encoded = serde_json::to_vec(&body).unwrap();
433        let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();
434
435        assert_eq!(decoded, body);
436    }
437
438    #[test]
439    fn round_trip_request() {
440        let h = hdr(
441            1234,
442            FrameType::Request,
443            Flags::new(false, Priority::Interactive, false),
444            42,
445            0xDEAD_BEEF_0000_0001,
446        );
447        let decoded = decode_header(&h.encode()).unwrap();
448        assert_eq!(h, decoded);
449    }
450
451    #[test]
452    fn round_trip_all_frame_types() {
453        for b in 0u8..=11 {
454            let ty = FrameType::from_u8(b).unwrap();
455            let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
456            assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
457        }
458    }
459
460    #[test]
461    fn pure_header_frame_has_zero_len() {
462        // CANCEL carries only header (len = 0) + the target corr.
463        let h = hdr(
464            0,
465            FrameType::Cancel,
466            Flags::new(false, Priority::Passive, false),
467            7,
468            99,
469        );
470        let d = decode_header(&h.encode()).unwrap();
471        assert_eq!(d.len, 0);
472        assert_eq!(d.corr, 99);
473    }
474
475    #[test]
476    fn flags_round_trip() {
477        let f = Flags::new(true, Priority::Background, true);
478        assert!(f.is_binary());
479        assert!(f.is_last());
480        assert_eq!(f.priority(), Some(Priority::Background));
481        let h = hdr(8, FrameType::StreamData, f, 1, 1);
482        assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
483    }
484
485    #[test]
486    fn little_endian_and_frozen_prefix_layout() {
487        // len = 1 occupies byte 0; ver sits at byte 4 (the frozen prefix).
488        let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
489        let buf = h.encode();
490        assert_eq!(buf[0], 1);
491        assert_eq!(buf[1..4], [0, 0, 0]);
492        assert_eq!(buf[4], PROTOCOL_VERSION); // ver @ offset 4
493        assert_eq!(buf.len(), HEADER_LEN);
494    }
495
496    #[test]
497    fn reject_too_short_for_prefix() {
498        assert_eq!(
499            decode_header(&[0, 0, 0, 0]),
500            Err(DecodeError::TooShortForPrefix { have: 4 })
501        );
502    }
503
504    #[test]
505    fn reject_too_short_for_header() {
506        // Valid 5-byte prefix (ver = 1) but header truncated.
507        let mut b = [0u8; 10];
508        b[4] = PROTOCOL_VERSION;
509        assert_eq!(
510            decode_header(&b),
511            Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
512        );
513    }
514
515    #[test]
516    fn reject_unsupported_version() {
517        let mut b = [0u8; HEADER_LEN];
518        b[4] = 2; // no header layout known for ver 2
519        assert_eq!(
520            decode_header(&b),
521            Err(DecodeError::UnsupportedVersion { ver: 2 })
522        );
523    }
524
525    #[test]
526    fn reject_unknown_frame_type() {
527        let mut b = [0u8; HEADER_LEN];
528        b[4] = PROTOCOL_VERSION;
529        b[5] = 99;
530        assert_eq!(
531            decode_header(&b),
532            Err(DecodeError::UnknownFrameType { byte: 99 })
533        );
534    }
535
536    #[test]
537    fn reject_reserved_flag_bits() {
538        let mut b = [0u8; HEADER_LEN];
539        b[4] = PROTOCOL_VERSION;
540        b[5] = FrameType::Request as u8;
541        b[6] = 0b1000_0000; // reserved bit 7 set
542        assert_eq!(
543            decode_header(&b),
544            Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
545        );
546    }
547
548    #[test]
549    fn reject_reserved_priority_bits() {
550        let mut b = [0u8; HEADER_LEN];
551        b[4] = PROTOCOL_VERSION;
552        b[5] = FrameType::Request as u8;
553        b[6] = 0b0000_0110; // priority bits 1-2 are reserved value 0b11
554        assert_eq!(
555            decode_header(&b),
556            Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
557        );
558    }
559
560    #[test]
561    fn reject_pure_header_frame_with_body_len() {
562        let h = hdr(
563            1,
564            FrameType::Ping,
565            Flags::new(false, Priority::Passive, false),
566            0,
567            1,
568        );
569        assert_eq!(
570            decode_header(&h.encode()),
571            Err(DecodeError::PureHeaderFrameWithBody {
572                ty: FrameType::Ping,
573                len: 1
574            })
575        );
576    }
577}