1use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Coarse classification attached to a failed execution.
///
/// Carried in the optional `error_kind` field of
/// [`ChildMessage::ExecutionComplete`]. Variants are serialized in
/// `snake_case` (for example `HeapLimit` is written as `"heap_limit"`).
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ErrorKind {
    /// Presumably: the execution exceeded its time budget — confirm against the worker.
    Timeout,
    /// Presumably: the JS heap limit was hit — confirm against the worker.
    HeapLimit,
    /// Presumably: the executed script itself raised an error — confirm against the worker.
    JsError,
    /// Presumably: a failure of the execution machinery rather than of the script — confirm.
    Execution,
}
30
/// Serializable form of a dispatch error, suitable for sending over IPC.
///
/// `code` is a string tag that [`IpcDispatchError::to_dispatch_error`] maps
/// back onto a `forge_error::DispatchError` variant; the optional fields carry
/// whatever structured context that variant needs. Optional fields are left
/// out of the serialized form when `None` and default to `None` when absent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IpcDispatchError {
    /// Error code string, e.g. `"TIMEOUT"` or `"INTERNAL"`.
    pub code: String,
    /// Human-readable error text.
    pub message: String,
    /// Server name associated with the error, when there is one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub server: Option<String>,
    /// Tool name associated with the error, when there is one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Timeout in milliseconds; only populated for timeout errors.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
53
54impl IpcDispatchError {
55 pub fn from_string(msg: String) -> Self {
58 Self {
59 code: "INTERNAL".to_string(),
60 message: msg,
61 server: None,
62 tool: None,
63 timeout_ms: None,
64 }
65 }
66
67 pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69 match self.code.as_str() {
70 "SERVER_NOT_FOUND" => {
71 forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72 }
73 "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74 server: self.server.unwrap_or_default(),
75 tool: self.tool.unwrap_or_default(),
76 },
77 "TIMEOUT" => forge_error::DispatchError::Timeout {
78 server: self.server.unwrap_or_default(),
79 timeout_ms: self.timeout_ms.unwrap_or(0),
80 },
81 "CIRCUIT_OPEN" => {
82 forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83 }
84 "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85 reason: self.message,
86 },
87 "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88 server: self.server.unwrap_or_default(),
89 message: self.message,
90 },
91 "TRANSPORT_DEAD" => forge_error::DispatchError::TransportDead {
92 server: self.server.unwrap_or_default(),
93 reason: self.message,
94 },
95 "TOOL_ERROR" => forge_error::DispatchError::ToolError {
96 server: self.server.unwrap_or_default(),
97 tool: self.tool.unwrap_or_default(),
98 message: self.message,
99 },
100 "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
101 _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
102 }
103 }
104}
105
106impl From<&forge_error::DispatchError> for IpcDispatchError {
107 fn from(e: &forge_error::DispatchError) -> Self {
108 let (server, tool, timeout_ms) = match e {
109 forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
110 forge_error::DispatchError::ToolNotFound { server, tool } => {
111 (Some(server.clone()), Some(tool.clone()), None)
112 }
113 forge_error::DispatchError::Timeout {
114 server, timeout_ms, ..
115 } => (Some(server.clone()), None, Some(*timeout_ms)),
116 forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
117 forge_error::DispatchError::Upstream { server, .. } => {
118 (Some(server.clone()), None, None)
119 }
120 forge_error::DispatchError::TransportDead { server, .. } => {
121 (Some(server.clone()), None, None)
122 }
123 forge_error::DispatchError::ToolError { server, tool, .. } => {
124 (Some(server.clone()), Some(tool.clone()), None)
125 }
126 _ => (None, None, None),
127 };
128
129 Self {
130 code: e.code().to_string(),
131 message: e.to_string(),
132 server,
133 tool,
134 timeout_ms,
135 }
136 }
137}
138
/// Messages on the parent side of the worker IPC protocol (presumably sent
/// parent → child; confirm against the worker loop).
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ParentMessage {
    /// Request to run `code` under the limits in `config`.
    Execute {
        /// Source text to execute.
        code: String,
        /// Optional manifest passed along as free-form JSON.
        manifest: Option<Value>,
        /// Limits and settings for this execution.
        config: WorkerConfig,
    },
    /// Outcome of a [`ChildMessage::ToolCallRequest`], matched by `request_id`.
    ToolCallResult {
        /// Identifier copied from the originating request.
        request_id: u64,
        /// Tool output on success, structured error otherwise.
        result: Result<Value, IpcDispatchError>,
    },
    /// Outcome of a [`ChildMessage::ResourceReadRequest`], matched by `request_id`.
    ResourceReadResult {
        /// Identifier copied from the originating request.
        request_id: u64,
        /// Resource content on success, structured error otherwise.
        result: Result<Value, IpcDispatchError>,
    },
    /// Reset request carrying a fresh `config`; answered by
    /// [`ChildMessage::ResetComplete`] (presumably — confirm against the worker).
    Reset {
        /// Configuration to apply after the reset.
        config: WorkerConfig,
    },
    /// Outcome of one of the `Stash*` requests, matched by `request_id`.
    StashResult {
        /// Identifier copied from the originating request.
        request_id: u64,
        /// Stash operation output on success, structured error otherwise.
        result: Result<Value, IpcDispatchError>,
    },
}
183
/// Messages on the child (worker) side of the IPC protocol (presumably sent
/// child → parent; confirm against the worker loop).
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the variant's own fields. Optional `group` fields
/// are left out when `None` and default to `None` when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ChildMessage {
    /// Request to invoke `tool` on `server` with JSON `args`.
    ToolCallRequest {
        /// Identifier echoed back in the matching result message.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Tool name on that server.
        tool: String,
        /// Tool arguments as free-form JSON.
        args: Value,
    },
    /// Request to read the resource at `uri` from `server`.
    ResourceReadRequest {
        /// Identifier echoed back in the matching result message.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Resource URI.
        uri: String,
    },
    /// Request to store `value` under `key` in the stash.
    StashPut {
        /// Identifier echoed back in the matching result message.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Value to store.
        value: Value,
        /// Optional time-to-live in seconds.
        ttl_secs: Option<u32>,
        /// Optional group qualifier.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to fetch the value stored under `key`.
    StashGet {
        /// Identifier echoed back in the matching result message.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Optional group qualifier.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to remove the value stored under `key`.
    StashDelete {
        /// Identifier echoed back in the matching result message.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Optional group qualifier.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to list stash keys.
    StashKeys {
        /// Identifier echoed back in the matching result message.
        request_id: u64,
        /// Optional group qualifier.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Acknowledgement of a reset (presumably of [`ParentMessage::Reset`] — confirm).
    ResetComplete,
    /// Final outcome of an execution.
    ExecutionComplete {
        /// JSON result on success, error text on failure.
        result: Result<Value, String>,
        /// Optional classification of the failure.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_kind: Option<ErrorKind>,
        /// Optional timeout value in milliseconds.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_ms: Option<u64>,
    },
    /// A log line.
    Log {
        /// Log text.
        message: String,
    },
}
274
/// Serializable execution limits handed to a worker.
///
/// Mirrors the fields of `crate::SandboxConfig` that are copied by the
/// `From<&crate::SandboxConfig>` impl, plus the optional `known_tools` /
/// `known_servers` lists. Fields marked with a serde default may be absent
/// from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Execution timeout in milliseconds.
    pub timeout_ms: u64,
    /// Heap size limit (presumably bytes — confirm).
    pub max_heap_size: usize,
    /// Maximum number of tool calls.
    pub max_tool_calls: usize,
    /// Optional maximum number of stash calls; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_stash_calls: Option<usize>,
    /// Size limit for tool-call arguments.
    pub max_tool_call_args_size: usize,
    /// Size limit for execution output.
    pub max_output_size: usize,
    /// Size limit for submitted code.
    pub max_code_size: usize,
    /// Size limit for one IPC message; defaults to `DEFAULT_MAX_IPC_MESSAGE_SIZE` when absent.
    #[serde(default = "default_max_ipc_message_size")]
    pub max_ipc_message_size: usize,
    /// Size limit for a resource; defaults to `DEFAULT_MAX_RESOURCE_SIZE` when absent.
    #[serde(default = "default_max_resource_size")]
    pub max_resource_size: usize,
    /// Parallelism limit; defaults to `8` when absent.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: usize,
    /// Optional list of known tools as string pairs (presumably `(server, tool)` — confirm).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_tools: Option<Vec<(String, String)>>,
    /// Optional set of known server names.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_servers: Option<std::collections::HashSet<String>>,
}
310
311fn default_max_ipc_message_size() -> usize {
312 DEFAULT_MAX_IPC_MESSAGE_SIZE
313}
314
315fn default_max_resource_size() -> usize {
316 DEFAULT_MAX_RESOURCE_SIZE
317}
318
/// Serde default for `WorkerConfig::max_parallel`, used when the field is
/// missing from the serialized config.
fn default_max_parallel() -> usize {
    const FALLBACK_PARALLELISM: usize = 8;
    FALLBACK_PARALLELISM
}
322
323impl From<&crate::SandboxConfig> for WorkerConfig {
324 fn from(config: &crate::SandboxConfig) -> Self {
325 Self {
326 timeout_ms: config.timeout.as_millis() as u64,
327 max_heap_size: config.max_heap_size,
328 max_tool_calls: config.max_tool_calls,
329 max_stash_calls: config.max_stash_calls,
330 max_tool_call_args_size: config.max_tool_call_args_size,
331 max_output_size: config.max_output_size,
332 max_code_size: config.max_code_size,
333 max_ipc_message_size: config.max_ipc_message_size,
334 max_resource_size: config.max_resource_size,
335 max_parallel: config.max_parallel,
336 known_tools: None,
337 known_servers: None,
338 }
339 }
340}
341
342impl WorkerConfig {
343 pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
345 crate::SandboxConfig {
346 timeout: Duration::from_millis(self.timeout_ms),
347 max_code_size: self.max_code_size,
348 max_output_size: self.max_output_size,
349 max_heap_size: self.max_heap_size,
350 max_concurrent: 1, max_tool_calls: self.max_tool_calls,
352 max_stash_calls: self.max_stash_calls,
353 max_tool_call_args_size: self.max_tool_call_args_size,
354 execution_mode: crate::executor::ExecutionMode::InProcess, max_resource_size: self.max_resource_size,
356 max_parallel: self.max_parallel,
357 max_ipc_message_size: self.max_ipc_message_size,
358 }
359 }
360}
361
362pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
366 writer: &mut W,
367 msg: &T,
368) -> Result<(), std::io::Error> {
369 let payload = serde_json::to_vec(msg)
370 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
371 let len = u32::try_from(payload.len()).map_err(|_| {
372 std::io::Error::new(
373 std::io::ErrorKind::InvalidData,
374 format!(
375 "IPC payload too large: {} bytes (max {} bytes)",
376 payload.len(),
377 u32::MAX
378 ),
379 )
380 })?;
381 writer.write_all(&len.to_be_bytes()).await?;
382 writer.write_all(&payload).await?;
383 writer.flush().await?;
384 Ok(())
385}
386
387pub async fn write_raw_message<W: AsyncWrite + Unpin>(
392 writer: &mut W,
393 payload: &[u8],
394) -> Result<(), std::io::Error> {
395 let len = u32::try_from(payload.len()).map_err(|_| {
396 std::io::Error::new(
397 std::io::ErrorKind::InvalidData,
398 format!(
399 "raw IPC payload too large: {} bytes (max {} bytes)",
400 payload.len(),
401 u32::MAX
402 ),
403 )
404 })?;
405 writer.write_all(&len.to_be_bytes()).await?;
406 writer.write_all(payload).await?;
407 writer.flush().await?;
408 Ok(())
409}
410
411pub async fn read_raw_message<R: AsyncRead + Unpin>(
416 reader: &mut R,
417 max_size: usize,
418) -> Result<Option<Box<RawValue>>, std::io::Error> {
419 let mut len_buf = [0u8; 4];
420 match reader.read_exact(&mut len_buf).await {
421 Ok(_) => {}
422 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
423 Err(e) => return Err(e),
424 }
425
426 let len = u32::from_be_bytes(len_buf) as usize;
427
428 if len > max_size {
429 return Err(std::io::Error::new(
430 std::io::ErrorKind::InvalidData,
431 format!(
432 "raw IPC message too large: {} bytes (limit: {} bytes)",
433 len, max_size
434 ),
435 ));
436 }
437
438 let mut payload = vec![0u8; len];
439 reader.read_exact(&mut payload).await?;
440
441 let raw: Box<RawValue> = serde_json::from_slice(&payload)
442 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
443 Ok(Some(raw))
444}
445
/// Default upper bound on the payload of a single IPC frame: 65 MiB.
///
/// NOTE(review): one MiB larger than `DEFAULT_MAX_RESOURCE_SIZE`, presumably
/// to leave room for the message envelope around a maximum-size resource —
/// confirm.
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 65 << 20;

/// Default upper bound on the size of a resource: 64 MiB.
pub const DEFAULT_MAX_RESOURCE_SIZE: usize = 64 << 20;
461
462pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
467 reader: &mut R,
468) -> Result<Option<T>, std::io::Error> {
469 read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
470}
471
472pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
477 reader: &mut R,
478 max_size: usize,
479) -> Result<Option<T>, std::io::Error> {
480 let mut len_buf = [0u8; 4];
481 match reader.read_exact(&mut len_buf).await {
482 Ok(_) => {}
483 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
484 Err(e) => return Err(e),
485 }
486
487 let len = u32::from_be_bytes(len_buf) as usize;
488
489 if len > max_size {
491 return Err(std::io::Error::new(
492 std::io::ErrorKind::InvalidData,
493 format!(
494 "IPC message too large: {} bytes (limit: {} bytes)",
495 len, max_size
496 ),
497 ));
498 }
499
500 let mut payload = vec![0u8; len];
501 reader.read_exact(&mut payload).await?;
502
503 let msg: T = serde_json::from_slice(&payload)
504 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
505 Ok(Some(msg))
506}
507
508#[cfg(test)]
509mod tests {
510 use super::*;
511 use std::io::Cursor;
512
513 #[tokio::test]
514 async fn roundtrip_parent_execute_message() {
515 let msg = ParentMessage::Execute {
516 code: "async () => { return 42; }".into(),
517 manifest: Some(serde_json::json!({"servers": []})),
518 config: WorkerConfig {
519 timeout_ms: 5000,
520 max_heap_size: 64 * 1024 * 1024,
521 max_tool_calls: 50,
522 max_stash_calls: None,
523 max_tool_call_args_size: 1024 * 1024,
524 max_output_size: 1024 * 1024,
525 max_code_size: 64 * 1024,
526 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
527 max_resource_size: 64 * 1024 * 1024,
528 max_parallel: 8,
529 known_tools: None,
530 known_servers: None,
531 },
532 };
533
534 let mut buf = Vec::new();
535 write_message(&mut buf, &msg).await.unwrap();
536
537 let mut cursor = Cursor::new(buf);
538 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
539
540 match decoded {
541 ParentMessage::Execute {
542 code,
543 manifest,
544 config,
545 } => {
546 assert_eq!(code, "async () => { return 42; }");
547 assert!(manifest.is_some());
548 assert_eq!(config.timeout_ms, 5000);
549 }
550 other => panic!("expected Execute, got: {:?}", other),
551 }
552 }
553
554 #[tokio::test]
555 async fn roundtrip_parent_tool_result() {
556 let msg = ParentMessage::ToolCallResult {
557 request_id: 42,
558 result: Ok(serde_json::json!({"status": "ok"})),
559 };
560
561 let mut buf = Vec::new();
562 write_message(&mut buf, &msg).await.unwrap();
563
564 let mut cursor = Cursor::new(buf);
565 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
566
567 match decoded {
568 ParentMessage::ToolCallResult { request_id, result } => {
569 assert_eq!(request_id, 42);
570 assert!(result.is_ok());
571 }
572 other => panic!("expected ToolCallResult, got: {:?}", other),
573 }
574 }
575
576 #[tokio::test]
577 async fn roundtrip_parent_tool_result_error() {
578 let msg = ParentMessage::ToolCallResult {
579 request_id: 7,
580 result: Err(IpcDispatchError::from_string("connection refused".into())),
581 };
582
583 let mut buf = Vec::new();
584 write_message(&mut buf, &msg).await.unwrap();
585
586 let mut cursor = Cursor::new(buf);
587 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
588
589 match decoded {
590 ParentMessage::ToolCallResult { request_id, result } => {
591 assert_eq!(request_id, 7);
592 let err = result.unwrap_err();
593 assert_eq!(err.message, "connection refused");
594 assert_eq!(err.code, "INTERNAL");
595 }
596 other => panic!("expected ToolCallResult, got: {:?}", other),
597 }
598 }
599
600 #[tokio::test]
601 async fn roundtrip_child_tool_request() {
602 let msg = ChildMessage::ToolCallRequest {
603 request_id: 1,
604 server: "narsil".into(),
605 tool: "ast.parse".into(),
606 args: serde_json::json!({"file": "test.rs"}),
607 };
608
609 let mut buf = Vec::new();
610 write_message(&mut buf, &msg).await.unwrap();
611
612 let mut cursor = Cursor::new(buf);
613 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
614
615 match decoded {
616 ChildMessage::ToolCallRequest {
617 request_id,
618 server,
619 tool,
620 args,
621 } => {
622 assert_eq!(request_id, 1);
623 assert_eq!(server, "narsil");
624 assert_eq!(tool, "ast.parse");
625 assert_eq!(args["file"], "test.rs");
626 }
627 other => panic!("expected ToolCallRequest, got: {:?}", other),
628 }
629 }
630
631 #[tokio::test]
632 async fn roundtrip_child_execution_complete() {
633 let msg = ChildMessage::ExecutionComplete {
634 result: Ok(serde_json::json!([1, 2, 3])),
635 error_kind: None,
636 timeout_ms: None,
637 };
638
639 let mut buf = Vec::new();
640 write_message(&mut buf, &msg).await.unwrap();
641
642 let mut cursor = Cursor::new(buf);
643 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
644
645 match decoded {
646 ChildMessage::ExecutionComplete {
647 result, error_kind, ..
648 } => {
649 assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
650 assert_eq!(error_kind, None);
651 }
652 other => panic!("expected ExecutionComplete, got: {:?}", other),
653 }
654 }
655
656 #[tokio::test]
657 async fn roundtrip_child_log() {
658 let msg = ChildMessage::Log {
659 message: "processing step 3".into(),
660 };
661
662 let mut buf = Vec::new();
663 write_message(&mut buf, &msg).await.unwrap();
664
665 let mut cursor = Cursor::new(buf);
666 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
667
668 match decoded {
669 ChildMessage::Log { message } => {
670 assert_eq!(message, "processing step 3");
671 }
672 other => panic!("expected Log, got: {:?}", other),
673 }
674 }
675
676 #[tokio::test]
677 async fn multiple_messages_in_stream() {
678 let msg1 = ChildMessage::Log {
679 message: "first".into(),
680 };
681 let msg2 = ChildMessage::ToolCallRequest {
682 request_id: 1,
683 server: "s".into(),
684 tool: "t".into(),
685 args: serde_json::json!({}),
686 };
687 let msg3 = ChildMessage::ExecutionComplete {
688 result: Ok(serde_json::json!("done")),
689 error_kind: None,
690 timeout_ms: None,
691 };
692
693 let mut buf = Vec::new();
694 write_message(&mut buf, &msg1).await.unwrap();
695 write_message(&mut buf, &msg2).await.unwrap();
696 write_message(&mut buf, &msg3).await.unwrap();
697
698 let mut cursor = Cursor::new(buf);
699 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
700 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
701 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
702
703 assert!(matches!(d1, ChildMessage::Log { .. }));
704 assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
705 assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
706
707 let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
709 assert!(d4.is_none());
710 }
711
712 #[tokio::test]
713 async fn execution_complete_error_roundtrip() {
714 let msg = ChildMessage::ExecutionComplete {
715 result: Err("failed to create tokio runtime: resource unavailable".into()),
716 error_kind: Some(ErrorKind::Execution),
717 timeout_ms: None,
718 };
719
720 let mut buf = Vec::new();
721 write_message(&mut buf, &msg).await.unwrap();
722
723 let mut cursor = Cursor::new(buf);
724 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
725
726 match decoded {
727 ChildMessage::ExecutionComplete {
728 result, error_kind, ..
729 } => {
730 let err = result.unwrap_err();
731 assert!(
732 err.contains("tokio runtime"),
733 "expected runtime error: {err}"
734 );
735 assert_eq!(error_kind, Some(ErrorKind::Execution));
736 }
737 other => panic!("expected ExecutionComplete, got: {:?}", other),
738 }
739 }
740
741 #[tokio::test]
742 async fn eof_returns_none() {
743 let mut cursor = Cursor::new(Vec::<u8>::new());
744 let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
745 assert!(result.is_none());
746 }
747
748 #[test]
749 fn u32_try_from_overflow() {
750 let overflow_size = u32::MAX as usize + 1;
752 assert!(u32::try_from(overflow_size).is_err());
753 }
754
755 #[tokio::test]
756 async fn write_message_normal_size_succeeds() {
757 let msg = ChildMessage::Log {
759 message: "a".repeat(1024),
760 };
761 let mut buf = Vec::new();
762 write_message(&mut buf, &msg).await.unwrap();
763 assert!(buf.len() > 1024);
764 }
765
766 #[tokio::test]
767 async fn large_message_roundtrip() {
768 let large_data = "x".repeat(1_000_000);
770 let msg = ChildMessage::ExecutionComplete {
771 result: Ok(serde_json::json!(large_data)),
772 error_kind: None,
773 timeout_ms: None,
774 };
775
776 let mut buf = Vec::new();
777 write_message(&mut buf, &msg).await.unwrap();
778
779 let mut cursor = Cursor::new(buf);
780 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
781
782 match decoded {
783 ChildMessage::ExecutionComplete { result, .. } => {
784 assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
785 }
786 other => panic!("expected ExecutionComplete, got: {:?}", other),
787 }
788 }
789
790 #[tokio::test]
791 async fn worker_config_roundtrip_from_sandbox_config() {
792 let sandbox = crate::SandboxConfig::default();
793 let worker = WorkerConfig::from(&sandbox);
794 let back = worker.to_sandbox_config();
795
796 assert_eq!(sandbox.timeout, back.timeout);
797 assert_eq!(sandbox.max_heap_size, back.max_heap_size);
798 assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
799 assert_eq!(sandbox.max_output_size, back.max_output_size);
800 assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
801 assert_eq!(worker.max_ipc_message_size, 65 * 1024 * 1024); }
803
804 #[tokio::test]
805 async fn read_message_with_limit_rejects_oversized() {
806 let msg = ChildMessage::Log {
807 message: "x".repeat(1024),
808 };
809 let mut buf = Vec::new();
810 write_message(&mut buf, &msg).await.unwrap();
811
812 let mut cursor = Cursor::new(buf);
814 let result: Result<Option<ChildMessage>, _> =
815 read_message_with_limit(&mut cursor, 64).await;
816 assert!(result.is_err());
817 let err_msg = result.unwrap_err().to_string();
818 assert!(err_msg.contains("too large"), "error: {err_msg}");
819 }
820
821 #[tokio::test]
822 async fn read_message_with_limit_accepts_within_limit() {
823 let msg = ChildMessage::Log {
824 message: "hello".into(),
825 };
826 let mut buf = Vec::new();
827 write_message(&mut buf, &msg).await.unwrap();
828
829 let mut cursor = Cursor::new(buf);
830 let result: Option<ChildMessage> =
831 read_message_with_limit(&mut cursor, 1024).await.unwrap();
832 assert!(result.is_some());
833 }
834
835 #[tokio::test]
836 async fn worker_config_ipc_limit_serde_default() {
837 let json = r#"{
839 "timeout_ms": 5000,
840 "max_heap_size": 67108864,
841 "max_tool_calls": 50,
842 "max_tool_call_args_size": 1048576,
843 "max_output_size": 1048576,
844 "max_code_size": 65536
845 }"#;
846 let config: WorkerConfig = serde_json::from_str(json).unwrap();
847 assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
848 }
849
850 #[tokio::test]
852 async fn ipc_01_resource_read_request_roundtrip() {
853 let msg = ChildMessage::ResourceReadRequest {
854 request_id: 10,
855 server: "postgres".into(),
856 uri: "file:///logs/app.log".into(),
857 };
858
859 let mut buf = Vec::new();
860 write_message(&mut buf, &msg).await.unwrap();
861
862 let mut cursor = Cursor::new(buf);
863 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
864
865 match decoded {
866 ChildMessage::ResourceReadRequest {
867 request_id,
868 server,
869 uri,
870 } => {
871 assert_eq!(request_id, 10);
872 assert_eq!(server, "postgres");
873 assert_eq!(uri, "file:///logs/app.log");
874 }
875 other => panic!("expected ResourceReadRequest, got: {:?}", other),
876 }
877 }
878
879 #[tokio::test]
881 async fn ipc_02_resource_read_result_success_roundtrip() {
882 let msg = ParentMessage::ResourceReadResult {
883 request_id: 11,
884 result: Ok(serde_json::json!({"content": "log data here"})),
885 };
886
887 let mut buf = Vec::new();
888 write_message(&mut buf, &msg).await.unwrap();
889
890 let mut cursor = Cursor::new(buf);
891 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
892
893 match decoded {
894 ParentMessage::ResourceReadResult { request_id, result } => {
895 assert_eq!(request_id, 11);
896 let val = result.unwrap();
897 assert_eq!(val["content"], "log data here");
898 }
899 other => panic!("expected ResourceReadResult, got: {:?}", other),
900 }
901 }
902
903 #[tokio::test]
905 async fn ipc_03_resource_read_result_error_roundtrip() {
906 let msg = ParentMessage::ResourceReadResult {
907 request_id: 12,
908 result: Err(IpcDispatchError::from_string("resource not found".into())),
909 };
910
911 let mut buf = Vec::new();
912 write_message(&mut buf, &msg).await.unwrap();
913
914 let mut cursor = Cursor::new(buf);
915 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
916
917 match decoded {
918 ParentMessage::ResourceReadResult { request_id, result } => {
919 assert_eq!(request_id, 12);
920 let err = result.unwrap_err();
921 assert_eq!(err.message, "resource not found");
922 assert_eq!(err.code, "INTERNAL");
923 }
924 other => panic!("expected ResourceReadResult, got: {:?}", other),
925 }
926 }
927
928 #[tokio::test]
930 async fn ipc_04_stash_put_roundtrip() {
931 let msg = ChildMessage::StashPut {
932 request_id: 20,
933 key: "my-key".into(),
934 value: serde_json::json!({"data": 42}),
935 ttl_secs: Some(60),
936 group: None,
937 };
938
939 let mut buf = Vec::new();
940 write_message(&mut buf, &msg).await.unwrap();
941
942 let mut cursor = Cursor::new(buf);
943 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
944
945 match decoded {
946 ChildMessage::StashPut {
947 request_id,
948 key,
949 value,
950 ttl_secs,
951 group,
952 } => {
953 assert_eq!(request_id, 20);
954 assert_eq!(key, "my-key");
955 assert_eq!(value["data"], 42);
956 assert_eq!(ttl_secs, Some(60));
957 assert_eq!(group, None);
958 }
959 other => panic!("expected StashPut, got: {:?}", other),
960 }
961 }
962
963 #[tokio::test]
965 async fn ipc_05_stash_get_roundtrip() {
966 let msg = ChildMessage::StashGet {
967 request_id: 21,
968 key: "lookup-key".into(),
969 group: None,
970 };
971
972 let mut buf = Vec::new();
973 write_message(&mut buf, &msg).await.unwrap();
974
975 let mut cursor = Cursor::new(buf);
976 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
977
978 match decoded {
979 ChildMessage::StashGet {
980 request_id,
981 key,
982 group,
983 } => {
984 assert_eq!(request_id, 21);
985 assert_eq!(key, "lookup-key");
986 assert_eq!(group, None);
987 }
988 other => panic!("expected StashGet, got: {:?}", other),
989 }
990 }
991
992 #[tokio::test]
994 async fn ipc_06_stash_delete_roundtrip() {
995 let msg = ChildMessage::StashDelete {
996 request_id: 22,
997 key: "delete-me".into(),
998 group: None,
999 };
1000
1001 let mut buf = Vec::new();
1002 write_message(&mut buf, &msg).await.unwrap();
1003
1004 let mut cursor = Cursor::new(buf);
1005 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1006
1007 match decoded {
1008 ChildMessage::StashDelete {
1009 request_id,
1010 key,
1011 group,
1012 } => {
1013 assert_eq!(request_id, 22);
1014 assert_eq!(key, "delete-me");
1015 assert_eq!(group, None);
1016 }
1017 other => panic!("expected StashDelete, got: {:?}", other),
1018 }
1019 }
1020
1021 #[tokio::test]
1023 async fn ipc_07_stash_keys_roundtrip() {
1024 let msg = ChildMessage::StashKeys {
1025 request_id: 23,
1026 group: None,
1027 };
1028
1029 let mut buf = Vec::new();
1030 write_message(&mut buf, &msg).await.unwrap();
1031
1032 let mut cursor = Cursor::new(buf);
1033 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1034
1035 match decoded {
1036 ChildMessage::StashKeys { request_id, group } => {
1037 assert_eq!(request_id, 23);
1038 assert_eq!(group, None);
1039 }
1040 other => panic!("expected StashKeys, got: {:?}", other),
1041 }
1042 }
1043
1044 #[tokio::test]
1046 async fn ipc_08_stash_result_roundtrip() {
1047 let msg = ParentMessage::StashResult {
1048 request_id: 24,
1049 result: Ok(serde_json::json!({"ok": true})),
1050 };
1051
1052 let mut buf = Vec::new();
1053 write_message(&mut buf, &msg).await.unwrap();
1054
1055 let mut cursor = Cursor::new(buf);
1056 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1057
1058 match decoded {
1059 ParentMessage::StashResult { request_id, result } => {
1060 assert_eq!(request_id, 24);
1061 assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1062 }
1063 other => panic!("expected StashResult, got: {:?}", other),
1064 }
1065 }
1066
1067 #[tokio::test]
1069 async fn ipc_09_mixed_message_interleaving() {
1070 let msg1 = ChildMessage::ToolCallRequest {
1071 request_id: 1,
1072 server: "s".into(),
1073 tool: "t".into(),
1074 args: serde_json::json!({}),
1075 };
1076 let msg2 = ChildMessage::ResourceReadRequest {
1077 request_id: 2,
1078 server: "pg".into(),
1079 uri: "file:///data".into(),
1080 };
1081 let msg3 = ChildMessage::StashPut {
1082 request_id: 3,
1083 key: "k".into(),
1084 value: serde_json::json!("v"),
1085 ttl_secs: None,
1086 group: None,
1087 };
1088 let msg4 = ChildMessage::StashGet {
1089 request_id: 4,
1090 key: "k".into(),
1091 group: None,
1092 };
1093 let msg5 = ChildMessage::ExecutionComplete {
1094 result: Ok(serde_json::json!("done")),
1095 error_kind: None,
1096 timeout_ms: None,
1097 };
1098
1099 let mut buf = Vec::new();
1100 write_message(&mut buf, &msg1).await.unwrap();
1101 write_message(&mut buf, &msg2).await.unwrap();
1102 write_message(&mut buf, &msg3).await.unwrap();
1103 write_message(&mut buf, &msg4).await.unwrap();
1104 write_message(&mut buf, &msg5).await.unwrap();
1105
1106 let mut cursor = Cursor::new(buf);
1107 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1108 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1109 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1110 let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1111 let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1112
1113 assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1114 assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1115 assert!(matches!(d3, ChildMessage::StashPut { .. }));
1116 assert!(matches!(d4, ChildMessage::StashGet { .. }));
1117 assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1118
1119 let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1121 assert!(d6.is_none());
1122 }
1123
1124 #[tokio::test]
1126 async fn ipc_p01_reset_roundtrip() {
1127 let msg = ParentMessage::Reset {
1128 config: WorkerConfig {
1129 timeout_ms: 3000,
1130 max_heap_size: 32 * 1024 * 1024,
1131 max_tool_calls: 25,
1132 max_stash_calls: None,
1133 max_tool_call_args_size: 512 * 1024,
1134 max_output_size: 512 * 1024,
1135 max_code_size: 32 * 1024,
1136 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1137 max_resource_size: 32 * 1024 * 1024,
1138 max_parallel: 4,
1139 known_tools: None,
1140 known_servers: None,
1141 },
1142 };
1143
1144 let mut buf = Vec::new();
1145 write_message(&mut buf, &msg).await.unwrap();
1146
1147 let mut cursor = Cursor::new(buf);
1148 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1149
1150 match decoded {
1151 ParentMessage::Reset { config } => {
1152 assert_eq!(config.timeout_ms, 3000);
1153 assert_eq!(config.max_tool_calls, 25);
1154 }
1155 other => panic!("expected Reset, got: {:?}", other),
1156 }
1157 }
1158
1159 #[tokio::test]
1161 async fn ipc_p02_reset_complete_roundtrip() {
1162 let msg = ChildMessage::ResetComplete;
1163
1164 let mut buf = Vec::new();
1165 write_message(&mut buf, &msg).await.unwrap();
1166
1167 let mut cursor = Cursor::new(buf);
1168 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1169
1170 assert!(matches!(decoded, ChildMessage::ResetComplete));
1171 }
1172
1173 #[tokio::test]
1175 async fn ipc_p03_reset_execute_interleaving() {
1176 let reset = ParentMessage::Reset {
1177 config: WorkerConfig {
1178 timeout_ms: 5000,
1179 max_heap_size: 64 * 1024 * 1024,
1180 max_tool_calls: 50,
1181 max_stash_calls: None,
1182 max_tool_call_args_size: 1024 * 1024,
1183 max_output_size: 1024 * 1024,
1184 max_code_size: 64 * 1024,
1185 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1186 max_resource_size: 64 * 1024 * 1024,
1187 max_parallel: 8,
1188 known_tools: None,
1189 known_servers: None,
1190 },
1191 };
1192 let execute = ParentMessage::Execute {
1193 code: "async () => 42".into(),
1194 manifest: None,
1195 config: WorkerConfig {
1196 timeout_ms: 5000,
1197 max_heap_size: 64 * 1024 * 1024,
1198 max_tool_calls: 50,
1199 max_stash_calls: None,
1200 max_tool_call_args_size: 1024 * 1024,
1201 max_output_size: 1024 * 1024,
1202 max_code_size: 64 * 1024,
1203 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1204 max_resource_size: 64 * 1024 * 1024,
1205 max_parallel: 8,
1206 known_tools: None,
1207 known_servers: None,
1208 },
1209 };
1210
1211 let mut buf = Vec::new();
1212 write_message(&mut buf, &reset).await.unwrap();
1213 write_message(&mut buf, &execute).await.unwrap();
1214
1215 let mut cursor = Cursor::new(buf);
1216 let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1217 let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1218
1219 assert!(matches!(d1, ParentMessage::Reset { .. }));
1220 assert!(matches!(d2, ParentMessage::Execute { .. }));
1221 }
1222
1223 #[tokio::test]
1225 async fn ipc_10_oversized_stash_message_rejected() {
1226 let msg = ChildMessage::StashPut {
1227 request_id: 100,
1228 key: "k".into(),
1229 value: serde_json::json!("x".repeat(2048)),
1230 ttl_secs: Some(60),
1231 group: None,
1232 };
1233 let mut buf = Vec::new();
1234 write_message(&mut buf, &msg).await.unwrap();
1235
1236 let mut cursor = Cursor::new(buf);
1238 let result: Result<Option<ChildMessage>, _> =
1239 read_message_with_limit(&mut cursor, 64).await;
1240 assert!(result.is_err());
1241 let err_msg = result.unwrap_err().to_string();
1242 assert!(
1243 err_msg.contains("too large"),
1244 "error should mention 'too large': {err_msg}"
1245 );
1246 }
1247
1248 #[tokio::test]
1250 async fn ipc_o01_error_kind_timeout_roundtrip() {
1251 let msg = ChildMessage::ExecutionComplete {
1252 result: Err("execution timed out after 500ms".into()),
1253 error_kind: Some(ErrorKind::Timeout),
1254 timeout_ms: Some(500),
1255 };
1256
1257 let mut buf = Vec::new();
1258 write_message(&mut buf, &msg).await.unwrap();
1259
1260 let mut cursor = Cursor::new(buf);
1261 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1262
1263 match decoded {
1264 ChildMessage::ExecutionComplete {
1265 result,
1266 error_kind,
1267 timeout_ms,
1268 } => {
1269 assert!(result.is_err());
1270 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1271 assert_eq!(timeout_ms, Some(500));
1272 }
1273 other => panic!("expected ExecutionComplete, got: {:?}", other),
1274 }
1275 }
1276
1277 #[tokio::test]
1279 async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1280 let msg = ChildMessage::ExecutionComplete {
1281 result: Err("V8 heap limit exceeded".into()),
1282 error_kind: Some(ErrorKind::HeapLimit),
1283 timeout_ms: None,
1284 };
1285
1286 let mut buf = Vec::new();
1287 write_message(&mut buf, &msg).await.unwrap();
1288
1289 let mut cursor = Cursor::new(buf);
1290 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1291
1292 match decoded {
1293 ChildMessage::ExecutionComplete {
1294 result, error_kind, ..
1295 } => {
1296 assert!(result.is_err());
1297 assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1298 }
1299 other => panic!("expected ExecutionComplete, got: {:?}", other),
1300 }
1301 }
1302
1303 #[tokio::test]
1305 async fn ipc_o03_error_kind_backward_compat() {
1306 let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1309 let mut buf = Vec::new();
1310 let payload = json.as_bytes();
1311 let len = payload.len() as u32;
1312 buf.extend_from_slice(&len.to_be_bytes());
1313 buf.extend_from_slice(payload);
1314
1315 let mut cursor = Cursor::new(buf);
1316 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1317
1318 match decoded {
1319 ChildMessage::ExecutionComplete {
1320 result,
1321 error_kind,
1322 timeout_ms,
1323 } => {
1324 assert!(result.is_err());
1325 assert_eq!(
1326 error_kind, None,
1327 "missing error_kind should default to None"
1328 );
1329 assert_eq!(
1330 timeout_ms, None,
1331 "missing timeout_ms should default to None"
1332 );
1333 }
1334 other => panic!("expected ExecutionComplete, got: {:?}", other),
1335 }
1336 }
1337
1338 #[tokio::test]
1340 async fn ipc_o04_error_kind_js_error_roundtrip() {
1341 let msg = ChildMessage::ExecutionComplete {
1342 result: Err("ReferenceError: x is not defined".into()),
1343 error_kind: Some(ErrorKind::JsError),
1344 timeout_ms: None,
1345 };
1346
1347 let mut buf = Vec::new();
1348 write_message(&mut buf, &msg).await.unwrap();
1349
1350 let mut cursor = Cursor::new(buf);
1351 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1352
1353 match decoded {
1354 ChildMessage::ExecutionComplete {
1355 result, error_kind, ..
1356 } => {
1357 assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1358 assert_eq!(error_kind, Some(ErrorKind::JsError));
1359 }
1360 other => panic!("expected ExecutionComplete, got: {:?}", other),
1361 }
1362 }
1363
/// A successful `ExecutionComplete` whose `error_kind` and `timeout_ms` are
/// both `None` must serialize without either key appearing in the JSON.
///
/// Nothing here awaits, so this is a plain `#[test]` and does not start an
/// async runtime.
#[test]
fn ipc_o05_success_omits_error_kind() {
    let msg = ChildMessage::ExecutionComplete {
        result: Ok(serde_json::json!(42)),
        error_kind: None,
        timeout_ms: None,
    };

    let json = serde_json::to_string(&msg).unwrap();
    assert!(
        !json.contains("error_kind"),
        "success messages should not contain error_kind field: {json}"
    );
    assert!(
        !json.contains("timeout_ms"),
        "success messages should not contain timeout_ms field: {json}"
    );
}
1384
1385 #[tokio::test]
1388 async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1389 let msg = ChildMessage::StashPut {
1390 request_id: 50,
1391 key: "grouped-key".into(),
1392 value: serde_json::json!({"data": "secret"}),
1393 ttl_secs: Some(120),
1394 group: Some("analytics".into()),
1395 };
1396
1397 let mut buf = Vec::new();
1398 write_message(&mut buf, &msg).await.unwrap();
1399
1400 let mut cursor = Cursor::new(buf);
1401 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1402
1403 match decoded {
1404 ChildMessage::StashPut {
1405 request_id,
1406 key,
1407 group,
1408 ..
1409 } => {
1410 assert_eq!(request_id, 50);
1411 assert_eq!(key, "grouped-key");
1412 assert_eq!(group, Some("analytics".into()));
1413 }
1414 other => panic!("expected StashPut, got: {:?}", other),
1415 }
1416 }
1417
1418 #[tokio::test]
1419 async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1420 let msg = ChildMessage::StashGet {
1421 request_id: 51,
1422 key: "grouped-key".into(),
1423 group: Some("analytics".into()),
1424 };
1425
1426 let mut buf = Vec::new();
1427 write_message(&mut buf, &msg).await.unwrap();
1428
1429 let mut cursor = Cursor::new(buf);
1430 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1431
1432 match decoded {
1433 ChildMessage::StashGet {
1434 request_id,
1435 key,
1436 group,
1437 } => {
1438 assert_eq!(request_id, 51);
1439 assert_eq!(key, "grouped-key");
1440 assert_eq!(group, Some("analytics".into()));
1441 }
1442 other => panic!("expected StashGet, got: {:?}", other),
1443 }
1444 }
1445
1446 #[tokio::test]
1447 async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1448 let msg = ChildMessage::StashDelete {
1449 request_id: 52,
1450 key: "grouped-key".into(),
1451 group: Some("analytics".into()),
1452 };
1453
1454 let mut buf = Vec::new();
1455 write_message(&mut buf, &msg).await.unwrap();
1456
1457 let mut cursor = Cursor::new(buf);
1458 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1459
1460 match decoded {
1461 ChildMessage::StashDelete {
1462 request_id,
1463 key,
1464 group,
1465 } => {
1466 assert_eq!(request_id, 52);
1467 assert_eq!(key, "grouped-key");
1468 assert_eq!(group, Some("analytics".into()));
1469 }
1470 other => panic!("expected StashDelete, got: {:?}", other),
1471 }
1472 }
1473
1474 #[tokio::test]
1475 async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1476 let msg = ChildMessage::StashKeys {
1477 request_id: 53,
1478 group: Some("analytics".into()),
1479 };
1480
1481 let mut buf = Vec::new();
1482 write_message(&mut buf, &msg).await.unwrap();
1483
1484 let mut cursor = Cursor::new(buf);
1485 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1486
1487 match decoded {
1488 ChildMessage::StashKeys { request_id, group } => {
1489 assert_eq!(request_id, 53);
1490 assert_eq!(group, Some("analytics".into()));
1491 }
1492 other => panic!("expected StashKeys, got: {:?}", other),
1493 }
1494 }
1495
1496 #[tokio::test]
1497 async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1498 let msg = ChildMessage::StashPut {
1500 request_id: 54,
1501 key: "no-group-key".into(),
1502 value: serde_json::json!("val"),
1503 ttl_secs: None,
1504 group: None,
1505 };
1506
1507 let json = serde_json::to_string(&msg).unwrap();
1508 assert!(
1509 !json.contains("\"group\""),
1510 "group:None should be absent in serialized JSON: {json}"
1511 );
1512
1513 let mut buf = Vec::new();
1515 write_message(&mut buf, &msg).await.unwrap();
1516 let mut cursor = Cursor::new(buf);
1517 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1518 match decoded {
1519 ChildMessage::StashPut { group, .. } => {
1520 assert_eq!(group, None);
1521 }
1522 other => panic!("expected StashPut, got: {:?}", other),
1523 }
1524 }
1525
1526 #[tokio::test]
1527 async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1528 let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1530 let mut buf = Vec::new();
1531 let payload = json.as_bytes();
1532 let len = payload.len() as u32;
1533 buf.extend_from_slice(&len.to_be_bytes());
1534 buf.extend_from_slice(payload);
1535
1536 let mut cursor = Cursor::new(buf);
1537 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1538
1539 match decoded {
1540 ChildMessage::StashPut {
1541 request_id,
1542 key,
1543 group,
1544 ..
1545 } => {
1546 assert_eq!(request_id, 60);
1547 assert_eq!(key, "old-key");
1548 assert_eq!(
1549 group, None,
1550 "missing group field from v0.3.0 worker should deserialize as None"
1551 );
1552 }
1553 other => panic!("expected StashPut, got: {:?}", other),
1554 }
1555
1556 let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1558 let mut buf = Vec::new();
1559 let payload = json_get.as_bytes();
1560 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1561 buf.extend_from_slice(payload);
1562 let mut cursor = Cursor::new(buf);
1563 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1564 match decoded {
1565 ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1566 other => panic!("expected StashGet, got: {:?}", other),
1567 }
1568
1569 let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1570 let mut buf = Vec::new();
1571 let payload = json_del.as_bytes();
1572 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1573 buf.extend_from_slice(payload);
1574 let mut cursor = Cursor::new(buf);
1575 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1576 match decoded {
1577 ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1578 other => panic!("expected StashDelete, got: {:?}", other),
1579 }
1580
1581 let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1582 let mut buf = Vec::new();
1583 let payload = json_keys.as_bytes();
1584 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1585 buf.extend_from_slice(payload);
1586 let mut cursor = Cursor::new(buf);
1587 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1588 match decoded {
1589 ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1590 other => panic!("expected StashKeys, got: {:?}", other),
1591 }
1592 }
1593
1594 #[tokio::test]
1597 async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1598 let msg = ChildMessage::ExecutionComplete {
1599 result: Err("execution timed out after 5000ms".into()),
1600 error_kind: Some(ErrorKind::Timeout),
1601 timeout_ms: Some(5000),
1602 };
1603
1604 let mut buf = Vec::new();
1605 write_message(&mut buf, &msg).await.unwrap();
1606
1607 let mut cursor = Cursor::new(buf);
1608 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1609
1610 match decoded {
1611 ChildMessage::ExecutionComplete {
1612 result,
1613 error_kind,
1614 timeout_ms,
1615 } => {
1616 assert!(result.is_err());
1617 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1618 assert_eq!(timeout_ms, Some(5000));
1619 }
1620 other => panic!("expected ExecutionComplete, got: {:?}", other),
1621 }
1622 }
1623
1624 #[tokio::test]
1625 async fn ipc_t02_timeout_ms_absent_backward_compat() {
1626 let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1628 let mut buf = Vec::new();
1629 let payload = json.as_bytes();
1630 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1631 buf.extend_from_slice(payload);
1632
1633 let mut cursor = Cursor::new(buf);
1634 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1635
1636 match decoded {
1637 ChildMessage::ExecutionComplete {
1638 error_kind,
1639 timeout_ms,
1640 ..
1641 } => {
1642 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1643 assert_eq!(
1644 timeout_ms, None,
1645 "missing timeout_ms should default to None"
1646 );
1647 }
1648 other => panic!("expected ExecutionComplete, got: {:?}", other),
1649 }
1650 }
1651
/// A failed `ExecutionComplete` with `timeout_ms: None` must serialize without
/// a `timeout_ms` key.
///
/// Nothing here awaits, so this is a plain `#[test]` and does not start an
/// async runtime.
#[test]
fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
    let msg = ChildMessage::ExecutionComplete {
        result: Err("some error".into()),
        error_kind: Some(ErrorKind::JsError),
        timeout_ms: None,
    };

    let json = serde_json::to_string(&msg).unwrap();
    assert!(
        !json.contains("timeout_ms"),
        "timeout_ms:None should be omitted: {json}"
    );
}
1666
/// A timed-out `ExecutionComplete` with `timeout_ms: Some(10000)` must
/// serialize with the literal `"timeout_ms":10000` pair in the JSON.
///
/// Nothing here awaits, so this is a plain `#[test]` and does not start an
/// async runtime.
#[test]
fn ipc_t04_timeout_ms_present_when_some() {
    let msg = ChildMessage::ExecutionComplete {
        result: Err("timed out".into()),
        error_kind: Some(ErrorKind::Timeout),
        timeout_ms: Some(10000),
    };

    let json = serde_json::to_string(&msg).unwrap();
    assert!(
        json.contains("\"timeout_ms\":10000"),
        "timeout_ms should be present: {json}"
    );
}
1681
1682 #[tokio::test]
1685 async fn ipc_rv01_write_raw_message_roundtrip() {
1686 let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1687
1688 let mut buf = Vec::new();
1689 write_raw_message(&mut buf, payload).await.unwrap();
1690
1691 let mut cursor = Cursor::new(buf);
1692 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1693 .await
1694 .unwrap()
1695 .unwrap();
1696
1697 assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1698 }
1699
1700 #[tokio::test]
1701 async fn ipc_rv02_read_raw_message_preserves_bytes() {
1702 let msg = ChildMessage::Log {
1704 message: "test raw".into(),
1705 };
1706 let mut buf = Vec::new();
1707 write_message(&mut buf, &msg).await.unwrap();
1708
1709 let mut cursor = Cursor::new(buf);
1710 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1711 .await
1712 .unwrap()
1713 .unwrap();
1714
1715 let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1717 assert!(matches!(parsed, ChildMessage::Log { .. }));
1718 }
1719
1720 #[tokio::test]
1721 async fn ipc_rv03_raw_message_size_limit_enforced() {
1722 let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1723 let mut buf = Vec::new();
1724 write_raw_message(&mut buf, large_payload.as_bytes())
1725 .await
1726 .unwrap();
1727
1728 let mut cursor = Cursor::new(buf);
1729 let result = read_raw_message(&mut cursor, 64).await;
1730 assert!(result.is_err());
1731 let err = result.unwrap_err().to_string();
1732 assert!(err.contains("too large"), "error: {err}");
1733 }
1734
1735 #[tokio::test]
1736 async fn ipc_rv04_large_payload_stays_raw() {
1737 let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1739 let mut buf = Vec::new();
1740 write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1741
1742 let mut cursor = Cursor::new(buf);
1743 let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1744 .await
1745 .unwrap()
1746 .unwrap();
1747
1748 assert!(raw.get().len() > 1_000_000);
1750 let val: Value = serde_json::from_str(raw.get()).unwrap();
1752 assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1753 }
1754
1755 #[tokio::test]
1756 async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1757 let msg = ParentMessage::ToolCallResult {
1759 request_id: 99,
1760 result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1761 };
1762 let mut buf = Vec::new();
1763 write_message(&mut buf, &msg).await.unwrap();
1764
1765 let mut cursor = Cursor::new(buf.clone());
1766 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1767 .await
1768 .unwrap()
1769 .unwrap();
1770
1771 let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1773 match parsed {
1774 ParentMessage::ToolCallResult { request_id, result } => {
1775 assert_eq!(request_id, 99);
1776 assert!(result.is_ok());
1777 }
1778 other => panic!("expected ToolCallResult, got: {:?}", other),
1779 }
1780 }
1781
1782 #[tokio::test]
1783 async fn ipc_rv06_raw_eof_returns_none() {
1784 let mut cursor = Cursor::new(Vec::<u8>::new());
1785 let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1786 .await
1787 .unwrap();
1788 assert!(result.is_none());
1789 }
1790
1791 #[tokio::test]
1792 async fn ipc_rv07_mixed_raw_and_value_messages() {
1793 let mut buf = Vec::new();
1794
1795 let msg1 = ChildMessage::Log {
1797 message: "first".into(),
1798 };
1799 write_message(&mut buf, &msg1).await.unwrap();
1800
1801 let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1803 write_raw_message(&mut buf, raw_payload).await.unwrap();
1804
1805 let mut cursor = Cursor::new(buf);
1807 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1808 assert!(matches!(d1, ChildMessage::Log { .. }));
1809
1810 let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1811 .await
1812 .unwrap()
1813 .unwrap();
1814 let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1815 assert!(matches!(parsed, ChildMessage::Log { .. }));
1816 }
1817
/// A `DispatchError::ToolError` converted to `IpcDispatchError`, serialized to
/// JSON, deserialized and converted back must still be a `ToolError` with the
/// same server and tool, must report code `TOOL_ERROR`, and must not trip the
/// circuit breaker.
///
/// Nothing here awaits, so this is a plain `#[test]` and does not start an
/// async runtime.
#[test]
fn tool_error_round_trips_through_ipc() {
    let original = forge_error::DispatchError::ToolError {
        server: "arbiter".into(),
        tool: "scan_target".into(),
        message: "tool returned error: Invalid params: missing field 'base_url'".into(),
    };

    // Step 1: the IPC form carries the structured fields.
    let ipc_err = IpcDispatchError::from(&original);
    assert_eq!(ipc_err.code, "TOOL_ERROR");
    assert_eq!(ipc_err.server, Some("arbiter".into()));
    assert_eq!(ipc_err.tool, Some("scan_target".into()));
    assert!(ipc_err.message.contains("Invalid params"));

    // Step 2: JSON round-trip of the IPC form.
    let json = serde_json::to_string(&ipc_err).unwrap();
    let deserialized: IpcDispatchError = serde_json::from_str(&json).unwrap();

    // Step 3: conversion back to the dispatch error restores the variant.
    let reconstructed = deserialized.to_dispatch_error();
    assert!(matches!(
        reconstructed,
        forge_error::DispatchError::ToolError {
            ref server,
            ref tool,
            ..
        } if server == "arbiter" && tool == "scan_target"
    ));
    assert!(!reconstructed.trips_circuit_breaker());
    assert_eq!(reconstructed.code(), "TOOL_ERROR");
}
1852
1853 #[tokio::test]
1854 async fn tool_error_ipc_message_roundtrip() {
1855 let msg = ParentMessage::ToolCallResult {
1856 request_id: 42,
1857 result: Err(IpcDispatchError::from(
1858 &forge_error::DispatchError::ToolError {
1859 server: "arbiter".into(),
1860 tool: "scan".into(),
1861 message: "bad params".into(),
1862 },
1863 )),
1864 };
1865
1866 let mut buf = Vec::new();
1867 write_message(&mut buf, &msg).await.unwrap();
1868
1869 let mut cursor = Cursor::new(buf);
1870 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1871
1872 match decoded {
1873 ParentMessage::ToolCallResult { request_id, result } => {
1874 assert_eq!(request_id, 42);
1875 let err = result.unwrap_err();
1876 assert_eq!(err.code, "TOOL_ERROR");
1877 assert_eq!(err.server, Some("arbiter".into()));
1878 assert_eq!(err.tool, Some("scan".into()));
1879
1880 let dispatch_err = err.to_dispatch_error();
1882 assert!(!dispatch_err.trips_circuit_breaker());
1883 }
1884 other => panic!("expected ToolCallResult, got: {:?}", other),
1885 }
1886 }
1887}