use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::Value;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ErrorKind {
Timeout,
HeapLimit,
JsError,
Execution,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IpcDispatchError {
pub code: String,
pub message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub server: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout_ms: Option<u64>,
}
impl IpcDispatchError {
pub fn from_string(msg: String) -> Self {
Self {
code: "INTERNAL".to_string(),
message: msg,
server: None,
tool: None,
timeout_ms: None,
}
}
pub fn to_dispatch_error(self) -> forge_error::DispatchError {
match self.code.as_str() {
"SERVER_NOT_FOUND" => {
forge_error::DispatchError::ServerNotFound(self.server.unwrap_or(self.message))
}
"TOOL_NOT_FOUND" => forge_error::DispatchError::ToolNotFound {
server: self.server.unwrap_or_default(),
tool: self.tool.unwrap_or_default(),
},
"TIMEOUT" => forge_error::DispatchError::Timeout {
server: self.server.unwrap_or_default(),
timeout_ms: self.timeout_ms.unwrap_or(0),
},
"CIRCUIT_OPEN" => {
forge_error::DispatchError::CircuitOpen(self.server.unwrap_or(self.message))
}
"GROUP_POLICY_DENIED" => forge_error::DispatchError::GroupPolicyDenied {
reason: self.message,
},
"UPSTREAM_ERROR" => forge_error::DispatchError::Upstream {
server: self.server.unwrap_or_default(),
message: self.message,
},
"TRANSPORT_DEAD" => forge_error::DispatchError::TransportDead {
server: self.server.unwrap_or_default(),
reason: self.message,
},
"TOOL_ERROR" => forge_error::DispatchError::ToolError {
server: self.server.unwrap_or_default(),
tool: self.tool.unwrap_or_default(),
message: self.message,
},
"RATE_LIMIT" => forge_error::DispatchError::RateLimit(self.message),
_ => forge_error::DispatchError::Internal(anyhow::anyhow!("{}", self.message)),
}
}
}
impl From<&forge_error::DispatchError> for IpcDispatchError {
fn from(e: &forge_error::DispatchError) -> Self {
let (server, tool, timeout_ms) = match e {
forge_error::DispatchError::ServerNotFound(s) => (Some(s.clone()), None, None),
forge_error::DispatchError::ToolNotFound { server, tool } => {
(Some(server.clone()), Some(tool.clone()), None)
}
forge_error::DispatchError::Timeout {
server, timeout_ms, ..
} => (Some(server.clone()), None, Some(*timeout_ms)),
forge_error::DispatchError::CircuitOpen(s) => (Some(s.clone()), None, None),
forge_error::DispatchError::Upstream { server, .. } => {
(Some(server.clone()), None, None)
}
forge_error::DispatchError::TransportDead { server, .. } => {
(Some(server.clone()), None, None)
}
forge_error::DispatchError::ToolError { server, tool, .. } => {
(Some(server.clone()), Some(tool.clone()), None)
}
_ => (None, None, None),
};
Self {
code: e.code().to_string(),
message: e.to_string(),
server,
tool,
timeout_ms,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ParentMessage {
Execute {
code: String,
manifest: Option<Value>,
config: WorkerConfig,
},
ToolCallResult {
request_id: u64,
result: Result<Value, IpcDispatchError>,
},
ResourceReadResult {
request_id: u64,
result: Result<Value, IpcDispatchError>,
},
Reset {
config: WorkerConfig,
},
StashResult {
request_id: u64,
result: Result<Value, IpcDispatchError>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum ChildMessage {
ToolCallRequest {
request_id: u64,
server: String,
tool: String,
args: Value,
},
ResourceReadRequest {
request_id: u64,
server: String,
uri: String,
},
StashPut {
request_id: u64,
key: String,
value: Value,
ttl_secs: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
group: Option<String>,
},
StashGet {
request_id: u64,
key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
group: Option<String>,
},
StashDelete {
request_id: u64,
key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
group: Option<String>,
},
StashKeys {
request_id: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
group: Option<String>,
},
ResetComplete,
ExecutionComplete {
result: Result<Value, String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
error_kind: Option<ErrorKind>,
#[serde(default, skip_serializing_if = "Option::is_none")]
timeout_ms: Option<u64>,
},
Log {
message: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
pub timeout_ms: u64,
pub max_heap_size: usize,
pub max_tool_calls: usize,
pub max_tool_call_args_size: usize,
pub max_output_size: usize,
pub max_code_size: usize,
#[serde(default = "default_max_ipc_message_size")]
pub max_ipc_message_size: usize,
#[serde(default = "default_max_resource_size")]
pub max_resource_size: usize,
#[serde(default = "default_max_parallel")]
pub max_parallel: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub known_tools: Option<Vec<(String, String)>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub known_servers: Option<std::collections::HashSet<String>>,
}
fn default_max_ipc_message_size() -> usize {
DEFAULT_MAX_IPC_MESSAGE_SIZE
}
fn default_max_resource_size() -> usize {
64 * 1024 * 1024 }
fn default_max_parallel() -> usize {
8
}
impl From<&crate::SandboxConfig> for WorkerConfig {
fn from(config: &crate::SandboxConfig) -> Self {
Self {
timeout_ms: config.timeout.as_millis() as u64,
max_heap_size: config.max_heap_size,
max_tool_calls: config.max_tool_calls,
max_tool_call_args_size: config.max_tool_call_args_size,
max_output_size: config.max_output_size,
max_code_size: config.max_code_size,
max_ipc_message_size: config.max_ipc_message_size,
max_resource_size: config.max_resource_size,
max_parallel: config.max_parallel,
known_tools: None,
known_servers: None,
}
}
}
impl WorkerConfig {
pub fn to_sandbox_config(&self) -> crate::SandboxConfig {
crate::SandboxConfig {
timeout: Duration::from_millis(self.timeout_ms),
max_code_size: self.max_code_size,
max_output_size: self.max_output_size,
max_heap_size: self.max_heap_size,
max_concurrent: 1, max_tool_calls: self.max_tool_calls,
max_tool_call_args_size: self.max_tool_call_args_size,
execution_mode: crate::executor::ExecutionMode::InProcess, max_resource_size: self.max_resource_size,
max_parallel: self.max_parallel,
max_ipc_message_size: self.max_ipc_message_size,
}
}
}
pub async fn write_message<T: Serialize, W: AsyncWrite + Unpin>(
writer: &mut W,
msg: &T,
) -> Result<(), std::io::Error> {
let payload = serde_json::to_vec(msg)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
let len = u32::try_from(payload.len()).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"IPC payload too large: {} bytes (max {} bytes)",
payload.len(),
u32::MAX
),
)
})?;
writer.write_all(&len.to_be_bytes()).await?;
writer.write_all(&payload).await?;
writer.flush().await?;
Ok(())
}
pub async fn write_raw_message<W: AsyncWrite + Unpin>(
writer: &mut W,
payload: &[u8],
) -> Result<(), std::io::Error> {
let len = u32::try_from(payload.len()).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"raw IPC payload too large: {} bytes (max {} bytes)",
payload.len(),
u32::MAX
),
)
})?;
writer.write_all(&len.to_be_bytes()).await?;
writer.write_all(payload).await?;
writer.flush().await?;
Ok(())
}
pub async fn read_raw_message<R: AsyncRead + Unpin>(
reader: &mut R,
max_size: usize,
) -> Result<Option<Box<RawValue>>, std::io::Error> {
let mut len_buf = [0u8; 4];
match reader.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e),
}
let len = u32::from_be_bytes(len_buf) as usize;
if len > max_size {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"raw IPC message too large: {} bytes (limit: {} bytes)",
len, max_size
),
));
}
let mut payload = vec![0u8; len];
reader.read_exact(&mut payload).await?;
let raw: Box<RawValue> = serde_json::from_slice(&payload)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(raw))
}
pub const DEFAULT_MAX_IPC_MESSAGE_SIZE: usize = 8 * 1024 * 1024;
pub async fn read_message<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
reader: &mut R,
) -> Result<Option<T>, std::io::Error> {
read_message_with_limit(reader, DEFAULT_MAX_IPC_MESSAGE_SIZE).await
}
pub async fn read_message_with_limit<T: for<'de> Deserialize<'de>, R: AsyncRead + Unpin>(
reader: &mut R,
max_size: usize,
) -> Result<Option<T>, std::io::Error> {
let mut len_buf = [0u8; 4];
match reader.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e),
}
let len = u32::from_be_bytes(len_buf) as usize;
if len > max_size {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"IPC message too large: {} bytes (limit: {} bytes)",
len, max_size
),
));
}
let mut payload = vec![0u8; len];
reader.read_exact(&mut payload).await?;
let msg: T = serde_json::from_slice(&payload)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(Some(msg))
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[tokio::test]
async fn roundtrip_parent_execute_message() {
let msg = ParentMessage::Execute {
code: "async () => { return 42; }".into(),
manifest: Some(serde_json::json!({"servers": []})),
config: WorkerConfig {
timeout_ms: 5000,
max_heap_size: 64 * 1024 * 1024,
max_tool_calls: 50,
max_tool_call_args_size: 1024 * 1024,
max_output_size: 1024 * 1024,
max_code_size: 64 * 1024,
max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
max_resource_size: 64 * 1024 * 1024,
max_parallel: 8,
known_tools: None,
known_servers: None,
},
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::Execute {
code,
manifest,
config,
} => {
assert_eq!(code, "async () => { return 42; }");
assert!(manifest.is_some());
assert_eq!(config.timeout_ms, 5000);
}
other => panic!("expected Execute, got: {:?}", other),
}
}
#[tokio::test]
async fn roundtrip_parent_tool_result() {
let msg = ParentMessage::ToolCallResult {
request_id: 42,
result: Ok(serde_json::json!({"status": "ok"})),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::ToolCallResult { request_id, result } => {
assert_eq!(request_id, 42);
assert!(result.is_ok());
}
other => panic!("expected ToolCallResult, got: {:?}", other),
}
}
#[tokio::test]
async fn roundtrip_parent_tool_result_error() {
let msg = ParentMessage::ToolCallResult {
request_id: 7,
result: Err(IpcDispatchError::from_string("connection refused".into())),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::ToolCallResult { request_id, result } => {
assert_eq!(request_id, 7);
let err = result.unwrap_err();
assert_eq!(err.message, "connection refused");
assert_eq!(err.code, "INTERNAL");
}
other => panic!("expected ToolCallResult, got: {:?}", other),
}
}
#[tokio::test]
async fn roundtrip_child_tool_request() {
let msg = ChildMessage::ToolCallRequest {
request_id: 1,
server: "narsil".into(),
tool: "ast.parse".into(),
args: serde_json::json!({"file": "test.rs"}),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ToolCallRequest {
request_id,
server,
tool,
args,
} => {
assert_eq!(request_id, 1);
assert_eq!(server, "narsil");
assert_eq!(tool, "ast.parse");
assert_eq!(args["file"], "test.rs");
}
other => panic!("expected ToolCallRequest, got: {:?}", other),
}
}
#[tokio::test]
async fn roundtrip_child_execution_complete() {
let msg = ChildMessage::ExecutionComplete {
result: Ok(serde_json::json!([1, 2, 3])),
error_kind: None,
timeout_ms: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
result, error_kind, ..
} => {
assert_eq!(result.unwrap(), serde_json::json!([1, 2, 3]));
assert_eq!(error_kind, None);
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn roundtrip_child_log() {
let msg = ChildMessage::Log {
message: "processing step 3".into(),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::Log { message } => {
assert_eq!(message, "processing step 3");
}
other => panic!("expected Log, got: {:?}", other),
}
}
#[tokio::test]
async fn multiple_messages_in_stream() {
let msg1 = ChildMessage::Log {
message: "first".into(),
};
let msg2 = ChildMessage::ToolCallRequest {
request_id: 1,
server: "s".into(),
tool: "t".into(),
args: serde_json::json!({}),
};
let msg3 = ChildMessage::ExecutionComplete {
result: Ok(serde_json::json!("done")),
error_kind: None,
timeout_ms: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg1).await.unwrap();
write_message(&mut buf, &msg2).await.unwrap();
write_message(&mut buf, &msg3).await.unwrap();
let mut cursor = Cursor::new(buf);
let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
assert!(matches!(d1, ChildMessage::Log { .. }));
assert!(matches!(d2, ChildMessage::ToolCallRequest { .. }));
assert!(matches!(d3, ChildMessage::ExecutionComplete { .. }));
let d4: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
assert!(d4.is_none());
}
#[tokio::test]
async fn execution_complete_error_roundtrip() {
let msg = ChildMessage::ExecutionComplete {
result: Err("failed to create tokio runtime: resource unavailable".into()),
error_kind: Some(ErrorKind::Execution),
timeout_ms: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
result, error_kind, ..
} => {
let err = result.unwrap_err();
assert!(
err.contains("tokio runtime"),
"expected runtime error: {err}"
);
assert_eq!(error_kind, Some(ErrorKind::Execution));
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn eof_returns_none() {
let mut cursor = Cursor::new(Vec::<u8>::new());
let result: Option<ParentMessage> = read_message(&mut cursor).await.unwrap();
assert!(result.is_none());
}
#[test]
fn u32_try_from_overflow() {
let overflow_size = u32::MAX as usize + 1;
assert!(u32::try_from(overflow_size).is_err());
}
#[tokio::test]
async fn write_message_normal_size_succeeds() {
let msg = ChildMessage::Log {
message: "a".repeat(1024),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
assert!(buf.len() > 1024);
}
#[tokio::test]
async fn large_message_roundtrip() {
let large_data = "x".repeat(1_000_000);
let msg = ChildMessage::ExecutionComplete {
result: Ok(serde_json::json!(large_data)),
error_kind: None,
timeout_ms: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete { result, .. } => {
assert_eq!(result.unwrap().as_str().unwrap().len(), 1_000_000);
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn worker_config_roundtrip_from_sandbox_config() {
let sandbox = crate::SandboxConfig::default();
let worker = WorkerConfig::from(&sandbox);
let back = worker.to_sandbox_config();
assert_eq!(sandbox.timeout, back.timeout);
assert_eq!(sandbox.max_heap_size, back.max_heap_size);
assert_eq!(sandbox.max_tool_calls, back.max_tool_calls);
assert_eq!(sandbox.max_output_size, back.max_output_size);
assert_eq!(worker.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
assert_eq!(worker.max_ipc_message_size, 8 * 1024 * 1024); }
#[tokio::test]
async fn read_message_with_limit_rejects_oversized() {
let msg = ChildMessage::Log {
message: "x".repeat(1024),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let result: Result<Option<ChildMessage>, _> =
read_message_with_limit(&mut cursor, 64).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("too large"), "error: {err_msg}");
}
#[tokio::test]
async fn read_message_with_limit_accepts_within_limit() {
let msg = ChildMessage::Log {
message: "hello".into(),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let result: Option<ChildMessage> =
read_message_with_limit(&mut cursor, 1024).await.unwrap();
assert!(result.is_some());
}
#[tokio::test]
async fn worker_config_ipc_limit_serde_default() {
let json = r#"{
"timeout_ms": 5000,
"max_heap_size": 67108864,
"max_tool_calls": 50,
"max_tool_call_args_size": 1048576,
"max_output_size": 1048576,
"max_code_size": 65536
}"#;
let config: WorkerConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.max_ipc_message_size, DEFAULT_MAX_IPC_MESSAGE_SIZE);
}
#[tokio::test]
async fn ipc_01_resource_read_request_roundtrip() {
let msg = ChildMessage::ResourceReadRequest {
request_id: 10,
server: "postgres".into(),
uri: "file:///logs/app.log".into(),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ResourceReadRequest {
request_id,
server,
uri,
} => {
assert_eq!(request_id, 10);
assert_eq!(server, "postgres");
assert_eq!(uri, "file:///logs/app.log");
}
other => panic!("expected ResourceReadRequest, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_02_resource_read_result_success_roundtrip() {
let msg = ParentMessage::ResourceReadResult {
request_id: 11,
result: Ok(serde_json::json!({"content": "log data here"})),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::ResourceReadResult { request_id, result } => {
assert_eq!(request_id, 11);
let val = result.unwrap();
assert_eq!(val["content"], "log data here");
}
other => panic!("expected ResourceReadResult, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_03_resource_read_result_error_roundtrip() {
let msg = ParentMessage::ResourceReadResult {
request_id: 12,
result: Err(IpcDispatchError::from_string("resource not found".into())),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::ResourceReadResult { request_id, result } => {
assert_eq!(request_id, 12);
let err = result.unwrap_err();
assert_eq!(err.message, "resource not found");
assert_eq!(err.code, "INTERNAL");
}
other => panic!("expected ResourceReadResult, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_04_stash_put_roundtrip() {
let msg = ChildMessage::StashPut {
request_id: 20,
key: "my-key".into(),
value: serde_json::json!({"data": 42}),
ttl_secs: Some(60),
group: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashPut {
request_id,
key,
value,
ttl_secs,
group,
} => {
assert_eq!(request_id, 20);
assert_eq!(key, "my-key");
assert_eq!(value["data"], 42);
assert_eq!(ttl_secs, Some(60));
assert_eq!(group, None);
}
other => panic!("expected StashPut, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_05_stash_get_roundtrip() {
let msg = ChildMessage::StashGet {
request_id: 21,
key: "lookup-key".into(),
group: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashGet {
request_id,
key,
group,
} => {
assert_eq!(request_id, 21);
assert_eq!(key, "lookup-key");
assert_eq!(group, None);
}
other => panic!("expected StashGet, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_06_stash_delete_roundtrip() {
let msg = ChildMessage::StashDelete {
request_id: 22,
key: "delete-me".into(),
group: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashDelete {
request_id,
key,
group,
} => {
assert_eq!(request_id, 22);
assert_eq!(key, "delete-me");
assert_eq!(group, None);
}
other => panic!("expected StashDelete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_07_stash_keys_roundtrip() {
let msg = ChildMessage::StashKeys {
request_id: 23,
group: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashKeys { request_id, group } => {
assert_eq!(request_id, 23);
assert_eq!(group, None);
}
other => panic!("expected StashKeys, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_08_stash_result_roundtrip() {
let msg = ParentMessage::StashResult {
request_id: 24,
result: Ok(serde_json::json!({"ok": true})),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::StashResult { request_id, result } => {
assert_eq!(request_id, 24);
assert_eq!(result.unwrap(), serde_json::json!({"ok": true}));
}
other => panic!("expected StashResult, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_09_mixed_message_interleaving() {
let msg1 = ChildMessage::ToolCallRequest {
request_id: 1,
server: "s".into(),
tool: "t".into(),
args: serde_json::json!({}),
};
let msg2 = ChildMessage::ResourceReadRequest {
request_id: 2,
server: "pg".into(),
uri: "file:///data".into(),
};
let msg3 = ChildMessage::StashPut {
request_id: 3,
key: "k".into(),
value: serde_json::json!("v"),
ttl_secs: None,
group: None,
};
let msg4 = ChildMessage::StashGet {
request_id: 4,
key: "k".into(),
group: None,
};
let msg5 = ChildMessage::ExecutionComplete {
result: Ok(serde_json::json!("done")),
error_kind: None,
timeout_ms: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg1).await.unwrap();
write_message(&mut buf, &msg2).await.unwrap();
write_message(&mut buf, &msg3).await.unwrap();
write_message(&mut buf, &msg4).await.unwrap();
write_message(&mut buf, &msg5).await.unwrap();
let mut cursor = Cursor::new(buf);
let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
let d2: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
let d3: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
let d4: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
let d5: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
assert!(matches!(d1, ChildMessage::ToolCallRequest { .. }));
assert!(matches!(d2, ChildMessage::ResourceReadRequest { .. }));
assert!(matches!(d3, ChildMessage::StashPut { .. }));
assert!(matches!(d4, ChildMessage::StashGet { .. }));
assert!(matches!(d5, ChildMessage::ExecutionComplete { .. }));
let d6: Option<ChildMessage> = read_message(&mut cursor).await.unwrap();
assert!(d6.is_none());
}
#[tokio::test]
async fn ipc_p01_reset_roundtrip() {
let msg = ParentMessage::Reset {
config: WorkerConfig {
timeout_ms: 3000,
max_heap_size: 32 * 1024 * 1024,
max_tool_calls: 25,
max_tool_call_args_size: 512 * 1024,
max_output_size: 512 * 1024,
max_code_size: 32 * 1024,
max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
max_resource_size: 32 * 1024 * 1024,
max_parallel: 4,
known_tools: None,
known_servers: None,
},
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::Reset { config } => {
assert_eq!(config.timeout_ms, 3000);
assert_eq!(config.max_tool_calls, 25);
}
other => panic!("expected Reset, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_p02_reset_complete_roundtrip() {
let msg = ChildMessage::ResetComplete;
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
assert!(matches!(decoded, ChildMessage::ResetComplete));
}
#[tokio::test]
async fn ipc_p03_reset_execute_interleaving() {
let reset = ParentMessage::Reset {
config: WorkerConfig {
timeout_ms: 5000,
max_heap_size: 64 * 1024 * 1024,
max_tool_calls: 50,
max_tool_call_args_size: 1024 * 1024,
max_output_size: 1024 * 1024,
max_code_size: 64 * 1024,
max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
max_resource_size: 64 * 1024 * 1024,
max_parallel: 8,
known_tools: None,
known_servers: None,
},
};
let execute = ParentMessage::Execute {
code: "async () => 42".into(),
manifest: None,
config: WorkerConfig {
timeout_ms: 5000,
max_heap_size: 64 * 1024 * 1024,
max_tool_calls: 50,
max_tool_call_args_size: 1024 * 1024,
max_output_size: 1024 * 1024,
max_code_size: 64 * 1024,
max_ipc_message_size: DEFAULT_MAX_IPC_MESSAGE_SIZE,
max_resource_size: 64 * 1024 * 1024,
max_parallel: 8,
known_tools: None,
known_servers: None,
},
};
let mut buf = Vec::new();
write_message(&mut buf, &reset).await.unwrap();
write_message(&mut buf, &execute).await.unwrap();
let mut cursor = Cursor::new(buf);
let d1: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
let d2: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
assert!(matches!(d1, ParentMessage::Reset { .. }));
assert!(matches!(d2, ParentMessage::Execute { .. }));
}
#[tokio::test]
async fn ipc_10_oversized_stash_message_rejected() {
let msg = ChildMessage::StashPut {
request_id: 100,
key: "k".into(),
value: serde_json::json!("x".repeat(2048)),
ttl_secs: Some(60),
group: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let result: Result<Option<ChildMessage>, _> =
read_message_with_limit(&mut cursor, 64).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("too large"),
"error should mention 'too large': {err_msg}"
);
}
#[tokio::test]
async fn ipc_o01_error_kind_timeout_roundtrip() {
let msg = ChildMessage::ExecutionComplete {
result: Err("execution timed out after 500ms".into()),
error_kind: Some(ErrorKind::Timeout),
timeout_ms: Some(500),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
result,
error_kind,
timeout_ms,
} => {
assert!(result.is_err());
assert_eq!(error_kind, Some(ErrorKind::Timeout));
assert_eq!(timeout_ms, Some(500));
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_o02_error_kind_heap_limit_roundtrip() {
let msg = ChildMessage::ExecutionComplete {
result: Err("V8 heap limit exceeded".into()),
error_kind: Some(ErrorKind::HeapLimit),
timeout_ms: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
result, error_kind, ..
} => {
assert!(result.is_err());
assert_eq!(error_kind, Some(ErrorKind::HeapLimit));
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_o03_error_kind_backward_compat() {
let json = r#"{"type":"ExecutionComplete","result":{"Err":"some old error"}}"#;
let mut buf = Vec::new();
let payload = json.as_bytes();
let len = payload.len() as u32;
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(payload);
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
result,
error_kind,
timeout_ms,
} => {
assert!(result.is_err());
assert_eq!(
error_kind, None,
"missing error_kind should default to None"
);
assert_eq!(
timeout_ms, None,
"missing timeout_ms should default to None"
);
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_o04_error_kind_js_error_roundtrip() {
let msg = ChildMessage::ExecutionComplete {
result: Err("ReferenceError: x is not defined".into()),
error_kind: Some(ErrorKind::JsError),
timeout_ms: None,
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
result, error_kind, ..
} => {
assert_eq!(result.unwrap_err(), "ReferenceError: x is not defined");
assert_eq!(error_kind, Some(ErrorKind::JsError));
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_o05_success_omits_error_kind() {
let msg = ChildMessage::ExecutionComplete {
result: Ok(serde_json::json!(42)),
error_kind: None,
timeout_ms: None,
};
let json = serde_json::to_string(&msg).unwrap();
assert!(
!json.contains("error_kind"),
"success messages should not contain error_kind field: {json}"
);
assert!(
!json.contains("timeout_ms"),
"success messages should not contain timeout_ms field: {json}"
);
}
#[tokio::test]
async fn ipc_h1_01_stash_put_with_group_roundtrip() {
let msg = ChildMessage::StashPut {
request_id: 50,
key: "grouped-key".into(),
value: serde_json::json!({"data": "secret"}),
ttl_secs: Some(120),
group: Some("analytics".into()),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashPut {
request_id,
key,
group,
..
} => {
assert_eq!(request_id, 50);
assert_eq!(key, "grouped-key");
assert_eq!(group, Some("analytics".into()));
}
other => panic!("expected StashPut, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_h1_02_stash_get_with_group_roundtrip() {
let msg = ChildMessage::StashGet {
request_id: 51,
key: "grouped-key".into(),
group: Some("analytics".into()),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashGet {
request_id,
key,
group,
} => {
assert_eq!(request_id, 51);
assert_eq!(key, "grouped-key");
assert_eq!(group, Some("analytics".into()));
}
other => panic!("expected StashGet, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_h1_03_stash_delete_with_group_roundtrip() {
let msg = ChildMessage::StashDelete {
request_id: 52,
key: "grouped-key".into(),
group: Some("analytics".into()),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashDelete {
request_id,
key,
group,
} => {
assert_eq!(request_id, 52);
assert_eq!(key, "grouped-key");
assert_eq!(group, Some("analytics".into()));
}
other => panic!("expected StashDelete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_h1_04_stash_keys_with_group_roundtrip() {
let msg = ChildMessage::StashKeys {
request_id: 53,
group: Some("analytics".into()),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashKeys { request_id, group } => {
assert_eq!(request_id, 53);
assert_eq!(group, Some("analytics".into()));
}
other => panic!("expected StashKeys, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_h1_05_stash_put_without_group_backward_compat() {
let msg = ChildMessage::StashPut {
request_id: 54,
key: "no-group-key".into(),
value: serde_json::json!("val"),
ttl_secs: None,
group: None,
};
let json = serde_json::to_string(&msg).unwrap();
assert!(
!json.contains("\"group\""),
"group:None should be absent in serialized JSON: {json}"
);
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashPut { group, .. } => {
assert_eq!(group, None);
}
other => panic!("expected StashPut, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_h1_06_old_message_without_group_field_deserializes() {
let json = r#"{"type":"StashPut","request_id":60,"key":"old-key","value":"old-val","ttl_secs":30}"#;
let mut buf = Vec::new();
let payload = json.as_bytes();
let len = payload.len() as u32;
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(payload);
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashPut {
request_id,
key,
group,
..
} => {
assert_eq!(request_id, 60);
assert_eq!(key, "old-key");
assert_eq!(
group, None,
"missing group field from v0.3.0 worker should deserialize as None"
);
}
other => panic!("expected StashPut, got: {:?}", other),
}
let json_get = r#"{"type":"StashGet","request_id":61,"key":"old-key"}"#;
let mut buf = Vec::new();
let payload = json_get.as_bytes();
buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
buf.extend_from_slice(payload);
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashGet { group, .. } => assert_eq!(group, None),
other => panic!("expected StashGet, got: {:?}", other),
}
let json_del = r#"{"type":"StashDelete","request_id":62,"key":"old-key"}"#;
let mut buf = Vec::new();
let payload = json_del.as_bytes();
buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
buf.extend_from_slice(payload);
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashDelete { group, .. } => assert_eq!(group, None),
other => panic!("expected StashDelete, got: {:?}", other),
}
let json_keys = r#"{"type":"StashKeys","request_id":63}"#;
let mut buf = Vec::new();
let payload = json_keys.as_bytes();
buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
buf.extend_from_slice(payload);
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::StashKeys { group, .. } => assert_eq!(group, None),
other => panic!("expected StashKeys, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_t01_exec_complete_timeout_with_timeout_ms_roundtrip() {
let msg = ChildMessage::ExecutionComplete {
result: Err("execution timed out after 5000ms".into()),
error_kind: Some(ErrorKind::Timeout),
timeout_ms: Some(5000),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
result,
error_kind,
timeout_ms,
} => {
assert!(result.is_err());
assert_eq!(error_kind, Some(ErrorKind::Timeout));
assert_eq!(timeout_ms, Some(5000));
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_t02_timeout_ms_absent_backward_compat() {
let json = r#"{"type":"ExecutionComplete","result":{"Err":"timed out after 3000ms"},"error_kind":"timeout"}"#;
let mut buf = Vec::new();
let payload = json.as_bytes();
buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
buf.extend_from_slice(payload);
let mut cursor = Cursor::new(buf);
let decoded: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ChildMessage::ExecutionComplete {
error_kind,
timeout_ms,
..
} => {
assert_eq!(error_kind, Some(ErrorKind::Timeout));
assert_eq!(
timeout_ms, None,
"missing timeout_ms should default to None"
);
}
other => panic!("expected ExecutionComplete, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_t03_timeout_ms_serialization_omitted_when_none() {
let msg = ChildMessage::ExecutionComplete {
result: Err("some error".into()),
error_kind: Some(ErrorKind::JsError),
timeout_ms: None,
};
let json = serde_json::to_string(&msg).unwrap();
assert!(
!json.contains("timeout_ms"),
"timeout_ms:None should be omitted: {json}"
);
}
#[tokio::test]
async fn ipc_t04_timeout_ms_present_when_some() {
let msg = ChildMessage::ExecutionComplete {
result: Err("timed out".into()),
error_kind: Some(ErrorKind::Timeout),
timeout_ms: Some(10000),
};
let json = serde_json::to_string(&msg).unwrap();
assert!(
json.contains("\"timeout_ms\":10000"),
"timeout_ms should be present: {json}"
);
}
#[tokio::test]
async fn ipc_rv01_write_raw_message_roundtrip() {
let payload = br#"{"type":"StashResult","request_id":1,"result":{"Ok":{"data":42}}}"#;
let mut buf = Vec::new();
write_raw_message(&mut buf, payload).await.unwrap();
let mut cursor = Cursor::new(buf);
let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
.await
.unwrap()
.unwrap();
assert_eq!(raw.get(), std::str::from_utf8(payload).unwrap());
}
#[tokio::test]
async fn ipc_rv02_read_raw_message_preserves_bytes() {
let msg = ChildMessage::Log {
message: "test raw".into(),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
.await
.unwrap()
.unwrap();
let parsed: ChildMessage = serde_json::from_str(raw.get()).unwrap();
assert!(matches!(parsed, ChildMessage::Log { .. }));
}
#[tokio::test]
async fn ipc_rv03_raw_message_size_limit_enforced() {
let large_payload = format!(r#"{{"data":"{}"}}"#, "x".repeat(1024));
let mut buf = Vec::new();
write_raw_message(&mut buf, large_payload.as_bytes())
.await
.unwrap();
let mut cursor = Cursor::new(buf);
let result = read_raw_message(&mut cursor, 64).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("too large"), "error: {err}");
}
#[tokio::test]
async fn ipc_rv04_large_payload_stays_raw() {
let large = format!(r#"{{"big":"{}"}}"#, "x".repeat(1_000_000));
let mut buf = Vec::new();
write_raw_message(&mut buf, large.as_bytes()).await.unwrap();
let mut cursor = Cursor::new(buf);
let raw = read_raw_message(&mut cursor, 2 * 1024 * 1024)
.await
.unwrap()
.unwrap();
assert!(raw.get().len() > 1_000_000);
let val: Value = serde_json::from_str(raw.get()).unwrap();
assert_eq!(val["big"].as_str().unwrap().len(), 1_000_000);
}
#[tokio::test]
async fn ipc_rv05_rawvalue_backward_compat_with_value() {
let msg = ParentMessage::ToolCallResult {
request_id: 99,
result: Ok(serde_json::json!({"status": "ok", "count": 42})),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf.clone());
let raw = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
.await
.unwrap()
.unwrap();
let parsed: ParentMessage = serde_json::from_str(raw.get()).unwrap();
match parsed {
ParentMessage::ToolCallResult { request_id, result } => {
assert_eq!(request_id, 99);
assert!(result.is_ok());
}
other => panic!("expected ToolCallResult, got: {:?}", other),
}
}
#[tokio::test]
async fn ipc_rv06_raw_eof_returns_none() {
let mut cursor = Cursor::new(Vec::<u8>::new());
let result = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
.await
.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn ipc_rv07_mixed_raw_and_value_messages() {
let mut buf = Vec::new();
let msg1 = ChildMessage::Log {
message: "first".into(),
};
write_message(&mut buf, &msg1).await.unwrap();
let raw_payload = br#"{"type":"Log","message":"raw second"}"#;
write_raw_message(&mut buf, raw_payload).await.unwrap();
let mut cursor = Cursor::new(buf);
let d1: ChildMessage = read_message(&mut cursor).await.unwrap().unwrap();
assert!(matches!(d1, ChildMessage::Log { .. }));
let d2 = read_raw_message(&mut cursor, DEFAULT_MAX_IPC_MESSAGE_SIZE)
.await
.unwrap()
.unwrap();
let parsed: ChildMessage = serde_json::from_str(d2.get()).unwrap();
assert!(matches!(parsed, ChildMessage::Log { .. }));
}
#[tokio::test]
async fn tool_error_round_trips_through_ipc() {
let original = forge_error::DispatchError::ToolError {
server: "arbiter".into(),
tool: "scan_target".into(),
message: "tool returned error: Invalid params: missing field 'base_url'".into(),
};
let ipc_err = IpcDispatchError::from(&original);
assert_eq!(ipc_err.code, "TOOL_ERROR");
assert_eq!(ipc_err.server, Some("arbiter".into()));
assert_eq!(ipc_err.tool, Some("scan_target".into()));
assert!(ipc_err.message.contains("Invalid params"));
let json = serde_json::to_string(&ipc_err).unwrap();
let deserialized: IpcDispatchError = serde_json::from_str(&json).unwrap();
let reconstructed = deserialized.to_dispatch_error();
assert!(matches!(
reconstructed,
forge_error::DispatchError::ToolError {
ref server,
ref tool,
..
} if server == "arbiter" && tool == "scan_target"
));
assert!(!reconstructed.trips_circuit_breaker());
assert_eq!(reconstructed.code(), "TOOL_ERROR");
}
#[tokio::test]
async fn tool_error_ipc_message_roundtrip() {
let msg = ParentMessage::ToolCallResult {
request_id: 42,
result: Err(IpcDispatchError::from(
&forge_error::DispatchError::ToolError {
server: "arbiter".into(),
tool: "scan".into(),
message: "bad params".into(),
},
)),
};
let mut buf = Vec::new();
write_message(&mut buf, &msg).await.unwrap();
let mut cursor = Cursor::new(buf);
let decoded: ParentMessage = read_message(&mut cursor).await.unwrap().unwrap();
match decoded {
ParentMessage::ToolCallResult { request_id, result } => {
assert_eq!(request_id, 42);
let err = result.unwrap_err();
assert_eq!(err.code, "TOOL_ERROR");
assert_eq!(err.server, Some("arbiter".into()));
assert_eq!(err.tool, Some("scan".into()));
let dispatch_err = err.to_dispatch_error();
assert!(!dispatch_err.trips_circuit_breaker());
}
other => panic!("expected ToolCallResult, got: {:?}", other),
}
}
}