Skip to main content

chio_kernel/
transport.rs

1//! Length-prefixed canonical JSON transport.
2//!
3//! Wire format: `[4-byte big-endian length][canonical JSON bytes]`
4//!
5//! The transport is generic over `Read` and `Write` so it works with pipes,
6//! TCP, Unix domain sockets, or in-memory buffers for testing.
7
8use std::io::{BufReader, BufWriter, Read, Write};
9
10use chio_core::canonical::canonical_json_bytes;
11use chio_core::message::{AgentMessage, KernelMessage};
12
13/// Errors produced by the transport layer.
14#[derive(Debug, thiserror::Error)]
15pub enum TransportError {
16    #[error("i/o error: {0}")]
17    Io(#[from] std::io::Error),
18
19    #[error("message too large: {size} bytes (max {max})")]
20    MessageTooLarge { size: u32, max: u32 },
21
22    #[error("json deserialization error: {0}")]
23    Deserialize(#[from] serde_json::Error),
24
25    #[error("canonical json serialization error: {0}")]
26    Serialize(String),
27
28    #[error("connection closed")]
29    ConnectionClosed,
30}
31
/// Upper bound on a single frame's payload length, in bytes (16 MiB).
const MAX_MESSAGE_SIZE: u32 = 16 << 20;
34
/// Framed canonical-JSON channel between the kernel and an agent.
///
/// Incoming `AgentMessage` frames are pulled from the reader half and
/// outgoing `KernelMessage` frames are pushed to the writer half. On the
/// wire every frame is a 4-byte big-endian length followed by exactly that
/// many bytes of canonical JSON.
pub struct ChioTransport<R, W>
where
    R: Read,
    W: Write,
{
    reader: BufReader<R>,
    writer: BufWriter<W>,
}
44
45impl<R: Read, W: Write> ChioTransport<R, W> {
46    pub fn new(reader: R, writer: W) -> Self {
47        Self {
48            reader: BufReader::new(reader),
49            writer: BufWriter::new(writer),
50        }
51    }
52
53    /// Read one `AgentMessage` from the transport.
54    ///
55    /// Blocks until a complete frame is available. Returns
56    /// `TransportError::ConnectionClosed` if the reader reaches EOF before
57    /// a complete frame is read.
58    pub fn recv(&mut self) -> Result<AgentMessage, TransportError> {
59        let bytes = read_frame(&mut self.reader)?;
60        let msg: AgentMessage = serde_json::from_slice(&bytes)?;
61        Ok(msg)
62    }
63
64    /// Send one `KernelMessage` over the transport.
65    ///
66    /// The message is serialized to canonical JSON (RFC 8785) and written
67    /// as a length-prefixed frame. The writer is flushed after each send.
68    pub fn send(&mut self, msg: &KernelMessage) -> Result<(), TransportError> {
69        let bytes =
70            canonical_json_bytes(msg).map_err(|e| TransportError::Serialize(e.to_string()))?;
71        write_frame(&mut self.writer, &bytes)?;
72        self.writer.flush()?;
73        Ok(())
74    }
75}
76
77/// Read a single length-prefixed frame from a reader.
78pub fn read_frame<R: Read>(reader: &mut R) -> Result<Vec<u8>, TransportError> {
79    let mut len_buf = [0u8; 4];
80    match reader.read_exact(&mut len_buf) {
81        Ok(()) => {}
82        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
83            return Err(TransportError::ConnectionClosed);
84        }
85        Err(e) => return Err(TransportError::Io(e)),
86    }
87
88    let len = u32::from_be_bytes(len_buf);
89    if len > MAX_MESSAGE_SIZE {
90        return Err(TransportError::MessageTooLarge {
91            size: len,
92            max: MAX_MESSAGE_SIZE,
93        });
94    }
95
96    let mut buf = vec![0u8; len as usize];
97    match reader.read_exact(&mut buf) {
98        Ok(()) => {}
99        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
100            return Err(TransportError::ConnectionClosed);
101        }
102        Err(e) => return Err(TransportError::Io(e)),
103    }
104    Ok(buf)
105}
106
107/// Write a single length-prefixed frame to a writer.
108pub fn write_frame<W: Write>(writer: &mut W, data: &[u8]) -> Result<(), TransportError> {
109    let len = u32::try_from(data.len()).map_err(|_| TransportError::MessageTooLarge {
110        size: u32::MAX,
111        max: MAX_MESSAGE_SIZE,
112    })?;
113    if len > MAX_MESSAGE_SIZE {
114        return Err(TransportError::MessageTooLarge {
115            size: len,
116            max: MAX_MESSAGE_SIZE,
117        });
118    }
119    writer.write_all(&len.to_be_bytes())?;
120    writer.write_all(data)?;
121    Ok(())
122}
123
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use chio_core::capability::{
        CapabilityToken, CapabilityTokenBody, ChioScope, Operation, ToolGrant,
    };
    use chio_core::crypto::Keypair;
    use chio_core::receipt::{
        ChioReceipt, ChioReceiptBody, Decision, GuardEvidence, ToolCallAction,
    };
    use std::io::Cursor;

    /// Frame `payload` into a fresh in-memory wire buffer.
    fn framed(payload: &[u8]) -> Vec<u8> {
        let mut wire = Vec::new();
        write_frame(&mut wire, payload).unwrap();
        wire
    }

    /// Read the first frame stored in `wire`.
    fn first_frame(wire: Vec<u8>) -> Vec<u8> {
        let mut cursor = Cursor::new(wire);
        read_frame(&mut cursor).unwrap()
    }

    /// Sign a capability token granting `Invoke` on tool `echo` of server `srv`.
    fn make_token(kp: &Keypair) -> CapabilityToken {
        let echo_grant = ToolGrant {
            server_id: "srv".to_owned(),
            tool_name: "echo".to_owned(),
            operations: vec![Operation::Invoke],
            constraints: vec![],
            max_invocations: None,
            max_cost_per_invocation: None,
            max_total_cost: None,
            dpop_required: None,
        };
        let scope = ChioScope {
            grants: vec![echo_grant],
            ..ChioScope::default()
        };
        let body = CapabilityTokenBody {
            id: "cap-transport-001".to_owned(),
            issuer: kp.public_key(),
            subject: kp.public_key(),
            scope,
            issued_at: 1000,
            expires_at: 2000,
            delegation_chain: vec![],
        };
        CapabilityToken::sign(body, kp).unwrap()
    }

    /// Sign an `Allow` receipt for the `echo` call made under the test token.
    fn make_receipt(kp: &Keypair) -> ChioReceipt {
        let action = ToolCallAction::from_parameters(serde_json::json!({"text": "hello"})).unwrap();
        let guard_evidence = GuardEvidence {
            guard_name: "ShellCommandGuard".to_owned(),
            verdict: true,
            details: None,
        };
        let body = ChioReceiptBody {
            id: "rcpt-transport-001".to_owned(),
            timestamp: 1500,
            capability_id: "cap-transport-001".to_owned(),
            tool_server: "srv".to_owned(),
            tool_name: "echo".to_owned(),
            action,
            decision: Decision::Allow,
            content_hash: chio_core::sha256_hex(br#"{"output":"world"}"#),
            policy_hash: "deadbeef".to_owned(),
            evidence: vec![guard_evidence],
            metadata: None,
            trust_level: chio_core::TrustLevel::default(),
            tenant_id: None,
            kernel_key: kp.public_key(),
        };
        ChioReceipt::sign(body, kp).unwrap()
    }

    #[test]
    fn frame_roundtrip() {
        let data = b"hello, world";
        let recovered = first_frame(framed(data));
        assert_eq!(recovered, data);
    }

    #[test]
    fn length_prefix_encoding() {
        let payload = vec![0xAA_u8; 256];
        let wire = framed(&payload);

        // The header must be 256 encoded as a big-endian u32.
        assert_eq!(&wire[..4], &[0, 0, 1, 0]);
        assert_eq!(wire.len(), 4 + 256);
    }

    #[test]
    fn transport_agent_message_roundtrip() {
        let kp = Keypair::generate();
        let request = AgentMessage::ToolCallRequest {
            id: "req-001".to_owned(),
            capability_token: Box::new(make_token(&kp)),
            server_id: "srv".to_owned(),
            tool: "echo".to_owned(),
            params: serde_json::json!({"text": "hello"}),
        };

        // Encode with canonical JSON (same as the KernelMessage path) and frame it.
        let encoded = canonical_json_bytes(&request).expect("canonical serialization");
        let wire = framed(&encoded);

        // Unframe and decode.
        let frame = first_frame(wire);
        let decoded: AgentMessage = serde_json::from_slice(&frame).unwrap();

        let fields = if let AgentMessage::ToolCallRequest {
            id,
            server_id,
            tool,
            ..
        } = decoded
        {
            Some((id, server_id, tool))
        } else {
            None
        };
        let (id, server_id, tool) = fields.expect("wrong variant");
        assert_eq!(id, "req-001");
        assert_eq!(server_id, "srv");
        assert_eq!(tool, "echo");
    }

    #[test]
    fn transport_kernel_message_roundtrip() {
        let kp = Keypair::generate();
        let response = KernelMessage::ToolCallResponse {
            id: "req-001".to_owned(),
            result: chio_core::message::ToolCallResult::Ok {
                value: serde_json::json!({"output": "world"}),
            },
            receipt: Box::new(make_receipt(&kp)),
        };

        // An in-memory buffer stands in for the pipe.
        let encoded = canonical_json_bytes(&response).expect("canonical serialization");
        let wire = framed(&encoded);

        let frame = first_frame(wire);
        let decoded: KernelMessage = serde_json::from_slice(&frame).unwrap();

        let fields = if let KernelMessage::ToolCallResponse {
            id,
            result,
            receipt,
        } = decoded
        {
            Some((id, result, receipt))
        } else {
            None
        };
        let (id, result, receipt) = fields.expect("wrong variant");
        assert_eq!(id, "req-001");
        assert!(matches!(
            result,
            chio_core::message::ToolCallResult::Ok { .. }
        ));
        assert!(receipt.verify_signature().unwrap());
    }

    #[test]
    fn transport_kernel_chunk_roundtrip() {
        let chunk = KernelMessage::ToolCallChunk {
            id: "req-stream-1".to_owned(),
            chunk_index: 1,
            data: serde_json::json!({"delta": "world"}),
        };

        let encoded = canonical_json_bytes(&chunk).expect("canonical serialization");
        let wire = framed(&encoded);

        let frame = first_frame(wire);
        let decoded: KernelMessage = serde_json::from_slice(&frame).unwrap();

        let fields = if let KernelMessage::ToolCallChunk {
            id,
            chunk_index,
            data,
        } = decoded
        {
            Some((id, chunk_index, data))
        } else {
            None
        };
        let (id, chunk_index, data) = fields.expect("wrong variant");
        assert_eq!(id, "req-stream-1");
        assert_eq!(chunk_index, 1);
        assert_eq!(data["delta"], "world");
    }

    #[test]
    fn transport_send_recv_roundtrip() {
        // What the "agent" wrote: a single framed heartbeat.
        let agent_bytes = canonical_json_bytes(&AgentMessage::Heartbeat).expect("canonical");
        let agent_wire = framed(&agent_bytes);

        // The kernel's reply, and the buffer the kernel writes it into.
        let reply = KernelMessage::Heartbeat;
        let kernel_sink: Vec<u8> = Vec::new();
        let mut transport = ChioTransport::new(Cursor::new(agent_wire), kernel_sink);

        // Inbound direction.
        let received = transport.recv().unwrap();
        assert!(matches!(received, AgentMessage::Heartbeat));

        // Outbound direction.
        transport.send(&reply).unwrap();
    }

    #[test]
    fn connection_closed_on_empty_read() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let err = read_frame(&mut cursor).unwrap_err();
        assert!(matches!(err, TransportError::ConnectionClosed));
    }

    #[test]
    fn rejects_oversized_frame() {
        // Header claims 20 MiB, followed by a few bytes of trailing data.
        let claimed: u32 = 20 * 1024 * 1024;
        let mut wire = claimed.to_be_bytes().to_vec();
        wire.extend_from_slice(&[0u8; 16]);

        let mut cursor = Cursor::new(wire);
        let err = read_frame(&mut cursor).unwrap_err();
        assert!(matches!(err, TransportError::MessageTooLarge { .. }));
    }

    #[test]
    fn multiple_frames_in_sequence() {
        let parts: [&[u8]; 3] = [b"first", b"second", b"third"];

        let mut wire = Vec::new();
        for part in &parts {
            write_frame(&mut wire, part).unwrap();
        }

        let mut cursor = Cursor::new(wire);
        for expected in &parts {
            assert_eq!(read_frame(&mut cursor).unwrap(), *expected);
        }

        // With every frame consumed, the next read reports a closed connection.
        assert!(matches!(
            read_frame(&mut cursor).unwrap_err(),
            TransportError::ConnectionClosed
        ));
    }
}