//! Agent event stream — the ACP-aligned observation surface for the
//! agent loop.
//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; the internal
//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`, and the other
//! loop-terminal events) let pipelines react to loop milestones that
//! don't have a direct ACP counterpart.
//!
//! There are two subscription paths, both keyed on session id so two
//! concurrent sessions never cross-talk:
//!
//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
//!    like the harn-cli ACP server. Invoked synchronously by the loop.
//! 2. **Closure subscribers** — `.harn` closures registered via the
//!    `agent_subscribe(session_id, callback)` host builtin. Stored as
//!    raw `VmValue`s and invoked by the agent loop in its async VM
//!    context (the sink trait can't easily await, so this path
//!    intentionally bypasses it).

use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock, RwLock};

use serde::{Deserialize, Serialize};

use crate::tool_annotations::ToolKind;
use crate::value::VmValue;

29/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
30#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32pub enum ToolCallStatus {
33    /// Dispatched by the model but not yet started.
34    Pending,
35    /// Dispatch is actively running.
36    InProgress,
37    /// Finished successfully.
38    Completed,
39    /// Finished with an error.
40    Failed,
41}
42
43/// Events emitted by the agent loop. The first five variants map 1:1
44/// to ACP `sessionUpdate` variants; the last three are harn-internal.
45#[derive(Clone, Debug, Serialize, Deserialize)]
46#[serde(tag = "type", rename_all = "snake_case")]
47pub enum AgentEvent {
48    AgentMessageChunk {
49        session_id: String,
50        content: String,
51    },
52    AgentThoughtChunk {
53        session_id: String,
54        content: String,
55    },
56    ToolCall {
57        session_id: String,
58        tool_call_id: String,
59        tool_name: String,
60        kind: Option<ToolKind>,
61        status: ToolCallStatus,
62        raw_input: serde_json::Value,
63    },
64    ToolCallUpdate {
65        session_id: String,
66        tool_call_id: String,
67        tool_name: String,
68        status: ToolCallStatus,
69        raw_output: Option<serde_json::Value>,
70        error: Option<String>,
71    },
72    Plan {
73        session_id: String,
74        plan: serde_json::Value,
75    },
76    // ── Internal (no direct ACP equivalent) ────────────────────────
77    TurnStart {
78        session_id: String,
79        iteration: usize,
80    },
81    TurnEnd {
82        session_id: String,
83        iteration: usize,
84        turn_info: serde_json::Value,
85    },
86    FeedbackInjected {
87        session_id: String,
88        kind: String,
89        content: String,
90    },
91    /// Emitted when the agent loop exhausts `max_iterations` without any
92    /// explicit break condition firing. Distinct from a natural "done" or
93    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
94    BudgetExhausted {
95        session_id: String,
96        max_iterations: usize,
97    },
98    /// Emitted when the loop breaks because consecutive text-only turns
99    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
100    /// hosts that key off agent-terminal events.
101    LoopStuck {
102        session_id: String,
103        max_nudges: usize,
104        last_iteration: usize,
105        tail_excerpt: String,
106    },
107    /// Emitted when the daemon idle-wait loop trips its watchdog because
108    /// every configured wake source returned `None` for N consecutive
109    /// attempts. Exists so a broken daemon doesn't hang the session
110    /// silently.
111    DaemonWatchdogTripped {
112        session_id: String,
113        attempts: usize,
114        elapsed_ms: u64,
115    },
116}
117
118impl AgentEvent {
119    pub fn session_id(&self) -> &str {
120        match self {
121            Self::AgentMessageChunk { session_id, .. }
122            | Self::AgentThoughtChunk { session_id, .. }
123            | Self::ToolCall { session_id, .. }
124            | Self::ToolCallUpdate { session_id, .. }
125            | Self::Plan { session_id, .. }
126            | Self::TurnStart { session_id, .. }
127            | Self::TurnEnd { session_id, .. }
128            | Self::FeedbackInjected { session_id, .. }
129            | Self::BudgetExhausted { session_id, .. }
130            | Self::LoopStuck { session_id, .. }
131            | Self::DaemonWatchdogTripped { session_id, .. } => session_id,
132        }
133    }
134}
135
136/// External consumers of the event stream (e.g. the harn-cli ACP server,
137/// which translates events into JSON-RPC notifications).
138pub trait AgentEventSink: Send + Sync {
139    fn handle_event(&self, event: &AgentEvent);
140}
141
142/// Fan-out helper for composing multiple external sinks.
143pub struct MultiSink {
144    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
145}
146
147impl MultiSink {
148    pub fn new() -> Self {
149        Self {
150            sinks: Mutex::new(Vec::new()),
151        }
152    }
153    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
154        self.sinks.lock().expect("sink mutex poisoned").push(sink);
155    }
156    pub fn len(&self) -> usize {
157        self.sinks.lock().expect("sink mutex poisoned").len()
158    }
159    pub fn is_empty(&self) -> bool {
160        self.len() == 0
161    }
162}
163
164impl Default for MultiSink {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170impl AgentEventSink for MultiSink {
171    fn handle_event(&self, event: &AgentEvent) {
172        // Deliberate: snapshot then release the lock before invoking sink
173        // callbacks. Sinks can re-enter the event system (e.g. a host
174        // sink that logs to another AgentEvent path), so holding the
175        // mutex across the callback would risk self-deadlock. Arc clones
176        // are refcount bumps — cheap.
177        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
178        for sink in sinks {
179            sink.handle_event(event);
180        }
181    }
182}
183
184// ── Registries ──────────────────────────────────────────────────────
185
186type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
187
188fn external_sinks() -> &'static ExternalSinkRegistry {
189    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
190    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
191}
192
193// Closure subscribers are stored thread-local because `VmValue`
194// contains `Rc`, which is neither `Send` nor `Sync`. The agent loop
195// runs on a single tokio current-thread worker, so all subscribe /
196// emit calls happen on the same thread — thread-local storage is
197// correct here.
198thread_local! {
199    static CLOSURE_SUBSCRIBERS: RefCell<HashMap<String, Vec<VmValue>>> =
200        RefCell::new(HashMap::new());
201}
202
203// Register an external (Rust-side) sink for a session.
204pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
205    let session_id = session_id.into();
206    let mut reg = external_sinks().write().expect("sink registry poisoned");
207    reg.entry(session_id).or_default().push(sink);
208}
209
210pub fn register_closure_subscriber(session_id: impl Into<String>, closure: VmValue) {
211    let session_id = session_id.into();
212    CLOSURE_SUBSCRIBERS.with(|reg| {
213        reg.borrow_mut()
214            .entry(session_id)
215            .or_default()
216            .push(closure);
217    });
218}
219
220pub fn closure_subscribers_for(session_id: &str) -> Vec<VmValue> {
221    CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).cloned().unwrap_or_default())
222}
223
224pub fn clear_session_sinks(session_id: &str) {
225    external_sinks()
226        .write()
227        .expect("sink registry poisoned")
228        .remove(session_id);
229    CLOSURE_SUBSCRIBERS.with(|reg| {
230        reg.borrow_mut().remove(session_id);
231    });
232}
233
234pub fn reset_all_sinks() {
235    external_sinks()
236        .write()
237        .expect("sink registry poisoned")
238        .clear();
239    CLOSURE_SUBSCRIBERS.with(|reg| {
240        reg.borrow_mut().clear();
241    });
242}
243
244/// Emit an event to external sinks registered for this session. Pipeline
245/// closure subscribers are NOT called by this function — the agent
246/// loop owns that path because it needs its async VM context.
247pub fn emit_event(event: &AgentEvent) {
248    let sinks: Vec<Arc<dyn AgentEventSink>> = {
249        let reg = external_sinks().read().expect("sink registry poisoned");
250        reg.get(event.session_id()).cloned().unwrap_or_default()
251    };
252    for sink in sinks {
253        sink.handle_event(event);
254    }
255}
256
257pub fn session_external_sink_count(session_id: &str) -> usize {
258    external_sinks()
259        .read()
260        .expect("sink registry poisoned")
261        .get(session_id)
262        .map(|v| v.len())
263        .unwrap_or(0)
264}
265
266pub fn session_closure_subscriber_count(session_id: &str) -> usize {
267    CLOSURE_SUBSCRIBERS.with(|reg| reg.borrow().get(session_id).map(|v| v.len()).unwrap_or(0))
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that counts how many events it receives.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    // NOTE(review): this test mutates the process-global registry; it
    // resets at entry and exit but can still interfere with other tests
    // that touch the registry if the harness runs them in parallel.
    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}