Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
4//! Every phase of the turn loop emits an `AgentEvent`. The canonical
5//! variants map 1:1 onto ACP `SessionUpdate` values; three internal
6//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
7//! react to loop milestones that don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
28/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
29#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
30#[serde(rename_all = "snake_case")]
31pub enum ToolCallStatus {
32    /// Dispatched by the model but not yet started.
33    Pending,
34    /// Dispatch is actively running.
35    InProgress,
36    /// Finished successfully.
37    Completed,
38    /// Finished with an error.
39    Failed,
40}
41
42/// Events emitted by the agent loop. The first five variants map 1:1
43/// to ACP `sessionUpdate` variants; the last three are harn-internal.
44#[derive(Clone, Debug, Serialize, Deserialize)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum AgentEvent {
47    AgentMessageChunk {
48        session_id: String,
49        content: String,
50    },
51    AgentThoughtChunk {
52        session_id: String,
53        content: String,
54    },
55    ToolCall {
56        session_id: String,
57        tool_call_id: String,
58        tool_name: String,
59        kind: Option<ToolKind>,
60        status: ToolCallStatus,
61        raw_input: serde_json::Value,
62    },
63    ToolCallUpdate {
64        session_id: String,
65        tool_call_id: String,
66        tool_name: String,
67        status: ToolCallStatus,
68        raw_output: Option<serde_json::Value>,
69        error: Option<String>,
70    },
71    Plan {
72        session_id: String,
73        plan: serde_json::Value,
74    },
75    TurnStart {
76        session_id: String,
77        iteration: usize,
78    },
79    TurnEnd {
80        session_id: String,
81        iteration: usize,
82        turn_info: serde_json::Value,
83    },
84    FeedbackInjected {
85        session_id: String,
86        kind: String,
87        content: String,
88    },
89    /// Emitted when the agent loop exhausts `max_iterations` without any
90    /// explicit break condition firing. Distinct from a natural "done" or
91    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
92    BudgetExhausted {
93        session_id: String,
94        max_iterations: usize,
95    },
96    /// Emitted when the loop breaks because consecutive text-only turns
97    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
98    /// hosts that key off agent-terminal events.
99    LoopStuck {
100        session_id: String,
101        max_nudges: usize,
102        last_iteration: usize,
103        tail_excerpt: String,
104    },
105    /// Emitted when the daemon idle-wait loop trips its watchdog because
106    /// every configured wake source returned `None` for N consecutive
107    /// attempts. Exists so a broken daemon doesn't hang the session
108    /// silently.
109    DaemonWatchdogTripped {
110        session_id: String,
111        attempts: usize,
112        elapsed_ms: u64,
113    },
114}
115
116impl AgentEvent {
117    pub fn session_id(&self) -> &str {
118        match self {
119            Self::AgentMessageChunk { session_id, .. }
120            | Self::AgentThoughtChunk { session_id, .. }
121            | Self::ToolCall { session_id, .. }
122            | Self::ToolCallUpdate { session_id, .. }
123            | Self::Plan { session_id, .. }
124            | Self::TurnStart { session_id, .. }
125            | Self::TurnEnd { session_id, .. }
126            | Self::FeedbackInjected { session_id, .. }
127            | Self::BudgetExhausted { session_id, .. }
128            | Self::LoopStuck { session_id, .. }
129            | Self::DaemonWatchdogTripped { session_id, .. } => session_id,
130        }
131    }
132}
133
134/// External consumers of the event stream (e.g. the harn-cli ACP server,
135/// which translates events into JSON-RPC notifications).
136pub trait AgentEventSink: Send + Sync {
137    fn handle_event(&self, event: &AgentEvent);
138}
139
140/// Fan-out helper for composing multiple external sinks.
141pub struct MultiSink {
142    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
143}
144
145impl MultiSink {
146    pub fn new() -> Self {
147        Self {
148            sinks: Mutex::new(Vec::new()),
149        }
150    }
151    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
152        self.sinks.lock().expect("sink mutex poisoned").push(sink);
153    }
154    pub fn len(&self) -> usize {
155        self.sinks.lock().expect("sink mutex poisoned").len()
156    }
157    pub fn is_empty(&self) -> bool {
158        self.len() == 0
159    }
160}
161
162impl Default for MultiSink {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168impl AgentEventSink for MultiSink {
169    fn handle_event(&self, event: &AgentEvent) {
170        // Deliberate: snapshot then release the lock before invoking sink
171        // callbacks. Sinks can re-enter the event system (e.g. a host
172        // sink that logs to another AgentEvent path), so holding the
173        // mutex across the callback would risk self-deadlock. Arc clones
174        // are refcount bumps — cheap.
175        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
176        for sink in sinks {
177            sink.handle_event(event);
178        }
179    }
180}
181
182type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
183
184fn external_sinks() -> &'static ExternalSinkRegistry {
185    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
186    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
187}
188
189pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
190    let session_id = session_id.into();
191    let mut reg = external_sinks().write().expect("sink registry poisoned");
192    reg.entry(session_id).or_default().push(sink);
193}
194
195/// Remove all external sinks registered for `session_id`. Does NOT
196/// close the session itself — subscribers and transcript survive, so a
197/// later `agent_loop` call with the same id continues the conversation.
198pub fn clear_session_sinks(session_id: &str) {
199    external_sinks()
200        .write()
201        .expect("sink registry poisoned")
202        .remove(session_id);
203}
204
205pub fn reset_all_sinks() {
206    external_sinks()
207        .write()
208        .expect("sink registry poisoned")
209        .clear();
210    crate::agent_sessions::reset_session_store();
211}
212
213/// Emit an event to external sinks registered for this session. Pipeline
214/// closure subscribers are NOT called by this function — the agent
215/// loop owns that path because it needs its async VM context.
216pub fn emit_event(event: &AgentEvent) {
217    let sinks: Vec<Arc<dyn AgentEventSink>> = {
218        let reg = external_sinks().read().expect("sink registry poisoned");
219        reg.get(event.session_id()).cloned().unwrap_or_default()
220    };
221    for sink in sinks {
222        sink.handle_event(event);
223    }
224}
225
226pub fn session_external_sink_count(session_id: &str) -> usize {
227    external_sinks()
228        .read()
229        .expect("sink registry poisoned")
230        .get(session_id)
231        .map(|v| v.len())
232        .unwrap_or(0)
233}
234
235pub fn session_closure_subscriber_count(session_id: &str) -> usize {
236    crate::agent_sessions::subscriber_count(session_id)
237}
238
239#[cfg(test)]
240mod tests {
241    use super::*;
242    use std::sync::atomic::{AtomicUsize, Ordering};
243
244    struct CountingSink(Arc<AtomicUsize>);
245    impl AgentEventSink for CountingSink {
246        fn handle_event(&self, _event: &AgentEvent) {
247            self.0.fetch_add(1, Ordering::SeqCst);
248        }
249    }
250
251    #[test]
252    fn multi_sink_fans_out_in_order() {
253        let multi = MultiSink::new();
254        let a = Arc::new(AtomicUsize::new(0));
255        let b = Arc::new(AtomicUsize::new(0));
256        multi.push(Arc::new(CountingSink(a.clone())));
257        multi.push(Arc::new(CountingSink(b.clone())));
258        let event = AgentEvent::TurnStart {
259            session_id: "s1".into(),
260            iteration: 1,
261        };
262        multi.handle_event(&event);
263        assert_eq!(a.load(Ordering::SeqCst), 1);
264        assert_eq!(b.load(Ordering::SeqCst), 1);
265    }
266
267    #[test]
268    fn session_scoped_sink_routing() {
269        reset_all_sinks();
270        let a = Arc::new(AtomicUsize::new(0));
271        let b = Arc::new(AtomicUsize::new(0));
272        register_sink("session-a", Arc::new(CountingSink(a.clone())));
273        register_sink("session-b", Arc::new(CountingSink(b.clone())));
274        emit_event(&AgentEvent::TurnStart {
275            session_id: "session-a".into(),
276            iteration: 0,
277        });
278        assert_eq!(a.load(Ordering::SeqCst), 1);
279        assert_eq!(b.load(Ordering::SeqCst), 0);
280        emit_event(&AgentEvent::TurnEnd {
281            session_id: "session-b".into(),
282            iteration: 0,
283            turn_info: serde_json::json!({}),
284        });
285        assert_eq!(a.load(Ordering::SeqCst), 1);
286        assert_eq!(b.load(Ordering::SeqCst), 1);
287        clear_session_sinks("session-a");
288        assert_eq!(session_external_sink_count("session-a"), 0);
289        assert_eq!(session_external_sink_count("session-b"), 1);
290        reset_all_sinks();
291    }
292
293    #[test]
294    fn tool_call_status_serde() {
295        assert_eq!(
296            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
297            "\"pending\""
298        );
299        assert_eq!(
300            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
301            "\"in_progress\""
302        );
303        assert_eq!(
304            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
305            "\"completed\""
306        );
307        assert_eq!(
308            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
309            "\"failed\""
310        );
311    }
312}