Skip to main content

forge_sandbox/
ipc.rs

1//! IPC protocol for parent ↔ worker communication.
2//!
3//! Uses length-delimited JSON messages: 4-byte big-endian length prefix + JSON payload.
4//! All messages are typed via [`ParentMessage`] and [`ChildMessage`] enums.
5
6use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
12/// Error classification for structured error preservation across IPC.
13///
14/// When errors cross the process boundary via `ExecutionComplete`, the typed
15/// `SandboxError` variants are converted to strings. This enum preserves the
16/// error kind so the parent can reconstruct the correct `SandboxError` variant.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19#[non_exhaustive]
20pub enum ErrorKind {
21    /// V8 execution timed out (CPU watchdog or async event loop).
22    Timeout,
23    /// V8 heap memory limit was exceeded.
24    HeapLimit,
25    /// A JavaScript error was thrown.
26    JsError,
27    /// Generic execution failure.
28    Execution,
29}
30
31/// Structured dispatch error for IPC transport.
32///
33/// Preserves [`forge_error::DispatchError`] variant information across the
34/// parent ↔ worker process boundary. Without this, all errors would be
35/// flattened to `DispatchError::Internal` in ChildProcess mode, losing
36/// error codes like `SERVER_NOT_FOUND` and fuzzy match suggestions.
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
38pub struct IpcDispatchError {
39    /// Error code matching `DispatchError::code()` output.
40    pub code: String,
41    /// Human-readable error message.
42    pub message: String,
43    /// Server name (for ServerNotFound, ToolNotFound, ToolError, Timeout, CircuitOpen, Upstream).
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub server: Option<String>,
46    /// Tool name (for ToolNotFound, ToolError).
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub tool: Option<String>,
49    /// Timeout in milliseconds (for Timeout).
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub timeout_ms: Option<u64>,
52}
53
54impl IpcDispatchError {
55    /// Create from a plain string error message (used for non-dispatch errors
56    /// like "stash dispatcher not available" or URI validation failures).
57    pub fn from_string(msg: String) -> Self {
58        Self {
59            code: "INTERNAL".to_string(),
60            message: msg,
61            server: None,
62            tool: None,
63            timeout_ms: None,
64        }
65    }
66
67    /// Reconstruct the appropriate [`forge_error::DispatchError`] variant.
68    pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69        match self.code.as_str() {
70            "SERVER_NOT_FOUND" => {
71                forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72            }
73            "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74                server: self.server.unwrap_or_default(),
75                tool: self.tool.unwrap_or_default(),
76            },
77            "TIMEOUT" => forge_error::DispatchError::Timeout {
78                server: self.server.unwrap_or_default(),
79                timeout_ms: self.timeout_ms.unwrap_or(0),
80            },
81            "CIRCUIT_OPEN" => {
82                forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83            }
84            "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85                reason: self.message,
86            },
87            "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88                server: self.server.unwrap_or_default(),
89                message: self.message,
90            },
91            "TRANSPORT_DEAD" => forge_error::DispatchError::TransportDead {
92                server: self.server.unwrap_or_default(),
93                reason: self.message,
94            },
95            "TOOL_ERROR" => forge_error::DispatchError::ToolError {
96                server: self.server.unwrap_or_default(),
97                tool: self.tool.unwrap_or_default(),
98                message: self.message,
99            },
100            "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
101            _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
102        }
103    }
104}
105
106impl From<&forge_error::DispatchError> for IpcDispatchError {
107    fn from(e: &forge_error::DispatchError) -> Self {
108        let (server, tool, timeout_ms) = match e {
109            forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
110            forge_error::DispatchError::ToolNotFound { server, tool } => {
111                (Some(server.clone()), Some(tool.clone()), None)
112            }
113            forge_error::DispatchError::Timeout {
114                server, timeout_ms, ..
115            } => (Some(server.clone()), None, Some(*timeout_ms)),
116            forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
117            forge_error::DispatchError::Upstream { server, .. } => {
118                (Some(server.clone()), None, None)
119            }
120            forge_error::DispatchError::TransportDead { server, .. } => {
121                (Some(server.clone()), None, None)
122            }
123            forge_error::DispatchError::ToolError { server, tool, .. } => {
124                (Some(server.clone()), Some(tool.clone()), None)
125            }
126            _ => (None, None, None),
127        };
128
129        Self {
130            code: e.code().to_string(),
131            message: e.to_string(),
132            server,
133            tool,
134            timeout_ms,
135        }
136    }
137}
138
139/// Messages sent from the parent process to the worker child.
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(tag = "type")]
142#[non_exhaustive]
143pub enum ParentMessage {
144    /// Initial message: execute this code in the sandbox.
145    Execute {
146        /// The JavaScript async arrow function to execute.
147        code: String,
148        /// Optional capability manifest (for search mode — not used in child process execute).
149        manifest: Option<Value>,
150        /// Worker configuration.
151        config: WorkerConfig,
152    },
153    /// Response to a tool call request from the child.
154    ToolCallResult {
155        /// Matches the request_id from ChildMessage::ToolCallRequest.
156        request_id: u64,
157        /// The tool call result, or a structured dispatch error.
158        result: Result<Value, IpcDispatchError>,
159    },
160    /// Response to a resource read request from the child.
161    ResourceReadResult {
162        /// Matches the request_id from ChildMessage::ResourceReadRequest.
163        request_id: u64,
164        /// The resource content, or a structured dispatch error.
165        result: Result<Value, IpcDispatchError>,
166    },
167    /// Reset the worker for a new execution (pool mode).
168    ///
169    /// The worker drops its current JsRuntime and creates a fresh one.
170    /// It responds with [`ChildMessage::ResetComplete`].
171    Reset {
172        /// New worker configuration for the next execution.
173        config: WorkerConfig,
174    },
175    /// Response to a stash operation from the child.
176    StashResult {
177        /// Matches the request_id from the stash request.
178        request_id: u64,
179        /// The stash operation result, or a structured dispatch error.
180        result: Result<Value, IpcDispatchError>,
181    },
182}
183
184/// Messages sent from the worker child to the parent process.
185#[derive(Debug, Clone, Serialize, Deserialize)]
186#[serde(tag = "type")]
187#[non_exhaustive]
188pub enum ChildMessage {
189    /// Request the parent to dispatch a tool call.
190    ToolCallRequest {
191        /// Unique ID for correlating request ↔ response.
192        request_id: u64,
193        /// Target server name.
194        server: String,
195        /// Tool identifier.
196        tool: String,
197        /// Tool arguments.
198        args: Value,
199    },
200    /// Request the parent to read a resource.
201    ResourceReadRequest {
202        /// Unique ID for correlating request ↔ response.
203        request_id: u64,
204        /// Target server name.
205        server: String,
206        /// Resource URI.
207        uri: String,
208    },
209    /// Request the parent to put a value in the stash.
210    StashPut {
211        /// Unique ID for correlating request ↔ response.
212        request_id: u64,
213        /// Stash key.
214        key: String,
215        /// Value to store.
216        value: Value,
217        /// TTL in seconds (None = use default).
218        ttl_secs: Option<u32>,
219        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
220        #[serde(default, skip_serializing_if = "Option::is_none")]
221        group: Option<String>,
222    },
223    /// Request the parent to get a value from the stash.
224    StashGet {
225        /// Unique ID for correlating request ↔ response.
226        request_id: u64,
227        /// Stash key.
228        key: String,
229        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
230        #[serde(default, skip_serializing_if = "Option::is_none")]
231        group: Option<String>,
232    },
233    /// Request the parent to delete a value from the stash.
234    StashDelete {
235        /// Unique ID for correlating request ↔ response.
236        request_id: u64,
237        /// Stash key.
238        key: String,
239        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
240        #[serde(default, skip_serializing_if = "Option::is_none")]
241        group: Option<String>,
242    },
243    /// Request the parent to list stash keys.
244    StashKeys {
245        /// Unique ID for correlating request ↔ response.
246        request_id: u64,
247        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
248        #[serde(default, skip_serializing_if = "Option::is_none")]
249        group: Option<String>,
250    },
251    /// Worker has been reset and is ready for a new execution.
252    ResetComplete,
253    /// The execution has finished.
254    ExecutionComplete {
255        /// The result value, or an error message.
256        result: Result<Value, String>,
257        /// Classification of the error for typed reconstruction on the parent side.
258        /// Present only when `result` is `Err`. Defaults to `JsError` if absent
259        /// (backward compatibility with workers that don't send this field).
260        #[serde(default, skip_serializing_if = "Option::is_none")]
261        error_kind: Option<ErrorKind>,
262        /// Structured timeout value in milliseconds (v0.3.1+).
263        /// Present only when `error_kind` is `Timeout`. Replaces fragile string parsing.
264        /// Absent in v0.3.0 workers → None.
265        #[serde(default, skip_serializing_if = "Option::is_none")]
266        timeout_ms: Option<u64>,
267    },
268    /// A log message from the worker.
269    Log {
270        /// The log message text.
271        message: String,
272    },
273}
274
275/// Configuration passed to the worker process.
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct WorkerConfig {
278    /// Maximum execution time.
279    pub timeout_ms: u64,
280    /// V8 heap limit in bytes.
281    pub max_heap_size: usize,
282    /// Maximum tool calls per execution.
283    pub max_tool_calls: usize,
284    /// Maximum size of tool call arguments in bytes.
285    pub max_tool_call_args_size: usize,
286    /// Maximum size of the JSON result in bytes.
287    pub max_output_size: usize,
288    /// Maximum size of LLM-generated code in bytes.
289    pub max_code_size: usize,
290    /// Maximum IPC message size in bytes. Defaults to [`DEFAULT_MAX_IPC_MESSAGE_SIZE`].
291    #[serde(default = "default_max_ipc_message_size")]
292    pub max_ipc_message_size: usize,
293    /// Maximum resource content size in bytes.
294    #[serde(default = "default_max_resource_size")]
295    pub max_resource_size: usize,
296    /// Maximum concurrent calls in forge.parallel().
297    #[serde(default = "default_max_parallel")]
298    pub max_parallel: usize,
299    /// Known tools for structured error fuzzy matching (v0.3.1+).
300    /// Each entry is `(server_name, tool_name)`.
301    #[serde(default, skip_serializing_if = "Option::is_none")]
302    pub known_tools: Option<Vec<(String, String)>>,
303    /// Known server names for structured error detection (v0.3.1+).
304    #[serde(default, skip_serializing_if = "Option::is_none")]
305    pub known_servers: Option<std::collections::HashSet<String>>,
306}
307
308fn default_max_ipc_message_size() -> usize {
309    DEFAULT_MAX_IPC_MESSAGE_SIZE
310}
311
/// Serde default for `WorkerConfig::max_resource_size`: 64 MB.
fn default_max_resource_size() -> usize {
    const MEBIBYTE: usize = 1024 * 1024;
    64 * MEBIBYTE
}
315
/// Serde default for `WorkerConfig::max_parallel`.
fn default_max_parallel() -> usize {
    const DEFAULT_PARALLEL_CALLS: usize = 8;
    DEFAULT_PARALLEL_CALLS
}
319
320impl From<&crate::SandboxConfig> for WorkerConfig {
321    fn from(config: &crate::SandboxConfig) -> Self {
322        Self {
323            timeout_ms: config.timeout.as_millis() as u64,
324            max_heap_size: config.max_heap_size,
325            max_tool_calls: config.max_tool_calls,
326            max_tool_call_args_size: config.max_tool_call_args_size,
327            max_output_size: config.max_output_size,
328            max_code_size: config.max_code_size,
329            max_ipc_message_size: config.max_ipc_message_size,
330            max_resource_size: config.max_resource_size,
331            max_parallel: config.max_parallel,
332            known_tools: None,
333            known_servers: None,
334        }
335    }
336}
337
338impl WorkerConfig {
339    /// Convert back to a SandboxConfig for use in the worker.
340    pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
341        crate::SandboxConfig {
342            timeout: Duration::from_millis(self.timeout_ms),
343            max_code_size: self.max_code_size,
344            max_output_size: self.max_output_size,
345            max_heap_size: self.max_heap_size,
346            max_concurrent: 1, // worker handles one execution
347            max_tool_calls: self.max_tool_calls,
348            max_tool_call_args_size: self.max_tool_call_args_size,
349            execution_mode: crate::executor::ExecutionMode::InProcess, // worker always runs in-process
350            max_resource_size: self.max_resource_size,
351            max_parallel: self.max_parallel,
352            max_ipc_message_size: self.max_ipc_message_size,
353        }
354    }
355}
356
357/// Write a length-delimited JSON message to an async writer.
358///
359/// Format: 4-byte big-endian length prefix followed by the JSON payload bytes.
360pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
361    writer: &mut W,
362    msg: &T,
363) -> Result<(), std::io::Error> {
364    let payload = serde_json::to_vec(msg)
365        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
366    let len = u32::try_from(payload.len()).map_err(|_| {
367        std::io::Error::new(
368            std::io::ErrorKind::InvalidData,
369            format!(
370                "IPC payload too large: {} bytes (max {} bytes)",
371                payload.len(),
372                u32::MAX
373            ),
374        )
375    })?;
376    writer.write_all(&len.to_be_bytes()).await?;
377    writer.write_all(&payload).await?;
378    writer.flush().await?;
379    Ok(())
380}
381
382/// Write a raw JSON byte payload as a length-delimited IPC message.
383///
384/// This bypasses serialization entirely — useful for forwarding large
385/// tool/resource results without deserializing and re-serializing.
386pub async fn write_raw_message<W: AsyncWrite + Unpin>(
387    writer: &mut W,
388    payload: &[u8],
389) -> Result<(), std::io::Error> {
390    let len = u32::try_from(payload.len()).map_err(|_| {
391        std::io::Error::new(
392            std::io::ErrorKind::InvalidData,
393            format!(
394                "raw IPC payload too large: {} bytes (max {} bytes)",
395                payload.len(),
396                u32::MAX
397            ),
398        )
399    })?;
400    writer.write_all(&len.to_be_bytes()).await?;
401    writer.write_all(payload).await?;
402    writer.flush().await?;
403    Ok(())
404}
405
406/// Read a raw JSON byte payload from an IPC message without deserializing.
407///
408/// Returns the raw bytes as an owned `Box<RawValue>` which can be forwarded
409/// without parsing. Returns `None` on EOF.
410pub async fn read_raw_message<R: AsyncRead + Unpin>(
411    reader: &mut R,
412    max_size: usize,
413) -> Result<Option<Box<RawValue>>, std::io::Error> {
414    let mut len_buf = [0u8; 4];
415    match reader.read_exact(&mut len_buf).await {
416        Ok(_) => {}
417        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
418        Err(e) => return Err(e),
419    }
420
421    let len = u32::from_be_bytes(len_buf) as usize;
422
423    if len > max_size {
424        return Err(std::io::Error::new(
425            std::io::ErrorKind::InvalidData,
426            format!(
427                "raw IPC message too large: {} bytes (limit: {} bytes)",
428                len, max_size
429            ),
430        ));
431    }
432
433    let mut payload = vec![0u8; len];
434    reader.read_exact(&mut payload).await?;
435
436    let raw: Box<RawValue> = serde_json::from_slice(&payload)
437        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
438    Ok(Some(raw))
439}
440
441/// Default maximum IPC message size: 8 MB.
442///
443/// Reduced from 64 MB to prevent single messages from causing memory pressure.
444/// Configurable via `sandbox.max_ipc_message_size_mb` in config.
445pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 * 1024 * 1024;
446
447/// Read a length-delimited JSON message from an async reader.
448///
449/// Returns `None` if the reader has reached EOF (clean shutdown).
450/// Uses [`DEFAULT_MAX_IPC_MESSAGE_SIZE`] as the size limit.
451pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
452    reader: &mut R,
453) -> Result<Option<T>, std::io::Error> {
454    read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
455}
456
457/// Read a length-delimited JSON message with a configurable size limit.
458///
459/// Returns `None` if the reader has reached EOF (clean shutdown).
460/// The `max_size` parameter controls the maximum allowed message size in bytes.
461pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
462    reader: &mut R,
463    max_size: usize,
464) -> Result<Option<T>, std::io::Error> {
465    let mut len_buf = [0u8; 4];
466    match reader.read_exact(&mut len_buf).await {
467        Ok(_) => {}
468        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
469        Err(e) => return Err(e),
470    }
471
472    let len = u32::from_be_bytes(len_buf) as usize;
473
474    // Reject messages exceeding the configured limit
475    if len > max_size {
476        return Err(std::io::Error::new(
477            std::io::ErrorKind::InvalidData,
478            format!(
479                "IPC message too large: {} bytes (limit: {} bytes)",
480                len, max_size
481            ),
482        ));
483    }
484
485    let mut payload = vec![0u8; len];
486    reader.read_exact(&mut payload).await?;
487
488    let msg: T = serde_json::from_slice(&payload)
489        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
490    Ok(Some(msg))
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496    use std::io::Cursor;
497
498    #[tokio::test]
499    async fn roundtrip_parent_execute_message() {
500        let msg = ParentMessage::Execute {
501            code: "async () => { return 42; }".into(),
502            manifest: Some(serde_json::json!({"servers": []})),
503            config: WorkerConfig {
504                timeout_ms: 5000,
505                max_heap_size: 64 * 1024 * 1024,
506                max_tool_calls: 50,
507                max_tool_call_args_size: 1024 * 1024,
508                max_output_size: 1024 * 1024,
509                max_code_size: 64 * 1024,
510                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
511                max_resource_size: 64 * 1024 * 1024,
512                max_parallel: 8,
513                known_tools: None,
514                known_servers: None,
515            },
516        };
517
518        let mut buf = Vec::new();
519        write_message(&mut buf, &msg).await.unwrap();
520
521        let mut cursor = Cursor::new(buf);
522        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
523
524        match decoded {
525            ParentMessage::Execute {
526                code,
527                manifest,
528                config,
529            } => {
530                assert_eq!(code, "async () => { return 42; }");
531                assert!(manifest.is_some());
532                assert_eq!(config.timeout_ms, 5000);
533            }
534            other => panic!("expected Execute, got: {:?}", other),
535        }
536    }
537
538    #[tokio::test]
539    async fn roundtrip_parent_tool_result() {
540        let msg = ParentMessage::ToolCallResult {
541            request_id: 42,
542            result: Ok(serde_json::json!({"status": "ok"})),
543        };
544
545        let mut buf = Vec::new();
546        write_message(&mut buf, &msg).await.unwrap();
547
548        let mut cursor = Cursor::new(buf);
549        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
550
551        match decoded {
552            ParentMessage::ToolCallResult { request_id, result } => {
553                assert_eq!(request_id, 42);
554                assert!(result.is_ok());
555            }
556            other => panic!("expected ToolCallResult, got: {:?}", other),
557        }
558    }
559
560    #[tokio::test]
561    async fn roundtrip_parent_tool_result_error() {
562        let msg = ParentMessage::ToolCallResult {
563            request_id: 7,
564            result: Err(IpcDispatchError::from_string("connection refused".into())),
565        };
566
567        let mut buf = Vec::new();
568        write_message(&mut buf, &msg).await.unwrap();
569
570        let mut cursor = Cursor::new(buf);
571        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
572
573        match decoded {
574            ParentMessage::ToolCallResult { request_id, result } => {
575                assert_eq!(request_id, 7);
576                let err = result.unwrap_err();
577                assert_eq!(err.message, "connection refused");
578                assert_eq!(err.code, "INTERNAL");
579            }
580            other => panic!("expected ToolCallResult, got: {:?}", other),
581        }
582    }
583
584    #[tokio::test]
585    async fn roundtrip_child_tool_request() {
586        let msg = ChildMessage::ToolCallRequest {
587            request_id: 1,
588            server: "narsil".into(),
589            tool: "ast.parse".into(),
590            args: serde_json::json!({"file": "test.rs"}),
591        };
592
593        let mut buf = Vec::new();
594        write_message(&mut buf, &msg).await.unwrap();
595
596        let mut cursor = Cursor::new(buf);
597        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
598
599        match decoded {
600            ChildMessage::ToolCallRequest {
601                request_id,
602                server,
603                tool,
604                args,
605            } => {
606                assert_eq!(request_id, 1);
607                assert_eq!(server, "narsil");
608                assert_eq!(tool, "ast.parse");
609                assert_eq!(args["file"], "test.rs");
610            }
611            other => panic!("expected ToolCallRequest, got: {:?}", other),
612        }
613    }
614
615    #[tokio::test]
616    async fn roundtrip_child_execution_complete() {
617        let msg = ChildMessage::ExecutionComplete {
618            result: Ok(serde_json::json!([1, 2, 3])),
619            error_kind: None,
620            timeout_ms: None,
621        };
622
623        let mut buf = Vec::new();
624        write_message(&mut buf, &msg).await.unwrap();
625
626        let mut cursor = Cursor::new(buf);
627        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
628
629        match decoded {
630            ChildMessage::ExecutionComplete {
631                result, error_kind, ..
632            } => {
633                assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
634                assert_eq!(error_kind, None);
635            }
636            other => panic!("expected ExecutionComplete, got: {:?}", other),
637        }
638    }
639
640    #[tokio::test]
641    async fn roundtrip_child_log() {
642        let msg = ChildMessage::Log {
643            message: "processing step 3".into(),
644        };
645
646        let mut buf = Vec::new();
647        write_message(&mut buf, &msg).await.unwrap();
648
649        let mut cursor = Cursor::new(buf);
650        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
651
652        match decoded {
653            ChildMessage::Log { message } => {
654                assert_eq!(message, "processing step 3");
655            }
656            other => panic!("expected Log, got: {:?}", other),
657        }
658    }
659
660    #[tokio::test]
661    async fn multiple_messages_in_stream() {
662        let msg1 = ChildMessage::Log {
663            message: "first".into(),
664        };
665        let msg2 = ChildMessage::ToolCallRequest {
666            request_id: 1,
667            server: "s".into(),
668            tool: "t".into(),
669            args: serde_json::json!({}),
670        };
671        let msg3 = ChildMessage::ExecutionComplete {
672            result: Ok(serde_json::json!("done")),
673            error_kind: None,
674            timeout_ms: None,
675        };
676
677        let mut buf = Vec::new();
678        write_message(&mut buf, &msg1).await.unwrap();
679        write_message(&mut buf, &msg2).await.unwrap();
680        write_message(&mut buf, &msg3).await.unwrap();
681
682        let mut cursor = Cursor::new(buf);
683        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
684        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
685        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
686
687        assert!(matches!(d1, ChildMessage::Log { .. }));
688        assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
689        assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
690
691        // EOF after all messages
692        let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
693        assert!(d4.is_none());
694    }
695
696    #[tokio::test]
697    async fn execution_complete_error_roundtrip() {
698        let msg = ChildMessage::ExecutionComplete {
699            result: Err("failed to create tokio runtime: resource unavailable".into()),
700            error_kind: Some(ErrorKind::Execution),
701            timeout_ms: None,
702        };
703
704        let mut buf = Vec::new();
705        write_message(&mut buf, &msg).await.unwrap();
706
707        let mut cursor = Cursor::new(buf);
708        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
709
710        match decoded {
711            ChildMessage::ExecutionComplete {
712                result, error_kind, ..
713            } => {
714                let err = result.unwrap_err();
715                assert!(
716                    err.contains("tokio runtime"),
717                    "expected runtime error: {err}"
718                );
719                assert_eq!(error_kind, Some(ErrorKind::Execution));
720            }
721            other => panic!("expected ExecutionComplete, got: {:?}", other),
722        }
723    }
724
725    #[tokio::test]
726    async fn eof_returns_none() {
727        let mut cursor = Cursor::new(Vec::<u8>::new());
728        let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
729        assert!(result.is_none());
730    }
731
732    #[test]
733    fn u32_try_from_overflow() {
734        // Validates that the conversion logic correctly rejects sizes > u32::MAX
735        let overflow_size = u32::MAX as usize + 1;
736        assert!(u32::try_from(overflow_size).is_err());
737    }
738
739    #[tokio::test]
740    async fn write_message_normal_size_succeeds() {
741        // Regression guard: normal-sized messages still work after the try_from change
742        let msg = ChildMessage::Log {
743            message: "a".repeat(1024),
744        };
745        let mut buf = Vec::new();
746        write_message(&mut buf, &msg).await.unwrap();
747        assert!(buf.len() > 1024);
748    }
749
750    #[tokio::test]
751    async fn large_message_roundtrip() {
752        // A large payload (~1MB of data)
753        let large_data = "x".repeat(1_000_000);
754        let msg = ChildMessage::ExecutionComplete {
755            result: Ok(serde_json::json!(large_data)),
756            error_kind: None,
757            timeout_ms: None,
758        };
759
760        let mut buf = Vec::new();
761        write_message(&mut buf, &msg).await.unwrap();
762
763        let mut cursor = Cursor::new(buf);
764        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
765
766        match decoded {
767            ChildMessage::ExecutionComplete { result, .. } => {
768                assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
769            }
770            other => panic!("expected ExecutionComplete, got: {:?}", other),
771        }
772    }
773
774    #[tokio::test]
775    async fn worker_config_roundtrip_from_sandbox_config() {
776        let sandbox = crate::SandboxConfig::default();
777        let worker = WorkerConfig::from(&sandbox);
778        let back = worker.to_sandbox_config();
779
780        assert_eq!(sandbox.timeout, back.timeout);
781        assert_eq!(sandbox.max_heap_size, back.max_heap_size);
782        assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
783        assert_eq!(sandbox.max_output_size, back.max_output_size);
784        assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
785        assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); // 8 MB default
786    }
787
788    #[tokio::test]
789    async fn read_message_with_limit_rejects_oversized() {
790        let msg = ChildMessage::Log {
791            message: "x".repeat(1024),
792        };
793        let mut buf = Vec::new();
794        write_message(&mut buf, &msg).await.unwrap();
795
796        // Set limit smaller than the message payload
797        let mut cursor = Cursor::new(buf);
798        let result: Result<Option<ChildMessage>, _> =
799            read_message_with_limit(&mut cursor, 64).await;
800        assert!(result.is_err());
801        let err_msg = result.unwrap_err().to_string();
802        assert!(err_msg.contains("too large"), "error: {err_msg}");
803    }
804
805    #[tokio::test]
806    async fn read_message_with_limit_accepts_within_limit() {
807        let msg = ChildMessage::Log {
808            message: "hello".into(),
809        };
810        let mut buf = Vec::new();
811        write_message(&mut buf, &msg).await.unwrap();
812
813        let mut cursor = Cursor::new(buf);
814        let result: Option<ChildMessage> =
815            read_message_with_limit(&mut cursor, 1024).await.unwrap();
816        assert!(result.is_some());
817    }
818
819    #[tokio::test]
820    async fn worker_config_ipc_limit_serde_default() {
821        // Deserializing JSON without max_ipc_message_size should use the default
822        let json = r#"{
823            "timeout_ms": 5000,
824            "max_heap_size": 67108864,
825            "max_tool_calls": 50,
826            "max_tool_call_args_size": 1048576,
827            "max_output_size": 1048576,
828            "max_code_size": 65536
829        }"#;
830        let config: WorkerConfig = serde_json::from_str(json).unwrap();
831        assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
832    }
833
834    // --- IPC-01: ResourceReadRequest round-trip ---
835    #[tokio::test]
836    async fn ipc_01_resource_read_request_roundtrip() {
837        let msg = ChildMessage::ResourceReadRequest {
838            request_id: 10,
839            server: "postgres".into(),
840            uri: "file:///logs/app.log".into(),
841        };
842
843        let mut buf = Vec::new();
844        write_message(&mut buf, &msg).await.unwrap();
845
846        let mut cursor = Cursor::new(buf);
847        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
848
849        match decoded {
850            ChildMessage::ResourceReadRequest {
851                request_id,
852                server,
853                uri,
854            } => {
855                assert_eq!(request_id, 10);
856                assert_eq!(server, "postgres");
857                assert_eq!(uri, "file:///logs/app.log");
858            }
859            other => panic!("expected ResourceReadRequest, got: {:?}", other),
860        }
861    }
862
863    // --- IPC-02: ResourceReadResult (success) round-trip ---
864    #[tokio::test]
865    async fn ipc_02_resource_read_result_success_roundtrip() {
866        let msg = ParentMessage::ResourceReadResult {
867            request_id: 11,
868            result: Ok(serde_json::json!({"content": "log data here"})),
869        };
870
871        let mut buf = Vec::new();
872        write_message(&mut buf, &msg).await.unwrap();
873
874        let mut cursor = Cursor::new(buf);
875        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
876
877        match decoded {
878            ParentMessage::ResourceReadResult { request_id, result } => {
879                assert_eq!(request_id, 11);
880                let val = result.unwrap();
881                assert_eq!(val["content"], "log data here");
882            }
883            other => panic!("expected ResourceReadResult, got: {:?}", other),
884        }
885    }
886
887    // --- IPC-03: ResourceReadResult (error) round-trip ---
888    #[tokio::test]
889    async fn ipc_03_resource_read_result_error_roundtrip() {
890        let msg = ParentMessage::ResourceReadResult {
891            request_id: 12,
892            result: Err(IpcDispatchError::from_string("resource not found".into())),
893        };
894
895        let mut buf = Vec::new();
896        write_message(&mut buf, &msg).await.unwrap();
897
898        let mut cursor = Cursor::new(buf);
899        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
900
901        match decoded {
902            ParentMessage::ResourceReadResult { request_id, result } => {
903                assert_eq!(request_id, 12);
904                let err = result.unwrap_err();
905                assert_eq!(err.message, "resource not found");
906                assert_eq!(err.code, "INTERNAL");
907            }
908            other => panic!("expected ResourceReadResult, got: {:?}", other),
909        }
910    }
911
912    // --- IPC-04: StashPut round-trip ---
913    #[tokio::test]
914    async fn ipc_04_stash_put_roundtrip() {
915        let msg = ChildMessage::StashPut {
916            request_id: 20,
917            key: "my-key".into(),
918            value: serde_json::json!({"data": 42}),
919            ttl_secs: Some(60),
920            group: None,
921        };
922
923        let mut buf = Vec::new();
924        write_message(&mut buf, &msg).await.unwrap();
925
926        let mut cursor = Cursor::new(buf);
927        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
928
929        match decoded {
930            ChildMessage::StashPut {
931                request_id,
932                key,
933                value,
934                ttl_secs,
935                group,
936            } => {
937                assert_eq!(request_id, 20);
938                assert_eq!(key, "my-key");
939                assert_eq!(value["data"], 42);
940                assert_eq!(ttl_secs, Some(60));
941                assert_eq!(group, None);
942            }
943            other => panic!("expected StashPut, got: {:?}", other),
944        }
945    }
946
947    // --- IPC-05: StashGet round-trip ---
948    #[tokio::test]
949    async fn ipc_05_stash_get_roundtrip() {
950        let msg = ChildMessage::StashGet {
951            request_id: 21,
952            key: "lookup-key".into(),
953            group: None,
954        };
955
956        let mut buf = Vec::new();
957        write_message(&mut buf, &msg).await.unwrap();
958
959        let mut cursor = Cursor::new(buf);
960        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
961
962        match decoded {
963            ChildMessage::StashGet {
964                request_id,
965                key,
966                group,
967            } => {
968                assert_eq!(request_id, 21);
969                assert_eq!(key, "lookup-key");
970                assert_eq!(group, None);
971            }
972            other => panic!("expected StashGet, got: {:?}", other),
973        }
974    }
975
976    // --- IPC-06: StashDelete round-trip ---
977    #[tokio::test]
978    async fn ipc_06_stash_delete_roundtrip() {
979        let msg = ChildMessage::StashDelete {
980            request_id: 22,
981            key: "delete-me".into(),
982            group: None,
983        };
984
985        let mut buf = Vec::new();
986        write_message(&mut buf, &msg).await.unwrap();
987
988        let mut cursor = Cursor::new(buf);
989        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
990
991        match decoded {
992            ChildMessage::StashDelete {
993                request_id,
994                key,
995                group,
996            } => {
997                assert_eq!(request_id, 22);
998                assert_eq!(key, "delete-me");
999                assert_eq!(group, None);
1000            }
1001            other => panic!("expected StashDelete, got: {:?}", other),
1002        }
1003    }
1004
1005    // --- IPC-07: StashKeys round-trip ---
1006    #[tokio::test]
1007    async fn ipc_07_stash_keys_roundtrip() {
1008        let msg = ChildMessage::StashKeys {
1009            request_id: 23,
1010            group: None,
1011        };
1012
1013        let mut buf = Vec::new();
1014        write_message(&mut buf, &msg).await.unwrap();
1015
1016        let mut cursor = Cursor::new(buf);
1017        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1018
1019        match decoded {
1020            ChildMessage::StashKeys { request_id, group } => {
1021                assert_eq!(request_id, 23);
1022                assert_eq!(group, None);
1023            }
1024            other => panic!("expected StashKeys, got: {:?}", other),
1025        }
1026    }
1027
1028    // --- IPC-08: StashResult round-trip ---
1029    #[tokio::test]
1030    async fn ipc_08_stash_result_roundtrip() {
1031        let msg = ParentMessage::StashResult {
1032            request_id: 24,
1033            result: Ok(serde_json::json!({"ok": true})),
1034        };
1035
1036        let mut buf = Vec::new();
1037        write_message(&mut buf, &msg).await.unwrap();
1038
1039        let mut cursor = Cursor::new(buf);
1040        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1041
1042        match decoded {
1043            ParentMessage::StashResult { request_id, result } => {
1044                assert_eq!(request_id, 24);
1045                assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1046            }
1047            other => panic!("expected StashResult, got: {:?}", other),
1048        }
1049    }
1050
1051    // --- IPC-09: Mixed message interleaving (tool + resource + stash in single stream) ---
1052    #[tokio::test]
1053    async fn ipc_09_mixed_message_interleaving() {
1054        let msg1 = ChildMessage::ToolCallRequest {
1055            request_id: 1,
1056            server: "s".into(),
1057            tool: "t".into(),
1058            args: serde_json::json!({}),
1059        };
1060        let msg2 = ChildMessage::ResourceReadRequest {
1061            request_id: 2,
1062            server: "pg".into(),
1063            uri: "file:///data".into(),
1064        };
1065        let msg3 = ChildMessage::StashPut {
1066            request_id: 3,
1067            key: "k".into(),
1068            value: serde_json::json!("v"),
1069            ttl_secs: None,
1070            group: None,
1071        };
1072        let msg4 = ChildMessage::StashGet {
1073            request_id: 4,
1074            key: "k".into(),
1075            group: None,
1076        };
1077        let msg5 = ChildMessage::ExecutionComplete {
1078            result: Ok(serde_json::json!("done")),
1079            error_kind: None,
1080            timeout_ms: None,
1081        };
1082
1083        let mut buf = Vec::new();
1084        write_message(&mut buf, &msg1).await.unwrap();
1085        write_message(&mut buf, &msg2).await.unwrap();
1086        write_message(&mut buf, &msg3).await.unwrap();
1087        write_message(&mut buf, &msg4).await.unwrap();
1088        write_message(&mut buf, &msg5).await.unwrap();
1089
1090        let mut cursor = Cursor::new(buf);
1091        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1092        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1093        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1094        let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1095        let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1096
1097        assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1098        assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1099        assert!(matches!(d3, ChildMessage::StashPut { .. }));
1100        assert!(matches!(d4, ChildMessage::StashGet { .. }));
1101        assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1102
1103        // EOF after all messages
1104        let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1105        assert!(d6.is_none());
1106    }
1107
1108    // --- IPC-P01: Reset round-trip ---
1109    #[tokio::test]
1110    async fn ipc_p01_reset_roundtrip() {
1111        let msg = ParentMessage::Reset {
1112            config: WorkerConfig {
1113                timeout_ms: 3000,
1114                max_heap_size: 32 * 1024 * 1024,
1115                max_tool_calls: 25,
1116                max_tool_call_args_size: 512 * 1024,
1117                max_output_size: 512 * 1024,
1118                max_code_size: 32 * 1024,
1119                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1120                max_resource_size: 32 * 1024 * 1024,
1121                max_parallel: 4,
1122                known_tools: None,
1123                known_servers: None,
1124            },
1125        };
1126
1127        let mut buf = Vec::new();
1128        write_message(&mut buf, &msg).await.unwrap();
1129
1130        let mut cursor = Cursor::new(buf);
1131        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1132
1133        match decoded {
1134            ParentMessage::Reset { config } => {
1135                assert_eq!(config.timeout_ms, 3000);
1136                assert_eq!(config.max_tool_calls, 25);
1137            }
1138            other => panic!("expected Reset, got: {:?}", other),
1139        }
1140    }
1141
1142    // --- IPC-P02: ResetComplete round-trip ---
1143    #[tokio::test]
1144    async fn ipc_p02_reset_complete_roundtrip() {
1145        let msg = ChildMessage::ResetComplete;
1146
1147        let mut buf = Vec::new();
1148        write_message(&mut buf, &msg).await.unwrap();
1149
1150        let mut cursor = Cursor::new(buf);
1151        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1152
1153        assert!(matches!(decoded, ChildMessage::ResetComplete));
1154    }
1155
1156    // --- IPC-P03: Reset + Execute interleaving in single stream ---
1157    #[tokio::test]
1158    async fn ipc_p03_reset_execute_interleaving() {
1159        let reset = ParentMessage::Reset {
1160            config: WorkerConfig {
1161                timeout_ms: 5000,
1162                max_heap_size: 64 * 1024 * 1024,
1163                max_tool_calls: 50,
1164                max_tool_call_args_size: 1024 * 1024,
1165                max_output_size: 1024 * 1024,
1166                max_code_size: 64 * 1024,
1167                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1168                max_resource_size: 64 * 1024 * 1024,
1169                max_parallel: 8,
1170                known_tools: None,
1171                known_servers: None,
1172            },
1173        };
1174        let execute = ParentMessage::Execute {
1175            code: "async () => 42".into(),
1176            manifest: None,
1177            config: WorkerConfig {
1178                timeout_ms: 5000,
1179                max_heap_size: 64 * 1024 * 1024,
1180                max_tool_calls: 50,
1181                max_tool_call_args_size: 1024 * 1024,
1182                max_output_size: 1024 * 1024,
1183                max_code_size: 64 * 1024,
1184                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1185                max_resource_size: 64 * 1024 * 1024,
1186                max_parallel: 8,
1187                known_tools: None,
1188                known_servers: None,
1189            },
1190        };
1191
1192        let mut buf = Vec::new();
1193        write_message(&mut buf, &reset).await.unwrap();
1194        write_message(&mut buf, &execute).await.unwrap();
1195
1196        let mut cursor = Cursor::new(buf);
1197        let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1198        let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1199
1200        assert!(matches!(d1, ParentMessage::Reset { .. }));
1201        assert!(matches!(d2, ParentMessage::Execute { .. }));
1202    }
1203
1204    // --- IPC-10: Oversized stash message rejected by read_message_with_limit ---
1205    #[tokio::test]
1206    async fn ipc_10_oversized_stash_message_rejected() {
1207        let msg = ChildMessage::StashPut {
1208            request_id: 100,
1209            key: "k".into(),
1210            value: serde_json::json!("x".repeat(2048)),
1211            ttl_secs: Some(60),
1212            group: None,
1213        };
1214        let mut buf = Vec::new();
1215        write_message(&mut buf, &msg).await.unwrap();
1216
1217        // Set limit smaller than the message payload
1218        let mut cursor = Cursor::new(buf);
1219        let result: Result<Option<ChildMessage>, _> =
1220            read_message_with_limit(&mut cursor, 64).await;
1221        assert!(result.is_err());
1222        let err_msg = result.unwrap_err().to_string();
1223        assert!(
1224            err_msg.contains("too large"),
1225            "error should mention 'too large': {err_msg}"
1226        );
1227    }
1228
1229    // --- IPC-O01: ErrorKind timeout round-trip ---
1230    #[tokio::test]
1231    async fn ipc_o01_error_kind_timeout_roundtrip() {
1232        let msg = ChildMessage::ExecutionComplete {
1233            result: Err("execution timed out after 500ms".into()),
1234            error_kind: Some(ErrorKind::Timeout),
1235            timeout_ms: Some(500),
1236        };
1237
1238        let mut buf = Vec::new();
1239        write_message(&mut buf, &msg).await.unwrap();
1240
1241        let mut cursor = Cursor::new(buf);
1242        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1243
1244        match decoded {
1245            ChildMessage::ExecutionComplete {
1246                result,
1247                error_kind,
1248                timeout_ms,
1249            } => {
1250                assert!(result.is_err());
1251                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1252                assert_eq!(timeout_ms, Some(500));
1253            }
1254            other => panic!("expected ExecutionComplete, got: {:?}", other),
1255        }
1256    }
1257
1258    // --- IPC-O02: ErrorKind heap_limit round-trip ---
1259    #[tokio::test]
1260    async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1261        let msg = ChildMessage::ExecutionComplete {
1262            result: Err("V8 heap limit exceeded".into()),
1263            error_kind: Some(ErrorKind::HeapLimit),
1264            timeout_ms: None,
1265        };
1266
1267        let mut buf = Vec::new();
1268        write_message(&mut buf, &msg).await.unwrap();
1269
1270        let mut cursor = Cursor::new(buf);
1271        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1272
1273        match decoded {
1274            ChildMessage::ExecutionComplete {
1275                result, error_kind, ..
1276            } => {
1277                assert!(result.is_err());
1278                assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1279            }
1280            other => panic!("expected ExecutionComplete, got: {:?}", other),
1281        }
1282    }
1283
1284    // --- IPC-O03: ErrorKind absent defaults to None (backward compatibility) ---
1285    #[tokio::test]
1286    async fn ipc_o03_error_kind_backward_compat() {
1287        // Simulate a message from an older worker that doesn't include error_kind.
1288        // The JSON doesn't have the error_kind field at all.
1289        let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1290        let mut buf = Vec::new();
1291        let payload = json.as_bytes();
1292        let len = payload.len() as u32;
1293        buf.extend_from_slice(&len.to_be_bytes());
1294        buf.extend_from_slice(payload);
1295
1296        let mut cursor = Cursor::new(buf);
1297        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1298
1299        match decoded {
1300            ChildMessage::ExecutionComplete {
1301                result,
1302                error_kind,
1303                timeout_ms,
1304            } => {
1305                assert!(result.is_err());
1306                assert_eq!(
1307                    error_kind, None,
1308                    "missing error_kind should default to None"
1309                );
1310                assert_eq!(
1311                    timeout_ms, None,
1312                    "missing timeout_ms should default to None"
1313                );
1314            }
1315            other => panic!("expected ExecutionComplete, got: {:?}", other),
1316        }
1317    }
1318
1319    // --- IPC-O04: ErrorKind js_error round-trip ---
1320    #[tokio::test]
1321    async fn ipc_o04_error_kind_js_error_roundtrip() {
1322        let msg = ChildMessage::ExecutionComplete {
1323            result: Err("ReferenceError: x is not defined".into()),
1324            error_kind: Some(ErrorKind::JsError),
1325            timeout_ms: None,
1326        };
1327
1328        let mut buf = Vec::new();
1329        write_message(&mut buf, &msg).await.unwrap();
1330
1331        let mut cursor = Cursor::new(buf);
1332        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1333
1334        match decoded {
1335            ChildMessage::ExecutionComplete {
1336                result, error_kind, ..
1337            } => {
1338                assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1339                assert_eq!(error_kind, Some(ErrorKind::JsError));
1340            }
1341            other => panic!("expected ExecutionComplete, got: {:?}", other),
1342        }
1343    }
1344
1345    // --- IPC-O05: Success result has no error_kind in serialized JSON ---
1346    #[tokio::test]
1347    async fn ipc_o05_success_omits_error_kind() {
1348        let msg = ChildMessage::ExecutionComplete {
1349            result: Ok(serde_json::json!(42)),
1350            error_kind: None,
1351            timeout_ms: None,
1352        };
1353
1354        let json = serde_json::to_string(&msg).unwrap();
1355        // error_kind: None should be skipped thanks to skip_serializing_if
1356        assert!(
1357            !json.contains("error_kind"),
1358            "success messages should not contain error_kind field: {json}"
1359        );
1360        assert!(
1361            !json.contains("timeout_ms"),
1362            "success messages should not contain timeout_ms field: {json}"
1363        );
1364    }
1365
1366    // --- H1: Stash Group Isolation Tests ---
1367
1368    #[tokio::test]
1369    async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1370        let msg = ChildMessage::StashPut {
1371            request_id: 50,
1372            key: "grouped-key".into(),
1373            value: serde_json::json!({"data": "secret"}),
1374            ttl_secs: Some(120),
1375            group: Some("analytics".into()),
1376        };
1377
1378        let mut buf = Vec::new();
1379        write_message(&mut buf, &msg).await.unwrap();
1380
1381        let mut cursor = Cursor::new(buf);
1382        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1383
1384        match decoded {
1385            ChildMessage::StashPut {
1386                request_id,
1387                key,
1388                group,
1389                ..
1390            } => {
1391                assert_eq!(request_id, 50);
1392                assert_eq!(key, "grouped-key");
1393                assert_eq!(group, Some("analytics".into()));
1394            }
1395            other => panic!("expected StashPut, got: {:?}", other),
1396        }
1397    }
1398
1399    #[tokio::test]
1400    async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1401        let msg = ChildMessage::StashGet {
1402            request_id: 51,
1403            key: "grouped-key".into(),
1404            group: Some("analytics".into()),
1405        };
1406
1407        let mut buf = Vec::new();
1408        write_message(&mut buf, &msg).await.unwrap();
1409
1410        let mut cursor = Cursor::new(buf);
1411        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1412
1413        match decoded {
1414            ChildMessage::StashGet {
1415                request_id,
1416                key,
1417                group,
1418            } => {
1419                assert_eq!(request_id, 51);
1420                assert_eq!(key, "grouped-key");
1421                assert_eq!(group, Some("analytics".into()));
1422            }
1423            other => panic!("expected StashGet, got: {:?}", other),
1424        }
1425    }
1426
1427    #[tokio::test]
1428    async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1429        let msg = ChildMessage::StashDelete {
1430            request_id: 52,
1431            key: "grouped-key".into(),
1432            group: Some("analytics".into()),
1433        };
1434
1435        let mut buf = Vec::new();
1436        write_message(&mut buf, &msg).await.unwrap();
1437
1438        let mut cursor = Cursor::new(buf);
1439        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1440
1441        match decoded {
1442            ChildMessage::StashDelete {
1443                request_id,
1444                key,
1445                group,
1446            } => {
1447                assert_eq!(request_id, 52);
1448                assert_eq!(key, "grouped-key");
1449                assert_eq!(group, Some("analytics".into()));
1450            }
1451            other => panic!("expected StashDelete, got: {:?}", other),
1452        }
1453    }
1454
1455    #[tokio::test]
1456    async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1457        let msg = ChildMessage::StashKeys {
1458            request_id: 53,
1459            group: Some("analytics".into()),
1460        };
1461
1462        let mut buf = Vec::new();
1463        write_message(&mut buf, &msg).await.unwrap();
1464
1465        let mut cursor = Cursor::new(buf);
1466        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1467
1468        match decoded {
1469            ChildMessage::StashKeys { request_id, group } => {
1470                assert_eq!(request_id, 53);
1471                assert_eq!(group, Some("analytics".into()));
1472            }
1473            other => panic!("expected StashKeys, got: {:?}", other),
1474        }
1475    }
1476
1477    #[tokio::test]
1478    async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1479        // group: None → field absent in JSON
1480        let msg = ChildMessage::StashPut {
1481            request_id: 54,
1482            key: "no-group-key".into(),
1483            value: serde_json::json!("val"),
1484            ttl_secs: None,
1485            group: None,
1486        };
1487
1488        let json = serde_json::to_string(&msg).unwrap();
1489        assert!(
1490            !json.contains("\"group\""),
1491            "group:None should be absent in serialized JSON: {json}"
1492        );
1493
1494        // Still deserializes correctly
1495        let mut buf = Vec::new();
1496        write_message(&mut buf, &msg).await.unwrap();
1497        let mut cursor = Cursor::new(buf);
1498        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1499        match decoded {
1500            ChildMessage::StashPut { group, .. } => {
1501                assert_eq!(group, None);
1502            }
1503            other => panic!("expected StashPut, got: {:?}", other),
1504        }
1505    }
1506
1507    #[tokio::test]
1508    async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1509        // Simulate a v0.3.0 worker message that lacks the group field entirely
1510        let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1511        let mut buf = Vec::new();
1512        let payload = json.as_bytes();
1513        let len = payload.len() as u32;
1514        buf.extend_from_slice(&len.to_be_bytes());
1515        buf.extend_from_slice(payload);
1516
1517        let mut cursor = Cursor::new(buf);
1518        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1519
1520        match decoded {
1521            ChildMessage::StashPut {
1522                request_id,
1523                key,
1524                group,
1525                ..
1526            } => {
1527                assert_eq!(request_id, 60);
1528                assert_eq!(key, "old-key");
1529                assert_eq!(
1530                    group, None,
1531                    "missing group field from v0.3.0 worker should deserialize as None"
1532                );
1533            }
1534            other => panic!("expected StashPut, got: {:?}", other),
1535        }
1536
1537        // Also test StashGet, StashDelete, StashKeys backward compat
1538        let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1539        let mut buf = Vec::new();
1540        let payload = json_get.as_bytes();
1541        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1542        buf.extend_from_slice(payload);
1543        let mut cursor = Cursor::new(buf);
1544        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1545        match decoded {
1546            ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1547            other => panic!("expected StashGet, got: {:?}", other),
1548        }
1549
1550        let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1551        let mut buf = Vec::new();
1552        let payload = json_del.as_bytes();
1553        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1554        buf.extend_from_slice(payload);
1555        let mut cursor = Cursor::new(buf);
1556        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1557        match decoded {
1558            ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1559            other => panic!("expected StashDelete, got: {:?}", other),
1560        }
1561
1562        let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1563        let mut buf = Vec::new();
1564        let payload = json_keys.as_bytes();
1565        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1566        buf.extend_from_slice(payload);
1567        let mut cursor = Cursor::new(buf);
1568        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1569        match decoded {
1570            ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1571            other => panic!("expected StashKeys, got: {:?}", other),
1572        }
1573    }
1574
1575    // --- Phase 2: Structured Timeout + RawValue Tests ---
1576
1577    #[tokio::test]
1578    async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1579        let msg = ChildMessage::ExecutionComplete {
1580            result: Err("execution timed out after 5000ms".into()),
1581            error_kind: Some(ErrorKind::Timeout),
1582            timeout_ms: Some(5000),
1583        };
1584
1585        let mut buf = Vec::new();
1586        write_message(&mut buf, &msg).await.unwrap();
1587
1588        let mut cursor = Cursor::new(buf);
1589        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1590
1591        match decoded {
1592            ChildMessage::ExecutionComplete {
1593                result,
1594                error_kind,
1595                timeout_ms,
1596            } => {
1597                assert!(result.is_err());
1598                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1599                assert_eq!(timeout_ms, Some(5000));
1600            }
1601            other => panic!("expected ExecutionComplete, got: {:?}", other),
1602        }
1603    }
1604
1605    #[tokio::test]
1606    async fn ipc_t02_timeout_ms_absent_backward_compat() {
1607        // Simulate an old v0.3.0 worker that doesn't include timeout_ms
1608        let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1609        let mut buf = Vec::new();
1610        let payload = json.as_bytes();
1611        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1612        buf.extend_from_slice(payload);
1613
1614        let mut cursor = Cursor::new(buf);
1615        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1616
1617        match decoded {
1618            ChildMessage::ExecutionComplete {
1619                error_kind,
1620                timeout_ms,
1621                ..
1622            } => {
1623                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1624                assert_eq!(
1625                    timeout_ms, None,
1626                    "missing timeout_ms should default to None"
1627                );
1628            }
1629            other => panic!("expected ExecutionComplete, got: {:?}", other),
1630        }
1631    }
1632
1633    #[tokio::test]
1634    async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
1635        let msg = ChildMessage::ExecutionComplete {
1636            result: Err("some error".into()),
1637            error_kind: Some(ErrorKind::JsError),
1638            timeout_ms: None,
1639        };
1640
1641        let json = serde_json::to_string(&msg).unwrap();
1642        assert!(
1643            !json.contains("timeout_ms"),
1644            "timeout_ms:None should be omitted: {json}"
1645        );
1646    }
1647
1648    #[tokio::test]
1649    async fn ipc_t04_timeout_ms_present_when_some() {
1650        let msg = ChildMessage::ExecutionComplete {
1651            result: Err("timed out".into()),
1652            error_kind: Some(ErrorKind::Timeout),
1653            timeout_ms: Some(10000),
1654        };
1655
1656        let json = serde_json::to_string(&msg).unwrap();
1657        assert!(
1658            json.contains("\"timeout_ms\":10000"),
1659            "timeout_ms should be present: {json}"
1660        );
1661    }
1662
1663    // --- RawValue passthrough tests ---
1664
1665    #[tokio::test]
1666    async fn ipc_rv01_write_raw_message_roundtrip() {
1667        let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1668
1669        let mut buf = Vec::new();
1670        write_raw_message(&mut buf, payload).await.unwrap();
1671
1672        let mut cursor = Cursor::new(buf);
1673        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1674            .await
1675            .unwrap()
1676            .unwrap();
1677
1678        assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1679    }
1680
1681    #[tokio::test]
1682    async fn ipc_rv02_read_raw_message_preserves_bytes() {
1683        // Write a regular message, then read it raw
1684        let msg = ChildMessage::Log {
1685            message: "test raw".into(),
1686        };
1687        let mut buf = Vec::new();
1688        write_message(&mut buf, &msg).await.unwrap();
1689
1690        let mut cursor = Cursor::new(buf);
1691        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1692            .await
1693            .unwrap()
1694            .unwrap();
1695
1696        // The raw value should be valid JSON
1697        let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1698        assert!(matches!(parsed, ChildMessage::Log { .. }));
1699    }
1700
1701    #[tokio::test]
1702    async fn ipc_rv03_raw_message_size_limit_enforced() {
1703        let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1704        let mut buf = Vec::new();
1705        write_raw_message(&mut buf, large_payload.as_bytes())
1706            .await
1707            .unwrap();
1708
1709        let mut cursor = Cursor::new(buf);
1710        let result = read_raw_message(&mut cursor, 64).await;
1711        assert!(result.is_err());
1712        let err = result.unwrap_err().to_string();
1713        assert!(err.contains("too large"), "error: {err}");
1714    }
1715
1716    #[tokio::test]
1717    async fn ipc_rv04_large_payload_stays_raw() {
1718        // 1MB payload — read as raw without full Value parse
1719        let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1720        let mut buf = Vec::new();
1721        write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1722
1723        let mut cursor = Cursor::new(buf);
1724        let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1725            .await
1726            .unwrap()
1727            .unwrap();
1728
1729        // Should preserve the raw JSON string without parsing into Value
1730        assert!(raw.get().len() > 1_000_000);
1731        // Can still be parsed if needed
1732        let val: Value = serde_json::from_str(raw.get()).unwrap();
1733        assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1734    }
1735
1736    #[tokio::test]
1737    async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1738        // Write as regular message (Value), read as raw
1739        let msg = ParentMessage::ToolCallResult {
1740            request_id: 99,
1741            result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1742        };
1743        let mut buf = Vec::new();
1744        write_message(&mut buf, &msg).await.unwrap();
1745
1746        let mut cursor = Cursor::new(buf.clone());
1747        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1748            .await
1749            .unwrap()
1750            .unwrap();
1751
1752        // Parse the raw back as ParentMessage
1753        let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1754        match parsed {
1755            ParentMessage::ToolCallResult { request_id, result } => {
1756                assert_eq!(request_id, 99);
1757                assert!(result.is_ok());
1758            }
1759            other => panic!("expected ToolCallResult, got: {:?}", other),
1760        }
1761    }
1762
1763    #[tokio::test]
1764    async fn ipc_rv06_raw_eof_returns_none() {
1765        let mut cursor = Cursor::new(Vec::<u8>::new());
1766        let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1767            .await
1768            .unwrap();
1769        assert!(result.is_none());
1770    }
1771
1772    #[tokio::test]
1773    async fn ipc_rv07_mixed_raw_and_value_messages() {
1774        let mut buf = Vec::new();
1775
1776        // Write a typed message
1777        let msg1 = ChildMessage::Log {
1778            message: "first".into(),
1779        };
1780        write_message(&mut buf, &msg1).await.unwrap();
1781
1782        // Write a raw message
1783        let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1784        write_raw_message(&mut buf, raw_payload).await.unwrap();
1785
1786        // Read both: first typed, second raw
1787        let mut cursor = Cursor::new(buf);
1788        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1789        assert!(matches!(d1, ChildMessage::Log { .. }));
1790
1791        let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1792            .await
1793            .unwrap()
1794            .unwrap();
1795        let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1796        assert!(matches!(parsed, ChildMessage::Log { .. }));
1797    }
1798
1799    // --- ToolError IPC round-trip tests ---
1800
1801    #[tokio::test]
1802    async fn tool_error_round_trips_through_ipc() {
1803        let original = forge_error::DispatchError::ToolError {
1804            server: "arbiter".into(),
1805            tool: "scan_target".into(),
1806            message: "tool returned error: Invalid params: missing field 'base_url'".into(),
1807        };
1808
1809        // Convert to IPC form
1810        let ipc_err = IpcDispatchError::from(&original);
1811        assert_eq!(ipc_err.code, "TOOL_ERROR");
1812        assert_eq!(ipc_err.server, Some("arbiter".into()));
1813        assert_eq!(ipc_err.tool, Some("scan_target".into()));
1814        assert!(ipc_err.message.contains("Invalid params"));
1815
1816        // Round-trip through serialization
1817        let json = serde_json::to_string(&ipc_err).unwrap();
1818        let deserialized: IpcDispatchError = serde_json::from_str(&json).unwrap();
1819
1820        // Reconstruct DispatchError
1821        let reconstructed = deserialized.to_dispatch_error();
1822        assert!(matches!(
1823            reconstructed,
1824            forge_error::DispatchError::ToolError {
1825                ref server,
1826                ref tool,
1827                ..
1828            } if server == "arbiter" && tool == "scan_target"
1829        ));
1830        assert!(!reconstructed.trips_circuit_breaker());
1831        assert_eq!(reconstructed.code(), "TOOL_ERROR");
1832    }
1833
1834    #[tokio::test]
1835    async fn tool_error_ipc_message_roundtrip() {
1836        let msg = ParentMessage::ToolCallResult {
1837            request_id: 42,
1838            result: Err(IpcDispatchError::from(
1839                &forge_error::DispatchError::ToolError {
1840                    server: "arbiter".into(),
1841                    tool: "scan".into(),
1842                    message: "bad params".into(),
1843                },
1844            )),
1845        };
1846
1847        let mut buf = Vec::new();
1848        write_message(&mut buf, &msg).await.unwrap();
1849
1850        let mut cursor = Cursor::new(buf);
1851        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1852
1853        match decoded {
1854            ParentMessage::ToolCallResult { request_id, result } => {
1855                assert_eq!(request_id, 42);
1856                let err = result.unwrap_err();
1857                assert_eq!(err.code, "TOOL_ERROR");
1858                assert_eq!(err.server, Some("arbiter".into()));
1859                assert_eq!(err.tool, Some("scan".into()));
1860
1861                // Verify it reconstructs to the correct variant
1862                let dispatch_err = err.to_dispatch_error();
1863                assert!(!dispatch_err.trips_circuit_breaker());
1864            }
1865            other => panic!("expected ToolCallResult, got: {:?}", other),
1866        }
1867    }
1868}