1#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt, path::PathBuf};
33
34use serde::{Deserialize, Serialize};
35
36pub mod frame;
37pub mod manifest;
38pub mod session;
39
40pub use frame::{Frame, FrameBuildError};
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44pub struct BindIdentity {
45 pub project_root: PathBuf,
46 pub harness: String,
47 pub session: String,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
63#[serde(tag = "kind", rename_all = "snake_case")]
64pub enum RouteTarget {
65 ToolProvider {
66 module_id: String,
67 },
68 ManagementSurface {
69 module_id: String,
70 },
71 InternalService {
72 module_id: String,
73 service_id: String,
74 },
75}
76
77pub const PROTOCOL_VERSION: u8 = 1;
79
80pub const HEADER_LEN: usize = 17;
82
83pub const FROZEN_PREFIX_LEN: usize = 5;
87
88pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
94
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
97pub struct ErrorBody {
98 pub code: String,
99 pub message: String,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
104pub struct ModuleHelloBody {
105 pub manifest: manifest::ModuleManifest,
106 pub protocol_ver: u8,
107 #[serde(default)]
108 pub control_ops: Option<Vec<String>>,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
113pub struct ModuleHelloAckBody {
114 pub negotiated_ver: u8,
115 pub subc_ops: Vec<String>,
116 pub subc_capabilities: Vec<String>,
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
124#[repr(u8)]
125pub enum FrameType {
126 Request = 0,
127 Response = 1,
128 Push = 2,
129 StreamData = 3,
130 StreamEnd = 4,
131 Error = 5,
132 Cancel = 6,
133 Ping = 7,
134 Pong = 8,
135 Hello = 9,
136 HelloAck = 10,
137 Goodbye = 11,
138}
139
140impl FrameType {
141 pub fn from_u8(b: u8) -> Option<Self> {
143 Some(match b {
144 0 => Self::Request,
145 1 => Self::Response,
146 2 => Self::Push,
147 3 => Self::StreamData,
148 4 => Self::StreamEnd,
149 5 => Self::Error,
150 6 => Self::Cancel,
151 7 => Self::Ping,
152 8 => Self::Pong,
153 9 => Self::Hello,
154 10 => Self::HelloAck,
155 11 => Self::Goodbye,
156 _ => return None,
157 })
158 }
159
160 pub fn is_pure_header(self) -> bool {
161 matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
162 }
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
168#[repr(u8)]
169pub enum Priority {
170 Passive = 0,
171 Interactive = 1,
172 Background = 2,
173}
174
175impl Priority {
176 fn from_bits(bits: u8) -> Option<Self> {
177 Some(match bits {
178 0 => Self::Passive,
179 1 => Self::Interactive,
180 2 => Self::Background,
181 _ => return None,
182 })
183 }
184}
185
186const FLAG_BINARY: u8 = 0b0000_0001; const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; const FLAG_PRIORITY_SHIFT: u8 = 1;
189const FLAG_LAST: u8 = 0b0000_1000; const FLAG_RESERVED_MASK: u8 = 0b1111_0000; #[derive(Debug, Clone, Copy, PartialEq, Eq)]
194pub struct Flags(pub u8);
195
196impl Flags {
197 pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
199 let mut b = 0u8;
200 if binary {
201 b |= FLAG_BINARY;
202 }
203 b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
204 if last {
205 b |= FLAG_LAST;
206 }
207 Flags(b)
208 }
209
210 pub fn is_binary(self) -> bool {
212 self.0 & FLAG_BINARY != 0
213 }
214
215 pub fn is_last(self) -> bool {
217 self.0 & FLAG_LAST != 0
218 }
219
220 pub fn priority(self) -> Option<Priority> {
222 Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
223 }
224
225 pub fn has_reserved_bits(self) -> bool {
227 self.0 & FLAG_RESERVED_MASK != 0
228 }
229}
230
231#[derive(Debug, Clone, Copy, PartialEq, Eq)]
233pub struct EnvelopeHeader {
234 pub len: u32,
236 pub ver: u8,
238 pub ty: FrameType,
240 pub flags: Flags,
242 pub channel: u16,
244 pub corr: u64,
246}
247
248impl EnvelopeHeader {
249 pub fn encode(&self) -> [u8; HEADER_LEN] {
251 let mut buf = [0u8; HEADER_LEN];
252 buf[0..4].copy_from_slice(&self.len.to_le_bytes());
253 buf[4] = self.ver;
254 buf[5] = self.ty as u8;
255 buf[6] = self.flags.0;
256 buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
257 buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
258 buf
259 }
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
264pub enum DecodeError {
265 TooShortForPrefix { have: usize },
267 UnsupportedVersion { ver: u8 },
269 TooShortForHeader { have: usize, need: usize },
271 UnknownFrameType { byte: u8 },
273 ReservedFlagBits { flags: u8 },
275 ReservedPriorityBits { flags: u8 },
277 PureHeaderFrameWithBody { ty: FrameType, len: u32 },
279}
280
281impl fmt::Display for DecodeError {
282 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283 match self {
284 Self::TooShortForPrefix { have } => {
285 write!(f, "header shorter than frozen prefix: have {have} bytes")
286 }
287 Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
288 Self::TooShortForHeader { have, need } => {
289 write!(
290 f,
291 "header too short for version: have {have} bytes, need {need}"
292 )
293 }
294 Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
295 Self::ReservedFlagBits { flags } => {
296 write!(f, "reserved flag bits set in flags 0b{flags:08b}")
297 }
298 Self::ReservedPriorityBits { flags } => {
299 write!(f, "reserved priority bits set in flags 0b{flags:08b}")
300 }
301 Self::PureHeaderFrameWithBody { ty, len } => {
302 write!(
303 f,
304 "pure-header frame {ty:?} declared non-zero body length {len}"
305 )
306 }
307 }
308 }
309}
310
311impl Error for DecodeError {}
312
313fn header_len_for_version(ver: u8) -> Option<usize> {
316 match ver {
317 1 => Some(HEADER_LEN),
318 _ => None,
319 }
320}
321
322pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
330 if bytes.len() < FROZEN_PREFIX_LEN {
331 return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
332 }
333 let ver = bytes[4];
334 let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
335 if bytes.len() < need {
336 return Err(DecodeError::TooShortForHeader {
337 have: bytes.len(),
338 need,
339 });
340 }
341
342 let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
343 let ty =
344 FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
345 let flags = Flags(bytes[6]);
346 if flags.has_reserved_bits() {
347 return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
348 }
349 if flags.priority().is_none() {
350 return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
351 }
352 if ty.is_pure_header() && len != 0 {
353 return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
354 }
355 let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
356 let corr = u64::from_le_bytes([
357 bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
358 ]);
359
360 Ok(EnvelopeHeader {
361 len,
362 ver,
363 ty,
364 flags,
365 channel,
366 corr,
367 })
368}
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373
374 fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
375 EnvelopeHeader {
376 len,
377 ver: PROTOCOL_VERSION,
378 ty,
379 flags,
380 channel,
381 corr,
382 }
383 }
384
385 #[test]
386 fn bind_identity_round_trips_json() {
387 let identity = BindIdentity {
388 project_root: PathBuf::from("/tmp/project"),
389 harness: "opencode".to_string(),
390 session: "session-1".to_string(),
391 };
392
393 let encoded = serde_json::to_vec(&identity).unwrap();
394 let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();
395
396 assert_eq!(decoded, identity);
397 }
398
399 #[test]
400 fn route_target_variants_round_trip_json() {
401 let targets = [
402 RouteTarget::ToolProvider {
403 module_id: "aft".to_string(),
404 },
405 RouteTarget::ManagementSurface {
406 module_id: "memory".to_string(),
407 },
408 RouteTarget::InternalService {
409 module_id: "bus".to_string(),
410 service_id: "dm".to_string(),
411 },
412 ];
413
414 for target in targets {
415 let encoded = serde_json::to_vec(&target).unwrap();
416 let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
417 assert_eq!(decoded, target);
418 }
419 }
420
421 #[test]
422 fn error_body_round_trips_json() {
423 let body = ErrorBody {
424 code: "config_divergence".to_string(),
425 message: "active config differs".to_string(),
426 };
427
428 let encoded = serde_json::to_vec(&body).unwrap();
429 let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();
430
431 assert_eq!(decoded, body);
432 }
433
434 #[test]
435 fn round_trip_request() {
436 let h = hdr(
437 1234,
438 FrameType::Request,
439 Flags::new(false, Priority::Interactive, false),
440 42,
441 0xDEAD_BEEF_0000_0001,
442 );
443 let decoded = decode_header(&h.encode()).unwrap();
444 assert_eq!(h, decoded);
445 }
446
447 #[test]
448 fn round_trip_all_frame_types() {
449 for b in 0u8..=11 {
450 let ty = FrameType::from_u8(b).unwrap();
451 let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
452 assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
453 }
454 }
455
456 #[test]
457 fn pure_header_frame_has_zero_len() {
458 let h = hdr(
460 0,
461 FrameType::Cancel,
462 Flags::new(false, Priority::Passive, false),
463 7,
464 99,
465 );
466 let d = decode_header(&h.encode()).unwrap();
467 assert_eq!(d.len, 0);
468 assert_eq!(d.corr, 99);
469 }
470
471 #[test]
472 fn flags_round_trip() {
473 let f = Flags::new(true, Priority::Background, true);
474 assert!(f.is_binary());
475 assert!(f.is_last());
476 assert_eq!(f.priority(), Some(Priority::Background));
477 let h = hdr(8, FrameType::StreamData, f, 1, 1);
478 assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
479 }
480
481 #[test]
482 fn little_endian_and_frozen_prefix_layout() {
483 let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
485 let buf = h.encode();
486 assert_eq!(buf[0], 1);
487 assert_eq!(buf[1..4], [0, 0, 0]);
488 assert_eq!(buf[4], PROTOCOL_VERSION); assert_eq!(buf.len(), HEADER_LEN);
490 }
491
492 #[test]
493 fn reject_too_short_for_prefix() {
494 assert_eq!(
495 decode_header(&[0, 0, 0, 0]),
496 Err(DecodeError::TooShortForPrefix { have: 4 })
497 );
498 }
499
500 #[test]
501 fn reject_too_short_for_header() {
502 let mut b = [0u8; 10];
504 b[4] = PROTOCOL_VERSION;
505 assert_eq!(
506 decode_header(&b),
507 Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
508 );
509 }
510
511 #[test]
512 fn reject_unsupported_version() {
513 let mut b = [0u8; HEADER_LEN];
514 b[4] = 2; assert_eq!(
516 decode_header(&b),
517 Err(DecodeError::UnsupportedVersion { ver: 2 })
518 );
519 }
520
521 #[test]
522 fn reject_unknown_frame_type() {
523 let mut b = [0u8; HEADER_LEN];
524 b[4] = PROTOCOL_VERSION;
525 b[5] = 99;
526 assert_eq!(
527 decode_header(&b),
528 Err(DecodeError::UnknownFrameType { byte: 99 })
529 );
530 }
531
532 #[test]
533 fn reject_reserved_flag_bits() {
534 let mut b = [0u8; HEADER_LEN];
535 b[4] = PROTOCOL_VERSION;
536 b[5] = FrameType::Request as u8;
537 b[6] = 0b1000_0000; assert_eq!(
539 decode_header(&b),
540 Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
541 );
542 }
543
544 #[test]
545 fn reject_reserved_priority_bits() {
546 let mut b = [0u8; HEADER_LEN];
547 b[4] = PROTOCOL_VERSION;
548 b[5] = FrameType::Request as u8;
549 b[6] = 0b0000_0110; assert_eq!(
551 decode_header(&b),
552 Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
553 );
554 }
555
556 #[test]
557 fn reject_pure_header_frame_with_body_len() {
558 let h = hdr(
559 1,
560 FrameType::Ping,
561 Flags::new(false, Priority::Passive, false),
562 0,
563 1,
564 );
565 assert_eq!(
566 decode_header(&h.encode()),
567 Err(DecodeError::PureHeaderFrameWithBody {
568 ty: FrameType::Ping,
569 len: 1
570 })
571 );
572 }
573}