Skip to main content

forge_sandbox/
ipc.rs

1//! IPC protocol for parent ↔ worker communication.
2//!
3//! Uses length-delimited JSON messages: 4-byte big-endian length prefix + JSON payload.
4//! All messages are typed via [`ParentMessage`] and [`ChildMessage`] enums.
5
6use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Error classification for structured error preservation across IPC.
///
/// When errors cross the process boundary via `ExecutionComplete`, the typed
/// `SandboxError` variants are converted to strings. This enum preserves the
/// error kind so the parent can reconstruct the correct `SandboxError` variant.
///
/// Because of `#[serde(rename_all = "snake_case")]`, each variant is encoded
/// as a snake_case string (`"timeout"`, `"heap_limit"`, `"js_error"`,
/// `"execution"`). The enum is `#[non_exhaustive]`, so `match` expressions
/// outside this crate need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ErrorKind {
    /// V8 execution timed out (CPU watchdog or async event loop).
    Timeout,
    /// V8 heap memory limit was exceeded.
    HeapLimit,
    /// A JavaScript error was thrown.
    JsError,
    /// Generic execution failure.
    Execution,
}
30
/// Structured dispatch error for IPC transport.
///
/// Preserves [`forge_error::DispatchError`] variant information across the
/// parent ↔ worker process boundary. Without this, all errors would be
/// flattened to `DispatchError::Internal` in ChildProcess mode, losing
/// error codes like `SERVER_NOT_FOUND` and fuzzy match suggestions.
///
/// The three optional fields are omitted from the JSON when `None`
/// (`skip_serializing_if`) and read back as `None` when absent (`default`),
/// so only the fields relevant to a given error code appear on the wire.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IpcDispatchError {
    /// Error code matching `DispatchError::code()` output.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Server name (for ServerNotFound, ToolNotFound, ToolError, Timeout, CircuitOpen, Upstream).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub server: Option<String>,
    /// Tool name (for ToolNotFound, ToolError).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Timeout in milliseconds (for Timeout).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
53
54impl IpcDispatchError {
55    /// Create from a plain string error message (used for non-dispatch errors
56    /// like "stash dispatcher not available" or URI validation failures).
57    pub fn from_string(msg: String) -> Self {
58        Self {
59            code: "INTERNAL".to_string(),
60            message: msg,
61            server: None,
62            tool: None,
63            timeout_ms: None,
64        }
65    }
66
67    /// Reconstruct the appropriate [`forge_error::DispatchError`] variant.
68    pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69        match self.code.as_str() {
70            "SERVER_NOT_FOUND" => {
71                forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72            }
73            "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74                server: self.server.unwrap_or_default(),
75                tool: self.tool.unwrap_or_default(),
76            },
77            "TIMEOUT" => forge_error::DispatchError::Timeout {
78                server: self.server.unwrap_or_default(),
79                timeout_ms: self.timeout_ms.unwrap_or(0),
80            },
81            "CIRCUIT_OPEN" => {
82                forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83            }
84            "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85                reason: self.message,
86            },
87            "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88                server: self.server.unwrap_or_default(),
89                message: self.message,
90            },
91            "TRANSPORT_DEAD" => forge_error::DispatchError::TransportDead {
92                server: self.server.unwrap_or_default(),
93                reason: self.message,
94            },
95            "TOOL_ERROR" => forge_error::DispatchError::ToolError {
96                server: self.server.unwrap_or_default(),
97                tool: self.tool.unwrap_or_default(),
98                message: self.message,
99            },
100            "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
101            _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
102        }
103    }
104}
105
106impl From<&forge_error::DispatchError> for IpcDispatchError {
107    fn from(e: &forge_error::DispatchError) -> Self {
108        let (server, tool, timeout_ms) = match e {
109            forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
110            forge_error::DispatchError::ToolNotFound { server, tool } => {
111                (Some(server.clone()), Some(tool.clone()), None)
112            }
113            forge_error::DispatchError::Timeout {
114                server, timeout_ms, ..
115            } => (Some(server.clone()), None, Some(*timeout_ms)),
116            forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
117            forge_error::DispatchError::Upstream { server, .. } => {
118                (Some(server.clone()), None, None)
119            }
120            forge_error::DispatchError::TransportDead { server, .. } => {
121                (Some(server.clone()), None, None)
122            }
123            forge_error::DispatchError::ToolError { server, tool, .. } => {
124                (Some(server.clone()), Some(tool.clone()), None)
125            }
126            _ => (None, None, None),
127        };
128
129        Self {
130            code: e.code().to_string(),
131            message: e.to_string(),
132            server,
133            tool,
134            timeout_ms,
135        }
136    }
137}
138
/// Messages sent from the parent process to the worker child.
///
/// Serialized as an internally tagged object: `#[serde(tag = "type")]` puts
/// the variant name in a `"type"` field next to the variant's own fields.
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ParentMessage {
    /// Initial message: execute this code in the sandbox.
    Execute {
        /// The JavaScript async arrow function to execute.
        code: String,
        /// Optional capability manifest (for search mode — not used in child process execute).
        manifest: Option<Value>,
        /// Worker configuration.
        config: WorkerConfig,
    },
    /// Response to a tool call request from the child.
    ToolCallResult {
        /// Matches the request_id from ChildMessage::ToolCallRequest.
        request_id: u64,
        /// The tool call result, or a structured dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
    /// Response to a resource read request from the child.
    ResourceReadResult {
        /// Matches the request_id from ChildMessage::ResourceReadRequest.
        request_id: u64,
        /// The resource content, or a structured dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
    /// Reset the worker for a new execution (pool mode).
    ///
    /// The worker drops its current JsRuntime and creates a fresh one.
    /// It responds with [`ChildMessage::ResetComplete`].
    Reset {
        /// New worker configuration for the next execution.
        config: WorkerConfig,
    },
    /// Response to a stash operation from the child.
    StashResult {
        /// Matches the request_id from the stash request.
        request_id: u64,
        /// The stash operation result, or a structured dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
}
183
/// Messages sent from the worker child to the parent process.
///
/// Serialized as an internally tagged object: `#[serde(tag = "type")]` puts
/// the variant name in a `"type"` field next to the variant's own fields.
/// Request variants carry a `request_id` that the parent echoes back in the
/// matching [`ParentMessage`] result. The enum is `#[non_exhaustive]`, so
/// matches outside this crate need a wildcard arm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ChildMessage {
    /// Request the parent to dispatch a tool call.
    ToolCallRequest {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Tool identifier.
        tool: String,
        /// Tool arguments.
        args: Value,
    },
    /// Request the parent to read a resource.
    ResourceReadRequest {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Resource URI.
        uri: String,
    },
    /// Request the parent to put a value in the stash.
    StashPut {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Value to store.
        value: Value,
        /// TTL in seconds (None = use default).
        ttl_secs: Option<u32>,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request the parent to get a value from the stash.
    StashGet {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request the parent to delete a value from the stash.
    StashDelete {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request the parent to list stash keys.
    StashKeys {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Worker has been reset and is ready for a new execution.
    ResetComplete,
    /// The execution has finished.
    ExecutionComplete {
        /// The result value, or an error message.
        result: Result<Value, String>,
        /// Classification of the error for typed reconstruction on the parent side.
        /// Present only when `result` is `Err`. Defaults to `JsError` if absent
        /// (backward compatibility with workers that don't send this field).
        /// At the serde level an absent field deserializes to `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_kind: Option<ErrorKind>,
        /// Structured timeout value in milliseconds (v0.3.1+).
        /// Present only when `error_kind` is `Timeout`. Replaces fragile string parsing.
        /// Absent in v0.3.0 workers → None.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_ms: Option<u64>,
    },
    /// A log message from the worker.
    Log {
        /// The log message text.
        message: String,
    },
}
274
/// Configuration passed to the worker process.
///
/// Travels inside [`ParentMessage::Execute`] and [`ParentMessage::Reset`].
/// Fields marked `#[serde(default…)]` may be missing from the incoming JSON;
/// they then take the documented default instead of failing deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Maximum execution time, in milliseconds.
    pub timeout_ms: u64,
    /// V8 heap limit in bytes.
    pub max_heap_size: usize,
    /// Maximum tool calls per execution.
    pub max_tool_calls: usize,
    /// Maximum stash operations per execution.
    /// `None` when absent; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_stash_calls: Option<usize>,
    /// Maximum size of tool call arguments in bytes.
    pub max_tool_call_args_size: usize,
    /// Maximum size of the JSON result in bytes.
    pub max_output_size: usize,
    /// Maximum size of LLM-generated code in bytes.
    pub max_code_size: usize,
    /// Maximum IPC message size in bytes. Defaults to [`DEFAULT_MAX_IPC_MESSAGE_SIZE`].
    #[serde(default = "default_max_ipc_message_size")]
    pub max_ipc_message_size: usize,
    /// Maximum resource content size in bytes. Defaults to [`DEFAULT_MAX_RESOURCE_SIZE`].
    #[serde(default = "default_max_resource_size")]
    pub max_resource_size: usize,
    /// Maximum concurrent calls in forge.parallel(). Defaults to 8.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: usize,
    /// Known tools for structured error fuzzy matching (v0.3.1+).
    /// Each entry is `(server_name, tool_name)`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_tools: Option<Vec<(String, String)>>,
    /// Known server names for structured error detection (v0.3.1+).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_servers: Option<std::collections::HashSet<String>>,
}
310
/// Serde default for `WorkerConfig::max_ipc_message_size`, used when the
/// field is missing from the incoming JSON.
fn default_max_ipc_message_size() -> usize {
    DEFAULT_MAX_IPC_MESSAGE_SIZE
}
314
/// Serde default for `WorkerConfig::max_resource_size`, used when the field
/// is missing from the incoming JSON.
fn default_max_resource_size() -> usize {
    DEFAULT_MAX_RESOURCE_SIZE
}
318
/// Serde default for `WorkerConfig::max_parallel`, used when the field is
/// missing from the incoming JSON.
fn default_max_parallel() -> usize {
    // Named so the fallback concurrency is not a bare magic number.
    const FALLBACK_MAX_PARALLEL: usize = 8;
    FALLBACK_MAX_PARALLEL
}
322
323impl From<&crate::SandboxConfig> for WorkerConfig {
324    fn from(config: &crate::SandboxConfig) -> Self {
325        Self {
326            timeout_ms: config.timeout.as_millis() as u64,
327            max_heap_size: config.max_heap_size,
328            max_tool_calls: config.max_tool_calls,
329            max_stash_calls: config.max_stash_calls,
330            max_tool_call_args_size: config.max_tool_call_args_size,
331            max_output_size: config.max_output_size,
332            max_code_size: config.max_code_size,
333            max_ipc_message_size: config.max_ipc_message_size,
334            max_resource_size: config.max_resource_size,
335            max_parallel: config.max_parallel,
336            known_tools: None,
337            known_servers: None,
338        }
339    }
340}
341
342impl WorkerConfig {
343    /// Convert back to a SandboxConfig for use in the worker.
344    pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
345        crate::SandboxConfig {
346            timeout: Duration::from_millis(self.timeout_ms),
347            max_code_size: self.max_code_size,
348            max_output_size: self.max_output_size,
349            max_heap_size: self.max_heap_size,
350            max_concurrent: 1, // worker handles one execution
351            max_tool_calls: self.max_tool_calls,
352            max_stash_calls: self.max_stash_calls,
353            max_tool_call_args_size: self.max_tool_call_args_size,
354            execution_mode: crate::executor::ExecutionMode::InProcess, // worker always runs in-process
355            max_resource_size: self.max_resource_size,
356            max_parallel: self.max_parallel,
357            max_ipc_message_size: self.max_ipc_message_size,
358        }
359    }
360}
361
362/// Write a length-delimited JSON message to an async writer.
363///
364/// Format: 4-byte big-endian length prefix followed by the JSON payload bytes.
365pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
366    writer: &mut W,
367    msg: &T,
368) -> Result<(), std::io::Error> {
369    let payload = serde_json::to_vec(msg)
370        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
371    let len = u32::try_from(payload.len()).map_err(|_| {
372        std::io::Error::new(
373            std::io::ErrorKind::InvalidData,
374            format!(
375                "IPC payload too large: {} bytes (max {} bytes)",
376                payload.len(),
377                u32::MAX
378            ),
379        )
380    })?;
381    writer.write_all(&len.to_be_bytes()).await?;
382    writer.write_all(&payload).await?;
383    writer.flush().await?;
384    Ok(())
385}
386
387/// Write a raw JSON byte payload as a length-delimited IPC message.
388///
389/// This bypasses serialization entirely — useful for forwarding large
390/// tool/resource results without deserializing and re-serializing.
391pub async fn write_raw_message<W: AsyncWrite + Unpin>(
392    writer: &mut W,
393    payload: &[u8],
394) -> Result<(), std::io::Error> {
395    let len = u32::try_from(payload.len()).map_err(|_| {
396        std::io::Error::new(
397            std::io::ErrorKind::InvalidData,
398            format!(
399                "raw IPC payload too large: {} bytes (max {} bytes)",
400                payload.len(),
401                u32::MAX
402            ),
403        )
404    })?;
405    writer.write_all(&len.to_be_bytes()).await?;
406    writer.write_all(payload).await?;
407    writer.flush().await?;
408    Ok(())
409}
410
411/// Read a raw JSON byte payload from an IPC message without deserializing.
412///
413/// Returns the raw bytes as an owned `Box<RawValue>` which can be forwarded
414/// without parsing. Returns `None` on EOF.
415pub async fn read_raw_message<R: AsyncRead + Unpin>(
416    reader: &mut R,
417    max_size: usize,
418) -> Result<Option<Box<RawValue>>, std::io::Error> {
419    let mut len_buf = [0u8; 4];
420    match reader.read_exact(&mut len_buf).await {
421        Ok(_) => {}
422        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
423        Err(e) => return Err(e),
424    }
425
426    let len = u32::from_be_bytes(len_buf) as usize;
427
428    if len > max_size {
429        return Err(std::io::Error::new(
430            std::io::ErrorKind::InvalidData,
431            format!(
432                "raw IPC message too large: {} bytes (limit: {} bytes)",
433                len, max_size
434            ),
435        ));
436    }
437
438    let mut payload = vec![0u8; len];
439    reader.read_exact(&mut payload).await?;
440
441    let raw: Box<RawValue> = serde_json::from_slice(&payload)
442        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
443    Ok(Some(raw))
444}
445
/// Default maximum IPC message size: 65 MB.
///
/// Defined as [`DEFAULT_MAX_RESOURCE_SIZE`] plus 1 MB of IPC envelope
/// overhead, so a maximum-size resource still fits in one frame.
/// Configurable via `sandbox.max_ipc_message_size_mb` in config — tighten
/// for memory-constrained deployments, raise for large-payload workflows.
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = DEFAULT_MAX_RESOURCE_SIZE + 1024 * 1024;

/// Default maximum serialized resource size: 64 MB.
///
/// Permissive by default so large-payload use cases work out of the box;
/// memory-constrained deployments should tighten it via
/// `sandbox.max_resource_size_mb`. Kept below
/// [`DEFAULT_MAX_IPC_MESSAGE_SIZE`] to leave room for IPC envelope overhead
/// in child-process mode.
pub const DEFAULT_MAX_RESOURCE_SIZE: usize = 64 * 1024 * 1024;
461
462/// Read a length-delimited JSON message from an async reader.
463///
464/// Returns `None` if the reader has reached EOF (clean shutdown).
465/// Uses [`DEFAULT_MAX_IPC_MESSAGE_SIZE`] as the size limit.
466pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
467    reader: &mut R,
468) -> Result<Option<T>, std::io::Error> {
469    read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
470}
471
472/// Read a length-delimited JSON message with a configurable size limit.
473///
474/// Returns `None` if the reader has reached EOF (clean shutdown).
475/// The `max_size` parameter controls the maximum allowed message size in bytes.
476pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
477    reader: &mut R,
478    max_size: usize,
479) -> Result<Option<T>, std::io::Error> {
480    let mut len_buf = [0u8; 4];
481    match reader.read_exact(&mut len_buf).await {
482        Ok(_) => {}
483        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
484        Err(e) => return Err(e),
485    }
486
487    let len = u32::from_be_bytes(len_buf) as usize;
488
489    // Reject messages exceeding the configured limit
490    if len > max_size {
491        return Err(std::io::Error::new(
492            std::io::ErrorKind::InvalidData,
493            format!(
494                "IPC message too large: {} bytes (limit: {} bytes)",
495                len, max_size
496            ),
497        ));
498    }
499
500    let mut payload = vec![0u8; len];
501    reader.read_exact(&mut payload).await?;
502
503    let msg: T = serde_json::from_slice(&payload)
504        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
505    Ok(Some(msg))
506}
507
508#[cfg(test)]
509mod tests {
510    use super::*;
511    use std::io::Cursor;
512
513    #[tokio::test]
514    async fn roundtrip_parent_execute_message() {
515        let msg = ParentMessage::Execute {
516            code: "async () => { return 42; }".into(),
517            manifest: Some(serde_json::json!({"servers": []})),
518            config: WorkerConfig {
519                timeout_ms: 5000,
520                max_heap_size: 64 * 1024 * 1024,
521                max_tool_calls: 50,
522                max_stash_calls: None,
523                max_tool_call_args_size: 1024 * 1024,
524                max_output_size: 1024 * 1024,
525                max_code_size: 64 * 1024,
526                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
527                max_resource_size: 64 * 1024 * 1024,
528                max_parallel: 8,
529                known_tools: None,
530                known_servers: None,
531            },
532        };
533
534        let mut buf = Vec::new();
535        write_message(&mut buf, &msg).await.unwrap();
536
537        let mut cursor = Cursor::new(buf);
538        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
539
540        match decoded {
541            ParentMessage::Execute {
542                code,
543                manifest,
544                config,
545            } => {
546                assert_eq!(code, "async () => { return 42; }");
547                assert!(manifest.is_some());
548                assert_eq!(config.timeout_ms, 5000);
549            }
550            other => panic!("expected Execute, got: {:?}", other),
551        }
552    }
553
554    #[tokio::test]
555    async fn roundtrip_parent_tool_result() {
556        let msg = ParentMessage::ToolCallResult {
557            request_id: 42,
558            result: Ok(serde_json::json!({"status": "ok"})),
559        };
560
561        let mut buf = Vec::new();
562        write_message(&mut buf, &msg).await.unwrap();
563
564        let mut cursor = Cursor::new(buf);
565        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
566
567        match decoded {
568            ParentMessage::ToolCallResult { request_id, result } => {
569                assert_eq!(request_id, 42);
570                assert!(result.is_ok());
571            }
572            other => panic!("expected ToolCallResult, got: {:?}", other),
573        }
574    }
575
576    #[tokio::test]
577    async fn roundtrip_parent_tool_result_error() {
578        let msg = ParentMessage::ToolCallResult {
579            request_id: 7,
580            result: Err(IpcDispatchError::from_string("connection refused".into())),
581        };
582
583        let mut buf = Vec::new();
584        write_message(&mut buf, &msg).await.unwrap();
585
586        let mut cursor = Cursor::new(buf);
587        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
588
589        match decoded {
590            ParentMessage::ToolCallResult { request_id, result } => {
591                assert_eq!(request_id, 7);
592                let err = result.unwrap_err();
593                assert_eq!(err.message, "connection refused");
594                assert_eq!(err.code, "INTERNAL");
595            }
596            other => panic!("expected ToolCallResult, got: {:?}", other),
597        }
598    }
599
600    #[tokio::test]
601    async fn roundtrip_child_tool_request() {
602        let msg = ChildMessage::ToolCallRequest {
603            request_id: 1,
604            server: "narsil".into(),
605            tool: "ast.parse".into(),
606            args: serde_json::json!({"file": "test.rs"}),
607        };
608
609        let mut buf = Vec::new();
610        write_message(&mut buf, &msg).await.unwrap();
611
612        let mut cursor = Cursor::new(buf);
613        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
614
615        match decoded {
616            ChildMessage::ToolCallRequest {
617                request_id,
618                server,
619                tool,
620                args,
621            } => {
622                assert_eq!(request_id, 1);
623                assert_eq!(server, "narsil");
624                assert_eq!(tool, "ast.parse");
625                assert_eq!(args["file"], "test.rs");
626            }
627            other => panic!("expected ToolCallRequest, got: {:?}", other),
628        }
629    }
630
631    #[tokio::test]
632    async fn roundtrip_child_execution_complete() {
633        let msg = ChildMessage::ExecutionComplete {
634            result: Ok(serde_json::json!([1, 2, 3])),
635            error_kind: None,
636            timeout_ms: None,
637        };
638
639        let mut buf = Vec::new();
640        write_message(&mut buf, &msg).await.unwrap();
641
642        let mut cursor = Cursor::new(buf);
643        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
644
645        match decoded {
646            ChildMessage::ExecutionComplete {
647                result, error_kind, ..
648            } => {
649                assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
650                assert_eq!(error_kind, None);
651            }
652            other => panic!("expected ExecutionComplete, got: {:?}", other),
653        }
654    }
655
656    #[tokio::test]
657    async fn roundtrip_child_log() {
658        let msg = ChildMessage::Log {
659            message: "processing step 3".into(),
660        };
661
662        let mut buf = Vec::new();
663        write_message(&mut buf, &msg).await.unwrap();
664
665        let mut cursor = Cursor::new(buf);
666        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
667
668        match decoded {
669            ChildMessage::Log { message } => {
670                assert_eq!(message, "processing step 3");
671            }
672            other => panic!("expected Log, got: {:?}", other),
673        }
674    }
675
676    #[tokio::test]
677    async fn multiple_messages_in_stream() {
678        let msg1 = ChildMessage::Log {
679            message: "first".into(),
680        };
681        let msg2 = ChildMessage::ToolCallRequest {
682            request_id: 1,
683            server: "s".into(),
684            tool: "t".into(),
685            args: serde_json::json!({}),
686        };
687        let msg3 = ChildMessage::ExecutionComplete {
688            result: Ok(serde_json::json!("done")),
689            error_kind: None,
690            timeout_ms: None,
691        };
692
693        let mut buf = Vec::new();
694        write_message(&mut buf, &msg1).await.unwrap();
695        write_message(&mut buf, &msg2).await.unwrap();
696        write_message(&mut buf, &msg3).await.unwrap();
697
698        let mut cursor = Cursor::new(buf);
699        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
700        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
701        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
702
703        assert!(matches!(d1, ChildMessage::Log { .. }));
704        assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
705        assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
706
707        // EOF after all messages
708        let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
709        assert!(d4.is_none());
710    }
711
712    #[tokio::test]
713    async fn execution_complete_error_roundtrip() {
714        let msg = ChildMessage::ExecutionComplete {
715            result: Err("failed to create tokio runtime: resource unavailable".into()),
716            error_kind: Some(ErrorKind::Execution),
717            timeout_ms: None,
718        };
719
720        let mut buf = Vec::new();
721        write_message(&mut buf, &msg).await.unwrap();
722
723        let mut cursor = Cursor::new(buf);
724        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
725
726        match decoded {
727            ChildMessage::ExecutionComplete {
728                result, error_kind, ..
729            } => {
730                let err = result.unwrap_err();
731                assert!(
732                    err.contains("tokio runtime"),
733                    "expected runtime error: {err}"
734                );
735                assert_eq!(error_kind, Some(ErrorKind::Execution));
736            }
737            other => panic!("expected ExecutionComplete, got: {:?}", other),
738        }
739    }
740
741    #[tokio::test]
742    async fn eof_returns_none() {
743        let mut cursor = Cursor::new(Vec::<u8>::new());
744        let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
745        assert!(result.is_none());
746    }
747
748    #[test]
749    fn u32_try_from_overflow() {
750        // Validates that the conversion logic correctly rejects sizes > u32::MAX
751        let overflow_size = u32::MAX as usize + 1;
752        assert!(u32::try_from(overflow_size).is_err());
753    }
754
755    #[tokio::test]
756    async fn write_message_normal_size_succeeds() {
757        // Regression guard: normal-sized messages still work after the try_from change
758        let msg = ChildMessage::Log {
759            message: "a".repeat(1024),
760        };
761        let mut buf = Vec::new();
762        write_message(&mut buf, &msg).await.unwrap();
763        assert!(buf.len() > 1024);
764    }
765
766    #[tokio::test]
767    async fn large_message_roundtrip() {
768        // A large payload (~1MB of data)
769        let large_data = "x".repeat(1_000_000);
770        let msg = ChildMessage::ExecutionComplete {
771            result: Ok(serde_json::json!(large_data)),
772            error_kind: None,
773            timeout_ms: None,
774        };
775
776        let mut buf = Vec::new();
777        write_message(&mut buf, &msg).await.unwrap();
778
779        let mut cursor = Cursor::new(buf);
780        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
781
782        match decoded {
783            ChildMessage::ExecutionComplete { result, .. } => {
784                assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
785            }
786            other => panic!("expected ExecutionComplete, got: {:?}", other),
787        }
788    }
789
790    #[tokio::test]
791    async fn worker_config_roundtrip_from_sandbox_config() {
792        let sandbox = crate::SandboxConfig::default();
793        let worker = WorkerConfig::from(&sandbox);
794        let back = worker.to_sandbox_config();
795
796        assert_eq!(sandbox.timeout, back.timeout);
797        assert_eq!(sandbox.max_heap_size, back.max_heap_size);
798        assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
799        assert_eq!(sandbox.max_output_size, back.max_output_size);
800        assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
801        assert_eq!(worker.max_ipc_message_size, 65 * 1024 * 1024); // 65 MB default
802    }
803
804    #[tokio::test]
805    async fn read_message_with_limit_rejects_oversized() {
806        let msg = ChildMessage::Log {
807            message: "x".repeat(1024),
808        };
809        let mut buf = Vec::new();
810        write_message(&mut buf, &msg).await.unwrap();
811
812        // Set limit smaller than the message payload
813        let mut cursor = Cursor::new(buf);
814        let result: Result<Option<ChildMessage>, _> =
815            read_message_with_limit(&mut cursor, 64).await;
816        assert!(result.is_err());
817        let err_msg = result.unwrap_err().to_string();
818        assert!(err_msg.contains("too large"), "error: {err_msg}");
819    }
820
821    #[tokio::test]
822    async fn read_message_with_limit_accepts_within_limit() {
823        let msg = ChildMessage::Log {
824            message: "hello".into(),
825        };
826        let mut buf = Vec::new();
827        write_message(&mut buf, &msg).await.unwrap();
828
829        let mut cursor = Cursor::new(buf);
830        let result: Option<ChildMessage> =
831            read_message_with_limit(&mut cursor, 1024).await.unwrap();
832        assert!(result.is_some());
833    }
834
835    #[tokio::test]
836    async fn worker_config_ipc_limit_serde_default() {
837        // Deserializing JSON without max_ipc_message_size should use the default
838        let json = r#"{
839            "timeout_ms": 5000,
840            "max_heap_size": 67108864,
841            "max_tool_calls": 50,
842            "max_tool_call_args_size": 1048576,
843            "max_output_size": 1048576,
844            "max_code_size": 65536
845        }"#;
846        let config: WorkerConfig = serde_json::from_str(json).unwrap();
847        assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
848    }
849
850    // --- IPC-01: ResourceReadRequest round-trip ---
851    #[tokio::test]
852    async fn ipc_01_resource_read_request_roundtrip() {
853        let msg = ChildMessage::ResourceReadRequest {
854            request_id: 10,
855            server: "postgres".into(),
856            uri: "file:///logs/app.log".into(),
857        };
858
859        let mut buf = Vec::new();
860        write_message(&mut buf, &msg).await.unwrap();
861
862        let mut cursor = Cursor::new(buf);
863        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
864
865        match decoded {
866            ChildMessage::ResourceReadRequest {
867                request_id,
868                server,
869                uri,
870            } => {
871                assert_eq!(request_id, 10);
872                assert_eq!(server, "postgres");
873                assert_eq!(uri, "file:///logs/app.log");
874            }
875            other => panic!("expected ResourceReadRequest, got: {:?}", other),
876        }
877    }
878
879    // --- IPC-02: ResourceReadResult (success) round-trip ---
880    #[tokio::test]
881    async fn ipc_02_resource_read_result_success_roundtrip() {
882        let msg = ParentMessage::ResourceReadResult {
883            request_id: 11,
884            result: Ok(serde_json::json!({"content": "log data here"})),
885        };
886
887        let mut buf = Vec::new();
888        write_message(&mut buf, &msg).await.unwrap();
889
890        let mut cursor = Cursor::new(buf);
891        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
892
893        match decoded {
894            ParentMessage::ResourceReadResult { request_id, result } => {
895                assert_eq!(request_id, 11);
896                let val = result.unwrap();
897                assert_eq!(val["content"], "log data here");
898            }
899            other => panic!("expected ResourceReadResult, got: {:?}", other),
900        }
901    }
902
903    // --- IPC-03: ResourceReadResult (error) round-trip ---
904    #[tokio::test]
905    async fn ipc_03_resource_read_result_error_roundtrip() {
906        let msg = ParentMessage::ResourceReadResult {
907            request_id: 12,
908            result: Err(IpcDispatchError::from_string("resource not found".into())),
909        };
910
911        let mut buf = Vec::new();
912        write_message(&mut buf, &msg).await.unwrap();
913
914        let mut cursor = Cursor::new(buf);
915        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
916
917        match decoded {
918            ParentMessage::ResourceReadResult { request_id, result } => {
919                assert_eq!(request_id, 12);
920                let err = result.unwrap_err();
921                assert_eq!(err.message, "resource not found");
922                assert_eq!(err.code, "INTERNAL");
923            }
924            other => panic!("expected ResourceReadResult, got: {:?}", other),
925        }
926    }
927
928    // --- IPC-04: StashPut round-trip ---
929    #[tokio::test]
930    async fn ipc_04_stash_put_roundtrip() {
931        let msg = ChildMessage::StashPut {
932            request_id: 20,
933            key: "my-key".into(),
934            value: serde_json::json!({"data": 42}),
935            ttl_secs: Some(60),
936            group: None,
937        };
938
939        let mut buf = Vec::new();
940        write_message(&mut buf, &msg).await.unwrap();
941
942        let mut cursor = Cursor::new(buf);
943        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
944
945        match decoded {
946            ChildMessage::StashPut {
947                request_id,
948                key,
949                value,
950                ttl_secs,
951                group,
952            } => {
953                assert_eq!(request_id, 20);
954                assert_eq!(key, "my-key");
955                assert_eq!(value["data"], 42);
956                assert_eq!(ttl_secs, Some(60));
957                assert_eq!(group, None);
958            }
959            other => panic!("expected StashPut, got: {:?}", other),
960        }
961    }
962
963    // --- IPC-05: StashGet round-trip ---
964    #[tokio::test]
965    async fn ipc_05_stash_get_roundtrip() {
966        let msg = ChildMessage::StashGet {
967            request_id: 21,
968            key: "lookup-key".into(),
969            group: None,
970        };
971
972        let mut buf = Vec::new();
973        write_message(&mut buf, &msg).await.unwrap();
974
975        let mut cursor = Cursor::new(buf);
976        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
977
978        match decoded {
979            ChildMessage::StashGet {
980                request_id,
981                key,
982                group,
983            } => {
984                assert_eq!(request_id, 21);
985                assert_eq!(key, "lookup-key");
986                assert_eq!(group, None);
987            }
988            other => panic!("expected StashGet, got: {:?}", other),
989        }
990    }
991
992    // --- IPC-06: StashDelete round-trip ---
993    #[tokio::test]
994    async fn ipc_06_stash_delete_roundtrip() {
995        let msg = ChildMessage::StashDelete {
996            request_id: 22,
997            key: "delete-me".into(),
998            group: None,
999        };
1000
1001        let mut buf = Vec::new();
1002        write_message(&mut buf, &msg).await.unwrap();
1003
1004        let mut cursor = Cursor::new(buf);
1005        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1006
1007        match decoded {
1008            ChildMessage::StashDelete {
1009                request_id,
1010                key,
1011                group,
1012            } => {
1013                assert_eq!(request_id, 22);
1014                assert_eq!(key, "delete-me");
1015                assert_eq!(group, None);
1016            }
1017            other => panic!("expected StashDelete, got: {:?}", other),
1018        }
1019    }
1020
1021    // --- IPC-07: StashKeys round-trip ---
1022    #[tokio::test]
1023    async fn ipc_07_stash_keys_roundtrip() {
1024        let msg = ChildMessage::StashKeys {
1025            request_id: 23,
1026            group: None,
1027        };
1028
1029        let mut buf = Vec::new();
1030        write_message(&mut buf, &msg).await.unwrap();
1031
1032        let mut cursor = Cursor::new(buf);
1033        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1034
1035        match decoded {
1036            ChildMessage::StashKeys { request_id, group } => {
1037                assert_eq!(request_id, 23);
1038                assert_eq!(group, None);
1039            }
1040            other => panic!("expected StashKeys, got: {:?}", other),
1041        }
1042    }
1043
1044    // --- IPC-08: StashResult round-trip ---
1045    #[tokio::test]
1046    async fn ipc_08_stash_result_roundtrip() {
1047        let msg = ParentMessage::StashResult {
1048            request_id: 24,
1049            result: Ok(serde_json::json!({"ok": true})),
1050        };
1051
1052        let mut buf = Vec::new();
1053        write_message(&mut buf, &msg).await.unwrap();
1054
1055        let mut cursor = Cursor::new(buf);
1056        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1057
1058        match decoded {
1059            ParentMessage::StashResult { request_id, result } => {
1060                assert_eq!(request_id, 24);
1061                assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1062            }
1063            other => panic!("expected StashResult, got: {:?}", other),
1064        }
1065    }
1066
1067    // --- IPC-09: Mixed message interleaving (tool + resource + stash in single stream) ---
1068    #[tokio::test]
1069    async fn ipc_09_mixed_message_interleaving() {
1070        let msg1 = ChildMessage::ToolCallRequest {
1071            request_id: 1,
1072            server: "s".into(),
1073            tool: "t".into(),
1074            args: serde_json::json!({}),
1075        };
1076        let msg2 = ChildMessage::ResourceReadRequest {
1077            request_id: 2,
1078            server: "pg".into(),
1079            uri: "file:///data".into(),
1080        };
1081        let msg3 = ChildMessage::StashPut {
1082            request_id: 3,
1083            key: "k".into(),
1084            value: serde_json::json!("v"),
1085            ttl_secs: None,
1086            group: None,
1087        };
1088        let msg4 = ChildMessage::StashGet {
1089            request_id: 4,
1090            key: "k".into(),
1091            group: None,
1092        };
1093        let msg5 = ChildMessage::ExecutionComplete {
1094            result: Ok(serde_json::json!("done")),
1095            error_kind: None,
1096            timeout_ms: None,
1097        };
1098
1099        let mut buf = Vec::new();
1100        write_message(&mut buf, &msg1).await.unwrap();
1101        write_message(&mut buf, &msg2).await.unwrap();
1102        write_message(&mut buf, &msg3).await.unwrap();
1103        write_message(&mut buf, &msg4).await.unwrap();
1104        write_message(&mut buf, &msg5).await.unwrap();
1105
1106        let mut cursor = Cursor::new(buf);
1107        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1108        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1109        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1110        let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1111        let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1112
1113        assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1114        assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1115        assert!(matches!(d3, ChildMessage::StashPut { .. }));
1116        assert!(matches!(d4, ChildMessage::StashGet { .. }));
1117        assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1118
1119        // EOF after all messages
1120        let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1121        assert!(d6.is_none());
1122    }
1123
1124    // --- IPC-P01: Reset round-trip ---
1125    #[tokio::test]
1126    async fn ipc_p01_reset_roundtrip() {
1127        let msg = ParentMessage::Reset {
1128            config: WorkerConfig {
1129                timeout_ms: 3000,
1130                max_heap_size: 32 * 1024 * 1024,
1131                max_tool_calls: 25,
1132                max_stash_calls: None,
1133                max_tool_call_args_size: 512 * 1024,
1134                max_output_size: 512 * 1024,
1135                max_code_size: 32 * 1024,
1136                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1137                max_resource_size: 32 * 1024 * 1024,
1138                max_parallel: 4,
1139                known_tools: None,
1140                known_servers: None,
1141            },
1142        };
1143
1144        let mut buf = Vec::new();
1145        write_message(&mut buf, &msg).await.unwrap();
1146
1147        let mut cursor = Cursor::new(buf);
1148        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1149
1150        match decoded {
1151            ParentMessage::Reset { config } => {
1152                assert_eq!(config.timeout_ms, 3000);
1153                assert_eq!(config.max_tool_calls, 25);
1154            }
1155            other => panic!("expected Reset, got: {:?}", other),
1156        }
1157    }
1158
1159    // --- IPC-P02: ResetComplete round-trip ---
1160    #[tokio::test]
1161    async fn ipc_p02_reset_complete_roundtrip() {
1162        let msg = ChildMessage::ResetComplete;
1163
1164        let mut buf = Vec::new();
1165        write_message(&mut buf, &msg).await.unwrap();
1166
1167        let mut cursor = Cursor::new(buf);
1168        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1169
1170        assert!(matches!(decoded, ChildMessage::ResetComplete));
1171    }
1172
1173    // --- IPC-P03: Reset + Execute interleaving in single stream ---
1174    #[tokio::test]
1175    async fn ipc_p03_reset_execute_interleaving() {
1176        let reset = ParentMessage::Reset {
1177            config: WorkerConfig {
1178                timeout_ms: 5000,
1179                max_heap_size: 64 * 1024 * 1024,
1180                max_tool_calls: 50,
1181                max_stash_calls: None,
1182                max_tool_call_args_size: 1024 * 1024,
1183                max_output_size: 1024 * 1024,
1184                max_code_size: 64 * 1024,
1185                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1186                max_resource_size: 64 * 1024 * 1024,
1187                max_parallel: 8,
1188                known_tools: None,
1189                known_servers: None,
1190            },
1191        };
1192        let execute = ParentMessage::Execute {
1193            code: "async () => 42".into(),
1194            manifest: None,
1195            config: WorkerConfig {
1196                timeout_ms: 5000,
1197                max_heap_size: 64 * 1024 * 1024,
1198                max_tool_calls: 50,
1199                max_stash_calls: None,
1200                max_tool_call_args_size: 1024 * 1024,
1201                max_output_size: 1024 * 1024,
1202                max_code_size: 64 * 1024,
1203                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1204                max_resource_size: 64 * 1024 * 1024,
1205                max_parallel: 8,
1206                known_tools: None,
1207                known_servers: None,
1208            },
1209        };
1210
1211        let mut buf = Vec::new();
1212        write_message(&mut buf, &reset).await.unwrap();
1213        write_message(&mut buf, &execute).await.unwrap();
1214
1215        let mut cursor = Cursor::new(buf);
1216        let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1217        let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1218
1219        assert!(matches!(d1, ParentMessage::Reset { .. }));
1220        assert!(matches!(d2, ParentMessage::Execute { .. }));
1221    }
1222
1223    // --- IPC-10: Oversized stash message rejected by read_message_with_limit ---
1224    #[tokio::test]
1225    async fn ipc_10_oversized_stash_message_rejected() {
1226        let msg = ChildMessage::StashPut {
1227            request_id: 100,
1228            key: "k".into(),
1229            value: serde_json::json!("x".repeat(2048)),
1230            ttl_secs: Some(60),
1231            group: None,
1232        };
1233        let mut buf = Vec::new();
1234        write_message(&mut buf, &msg).await.unwrap();
1235
1236        // Set limit smaller than the message payload
1237        let mut cursor = Cursor::new(buf);
1238        let result: Result<Option<ChildMessage>, _> =
1239            read_message_with_limit(&mut cursor, 64).await;
1240        assert!(result.is_err());
1241        let err_msg = result.unwrap_err().to_string();
1242        assert!(
1243            err_msg.contains("too large"),
1244            "error should mention 'too large': {err_msg}"
1245        );
1246    }
1247
1248    // --- IPC-O01: ErrorKind timeout round-trip ---
1249    #[tokio::test]
1250    async fn ipc_o01_error_kind_timeout_roundtrip() {
1251        let msg = ChildMessage::ExecutionComplete {
1252            result: Err("execution timed out after 500ms".into()),
1253            error_kind: Some(ErrorKind::Timeout),
1254            timeout_ms: Some(500),
1255        };
1256
1257        let mut buf = Vec::new();
1258        write_message(&mut buf, &msg).await.unwrap();
1259
1260        let mut cursor = Cursor::new(buf);
1261        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1262
1263        match decoded {
1264            ChildMessage::ExecutionComplete {
1265                result,
1266                error_kind,
1267                timeout_ms,
1268            } => {
1269                assert!(result.is_err());
1270                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1271                assert_eq!(timeout_ms, Some(500));
1272            }
1273            other => panic!("expected ExecutionComplete, got: {:?}", other),
1274        }
1275    }
1276
1277    // --- IPC-O02: ErrorKind heap_limit round-trip ---
1278    #[tokio::test]
1279    async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1280        let msg = ChildMessage::ExecutionComplete {
1281            result: Err("V8 heap limit exceeded".into()),
1282            error_kind: Some(ErrorKind::HeapLimit),
1283            timeout_ms: None,
1284        };
1285
1286        let mut buf = Vec::new();
1287        write_message(&mut buf, &msg).await.unwrap();
1288
1289        let mut cursor = Cursor::new(buf);
1290        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1291
1292        match decoded {
1293            ChildMessage::ExecutionComplete {
1294                result, error_kind, ..
1295            } => {
1296                assert!(result.is_err());
1297                assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1298            }
1299            other => panic!("expected ExecutionComplete, got: {:?}", other),
1300        }
1301    }
1302
1303    // --- IPC-O03: ErrorKind absent defaults to None (backward compatibility) ---
1304    #[tokio::test]
1305    async fn ipc_o03_error_kind_backward_compat() {
1306        // Simulate a message from an older worker that doesn't include error_kind.
1307        // The JSON doesn't have the error_kind field at all.
1308        let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1309        let mut buf = Vec::new();
1310        let payload = json.as_bytes();
1311        let len = payload.len() as u32;
1312        buf.extend_from_slice(&len.to_be_bytes());
1313        buf.extend_from_slice(payload);
1314
1315        let mut cursor = Cursor::new(buf);
1316        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1317
1318        match decoded {
1319            ChildMessage::ExecutionComplete {
1320                result,
1321                error_kind,
1322                timeout_ms,
1323            } => {
1324                assert!(result.is_err());
1325                assert_eq!(
1326                    error_kind, None,
1327                    "missing error_kind should default to None"
1328                );
1329                assert_eq!(
1330                    timeout_ms, None,
1331                    "missing timeout_ms should default to None"
1332                );
1333            }
1334            other => panic!("expected ExecutionComplete, got: {:?}", other),
1335        }
1336    }
1337
1338    // --- IPC-O04: ErrorKind js_error round-trip ---
1339    #[tokio::test]
1340    async fn ipc_o04_error_kind_js_error_roundtrip() {
1341        let msg = ChildMessage::ExecutionComplete {
1342            result: Err("ReferenceError: x is not defined".into()),
1343            error_kind: Some(ErrorKind::JsError),
1344            timeout_ms: None,
1345        };
1346
1347        let mut buf = Vec::new();
1348        write_message(&mut buf, &msg).await.unwrap();
1349
1350        let mut cursor = Cursor::new(buf);
1351        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1352
1353        match decoded {
1354            ChildMessage::ExecutionComplete {
1355                result, error_kind, ..
1356            } => {
1357                assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1358                assert_eq!(error_kind, Some(ErrorKind::JsError));
1359            }
1360            other => panic!("expected ExecutionComplete, got: {:?}", other),
1361        }
1362    }
1363
1364    // --- IPC-O05: Success result has no error_kind in serialized JSON ---
1365    #[tokio::test]
1366    async fn ipc_o05_success_omits_error_kind() {
1367        let msg = ChildMessage::ExecutionComplete {
1368            result: Ok(serde_json::json!(42)),
1369            error_kind: None,
1370            timeout_ms: None,
1371        };
1372
1373        let json = serde_json::to_string(&msg).unwrap();
1374        // error_kind: None should be skipped thanks to skip_serializing_if
1375        assert!(
1376            !json.contains("error_kind"),
1377            "success messages should not contain error_kind field: {json}"
1378        );
1379        assert!(
1380            !json.contains("timeout_ms"),
1381            "success messages should not contain timeout_ms field: {json}"
1382        );
1383    }
1384
1385    // --- H1: Stash Group Isolation Tests ---
1386
1387    #[tokio::test]
1388    async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1389        let msg = ChildMessage::StashPut {
1390            request_id: 50,
1391            key: "grouped-key".into(),
1392            value: serde_json::json!({"data": "secret"}),
1393            ttl_secs: Some(120),
1394            group: Some("analytics".into()),
1395        };
1396
1397        let mut buf = Vec::new();
1398        write_message(&mut buf, &msg).await.unwrap();
1399
1400        let mut cursor = Cursor::new(buf);
1401        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1402
1403        match decoded {
1404            ChildMessage::StashPut {
1405                request_id,
1406                key,
1407                group,
1408                ..
1409            } => {
1410                assert_eq!(request_id, 50);
1411                assert_eq!(key, "grouped-key");
1412                assert_eq!(group, Some("analytics".into()));
1413            }
1414            other => panic!("expected StashPut, got: {:?}", other),
1415        }
1416    }
1417
1418    #[tokio::test]
1419    async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1420        let msg = ChildMessage::StashGet {
1421            request_id: 51,
1422            key: "grouped-key".into(),
1423            group: Some("analytics".into()),
1424        };
1425
1426        let mut buf = Vec::new();
1427        write_message(&mut buf, &msg).await.unwrap();
1428
1429        let mut cursor = Cursor::new(buf);
1430        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1431
1432        match decoded {
1433            ChildMessage::StashGet {
1434                request_id,
1435                key,
1436                group,
1437            } => {
1438                assert_eq!(request_id, 51);
1439                assert_eq!(key, "grouped-key");
1440                assert_eq!(group, Some("analytics".into()));
1441            }
1442            other => panic!("expected StashGet, got: {:?}", other),
1443        }
1444    }
1445
1446    #[tokio::test]
1447    async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1448        let msg = ChildMessage::StashDelete {
1449            request_id: 52,
1450            key: "grouped-key".into(),
1451            group: Some("analytics".into()),
1452        };
1453
1454        let mut buf = Vec::new();
1455        write_message(&mut buf, &msg).await.unwrap();
1456
1457        let mut cursor = Cursor::new(buf);
1458        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1459
1460        match decoded {
1461            ChildMessage::StashDelete {
1462                request_id,
1463                key,
1464                group,
1465            } => {
1466                assert_eq!(request_id, 52);
1467                assert_eq!(key, "grouped-key");
1468                assert_eq!(group, Some("analytics".into()));
1469            }
1470            other => panic!("expected StashDelete, got: {:?}", other),
1471        }
1472    }
1473
1474    #[tokio::test]
1475    async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1476        let msg = ChildMessage::StashKeys {
1477            request_id: 53,
1478            group: Some("analytics".into()),
1479        };
1480
1481        let mut buf = Vec::new();
1482        write_message(&mut buf, &msg).await.unwrap();
1483
1484        let mut cursor = Cursor::new(buf);
1485        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1486
1487        match decoded {
1488            ChildMessage::StashKeys { request_id, group } => {
1489                assert_eq!(request_id, 53);
1490                assert_eq!(group, Some("analytics".into()));
1491            }
1492            other => panic!("expected StashKeys, got: {:?}", other),
1493        }
1494    }
1495
1496    #[tokio::test]
1497    async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1498        // group: None → field absent in JSON
1499        let msg = ChildMessage::StashPut {
1500            request_id: 54,
1501            key: "no-group-key".into(),
1502            value: serde_json::json!("val"),
1503            ttl_secs: None,
1504            group: None,
1505        };
1506
1507        let json = serde_json::to_string(&msg).unwrap();
1508        assert!(
1509            !json.contains("\"group\""),
1510            "group:None should be absent in serialized JSON: {json}"
1511        );
1512
1513        // Still deserializes correctly
1514        let mut buf = Vec::new();
1515        write_message(&mut buf, &msg).await.unwrap();
1516        let mut cursor = Cursor::new(buf);
1517        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1518        match decoded {
1519            ChildMessage::StashPut { group, .. } => {
1520                assert_eq!(group, None);
1521            }
1522            other => panic!("expected StashPut, got: {:?}", other),
1523        }
1524    }
1525
1526    #[tokio::test]
1527    async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1528        // Simulate a v0.3.0 worker message that lacks the group field entirely
1529        let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1530        let mut buf = Vec::new();
1531        let payload = json.as_bytes();
1532        let len = payload.len() as u32;
1533        buf.extend_from_slice(&len.to_be_bytes());
1534        buf.extend_from_slice(payload);
1535
1536        let mut cursor = Cursor::new(buf);
1537        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1538
1539        match decoded {
1540            ChildMessage::StashPut {
1541                request_id,
1542                key,
1543                group,
1544                ..
1545            } => {
1546                assert_eq!(request_id, 60);
1547                assert_eq!(key, "old-key");
1548                assert_eq!(
1549                    group, None,
1550                    "missing group field from v0.3.0 worker should deserialize as None"
1551                );
1552            }
1553            other => panic!("expected StashPut, got: {:?}", other),
1554        }
1555
1556        // Also test StashGet, StashDelete, StashKeys backward compat
1557        let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1558        let mut buf = Vec::new();
1559        let payload = json_get.as_bytes();
1560        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1561        buf.extend_from_slice(payload);
1562        let mut cursor = Cursor::new(buf);
1563        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1564        match decoded {
1565            ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1566            other => panic!("expected StashGet, got: {:?}", other),
1567        }
1568
1569        let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1570        let mut buf = Vec::new();
1571        let payload = json_del.as_bytes();
1572        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1573        buf.extend_from_slice(payload);
1574        let mut cursor = Cursor::new(buf);
1575        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1576        match decoded {
1577            ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1578            other => panic!("expected StashDelete, got: {:?}", other),
1579        }
1580
1581        let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1582        let mut buf = Vec::new();
1583        let payload = json_keys.as_bytes();
1584        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1585        buf.extend_from_slice(payload);
1586        let mut cursor = Cursor::new(buf);
1587        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1588        match decoded {
1589            ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1590            other => panic!("expected StashKeys, got: {:?}", other),
1591        }
1592    }
1593
1594    // --- Phase 2: Structured Timeout + RawValue Tests ---
1595
1596    #[tokio::test]
1597    async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1598        let msg = ChildMessage::ExecutionComplete {
1599            result: Err("execution timed out after 5000ms".into()),
1600            error_kind: Some(ErrorKind::Timeout),
1601            timeout_ms: Some(5000),
1602        };
1603
1604        let mut buf = Vec::new();
1605        write_message(&mut buf, &msg).await.unwrap();
1606
1607        let mut cursor = Cursor::new(buf);
1608        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1609
1610        match decoded {
1611            ChildMessage::ExecutionComplete {
1612                result,
1613                error_kind,
1614                timeout_ms,
1615            } => {
1616                assert!(result.is_err());
1617                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1618                assert_eq!(timeout_ms, Some(5000));
1619            }
1620            other => panic!("expected ExecutionComplete, got: {:?}", other),
1621        }
1622    }
1623
1624    #[tokio::test]
1625    async fn ipc_t02_timeout_ms_absent_backward_compat() {
1626        // Simulate an old v0.3.0 worker that doesn't include timeout_ms
1627        let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1628        let mut buf = Vec::new();
1629        let payload = json.as_bytes();
1630        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1631        buf.extend_from_slice(payload);
1632
1633        let mut cursor = Cursor::new(buf);
1634        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1635
1636        match decoded {
1637            ChildMessage::ExecutionComplete {
1638                error_kind,
1639                timeout_ms,
1640                ..
1641            } => {
1642                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1643                assert_eq!(
1644                    timeout_ms, None,
1645                    "missing timeout_ms should default to None"
1646                );
1647            }
1648            other => panic!("expected ExecutionComplete, got: {:?}", other),
1649        }
1650    }
1651
1652    #[tokio::test]
1653    async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
1654        let msg = ChildMessage::ExecutionComplete {
1655            result: Err("some error".into()),
1656            error_kind: Some(ErrorKind::JsError),
1657            timeout_ms: None,
1658        };
1659
1660        let json = serde_json::to_string(&msg).unwrap();
1661        assert!(
1662            !json.contains("timeout_ms"),
1663            "timeout_ms:None should be omitted: {json}"
1664        );
1665    }
1666
1667    #[tokio::test]
1668    async fn ipc_t04_timeout_ms_present_when_some() {
1669        let msg = ChildMessage::ExecutionComplete {
1670            result: Err("timed out".into()),
1671            error_kind: Some(ErrorKind::Timeout),
1672            timeout_ms: Some(10000),
1673        };
1674
1675        let json = serde_json::to_string(&msg).unwrap();
1676        assert!(
1677            json.contains("\"timeout_ms\":10000"),
1678            "timeout_ms should be present: {json}"
1679        );
1680    }
1681
1682    // --- RawValue passthrough tests ---
1683
1684    #[tokio::test]
1685    async fn ipc_rv01_write_raw_message_roundtrip() {
1686        let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1687
1688        let mut buf = Vec::new();
1689        write_raw_message(&mut buf, payload).await.unwrap();
1690
1691        let mut cursor = Cursor::new(buf);
1692        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1693            .await
1694            .unwrap()
1695            .unwrap();
1696
1697        assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1698    }
1699
1700    #[tokio::test]
1701    async fn ipc_rv02_read_raw_message_preserves_bytes() {
1702        // Write a regular message, then read it raw
1703        let msg = ChildMessage::Log {
1704            message: "test raw".into(),
1705        };
1706        let mut buf = Vec::new();
1707        write_message(&mut buf, &msg).await.unwrap();
1708
1709        let mut cursor = Cursor::new(buf);
1710        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1711            .await
1712            .unwrap()
1713            .unwrap();
1714
1715        // The raw value should be valid JSON
1716        let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1717        assert!(matches!(parsed, ChildMessage::Log { .. }));
1718    }
1719
1720    #[tokio::test]
1721    async fn ipc_rv03_raw_message_size_limit_enforced() {
1722        let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1723        let mut buf = Vec::new();
1724        write_raw_message(&mut buf, large_payload.as_bytes())
1725            .await
1726            .unwrap();
1727
1728        let mut cursor = Cursor::new(buf);
1729        let result = read_raw_message(&mut cursor, 64).await;
1730        assert!(result.is_err());
1731        let err = result.unwrap_err().to_string();
1732        assert!(err.contains("too large"), "error: {err}");
1733    }
1734
1735    #[tokio::test]
1736    async fn ipc_rv04_large_payload_stays_raw() {
1737        // 1MB payload — read as raw without full Value parse
1738        let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1739        let mut buf = Vec::new();
1740        write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1741
1742        let mut cursor = Cursor::new(buf);
1743        let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1744            .await
1745            .unwrap()
1746            .unwrap();
1747
1748        // Should preserve the raw JSON string without parsing into Value
1749        assert!(raw.get().len() > 1_000_000);
1750        // Can still be parsed if needed
1751        let val: Value = serde_json::from_str(raw.get()).unwrap();
1752        assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1753    }
1754
1755    #[tokio::test]
1756    async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1757        // Write as regular message (Value), read as raw
1758        let msg = ParentMessage::ToolCallResult {
1759            request_id: 99,
1760            result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1761        };
1762        let mut buf = Vec::new();
1763        write_message(&mut buf, &msg).await.unwrap();
1764
1765        let mut cursor = Cursor::new(buf.clone());
1766        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1767            .await
1768            .unwrap()
1769            .unwrap();
1770
1771        // Parse the raw back as ParentMessage
1772        let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1773        match parsed {
1774            ParentMessage::ToolCallResult { request_id, result } => {
1775                assert_eq!(request_id, 99);
1776                assert!(result.is_ok());
1777            }
1778            other => panic!("expected ToolCallResult, got: {:?}", other),
1779        }
1780    }
1781
1782    #[tokio::test]
1783    async fn ipc_rv06_raw_eof_returns_none() {
1784        let mut cursor = Cursor::new(Vec::<u8>::new());
1785        let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1786            .await
1787            .unwrap();
1788        assert!(result.is_none());
1789    }
1790
1791    #[tokio::test]
1792    async fn ipc_rv07_mixed_raw_and_value_messages() {
1793        let mut buf = Vec::new();
1794
1795        // Write a typed message
1796        let msg1 = ChildMessage::Log {
1797            message: "first".into(),
1798        };
1799        write_message(&mut buf, &msg1).await.unwrap();
1800
1801        // Write a raw message
1802        let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1803        write_raw_message(&mut buf, raw_payload).await.unwrap();
1804
1805        // Read both: first typed, second raw
1806        let mut cursor = Cursor::new(buf);
1807        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1808        assert!(matches!(d1, ChildMessage::Log { .. }));
1809
1810        let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1811            .await
1812            .unwrap()
1813            .unwrap();
1814        let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1815        assert!(matches!(parsed, ChildMessage::Log { .. }));
1816    }
1817
1818    // --- ToolError IPC round-trip tests ---
1819
1820    #[tokio::test]
1821    async fn tool_error_round_trips_through_ipc() {
1822        let original = forge_error::DispatchError::ToolError {
1823            server: "arbiter".into(),
1824            tool: "scan_target".into(),
1825            message: "tool returned error: Invalid params: missing field 'base_url'".into(),
1826        };
1827
1828        // Convert to IPC form
1829        let ipc_err = IpcDispatchError::from(&original);
1830        assert_eq!(ipc_err.code, "TOOL_ERROR");
1831        assert_eq!(ipc_err.server, Some("arbiter".into()));
1832        assert_eq!(ipc_err.tool, Some("scan_target".into()));
1833        assert!(ipc_err.message.contains("Invalid params"));
1834
1835        // Round-trip through serialization
1836        let json = serde_json::to_string(&ipc_err).unwrap();
1837        let deserialized: IpcDispatchError = serde_json::from_str(&json).unwrap();
1838
1839        // Reconstruct DispatchError
1840        let reconstructed = deserialized.to_dispatch_error();
1841        assert!(matches!(
1842            reconstructed,
1843            forge_error::DispatchError::ToolError {
1844                ref server,
1845                ref tool,
1846                ..
1847            } if server == "arbiter" && tool == "scan_target"
1848        ));
1849        assert!(!reconstructed.trips_circuit_breaker());
1850        assert_eq!(reconstructed.code(), "TOOL_ERROR");
1851    }
1852
1853    #[tokio::test]
1854    async fn tool_error_ipc_message_roundtrip() {
1855        let msg = ParentMessage::ToolCallResult {
1856            request_id: 42,
1857            result: Err(IpcDispatchError::from(
1858                &forge_error::DispatchError::ToolError {
1859                    server: "arbiter".into(),
1860                    tool: "scan".into(),
1861                    message: "bad params".into(),
1862                },
1863            )),
1864        };
1865
1866        let mut buf = Vec::new();
1867        write_message(&mut buf, &msg).await.unwrap();
1868
1869        let mut cursor = Cursor::new(buf);
1870        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1871
1872        match decoded {
1873            ParentMessage::ToolCallResult { request_id, result } => {
1874                assert_eq!(request_id, 42);
1875                let err = result.unwrap_err();
1876                assert_eq!(err.code, "TOOL_ERROR");
1877                assert_eq!(err.server, Some("arbiter".into()));
1878                assert_eq!(err.tool, Some("scan".into()));
1879
1880                // Verify it reconstructs to the correct variant
1881                let dispatch_err = err.to_dispatch_error();
1882                assert!(!dispatch_err.trips_circuit_breaker());
1883            }
1884            other => panic!("expected ToolCallResult, got: {:?}", other),
1885        }
1886    }
1887}