Skip to main content

forge_sandbox/
ipc.rs

1//! IPC protocol for parent ↔ worker communication.
2//!
3//! Uses length-delimited JSON messages: 4-byte big-endian length prefix + JSON payload.
4//! All messages are typed via [`ParentMessage`] and [`ChildMessage`] enums.
5
6use serde::{Deserialize, Serialize};
7use serde_json::value::RawValue;
8use serde_json::Value;
9use std::time::Duration;
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Error classification for structured error preservation across IPC.
///
/// When errors cross the process boundary via `ExecutionComplete`, the typed
/// `SandboxError` variants are converted to strings. This enum preserves the
/// error kind so the parent can reconstruct the correct `SandboxError` variant.
///
/// On the wire each variant is a `snake_case` string: `"timeout"`,
/// `"heap_limit"`, `"js_error"`, `"execution"`. The enum is
/// `#[non_exhaustive]`, so code outside this crate must match it with a
/// wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ErrorKind {
    /// V8 execution timed out (CPU watchdog or async event loop).
    Timeout,
    /// V8 heap memory limit was exceeded.
    HeapLimit,
    /// A JavaScript error was thrown.
    JsError,
    /// Generic execution failure.
    Execution,
}
30
/// Structured dispatch error for IPC transport.
///
/// Preserves [`forge_error::DispatchError`] variant information across the
/// parent ↔ worker process boundary. Without this, all errors would be
/// flattened to `DispatchError::Internal` in ChildProcess mode, losing
/// error codes like `SERVER_NOT_FOUND` and fuzzy match suggestions.
///
/// The three optional fields are omitted from the JSON when `None` and
/// deserialize to `None` when missing, so a payload carrying only `code` and
/// `message` is valid.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IpcDispatchError {
    /// Error code matching `DispatchError::code()` output.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Server name (for ServerNotFound, ToolNotFound, Timeout, CircuitOpen, Upstream).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub server: Option<String>,
    /// Tool name (for ToolNotFound).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Timeout in milliseconds (for Timeout).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
53
54impl IpcDispatchError {
55    /// Create from a plain string error message (used for non-dispatch errors
56    /// like "stash dispatcher not available" or URI validation failures).
57    pub fn from_string(msg: String) -> Self {
58        Self {
59            code: "INTERNAL".to_string(),
60            message: msg,
61            server: None,
62            tool: None,
63            timeout_ms: None,
64        }
65    }
66
67    /// Reconstruct the appropriate [`forge_error::DispatchError`] variant.
68    pub fn to_dispatch_error(self) -> forge_error::DispatchError {
69        match self.code.as_str() {
70            "SERVER_NOT_FOUND" => {
71                forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
72            }
73            "TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
74                server: self.server.unwrap_or_default(),
75                tool: self.tool.unwrap_or_default(),
76            },
77            "TIMEOUT" => forge_error::DispatchError::Timeout {
78                server: self.server.unwrap_or_default(),
79                timeout_ms: self.timeout_ms.unwrap_or(0),
80            },
81            "CIRCUIT_OPEN" => {
82                forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
83            }
84            "GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
85                reason: self.message,
86            },
87            "UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
88                server: self.server.unwrap_or_default(),
89                message: self.message,
90            },
91            "RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
92            _ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
93        }
94    }
95}
96
97impl From<&forge_error::DispatchError> for IpcDispatchError {
98    fn from(e: &forge_error::DispatchError) -> Self {
99        let (server, tool, timeout_ms) = match e {
100            forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
101            forge_error::DispatchError::ToolNotFound { server, tool } => {
102                (Some(server.clone()), Some(tool.clone()), None)
103            }
104            forge_error::DispatchError::Timeout {
105                server, timeout_ms, ..
106            } => (Some(server.clone()), None, Some(*timeout_ms)),
107            forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
108            forge_error::DispatchError::Upstream { server, .. } => {
109                (Some(server.clone()), None, None)
110            }
111            _ => (None, None, None),
112        };
113
114        Self {
115            code: e.code().to_string(),
116            message: e.to_string(),
117            server,
118            tool,
119            timeout_ms,
120        }
121    }
122}
123
/// Messages sent from the parent process to the worker child.
///
/// Serialized as an internally tagged enum: the variant name is carried in a
/// `"type"` field next to the variant's own fields. The enum is
/// `#[non_exhaustive]`, so code outside this crate must match it with a
/// wildcard arm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ParentMessage {
    /// Initial message: execute this code in the sandbox.
    Execute {
        /// The JavaScript async arrow function to execute.
        code: String,
        /// Optional capability manifest (for search mode — not used in child process execute).
        manifest: Option<Value>,
        /// Worker configuration.
        config: WorkerConfig,
    },
    /// Response to a tool call request from the child.
    ToolCallResult {
        /// Matches the request_id from ChildMessage::ToolCallRequest.
        request_id: u64,
        /// The tool call result, or a structured dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
    /// Response to a resource read request from the child.
    ResourceReadResult {
        /// Matches the request_id from ChildMessage::ResourceReadRequest.
        request_id: u64,
        /// The resource content, or a structured dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
    /// Reset the worker for a new execution (pool mode).
    ///
    /// The worker drops its current JsRuntime and creates a fresh one.
    /// It responds with [`ChildMessage::ResetComplete`].
    Reset {
        /// New worker configuration for the next execution.
        config: WorkerConfig,
    },
    /// Response to a stash operation from the child.
    ///
    /// A single reply shape is used for all stash requests
    /// (`StashPut`, `StashGet`, `StashDelete`, `StashKeys`).
    StashResult {
        /// Matches the request_id from the stash request.
        request_id: u64,
        /// The stash operation result, or a structured dispatch error.
        result: Result<Value, IpcDispatchError>,
    },
}
168
/// Messages sent from the worker child to the parent process.
///
/// Serialized as an internally tagged enum: the variant name is carried in a
/// `"type"` field next to the variant's own fields. Every request variant
/// carries a `request_id` that the parent echoes in the matching
/// [`ParentMessage`] reply.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ChildMessage {
    /// Request the parent to dispatch a tool call.
    ToolCallRequest {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Tool identifier.
        tool: String,
        /// Tool arguments.
        args: Value,
    },
    /// Request the parent to read a resource.
    ResourceReadRequest {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Target server name.
        server: String,
        /// Resource URI.
        uri: String,
    },
    /// Request the parent to put a value in the stash.
    StashPut {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Value to store.
        value: Value,
        /// TTL in seconds (None = use default).
        ttl_secs: Option<u32>,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request the parent to get a value from the stash.
    StashGet {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request the parent to delete a value from the stash.
    StashDelete {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash key.
        key: String,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Request the parent to list stash keys.
    StashKeys {
        /// Unique ID for correlating request ↔ response.
        request_id: u64,
        /// Stash group for isolation (v0.3.1+). Absent in v0.3.0 workers → None.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        group: Option<String>,
    },
    /// Worker has been reset and is ready for a new execution.
    ResetComplete,
    /// The execution has finished.
    ExecutionComplete {
        /// The result value, or an error message.
        result: Result<Value, String>,
        /// Classification of the error for typed reconstruction on the parent side.
        /// Present only when `result` is `Err`. Defaults to `JsError` if absent
        /// (backward compatibility with workers that don't send this field).
        /// On the wire a missing field deserializes to `None`; applying the
        /// `JsError` fallback is left to the consumer of this message.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_kind: Option<ErrorKind>,
        /// Structured timeout value in milliseconds (v0.3.1+).
        /// Present only when `error_kind` is `Timeout`. Replaces fragile string parsing.
        /// Absent in v0.3.0 workers → None.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_ms: Option<u64>,
    },
    /// A log message from the worker.
    Log {
        /// The log message text.
        message: String,
    },
}
259
/// Configuration passed to the worker process.
///
/// The first six fields are required when deserializing. The remaining
/// fields have serde defaults, so a payload that omits them still parses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Maximum execution time, in milliseconds.
    pub timeout_ms: u64,
    /// V8 heap limit in bytes.
    pub max_heap_size: usize,
    /// Maximum tool calls per execution.
    pub max_tool_calls: usize,
    /// Maximum size of tool call arguments in bytes.
    pub max_tool_call_args_size: usize,
    /// Maximum size of the JSON result in bytes.
    pub max_output_size: usize,
    /// Maximum size of LLM-generated code in bytes.
    pub max_code_size: usize,
    /// Maximum IPC message size in bytes. Defaults to [`DEFAULT_MAX_IPC_MESSAGE_SIZE`].
    #[serde(default = "default_max_ipc_message_size")]
    pub max_ipc_message_size: usize,
    /// Maximum resource content size in bytes. Defaults to 64 MB when absent.
    #[serde(default = "default_max_resource_size")]
    pub max_resource_size: usize,
    /// Maximum concurrent calls in forge.parallel(). Defaults to 8 when absent.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: usize,
    /// Known tools for structured error fuzzy matching (v0.3.1+).
    /// Each entry is `(server_name, tool_name)`.
    /// Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_tools: Option<Vec<(String, String)>>,
    /// Known server names for structured error detection (v0.3.1+).
    /// Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub known_servers: Option<std::collections::HashSet<String>>,
}
292
/// Serde default for `WorkerConfig::max_ipc_message_size`: returns
/// [`DEFAULT_MAX_IPC_MESSAGE_SIZE`].
fn default_max_ipc_message_size() -> usize {
    DEFAULT_MAX_IPC_MESSAGE_SIZE
}
296
/// Serde default for `WorkerConfig::max_resource_size`: 64 MB, in bytes.
fn default_max_resource_size() -> usize {
    const BYTES_PER_MB: usize = 1024 * 1024;
    64 * BYTES_PER_MB
}
300
/// Serde default for `WorkerConfig::max_parallel`.
fn default_max_parallel() -> usize {
    const DEFAULT_PARALLEL_CALLS: usize = 8;
    DEFAULT_PARALLEL_CALLS
}
304
305impl From<&crate::SandboxConfig> for WorkerConfig {
306    fn from(config: &crate::SandboxConfig) -> Self {
307        Self {
308            timeout_ms: config.timeout.as_millis() as u64,
309            max_heap_size: config.max_heap_size,
310            max_tool_calls: config.max_tool_calls,
311            max_tool_call_args_size: config.max_tool_call_args_size,
312            max_output_size: config.max_output_size,
313            max_code_size: config.max_code_size,
314            max_ipc_message_size: config.max_ipc_message_size,
315            max_resource_size: config.max_resource_size,
316            max_parallel: config.max_parallel,
317            known_tools: None,
318            known_servers: None,
319        }
320    }
321}
322
323impl WorkerConfig {
324    /// Convert back to a SandboxConfig for use in the worker.
325    pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
326        crate::SandboxConfig {
327            timeout: Duration::from_millis(self.timeout_ms),
328            max_code_size: self.max_code_size,
329            max_output_size: self.max_output_size,
330            max_heap_size: self.max_heap_size,
331            max_concurrent: 1, // worker handles one execution
332            max_tool_calls: self.max_tool_calls,
333            max_tool_call_args_size: self.max_tool_call_args_size,
334            execution_mode: crate::executor::ExecutionMode::InProcess, // worker always runs in-process
335            max_resource_size: self.max_resource_size,
336            max_parallel: self.max_parallel,
337            max_ipc_message_size: self.max_ipc_message_size,
338        }
339    }
340}
341
342/// Write a length-delimited JSON message to an async writer.
343///
344/// Format: 4-byte big-endian length prefix followed by the JSON payload bytes.
345pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
346    writer: &mut W,
347    msg: &T,
348) -> Result<(), std::io::Error> {
349    let payload = serde_json::to_vec(msg)
350        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
351    let len = u32::try_from(payload.len()).map_err(|_| {
352        std::io::Error::new(
353            std::io::ErrorKind::InvalidData,
354            format!(
355                "IPC payload too large: {} bytes (max {} bytes)",
356                payload.len(),
357                u32::MAX
358            ),
359        )
360    })?;
361    writer.write_all(&len.to_be_bytes()).await?;
362    writer.write_all(&payload).await?;
363    writer.flush().await?;
364    Ok(())
365}
366
367/// Write a raw JSON byte payload as a length-delimited IPC message.
368///
369/// This bypasses serialization entirely — useful for forwarding large
370/// tool/resource results without deserializing and re-serializing.
371pub async fn write_raw_message<W: AsyncWrite + Unpin>(
372    writer: &mut W,
373    payload: &[u8],
374) -> Result<(), std::io::Error> {
375    let len = u32::try_from(payload.len()).map_err(|_| {
376        std::io::Error::new(
377            std::io::ErrorKind::InvalidData,
378            format!(
379                "raw IPC payload too large: {} bytes (max {} bytes)",
380                payload.len(),
381                u32::MAX
382            ),
383        )
384    })?;
385    writer.write_all(&len.to_be_bytes()).await?;
386    writer.write_all(payload).await?;
387    writer.flush().await?;
388    Ok(())
389}
390
391/// Read a raw JSON byte payload from an IPC message without deserializing.
392///
393/// Returns the raw bytes as an owned `Box<RawValue>` which can be forwarded
394/// without parsing. Returns `None` on EOF.
395pub async fn read_raw_message<R: AsyncRead + Unpin>(
396    reader: &mut R,
397    max_size: usize,
398) -> Result<Option<Box<RawValue>>, std::io::Error> {
399    let mut len_buf = [0u8; 4];
400    match reader.read_exact(&mut len_buf).await {
401        Ok(_) => {}
402        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
403        Err(e) => return Err(e),
404    }
405
406    let len = u32::from_be_bytes(len_buf) as usize;
407
408    if len > max_size {
409        return Err(std::io::Error::new(
410            std::io::ErrorKind::InvalidData,
411            format!(
412                "raw IPC message too large: {} bytes (limit: {} bytes)",
413                len, max_size
414            ),
415        ));
416    }
417
418    let mut payload = vec![0u8; len];
419    reader.read_exact(&mut payload).await?;
420
421    let raw: Box<RawValue> = serde_json::from_slice(&payload)
422        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
423    Ok(Some(raw))
424}
425
/// Default upper bound for a single IPC message: 8 MB (8 × 1024 × 1024 bytes).
///
/// Reduced from 64 MB to prevent single messages from causing memory pressure.
/// Configurable via `sandbox.max_ipc_message_size_mb` in config.
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 << 20;
431
432/// Read a length-delimited JSON message from an async reader.
433///
434/// Returns `None` if the reader has reached EOF (clean shutdown).
435/// Uses [`DEFAULT_MAX_IPC_MESSAGE_SIZE`] as the size limit.
436pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
437    reader: &mut R,
438) -> Result<Option<T>, std::io::Error> {
439    read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
440}
441
442/// Read a length-delimited JSON message with a configurable size limit.
443///
444/// Returns `None` if the reader has reached EOF (clean shutdown).
445/// The `max_size` parameter controls the maximum allowed message size in bytes.
446pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
447    reader: &mut R,
448    max_size: usize,
449) -> Result<Option<T>, std::io::Error> {
450    let mut len_buf = [0u8; 4];
451    match reader.read_exact(&mut len_buf).await {
452        Ok(_) => {}
453        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
454        Err(e) => return Err(e),
455    }
456
457    let len = u32::from_be_bytes(len_buf) as usize;
458
459    // Reject messages exceeding the configured limit
460    if len > max_size {
461        return Err(std::io::Error::new(
462            std::io::ErrorKind::InvalidData,
463            format!(
464                "IPC message too large: {} bytes (limit: {} bytes)",
465                len, max_size
466            ),
467        ));
468    }
469
470    let mut payload = vec![0u8; len];
471    reader.read_exact(&mut payload).await?;
472
473    let msg: T = serde_json::from_slice(&payload)
474        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
475    Ok(Some(msg))
476}
477
478#[cfg(test)]
479mod tests {
480    use super::*;
481    use std::io::Cursor;
482
483    #[tokio::test]
484    async fn roundtrip_parent_execute_message() {
485        let msg = ParentMessage::Execute {
486            code: "async () => { return 42; }".into(),
487            manifest: Some(serde_json::json!({"servers": []})),
488            config: WorkerConfig {
489                timeout_ms: 5000,
490                max_heap_size: 64 * 1024 * 1024,
491                max_tool_calls: 50,
492                max_tool_call_args_size: 1024 * 1024,
493                max_output_size: 1024 * 1024,
494                max_code_size: 64 * 1024,
495                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
496                max_resource_size: 64 * 1024 * 1024,
497                max_parallel: 8,
498                known_tools: None,
499                known_servers: None,
500            },
501        };
502
503        let mut buf = Vec::new();
504        write_message(&mut buf, &msg).await.unwrap();
505
506        let mut cursor = Cursor::new(buf);
507        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
508
509        match decoded {
510            ParentMessage::Execute {
511                code,
512                manifest,
513                config,
514            } => {
515                assert_eq!(code, "async () => { return 42; }");
516                assert!(manifest.is_some());
517                assert_eq!(config.timeout_ms, 5000);
518            }
519            other => panic!("expected Execute, got: {:?}", other),
520        }
521    }
522
523    #[tokio::test]
524    async fn roundtrip_parent_tool_result() {
525        let msg = ParentMessage::ToolCallResult {
526            request_id: 42,
527            result: Ok(serde_json::json!({"status": "ok"})),
528        };
529
530        let mut buf = Vec::new();
531        write_message(&mut buf, &msg).await.unwrap();
532
533        let mut cursor = Cursor::new(buf);
534        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
535
536        match decoded {
537            ParentMessage::ToolCallResult { request_id, result } => {
538                assert_eq!(request_id, 42);
539                assert!(result.is_ok());
540            }
541            other => panic!("expected ToolCallResult, got: {:?}", other),
542        }
543    }
544
545    #[tokio::test]
546    async fn roundtrip_parent_tool_result_error() {
547        let msg = ParentMessage::ToolCallResult {
548            request_id: 7,
549            result: Err(IpcDispatchError::from_string("connection refused".into())),
550        };
551
552        let mut buf = Vec::new();
553        write_message(&mut buf, &msg).await.unwrap();
554
555        let mut cursor = Cursor::new(buf);
556        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
557
558        match decoded {
559            ParentMessage::ToolCallResult { request_id, result } => {
560                assert_eq!(request_id, 7);
561                let err = result.unwrap_err();
562                assert_eq!(err.message, "connection refused");
563                assert_eq!(err.code, "INTERNAL");
564            }
565            other => panic!("expected ToolCallResult, got: {:?}", other),
566        }
567    }
568
569    #[tokio::test]
570    async fn roundtrip_child_tool_request() {
571        let msg = ChildMessage::ToolCallRequest {
572            request_id: 1,
573            server: "narsil".into(),
574            tool: "ast.parse".into(),
575            args: serde_json::json!({"file": "test.rs"}),
576        };
577
578        let mut buf = Vec::new();
579        write_message(&mut buf, &msg).await.unwrap();
580
581        let mut cursor = Cursor::new(buf);
582        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
583
584        match decoded {
585            ChildMessage::ToolCallRequest {
586                request_id,
587                server,
588                tool,
589                args,
590            } => {
591                assert_eq!(request_id, 1);
592                assert_eq!(server, "narsil");
593                assert_eq!(tool, "ast.parse");
594                assert_eq!(args["file"], "test.rs");
595            }
596            other => panic!("expected ToolCallRequest, got: {:?}", other),
597        }
598    }
599
600    #[tokio::test]
601    async fn roundtrip_child_execution_complete() {
602        let msg = ChildMessage::ExecutionComplete {
603            result: Ok(serde_json::json!([1, 2, 3])),
604            error_kind: None,
605            timeout_ms: None,
606        };
607
608        let mut buf = Vec::new();
609        write_message(&mut buf, &msg).await.unwrap();
610
611        let mut cursor = Cursor::new(buf);
612        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
613
614        match decoded {
615            ChildMessage::ExecutionComplete {
616                result, error_kind, ..
617            } => {
618                assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
619                assert_eq!(error_kind, None);
620            }
621            other => panic!("expected ExecutionComplete, got: {:?}", other),
622        }
623    }
624
625    #[tokio::test]
626    async fn roundtrip_child_log() {
627        let msg = ChildMessage::Log {
628            message: "processing step 3".into(),
629        };
630
631        let mut buf = Vec::new();
632        write_message(&mut buf, &msg).await.unwrap();
633
634        let mut cursor = Cursor::new(buf);
635        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
636
637        match decoded {
638            ChildMessage::Log { message } => {
639                assert_eq!(message, "processing step 3");
640            }
641            other => panic!("expected Log, got: {:?}", other),
642        }
643    }
644
645    #[tokio::test]
646    async fn multiple_messages_in_stream() {
647        let msg1 = ChildMessage::Log {
648            message: "first".into(),
649        };
650        let msg2 = ChildMessage::ToolCallRequest {
651            request_id: 1,
652            server: "s".into(),
653            tool: "t".into(),
654            args: serde_json::json!({}),
655        };
656        let msg3 = ChildMessage::ExecutionComplete {
657            result: Ok(serde_json::json!("done")),
658            error_kind: None,
659            timeout_ms: None,
660        };
661
662        let mut buf = Vec::new();
663        write_message(&mut buf, &msg1).await.unwrap();
664        write_message(&mut buf, &msg2).await.unwrap();
665        write_message(&mut buf, &msg3).await.unwrap();
666
667        let mut cursor = Cursor::new(buf);
668        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
669        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
670        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
671
672        assert!(matches!(d1, ChildMessage::Log { .. }));
673        assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
674        assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
675
676        // EOF after all messages
677        let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
678        assert!(d4.is_none());
679    }
680
681    #[tokio::test]
682    async fn execution_complete_error_roundtrip() {
683        let msg = ChildMessage::ExecutionComplete {
684            result: Err("failed to create tokio runtime: resource unavailable".into()),
685            error_kind: Some(ErrorKind::Execution),
686            timeout_ms: None,
687        };
688
689        let mut buf = Vec::new();
690        write_message(&mut buf, &msg).await.unwrap();
691
692        let mut cursor = Cursor::new(buf);
693        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
694
695        match decoded {
696            ChildMessage::ExecutionComplete {
697                result, error_kind, ..
698            } => {
699                let err = result.unwrap_err();
700                assert!(
701                    err.contains("tokio runtime"),
702                    "expected runtime error: {err}"
703                );
704                assert_eq!(error_kind, Some(ErrorKind::Execution));
705            }
706            other => panic!("expected ExecutionComplete, got: {:?}", other),
707        }
708    }
709
710    #[tokio::test]
711    async fn eof_returns_none() {
712        let mut cursor = Cursor::new(Vec::<u8>::new());
713        let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
714        assert!(result.is_none());
715    }
716
717    #[test]
718    fn u32_try_from_overflow() {
719        // Validates that the conversion logic correctly rejects sizes > u32::MAX
720        let overflow_size = u32::MAX as usize + 1;
721        assert!(u32::try_from(overflow_size).is_err());
722    }
723
724    #[tokio::test]
725    async fn write_message_normal_size_succeeds() {
726        // Regression guard: normal-sized messages still work after the try_from change
727        let msg = ChildMessage::Log {
728            message: "a".repeat(1024),
729        };
730        let mut buf = Vec::new();
731        write_message(&mut buf, &msg).await.unwrap();
732        assert!(buf.len() > 1024);
733    }
734
735    #[tokio::test]
736    async fn large_message_roundtrip() {
737        // A large payload (~1MB of data)
738        let large_data = "x".repeat(1_000_000);
739        let msg = ChildMessage::ExecutionComplete {
740            result: Ok(serde_json::json!(large_data)),
741            error_kind: None,
742            timeout_ms: None,
743        };
744
745        let mut buf = Vec::new();
746        write_message(&mut buf, &msg).await.unwrap();
747
748        let mut cursor = Cursor::new(buf);
749        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
750
751        match decoded {
752            ChildMessage::ExecutionComplete { result, .. } => {
753                assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
754            }
755            other => panic!("expected ExecutionComplete, got: {:?}", other),
756        }
757    }
758
759    #[tokio::test]
760    async fn worker_config_roundtrip_from_sandbox_config() {
761        let sandbox = crate::SandboxConfig::default();
762        let worker = WorkerConfig::from(&sandbox);
763        let back = worker.to_sandbox_config();
764
765        assert_eq!(sandbox.timeout, back.timeout);
766        assert_eq!(sandbox.max_heap_size, back.max_heap_size);
767        assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
768        assert_eq!(sandbox.max_output_size, back.max_output_size);
769        assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
770        assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); // 8 MB default
771    }
772
773    #[tokio::test]
774    async fn read_message_with_limit_rejects_oversized() {
775        let msg = ChildMessage::Log {
776            message: "x".repeat(1024),
777        };
778        let mut buf = Vec::new();
779        write_message(&mut buf, &msg).await.unwrap();
780
781        // Set limit smaller than the message payload
782        let mut cursor = Cursor::new(buf);
783        let result: Result<Option<ChildMessage>, _> =
784            read_message_with_limit(&mut cursor, 64).await;
785        assert!(result.is_err());
786        let err_msg = result.unwrap_err().to_string();
787        assert!(err_msg.contains("too large"), "error: {err_msg}");
788    }
789
790    #[tokio::test]
791    async fn read_message_with_limit_accepts_within_limit() {
792        let msg = ChildMessage::Log {
793            message: "hello".into(),
794        };
795        let mut buf = Vec::new();
796        write_message(&mut buf, &msg).await.unwrap();
797
798        let mut cursor = Cursor::new(buf);
799        let result: Option<ChildMessage> =
800            read_message_with_limit(&mut cursor, 1024).await.unwrap();
801        assert!(result.is_some());
802    }
803
804    #[tokio::test]
805    async fn worker_config_ipc_limit_serde_default() {
806        // Deserializing JSON without max_ipc_message_size should use the default
807        let json = r#"{
808            "timeout_ms": 5000,
809            "max_heap_size": 67108864,
810            "max_tool_calls": 50,
811            "max_tool_call_args_size": 1048576,
812            "max_output_size": 1048576,
813            "max_code_size": 65536
814        }"#;
815        let config: WorkerConfig = serde_json::from_str(json).unwrap();
816        assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
817    }
818
819    // --- IPC-01: ResourceReadRequest round-trip ---
820    #[tokio::test]
821    async fn ipc_01_resource_read_request_roundtrip() {
822        let msg = ChildMessage::ResourceReadRequest {
823            request_id: 10,
824            server: "postgres".into(),
825            uri: "file:///logs/app.log".into(),
826        };
827
828        let mut buf = Vec::new();
829        write_message(&mut buf, &msg).await.unwrap();
830
831        let mut cursor = Cursor::new(buf);
832        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
833
834        match decoded {
835            ChildMessage::ResourceReadRequest {
836                request_id,
837                server,
838                uri,
839            } => {
840                assert_eq!(request_id, 10);
841                assert_eq!(server, "postgres");
842                assert_eq!(uri, "file:///logs/app.log");
843            }
844            other => panic!("expected ResourceReadRequest, got: {:?}", other),
845        }
846    }
847
848    // --- IPC-02: ResourceReadResult (success) round-trip ---
849    #[tokio::test]
850    async fn ipc_02_resource_read_result_success_roundtrip() {
851        let msg = ParentMessage::ResourceReadResult {
852            request_id: 11,
853            result: Ok(serde_json::json!({"content": "log data here"})),
854        };
855
856        let mut buf = Vec::new();
857        write_message(&mut buf, &msg).await.unwrap();
858
859        let mut cursor = Cursor::new(buf);
860        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
861
862        match decoded {
863            ParentMessage::ResourceReadResult { request_id, result } => {
864                assert_eq!(request_id, 11);
865                let val = result.unwrap();
866                assert_eq!(val["content"], "log data here");
867            }
868            other => panic!("expected ResourceReadResult, got: {:?}", other),
869        }
870    }
871
872    // --- IPC-03: ResourceReadResult (error) round-trip ---
873    #[tokio::test]
874    async fn ipc_03_resource_read_result_error_roundtrip() {
875        let msg = ParentMessage::ResourceReadResult {
876            request_id: 12,
877            result: Err(IpcDispatchError::from_string("resource not found".into())),
878        };
879
880        let mut buf = Vec::new();
881        write_message(&mut buf, &msg).await.unwrap();
882
883        let mut cursor = Cursor::new(buf);
884        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
885
886        match decoded {
887            ParentMessage::ResourceReadResult { request_id, result } => {
888                assert_eq!(request_id, 12);
889                let err = result.unwrap_err();
890                assert_eq!(err.message, "resource not found");
891                assert_eq!(err.code, "INTERNAL");
892            }
893            other => panic!("expected ResourceReadResult, got: {:?}", other),
894        }
895    }
896
897    // --- IPC-04: StashPut round-trip ---
898    #[tokio::test]
899    async fn ipc_04_stash_put_roundtrip() {
900        let msg = ChildMessage::StashPut {
901            request_id: 20,
902            key: "my-key".into(),
903            value: serde_json::json!({"data": 42}),
904            ttl_secs: Some(60),
905            group: None,
906        };
907
908        let mut buf = Vec::new();
909        write_message(&mut buf, &msg).await.unwrap();
910
911        let mut cursor = Cursor::new(buf);
912        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
913
914        match decoded {
915            ChildMessage::StashPut {
916                request_id,
917                key,
918                value,
919                ttl_secs,
920                group,
921            } => {
922                assert_eq!(request_id, 20);
923                assert_eq!(key, "my-key");
924                assert_eq!(value["data"], 42);
925                assert_eq!(ttl_secs, Some(60));
926                assert_eq!(group, None);
927            }
928            other => panic!("expected StashPut, got: {:?}", other),
929        }
930    }
931
932    // --- IPC-05: StashGet round-trip ---
933    #[tokio::test]
934    async fn ipc_05_stash_get_roundtrip() {
935        let msg = ChildMessage::StashGet {
936            request_id: 21,
937            key: "lookup-key".into(),
938            group: None,
939        };
940
941        let mut buf = Vec::new();
942        write_message(&mut buf, &msg).await.unwrap();
943
944        let mut cursor = Cursor::new(buf);
945        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
946
947        match decoded {
948            ChildMessage::StashGet {
949                request_id,
950                key,
951                group,
952            } => {
953                assert_eq!(request_id, 21);
954                assert_eq!(key, "lookup-key");
955                assert_eq!(group, None);
956            }
957            other => panic!("expected StashGet, got: {:?}", other),
958        }
959    }
960
961    // --- IPC-06: StashDelete round-trip ---
962    #[tokio::test]
963    async fn ipc_06_stash_delete_roundtrip() {
964        let msg = ChildMessage::StashDelete {
965            request_id: 22,
966            key: "delete-me".into(),
967            group: None,
968        };
969
970        let mut buf = Vec::new();
971        write_message(&mut buf, &msg).await.unwrap();
972
973        let mut cursor = Cursor::new(buf);
974        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
975
976        match decoded {
977            ChildMessage::StashDelete {
978                request_id,
979                key,
980                group,
981            } => {
982                assert_eq!(request_id, 22);
983                assert_eq!(key, "delete-me");
984                assert_eq!(group, None);
985            }
986            other => panic!("expected StashDelete, got: {:?}", other),
987        }
988    }
989
990    // --- IPC-07: StashKeys round-trip ---
991    #[tokio::test]
992    async fn ipc_07_stash_keys_roundtrip() {
993        let msg = ChildMessage::StashKeys {
994            request_id: 23,
995            group: None,
996        };
997
998        let mut buf = Vec::new();
999        write_message(&mut buf, &msg).await.unwrap();
1000
1001        let mut cursor = Cursor::new(buf);
1002        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1003
1004        match decoded {
1005            ChildMessage::StashKeys { request_id, group } => {
1006                assert_eq!(request_id, 23);
1007                assert_eq!(group, None);
1008            }
1009            other => panic!("expected StashKeys, got: {:?}", other),
1010        }
1011    }
1012
1013    // --- IPC-08: StashResult round-trip ---
1014    #[tokio::test]
1015    async fn ipc_08_stash_result_roundtrip() {
1016        let msg = ParentMessage::StashResult {
1017            request_id: 24,
1018            result: Ok(serde_json::json!({"ok": true})),
1019        };
1020
1021        let mut buf = Vec::new();
1022        write_message(&mut buf, &msg).await.unwrap();
1023
1024        let mut cursor = Cursor::new(buf);
1025        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1026
1027        match decoded {
1028            ParentMessage::StashResult { request_id, result } => {
1029                assert_eq!(request_id, 24);
1030                assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
1031            }
1032            other => panic!("expected StashResult, got: {:?}", other),
1033        }
1034    }
1035
1036    // --- IPC-09: Mixed message interleaving (tool + resource + stash in single stream) ---
1037    #[tokio::test]
1038    async fn ipc_09_mixed_message_interleaving() {
1039        let msg1 = ChildMessage::ToolCallRequest {
1040            request_id: 1,
1041            server: "s".into(),
1042            tool: "t".into(),
1043            args: serde_json::json!({}),
1044        };
1045        let msg2 = ChildMessage::ResourceReadRequest {
1046            request_id: 2,
1047            server: "pg".into(),
1048            uri: "file:///data".into(),
1049        };
1050        let msg3 = ChildMessage::StashPut {
1051            request_id: 3,
1052            key: "k".into(),
1053            value: serde_json::json!("v"),
1054            ttl_secs: None,
1055            group: None,
1056        };
1057        let msg4 = ChildMessage::StashGet {
1058            request_id: 4,
1059            key: "k".into(),
1060            group: None,
1061        };
1062        let msg5 = ChildMessage::ExecutionComplete {
1063            result: Ok(serde_json::json!("done")),
1064            error_kind: None,
1065            timeout_ms: None,
1066        };
1067
1068        let mut buf = Vec::new();
1069        write_message(&mut buf, &msg1).await.unwrap();
1070        write_message(&mut buf, &msg2).await.unwrap();
1071        write_message(&mut buf, &msg3).await.unwrap();
1072        write_message(&mut buf, &msg4).await.unwrap();
1073        write_message(&mut buf, &msg5).await.unwrap();
1074
1075        let mut cursor = Cursor::new(buf);
1076        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1077        let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1078        let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1079        let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1080        let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1081
1082        assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
1083        assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
1084        assert!(matches!(d3, ChildMessage::StashPut { .. }));
1085        assert!(matches!(d4, ChildMessage::StashGet { .. }));
1086        assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
1087
1088        // EOF after all messages
1089        let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
1090        assert!(d6.is_none());
1091    }
1092
1093    // --- IPC-P01: Reset round-trip ---
1094    #[tokio::test]
1095    async fn ipc_p01_reset_roundtrip() {
1096        let msg = ParentMessage::Reset {
1097            config: WorkerConfig {
1098                timeout_ms: 3000,
1099                max_heap_size: 32 * 1024 * 1024,
1100                max_tool_calls: 25,
1101                max_tool_call_args_size: 512 * 1024,
1102                max_output_size: 512 * 1024,
1103                max_code_size: 32 * 1024,
1104                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1105                max_resource_size: 32 * 1024 * 1024,
1106                max_parallel: 4,
1107                known_tools: None,
1108                known_servers: None,
1109            },
1110        };
1111
1112        let mut buf = Vec::new();
1113        write_message(&mut buf, &msg).await.unwrap();
1114
1115        let mut cursor = Cursor::new(buf);
1116        let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1117
1118        match decoded {
1119            ParentMessage::Reset { config } => {
1120                assert_eq!(config.timeout_ms, 3000);
1121                assert_eq!(config.max_tool_calls, 25);
1122            }
1123            other => panic!("expected Reset, got: {:?}", other),
1124        }
1125    }
1126
1127    // --- IPC-P02: ResetComplete round-trip ---
1128    #[tokio::test]
1129    async fn ipc_p02_reset_complete_roundtrip() {
1130        let msg = ChildMessage::ResetComplete;
1131
1132        let mut buf = Vec::new();
1133        write_message(&mut buf, &msg).await.unwrap();
1134
1135        let mut cursor = Cursor::new(buf);
1136        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1137
1138        assert!(matches!(decoded, ChildMessage::ResetComplete));
1139    }
1140
1141    // --- IPC-P03: Reset + Execute interleaving in single stream ---
1142    #[tokio::test]
1143    async fn ipc_p03_reset_execute_interleaving() {
1144        let reset = ParentMessage::Reset {
1145            config: WorkerConfig {
1146                timeout_ms: 5000,
1147                max_heap_size: 64 * 1024 * 1024,
1148                max_tool_calls: 50,
1149                max_tool_call_args_size: 1024 * 1024,
1150                max_output_size: 1024 * 1024,
1151                max_code_size: 64 * 1024,
1152                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1153                max_resource_size: 64 * 1024 * 1024,
1154                max_parallel: 8,
1155                known_tools: None,
1156                known_servers: None,
1157            },
1158        };
1159        let execute = ParentMessage::Execute {
1160            code: "async () => 42".into(),
1161            manifest: None,
1162            config: WorkerConfig {
1163                timeout_ms: 5000,
1164                max_heap_size: 64 * 1024 * 1024,
1165                max_tool_calls: 50,
1166                max_tool_call_args_size: 1024 * 1024,
1167                max_output_size: 1024 * 1024,
1168                max_code_size: 64 * 1024,
1169                max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
1170                max_resource_size: 64 * 1024 * 1024,
1171                max_parallel: 8,
1172                known_tools: None,
1173                known_servers: None,
1174            },
1175        };
1176
1177        let mut buf = Vec::new();
1178        write_message(&mut buf, &reset).await.unwrap();
1179        write_message(&mut buf, &execute).await.unwrap();
1180
1181        let mut cursor = Cursor::new(buf);
1182        let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1183        let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
1184
1185        assert!(matches!(d1, ParentMessage::Reset { .. }));
1186        assert!(matches!(d2, ParentMessage::Execute { .. }));
1187    }
1188
1189    // --- IPC-10: Oversized stash message rejected by read_message_with_limit ---
1190    #[tokio::test]
1191    async fn ipc_10_oversized_stash_message_rejected() {
1192        let msg = ChildMessage::StashPut {
1193            request_id: 100,
1194            key: "k".into(),
1195            value: serde_json::json!("x".repeat(2048)),
1196            ttl_secs: Some(60),
1197            group: None,
1198        };
1199        let mut buf = Vec::new();
1200        write_message(&mut buf, &msg).await.unwrap();
1201
1202        // Set limit smaller than the message payload
1203        let mut cursor = Cursor::new(buf);
1204        let result: Result<Option<ChildMessage>, _> =
1205            read_message_with_limit(&mut cursor, 64).await;
1206        assert!(result.is_err());
1207        let err_msg = result.unwrap_err().to_string();
1208        assert!(
1209            err_msg.contains("too large"),
1210            "error should mention 'too large': {err_msg}"
1211        );
1212    }
1213
1214    // --- IPC-O01: ErrorKind timeout round-trip ---
1215    #[tokio::test]
1216    async fn ipc_o01_error_kind_timeout_roundtrip() {
1217        let msg = ChildMessage::ExecutionComplete {
1218            result: Err("execution timed out after 500ms".into()),
1219            error_kind: Some(ErrorKind::Timeout),
1220            timeout_ms: Some(500),
1221        };
1222
1223        let mut buf = Vec::new();
1224        write_message(&mut buf, &msg).await.unwrap();
1225
1226        let mut cursor = Cursor::new(buf);
1227        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1228
1229        match decoded {
1230            ChildMessage::ExecutionComplete {
1231                result,
1232                error_kind,
1233                timeout_ms,
1234            } => {
1235                assert!(result.is_err());
1236                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1237                assert_eq!(timeout_ms, Some(500));
1238            }
1239            other => panic!("expected ExecutionComplete, got: {:?}", other),
1240        }
1241    }
1242
1243    // --- IPC-O02: ErrorKind heap_limit round-trip ---
1244    #[tokio::test]
1245    async fn ipc_o02_error_kind_heap_limit_roundtrip() {
1246        let msg = ChildMessage::ExecutionComplete {
1247            result: Err("V8 heap limit exceeded".into()),
1248            error_kind: Some(ErrorKind::HeapLimit),
1249            timeout_ms: None,
1250        };
1251
1252        let mut buf = Vec::new();
1253        write_message(&mut buf, &msg).await.unwrap();
1254
1255        let mut cursor = Cursor::new(buf);
1256        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1257
1258        match decoded {
1259            ChildMessage::ExecutionComplete {
1260                result, error_kind, ..
1261            } => {
1262                assert!(result.is_err());
1263                assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
1264            }
1265            other => panic!("expected ExecutionComplete, got: {:?}", other),
1266        }
1267    }
1268
1269    // --- IPC-O03: ErrorKind absent defaults to None (backward compatibility) ---
1270    #[tokio::test]
1271    async fn ipc_o03_error_kind_backward_compat() {
1272        // Simulate a message from an older worker that doesn't include error_kind.
1273        // The JSON doesn't have the error_kind field at all.
1274        let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
1275        let mut buf = Vec::new();
1276        let payload = json.as_bytes();
1277        let len = payload.len() as u32;
1278        buf.extend_from_slice(&len.to_be_bytes());
1279        buf.extend_from_slice(payload);
1280
1281        let mut cursor = Cursor::new(buf);
1282        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1283
1284        match decoded {
1285            ChildMessage::ExecutionComplete {
1286                result,
1287                error_kind,
1288                timeout_ms,
1289            } => {
1290                assert!(result.is_err());
1291                assert_eq!(
1292                    error_kind, None,
1293                    "missing error_kind should default to None"
1294                );
1295                assert_eq!(
1296                    timeout_ms, None,
1297                    "missing timeout_ms should default to None"
1298                );
1299            }
1300            other => panic!("expected ExecutionComplete, got: {:?}", other),
1301        }
1302    }
1303
1304    // --- IPC-O04: ErrorKind js_error round-trip ---
1305    #[tokio::test]
1306    async fn ipc_o04_error_kind_js_error_roundtrip() {
1307        let msg = ChildMessage::ExecutionComplete {
1308            result: Err("ReferenceError: x is not defined".into()),
1309            error_kind: Some(ErrorKind::JsError),
1310            timeout_ms: None,
1311        };
1312
1313        let mut buf = Vec::new();
1314        write_message(&mut buf, &msg).await.unwrap();
1315
1316        let mut cursor = Cursor::new(buf);
1317        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1318
1319        match decoded {
1320            ChildMessage::ExecutionComplete {
1321                result, error_kind, ..
1322            } => {
1323                assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
1324                assert_eq!(error_kind, Some(ErrorKind::JsError));
1325            }
1326            other => panic!("expected ExecutionComplete, got: {:?}", other),
1327        }
1328    }
1329
1330    // --- IPC-O05: Success result has no error_kind in serialized JSON ---
1331    #[tokio::test]
1332    async fn ipc_o05_success_omits_error_kind() {
1333        let msg = ChildMessage::ExecutionComplete {
1334            result: Ok(serde_json::json!(42)),
1335            error_kind: None,
1336            timeout_ms: None,
1337        };
1338
1339        let json = serde_json::to_string(&msg).unwrap();
1340        // error_kind: None should be skipped thanks to skip_serializing_if
1341        assert!(
1342            !json.contains("error_kind"),
1343            "success messages should not contain error_kind field: {json}"
1344        );
1345        assert!(
1346            !json.contains("timeout_ms"),
1347            "success messages should not contain timeout_ms field: {json}"
1348        );
1349    }
1350
1351    // --- H1: Stash Group Isolation Tests ---
1352
1353    #[tokio::test]
1354    async fn ipc_h1_01_stash_put_with_group_roundtrip() {
1355        let msg = ChildMessage::StashPut {
1356            request_id: 50,
1357            key: "grouped-key".into(),
1358            value: serde_json::json!({"data": "secret"}),
1359            ttl_secs: Some(120),
1360            group: Some("analytics".into()),
1361        };
1362
1363        let mut buf = Vec::new();
1364        write_message(&mut buf, &msg).await.unwrap();
1365
1366        let mut cursor = Cursor::new(buf);
1367        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1368
1369        match decoded {
1370            ChildMessage::StashPut {
1371                request_id,
1372                key,
1373                group,
1374                ..
1375            } => {
1376                assert_eq!(request_id, 50);
1377                assert_eq!(key, "grouped-key");
1378                assert_eq!(group, Some("analytics".into()));
1379            }
1380            other => panic!("expected StashPut, got: {:?}", other),
1381        }
1382    }
1383
1384    #[tokio::test]
1385    async fn ipc_h1_02_stash_get_with_group_roundtrip() {
1386        let msg = ChildMessage::StashGet {
1387            request_id: 51,
1388            key: "grouped-key".into(),
1389            group: Some("analytics".into()),
1390        };
1391
1392        let mut buf = Vec::new();
1393        write_message(&mut buf, &msg).await.unwrap();
1394
1395        let mut cursor = Cursor::new(buf);
1396        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1397
1398        match decoded {
1399            ChildMessage::StashGet {
1400                request_id,
1401                key,
1402                group,
1403            } => {
1404                assert_eq!(request_id, 51);
1405                assert_eq!(key, "grouped-key");
1406                assert_eq!(group, Some("analytics".into()));
1407            }
1408            other => panic!("expected StashGet, got: {:?}", other),
1409        }
1410    }
1411
1412    #[tokio::test]
1413    async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
1414        let msg = ChildMessage::StashDelete {
1415            request_id: 52,
1416            key: "grouped-key".into(),
1417            group: Some("analytics".into()),
1418        };
1419
1420        let mut buf = Vec::new();
1421        write_message(&mut buf, &msg).await.unwrap();
1422
1423        let mut cursor = Cursor::new(buf);
1424        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1425
1426        match decoded {
1427            ChildMessage::StashDelete {
1428                request_id,
1429                key,
1430                group,
1431            } => {
1432                assert_eq!(request_id, 52);
1433                assert_eq!(key, "grouped-key");
1434                assert_eq!(group, Some("analytics".into()));
1435            }
1436            other => panic!("expected StashDelete, got: {:?}", other),
1437        }
1438    }
1439
1440    #[tokio::test]
1441    async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
1442        let msg = ChildMessage::StashKeys {
1443            request_id: 53,
1444            group: Some("analytics".into()),
1445        };
1446
1447        let mut buf = Vec::new();
1448        write_message(&mut buf, &msg).await.unwrap();
1449
1450        let mut cursor = Cursor::new(buf);
1451        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1452
1453        match decoded {
1454            ChildMessage::StashKeys { request_id, group } => {
1455                assert_eq!(request_id, 53);
1456                assert_eq!(group, Some("analytics".into()));
1457            }
1458            other => panic!("expected StashKeys, got: {:?}", other),
1459        }
1460    }
1461
1462    #[tokio::test]
1463    async fn ipc_h1_05_stash_put_without_group_backward_compat() {
1464        // group: None → field absent in JSON
1465        let msg = ChildMessage::StashPut {
1466            request_id: 54,
1467            key: "no-group-key".into(),
1468            value: serde_json::json!("val"),
1469            ttl_secs: None,
1470            group: None,
1471        };
1472
1473        let json = serde_json::to_string(&msg).unwrap();
1474        assert!(
1475            !json.contains("\"group\""),
1476            "group:None should be absent in serialized JSON: {json}"
1477        );
1478
1479        // Still deserializes correctly
1480        let mut buf = Vec::new();
1481        write_message(&mut buf, &msg).await.unwrap();
1482        let mut cursor = Cursor::new(buf);
1483        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1484        match decoded {
1485            ChildMessage::StashPut { group, .. } => {
1486                assert_eq!(group, None);
1487            }
1488            other => panic!("expected StashPut, got: {:?}", other),
1489        }
1490    }
1491
1492    #[tokio::test]
1493    async fn ipc_h1_06_old_message_without_group_field_deserializes() {
1494        // Simulate a v0.3.0 worker message that lacks the group field entirely
1495        let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
1496        let mut buf = Vec::new();
1497        let payload = json.as_bytes();
1498        let len = payload.len() as u32;
1499        buf.extend_from_slice(&len.to_be_bytes());
1500        buf.extend_from_slice(payload);
1501
1502        let mut cursor = Cursor::new(buf);
1503        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1504
1505        match decoded {
1506            ChildMessage::StashPut {
1507                request_id,
1508                key,
1509                group,
1510                ..
1511            } => {
1512                assert_eq!(request_id, 60);
1513                assert_eq!(key, "old-key");
1514                assert_eq!(
1515                    group, None,
1516                    "missing group field from v0.3.0 worker should deserialize as None"
1517                );
1518            }
1519            other => panic!("expected StashPut, got: {:?}", other),
1520        }
1521
1522        // Also test StashGet, StashDelete, StashKeys backward compat
1523        let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
1524        let mut buf = Vec::new();
1525        let payload = json_get.as_bytes();
1526        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1527        buf.extend_from_slice(payload);
1528        let mut cursor = Cursor::new(buf);
1529        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1530        match decoded {
1531            ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
1532            other => panic!("expected StashGet, got: {:?}", other),
1533        }
1534
1535        let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
1536        let mut buf = Vec::new();
1537        let payload = json_del.as_bytes();
1538        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1539        buf.extend_from_slice(payload);
1540        let mut cursor = Cursor::new(buf);
1541        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1542        match decoded {
1543            ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
1544            other => panic!("expected StashDelete, got: {:?}", other),
1545        }
1546
1547        let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
1548        let mut buf = Vec::new();
1549        let payload = json_keys.as_bytes();
1550        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1551        buf.extend_from_slice(payload);
1552        let mut cursor = Cursor::new(buf);
1553        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1554        match decoded {
1555            ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
1556            other => panic!("expected StashKeys, got: {:?}", other),
1557        }
1558    }
1559
1560    // --- Phase 2: Structured Timeout + RawValue Tests ---
1561
1562    #[tokio::test]
1563    async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
1564        let msg = ChildMessage::ExecutionComplete {
1565            result: Err("execution timed out after 5000ms".into()),
1566            error_kind: Some(ErrorKind::Timeout),
1567            timeout_ms: Some(5000),
1568        };
1569
1570        let mut buf = Vec::new();
1571        write_message(&mut buf, &msg).await.unwrap();
1572
1573        let mut cursor = Cursor::new(buf);
1574        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1575
1576        match decoded {
1577            ChildMessage::ExecutionComplete {
1578                result,
1579                error_kind,
1580                timeout_ms,
1581            } => {
1582                assert!(result.is_err());
1583                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1584                assert_eq!(timeout_ms, Some(5000));
1585            }
1586            other => panic!("expected ExecutionComplete, got: {:?}", other),
1587        }
1588    }
1589
1590    #[tokio::test]
1591    async fn ipc_t02_timeout_ms_absent_backward_compat() {
1592        // Simulate an old v0.3.0 worker that doesn't include timeout_ms
1593        let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
1594        let mut buf = Vec::new();
1595        let payload = json.as_bytes();
1596        buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1597        buf.extend_from_slice(payload);
1598
1599        let mut cursor = Cursor::new(buf);
1600        let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1601
1602        match decoded {
1603            ChildMessage::ExecutionComplete {
1604                error_kind,
1605                timeout_ms,
1606                ..
1607            } => {
1608                assert_eq!(error_kind, Some(ErrorKind::Timeout));
1609                assert_eq!(
1610                    timeout_ms, None,
1611                    "missing timeout_ms should default to None"
1612                );
1613            }
1614            other => panic!("expected ExecutionComplete, got: {:?}", other),
1615        }
1616    }
1617
1618    #[tokio::test]
1619    async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
1620        let msg = ChildMessage::ExecutionComplete {
1621            result: Err("some error".into()),
1622            error_kind: Some(ErrorKind::JsError),
1623            timeout_ms: None,
1624        };
1625
1626        let json = serde_json::to_string(&msg).unwrap();
1627        assert!(
1628            !json.contains("timeout_ms"),
1629            "timeout_ms:None should be omitted: {json}"
1630        );
1631    }
1632
1633    #[tokio::test]
1634    async fn ipc_t04_timeout_ms_present_when_some() {
1635        let msg = ChildMessage::ExecutionComplete {
1636            result: Err("timed out".into()),
1637            error_kind: Some(ErrorKind::Timeout),
1638            timeout_ms: Some(10000),
1639        };
1640
1641        let json = serde_json::to_string(&msg).unwrap();
1642        assert!(
1643            json.contains("\"timeout_ms\":10000"),
1644            "timeout_ms should be present: {json}"
1645        );
1646    }
1647
1648    // --- RawValue passthrough tests ---
1649
1650    #[tokio::test]
1651    async fn ipc_rv01_write_raw_message_roundtrip() {
1652        let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
1653
1654        let mut buf = Vec::new();
1655        write_raw_message(&mut buf, payload).await.unwrap();
1656
1657        let mut cursor = Cursor::new(buf);
1658        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1659            .await
1660            .unwrap()
1661            .unwrap();
1662
1663        assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
1664    }
1665
1666    #[tokio::test]
1667    async fn ipc_rv02_read_raw_message_preserves_bytes() {
1668        // Write a regular message, then read it raw
1669        let msg = ChildMessage::Log {
1670            message: "test raw".into(),
1671        };
1672        let mut buf = Vec::new();
1673        write_message(&mut buf, &msg).await.unwrap();
1674
1675        let mut cursor = Cursor::new(buf);
1676        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1677            .await
1678            .unwrap()
1679            .unwrap();
1680
1681        // The raw value should be valid JSON
1682        let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
1683        assert!(matches!(parsed, ChildMessage::Log { .. }));
1684    }
1685
1686    #[tokio::test]
1687    async fn ipc_rv03_raw_message_size_limit_enforced() {
1688        let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
1689        let mut buf = Vec::new();
1690        write_raw_message(&mut buf, large_payload.as_bytes())
1691            .await
1692            .unwrap();
1693
1694        let mut cursor = Cursor::new(buf);
1695        let result = read_raw_message(&mut cursor, 64).await;
1696        assert!(result.is_err());
1697        let err = result.unwrap_err().to_string();
1698        assert!(err.contains("too large"), "error: {err}");
1699    }
1700
1701    #[tokio::test]
1702    async fn ipc_rv04_large_payload_stays_raw() {
1703        // 1MB payload — read as raw without full Value parse
1704        let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
1705        let mut buf = Vec::new();
1706        write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
1707
1708        let mut cursor = Cursor::new(buf);
1709        let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
1710            .await
1711            .unwrap()
1712            .unwrap();
1713
1714        // Should preserve the raw JSON string without parsing into Value
1715        assert!(raw.get().len() > 1_000_000);
1716        // Can still be parsed if needed
1717        let val: Value = serde_json::from_str(raw.get()).unwrap();
1718        assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
1719    }
1720
1721    #[tokio::test]
1722    async fn ipc_rv05_rawvalue_backward_compat_with_value() {
1723        // Write as regular message (Value), read as raw
1724        let msg = ParentMessage::ToolCallResult {
1725            request_id: 99,
1726            result: Ok(serde_json::json!({"status": "ok", "count": 42})),
1727        };
1728        let mut buf = Vec::new();
1729        write_message(&mut buf, &msg).await.unwrap();
1730
1731        let mut cursor = Cursor::new(buf.clone());
1732        let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1733            .await
1734            .unwrap()
1735            .unwrap();
1736
1737        // Parse the raw back as ParentMessage
1738        let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
1739        match parsed {
1740            ParentMessage::ToolCallResult { request_id, result } => {
1741                assert_eq!(request_id, 99);
1742                assert!(result.is_ok());
1743            }
1744            other => panic!("expected ToolCallResult, got: {:?}", other),
1745        }
1746    }
1747
1748    #[tokio::test]
1749    async fn ipc_rv06_raw_eof_returns_none() {
1750        let mut cursor = Cursor::new(Vec::<u8>::new());
1751        let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1752            .await
1753            .unwrap();
1754        assert!(result.is_none());
1755    }
1756
1757    #[tokio::test]
1758    async fn ipc_rv07_mixed_raw_and_value_messages() {
1759        let mut buf = Vec::new();
1760
1761        // Write a typed message
1762        let msg1 = ChildMessage::Log {
1763            message: "first".into(),
1764        };
1765        write_message(&mut buf, &msg1).await.unwrap();
1766
1767        // Write a raw message
1768        let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
1769        write_raw_message(&mut buf, raw_payload).await.unwrap();
1770
1771        // Read both: first typed, second raw
1772        let mut cursor = Cursor::new(buf);
1773        let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
1774        assert!(matches!(d1, ChildMessage::Log { .. }));
1775
1776        let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
1777            .await
1778            .unwrap()
1779            .unwrap();
1780        let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
1781        assert!(matches!(parsed, ChildMessage::Log { .. }));
1782    }
1783}