1use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Category attached to a failed execution result.
///
/// Carried in the optional `error_kind` field of
/// [`ChildMessage::ExecutionComplete`]. Variant names are serialized in
/// `snake_case` (for example `heap_limit`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ErrorKind {
    /// The execution timed out.
    Timeout,
    /// A heap limit was exceeded.
    HeapLimit,
    /// NOTE(review): presumably an error raised by the executed JavaScript
    /// itself — confirm against the code that produces this variant.
    JsError,
    /// An execution failure that is not covered by the other variants.
    Execution,
}
30
/// Serializable representation of a dispatch error.
///
/// Built from a `forge_error::DispatchError` via `From`, and turned back
/// into one with [`IpcDispatchError::to_dispatch_error`]. The optional
/// fields are omitted from the serialized form when they are `None` and
/// default to `None` when absent on input.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IpcDispatchError {
    /// Error code string, e.g. `"SERVER_NOT_FOUND"`, `"TIMEOUT"` or
    /// `"INTERNAL"`; this is what `to_dispatch_error` matches on.
    pub code: String,
    /// Human-readable error text.
    pub message: String,
    /// Server associated with the error, when the error variant has one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub server: Option<String>,
    /// Tool associated with the error, when the error variant has one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Timeout in milliseconds; only populated for timeout errors.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
53
54impl IpcDispatchError {
55 pub fn from_string(msg: String) -> Self {
58 Self {
59 code: "INTERNAL".to_string(),
60 message: msg,
61 server: None,
62 tool: None,
63 timeout_ms: None,
64 }
65 }
66
67 pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69 match self.code.as_str() {
70 "SERVER_NOT_FOUND" => {
71 forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72 }
73 "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74 server: self.server.unwrap_or_default(),
75 tool: self.tool.unwrap_or_default(),
76 },
77 "TIMEOUT" => forge_error::DispatchError::Timeout {
78 server: self.server.unwrap_or_default(),
79 timeout_ms: self.timeout_ms.unwrap_or(0),
80 },
81 "CIRCUIT_OPEN" => {
82 forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83 }
84 "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85 reason: self.message,
86 },
87 "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88 server: self.server.unwrap_or_default(),
89 message: self.message,
90 },
91 "TRANSPORT_DEAD" => forge_error::DispatchError::TransportDead {
92 server: self.server.unwrap_or_default(),
93 reason: self.message,
94 },
95 "TOOL_ERROR" => forge_error::DispatchError::ToolError {
96 server: self.server.unwrap_or_default(),
97 tool: self.tool.unwrap_or_default(),
98 message: self.message,
99 },
100 "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
101 _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
102 }
103 }
104}
105
106impl From<&forge_error::DispatchError> for IpcDispatchError {
107 fn from(e: &forge_error::DispatchError) -> Self {
108 let (server, tool, timeout_ms) = match e {
109 forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
110 forge_error::DispatchError::ToolNotFound { server, tool } => {
111 (Some(server.clone()), Some(tool.clone()), None)
112 }
113 forge_error::DispatchError::Timeout {
114 server, timeout_ms, ..
115 } => (Some(server.clone()), None, Some(*timeout_ms)),
116 forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
117 forge_error::DispatchError::Upstream { server, .. } => {
118 (Some(server.clone()), None, None)
119 }
120 forge_error::DispatchError::TransportDead { server, .. } => {
121 (Some(server.clone()), None, None)
122 }
123 forge_error::DispatchError::ToolError { server, tool, .. } => {
124 (Some(server.clone()), Some(tool.clone()), None)
125 }
126 _ => (None, None, None),
127 };
128
129 Self {
130 code: e.code().to_string(),
131 message: e.to_string(),
132 server,
133 tool,
134 timeout_ms,
135 }
136 }
137}
138
/// Messages on the parent side of the IPC protocol.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the variant's own fields.
/// NOTE(review): presumably these travel from the parent process to the
/// worker child — confirm against the call sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ParentMessage {
    /// Request to execute `code` under the limits in `config`.
    Execute {
        /// Source text to execute.
        code: String,
        /// Optional manifest value; its schema is not defined in this file.
        manifest: Option<Value>,
        /// Limits and settings for this execution.
        config: WorkerConfig,
    },
    /// Outcome of a tool call; pairs with `ChildMessage::ToolCallRequest`
    /// (presumably matched by `request_id`).
    ToolCallResult {
        /// Identifier of the request this result belongs to.
        request_id: u64,
        /// The tool's output, or the dispatch error that occurred.
        result: Result<Value, IpcDispatchError>,
    },
    /// Outcome of a resource read; pairs with
    /// `ChildMessage::ResourceReadRequest` (presumably matched by `request_id`).
    ResourceReadResult {
        /// Identifier of the request this result belongs to.
        request_id: u64,
        /// The resource content, or the dispatch error that occurred.
        result: Result<Value, IpcDispatchError>,
    },
    /// Reset request carrying a new configuration; `ChildMessage::ResetComplete`
    /// is the corresponding child-side message.
    Reset {
        /// Configuration supplied with the reset.
        config: WorkerConfig,
    },
    /// Outcome of a stash operation (presumably for any of the
    /// `ChildMessage::Stash*` requests, matched by `request_id`).
    StashResult {
        /// Identifier of the request this result belongs to.
        request_id: u64,
        /// The operation's output, or the dispatch error that occurred.
        result: Result<Value, IpcDispatchError>,
    },
}
183
/// Messages on the child side of the IPC protocol.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `"type"` field next to the variant's own fields.
/// NOTE(review): presumably these travel from the worker child to the
/// parent process — confirm against the call sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ChildMessage {
    /// Request to invoke `tool` on `server` with `args`.
    ToolCallRequest {
        /// Identifier chosen by the sender for this request.
        request_id: u64,
        /// Name of the target server.
        server: String,
        /// Name of the tool to call.
        tool: String,
        /// Arguments passed to the tool.
        args: Value,
    },
    /// Request to read the resource at `uri` from `server`.
    ResourceReadRequest {
        /// Identifier chosen by the sender for this request.
        request_id: u64,
        /// Name of the target server.
        server: String,
        /// URI of the resource to read.
        uri: String,
    },
    /// Request to store `value` under `key` in the stash.
    StashPut {
        /// Identifier chosen by the sender for this request.
        request_id: u64,
        /// Key to store the value under.
        key: String,
        /// Value to store.
        value: Value,
        /// Optional time-to-live (seconds, going by the field name).
        ttl_secs: Option<u32>,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to fetch the stash entry stored under `key`.
    StashGet {
        /// Identifier chosen by the sender for this request.
        request_id: u64,
        /// Key to look up.
        key: String,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to remove the stash entry stored under `key`.
    StashDelete {
        /// Identifier chosen by the sender for this request.
        request_id: u64,
        /// Key to remove.
        key: String,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request to list stash keys.
    StashKeys {
        /// Identifier chosen by the sender for this request.
        request_id: u64,
        /// Optional group; omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Signals that a reset has finished (see `ParentMessage::Reset`).
    ResetComplete,
    /// Final outcome of an execution.
    ExecutionComplete {
        /// The produced value, or an error message.
        result: Result<Value, String>,
        /// Optional error category; absent input deserializes to `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_kind: Option<ErrorKind>,
        /// Optional timeout in milliseconds; absent input deserializes to `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_ms: Option<u64>,
    },
    /// A log line.
    Log {
        /// Text of the log line.
        message: String,
    },
}
274
/// Serializable execution settings carried in `ParentMessage::Execute` and
/// `ParentMessage::Reset`.
///
/// Convertible from `crate::SandboxConfig` via `From` and back via
/// [`WorkerConfig::to_sandbox_config`].
/// NOTE(review): the `max_*_size` fields are presumably byte counts — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Execution timeout in milliseconds.
    pub timeout_ms: u64,
    /// Maximum heap size.
    pub max_heap_size: usize,
    /// Maximum number of tool calls.
    pub max_tool_calls: usize,
    /// Maximum size of a tool call's arguments.
    pub max_tool_call_args_size: usize,
    /// Maximum size of the execution output.
    pub max_output_size: usize,
    /// Maximum size of the submitted code.
    pub max_code_size: usize,
    /// Maximum size of one IPC message; defaults to
    /// `DEFAULT_MAX_IPC_MESSAGE_SIZE` when absent from the input.
    #[serde(default = "default_max_ipc_message_size")]
    pub max_ipc_message_size: usize,
    /// Maximum resource size; defaults to `64 * 1024 * 1024` when absent.
    #[serde(default = "default_max_resource_size")]
    pub max_resource_size: usize,
    /// Maximum parallelism; defaults to `8` when absent.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: usize,
    /// Optional list of known tools, omitted from the serialized form when
    /// `None`. NOTE(review): presumably `(server, tool)` pairs — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_tools: Option<Vec<(String, String)>>,
    /// Optional set of known server names, omitted from the serialized form
    /// when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_servers: Option<std::collections::HashSet<String>>,
}
307
/// Serde default for `WorkerConfig::max_ipc_message_size`: returns
/// `DEFAULT_MAX_IPC_MESSAGE_SIZE`.
fn default_max_ipc_message_size() -> usize {
    DEFAULT_MAX_IPC_MESSAGE_SIZE
}
311
/// Serde default for `WorkerConfig::max_resource_size`: 64 × 1024 × 1024
/// (67 108 864).
fn default_max_resource_size() -> usize {
    const MIB: usize = 1024 * 1024;
    64 * MIB
}
315
/// Serde default for `WorkerConfig::max_parallel`.
fn default_max_parallel() -> usize {
    const FALLBACK_PARALLELISM: usize = 8;
    FALLBACK_PARALLELISM
}
319
320impl From<&crate::SandboxConfig> for WorkerConfig {
321 fn from(config: &crate::SandboxConfig) -> Self {
322 Self {
323 timeout_ms: config.timeout.as_millis() as u64,
324 max_heap_size: config.max_heap_size,
325 max_tool_calls: config.max_tool_calls,
326 max_tool_call_args_size: config.max_tool_call_args_size,
327 max_output_size: config.max_output_size,
328 max_code_size: config.max_code_size,
329 max_ipc_message_size: config.max_ipc_message_size,
330 max_resource_size: config.max_resource_size,
331 max_parallel: config.max_parallel,
332 known_tools: None,
333 known_servers: None,
334 }
335 }
336}
337
338impl WorkerConfig {
339 pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
341 crate::SandboxConfig {
342 timeout: Duration::from_millis(self.timeout_ms),
343 max_code_size: self.max_code_size,
344 max_output_size: self.max_output_size,
345 max_heap_size: self.max_heap_size,
346 max_concurrent: 1, max_tool_calls: self.max_tool_calls,
348 max_tool_call_args_size: self.max_tool_call_args_size,
349 execution_mode: crate::executor::ExecutionMode::InProcess, max_resource_size: self.max_resource_size,
351 max_parallel: self.max_parallel,
352 max_ipc_message_size: self.max_ipc_message_size,
353 }
354 }
355}
356
357pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
361 writer: &mut W,
362 msg: &T,
363) -> Result<(), std::io::Error> {
364 let payload = serde_json::to_vec(msg)
365 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
366 let len = u32::try_from(payload.len()).map_err(|_| {
367 std::io::Error::new(
368 std::io::ErrorKind::InvalidData,
369 format!(
370 "IPC payload too large: {} bytes (max {} bytes)",
371 payload.len(),
372 u32::MAX
373 ),
374 )
375 })?;
376 writer.write_all(&len.to_be_bytes()).await?;
377 writer.write_all(&payload).await?;
378 writer.flush().await?;
379 Ok(())
380}
381
382pub async fn write_raw_message<W: AsyncWrite + Unpin>(
387 writer: &mut W,
388 payload: &[u8],
389) -> Result<(), std::io::Error> {
390 let len = u32::try_from(payload.len()).map_err(|_| {
391 std::io::Error::new(
392 std::io::ErrorKind::InvalidData,
393 format!(
394 "raw IPC payload too large: {} bytes (max {} bytes)",
395 payload.len(),
396 u32::MAX
397 ),
398 )
399 })?;
400 writer.write_all(&len.to_be_bytes()).await?;
401 writer.write_all(payload).await?;
402 writer.flush().await?;
403 Ok(())
404}
405
406pub async fn read_raw_message<R: AsyncRead + Unpin>(
411 reader: &mut R,
412 max_size: usize,
413) -> Result<Option<Box<RawValue>>, std::io::Error> {
414 let mut len_buf = [0u8; 4];
415 match reader.read_exact(&mut len_buf).await {
416 Ok(_) => {}
417 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
418 Err(e) => return Err(e),
419 }
420
421 let len = u32::from_be_bytes(len_buf) as usize;
422
423 if len > max_size {
424 return Err(std::io::Error::new(
425 std::io::ErrorKind::InvalidData,
426 format!(
427 "raw IPC message too large: {} bytes (limit: {} bytes)",
428 len, max_size
429 ),
430 ));
431 }
432
433 let mut payload = vec![0u8; len];
434 reader.read_exact(&mut payload).await?;
435
436 let raw: Box<RawValue> = serde_json::from_slice(&payload)
437 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
438 Ok(Some(raw))
439}
440
/// Default size limit for a single IPC message payload: 8 × 1024 × 1024.
///
/// Used by [`read_message`] as its limit and as the serde default for
/// `WorkerConfig::max_ipc_message_size`.
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 * 1024 * 1024;
446
/// Reads and deserializes one frame from `reader`, using
/// `DEFAULT_MAX_IPC_MESSAGE_SIZE` as the payload size limit.
///
/// Delegates to [`read_message_with_limit`]; see it for the return value
/// and error conditions.
pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
    reader: &mut R,
) -> Result<Option<T>, std::io::Error> {
    read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
}
456
457pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
462 reader: &mut R,
463 max_size: usize,
464) -> Result<Option<T>, std::io::Error> {
465 let mut len_buf = [0u8; 4];
466 match reader.read_exact(&mut len_buf).await {
467 Ok(_) => {}
468 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
469 Err(e) => return Err(e),
470 }
471
472 let len = u32::from_be_bytes(len_buf) as usize;
473
474 if len > max_size {
476 return Err(std::io::Error::new(
477 std::io::ErrorKind::InvalidData,
478 format!(
479 "IPC message too large: {} bytes (limit: {} bytes)",
480 len, max_size
481 ),
482 ));
483 }
484
485 let mut payload = vec![0u8; len];
486 reader.read_exact(&mut payload).await?;
487
488 let msg: T = serde_json::from_slice(&payload)
489 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
490 Ok(Some(msg))
491}
492
493#[cfg(test)]
494mod tests {
495 use super::*;
496 use std::io::Cursor;
497
498 #[tokio::test]
499 async fn roundtrip_parent_execute_message() {
500 let msg = ParentMessage::Execute {
501 code: "async () => { return 42; }".into(),
502 manifest: Some(serde_json::json!({"servers": []})),
503 config: WorkerConfig {
504 timeout_ms: 5000,
505 max_heap_size: 64 * 1024 * 1024,
506 max_tool_calls: 50,
507 max_tool_call_args_size: 1024 * 1024,
508 max_output_size: 1024 * 1024,
509 max_code_size: 64 * 1024,
510 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
511 max_resource_size: 64 * 1024 * 1024,
512 max_parallel: 8,
513 known_tools: None,
514 known_servers: None,
515 },
516 };
517
518 let mut buf = Vec::new();
519 write_message(&mut buf, &msg).await.unwrap();
520
521 let mut cursor = Cursor::new(buf);
522 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
523
524 match decoded {
525 ParentMessage::Execute {
526 code,
527 manifest,
528 config,
529 } => {
530 assert_eq!(code, "async () => { return 42; }");
531 assert!(manifest.is_some());
532 assert_eq!(config.timeout_ms, 5000);
533 }
534 other => panic!("expected Execute, got: {:?}", other),
535 }
536 }
537
538 #[tokio::test]
539 async fn roundtrip_parent_tool_result() {
540 let msg = ParentMessage::ToolCallResult {
541 request_id: 42,
542 result: Ok(serde_json::json!({"status": "ok"})),
543 };
544
545 let mut buf = Vec::new();
546 write_message(&mut buf, &msg).await.unwrap();
547
548 let mut cursor = Cursor::new(buf);
549 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
550
551 match decoded {
552 ParentMessage::ToolCallResult { request_id, result } => {
553 assert_eq!(request_id, 42);
554 assert!(result.is_ok());
555 }
556 other => panic!("expected ToolCallResult, got: {:?}", other),
557 }
558 }
559
560 #[tokio::test]
561 async fn roundtrip_parent_tool_result_error() {
562 let msg = ParentMessage::ToolCallResult {
563 request_id: 7,
564 result: Err(IpcDispatchError::from_string("connection refused".into())),
565 };
566
567 let mut buf = Vec::new();
568 write_message(&mut buf, &msg).await.unwrap();
569
570 let mut cursor = Cursor::new(buf);
571 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
572
573 match decoded {
574 ParentMessage::ToolCallResult { request_id, result } => {
575 assert_eq!(request_id, 7);
576 let err = result.unwrap_err();
577 assert_eq!(err.message, "connection refused");
578 assert_eq!(err.code, "INTERNAL");
579 }
580 other => panic!("expected ToolCallResult, got: {:?}", other),
581 }
582 }
583
584 #[tokio::test]
585 async fn roundtrip_child_tool_request() {
586 let msg = ChildMessage::ToolCallRequest {
587 request_id: 1,
588 server: "narsil".into(),
589 tool: "ast.parse".into(),
590 args: serde_json::json!({"file": "test.rs"}),
591 };
592
593 let mut buf = Vec::new();
594 write_message(&mut buf, &msg).await.unwrap();
595
596 let mut cursor = Cursor::new(buf);
597 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
598
599 match decoded {
600 ChildMessage::ToolCallRequest {
601 request_id,
602 server,
603 tool,
604 args,
605 } => {
606 assert_eq!(request_id, 1);
607 assert_eq!(server, "narsil");
608 assert_eq!(tool, "ast.parse");
609 assert_eq!(args["file"], "test.rs");
610 }
611 other => panic!("expected ToolCallRequest, got: {:?}", other),
612 }
613 }
614
615 #[tokio::test]
616 async fn roundtrip_child_execution_complete() {
617 let msg = ChildMessage::ExecutionComplete {
618 result: Ok(serde_json::json!([1, 2, 3])),
619 error_kind: None,
620 timeout_ms: None,
621 };
622
623 let mut buf = Vec::new();
624 write_message(&mut buf, &msg).await.unwrap();
625
626 let mut cursor = Cursor::new(buf);
627 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
628
629 match decoded {
630 ChildMessage::ExecutionComplete {
631 result, error_kind, ..
632 } => {
633 assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
634 assert_eq!(error_kind, None);
635 }
636 other => panic!("expected ExecutionComplete, got: {:?}", other),
637 }
638 }
639
640 #[tokio::test]
641 async fn roundtrip_child_log() {
642 let msg = ChildMessage::Log {
643 message: "processing step 3".into(),
644 };
645
646 let mut buf = Vec::new();
647 write_message(&mut buf, &msg).await.unwrap();
648
649 let mut cursor = Cursor::new(buf);
650 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
651
652 match decoded {
653 ChildMessage::Log { message } => {
654 assert_eq!(message, "processing step 3");
655 }
656 other => panic!("expected Log, got: {:?}", other),
657 }
658 }
659
660 #[tokio::test]
661 async fn multiple_messages_in_stream() {
662 let msg1 = ChildMessage::Log {
663 message: "first".into(),
664 };
665 let msg2 = ChildMessage::ToolCallRequest {
666 request_id: 1,
667 server: "s".into(),
668 tool: "t".into(),
669 args: serde_json::json!({}),
670 };
671 let msg3 = ChildMessage::ExecutionComplete {
672 result: Ok(serde_json::json!("done")),
673 error_kind: None,
674 timeout_ms: None,
675 };
676
677 let mut buf = Vec::new();
678 write_message(&mut buf, &msg1).await.unwrap();
679 write_message(&mut buf, &msg2).await.unwrap();
680 write_message(&mut buf, &msg3).await.unwrap();
681
682 let mut cursor = Cursor::new(buf);
683 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
684 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
685 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
686
687 assert!(matches!(d1, ChildMessage::Log { .. }));
688 assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
689 assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
690
691 let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
693 assert!(d4.is_none());
694 }
695
696 #[tokio::test]
697 async fn execution_complete_error_roundtrip() {
698 let msg = ChildMessage::ExecutionComplete {
699 result: Err("failed to create tokio runtime: resource unavailable".into()),
700 error_kind: Some(ErrorKind::Execution),
701 timeout_ms: None,
702 };
703
704 let mut buf = Vec::new();
705 write_message(&mut buf, &msg).await.unwrap();
706
707 let mut cursor = Cursor::new(buf);
708 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
709
710 match decoded {
711 ChildMessage::ExecutionComplete {
712 result, error_kind, ..
713 } => {
714 let err = result.unwrap_err();
715 assert!(
716 err.contains("tokio runtime"),
717 "expected runtime error: {err}"
718 );
719 assert_eq!(error_kind, Some(ErrorKind::Execution));
720 }
721 other => panic!("expected ExecutionComplete, got: {:?}", other),
722 }
723 }
724
725 #[tokio::test]
726 async fn eof_returns_none() {
727 let mut cursor = Cursor::new(Vec::<u8>::new());
728 let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
729 assert!(result.is_none());
730 }
731
#[test]
fn u32_try_from_overflow() {
    // One past u32::MAX cannot be represented as a u32 frame length.
    let one_past_max = (u32::MAX as usize) + 1;
    let converted = u32::try_from(one_past_max);
    assert!(converted.is_err());
}
738
739 #[tokio::test]
740 async fn write_message_normal_size_succeeds() {
741 let msg = ChildMessage::Log {
743 message: "a".repeat(1024),
744 };
745 let mut buf = Vec::new();
746 write_message(&mut buf, &msg).await.unwrap();
747 assert!(buf.len() > 1024);
748 }
749
750 #[tokio::test]
751 async fn large_message_roundtrip() {
752 let large_data = "x".repeat(1_000_000);
754 let msg = ChildMessage::ExecutionComplete {
755 result: Ok(serde_json::json!(large_data)),
756 error_kind: None,
757 timeout_ms: None,
758 };
759
760 let mut buf = Vec::new();
761 write_message(&mut buf, &msg).await.unwrap();
762
763 let mut cursor = Cursor::new(buf);
764 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
765
766 match decoded {
767 ChildMessage::ExecutionComplete { result, .. } => {
768 assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
769 }
770 other => panic!("expected ExecutionComplete, got: {:?}", other),
771 }
772 }
773
774 #[tokio::test]
775 async fn worker_config_roundtrip_from_sandbox_config() {
776 let sandbox = crate::SandboxConfig::default();
777 let worker = WorkerConfig::from(&sandbox);
778 let back = worker.to_sandbox_config();
779
780 assert_eq!(sandbox.timeout, back.timeout);
781 assert_eq!(sandbox.max_heap_size, back.max_heap_size);
782 assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
783 assert_eq!(sandbox.max_output_size, back.max_output_size);
784 assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
785 assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); }
787
788 #[tokio::test]
789 async fn read_message_with_limit_rejects_oversized() {
790 let msg = ChildMessage::Log {
791 message: "x".repeat(1024),
792 };
793 let mut buf = Vec::new();
794 write_message(&mut buf, &msg).await.unwrap();
795
796 let mut cursor = Cursor::new(buf);
798 let result: Result<Option<ChildMessage>, _> =
799 read_message_with_limit(&mut cursor, 64).await;
800 assert!(result.is_err());
801 let err_msg = result.unwrap_err().to_string();
802 assert!(err_msg.contains("too large"), "error: {err_msg}");
803 }
804
805 #[tokio::test]
806 async fn read_message_with_limit_accepts_within_limit() {
807 let msg = ChildMessage::Log {
808 message: "hello".into(),
809 };
810 let mut buf = Vec::new();
811 write_message(&mut buf, &msg).await.unwrap();
812
813 let mut cursor = Cursor::new(buf);
814 let result: Option<ChildMessage> =
815 read_message_with_limit(&mut cursor, 1024).await.unwrap();
816 assert!(result.is_some());
817 }
818
819 #[tokio::test]
820 async fn worker_config_ipc_limit_serde_default() {
821 let json = r#"{
823 "timeout_ms": 5000,
824 "max_heap_size": 67108864,
825 "max_tool_calls": 50,
826 "max_tool_call_args_size": 1048576,
827 "max_output_size": 1048576,
828 "max_code_size": 65536
829 }"#;
830 let config: WorkerConfig = serde_json::from_str(json).unwrap();
831 assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
832 }
833
834 #[tokio::test]
836 async fn ipc_01_resource_read_request_roundtrip() {
837 let msg = ChildMessage::ResourceReadRequest {
838 request_id: 10,
839 server: "postgres".into(),
840 uri: "file:///logs/app.log".into(),
841 };
842
843 let mut buf = Vec::new();
844 write_message(&mut buf, &msg).await.unwrap();
845
846 let mut cursor = Cursor::new(buf);
847 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
848
849 match decoded {
850 ChildMessage::ResourceReadRequest {
851 request_id,
852 server,
853 uri,
854 } => {
855 assert_eq!(request_id, 10);
856 assert_eq!(server, "postgres");
857 assert_eq!(uri, "file:///logs/app.log");
858 }
859 other => panic!("expected ResourceReadRequest, got: {:?}", other),
860 }
861 }
862
863 #[tokio::test]
865 async fn ipc_02_resource_read_result_success_roundtrip() {
866 let msg = ParentMessage::ResourceReadResult {
867 request_id: 11,
868 result: Ok(serde_json::json!({"content": "log data here"})),
869 };
870
871 let mut buf = Vec::new();
872 write_message(&mut buf, &msg).await.unwrap();
873
874 let mut cursor = Cursor::new(buf);
875 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
876
877 match decoded {
878 ParentMessage::ResourceReadResult { request_id, result } => {
879 assert_eq!(request_id, 11);
880 let val = result.unwrap();
881 assert_eq!(val["content"], "log data here");
882 }
883 other => panic!("expected ResourceReadResult, got: {:?}", other),
884 }
885 }
886
887 #[tokio::test]
889 async fn ipc_03_resource_read_result_error_roundtrip() {
890 let msg = ParentMessage::ResourceReadResult {
891 request_id: 12,
892 result: Err(IpcDispatchError::from_string("resource not found".into())),
893 };
894
895 let mut buf = Vec::new();
896 write_message(&mut buf, &msg).await.unwrap();
897
898 let mut cursor = Cursor::new(buf);
899 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
900
901 match decoded {
902 ParentMessage::ResourceReadResult { request_id, result } => {
903 assert_eq!(request_id, 12);
904 let err = result.unwrap_err();
905 assert_eq!(err.message, "resource not found");
906 assert_eq!(err.code, "INTERNAL");
907 }
908 other => panic!("expected ResourceReadResult, got: {:?}", other),
909 }
910 }
911
912 #[tokio::test]
914 async fn ipc_04_stash_put_roundtrip() {
915 let msg = ChildMessage::StashPut {
916 request_id: 20,
917 key: "my-key".into(),
918 value: serde_json::json!({"data": 42}),
919 ttl_secs: Some(60),
920 group: None,
921 };
922
923 let mut buf = Vec::new();
924 write_message(&mut buf, &msg).await.unwrap();
925
926 let mut cursor = Cursor::new(buf);
927 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
928
929 match decoded {
930 ChildMessage::StashPut {
931 request_id,
932 key,
933 value,
934 ttl_secs,
935 group,
936 } => {
937 assert_eq!(request_id, 20);
938 assert_eq!(key, "my-key");
939 assert_eq!(value["data"], 42);
940 assert_eq!(ttl_secs, Some(60));
941 assert_eq!(group, None);
942 }
943 other => panic!("expected StashPut, got: {:?}", other),
944 }
945 }
946
947 #[tokio::test]
949 async fn ipc_05_stash_get_roundtrip() {
950 let msg = ChildMessage::StashGet {
951 request_id: 21,
952 key: "lookup-key".into(),
953 group: None,
954 };
955
956 let mut buf = Vec::new();
957 write_message(&mut buf, &msg).await.unwrap();
958
959 let mut cursor = Cursor::new(buf);
960 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
961
962 match decoded {
963 ChildMessage::StashGet {
964 request_id,
965 key,
966 group,
967 } => {
968 assert_eq!(request_id, 21);
969 assert_eq!(key, "lookup-key");
970 assert_eq!(group, None);
971 }
972 other => panic!("expected StashGet, got: {:?}", other),
973 }
974 }
975
976 #[tokio::test]
978 async fn ipc_06_stash_delete_roundtrip() {
979 let msg = ChildMessage::StashDelete {
980 request_id: 22,
981 key: "delete-me".into(),
982 group: None,
983 };
984
985 let mut buf = Vec::new();
986 write_message(&mut buf, &msg).await.unwrap();
987
988 let mut cursor = Cursor::new(buf);
989 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
990
991 match decoded {
992 ChildMessage::StashDelete {
993 request_id,
994 key,
995 group,
996 } => {
997 assert_eq!(request_id, 22);
998 assert_eq!(key, "delete-me");
999 assert_eq!(group, None);
1000 }
1001 other => panic!("expected StashDelete, got: {:?}", other),
1002 }
1003 }
1004
1005 #[tokio::test]
1007 async fn ipc_07_stash_keys_roundtrip() {
1008 let msg = ChildMessage::StashKeys {
1009 request_id: 23,
1010 group: None,
1011 };
1012
1013 let mut buf = Vec::new();
1014 write_message(&mut buf, &msg).await.unwrap();
1015
1016 let mut cursor = Cursor::new(buf);
1017 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1018
1019 match decoded {
1020 ChildMessage::StashKeys { request_id, group } => {
1021 assert_eq!(request_id, 23);
1022 assert_eq!(group, None);
1023 }
1024 other => panic!("expected StashKeys, got: {:?}", other),
1025 }
1026 }
1027
1028 #[tokio::test]
1030 async fn ipc_08_stash_result_roundtrip() {
1031 let msg = ParentMessage::StashResult {
1032 request_id: 24,
1033 result: Ok(serde_json::json!({"ok": true})),
1034 };
1035
1036 let mut buf = Vec::new();
1037 write_message(&mut buf, &msg).await.unwrap();
1038
1039 let mut cursor = Cursor::new(buf);
1040 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1041
1042 match decoded {
1043 ParentMessage::StashResult { request_id, result } => {
1044 assert_eq!(request_id, 24);
1045 assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1046 }
1047 other => panic!("expected StashResult, got: {:?}", other),
1048 }
1049 }
1050
1051 #[tokio::test]
1053 async fn ipc_09_mixed_message_interleaving() {
1054 let msg1 = ChildMessage::ToolCallRequest {
1055 request_id: 1,
1056 server: "s".into(),
1057 tool: "t".into(),
1058 args: serde_json::json!({}),
1059 };
1060 let msg2 = ChildMessage::ResourceReadRequest {
1061 request_id: 2,
1062 server: "pg".into(),
1063 uri: "file:///data".into(),
1064 };
1065 let msg3 = ChildMessage::StashPut {
1066 request_id: 3,
1067 key: "k".into(),
1068 value: serde_json::json!("v"),
1069 ttl_secs: None,
1070 group: None,
1071 };
1072 let msg4 = ChildMessage::StashGet {
1073 request_id: 4,
1074 key: "k".into(),
1075 group: None,
1076 };
1077 let msg5 = ChildMessage::ExecutionComplete {
1078 result: Ok(serde_json::json!("done")),
1079 error_kind: None,
1080 timeout_ms: None,
1081 };
1082
1083 let mut buf = Vec::new();
1084 write_message(&mut buf, &msg1).await.unwrap();
1085 write_message(&mut buf, &msg2).await.unwrap();
1086 write_message(&mut buf, &msg3).await.unwrap();
1087 write_message(&mut buf, &msg4).await.unwrap();
1088 write_message(&mut buf, &msg5).await.unwrap();
1089
1090 let mut cursor = Cursor::new(buf);
1091 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1092 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1093 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1094 let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1095 let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1096
1097 assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1098 assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1099 assert!(matches!(d3, ChildMessage::StashPut { .. }));
1100 assert!(matches!(d4, ChildMessage::StashGet { .. }));
1101 assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1102
1103 let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1105 assert!(d6.is_none());
1106 }
1107
1108 #[tokio::test]
1110 async fn ipc_p01_reset_roundtrip() {
1111 let msg = ParentMessage::Reset {
1112 config: WorkerConfig {
1113 timeout_ms: 3000,
1114 max_heap_size: 32 * 1024 * 1024,
1115 max_tool_calls: 25,
1116 max_tool_call_args_size: 512 * 1024,
1117 max_output_size: 512 * 1024,
1118 max_code_size: 32 * 1024,
1119 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1120 max_resource_size: 32 * 1024 * 1024,
1121 max_parallel: 4,
1122 known_tools: None,
1123 known_servers: None,
1124 },
1125 };
1126
1127 let mut buf = Vec::new();
1128 write_message(&mut buf, &msg).await.unwrap();
1129
1130 let mut cursor = Cursor::new(buf);
1131 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1132
1133 match decoded {
1134 ParentMessage::Reset { config } => {
1135 assert_eq!(config.timeout_ms, 3000);
1136 assert_eq!(config.max_tool_calls, 25);
1137 }
1138 other => panic!("expected Reset, got: {:?}", other),
1139 }
1140 }
1141
1142 #[tokio::test]
1144 async fn ipc_p02_reset_complete_roundtrip() {
1145 let msg = ChildMessage::ResetComplete;
1146
1147 let mut buf = Vec::new();
1148 write_message(&mut buf, &msg).await.unwrap();
1149
1150 let mut cursor = Cursor::new(buf);
1151 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1152
1153 assert!(matches!(decoded, ChildMessage::ResetComplete));
1154 }
1155
1156 #[tokio::test]
1158 async fn ipc_p03_reset_execute_interleaving() {
1159 let reset = ParentMessage::Reset {
1160 config: WorkerConfig {
1161 timeout_ms: 5000,
1162 max_heap_size: 64 * 1024 * 1024,
1163 max_tool_calls: 50,
1164 max_tool_call_args_size: 1024 * 1024,
1165 max_output_size: 1024 * 1024,
1166 max_code_size: 64 * 1024,
1167 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1168 max_resource_size: 64 * 1024 * 1024,
1169 max_parallel: 8,
1170 known_tools: None,
1171 known_servers: None,
1172 },
1173 };
1174 let execute = ParentMessage::Execute {
1175 code: "async () => 42".into(),
1176 manifest: None,
1177 config: WorkerConfig {
1178 timeout_ms: 5000,
1179 max_heap_size: 64 * 1024 * 1024,
1180 max_tool_calls: 50,
1181 max_tool_call_args_size: 1024 * 1024,
1182 max_output_size: 1024 * 1024,
1183 max_code_size: 64 * 1024,
1184 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1185 max_resource_size: 64 * 1024 * 1024,
1186 max_parallel: 8,
1187 known_tools: None,
1188 known_servers: None,
1189 },
1190 };
1191
1192 let mut buf = Vec::new();
1193 write_message(&mut buf, &reset).await.unwrap();
1194 write_message(&mut buf, &execute).await.unwrap();
1195
1196 let mut cursor = Cursor::new(buf);
1197 let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1198 let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1199
1200 assert!(matches!(d1, ParentMessage::Reset { .. }));
1201 assert!(matches!(d2, ParentMessage::Execute { .. }));
1202 }
1203
1204 #[tokio::test]
1206 async fn ipc_10_oversized_stash_message_rejected() {
1207 let msg = ChildMessage::StashPut {
1208 request_id: 100,
1209 key: "k".into(),
1210 value: serde_json::json!("x".repeat(2048)),
1211 ttl_secs: Some(60),
1212 group: None,
1213 };
1214 let mut buf = Vec::new();
1215 write_message(&mut buf, &msg).await.unwrap();
1216
1217 let mut cursor = Cursor::new(buf);
1219 let result: Result<Option<ChildMessage>, _> =
1220 read_message_with_limit(&mut cursor, 64).await;
1221 assert!(result.is_err());
1222 let err_msg = result.unwrap_err().to_string();
1223 assert!(
1224 err_msg.contains("too large"),
1225 "error should mention 'too large': {err_msg}"
1226 );
1227 }
1228
1229 #[tokio::test]
1231 async fn ipc_o01_error_kind_timeout_roundtrip() {
1232 let msg = ChildMessage::ExecutionComplete {
1233 result: Err("execution timed out after 500ms".into()),
1234 error_kind: Some(ErrorKind::Timeout),
1235 timeout_ms: Some(500),
1236 };
1237
1238 let mut buf = Vec::new();
1239 write_message(&mut buf, &msg).await.unwrap();
1240
1241 let mut cursor = Cursor::new(buf);
1242 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1243
1244 match decoded {
1245 ChildMessage::ExecutionComplete {
1246 result,
1247 error_kind,
1248 timeout_ms,
1249 } => {
1250 assert!(result.is_err());
1251 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1252 assert_eq!(timeout_ms, Some(500));
1253 }
1254 other => panic!("expected ExecutionComplete, got: {:?}", other),
1255 }
1256 }
1257
1258 #[tokio::test]
1260 async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1261 let msg = ChildMessage::ExecutionComplete {
1262 result: Err("V8 heap limit exceeded".into()),
1263 error_kind: Some(ErrorKind::HeapLimit),
1264 timeout_ms: None,
1265 };
1266
1267 let mut buf = Vec::new();
1268 write_message(&mut buf, &msg).await.unwrap();
1269
1270 let mut cursor = Cursor::new(buf);
1271 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1272
1273 match decoded {
1274 ChildMessage::ExecutionComplete {
1275 result, error_kind, ..
1276 } => {
1277 assert!(result.is_err());
1278 assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1279 }
1280 other => panic!("expected ExecutionComplete, got: {:?}", other),
1281 }
1282 }
1283
1284 #[tokio::test]
1286 async fn ipc_o03_error_kind_backward_compat() {
1287 let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1290 let mut buf = Vec::new();
1291 let payload = json.as_bytes();
1292 let len = payload.len() as u32;
1293 buf.extend_from_slice(&len.to_be_bytes());
1294 buf.extend_from_slice(payload);
1295
1296 let mut cursor = Cursor::new(buf);
1297 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1298
1299 match decoded {
1300 ChildMessage::ExecutionComplete {
1301 result,
1302 error_kind,
1303 timeout_ms,
1304 } => {
1305 assert!(result.is_err());
1306 assert_eq!(
1307 error_kind, None,
1308 "missing error_kind should default to None"
1309 );
1310 assert_eq!(
1311 timeout_ms, None,
1312 "missing timeout_ms should default to None"
1313 );
1314 }
1315 other => panic!("expected ExecutionComplete, got: {:?}", other),
1316 }
1317 }
1318
1319 #[tokio::test]
1321 async fn ipc_o04_error_kind_js_error_roundtrip() {
1322 let msg = ChildMessage::ExecutionComplete {
1323 result: Err("ReferenceError: x is not defined".into()),
1324 error_kind: Some(ErrorKind::JsError),
1325 timeout_ms: None,
1326 };
1327
1328 let mut buf = Vec::new();
1329 write_message(&mut buf, &msg).await.unwrap();
1330
1331 let mut cursor = Cursor::new(buf);
1332 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1333
1334 match decoded {
1335 ChildMessage::ExecutionComplete {
1336 result, error_kind, ..
1337 } => {
1338 assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1339 assert_eq!(error_kind, Some(ErrorKind::JsError));
1340 }
1341 other => panic!("expected ExecutionComplete, got: {:?}", other),
1342 }
1343 }
1344
1345 #[tokio::test]
1347 async fn ipc_o05_success_omits_error_kind() {
1348 let msg = ChildMessage::ExecutionComplete {
1349 result: Ok(serde_json::json!(42)),
1350 error_kind: None,
1351 timeout_ms: None,
1352 };
1353
1354 let json = serde_json::to_string(&msg).unwrap();
1355 assert!(
1357 !json.contains("error_kind"),
1358 "success messages should not contain error_kind field: {json}"
1359 );
1360 assert!(
1361 !json.contains("timeout_ms"),
1362 "success messages should not contain timeout_ms field: {json}"
1363 );
1364 }
1365
1366 #[tokio::test]
1369 async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1370 let msg = ChildMessage::StashPut {
1371 request_id: 50,
1372 key: "grouped-key".into(),
1373 value: serde_json::json!({"data": "secret"}),
1374 ttl_secs: Some(120),
1375 group: Some("analytics".into()),
1376 };
1377
1378 let mut buf = Vec::new();
1379 write_message(&mut buf, &msg).await.unwrap();
1380
1381 let mut cursor = Cursor::new(buf);
1382 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1383
1384 match decoded {
1385 ChildMessage::StashPut {
1386 request_id,
1387 key,
1388 group,
1389 ..
1390 } => {
1391 assert_eq!(request_id, 50);
1392 assert_eq!(key, "grouped-key");
1393 assert_eq!(group, Some("analytics".into()));
1394 }
1395 other => panic!("expected StashPut, got: {:?}", other),
1396 }
1397 }
1398
1399 #[tokio::test]
1400 async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1401 let msg = ChildMessage::StashGet {
1402 request_id: 51,
1403 key: "grouped-key".into(),
1404 group: Some("analytics".into()),
1405 };
1406
1407 let mut buf = Vec::new();
1408 write_message(&mut buf, &msg).await.unwrap();
1409
1410 let mut cursor = Cursor::new(buf);
1411 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1412
1413 match decoded {
1414 ChildMessage::StashGet {
1415 request_id,
1416 key,
1417 group,
1418 } => {
1419 assert_eq!(request_id, 51);
1420 assert_eq!(key, "grouped-key");
1421 assert_eq!(group, Some("analytics".into()));
1422 }
1423 other => panic!("expected StashGet, got: {:?}", other),
1424 }
1425 }
1426
1427 #[tokio::test]
1428 async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1429 let msg = ChildMessage::StashDelete {
1430 request_id: 52,
1431 key: "grouped-key".into(),
1432 group: Some("analytics".into()),
1433 };
1434
1435 let mut buf = Vec::new();
1436 write_message(&mut buf, &msg).await.unwrap();
1437
1438 let mut cursor = Cursor::new(buf);
1439 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1440
1441 match decoded {
1442 ChildMessage::StashDelete {
1443 request_id,
1444 key,
1445 group,
1446 } => {
1447 assert_eq!(request_id, 52);
1448 assert_eq!(key, "grouped-key");
1449 assert_eq!(group, Some("analytics".into()));
1450 }
1451 other => panic!("expected StashDelete, got: {:?}", other),
1452 }
1453 }
1454
1455 #[tokio::test]
1456 async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1457 let msg = ChildMessage::StashKeys {
1458 request_id: 53,
1459 group: Some("analytics".into()),
1460 };
1461
1462 let mut buf = Vec::new();
1463 write_message(&mut buf, &msg).await.unwrap();
1464
1465 let mut cursor = Cursor::new(buf);
1466 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1467
1468 match decoded {
1469 ChildMessage::StashKeys { request_id, group } => {
1470 assert_eq!(request_id, 53);
1471 assert_eq!(group, Some("analytics".into()));
1472 }
1473 other => panic!("expected StashKeys, got: {:?}", other),
1474 }
1475 }
1476
1477 #[tokio::test]
1478 async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1479 let msg = ChildMessage::StashPut {
1481 request_id: 54,
1482 key: "no-group-key".into(),
1483 value: serde_json::json!("val"),
1484 ttl_secs: None,
1485 group: None,
1486 };
1487
1488 let json = serde_json::to_string(&msg).unwrap();
1489 assert!(
1490 !json.contains("\"group\""),
1491 "group:None should be absent in serialized JSON: {json}"
1492 );
1493
1494 let mut buf = Vec::new();
1496 write_message(&mut buf, &msg).await.unwrap();
1497 let mut cursor = Cursor::new(buf);
1498 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1499 match decoded {
1500 ChildMessage::StashPut { group, .. } => {
1501 assert_eq!(group, None);
1502 }
1503 other => panic!("expected StashPut, got: {:?}", other),
1504 }
1505 }
1506
1507 #[tokio::test]
1508 async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1509 let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1511 let mut buf = Vec::new();
1512 let payload = json.as_bytes();
1513 let len = payload.len() as u32;
1514 buf.extend_from_slice(&len.to_be_bytes());
1515 buf.extend_from_slice(payload);
1516
1517 let mut cursor = Cursor::new(buf);
1518 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1519
1520 match decoded {
1521 ChildMessage::StashPut {
1522 request_id,
1523 key,
1524 group,
1525 ..
1526 } => {
1527 assert_eq!(request_id, 60);
1528 assert_eq!(key, "old-key");
1529 assert_eq!(
1530 group, None,
1531 "missing group field from v0.3.0 worker should deserialize as None"
1532 );
1533 }
1534 other => panic!("expected StashPut, got: {:?}", other),
1535 }
1536
1537 let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1539 let mut buf = Vec::new();
1540 let payload = json_get.as_bytes();
1541 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1542 buf.extend_from_slice(payload);
1543 let mut cursor = Cursor::new(buf);
1544 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1545 match decoded {
1546 ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1547 other => panic!("expected StashGet, got: {:?}", other),
1548 }
1549
1550 let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1551 let mut buf = Vec::new();
1552 let payload = json_del.as_bytes();
1553 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1554 buf.extend_from_slice(payload);
1555 let mut cursor = Cursor::new(buf);
1556 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1557 match decoded {
1558 ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1559 other => panic!("expected StashDelete, got: {:?}", other),
1560 }
1561
1562 let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1563 let mut buf = Vec::new();
1564 let payload = json_keys.as_bytes();
1565 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1566 buf.extend_from_slice(payload);
1567 let mut cursor = Cursor::new(buf);
1568 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1569 match decoded {
1570 ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1571 other => panic!("expected StashKeys, got: {:?}", other),
1572 }
1573 }
1574
1575 #[tokio::test]
1578 async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1579 let msg = ChildMessage::ExecutionComplete {
1580 result: Err("execution timed out after 5000ms".into()),
1581 error_kind: Some(ErrorKind::Timeout),
1582 timeout_ms: Some(5000),
1583 };
1584
1585 let mut buf = Vec::new();
1586 write_message(&mut buf, &msg).await.unwrap();
1587
1588 let mut cursor = Cursor::new(buf);
1589 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1590
1591 match decoded {
1592 ChildMessage::ExecutionComplete {
1593 result,
1594 error_kind,
1595 timeout_ms,
1596 } => {
1597 assert!(result.is_err());
1598 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1599 assert_eq!(timeout_ms, Some(5000));
1600 }
1601 other => panic!("expected ExecutionComplete, got: {:?}", other),
1602 }
1603 }
1604
1605 #[tokio::test]
1606 async fn ipc_t02_timeout_ms_absent_backward_compat() {
1607 let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1609 let mut buf = Vec::new();
1610 let payload = json.as_bytes();
1611 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1612 buf.extend_from_slice(payload);
1613
1614 let mut cursor = Cursor::new(buf);
1615 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1616
1617 match decoded {
1618 ChildMessage::ExecutionComplete {
1619 error_kind,
1620 timeout_ms,
1621 ..
1622 } => {
1623 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1624 assert_eq!(
1625 timeout_ms, None,
1626 "missing timeout_ms should default to None"
1627 );
1628 }
1629 other => panic!("expected ExecutionComplete, got: {:?}", other),
1630 }
1631 }
1632
1633 #[tokio::test]
1634 async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
1635 let msg = ChildMessage::ExecutionComplete {
1636 result: Err("some error".into()),
1637 error_kind: Some(ErrorKind::JsError),
1638 timeout_ms: None,
1639 };
1640
1641 let json = serde_json::to_string(&msg).unwrap();
1642 assert!(
1643 !json.contains("timeout_ms"),
1644 "timeout_ms:None should be omitted: {json}"
1645 );
1646 }
1647
1648 #[tokio::test]
1649 async fn ipc_t04_timeout_ms_present_when_some() {
1650 let msg = ChildMessage::ExecutionComplete {
1651 result: Err("timed out".into()),
1652 error_kind: Some(ErrorKind::Timeout),
1653 timeout_ms: Some(10000),
1654 };
1655
1656 let json = serde_json::to_string(&msg).unwrap();
1657 assert!(
1658 json.contains("\"timeout_ms\":10000"),
1659 "timeout_ms should be present: {json}"
1660 );
1661 }
1662
1663 #[tokio::test]
1666 async fn ipc_rv01_write_raw_message_roundtrip() {
1667 let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1668
1669 let mut buf = Vec::new();
1670 write_raw_message(&mut buf, payload).await.unwrap();
1671
1672 let mut cursor = Cursor::new(buf);
1673 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1674 .await
1675 .unwrap()
1676 .unwrap();
1677
1678 assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1679 }
1680
1681 #[tokio::test]
1682 async fn ipc_rv02_read_raw_message_preserves_bytes() {
1683 let msg = ChildMessage::Log {
1685 message: "test raw".into(),
1686 };
1687 let mut buf = Vec::new();
1688 write_message(&mut buf, &msg).await.unwrap();
1689
1690 let mut cursor = Cursor::new(buf);
1691 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1692 .await
1693 .unwrap()
1694 .unwrap();
1695
1696 let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1698 assert!(matches!(parsed, ChildMessage::Log { .. }));
1699 }
1700
1701 #[tokio::test]
1702 async fn ipc_rv03_raw_message_size_limit_enforced() {
1703 let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1704 let mut buf = Vec::new();
1705 write_raw_message(&mut buf, large_payload.as_bytes())
1706 .await
1707 .unwrap();
1708
1709 let mut cursor = Cursor::new(buf);
1710 let result = read_raw_message(&mut cursor, 64).await;
1711 assert!(result.is_err());
1712 let err = result.unwrap_err().to_string();
1713 assert!(err.contains("too large"), "error: {err}");
1714 }
1715
1716 #[tokio::test]
1717 async fn ipc_rv04_large_payload_stays_raw() {
1718 let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1720 let mut buf = Vec::new();
1721 write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1722
1723 let mut cursor = Cursor::new(buf);
1724 let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1725 .await
1726 .unwrap()
1727 .unwrap();
1728
1729 assert!(raw.get().len() > 1_000_000);
1731 let val: Value = serde_json::from_str(raw.get()).unwrap();
1733 assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1734 }
1735
1736 #[tokio::test]
1737 async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1738 let msg = ParentMessage::ToolCallResult {
1740 request_id: 99,
1741 result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1742 };
1743 let mut buf = Vec::new();
1744 write_message(&mut buf, &msg).await.unwrap();
1745
1746 let mut cursor = Cursor::new(buf.clone());
1747 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1748 .await
1749 .unwrap()
1750 .unwrap();
1751
1752 let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1754 match parsed {
1755 ParentMessage::ToolCallResult { request_id, result } => {
1756 assert_eq!(request_id, 99);
1757 assert!(result.is_ok());
1758 }
1759 other => panic!("expected ToolCallResult, got: {:?}", other),
1760 }
1761 }
1762
1763 #[tokio::test]
1764 async fn ipc_rv06_raw_eof_returns_none() {
1765 let mut cursor = Cursor::new(Vec::<u8>::new());
1766 let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1767 .await
1768 .unwrap();
1769 assert!(result.is_none());
1770 }
1771
1772 #[tokio::test]
1773 async fn ipc_rv07_mixed_raw_and_value_messages() {
1774 let mut buf = Vec::new();
1775
1776 let msg1 = ChildMessage::Log {
1778 message: "first".into(),
1779 };
1780 write_message(&mut buf, &msg1).await.unwrap();
1781
1782 let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1784 write_raw_message(&mut buf, raw_payload).await.unwrap();
1785
1786 let mut cursor = Cursor::new(buf);
1788 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1789 assert!(matches!(d1, ChildMessage::Log { .. }));
1790
1791 let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1792 .await
1793 .unwrap()
1794 .unwrap();
1795 let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1796 assert!(matches!(parsed, ChildMessage::Log { .. }));
1797 }
1798
1799 #[tokio::test]
1802 async fn tool_error_round_trips_through_ipc() {
1803 let original = forge_error::DispatchError::ToolError {
1804 server: "arbiter".into(),
1805 tool: "scan_target".into(),
1806 message: "tool returned error: Invalid params: missing field 'base_url'".into(),
1807 };
1808
1809 let ipc_err = IpcDispatchError::from(&original);
1811 assert_eq!(ipc_err.code, "TOOL_ERROR");
1812 assert_eq!(ipc_err.server, Some("arbiter".into()));
1813 assert_eq!(ipc_err.tool, Some("scan_target".into()));
1814 assert!(ipc_err.message.contains("Invalid params"));
1815
1816 let json = serde_json::to_string(&ipc_err).unwrap();
1818 let deserialized: IpcDispatchError = serde_json::from_str(&json).unwrap();
1819
1820 let reconstructed = deserialized.to_dispatch_error();
1822 assert!(matches!(
1823 reconstructed,
1824 forge_error::DispatchError::ToolError {
1825 ref server,
1826 ref tool,
1827 ..
1828 } if server == "arbiter" && tool == "scan_target"
1829 ));
1830 assert!(!reconstructed.trips_circuit_breaker());
1831 assert_eq!(reconstructed.code(), "TOOL_ERROR");
1832 }
1833
1834 #[tokio::test]
1835 async fn tool_error_ipc_message_roundtrip() {
1836 let msg = ParentMessage::ToolCallResult {
1837 request_id: 42,
1838 result: Err(IpcDispatchError::from(
1839 &forge_error::DispatchError::ToolError {
1840 server: "arbiter".into(),
1841 tool: "scan".into(),
1842 message: "bad params".into(),
1843 },
1844 )),
1845 };
1846
1847 let mut buf = Vec::new();
1848 write_message(&mut buf, &msg).await.unwrap();
1849
1850 let mut cursor = Cursor::new(buf);
1851 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1852
1853 match decoded {
1854 ParentMessage::ToolCallResult { request_id, result } => {
1855 assert_eq!(request_id, 42);
1856 let err = result.unwrap_err();
1857 assert_eq!(err.code, "TOOL_ERROR");
1858 assert_eq!(err.server, Some("arbiter".into()));
1859 assert_eq!(err.tool, Some("scan".into()));
1860
1861 let dispatch_err = err.to_dispatch_error();
1863 assert!(!dispatch_err.trips_circuit_breaker());
1864 }
1865 other => panic!("expected ToolCallResult, got: {:?}", other),
1866 }
1867 }
1868}