Skip to main content

forge_sandbox/
ipc.rs

//! IPC protocol for parent ↔ worker communication.
//!
//! Uses length-delimited JSON messages: 4-byte big-endian length prefix + JSON payload.
//! All messages are typed via [`ParentMessage`] and [`ChildMessage`] enums.
5
6use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
12/// Error classification for structured error preservation across IPC.
13///
14/// When errors cross the process boundary via `ExecutionComplete`, the typed
15/// `SandboxError` variants are converted to strings. This enum preserves the
16/// error kind so the parent can reconstruct the correct `SandboxError` variant.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19#[non_exhaustive]
20pub enum ErrorKind {
21    /// V8 execution timed out (CPU watchdog or async event loop).
22    Timeout,
23    /// V8 heap memory limit was exceeded.
24    HeapLimit,
25    /// A JavaScript error was thrown.
26    JsError,
27    /// Generic execution failure.
28    Execution,
29}
30
31/// Structured dispatch error for IPC transport.
32///
33/// Preserves [`forge_error::DispatchError`] variant information across the
34/// parent ↔ worker process boundary. Without this, all errors would be
35/// flattened to `DispatchError::Internal` in ChildProcess mode, losing
36/// error codes like `SERVER_NOT_FOUND` and fuzzy match suggestions.
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct IpcDispatchError {
39    /// Error code matching `DispatchError::code()` output.
40    pub code: String,
41    /// Human-readable error message.
42    pub message: String,
43    /// Server name (for ServerNotFound, ToolNotFound, ToolError, Timeout, CircuitOpen, Upstream).
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub server: Option<String>,
46    /// Tool name (for ToolNotFound, ToolError).
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub tool: Option<String>,
49    /// Timeout in milliseconds (for Timeout).
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub timeout_ms: Option<u64>,
52}
53
54impl IpcDispatchError {
55    /// Create from a plain string error message (used for non-dispatch errors
56    /// like "stash dispatcher not available" or URI validation failures).
57    pub fn from_string(msg: String) -> Self {
58        Self {
59            code: "INTERNAL".to_string(),
60            message: msg,
61            server: None,
62            tool: None,
63            timeout_ms: None,
64        }
65    }
66
67    /// Reconstruct the appropriate [`forge_error::DispatchError`] variant.
68    pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69        match self.code.as_str() {
70            "SERVER_NOT_FOUND" => {
71                forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72            }
73            "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74                server: self.server.unwrap_or_default(),
75                tool: self.tool.unwrap_or_default(),
76            },
77            "TIMEOUT" => forge_error::DispatchError::Timeout {
78                server: self.server.unwrap_or_default(),
79                timeout_ms: self.timeout_ms.unwrap_or(0),
80            },
81            "CIRCUIT_OPEN" => {
82                forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83            }
84            "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85                reason: self.message,
86            },
87            "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88                server: self.server.unwrap_or_default(),
89                message: self.message,
90            },
91            "TOOL_ERROR" => forge_error::DispatchError::ToolError {
92                server: self.server.unwrap_or_default(),
93                tool: self.tool.unwrap_or_default(),
94                message: self.message,
95            },
96            "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
97            _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
98        }
99    }
100}
101
102impl From<&forge_error::DispatchError> for IpcDispatchError {
103    fn from(e: &forge_error::DispatchError) -> Self {
104        let (server, tool, timeout_ms) = match e {
105            forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
106            forge_error::DispatchError::ToolNotFound { server, tool } => {
107                (Some(server.clone()), Some(tool.clone()), None)
108            }
109            forge_error::DispatchError::Timeout {
110                server, timeout_ms, ..
111            } => (Some(server.clone()), None, Some(*timeout_ms)),
112            forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
113            forge_error::DispatchError::Upstream { server, .. } => {
114                (Some(server.clone()), None, None)
115            }
116            forge_error::DispatchError::ToolError { server, tool, .. } => {
117                (Some(server.clone()), Some(tool.clone()), None)
118            }
119            _ => (None, None, None),
120        };
121
122        Self {
123            code: e.code().to_string(),
124            message: e.to_string(),
125            server,
126            tool,
127            timeout_ms,
128        }
129    }
130}
131
132/// Messages sent from the parent process to the worker child.
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(tag = "type")]
135#[non_exhaustive]
136pub enum ParentMessage {
137    /// Initial message: execute this code in the sandbox.
138    Execute {
139        /// The JavaScript async arrow function to execute.
140        code: String,
141        /// Optional capability manifest (for search mode — not used in child process execute).
142        manifest: Option<Value>,
143        /// Worker configuration.
144        config: WorkerConfig,
145    },
146    /// Response to a tool call request from the child.
147    ToolCallResult {
148        /// Matches the request_id from ChildMessage::ToolCallRequest.
149        request_id: u64,
150        /// The tool call result, or a structured dispatch error.
151        result: Result<Value, IpcDispatchError>,
152    },
153    /// Response to a resource read request from the child.
154    ResourceReadResult {
155        /// Matches the request_id from ChildMessage::ResourceReadRequest.
156        request_id: u64,
157        /// The resource content, or a structured dispatch error.
158        result: Result<Value, IpcDispatchError>,
159    },
160    /// Reset the worker for a new execution (pool mode).
161    ///
162    /// The worker drops its current JsRuntime and creates a fresh one.
163    /// It responds with [`ChildMessage::ResetComplete`].
164    Reset {
165        /// New worker configuration for the next execution.
166        config: WorkerConfig,
167    },
168    /// Response to a stash operation from the child.
169    StashResult {
170        /// Matches the request_id from the stash request.
171        request_id: u64,
172        /// The stash operation result, or a structured dispatch error.
173        result: Result<Value, IpcDispatchError>,
174    },
175}
176
177/// Messages sent from the worker child to the parent process.
178#[derive(Debug, Clone, Serialize, Deserialize)]
179#[serde(tag = "type")]
180#[non_exhaustive]
181pub enum ChildMessage {
182    /// Request the parent to dispatch a tool call.
183    ToolCallRequest {
184        /// Unique ID for correlating request ↔ response.
185        request_id: u64,
186        /// Target server name.
187        server: String,
188        /// Tool identifier.
189        tool: String,
190        /// Tool arguments.
191        args: Value,
192    },
193    /// Request the parent to read a resource.
194    ResourceReadRequest {
195        /// Unique ID for correlating request ↔ response.
196        request_id: u64,
197        /// Target server name.
198        server: String,
199        /// Resource URI.
200        uri: String,
201    },
202    /// Request the parent to put a value in the stash.
203    StashPut {
204        /// Unique ID for correlating request ↔ response.
205        request_id: u64,
206        /// Stash key.
207        key: String,
208        /// Value to store.
209        value: Value,
210        /// TTL in seconds (None = use default).
211        ttl_secs: Option<u32>,
212        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
213        #[serde(default, skip_serializing_if = "Option::is_none")]
214        group: Option<String>,
215    },
216    /// Request the parent to get a value from the stash.
217    StashGet {
218        /// Unique ID for correlating request ↔ response.
219        request_id: u64,
220        /// Stash key.
221        key: String,
222        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
223        #[serde(default, skip_serializing_if = "Option::is_none")]
224        group: Option<String>,
225    },
226    /// Request the parent to delete a value from the stash.
227    StashDelete {
228        /// Unique ID for correlating request ↔ response.
229        request_id: u64,
230        /// Stash key.
231        key: String,
232        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
233        #[serde(default, skip_serializing_if = "Option::is_none")]
234        group: Option<String>,
235    },
236    /// Request the parent to list stash keys.
237    StashKeys {
238        /// Unique ID for correlating request ↔ response.
239        request_id: u64,
240        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
241        #[serde(default, skip_serializing_if = "Option::is_none")]
242        group: Option<String>,
243    },
244    /// Worker has been reset and is ready for a new execution.
245    ResetComplete,
246    /// The execution has finished.
247    ExecutionComplete {
248        /// The result value, or an error message.
249        result: Result<Value, String>,
250        /// Classification of the error for typed reconstruction on the parent side.
251        /// Present only when `result` is `Err`. Defaults to `JsError` if absent
252        /// (backward compatibility with workers that don't send this field).
253        #[serde(default, skip_serializing_if = "Option::is_none")]
254        error_kind: Option<ErrorKind>,
255        /// Structured timeout value in milliseconds (v0.3.1+).
256        /// Present only when `error_kind` is `Timeout`. Replaces fragile string parsing.
257        /// Absent in v0.3.0 workers → None.
258        #[serde(default, skip_serializing_if = "Option::is_none")]
259        timeout_ms: Option<u64>,
260    },
261    /// A log message from the worker.
262    Log {
263        /// The log message text.
264        message: String,
265    },
266}
267
268/// Configuration passed to the worker process.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct WorkerConfig {
271    /// Maximum execution time.
272    pub timeout_ms: u64,
273    /// V8 heap limit in bytes.
274    pub max_heap_size: usize,
275    /// Maximum tool calls per execution.
276    pub max_tool_calls: usize,
277    /// Maximum size of tool call arguments in bytes.
278    pub max_tool_call_args_size: usize,
279    /// Maximum size of the JSON result in bytes.
280    pub max_output_size: usize,
281    /// Maximum size of LLM-generated code in bytes.
282    pub max_code_size: usize,
283    /// Maximum IPC message size in bytes. Defaults to [`DEFAULT_MAX_IPC_MESSAGE_SIZE`].
284    #[serde(default = "default_max_ipc_message_size")]
285    pub max_ipc_message_size: usize,
286    /// Maximum resource content size in bytes.
287    #[serde(default = "default_max_resource_size")]
288    pub max_resource_size: usize,
289    /// Maximum concurrent calls in forge.parallel().
290    #[serde(default = "default_max_parallel")]
291    pub max_parallel: usize,
292    /// Known tools for structured error fuzzy matching (v0.3.1+).
293    /// Each entry is `(server_name, tool_name)`.
294    #[serde(default, skip_serializing_if = "Option::is_none")]
295    pub known_tools: Option<Vec<(String, String)>>,
296    /// Known server names for structured error detection (v0.3.1+).
297    #[serde(default, skip_serializing_if = "Option::is_none")]
298    pub known_servers: Option<std::collections::HashSet<String>>,
299}
300
301fn default_max_ipc_message_size() -> usize {
302    DEFAULT_MAX_IPC_MESSAGE_SIZE
303}
304
/// Serde fallback for `WorkerConfig::max_resource_size`: 64 MB, in bytes.
fn default_max_resource_size() -> usize {
    const MIB: usize = 1024 * 1024;
    64 * MIB
}
308
/// Serde fallback for `WorkerConfig::max_parallel`: eight concurrent calls.
fn default_max_parallel() -> usize {
    const DEFAULT_PARALLEL_CALLS: usize = 8;
    DEFAULT_PARALLEL_CALLS
}
312
313impl From<&crate::SandboxConfig> for WorkerConfig {
314    fn from(config: &crate::SandboxConfig) -> Self {
315        Self {
316            timeout_ms: config.timeout.as_millis() as u64,
317            max_heap_size: config.max_heap_size,
318            max_tool_calls: config.max_tool_calls,
319            max_tool_call_args_size: config.max_tool_call_args_size,
320            max_output_size: config.max_output_size,
321            max_code_size: config.max_code_size,
322            max_ipc_message_size: config.max_ipc_message_size,
323            max_resource_size: config.max_resource_size,
324            max_parallel: config.max_parallel,
325            known_tools: None,
326            known_servers: None,
327        }
328    }
329}
330
331impl WorkerConfig {
332    /// Convert back to a SandboxConfig for use in the worker.
333    pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
334        crate::SandboxConfig {
335            timeout: Duration::from_millis(self.timeout_ms),
336            max_code_size: self.max_code_size,
337            max_output_size: self.max_output_size,
338            max_heap_size: self.max_heap_size,
339            max_concurrent: 1, // worker handles one execution
340            max_tool_calls: self.max_tool_calls,
341            max_tool_call_args_size: self.max_tool_call_args_size,
342            execution_mode: crate::executor::ExecutionMode::InProcess, // worker always runs in-process
343            max_resource_size: self.max_resource_size,
344            max_parallel: self.max_parallel,
345            max_ipc_message_size: self.max_ipc_message_size,
346        }
347    }
348}
349
350/// Write a length-delimited JSON message to an async writer.
351///
352/// Format: 4-byte big-endian length prefix followed by the JSON payload bytes.
353pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
354    writer: &mut W,
355    msg: &T,
356) -> Result<(), std::io::Error> {
357    let payload = serde_json::to_vec(msg)
358        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
359    let len = u32::try_from(payload.len()).map_err(|_| {
360        std::io::Error::new(
361            std::io::ErrorKind::InvalidData,
362            format!(
363                "IPC payload too large: {} bytes (max {} bytes)",
364                payload.len(),
365                u32::MAX
366            ),
367        )
368    })?;
369    writer.write_all(&len.to_be_bytes()).await?;
370    writer.write_all(&payload).await?;
371    writer.flush().await?;
372    Ok(())
373}
374
375/// Write a raw JSON byte payload as a length-delimited IPC message.
376///
377/// This bypasses serialization entirely — useful for forwarding large
378/// tool/resource results without deserializing and re-serializing.
379pub async fn write_raw_message<W: AsyncWrite + Unpin>(
380    writer: &mut W,
381    payload: &[u8],
382) -> Result<(), std::io::Error> {
383    let len = u32::try_from(payload.len()).map_err(|_| {
384        std::io::Error::new(
385            std::io::ErrorKind::InvalidData,
386            format!(
387                "raw IPC payload too large: {} bytes (max {} bytes)",
388                payload.len(),
389                u32::MAX
390            ),
391        )
392    })?;
393    writer.write_all(&len.to_be_bytes()).await?;
394    writer.write_all(payload).await?;
395    writer.flush().await?;
396    Ok(())
397}
398
399/// Read a raw JSON byte payload from an IPC message without deserializing.
400///
401/// Returns the raw bytes as an owned `Box<RawValue>` which can be forwarded
402/// without parsing. Returns `None` on EOF.
403pub async fn read_raw_message<R: AsyncRead + Unpin>(
404    reader: &mut R,
405    max_size: usize,
406) -> Result<Option<Box<RawValue>>, std::io::Error> {
407    let mut len_buf = [0u8; 4];
408    match reader.read_exact(&mut len_buf).await {
409        Ok(_) => {}
410        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
411        Err(e) => return Err(e),
412    }
413
414    let len = u32::from_be_bytes(len_buf) as usize;
415
416    if len > max_size {
417        return Err(std::io::Error::new(
418            std::io::ErrorKind::InvalidData,
419            format!(
420                "raw IPC message too large: {} bytes (limit: {} bytes)",
421                len, max_size
422            ),
423        ));
424    }
425
426    let mut payload = vec![0u8; len];
427    reader.read_exact(&mut payload).await?;
428
429    let raw: Box<RawValue> = serde_json::from_slice(&payload)
430        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
431    Ok(Some(raw))
432}
433
/// Largest IPC message accepted by default: 8 MB, expressed in bytes.
///
/// Lowered from 64 MB so that a single message cannot cause memory pressure.
/// Can be overridden through `sandbox.max_ipc_message_size_mb` in config.
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 << 20;
439
440/// Read a length-delimited JSON message from an async reader.
441///
442/// Returns `None` if the reader has reached EOF (clean shutdown).
443/// Uses [`DEFAULT_MAX_IPC_MESSAGE_SIZE`] as the size limit.
444pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
445    reader: &mut R,
446) -> Result<Option<T>, std::io::Error> {
447    read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
448}
449
450/// Read a length-delimited JSON message with a configurable size limit.
451///
452/// Returns `None` if the reader has reached EOF (clean shutdown).
453/// The `max_size` parameter controls the maximum allowed message size in bytes.
454pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
455    reader: &mut R,
456    max_size: usize,
457) -> Result<Option<T>, std::io::Error> {
458    let mut len_buf = [0u8; 4];
459    match reader.read_exact(&mut len_buf).await {
460        Ok(_) => {}
461        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
462        Err(e) => return Err(e),
463    }
464
465    let len = u32::from_be_bytes(len_buf) as usize;
466
467    // Reject messages exceeding the configured limit
468    if len > max_size {
469        return Err(std::io::Error::new(
470            std::io::ErrorKind::InvalidData,
471            format!(
472                "IPC message too large: {} bytes (limit: {} bytes)",
473                len, max_size
474            ),
475        ));
476    }
477
478    let mut payload = vec![0u8; len];
479    reader.read_exact(&mut payload).await?;
480
481    let msg: T = serde_json::from_slice(&payload)
482        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
483    Ok(Some(msg))
484}
485
486#[cfg(test)]
487mod tests {
488    use super::*;
489    use std::io::Cursor;
490
491    #[tokio::test]
492    async fn roundtrip_parent_execute_message() {
493        let msg = ParentMessage::Execute {
494            code: "async () => { return 42; }".into(),
495            manifest: Some(serde_json::json!({"servers": []})),
496            config: WorkerConfig {
497                timeout_ms: 5000,
498                max_heap_size: 64 * 1024 * 1024,
499                max_tool_calls: 50,
500                max_tool_call_args_size: 1024 * 1024,
501                max_output_size: 1024 * 1024,
502                max_code_size: 64 * 1024,
503                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
504                max_resource_size: 64 * 1024 * 1024,
505                max_parallel: 8,
506                known_tools: None,
507                known_servers: None,
508            },
509        };
510
511        let mut buf = Vec::new();
512        write_message(&mut buf, &msg).await.unwrap();
513
514        let mut cursor = Cursor::new(buf);
515        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
516
517        match decoded {
518            ParentMessage::Execute {
519                code,
520                manifest,
521                config,
522            } => {
523                assert_eq!(code, "async () => { return 42; }");
524                assert!(manifest.is_some());
525                assert_eq!(config.timeout_ms, 5000);
526            }
527            other => panic!("expected Execute, got: {:?}", other),
528        }
529    }
530
531    #[tokio::test]
532    async fn roundtrip_parent_tool_result() {
533        let msg = ParentMessage::ToolCallResult {
534            request_id: 42,
535            result: Ok(serde_json::json!({"status": "ok"})),
536        };
537
538        let mut buf = Vec::new();
539        write_message(&mut buf, &msg).await.unwrap();
540
541        let mut cursor = Cursor::new(buf);
542        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
543
544        match decoded {
545            ParentMessage::ToolCallResult { request_id, result } => {
546                assert_eq!(request_id, 42);
547                assert!(result.is_ok());
548            }
549            other => panic!("expected ToolCallResult, got: {:?}", other),
550        }
551    }
552
553    #[tokio::test]
554    async fn roundtrip_parent_tool_result_error() {
555        let msg = ParentMessage::ToolCallResult {
556            request_id: 7,
557            result: Err(IpcDispatchError::from_string("connection refused".into())),
558        };
559
560        let mut buf = Vec::new();
561        write_message(&mut buf, &msg).await.unwrap();
562
563        let mut cursor = Cursor::new(buf);
564        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
565
566        match decoded {
567            ParentMessage::ToolCallResult { request_id, result } => {
568                assert_eq!(request_id, 7);
569                let err = result.unwrap_err();
570                assert_eq!(err.message, "connection refused");
571                assert_eq!(err.code, "INTERNAL");
572            }
573            other => panic!("expected ToolCallResult, got: {:?}", other),
574        }
575    }
576
577    #[tokio::test]
578    async fn roundtrip_child_tool_request() {
579        let msg = ChildMessage::ToolCallRequest {
580            request_id: 1,
581            server: "narsil".into(),
582            tool: "ast.parse".into(),
583            args: serde_json::json!({"file": "test.rs"}),
584        };
585
586        let mut buf = Vec::new();
587        write_message(&mut buf, &msg).await.unwrap();
588
589        let mut cursor = Cursor::new(buf);
590        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
591
592        match decoded {
593            ChildMessage::ToolCallRequest {
594                request_id,
595                server,
596                tool,
597                args,
598            } => {
599                assert_eq!(request_id, 1);
600                assert_eq!(server, "narsil");
601                assert_eq!(tool, "ast.parse");
602                assert_eq!(args["file"], "test.rs");
603            }
604            other => panic!("expected ToolCallRequest, got: {:?}", other),
605        }
606    }
607
608    #[tokio::test]
609    async fn roundtrip_child_execution_complete() {
610        let msg = ChildMessage::ExecutionComplete {
611            result: Ok(serde_json::json!([1, 2, 3])),
612            error_kind: None,
613            timeout_ms: None,
614        };
615
616        let mut buf = Vec::new();
617        write_message(&mut buf, &msg).await.unwrap();
618
619        let mut cursor = Cursor::new(buf);
620        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
621
622        match decoded {
623            ChildMessage::ExecutionComplete {
624                result, error_kind, ..
625            } => {
626                assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
627                assert_eq!(error_kind, None);
628            }
629            other => panic!("expected ExecutionComplete, got: {:?}", other),
630        }
631    }
632
633    #[tokio::test]
634    async fn roundtrip_child_log() {
635        let msg = ChildMessage::Log {
636            message: "processing step 3".into(),
637        };
638
639        let mut buf = Vec::new();
640        write_message(&mut buf, &msg).await.unwrap();
641
642        let mut cursor = Cursor::new(buf);
643        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
644
645        match decoded {
646            ChildMessage::Log { message } => {
647                assert_eq!(message, "processing step 3");
648            }
649            other => panic!("expected Log, got: {:?}", other),
650        }
651    }
652
653    #[tokio::test]
654    async fn multiple_messages_in_stream() {
655        let msg1 = ChildMessage::Log {
656            message: "first".into(),
657        };
658        let msg2 = ChildMessage::ToolCallRequest {
659            request_id: 1,
660            server: "s".into(),
661            tool: "t".into(),
662            args: serde_json::json!({}),
663        };
664        let msg3 = ChildMessage::ExecutionComplete {
665            result: Ok(serde_json::json!("done")),
666            error_kind: None,
667            timeout_ms: None,
668        };
669
670        let mut buf = Vec::new();
671        write_message(&mut buf, &msg1).await.unwrap();
672        write_message(&mut buf, &msg2).await.unwrap();
673        write_message(&mut buf, &msg3).await.unwrap();
674
675        let mut cursor = Cursor::new(buf);
676        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
677        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
678        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
679
680        assert!(matches!(d1, ChildMessage::Log { .. }));
681        assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
682        assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
683
684        // EOF after all messages
685        let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
686        assert!(d4.is_none());
687    }
688
689    #[tokio::test]
690    async fn execution_complete_error_roundtrip() {
691        let msg = ChildMessage::ExecutionComplete {
692            result: Err("failed to create tokio runtime: resource unavailable".into()),
693            error_kind: Some(ErrorKind::Execution),
694            timeout_ms: None,
695        };
696
697        let mut buf = Vec::new();
698        write_message(&mut buf, &msg).await.unwrap();
699
700        let mut cursor = Cursor::new(buf);
701        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
702
703        match decoded {
704            ChildMessage::ExecutionComplete {
705                result, error_kind, ..
706            } => {
707                let err = result.unwrap_err();
708                assert!(
709                    err.contains("tokio runtime"),
710                    "expected runtime error: {err}"
711                );
712                assert_eq!(error_kind, Some(ErrorKind::Execution));
713            }
714            other => panic!("expected ExecutionComplete, got: {:?}", other),
715        }
716    }
717
718    #[tokio::test]
719    async fn eof_returns_none() {
720        let mut cursor = Cursor::new(Vec::<u8>::new());
721        let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
722        assert!(result.is_none());
723    }
724
725    #[test]
726    fn u32_try_from_overflow() {
727        // Validates that the conversion logic correctly rejects sizes > u32::MAX
728        let overflow_size = u32::MAX as usize + 1;
729        assert!(u32::try_from(overflow_size).is_err());
730    }
731
732    #[tokio::test]
733    async fn write_message_normal_size_succeeds() {
734        // Regression guard: normal-sized messages still work after the try_from change
735        let msg = ChildMessage::Log {
736            message: "a".repeat(1024),
737        };
738        let mut buf = Vec::new();
739        write_message(&mut buf, &msg).await.unwrap();
740        assert!(buf.len() > 1024);
741    }
742
743    #[tokio::test]
744    async fn large_message_roundtrip() {
745        // A large payload (~1MB of data)
746        let large_data = "x".repeat(1_000_000);
747        let msg = ChildMessage::ExecutionComplete {
748            result: Ok(serde_json::json!(large_data)),
749            error_kind: None,
750            timeout_ms: None,
751        };
752
753        let mut buf = Vec::new();
754        write_message(&mut buf, &msg).await.unwrap();
755
756        let mut cursor = Cursor::new(buf);
757        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
758
759        match decoded {
760            ChildMessage::ExecutionComplete { result, .. } => {
761                assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
762            }
763            other => panic!("expected ExecutionComplete, got: {:?}", other),
764        }
765    }
766
767    #[tokio::test]
768    async fn worker_config_roundtrip_from_sandbox_config() {
769        let sandbox = crate::SandboxConfig::default();
770        let worker = WorkerConfig::from(&sandbox);
771        let back = worker.to_sandbox_config();
772
773        assert_eq!(sandbox.timeout, back.timeout);
774        assert_eq!(sandbox.max_heap_size, back.max_heap_size);
775        assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
776        assert_eq!(sandbox.max_output_size, back.max_output_size);
777        assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
778        assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); // 8 MB default
779    }
780
781    #[tokio::test]
782    async fn read_message_with_limit_rejects_oversized() {
783        let msg = ChildMessage::Log {
784            message: "x".repeat(1024),
785        };
786        let mut buf = Vec::new();
787        write_message(&mut buf, &msg).await.unwrap();
788
789        // Set limit smaller than the message payload
790        let mut cursor = Cursor::new(buf);
791        let result: Result<Option<ChildMessage>, _> =
792            read_message_with_limit(&mut cursor, 64).await;
793        assert!(result.is_err());
794        let err_msg = result.unwrap_err().to_string();
795        assert!(err_msg.contains("too large"), "error: {err_msg}");
796    }
797
798    #[tokio::test]
799    async fn read_message_with_limit_accepts_within_limit() {
800        let msg = ChildMessage::Log {
801            message: "hello".into(),
802        };
803        let mut buf = Vec::new();
804        write_message(&mut buf, &msg).await.unwrap();
805
806        let mut cursor = Cursor::new(buf);
807        let result: Option<ChildMessage> =
808            read_message_with_limit(&mut cursor, 1024).await.unwrap();
809        assert!(result.is_some());
810    }
811
812    #[tokio::test]
813    async fn worker_config_ipc_limit_serde_default() {
814        // Deserializing JSON without max_ipc_message_size should use the default
815        let json = r#"{
816            "timeout_ms": 5000,
817            "max_heap_size": 67108864,
818            "max_tool_calls": 50,
819            "max_tool_call_args_size": 1048576,
820            "max_output_size": 1048576,
821            "max_code_size": 65536
822        }"#;
823        let config: WorkerConfig = serde_json::from_str(json).unwrap();
824        assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
825    }
826
827    // --- IPC-01: ResourceReadRequest round-trip ---
828    #[tokio::test]
829    async fn ipc_01_resource_read_request_roundtrip() {
830        let msg = ChildMessage::ResourceReadRequest {
831            request_id: 10,
832            server: "postgres".into(),
833            uri: "file:///logs/app.log".into(),
834        };
835
836        let mut buf = Vec::new();
837        write_message(&mut buf, &msg).await.unwrap();
838
839        let mut cursor = Cursor::new(buf);
840        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
841
842        match decoded {
843            ChildMessage::ResourceReadRequest {
844                request_id,
845                server,
846                uri,
847            } => {
848                assert_eq!(request_id, 10);
849                assert_eq!(server, "postgres");
850                assert_eq!(uri, "file:///logs/app.log");
851            }
852            other => panic!("expected ResourceReadRequest, got: {:?}", other),
853        }
854    }
855
856    // --- IPC-02: ResourceReadResult (success) round-trip ---
857    #[tokio::test]
858    async fn ipc_02_resource_read_result_success_roundtrip() {
859        let msg = ParentMessage::ResourceReadResult {
860            request_id: 11,
861            result: Ok(serde_json::json!({"content": "log data here"})),
862        };
863
864        let mut buf = Vec::new();
865        write_message(&mut buf, &msg).await.unwrap();
866
867        let mut cursor = Cursor::new(buf);
868        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
869
870        match decoded {
871            ParentMessage::ResourceReadResult { request_id, result } => {
872                assert_eq!(request_id, 11);
873                let val = result.unwrap();
874                assert_eq!(val["content"], "log data here");
875            }
876            other => panic!("expected ResourceReadResult, got: {:?}", other),
877        }
878    }
879
880    // --- IPC-03: ResourceReadResult (error) round-trip ---
881    #[tokio::test]
882    async fn ipc_03_resource_read_result_error_roundtrip() {
883        let msg = ParentMessage::ResourceReadResult {
884            request_id: 12,
885            result: Err(IpcDispatchError::from_string("resource not found".into())),
886        };
887
888        let mut buf = Vec::new();
889        write_message(&mut buf, &msg).await.unwrap();
890
891        let mut cursor = Cursor::new(buf);
892        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
893
894        match decoded {
895            ParentMessage::ResourceReadResult { request_id, result } => {
896                assert_eq!(request_id, 12);
897                let err = result.unwrap_err();
898                assert_eq!(err.message, "resource not found");
899                assert_eq!(err.code, "INTERNAL");
900            }
901            other => panic!("expected ResourceReadResult, got: {:?}", other),
902        }
903    }
904
905    // --- IPC-04: StashPut round-trip ---
906    #[tokio::test]
907    async fn ipc_04_stash_put_roundtrip() {
908        let msg = ChildMessage::StashPut {
909            request_id: 20,
910            key: "my-key".into(),
911            value: serde_json::json!({"data": 42}),
912            ttl_secs: Some(60),
913            group: None,
914        };
915
916        let mut buf = Vec::new();
917        write_message(&mut buf, &msg).await.unwrap();
918
919        let mut cursor = Cursor::new(buf);
920        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
921
922        match decoded {
923            ChildMessage::StashPut {
924                request_id,
925                key,
926                value,
927                ttl_secs,
928                group,
929            } => {
930                assert_eq!(request_id, 20);
931                assert_eq!(key, "my-key");
932                assert_eq!(value["data"], 42);
933                assert_eq!(ttl_secs, Some(60));
934                assert_eq!(group, None);
935            }
936            other => panic!("expected StashPut, got: {:?}", other),
937        }
938    }
939
940    // --- IPC-05: StashGet round-trip ---
941    #[tokio::test]
942    async fn ipc_05_stash_get_roundtrip() {
943        let msg = ChildMessage::StashGet {
944            request_id: 21,
945            key: "lookup-key".into(),
946            group: None,
947        };
948
949        let mut buf = Vec::new();
950        write_message(&mut buf, &msg).await.unwrap();
951
952        let mut cursor = Cursor::new(buf);
953        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
954
955        match decoded {
956            ChildMessage::StashGet {
957                request_id,
958                key,
959                group,
960            } => {
961                assert_eq!(request_id, 21);
962                assert_eq!(key, "lookup-key");
963                assert_eq!(group, None);
964            }
965            other => panic!("expected StashGet, got: {:?}", other),
966        }
967    }
968
969    // --- IPC-06: StashDelete round-trip ---
970    #[tokio::test]
971    async fn ipc_06_stash_delete_roundtrip() {
972        let msg = ChildMessage::StashDelete {
973            request_id: 22,
974            key: "delete-me".into(),
975            group: None,
976        };
977
978        let mut buf = Vec::new();
979        write_message(&mut buf, &msg).await.unwrap();
980
981        let mut cursor = Cursor::new(buf);
982        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
983
984        match decoded {
985            ChildMessage::StashDelete {
986                request_id,
987                key,
988                group,
989            } => {
990                assert_eq!(request_id, 22);
991                assert_eq!(key, "delete-me");
992                assert_eq!(group, None);
993            }
994            other => panic!("expected StashDelete, got: {:?}", other),
995        }
996    }
997
998    // --- IPC-07: StashKeys round-trip ---
999    #[tokio::test]
1000    async fn ipc_07_stash_keys_roundtrip() {
1001        let msg = ChildMessage::StashKeys {
1002            request_id: 23,
1003            group: None,
1004        };
1005
1006        let mut buf = Vec::new();
1007        write_message(&mut buf, &msg).await.unwrap();
1008
1009        let mut cursor = Cursor::new(buf);
1010        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1011
1012        match decoded {
1013            ChildMessage::StashKeys { request_id, group } => {
1014                assert_eq!(request_id, 23);
1015                assert_eq!(group, None);
1016            }
1017            other => panic!("expected StashKeys, got: {:?}", other),
1018        }
1019    }
1020
1021    // --- IPC-08: StashResult round-trip ---
1022    #[tokio::test]
1023    async fn ipc_08_stash_result_roundtrip() {
1024        let msg = ParentMessage::StashResult {
1025            request_id: 24,
1026            result: Ok(serde_json::json!({"ok": true})),
1027        };
1028
1029        let mut buf = Vec::new();
1030        write_message(&mut buf, &msg).await.unwrap();
1031
1032        let mut cursor = Cursor::new(buf);
1033        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1034
1035        match decoded {
1036            ParentMessage::StashResult { request_id, result } => {
1037                assert_eq!(request_id, 24);
1038                assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1039            }
1040            other => panic!("expected StashResult, got: {:?}", other),
1041        }
1042    }
1043
1044    // --- IPC-09: Mixed message interleaving (tool + resource + stash in single stream) ---
1045    #[tokio::test]
1046    async fn ipc_09_mixed_message_interleaving() {
1047        let msg1 = ChildMessage::ToolCallRequest {
1048            request_id: 1,
1049            server: "s".into(),
1050            tool: "t".into(),
1051            args: serde_json::json!({}),
1052        };
1053        let msg2 = ChildMessage::ResourceReadRequest {
1054            request_id: 2,
1055            server: "pg".into(),
1056            uri: "file:///data".into(),
1057        };
1058        let msg3 = ChildMessage::StashPut {
1059            request_id: 3,
1060            key: "k".into(),
1061            value: serde_json::json!("v"),
1062            ttl_secs: None,
1063            group: None,
1064        };
1065        let msg4 = ChildMessage::StashGet {
1066            request_id: 4,
1067            key: "k".into(),
1068            group: None,
1069        };
1070        let msg5 = ChildMessage::ExecutionComplete {
1071            result: Ok(serde_json::json!("done")),
1072            error_kind: None,
1073            timeout_ms: None,
1074        };
1075
1076        let mut buf = Vec::new();
1077        write_message(&mut buf, &msg1).await.unwrap();
1078        write_message(&mut buf, &msg2).await.unwrap();
1079        write_message(&mut buf, &msg3).await.unwrap();
1080        write_message(&mut buf, &msg4).await.unwrap();
1081        write_message(&mut buf, &msg5).await.unwrap();
1082
1083        let mut cursor = Cursor::new(buf);
1084        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1085        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1086        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1087        let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1088        let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1089
1090        assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1091        assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1092        assert!(matches!(d3, ChildMessage::StashPut { .. }));
1093        assert!(matches!(d4, ChildMessage::StashGet { .. }));
1094        assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1095
1096        // EOF after all messages
1097        let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1098        assert!(d6.is_none());
1099    }
1100
1101    // --- IPC-P01: Reset round-trip ---
1102    #[tokio::test]
1103    async fn ipc_p01_reset_roundtrip() {
1104        let msg = ParentMessage::Reset {
1105            config: WorkerConfig {
1106                timeout_ms: 3000,
1107                max_heap_size: 32 * 1024 * 1024,
1108                max_tool_calls: 25,
1109                max_tool_call_args_size: 512 * 1024,
1110                max_output_size: 512 * 1024,
1111                max_code_size: 32 * 1024,
1112                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1113                max_resource_size: 32 * 1024 * 1024,
1114                max_parallel: 4,
1115                known_tools: None,
1116                known_servers: None,
1117            },
1118        };
1119
1120        let mut buf = Vec::new();
1121        write_message(&mut buf, &msg).await.unwrap();
1122
1123        let mut cursor = Cursor::new(buf);
1124        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1125
1126        match decoded {
1127            ParentMessage::Reset { config } => {
1128                assert_eq!(config.timeout_ms, 3000);
1129                assert_eq!(config.max_tool_calls, 25);
1130            }
1131            other => panic!("expected Reset, got: {:?}", other),
1132        }
1133    }
1134
1135    // --- IPC-P02: ResetComplete round-trip ---
1136    #[tokio::test]
1137    async fn ipc_p02_reset_complete_roundtrip() {
1138        let msg = ChildMessage::ResetComplete;
1139
1140        let mut buf = Vec::new();
1141        write_message(&mut buf, &msg).await.unwrap();
1142
1143        let mut cursor = Cursor::new(buf);
1144        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1145
1146        assert!(matches!(decoded, ChildMessage::ResetComplete));
1147    }
1148
1149    // --- IPC-P03: Reset + Execute interleaving in single stream ---
1150    #[tokio::test]
1151    async fn ipc_p03_reset_execute_interleaving() {
1152        let reset = ParentMessage::Reset {
1153            config: WorkerConfig {
1154                timeout_ms: 5000,
1155                max_heap_size: 64 * 1024 * 1024,
1156                max_tool_calls: 50,
1157                max_tool_call_args_size: 1024 * 1024,
1158                max_output_size: 1024 * 1024,
1159                max_code_size: 64 * 1024,
1160                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1161                max_resource_size: 64 * 1024 * 1024,
1162                max_parallel: 8,
1163                known_tools: None,
1164                known_servers: None,
1165            },
1166        };
1167        let execute = ParentMessage::Execute {
1168            code: "async () => 42".into(),
1169            manifest: None,
1170            config: WorkerConfig {
1171                timeout_ms: 5000,
1172                max_heap_size: 64 * 1024 * 1024,
1173                max_tool_calls: 50,
1174                max_tool_call_args_size: 1024 * 1024,
1175                max_output_size: 1024 * 1024,
1176                max_code_size: 64 * 1024,
1177                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1178                max_resource_size: 64 * 1024 * 1024,
1179                max_parallel: 8,
1180                known_tools: None,
1181                known_servers: None,
1182            },
1183        };
1184
1185        let mut buf = Vec::new();
1186        write_message(&mut buf, &reset).await.unwrap();
1187        write_message(&mut buf, &execute).await.unwrap();
1188
1189        let mut cursor = Cursor::new(buf);
1190        let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1191        let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1192
1193        assert!(matches!(d1, ParentMessage::Reset { .. }));
1194        assert!(matches!(d2, ParentMessage::Execute { .. }));
1195    }
1196
1197    // --- IPC-10: Oversized stash message rejected by read_message_with_limit ---
1198    #[tokio::test]
1199    async fn ipc_10_oversized_stash_message_rejected() {
1200        let msg = ChildMessage::StashPut {
1201            request_id: 100,
1202            key: "k".into(),
1203            value: serde_json::json!("x".repeat(2048)),
1204            ttl_secs: Some(60),
1205            group: None,
1206        };
1207        let mut buf = Vec::new();
1208        write_message(&mut buf, &msg).await.unwrap();
1209
1210        // Set limit smaller than the message payload
1211        let mut cursor = Cursor::new(buf);
1212        let result: Result<Option<ChildMessage>, _> =
1213            read_message_with_limit(&mut cursor, 64).await;
1214        assert!(result.is_err());
1215        let err_msg = result.unwrap_err().to_string();
1216        assert!(
1217            err_msg.contains("too large"),
1218            "error should mention 'too large': {err_msg}"
1219        );
1220    }
1221
1222    // --- IPC-O01: ErrorKind timeout round-trip ---
1223    #[tokio::test]
1224    async fn ipc_o01_error_kind_timeout_roundtrip() {
1225        let msg = ChildMessage::ExecutionComplete {
1226            result: Err("execution timed out after 500ms".into()),
1227            error_kind: Some(ErrorKind::Timeout),
1228            timeout_ms: Some(500),
1229        };
1230
1231        let mut buf = Vec::new();
1232        write_message(&mut buf, &msg).await.unwrap();
1233
1234        let mut cursor = Cursor::new(buf);
1235        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1236
1237        match decoded {
1238            ChildMessage::ExecutionComplete {
1239                result,
1240                error_kind,
1241                timeout_ms,
1242            } => {
1243                assert!(result.is_err());
1244                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1245                assert_eq!(timeout_ms, Some(500));
1246            }
1247            other => panic!("expected ExecutionComplete, got: {:?}", other),
1248        }
1249    }
1250
1251    // --- IPC-O02: ErrorKind heap_limit round-trip ---
1252    #[tokio::test]
1253    async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1254        let msg = ChildMessage::ExecutionComplete {
1255            result: Err("V8 heap limit exceeded".into()),
1256            error_kind: Some(ErrorKind::HeapLimit),
1257            timeout_ms: None,
1258        };
1259
1260        let mut buf = Vec::new();
1261        write_message(&mut buf, &msg).await.unwrap();
1262
1263        let mut cursor = Cursor::new(buf);
1264        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1265
1266        match decoded {
1267            ChildMessage::ExecutionComplete {
1268                result, error_kind, ..
1269            } => {
1270                assert!(result.is_err());
1271                assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1272            }
1273            other => panic!("expected ExecutionComplete, got: {:?}", other),
1274        }
1275    }
1276
1277    // --- IPC-O03: ErrorKind absent defaults to None (backward compatibility) ---
1278    #[tokio::test]
1279    async fn ipc_o03_error_kind_backward_compat() {
1280        // Simulate a message from an older worker that doesn't include error_kind.
1281        // The JSON doesn't have the error_kind field at all.
1282        let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1283        let mut buf = Vec::new();
1284        let payload = json.as_bytes();
1285        let len = payload.len() as u32;
1286        buf.extend_from_slice(&len.to_be_bytes());
1287        buf.extend_from_slice(payload);
1288
1289        let mut cursor = Cursor::new(buf);
1290        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1291
1292        match decoded {
1293            ChildMessage::ExecutionComplete {
1294                result,
1295                error_kind,
1296                timeout_ms,
1297            } => {
1298                assert!(result.is_err());
1299                assert_eq!(
1300                    error_kind, None,
1301                    "missing error_kind should default to None"
1302                );
1303                assert_eq!(
1304                    timeout_ms, None,
1305                    "missing timeout_ms should default to None"
1306                );
1307            }
1308            other => panic!("expected ExecutionComplete, got: {:?}", other),
1309        }
1310    }
1311
1312    // --- IPC-O04: ErrorKind js_error round-trip ---
1313    #[tokio::test]
1314    async fn ipc_o04_error_kind_js_error_roundtrip() {
1315        let msg = ChildMessage::ExecutionComplete {
1316            result: Err("ReferenceError: x is not defined".into()),
1317            error_kind: Some(ErrorKind::JsError),
1318            timeout_ms: None,
1319        };
1320
1321        let mut buf = Vec::new();
1322        write_message(&mut buf, &msg).await.unwrap();
1323
1324        let mut cursor = Cursor::new(buf);
1325        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1326
1327        match decoded {
1328            ChildMessage::ExecutionComplete {
1329                result, error_kind, ..
1330            } => {
1331                assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1332                assert_eq!(error_kind, Some(ErrorKind::JsError));
1333            }
1334            other => panic!("expected ExecutionComplete, got: {:?}", other),
1335        }
1336    }
1337
1338    // --- IPC-O05: Success result has no error_kind in serialized JSON ---
1339    #[tokio::test]
1340    async fn ipc_o05_success_omits_error_kind() {
1341        let msg = ChildMessage::ExecutionComplete {
1342            result: Ok(serde_json::json!(42)),
1343            error_kind: None,
1344            timeout_ms: None,
1345        };
1346
1347        let json = serde_json::to_string(&msg).unwrap();
1348        // error_kind: None should be skipped thanks to skip_serializing_if
1349        assert!(
1350            !json.contains("error_kind"),
1351            "success messages should not contain error_kind field: {json}"
1352        );
1353        assert!(
1354            !json.contains("timeout_ms"),
1355            "success messages should not contain timeout_ms field: {json}"
1356        );
1357    }
1358
1359    // --- H1: Stash Group Isolation Tests ---
1360
1361    #[tokio::test]
1362    async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1363        let msg = ChildMessage::StashPut {
1364            request_id: 50,
1365            key: "grouped-key".into(),
1366            value: serde_json::json!({"data": "secret"}),
1367            ttl_secs: Some(120),
1368            group: Some("analytics".into()),
1369        };
1370
1371        let mut buf = Vec::new();
1372        write_message(&mut buf, &msg).await.unwrap();
1373
1374        let mut cursor = Cursor::new(buf);
1375        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1376
1377        match decoded {
1378            ChildMessage::StashPut {
1379                request_id,
1380                key,
1381                group,
1382                ..
1383            } => {
1384                assert_eq!(request_id, 50);
1385                assert_eq!(key, "grouped-key");
1386                assert_eq!(group, Some("analytics".into()));
1387            }
1388            other => panic!("expected StashPut, got: {:?}", other),
1389        }
1390    }
1391
1392    #[tokio::test]
1393    async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1394        let msg = ChildMessage::StashGet {
1395            request_id: 51,
1396            key: "grouped-key".into(),
1397            group: Some("analytics".into()),
1398        };
1399
1400        let mut buf = Vec::new();
1401        write_message(&mut buf, &msg).await.unwrap();
1402
1403        let mut cursor = Cursor::new(buf);
1404        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1405
1406        match decoded {
1407            ChildMessage::StashGet {
1408                request_id,
1409                key,
1410                group,
1411            } => {
1412                assert_eq!(request_id, 51);
1413                assert_eq!(key, "grouped-key");
1414                assert_eq!(group, Some("analytics".into()));
1415            }
1416            other => panic!("expected StashGet, got: {:?}", other),
1417        }
1418    }
1419
1420    #[tokio::test]
1421    async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1422        let msg = ChildMessage::StashDelete {
1423            request_id: 52,
1424            key: "grouped-key".into(),
1425            group: Some("analytics".into()),
1426        };
1427
1428        let mut buf = Vec::new();
1429        write_message(&mut buf, &msg).await.unwrap();
1430
1431        let mut cursor = Cursor::new(buf);
1432        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1433
1434        match decoded {
1435            ChildMessage::StashDelete {
1436                request_id,
1437                key,
1438                group,
1439            } => {
1440                assert_eq!(request_id, 52);
1441                assert_eq!(key, "grouped-key");
1442                assert_eq!(group, Some("analytics".into()));
1443            }
1444            other => panic!("expected StashDelete, got: {:?}", other),
1445        }
1446    }
1447
1448    #[tokio::test]
1449    async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1450        let msg = ChildMessage::StashKeys {
1451            request_id: 53,
1452            group: Some("analytics".into()),
1453        };
1454
1455        let mut buf = Vec::new();
1456        write_message(&mut buf, &msg).await.unwrap();
1457
1458        let mut cursor = Cursor::new(buf);
1459        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1460
1461        match decoded {
1462            ChildMessage::StashKeys { request_id, group } => {
1463                assert_eq!(request_id, 53);
1464                assert_eq!(group, Some("analytics".into()));
1465            }
1466            other => panic!("expected StashKeys, got: {:?}", other),
1467        }
1468    }
1469
1470    #[tokio::test]
1471    async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1472        // group: None → field absent in JSON
1473        let msg = ChildMessage::StashPut {
1474            request_id: 54,
1475            key: "no-group-key".into(),
1476            value: serde_json::json!("val"),
1477            ttl_secs: None,
1478            group: None,
1479        };
1480
1481        let json = serde_json::to_string(&msg).unwrap();
1482        assert!(
1483            !json.contains("\"group\""),
1484            "group:None should be absent in serialized JSON: {json}"
1485        );
1486
1487        // Still deserializes correctly
1488        let mut buf = Vec::new();
1489        write_message(&mut buf, &msg).await.unwrap();
1490        let mut cursor = Cursor::new(buf);
1491        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1492        match decoded {
1493            ChildMessage::StashPut { group, .. } => {
1494                assert_eq!(group, None);
1495            }
1496            other => panic!("expected StashPut, got: {:?}", other),
1497        }
1498    }
1499
1500    #[tokio::test]
1501    async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1502        // Simulate a v0.3.0 worker message that lacks the group field entirely
1503        let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1504        let mut buf = Vec::new();
1505        let payload = json.as_bytes();
1506        let len = payload.len() as u32;
1507        buf.extend_from_slice(&len.to_be_bytes());
1508        buf.extend_from_slice(payload);
1509
1510        let mut cursor = Cursor::new(buf);
1511        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1512
1513        match decoded {
1514            ChildMessage::StashPut {
1515                request_id,
1516                key,
1517                group,
1518                ..
1519            } => {
1520                assert_eq!(request_id, 60);
1521                assert_eq!(key, "old-key");
1522                assert_eq!(
1523                    group, None,
1524                    "missing group field from v0.3.0 worker should deserialize as None"
1525                );
1526            }
1527            other => panic!("expected StashPut, got: {:?}", other),
1528        }
1529
1530        // Also test StashGet, StashDelete, StashKeys backward compat
1531        let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1532        let mut buf = Vec::new();
1533        let payload = json_get.as_bytes();
1534        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1535        buf.extend_from_slice(payload);
1536        let mut cursor = Cursor::new(buf);
1537        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1538        match decoded {
1539            ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1540            other => panic!("expected StashGet, got: {:?}", other),
1541        }
1542
1543        let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1544        let mut buf = Vec::new();
1545        let payload = json_del.as_bytes();
1546        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1547        buf.extend_from_slice(payload);
1548        let mut cursor = Cursor::new(buf);
1549        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1550        match decoded {
1551            ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1552            other => panic!("expected StashDelete, got: {:?}", other),
1553        }
1554
1555        let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1556        let mut buf = Vec::new();
1557        let payload = json_keys.as_bytes();
1558        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1559        buf.extend_from_slice(payload);
1560        let mut cursor = Cursor::new(buf);
1561        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1562        match decoded {
1563            ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1564            other => panic!("expected StashKeys, got: {:?}", other),
1565        }
1566    }
1567
1568    // --- Phase 2: Structured Timeout + RawValue Tests ---
1569
1570    #[tokio::test]
1571    async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1572        let msg = ChildMessage::ExecutionComplete {
1573            result: Err("execution timed out after 5000ms".into()),
1574            error_kind: Some(ErrorKind::Timeout),
1575            timeout_ms: Some(5000),
1576        };
1577
1578        let mut buf = Vec::new();
1579        write_message(&mut buf, &msg).await.unwrap();
1580
1581        let mut cursor = Cursor::new(buf);
1582        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1583
1584        match decoded {
1585            ChildMessage::ExecutionComplete {
1586                result,
1587                error_kind,
1588                timeout_ms,
1589            } => {
1590                assert!(result.is_err());
1591                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1592                assert_eq!(timeout_ms, Some(5000));
1593            }
1594            other => panic!("expected ExecutionComplete, got: {:?}", other),
1595        }
1596    }
1597
1598    #[tokio::test]
1599    async fn ipc_t02_timeout_ms_absent_backward_compat() {
1600        // Simulate an old v0.3.0 worker that doesn't include timeout_ms
1601        let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1602        let mut buf = Vec::new();
1603        let payload = json.as_bytes();
1604        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1605        buf.extend_from_slice(payload);
1606
1607        let mut cursor = Cursor::new(buf);
1608        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1609
1610        match decoded {
1611            ChildMessage::ExecutionComplete {
1612                error_kind,
1613                timeout_ms,
1614                ..
1615            } => {
1616                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1617                assert_eq!(
1618                    timeout_ms, None,
1619                    "missing timeout_ms should default to None"
1620                );
1621            }
1622            other => panic!("expected ExecutionComplete, got: {:?}", other),
1623        }
1624    }
1625
1626    #[tokio::test]
1627    async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
1628        let msg = ChildMessage::ExecutionComplete {
1629            result: Err("some error".into()),
1630            error_kind: Some(ErrorKind::JsError),
1631            timeout_ms: None,
1632        };
1633
1634        let json = serde_json::to_string(&msg).unwrap();
1635        assert!(
1636            !json.contains("timeout_ms"),
1637            "timeout_ms:None should be omitted: {json}"
1638        );
1639    }
1640
1641    #[tokio::test]
1642    async fn ipc_t04_timeout_ms_present_when_some() {
1643        let msg = ChildMessage::ExecutionComplete {
1644            result: Err("timed out".into()),
1645            error_kind: Some(ErrorKind::Timeout),
1646            timeout_ms: Some(10000),
1647        };
1648
1649        let json = serde_json::to_string(&msg).unwrap();
1650        assert!(
1651            json.contains("\"timeout_ms\":10000"),
1652            "timeout_ms should be present: {json}"
1653        );
1654    }
1655
1656    // --- RawValue passthrough tests ---
1657
1658    #[tokio::test]
1659    async fn ipc_rv01_write_raw_message_roundtrip() {
1660        let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1661
1662        let mut buf = Vec::new();
1663        write_raw_message(&mut buf, payload).await.unwrap();
1664
1665        let mut cursor = Cursor::new(buf);
1666        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1667            .await
1668            .unwrap()
1669            .unwrap();
1670
1671        assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1672    }
1673
1674    #[tokio::test]
1675    async fn ipc_rv02_read_raw_message_preserves_bytes() {
1676        // Write a regular message, then read it raw
1677        let msg = ChildMessage::Log {
1678            message: "test raw".into(),
1679        };
1680        let mut buf = Vec::new();
1681        write_message(&mut buf, &msg).await.unwrap();
1682
1683        let mut cursor = Cursor::new(buf);
1684        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1685            .await
1686            .unwrap()
1687            .unwrap();
1688
1689        // The raw value should be valid JSON
1690        let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1691        assert!(matches!(parsed, ChildMessage::Log { .. }));
1692    }
1693
1694    #[tokio::test]
1695    async fn ipc_rv03_raw_message_size_limit_enforced() {
1696        let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1697        let mut buf = Vec::new();
1698        write_raw_message(&mut buf, large_payload.as_bytes())
1699            .await
1700            .unwrap();
1701
1702        let mut cursor = Cursor::new(buf);
1703        let result = read_raw_message(&mut cursor, 64).await;
1704        assert!(result.is_err());
1705        let err = result.unwrap_err().to_string();
1706        assert!(err.contains("too large"), "error: {err}");
1707    }
1708
1709    #[tokio::test]
1710    async fn ipc_rv04_large_payload_stays_raw() {
1711        // 1MB payload — read as raw without full Value parse
1712        let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1713        let mut buf = Vec::new();
1714        write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1715
1716        let mut cursor = Cursor::new(buf);
1717        let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1718            .await
1719            .unwrap()
1720            .unwrap();
1721
1722        // Should preserve the raw JSON string without parsing into Value
1723        assert!(raw.get().len() > 1_000_000);
1724        // Can still be parsed if needed
1725        let val: Value = serde_json::from_str(raw.get()).unwrap();
1726        assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1727    }
1728
1729    #[tokio::test]
1730    async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1731        // Write as regular message (Value), read as raw
1732        let msg = ParentMessage::ToolCallResult {
1733            request_id: 99,
1734            result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1735        };
1736        let mut buf = Vec::new();
1737        write_message(&mut buf, &msg).await.unwrap();
1738
1739        let mut cursor = Cursor::new(buf.clone());
1740        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1741            .await
1742            .unwrap()
1743            .unwrap();
1744
1745        // Parse the raw back as ParentMessage
1746        let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1747        match parsed {
1748            ParentMessage::ToolCallResult { request_id, result } => {
1749                assert_eq!(request_id, 99);
1750                assert!(result.is_ok());
1751            }
1752            other => panic!("expected ToolCallResult, got: {:?}", other),
1753        }
1754    }
1755
1756    #[tokio::test]
1757    async fn ipc_rv06_raw_eof_returns_none() {
1758        let mut cursor = Cursor::new(Vec::<u8>::new());
1759        let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1760            .await
1761            .unwrap();
1762        assert!(result.is_none());
1763    }
1764
1765    #[tokio::test]
1766    async fn ipc_rv07_mixed_raw_and_value_messages() {
1767        let mut buf = Vec::new();
1768
1769        // Write a typed message
1770        let msg1 = ChildMessage::Log {
1771            message: "first".into(),
1772        };
1773        write_message(&mut buf, &msg1).await.unwrap();
1774
1775        // Write a raw message
1776        let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1777        write_raw_message(&mut buf, raw_payload).await.unwrap();
1778
1779        // Read both: first typed, second raw
1780        let mut cursor = Cursor::new(buf);
1781        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1782        assert!(matches!(d1, ChildMessage::Log { .. }));
1783
1784        let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1785            .await
1786            .unwrap()
1787            .unwrap();
1788        let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1789        assert!(matches!(parsed, ChildMessage::Log { .. }));
1790    }
1791
1792    // --- ToolError IPC round-trip tests ---
1793
1794    #[tokio::test]
1795    async fn tool_error_round_trips_through_ipc() {
1796        let original = forge_error::DispatchError::ToolError {
1797            server: "arbiter".into(),
1798            tool: "scan_target".into(),
1799            message: "tool returned error: Invalid params: missing field 'base_url'".into(),
1800        };
1801
1802        // Convert to IPC form
1803        let ipc_err = IpcDispatchError::from(&original);
1804        assert_eq!(ipc_err.code, "TOOL_ERROR");
1805        assert_eq!(ipc_err.server, Some("arbiter".into()));
1806        assert_eq!(ipc_err.tool, Some("scan_target".into()));
1807        assert!(ipc_err.message.contains("Invalid params"));
1808
1809        // Round-trip through serialization
1810        let json = serde_json::to_string(&ipc_err).unwrap();
1811        let deserialized: IpcDispatchError = serde_json::from_str(&json).unwrap();
1812
1813        // Reconstruct DispatchError
1814        let reconstructed = deserialized.to_dispatch_error();
1815        assert!(matches!(
1816            reconstructed,
1817            forge_error::DispatchError::ToolError {
1818                ref server,
1819                ref tool,
1820                ..
1821            } if server == "arbiter" && tool == "scan_target"
1822        ));
1823        assert!(!reconstructed.trips_circuit_breaker());
1824        assert_eq!(reconstructed.code(), "TOOL_ERROR");
1825    }
1826
1827    #[tokio::test]
1828    async fn tool_error_ipc_message_roundtrip() {
1829        let msg = ParentMessage::ToolCallResult {
1830            request_id: 42,
1831            result: Err(IpcDispatchError::from(
1832                &forge_error::DispatchError::ToolError {
1833                    server: "arbiter".into(),
1834                    tool: "scan".into(),
1835                    message: "bad params".into(),
1836                },
1837            )),
1838        };
1839
1840        let mut buf = Vec::new();
1841        write_message(&mut buf, &msg).await.unwrap();
1842
1843        let mut cursor = Cursor::new(buf);
1844        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1845
1846        match decoded {
1847            ParentMessage::ToolCallResult { request_id, result } => {
1848                assert_eq!(request_id, 42);
1849                let err = result.unwrap_err();
1850                assert_eq!(err.code, "TOOL_ERROR");
1851                assert_eq!(err.server, Some("arbiter".into()));
1852                assert_eq!(err.tool, Some("scan".into()));
1853
1854                // Verify it reconstructs to the correct variant
1855                let dispatch_err = err.to_dispatch_error();
1856                assert!(!dispatch_err.trips_circuit_breaker());
1857            }
1858            other => panic!("expected ToolCallResult, got: {:?}", other),
1859        }
1860    }
1861}