1use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19#[non_exhaustive]
20pub enum ErrorKind {
21 Timeout,
23 HeapLimit,
25 JsError,
27 Execution,
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct IpcDispatchError {
39 pub code: String,
41 pub message: String,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub server: Option<String>,
46 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub tool: Option<String>,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
51 pub timeout_ms: Option<u64>,
52}
53
54impl IpcDispatchError {
55 pub fn from_string(msg: String) -> Self {
58 Self {
59 code: "INTERNAL".to_string(),
60 message: msg,
61 server: None,
62 tool: None,
63 timeout_ms: None,
64 }
65 }
66
67 pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69 match self.code.as_str() {
70 "SERVER_NOT_FOUND" => {
71 forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72 }
73 "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74 server: self.server.unwrap_or_default(),
75 tool: self.tool.unwrap_or_default(),
76 },
77 "TIMEOUT" => forge_error::DispatchError::Timeout {
78 server: self.server.unwrap_or_default(),
79 timeout_ms: self.timeout_ms.unwrap_or(0),
80 },
81 "CIRCUIT_OPEN" => {
82 forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83 }
84 "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85 reason: self.message,
86 },
87 "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88 server: self.server.unwrap_or_default(),
89 message: self.message,
90 },
91 "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
92 _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
93 }
94 }
95}
96
97impl From<&forge_error::DispatchError> for IpcDispatchError {
98 fn from(e: &forge_error::DispatchError) -> Self {
99 let (server, tool, timeout_ms) = match e {
100 forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
101 forge_error::DispatchError::ToolNotFound { server, tool } => {
102 (Some(server.clone()), Some(tool.clone()), None)
103 }
104 forge_error::DispatchError::Timeout {
105 server, timeout_ms, ..
106 } => (Some(server.clone()), None, Some(*timeout_ms)),
107 forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
108 forge_error::DispatchError::Upstream { server, .. } => {
109 (Some(server.clone()), None, None)
110 }
111 _ => (None, None, None),
112 };
113
114 Self {
115 code: e.code().to_string(),
116 message: e.to_string(),
117 server,
118 tool,
119 timeout_ms,
120 }
121 }
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126#[serde(tag = "type")]
127#[non_exhaustive]
128pub enum ParentMessage {
129 Execute {
131 code: String,
133 manifest: Option<Value>,
135 config: WorkerConfig,
137 },
138 ToolCallResult {
140 request_id: u64,
142 result: Result<Value, IpcDispatchError>,
144 },
145 ResourceReadResult {
147 request_id: u64,
149 result: Result<Value, IpcDispatchError>,
151 },
152 Reset {
157 config: WorkerConfig,
159 },
160 StashResult {
162 request_id: u64,
164 result: Result<Value, IpcDispatchError>,
166 },
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171#[serde(tag = "type")]
172#[non_exhaustive]
173pub enum ChildMessage {
174 ToolCallRequest {
176 request_id: u64,
178 server: String,
180 tool: String,
182 args: Value,
184 },
185 ResourceReadRequest {
187 request_id: u64,
189 server: String,
191 uri: String,
193 },
194 StashPut {
196 request_id: u64,
198 key: String,
200 value: Value,
202 ttl_secs: Option<u32>,
204 #[serde(default, skip_serializing_if = "Option::is_none")]
206 group: Option<String>,
207 },
208 StashGet {
210 request_id: u64,
212 key: String,
214 #[serde(default, skip_serializing_if = "Option::is_none")]
216 group: Option<String>,
217 },
218 StashDelete {
220 request_id: u64,
222 key: String,
224 #[serde(default, skip_serializing_if = "Option::is_none")]
226 group: Option<String>,
227 },
228 StashKeys {
230 request_id: u64,
232 #[serde(default, skip_serializing_if = "Option::is_none")]
234 group: Option<String>,
235 },
236 ResetComplete,
238 ExecutionComplete {
240 result: Result<Value, String>,
242 #[serde(default, skip_serializing_if = "Option::is_none")]
246 error_kind: Option<ErrorKind>,
247 #[serde(default, skip_serializing_if = "Option::is_none")]
251 timeout_ms: Option<u64>,
252 },
253 Log {
255 message: String,
257 },
258}
259
260#[derive(Debug, Clone, Serialize, Deserialize)]
262pub struct WorkerConfig {
263 pub timeout_ms: u64,
265 pub max_heap_size: usize,
267 pub max_tool_calls: usize,
269 pub max_tool_call_args_size: usize,
271 pub max_output_size: usize,
273 pub max_code_size: usize,
275 #[serde(default = "default_max_ipc_message_size")]
277 pub max_ipc_message_size: usize,
278 #[serde(default = "default_max_resource_size")]
280 pub max_resource_size: usize,
281 #[serde(default = "default_max_parallel")]
283 pub max_parallel: usize,
284 #[serde(default, skip_serializing_if = "Option::is_none")]
287 pub known_tools: Option<Vec<(String, String)>>,
288 #[serde(default, skip_serializing_if = "Option::is_none")]
290 pub known_servers: Option<std::collections::HashSet<String>>,
291}
292
293fn default_max_ipc_message_size() -> usize {
294 DEFAULT_MAX_IPC_MESSAGE_SIZE
295}
296
297fn default_max_resource_size() -> usize {
298 64 * 1024 * 1024 }
300
301fn default_max_parallel() -> usize {
302 8
303}
304
305impl From<&crate::SandboxConfig> for WorkerConfig {
306 fn from(config: &crate::SandboxConfig) -> Self {
307 Self {
308 timeout_ms: config.timeout.as_millis() as u64,
309 max_heap_size: config.max_heap_size,
310 max_tool_calls: config.max_tool_calls,
311 max_tool_call_args_size: config.max_tool_call_args_size,
312 max_output_size: config.max_output_size,
313 max_code_size: config.max_code_size,
314 max_ipc_message_size: config.max_ipc_message_size,
315 max_resource_size: config.max_resource_size,
316 max_parallel: config.max_parallel,
317 known_tools: None,
318 known_servers: None,
319 }
320 }
321}
322
323impl WorkerConfig {
324 pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
326 crate::SandboxConfig {
327 timeout: Duration::from_millis(self.timeout_ms),
328 max_code_size: self.max_code_size,
329 max_output_size: self.max_output_size,
330 max_heap_size: self.max_heap_size,
331 max_concurrent: 1, max_tool_calls: self.max_tool_calls,
333 max_tool_call_args_size: self.max_tool_call_args_size,
334 execution_mode: crate::executor::ExecutionMode::InProcess, max_resource_size: self.max_resource_size,
336 max_parallel: self.max_parallel,
337 max_ipc_message_size: self.max_ipc_message_size,
338 }
339 }
340}
341
342pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
346 writer: &mut W,
347 msg: &T,
348) -> Result<(), std::io::Error> {
349 let payload = serde_json::to_vec(msg)
350 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
351 let len = u32::try_from(payload.len()).map_err(|_| {
352 std::io::Error::new(
353 std::io::ErrorKind::InvalidData,
354 format!(
355 "IPC payload too large: {} bytes (max {} bytes)",
356 payload.len(),
357 u32::MAX
358 ),
359 )
360 })?;
361 writer.write_all(&len.to_be_bytes()).await?;
362 writer.write_all(&payload).await?;
363 writer.flush().await?;
364 Ok(())
365}
366
367pub async fn write_raw_message<W: AsyncWrite + Unpin>(
372 writer: &mut W,
373 payload: &[u8],
374) -> Result<(), std::io::Error> {
375 let len = u32::try_from(payload.len()).map_err(|_| {
376 std::io::Error::new(
377 std::io::ErrorKind::InvalidData,
378 format!(
379 "raw IPC payload too large: {} bytes (max {} bytes)",
380 payload.len(),
381 u32::MAX
382 ),
383 )
384 })?;
385 writer.write_all(&len.to_be_bytes()).await?;
386 writer.write_all(payload).await?;
387 writer.flush().await?;
388 Ok(())
389}
390
391pub async fn read_raw_message<R: AsyncRead + Unpin>(
396 reader: &mut R,
397 max_size: usize,
398) -> Result<Option<Box<RawValue>>, std::io::Error> {
399 let mut len_buf = [0u8; 4];
400 match reader.read_exact(&mut len_buf).await {
401 Ok(_) => {}
402 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
403 Err(e) => return Err(e),
404 }
405
406 let len = u32::from_be_bytes(len_buf) as usize;
407
408 if len > max_size {
409 return Err(std::io::Error::new(
410 std::io::ErrorKind::InvalidData,
411 format!(
412 "raw IPC message too large: {} bytes (limit: {} bytes)",
413 len, max_size
414 ),
415 ));
416 }
417
418 let mut payload = vec![0u8; len];
419 reader.read_exact(&mut payload).await?;
420
421 let raw: Box<RawValue> = serde_json::from_slice(&payload)
422 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
423 Ok(Some(raw))
424}
425
426pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 * 1024 * 1024;
431
432pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
437 reader: &mut R,
438) -> Result<Option<T>, std::io::Error> {
439 read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
440}
441
442pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
447 reader: &mut R,
448 max_size: usize,
449) -> Result<Option<T>, std::io::Error> {
450 let mut len_buf = [0u8; 4];
451 match reader.read_exact(&mut len_buf).await {
452 Ok(_) => {}
453 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
454 Err(e) => return Err(e),
455 }
456
457 let len = u32::from_be_bytes(len_buf) as usize;
458
459 if len > max_size {
461 return Err(std::io::Error::new(
462 std::io::ErrorKind::InvalidData,
463 format!(
464 "IPC message too large: {} bytes (limit: {} bytes)",
465 len, max_size
466 ),
467 ));
468 }
469
470 let mut payload = vec![0u8; len];
471 reader.read_exact(&mut payload).await?;
472
473 let msg: T = serde_json::from_slice(&payload)
474 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
475 Ok(Some(msg))
476}
477
478#[cfg(test)]
479mod tests {
480 use super::*;
481 use std::io::Cursor;
482
483 #[tokio::test]
484 async fn roundtrip_parent_execute_message() {
485 let msg = ParentMessage::Execute {
486 code: "async () => { return 42; }".into(),
487 manifest: Some(serde_json::json!({"servers": []})),
488 config: WorkerConfig {
489 timeout_ms: 5000,
490 max_heap_size: 64 * 1024 * 1024,
491 max_tool_calls: 50,
492 max_tool_call_args_size: 1024 * 1024,
493 max_output_size: 1024 * 1024,
494 max_code_size: 64 * 1024,
495 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
496 max_resource_size: 64 * 1024 * 1024,
497 max_parallel: 8,
498 known_tools: None,
499 known_servers: None,
500 },
501 };
502
503 let mut buf = Vec::new();
504 write_message(&mut buf, &msg).await.unwrap();
505
506 let mut cursor = Cursor::new(buf);
507 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
508
509 match decoded {
510 ParentMessage::Execute {
511 code,
512 manifest,
513 config,
514 } => {
515 assert_eq!(code, "async () => { return 42; }");
516 assert!(manifest.is_some());
517 assert_eq!(config.timeout_ms, 5000);
518 }
519 other => panic!("expected Execute, got: {:?}", other),
520 }
521 }
522
523 #[tokio::test]
524 async fn roundtrip_parent_tool_result() {
525 let msg = ParentMessage::ToolCallResult {
526 request_id: 42,
527 result: Ok(serde_json::json!({"status": "ok"})),
528 };
529
530 let mut buf = Vec::new();
531 write_message(&mut buf, &msg).await.unwrap();
532
533 let mut cursor = Cursor::new(buf);
534 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
535
536 match decoded {
537 ParentMessage::ToolCallResult { request_id, result } => {
538 assert_eq!(request_id, 42);
539 assert!(result.is_ok());
540 }
541 other => panic!("expected ToolCallResult, got: {:?}", other),
542 }
543 }
544
545 #[tokio::test]
546 async fn roundtrip_parent_tool_result_error() {
547 let msg = ParentMessage::ToolCallResult {
548 request_id: 7,
549 result: Err(IpcDispatchError::from_string("connection refused".into())),
550 };
551
552 let mut buf = Vec::new();
553 write_message(&mut buf, &msg).await.unwrap();
554
555 let mut cursor = Cursor::new(buf);
556 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
557
558 match decoded {
559 ParentMessage::ToolCallResult { request_id, result } => {
560 assert_eq!(request_id, 7);
561 let err = result.unwrap_err();
562 assert_eq!(err.message, "connection refused");
563 assert_eq!(err.code, "INTERNAL");
564 }
565 other => panic!("expected ToolCallResult, got: {:?}", other),
566 }
567 }
568
569 #[tokio::test]
570 async fn roundtrip_child_tool_request() {
571 let msg = ChildMessage::ToolCallRequest {
572 request_id: 1,
573 server: "narsil".into(),
574 tool: "ast.parse".into(),
575 args: serde_json::json!({"file": "test.rs"}),
576 };
577
578 let mut buf = Vec::new();
579 write_message(&mut buf, &msg).await.unwrap();
580
581 let mut cursor = Cursor::new(buf);
582 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
583
584 match decoded {
585 ChildMessage::ToolCallRequest {
586 request_id,
587 server,
588 tool,
589 args,
590 } => {
591 assert_eq!(request_id, 1);
592 assert_eq!(server, "narsil");
593 assert_eq!(tool, "ast.parse");
594 assert_eq!(args["file"], "test.rs");
595 }
596 other => panic!("expected ToolCallRequest, got: {:?}", other),
597 }
598 }
599
600 #[tokio::test]
601 async fn roundtrip_child_execution_complete() {
602 let msg = ChildMessage::ExecutionComplete {
603 result: Ok(serde_json::json!([1, 2, 3])),
604 error_kind: None,
605 timeout_ms: None,
606 };
607
608 let mut buf = Vec::new();
609 write_message(&mut buf, &msg).await.unwrap();
610
611 let mut cursor = Cursor::new(buf);
612 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
613
614 match decoded {
615 ChildMessage::ExecutionComplete {
616 result, error_kind, ..
617 } => {
618 assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
619 assert_eq!(error_kind, None);
620 }
621 other => panic!("expected ExecutionComplete, got: {:?}", other),
622 }
623 }
624
625 #[tokio::test]
626 async fn roundtrip_child_log() {
627 let msg = ChildMessage::Log {
628 message: "processing step 3".into(),
629 };
630
631 let mut buf = Vec::new();
632 write_message(&mut buf, &msg).await.unwrap();
633
634 let mut cursor = Cursor::new(buf);
635 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
636
637 match decoded {
638 ChildMessage::Log { message } => {
639 assert_eq!(message, "processing step 3");
640 }
641 other => panic!("expected Log, got: {:?}", other),
642 }
643 }
644
645 #[tokio::test]
646 async fn multiple_messages_in_stream() {
647 let msg1 = ChildMessage::Log {
648 message: "first".into(),
649 };
650 let msg2 = ChildMessage::ToolCallRequest {
651 request_id: 1,
652 server: "s".into(),
653 tool: "t".into(),
654 args: serde_json::json!({}),
655 };
656 let msg3 = ChildMessage::ExecutionComplete {
657 result: Ok(serde_json::json!("done")),
658 error_kind: None,
659 timeout_ms: None,
660 };
661
662 let mut buf = Vec::new();
663 write_message(&mut buf, &msg1).await.unwrap();
664 write_message(&mut buf, &msg2).await.unwrap();
665 write_message(&mut buf, &msg3).await.unwrap();
666
667 let mut cursor = Cursor::new(buf);
668 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
669 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
670 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
671
672 assert!(matches!(d1, ChildMessage::Log { .. }));
673 assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
674 assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
675
676 let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
678 assert!(d4.is_none());
679 }
680
681 #[tokio::test]
682 async fn execution_complete_error_roundtrip() {
683 let msg = ChildMessage::ExecutionComplete {
684 result: Err("failed to create tokio runtime: resource unavailable".into()),
685 error_kind: Some(ErrorKind::Execution),
686 timeout_ms: None,
687 };
688
689 let mut buf = Vec::new();
690 write_message(&mut buf, &msg).await.unwrap();
691
692 let mut cursor = Cursor::new(buf);
693 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
694
695 match decoded {
696 ChildMessage::ExecutionComplete {
697 result, error_kind, ..
698 } => {
699 let err = result.unwrap_err();
700 assert!(
701 err.contains("tokio runtime"),
702 "expected runtime error: {err}"
703 );
704 assert_eq!(error_kind, Some(ErrorKind::Execution));
705 }
706 other => panic!("expected ExecutionComplete, got: {:?}", other),
707 }
708 }
709
710 #[tokio::test]
711 async fn eof_returns_none() {
712 let mut cursor = Cursor::new(Vec::<u8>::new());
713 let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
714 assert!(result.is_none());
715 }
716
717 #[test]
718 fn u32_try_from_overflow() {
719 let overflow_size = u32::MAX as usize + 1;
721 assert!(u32::try_from(overflow_size).is_err());
722 }
723
724 #[tokio::test]
725 async fn write_message_normal_size_succeeds() {
726 let msg = ChildMessage::Log {
728 message: "a".repeat(1024),
729 };
730 let mut buf = Vec::new();
731 write_message(&mut buf, &msg).await.unwrap();
732 assert!(buf.len() > 1024);
733 }
734
735 #[tokio::test]
736 async fn large_message_roundtrip() {
737 let large_data = "x".repeat(1_000_000);
739 let msg = ChildMessage::ExecutionComplete {
740 result: Ok(serde_json::json!(large_data)),
741 error_kind: None,
742 timeout_ms: None,
743 };
744
745 let mut buf = Vec::new();
746 write_message(&mut buf, &msg).await.unwrap();
747
748 let mut cursor = Cursor::new(buf);
749 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
750
751 match decoded {
752 ChildMessage::ExecutionComplete { result, .. } => {
753 assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
754 }
755 other => panic!("expected ExecutionComplete, got: {:?}", other),
756 }
757 }
758
759 #[tokio::test]
760 async fn worker_config_roundtrip_from_sandbox_config() {
761 let sandbox = crate::SandboxConfig::default();
762 let worker = WorkerConfig::from(&sandbox);
763 let back = worker.to_sandbox_config();
764
765 assert_eq!(sandbox.timeout, back.timeout);
766 assert_eq!(sandbox.max_heap_size, back.max_heap_size);
767 assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
768 assert_eq!(sandbox.max_output_size, back.max_output_size);
769 assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
770 assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); }
772
773 #[tokio::test]
774 async fn read_message_with_limit_rejects_oversized() {
775 let msg = ChildMessage::Log {
776 message: "x".repeat(1024),
777 };
778 let mut buf = Vec::new();
779 write_message(&mut buf, &msg).await.unwrap();
780
781 let mut cursor = Cursor::new(buf);
783 let result: Result<Option<ChildMessage>, _> =
784 read_message_with_limit(&mut cursor, 64).await;
785 assert!(result.is_err());
786 let err_msg = result.unwrap_err().to_string();
787 assert!(err_msg.contains("too large"), "error: {err_msg}");
788 }
789
790 #[tokio::test]
791 async fn read_message_with_limit_accepts_within_limit() {
792 let msg = ChildMessage::Log {
793 message: "hello".into(),
794 };
795 let mut buf = Vec::new();
796 write_message(&mut buf, &msg).await.unwrap();
797
798 let mut cursor = Cursor::new(buf);
799 let result: Option<ChildMessage> =
800 read_message_with_limit(&mut cursor, 1024).await.unwrap();
801 assert!(result.is_some());
802 }
803
804 #[tokio::test]
805 async fn worker_config_ipc_limit_serde_default() {
806 let json = r#"{
808 "timeout_ms": 5000,
809 "max_heap_size": 67108864,
810 "max_tool_calls": 50,
811 "max_tool_call_args_size": 1048576,
812 "max_output_size": 1048576,
813 "max_code_size": 65536
814 }"#;
815 let config: WorkerConfig = serde_json::from_str(json).unwrap();
816 assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
817 }
818
819 #[tokio::test]
821 async fn ipc_01_resource_read_request_roundtrip() {
822 let msg = ChildMessage::ResourceReadRequest {
823 request_id: 10,
824 server: "postgres".into(),
825 uri: "file:///logs/app.log".into(),
826 };
827
828 let mut buf = Vec::new();
829 write_message(&mut buf, &msg).await.unwrap();
830
831 let mut cursor = Cursor::new(buf);
832 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
833
834 match decoded {
835 ChildMessage::ResourceReadRequest {
836 request_id,
837 server,
838 uri,
839 } => {
840 assert_eq!(request_id, 10);
841 assert_eq!(server, "postgres");
842 assert_eq!(uri, "file:///logs/app.log");
843 }
844 other => panic!("expected ResourceReadRequest, got: {:?}", other),
845 }
846 }
847
848 #[tokio::test]
850 async fn ipc_02_resource_read_result_success_roundtrip() {
851 let msg = ParentMessage::ResourceReadResult {
852 request_id: 11,
853 result: Ok(serde_json::json!({"content": "log data here"})),
854 };
855
856 let mut buf = Vec::new();
857 write_message(&mut buf, &msg).await.unwrap();
858
859 let mut cursor = Cursor::new(buf);
860 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
861
862 match decoded {
863 ParentMessage::ResourceReadResult { request_id, result } => {
864 assert_eq!(request_id, 11);
865 let val = result.unwrap();
866 assert_eq!(val["content"], "log data here");
867 }
868 other => panic!("expected ResourceReadResult, got: {:?}", other),
869 }
870 }
871
872 #[tokio::test]
874 async fn ipc_03_resource_read_result_error_roundtrip() {
875 let msg = ParentMessage::ResourceReadResult {
876 request_id: 12,
877 result: Err(IpcDispatchError::from_string("resource not found".into())),
878 };
879
880 let mut buf = Vec::new();
881 write_message(&mut buf, &msg).await.unwrap();
882
883 let mut cursor = Cursor::new(buf);
884 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
885
886 match decoded {
887 ParentMessage::ResourceReadResult { request_id, result } => {
888 assert_eq!(request_id, 12);
889 let err = result.unwrap_err();
890 assert_eq!(err.message, "resource not found");
891 assert_eq!(err.code, "INTERNAL");
892 }
893 other => panic!("expected ResourceReadResult, got: {:?}", other),
894 }
895 }
896
897 #[tokio::test]
899 async fn ipc_04_stash_put_roundtrip() {
900 let msg = ChildMessage::StashPut {
901 request_id: 20,
902 key: "my-key".into(),
903 value: serde_json::json!({"data": 42}),
904 ttl_secs: Some(60),
905 group: None,
906 };
907
908 let mut buf = Vec::new();
909 write_message(&mut buf, &msg).await.unwrap();
910
911 let mut cursor = Cursor::new(buf);
912 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
913
914 match decoded {
915 ChildMessage::StashPut {
916 request_id,
917 key,
918 value,
919 ttl_secs,
920 group,
921 } => {
922 assert_eq!(request_id, 20);
923 assert_eq!(key, "my-key");
924 assert_eq!(value["data"], 42);
925 assert_eq!(ttl_secs, Some(60));
926 assert_eq!(group, None);
927 }
928 other => panic!("expected StashPut, got: {:?}", other),
929 }
930 }
931
932 #[tokio::test]
934 async fn ipc_05_stash_get_roundtrip() {
935 let msg = ChildMessage::StashGet {
936 request_id: 21,
937 key: "lookup-key".into(),
938 group: None,
939 };
940
941 let mut buf = Vec::new();
942 write_message(&mut buf, &msg).await.unwrap();
943
944 let mut cursor = Cursor::new(buf);
945 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
946
947 match decoded {
948 ChildMessage::StashGet {
949 request_id,
950 key,
951 group,
952 } => {
953 assert_eq!(request_id, 21);
954 assert_eq!(key, "lookup-key");
955 assert_eq!(group, None);
956 }
957 other => panic!("expected StashGet, got: {:?}", other),
958 }
959 }
960
961 #[tokio::test]
963 async fn ipc_06_stash_delete_roundtrip() {
964 let msg = ChildMessage::StashDelete {
965 request_id: 22,
966 key: "delete-me".into(),
967 group: None,
968 };
969
970 let mut buf = Vec::new();
971 write_message(&mut buf, &msg).await.unwrap();
972
973 let mut cursor = Cursor::new(buf);
974 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
975
976 match decoded {
977 ChildMessage::StashDelete {
978 request_id,
979 key,
980 group,
981 } => {
982 assert_eq!(request_id, 22);
983 assert_eq!(key, "delete-me");
984 assert_eq!(group, None);
985 }
986 other => panic!("expected StashDelete, got: {:?}", other),
987 }
988 }
989
990 #[tokio::test]
992 async fn ipc_07_stash_keys_roundtrip() {
993 let msg = ChildMessage::StashKeys {
994 request_id: 23,
995 group: None,
996 };
997
998 let mut buf = Vec::new();
999 write_message(&mut buf, &msg).await.unwrap();
1000
1001 let mut cursor = Cursor::new(buf);
1002 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1003
1004 match decoded {
1005 ChildMessage::StashKeys { request_id, group } => {
1006 assert_eq!(request_id, 23);
1007 assert_eq!(group, None);
1008 }
1009 other => panic!("expected StashKeys, got: {:?}", other),
1010 }
1011 }
1012
1013 #[tokio::test]
1015 async fn ipc_08_stash_result_roundtrip() {
1016 let msg = ParentMessage::StashResult {
1017 request_id: 24,
1018 result: Ok(serde_json::json!({"ok": true})),
1019 };
1020
1021 let mut buf = Vec::new();
1022 write_message(&mut buf, &msg).await.unwrap();
1023
1024 let mut cursor = Cursor::new(buf);
1025 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1026
1027 match decoded {
1028 ParentMessage::StashResult { request_id, result } => {
1029 assert_eq!(request_id, 24);
1030 assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1031 }
1032 other => panic!("expected StashResult, got: {:?}", other),
1033 }
1034 }
1035
1036 #[tokio::test]
1038 async fn ipc_09_mixed_message_interleaving() {
1039 let msg1 = ChildMessage::ToolCallRequest {
1040 request_id: 1,
1041 server: "s".into(),
1042 tool: "t".into(),
1043 args: serde_json::json!({}),
1044 };
1045 let msg2 = ChildMessage::ResourceReadRequest {
1046 request_id: 2,
1047 server: "pg".into(),
1048 uri: "file:///data".into(),
1049 };
1050 let msg3 = ChildMessage::StashPut {
1051 request_id: 3,
1052 key: "k".into(),
1053 value: serde_json::json!("v"),
1054 ttl_secs: None,
1055 group: None,
1056 };
1057 let msg4 = ChildMessage::StashGet {
1058 request_id: 4,
1059 key: "k".into(),
1060 group: None,
1061 };
1062 let msg5 = ChildMessage::ExecutionComplete {
1063 result: Ok(serde_json::json!("done")),
1064 error_kind: None,
1065 timeout_ms: None,
1066 };
1067
1068 let mut buf = Vec::new();
1069 write_message(&mut buf, &msg1).await.unwrap();
1070 write_message(&mut buf, &msg2).await.unwrap();
1071 write_message(&mut buf, &msg3).await.unwrap();
1072 write_message(&mut buf, &msg4).await.unwrap();
1073 write_message(&mut buf, &msg5).await.unwrap();
1074
1075 let mut cursor = Cursor::new(buf);
1076 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1077 let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1078 let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1079 let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1080 let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1081
1082 assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1083 assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1084 assert!(matches!(d3, ChildMessage::StashPut { .. }));
1085 assert!(matches!(d4, ChildMessage::StashGet { .. }));
1086 assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1087
1088 let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1090 assert!(d6.is_none());
1091 }
1092
1093 #[tokio::test]
1095 async fn ipc_p01_reset_roundtrip() {
1096 let msg = ParentMessage::Reset {
1097 config: WorkerConfig {
1098 timeout_ms: 3000,
1099 max_heap_size: 32 * 1024 * 1024,
1100 max_tool_calls: 25,
1101 max_tool_call_args_size: 512 * 1024,
1102 max_output_size: 512 * 1024,
1103 max_code_size: 32 * 1024,
1104 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1105 max_resource_size: 32 * 1024 * 1024,
1106 max_parallel: 4,
1107 known_tools: None,
1108 known_servers: None,
1109 },
1110 };
1111
1112 let mut buf = Vec::new();
1113 write_message(&mut buf, &msg).await.unwrap();
1114
1115 let mut cursor = Cursor::new(buf);
1116 let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1117
1118 match decoded {
1119 ParentMessage::Reset { config } => {
1120 assert_eq!(config.timeout_ms, 3000);
1121 assert_eq!(config.max_tool_calls, 25);
1122 }
1123 other => panic!("expected Reset, got: {:?}", other),
1124 }
1125 }
1126
1127 #[tokio::test]
1129 async fn ipc_p02_reset_complete_roundtrip() {
1130 let msg = ChildMessage::ResetComplete;
1131
1132 let mut buf = Vec::new();
1133 write_message(&mut buf, &msg).await.unwrap();
1134
1135 let mut cursor = Cursor::new(buf);
1136 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1137
1138 assert!(matches!(decoded, ChildMessage::ResetComplete));
1139 }
1140
1141 #[tokio::test]
1143 async fn ipc_p03_reset_execute_interleaving() {
1144 let reset = ParentMessage::Reset {
1145 config: WorkerConfig {
1146 timeout_ms: 5000,
1147 max_heap_size: 64 * 1024 * 1024,
1148 max_tool_calls: 50,
1149 max_tool_call_args_size: 1024 * 1024,
1150 max_output_size: 1024 * 1024,
1151 max_code_size: 64 * 1024,
1152 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1153 max_resource_size: 64 * 1024 * 1024,
1154 max_parallel: 8,
1155 known_tools: None,
1156 known_servers: None,
1157 },
1158 };
1159 let execute = ParentMessage::Execute {
1160 code: "async () => 42".into(),
1161 manifest: None,
1162 config: WorkerConfig {
1163 timeout_ms: 5000,
1164 max_heap_size: 64 * 1024 * 1024,
1165 max_tool_calls: 50,
1166 max_tool_call_args_size: 1024 * 1024,
1167 max_output_size: 1024 * 1024,
1168 max_code_size: 64 * 1024,
1169 max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1170 max_resource_size: 64 * 1024 * 1024,
1171 max_parallel: 8,
1172 known_tools: None,
1173 known_servers: None,
1174 },
1175 };
1176
1177 let mut buf = Vec::new();
1178 write_message(&mut buf, &reset).await.unwrap();
1179 write_message(&mut buf, &execute).await.unwrap();
1180
1181 let mut cursor = Cursor::new(buf);
1182 let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1183 let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1184
1185 assert!(matches!(d1, ParentMessage::Reset { .. }));
1186 assert!(matches!(d2, ParentMessage::Execute { .. }));
1187 }
1188
1189 #[tokio::test]
1191 async fn ipc_10_oversized_stash_message_rejected() {
1192 let msg = ChildMessage::StashPut {
1193 request_id: 100,
1194 key: "k".into(),
1195 value: serde_json::json!("x".repeat(2048)),
1196 ttl_secs: Some(60),
1197 group: None,
1198 };
1199 let mut buf = Vec::new();
1200 write_message(&mut buf, &msg).await.unwrap();
1201
1202 let mut cursor = Cursor::new(buf);
1204 let result: Result<Option<ChildMessage>, _> =
1205 read_message_with_limit(&mut cursor, 64).await;
1206 assert!(result.is_err());
1207 let err_msg = result.unwrap_err().to_string();
1208 assert!(
1209 err_msg.contains("too large"),
1210 "error should mention 'too large': {err_msg}"
1211 );
1212 }
1213
1214 #[tokio::test]
1216 async fn ipc_o01_error_kind_timeout_roundtrip() {
1217 let msg = ChildMessage::ExecutionComplete {
1218 result: Err("execution timed out after 500ms".into()),
1219 error_kind: Some(ErrorKind::Timeout),
1220 timeout_ms: Some(500),
1221 };
1222
1223 let mut buf = Vec::new();
1224 write_message(&mut buf, &msg).await.unwrap();
1225
1226 let mut cursor = Cursor::new(buf);
1227 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1228
1229 match decoded {
1230 ChildMessage::ExecutionComplete {
1231 result,
1232 error_kind,
1233 timeout_ms,
1234 } => {
1235 assert!(result.is_err());
1236 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1237 assert_eq!(timeout_ms, Some(500));
1238 }
1239 other => panic!("expected ExecutionComplete, got: {:?}", other),
1240 }
1241 }
1242
1243 #[tokio::test]
1245 async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1246 let msg = ChildMessage::ExecutionComplete {
1247 result: Err("V8 heap limit exceeded".into()),
1248 error_kind: Some(ErrorKind::HeapLimit),
1249 timeout_ms: None,
1250 };
1251
1252 let mut buf = Vec::new();
1253 write_message(&mut buf, &msg).await.unwrap();
1254
1255 let mut cursor = Cursor::new(buf);
1256 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1257
1258 match decoded {
1259 ChildMessage::ExecutionComplete {
1260 result, error_kind, ..
1261 } => {
1262 assert!(result.is_err());
1263 assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1264 }
1265 other => panic!("expected ExecutionComplete, got: {:?}", other),
1266 }
1267 }
1268
1269 #[tokio::test]
1271 async fn ipc_o03_error_kind_backward_compat() {
1272 let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1275 let mut buf = Vec::new();
1276 let payload = json.as_bytes();
1277 let len = payload.len() as u32;
1278 buf.extend_from_slice(&len.to_be_bytes());
1279 buf.extend_from_slice(payload);
1280
1281 let mut cursor = Cursor::new(buf);
1282 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1283
1284 match decoded {
1285 ChildMessage::ExecutionComplete {
1286 result,
1287 error_kind,
1288 timeout_ms,
1289 } => {
1290 assert!(result.is_err());
1291 assert_eq!(
1292 error_kind, None,
1293 "missing error_kind should default to None"
1294 );
1295 assert_eq!(
1296 timeout_ms, None,
1297 "missing timeout_ms should default to None"
1298 );
1299 }
1300 other => panic!("expected ExecutionComplete, got: {:?}", other),
1301 }
1302 }
1303
1304 #[tokio::test]
1306 async fn ipc_o04_error_kind_js_error_roundtrip() {
1307 let msg = ChildMessage::ExecutionComplete {
1308 result: Err("ReferenceError: x is not defined".into()),
1309 error_kind: Some(ErrorKind::JsError),
1310 timeout_ms: None,
1311 };
1312
1313 let mut buf = Vec::new();
1314 write_message(&mut buf, &msg).await.unwrap();
1315
1316 let mut cursor = Cursor::new(buf);
1317 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1318
1319 match decoded {
1320 ChildMessage::ExecutionComplete {
1321 result, error_kind, ..
1322 } => {
1323 assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1324 assert_eq!(error_kind, Some(ErrorKind::JsError));
1325 }
1326 other => panic!("expected ExecutionComplete, got: {:?}", other),
1327 }
1328 }
1329
1330 #[tokio::test]
1332 async fn ipc_o05_success_omits_error_kind() {
1333 let msg = ChildMessage::ExecutionComplete {
1334 result: Ok(serde_json::json!(42)),
1335 error_kind: None,
1336 timeout_ms: None,
1337 };
1338
1339 let json = serde_json::to_string(&msg).unwrap();
1340 assert!(
1342 !json.contains("error_kind"),
1343 "success messages should not contain error_kind field: {json}"
1344 );
1345 assert!(
1346 !json.contains("timeout_ms"),
1347 "success messages should not contain timeout_ms field: {json}"
1348 );
1349 }
1350
1351 #[tokio::test]
1354 async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1355 let msg = ChildMessage::StashPut {
1356 request_id: 50,
1357 key: "grouped-key".into(),
1358 value: serde_json::json!({"data": "secret"}),
1359 ttl_secs: Some(120),
1360 group: Some("analytics".into()),
1361 };
1362
1363 let mut buf = Vec::new();
1364 write_message(&mut buf, &msg).await.unwrap();
1365
1366 let mut cursor = Cursor::new(buf);
1367 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1368
1369 match decoded {
1370 ChildMessage::StashPut {
1371 request_id,
1372 key,
1373 group,
1374 ..
1375 } => {
1376 assert_eq!(request_id, 50);
1377 assert_eq!(key, "grouped-key");
1378 assert_eq!(group, Some("analytics".into()));
1379 }
1380 other => panic!("expected StashPut, got: {:?}", other),
1381 }
1382 }
1383
1384 #[tokio::test]
1385 async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1386 let msg = ChildMessage::StashGet {
1387 request_id: 51,
1388 key: "grouped-key".into(),
1389 group: Some("analytics".into()),
1390 };
1391
1392 let mut buf = Vec::new();
1393 write_message(&mut buf, &msg).await.unwrap();
1394
1395 let mut cursor = Cursor::new(buf);
1396 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1397
1398 match decoded {
1399 ChildMessage::StashGet {
1400 request_id,
1401 key,
1402 group,
1403 } => {
1404 assert_eq!(request_id, 51);
1405 assert_eq!(key, "grouped-key");
1406 assert_eq!(group, Some("analytics".into()));
1407 }
1408 other => panic!("expected StashGet, got: {:?}", other),
1409 }
1410 }
1411
1412 #[tokio::test]
1413 async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1414 let msg = ChildMessage::StashDelete {
1415 request_id: 52,
1416 key: "grouped-key".into(),
1417 group: Some("analytics".into()),
1418 };
1419
1420 let mut buf = Vec::new();
1421 write_message(&mut buf, &msg).await.unwrap();
1422
1423 let mut cursor = Cursor::new(buf);
1424 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1425
1426 match decoded {
1427 ChildMessage::StashDelete {
1428 request_id,
1429 key,
1430 group,
1431 } => {
1432 assert_eq!(request_id, 52);
1433 assert_eq!(key, "grouped-key");
1434 assert_eq!(group, Some("analytics".into()));
1435 }
1436 other => panic!("expected StashDelete, got: {:?}", other),
1437 }
1438 }
1439
1440 #[tokio::test]
1441 async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1442 let msg = ChildMessage::StashKeys {
1443 request_id: 53,
1444 group: Some("analytics".into()),
1445 };
1446
1447 let mut buf = Vec::new();
1448 write_message(&mut buf, &msg).await.unwrap();
1449
1450 let mut cursor = Cursor::new(buf);
1451 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1452
1453 match decoded {
1454 ChildMessage::StashKeys { request_id, group } => {
1455 assert_eq!(request_id, 53);
1456 assert_eq!(group, Some("analytics".into()));
1457 }
1458 other => panic!("expected StashKeys, got: {:?}", other),
1459 }
1460 }
1461
1462 #[tokio::test]
1463 async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1464 let msg = ChildMessage::StashPut {
1466 request_id: 54,
1467 key: "no-group-key".into(),
1468 value: serde_json::json!("val"),
1469 ttl_secs: None,
1470 group: None,
1471 };
1472
1473 let json = serde_json::to_string(&msg).unwrap();
1474 assert!(
1475 !json.contains("\"group\""),
1476 "group:None should be absent in serialized JSON: {json}"
1477 );
1478
1479 let mut buf = Vec::new();
1481 write_message(&mut buf, &msg).await.unwrap();
1482 let mut cursor = Cursor::new(buf);
1483 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1484 match decoded {
1485 ChildMessage::StashPut { group, .. } => {
1486 assert_eq!(group, None);
1487 }
1488 other => panic!("expected StashPut, got: {:?}", other),
1489 }
1490 }
1491
1492 #[tokio::test]
1493 async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1494 let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1496 let mut buf = Vec::new();
1497 let payload = json.as_bytes();
1498 let len = payload.len() as u32;
1499 buf.extend_from_slice(&len.to_be_bytes());
1500 buf.extend_from_slice(payload);
1501
1502 let mut cursor = Cursor::new(buf);
1503 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1504
1505 match decoded {
1506 ChildMessage::StashPut {
1507 request_id,
1508 key,
1509 group,
1510 ..
1511 } => {
1512 assert_eq!(request_id, 60);
1513 assert_eq!(key, "old-key");
1514 assert_eq!(
1515 group, None,
1516 "missing group field from v0.3.0 worker should deserialize as None"
1517 );
1518 }
1519 other => panic!("expected StashPut, got: {:?}", other),
1520 }
1521
1522 let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1524 let mut buf = Vec::new();
1525 let payload = json_get.as_bytes();
1526 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1527 buf.extend_from_slice(payload);
1528 let mut cursor = Cursor::new(buf);
1529 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1530 match decoded {
1531 ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1532 other => panic!("expected StashGet, got: {:?}", other),
1533 }
1534
1535 let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1536 let mut buf = Vec::new();
1537 let payload = json_del.as_bytes();
1538 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1539 buf.extend_from_slice(payload);
1540 let mut cursor = Cursor::new(buf);
1541 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1542 match decoded {
1543 ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1544 other => panic!("expected StashDelete, got: {:?}", other),
1545 }
1546
1547 let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1548 let mut buf = Vec::new();
1549 let payload = json_keys.as_bytes();
1550 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1551 buf.extend_from_slice(payload);
1552 let mut cursor = Cursor::new(buf);
1553 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1554 match decoded {
1555 ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1556 other => panic!("expected StashKeys, got: {:?}", other),
1557 }
1558 }
1559
1560 #[tokio::test]
1563 async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1564 let msg = ChildMessage::ExecutionComplete {
1565 result: Err("execution timed out after 5000ms".into()),
1566 error_kind: Some(ErrorKind::Timeout),
1567 timeout_ms: Some(5000),
1568 };
1569
1570 let mut buf = Vec::new();
1571 write_message(&mut buf, &msg).await.unwrap();
1572
1573 let mut cursor = Cursor::new(buf);
1574 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1575
1576 match decoded {
1577 ChildMessage::ExecutionComplete {
1578 result,
1579 error_kind,
1580 timeout_ms,
1581 } => {
1582 assert!(result.is_err());
1583 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1584 assert_eq!(timeout_ms, Some(5000));
1585 }
1586 other => panic!("expected ExecutionComplete, got: {:?}", other),
1587 }
1588 }
1589
1590 #[tokio::test]
1591 async fn ipc_t02_timeout_ms_absent_backward_compat() {
1592 let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1594 let mut buf = Vec::new();
1595 let payload = json.as_bytes();
1596 buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1597 buf.extend_from_slice(payload);
1598
1599 let mut cursor = Cursor::new(buf);
1600 let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1601
1602 match decoded {
1603 ChildMessage::ExecutionComplete {
1604 error_kind,
1605 timeout_ms,
1606 ..
1607 } => {
1608 assert_eq!(error_kind, Some(ErrorKind::Timeout));
1609 assert_eq!(
1610 timeout_ms, None,
1611 "missing timeout_ms should default to None"
1612 );
1613 }
1614 other => panic!("expected ExecutionComplete, got: {:?}", other),
1615 }
1616 }
1617
1618 #[tokio::test]
1619 async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
1620 let msg = ChildMessage::ExecutionComplete {
1621 result: Err("some error".into()),
1622 error_kind: Some(ErrorKind::JsError),
1623 timeout_ms: None,
1624 };
1625
1626 let json = serde_json::to_string(&msg).unwrap();
1627 assert!(
1628 !json.contains("timeout_ms"),
1629 "timeout_ms:None should be omitted: {json}"
1630 );
1631 }
1632
1633 #[tokio::test]
1634 async fn ipc_t04_timeout_ms_present_when_some() {
1635 let msg = ChildMessage::ExecutionComplete {
1636 result: Err("timed out".into()),
1637 error_kind: Some(ErrorKind::Timeout),
1638 timeout_ms: Some(10000),
1639 };
1640
1641 let json = serde_json::to_string(&msg).unwrap();
1642 assert!(
1643 json.contains("\"timeout_ms\":10000"),
1644 "timeout_ms should be present: {json}"
1645 );
1646 }
1647
1648 #[tokio::test]
1651 async fn ipc_rv01_write_raw_message_roundtrip() {
1652 let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1653
1654 let mut buf = Vec::new();
1655 write_raw_message(&mut buf, payload).await.unwrap();
1656
1657 let mut cursor = Cursor::new(buf);
1658 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1659 .await
1660 .unwrap()
1661 .unwrap();
1662
1663 assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1664 }
1665
1666 #[tokio::test]
1667 async fn ipc_rv02_read_raw_message_preserves_bytes() {
1668 let msg = ChildMessage::Log {
1670 message: "test raw".into(),
1671 };
1672 let mut buf = Vec::new();
1673 write_message(&mut buf, &msg).await.unwrap();
1674
1675 let mut cursor = Cursor::new(buf);
1676 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1677 .await
1678 .unwrap()
1679 .unwrap();
1680
1681 let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1683 assert!(matches!(parsed, ChildMessage::Log { .. }));
1684 }
1685
1686 #[tokio::test]
1687 async fn ipc_rv03_raw_message_size_limit_enforced() {
1688 let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1689 let mut buf = Vec::new();
1690 write_raw_message(&mut buf, large_payload.as_bytes())
1691 .await
1692 .unwrap();
1693
1694 let mut cursor = Cursor::new(buf);
1695 let result = read_raw_message(&mut cursor, 64).await;
1696 assert!(result.is_err());
1697 let err = result.unwrap_err().to_string();
1698 assert!(err.contains("too large"), "error: {err}");
1699 }
1700
1701 #[tokio::test]
1702 async fn ipc_rv04_large_payload_stays_raw() {
1703 let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1705 let mut buf = Vec::new();
1706 write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1707
1708 let mut cursor = Cursor::new(buf);
1709 let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1710 .await
1711 .unwrap()
1712 .unwrap();
1713
1714 assert!(raw.get().len() > 1_000_000);
1716 let val: Value = serde_json::from_str(raw.get()).unwrap();
1718 assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1719 }
1720
1721 #[tokio::test]
1722 async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1723 let msg = ParentMessage::ToolCallResult {
1725 request_id: 99,
1726 result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1727 };
1728 let mut buf = Vec::new();
1729 write_message(&mut buf, &msg).await.unwrap();
1730
1731 let mut cursor = Cursor::new(buf.clone());
1732 let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1733 .await
1734 .unwrap()
1735 .unwrap();
1736
1737 let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1739 match parsed {
1740 ParentMessage::ToolCallResult { request_id, result } => {
1741 assert_eq!(request_id, 99);
1742 assert!(result.is_ok());
1743 }
1744 other => panic!("expected ToolCallResult, got: {:?}", other),
1745 }
1746 }
1747
1748 #[tokio::test]
1749 async fn ipc_rv06_raw_eof_returns_none() {
1750 let mut cursor = Cursor::new(Vec::<u8>::new());
1751 let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1752 .await
1753 .unwrap();
1754 assert!(result.is_none());
1755 }
1756
1757 #[tokio::test]
1758 async fn ipc_rv07_mixed_raw_and_value_messages() {
1759 let mut buf = Vec::new();
1760
1761 let msg1 = ChildMessage::Log {
1763 message: "first".into(),
1764 };
1765 write_message(&mut buf, &msg1).await.unwrap();
1766
1767 let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1769 write_raw_message(&mut buf, raw_payload).await.unwrap();
1770
1771 let mut cursor = Cursor::new(buf);
1773 let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1774 assert!(matches!(d1, ChildMessage::Log { .. }));
1775
1776 let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1777 .await
1778 .unwrap()
1779 .unwrap();
1780 let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1781 assert!(matches!(parsed, ChildMessage::Log { .. }));
1782 }
1783}