// harn_vm/agent_events.rs
//! Agent event stream — the ACP-aligned observation surface for the
//! agent loop.
//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; six internal
//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`,
//! `BudgetExhausted`, `LoopStuck`, `DaemonWatchdogTripped`) let
//! pipelines react to loop milestones that don't have a direct ACP
//! counterpart.
//!
//! There are two subscription paths, both keyed on session id so two
//! concurrent sessions never cross-talk:
//!
//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
//!    like the harn-cli ACP server. Invoked synchronously by the loop.
//! 2. **Closure subscribers** — `.harn` closures registered via the
//!    `agent_subscribe(session_id, callback)` host builtin. Stored as
//!    raw `VmValue`s and invoked by the agent loop in its async VM
//!    context (the sink trait can't easily await, so this path
//!    intentionally bypasses it).

20use std::cell::RefCell;
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27use crate::value::VmValue;
28
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// Serialized in `snake_case` (e.g. `InProgress` -> `"in_progress"`) so
/// the wire form matches the ACP JSON schema.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
42
/// Events emitted by the agent loop.
///
/// The first five variants (`AgentMessageChunk` through `Plan`) map 1:1
/// to ACP `sessionUpdate` variants; the remaining six are harn-internal
/// loop milestones with no direct ACP counterpart.
///
/// Serialized with an internal `"type"` tag in `snake_case`, e.g.
/// `{"type": "tool_call", ...}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A chunk of assistant-visible output text.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A chunk of model reasoning ("thought") text.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call dispatched by the model.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        /// ACP tool kind, when one is known for this tool.
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
    },
    /// Progress / completion update for a previously emitted `ToolCall`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        /// Present once the dispatch has produced output.
        raw_output: Option<serde_json::Value>,
        /// Present when the call finished with an error.
        error: Option<String>,
    },
    /// An agent plan update; `plan` is opaque JSON forwarded as-is.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Internal: a loop iteration is starting.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Internal: a loop iteration finished; `turn_info` is loop-defined
    /// JSON describing the turn.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Internal: synthetic feedback was injected into the conversation.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
}
116
117impl AgentEvent {
118    pub fn session_id(&self) -> &str {
119        match self {
120            Self::AgentMessageChunk { session_id, .. }
121            | Self::AgentThoughtChunk { session_id, .. }
122            | Self::ToolCall { session_id, .. }
123            | Self::ToolCallUpdate { session_id, .. }
124            | Self::Plan { session_id, .. }
125            | Self::TurnStart { session_id, .. }
126            | Self::TurnEnd { session_id, .. }
127            | Self::FeedbackInjected { session_id, .. }
128            | Self::BudgetExhausted { session_id, .. }
129            | Self::LoopStuck { session_id, .. }
130            | Self::DaemonWatchdogTripped { session_id, .. } => session_id,
131        }
132    }
133}
134
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
///
/// Implementations are invoked synchronously by the emitting code (see
/// module docs), so `handle_event` cannot await.
pub trait AgentEventSink: Send + Sync {
    /// Called once per emitted event, in emission order.
    fn handle_event(&self, event: &AgentEvent);
}
140
/// Fan-out helper for composing multiple external sinks.
///
/// The sink list sits behind a `Mutex` so sinks can be pushed through a
/// shared reference after the `MultiSink` has been handed out.
pub struct MultiSink {
    // Ordered list of sinks; fan-out preserves push order.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
145
146impl MultiSink {
147    pub fn new() -> Self {
148        Self {
149            sinks: Mutex::new(Vec::new()),
150        }
151    }
152    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
153        self.sinks.lock().expect("sink mutex poisoned").push(sink);
154    }
155    pub fn len(&self) -> usize {
156        self.sinks.lock().expect("sink mutex poisoned").len()
157    }
158    pub fn is_empty(&self) -> bool {
159        self.len() == 0
160    }
161}
162
163impl Default for MultiSink {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
169impl AgentEventSink for MultiSink {
170    fn handle_event(&self, event: &AgentEvent) {
171        // Deliberate: snapshot then release the lock before invoking sink
172        // callbacks. Sinks can re-enter the event system (e.g. a host
173        // sink that logs to another AgentEvent path), so holding the
174        // mutex across the callback would risk self-deadlock. Arc clones
175        // are refcount bumps — cheap.
176        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
177        for sink in sinks {
178            sink.handle_event(event);
179        }
180    }
181}
182
183type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
184
185fn external_sinks() -> &'static ExternalSinkRegistry {
186    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
187    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
188}
189
// Closure subscribers are stored thread-local because `VmValue`
// contains `Rc`, which is neither `Send` nor `Sync`. The agent loop
// runs on a single tokio current-thread worker, so all subscribe /
// emit calls happen on the same thread — thread-local storage is
// correct here.
thread_local! {
    /// Map of session id -> `.harn` closure values registered via the
    /// `agent_subscribe` host builtin.
    static CLOSURE_SUBSCRIBERS: RefCell<HashMap<String, Vec<VmValue>>> =
        RefCell::new(HashMap::new());
}
199
200pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
201    let session_id = session_id.into();
202    let mut reg = external_sinks().write().expect("sink registry poisoned");
203    reg.entry(session_id).or_default().push(sink);
204}
205
206pub fn register_closure_subscriber(session_id: impl Into<String>, closure: VmValue) {
207    let session_id = session_id.into();
208    CLOSURE_SUBSCRIBERS.with(|reg| {
209        reg.borrow_mut()
210            .entry(session_id)
211            .or_default()
212            .push(closure);
213    });
214}
215
216pub fn closure_subscribers_for(session_id: &str) -> Vec<VmValue> {
217    CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).cloned().unwrap_or_default())
218}
219
220pub fn clear_session_sinks(session_id: &str) {
221    external_sinks()
222        .write()
223        .expect("sink registry poisoned")
224        .remove(session_id);
225    CLOSURE_SUBSCRIBERS.with(|reg| {
226        reg.borrow_mut().remove(session_id);
227    });
228}
229
230pub fn reset_all_sinks() {
231    external_sinks()
232        .write()
233        .expect("sink registry poisoned")
234        .clear();
235    CLOSURE_SUBSCRIBERS.with(|reg| {
236        reg.borrow_mut().clear();
237    });
238}
239
240/// Emit an event to external sinks registered for this session. Pipeline
241/// closure subscribers are NOT called by this function — the agent
242/// loop owns that path because it needs its async VM context.
243pub fn emit_event(event: &AgentEvent) {
244    let sinks: Vec<Arc<dyn AgentEventSink>> = {
245        let reg = external_sinks().read().expect("sink registry poisoned");
246        reg.get(event.session_id()).cloned().unwrap_or_default()
247    };
248    for sink in sinks {
249        sink.handle_event(event);
250    }
251}
252
253pub fn session_external_sink_count(session_id: &str) -> usize {
254    external_sinks()
255        .read()
256        .expect("sink registry poisoned")
257        .get(session_id)
258        .map(|v| v.len())
259        .unwrap_or(0)
260}
261
262pub fn session_closure_subscriber_count(session_id: &str) -> usize {
263    CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).map(|v| v.len()).unwrap_or(0))
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // NOTE(review): `session_scoped_sink_routing` mutates process-global
    // registry state; if more tests start touching the registry, consider
    // serializing them to avoid cross-test interference.

    /// Sink that counts how many events it receives.
    struct CountingSink(Arc<AtomicUsize>);

    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a counter plus a sink that increments it.
    fn counter() -> (Arc<AtomicUsize>, Arc<CountingSink>) {
        let count = Arc::new(AtomicUsize::new(0));
        let sink = Arc::new(CountingSink(count.clone()));
        (count, sink)
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let (a, sink_a) = counter();
        let (b, sink_b) = counter();
        multi.push(sink_a);
        multi.push(sink_b);
        multi.handle_event(&AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let (a, sink_a) = counter();
        let (b, sink_b) = counter();
        register_sink("session-a", sink_a);
        register_sink("session-b", sink_b);

        // Only session-a's sink should fire.
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);

        // Only session-b's sink should fire.
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);

        // Clearing one session leaves the other registered.
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn tool_call_status_serde() {
        let cases = [
            (ToolCallStatus::Pending, "\"pending\""),
            (ToolCallStatus::InProgress, "\"in_progress\""),
            (ToolCallStatus::Completed, "\"completed\""),
            (ToolCallStatus::Failed, "\"failed\""),
        ];
        for (status, expected) in cases {
            assert_eq!(serde_json::to_string(&status).unwrap(), expected);
        }
    }
}