use crate::protocol::{
parse_line, AssistantEventData, StreamEvent, SystemEventData, ToolUseRequest,
};
/// Shared, thread-safe callback that is handed each [`StreamEvent`] by value as the
/// stream processors in this file observe (or synthesize) it.
pub type StreamEventEmitter = Arc<dyn Fn(StreamEvent) + Send + Sync + 'static>;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt};
use tokio::process::Command;
/// An image file to forward to an external agent together with the task text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageAttachment {
/// Filesystem path of the image.
pub path: PathBuf,
/// Optional caller-supplied media type. The adapters visible in this file derive the
/// media type from the file's leading bytes and do not read this field.
#[serde(default)]
pub media_type: Option<String>,
}
/// Per-invocation settings. Every field is optional when deserializing (`#[serde(default)]`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct InvokeOptions {
/// Working directory for the agent subprocess; the Codex adapter also passes it as `--cd`.
#[serde(default)]
pub cwd: Option<PathBuf>,
/// Tool names forwarded to Claude Code as a single space-joined `--allowed-tools` value.
/// `Some(vec![])` still emits the flag, with an empty value.
#[serde(default)]
pub allowed_tools: Option<Vec<String>>,
/// Forwarded to Claude Code as `--max-turns`.
#[serde(default)]
pub max_turns: Option<u32>,
/// Wall-clock budget in seconds. Defaults to `DEFAULT_TIMEOUT_SECS` and is clamped to
/// `1..=MAX_TIMEOUT_SECS` by the adapters.
#[serde(default)]
pub timeout_secs: Option<u64>,
/// HTTP MCP endpoint to expose to the agent. Empty strings are treated as absent; the
/// Gemini adapter only logs a warning and ignores it.
#[serde(default)]
pub mcp_endpoint: Option<String>,
/// Images to send with the task. Attachments that fail validation are skipped and counted
/// in `InvokeResult::dropped_attachments`.
#[serde(default)]
pub attachments: Vec<ImageAttachment>,
}
/// Timeout applied when `InvokeOptions::timeout_secs` is `None` (5 minutes).
const DEFAULT_TIMEOUT_SECS: u64 = 300;
/// Upper bound applied to any requested timeout (1 hour).
const MAX_TIMEOUT_SECS: u64 = 3600;
/// Aggregated outcome of one external-agent run.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct InvokeResult {
/// Final answer text: Claude's `result` field, Codex's concatenated agent messages, or
/// Gemini's trimmed stdout.
pub answer: String,
/// Session identifier reported by the agent (Claude system event / Codex thread id).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// Number of turns observed, or the count the agent itself reported when available.
#[serde(default)]
pub turns: u32,
/// Number of tool invocations observed in the stream.
#[serde(default)]
pub tool_calls: u32,
/// The observed tool invocations, in stream order.
#[serde(default)]
pub tool_uses: Vec<ToolUseRequest>,
/// Run duration in milliseconds (agent-reported for Claude, locally measured otherwise).
#[serde(default)]
pub duration_ms: u64,
/// Cost reported by the agent; in the visible code only Claude's result event sets it.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub total_cost_usd: Option<f64>,
/// How many requested attachments were not forwarded. Omitted from output when zero.
#[serde(default, skip_serializing_if = "is_zero_u32")]
pub dropped_attachments: u32,
/// Whether the run is considered failed.
#[serde(default)]
pub is_error: bool,
/// Human-readable failure or diagnostic detail, when there is one.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Serde `skip_serializing_if` predicate: true exactly when the counter is zero.
fn is_zero_u32(n: &u32) -> bool {
    matches!(*n, 0)
}
/// Failures that prevent an invocation from producing an [`InvokeResult`] at all.
/// Agent-level failures (non-zero exit, agent-reported error) are instead surfaced through
/// `InvokeResult::is_error` / `InvokeResult::error`.
#[derive(Debug, Error)]
pub enum InvokeError {
/// The agent id could not be resolved, or the process could not be started.
#[error("subprocess spawn failed: {0}")]
Spawn(String),
/// Talking to the subprocess (stdin/stdout/wait) or writing a supporting temp file failed.
#[error("subprocess I/O failed: {0}")]
Io(String),
/// The run exceeded its time budget; carries the effective timeout in seconds.
#[error("subprocess timed out after {0}s")]
Timeout(u64),
/// The stream contained malformed JSON. NOTE(review): not constructed anywhere in the
/// code visible here — malformed lines are recorded in `InvokeResult::error` instead.
#[error("stream produced malformed JSON: {0}")]
BadJson(String),
}
/// Assembles the argument vector for a headless Claude Code run.
///
/// The fixed prefix selects print mode with stream-JSON on both input and output plus
/// `--verbose`. Optional flags are appended in this order: `--allowed-tools` (tool names
/// joined with single spaces; an empty list yields an empty value), `--max-turns`, and
/// `--mcp-config` (path rendered lossily as UTF-8).
pub(crate) fn build_claude_args(
    opts: &InvokeOptions,
    mcp_config_path: Option<&std::path::Path>,
) -> Vec<String> {
    let mut argv: Vec<String> = [
        "-p",
        "--output-format",
        "stream-json",
        "--input-format",
        "stream-json",
        "--verbose",
    ]
    .iter()
    .map(|flag| flag.to_string())
    .collect();

    if let Some(tools) = opts.allowed_tools.as_ref() {
        argv.extend(["--allowed-tools".to_string(), tools.join(" ")]);
    }
    if let Some(limit) = opts.max_turns {
        argv.extend(["--max-turns".to_string(), limit.to_string()]);
    }
    if let Some(config) = mcp_config_path {
        argv.extend([
            "--mcp-config".to_string(),
            config.to_string_lossy().into_owned(),
        ]);
    }
    argv
}
pub(crate) fn build_mcp_config_json(endpoint: &str) -> String {
let cfg = serde_json::json!({
"mcpServers": {
"car": {
"type": "http",
"url": endpoint,
}
}
});
cfg.to_string()
}
/// Largest image file, in bytes, that will be forwarded as an attachment (32 MiB).
const MAX_IMAGE_BYTES: u64 = 32 * 1024 * 1024;
/// Identifies PNG, JPEG, GIF or WebP content from its leading magic bytes.
///
/// Returns the matching media type, or `None` when the prefix matches none of them
/// (including inputs too short to carry a signature).
fn sniff_image_media_type(bytes: &[u8]) -> Option<&'static str> {
    const PNG_MAGIC: &[u8] = &[0x89, 0x50, 0x4E, 0x47];
    const JPEG_MAGIC: &[u8] = &[0xFF, 0xD8, 0xFF];

    if bytes.starts_with(PNG_MAGIC) {
        return Some("image/png");
    }
    if bytes.starts_with(JPEG_MAGIC) {
        return Some("image/jpeg");
    }
    let is_gif = bytes.starts_with(b"GIF87a") || bytes.starts_with(b"GIF89a");
    if is_gif {
        return Some("image/gif");
    }
    // WebP is a RIFF container: "RIFF", a 4-byte length, then the "WEBP" form type.
    let is_webp =
        bytes.get(..4) == Some(&b"RIFF"[..]) && bytes.get(8..12) == Some(&b"WEBP"[..]);
    if is_webp {
        Some("image/webp")
    } else {
        None
    }
}
/// Reports whether `path` is a regular file no larger than `MAX_IMAGE_BYTES`.
///
/// Oversized entries are logged at warn level; paths whose metadata cannot be read and
/// non-file entries within the cap are rejected without logging.
fn image_within_size_cap(path: &Path) -> bool {
    let Ok(meta) = std::fs::metadata(path) else {
        return false;
    };
    let size = meta.len();
    if size > MAX_IMAGE_BYTES {
        tracing::warn!(path = %path.display(), bytes = size, "image attachment exceeds size cap; skipping");
        return false;
    }
    meta.is_file()
}
/// Loads an attachment and confirms it is a supported image.
///
/// Returns the file contents with the sniffed media type, or `None` when the file fails
/// the size/file check, cannot be read, or is not a recognized PNG/JPEG/GIF/WebP.
fn read_validated_image(path: &Path) -> Option<(Vec<u8>, &'static str)> {
    if !image_within_size_cap(path) {
        return None;
    }

    let contents = match std::fs::read(path) {
        Err(e) => {
            tracing::warn!(path = %path.display(), error = %e, "skipping unreadable image attachment");
            return None;
        }
        Ok(data) => data,
    };

    let Some(media_type) = sniff_image_media_type(&contents) else {
        tracing::warn!(path = %path.display(), "attachment is not a recognized image (png/jpeg/gif/webp); skipping");
        return None;
    };
    Some((contents, media_type))
}
/// Builds a base64 `image` content block for a Claude user message.
///
/// Returns `None` when the attachment does not pass `read_validated_image`.
fn claude_image_block(att: &ImageAttachment) -> Option<serde_json::Value> {
    use base64::Engine as _;

    read_validated_image(&att.path).map(|(raw, media_type)| {
        let encoded = base64::engine::general_purpose::STANDARD.encode(&raw);
        serde_json::json!({
            "type": "image",
            "source": { "type": "base64", "media_type": media_type, "data": encoded },
        })
    })
}
pub(crate) fn build_user_message(task: &str, attachments: &[ImageAttachment]) -> (String, usize) {
let mut accepted = 0usize;
let content = if attachments.is_empty() {
serde_json::json!(task)
} else {
let mut blocks: Vec<serde_json::Value> = Vec::new();
if !task.is_empty() {
blocks.push(serde_json::json!({ "type": "text", "text": task }));
}
for att in attachments {
if let Some(block) = claude_image_block(att) {
blocks.push(block);
accepted += 1;
}
}
serde_json::json!(blocks)
};
let payload = serde_json::json!({
"type": "user",
"message": {
"role": "user",
"content": content,
},
});
(payload.to_string(), accepted)
}
/// Runs `task` on the external agent identified by `id` without streaming events.
///
/// Equivalent to [`invoke_with_emitter`] with no emitter.
///
/// # Errors
/// Returns whatever [`invoke_with_emitter`] returns.
pub async fn invoke(
id: &str,
task: &str,
opts: InvokeOptions,
) -> Result<InvokeResult, InvokeError> {
invoke_with_emitter(id, task, opts, None).await
}
/// Resolves the agent `id` among the detected external agents and dispatches to its adapter.
///
/// `emitter`, when present, is passed through to the adapter so it can report stream events.
///
/// # Errors
/// Returns [`InvokeError::Spawn`] when no detected agent has this id or when the id has no
/// adapter; otherwise propagates the adapter's error.
pub async fn invoke_with_emitter(
    id: &str,
    task: &str,
    opts: InvokeOptions,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
    let detected = crate::detect().await;
    let Some(spec) = detected.iter().find(|candidate| candidate.id == id) else {
        return Err(InvokeError::Spawn(format!(
            "no detected external agent with id `{id}`"
        )));
    };

    match id {
        "claude-code" => invoke_claude_code(&spec.binary_path, task, opts, emitter).await,
        "codex" => invoke_codex(&spec.binary_path, task, opts, emitter).await,
        "gemini" => invoke_gemini(&spec.binary_path, task, opts, emitter).await,
        _ => Err(InvokeError::Spawn(format!("unknown adapter id: {id}"))),
    }
}
pub async fn invoke_claude_code(
binary_path: &Path,
task: &str,
opts: InvokeOptions,
emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
tracing::info!(
adapter = "claude-code",
binary = %binary_path.display(),
task_len = task.len(),
"external agent invocation started"
);
let timeout_secs = opts
.timeout_secs
.unwrap_or(DEFAULT_TIMEOUT_SECS)
.min(MAX_TIMEOUT_SECS)
.max(1);
let timeout = Duration::from_secs(timeout_secs);
let mcp_config: Option<tempfile::NamedTempFile> = match opts.mcp_endpoint.as_deref() {
Some(endpoint) if !endpoint.is_empty() => {
let json = build_mcp_config_json(endpoint);
let mut tmp = tempfile::Builder::new()
.prefix("car-mcp-config-")
.suffix(".json")
.tempfile()
.map_err(|e| InvokeError::Io(format!("mcp config tempfile: {e}")))?;
std::io::Write::write_all(&mut tmp, json.as_bytes())
.map_err(|e| InvokeError::Io(format!("mcp config write: {e}")))?;
Some(tmp)
}
_ => None,
};
let mcp_config_path = mcp_config.as_ref().map(|t| t.path());
let args = build_claude_args(&opts, mcp_config_path);
let mut cmd = Command::new(binary_path);
cmd.args(&args);
if let Some(cwd) = &opts.cwd {
cmd.current_dir(cwd);
}
cmd.stdin(std::process::Stdio::piped());
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
cmd.kill_on_drop(true);
let mut child = cmd.spawn().map_err(|e| InvokeError::Spawn(e.to_string()))?;
let (user_message, accepted_images) = build_user_message(task, &opts.attachments);
let dropped_attachments = (opts.attachments.len().saturating_sub(accepted_images)) as u32;
{
let stdin = child
.stdin
.take()
.ok_or_else(|| InvokeError::Io("stdin closed unexpectedly".to_string()))?;
let mut stdin = stdin;
stdin
.write_all(user_message.as_bytes())
.await
.map_err(|e| InvokeError::Io(format!("stdin write: {e}")))?;
stdin
.write_all(b"\n")
.await
.map_err(|e| InvokeError::Io(format!("stdin newline: {e}")))?;
}
let stdout = child
.stdout
.take()
.ok_or_else(|| InvokeError::Io("stdout missing".to_string()))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| InvokeError::Io("stderr missing".to_string()))?;
let reader = tokio::io::BufReader::new(stdout);
let process_fut = async {
let mut result = process_stream(reader, emitter).await?;
result.dropped_attachments = dropped_attachments;
let exit = child
.wait()
.await
.map_err(|e| InvokeError::Io(format!("wait: {e}")))?;
if !exit.success() && !result.is_error {
let mut stderr_buf = Vec::new();
let mut stderr_reader = tokio::io::BufReader::new(stderr);
let _ = tokio::io::AsyncReadExt::read_to_end(&mut stderr_reader, &mut stderr_buf).await;
let stderr_text = String::from_utf8_lossy(&stderr_buf).to_string();
result.is_error = true;
result.error = Some(format!(
"exit code {}: {}",
exit.code().unwrap_or(-1),
stderr_text.trim()
));
}
Ok::<_, InvokeError>(result)
};
match tokio::time::timeout(timeout, process_fut).await {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => Err(e),
Err(_) => Err(InvokeError::Timeout(timeout_secs)),
}
}
/// Consumes a Claude Code stream-JSON stream line by line and folds it into an
/// [`InvokeResult`].
///
/// Each successfully parsed event is first cloned to `emitter` (when present) and then
/// applied to the aggregate. Lines that `parse_line` reports as `Ok(None)` are ignored. A
/// malformed line does not abort processing; it is recorded in `error` only while no
/// answer and no earlier error have been recorded.
///
/// # Errors
/// Returns [`InvokeError::Io`] when reading from `reader` fails.
pub async fn process_stream<R>(
    reader: R,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError>
where
    R: AsyncBufRead + Unpin,
{
    let mut aggregate = InvokeResult::default();
    let mut lines = reader.lines();

    while let Some(raw) = lines
        .next_line()
        .await
        .map_err(|e| InvokeError::Io(format!("stdout read: {e}")))?
    {
        match parse_line(&raw) {
            Ok(Some(event)) => {
                if let Some(emit) = emitter.as_ref() {
                    emit(event.clone());
                }
                apply_event(&mut aggregate, event);
            }
            Ok(None) => {}
            Err(e) => {
                let nothing_recorded = aggregate.answer.is_empty() && aggregate.error.is_none();
                if nothing_recorded {
                    aggregate.error = Some(format!("malformed JSON: {e}"));
                }
            }
        }
    }

    Ok(aggregate)
}
/// Folds one stream event into the running [`InvokeResult`].
///
/// - `System`: records the session id the first time one is seen.
/// - `Assistant`: counts a turn and records every `tool_use` content block.
/// - `Result`: overwrites answer, duration, cost and error flag; an agent-reported turn
///   count replaces the locally counted one.
/// - Other events are ignored.
fn apply_event(result: &mut InvokeResult, event: StreamEvent) {
    match event {
        StreamEvent::System(init) => {
            if result.session_id.is_none() {
                result.session_id = Some(init.session_id);
            }
        }
        StreamEvent::Assistant(assistant) => {
            result.turns = result.turns.saturating_add(1);

            let tool_blocks = assistant
                .message
                .get("content")
                .and_then(|v| v.as_array())
                .into_iter()
                .flatten()
                .filter(|block| block.get("type").and_then(|v| v.as_str()) == Some("tool_use"));

            for block in tool_blocks {
                // Missing or non-string fields fall back to the empty string.
                let text_field = |key: &str| -> String {
                    block
                        .get(key)
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string()
                };
                let id = text_field("id");
                let name = text_field("name");
                let input = match block.get("input") {
                    Some(value) => value.clone(),
                    None => serde_json::Value::Null,
                };

                result.tool_calls = result.tool_calls.saturating_add(1);
                tracing::info!(
                    adapter = "claude-code",
                    tool_name = %name,
                    "external agent emitted tool_use (observation-only — policy gating via MCP route lands in Phase 2 stage 4b)"
                );
                result.tool_uses.push(ToolUseRequest { id, name, input });
            }
        }
        StreamEvent::Result(outcome) => {
            if outcome.is_error {
                result.error = Some(format!(
                    "agent reported error (subtype={}, terminal={:?})",
                    outcome.subtype, outcome.terminal_reason
                ));
            }
            result.is_error = outcome.is_error;
            result.total_cost_usd = outcome.total_cost_usd;
            result.duration_ms = outcome.duration_ms.unwrap_or(0);
            if let Some(reported_turns) = outcome.num_turns {
                result.turns = reported_turns;
            }
            result.answer = outcome.result.unwrap_or_default();
        }
        StreamEvent::User(_) | StreamEvent::RateLimitEvent(_) | StreamEvent::Other => {}
    }
}
/// Escapes `raw` so it can sit between double quotes as a TOML basic string.
///
/// Backslash, double quote and control characters are escaped, so the value cannot
/// terminate the string early and introduce extra keys.
fn toml_basic_string_escape(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            c if c.is_control() => escaped.push_str(&format!("\\u{:04X}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}

/// Assembles the argument vector for `codex exec` in JSON mode.
///
/// Appended in order: `--cd <cwd>` when a working directory is set, a
/// `-c mcp_servers.car={...}` override when `mcp_endpoint` is non-empty, one
/// `--image <path>` pair per attachment that passes `image_within_size_cap`, and a final
/// `-` (the tests describe this as reading the prompt from stdin).
///
/// The endpoint is escaped as a TOML basic string before being embedded in the inline
/// table, so quotes or backslashes in it cannot break or extend the override.
pub(crate) fn build_codex_args(opts: &InvokeOptions, mcp_endpoint: Option<&str>) -> Vec<String> {
    let mut args = vec![
        "exec".to_string(),
        "--json".to_string(),
        "--skip-git-repo-check".to_string(),
        "--ephemeral".to_string(),
    ];
    if let Some(cwd) = &opts.cwd {
        args.push("--cd".to_string());
        args.push(cwd.to_string_lossy().into_owned());
    }
    if let Some(endpoint) = mcp_endpoint.filter(|s| !s.is_empty()) {
        args.push("-c".to_string());
        args.push(format!(
            r#"mcp_servers.car={{type="http",url="{}"}}"#,
            toml_basic_string_escape(endpoint)
        ));
    }
    for att in &opts.attachments {
        if image_within_size_cap(&att.path) {
            args.push("--image".to_string());
            args.push(att.path.to_string_lossy().into_owned());
        }
    }
    args.push("-".to_string());
    args
}
/// Runs `codex exec --json`, feeding the task on stdin, and aggregates its event stream.
///
/// stdout is parsed by [`process_codex_stream`]. A non-zero exit that the stream did not
/// already flag as an error is reported via `InvokeResult::error` together with the
/// captured stderr. Attachments that `build_codex_args` did not turn into `--image` flags
/// are counted in `dropped_attachments`.
///
/// # Errors
/// - [`InvokeError::Spawn`] when the process cannot be started.
/// - [`InvokeError::Io`] for stdin/stdout or wait failures.
/// - [`InvokeError::Timeout`] when writing the task and reading the response together
///   exceed the effective timeout (default `DEFAULT_TIMEOUT_SECS`, clamped to
///   `1..=MAX_TIMEOUT_SECS`). The child is killed when dropped.
pub async fn invoke_codex(
    binary_path: &Path,
    task: &str,
    opts: InvokeOptions,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
    tracing::info!(
        adapter = "codex",
        binary = %binary_path.display(),
        task_len = task.len(),
        "external agent invocation started"
    );
    let timeout_secs = opts
        .timeout_secs
        .unwrap_or(DEFAULT_TIMEOUT_SECS)
        .clamp(1, MAX_TIMEOUT_SECS);
    let timeout = Duration::from_secs(timeout_secs);

    let args = build_codex_args(&opts, opts.mcp_endpoint.as_deref());
    let accepted_images = args.iter().filter(|a| a.as_str() == "--image").count();
    // Saturate instead of truncating if the count ever exceeded u32::MAX.
    let dropped_attachments =
        u32::try_from(opts.attachments.len().saturating_sub(accepted_images)).unwrap_or(u32::MAX);

    let mut cmd = Command::new(binary_path);
    cmd.args(&args);
    // NOTE(review): `cwd` is applied here as the child's working directory and also passed
    // as `--cd` by `build_codex_args`; for a relative `cwd` that may resolve twice —
    // confirm against the Codex CLI's handling of `--cd`.
    if let Some(cwd) = &opts.cwd {
        cmd.current_dir(cwd);
    }
    cmd.stdin(std::process::Stdio::piped());
    cmd.stdout(std::process::Stdio::piped());
    cmd.stderr(std::process::Stdio::piped());
    cmd.kill_on_drop(true);
    let mut child = cmd.spawn().map_err(|e| InvokeError::Spawn(e.to_string()))?;

    let mut stdin = child
        .stdin
        .take()
        .ok_or_else(|| InvokeError::Io("stdin closed unexpectedly".to_string()))?;
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| InvokeError::Io("stdout missing".to_string()))?;
    let stderr = child
        .stderr
        .take()
        .ok_or_else(|| InvokeError::Io("stderr missing".to_string()))?;

    // Drain stderr while the child runs. If it were only read after exit, a child that
    // fills the stderr pipe would block on write and the run would stall until timeout.
    let mut stderr_task = tokio::spawn(async move {
        let mut stderr = stderr;
        let mut buf = Vec::new();
        let _ = tokio::io::AsyncReadExt::read_to_end(&mut stderr, &mut buf).await;
        buf
    });

    let reader = tokio::io::BufReader::new(stdout);
    let process_fut = async {
        // The task write is inside the timed future so that a child which never reads
        // stdin cannot hang the caller.
        stdin
            .write_all(task.as_bytes())
            .await
            .map_err(|e| InvokeError::Io(format!("stdin write: {e}")))?;
        // Closing stdin signals end of input to the child.
        drop(stdin);

        let mut result = process_codex_stream(reader, emitter).await?;
        result.dropped_attachments = dropped_attachments;
        let exit = child
            .wait()
            .await
            .map_err(|e| InvokeError::Io(format!("wait: {e}")))?;
        if !exit.success() && !result.is_error {
            let stderr_buf = (&mut stderr_task).await.unwrap_or_default();
            let stderr_text = String::from_utf8_lossy(&stderr_buf);
            result.is_error = true;
            result.error = Some(format!(
                "exit code {}: {}",
                exit.code().unwrap_or(-1),
                stderr_text.trim()
            ));
        }
        Ok::<_, InvokeError>(result)
    };

    let outcome = tokio::time::timeout(timeout, process_fut).await;
    // No-op if the drain already finished; otherwise stops it from outliving this call.
    stderr_task.abort();
    match outcome {
        Ok(Ok(res)) => Ok(res),
        Ok(Err(e)) => Err(e),
        Err(_) => Err(InvokeError::Timeout(timeout_secs)),
    }
}
/// Wraps a single content block in a synthetic assistant event.
///
/// Used by adapters whose CLIs do not emit Claude-style events: the block becomes the sole
/// element of the message's `content` array, `uuid` is empty and there is no parent tool use.
fn synth_assistant_event(session_id: &str, content: serde_json::Value) -> StreamEvent {
    let message = serde_json::json!({ "role": "assistant", "content": [content] });
    let data = AssistantEventData {
        message,
        session_id: session_id.to_owned(),
        uuid: String::new(),
        parent_tool_use_id: None,
    };
    StreamEvent::Assistant(data)
}
/// Consumes the line-delimited JSON event stream of `codex exec --json` and folds it into
/// an [`InvokeResult`].
///
/// Handled event types:
/// - `thread.started`: records `thread_id` as the session id (first one wins) and emits a
///   synthetic `System` "init" event.
/// - `turn.started`: counts a turn.
/// - `item.completed`: an `agent_message` item contributes its text to the answer and is
///   emitted as a synthetic assistant text event; an item whose type contains "tool" or
///   "call" is counted and recorded as a tool use and emitted as a synthetic `tool_use`.
/// - everything else (including `turn.completed`) is ignored.
///
/// Blank lines and lines that are not valid JSON are skipped silently. The answer is the
/// concatenation of all agent messages with no separator; `duration_ms` is measured
/// locally from the start of this call. A stream with no agent message text is reported
/// as an error result, not as an `Err`.
///
/// # Errors
/// Returns [`InvokeError::Io`] when reading from `reader` fails.
pub async fn process_codex_stream<R>(
reader: R,
emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError>
where
R: tokio::io::AsyncBufRead + Unpin,
{
use serde_json::Value;
let started = std::time::Instant::now();
let mut result = InvokeResult::default();
let mut answer_parts: Vec<String> = Vec::new();
let mut lines = reader.lines();
loop {
let line = match lines.next_line().await {
Ok(Some(line)) => line,
Ok(None) => break,
Err(e) => return Err(InvokeError::Io(format!("stdout read: {e}"))),
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
// Non-JSON output (e.g. a banner line) is tolerated and skipped.
let value: Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => {
continue;
}
};
// A missing or non-string `type` falls through to the ignore arm below.
let kind = value.get("type").and_then(Value::as_str).unwrap_or("");
match kind {
"thread.started" => {
if let Some(id) = value.get("thread_id").and_then(Value::as_str) {
if result.session_id.is_none() {
result.session_id = Some(id.to_string());
}
}
// Emitted even when no thread id was present; the session id is then empty.
if let Some(e) = &emitter {
e(StreamEvent::System(SystemEventData {
subtype: "init".to_string(),
session_id: result.session_id.clone().unwrap_or_default(),
model: None,
cwd: None,
tools: Vec::new(),
permission_mode: None,
claude_code_version: None,
extra: serde_json::Map::new(),
}));
}
}
"turn.started" => {
result.turns = result.turns.saturating_add(1);
}
"item.completed" => {
let Some(item) = value.get("item") else {
continue;
};
let item_type = item.get("type").and_then(Value::as_str).unwrap_or("");
match item_type {
"agent_message" => {
if let Some(text) = item.get("text").and_then(Value::as_str) {
if let Some(e) = &emitter {
let sid = result.session_id.clone().unwrap_or_default();
e(synth_assistant_event(
&sid,
serde_json::json!({ "type": "text", "text": text }),
));
}
answer_parts.push(text.to_string());
}
}
// Substring match on the item type: anything naming a tool or a call is treated
// as a tool invocation.
other if other.contains("tool") || other.contains("call") => {
result.tool_calls = result.tool_calls.saturating_add(1);
let id = item
.get("id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
// Without an explicit `name`, the item type itself is used as the tool name.
let name = item
.get("name")
.and_then(Value::as_str)
.unwrap_or(other)
.to_string();
// Tool input is taken from `arguments`, falling back to `input`, else null.
let input = item
.get("arguments")
.or_else(|| item.get("input"))
.cloned()
.unwrap_or(Value::Null);
tracing::info!(
adapter = "codex",
tool_name = %name,
"external agent emitted tool_use (observation-only)"
);
if let Some(e) = &emitter {
let sid = result.session_id.clone().unwrap_or_default();
e(synth_assistant_event(
&sid,
serde_json::json!({
"type": "tool_use",
"id": id,
"name": name,
"input": input,
}),
));
}
result.tool_uses.push(ToolUseRequest { id, name, input });
}
_ => {}
}
}
"turn.completed" => {
}
_ => {}
}
}
result.answer = answer_parts.join("");
result.duration_ms = started.elapsed().as_millis() as u64;
// `result.error` is not set anywhere above, so this fires whenever the answer is empty.
if result.answer.is_empty() && result.error.is_none() {
result.is_error = true;
result.error = Some("codex produced no agent_message".to_string());
}
Ok(result)
}
/// Assembles the argument vector for a one-shot Gemini CLI run: `-p <prompt> --yolo`.
///
/// The prompt is `task` followed by one `@<path>` reference per entry of `image_refs`,
/// separated by single spaces (no leading space when `task` is empty).
pub(crate) fn build_gemini_args(opts: &InvokeOptions, task: &str, image_refs: &[String]) -> Vec<String> {
    // `opts` does not influence the arguments; this borrow only marks it as intentionally unused.
    let _ = &opts.cwd;

    let mut prompt = String::from(task);
    for image in image_refs {
        if !prompt.is_empty() {
            prompt.push(' ');
        }
        prompt.push('@');
        prompt.push_str(image);
    }
    vec!["-p".to_string(), prompt, "--yolo".to_string()]
}
/// Copies validated image attachments into a fresh temp directory for the Gemini CLI.
///
/// Each accepted image is written as `img<index>.<ext>`, where the index is the
/// attachment's position in the input and the extension follows the sniffed media type.
/// Returns the directory guard (which the caller must keep alive while the files are
/// needed) and the staged paths. With no attachments, or when the directory cannot be
/// created, returns `(None, vec![])`.
fn stage_gemini_images(
    attachments: &[ImageAttachment],
) -> (Option<tempfile::TempDir>, Vec<String>) {
    if attachments.is_empty() {
        return (None, Vec::new());
    }

    let staging = match tempfile::tempdir() {
        Err(e) => {
            tracing::warn!(error = %e, "could not create temp dir for gemini images; sending text only");
            return (None, Vec::new());
        }
        Ok(dir) => dir,
    };

    let staged: Vec<String> = attachments
        .iter()
        .enumerate()
        .filter_map(|(index, att)| {
            let (bytes, media_type) = read_validated_image(&att.path)?;
            let ext = match media_type {
                "image/jpeg" => "jpg",
                "image/gif" => "gif",
                "image/webp" => "webp",
                _ => "png",
            };
            let dest = staging.path().join(format!("img{index}.{ext}"));
            match std::fs::write(&dest, &bytes) {
                Ok(()) => Some(dest.to_string_lossy().into_owned()),
                Err(e) => {
                    tracing::warn!(path = %dest.display(), error = %e, "could not stage image for gemini; skipping");
                    None
                }
            }
        })
        .collect();

    (Some(staging), staged)
}
/// Runs the Gemini CLI once with the task as its prompt and returns its stdout as the answer.
///
/// Image attachments are staged into a temp directory and referenced from the prompt.
/// A supplied `mcp_endpoint` is only logged as unsupported. A non-zero exit, or a
/// successful exit with empty stdout, yields an error result; otherwise the answer is
/// emitted once to `emitter` as a synthetic assistant text event.
///
/// # Errors
/// - [`InvokeError::Spawn`] when the process cannot be started.
/// - [`InvokeError::Io`] when waiting for the process output fails.
/// - [`InvokeError::Timeout`] when the run exceeds the effective timeout.
pub async fn invoke_gemini(
    binary_path: &Path,
    task: &str,
    opts: InvokeOptions,
    emitter: Option<StreamEventEmitter>,
) -> Result<InvokeResult, InvokeError> {
    tracing::info!(
        adapter = "gemini",
        binary = %binary_path.display(),
        task_len = task.len(),
        "external agent invocation started"
    );

    let wants_mcp = matches!(opts.mcp_endpoint.as_deref(), Some(endpoint) if !endpoint.is_empty());
    if wants_mcp {
        tracing::warn!(
            adapter = "gemini",
            "mcp_endpoint supplied but Gemini CLI v0.1.x doesn't support --mcp-config; agent will run without CAR's MCP namespace"
        );
    }

    let timeout_secs = opts
        .timeout_secs
        .unwrap_or(DEFAULT_TIMEOUT_SECS)
        .clamp(1, MAX_TIMEOUT_SECS);
    let timeout = Duration::from_secs(timeout_secs);
    let started = std::time::Instant::now();

    // `_image_stage` keeps the temp directory alive until this function returns.
    let (_image_stage, image_refs) = stage_gemini_images(&opts.attachments);
    let dropped_attachments = (opts.attachments.len().saturating_sub(image_refs.len())) as u32;

    let args = build_gemini_args(&opts, task, &image_refs);
    let mut cmd = Command::new(binary_path);
    cmd.args(&args);
    if let Some(cwd) = &opts.cwd {
        cmd.current_dir(cwd);
    }
    cmd.stdin(std::process::Stdio::null());
    cmd.stdout(std::process::Stdio::piped());
    cmd.stderr(std::process::Stdio::piped());
    cmd.kill_on_drop(true);
    let child = cmd.spawn().map_err(|e| InvokeError::Spawn(e.to_string()))?;

    let process_fut = async {
        let output = child
            .wait_with_output()
            .await
            .map_err(|e| InvokeError::Io(format!("wait: {e}")))?;
        let answer = String::from_utf8_lossy(&output.stdout).trim().to_string();
        let stderr_text = String::from_utf8_lossy(&output.stderr).into_owned();
        let duration_ms = started.elapsed().as_millis() as u64;

        // A failed exit takes precedence over an empty answer.
        let failure = if !output.status.success() {
            Some(format!(
                "exit code {}: {}",
                output.status.code().unwrap_or(-1),
                stderr_text.trim()
            ))
        } else if answer.is_empty() {
            Some("gemini produced no stdout output".to_string())
        } else {
            None
        };

        if failure.is_none() {
            if let Some(emit) = &emitter {
                emit(synth_assistant_event(
                    "",
                    serde_json::json!({ "type": "text", "text": answer.clone() }),
                ));
            }
        }

        Ok::<_, InvokeError>(InvokeResult {
            answer,
            duration_ms,
            dropped_attachments,
            is_error: failure.is_some(),
            error: failure,
            ..Default::default()
        })
    };

    match tokio::time::timeout(timeout, process_fut).await {
        Err(_) => Err(InvokeError::Timeout(timeout_secs)),
        Ok(finished) => finished,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tokio::io::BufReader;
fn mock_reader(lines: &[&str]) -> BufReader<Cursor<Vec<u8>>> {
let joined = lines.join("\n");
BufReader::new(Cursor::new(joined.into_bytes()))
}
#[tokio::test]
async fn aggregates_simple_text_response() {
let lines = [
r#"{"type":"system","subtype":"init","session_id":"s1","model":"opus","tools":[]}"#,
r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"ok"}],"usage":{}},"session_id":"s1","uuid":"u1"}"#,
r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":1500,"num_turns":1,"result":"ok","session_id":"s1","total_cost_usd":0.05,"usage":{},"modelUsage":{},"uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert_eq!(result.answer, "ok");
assert_eq!(result.session_id.as_deref(), Some("s1"));
assert_eq!(result.turns, 1);
assert_eq!(result.tool_calls, 0);
assert_eq!(result.duration_ms, 1500);
assert_eq!(result.total_cost_usd, Some(0.05));
assert!(!result.is_error);
assert!(result.error.is_none());
}
#[tokio::test]
async fn counts_tool_use_blocks_across_turns() {
let lines = [
r#"{"type":"system","subtype":"init","session_id":"s2","model":"opus","tools":[]}"#,
r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"tool_use","id":"t1","name":"Read","input":{"file_path":"/x"}}],"usage":{}},"session_id":"s2","uuid":"u1"}"#,
r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"tool_use","id":"t2","name":"Bash","input":{"command":"ls"}},{"type":"text","text":"done"}],"usage":{}},"session_id":"s2","uuid":"u2"}"#,
r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":3000,"num_turns":2,"result":"done","session_id":"s2","total_cost_usd":0.10,"usage":{},"modelUsage":{},"uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert_eq!(result.tool_calls, 2);
assert_eq!(result.turns, 2);
assert_eq!(result.answer, "done");
assert_eq!(result.tool_uses.len(), 2);
assert_eq!(result.tool_uses[0].id, "t1");
assert_eq!(result.tool_uses[0].name, "Read");
assert_eq!(
result.tool_uses[0]
.input
.get("file_path")
.and_then(|v| v.as_str()),
Some("/x")
);
assert_eq!(result.tool_uses[1].id, "t2");
assert_eq!(result.tool_uses[1].name, "Bash");
}
#[tokio::test]
async fn surfaces_agent_reported_error() {
let lines = [
r#"{"type":"system","subtype":"init","session_id":"s3","model":"opus","tools":[]}"#,
r#"{"type":"result","subtype":"error","is_error":true,"duration_ms":500,"session_id":"s3","total_cost_usd":0.0,"usage":{},"modelUsage":{},"terminal_reason":"timeout","uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert!(result.is_error);
assert!(result.error.as_deref().unwrap().contains("error"));
}
#[tokio::test]
async fn empty_stream_produces_empty_result_with_no_panic() {
let result = process_stream(mock_reader(&[]), None).await.unwrap();
assert_eq!(result.answer, "");
assert_eq!(result.turns, 0);
assert!(result.session_id.is_none());
}
#[tokio::test]
async fn malformed_line_logged_but_stream_continues() {
let lines = [
r#"{not valid"#,
r#"{"type":"system","subtype":"init","session_id":"s4","model":"opus","tools":[]}"#,
r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":100,"num_turns":0,"result":"recovered","session_id":"s4","total_cost_usd":0.0,"usage":{},"modelUsage":{},"uuid":"r1"}"#,
];
let result = process_stream(mock_reader(&lines), None).await.unwrap();
assert_eq!(result.answer, "recovered");
assert_eq!(result.session_id.as_deref(), Some("s4"));
}
#[test]
fn build_claude_args_includes_required_format_flags() {
let args = build_claude_args(&InvokeOptions::default(), None);
assert!(args.contains(&"-p".to_string()));
assert!(args.iter().any(|a| a == "stream-json"));
assert!(!args.iter().any(|a| a == "--allowed-tools"));
assert!(!args.iter().any(|a| a == "--max-turns"));
assert!(!args.iter().any(|a| a == "--mcp-config"));
}
#[test]
fn build_claude_args_passes_allowed_tools() {
let opts = InvokeOptions {
allowed_tools: Some(vec!["Read".to_string(), "Bash".to_string()]),
max_turns: Some(5),
..Default::default()
};
let args = build_claude_args(&opts, None);
let pos = args.iter().position(|a| a == "--allowed-tools").unwrap();
assert_eq!(args[pos + 1], "Read Bash");
let pos = args.iter().position(|a| a == "--max-turns").unwrap();
assert_eq!(args[pos + 1], "5");
}
#[test]
fn build_claude_args_empty_allowed_tools_denies_everything() {
let opts = InvokeOptions {
allowed_tools: Some(vec![]),
..Default::default()
};
let args = build_claude_args(&opts, None);
let pos = args.iter().position(|a| a == "--allowed-tools").unwrap();
assert_eq!(args[pos + 1], "");
}
#[test]
fn build_claude_args_threads_mcp_config_path() {
let path = std::path::Path::new("/tmp/mcp-config.json");
let args = build_claude_args(&InvokeOptions::default(), Some(path));
let pos = args.iter().position(|a| a == "--mcp-config").unwrap();
assert_eq!(args[pos + 1], "/tmp/mcp-config.json");
}
#[test]
fn build_mcp_config_renders_http_server_entry() {
let json = build_mcp_config_json("http://127.0.0.1:9102/mcp");
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["mcpServers"]["car"]["type"], "http");
assert_eq!(
parsed["mcpServers"]["car"]["url"],
"http://127.0.0.1:9102/mcp"
);
}
#[test]
fn build_user_message_is_valid_json() {
let (msg, _accepted) = build_user_message("hello world", &[]);
let parsed: serde_json::Value = serde_json::from_str(&msg).unwrap();
assert_eq!(parsed["type"], "user");
assert_eq!(parsed["message"]["role"], "user");
assert_eq!(parsed["message"]["content"], "hello world");
}
#[test]
fn codex_args_use_exec_subcommand_and_json_output() {
let args = build_codex_args(&InvokeOptions::default(), None);
assert_eq!(args[0], "exec");
assert!(args.contains(&"--json".to_string()));
assert!(args.contains(&"--skip-git-repo-check".to_string()));
assert!(args.contains(&"--ephemeral".to_string()));
assert_eq!(args.last().map(String::as_str), Some("-"));
}
#[test]
fn codex_args_inject_mcp_via_inline_config_override() {
let args = build_codex_args(&InvokeOptions::default(), Some("http://127.0.0.1:9102/mcp"));
let mcp_pos = args.iter().position(|a| a == "-c").unwrap();
assert!(args[mcp_pos + 1].starts_with("mcp_servers.car="));
assert!(args[mcp_pos + 1].contains("http://127.0.0.1:9102/mcp"));
assert!(args[mcp_pos + 1].contains("type=\"http\""));
}
#[test]
fn codex_args_threads_cd_flag() {
let opts = InvokeOptions {
cwd: Some("/tmp/work".into()),
..Default::default()
};
let args = build_codex_args(&opts, None);
let pos = args.iter().position(|a| a == "--cd").unwrap();
assert_eq!(args[pos + 1], "/tmp/work");
}
#[tokio::test]
async fn codex_stream_aggregates_agent_message() {
let lines = [
r#"{"type":"thread.started","thread_id":"thread-abc"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"hello world"}}"#,
r#"{"type":"turn.completed","usage":{"input_tokens":5,"output_tokens":2}}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.answer, "hello world");
assert_eq!(result.session_id.as_deref(), Some("thread-abc"));
assert_eq!(result.turns, 1);
assert!(!result.is_error);
}
#[tokio::test]
async fn codex_stream_concatenates_multiple_agent_messages() {
let lines = [
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"part 1 "}}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"part 2"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.answer, "part 1 part 2");
}
#[tokio::test]
async fn codex_stream_records_tool_calls() {
let lines = [
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"id":"call_1","type":"tool_call","name":"shell","arguments":{"cmd":"ls"}}}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"done"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.tool_calls, 1);
assert_eq!(result.tool_uses.len(), 1);
assert_eq!(result.tool_uses[0].name, "shell");
assert_eq!(result.answer, "done");
}
#[tokio::test]
async fn codex_stream_skips_non_json_banner_lines() {
let lines = [
"Reading prompt from stdin...",
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"ok"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert_eq!(result.answer, "ok");
}
#[tokio::test]
async fn codex_stream_no_agent_message_marks_error() {
let lines = [r#"{"type":"thread.started","thread_id":"t"}"#];
let result = process_codex_stream(mock_reader(&lines), None)
.await
.unwrap();
assert!(result.is_error);
assert!(result.answer.is_empty());
}
#[tokio::test]
async fn codex_stream_fires_assistant_events_to_emitter() {
use std::sync::Mutex;
let captured: Arc<Mutex<Vec<StreamEvent>>> = Arc::new(Mutex::new(Vec::new()));
let sink = captured.clone();
let emitter: StreamEventEmitter = Arc::new(move |ev| sink.lock().unwrap().push(ev));
let lines = [
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"turn.started"}"#,
r#"{"type":"item.completed","item":{"type":"agent_message","text":"hi there"}}"#,
r#"{"type":"turn.completed"}"#,
];
let result = process_codex_stream(mock_reader(&lines), Some(emitter))
.await
.unwrap();
assert_eq!(result.answer, "hi there");
let events = captured.lock().unwrap();
let texts: Vec<String> = events
.iter()
.filter_map(|e| match e {
StreamEvent::Assistant(a) => a
.message
.get("content")
.and_then(|c| c.as_array())
.and_then(|arr| arr.first())
.filter(|b| b.get("type").and_then(|t| t.as_str()) == Some("text"))
.and_then(|b| b.get("text"))
.and_then(|t| t.as_str())
.map(str::to_string),
_ => None,
})
.collect();
assert!(
texts.iter().any(|t| t == "hi there"),
"codex emitter must fire an Assistant text event (car#213); got {events:?}"
);
}
#[test]
fn gemini_args_use_prompt_and_yolo() {
let args = build_gemini_args(&InvokeOptions::default(), "say hi", &[]);
assert_eq!(args[0], "-p");
assert_eq!(args[1], "say hi");
assert!(args.contains(&"--yolo".to_string()));
}
const PNG_BYTES: &[u8] = b"\x89PNG\r\n\x1a\n\x00\x00\x00rest";
const JPEG_BYTES: &[u8] = b"\xFF\xD8\xFF\xE0rest";
fn temp_attachment(dir: &std::path::Path, name: &str, bytes: &[u8]) -> ImageAttachment {
let path = dir.join(name);
std::fs::write(&path, bytes).unwrap();
ImageAttachment { path, media_type: None }
}
#[test]
fn sniff_recognizes_supported_formats() {
assert_eq!(sniff_image_media_type(PNG_BYTES), Some("image/png"));
assert_eq!(sniff_image_media_type(JPEG_BYTES), Some("image/jpeg"));
assert_eq!(sniff_image_media_type(b"GIF89a..."), Some("image/gif"));
assert_eq!(sniff_image_media_type(b"RIFF\0\0\0\0WEBPrest"), Some("image/webp"));
assert_eq!(sniff_image_media_type(b"-----BEGIN OPENSSH PRIVATE KEY-----"), None);
assert_eq!(sniff_image_media_type(b"x"), None);
assert_eq!(sniff_image_media_type(b""), None);
}
/// A prompt plus one valid PNG yields a two-element content array: the text
/// block first, then a base64 image block whose payload decodes back to the
/// original file bytes.
#[test]
fn user_message_with_image_builds_content_array() {
    use base64::Engine as _;

    let tmp = tempfile::tempdir().unwrap();
    let image = temp_attachment(tmp.path(), "shot.png", PNG_BYTES);

    let (raw, _accepted) = build_user_message("describe this", std::slice::from_ref(&image));
    let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
    let blocks = json["message"]["content"].as_array().unwrap();
    assert_eq!(blocks.len(), 2);

    let text_block = &blocks[0];
    assert_eq!(text_block["type"], "text");
    assert_eq!(text_block["text"], "describe this");

    let image_block = &blocks[1];
    assert_eq!(image_block["type"], "image");
    let source = &image_block["source"];
    assert_eq!(source["type"], "base64");
    assert_eq!(source["media_type"], "image/png");

    // The inlined payload must round-trip to exactly what was written to disk.
    let payload = source["data"].as_str().unwrap();
    let round_trip = base64::engine::general_purpose::STANDARD
        .decode(payload)
        .unwrap();
    assert_eq!(round_trip, PNG_BYTES);
}
/// JPEG bytes stored under a `.png` file name must be labeled `image/jpeg`:
/// the media type follows the file content, not the extension.
#[test]
fn user_message_media_type_from_content_not_extension() {
    let tmp = tempfile::tempdir().unwrap();
    let mislabeled = temp_attachment(tmp.path(), "mislabeled.png", JPEG_BYTES);

    let (raw, _accepted) = build_user_message("x", std::slice::from_ref(&mislabeled));
    let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
    let blocks = json["message"]["content"].as_array().unwrap();

    let image_source = &blocks[1]["source"];
    assert_eq!(image_source["media_type"], "image/jpeg");
}
/// With an empty prompt and one valid image, the content array holds only the
/// image block, with no text block ahead of it.
#[test]
fn user_message_image_only_omits_text_block() {
    let tmp = tempfile::tempdir().unwrap();
    let image = temp_attachment(tmp.path(), "shot.png", PNG_BYTES);

    let (raw, _accepted) = build_user_message("", std::slice::from_ref(&image));
    let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
    let blocks = json["message"]["content"].as_array().unwrap();

    assert_eq!(blocks.len(), 1);
    let only_block = &blocks[0];
    assert_eq!(only_block["type"], "image");
}
/// An attachment whose path does not exist is left out of the message; only
/// the text block remains.
#[test]
fn user_message_unreadable_image_is_skipped() {
    let missing = ImageAttachment {
        media_type: None,
        path: std::path::PathBuf::from("/no/such/file.png"),
    };

    let (raw, _accepted) = build_user_message("hi", std::slice::from_ref(&missing));
    let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
    let blocks = json["message"]["content"].as_array().unwrap();

    assert_eq!(blocks.len(), 1);
    let only_block = &blocks[0];
    assert_eq!(only_block["type"], "text");
}
/// The second element returned by `build_user_message` counts only the
/// attachments that were accepted: two real images and one text file give 2.
#[test]
fn user_message_reports_accepted_image_count() {
    let tmp = tempfile::tempdir().unwrap();
    let attachments = [
        temp_attachment(tmp.path(), "a.png", PNG_BYTES),
        temp_attachment(tmp.path(), "b.png", b"not an image"),
        temp_attachment(tmp.path(), "c.jpg", JPEG_BYTES),
    ];

    let (_raw, accepted) = build_user_message("hi", &attachments);

    assert_eq!(accepted, 2);
}
/// A file with a `.png` name but non-image content (here, a private-key
/// header) must not be inlined; the message carries the text block only.
#[test]
fn user_message_non_image_file_is_not_inlined() {
    let tmp = tempfile::tempdir().unwrap();
    let disguised = temp_attachment(tmp.path(), "secret.png", b"-----BEGIN PRIVATE KEY-----");

    let (raw, _accepted) = build_user_message("hi", std::slice::from_ref(&disguised));
    let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
    let blocks = json["message"]["content"].as_array().unwrap();

    assert_eq!(blocks.len(), 1);
    let only_block = &blocks[0];
    assert_eq!(only_block["type"], "text");
}
/// An existing attachment shows up as `--image <path>` in the Codex argv, and
/// the trailing `-` argument still comes last.
#[test]
fn codex_args_include_image_flag_before_stdin_marker() {
    let tmp = tempfile::tempdir().unwrap();
    let image = temp_attachment(tmp.path(), "a.png", b"x");
    // Capture the expected argv value before the attachment moves into the options.
    let image_path = image.path.to_string_lossy().into_owned();
    let opts = InvokeOptions {
        attachments: vec![image],
        ..Default::default()
    };

    let argv = build_codex_args(&opts, None);

    let flag_at = argv.iter().position(|arg| arg == "--image").unwrap();
    assert_eq!(argv[flag_at + 1], image_path);
    assert_eq!(argv.last().unwrap(), "-");
    assert!(flag_at < argv.len() - 1);
}
/// An attachment whose path does not exist must not produce an `--image`
/// flag in the Codex argv.
#[test]
fn codex_args_skip_missing_image() {
    let missing = ImageAttachment {
        media_type: None,
        path: std::path::PathBuf::from("/no/such/img.png"),
    };
    let opts = InvokeOptions {
        attachments: vec![missing],
        ..Default::default()
    };

    let argv = build_codex_args(&opts, None);

    assert!(argv.iter().all(|arg| arg != "--image"));
}
/// Staged image paths are appended to the Gemini prompt argument as
/// space-separated `@<path>` references.
#[test]
fn gemini_args_append_image_refs() {
    let staged = ["/tmp/stage/img0.png".to_string()];

    let argv = build_gemini_args(&InvokeOptions::default(), "look", &staged);

    assert_eq!(argv[1], "look @/tmp/stage/img0.png");
}
/// Staging a valid image returns a stage handle plus one reference that ends
/// in `img0.png`, contains no spaces, and points at a byte-for-byte copy of
/// the source file.
#[test]
fn stage_gemini_images_copies_to_safe_paths() {
    let source_dir = tempfile::tempdir().unwrap();
    let original = temp_attachment(source_dir.path(), "orig.png", PNG_BYTES);

    // `stage_dir` stays bound until the end of the test so the staged file
    // is still on disk when it is read back below.
    let (stage_dir, refs) = stage_gemini_images(std::slice::from_ref(&original));
    assert!(stage_dir.is_some());
    assert_eq!(refs.len(), 1);

    let staged = &refs[0];
    assert!(staged.ends_with("img0.png"));
    assert!(!staged.contains(' '));
    assert_eq!(std::fs::read(staged).unwrap(), PNG_BYTES);
}
/// A `.png`-named file holding plain text yields no staged references.
#[test]
fn stage_gemini_images_skips_non_image() {
    let source_dir = tempfile::tempdir().unwrap();
    let text_file = temp_attachment(source_dir.path(), "notes.png", b"just text");

    let (_stage_dir, refs) = stage_gemini_images(std::slice::from_ref(&text_file));

    assert!(refs.is_empty());
}
/// `InvokeOptions` reads an `attachments` array from JSON (path and media
/// type), and defaults to an empty list when the key is absent.
#[test]
fn invoke_options_deserializes_attachments() {
    let with_attachment: InvokeOptions = serde_json::from_str(
        r#"{"attachments":[{"path":"/a/b.png","media_type":"image/png"}]}"#,
    )
    .unwrap();
    assert_eq!(with_attachment.attachments.len(), 1);
    let only = &with_attachment.attachments[0];
    assert_eq!(only.path, std::path::PathBuf::from("/a/b.png"));
    assert_eq!(only.media_type.as_deref(), Some("image/png"));

    // An empty object must still deserialize, with no attachments.
    let empty_object: InvokeOptions = serde_json::from_str("{}").unwrap();
    assert!(empty_object.attachments.is_empty());
}
}