Skip to main content

subc_protocol/
lib.rs

1//! subc wire contract.
2//!
3//! This crate is the single source of truth for the subc <-> module wire,
4//! shared by subc-core and AFT. It defines the **envelope** (the fixed
5//! 17-byte routing header subc splices on), the canonical subc-generated body
6//! schemas such as [`ErrorBody`], and the capability manifest. JSON-RPC request
7//! and response bodies remain module-owned opaque payloads to subc.
8//!
9//! ## The envelope (locked — see docs/subc-core-architecture.md §4.8)
10//!
11//! ```text
12//!  offset  size  field     type    purpose
13//!    0      4    len       u32     # of BODY bytes after this 17-byte header
14//!    4      1    ver       u8      envelope version
15//!    5      1    type      u8      frame kind (see FrameType)
16//!    6      1    flags     u8      bit0 BINARY · bits1-2 PRIORITY · bit3 LAST · 4-7 reserved
17//!    7      2    channel   u16     route = (component, session); 0 = subc itself
18//!    9      8    corr      u64     correlation id; CANCEL carries the target call's corr
19//!   17 -> body
20//! ```
21//!
22//! Little-endian (same-machine, native, no byte-swap on the hot path).
23//!
24//! **Frozen prefix (the versioning invariant):** `len` (u32 @ 0) and `ver`
25//! (u8 @ 4) keep fixed meaning + position in *every* future version. A reader
26//! of any version can therefore always read the first 5 bytes, learn `ver`,
27//! look up that version's header length, read the rest, and splice `len` body
28//! bytes. `decode_header` enforces this discipline.
29
30#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt, path::PathBuf};
33
34use serde::{Deserialize, Serialize};
35
36pub mod frame;
37pub mod manifest;
38pub mod session;
39
40pub use frame::{Frame, FrameBuildError};
41
42/// Per-route bind identity shared by client-facing and module-facing control.
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44pub struct BindIdentity {
45    pub project_root: PathBuf,
46    pub harness: String,
47    pub session: String,
48}
49
50/// Explicit target for a route open/bind operation.
51///
52/// RouteTarget.kind ↔ ProviderRole mapping:
53///
54/// | RouteTarget.kind | required ProviderRole | disambiguator |
55/// |---|---|---|
56/// | `tool_provider` | `ToolProvider` | v1: ≤1 per module |
57/// | `management_surface` | `ManagementSurface` | v1: ≤1 per module |
58/// | `internal_service` | `InternalService` | `service_id` (multiple allowed) |
59///
60/// `ProviderRole::PipelineStage` is intentionally unroutable; pipeline modules
61/// are wired by an orchestrator rather than opened directly by clients.
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
63#[serde(tag = "kind", rename_all = "snake_case")]
64pub enum RouteTarget {
65    ToolProvider {
66        module_id: String,
67    },
68    ManagementSurface {
69        module_id: String,
70    },
71    InternalService {
72        module_id: String,
73        service_id: String,
74    },
75}
76
77/// Envelope protocol version this build speaks.
78pub const PROTOCOL_VERSION: u8 = 1;
79
80/// Fixed header length for `PROTOCOL_VERSION` 1.
81pub const HEADER_LEN: usize = 17;
82
83/// Bytes of the frozen prefix (`len` u32 + `ver` u8) that are stable across
84/// every envelope version. A reader needs only these to learn the version and
85/// thus the full header length.
86pub const FROZEN_PREFIX_LEN: usize = 5;
87
88/// Maximum v1 frame body accepted before allocation.
89///
90/// This 64 MiB starting cap prevents a malformed header from forcing an
91/// unbounded allocation. Future protocol versions can negotiate or encode a
92/// different cap while preserving the frozen prefix.
93pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
94
95/// Canonical JSON body for all subc-generated `ERROR` frames.
96#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
97pub struct ErrorBody {
98    pub code: String,
99    pub message: String,
100}
101
102/// Module-to-subc `HELLO` body used during module registration.
103#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
104pub struct ModuleHelloBody {
105    pub manifest: manifest::ModuleManifest,
106    pub protocol_ver: u8,
107    #[serde(default)]
108    pub control_ops: Option<Vec<String>>,
109}
110
111/// subc-to-module `HELLO_ACK` body used during module registration.
112#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
113pub struct ModuleHelloAckBody {
114    pub negotiated_ver: u8,
115    pub subc_ops: Vec<String>,
116    pub subc_capabilities: Vec<String>,
117}
118
119/// Frame kind (`type` byte at offset 5).
120///
121/// `CANCEL`, `PING`, `PONG`, and `GOODBYE` are pure-header frames (`len == 0`);
122/// only `HELLO`/`HELLO_ACK` and the RPC payloads carry bodies.
123#[derive(Debug, Clone, Copy, PartialEq, Eq)]
124#[repr(u8)]
125pub enum FrameType {
126    Request = 0,
127    Response = 1,
128    Push = 2,
129    StreamData = 3,
130    StreamEnd = 4,
131    Error = 5,
132    Cancel = 6,
133    Ping = 7,
134    Pong = 8,
135    Hello = 9,
136    HelloAck = 10,
137    Goodbye = 11,
138}
139
140impl FrameType {
141    /// Map the raw `type` byte to a `FrameType`, or `None` if unknown.
142    pub fn from_u8(b: u8) -> Option<Self> {
143        Some(match b {
144            0 => Self::Request,
145            1 => Self::Response,
146            2 => Self::Push,
147            3 => Self::StreamData,
148            4 => Self::StreamEnd,
149            5 => Self::Error,
150            6 => Self::Cancel,
151            7 => Self::Ping,
152            8 => Self::Pong,
153            9 => Self::Hello,
154            10 => Self::HelloAck,
155            11 => Self::Goodbye,
156            _ => return None,
157        })
158    }
159
160    pub fn is_pure_header(self) -> bool {
161        matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
162    }
163}
164
165/// Scheduling priority carried in `flags` bits 1-2. subc schedules on this
166/// without parsing the body.
167#[derive(Debug, Clone, Copy, PartialEq, Eq)]
168#[repr(u8)]
169pub enum Priority {
170    Passive = 0,
171    Interactive = 1,
172    Background = 2,
173}
174
175impl Priority {
176    fn from_bits(bits: u8) -> Option<Self> {
177        Some(match bits {
178            0 => Self::Passive,
179            1 => Self::Interactive,
180            2 => Self::Background,
181            _ => return None,
182        })
183    }
184}
185
186const FLAG_BINARY: u8 = 0b0000_0001; // bit 0
187const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; // bits 1-2
188const FLAG_PRIORITY_SHIFT: u8 = 1;
189const FLAG_LAST: u8 = 0b0000_1000; // bit 3
190const FLAG_RESERVED_MASK: u8 = 0b1111_0000; // bits 4-7 must be zero
191
192/// The `flags` byte (offset 6): `bit0 BINARY · bits1-2 PRIORITY · bit3 LAST`.
193#[derive(Debug, Clone, Copy, PartialEq, Eq)]
194pub struct Flags(pub u8);
195
196impl Flags {
197    /// Build flags from typed components.
198    pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
199        let mut b = 0u8;
200        if binary {
201            b |= FLAG_BINARY;
202        }
203        b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
204        if last {
205            b |= FLAG_LAST;
206        }
207        Flags(b)
208    }
209
210    /// Body is raw bytes (bulk lane) rather than JSON-RPC.
211    pub fn is_binary(self) -> bool {
212        self.0 & FLAG_BINARY != 0
213    }
214
215    /// Final frame of a streamed message.
216    pub fn is_last(self) -> bool {
217        self.0 & FLAG_LAST != 0
218    }
219
220    /// Decode the priority bits, or `None` if they hold a reserved value.
221    pub fn priority(self) -> Option<Priority> {
222        Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
223    }
224
225    /// True if any reserved bit (4-7) is set — a malformed/forward frame.
226    pub fn has_reserved_bits(self) -> bool {
227        self.0 & FLAG_RESERVED_MASK != 0
228    }
229}
230
231/// A decoded envelope header. The body is the `len` bytes that follow it.
232#[derive(Debug, Clone, Copy, PartialEq, Eq)]
233pub struct EnvelopeHeader {
234    /// Number of body bytes after the header.
235    pub len: u32,
236    /// Envelope version.
237    pub ver: u8,
238    /// Frame kind.
239    pub ty: FrameType,
240    /// Flag bits.
241    pub flags: Flags,
242    /// Route = (component, session); 0 = subc itself.
243    pub channel: u16,
244    /// Correlation id.
245    pub corr: u64,
246}
247
248impl EnvelopeHeader {
249    /// Serialize the header to its fixed 17-byte little-endian form.
250    pub fn encode(&self) -> [u8; HEADER_LEN] {
251        let mut buf = [0u8; HEADER_LEN];
252        buf[0..4].copy_from_slice(&self.len.to_le_bytes());
253        buf[4] = self.ver;
254        buf[5] = self.ty as u8;
255        buf[6] = self.flags.0;
256        buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
257        buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
258        buf
259    }
260}
261
262/// Why a header could not be decoded.
263#[derive(Debug, Clone, Copy, PartialEq, Eq)]
264pub enum DecodeError {
265    /// Fewer than `FROZEN_PREFIX_LEN` bytes — cannot even read `len`/`ver`.
266    TooShortForPrefix { have: usize },
267    /// `ver` is not a version this build understands.
268    UnsupportedVersion { ver: u8 },
269    /// Version known but fewer than its header length is present.
270    TooShortForHeader { have: usize, need: usize },
271    /// `type` byte is not a known `FrameType`.
272    UnknownFrameType { byte: u8 },
273    /// A reserved flag bit (4-7) is set.
274    ReservedFlagBits { flags: u8 },
275    /// Priority bits 1-2 hold the reserved value `0b11`.
276    ReservedPriorityBits { flags: u8 },
277    /// A pure-header frame declared body bytes.
278    PureHeaderFrameWithBody { ty: FrameType, len: u32 },
279}
280
281impl fmt::Display for DecodeError {
282    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283        match self {
284            Self::TooShortForPrefix { have } => {
285                write!(f, "header shorter than frozen prefix: have {have} bytes")
286            }
287            Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
288            Self::TooShortForHeader { have, need } => {
289                write!(
290                    f,
291                    "header too short for version: have {have} bytes, need {need}"
292                )
293            }
294            Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
295            Self::ReservedFlagBits { flags } => {
296                write!(f, "reserved flag bits set in flags 0b{flags:08b}")
297            }
298            Self::ReservedPriorityBits { flags } => {
299                write!(f, "reserved priority bits set in flags 0b{flags:08b}")
300            }
301            Self::PureHeaderFrameWithBody { ty, len } => {
302                write!(
303                    f,
304                    "pure-header frame {ty:?} declared non-zero body length {len}"
305                )
306            }
307        }
308    }
309}
310
311impl Error for DecodeError {}
312
313/// How many header bytes a given envelope version occupies. Driven by the
314/// frozen prefix: read `ver`, then learn the full header length here.
315fn header_len_for_version(ver: u8) -> Option<usize> {
316    match ver {
317        1 => Some(HEADER_LEN),
318        _ => None,
319    }
320}
321
322/// Decode an envelope header from the front of `bytes`, following the
323/// frozen-prefix discipline:
324/// 1. need at least the 5-byte prefix to read `len` + `ver`;
325/// 2. dispatch the full header length on `ver`;
326/// 3. need the full header present; then parse the rest.
327///
328/// Never panics on malformed input — returns a typed [`DecodeError`].
329pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
330    if bytes.len() < FROZEN_PREFIX_LEN {
331        return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
332    }
333    let ver = bytes[4];
334    let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
335    if bytes.len() < need {
336        return Err(DecodeError::TooShortForHeader {
337            have: bytes.len(),
338            need,
339        });
340    }
341
342    let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
343    let ty =
344        FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
345    let flags = Flags(bytes[6]);
346    if flags.has_reserved_bits() {
347        return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
348    }
349    if flags.priority().is_none() {
350        return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
351    }
352    if ty.is_pure_header() && len != 0 {
353        return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
354    }
355    let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
356    let corr = u64::from_le_bytes([
357        bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
358    ]);
359
360    Ok(EnvelopeHeader {
361        len,
362        ver,
363        ty,
364        flags,
365        channel,
366        corr,
367    })
368}
369
370#[cfg(test)]
371mod tests {
372    use super::*;
373
374    fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
375        EnvelopeHeader {
376            len,
377            ver: PROTOCOL_VERSION,
378            ty,
379            flags,
380            channel,
381            corr,
382        }
383    }
384
385    #[test]
386    fn bind_identity_round_trips_json() {
387        let identity = BindIdentity {
388            project_root: PathBuf::from("/tmp/project"),
389            harness: "opencode".to_string(),
390            session: "session-1".to_string(),
391        };
392
393        let encoded = serde_json::to_vec(&identity).unwrap();
394        let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();
395
396        assert_eq!(decoded, identity);
397    }
398
399    #[test]
400    fn route_target_variants_round_trip_json() {
401        let targets = [
402            RouteTarget::ToolProvider {
403                module_id: "aft".to_string(),
404            },
405            RouteTarget::ManagementSurface {
406                module_id: "memory".to_string(),
407            },
408            RouteTarget::InternalService {
409                module_id: "bus".to_string(),
410                service_id: "dm".to_string(),
411            },
412        ];
413
414        for target in targets {
415            let encoded = serde_json::to_vec(&target).unwrap();
416            let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
417            assert_eq!(decoded, target);
418        }
419    }
420
421    #[test]
422    fn error_body_round_trips_json() {
423        let body = ErrorBody {
424            code: "config_divergence".to_string(),
425            message: "active config differs".to_string(),
426        };
427
428        let encoded = serde_json::to_vec(&body).unwrap();
429        let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();
430
431        assert_eq!(decoded, body);
432    }
433
434    #[test]
435    fn round_trip_request() {
436        let h = hdr(
437            1234,
438            FrameType::Request,
439            Flags::new(false, Priority::Interactive, false),
440            42,
441            0xDEAD_BEEF_0000_0001,
442        );
443        let decoded = decode_header(&h.encode()).unwrap();
444        assert_eq!(h, decoded);
445    }
446
447    #[test]
448    fn round_trip_all_frame_types() {
449        for b in 0u8..=11 {
450            let ty = FrameType::from_u8(b).unwrap();
451            let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
452            assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
453        }
454    }
455
456    #[test]
457    fn pure_header_frame_has_zero_len() {
458        // CANCEL carries only header (len = 0) + the target corr.
459        let h = hdr(
460            0,
461            FrameType::Cancel,
462            Flags::new(false, Priority::Passive, false),
463            7,
464            99,
465        );
466        let d = decode_header(&h.encode()).unwrap();
467        assert_eq!(d.len, 0);
468        assert_eq!(d.corr, 99);
469    }
470
471    #[test]
472    fn flags_round_trip() {
473        let f = Flags::new(true, Priority::Background, true);
474        assert!(f.is_binary());
475        assert!(f.is_last());
476        assert_eq!(f.priority(), Some(Priority::Background));
477        let h = hdr(8, FrameType::StreamData, f, 1, 1);
478        assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
479    }
480
481    #[test]
482    fn little_endian_and_frozen_prefix_layout() {
483        // len = 1 occupies byte 0; ver sits at byte 4 (the frozen prefix).
484        let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
485        let buf = h.encode();
486        assert_eq!(buf[0], 1);
487        assert_eq!(buf[1..4], [0, 0, 0]);
488        assert_eq!(buf[4], PROTOCOL_VERSION); // ver @ offset 4
489        assert_eq!(buf.len(), HEADER_LEN);
490    }
491
492    #[test]
493    fn reject_too_short_for_prefix() {
494        assert_eq!(
495            decode_header(&[0, 0, 0, 0]),
496            Err(DecodeError::TooShortForPrefix { have: 4 })
497        );
498    }
499
500    #[test]
501    fn reject_too_short_for_header() {
502        // Valid 5-byte prefix (ver = 1) but header truncated.
503        let mut b = [0u8; 10];
504        b[4] = PROTOCOL_VERSION;
505        assert_eq!(
506            decode_header(&b),
507            Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
508        );
509    }
510
511    #[test]
512    fn reject_unsupported_version() {
513        let mut b = [0u8; HEADER_LEN];
514        b[4] = 2; // no header layout known for ver 2
515        assert_eq!(
516            decode_header(&b),
517            Err(DecodeError::UnsupportedVersion { ver: 2 })
518        );
519    }
520
521    #[test]
522    fn reject_unknown_frame_type() {
523        let mut b = [0u8; HEADER_LEN];
524        b[4] = PROTOCOL_VERSION;
525        b[5] = 99;
526        assert_eq!(
527            decode_header(&b),
528            Err(DecodeError::UnknownFrameType { byte: 99 })
529        );
530    }
531
532    #[test]
533    fn reject_reserved_flag_bits() {
534        let mut b = [0u8; HEADER_LEN];
535        b[4] = PROTOCOL_VERSION;
536        b[5] = FrameType::Request as u8;
537        b[6] = 0b1000_0000; // reserved bit 7 set
538        assert_eq!(
539            decode_header(&b),
540            Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
541        );
542    }
543
544    #[test]
545    fn reject_reserved_priority_bits() {
546        let mut b = [0u8; HEADER_LEN];
547        b[4] = PROTOCOL_VERSION;
548        b[5] = FrameType::Request as u8;
549        b[6] = 0b0000_0110; // priority bits 1-2 are reserved value 0b11
550        assert_eq!(
551            decode_header(&b),
552            Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
553        );
554    }
555
556    #[test]
557    fn reject_pure_header_frame_with_body_len() {
558        let h = hdr(
559            1,
560            FrameType::Ping,
561            Flags::new(false, Priority::Passive, false),
562            0,
563            1,
564        );
565        assert_eq!(
566            decode_header(&h.encode()),
567            Err(DecodeError::PureHeaderFrameWithBody {
568                ty: FrameType::Ping,
569                len: 1
570            })
571        );
572    }
573}