use crate::protocol::{
parse_line, AssistantEventData, StreamEvent, SystemEventData, ToolUseRequest,
};
/// Callback invoked with each [`StreamEvent`] as an external agent's output is processed.
///
/// The callback is shared behind an `Arc` and must be `Send + Sync + 'static`.
/// The stream processors in this file call it once per parsed (or synthesized) event.
pub type StreamEventEmitter = Arc<dyn Fn(StreamEvent) + Send + Sync + 'static>;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt};
use tokio::process::Command;
/// Caller-supplied knobs for a single external-agent invocation.
///
/// Every field is optional and defaults to `None` when absent from the
/// serialized form.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct InvokeOptions {
/// Working directory for the spawned process. All three adapters apply it
/// with `Command::current_dir`; the Codex adapter additionally passes it as `--cd`.
#[serde(default)]
pub cwd: Option<PathBuf>,
/// Tool names forwarded to Claude Code as a single space-joined
/// `--allowed-tools` value. The Codex and Gemini argument builders in this
/// file do not read it.
#[serde(default)]
pub allowed_tools: Option<Vec<String>>,
/// Forwarded to Claude Code as `--max-turns`. Not read by the Codex or
/// Gemini argument builders in this file.
#[serde(default)]
pub max_turns: Option<u32>,
/// Overall time limit in seconds. Falls back to `DEFAULT_TIMEOUT_SECS` and
/// is clamped to `1..=MAX_TIMEOUT_SECS` by each adapter.
#[serde(default)]
pub timeout_secs: Option<u64>,
/// URL of an MCP HTTP endpoint; an empty string is treated like `None`.
/// Claude Code receives it through a temporary `--mcp-config` JSON file,
/// Codex through a `-c mcp_servers.car=...` override, and the Gemini
/// adapter logs a warning and ignores it.
#[serde(default)]
pub mcp_endpoint: Option<String>,
}
/// Timeout (seconds) used when `InvokeOptions::timeout_secs` is `None`.
const DEFAULT_TIMEOUT_SECS: u64 = 300;
/// Upper bound (seconds) applied to any caller-supplied timeout.
const MAX_TIMEOUT_SECS: u64 = 3600;
/// Aggregated outcome of one external-agent invocation.
///
/// A value of this type is returned even when the agent itself failed; check
/// `is_error` / `error`. `InvokeError` is reserved for failures of the
/// invocation machinery (spawn, I/O, timeout).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct InvokeResult {
/// Final answer text: the `result` field of Claude Code's result event, the
/// concatenated Codex `agent_message` texts, or Gemini's trimmed stdout.
pub answer: String,
/// Session identifier taken from the first system event (Claude Code) or
/// the `thread_id` of `thread.started` (Codex). Never set by the Gemini path.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// Turn count: incremented per assistant event and overwritten by
/// `num_turns` from the result event when present (Claude Code), or
/// incremented per `turn.started` (Codex).
#[serde(default)]
pub turns: u32,
/// Number of tool invocations observed in the stream.
#[serde(default)]
pub tool_calls: u32,
/// The observed tool invocations, in stream order.
#[serde(default)]
pub tool_uses: Vec<ToolUseRequest>,
/// Duration in milliseconds: reported by the agent (Claude Code) or
/// measured locally (Codex, Gemini).
#[serde(default)]
pub duration_ms: u64,
/// Cost copied from Claude Code's result event; not set by the Codex or
/// Gemini paths.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub total_cost_usd: Option<f64>,
/// `true` when the agent reported an error, exited with a non-zero status,
/// or produced no answer (Codex without `agent_message`, Gemini with empty stdout).
#[serde(default)]
pub is_error: bool,
/// Human-readable error description. Note that `process_stream` may store a
/// "malformed JSON" note here without setting `is_error`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Failures of the invocation machinery itself, as opposed to errors the
/// agent reports (those are carried inside `InvokeResult`).
#[derive(Debug, Error)]
pub enum InvokeError {
/// The subprocess could not be started. Also used by `invoke_with_emitter`
/// when the requested adapter id is not detected or not known.
#[error("subprocess spawn failed: {0}")]
Spawn(String),
/// Reading from / writing to the subprocess pipes, waiting on it, or
/// preparing the temporary MCP config file failed.
#[error("subprocess I/O failed: {0}")]
Io(String),
/// The invocation exceeded its time limit; the payload is the limit in seconds.
#[error("subprocess timed out after {0}s")]
Timeout(u64),
/// Malformed JSON in the agent's stream.
/// NOTE(review): no code in this file constructs this variant; malformed
/// lines are currently skipped by the stream processors — confirm whether
/// other modules use it.
#[error("stream produced malformed JSON: {0}")]
BadJson(String),
}
/// Builds the command-line arguments for a Claude Code invocation.
///
/// The fixed prefix selects print mode with `stream-json` on both input and
/// output plus `--verbose`. After it, each of the following flag/value pairs
/// is appended only when its source value is present, in this order:
///
/// * `--allowed-tools` — `opts.allowed_tools` joined with single spaces
///   (an empty list yields an empty-string value),
/// * `--max-turns` — `opts.max_turns` in decimal,
/// * `--mcp-config` — `mcp_config_path`, lossily converted to UTF-8.
pub(crate) fn build_claude_args(
    opts: &InvokeOptions,
    mcp_config_path: Option<&std::path::Path>,
) -> Vec<String> {
    const BASE_FLAGS: [&str; 6] = [
        "-p",
        "--output-format",
        "stream-json",
        "--input-format",
        "stream-json",
        "--verbose",
    ];

    // Optional flags paired with their (possibly absent) rendered value.
    let optional_flags = [
        (
            "--allowed-tools",
            opts.allowed_tools.as_ref().map(|tools| tools.join(" ")),
        ),
        ("--max-turns", opts.max_turns.map(|n| n.to_string())),
        (
            "--mcp-config",
            mcp_config_path.map(|p| p.to_string_lossy().into_owned()),
        ),
    ];

    let mut args: Vec<String> = BASE_FLAGS.iter().map(|flag| flag.to_string()).collect();
    for (flag, value) in optional_flags {
        if let Some(value) = value {
            args.push(flag.to_string());
            args.push(value);
        }
    }
    args
}
/// Renders the MCP configuration document handed to Claude Code.
///
/// The document declares one server named `car` of type `http` whose `url`
/// is `endpoint`, and is returned as a compact JSON string.
pub(crate) fn build_mcp_config_json(endpoint: &str) -> String {
    serde_json::json!({
        "mcpServers": {
            "car": {
                "type": "http",
                "url": endpoint,
            }
        }
    })
    .to_string()
}
/// Wraps `task` in the JSON envelope written to Claude Code's stdin.
///
/// The envelope is `{"type":"user","message":{"role":"user","content":task}}`,
/// serialized as a compact JSON string without a trailing newline.
pub(crate) fn build_user_message(task: &str) -> String {
    serde_json::json!({
        "type": "user",
        "message": {
            "role": "user",
            "content": task,
        },
    })
    .to_string()
}
pub async fn invoke(
id: &str,
task: &str,
opts: InvokeOptions,
) -> Result<InvokeResult, InvokeError> {
invoke_with_emitter(id, task, opts, None).await
}
/// Invokes the external agent identified by `id`, forwarding stream events
/// to `emitter` when one is supplied.
///
/// Agent detection runs first; the adapter is then chosen by `id`
/// (`"claude-code"`, `"codex"` or `"gemini"`) and run against the detected
/// binary path.
///
/// # Errors
///
/// Returns [`InvokeError::Spawn`] when no detected agent has the given `id`,
/// or when the id is detected but has no adapter here. Otherwise propagates
/// the chosen adapter's error.
pub async fn invoke_with_emitter(
    id: &str,
    task: &str,
    opts: InvokeOptions,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
    let detected = crate::detect().await;

    // The id must belong to a detected agent before adapter dispatch is attempted.
    let Some(spec) = detected.iter().find(|s| s.id == id) else {
        return Err(InvokeError::Spawn(format!(
            "no detected external agent with id `{id}`"
        )));
    };

    match id {
        "claude-code" => invoke_claude_code(&spec.binary_path, task, opts, emitter).await,
        "codex" => invoke_codex(&spec.binary_path, task, opts, emitter).await,
        "gemini" => invoke_gemini(&spec.binary_path, task, opts, emitter).await,
        _ => Err(InvokeError::Spawn(format!("unknown adapter id: {id}"))),
    }
}
/// Runs the Claude Code CLI at `binary_path` on `task` and aggregates its
/// `stream-json` output into an [`InvokeResult`].
///
/// Steps:
/// 1. Resolve the timeout (`opts.timeout_secs`, default
///    `DEFAULT_TIMEOUT_SECS`, clamped to `1..=MAX_TIMEOUT_SECS`).
/// 2. When `opts.mcp_endpoint` is non-empty, write an MCP config to a
///    temporary `.json` file and pass its path via `--mcp-config`.
/// 3. Spawn the process with piped stdio and `kill_on_drop`, write the user
///    message plus a newline to stdin, then close stdin.
/// 4. Feed stdout through [`process_stream`], wait for exit, and — when the
///    exit status is a failure and the stream did not already report an
///    error — record `exit code N: <stderr>` in the result.
///
/// # Errors
///
/// * [`InvokeError::Io`] — temp-file, pipe or wait failures.
/// * [`InvokeError::Spawn`] — the process could not be started.
/// * [`InvokeError::Timeout`] — stream processing plus exit did not finish in time.
pub async fn invoke_claude_code(
    binary_path: &Path,
    task: &str,
    opts: InvokeOptions,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
    tracing::info!(
        adapter = "claude-code",
        binary = %binary_path.display(),
        task_len = task.len(),
        "external agent invocation started"
    );

    let timeout_secs = opts
        .timeout_secs
        .unwrap_or(DEFAULT_TIMEOUT_SECS)
        .clamp(1, MAX_TIMEOUT_SECS);
    let timeout = Duration::from_secs(timeout_secs);

    // The temp file must stay alive until the child is done with it, so it is
    // bound to a local that lives to the end of this function.
    let endpoint = opts.mcp_endpoint.as_deref().filter(|e| !e.is_empty());
    let mcp_config: Option<tempfile::NamedTempFile> = match endpoint {
        None => None,
        Some(endpoint) => {
            let json = build_mcp_config_json(endpoint);
            let mut file = tempfile::Builder::new()
                .prefix("car-mcp-config-")
                .suffix(".json")
                .tempfile()
                .map_err(|e| InvokeError::Io(format!("mcp config tempfile: {e}")))?;
            std::io::Write::write_all(&mut file, json.as_bytes())
                .map_err(|e| InvokeError::Io(format!("mcp config write: {e}")))?;
            Some(file)
        }
    };
    let mcp_config_path = mcp_config.as_ref().map(|t| t.path());

    let mut cmd = Command::new(binary_path);
    cmd.args(build_claude_args(&opts, mcp_config_path))
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .kill_on_drop(true);
    if let Some(dir) = &opts.cwd {
        cmd.current_dir(dir);
    }

    let mut child = cmd.spawn().map_err(|e| InvokeError::Spawn(e.to_string()))?;

    // Send the task, then drop the handle so the child sees EOF on stdin.
    let mut stdin = child
        .stdin
        .take()
        .ok_or_else(|| InvokeError::Io("stdin closed unexpectedly".to_string()))?;
    let user_message = build_user_message(task);
    let writes: [(&[u8], &str); 2] = [
        (user_message.as_bytes(), "stdin write"),
        (b"\n", "stdin newline"),
    ];
    for (chunk, label) in writes {
        stdin
            .write_all(chunk)
            .await
            .map_err(|e| InvokeError::Io(format!("{label}: {e}")))?;
    }
    drop(stdin);

    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| InvokeError::Io("stdout missing".to_string()))?;
    let stderr = child
        .stderr
        .take()
        .ok_or_else(|| InvokeError::Io("stderr missing".to_string()))?;
    let reader = tokio::io::BufReader::new(stdout);

    let process_fut = async {
        let mut result = process_stream(reader, emitter).await?;
        let exit = child
            .wait()
            .await
            .map_err(|e| InvokeError::Io(format!("wait: {e}")))?;

        // Nothing to add when the process succeeded or the stream already
        // carries an agent-reported error.
        if exit.success() || result.is_error {
            return Ok::<_, InvokeError>(result);
        }

        // Best effort: a failed stderr read still yields an exit-code message.
        let mut stderr_buf = Vec::new();
        let mut stderr_reader = tokio::io::BufReader::new(stderr);
        let _ = tokio::io::AsyncReadExt::read_to_end(&mut stderr_reader, &mut stderr_buf).await;
        let stderr_text = String::from_utf8_lossy(&stderr_buf);

        result.is_error = true;
        result.error = Some(format!(
            "exit code {}: {}",
            exit.code().unwrap_or(-1),
            stderr_text.trim()
        ));
        Ok(result)
    };

    tokio::time::timeout(timeout, process_fut)
        .await
        .unwrap_or(Err(InvokeError::Timeout(timeout_secs)))
}
/// Consumes a Claude Code `stream-json` stream line by line and folds it into
/// an [`InvokeResult`].
///
/// Each line goes through `parse_line`:
/// * a parsed event is cloned to `emitter` (when present) and then applied
///   to the result;
/// * a line that parses to nothing is skipped;
/// * a line that fails to parse is skipped too, but the first such failure is
///   noted in `result.error` while there is still no answer and no earlier
///   error. `is_error` is left untouched.
///
/// # Errors
///
/// Returns [`InvokeError::Io`] when reading a line from `reader` fails.
pub async fn process_stream<R>(
    reader: R,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError>
where
    R: AsyncBufRead + Unpin,
{
    let mut result = InvokeResult::default();
    let mut lines = reader.lines();

    while let Some(line) = lines
        .next_line()
        .await
        .map_err(|e| InvokeError::Io(format!("stdout read: {e}")))?
    {
        match parse_line(&line) {
            Ok(Some(event)) => {
                if let Some(emit) = &emitter {
                    emit(event.clone());
                }
                apply_event(&mut result, event);
            }
            Ok(None) => {}
            Err(parse_err) => {
                if result.answer.is_empty() && result.error.is_none() {
                    result.error = Some(format!("malformed JSON: {parse_err}"));
                }
            }
        }
    }

    Ok(result)
}
/// Folds a single stream event into the running [`InvokeResult`].
///
/// * `System` — records the session id the first time one is seen.
/// * `Assistant` — counts one turn, then records every `tool_use` content
///   block (id, name, input; missing strings become `""`, missing input
///   becomes JSON `null`).
/// * `Result` — copies answer, duration, cost and error flag; replaces the
///   turn count when `num_turns` is present; fills `error` when the agent
///   flagged an error.
/// * `User`, `RateLimitEvent`, `Other` — ignored.
fn apply_event(result: &mut InvokeResult, event: StreamEvent) {
    match event {
        StreamEvent::System(system) => {
            // Only the first session id wins.
            result.session_id.get_or_insert(system.session_id);
        }
        StreamEvent::Assistant(assistant) => {
            result.turns = result.turns.saturating_add(1);

            let tool_blocks = assistant
                .message
                .get("content")
                .and_then(|v| v.as_array())
                .into_iter()
                .flatten()
                .filter(|block| block.get("type").and_then(|v| v.as_str()) == Some("tool_use"));

            for block in tool_blocks {
                result.tool_calls = result.tool_calls.saturating_add(1);

                let string_field = |key: &str| {
                    block
                        .get(key)
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string()
                };
                let id = string_field("id");
                let name = string_field("name");
                let input = block
                    .get("input")
                    .cloned()
                    .unwrap_or(serde_json::Value::Null);

                tracing::info!(
                    adapter = "claude-code",
                    tool_name = %name,
                    "external agent emitted tool_use (observation-only — \
                     policy gating via MCP route lands in Phase 2 stage 4b)"
                );
                result.tool_uses.push(ToolUseRequest { id, name, input });
            }
        }
        StreamEvent::Result(outcome) => {
            if outcome.is_error {
                result.error = Some(format!(
                    "agent reported error (subtype={}, terminal={:?})",
                    outcome.subtype, outcome.terminal_reason
                ));
            }
            if let Some(turns) = outcome.num_turns {
                result.turns = turns;
            }
            result.is_error = outcome.is_error;
            result.total_cost_usd = outcome.total_cost_usd;
            result.duration_ms = outcome.duration_ms.unwrap_or(0);
            result.answer = outcome.result.unwrap_or_default();
        }
        StreamEvent::User(_) | StreamEvent::RateLimitEvent(_) | StreamEvent::Other => {}
    }
}
/// Renders `raw` as a double-quoted TOML basic string, escaping quotes,
/// backslashes and control characters so the value cannot terminate the
/// string early or inject additional keys.
fn toml_basic_string(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len() + 2);
    out.push('"');
    for ch in raw.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if c.is_control() => out.push_str(&format!("\\u{:04X}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Builds the command-line arguments for a Codex invocation.
///
/// The result always starts with `exec --json --skip-git-repo-check
/// --ephemeral` and always ends with `-` (the task itself is written to the
/// child's stdin by `invoke_codex`). In between:
///
/// * `--cd <dir>` when `opts.cwd` is set (lossily converted to UTF-8);
/// * `-c mcp_servers.car={type="http",url="<endpoint>"}` when `mcp_endpoint`
///   is present and non-empty.
///
/// The endpoint is embedded as an escaped TOML basic string, so a value
/// containing `"` or `\` cannot break out of the inline table. Endpoints
/// without such characters render exactly as before.
pub(crate) fn build_codex_args(opts: &InvokeOptions, mcp_endpoint: Option<&str>) -> Vec<String> {
    let mut args = vec![
        "exec".to_string(),
        "--json".to_string(),
        "--skip-git-repo-check".to_string(),
        "--ephemeral".to_string(),
    ];
    if let Some(cwd) = &opts.cwd {
        args.push("--cd".to_string());
        args.push(cwd.to_string_lossy().into_owned());
    }
    if let Some(endpoint) = mcp_endpoint.filter(|s| !s.is_empty()) {
        args.push("-c".to_string());
        args.push(format!(
            "mcp_servers.car={{type=\"http\",url={}}}",
            toml_basic_string(endpoint)
        ));
    }
    args.push("-".to_string());
    args
}
/// Runs the Codex CLI at `binary_path` on `task` and aggregates its JSON
/// event stream into an [`InvokeResult`].
///
/// Steps:
/// 1. Resolve the timeout (`opts.timeout_secs`, default
///    `DEFAULT_TIMEOUT_SECS`, clamped to `1..=MAX_TIMEOUT_SECS`).
/// 2. Spawn the process with piped stdio and `kill_on_drop`, write `task` to
///    stdin and close it.
/// 3. Feed stdout through [`process_codex_stream`] and wait for exit.
/// 4. On a non-zero exit, always record `exit code N: <stderr>` in
///    `result.error`. An error already set by the stream processor (for
///    example "codex produced no agent_message") is kept as a parenthesised
///    suffix, so the process's own diagnostics are never hidden behind the
///    synthetic "no answer" message.
///
/// # Errors
///
/// * [`InvokeError::Spawn`] — the process could not be started.
/// * [`InvokeError::Io`] — pipe or wait failures.
/// * [`InvokeError::Timeout`] — stream processing plus exit did not finish in time.
pub async fn invoke_codex(
    binary_path: &Path,
    task: &str,
    opts: InvokeOptions,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
    tracing::info!(
        adapter = "codex",
        binary = %binary_path.display(),
        task_len = task.len(),
        "external agent invocation started"
    );
    let timeout_secs = opts
        .timeout_secs
        .unwrap_or(DEFAULT_TIMEOUT_SECS)
        .min(MAX_TIMEOUT_SECS)
        .max(1);
    let timeout = Duration::from_secs(timeout_secs);

    let args = build_codex_args(&opts, opts.mcp_endpoint.as_deref());
    let mut cmd = Command::new(binary_path);
    cmd.args(&args);
    if let Some(cwd) = &opts.cwd {
        cmd.current_dir(cwd);
    }
    cmd.stdin(std::process::Stdio::piped());
    cmd.stdout(std::process::Stdio::piped());
    cmd.stderr(std::process::Stdio::piped());
    cmd.kill_on_drop(true);
    let mut child = cmd.spawn().map_err(|e| InvokeError::Spawn(e.to_string()))?;

    {
        // Scoped so the handle is dropped (closing the pipe) right after the write.
        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| InvokeError::Io("stdin closed unexpectedly".to_string()))?;
        stdin
            .write_all(task.as_bytes())
            .await
            .map_err(|e| InvokeError::Io(format!("stdin write: {e}")))?;
    }

    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| InvokeError::Io("stdout missing".to_string()))?;
    let stderr = child
        .stderr
        .take()
        .ok_or_else(|| InvokeError::Io("stderr missing".to_string()))?;
    let reader = tokio::io::BufReader::new(stdout);

    let process_fut = async {
        let mut result = process_codex_stream(reader, emitter).await?;
        let exit = child
            .wait()
            .await
            .map_err(|e| InvokeError::Io(format!("wait: {e}")))?;
        if !exit.success() {
            // Best effort: a failed stderr read still yields an exit-code message.
            let mut stderr_buf = Vec::new();
            let mut stderr_reader = tokio::io::BufReader::new(stderr);
            let _ = tokio::io::AsyncReadExt::read_to_end(&mut stderr_reader, &mut stderr_buf).await;
            let stderr_text = String::from_utf8_lossy(&stderr_buf);
            let exit_msg = format!(
                "exit code {}: {}",
                exit.code().unwrap_or(-1),
                stderr_text.trim()
            );
            // The stream processor flags "no agent_message" whenever codex
            // produced no answer, which is exactly what happens when it
            // crashes. Lead with the exit diagnostics and keep the earlier
            // message as context rather than letting it mask them.
            result.error = Some(match result.error.take() {
                Some(prior) => format!("{exit_msg} ({prior})"),
                None => exit_msg,
            });
            result.is_error = true;
        }
        Ok::<_, InvokeError>(result)
    };

    match tokio::time::timeout(timeout, process_fut).await {
        Ok(Ok(res)) => Ok(res),
        Ok(Err(e)) => Err(e),
        Err(_) => Err(InvokeError::Timeout(timeout_secs)),
    }
}
/// Builds a synthetic assistant event whose message has role `assistant` and
/// a one-element `content` array holding `content`.
///
/// Used by adapters whose CLIs do not emit Claude-style assistant events.
/// `uuid` is left empty and `parent_tool_use_id` is `None`.
fn synth_assistant_event(session_id: &str, content: serde_json::Value) -> StreamEvent {
    let message = serde_json::json!({ "role": "assistant", "content": [content] });
    let data = AssistantEventData {
        message,
        session_id: session_id.to_string(),
        uuid: String::new(),
        parent_tool_use_id: None,
    };
    StreamEvent::Assistant(data)
}
/// Consumes a Codex JSON event stream line by line and folds it into an
/// [`InvokeResult`].
///
/// Blank lines and lines that are not valid JSON are skipped silently.
/// Events are dispatched on their `type` field:
///
/// * `thread.started` — records `thread_id` as the session id (first one
///   wins) and emits a synthetic `System` event with subtype `init`.
/// * `turn.started` — increments the turn counter.
/// * `item.completed` — inspects `item.type`:
///   * `agent_message` with a string `text`: emits a synthetic assistant
///     text event and appends the text to the answer;
///   * any other type containing `tool` or `call`: counts and records a tool
///     use (name falls back to the item type, input to `arguments`, then
///     `input`, then JSON `null`) and emits a synthetic `tool_use` event.
/// * everything else is ignored.
///
/// After the stream ends, `duration_ms` is the locally measured elapsed
/// time. If no answer text was collected and no error is recorded, the
/// result is flagged as an error.
///
/// # Errors
///
/// Returns [`InvokeError::Io`] when reading a line from `reader` fails.
pub async fn process_codex_stream<R>(
    reader: R,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError>
where
    R: tokio::io::AsyncBufRead + Unpin,
{
    use serde_json::Value;

    let started = std::time::Instant::now();
    let mut result = InvokeResult::default();
    // Message texts are concatenated with no separator.
    let mut answer = String::new();
    let mut lines = reader.lines();

    while let Some(line) = lines
        .next_line()
        .await
        .map_err(|e| InvokeError::Io(format!("stdout read: {e}")))?
    {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        // Non-JSON output (e.g. banner lines) is not an error.
        let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
            continue;
        };

        match value.get("type").and_then(Value::as_str).unwrap_or("") {
            "thread.started" => {
                if result.session_id.is_none() {
                    result.session_id = value
                        .get("thread_id")
                        .and_then(Value::as_str)
                        .map(str::to_string);
                }
                if let Some(emit) = &emitter {
                    emit(StreamEvent::System(SystemEventData {
                        subtype: "init".to_string(),
                        session_id: result.session_id.clone().unwrap_or_default(),
                        model: None,
                        cwd: None,
                        tools: Vec::new(),
                        permission_mode: None,
                        claude_code_version: None,
                        extra: serde_json::Map::new(),
                    }));
                }
            }
            "turn.started" => {
                result.turns = result.turns.saturating_add(1);
            }
            "item.completed" => {
                let Some(item) = value.get("item") else {
                    continue;
                };
                let item_type = item.get("type").and_then(Value::as_str).unwrap_or("");

                if item_type == "agent_message" {
                    if let Some(text) = item.get("text").and_then(Value::as_str) {
                        if let Some(emit) = &emitter {
                            let sid = result.session_id.clone().unwrap_or_default();
                            emit(synth_assistant_event(
                                &sid,
                                serde_json::json!({ "type": "text", "text": text }),
                            ));
                        }
                        answer.push_str(text);
                    }
                } else if item_type.contains("tool") || item_type.contains("call") {
                    result.tool_calls = result.tool_calls.saturating_add(1);

                    let id = item
                        .get("id")
                        .and_then(Value::as_str)
                        .unwrap_or("")
                        .to_string();
                    let name = item
                        .get("name")
                        .and_then(Value::as_str)
                        .unwrap_or(item_type)
                        .to_string();
                    let input = item
                        .get("arguments")
                        .or_else(|| item.get("input"))
                        .cloned()
                        .unwrap_or(Value::Null);

                    tracing::info!(
                        adapter = "codex",
                        tool_name = %name,
                        "external agent emitted tool_use (observation-only)"
                    );
                    if let Some(emit) = &emitter {
                        let sid = result.session_id.clone().unwrap_or_default();
                        emit(synth_assistant_event(
                            &sid,
                            serde_json::json!({
                                "type": "tool_use",
                                "id": id,
                                "name": name,
                                "input": input,
                            }),
                        ));
                    }
                    result.tool_uses.push(ToolUseRequest { id, name, input });
                }
            }
            // `turn.completed` and unknown event types carry nothing we aggregate.
            _ => {}
        }
    }

    result.answer = answer;
    result.duration_ms = started.elapsed().as_millis() as u64;
    if result.answer.is_empty() && result.error.is_none() {
        result.is_error = true;
        result.error = Some("codex produced no agent_message".to_string());
    }
    Ok(result)
}
/// Builds the command-line arguments for a Gemini invocation:
/// `-p <task> --yolo`.
///
/// `opts` is accepted for parity with the other argument builders but
/// contributes no flags. The working directory is applied by `invoke_gemini`
/// through `Command::current_dir`, not through an argument.
pub(crate) fn build_gemini_args(opts: &InvokeOptions, task: &str) -> Vec<String> {
    // Intentionally unused; see the doc comment.
    let _ = opts;
    vec!["-p".to_string(), task.to_string(), "--yolo".to_string()]
}
/// Runs the Gemini CLI at `binary_path` on `task` and wraps its plain-text
/// output in an [`InvokeResult`].
///
/// There is no event stream for this adapter: the process is run to
/// completion with stdin closed, and the trimmed stdout becomes the answer.
/// A non-empty `opts.mcp_endpoint` only triggers a warning.
///
/// Outcome mapping:
/// * non-zero exit → `is_error` with `exit code N: <stderr>`;
/// * zero exit but empty stdout → `is_error` with a "no stdout output" message;
/// * otherwise → one synthetic assistant text event is sent to `emitter`
///   (when present) with an empty session id.
///
/// # Errors
///
/// * [`InvokeError::Spawn`] — the process could not be started.
/// * [`InvokeError::Io`] — waiting for the process output failed.
/// * [`InvokeError::Timeout`] — the process did not finish in time.
pub async fn invoke_gemini(
    binary_path: &Path,
    task: &str,
    opts: InvokeOptions,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
    tracing::info!(
        adapter = "gemini",
        binary = %binary_path.display(),
        task_len = task.len(),
        "external agent invocation started"
    );

    if matches!(opts.mcp_endpoint.as_deref(), Some(endpoint) if !endpoint.is_empty()) {
        tracing::warn!(
            adapter = "gemini",
            "mcp_endpoint supplied but Gemini CLI v0.1.x doesn't support \
             --mcp-config; agent will run without CAR's MCP namespace"
        );
    }

    let timeout_secs = opts
        .timeout_secs
        .unwrap_or(DEFAULT_TIMEOUT_SECS)
        .clamp(1, MAX_TIMEOUT_SECS);
    let timeout = Duration::from_secs(timeout_secs);
    let started = std::time::Instant::now();

    let mut cmd = Command::new(binary_path);
    cmd.args(build_gemini_args(&opts, task))
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .kill_on_drop(true);
    if let Some(dir) = &opts.cwd {
        cmd.current_dir(dir);
    }
    let child = cmd.spawn().map_err(|e| InvokeError::Spawn(e.to_string()))?;

    let process_fut = async {
        let output = child
            .wait_with_output()
            .await
            .map_err(|e| InvokeError::Io(format!("wait: {e}")))?;

        let answer = String::from_utf8_lossy(&output.stdout).trim().to_string();
        let mut result = InvokeResult {
            answer,
            duration_ms: started.elapsed().as_millis() as u64,
            ..Default::default()
        };

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            result.is_error = true;
            result.error = Some(format!(
                "exit code {}: {}",
                output.status.code().unwrap_or(-1),
                stderr.trim()
            ));
        } else if result.answer.is_empty() {
            result.is_error = true;
            result.error = Some("gemini produced no stdout output".to_string());
        } else if let Some(emit) = &emitter {
            emit(synth_assistant_event(
                "",
                serde_json::json!({ "type": "text", "text": result.answer.clone() }),
            ));
        }
        Ok::<_, InvokeError>(result)
    };

    tokio::time::timeout(timeout, process_fut)
        .await
        .unwrap_or(Err(InvokeError::Timeout(timeout_secs)))
}
// Unit tests for the argument builders and the two stream processors.
// No subprocess is spawned: streams are fed from in-memory buffers.
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tokio::io::BufReader;
// Builds an in-memory async reader whose content is `lines` joined with `\n`
// (no trailing newline).
fn mock_reader(lines: &[&str]) -> BufReader<Cursor<Vec<u8>>> {
let joined = lines.join("\n");
BufReader::new(Cursor::new(joined.into_bytes()))
}
// A system + assistant(text) + result stream yields the answer, session id,
// turn count, duration and cost from the result event, with no error.
#[tokio::test]
async fn aggregates_simple_text_response() {
let lines = [
r#"{"type":"system","subtype":"init","session_id":"s1","model":"opus","tools":[]}"#,
r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"ok"}],"usage":{}},"session_id":"s1","uuid":"u1"}"#,
r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":1500,"num_turns":1,"result":"ok","session_id":"s1","total_cost_usd":0.05,"usage":{},"modelUsage":{},"uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert_eq!(result.answer, "ok");
assert_eq!(result.session_id.as_deref(), Some("s1"));
assert_eq!(result.turns, 1);
assert_eq!(result.tool_calls, 0);
assert_eq!(result.duration_ms, 1500);
assert_eq!(result.total_cost_usd, Some(0.05));
assert!(!result.is_error);
assert!(result.error.is_none());
}
// tool_use blocks spread over two assistant events are each counted and
// recorded in order with their id, name and input.
#[tokio::test]
async fn counts_tool_use_blocks_across_turns() {
let lines = [
r#"{"type":"system","subtype":"init","session_id":"s2","model":"opus","tools":[]}"#,
r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"tool_use","id":"t1","name":"Read","input":{"file_path":"/x"}}],"usage":{}},"session_id":"s2","uuid":"u1"}"#,
r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"tool_use","id":"t2","name":"Bash","input":{"command":"ls"}},{"type":"text","text":"done"}],"usage":{}},"session_id":"s2","uuid":"u2"}"#,
r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":3000,"num_turns":2,"result":"done","session_id":"s2","total_cost_usd":0.10,"usage":{},"modelUsage":{},"uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert_eq!(result.tool_calls, 2);
assert_eq!(result.turns, 2);
assert_eq!(result.answer, "done");
assert_eq!(result.tool_uses.len(), 2);
assert_eq!(result.tool_uses[0].id, "t1");
assert_eq!(result.tool_uses[0].name, "Read");
assert_eq!(
result.tool_uses[0]
.input
.get("file_path")
.and_then(|v| v.as_str()),
Some("/x")
);
assert_eq!(result.tool_uses[1].id, "t2");
assert_eq!(result.tool_uses[1].name, "Bash");
}
// A result event with `is_error: true` marks the result as an error and
// produces an error message mentioning the subtype.
#[tokio::test]
async fn surfaces_agent_reported_error() {
let lines = [
r#"{"type":"system","subtype":"init","session_id":"s3","model":"opus","tools":[]}"#,
r#"{"type":"result","subtype":"error","is_error":true,"duration_ms":500,"session_id":"s3","total_cost_usd":0.0,"usage":{},"modelUsage":{},"terminal_reason":"timeout","uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert!(result.is_error);
assert!(result.error.as_deref().unwrap().contains("error"));
}
// An empty stream yields the default result rather than failing.
#[tokio::test]
async fn empty_stream_produces_empty_result_with_no_panic() {
let result = process_stream(mock_reader(&[]), None).await.unwrap();
assert_eq!(result.answer, "");
assert_eq!(result.turns, 0);
assert!(result.session_id.is_none());
}
// A malformed first line does not stop processing of the following lines.
#[tokio::test]
async fn malformed_line_logged_but_stream_continues() {
let lines = [
r#"{not valid"#,
r#"{"type":"system","subtype":"init","session_id":"s4","model":"opus","tools":[]}"#,
r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":100,"num_turns":0,"result":"recovered","session_id":"s4","total_cost_usd":0.0,"usage":{},"modelUsage":{},"uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert_eq!(result.answer, "recovered");
assert_eq!(result.session_id.as_deref(), Some("s4"));
}
// Default options produce the fixed format flags and none of the optional ones.
#[test]
fn build_claude_args_includes_required_format_flags() {
let args = build_claude_args(&InvokeOptions::default(), None);
assert!(args.contains(&"-p".to_string()));
assert!(args.iter().any(|a| a == "stream-json"));
assert!(!args.iter().any(|a| a == "--allowed-tools"));
assert!(!args.iter().any(|a| a == "--max-turns"));
assert!(!args.iter().any(|a| a == "--mcp-config"));
}
// allowed_tools is space-joined into one value; max_turns is rendered in decimal.
#[test]
fn build_claude_args_passes_allowed_tools() {
let opts = InvokeOptions {
allowed_tools: Some(vec!["Read".to_string(), "Bash".to_string()]),
max_turns: Some(5),
..Default::default()
};
let args = build_claude_args(&opts, None);
let pos = args.iter().position(|a| a == "--allowed-tools").unwrap();
assert_eq!(args[pos + 1], "Read Bash");
let pos = args.iter().position(|a| a == "--max-turns").unwrap();
assert_eq!(args[pos + 1], "5");
}
// An explicitly empty tool list still emits the flag, with an empty value.
#[test]
fn build_claude_args_empty_allowed_tools_denies_everything() {
let opts = InvokeOptions {
allowed_tools: Some(vec![]),
..Default::default()
};
let args = build_claude_args(&opts, None);
let pos = args.iter().position(|a| a == "--allowed-tools").unwrap();
assert_eq!(args[pos + 1], "");
}
// The MCP config path is passed verbatim after `--mcp-config`.
#[test]
fn build_claude_args_threads_mcp_config_path() {
let path = std::path::Path::new("/tmp/mcp-config.json");
let args = build_claude_args(&InvokeOptions::default(), Some(path));
let pos = args.iter().position(|a| a == "--mcp-config").unwrap();
assert_eq!(args[pos + 1], "/tmp/mcp-config.json");
}
// The generated MCP config parses as JSON with an http server entry named `car`.
#[test]
fn build_mcp_config_renders_http_server_entry() {
let json = build_mcp_config_json("http://127.0.0.1:9102/mcp");
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["mcpServers"]["car"]["type"], "http");
assert_eq!(
parsed["mcpServers"]["car"]["url"],
"http://127.0.0.1:9102/mcp"
);
}
// The stdin envelope parses as JSON and carries the task as message content.
#[test]
fn build_user_message_is_valid_json() {
let msg = build_user_message("hello world");
let parsed: serde_json::Value = serde_json::from_str(&msg).unwrap();
assert_eq!(parsed["type"], "user");
assert_eq!(parsed["message"]["role"], "user");
assert_eq!(parsed["message"]["content"], "hello world");
}
// Codex args start with `exec`, include the fixed flags and end with `-`.
#[test]
fn codex_args_use_exec_subcommand_and_json_output() {
let args = build_codex_args(&InvokeOptions::default(), None);
assert_eq!(args[0], "exec");
assert!(args.contains(&"--json".to_string()));
assert!(args.contains(&"--skip-git-repo-check".to_string()));
assert!(args.contains(&"--ephemeral".to_string()));
assert_eq!(args.last().map(String::as_str), Some("-"));
}
// The MCP endpoint is injected through a `-c mcp_servers.car=...` override.
#[test]
fn codex_args_inject_mcp_via_inline_config_override() {
let args = build_codex_args(&InvokeOptions::default(), Some("http://127.0.0.1:9102/mcp"));
let mcp_pos = args.iter().position(|a| a == "-c").unwrap();
assert!(args[mcp_pos + 1].starts_with("mcp_servers.car="));
assert!(args[mcp_pos + 1].contains("http://127.0.0.1:9102/mcp"));
assert!(args[mcp_pos + 1].contains("type=\"http\""));
}
// The working directory is forwarded to Codex via `--cd`.
#[test]
fn codex_args_threads_cd_flag() {
let opts = InvokeOptions {
cwd: Some("/tmp/work".into()),
..Default::default()
};
let args = build_codex_args(&opts, None);
let pos = args.iter().position(|a| a == "--cd").unwrap();
assert_eq!(args[pos + 1], "/tmp/work");
}
// A single agent_message becomes the answer; thread id and turn count are recorded.
#[tokio::test]
async fn codex_stream_aggregates_agent_message() {
let lines = [
r#"{"type":"thread.started","thread_id":"thread-abc"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"hello world"}}"#,
r#"{"type":"turn.completed","usage":{"input_tokens":5,"output_tokens":2}}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.answer, "hello world");
assert_eq!(result.session_id.as_deref(), Some("thread-abc"));
assert_eq!(result.turns, 1);
assert!(!result.is_error);
}
// Multiple agent_message texts are concatenated without a separator.
#[tokio::test]
async fn codex_stream_concatenates_multiple_agent_messages() {
let lines = [
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"part 1 "}}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"part 2"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.answer, "part 1 part 2");
}
// A completed item whose type contains "tool"/"call" is recorded as a tool use.
#[tokio::test]
async fn codex_stream_records_tool_calls() {
let lines = [
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"id":"call_1","type":"tool_call","name":"shell","arguments":{"cmd":"ls"}}}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"done"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.tool_calls, 1);
assert_eq!(result.tool_uses.len(), 1);
assert_eq!(result.tool_uses[0].name, "shell");
assert_eq!(result.answer, "done");
}
// Non-JSON lines in the output are skipped rather than treated as errors.
#[tokio::test]
async fn codex_stream_skips_non_json_banner_lines() {
let lines = [
"Reading prompt from stdin...",
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.answer, "ok");
}
// A stream without any agent_message is flagged as an error with an empty answer.
#[tokio::test]
async fn codex_stream_no_agent_message_marks_error() {
let lines = [r#"{"type":"thread.started","thread_id":"t"}"#];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert!(result.is_error);
assert!(result.answer.is_empty());
}
// The emitter receives a synthetic Assistant text event for each agent_message.
#[tokio::test]
async fn codex_stream_fires_assistant_events_to_emitter() {
use std::sync::Mutex;
// Collects every emitted event so the test can inspect them afterwards.
let captured: Arc<Mutex<Vec<StreamEvent>>> = Arc::new(Mutex::new(Vec::new()));
let sink = captured.clone();
let emitter: StreamEventEmitter = Arc::new(move |ev| sink.lock().unwrap().push(ev));
let lines = [
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"hi there"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), Some(emitter))
.await
.unwrap();
assert_eq!(result.answer, "hi there");
let events = captured.lock().unwrap();
// Extract the text of the first content block of every Assistant text event.
let texts: Vec<String> = events
.iter()
.filter_map(|e| match e {
StreamEvent::Assistant(a) => a
.message
.get("content")
.and_then(|c| c.as_array())
.and_then(|arr| arr.first())
.filter(|b| b.get("type").and_then(|t| t.as_str()) == Some("text"))
.and_then(|b| b.get("text"))
.and_then(|t| t.as_str())
.map(str::to_string),
_ => None,
})
.collect();
assert!(
texts.iter().any(|t| t == "hi there"),
"codex emitter must fire an Assistant text event (car#213); got {events:?}"
);
}
// Gemini args are `-p <task>` plus `--yolo`.
#[test]
fn gemini_args_use_prompt_and_yolo() {
let args = build_gemini_args(&InvokeOptions::default(), "say hi");
assert_eq!(args[0], "-p");
assert_eq!(args[1], "say hi");
assert!(args.contains(&"--yolo".to_string()));
}
}