Skip to main content

subc_protocol/
lib.rs

1//! subc wire contract.
2//!
3//! This crate is the single source of truth for the subc <-> module wire,
4//! shared by subc-core and AFT. It defines the **envelope** (the fixed
5//! 17-byte routing header subc splices on), the canonical subc-generated body
6//! schemas such as [`ErrorBody`], and the capability manifest. JSON-RPC request
7//! and response bodies remain module-owned opaque payloads to subc.
8//!
9//! ## The envelope (locked — see docs/subc-core-architecture.md §4.8)
10//!
11//! ```text
12//!  offset  size  field     type    purpose
13//!    0      4    len       u32     # of BODY bytes after this 17-byte header
14//!    4      1    ver       u8      envelope version
15//!    5      1    type      u8      frame kind (see FrameType)
16//!    6      1    flags     u8      bit0 BINARY · bits1-2 PRIORITY · bit3 LAST · 4-7 reserved
17//!    7      2    channel   u16     route = (component, session); 0 = subc itself
18//!    9      8    corr      u64     correlation id; CANCEL carries the target call's corr
19//!   17 -> body
20//! ```
21//!
22//! Little-endian (same-machine, native, no byte-swap on the hot path).
23//!
24//! **Frozen prefix (the versioning invariant):** `len` (u32 @ 0) and `ver`
25//! (u8 @ 4) keep fixed meaning + position in *every* future version. A reader
26//! of any version can therefore always read the first 5 bytes, learn `ver`,
27//! look up that version's header length, read the rest, and splice `len` body
28//! bytes. `decode_header` enforces this discipline.
29
30#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt, path::PathBuf};
33
34use serde::{Deserialize, Serialize};
35
36pub mod frame;
37pub mod manifest;
38pub mod session;
39
40pub use frame::{Frame, FrameBuildError};
41
42/// Per-route bind identity shared by client-facing and module-facing control.
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44pub struct BindIdentity {
45    pub project_root: PathBuf,
46    pub harness: String,
47    pub session: String,
48}
49
50/// Explicit target for a route open/bind operation.
51///
52/// RouteTarget.kind ↔ ProviderRole mapping:
53///
54/// | RouteTarget.kind | required ProviderRole | disambiguator |
55/// |---|---|---|
56/// | `tool_provider` | `ToolProvider` | v1: ≤1 per module |
57/// | `management_surface` | `ManagementSurface` | v1: ≤1 per module |
58/// | `internal_service` | `InternalService` | `service_id` (multiple allowed) |
59///
60/// `ProviderRole::PipelineStage` is intentionally unroutable; pipeline modules
61/// are wired by an orchestrator rather than opened directly by clients.
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
63#[serde(tag = "kind", rename_all = "snake_case")]
64pub enum RouteTarget {
65    ToolProvider {
66        module_id: String,
67    },
68    ManagementSurface {
69        module_id: String,
70    },
71    InternalService {
72        module_id: String,
73        service_id: String,
74    },
75}
76
77/// Envelope protocol version this build speaks.
78pub const PROTOCOL_VERSION: u8 = 1;
79
80/// Env var subc sets on each supervised child telling it the module_id it is
81/// supervised under, so it can register under that id.
82pub const SUBC_MODULE_ID_ENV: &str = "SUBC_MODULE_ID";
83
84/// Env var subc sets, on each spawn of a `reserved` module only, to a fresh
85/// one-time launch nonce. The child echoes it in `ModuleHelloBody::launch_nonce`;
86/// subc accepts a reserved module_id's HELLO only when the nonce matches the one it
87/// last injected for that id. Non-reserved modules never receive it.
88pub const SUBC_LAUNCH_NONCE_ENV: &str = "SUBC_LAUNCH_NONCE";
89
90/// Fixed header length for `PROTOCOL_VERSION` 1.
91pub const HEADER_LEN: usize = 17;
92
93/// Bytes of the frozen prefix (`len` u32 + `ver` u8) that are stable across
94/// every envelope version. A reader needs only these to learn the version and
95/// thus the full header length.
96pub const FROZEN_PREFIX_LEN: usize = 5;
97
98/// Maximum v1 frame body accepted before allocation.
99///
100/// This 64 MiB starting cap prevents a malformed header from forcing an
101/// unbounded allocation. Future protocol versions can negotiate or encode a
102/// different cap while preserving the frozen prefix.
103pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
104
105/// Canonical JSON body for all subc-generated `ERROR` frames.
106#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
107pub struct ErrorBody {
108    pub code: String,
109    pub message: String,
110}
111
112/// Module-to-subc `HELLO` body used during module registration.
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114pub struct ModuleHelloBody {
115    pub manifest: manifest::ModuleManifest,
116    pub protocol_ver: u8,
117    #[serde(default)]
118    pub control_ops: Option<Vec<String>>,
119    /// One-time launch nonce, echoed back from the `SUBC_LAUNCH_NONCE` environment
120    /// variable the daemon injected when it spawned this process. Only a daemon-spawned
121    /// process for a `reserved` module receives a nonce; subc accepts a reserved
122    /// `module_id`'s HELLO only when this matches the nonce it last injected for that
123    /// id, so a different process cannot register as a reserved module while the real
124    /// one is down/restarting. Absent (`serde(default)`) for non-reserved modules and
125    /// self-connecting providers, which are never nonce-checked.
126    #[serde(default, skip_serializing_if = "Option::is_none")]
127    pub launch_nonce: Option<String>,
128}
129
130/// subc-to-module `HELLO_ACK` body used during module registration.
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
132pub struct ModuleHelloAckBody {
133    pub negotiated_ver: u8,
134    pub subc_ops: Vec<String>,
135    pub subc_capabilities: Vec<String>,
136    /// The module's resolved storage descriptor, when the daemon's central config
137    /// configures managed storage. Carried opaquely here (subc-protocol stays a
138    /// thin wire crate with no storage/database dependency); a module that uses
139    /// managed storage deserializes it into `cortexkit_store_types::StorageDescriptor`
140    /// and hands it to `cortexkit-store`. Absent when no storage is configured, and
141    /// `serde(default)` so an older module simply ignores it.
142    #[serde(default, skip_serializing_if = "Option::is_none")]
143    pub storage: Option<serde_json::Value>,
144}
145
146/// Frame kind (`type` byte at offset 5).
147///
148/// `CANCEL`, `PING`, `PONG`, and `GOODBYE` are pure-header frames (`len == 0`);
149/// only `HELLO`/`HELLO_ACK` and the RPC payloads carry bodies.
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151#[repr(u8)]
152pub enum FrameType {
153    Request = 0,
154    Response = 1,
155    Push = 2,
156    StreamData = 3,
157    StreamEnd = 4,
158    Error = 5,
159    Cancel = 6,
160    Ping = 7,
161    Pong = 8,
162    Hello = 9,
163    HelloAck = 10,
164    Goodbye = 11,
165}
166
167impl FrameType {
168    /// Map the raw `type` byte to a `FrameType`, or `None` if unknown.
169    pub fn from_u8(b: u8) -> Option<Self> {
170        Some(match b {
171            0 => Self::Request,
172            1 => Self::Response,
173            2 => Self::Push,
174            3 => Self::StreamData,
175            4 => Self::StreamEnd,
176            5 => Self::Error,
177            6 => Self::Cancel,
178            7 => Self::Ping,
179            8 => Self::Pong,
180            9 => Self::Hello,
181            10 => Self::HelloAck,
182            11 => Self::Goodbye,
183            _ => return None,
184        })
185    }
186
187    pub fn is_pure_header(self) -> bool {
188        matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
189    }
190}
191
192/// Scheduling priority carried in `flags` bits 1-2. subc schedules on this
193/// without parsing the body.
194#[derive(Debug, Clone, Copy, PartialEq, Eq)]
195#[repr(u8)]
196pub enum Priority {
197    Passive = 0,
198    Interactive = 1,
199    Background = 2,
200}
201
202impl Priority {
203    fn from_bits(bits: u8) -> Option<Self> {
204        Some(match bits {
205            0 => Self::Passive,
206            1 => Self::Interactive,
207            2 => Self::Background,
208            _ => return None,
209        })
210    }
211}
212
213const FLAG_BINARY: u8 = 0b0000_0001; // bit 0
214const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; // bits 1-2
215const FLAG_PRIORITY_SHIFT: u8 = 1;
216const FLAG_LAST: u8 = 0b0000_1000; // bit 3
217const FLAG_RESERVED_MASK: u8 = 0b1111_0000; // bits 4-7 must be zero
218
219/// The `flags` byte (offset 6): `bit0 BINARY · bits1-2 PRIORITY · bit3 LAST`.
220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
221pub struct Flags(pub u8);
222
223impl Flags {
224    /// Build flags from typed components.
225    pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
226        let mut b = 0u8;
227        if binary {
228            b |= FLAG_BINARY;
229        }
230        b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
231        if last {
232            b |= FLAG_LAST;
233        }
234        Flags(b)
235    }
236
237    /// Body is raw bytes (bulk lane) rather than JSON-RPC.
238    pub fn is_binary(self) -> bool {
239        self.0 & FLAG_BINARY != 0
240    }
241
242    /// Final frame of a streamed message.
243    pub fn is_last(self) -> bool {
244        self.0 & FLAG_LAST != 0
245    }
246
247    /// Decode the priority bits, or `None` if they hold a reserved value.
248    pub fn priority(self) -> Option<Priority> {
249        Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
250    }
251
252    /// True if any reserved bit (4-7) is set — a malformed/forward frame.
253    pub fn has_reserved_bits(self) -> bool {
254        self.0 & FLAG_RESERVED_MASK != 0
255    }
256}
257
258/// A decoded envelope header. The body is the `len` bytes that follow it.
259#[derive(Debug, Clone, Copy, PartialEq, Eq)]
260pub struct EnvelopeHeader {
261    /// Number of body bytes after the header.
262    pub len: u32,
263    /// Envelope version.
264    pub ver: u8,
265    /// Frame kind.
266    pub ty: FrameType,
267    /// Flag bits.
268    pub flags: Flags,
269    /// Route = (component, session); 0 = subc itself.
270    pub channel: u16,
271    /// Correlation id.
272    pub corr: u64,
273}
274
275impl EnvelopeHeader {
276    /// Serialize the header to its fixed 17-byte little-endian form.
277    pub fn encode(&self) -> [u8; HEADER_LEN] {
278        let mut buf = [0u8; HEADER_LEN];
279        buf[0..4].copy_from_slice(&self.len.to_le_bytes());
280        buf[4] = self.ver;
281        buf[5] = self.ty as u8;
282        buf[6] = self.flags.0;
283        buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
284        buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
285        buf
286    }
287}
288
289/// Why a header could not be decoded.
290#[derive(Debug, Clone, Copy, PartialEq, Eq)]
291pub enum DecodeError {
292    /// Fewer than `FROZEN_PREFIX_LEN` bytes — cannot even read `len`/`ver`.
293    TooShortForPrefix { have: usize },
294    /// `ver` is not a version this build understands.
295    UnsupportedVersion { ver: u8 },
296    /// Version known but fewer than its header length is present.
297    TooShortForHeader { have: usize, need: usize },
298    /// `type` byte is not a known `FrameType`.
299    UnknownFrameType { byte: u8 },
300    /// A reserved flag bit (4-7) is set.
301    ReservedFlagBits { flags: u8 },
302    /// Priority bits 1-2 hold the reserved value `0b11`.
303    ReservedPriorityBits { flags: u8 },
304    /// A pure-header frame declared body bytes.
305    PureHeaderFrameWithBody { ty: FrameType, len: u32 },
306}
307
308impl fmt::Display for DecodeError {
309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310        match self {
311            Self::TooShortForPrefix { have } => {
312                write!(f, "header shorter than frozen prefix: have {have} bytes")
313            }
314            Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
315            Self::TooShortForHeader { have, need } => {
316                write!(
317                    f,
318                    "header too short for version: have {have} bytes, need {need}"
319                )
320            }
321            Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
322            Self::ReservedFlagBits { flags } => {
323                write!(f, "reserved flag bits set in flags 0b{flags:08b}")
324            }
325            Self::ReservedPriorityBits { flags } => {
326                write!(f, "reserved priority bits set in flags 0b{flags:08b}")
327            }
328            Self::PureHeaderFrameWithBody { ty, len } => {
329                write!(
330                    f,
331                    "pure-header frame {ty:?} declared non-zero body length {len}"
332                )
333            }
334        }
335    }
336}
337
338impl Error for DecodeError {}
339
340/// How many header bytes a given envelope version occupies. Driven by the
341/// frozen prefix: read `ver`, then learn the full header length here.
342fn header_len_for_version(ver: u8) -> Option<usize> {
343    match ver {
344        1 => Some(HEADER_LEN),
345        _ => None,
346    }
347}
348
349/// Decode an envelope header from the front of `bytes`, following the
350/// frozen-prefix discipline:
351/// 1. need at least the 5-byte prefix to read `len` + `ver`;
352/// 2. dispatch the full header length on `ver`;
353/// 3. need the full header present; then parse the rest.
354///
355/// Never panics on malformed input — returns a typed [`DecodeError`].
356pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
357    if bytes.len() < FROZEN_PREFIX_LEN {
358        return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
359    }
360    let ver = bytes[4];
361    let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
362    if bytes.len() < need {
363        return Err(DecodeError::TooShortForHeader {
364            have: bytes.len(),
365            need,
366        });
367    }
368
369    let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
370    let ty =
371        FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
372    let flags = Flags(bytes[6]);
373    if flags.has_reserved_bits() {
374        return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
375    }
376    if flags.priority().is_none() {
377        return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
378    }
379    if ty.is_pure_header() && len != 0 {
380        return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
381    }
382    let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
383    let corr = u64::from_le_bytes([
384        bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
385    ]);
386
387    Ok(EnvelopeHeader {
388        len,
389        ver,
390        ty,
391        flags,
392        channel,
393        corr,
394    })
395}
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400
401    fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
402        EnvelopeHeader {
403            len,
404            ver: PROTOCOL_VERSION,
405            ty,
406            flags,
407            channel,
408            corr,
409        }
410    }
411
412    #[test]
413    fn bind_identity_round_trips_json() {
414        let identity = BindIdentity {
415            project_root: PathBuf::from("/tmp/project"),
416            harness: "opencode".to_string(),
417            session: "session-1".to_string(),
418        };
419
420        let encoded = serde_json::to_vec(&identity).unwrap();
421        let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();
422
423        assert_eq!(decoded, identity);
424    }
425
426    #[test]
427    fn route_target_variants_round_trip_json() {
428        let targets = [
429            RouteTarget::ToolProvider {
430                module_id: "aft".to_string(),
431            },
432            RouteTarget::ManagementSurface {
433                module_id: "memory".to_string(),
434            },
435            RouteTarget::InternalService {
436                module_id: "bus".to_string(),
437                service_id: "dm".to_string(),
438            },
439        ];
440
441        for target in targets {
442            let encoded = serde_json::to_vec(&target).unwrap();
443            let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
444            assert_eq!(decoded, target);
445        }
446    }
447
448    #[test]
449    fn error_body_round_trips_json() {
450        let body = ErrorBody {
451            code: "config_divergence".to_string(),
452            message: "active config differs".to_string(),
453        };
454
455        let encoded = serde_json::to_vec(&body).unwrap();
456        let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();
457
458        assert_eq!(decoded, body);
459    }
460
461    #[test]
462    fn round_trip_request() {
463        let h = hdr(
464            1234,
465            FrameType::Request,
466            Flags::new(false, Priority::Interactive, false),
467            42,
468            0xDEAD_BEEF_0000_0001,
469        );
470        let decoded = decode_header(&h.encode()).unwrap();
471        assert_eq!(h, decoded);
472    }
473
474    #[test]
475    fn round_trip_all_frame_types() {
476        for b in 0u8..=11 {
477            let ty = FrameType::from_u8(b).unwrap();
478            let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
479            assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
480        }
481    }
482
483    #[test]
484    fn pure_header_frame_has_zero_len() {
485        // CANCEL carries only header (len = 0) + the target corr.
486        let h = hdr(
487            0,
488            FrameType::Cancel,
489            Flags::new(false, Priority::Passive, false),
490            7,
491            99,
492        );
493        let d = decode_header(&h.encode()).unwrap();
494        assert_eq!(d.len, 0);
495        assert_eq!(d.corr, 99);
496    }
497
498    #[test]
499    fn flags_round_trip() {
500        let f = Flags::new(true, Priority::Background, true);
501        assert!(f.is_binary());
502        assert!(f.is_last());
503        assert_eq!(f.priority(), Some(Priority::Background));
504        let h = hdr(8, FrameType::StreamData, f, 1, 1);
505        assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
506    }
507
508    #[test]
509    fn little_endian_and_frozen_prefix_layout() {
510        // len = 1 occupies byte 0; ver sits at byte 4 (the frozen prefix).
511        let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
512        let buf = h.encode();
513        assert_eq!(buf[0], 1);
514        assert_eq!(buf[1..4], [0, 0, 0]);
515        assert_eq!(buf[4], PROTOCOL_VERSION); // ver @ offset 4
516        assert_eq!(buf.len(), HEADER_LEN);
517    }
518
519    #[test]
520    fn reject_too_short_for_prefix() {
521        assert_eq!(
522            decode_header(&[0, 0, 0, 0]),
523            Err(DecodeError::TooShortForPrefix { have: 4 })
524        );
525    }
526
527    #[test]
528    fn reject_too_short_for_header() {
529        // Valid 5-byte prefix (ver = 1) but header truncated.
530        let mut b = [0u8; 10];
531        b[4] = PROTOCOL_VERSION;
532        assert_eq!(
533            decode_header(&b),
534            Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
535        );
536    }
537
538    #[test]
539    fn reject_unsupported_version() {
540        let mut b = [0u8; HEADER_LEN];
541        b[4] = 2; // no header layout known for ver 2
542        assert_eq!(
543            decode_header(&b),
544            Err(DecodeError::UnsupportedVersion { ver: 2 })
545        );
546    }
547
548    #[test]
549    fn reject_unknown_frame_type() {
550        let mut b = [0u8; HEADER_LEN];
551        b[4] = PROTOCOL_VERSION;
552        b[5] = 99;
553        assert_eq!(
554            decode_header(&b),
555            Err(DecodeError::UnknownFrameType { byte: 99 })
556        );
557    }
558
559    #[test]
560    fn reject_reserved_flag_bits() {
561        let mut b = [0u8; HEADER_LEN];
562        b[4] = PROTOCOL_VERSION;
563        b[5] = FrameType::Request as u8;
564        b[6] = 0b1000_0000; // reserved bit 7 set
565        assert_eq!(
566            decode_header(&b),
567            Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
568        );
569    }
570
571    #[test]
572    fn reject_reserved_priority_bits() {
573        let mut b = [0u8; HEADER_LEN];
574        b[4] = PROTOCOL_VERSION;
575        b[5] = FrameType::Request as u8;
576        b[6] = 0b0000_0110; // priority bits 1-2 are reserved value 0b11
577        assert_eq!(
578            decode_header(&b),
579            Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
580        );
581    }
582
583    #[test]
584    fn reject_pure_header_frame_with_body_len() {
585        let h = hdr(
586            1,
587            FrameType::Ping,
588            Flags::new(false, Priority::Passive, false),
589            0,
590            1,
591        );
592        assert_eq!(
593            decode_header(&h.encode()),
594            Err(DecodeError::PureHeaderFrameWithBody {
595                ty: FrameType::Ping,
596                len: 1
597            })
598        );
599    }
600}