1use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Category attached to a failed execution result.
///
/// Carried in the `error_kind` field of `ChildMessage::ExecutionComplete`.
/// Variant names are serialized in `snake_case` (for example `heap_limit`).
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ErrorKind {
    /// The execution timed out.
    Timeout,
    /// A heap limit was exceeded.
    HeapLimit,
    /// NOTE(review): presumably an error raised by the executed JavaScript
    /// itself — confirm against the code that produces this variant.
    JsError,
    /// An execution failure; the tests in this file use it for a runtime
    /// start-up error. NOTE(review): confirm the full set of producers.
    Execution,
}
30
/// Serializable form of a dispatch error.
///
/// It holds a string error code, a message, and optional context fields.
/// The optional fields are omitted from the serialized output when `None`
/// and default to `None` when missing on input.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IpcDispatchError {
    /// Error code string. `to_dispatch_error` matches on values such as
    /// `"SERVER_NOT_FOUND"` or `"TIMEOUT"`; unrecognized codes are treated
    /// as internal errors there.
    pub code: String,
    /// Human-readable error text.
    pub message: String,
    /// Name of the server involved, when one is known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub server: Option<String>,
    /// Name of the tool involved, when one is known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Timeout in milliseconds, populated for timeout errors.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
53
54impl IpcDispatchError {
55 pub fn from_string(msg: String) -> Self {
58 Self {
59 code: "INTERNAL".to_string(),
60 message: msg,
61 server: None,
62 tool: None,
63 timeout_ms: None,
64 }
65 }
66
67 pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69 match self.code.as_str() {
70 "SERVER_NOT_FOUND" => {
71 forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72 }
73 "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74 server: self.server.unwrap_or_default(),
75 tool: self.tool.unwrap_or_default(),
76 },
77 "TIMEOUT" => forge_error::DispatchError::Timeout {
78 server: self.server.unwrap_or_default(),
79 timeout_ms: self.timeout_ms.unwrap_or(0),
80 },
81 "CIRCUIT_OPEN" => {
82 forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83 }
84 "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85 reason: self.message,
86 },
87 "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88 server: self.server.unwrap_or_default(),
89 message: self.message,
90 },
91 "TOOL_ERROR" => forge_error::DispatchError::ToolError {
92 server: self.server.unwrap_or_default(),
93 tool: self.tool.unwrap_or_default(),
94 message: self.message,
95 },
96 "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
97 _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
98 }
99 }
100}
101
102impl From<&forge_error::DispatchError> for IpcDispatchError {
103 fn from(e: &forge_error::DispatchError) -> Self {
104 let (server, tool, timeout_ms) = match e {
105 forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
106 forge_error::DispatchError::ToolNotFound { server, tool } => {
107 (Some(server.clone()), Some(tool.clone()), None)
108 }
109 forge_error::DispatchError::Timeout {
110 server, timeout_ms, ..
111 } => (Some(server.clone()), None, Some(*timeout_ms)),
112 forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
113 forge_error::DispatchError::Upstream { server, .. } => {
114 (Some(server.clone()), None, None)
115 }
116 forge_error::DispatchError::ToolError { server, tool, .. } => {
117 (Some(server.clone()), Some(tool.clone()), None)
118 }
119 _ => (None, None, None),
120 };
121
122 Self {
123 code: e.code().to_string(),
124 message: e.to_string(),
125 server,
126 tool,
127 timeout_ms,
128 }
129 }
130}
131
/// Messages of the parent side of the IPC protocol.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the variant's own fields.
/// NOTE(review): by name these travel from the parent process to the worker —
/// confirm against the call sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ParentMessage {
    /// Request to execute a piece of code.
    Execute {
        /// Source code to execute.
        code: String,
        /// Optional manifest value; its schema is not defined in this file.
        manifest: Option<Value>,
        /// Limits and settings for this execution.
        config: WorkerConfig,
    },
    /// Outcome of a `ChildMessage::ToolCallRequest`.
    ToolCallResult {
        /// Presumably echoes the `request_id` of the request being answered —
        /// TODO confirm.
        request_id: u64,
        /// The returned value, or the dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
    /// Outcome of a `ChildMessage::ResourceReadRequest`.
    ResourceReadResult {
        /// Presumably echoes the `request_id` of the request being answered —
        /// TODO confirm.
        request_id: u64,
        /// The returned value, or the dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
    /// Reset request carrying a new configuration.
    /// NOTE(review): presumably acknowledged with `ChildMessage::ResetComplete` — confirm.
    Reset {
        /// Configuration supplied with the reset.
        config: WorkerConfig,
    },
    /// Outcome of one of the `ChildMessage::Stash*` requests.
    StashResult {
        /// Presumably echoes the `request_id` of the request being answered —
        /// TODO confirm.
        request_id: u64,
        /// The returned value, or the dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
}
176
/// Messages of the child (worker) side of the IPC protocol.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the variant's own fields.
/// NOTE(review): by name these travel from the worker to the parent process —
/// confirm against the call sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ChildMessage {
    /// Request to call a tool on a server.
    ToolCallRequest {
        /// Identifier for this request.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Tool name.
        tool: String,
        /// Arguments for the tool call.
        args: Value,
    },
    /// Request to read a resource from a server.
    ResourceReadRequest {
        /// Identifier for this request.
        request_id: u64,
        /// Target server name.
        server: String,
        /// URI of the resource to read.
        uri: String,
    },
    /// Request to store a value in the stash.
    StashPut {
        /// Identifier for this request.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Value to store.
        value: Value,
        /// Optional time-to-live in seconds.
        ttl_secs: Option<u32>,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to fetch a value from the stash.
    StashGet {
        /// Identifier for this request.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to delete a stash entry.
    StashDelete {
        /// Identifier for this request.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to list stash keys.
    StashKeys {
        /// Identifier for this request.
        request_id: u64,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// NOTE(review): presumably the acknowledgement of `ParentMessage::Reset` — confirm.
    ResetComplete,
    /// Final outcome of an execution.
    ExecutionComplete {
        /// The produced value, or an error message.
        result: Result<Value, String>,
        /// Optional error classification; defaults to `None` when absent
        /// from the input.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_kind: Option<ErrorKind>,
        /// Optional timeout in milliseconds; the tests in this file set it
        /// together with `ErrorKind::Timeout`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_ms: Option<u64>,
    },
    /// A log line.
    Log {
        /// Text of the log line.
        message: String,
    },
}
267
/// Serializable limits and settings carried by `ParentMessage::Execute` and
/// `ParentMessage::Reset`.
///
/// It mirrors a subset of `crate::SandboxConfig`; see the `From` impl and
/// `to_sandbox_config` for the field mapping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Timeout in milliseconds (taken from `SandboxConfig::timeout`).
    pub timeout_ms: u64,
    /// Maximum heap size. NOTE(review): unit presumably bytes — confirm.
    pub max_heap_size: usize,
    /// Maximum number of tool calls.
    pub max_tool_calls: usize,
    /// Maximum size of tool-call arguments. NOTE(review): unit presumably bytes — confirm.
    pub max_tool_call_args_size: usize,
    /// Maximum output size. NOTE(review): unit presumably bytes — confirm.
    pub max_output_size: usize,
    /// Maximum code size. NOTE(review): unit presumably bytes — confirm.
    pub max_code_size: usize,
    /// Maximum IPC message size; defaults to `DEFAULT_MAX_IPC_MESSAGE_SIZE`
    /// when absent from the input.
    #[serde(default = "default_max_ipc_message_size")]
    pub max_ipc_message_size: usize,
    /// Maximum resource size; defaults to `64 * 1024 * 1024` when absent
    /// from the input.
    #[serde(default = "default_max_resource_size")]
    pub max_resource_size: usize,
    /// Parallelism limit; defaults to `8` when absent from the input.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: usize,
    /// Optional list of string pairs. NOTE(review): presumably
    /// `(server, tool)` — confirm. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_tools: Option<Vec<(String, String)>>,
    /// Optional set of known server names. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_servers: Option<std::collections::HashSet<String>>,
}
300
301fn default_max_ipc_message_size() -> usize {
302 DEFAULT_MAX_IPC_MESSAGE_SIZE
303}
304
/// Serde default for `WorkerConfig::max_resource_size`: 64 MiB.
fn default_max_resource_size() -> usize {
    // 64 * 2^20 == 64 * 1024 * 1024
    64 << 20
}
308
/// Serde default for `WorkerConfig::max_parallel`.
fn default_max_parallel() -> usize {
    const PARALLEL_DEFAULT: usize = 8;
    PARALLEL_DEFAULT
}
312
313impl From<&crate::SandboxConfig> for WorkerConfig {
314 fn from(config: &crate::SandboxConfig) -> Self {
315 Self {
316 timeout_ms: config.timeout.as_millis() as u64,
317 max_heap_size: config.max_heap_size,
318 max_tool_calls: config.max_tool_calls,
319 max_tool_call_args_size: config.max_tool_call_args_size,
320 max_output_size: config.max_output_size,
321 max_code_size: config.max_code_size,
322 max_ipc_message_size: config.max_ipc_message_size,
323 max_resource_size: config.max_resource_size,
324 max_parallel: config.max_parallel,
325 known_tools: None,
326 known_servers: None,
327 }
328 }
329}
330
331impl WorkerConfig {
332 pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
334 crate::SandboxConfig {
335 timeout: Duration::from_millis(self.timeout_ms),
336 max_code_size: self.max_code_size,
337 max_output_size: self.max_output_size,
338 max_heap_size: self.max_heap_size,
339 max_concurrent: 1, max_tool_calls: self.max_tool_calls,
341 max_tool_call_args_size: self.max_tool_call_args_size,
342 execution_mode: crate::executor::ExecutionMode::InProcess, max_resource_size: self.max_resource_size,
344 max_parallel: self.max_parallel,
345 max_ipc_message_size: self.max_ipc_message_size,
346 }
347 }
348}
349
350pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
354 writer: &mut W,
355 msg: &T,
356) -> Result<(), std::io::Error> {
357 let payload = serde_json::to_vec(msg)
358 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
359 let len = u32::try_from(payload.len()).map_err(|_| {
360 std::io::Error::new(
361 std::io::ErrorKind::InvalidData,
362 format!(
363 "IPC payload too large: {} bytes (max {} bytes)",
364 payload.len(),
365 u32::MAX
366 ),
367 )
368 })?;
369 writer.write_all(&len.to_be_bytes()).await?;
370 writer.write_all(&payload).await?;
371 writer.flush().await?;
372 Ok(())
373}
374
375pub async fn write_raw_message<W: AsyncWrite + Unpin>(
380 writer: &mut W,
381 payload: &[u8],
382) -> Result<(), std::io::Error> {
383 let len = u32::try_from(payload.len()).map_err(|_| {
384 std::io::Error::new(
385 std::io::ErrorKind::InvalidData,
386 format!(
387 "raw IPC payload too large: {} bytes (max {} bytes)",
388 payload.len(),
389 u32::MAX
390 ),
391 )
392 })?;
393 writer.write_all(&len.to_be_bytes()).await?;
394 writer.write_all(payload).await?;
395 writer.flush().await?;
396 Ok(())
397}
398
399pub async fn read_raw_message<R: AsyncRead + Unpin>(
404 reader: &mut R,
405 max_size: usize,
406) -> Result<Option<Box<RawValue>>, std::io::Error> {
407 let mut len_buf = [0u8; 4];
408 match reader.read_exact(&mut len_buf).await {
409 Ok(_) => {}
410 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
411 Err(e) => return Err(e),
412 }
413
414 let len = u32::from_be_bytes(len_buf) as usize;
415
416 if len > max_size {
417 return Err(std::io::Error::new(
418 std::io::ErrorKind::InvalidData,
419 format!(
420 "raw IPC message too large: {} bytes (limit: {} bytes)",
421 len, max_size
422 ),
423 ));
424 }
425
426 let mut payload = vec![0u8; len];
427 reader.read_exact(&mut payload).await?;
428
429 let raw: Box<RawValue> = serde_json::from_slice(&payload)
430 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
431 Ok(Some(raw))
432}
433
/// Default upper bound on a single IPC message payload: 8 MiB
/// (`8 * 1024 * 1024` bytes). Used by `read_message` and as the serde
/// default for `WorkerConfig::max_ipc_message_size`.
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 << 20;
439
440pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
445 reader: &mut R,
446) -> Result<Option<T>, std::io::Error> {
447 read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
448}
449
450pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
455 reader: &mut R,
456 max_size: usize,
457) -> Result<Option<T>, std::io::Error> {
458 let mut len_buf = [0u8; 4];
459 match reader.read_exact(&mut len_buf).await {
460 Ok(_) => {}
461 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
462 Err(e) => return Err(e),
463 }
464
465 let len = u32::from_be_bytes(len_buf) as usize;
466
467 if len > max_size {
469 return Err(std::io::Error::new(
470 std::io::ErrorKind::InvalidData,
471 format!(
472 "IPC message too large: {} bytes (limit: {} bytes)",
473 len, max_size
474 ),
475 ));
476 }
477
478 let mut payload = vec![0u8; len];
479 reader.read_exact(&mut payload).await?;
480
481 let msg: T = serde_json::from_slice(&payload)
482 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
483 Ok(Some(msg))
484}
485
486#[cfg(test)]
487mod tests {
488 use super::*;
489 use std::io::Cursor;
490
491 #[tokio::test]
492 async fn roundtrip_parent_execute_message() {
493 let msg = ParentMessage::Execute {
494 code: "async () => { return 42; }".into(),
495 manifest: Some(serde_json::json!({"servers": []})),
496 config: WorkerConfig {
497 timeout_ms: 5000,
498 max_heap_size: 64 * 1024 * 1024,
499 max_tool_calls: 50,
500 max_tool_call_args_size: 1024 * 1024,
501 max_output_size: 1024 * 1024,
502 max_code_size: 64 * 1024,
503 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
504 max_resource_size: 64 * 1024 * 1024,
505 max_parallel: 8,
506 known_tools: None,
507 known_servers: None,
508 },
509 };
510
511 let mut buf = Vec::new();
512 write_message(&mut buf, &msg).await.unwrap();
513
514 let mut cursor = Cursor::new(buf);
515 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
516
517 match decoded {
518 ParentMessage::Execute {
519 code,
520 manifest,
521 config,
522 } => {
523 assert_eq!(code, "async () => { return 42; }");
524 assert!(manifest.is_some());
525 assert_eq!(config.timeout_ms, 5000);
526 }
527 other => panic!("expected Execute, got: {:?}", other),
528 }
529 }
530
531 #[tokio::test]
532 async fn roundtrip_parent_tool_result() {
533 let msg = ParentMessage::ToolCallResult {
534 request_id: 42,
535 result: Ok(serde_json::json!({"status": "ok"})),
536 };
537
538 let mut buf = Vec::new();
539 write_message(&mut buf, &msg).await.unwrap();
540
541 let mut cursor = Cursor::new(buf);
542 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
543
544 match decoded {
545 ParentMessage::ToolCallResult { request_id, result } => {
546 assert_eq!(request_id, 42);
547 assert!(result.is_ok());
548 }
549 other => panic!("expected ToolCallResult, got: {:?}", other),
550 }
551 }
552
553 #[tokio::test]
554 async fn roundtrip_parent_tool_result_error() {
555 let msg = ParentMessage::ToolCallResult {
556 request_id: 7,
557 result: Err(IpcDispatchError::from_string("connection refused".into())),
558 };
559
560 let mut buf = Vec::new();
561 write_message(&mut buf, &msg).await.unwrap();
562
563 let mut cursor = Cursor::new(buf);
564 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
565
566 match decoded {
567 ParentMessage::ToolCallResult { request_id, result } => {
568 assert_eq!(request_id, 7);
569 let err = result.unwrap_err();
570 assert_eq!(err.message, "connection refused");
571 assert_eq!(err.code, "INTERNAL");
572 }
573 other => panic!("expected ToolCallResult, got: {:?}", other),
574 }
575 }
576
577 #[tokio::test]
578 async fn roundtrip_child_tool_request() {
579 let msg = ChildMessage::ToolCallRequest {
580 request_id: 1,
581 server: "narsil".into(),
582 tool: "ast.parse".into(),
583 args: serde_json::json!({"file": "test.rs"}),
584 };
585
586 let mut buf = Vec::new();
587 write_message(&mut buf, &msg).await.unwrap();
588
589 let mut cursor = Cursor::new(buf);
590 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
591
592 match decoded {
593 ChildMessage::ToolCallRequest {
594 request_id,
595 server,
596 tool,
597 args,
598 } => {
599 assert_eq!(request_id, 1);
600 assert_eq!(server, "narsil");
601 assert_eq!(tool, "ast.parse");
602 assert_eq!(args["file"], "test.rs");
603 }
604 other => panic!("expected ToolCallRequest, got: {:?}", other),
605 }
606 }
607
608 #[tokio::test]
609 async fn roundtrip_child_execution_complete() {
610 let msg = ChildMessage::ExecutionComplete {
611 result: Ok(serde_json::json!([1, 2, 3])),
612 error_kind: None,
613 timeout_ms: None,
614 };
615
616 let mut buf = Vec::new();
617 write_message(&mut buf, &msg).await.unwrap();
618
619 let mut cursor = Cursor::new(buf);
620 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
621
622 match decoded {
623 ChildMessage::ExecutionComplete {
624 result, error_kind, ..
625 } => {
626 assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
627 assert_eq!(error_kind, None);
628 }
629 other => panic!("expected ExecutionComplete, got: {:?}", other),
630 }
631 }
632
633 #[tokio::test]
634 async fn roundtrip_child_log() {
635 let msg = ChildMessage::Log {
636 message: "processing step 3".into(),
637 };
638
639 let mut buf = Vec::new();
640 write_message(&mut buf, &msg).await.unwrap();
641
642 let mut cursor = Cursor::new(buf);
643 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
644
645 match decoded {
646 ChildMessage::Log { message } => {
647 assert_eq!(message, "processing step 3");
648 }
649 other => panic!("expected Log, got: {:?}", other),
650 }
651 }
652
653 #[tokio::test]
654 async fn multiple_messages_in_stream() {
655 let msg1 = ChildMessage::Log {
656 message: "first".into(),
657 };
658 let msg2 = ChildMessage::ToolCallRequest {
659 request_id: 1,
660 server: "s".into(),
661 tool: "t".into(),
662 args: serde_json::json!({}),
663 };
664 let msg3 = ChildMessage::ExecutionComplete {
665 result: Ok(serde_json::json!("done")),
666 error_kind: None,
667 timeout_ms: None,
668 };
669
670 let mut buf = Vec::new();
671 write_message(&mut buf, &msg1).await.unwrap();
672 write_message(&mut buf, &msg2).await.unwrap();
673 write_message(&mut buf, &msg3).await.unwrap();
674
675 let mut cursor = Cursor::new(buf);
676 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
677 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
678 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
679
680 assert!(matches!(d1, ChildMessage::Log { .. }));
681 assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
682 assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
683
684 let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
686 assert!(d4.is_none());
687 }
688
689 #[tokio::test]
690 async fn execution_complete_error_roundtrip() {
691 let msg = ChildMessage::ExecutionComplete {
692 result: Err("failed to create tokio runtime: resource unavailable".into()),
693 error_kind: Some(ErrorKind::Execution),
694 timeout_ms: None,
695 };
696
697 let mut buf = Vec::new();
698 write_message(&mut buf, &msg).await.unwrap();
699
700 let mut cursor = Cursor::new(buf);
701 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
702
703 match decoded {
704 ChildMessage::ExecutionComplete {
705 result, error_kind, ..
706 } => {
707 let err = result.unwrap_err();
708 assert!(
709 err.contains("tokio runtime"),
710 "expected runtime error: {err}"
711 );
712 assert_eq!(error_kind, Some(ErrorKind::Execution));
713 }
714 other => panic!("expected ExecutionComplete, got: {:?}", other),
715 }
716 }
717
718 #[tokio::test]
719 async fn eof_returns_none() {
720 let mut cursor = Cursor::new(Vec::<u8>::new());
721 let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
722 assert!(result.is_none());
723 }
724
725 #[test]
726 fn u32_try_from_overflow() {
727 let overflow_size = u32::MAX as usize + 1;
729 assert!(u32::try_from(overflow_size).is_err());
730 }
731
732 #[tokio::test]
733 async fn write_message_normal_size_succeeds() {
734 let msg = ChildMessage::Log {
736 message: "a".repeat(1024),
737 };
738 let mut buf = Vec::new();
739 write_message(&mut buf, &msg).await.unwrap();
740 assert!(buf.len() > 1024);
741 }
742
743 #[tokio::test]
744 async fn large_message_roundtrip() {
745 let large_data = "x".repeat(1_000_000);
747 let msg = ChildMessage::ExecutionComplete {
748 result: Ok(serde_json::json!(large_data)),
749 error_kind: None,
750 timeout_ms: None,
751 };
752
753 let mut buf = Vec::new();
754 write_message(&mut buf, &msg).await.unwrap();
755
756 let mut cursor = Cursor::new(buf);
757 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
758
759 match decoded {
760 ChildMessage::ExecutionComplete { result, .. } => {
761 assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
762 }
763 other => panic!("expected ExecutionComplete, got: {:?}", other),
764 }
765 }
766
767 #[tokio::test]
768 async fn worker_config_roundtrip_from_sandbox_config() {
769 let sandbox = crate::SandboxConfig::default();
770 let worker = WorkerConfig::from(&sandbox);
771 let back = worker.to_sandbox_config();
772
773 assert_eq!(sandbox.timeout, back.timeout);
774 assert_eq!(sandbox.max_heap_size, back.max_heap_size);
775 assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
776 assert_eq!(sandbox.max_output_size, back.max_output_size);
777 assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
778 assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); }
780
781 #[tokio::test]
782 async fn read_message_with_limit_rejects_oversized() {
783 let msg = ChildMessage::Log {
784 message: "x".repeat(1024),
785 };
786 let mut buf = Vec::new();
787 write_message(&mut buf, &msg).await.unwrap();
788
789 let mut cursor = Cursor::new(buf);
791 let result: Result<Option<ChildMessage>, _> =
792 read_message_with_limit(&mut cursor, 64).await;
793 assert!(result.is_err());
794 let err_msg = result.unwrap_err().to_string();
795 assert!(err_msg.contains("too large"), "error: {err_msg}");
796 }
797
798 #[tokio::test]
799 async fn read_message_with_limit_accepts_within_limit() {
800 let msg = ChildMessage::Log {
801 message: "hello".into(),
802 };
803 let mut buf = Vec::new();
804 write_message(&mut buf, &msg).await.unwrap();
805
806 let mut cursor = Cursor::new(buf);
807 let result: Option<ChildMessage> =
808 read_message_with_limit(&mut cursor, 1024).await.unwrap();
809 assert!(result.is_some());
810 }
811
812 #[tokio::test]
813 async fn worker_config_ipc_limit_serde_default() {
814 let json = r#"{
816 "timeout_ms": 5000,
817 "max_heap_size": 67108864,
818 "max_tool_calls": 50,
819 "max_tool_call_args_size": 1048576,
820 "max_output_size": 1048576,
821 "max_code_size": 65536
822 }"#;
823 let config: WorkerConfig = serde_json::from_str(json).unwrap();
824 assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
825 }
826
827 #[tokio::test]
829 async fn ipc_01_resource_read_request_roundtrip() {
830 let msg = ChildMessage::ResourceReadRequest {
831 request_id: 10,
832 server: "postgres".into(),
833 uri: "file:///logs/app.log".into(),
834 };
835
836 let mut buf = Vec::new();
837 write_message(&mut buf, &msg).await.unwrap();
838
839 let mut cursor = Cursor::new(buf);
840 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
841
842 match decoded {
843 ChildMessage::ResourceReadRequest {
844 request_id,
845 server,
846 uri,
847 } => {
848 assert_eq!(request_id, 10);
849 assert_eq!(server, "postgres");
850 assert_eq!(uri, "file:///logs/app.log");
851 }
852 other => panic!("expected ResourceReadRequest, got: {:?}", other),
853 }
854 }
855
856 #[tokio::test]
858 async fn ipc_02_resource_read_result_success_roundtrip() {
859 let msg = ParentMessage::ResourceReadResult {
860 request_id: 11,
861 result: Ok(serde_json::json!({"content": "log data here"})),
862 };
863
864 let mut buf = Vec::new();
865 write_message(&mut buf, &msg).await.unwrap();
866
867 let mut cursor = Cursor::new(buf);
868 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
869
870 match decoded {
871 ParentMessage::ResourceReadResult { request_id, result } => {
872 assert_eq!(request_id, 11);
873 let val = result.unwrap();
874 assert_eq!(val["content"], "log data here");
875 }
876 other => panic!("expected ResourceReadResult, got: {:?}", other),
877 }
878 }
879
880 #[tokio::test]
882 async fn ipc_03_resource_read_result_error_roundtrip() {
883 let msg = ParentMessage::ResourceReadResult {
884 request_id: 12,
885 result: Err(IpcDispatchError::from_string("resource not found".into())),
886 };
887
888 let mut buf = Vec::new();
889 write_message(&mut buf, &msg).await.unwrap();
890
891 let mut cursor = Cursor::new(buf);
892 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
893
894 match decoded {
895 ParentMessage::ResourceReadResult { request_id, result } => {
896 assert_eq!(request_id, 12);
897 let err = result.unwrap_err();
898 assert_eq!(err.message, "resource not found");
899 assert_eq!(err.code, "INTERNAL");
900 }
901 other => panic!("expected ResourceReadResult, got: {:?}", other),
902 }
903 }
904
905 #[tokio::test]
907 async fn ipc_04_stash_put_roundtrip() {
908 let msg = ChildMessage::StashPut {
909 request_id: 20,
910 key: "my-key".into(),
911 value: serde_json::json!({"data": 42}),
912 ttl_secs: Some(60),
913 group: None,
914 };
915
916 let mut buf = Vec::new();
917 write_message(&mut buf, &msg).await.unwrap();
918
919 let mut cursor = Cursor::new(buf);
920 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
921
922 match decoded {
923 ChildMessage::StashPut {
924 request_id,
925 key,
926 value,
927 ttl_secs,
928 group,
929 } => {
930 assert_eq!(request_id, 20);
931 assert_eq!(key, "my-key");
932 assert_eq!(value["data"], 42);
933 assert_eq!(ttl_secs, Some(60));
934 assert_eq!(group, None);
935 }
936 other => panic!("expected StashPut, got: {:?}", other),
937 }
938 }
939
940 #[tokio::test]
942 async fn ipc_05_stash_get_roundtrip() {
943 let msg = ChildMessage::StashGet {
944 request_id: 21,
945 key: "lookup-key".into(),
946 group: None,
947 };
948
949 let mut buf = Vec::new();
950 write_message(&mut buf, &msg).await.unwrap();
951
952 let mut cursor = Cursor::new(buf);
953 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
954
955 match decoded {
956 ChildMessage::StashGet {
957 request_id,
958 key,
959 group,
960 } => {
961 assert_eq!(request_id, 21);
962 assert_eq!(key, "lookup-key");
963 assert_eq!(group, None);
964 }
965 other => panic!("expected StashGet, got: {:?}", other),
966 }
967 }
968
969 #[tokio::test]
971 async fn ipc_06_stash_delete_roundtrip() {
972 let msg = ChildMessage::StashDelete {
973 request_id: 22,
974 key: "delete-me".into(),
975 group: None,
976 };
977
978 let mut buf = Vec::new();
979 write_message(&mut buf, &msg).await.unwrap();
980
981 let mut cursor = Cursor::new(buf);
982 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
983
984 match decoded {
985 ChildMessage::StashDelete {
986 request_id,
987 key,
988 group,
989 } => {
990 assert_eq!(request_id, 22);
991 assert_eq!(key, "delete-me");
992 assert_eq!(group, None);
993 }
994 other => panic!("expected StashDelete, got: {:?}", other),
995 }
996 }
997
998 #[tokio::test]
1000 async fn ipc_07_stash_keys_roundtrip() {
1001 let msg = ChildMessage::StashKeys {
1002 request_id: 23,
1003 group: None,
1004 };
1005
1006 let mut buf = Vec::new();
1007 write_message(&mut buf, &msg).await.unwrap();
1008
1009 let mut cursor = Cursor::new(buf);
1010 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1011
1012 match decoded {
1013 ChildMessage::StashKeys { request_id, group } => {
1014 assert_eq!(request_id, 23);
1015 assert_eq!(group, None);
1016 }
1017 other => panic!("expected StashKeys, got: {:?}", other),
1018 }
1019 }
1020
1021 #[tokio::test]
1023 async fn ipc_08_stash_result_roundtrip() {
1024 let msg = ParentMessage::StashResult {
1025 request_id: 24,
1026 result: Ok(serde_json::json!({"ok": true})),
1027 };
1028
1029 let mut buf = Vec::new();
1030 write_message(&mut buf, &msg).await.unwrap();
1031
1032 let mut cursor = Cursor::new(buf);
1033 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1034
1035 match decoded {
1036 ParentMessage::StashResult { request_id, result } => {
1037 assert_eq!(request_id, 24);
1038 assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1039 }
1040 other => panic!("expected StashResult, got: {:?}", other),
1041 }
1042 }
1043
1044 #[tokio::test]
1046 async fn ipc_09_mixed_message_interleaving() {
1047 let msg1 = ChildMessage::ToolCallRequest {
1048 request_id: 1,
1049 server: "s".into(),
1050 tool: "t".into(),
1051 args: serde_json::json!({}),
1052 };
1053 let msg2 = ChildMessage::ResourceReadRequest {
1054 request_id: 2,
1055 server: "pg".into(),
1056 uri: "file:///data".into(),
1057 };
1058 let msg3 = ChildMessage::StashPut {
1059 request_id: 3,
1060 key: "k".into(),
1061 value: serde_json::json!("v"),
1062 ttl_secs: None,
1063 group: None,
1064 };
1065 let msg4 = ChildMessage::StashGet {
1066 request_id: 4,
1067 key: "k".into(),
1068 group: None,
1069 };
1070 let msg5 = ChildMessage::ExecutionComplete {
1071 result: Ok(serde_json::json!("done")),
1072 error_kind: None,
1073 timeout_ms: None,
1074 };
1075
1076 let mut buf = Vec::new();
1077 write_message(&mut buf, &msg1).await.unwrap();
1078 write_message(&mut buf, &msg2).await.unwrap();
1079 write_message(&mut buf, &msg3).await.unwrap();
1080 write_message(&mut buf, &msg4).await.unwrap();
1081 write_message(&mut buf, &msg5).await.unwrap();
1082
1083 let mut cursor = Cursor::new(buf);
1084 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1085 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1086 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1087 let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1088 let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1089
1090 assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1091 assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1092 assert!(matches!(d3, ChildMessage::StashPut { .. }));
1093 assert!(matches!(d4, ChildMessage::StashGet { .. }));
1094 assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1095
1096 let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1098 assert!(d6.is_none());
1099 }
1100
1101 #[tokio::test]
1103 async fn ipc_p01_reset_roundtrip() {
1104 let msg = ParentMessage::Reset {
1105 config: WorkerConfig {
1106 timeout_ms: 3000,
1107 max_heap_size: 32 * 1024 * 1024,
1108 max_tool_calls: 25,
1109 max_tool_call_args_size: 512 * 1024,
1110 max_output_size: 512 * 1024,
1111 max_code_size: 32 * 1024,
1112 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1113 max_resource_size: 32 * 1024 * 1024,
1114 max_parallel: 4,
1115 known_tools: None,
1116 known_servers: None,
1117 },
1118 };
1119
1120 let mut buf = Vec::new();
1121 write_message(&mut buf, &msg).await.unwrap();
1122
1123 let mut cursor = Cursor::new(buf);
1124 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1125
1126 match decoded {
1127 ParentMessage::Reset { config } => {
1128 assert_eq!(config.timeout_ms, 3000);
1129 assert_eq!(config.max_tool_calls, 25);
1130 }
1131 other => panic!("expected Reset, got: {:?}", other),
1132 }
1133 }
1134
1135 #[tokio::test]
1137 async fn ipc_p02_reset_complete_roundtrip() {
1138 let msg = ChildMessage::ResetComplete;
1139
1140 let mut buf = Vec::new();
1141 write_message(&mut buf, &msg).await.unwrap();
1142
1143 let mut cursor = Cursor::new(buf);
1144 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1145
1146 assert!(matches!(decoded, ChildMessage::ResetComplete));
1147 }
1148
1149 #[tokio::test]
1151 async fn ipc_p03_reset_execute_interleaving() {
1152 let reset = ParentMessage::Reset {
1153 config: WorkerConfig {
1154 timeout_ms: 5000,
1155 max_heap_size: 64 * 1024 * 1024,
1156 max_tool_calls: 50,
1157 max_tool_call_args_size: 1024 * 1024,
1158 max_output_size: 1024 * 1024,
1159 max_code_size: 64 * 1024,
1160 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1161 max_resource_size: 64 * 1024 * 1024,
1162 max_parallel: 8,
1163 known_tools: None,
1164 known_servers: None,
1165 },
1166 };
1167 let execute = ParentMessage::Execute {
1168 code: "async () => 42".into(),
1169 manifest: None,
1170 config: WorkerConfig {
1171 timeout_ms: 5000,
1172 max_heap_size: 64 * 1024 * 1024,
1173 max_tool_calls: 50,
1174 max_tool_call_args_size: 1024 * 1024,
1175 max_output_size: 1024 * 1024,
1176 max_code_size: 64 * 1024,
1177 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1178 max_resource_size: 64 * 1024 * 1024,
1179 max_parallel: 8,
1180 known_tools: None,
1181 known_servers: None,
1182 },
1183 };
1184
1185 let mut buf = Vec::new();
1186 write_message(&mut buf, &reset).await.unwrap();
1187 write_message(&mut buf, &execute).await.unwrap();
1188
1189 let mut cursor = Cursor::new(buf);
1190 let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1191 let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1192
1193 assert!(matches!(d1, ParentMessage::Reset { .. }));
1194 assert!(matches!(d2, ParentMessage::Execute { .. }));
1195 }
1196
1197 #[tokio::test]
1199 async fn ipc_10_oversized_stash_message_rejected() {
1200 let msg = ChildMessage::StashPut {
1201 request_id: 100,
1202 key: "k".into(),
1203 value: serde_json::json!("x".repeat(2048)),
1204 ttl_secs: Some(60),
1205 group: None,
1206 };
1207 let mut buf = Vec::new();
1208 write_message(&mut buf, &msg).await.unwrap();
1209
1210 let mut cursor = Cursor::new(buf);
1212 let result: Result<Option<ChildMessage>, _> =
1213 read_message_with_limit(&mut cursor, 64).await;
1214 assert!(result.is_err());
1215 let err_msg = result.unwrap_err().to_string();
1216 assert!(
1217 err_msg.contains("too large"),
1218 "error should mention 'too large': {err_msg}"
1219 );
1220 }
1221
1222 #[tokio::test]
1224 async fn ipc_o01_error_kind_timeout_roundtrip() {
1225 let msg = ChildMessage::ExecutionComplete {
1226 result: Err("execution timed out after 500ms".into()),
1227 error_kind: Some(ErrorKind::Timeout),
1228 timeout_ms: Some(500),
1229 };
1230
1231 let mut buf = Vec::new();
1232 write_message(&mut buf, &msg).await.unwrap();
1233
1234 let mut cursor = Cursor::new(buf);
1235 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1236
1237 match decoded {
1238 ChildMessage::ExecutionComplete {
1239 result,
1240 error_kind,
1241 timeout_ms,
1242 } => {
1243 assert!(result.is_err());
1244 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1245 assert_eq!(timeout_ms, Some(500));
1246 }
1247 other => panic!("expected ExecutionComplete, got: {:?}", other),
1248 }
1249 }
1250
1251 #[tokio::test]
1253 async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1254 let msg = ChildMessage::ExecutionComplete {
1255 result: Err("V8 heap limit exceeded".into()),
1256 error_kind: Some(ErrorKind::HeapLimit),
1257 timeout_ms: None,
1258 };
1259
1260 let mut buf = Vec::new();
1261 write_message(&mut buf, &msg).await.unwrap();
1262
1263 let mut cursor = Cursor::new(buf);
1264 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1265
1266 match decoded {
1267 ChildMessage::ExecutionComplete {
1268 result, error_kind, ..
1269 } => {
1270 assert!(result.is_err());
1271 assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1272 }
1273 other => panic!("expected ExecutionComplete, got: {:?}", other),
1274 }
1275 }
1276
1277 #[tokio::test]
1279 async fn ipc_o03_error_kind_backward_compat() {
1280 let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1283 let mut buf = Vec::new();
1284 let payload = json.as_bytes();
1285 let len = payload.len() as u32;
1286 buf.extend_from_slice(&len.to_be_bytes());
1287 buf.extend_from_slice(payload);
1288
1289 let mut cursor = Cursor::new(buf);
1290 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1291
1292 match decoded {
1293 ChildMessage::ExecutionComplete {
1294 result,
1295 error_kind,
1296 timeout_ms,
1297 } => {
1298 assert!(result.is_err());
1299 assert_eq!(
1300 error_kind, None,
1301 "missing error_kind should default to None"
1302 );
1303 assert_eq!(
1304 timeout_ms, None,
1305 "missing timeout_ms should default to None"
1306 );
1307 }
1308 other => panic!("expected ExecutionComplete, got: {:?}", other),
1309 }
1310 }
1311
1312 #[tokio::test]
1314 async fn ipc_o04_error_kind_js_error_roundtrip() {
1315 let msg = ChildMessage::ExecutionComplete {
1316 result: Err("ReferenceError: x is not defined".into()),
1317 error_kind: Some(ErrorKind::JsError),
1318 timeout_ms: None,
1319 };
1320
1321 let mut buf = Vec::new();
1322 write_message(&mut buf, &msg).await.unwrap();
1323
1324 let mut cursor = Cursor::new(buf);
1325 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1326
1327 match decoded {
1328 ChildMessage::ExecutionComplete {
1329 result, error_kind, ..
1330 } => {
1331 assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1332 assert_eq!(error_kind, Some(ErrorKind::JsError));
1333 }
1334 other => panic!("expected ExecutionComplete, got: {:?}", other),
1335 }
1336 }
1337
1338 #[tokio::test]
1340 async fn ipc_o05_success_omits_error_kind() {
1341 let msg = ChildMessage::ExecutionComplete {
1342 result: Ok(serde_json::json!(42)),
1343 error_kind: None,
1344 timeout_ms: None,
1345 };
1346
1347 let json = serde_json::to_string(&msg).unwrap();
1348 assert!(
1350 !json.contains("error_kind"),
1351 "success messages should not contain error_kind field: {json}"
1352 );
1353 assert!(
1354 !json.contains("timeout_ms"),
1355 "success messages should not contain timeout_ms field: {json}"
1356 );
1357 }
1358
1359 #[tokio::test]
1362 async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1363 let msg = ChildMessage::StashPut {
1364 request_id: 50,
1365 key: "grouped-key".into(),
1366 value: serde_json::json!({"data": "secret"}),
1367 ttl_secs: Some(120),
1368 group: Some("analytics".into()),
1369 };
1370
1371 let mut buf = Vec::new();
1372 write_message(&mut buf, &msg).await.unwrap();
1373
1374 let mut cursor = Cursor::new(buf);
1375 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1376
1377 match decoded {
1378 ChildMessage::StashPut {
1379 request_id,
1380 key,
1381 group,
1382 ..
1383 } => {
1384 assert_eq!(request_id, 50);
1385 assert_eq!(key, "grouped-key");
1386 assert_eq!(group, Some("analytics".into()));
1387 }
1388 other => panic!("expected StashPut, got: {:?}", other),
1389 }
1390 }
1391
1392 #[tokio::test]
1393 async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1394 let msg = ChildMessage::StashGet {
1395 request_id: 51,
1396 key: "grouped-key".into(),
1397 group: Some("analytics".into()),
1398 };
1399
1400 let mut buf = Vec::new();
1401 write_message(&mut buf, &msg).await.unwrap();
1402
1403 let mut cursor = Cursor::new(buf);
1404 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1405
1406 match decoded {
1407 ChildMessage::StashGet {
1408 request_id,
1409 key,
1410 group,
1411 } => {
1412 assert_eq!(request_id, 51);
1413 assert_eq!(key, "grouped-key");
1414 assert_eq!(group, Some("analytics".into()));
1415 }
1416 other => panic!("expected StashGet, got: {:?}", other),
1417 }
1418 }
1419
1420 #[tokio::test]
1421 async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1422 let msg = ChildMessage::StashDelete {
1423 request_id: 52,
1424 key: "grouped-key".into(),
1425 group: Some("analytics".into()),
1426 };
1427
1428 let mut buf = Vec::new();
1429 write_message(&mut buf, &msg).await.unwrap();
1430
1431 let mut cursor = Cursor::new(buf);
1432 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1433
1434 match decoded {
1435 ChildMessage::StashDelete {
1436 request_id,
1437 key,
1438 group,
1439 } => {
1440 assert_eq!(request_id, 52);
1441 assert_eq!(key, "grouped-key");
1442 assert_eq!(group, Some("analytics".into()));
1443 }
1444 other => panic!("expected StashDelete, got: {:?}", other),
1445 }
1446 }
1447
1448 #[tokio::test]
1449 async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1450 let msg = ChildMessage::StashKeys {
1451 request_id: 53,
1452 group: Some("analytics".into()),
1453 };
1454
1455 let mut buf = Vec::new();
1456 write_message(&mut buf, &msg).await.unwrap();
1457
1458 let mut cursor = Cursor::new(buf);
1459 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1460
1461 match decoded {
1462 ChildMessage::StashKeys { request_id, group } => {
1463 assert_eq!(request_id, 53);
1464 assert_eq!(group, Some("analytics".into()));
1465 }
1466 other => panic!("expected StashKeys, got: {:?}", other),
1467 }
1468 }
1469
1470 #[tokio::test]
1471 async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1472 let msg = ChildMessage::StashPut {
1474 request_id: 54,
1475 key: "no-group-key".into(),
1476 value: serde_json::json!("val"),
1477 ttl_secs: None,
1478 group: None,
1479 };
1480
1481 let json = serde_json::to_string(&msg).unwrap();
1482 assert!(
1483 !json.contains("\"group\""),
1484 "group:None should be absent in serialized JSON: {json}"
1485 );
1486
1487 let mut buf = Vec::new();
1489 write_message(&mut buf, &msg).await.unwrap();
1490 let mut cursor = Cursor::new(buf);
1491 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1492 match decoded {
1493 ChildMessage::StashPut { group, .. } => {
1494 assert_eq!(group, None);
1495 }
1496 other => panic!("expected StashPut, got: {:?}", other),
1497 }
1498 }
1499
1500 #[tokio::test]
1501 async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1502 let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1504 let mut buf = Vec::new();
1505 let payload = json.as_bytes();
1506 let len = payload.len() as u32;
1507 buf.extend_from_slice(&len.to_be_bytes());
1508 buf.extend_from_slice(payload);
1509
1510 let mut cursor = Cursor::new(buf);
1511 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1512
1513 match decoded {
1514 ChildMessage::StashPut {
1515 request_id,
1516 key,
1517 group,
1518 ..
1519 } => {
1520 assert_eq!(request_id, 60);
1521 assert_eq!(key, "old-key");
1522 assert_eq!(
1523 group, None,
1524 "missing group field from v0.3.0 worker should deserialize as None"
1525 );
1526 }
1527 other => panic!("expected StashPut, got: {:?}", other),
1528 }
1529
1530 let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1532 let mut buf = Vec::new();
1533 let payload = json_get.as_bytes();
1534 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1535 buf.extend_from_slice(payload);
1536 let mut cursor = Cursor::new(buf);
1537 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1538 match decoded {
1539 ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1540 other => panic!("expected StashGet, got: {:?}", other),
1541 }
1542
1543 let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1544 let mut buf = Vec::new();
1545 let payload = json_del.as_bytes();
1546 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1547 buf.extend_from_slice(payload);
1548 let mut cursor = Cursor::new(buf);
1549 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1550 match decoded {
1551 ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1552 other => panic!("expected StashDelete, got: {:?}", other),
1553 }
1554
1555 let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1556 let mut buf = Vec::new();
1557 let payload = json_keys.as_bytes();
1558 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1559 buf.extend_from_slice(payload);
1560 let mut cursor = Cursor::new(buf);
1561 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1562 match decoded {
1563 ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1564 other => panic!("expected StashKeys, got: {:?}", other),
1565 }
1566 }
1567
1568 #[tokio::test]
1571 async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1572 let msg = ChildMessage::ExecutionComplete {
1573 result: Err("execution timed out after 5000ms".into()),
1574 error_kind: Some(ErrorKind::Timeout),
1575 timeout_ms: Some(5000),
1576 };
1577
1578 let mut buf = Vec::new();
1579 write_message(&mut buf, &msg).await.unwrap();
1580
1581 let mut cursor = Cursor::new(buf);
1582 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1583
1584 match decoded {
1585 ChildMessage::ExecutionComplete {
1586 result,
1587 error_kind,
1588 timeout_ms,
1589 } => {
1590 assert!(result.is_err());
1591 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1592 assert_eq!(timeout_ms, Some(5000));
1593 }
1594 other => panic!("expected ExecutionComplete, got: {:?}", other),
1595 }
1596 }
1597
1598 #[tokio::test]
1599 async fn ipc_t02_timeout_ms_absent_backward_compat() {
1600 let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1602 let mut buf = Vec::new();
1603 let payload = json.as_bytes();
1604 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1605 buf.extend_from_slice(payload);
1606
1607 let mut cursor = Cursor::new(buf);
1608 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1609
1610 match decoded {
1611 ChildMessage::ExecutionComplete {
1612 error_kind,
1613 timeout_ms,
1614 ..
1615 } => {
1616 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1617 assert_eq!(
1618 timeout_ms, None,
1619 "missing timeout_ms should default to None"
1620 );
1621 }
1622 other => panic!("expected ExecutionComplete, got: {:?}", other),
1623 }
1624 }
1625
1626 #[tokio::test]
1627 async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
1628 let msg = ChildMessage::ExecutionComplete {
1629 result: Err("some error".into()),
1630 error_kind: Some(ErrorKind::JsError),
1631 timeout_ms: None,
1632 };
1633
1634 let json = serde_json::to_string(&msg).unwrap();
1635 assert!(
1636 !json.contains("timeout_ms"),
1637 "timeout_ms:None should be omitted: {json}"
1638 );
1639 }
1640
1641 #[tokio::test]
1642 async fn ipc_t04_timeout_ms_present_when_some() {
1643 let msg = ChildMessage::ExecutionComplete {
1644 result: Err("timed out".into()),
1645 error_kind: Some(ErrorKind::Timeout),
1646 timeout_ms: Some(10000),
1647 };
1648
1649 let json = serde_json::to_string(&msg).unwrap();
1650 assert!(
1651 json.contains("\"timeout_ms\":10000"),
1652 "timeout_ms should be present: {json}"
1653 );
1654 }
1655
1656 #[tokio::test]
1659 async fn ipc_rv01_write_raw_message_roundtrip() {
1660 let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1661
1662 let mut buf = Vec::new();
1663 write_raw_message(&mut buf, payload).await.unwrap();
1664
1665 let mut cursor = Cursor::new(buf);
1666 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1667 .await
1668 .unwrap()
1669 .unwrap();
1670
1671 assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1672 }
1673
1674 #[tokio::test]
1675 async fn ipc_rv02_read_raw_message_preserves_bytes() {
1676 let msg = ChildMessage::Log {
1678 message: "test raw".into(),
1679 };
1680 let mut buf = Vec::new();
1681 write_message(&mut buf, &msg).await.unwrap();
1682
1683 let mut cursor = Cursor::new(buf);
1684 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1685 .await
1686 .unwrap()
1687 .unwrap();
1688
1689 let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1691 assert!(matches!(parsed, ChildMessage::Log { .. }));
1692 }
1693
1694 #[tokio::test]
1695 async fn ipc_rv03_raw_message_size_limit_enforced() {
1696 let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1697 let mut buf = Vec::new();
1698 write_raw_message(&mut buf, large_payload.as_bytes())
1699 .await
1700 .unwrap();
1701
1702 let mut cursor = Cursor::new(buf);
1703 let result = read_raw_message(&mut cursor, 64).await;
1704 assert!(result.is_err());
1705 let err = result.unwrap_err().to_string();
1706 assert!(err.contains("too large"), "error: {err}");
1707 }
1708
1709 #[tokio::test]
1710 async fn ipc_rv04_large_payload_stays_raw() {
1711 let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1713 let mut buf = Vec::new();
1714 write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1715
1716 let mut cursor = Cursor::new(buf);
1717 let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1718 .await
1719 .unwrap()
1720 .unwrap();
1721
1722 assert!(raw.get().len() > 1_000_000);
1724 let val: Value = serde_json::from_str(raw.get()).unwrap();
1726 assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1727 }
1728
1729 #[tokio::test]
1730 async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1731 let msg = ParentMessage::ToolCallResult {
1733 request_id: 99,
1734 result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1735 };
1736 let mut buf = Vec::new();
1737 write_message(&mut buf, &msg).await.unwrap();
1738
1739 let mut cursor = Cursor::new(buf.clone());
1740 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1741 .await
1742 .unwrap()
1743 .unwrap();
1744
1745 let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1747 match parsed {
1748 ParentMessage::ToolCallResult { request_id, result } => {
1749 assert_eq!(request_id, 99);
1750 assert!(result.is_ok());
1751 }
1752 other => panic!("expected ToolCallResult, got: {:?}", other),
1753 }
1754 }
1755
1756 #[tokio::test]
1757 async fn ipc_rv06_raw_eof_returns_none() {
1758 let mut cursor = Cursor::new(Vec::<u8>::new());
1759 let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1760 .await
1761 .unwrap();
1762 assert!(result.is_none());
1763 }
1764
1765 #[tokio::test]
1766 async fn ipc_rv07_mixed_raw_and_value_messages() {
1767 let mut buf = Vec::new();
1768
1769 let msg1 = ChildMessage::Log {
1771 message: "first".into(),
1772 };
1773 write_message(&mut buf, &msg1).await.unwrap();
1774
1775 let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1777 write_raw_message(&mut buf, raw_payload).await.unwrap();
1778
1779 let mut cursor = Cursor::new(buf);
1781 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1782 assert!(matches!(d1, ChildMessage::Log { .. }));
1783
1784 let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1785 .await
1786 .unwrap()
1787 .unwrap();
1788 let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1789 assert!(matches!(parsed, ChildMessage::Log { .. }));
1790 }
1791
1792 #[tokio::test]
1795 async fn tool_error_round_trips_through_ipc() {
1796 let original = forge_error::DispatchError::ToolError {
1797 server: "arbiter".into(),
1798 tool: "scan_target".into(),
1799 message: "tool returned error: Invalid params: missing field 'base_url'".into(),
1800 };
1801
1802 let ipc_err = IpcDispatchError::from(&original);
1804 assert_eq!(ipc_err.code, "TOOL_ERROR");
1805 assert_eq!(ipc_err.server, Some("arbiter".into()));
1806 assert_eq!(ipc_err.tool, Some("scan_target".into()));
1807 assert!(ipc_err.message.contains("Invalid params"));
1808
1809 let json = serde_json::to_string(&ipc_err).unwrap();
1811 let deserialized: IpcDispatchError = serde_json::from_str(&json).unwrap();
1812
1813 let reconstructed = deserialized.to_dispatch_error();
1815 assert!(matches!(
1816 reconstructed,
1817 forge_error::DispatchError::ToolError {
1818 ref server,
1819 ref tool,
1820 ..
1821 } if server == "arbiter" && tool == "scan_target"
1822 ));
1823 assert!(!reconstructed.trips_circuit_breaker());
1824 assert_eq!(reconstructed.code(), "TOOL_ERROR");
1825 }
1826
1827 #[tokio::test]
1828 async fn tool_error_ipc_message_roundtrip() {
1829 let msg = ParentMessage::ToolCallResult {
1830 request_id: 42,
1831 result: Err(IpcDispatchError::from(
1832 &forge_error::DispatchError::ToolError {
1833 server: "arbiter".into(),
1834 tool: "scan".into(),
1835 message: "bad params".into(),
1836 },
1837 )),
1838 };
1839
1840 let mut buf = Vec::new();
1841 write_message(&mut buf, &msg).await.unwrap();
1842
1843 let mut cursor = Cursor::new(buf);
1844 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1845
1846 match decoded {
1847 ParentMessage::ToolCallResult { request_id, result } => {
1848 assert_eq!(request_id, 42);
1849 let err = result.unwrap_err();
1850 assert_eq!(err.code, "TOOL_ERROR");
1851 assert_eq!(err.server, Some("arbiter".into()));
1852 assert_eq!(err.tool, Some("scan".into()));
1853
1854 let dispatch_err = err.to_dispatch_error();
1856 assert!(!dispatch_err.trips_circuit_breaker());
1857 }
1858 other => panic!("expected ToolCallResult, got: {:?}", other),
1859 }
1860 }
1861}