1use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::time::Duration;
9use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
10
/// Messages of the parent side of the IPC protocol (judging by the names, the
/// parent → worker direction — confirm against the dispatcher).
///
/// Serde encodes this as an internally tagged enum: the variant name is stored
/// in a `type` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ParentMessage {
    /// Carries a piece of code together with the limits it should run under.
    Execute {
        /// Source text to run.
        code: String,
        /// Optional free-form JSON manifest; its schema is not defined in this file.
        manifest: Option<Value>,
        /// Timeout and resource limits for this run.
        config: WorkerConfig,
    },
    /// Outcome of a tool call, identified by `request_id`.
    ToolCallResult {
        /// Correlation id (presumably echoes the id of the originating request).
        request_id: u64,
        /// `Ok` with a JSON value, or `Err` with an error message.
        result: Result<Value, String>,
    },
    /// Outcome of a resource read, identified by `request_id`.
    ResourceReadResult {
        /// Correlation id (presumably echoes the id of the originating request).
        request_id: u64,
        /// `Ok` with a JSON value, or `Err` with an error message.
        result: Result<Value, String>,
    },
    /// Outcome of a stash operation, identified by `request_id`.
    StashResult {
        /// Correlation id (presumably echoes the id of the originating request).
        request_id: u64,
        /// `Ok` with a JSON value, or `Err` with an error message.
        result: Result<Value, String>,
    },
}
46
/// Messages of the child side of the IPC protocol (judging by the names, the
/// worker → parent direction — confirm against the dispatcher).
///
/// Serde encodes this as an internally tagged enum: the variant name is stored
/// in a `type` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ChildMessage {
    /// Asks for `tool` on `server` to be invoked with `args`.
    ToolCallRequest {
        /// Correlation id for matching the eventual result.
        request_id: u64,
        /// Name of the target server.
        server: String,
        /// Name of the tool on that server.
        tool: String,
        /// JSON arguments for the tool.
        args: Value,
    },
    /// Asks for the resource at `uri` on `server` to be read.
    ResourceReadRequest {
        /// Correlation id for matching the eventual result.
        request_id: u64,
        /// Name of the target server.
        server: String,
        /// Resource identifier.
        uri: String,
    },
    /// Asks for `value` to be stored under `key`.
    StashPut {
        /// Correlation id for matching the eventual result.
        request_id: u64,
        /// Stash key.
        key: String,
        /// JSON value to store.
        value: Value,
        /// Optional time-to-live (presumably seconds, per the field name).
        ttl_secs: Option<u32>,
    },
    /// Asks for the value stored under `key`.
    StashGet {
        /// Correlation id for matching the eventual result.
        request_id: u64,
        /// Stash key.
        key: String,
    },
    /// Asks for the entry stored under `key` to be removed.
    StashDelete {
        /// Correlation id for matching the eventual result.
        request_id: u64,
        /// Stash key.
        key: String,
    },
    /// Asks for the stash keys to be listed.
    StashKeys {
        /// Correlation id for matching the eventual result.
        request_id: u64,
    },
    /// Reports the final outcome of an execution.
    ExecutionComplete {
        /// `Ok` with the JSON result, or `Err` with an error message.
        result: Result<Value, String>,
    },
    /// Carries a single log line.
    Log {
        /// Log text.
        message: String,
    },
}
112
/// Serializable set of execution limits, convertible from and to
/// `crate::SandboxConfig`.
///
/// The last three fields carry serde defaults, so serialized configs that
/// omit them still deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Execution timeout in milliseconds.
    pub timeout_ms: u64,
    /// Heap size limit (presumably bytes — confirm against the executor).
    pub max_heap_size: usize,
    /// Maximum number of tool calls.
    pub max_tool_calls: usize,
    /// Size limit for tool call arguments (presumably bytes).
    pub max_tool_call_args_size: usize,
    /// Size limit for the execution output (presumably bytes).
    pub max_output_size: usize,
    /// Size limit for the submitted code (presumably bytes).
    pub max_code_size: usize,
    /// IPC message size limit; falls back to `default_max_ipc_message_size()`
    /// when absent from the serialized input.
    #[serde(default = "default_max_ipc_message_size")]
    pub max_ipc_message_size: usize,
    /// Resource size limit; falls back to `default_max_resource_size()` when
    /// absent from the serialized input.
    #[serde(default = "default_max_resource_size")]
    pub max_resource_size: usize,
    /// Parallelism limit; falls back to `default_max_parallel()` when absent
    /// from the serialized input.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: usize,
}
138
/// Serde default for `WorkerConfig::max_ipc_message_size`: the module-wide
/// `DEFAULT_MAX_IPC_MESSAGE_SIZE`.
fn default_max_ipc_message_size() -> usize {
    DEFAULT_MAX_IPC_MESSAGE_SIZE
}
142
/// Serde default for `WorkerConfig::max_resource_size` (64 MiB).
fn default_max_resource_size() -> usize {
    const MIB: usize = 1024 * 1024;
    64 * MIB
}
146
/// Serde default for `WorkerConfig::max_parallel`.
fn default_max_parallel() -> usize {
    const FALLBACK_PARALLELISM: usize = 8;
    FALLBACK_PARALLELISM
}
150
151impl From<&crate::SandboxConfig> for WorkerConfig {
152 fn from(config: &crate::SandboxConfig) -> Self {
153 Self {
154 timeout_ms: config.timeout.as_millis() as u64,
155 max_heap_size: config.max_heap_size,
156 max_tool_calls: config.max_tool_calls,
157 max_tool_call_args_size: config.max_tool_call_args_size,
158 max_output_size: config.max_output_size,
159 max_code_size: config.max_code_size,
160 max_ipc_message_size: config.max_ipc_message_size,
161 max_resource_size: config.max_resource_size,
162 max_parallel: config.max_parallel,
163 }
164 }
165}
166
167impl WorkerConfig {
168 pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
170 crate::SandboxConfig {
171 timeout: Duration::from_millis(self.timeout_ms),
172 max_code_size: self.max_code_size,
173 max_output_size: self.max_output_size,
174 max_heap_size: self.max_heap_size,
175 max_concurrent: 1, max_tool_calls: self.max_tool_calls,
177 max_tool_call_args_size: self.max_tool_call_args_size,
178 execution_mode: crate::executor::ExecutionMode::InProcess, max_resource_size: self.max_resource_size,
180 max_parallel: self.max_parallel,
181 max_ipc_message_size: self.max_ipc_message_size,
182 }
183 }
184}
185
186pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
190 writer: &mut W,
191 msg: &T,
192) -> Result<(), std::io::Error> {
193 let payload = serde_json::to_vec(msg)
194 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
195 let len = u32::try_from(payload.len()).map_err(|_| {
196 std::io::Error::new(
197 std::io::ErrorKind::InvalidData,
198 format!(
199 "IPC payload too large: {} bytes (max {} bytes)",
200 payload.len(),
201 u32::MAX
202 ),
203 )
204 })?;
205 writer.write_all(&len.to_be_bytes()).await?;
206 writer.write_all(&payload).await?;
207 writer.flush().await?;
208 Ok(())
209}
210
/// Payload size limit, in bytes, applied by `read_message` (8 MiB).
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 << 20;
216
217pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
222 reader: &mut R,
223) -> Result<Option<T>, std::io::Error> {
224 read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
225}
226
227pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
232 reader: &mut R,
233 max_size: usize,
234) -> Result<Option<T>, std::io::Error> {
235 let mut len_buf = [0u8; 4];
236 match reader.read_exact(&mut len_buf).await {
237 Ok(_) => {}
238 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
239 Err(e) => return Err(e),
240 }
241
242 let len = u32::from_be_bytes(len_buf) as usize;
243
244 if len > max_size {
246 return Err(std::io::Error::new(
247 std::io::ErrorKind::InvalidData,
248 format!(
249 "IPC message too large: {} bytes (limit: {} bytes)",
250 len, max_size
251 ),
252 ));
253 }
254
255 let mut payload = vec![0u8; len];
256 reader.read_exact(&mut payload).await?;
257
258 let msg: T = serde_json::from_slice(&payload)
259 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
260 Ok(Some(msg))
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Frames `msg` into a fresh in-memory buffer.
    async fn encode<T: Serialize>(msg: &T) -> Vec<u8> {
        let mut bytes = Vec::new();
        write_message(&mut bytes, msg).await.unwrap();
        bytes
    }

    /// Frames `msg`, then decodes it again using the default size limit.
    async fn roundtrip<T>(msg: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let mut cursor = Cursor::new(encode(msg).await);
        read_message(&mut cursor).await.unwrap().unwrap()
    }

    /// Frames every message back-to-back into a single readable stream.
    async fn encode_stream(msgs: &[ChildMessage]) -> Cursor<Vec<u8>> {
        let mut bytes = Vec::new();
        for msg in msgs {
            write_message(&mut bytes, msg).await.unwrap();
        }
        Cursor::new(bytes)
    }

    /// Reads the next child message from the stream, `None` at end of stream.
    async fn next_child(cursor: &mut Cursor<Vec<u8>>) -> Option<ChildMessage> {
        read_message(cursor).await.unwrap()
    }

    #[tokio::test]
    async fn roundtrip_parent_execute_message() {
        let config = WorkerConfig {
            timeout_ms: 5000,
            max_heap_size: 64 * 1024 * 1024,
            max_tool_calls: 50,
            max_tool_call_args_size: 1024 * 1024,
            max_output_size: 1024 * 1024,
            max_code_size: 64 * 1024,
            max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
            max_resource_size: 64 * 1024 * 1024,
            max_parallel: 8,
        };
        let sent = ParentMessage::Execute {
            code: "async () => { return 42; }".into(),
            manifest: Some(serde_json::json!({"servers": []})),
            config,
        };

        match roundtrip(&sent).await {
            ParentMessage::Execute {
                code,
                manifest,
                config,
            } => {
                assert_eq!(code, "async () => { return 42; }");
                assert!(manifest.is_some());
                assert_eq!(config.timeout_ms, 5000);
            }
            other => panic!("expected Execute, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_parent_tool_result() {
        let sent = ParentMessage::ToolCallResult {
            request_id: 42,
            result: Ok(serde_json::json!({"status": "ok"})),
        };

        match roundtrip(&sent).await {
            ParentMessage::ToolCallResult { request_id, result } => {
                assert_eq!(request_id, 42);
                assert!(result.is_ok());
            }
            other => panic!("expected ToolCallResult, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_parent_tool_result_error() {
        let sent = ParentMessage::ToolCallResult {
            request_id: 7,
            result: Err("connection refused".into()),
        };

        match roundtrip(&sent).await {
            ParentMessage::ToolCallResult { request_id, result } => {
                assert_eq!(request_id, 7);
                assert_eq!(result.unwrap_err(), "connection refused");
            }
            other => panic!("expected ToolCallResult, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_child_tool_request() {
        let sent = ChildMessage::ToolCallRequest {
            request_id: 1,
            server: "narsil".into(),
            tool: "ast.parse".into(),
            args: serde_json::json!({"file": "test.rs"}),
        };

        match roundtrip(&sent).await {
            ChildMessage::ToolCallRequest {
                request_id,
                server,
                tool,
                args,
            } => {
                assert_eq!(request_id, 1);
                assert_eq!(server, "narsil");
                assert_eq!(tool, "ast.parse");
                assert_eq!(args["file"], "test.rs");
            }
            other => panic!("expected ToolCallRequest, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_child_execution_complete() {
        let sent = ChildMessage::ExecutionComplete {
            result: Ok(serde_json::json!([1, 2, 3])),
        };

        match roundtrip(&sent).await {
            ChildMessage::ExecutionComplete { result } => {
                assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
            }
            other => panic!("expected ExecutionComplete, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn roundtrip_child_log() {
        let sent = ChildMessage::Log {
            message: "processing step 3".into(),
        };

        match roundtrip(&sent).await {
            ChildMessage::Log { message } => {
                assert_eq!(message, "processing step 3");
            }
            other => panic!("expected Log, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn multiple_messages_in_stream() {
        let outgoing = [
            ChildMessage::Log {
                message: "first".into(),
            },
            ChildMessage::ToolCallRequest {
                request_id: 1,
                server: "s".into(),
                tool: "t".into(),
                args: serde_json::json!({}),
            },
            ChildMessage::ExecutionComplete {
                result: Ok(serde_json::json!("done")),
            },
        ];

        let mut cursor = encode_stream(&outgoing).await;
        let d1 = next_child(&mut cursor).await.unwrap();
        let d2 = next_child(&mut cursor).await.unwrap();
        let d3 = next_child(&mut cursor).await.unwrap();

        assert!(matches!(d1, ChildMessage::Log { .. }));
        assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
        assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));

        // The stream is exhausted, so the next read reports end of stream.
        let d4 = next_child(&mut cursor).await;
        assert!(d4.is_none());
    }

    #[tokio::test]
    async fn execution_complete_error_roundtrip() {
        let sent = ChildMessage::ExecutionComplete {
            result: Err("failed to create tokio runtime: resource unavailable".into()),
        };

        match roundtrip(&sent).await {
            ChildMessage::ExecutionComplete { result } => {
                let err = result.unwrap_err();
                assert!(
                    err.contains("tokio runtime"),
                    "expected runtime error: {err}"
                );
            }
            other => panic!("expected ExecutionComplete, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn eof_returns_none() {
        let mut empty = Cursor::new(Vec::<u8>::new());
        let outcome: Option<ParentMessage> = read_message(&mut empty).await.unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn u32_try_from_overflow() {
        let overflow_size = u32::MAX as usize + 1;
        assert!(u32::try_from(overflow_size).is_err());
    }

    #[tokio::test]
    async fn write_message_normal_size_succeeds() {
        let msg = ChildMessage::Log {
            message: "a".repeat(1024),
        };
        let framed = encode(&msg).await;
        assert!(framed.len() > 1024);
    }

    #[tokio::test]
    async fn large_message_roundtrip() {
        let large_data = "x".repeat(1_000_000);
        let sent = ChildMessage::ExecutionComplete {
            result: Ok(serde_json::json!(large_data)),
        };

        match roundtrip(&sent).await {
            ChildMessage::ExecutionComplete { result } => {
                assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
            }
            other => panic!("expected ExecutionComplete, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn worker_config_roundtrip_from_sandbox_config() {
        let sandbox = crate::SandboxConfig::default();
        let worker = WorkerConfig::from(&sandbox);
        let back = worker.to_sandbox_config();

        assert_eq!(sandbox.timeout, back.timeout);
        assert_eq!(sandbox.max_heap_size, back.max_heap_size);
        assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
        assert_eq!(sandbox.max_output_size, back.max_output_size);
        assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
        assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024);
    }

    #[tokio::test]
    async fn read_message_with_limit_rejects_oversized() {
        let msg = ChildMessage::Log {
            message: "x".repeat(1024),
        };

        let mut cursor = Cursor::new(encode(&msg).await);
        let outcome: Result<Option<ChildMessage>, _> =
            read_message_with_limit(&mut cursor, 64).await;
        assert!(outcome.is_err());
        let err_msg = outcome.unwrap_err().to_string();
        assert!(err_msg.contains("too large"), "error: {err_msg}");
    }

    #[tokio::test]
    async fn read_message_with_limit_accepts_within_limit() {
        let msg = ChildMessage::Log {
            message: "hello".into(),
        };

        let mut cursor = Cursor::new(encode(&msg).await);
        let outcome: Option<ChildMessage> =
            read_message_with_limit(&mut cursor, 1024).await.unwrap();
        assert!(outcome.is_some());
    }

    #[tokio::test]
    async fn worker_config_ipc_limit_serde_default() {
        let json = r#"{
            "timeout_ms": 5000,
            "max_heap_size": 67108864,
            "max_tool_calls": 50,
            "max_tool_call_args_size": 1048576,
            "max_output_size": 1048576,
            "max_code_size": 65536
        }"#;
        let config: WorkerConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
    }

    #[tokio::test]
    async fn ipc_01_resource_read_request_roundtrip() {
        let sent = ChildMessage::ResourceReadRequest {
            request_id: 10,
            server: "postgres".into(),
            uri: "file:///logs/app.log".into(),
        };

        match roundtrip(&sent).await {
            ChildMessage::ResourceReadRequest {
                request_id,
                server,
                uri,
            } => {
                assert_eq!(request_id, 10);
                assert_eq!(server, "postgres");
                assert_eq!(uri, "file:///logs/app.log");
            }
            other => panic!("expected ResourceReadRequest, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_02_resource_read_result_success_roundtrip() {
        let sent = ParentMessage::ResourceReadResult {
            request_id: 11,
            result: Ok(serde_json::json!({"content": "log data here"})),
        };

        match roundtrip(&sent).await {
            ParentMessage::ResourceReadResult { request_id, result } => {
                assert_eq!(request_id, 11);
                let val = result.unwrap();
                assert_eq!(val["content"], "log data here");
            }
            other => panic!("expected ResourceReadResult, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_03_resource_read_result_error_roundtrip() {
        let sent = ParentMessage::ResourceReadResult {
            request_id: 12,
            result: Err("resource not found".into()),
        };

        match roundtrip(&sent).await {
            ParentMessage::ResourceReadResult { request_id, result } => {
                assert_eq!(request_id, 12);
                assert_eq!(result.unwrap_err(), "resource not found");
            }
            other => panic!("expected ResourceReadResult, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_04_stash_put_roundtrip() {
        let sent = ChildMessage::StashPut {
            request_id: 20,
            key: "my-key".into(),
            value: serde_json::json!({"data": 42}),
            ttl_secs: Some(60),
        };

        match roundtrip(&sent).await {
            ChildMessage::StashPut {
                request_id,
                key,
                value,
                ttl_secs,
            } => {
                assert_eq!(request_id, 20);
                assert_eq!(key, "my-key");
                assert_eq!(value["data"], 42);
                assert_eq!(ttl_secs, Some(60));
            }
            other => panic!("expected StashPut, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_05_stash_get_roundtrip() {
        let sent = ChildMessage::StashGet {
            request_id: 21,
            key: "lookup-key".into(),
        };

        match roundtrip(&sent).await {
            ChildMessage::StashGet { request_id, key } => {
                assert_eq!(request_id, 21);
                assert_eq!(key, "lookup-key");
            }
            other => panic!("expected StashGet, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_06_stash_delete_roundtrip() {
        let sent = ChildMessage::StashDelete {
            request_id: 22,
            key: "delete-me".into(),
        };

        match roundtrip(&sent).await {
            ChildMessage::StashDelete { request_id, key } => {
                assert_eq!(request_id, 22);
                assert_eq!(key, "delete-me");
            }
            other => panic!("expected StashDelete, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_07_stash_keys_roundtrip() {
        let sent = ChildMessage::StashKeys { request_id: 23 };

        match roundtrip(&sent).await {
            ChildMessage::StashKeys { request_id } => {
                assert_eq!(request_id, 23);
            }
            other => panic!("expected StashKeys, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_08_stash_result_roundtrip() {
        let sent = ParentMessage::StashResult {
            request_id: 24,
            result: Ok(serde_json::json!({"ok": true})),
        };

        match roundtrip(&sent).await {
            ParentMessage::StashResult { request_id, result } => {
                assert_eq!(request_id, 24);
                assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
            }
            other => panic!("expected StashResult, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn ipc_09_mixed_message_interleaving() {
        let outgoing = [
            ChildMessage::ToolCallRequest {
                request_id: 1,
                server: "s".into(),
                tool: "t".into(),
                args: serde_json::json!({}),
            },
            ChildMessage::ResourceReadRequest {
                request_id: 2,
                server: "pg".into(),
                uri: "file:///data".into(),
            },
            ChildMessage::StashPut {
                request_id: 3,
                key: "k".into(),
                value: serde_json::json!("v"),
                ttl_secs: None,
            },
            ChildMessage::StashGet {
                request_id: 4,
                key: "k".into(),
            },
            ChildMessage::ExecutionComplete {
                result: Ok(serde_json::json!("done")),
            },
        ];

        let mut cursor = encode_stream(&outgoing).await;
        let d1 = next_child(&mut cursor).await.unwrap();
        let d2 = next_child(&mut cursor).await.unwrap();
        let d3 = next_child(&mut cursor).await.unwrap();
        let d4 = next_child(&mut cursor).await.unwrap();
        let d5 = next_child(&mut cursor).await.unwrap();

        assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
        assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
        assert!(matches!(d3, ChildMessage::StashPut { .. }));
        assert!(matches!(d4, ChildMessage::StashGet { .. }));
        assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));

        // The stream is exhausted, so the next read reports end of stream.
        let d6 = next_child(&mut cursor).await;
        assert!(d6.is_none());
    }

    #[tokio::test]
    async fn ipc_10_oversized_stash_message_rejected() {
        let msg = ChildMessage::StashPut {
            request_id: 100,
            key: "k".into(),
            value: serde_json::json!("x".repeat(2048)),
            ttl_secs: Some(60),
        };

        let mut cursor = Cursor::new(encode(&msg).await);
        let outcome: Result<Option<ChildMessage>, _> =
            read_message_with_limit(&mut cursor, 64).await;
        assert!(outcome.is_err());
        let err_msg = outcome.unwrap_err().to_string();
        assert!(
            err_msg.contains("too large"),
            "error should mention 'too large': {err_msg}"
        );
    }
}