car-external-agents 0.14.0

Detection of installed agentic CLIs (Claude Code, Codex, Gemini) for the Common Agent Runtime.
Documentation
//! Stream-json wire protocol for Claude Code (and the family of
//! agentic CLIs that follow the same shape).
//!
//! Each line on stdout is a single JSON object discriminated by its
//! `type` field. The full event taxonomy as observed against
//! `claude --output-format stream-json --input-format stream-json`:
//!
//! | type | When | What we read |
//! |---|---|---|
//! | `system` (subtype `init`) | Once at start | session_id, model, tools list, MCP servers, slash commands |
//! | `rate_limit_event` | Periodic | rate-limit utilization warnings |
//! | `assistant` | Each model turn | text content, tool_use blocks, usage |
//! | `user` | When `--replay-user-messages` is set | echoed user message |
//! | `result` | Once at end | final answer, duration, cost, num_turns |
//!
//! Tool execution events (`tool_use`, `tool_result`) arrive nested
//! inside `assistant.message.content[i]` blocks. The wire shape
//! mirrors the Anthropic Messages API exactly, which is intentional —
//! the SDK is a thin transport over the same primitives.
//!
//! Phase 2 stage 1 covers parsing only. Stage 2 wires the parsed
//! events into per-task invocation through `agents.invoke_external`.

use serde::{Deserialize, Serialize};
use serde_json::Value;

/// One line from Claude Code's stream-json stdout, parsed into a
/// strongly-typed variant when we recognise the `type` discriminator,
/// or collapsed to [`StreamEvent::Other`] when we don't — so a new
/// event variant from an upstream release doesn't break the parser.
///
/// Note that [`StreamEvent::Other`] is a unit variant: the body of an
/// unrecognised event is NOT retained. A caller that needs the raw
/// JSON of unknown events has to keep the original line itself.
///
/// The enum is internally tagged on `type`, with variant names mapped
/// to snake_case (`RateLimitEvent` ↔ `"rate_limit_event"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamEvent {
    /// Session init / metadata. Emitted once at the top of the
    /// stream; carries everything a host UI needs to label the run.
    System(SystemEventData),
    /// Assistant message — text content + tool_use blocks + usage.
    /// One of these per model turn; multiple in a session that
    /// involves tool calls.
    Assistant(AssistantEventData),
    /// User message echo (`--replay-user-messages` flag). Off by
    /// default, useful when the host wants confirmation that the
    /// daemon's input arrived.
    User(UserEventData),
    /// Final result. Emitted once at the end of the run.
    Result(ResultEventData),
    /// Rate-limit warning. Non-fatal; the session continues.
    RateLimitEvent(RateLimitEventData),
    /// Any event type the parser doesn't have a typed shape for yet.
    /// Only the fact that an unknown event occurred is recorded —
    /// this variant carries no data, so the event's fields (and even
    /// its original `type` string) are dropped during parsing. Its
    /// purpose is to keep an upstream release that introduces a new
    /// event type from killing the stream.
    #[serde(other)]
    Other,
}

/// Captured `type: system` event content (excluding the discriminator).
///
/// `subtype` and `session_id` are required; every other typed field
/// falls back to its default when absent, and any key without a typed
/// field is collected into [`SystemEventData::extra`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemEventData {
    /// `init`, etc.
    pub subtype: String,
    /// Stable session id; used to correlate events across the run.
    pub session_id: String,
    /// Model identifier (`claude-opus-4-7[1m]`, etc.).
    #[serde(default)]
    pub model: Option<String>,
    /// Working directory the agent is operating in.
    #[serde(default)]
    pub cwd: Option<String>,
    /// Tools the agent will actually try to call. Useful for the
    /// host to decide which tools to wire into its dispatcher.
    #[serde(default)]
    pub tools: Vec<String>,
    /// Permission mode the binary launched in (`auto`, `plan`,
    /// `bypassPermissions`).
    ///
    /// The binary emits this key in camelCase (`permissionMode`), so
    /// the field is renamed to match the wire; without the rename the
    /// value was swallowed by `extra` and this field stayed `None`.
    /// The snake_case spelling is still accepted as an alias so
    /// previously serialized events keep deserializing.
    #[serde(default, rename = "permissionMode", alias = "permission_mode")]
    pub permission_mode: Option<String>,
    /// Claude Code version reported by the binary.
    #[serde(default)]
    pub claude_code_version: Option<String>,
    /// Catch-all so we don't drop fields that aren't load-bearing
    /// today (mcp_servers, slash_commands, plugins, agents, …).
    #[serde(flatten)]
    pub extra: serde_json::Map<String, Value>,
}

/// Captured `type: assistant` event content (excluding the
/// discriminator). `message`, `session_id` and `uuid` carry no
/// `#[serde(default)]`, so a line missing any of them fails to parse.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssistantEventData {
    /// The Anthropic Messages-API-shaped assistant message —
    /// `{role, content: [{type, text|input|...}], usage, ...}`.
    /// Kept as untyped JSON; the `assistant_text` / `tool_uses`
    /// helpers on [`StreamEvent`] read its `content` array.
    pub message: Value,
    /// Session id this event belongs to.
    pub session_id: String,
    /// Identifier carried on the event itself — presumably unique
    /// per event; confirm against upstream before relying on it.
    pub uuid: String,
    /// Set when this assistant turn is happening inside a tool
    /// (e.g. nested Task tool invocations); `None` for top-level
    /// turns. Both an absent key and an explicit JSON `null`
    /// deserialize to `None`.
    #[serde(default)]
    pub parent_tool_use_id: Option<String>,
}

/// Captured `type: user` event content (excluding the
/// discriminator). All three fields are required: none carries a
/// `#[serde(default)]`, so a line missing one fails to parse.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserEventData {
    /// The user message, kept as untyped JSON. Nothing in this
    /// module inspects its contents.
    pub message: Value,
    /// Session id this event belongs to.
    pub session_id: String,
    /// Identifier carried on the event itself — presumably unique
    /// per event; confirm against upstream before relying on it.
    pub uuid: String,
}

/// Captured `type: result` event content (excluding the
/// discriminator).
///
/// `subtype`, `is_error` and `session_id` are required; every other
/// typed field falls back to its default when absent, and any key
/// without a typed field is collected into [`ResultEventData::extra`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultEventData {
    /// `success`, `error`, etc.
    pub subtype: String,
    /// Whether the run ended in an error.
    pub is_error: bool,
    /// Wall-clock duration of the entire run.
    #[serde(default)]
    pub duration_ms: Option<u64>,
    /// Time spent waiting on the model (excludes tool execution).
    #[serde(default)]
    pub duration_api_ms: Option<u64>,
    /// Number of model turns. Useful as a cost-control proxy —
    /// users can cap a deployment's allowed turns.
    #[serde(default)]
    pub num_turns: Option<u32>,
    /// The final answer. May be empty when `is_error` is true.
    #[serde(default)]
    pub result: Option<String>,
    /// Session id this event belongs to.
    pub session_id: String,
    /// USD cost the model would charge against the API for this run.
    /// Subscription users don't pay this — it's "what this would
    /// have cost if billed per-token" and is reported for
    /// transparency.
    #[serde(default)]
    pub total_cost_usd: Option<f64>,
    /// Stop reason reported for the run (e.g. `end_turn`).
    #[serde(default)]
    pub stop_reason: Option<String>,
    /// Terminal reason reported for the run (e.g. `completed`).
    #[serde(default)]
    pub terminal_reason: Option<String>,
    /// Anthropic API usage envelope (input/output tokens, cache hits).
    /// `Value::Null` when the key is absent.
    #[serde(default)]
    pub usage: Value,
    /// Per-model breakdown when the run involved multiple models
    /// (Claude Code uses Haiku for some background tasks).
    /// `Value::Null` when the key is absent.
    ///
    /// The binary emits this key in camelCase (`modelUsage`), so the
    /// field is renamed to match the wire; without the rename the
    /// value was swallowed by `extra` and this field stayed `Null`.
    /// The snake_case spelling is still accepted as an alias so
    /// previously serialized events keep deserializing.
    #[serde(default, rename = "modelUsage", alias = "model_usage")]
    pub model_usage: Value,
    /// Catch-all for keys without a typed field (e.g.
    /// `permission_denials`), so nothing on the wire is dropped.
    #[serde(flatten)]
    pub extra: serde_json::Map<String, Value>,
}

/// Captured `type: rate_limit_event` event content (excluding the
/// discriminator). All three fields are required: none carries a
/// `#[serde(default)]`, so a line missing one fails to parse.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitEventData {
    /// Rate-limit details, kept as untyped JSON. The fixture in this
    /// file's tests carries `status`, `resetsAt` and `utilization`;
    /// treat that shape as observed rather than guaranteed.
    pub rate_limit_info: Value,
    /// Session id this event belongs to.
    pub session_id: String,
    /// Identifier carried on the event itself — presumably unique
    /// per event; confirm against upstream before relying on it.
    pub uuid: String,
}

/// Decode one NDJSON line of stream-json output into a
/// [`StreamEvent`].
///
/// Surrounding whitespace is stripped before decoding. A line that
/// is empty after stripping yields `Ok(None)` so callers can skip it
/// without special-casing.
///
/// # Errors
///
/// Returns the underlying `serde_json::Error` when the line is not
/// valid JSON for a [`StreamEvent`]; whether to log-and-continue or
/// abort the stream is left to the caller.
pub fn parse_line(line: &str) -> Result<Option<StreamEvent>, serde_json::Error> {
    match line.trim() {
        "" => Ok(None),
        payload => serde_json::from_str::<StreamEvent>(payload).map(Some),
    }
}

/// Helpers that pull the most common information out of a parsed
/// event without forcing the caller to match every variant.
impl StreamEvent {
    /// Plain-text discriminator string. Useful for logging /
    /// telemetry.
    pub fn type_name(&self) -> &'static str {
        match self {
            StreamEvent::System(_) => "system",
            StreamEvent::Assistant(_) => "assistant",
            StreamEvent::User(_) => "user",
            StreamEvent::Result(_) => "result",
            StreamEvent::RateLimitEvent(_) => "rate_limit_event",
            StreamEvent::Other => "other",
        }
    }

    /// Stable per-session id when present.
    pub fn session_id(&self) -> Option<&str> {
        match self {
            StreamEvent::System(s) => Some(&s.session_id),
            StreamEvent::Assistant(a) => Some(&a.session_id),
            StreamEvent::User(u) => Some(&u.session_id),
            StreamEvent::Result(r) => Some(&r.session_id),
            StreamEvent::RateLimitEvent(r) => Some(&r.session_id),
            StreamEvent::Other => None,
        }
    }

    /// Concatenated text content from an assistant message. Returns
    /// `None` when this isn't an assistant event or the content
    /// array contains no text blocks (e.g. pure tool_use turns).
    pub fn assistant_text(&self) -> Option<String> {
        let StreamEvent::Assistant(a) = self else {
            return None;
        };
        let content = a.message.get("content")?.as_array()?;
        let mut out = String::new();
        for block in content {
            if block.get("type").and_then(Value::as_str) == Some("text") {
                if let Some(text) = block.get("text").and_then(Value::as_str) {
                    out.push_str(text);
                }
            }
        }
        if out.is_empty() {
            None
        } else {
            Some(out)
        }
    }

    /// Tool-use blocks the assistant emitted on this turn — each
    /// one is a (name, input_json) pair the host needs to execute
    /// and then return a `tool_result` for. Empty when the turn
    /// was pure text.
    pub fn tool_uses(&self) -> Vec<ToolUseRequest> {
        let StreamEvent::Assistant(a) = self else {
            return Vec::new();
        };
        let Some(content) = a.message.get("content").and_then(Value::as_array) else {
            return Vec::new();
        };
        let mut out = Vec::new();
        for block in content {
            if block.get("type").and_then(Value::as_str) == Some("tool_use") {
                let id = block
                    .get("id")
                    .and_then(Value::as_str)
                    .unwrap_or("")
                    .to_string();
                let name = block
                    .get("name")
                    .and_then(Value::as_str)
                    .unwrap_or("")
                    .to_string();
                let input = block.get("input").cloned().unwrap_or(Value::Null);
                out.push(ToolUseRequest { id, name, input });
            }
        }
        out
    }
}

/// Tool the assistant wants the host to execute. The host runs it,
/// then sends back a `tool_result` shaped object as the next user
/// message via the input stream.
///
/// Values produced by [`StreamEvent::tool_uses`] are copied out of a
/// `tool_use` content block; fields the block lacks fall back to the
/// placeholders noted on each field below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolUseRequest {
    /// The block's `id`. Empty string when the block had no string
    /// `id`.
    pub id: String,
    /// The block's `name` (the tool to run). Empty string when the
    /// block had no string `name`.
    pub name: String,
    /// The block's `input`, as untyped JSON. `Value::Null` when the
    /// block had no `input` key.
    pub input: Value,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture provenance: captured from a real
    /// `claude -p --output-format stream-json` session on
    /// 2026-05-10 — the whole stream for "reply with the word ok
    /// and nothing else", condensed into the four event types we
    /// care about. These are the canonical shapes; do NOT
    /// regenerate them against live claude (each call burns
    /// subscription quota and ~30K context tokens).
    const FIXTURE_SYSTEM: &str = r#"{"type":"system","subtype":"init","cwd":"/private/tmp/x","session_id":"sess-1","tools":["Bash","Read","Edit"],"model":"claude-opus-4-7[1m]","permissionMode":"auto","claude_code_version":"2.1.138","apiKeySource":"none"}"#;
    const FIXTURE_ASSISTANT: &str = r#"{"type":"assistant","message":{"model":"claude-opus-4-7","id":"msg_1","type":"message","role":"assistant","content":[{"type":"text","text":"ok"}],"usage":{"input_tokens":6,"output_tokens":1}},"parent_tool_use_id":null,"session_id":"sess-1","uuid":"u-1"}"#;
    const FIXTURE_RESULT: &str = r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":1742,"duration_api_ms":2572,"num_turns":1,"result":"ok","stop_reason":"end_turn","session_id":"sess-1","total_cost_usd":0.19,"usage":{},"modelUsage":{},"permission_denials":[],"terminal_reason":"completed","uuid":"r-1"}"#;
    const FIXTURE_RATE_LIMIT: &str = r#"{"type":"rate_limit_event","rate_limit_info":{"status":"allowed_warning","resetsAt":1778443200,"utilization":0.93},"uuid":"rl-1","session_id":"sess-1"}"#;
    /// Hand-written rather than captured: a plausible tool_use turn
    /// in which the assistant decides to read a file.
    const FIXTURE_TOOL_USE: &str = r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"tool_use","id":"tu_1","name":"Read","input":{"file_path":"/tmp/x.txt"}}],"usage":{"input_tokens":10,"output_tokens":20}},"session_id":"sess-1","uuid":"u-2"}"#;

    /// Parse a line that is expected to hold exactly one event;
    /// panics on a parse error or a blank line.
    fn parse(line: &str) -> StreamEvent {
        parse_line(line)
            .expect("line should be valid stream-json")
            .expect("line should not be blank")
    }

    #[test]
    fn parses_system_event() {
        let e = parse(FIXTURE_SYSTEM);
        assert_eq!(e.session_id(), Some("sess-1"));
        match &e {
            StreamEvent::System(init) => {
                assert_eq!(init.subtype, "init");
                assert_eq!(init.session_id, "sess-1");
                assert_eq!(init.model.as_deref(), Some("claude-opus-4-7[1m]"));
                assert!(init.tools.iter().any(|tool| tool == "Bash"));
                // apiKeySource has no typed field, so it must have
                // been kept by the flatten catch-all.
                assert!(init.extra.contains_key("apiKeySource"));
            }
            _ => panic!("expected System, got {e:?}"),
        }
    }

    #[test]
    fn parses_assistant_event_and_extracts_text() {
        let event = parse(FIXTURE_ASSISTANT);
        assert_eq!(event.type_name(), "assistant");
        assert_eq!(event.assistant_text(), Some("ok".to_string()));
        assert_eq!(event.tool_uses().len(), 0);
    }

    #[test]
    fn parses_tool_use_block() {
        let event = parse(FIXTURE_TOOL_USE);
        let requests = event.tool_uses();
        assert_eq!(requests.len(), 1);

        let request = &requests[0];
        assert_eq!(request.name, "Read");
        assert_eq!(request.id, "tu_1");
        let file_path = request.input.get("file_path").and_then(Value::as_str);
        assert_eq!(file_path, Some("/tmp/x.txt"));

        // A turn without any text block has no assistant text.
        assert_eq!(event.assistant_text(), None);
    }

    #[test]
    fn parses_result_event() {
        let e = parse(FIXTURE_RESULT);
        match &e {
            StreamEvent::Result(done) => {
                assert!(!done.is_error);
                assert_eq!(done.result.as_deref(), Some("ok"));
                assert_eq!(done.duration_ms, Some(1742));
                assert_eq!(done.num_turns, Some(1));
                let cost = done.total_cost_usd.expect("fixture carries a cost");
                assert!((cost - 0.19).abs() < 1e-9);
                // permission_denials has no typed field, so it must
                // have been kept by the flatten catch-all.
                assert!(done.extra.contains_key("permission_denials"));
            }
            _ => panic!("expected Result, got {e:?}"),
        }
    }

    #[test]
    fn parses_rate_limit_event() {
        let e = parse(FIXTURE_RATE_LIMIT);
        match &e {
            StreamEvent::RateLimitEvent(warning) => {
                assert_eq!(warning.session_id, "sess-1");
                let status = warning
                    .rate_limit_info
                    .get("status")
                    .and_then(Value::as_str);
                assert_eq!(status, Some("allowed_warning"));
            }
            _ => panic!("expected RateLimitEvent, got {e:?}"),
        }
    }

    #[test]
    fn unknown_type_falls_through_to_other() {
        let event = parse(r#"{"type":"future_event_we_havent_seen_yet","payload":{"x":1}}"#);
        assert!(matches!(event, StreamEvent::Other));
        assert_eq!(event.type_name(), "other");
    }

    #[test]
    fn empty_line_is_skipped() {
        for blank in &["", "   \n"] {
            assert!(parse_line(blank).unwrap().is_none());
        }
    }

    #[test]
    fn malformed_json_returns_err() {
        assert!(matches!(parse_line("{broken"), Err(_)));
    }

    #[test]
    fn full_session_round_trips() {
        // Feed the fixtures in the order the runner will see them
        // and check the final answer and the spoken text come out.
        let stream = [
            FIXTURE_SYSTEM,
            FIXTURE_RATE_LIMIT,
            FIXTURE_ASSISTANT,
            FIXTURE_RESULT,
        ];
        let events: Vec<StreamEvent> = stream.iter().map(|line| parse(line)).collect();

        // The stream opens with the session init.
        assert!(matches!(events.first(), Some(StreamEvent::System(_))));

        // The stream closes with the result carrying the answer.
        let Some(StreamEvent::Result(done)) = events.last() else {
            panic!("expected final Result");
        };
        assert_eq!(done.result.as_deref(), Some("ok"));

        // Text gathered across every assistant event.
        let mut spoken = String::new();
        for event in &events {
            if let Some(chunk) = event.assistant_text() {
                spoken.push_str(&chunk);
            }
        }
        assert_eq!(spoken, "ok");
    }
}