1use std::io::{BufReader, BufWriter, Read, Write};
9
10use chio_core::canonical::canonical_json_bytes;
11use chio_core::message::{AgentMessage, KernelMessage};
12
13#[derive(Debug, thiserror::Error)]
15pub enum TransportError {
16 #[error("i/o error: {0}")]
17 Io(#[from] std::io::Error),
18
19 #[error("message too large: {size} bytes (max {max})")]
20 MessageTooLarge { size: u32, max: u32 },
21
22 #[error("json deserialization error: {0}")]
23 Deserialize(#[from] serde_json::Error),
24
25 #[error("canonical json serialization error: {0}")]
26 Serialize(String),
27
28 #[error("connection closed")]
29 ConnectionClosed,
30}
31
/// Largest frame payload, in bytes, accepted when reading or writing: 16 MiB.
const MAX_MESSAGE_SIZE: u32 = 16 << 20;
34
/// Bidirectional message transport over a reader/writer pair.
///
/// Both halves are wrapped in buffered adapters; the `Write` bound has to
/// appear here because `BufWriter` requires it on its own type parameter.
pub struct ChioTransport<R, W>
where
    R: Read,
    W: Write,
{
    /// Buffered source that incoming frames are read from.
    reader: BufReader<R>,
    /// Buffered sink that outgoing frames are written to.
    writer: BufWriter<W>,
}
44
45impl<R: Read, W: Write> ChioTransport<R, W> {
46 pub fn new(reader: R, writer: W) -> Self {
47 Self {
48 reader: BufReader::new(reader),
49 writer: BufWriter::new(writer),
50 }
51 }
52
53 pub fn recv(&mut self) -> Result<AgentMessage, TransportError> {
59 let bytes = read_frame(&mut self.reader)?;
60 let msg: AgentMessage = serde_json::from_slice(&bytes)?;
61 Ok(msg)
62 }
63
64 pub fn send(&mut self, msg: &KernelMessage) -> Result<(), TransportError> {
69 let bytes =
70 canonical_json_bytes(msg).map_err(|e| TransportError::Serialize(e.to_string()))?;
71 write_frame(&mut self.writer, &bytes)?;
72 self.writer.flush()?;
73 Ok(())
74 }
75}
76
77pub fn read_frame<R: Read>(reader: &mut R) -> Result<Vec<u8>, TransportError> {
79 let mut len_buf = [0u8; 4];
80 match reader.read_exact(&mut len_buf) {
81 Ok(()) => {}
82 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
83 return Err(TransportError::ConnectionClosed);
84 }
85 Err(e) => return Err(TransportError::Io(e)),
86 }
87
88 let len = u32::from_be_bytes(len_buf);
89 if len > MAX_MESSAGE_SIZE {
90 return Err(TransportError::MessageTooLarge {
91 size: len,
92 max: MAX_MESSAGE_SIZE,
93 });
94 }
95
96 let mut buf = vec![0u8; len as usize];
97 match reader.read_exact(&mut buf) {
98 Ok(()) => {}
99 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
100 return Err(TransportError::ConnectionClosed);
101 }
102 Err(e) => return Err(TransportError::Io(e)),
103 }
104 Ok(buf)
105}
106
107pub fn write_frame<W: Write>(writer: &mut W, data: &[u8]) -> Result<(), TransportError> {
109 let len = u32::try_from(data.len()).map_err(|_| TransportError::MessageTooLarge {
110 size: u32::MAX,
111 max: MAX_MESSAGE_SIZE,
112 })?;
113 if len > MAX_MESSAGE_SIZE {
114 return Err(TransportError::MessageTooLarge {
115 size: len,
116 max: MAX_MESSAGE_SIZE,
117 });
118 }
119 writer.write_all(&len.to_be_bytes())?;
120 writer.write_all(data)?;
121 Ok(())
122}
123
#[cfg(test)]
// Panicking on unexpected failures is the desired behaviour inside tests.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use chio_core::capability::{
        CapabilityToken, CapabilityTokenBody, ChioScope, Operation, ToolGrant,
    };
    use chio_core::crypto::Keypair;
    use chio_core::receipt::{
        ChioReceipt, ChioReceiptBody, Decision, GuardEvidence, ToolCallAction,
    };
    use std::io::Cursor;

    /// Builds a self-issued capability token signed with `kp`.
    fn make_token(kp: &Keypair) -> CapabilityToken {
        let body = CapabilityTokenBody {
            id: "cap-transport-001".to_string(),
            issuer: kp.public_key(),
            subject: kp.public_key(),
            scope: ChioScope {
                grants: vec![ToolGrant {
                    server_id: "srv".to_string(),
                    tool_name: "echo".to_string(),
                    operations: vec![Operation::Invoke],
                    constraints: vec![],
                    max_invocations: None,
                    max_cost_per_invocation: None,
                    max_total_cost: None,
                    dpop_required: None,
                }],
                ..ChioScope::default()
            },
            issued_at: 1000,
            expires_at: 2000,
            delegation_chain: vec![],
        };
        CapabilityToken::sign(body, kp).unwrap()
    }

    /// Builds a receipt signed with `kp`.
    fn make_receipt(kp: &Keypair) -> ChioReceipt {
        let body = ChioReceiptBody {
            id: "rcpt-transport-001".to_string(),
            timestamp: 1500,
            capability_id: "cap-transport-001".to_string(),
            tool_server: "srv".to_string(),
            tool_name: "echo".to_string(),
            action: ToolCallAction::from_parameters(serde_json::json!({"text": "hello"})).unwrap(),
            decision: Decision::Allow,
            content_hash: chio_core::sha256_hex(br#"{"output":"world"}"#),
            policy_hash: "deadbeef".to_string(),
            evidence: vec![GuardEvidence {
                guard_name: "ShellCommandGuard".to_string(),
                verdict: true,
                details: None,
            }],
            metadata: None,
            trust_level: chio_core::TrustLevel::default(),
            tenant_id: None,
            kernel_key: kp.public_key(),
        };
        ChioReceipt::sign(body, kp).unwrap()
    }

    #[test]
    fn frame_roundtrip() {
        let data = b"hello, world";
        let mut buf = Vec::new();
        write_frame(&mut buf, data).unwrap();

        let mut cursor = Cursor::new(buf);
        let recovered = read_frame(&mut cursor).unwrap();
        assert_eq!(recovered, data);
    }

    #[test]
    fn length_prefix_encoding() {
        let data = vec![0xAA; 256];
        let mut buf = Vec::new();
        write_frame(&mut buf, &data).unwrap();

        // 256 == 0x0000_0100 in big-endian byte order.
        assert_eq!(&buf[..4], &[0, 0, 1, 0]);
        assert_eq!(buf.len(), 4 + 256);
    }

    #[test]
    fn transport_agent_message_roundtrip() {
        let kp = Keypair::generate();
        let msg = AgentMessage::ToolCallRequest {
            id: "req-001".to_string(),
            capability_token: Box::new(make_token(&kp)),
            server_id: "srv".to_string(),
            tool: "echo".to_string(),
            params: serde_json::json!({"text": "hello"}),
        };

        let bytes = canonical_json_bytes(&msg).expect("canonical serialization");
        let mut wire = Vec::new();
        write_frame(&mut wire, &bytes).unwrap();

        let mut cursor = Cursor::new(wire);
        let frame = read_frame(&mut cursor).unwrap();
        let recovered: AgentMessage = serde_json::from_slice(&frame).unwrap();

        let (id, server_id, tool) = match recovered {
            AgentMessage::ToolCallRequest {
                id,
                server_id,
                tool,
                ..
            } => Some((id, server_id, tool)),
            _ => None,
        }
        .expect("wrong variant");
        assert_eq!(id, "req-001");
        assert_eq!(server_id, "srv");
        assert_eq!(tool, "echo");
    }

    #[test]
    fn transport_kernel_message_roundtrip() {
        let kp = Keypair::generate();
        let receipt = make_receipt(&kp);
        let kernel_msg = KernelMessage::ToolCallResponse {
            id: "req-001".to_string(),
            result: chio_core::message::ToolCallResult::Ok {
                value: serde_json::json!({"output": "world"}),
            },
            receipt: Box::new(receipt),
        };

        let mut wire = Vec::new();
        {
            let bytes = canonical_json_bytes(&kernel_msg).expect("canonical serialization");
            write_frame(&mut wire, &bytes).unwrap();
        }

        let mut cursor = Cursor::new(wire);
        let frame = read_frame(&mut cursor).unwrap();
        let recovered: KernelMessage = serde_json::from_slice(&frame).unwrap();

        let (id, result, receipt) = match recovered {
            KernelMessage::ToolCallResponse {
                id,
                result,
                receipt,
            } => Some((id, result, receipt)),
            _ => None,
        }
        .expect("wrong variant");
        assert_eq!(id, "req-001");
        assert!(matches!(
            result,
            chio_core::message::ToolCallResult::Ok { .. }
        ));
        assert!(receipt.verify_signature().unwrap());
    }

    #[test]
    fn transport_kernel_chunk_roundtrip() {
        let kernel_msg = KernelMessage::ToolCallChunk {
            id: "req-stream-1".to_string(),
            chunk_index: 1,
            data: serde_json::json!({"delta": "world"}),
        };

        let mut wire = Vec::new();
        {
            let bytes = canonical_json_bytes(&kernel_msg).expect("canonical serialization");
            write_frame(&mut wire, &bytes).unwrap();
        }

        let mut cursor = Cursor::new(wire);
        let frame = read_frame(&mut cursor).unwrap();
        let recovered: KernelMessage = serde_json::from_slice(&frame).unwrap();

        let (id, chunk_index, data) = match recovered {
            KernelMessage::ToolCallChunk {
                id,
                chunk_index,
                data,
            } => Some((id, chunk_index, data)),
            _ => None,
        }
        .expect("wrong variant");
        assert_eq!(id, "req-stream-1");
        assert_eq!(chunk_index, 1);
        assert_eq!(data["delta"], "world");
    }

    #[test]
    fn transport_send_recv_roundtrip() {
        let agent_msg = AgentMessage::Heartbeat;
        let agent_bytes = canonical_json_bytes(&agent_msg).expect("canonical");
        let mut agent_wire = Vec::new();
        write_frame(&mut agent_wire, &agent_bytes).unwrap();

        let kernel_msg = KernelMessage::Heartbeat;

        // Lend the output buffer to the transport so the bytes produced by
        // `send` can be inspected once the transport has been dropped.
        let mut kernel_wire: Vec<u8> = Vec::new();
        {
            let mut transport = ChioTransport::new(Cursor::new(agent_wire), &mut kernel_wire);

            let received = transport.recv().unwrap();
            assert!(matches!(received, AgentMessage::Heartbeat));

            transport.send(&kernel_msg).unwrap();
        }

        // `send` must have flushed exactly one well-formed frame.
        let mut cursor = Cursor::new(kernel_wire);
        let frame = read_frame(&mut cursor).unwrap();
        let echoed: KernelMessage = serde_json::from_slice(&frame).unwrap();
        assert!(matches!(echoed, KernelMessage::Heartbeat));
        assert!(matches!(
            read_frame(&mut cursor).unwrap_err(),
            TransportError::ConnectionClosed
        ));
    }

    #[test]
    fn connection_closed_on_empty_read() {
        let empty: Vec<u8> = Vec::new();
        let mut cursor = Cursor::new(empty);
        let err = read_frame(&mut cursor).unwrap_err();
        assert!(matches!(err, TransportError::ConnectionClosed));
    }

    #[test]
    fn rejects_oversized_frame() {
        // Declare 20 MiB, which is above the 16 MiB limit.
        let len: u32 = 20 * 1024 * 1024;
        let mut buf = Vec::new();
        buf.extend_from_slice(&len.to_be_bytes());
        // Only a few payload bytes follow; the size check fires first.
        buf.extend_from_slice(&[0u8; 16]);

        let mut cursor = Cursor::new(buf);
        let err = read_frame(&mut cursor).unwrap_err();
        assert!(matches!(err, TransportError::MessageTooLarge { .. }));
    }

    #[test]
    fn multiple_frames_in_sequence() {
        let mut wire = Vec::new();
        write_frame(&mut wire, b"first").unwrap();
        write_frame(&mut wire, b"second").unwrap();
        write_frame(&mut wire, b"third").unwrap();

        let mut cursor = Cursor::new(wire);
        assert_eq!(read_frame(&mut cursor).unwrap(), b"first");
        assert_eq!(read_frame(&mut cursor).unwrap(), b"second");
        assert_eq!(read_frame(&mut cursor).unwrap(), b"third");

        // Once every frame is consumed the stream reports closure.
        assert!(matches!(
            read_frame(&mut cursor).unwrap_err(),
            TransportError::ConnectionClosed
        ));
    }
}