1#![forbid(unsafe_code)]
31
32use std::{error::Error, fmt, path::PathBuf};
33
34use serde::{Deserialize, Serialize};
35
36pub mod manifest;
37pub mod session;
38
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
41pub struct BindIdentity {
42 pub project_root: PathBuf,
43 pub harness: String,
44 pub session: String,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
60#[serde(tag = "kind", rename_all = "snake_case")]
61pub enum RouteTarget {
62 ToolProvider {
63 module_id: String,
64 },
65 ManagementSurface {
66 module_id: String,
67 },
68 InternalService {
69 module_id: String,
70 service_id: String,
71 },
72}
73
/// Current envelope protocol version; stored in header byte 4.
pub const PROTOCOL_VERSION: u8 = 1;

/// Size in bytes of a version-1 envelope header:
/// `len` (4) + `ver` (1) + `ty` (1) + `flags` (1) + `channel` (2) + `corr` (8).
pub const HEADER_LEN: usize = 17;

/// Number of leading header bytes `decode_header` needs before it can read
/// the version: `len` (4) + `ver` (1).
// NOTE(review): the name suggests this prefix layout is frozen across protocol
// versions; the visible code only relies on `ver` living at byte 4.
pub const FROZEN_PREFIX_LEN: usize = 5;

/// Upper bound on a frame body, in bytes (64 MiB).
// NOTE(review): nothing in this file enforces the limit (`decode_header`
// accepts any `len`); presumably the frame reader checks it. Confirm.
pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
91
92#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
94pub struct ErrorBody {
95 pub code: String,
96 pub message: String,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub struct ModuleHelloBody {
102 pub manifest: manifest::ModuleManifest,
103 pub protocol_ver: u8,
104 #[serde(default)]
105 pub control_ops: Option<Vec<String>>,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
110pub struct ModuleHelloAckBody {
111 pub negotiated_ver: u8,
112 pub subc_ops: Vec<String>,
113 pub subc_capabilities: Vec<String>,
114}
115
/// Frame type stored in byte 5 of the envelope header.
///
/// The explicit discriminants are the on-wire values (`#[repr(u8)]`), so they
/// must not be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum FrameType {
    Request = 0,
    Response = 1,
    Push = 2,
    StreamData = 3,
    StreamEnd = 4,
    Error = 5,
    /// Pure-header frame (see [`FrameType::is_pure_header`]).
    Cancel = 6,
    /// Pure-header frame (see [`FrameType::is_pure_header`]).
    Ping = 7,
    /// Pure-header frame (see [`FrameType::is_pure_header`]).
    Pong = 8,
    Hello = 9,
    HelloAck = 10,
    /// Pure-header frame (see [`FrameType::is_pure_header`]).
    Goodbye = 11,
}

impl FrameType {
    /// Converts a wire byte into a frame type.
    ///
    /// Returns `None` for any byte that is not one of the discriminants
    /// `0..=11`.
    #[must_use]
    pub const fn from_u8(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Request),
            1 => Some(Self::Response),
            2 => Some(Self::Push),
            3 => Some(Self::StreamData),
            4 => Some(Self::StreamEnd),
            5 => Some(Self::Error),
            6 => Some(Self::Cancel),
            7 => Some(Self::Ping),
            8 => Some(Self::Pong),
            9 => Some(Self::Hello),
            10 => Some(Self::HelloAck),
            11 => Some(Self::Goodbye),
            _ => None,
        }
    }

    /// Returns `true` for `Cancel`, `Ping`, `Pong` and `Goodbye`.
    ///
    /// `decode_header` rejects a header of one of these types whose body
    /// length is non-zero.
    #[must_use]
    pub const fn is_pure_header(self) -> bool {
        matches!(self, Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye)
    }
}
161
/// Frame priority, stored in the two-bit priority field of the flags byte.
///
/// The discriminants are the on-wire values; the fourth bit pattern (`3`) is
/// not assigned and is rejected when decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
    Passive = 0,
    Interactive = 1,
    Background = 2,
}

impl Priority {
    /// Decodes an already-shifted priority field value.
    ///
    /// Returns `None` for `3` (the unassigned pattern) and for anything larger.
    const fn from_bits(bits: u8) -> Option<Self> {
        match bits {
            0 => Some(Self::Passive),
            1 => Some(Self::Interactive),
            2 => Some(Self::Background),
            _ => None,
        }
    }
}
182
183const FLAG_BINARY: u8 = 0b0000_0001; const FLAG_PRIORITY_MASK: u8 = 0b0000_0110; const FLAG_PRIORITY_SHIFT: u8 = 1;
186const FLAG_LAST: u8 = 0b0000_1000; const FLAG_RESERVED_MASK: u8 = 0b1111_0000; #[derive(Debug, Clone, Copy, PartialEq, Eq)]
191pub struct Flags(pub u8);
192
193impl Flags {
194 pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
196 let mut b = 0u8;
197 if binary {
198 b |= FLAG_BINARY;
199 }
200 b |= (priority as u8) << FLAG_PRIORITY_SHIFT;
201 if last {
202 b |= FLAG_LAST;
203 }
204 Flags(b)
205 }
206
207 pub fn is_binary(self) -> bool {
209 self.0 & FLAG_BINARY != 0
210 }
211
212 pub fn is_last(self) -> bool {
214 self.0 & FLAG_LAST != 0
215 }
216
217 pub fn priority(self) -> Option<Priority> {
219 Priority::from_bits((self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT)
220 }
221
222 pub fn has_reserved_bits(self) -> bool {
224 self.0 & FLAG_RESERVED_MASK != 0
225 }
226}
227
228#[derive(Debug, Clone, Copy, PartialEq, Eq)]
230pub struct EnvelopeHeader {
231 pub len: u32,
233 pub ver: u8,
235 pub ty: FrameType,
237 pub flags: Flags,
239 pub channel: u16,
241 pub corr: u64,
243}
244
245impl EnvelopeHeader {
246 pub fn encode(&self) -> [u8; HEADER_LEN] {
248 let mut buf = [0u8; HEADER_LEN];
249 buf[0..4].copy_from_slice(&self.len.to_le_bytes());
250 buf[4] = self.ver;
251 buf[5] = self.ty as u8;
252 buf[6] = self.flags.0;
253 buf[7..9].copy_from_slice(&self.channel.to_le_bytes());
254 buf[9..17].copy_from_slice(&self.corr.to_le_bytes());
255 buf
256 }
257}
258
259#[derive(Debug, Clone, Copy, PartialEq, Eq)]
261pub enum DecodeError {
262 TooShortForPrefix { have: usize },
264 UnsupportedVersion { ver: u8 },
266 TooShortForHeader { have: usize, need: usize },
268 UnknownFrameType { byte: u8 },
270 ReservedFlagBits { flags: u8 },
272 ReservedPriorityBits { flags: u8 },
274 PureHeaderFrameWithBody { ty: FrameType, len: u32 },
276}
277
278impl fmt::Display for DecodeError {
279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280 match self {
281 Self::TooShortForPrefix { have } => {
282 write!(f, "header shorter than frozen prefix: have {have} bytes")
283 }
284 Self::UnsupportedVersion { ver } => write!(f, "unsupported envelope version {ver}"),
285 Self::TooShortForHeader { have, need } => {
286 write!(
287 f,
288 "header too short for version: have {have} bytes, need {need}"
289 )
290 }
291 Self::UnknownFrameType { byte } => write!(f, "unknown frame type byte {byte}"),
292 Self::ReservedFlagBits { flags } => {
293 write!(f, "reserved flag bits set in flags 0b{flags:08b}")
294 }
295 Self::ReservedPriorityBits { flags } => {
296 write!(f, "reserved priority bits set in flags 0b{flags:08b}")
297 }
298 Self::PureHeaderFrameWithBody { ty, len } => {
299 write!(
300 f,
301 "pure-header frame {ty:?} declared non-zero body length {len}"
302 )
303 }
304 }
305 }
306}
307
308impl Error for DecodeError {}
309
310fn header_len_for_version(ver: u8) -> Option<usize> {
313 match ver {
314 1 => Some(HEADER_LEN),
315 _ => None,
316 }
317}
318
319pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
327 if bytes.len() < FROZEN_PREFIX_LEN {
328 return Err(DecodeError::TooShortForPrefix { have: bytes.len() });
329 }
330 let ver = bytes[4];
331 let need = header_len_for_version(ver).ok_or(DecodeError::UnsupportedVersion { ver })?;
332 if bytes.len() < need {
333 return Err(DecodeError::TooShortForHeader {
334 have: bytes.len(),
335 need,
336 });
337 }
338
339 let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
340 let ty =
341 FrameType::from_u8(bytes[5]).ok_or(DecodeError::UnknownFrameType { byte: bytes[5] })?;
342 let flags = Flags(bytes[6]);
343 if flags.has_reserved_bits() {
344 return Err(DecodeError::ReservedFlagBits { flags: bytes[6] });
345 }
346 if flags.priority().is_none() {
347 return Err(DecodeError::ReservedPriorityBits { flags: bytes[6] });
348 }
349 if ty.is_pure_header() && len != 0 {
350 return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
351 }
352 let channel = u16::from_le_bytes([bytes[7], bytes[8]]);
353 let corr = u64::from_le_bytes([
354 bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], bytes[16],
355 ]);
356
357 Ok(EnvelopeHeader {
358 len,
359 ver,
360 ty,
361 flags,
362 channel,
363 corr,
364 })
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a header at the current `PROTOCOL_VERSION` from the given fields.
    fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
        EnvelopeHeader {
            len,
            ver: PROTOCOL_VERSION,
            ty,
            flags,
            channel,
            corr,
        }
    }

    #[test]
    fn bind_identity_round_trips_json() {
        let identity = BindIdentity {
            project_root: PathBuf::from("/tmp/project"),
            harness: "opencode".to_string(),
            session: "session-1".to_string(),
        };

        let encoded = serde_json::to_vec(&identity).unwrap();
        let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(decoded, identity);
    }

    #[test]
    fn route_target_variants_round_trip_json() {
        let targets = [
            RouteTarget::ToolProvider {
                module_id: "aft".to_string(),
            },
            RouteTarget::ManagementSurface {
                module_id: "memory".to_string(),
            },
            RouteTarget::InternalService {
                module_id: "bus".to_string(),
                service_id: "dm".to_string(),
            },
        ];

        for target in targets {
            let encoded = serde_json::to_vec(&target).unwrap();
            let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
            assert_eq!(decoded, target);
        }
    }

    #[test]
    fn error_body_round_trips_json() {
        let body = ErrorBody {
            code: "config_divergence".to_string(),
            message: "active config differs".to_string(),
        };

        let encoded = serde_json::to_vec(&body).unwrap();
        let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(decoded, body);
    }

    #[test]
    fn round_trip_request() {
        let h = hdr(
            1234,
            FrameType::Request,
            Flags::new(false, Priority::Interactive, false),
            42,
            0xDEAD_BEEF_0000_0001,
        );
        let decoded = decode_header(&h.encode()).unwrap();
        assert_eq!(h, decoded);
    }

    #[test]
    fn round_trip_all_frame_types() {
        // len is 0 so the pure-header types are accepted as well.
        for b in 0u8..=11 {
            let ty = FrameType::from_u8(b).unwrap();
            let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
            assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
        }
    }

    #[test]
    fn pure_header_frame_has_zero_len() {
        let h = hdr(
            0,
            FrameType::Cancel,
            Flags::new(false, Priority::Passive, false),
            7,
            99,
        );
        let d = decode_header(&h.encode()).unwrap();
        assert_eq!(d.len, 0);
        assert_eq!(d.corr, 99);
    }

    #[test]
    fn flags_round_trip() {
        let f = Flags::new(true, Priority::Background, true);
        assert!(f.is_binary());
        assert!(f.is_last());
        assert_eq!(f.priority(), Some(Priority::Background));
        let h = hdr(8, FrameType::StreamData, f, 1, 1);
        assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
    }

    #[test]
    fn little_endian_and_frozen_prefix_layout() {
        let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
        let buf = h.encode();
        // len = 1 little-endian: low byte first.
        assert_eq!(buf[0], 1);
        assert_eq!(buf[1..4], [0, 0, 0]);
        // The version byte directly follows the length.
        assert_eq!(buf[4], PROTOCOL_VERSION);
        assert_eq!(buf.len(), HEADER_LEN);
    }

    #[test]
    fn reject_too_short_for_prefix() {
        assert_eq!(
            decode_header(&[0, 0, 0, 0]),
            Err(DecodeError::TooShortForPrefix { have: 4 })
        );
    }

    #[test]
    fn reject_too_short_for_header() {
        // Long enough for the prefix, too short for a full v1 header.
        let mut b = [0u8; 10];
        b[4] = PROTOCOL_VERSION;
        assert_eq!(
            decode_header(&b),
            Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
        );
    }

    #[test]
    fn reject_unsupported_version() {
        let mut b = [0u8; HEADER_LEN];
        b[4] = 2;
        assert_eq!(
            decode_header(&b),
            Err(DecodeError::UnsupportedVersion { ver: 2 })
        );
    }

    #[test]
    fn reject_unknown_frame_type() {
        let mut b = [0u8; HEADER_LEN];
        b[4] = PROTOCOL_VERSION;
        b[5] = 99;
        assert_eq!(
            decode_header(&b),
            Err(DecodeError::UnknownFrameType { byte: 99 })
        );
    }

    #[test]
    fn reject_reserved_flag_bits() {
        let mut b = [0u8; HEADER_LEN];
        b[4] = PROTOCOL_VERSION;
        b[5] = FrameType::Request as u8;
        // Top bit lies in the reserved range.
        b[6] = 0b1000_0000;
        assert_eq!(
            decode_header(&b),
            Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
        );
    }

    #[test]
    fn reject_reserved_priority_bits() {
        let mut b = [0u8; HEADER_LEN];
        b[4] = PROTOCOL_VERSION;
        b[5] = FrameType::Request as u8;
        // Priority field = 0b11, which has no assigned `Priority`.
        b[6] = 0b0000_0110;
        assert_eq!(
            decode_header(&b),
            Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
        );
    }

    #[test]
    fn reject_pure_header_frame_with_body_len() {
        let h = hdr(
            1,
            FrameType::Ping,
            Flags::new(false, Priority::Passive, false),
            0,
            1,
        );
        assert_eq!(
            decode_header(&h.encode()),
            Err(DecodeError::PureHeaderFrameWithBody {
                ty: FrameType::Ping,
                len: 1
            })
        );
    }
}