1#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt, path::PathBuf};
33
34use serde::{Deserialize, Serialize};
35
36pub mod frame;
37pub mod manifest;
38pub mod session;
39
40pub use frame::{Frame, FrameBuildError};
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44pub struct BindIdentity {
45 pub project_root: PathBuf,
46 pub harness: String,
47 pub session: String,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
63#[serde(tag = "kind", rename_all = "snake_case")]
64pub enum RouteTarget {
65 ToolProvider {
66 module_id: String,
67 },
68 ManagementSurface {
69 module_id: String,
70 },
71 InternalService {
72 module_id: String,
73 service_id: String,
74 },
75}
76
77pub const PROTOCOL_VERSION: u8 = 1;
79
80pub const SUBC_MODULE_ID_ENV: &str = "SUBC_MODULE_ID";
83
84pub const HEADER_LEN: usize = 17;
86
87pub const FROZEN_PREFIX_LEN: usize = 5;
91
92pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
98
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
101pub struct ErrorBody {
102 pub code: String,
103 pub message: String,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
108pub struct ModuleHelloBody {
109 pub manifest: manifest::ModuleManifest,
110 pub protocol_ver: u8,
111 #[serde(default)]
112 pub control_ops: Option<Vec<String>>,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
117pub struct ModuleHelloAckBody {
118 pub negotiated_ver: u8,
119 pub subc_ops: Vec<String>,
120 pub subc_capabilities: Vec<String>,
121}
122
123#[derive(Debug, Clone, Copy, PartialEq, Eq)]
128#[repr(u8)]
129pub enum FrameType {
130 Request = 0,
131 Response = 1,
132 Push = 2,
133 StreamData = 3,
134 StreamEnd = 4,
135 Error = 5,
136 Cancel = 6,
137 Ping = 7,
138 Pong = 8,
139 Hello = 9,
140 HelloAck = 10,
141 Goodbye = 11,
142}
143
144impl FrameType {
145 pub fn from_u8(b: u8) -> Option<Self> {
147 Some(match b {
148 0 => Self::Request,
149 1 => Self::Response,
150 2 => Self::Push,
151 3 => Self::StreamData,
152 4 => Self::StreamEnd,
153 5 => Self::Error,
154 6 => Self::Cancel,
155 7 => Self::Ping,
156 8 => Self::Pong,
157 9 => Self::Hello,
158 10 => Self::HelloAck,
159 11 => Self::Goodbye,
160 _ => return None,
161 })
162 }
163
164 pub fn is_pure_header(self) -> bool {
165 matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
166 }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172#[repr(u8)]
173pub enum Priority {
174 Passive = 0,
175 Interactive = 1,
176 Background = 2,
177}
178
179impl Priority {
180 fn from_bits(bits: u8) -> Option<Self> {
181 Some(match bits {
182 0 => Self::Passive,
183 1 => Self::Interactive,
184 2 => Self::Background,
185 _ => return None,
186 })
187 }
188}
189
190const FLAG_BINARY: u8 = 0b0000_0001; const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; const FLAG_PRIORITY_SHIFT: u8 = 1;
193const FLAG_LAST: u8 = 0b0000_1000; const FLAG_RESERVED_MASK: u8 = 0b1111_0000; #[derive(Debug, Clone, Copy, PartialEq, Eq)]
198pub struct Flags(pub u8);
199
200impl Flags {
201 pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
203 let mut b = 0u8;
204 if binary {
205 b |= FLAG_BINARY;
206 }
207 b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
208 if last {
209 b |= FLAG_LAST;
210 }
211 Flags(b)
212 }
213
214 pub fn is_binary(self) -> bool {
216 self.0 & FLAG_BINARY != 0
217 }
218
219 pub fn is_last(self) -> bool {
221 self.0 & FLAG_LAST != 0
222 }
223
224 pub fn priority(self) -> Option<Priority> {
226 Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
227 }
228
229 pub fn has_reserved_bits(self) -> bool {
231 self.0 & FLAG_RESERVED_MASK != 0
232 }
233}
234
235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub struct EnvelopeHeader {
238 pub len: u32,
240 pub ver: u8,
242 pub ty: FrameType,
244 pub flags: Flags,
246 pub channel: u16,
248 pub corr: u64,
250}
251
252impl EnvelopeHeader {
253 pub fn encode(&self) -> [u8; HEADER_LEN] {
255 let mut buf = [0u8; HEADER_LEN];
256 buf[0..4].copy_from_slice(&self.len.to_le_bytes());
257 buf[4] = self.ver;
258 buf[5] = self.ty as u8;
259 buf[6] = self.flags.0;
260 buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
261 buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
262 buf
263 }
264}
265
266#[derive(Debug, Clone, Copy, PartialEq, Eq)]
268pub enum DecodeError {
269 TooShortForPrefix { have: usize },
271 UnsupportedVersion { ver: u8 },
273 TooShortForHeader { have: usize, need: usize },
275 UnknownFrameType { byte: u8 },
277 ReservedFlagBits { flags: u8 },
279 ReservedPriorityBits { flags: u8 },
281 PureHeaderFrameWithBody { ty: FrameType, len: u32 },
283}
284
285impl fmt::Display for DecodeError {
286 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287 match self {
288 Self::TooShortForPrefix { have } => {
289 write!(f, "header shorter than frozen prefix: have {have} bytes")
290 }
291 Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
292 Self::TooShortForHeader { have, need } => {
293 write!(
294 f,
295 "header too short for version: have {have} bytes, need {need}"
296 )
297 }
298 Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
299 Self::ReservedFlagBits { flags } => {
300 write!(f, "reserved flag bits set in flags 0b{flags:08b}")
301 }
302 Self::ReservedPriorityBits { flags } => {
303 write!(f, "reserved priority bits set in flags 0b{flags:08b}")
304 }
305 Self::PureHeaderFrameWithBody { ty, len } => {
306 write!(
307 f,
308 "pure-header frame {ty:?} declared non-zero body length {len}"
309 )
310 }
311 }
312 }
313}
314
315impl Error for DecodeError {}
316
317fn header_len_for_version(ver: u8) -> Option<usize> {
320 match ver {
321 1 => Some(HEADER_LEN),
322 _ => None,
323 }
324}
325
326pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
334 if bytes.len() < FROZEN_PREFIX_LEN {
335 return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
336 }
337 let ver = bytes[4];
338 let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
339 if bytes.len() < need {
340 return Err(DecodeError::TooShortForHeader {
341 have: bytes.len(),
342 need,
343 });
344 }
345
346 let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
347 let ty =
348 FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
349 let flags = Flags(bytes[6]);
350 if flags.has_reserved_bits() {
351 return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
352 }
353 if flags.priority().is_none() {
354 return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
355 }
356 if ty.is_pure_header() && len != 0 {
357 return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
358 }
359 let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
360 let corr = u64::from_le_bytes([
361 bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
362 ]);
363
364 Ok(EnvelopeHeader {
365 len,
366 ver,
367 ty,
368 flags,
369 channel,
370 corr,
371 })
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
379 EnvelopeHeader {
380 len,
381 ver: PROTOCOL_VERSION,
382 ty,
383 flags,
384 channel,
385 corr,
386 }
387 }
388
389 #[test]
390 fn bind_identity_round_trips_json() {
391 let identity = BindIdentity {
392 project_root: PathBuf::from("/tmp/project"),
393 harness: "opencode".to_string(),
394 session: "session-1".to_string(),
395 };
396
397 let encoded = serde_json::to_vec(&identity).unwrap();
398 let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();
399
400 assert_eq!(decoded, identity);
401 }
402
403 #[test]
404 fn route_target_variants_round_trip_json() {
405 let targets = [
406 RouteTarget::ToolProvider {
407 module_id: "aft".to_string(),
408 },
409 RouteTarget::ManagementSurface {
410 module_id: "memory".to_string(),
411 },
412 RouteTarget::InternalService {
413 module_id: "bus".to_string(),
414 service_id: "dm".to_string(),
415 },
416 ];
417
418 for target in targets {
419 let encoded = serde_json::to_vec(&target).unwrap();
420 let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
421 assert_eq!(decoded, target);
422 }
423 }
424
425 #[test]
426 fn error_body_round_trips_json() {
427 let body = ErrorBody {
428 code: "config_divergence".to_string(),
429 message: "active config differs".to_string(),
430 };
431
432 let encoded = serde_json::to_vec(&body).unwrap();
433 let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();
434
435 assert_eq!(decoded, body);
436 }
437
438 #[test]
439 fn round_trip_request() {
440 let h = hdr(
441 1234,
442 FrameType::Request,
443 Flags::new(false, Priority::Interactive, false),
444 42,
445 0xDEAD_BEEF_0000_0001,
446 );
447 let decoded = decode_header(&h.encode()).unwrap();
448 assert_eq!(h, decoded);
449 }
450
451 #[test]
452 fn round_trip_all_frame_types() {
453 for b in 0u8..=11 {
454 let ty = FrameType::from_u8(b).unwrap();
455 let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
456 assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
457 }
458 }
459
460 #[test]
461 fn pure_header_frame_has_zero_len() {
462 let h = hdr(
464 0,
465 FrameType::Cancel,
466 Flags::new(false, Priority::Passive, false),
467 7,
468 99,
469 );
470 let d = decode_header(&h.encode()).unwrap();
471 assert_eq!(d.len, 0);
472 assert_eq!(d.corr, 99);
473 }
474
475 #[test]
476 fn flags_round_trip() {
477 let f = Flags::new(true, Priority::Background, true);
478 assert!(f.is_binary());
479 assert!(f.is_last());
480 assert_eq!(f.priority(), Some(Priority::Background));
481 let h = hdr(8, FrameType::StreamData, f, 1, 1);
482 assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
483 }
484
485 #[test]
486 fn little_endian_and_frozen_prefix_layout() {
487 let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
489 let buf = h.encode();
490 assert_eq!(buf[0], 1);
491 assert_eq!(buf[1..4], [0, 0, 0]);
492 assert_eq!(buf[4], PROTOCOL_VERSION); assert_eq!(buf.len(), HEADER_LEN);
494 }
495
496 #[test]
497 fn reject_too_short_for_prefix() {
498 assert_eq!(
499 decode_header(&[0, 0, 0, 0]),
500 Err(DecodeError::TooShortForPrefix { have: 4 })
501 );
502 }
503
504 #[test]
505 fn reject_too_short_for_header() {
506 let mut b = [0u8; 10];
508 b[4] = PROTOCOL_VERSION;
509 assert_eq!(
510 decode_header(&b),
511 Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
512 );
513 }
514
515 #[test]
516 fn reject_unsupported_version() {
517 let mut b = [0u8; HEADER_LEN];
518 b[4] = 2; assert_eq!(
520 decode_header(&b),
521 Err(DecodeError::UnsupportedVersion { ver: 2 })
522 );
523 }
524
525 #[test]
526 fn reject_unknown_frame_type() {
527 let mut b = [0u8; HEADER_LEN];
528 b[4] = PROTOCOL_VERSION;
529 b[5] = 99;
530 assert_eq!(
531 decode_header(&b),
532 Err(DecodeError::UnknownFrameType { byte: 99 })
533 );
534 }
535
536 #[test]
537 fn reject_reserved_flag_bits() {
538 let mut b = [0u8; HEADER_LEN];
539 b[4] = PROTOCOL_VERSION;
540 b[5] = FrameType::Request as u8;
541 b[6] = 0b1000_0000; assert_eq!(
543 decode_header(&b),
544 Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
545 );
546 }
547
548 #[test]
549 fn reject_reserved_priority_bits() {
550 let mut b = [0u8; HEADER_LEN];
551 b[4] = PROTOCOL_VERSION;
552 b[5] = FrameType::Request as u8;
553 b[6] = 0b0000_0110; assert_eq!(
555 decode_header(&b),
556 Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
557 );
558 }
559
560 #[test]
561 fn reject_pure_header_frame_with_body_len() {
562 let h = hdr(
563 1,
564 FrameType::Ping,
565 Flags::new(false, Priority::Passive, false),
566 0,
567 1,
568 );
569 assert_eq!(
570 decode_header(&h.encode()),
571 Err(DecodeError::PureHeaderFrameWithBody {
572 ty: FrameType::Ping,
573 len: 1
574 })
575 );
576 }
577}