Skip to main content

forge_sandbox/
ipc.rs

1//! IPC protocol for parent ↔ worker communication.
2//!
3//! Uses length-delimited JSON messages: 4-byte big-endian length prefix + JSON payload.
4//! All messages are typed via [`ParentMessage`] and [`ChildMessage`] enums.
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::time::Duration;
9use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
10
/// Messages sent from the parent process to the worker child.
///
/// Serialized with serde's internally tagged representation (`tag = "type"`):
/// each message is a JSON object whose `"type"` field carries the variant name
/// and whose remaining fields are the variant's own fields.
///
/// Apart from [`ParentMessage::Execute`], every variant is a reply that carries
/// the `request_id` of the child request it answers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ParentMessage {
    /// Initial message: execute this code in the sandbox.
    Execute {
        /// The JavaScript async arrow function to execute.
        code: String,
        /// Optional capability manifest (for search mode — not used in child process execute).
        manifest: Option<Value>,
        /// Worker configuration.
        config: WorkerConfig,
    },
    /// Response to a tool call request from the child.
    ToolCallResult {
        /// Matches the request_id from ChildMessage::ToolCallRequest.
        request_id: u64,
        /// The tool call result (`Ok`), or an error message (`Err`).
        result: Result<Value, String>,
    },
    /// Response to a resource read request from the child.
    ResourceReadResult {
        /// Matches the request_id from ChildMessage::ResourceReadRequest.
        request_id: u64,
        /// The resource content (`Ok`), or an error message (`Err`).
        result: Result<Value, String>,
    },
    /// Response to a stash operation from the child.
    ///
    /// NOTE(review): presumably the single reply type for all of the child's
    /// `Stash*` requests — confirm against the parent-side dispatcher.
    StashResult {
        /// Matches the request_id from the stash request.
        request_id: u64,
        /// The stash operation result (`Ok`), or an error message (`Err`).
        result: Result<Value, String>,
    },
}
46
/// Messages sent from the worker child to the parent process.
///
/// Serialized with serde's internally tagged representation (`tag = "type"`):
/// each message is a JSON object whose `"type"` field carries the variant name
/// and whose remaining fields are the variant's own fields.
///
/// Request variants carry a `request_id` used to correlate the request with
/// the parent's reply; [`ChildMessage::ExecutionComplete`] and
/// [`ChildMessage::Log`] carry no `request_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ChildMessage {
    /// Request the parent to dispatch a tool call.
    ToolCallRequest {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Tool identifier.
        tool: String,
        /// Tool arguments.
        args: Value,
    },
    /// Request the parent to read a resource.
    ResourceReadRequest {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Resource URI.
        uri: String,
    },
    /// Request the parent to put a value in the stash.
    StashPut {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Value to store.
        value: Value,
        /// TTL in seconds (None = use default).
        ttl_secs: Option<u32>,
    },
    /// Request the parent to get a value from the stash.
    StashGet {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
    },
    /// Request the parent to delete a value from the stash.
    StashDelete {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
    },
    /// Request the parent to list stash keys.
    StashKeys {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
    },
    /// The execution has finished.
    ExecutionComplete {
        /// The result value (`Ok`), or an error message (`Err`).
        result: Result<Value, String>,
    },
    /// A log message from the worker.
    Log {
        /// The log message text.
        message: String,
    },
}
112
/// Configuration passed to the worker process.
///
/// This is the serializable form of the sandbox limits that travels inside
/// [`ParentMessage::Execute`]. The last three fields are optional on the wire:
/// when absent from the JSON they fall back to the `default_*` functions named
/// in their `#[serde(default = ...)]` attributes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Maximum execution time, in milliseconds.
    pub timeout_ms: u64,
    /// V8 heap limit in bytes.
    pub max_heap_size: usize,
    /// Maximum tool calls per execution.
    pub max_tool_calls: usize,
    /// Maximum size of tool call arguments in bytes.
    pub max_tool_call_args_size: usize,
    /// Maximum size of the JSON result in bytes.
    pub max_output_size: usize,
    /// Maximum size of LLM-generated code in bytes.
    pub max_code_size: usize,
    /// Maximum IPC message size in bytes. Defaults to [`DEFAULT_MAX_IPC_MESSAGE_SIZE`].
    #[serde(default = "default_max_ipc_message_size")]
    pub max_ipc_message_size: usize,
    /// Maximum resource content size in bytes. Defaults to 64 MB when omitted.
    #[serde(default = "default_max_resource_size")]
    pub max_resource_size: usize,
    /// Maximum concurrent calls in forge.parallel(). Defaults to 8 when omitted.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: usize,
}
138
139fn default_max_ipc_message_size() -> usize {
140    DEFAULT_MAX_IPC_MESSAGE_SIZE
141}
142
/// Serde fallback for [`WorkerConfig::max_resource_size`]: 64 MB, in bytes.
fn default_max_resource_size() -> usize {
    const BYTES_PER_MB: usize = 1024 * 1024;
    64 * BYTES_PER_MB
}
146
/// Serde fallback for [`WorkerConfig::max_parallel`]: 8 concurrent calls.
fn default_max_parallel() -> usize {
    const DEFAULT_PARALLEL_CALLS: usize = 8;
    DEFAULT_PARALLEL_CALLS
}
150
151impl From<&crate::SandboxConfig> for WorkerConfig {
152    fn from(config: &crate::SandboxConfig) -> Self {
153        Self {
154            timeout_ms: config.timeout.as_millis() as u64,
155            max_heap_size: config.max_heap_size,
156            max_tool_calls: config.max_tool_calls,
157            max_tool_call_args_size: config.max_tool_call_args_size,
158            max_output_size: config.max_output_size,
159            max_code_size: config.max_code_size,
160            max_ipc_message_size: config.max_ipc_message_size,
161            max_resource_size: config.max_resource_size,
162            max_parallel: config.max_parallel,
163        }
164    }
165}
166
167impl WorkerConfig {
168    /// Convert back to a SandboxConfig for use in the worker.
169    pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
170        crate::SandboxConfig {
171            timeout: Duration::from_millis(self.timeout_ms),
172            max_code_size: self.max_code_size,
173            max_output_size: self.max_output_size,
174            max_heap_size: self.max_heap_size,
175            max_concurrent: 1, // worker handles one execution
176            max_tool_calls: self.max_tool_calls,
177            max_tool_call_args_size: self.max_tool_call_args_size,
178            execution_mode: crate::executor::ExecutionMode::InProcess, // worker always runs in-process
179            max_resource_size: self.max_resource_size,
180            max_parallel: self.max_parallel,
181            max_ipc_message_size: self.max_ipc_message_size,
182        }
183    }
184}
185
186/// Write a length-delimited JSON message to an async writer.
187///
188/// Format: 4-byte big-endian length prefix followed by the JSON payload bytes.
189pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
190    writer: &mut W,
191    msg: &T,
192) -> Result<(), std::io::Error> {
193    let payload = serde_json::to_vec(msg)
194        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
195    let len = u32::try_from(payload.len()).map_err(|_| {
196        std::io::Error::new(
197            std::io::ErrorKind::InvalidData,
198            format!(
199                "IPC payload too large: {} bytes (max {} bytes)",
200                payload.len(),
201                u32::MAX
202            ),
203        )
204    })?;
205    writer.write_all(&len.to_be_bytes()).await?;
206    writer.write_all(&payload).await?;
207    writer.flush().await?;
208    Ok(())
209}
210
/// Default upper bound on a single IPC message: 8 MB (8 × 2^20 bytes).
///
/// This was lowered from 64 MB so that one message cannot cause memory pressure.
/// It can be overridden via `sandbox.max_ipc_message_size_mb` in config.
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 << 20;
216
217/// Read a length-delimited JSON message from an async reader.
218///
219/// Returns `None` if the reader has reached EOF (clean shutdown).
220/// Uses [`DEFAULT_MAX_IPC_MESSAGE_SIZE`] as the size limit.
221pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
222    reader: &mut R,
223) -> Result<Option<T>, std::io::Error> {
224    read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
225}
226
227/// Read a length-delimited JSON message with a configurable size limit.
228///
229/// Returns `None` if the reader has reached EOF (clean shutdown).
230/// The `max_size` parameter controls the maximum allowed message size in bytes.
231pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
232    reader: &mut R,
233    max_size: usize,
234) -> Result<Option<T>, std::io::Error> {
235    let mut len_buf = [0u8; 4];
236    match reader.read_exact(&mut len_buf).await {
237        Ok(_) => {}
238        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
239        Err(e) => return Err(e),
240    }
241
242    let len = u32::from_be_bytes(len_buf) as usize;
243
244    // Reject messages exceeding the configured limit
245    if len > max_size {
246        return Err(std::io::Error::new(
247            std::io::ErrorKind::InvalidData,
248            format!(
249                "IPC message too large: {} bytes (limit: {} bytes)",
250                len, max_size
251            ),
252        ));
253    }
254
255    let mut payload = vec![0u8; len];
256    reader.read_exact(&mut payload).await?;
257
258    let msg: T = serde_json::from_slice(&payload)
259        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
260    Ok(Some(msg))
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{de::DeserializeOwned, Serialize};
    use std::io::Cursor;

    /// Frame `msg` into a fresh in-memory buffer.
    async fn encode<T: Serialize>(msg: &T) -> Vec<u8> {
        let mut buf = Vec::new();
        write_message(&mut buf, msg).await.unwrap();
        buf
    }

    /// Frame `msg`, then decode it again using the default size limit.
    async fn roundtrip<T: Serialize + DeserializeOwned>(msg: &T) -> T {
        let mut cursor = Cursor::new(encode(msg).await);
        read_message(&mut cursor).await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn roundtrip_parent_execute_message() {
        let msg = ParentMessage::Execute {
            code: "async () => { return 42; }".into(),
            manifest: Some(serde_json::json!({"servers": []})),
            config: WorkerConfig {
                timeout_ms: 5000,
                max_heap_size: 64 * 1024 * 1024,
                max_tool_calls: 50,
                max_tool_call_args_size: 1024 * 1024,
                max_output_size: 1024 * 1024,
                max_code_size: 64 * 1024,
                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
                max_resource_size: 64 * 1024 * 1024,
                max_parallel: 8,
            },
        };

        match roundtrip(&msg).await {
            ParentMessage::Execute {
                code,
                manifest,
                config,
            } => {
                assert_eq!(code, "async () => { return 42; }");
                assert!(manifest.is_some());
                assert_eq!(config.timeout_ms, 5000);
            }
            other => panic!("expected Execute, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_parent_tool_result() {
        let msg = ParentMessage::ToolCallResult {
            request_id: 42,
            result: Ok(serde_json::json!({"status": "ok"})),
        };

        match roundtrip(&msg).await {
            ParentMessage::ToolCallResult { request_id, result } => {
                assert_eq!(request_id, 42);
                assert!(result.is_ok());
            }
            other => panic!("expected ToolCallResult, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_parent_tool_result_error() {
        let msg = ParentMessage::ToolCallResult {
            request_id: 7,
            result: Err("connection refused".into()),
        };

        match roundtrip(&msg).await {
            ParentMessage::ToolCallResult { request_id, result } => {
                assert_eq!(request_id, 7);
                assert_eq!(result.unwrap_err(), "connection refused");
            }
            other => panic!("expected ToolCallResult, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_child_tool_request() {
        let msg = ChildMessage::ToolCallRequest {
            request_id: 1,
            server: "narsil".into(),
            tool: "ast.parse".into(),
            args: serde_json::json!({"file": "test.rs"}),
        };

        match roundtrip(&msg).await {
            ChildMessage::ToolCallRequest {
                request_id,
                server,
                tool,
                args,
            } => {
                assert_eq!(request_id, 1);
                assert_eq!(server, "narsil");
                assert_eq!(tool, "ast.parse");
                assert_eq!(args["file"], "test.rs");
            }
            other => panic!("expected ToolCallRequest, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_child_execution_complete() {
        let msg = ChildMessage::ExecutionComplete {
            result: Ok(serde_json::json!([1, 2, 3])),
        };

        match roundtrip(&msg).await {
            ChildMessage::ExecutionComplete { result } => {
                assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
            }
            other => panic!("expected ExecutionComplete, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_child_log() {
        let msg = ChildMessage::Log {
            message: "processing step 3".into(),
        };

        match roundtrip(&msg).await {
            ChildMessage::Log { message } => {
                assert_eq!(message, "processing step 3");
            }
            other => panic!("expected Log, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn multiple_messages_in_stream() {
        let msg1 = ChildMessage::Log {
            message: "first".into(),
        };
        let msg2 = ChildMessage::ToolCallRequest {
            request_id: 1,
            server: "s".into(),
            tool: "t".into(),
            args: serde_json::json!({}),
        };
        let msg3 = ChildMessage::ExecutionComplete {
            result: Ok(serde_json::json!("done")),
        };

        let mut buf = Vec::new();
        write_message(&mut buf, &msg1).await.unwrap();
        write_message(&mut buf, &msg2).await.unwrap();
        write_message(&mut buf, &msg3).await.unwrap();

        let mut cursor = Cursor::new(buf);
        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();

        assert!(matches!(d1, ChildMessage::Log { .. }));
        assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
        assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));

        // EOF after all messages
        let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
        assert!(d4.is_none());
    }

    #[tokio::test]
    async fn execution_complete_error_roundtrip() {
        let msg = ChildMessage::ExecutionComplete {
            result: Err("failed to create tokio runtime: resource unavailable".into()),
        };

        match roundtrip(&msg).await {
            ChildMessage::ExecutionComplete { result } => {
                let err = result.unwrap_err();
                assert!(
                    err.contains("tokio runtime"),
                    "expected runtime error: {err}"
                );
            }
            other => panic!("expected ExecutionComplete, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn eof_returns_none() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn u32_try_from_overflow() {
        // Validates that the conversion logic correctly rejects sizes > u32::MAX.
        // The oversized value is built in u64 so the arithmetic cannot overflow on
        // 32-bit targets; there `usize` cannot hold it and there is nothing to check.
        if let Ok(overflow_size) = usize::try_from(u64::from(u32::MAX) + 1) {
            assert!(u32::try_from(overflow_size).is_err());
        }
    }

    #[tokio::test]
    async fn write_message_normal_size_succeeds() {
        // Regression guard: normal-sized messages still work after the try_from change
        let msg = ChildMessage::Log {
            message: "a".repeat(1024),
        };
        let buf = encode(&msg).await;
        assert!(buf.len() > 1024);
    }

    #[tokio::test]
    async fn large_message_roundtrip() {
        // A large payload (~1MB of data)
        let large_data = "x".repeat(1_000_000);
        let msg = ChildMessage::ExecutionComplete {
            result: Ok(serde_json::json!(large_data)),
        };

        match roundtrip(&msg).await {
            ChildMessage::ExecutionComplete { result } => {
                assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
            }
            other => panic!("expected ExecutionComplete, got: {:?}", other),
        }
    }

    #[test]
    fn worker_config_roundtrip_from_sandbox_config() {
        let sandbox = crate::SandboxConfig::default();
        let worker = WorkerConfig::from(&sandbox);
        let back = worker.to_sandbox_config();

        // Every limit that travels through WorkerConfig must survive the round trip.
        assert_eq!(sandbox.timeout, back.timeout);
        assert_eq!(sandbox.max_heap_size, back.max_heap_size);
        assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
        assert_eq!(sandbox.max_tool_call_args_size, back.max_tool_call_args_size);
        assert_eq!(sandbox.max_output_size, back.max_output_size);
        assert_eq!(sandbox.max_code_size, back.max_code_size);
        assert_eq!(sandbox.max_resource_size, back.max_resource_size);
        assert_eq!(sandbox.max_parallel, back.max_parallel);
        assert_eq!(sandbox.max_ipc_message_size, back.max_ipc_message_size);
        assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
        assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); // 8 MB default
    }

    #[tokio::test]
    async fn read_message_with_limit_rejects_oversized() {
        let msg = ChildMessage::Log {
            message: "x".repeat(1024),
        };

        // Set limit smaller than the message payload
        let mut cursor = Cursor::new(encode(&msg).await);
        let result: Result<Option<ChildMessage>, _> =
            read_message_with_limit(&mut cursor, 64).await;
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("too large"), "error: {err_msg}");
    }

    #[tokio::test]
    async fn read_message_with_limit_accepts_within_limit() {
        let msg = ChildMessage::Log {
            message: "hello".into(),
        };

        let mut cursor = Cursor::new(encode(&msg).await);
        let result: Option<ChildMessage> =
            read_message_with_limit(&mut cursor, 1024).await.unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn worker_config_ipc_limit_serde_default() {
        // Deserializing JSON without the optional fields should use their defaults
        let json = r#"{
            "timeout_ms": 5000,
            "max_heap_size": 67108864,
            "max_tool_calls": 50,
            "max_tool_call_args_size": 1048576,
            "max_output_size": 1048576,
            "max_code_size": 65536
        }"#;
        let config: WorkerConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
        assert_eq!(config.max_resource_size, 64 * 1024 * 1024);
        assert_eq!(config.max_parallel, 8);
    }

    // --- IPC-01: ResourceReadRequest round-trip ---
    #[tokio::test]
    async fn ipc_01_resource_read_request_roundtrip() {
        let msg = ChildMessage::ResourceReadRequest {
            request_id: 10,
            server: "postgres".into(),
            uri: "file:///logs/app.log".into(),
        };

        match roundtrip(&msg).await {
            ChildMessage::ResourceReadRequest {
                request_id,
                server,
                uri,
            } => {
                assert_eq!(request_id, 10);
                assert_eq!(server, "postgres");
                assert_eq!(uri, "file:///logs/app.log");
            }
            other => panic!("expected ResourceReadRequest, got: {:?}", other),
        }
    }

    // --- IPC-02: ResourceReadResult (success) round-trip ---
    #[tokio::test]
    async fn ipc_02_resource_read_result_success_roundtrip() {
        let msg = ParentMessage::ResourceReadResult {
            request_id: 11,
            result: Ok(serde_json::json!({"content": "log data here"})),
        };

        match roundtrip(&msg).await {
            ParentMessage::ResourceReadResult { request_id, result } => {
                assert_eq!(request_id, 11);
                let val = result.unwrap();
                assert_eq!(val["content"], "log data here");
            }
            other => panic!("expected ResourceReadResult, got: {:?}", other),
        }
    }

    // --- IPC-03: ResourceReadResult (error) round-trip ---
    #[tokio::test]
    async fn ipc_03_resource_read_result_error_roundtrip() {
        let msg = ParentMessage::ResourceReadResult {
            request_id: 12,
            result: Err("resource not found".into()),
        };

        match roundtrip(&msg).await {
            ParentMessage::ResourceReadResult { request_id, result } => {
                assert_eq!(request_id, 12);
                assert_eq!(result.unwrap_err(), "resource not found");
            }
            other => panic!("expected ResourceReadResult, got: {:?}", other),
        }
    }

    // --- IPC-04: StashPut round-trip ---
    #[tokio::test]
    async fn ipc_04_stash_put_roundtrip() {
        let msg = ChildMessage::StashPut {
            request_id: 20,
            key: "my-key".into(),
            value: serde_json::json!({"data": 42}),
            ttl_secs: Some(60),
        };

        match roundtrip(&msg).await {
            ChildMessage::StashPut {
                request_id,
                key,
                value,
                ttl_secs,
            } => {
                assert_eq!(request_id, 20);
                assert_eq!(key, "my-key");
                assert_eq!(value["data"], 42);
                assert_eq!(ttl_secs, Some(60));
            }
            other => panic!("expected StashPut, got: {:?}", other),
        }
    }

    // --- IPC-05: StashGet round-trip ---
    #[tokio::test]
    async fn ipc_05_stash_get_roundtrip() {
        let msg = ChildMessage::StashGet {
            request_id: 21,
            key: "lookup-key".into(),
        };

        match roundtrip(&msg).await {
            ChildMessage::StashGet { request_id, key } => {
                assert_eq!(request_id, 21);
                assert_eq!(key, "lookup-key");
            }
            other => panic!("expected StashGet, got: {:?}", other),
        }
    }

    // --- IPC-06: StashDelete round-trip ---
    #[tokio::test]
    async fn ipc_06_stash_delete_roundtrip() {
        let msg = ChildMessage::StashDelete {
            request_id: 22,
            key: "delete-me".into(),
        };

        match roundtrip(&msg).await {
            ChildMessage::StashDelete { request_id, key } => {
                assert_eq!(request_id, 22);
                assert_eq!(key, "delete-me");
            }
            other => panic!("expected StashDelete, got: {:?}", other),
        }
    }

    // --- IPC-07: StashKeys round-trip ---
    #[tokio::test]
    async fn ipc_07_stash_keys_roundtrip() {
        let msg = ChildMessage::StashKeys { request_id: 23 };

        match roundtrip(&msg).await {
            ChildMessage::StashKeys { request_id } => {
                assert_eq!(request_id, 23);
            }
            other => panic!("expected StashKeys, got: {:?}", other),
        }
    }

    // --- IPC-08: StashResult round-trip ---
    #[tokio::test]
    async fn ipc_08_stash_result_roundtrip() {
        let msg = ParentMessage::StashResult {
            request_id: 24,
            result: Ok(serde_json::json!({"ok": true})),
        };

        match roundtrip(&msg).await {
            ParentMessage::StashResult { request_id, result } => {
                assert_eq!(request_id, 24);
                assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
            }
            other => panic!("expected StashResult, got: {:?}", other),
        }
    }

    // --- IPC-09: Mixed message interleaving (tool + resource + stash in single stream) ---
    #[tokio::test]
    async fn ipc_09_mixed_message_interleaving() {
        let msg1 = ChildMessage::ToolCallRequest {
            request_id: 1,
            server: "s".into(),
            tool: "t".into(),
            args: serde_json::json!({}),
        };
        let msg2 = ChildMessage::ResourceReadRequest {
            request_id: 2,
            server: "pg".into(),
            uri: "file:///data".into(),
        };
        let msg3 = ChildMessage::StashPut {
            request_id: 3,
            key: "k".into(),
            value: serde_json::json!("v"),
            ttl_secs: None,
        };
        let msg4 = ChildMessage::StashGet {
            request_id: 4,
            key: "k".into(),
        };
        let msg5 = ChildMessage::ExecutionComplete {
            result: Ok(serde_json::json!("done")),
        };

        let mut buf = Vec::new();
        write_message(&mut buf, &msg1).await.unwrap();
        write_message(&mut buf, &msg2).await.unwrap();
        write_message(&mut buf, &msg3).await.unwrap();
        write_message(&mut buf, &msg4).await.unwrap();
        write_message(&mut buf, &msg5).await.unwrap();

        let mut cursor = Cursor::new(buf);
        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
        let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
        let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();

        assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
        assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
        assert!(matches!(d3, ChildMessage::StashPut { .. }));
        assert!(matches!(d4, ChildMessage::StashGet { .. }));
        assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));

        // EOF after all messages
        let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
        assert!(d6.is_none());
    }

    // --- IPC-10: Oversized stash message rejected by read_message_with_limit ---
    #[tokio::test]
    async fn ipc_10_oversized_stash_message_rejected() {
        let msg = ChildMessage::StashPut {
            request_id: 100,
            key: "k".into(),
            value: serde_json::json!("x".repeat(2048)),
            ttl_secs: Some(60),
        };

        // Set limit smaller than the message payload
        let mut cursor = Cursor::new(encode(&msg).await);
        let result: Result<Option<ChildMessage>, _> =
            read_message_with_limit(&mut cursor, 64).await;
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("too large"),
            "error should mention 'too large': {err_msg}"
        );
    }
}