Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
4//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; the remaining
//! internal variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`, the
//! budget / stuck / watchdog terminals, and the skill lifecycle events)
//! let pipelines react to loop milestones that don't have a direct ACP
//! counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// Serialized in snake_case: `"pending"`, `"in_progress"`,
/// `"completed"`, `"failed"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
41
/// Events emitted by the agent loop. The first five variants map 1:1
/// to ACP `sessionUpdate` variants; everything from `TurnStart` on is
/// harn-internal (loop milestones, terminal conditions, and skill
/// lifecycle) with no direct ACP counterpart.
///
/// Serialized with an adjacent `"type"` tag in snake_case, e.g.
/// `{"type": "turn_start", ...}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A streamed chunk of the agent's message text (ACP
    /// `agent_message_chunk`).
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A streamed chunk of the agent's reasoning text (ACP
    /// `agent_thought_chunk`).
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call surfaced by the model, with its initial status and
    /// raw input payload.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
    },
    /// A status/result update for a previously-emitted `ToolCall`,
    /// correlated via `tool_call_id`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
    },
    /// An agent plan update; the payload shape is opaque JSON passed
    /// through to consumers.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Internal: a loop iteration is starting.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Internal: a loop iteration finished; `turn_info` carries
    /// loop-defined summary JSON.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Internal: feedback text was injected into the conversation by
    /// the loop (`kind` distinguishes the feedback source).
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
}
138
139impl AgentEvent {
140    pub fn session_id(&self) -> &str {
141        match self {
142            Self::AgentMessageChunk { session_id, .. }
143            | Self::AgentThoughtChunk { session_id, .. }
144            | Self::ToolCall { session_id, .. }
145            | Self::ToolCallUpdate { session_id, .. }
146            | Self::Plan { session_id, .. }
147            | Self::TurnStart { session_id, .. }
148            | Self::TurnEnd { session_id, .. }
149            | Self::FeedbackInjected { session_id, .. }
150            | Self::BudgetExhausted { session_id, .. }
151            | Self::LoopStuck { session_id, .. }
152            | Self::DaemonWatchdogTripped { session_id, .. }
153            | Self::SkillActivated { session_id, .. }
154            | Self::SkillDeactivated { session_id, .. }
155            | Self::SkillScopeTools { session_id, .. } => session_id,
156        }
157    }
158}
159
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
///
/// `Send + Sync` is required because sinks are shared via `Arc` through
/// the global registry and invoked synchronously by the agent loop.
pub trait AgentEventSink: Send + Sync {
    /// Called once for each emitted event, synchronously on the
    /// emitter's thread.
    fn handle_event(&self, event: &AgentEvent);
}
165
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session; gaps never happen even under load because the sink
    /// owns the counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected.
    /// Flattening merges the event's adjacent `"type"` tag and fields
    /// into the envelope object alongside `index`/`emitted_at_ms`.
    #[serde(flatten)]
    pub event: AgentEvent,
}
190
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    /// Writer, index counter, and rotation bookkeeping, all behind one
    /// mutex so concurrent `handle_event` calls serialize their writes.
    state: Mutex<JsonlEventSinkState>,
    /// Path passed to `open`; rotated file names are derived from its
    /// stem and extension.
    base_path: std::path::PathBuf,
}

/// Mutable interior of `JsonlEventSink`; only ever touched under the
/// mutex above.
struct JsonlEventSinkState {
    /// Buffered writer over the currently-active log file.
    writer: std::io::BufWriter<std::fs::File>,
    /// Next event index to assign — also the count of events handled.
    index: u64,
    /// Bytes written to the current file; reset to 0 after rotation.
    bytes_written: u64,
    /// How many rotations have happened; forms the numbered suffix.
    rotation: u32,
}
206
207impl JsonlEventSink {
208    /// Hard cap past which the current file rotates to a numbered
209    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
210    /// sessions don't produce unreadable multi-GB logs.
211    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
212
213    /// Open a new sink writing to `base_path`. Creates parent dirs
214    /// if missing. Overwrites an existing file so each fresh session
215    /// starts from index 0.
216    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
217        let base_path = base_path.into();
218        if let Some(parent) = base_path.parent() {
219            std::fs::create_dir_all(parent)?;
220        }
221        let file = std::fs::OpenOptions::new()
222            .create(true)
223            .truncate(true)
224            .write(true)
225            .open(&base_path)?;
226        Ok(Arc::new(Self {
227            state: Mutex::new(JsonlEventSinkState {
228                writer: std::io::BufWriter::new(file),
229                index: 0,
230                bytes_written: 0,
231                rotation: 0,
232            }),
233            base_path,
234        }))
235    }
236
237    /// Flush any buffered writes. Called on session shutdown; the
238    /// Drop impl calls this too but on early panic it may not run.
239    pub fn flush(&self) -> std::io::Result<()> {
240        use std::io::Write as _;
241        self.state
242            .lock()
243            .expect("jsonl sink mutex poisoned")
244            .writer
245            .flush()
246    }
247
248    /// Current event index — primarily for tests and the "how many
249    /// events are in this run" run-record summary.
250    pub fn event_count(&self) -> u64 {
251        self.state.lock().expect("jsonl sink mutex poisoned").index
252    }
253
254    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
255        use std::io::Write as _;
256        if state.bytes_written < Self::ROTATE_BYTES {
257            return Ok(());
258        }
259        state.writer.flush()?;
260        state.rotation += 1;
261        let suffix = format!("-{:06}", state.rotation);
262        let rotated = self.base_path.with_file_name({
263            let stem = self
264                .base_path
265                .file_stem()
266                .and_then(|s| s.to_str())
267                .unwrap_or("event_log");
268            let ext = self
269                .base_path
270                .extension()
271                .and_then(|e| e.to_str())
272                .unwrap_or("jsonl");
273            format!("{stem}{suffix}.{ext}")
274        });
275        let file = std::fs::OpenOptions::new()
276            .create(true)
277            .truncate(true)
278            .write(true)
279            .open(&rotated)?;
280        state.writer = std::io::BufWriter::new(file);
281        state.bytes_written = 0;
282        Ok(())
283    }
284}
285
286impl AgentEventSink for JsonlEventSink {
287    fn handle_event(&self, event: &AgentEvent) {
288        use std::io::Write as _;
289        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
290        let index = state.index;
291        state.index += 1;
292        let emitted_at_ms = std::time::SystemTime::now()
293            .duration_since(std::time::UNIX_EPOCH)
294            .map(|d| d.as_millis() as i64)
295            .unwrap_or(0);
296        let envelope = PersistedAgentEvent {
297            index,
298            emitted_at_ms,
299            frame_depth: None,
300            event: event.clone(),
301        };
302        if let Ok(line) = serde_json::to_string(&envelope) {
303            // One line, newline-terminated — JSON Lines spec.
304            // Errors here are swallowed on purpose; a failing write
305            // must never crash the agent loop, and the run record
306            // itself is a secondary artifact.
307            let _ = state.writer.write_all(line.as_bytes());
308            let _ = state.writer.write_all(b"\n");
309            state.bytes_written += line.len() as u64 + 1;
310            let _ = self.rotate_if_needed(&mut state);
311        }
312    }
313}
314
315impl Drop for JsonlEventSink {
316    fn drop(&mut self) {
317        if let Ok(mut state) = self.state.lock() {
318            use std::io::Write as _;
319            let _ = state.writer.flush();
320        }
321    }
322}
323
/// Fan-out helper for composing multiple external sinks.
pub struct MultiSink {
    // Registered sinks, in push order. The mutex lets `push` take
    // `&self`, so a shared `MultiSink` can grow after construction.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
328
329impl MultiSink {
330    pub fn new() -> Self {
331        Self {
332            sinks: Mutex::new(Vec::new()),
333        }
334    }
335    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
336        self.sinks.lock().expect("sink mutex poisoned").push(sink);
337    }
338    pub fn len(&self) -> usize {
339        self.sinks.lock().expect("sink mutex poisoned").len()
340    }
341    pub fn is_empty(&self) -> bool {
342        self.len() == 0
343    }
344}
345
346impl Default for MultiSink {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352impl AgentEventSink for MultiSink {
353    fn handle_event(&self, event: &AgentEvent) {
354        // Deliberate: snapshot then release the lock before invoking sink
355        // callbacks. Sinks can re-enter the event system (e.g. a host
356        // sink that logs to another AgentEvent path), so holding the
357        // mutex across the callback would risk self-deadlock. Arc clones
358        // are refcount bumps — cheap.
359        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
360        for sink in sinks {
361            sink.handle_event(event);
362        }
363    }
364}
365
366type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
367
368fn external_sinks() -> &'static ExternalSinkRegistry {
369    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
370    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
371}
372
373pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
374    let session_id = session_id.into();
375    let mut reg = external_sinks().write().expect("sink registry poisoned");
376    reg.entry(session_id).or_default().push(sink);
377}
378
379/// Remove all external sinks registered for `session_id`. Does NOT
380/// close the session itself — subscribers and transcript survive, so a
381/// later `agent_loop` call with the same id continues the conversation.
382pub fn clear_session_sinks(session_id: &str) {
383    external_sinks()
384        .write()
385        .expect("sink registry poisoned")
386        .remove(session_id);
387}
388
389pub fn reset_all_sinks() {
390    external_sinks()
391        .write()
392        .expect("sink registry poisoned")
393        .clear();
394    crate::agent_sessions::reset_session_store();
395}
396
397/// Emit an event to external sinks registered for this session. Pipeline
398/// closure subscribers are NOT called by this function — the agent
399/// loop owns that path because it needs its async VM context.
400pub fn emit_event(event: &AgentEvent) {
401    let sinks: Vec<Arc<dyn AgentEventSink>> = {
402        let reg = external_sinks().read().expect("sink registry poisoned");
403        reg.get(event.session_id()).cloned().unwrap_or_default()
404    };
405    for sink in sinks {
406        sink.handle_event(event);
407    }
408}
409
410pub fn session_external_sink_count(session_id: &str) -> usize {
411    external_sinks()
412        .read()
413        .expect("sink registry poisoned")
414        .get(session_id)
415        .map(|v| v.len())
416        .unwrap_or(0)
417}
418
/// Number of `.harn` closure subscribers for `session_id`. Pure
/// delegation: subscribers live on the session store (see module docs),
/// which is the single source of truth for session-scoped VM state.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
422
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal sink that just counts how many events it received.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        // Exactly one delivery per registered sink.
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn session_scoped_sink_routing() {
        // NOTE(review): this test mutates the process-global registry;
        // it resets at entry and exit to avoid leaking state into other
        // tests that might also touch the registry.
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        // Emitting for session-a must not reach session-b's sink.
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        // Clearing one session leaves the other's sinks intact.
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        // Process-id-keyed temp dir keeps parallel test runs isolated.
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        // Read back + assert monotonic indices + non-decreasing timestamps.
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // Event payload flattened — type tag must survive.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn tool_call_status_serde() {
        // Pins the snake_case wire names consumers depend on.
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}