#![forbid(unsafe_code)]
use std::{error::Error, fmt, path::PathBuf};
use serde::{Deserialize, Serialize};
pub mod frame;
pub mod manifest;
pub mod session;
pub use frame::{Frame, FrameBuildError};
/// Serializable identity record made of a project root path, a harness name
/// and a session name.
///
/// NOTE(review): the name suggests this identifies the party performing a
/// "bind"; no consumer is visible in this file — confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct BindIdentity {
/// Filesystem path of the project root.
pub project_root: PathBuf,
/// Harness identifier (the unit tests in this file use `"opencode"`).
pub harness: String,
/// Session identifier (the unit tests in this file use `"session-1"`).
pub session: String,
}
/// Routing target, serialized as an internally tagged object.
///
/// The serde attributes place the variant name, converted to snake_case,
/// under a `"kind"` key alongside the variant's own fields, for example
/// `{"kind":"tool_provider","module_id":"aft"}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum RouteTarget {
/// Serialized with `"kind": "tool_provider"`; carries the module id.
ToolProvider {
module_id: String,
},
/// Serialized with `"kind": "management_surface"`; carries the module id.
ManagementSurface {
module_id: String,
},
/// Serialized with `"kind": "internal_service"`; carries a module id plus
/// a service id.
InternalService {
module_id: String,
service_id: String,
},
}
/// Current protocol version number.
///
/// `header_len_for_version` matches the literal `1` rather than this
/// constant, so the two must be kept in sync by hand.
pub const PROTOCOL_VERSION: u8 = 1;
/// The string `"SUBC_MODULE_ID"`.
///
/// NOTE(review): judging by the `_ENV` suffix this is presumably the name of
/// an environment variable carrying a module id; it is not read in this file.
pub const SUBC_MODULE_ID_ENV: &str = "SUBC_MODULE_ID";
/// Size in bytes of an encoded version-1 envelope header:
/// `len` (4) + `ver` (1) + type (1) + flags (1) + `channel` (2) + `corr` (8).
pub const HEADER_LEN: usize = 17;
/// Number of leading header bytes (`len` as u32 plus the `ver` byte) that
/// `decode_header` requires before it reads the version byte at index 4.
pub const FROZEN_PREFIX_LEN: usize = 5;
/// 64 MiB expressed in bytes.
///
/// NOTE(review): not referenced by any code visible in this file; presumably
/// the upper bound on a frame body enforced elsewhere — confirm.
pub const MAX_FRAME_BODY_LEN: u32 = 64 * 1024 * 1024;
/// Serializable error payload consisting of a code string and a message
/// string.
///
/// NOTE(review): presumably the body of an `Error` frame — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ErrorBody {
/// Error code string (the unit tests use `"config_divergence"`).
pub code: String,
/// Accompanying message text.
pub message: String,
}
/// Serializable body carrying a module manifest, a protocol version and an
/// optional list of control ops.
///
/// NOTE(review): presumably the payload a module sends in a `Hello` frame —
/// confirm against the session code.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ModuleHelloBody {
/// The module's manifest (type defined in the `manifest` module).
pub manifest: manifest::ModuleManifest,
/// Protocol version number carried in this body.
pub protocol_ver: u8,
/// Optional list of control-op names. `#[serde(default)]` makes a missing
/// field deserialize as `None` instead of failing.
#[serde(default)]
pub control_ops: Option<Vec<String>>,
}
/// Serializable body carrying a negotiated version plus two string lists.
///
/// NOTE(review): presumably the reply to `ModuleHelloBody`, sent in a
/// `HelloAck` frame — confirm against the session code.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ModuleHelloAckBody {
/// Negotiated protocol version number.
pub negotiated_ver: u8,
/// List of op names.
pub subc_ops: Vec<String>,
/// List of capability names.
pub subc_capabilities: Vec<String>,
}
/// Kind of frame named by an envelope header.
///
/// The enum is `#[repr(u8)]`; each variant's explicit discriminant is the
/// byte value that [`FrameType::from_u8`] accepts and that `as u8` produces.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum FrameType {
    Request = 0,
    Response = 1,
    Push = 2,
    StreamData = 3,
    StreamEnd = 4,
    Error = 5,
    Cancel = 6,
    Ping = 7,
    Pong = 8,
    Hello = 9,
    HelloAck = 10,
    Goodbye = 11,
}

impl FrameType {
    /// Maps a raw byte to its frame type.
    ///
    /// Returns `None` for any byte outside the twelve assigned values
    /// (`0..=11`).
    pub fn from_u8(b: u8) -> Option<Self> {
        // Indexed by discriminant: every entry's position must equal its
        // `as u8` value.
        const BY_DISCRIMINANT: [FrameType; 12] = [
            FrameType::Request,
            FrameType::Response,
            FrameType::Push,
            FrameType::StreamData,
            FrameType::StreamEnd,
            FrameType::Error,
            FrameType::Cancel,
            FrameType::Ping,
            FrameType::Pong,
            FrameType::Hello,
            FrameType::HelloAck,
            FrameType::Goodbye,
        ];
        BY_DISCRIMINANT.get(usize::from(b)).copied()
    }

    /// Reports whether this is one of the header-only frame types
    /// (`Cancel`, `Ping`, `Pong`, `Goodbye`).
    ///
    /// `decode_header` rejects these types when the header declares a
    /// non-zero length.
    pub fn is_pure_header(self) -> bool {
        // Exhaustive on purpose: a new variant must be classified here.
        match self {
            Self::Cancel | Self::Ping | Self::Pong | Self::Goodbye => true,
            Self::Request
            | Self::Response
            | Self::Push
            | Self::StreamData
            | Self::StreamEnd
            | Self::Error
            | Self::Hello
            | Self::HelloAck => false,
        }
    }
}
/// Priority value stored in the two-bit priority field of the flags byte.
///
/// Only the values 0, 1 and 2 are assigned; the bit pattern 3 has no variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
    Passive = 0,
    Interactive = 1,
    Background = 2,
}

impl Priority {
    /// Decodes a priority field that has already been shifted down to the
    /// low bits. Values without a variant yield `None`.
    fn from_bits(bits: u8) -> Option<Self> {
        match bits {
            0 => Some(Self::Passive),
            1 => Some(Self::Interactive),
            2 => Some(Self::Background),
            _ => None,
        }
    }
}

/// Bit 0 of the flags byte: the "binary" flag.
const FLAG_BINARY: u8 = 0b0000_0001;
/// Bits 1–2 of the flags byte: the priority field.
const FLAG_PRIORITY_MASK: u8 = 0b0000_0110;
/// Right-shift that moves the priority field down to bits 0–1.
const FLAG_PRIORITY_SHIFT: u8 = 1;
/// Bit 3 of the flags byte: the "last" flag.
const FLAG_LAST: u8 = 0b0000_1000;
/// Bits 4–7 of the flags byte: reserved, reported by `has_reserved_bits`.
const FLAG_RESERVED_MASK: u8 = 0b1111_0000;

/// Raw flags byte of an envelope header, with typed accessors.
///
/// The inner byte is public, so a `Flags` may hold any bit pattern, including
/// reserved bits and the unassigned priority value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Flags(pub u8);

impl Flags {
    /// Packs the three logical fields into a flags byte; reserved bits are
    /// left clear.
    pub fn new(binary: bool, priority: Priority, last: bool) -> Self {
        let binary_bit = if binary { FLAG_BINARY } else { 0 };
        let priority_bits = (priority as u8) << FLAG_PRIORITY_SHIFT;
        let last_bit = if last { FLAG_LAST } else { 0 };
        Flags(binary_bit | priority_bits | last_bit)
    }

    /// Whether the binary flag (bit 0) is set.
    pub fn is_binary(self) -> bool {
        (self.0 & FLAG_BINARY) != 0
    }

    /// Whether the last flag (bit 3) is set.
    pub fn is_last(self) -> bool {
        (self.0 & FLAG_LAST) != 0
    }

    /// Decodes the priority field, or `None` when it holds the unassigned
    /// pattern `0b11`.
    pub fn priority(self) -> Option<Priority> {
        let field = (self.0 & FLAG_PRIORITY_MASK) >> FLAG_PRIORITY_SHIFT;
        Priority::from_bits(field)
    }

    /// Whether any of the reserved bits 4–7 is set.
    pub fn has_reserved_bits(self) -> bool {
        (self.0 & FLAG_RESERVED_MASK) != 0
    }
}
/// Decoded form of the fixed-size envelope header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EnvelopeHeader {
    /// Declared body length (encoded in bytes 0–3, little-endian).
    pub len: u32,
    /// Version byte (byte 4).
    pub ver: u8,
    /// Frame type (byte 5).
    pub ty: FrameType,
    /// Flags byte (byte 6).
    pub flags: Flags,
    /// Channel number (bytes 7–8, little-endian).
    pub channel: u16,
    /// `corr` value (bytes 9–16, little-endian).
    /// NOTE(review): presumably a correlation id — confirm.
    pub corr: u64,
}

impl EnvelopeHeader {
    /// Serializes the header into its `HEADER_LEN`-byte wire form.
    ///
    /// Multi-byte fields are written little-endian. No validation is done:
    /// the fields are emitted exactly as stored.
    pub fn encode(&self) -> [u8; HEADER_LEN] {
        let Self {
            len,
            ver,
            ty,
            flags,
            channel,
            corr,
        } = *self;
        let [l0, l1, l2, l3] = len.to_le_bytes();
        let [ch0, ch1] = channel.to_le_bytes();
        let [c0, c1, c2, c3, c4, c5, c6, c7] = corr.to_le_bytes();
        // The literal's length is checked against HEADER_LEN at compile time.
        [
            l0, l1, l2, l3, // len
            ver,
            ty as u8,
            flags.0,
            ch0, ch1, // channel
            c0, c1, c2, c3, c4, c5, c6, c7, // corr
        ]
    }
}
/// Reasons `decode_header` can reject its input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecodeError {
    /// Fewer than `FROZEN_PREFIX_LEN` bytes were supplied.
    TooShortForPrefix { have: usize },
    /// The version byte names a version with no known header length.
    UnsupportedVersion { ver: u8 },
    /// The input is shorter than the header length of its version.
    TooShortForHeader { have: usize, need: usize },
    /// The frame-type byte is not an assigned `FrameType` value.
    UnknownFrameType { byte: u8 },
    /// At least one reserved flag bit is set; `flags` is the raw byte.
    ReservedFlagBits { flags: u8 },
    /// The priority field holds a value with no `Priority` variant; `flags`
    /// is the raw byte.
    ReservedPriorityBits { flags: u8 },
    /// A pure-header frame type declared a non-zero body length.
    PureHeaderFrameWithBody { ty: FrameType, len: u32 },
}

impl fmt::Display for DecodeError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every payload is `Copy`, so match by value and bind plain fields.
        match *self {
            Self::TooShortForPrefix { have } => {
                write!(f, "header shorter than frozen prefix: have {have} bytes")
            }
            Self::UnsupportedVersion { ver } => {
                write!(f, "unsupported envelope version {ver}")
            }
            Self::TooShortForHeader { have, need } => {
                write!(f, "header too short for version: have {have} bytes, need {need}")
            }
            Self::UnknownFrameType { byte } => {
                write!(f, "unknown frame type byte {byte}")
            }
            Self::ReservedFlagBits { flags } => {
                write!(f, "reserved flag bits set in flags 0b{flags:08b}")
            }
            Self::ReservedPriorityBits { flags } => {
                write!(f, "reserved priority bits set in flags 0b{flags:08b}")
            }
            Self::PureHeaderFrameWithBody { ty, len } => {
                write!(f, "pure-header frame {ty:?} declared non-zero body length {len}")
            }
        }
    }
}

impl Error for DecodeError {}
/// Returns the full header length for envelope version `ver`, or `None` when
/// the version is unknown. Only version 1 is recognised.
fn header_len_for_version(ver: u8) -> Option<usize> {
    if ver == 1 {
        Some(HEADER_LEN)
    } else {
        None
    }
}
/// Parses and validates an envelope header from the front of `bytes`.
///
/// Only the header bytes are examined; anything after them is ignored.
/// The returned `len` is the value read from the wire. This function does not
/// compare it against `MAX_FRAME_BODY_LEN`.
///
/// # Errors
///
/// Checks run in this order, and the first failure is returned:
/// 1. `TooShortForPrefix` — fewer than `FROZEN_PREFIX_LEN` bytes.
/// 2. `UnsupportedVersion` — the version byte has no known header length.
/// 3. `TooShortForHeader` — fewer bytes than that version's header length.
/// 4. `UnknownFrameType` — unassigned frame-type byte.
/// 5. `ReservedFlagBits` — a reserved flag bit is set.
/// 6. `ReservedPriorityBits` — the priority field has no `Priority` variant.
/// 7. `PureHeaderFrameWithBody` — a pure-header type with non-zero `len`.
pub fn decode_header(bytes: &[u8]) -> Result<EnvelopeHeader, DecodeError> {
    let have = bytes.len();
    if have < FROZEN_PREFIX_LEN {
        return Err(DecodeError::TooShortForPrefix { have });
    }

    // The version byte sits inside the frozen prefix, so it is safe to read
    // before the full header length is known.
    let ver = bytes[4];
    let need = match header_len_for_version(ver) {
        Some(n) => n,
        None => return Err(DecodeError::UnsupportedVersion { ver }),
    };
    if have < need {
        return Err(DecodeError::TooShortForHeader { have, need });
    }

    let ty_byte = bytes[5];
    let ty = match FrameType::from_u8(ty_byte) {
        Some(t) => t,
        None => return Err(DecodeError::UnknownFrameType { byte: ty_byte }),
    };

    let raw_flags = bytes[6];
    let flags = Flags(raw_flags);
    if flags.has_reserved_bits() {
        return Err(DecodeError::ReservedFlagBits { flags: raw_flags });
    }
    if flags.priority().is_none() {
        return Err(DecodeError::ReservedPriorityBits { flags: raw_flags });
    }

    let mut len_le = [0u8; 4];
    len_le.copy_from_slice(&bytes[0..4]);
    let len = u32::from_le_bytes(len_le);
    if len != 0 && ty.is_pure_header() {
        return Err(DecodeError::PureHeaderFrameWithBody { ty, len });
    }

    let mut channel_le = [0u8; 2];
    channel_le.copy_from_slice(&bytes[7..9]);
    let mut corr_le = [0u8; 8];
    corr_le.copy_from_slice(&bytes[9..17]);

    Ok(EnvelopeHeader {
        len,
        ver,
        ty,
        flags,
        channel: u16::from_le_bytes(channel_le),
        corr: u64::from_le_bytes(corr_le),
    })
}
// Unit tests for the envelope header codec and for the JSON round-trips of
// the serde-derived body types.
#[cfg(test)]
mod tests {
use super::*;
// Builds a header at the current PROTOCOL_VERSION from the remaining fields.
fn hdr(len: u32, ty: FrameType, flags: Flags, channel: u16, corr: u64) -> EnvelopeHeader {
EnvelopeHeader {
len,
ver: PROTOCOL_VERSION,
ty,
flags,
channel,
corr,
}
}
// BindIdentity survives a serialize/deserialize cycle through JSON.
#[test]
fn bind_identity_round_trips_json() {
let identity = BindIdentity {
project_root: PathBuf::from("/tmp/project"),
harness: "opencode".to_string(),
session: "session-1".to_string(),
};
let encoded = serde_json::to_vec(&identity).unwrap();
let decoded: BindIdentity = serde_json::from_slice(&encoded).unwrap();
assert_eq!(decoded, identity);
}
// Every RouteTarget variant survives a JSON round-trip.
#[test]
fn route_target_variants_round_trip_json() {
let targets = [
RouteTarget::ToolProvider {
module_id: "aft".to_string(),
},
RouteTarget::ManagementSurface {
module_id: "memory".to_string(),
},
RouteTarget::InternalService {
module_id: "bus".to_string(),
service_id: "dm".to_string(),
},
];
for target in targets {
let encoded = serde_json::to_vec(&target).unwrap();
let decoded: RouteTarget = serde_json::from_slice(&encoded).unwrap();
assert_eq!(decoded, target);
}
}
// ErrorBody survives a JSON round-trip.
#[test]
fn error_body_round_trips_json() {
let body = ErrorBody {
code: "config_divergence".to_string(),
message: "active config differs".to_string(),
};
let encoded = serde_json::to_vec(&body).unwrap();
let decoded: ErrorBody = serde_json::from_slice(&encoded).unwrap();
assert_eq!(decoded, body);
}
// A Request header with non-trivial values in every field decodes back to
// an equal header.
#[test]
fn round_trip_request() {
let h = hdr(
1234,
FrameType::Request,
Flags::new(false, Priority::Interactive, false),
42,
0xDEAD_BEEF_0000_0001,
);
let decoded = decode_header(&h.encode()).unwrap();
assert_eq!(h, decoded);
}
// Every assigned frame-type byte (0..=11) round-trips with a zero length,
// which is also valid for the pure-header types.
#[test]
fn round_trip_all_frame_types() {
for b in 0u8..=11 {
let ty = FrameType::from_u8(b).unwrap();
let h = hdr(0, ty, Flags::new(false, Priority::Passive, false), 0, 0);
assert_eq!(decode_header(&h.encode()).unwrap().ty, ty);
}
}
// A Cancel header with len 0 decodes successfully and keeps its corr value.
#[test]
fn pure_header_frame_has_zero_len() {
let h = hdr(
0,
FrameType::Cancel,
Flags::new(false, Priority::Passive, false),
7,
99,
);
let d = decode_header(&h.encode()).unwrap();
assert_eq!(d.len, 0);
assert_eq!(d.corr, 99);
}
// Flags built with every field set report those fields back and survive an
// encode/decode cycle unchanged.
#[test]
fn flags_round_trip() {
let f = Flags::new(true, Priority::Background, true);
assert!(f.is_binary());
assert!(f.is_last());
assert_eq!(f.priority(), Some(Priority::Background));
let h = hdr(8, FrameType::StreamData, f, 1, 1);
assert_eq!(decode_header(&h.encode()).unwrap().flags, f);
}
// len = 1 must encode as 01 00 00 00 (little-endian), followed by the
// version byte at index 4; the buffer is exactly HEADER_LEN bytes long.
#[test]
fn little_endian_and_frozen_prefix_layout() {
let h = hdr(1, FrameType::Request, Flags(0), 0, 0);
let buf = h.encode();
assert_eq!(buf[0], 1);
assert_eq!(buf[1..4], [0, 0, 0]);
assert_eq!(buf[4], PROTOCOL_VERSION); assert_eq!(buf.len(), HEADER_LEN);
}
// Four bytes is one short of the frozen prefix.
#[test]
fn reject_too_short_for_prefix() {
assert_eq!(
decode_header(&[0, 0, 0, 0]),
Err(DecodeError::TooShortForPrefix { have: 4 })
);
}
// Ten bytes with a valid version byte: enough for the prefix, but not for
// the 17-byte version-1 header.
#[test]
fn reject_too_short_for_header() {
let mut b = [0u8; 10];
b[4] = PROTOCOL_VERSION;
assert_eq!(
decode_header(&b),
Err(DecodeError::TooShortForHeader { have: 10, need: 17 })
);
}
// Version byte 2 has no known header length.
#[test]
fn reject_unsupported_version() {
let mut b = [0u8; HEADER_LEN];
b[4] = 2; assert_eq!(
decode_header(&b),
Err(DecodeError::UnsupportedVersion { ver: 2 })
);
}
// Frame-type byte 99 is outside the assigned range.
#[test]
fn reject_unknown_frame_type() {
let mut b = [0u8; HEADER_LEN];
b[4] = PROTOCOL_VERSION;
b[5] = 99;
assert_eq!(
decode_header(&b),
Err(DecodeError::UnknownFrameType { byte: 99 })
);
}
// The top bit of the flags byte lies in the reserved nibble.
#[test]
fn reject_reserved_flag_bits() {
let mut b = [0u8; HEADER_LEN];
b[4] = PROTOCOL_VERSION;
b[5] = FrameType::Request as u8;
b[6] = 0b1000_0000; assert_eq!(
decode_header(&b),
Err(DecodeError::ReservedFlagBits { flags: 0b1000_0000 })
);
}
// Both priority bits set gives field value 3, which has no Priority variant.
#[test]
fn reject_reserved_priority_bits() {
let mut b = [0u8; HEADER_LEN];
b[4] = PROTOCOL_VERSION;
b[5] = FrameType::Request as u8;
b[6] = 0b0000_0110; assert_eq!(
decode_header(&b),
Err(DecodeError::ReservedPriorityBits { flags: 0b0000_0110 })
);
}
// Ping is a pure-header type, so a declared length of 1 is rejected.
#[test]
fn reject_pure_header_frame_with_body_len() {
let h = hdr(
1,
FrameType::Ping,
Flags::new(false, Priority::Passive, false),
0,
1,
);
assert_eq!(
decode_header(&h.encode()),
Err(DecodeError::PureHeaderFrameWithBody {
ty: FrameType::Ping,
len: 1
})
);
}
}