Skip to main content

atomr_agents_coding_cli_core/
event.rs

1//! Normalized event schema emitted by every vendor adapter.
2//!
3//! Adapters parse vendor-specific NDJSON / line streams and translate
4//! them into `CodingCliEvent`. The harness fans events out on a
5//! `tokio::sync::broadcast` channel; SSE in the web companion and the
6//! Python async iterator both consume from this channel.
7
8use std::collections::BTreeMap;
9
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::request::{CliRunId, CliSessionId};
14use crate::vendor::CliVendorKind;
15
16/// Why the run finished.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "snake_case")]
19pub enum FinishReason {
20    /// CLI reported a successful completion.
21    Completed,
22    /// Reached the wall-clock or token budget.
23    BudgetExhausted,
24    /// User or harness cancelled the run.
25    Cancelled,
26    /// CLI exited with a non-zero status.
27    ProcessError,
28    /// Parse pipeline gave up on the stream.
29    StreamError,
30}
31
32/// One tool descriptor reported by the CLI during init.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ToolDescriptorInit {
35    pub name: String,
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub description: Option<String>,
38}
39
40/// One MCP server the CLI loaded.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct McpServerInit {
43    pub name: String,
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub status: Option<String>,
46}
47
48/// Normalized lifecycle events. Tagged enum — serializes as
49/// `{"kind": "...", ...}` so the web client can switch on `kind`.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51#[serde(tag = "kind", rename_all = "snake_case")]
52pub enum CodingCliEvent {
53    /// The harness has spawned the CLI process. Emitted before any
54    /// vendor-side events.
55    RunStarted {
56        run_id: CliRunId,
57        vendor: CliVendorKind,
58        model: Option<String>,
59        session_id: Option<CliSessionId>,
60    },
61
62    /// The CLI's `system/init` (or equivalent): tools loaded, MCP
63    /// servers connected, plugins resolved.
64    SystemInit {
65        tools: Vec<ToolDescriptorInit>,
66        mcp_servers: Vec<McpServerInit>,
67        #[serde(default)]
68        plugins: Vec<String>,
69    },
70
71    /// Streaming assistant text. Adapters emit one event per delta.
72    AssistantTextDelta { text: String },
73
74    /// CLI started a tool call.
75    ToolCallStarted {
76        tool_call_id: String,
77        name: String,
78        input: serde_json::Value,
79    },
80
81    /// CLI's tool call returned.
82    ToolCallFinished {
83        tool_call_id: String,
84        output: Option<serde_json::Value>,
85        error: Option<String>,
86    },
87
88    /// Vendor reported a retryable API error.
89    ApiRetry {
90        attempt: u32,
91        delay_ms: u64,
92        reason: String,
93    },
94
95    /// Token / cost accounting.
96    Usage {
97        input_tokens: u64,
98        output_tokens: u64,
99        cost_usd: Option<f64>,
100    },
101
102    /// Terminal event — process is done.
103    RunFinished {
104        reason: FinishReason,
105        result_text: Option<String>,
106    },
107
108    /// Pass-through for vendor-specific events the normalizer doesn't
109    /// yet map. Always safe to ignore in the UI.
110    RawVendorEvent {
111        vendor: CliVendorKind,
112        payload: serde_json::Value,
113    },
114
115    /// Free-form diagnostic message (parser warnings, etc.).
116    Note {
117        message: String,
118        #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
119        fields: BTreeMap<String, serde_json::Value>,
120    },
121}
122
123/// Subscriber handle backed by a `broadcast::Receiver`. Drops missed
124/// events silently — same semantics as `DeepResearchEventStream`.
125pub struct CodingCliEventStream {
126    rx: broadcast::Receiver<CodingCliEvent>,
127}
128
129impl CodingCliEventStream {
130    pub fn new(rx: broadcast::Receiver<CodingCliEvent>) -> Self {
131        Self { rx }
132    }
133
134    /// Wait for the next event. Returns `None` once the channel closes.
135    pub async fn recv(&mut self) -> Option<CodingCliEvent> {
136        loop {
137            match self.rx.recv().await {
138                Ok(ev) => return Some(ev),
139                Err(broadcast::error::RecvError::Lagged(_)) => continue,
140                Err(broadcast::error::RecvError::Closed) => return None,
141            }
142        }
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// An event survives a JSON encode/decode cycle and carries the
    /// snake_case `kind` tag in its encoded form.
    #[test]
    fn event_round_trips_json() {
        let original = CodingCliEvent::AssistantTextDelta {
            text: String::from("Hello"),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert!(encoded.contains("\"kind\":\"assistant_text_delta\""));

        match serde_json::from_str::<CodingCliEvent>(&encoded).unwrap() {
            CodingCliEvent::AssistantTextDelta { text } => assert_eq!(text, "Hello"),
            _ => panic!("wrong variant"),
        }
    }

    /// `FinishReason` variants are encoded as snake_case JSON strings.
    #[test]
    fn finish_reason_serializes_snake_case() {
        let encoded = serde_json::to_string(&FinishReason::BudgetExhausted).unwrap();
        assert_eq!(encoded, "\"budget_exhausted\"");
    }
}