Skip to main content

harn_vm/
agent_events.rs

//! Agent event stream — the ACP-aligned observation surface for the
//! agent loop.
//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; the internal
//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`, plus the
//! budget, stuck-loop, watchdog, skill and tool-search events) let
//! pipelines react to loop milestones that don't have a direct ACP
//! counterpart.
//!
//! There are two subscription paths, both keyed on session id so two
//! concurrent sessions never cross-talk:
//!
//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
//!    like the harn-cli ACP server. Invoked synchronously by the loop.
//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
//! 2. **Closure subscribers** — `.harn` closures registered via the
//!    `agent_subscribe(session_id, callback)` host builtin. These live
//!    on the session's `SessionState.subscribers` in
//!    `crate::agent_sessions`, because sessions are the single source
//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// Serialized in `snake_case`, so `InProgress` is written as
/// `"in_progress"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
41
/// Events emitted by the agent loop. The first five variants
/// (`AgentMessageChunk` through `Plan`) map 1:1 to ACP `sessionUpdate`
/// variants; the variants after `Plan` are harn-internal.
///
/// Serialized as an internally tagged enum: the variant name is written
/// in `snake_case` under the `type` key, next to the variant's fields.
/// Every variant carries the `session_id` it belongs to.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A chunk of agent message content.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A chunk of agent thought content.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call identified by `tool_call_id`, with its status and
    /// the raw JSON input it was given.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
    },
    /// A status update for a tool call, optionally carrying its raw
    /// JSON output or an error message.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
    },
    /// A plan, carried as raw JSON.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Harn-internal loop milestone: a turn begins at `iteration`.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Harn-internal loop milestone: the turn at `iteration` ended;
    /// `turn_info` is a free-form JSON payload.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Harn-internal loop milestone: feedback of the given `kind` was
    /// injected into the loop.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so IDEs can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
}
163
164impl AgentEvent {
165    pub fn session_id(&self) -> &str {
166        match self {
167            Self::AgentMessageChunk { session_id, .. }
168            | Self::AgentThoughtChunk { session_id, .. }
169            | Self::ToolCall { session_id, .. }
170            | Self::ToolCallUpdate { session_id, .. }
171            | Self::Plan { session_id, .. }
172            | Self::TurnStart { session_id, .. }
173            | Self::TurnEnd { session_id, .. }
174            | Self::FeedbackInjected { session_id, .. }
175            | Self::BudgetExhausted { session_id, .. }
176            | Self::LoopStuck { session_id, .. }
177            | Self::DaemonWatchdogTripped { session_id, .. }
178            | Self::SkillActivated { session_id, .. }
179            | Self::SkillDeactivated { session_id, .. }
180            | Self::SkillScopeTools { session_id, .. }
181            | Self::ToolSearchQuery { session_id, .. }
182            | Self::ToolSearchResult { session_id, .. } => session_id,
183        }
184    }
185}
186
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
///
/// Sinks are shared as `Arc<dyn AgentEventSink>`, hence the
/// `Send + Sync` bound.
pub trait AgentEventSink: Send + Sync {
    /// Handle a single event. The method returns nothing, so a sink has
    /// to deal with its own failures internally.
    fn handle_event(&self, event: &AgentEvent);
}
192
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session: the sink assigns it while holding its mutex, so
    /// concurrent emitters never receive duplicate or reordered
    /// indices.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available (`JsonlEventSink` currently
    /// always writes `None`).
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected.
    #[serde(flatten)]
    pub event: AgentEvent,
}
217
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MiB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    /// Writer state; the mutex serializes concurrent callers.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the first log file. Rotated file names are derived from it.
    base_path: std::path::PathBuf,
}

/// Mutable half of `JsonlEventSink`, only accessed through its mutex.
struct JsonlEventSinkState {
    /// Buffered writer over the file currently being appended to.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index the next event will receive.
    index: u64,
    /// Bytes accounted against the current file; reset on rotation.
    bytes_written: u64,
    /// Number of successful rotations; 0 means still writing `base_path`.
    rotation: u32,
}

impl JsonlEventSink {
    /// Hard cap past which the current file rotates to a numbered
    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
    /// sessions don't produce unreadable multi-GB logs.
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;

    /// Open a new sink writing to `base_path`. Creates parent dirs
    /// if missing. Overwrites an existing file so each fresh session
    /// starts from index 0.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the parent directory or the file
    /// cannot be created.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = Self::create_truncated(&base_path)?;
        Ok(Arc::new(Self {
            state: Mutex::new(JsonlEventSinkState {
                writer: std::io::BufWriter::new(file),
                index: 0,
                bytes_written: 0,
                rotation: 0,
            }),
            base_path,
        }))
    }

    /// Flush any buffered writes. Called on session shutdown; the
    /// `Drop` impl flushes too, but on early panic it may not run.
    ///
    /// # Panics
    ///
    /// Panics if the sink's mutex is poisoned.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        self.state
            .lock()
            .expect("jsonl sink mutex poisoned")
            .writer
            .flush()
    }

    /// Current event index — primarily for tests and the "how many
    /// events are in this run" run-record summary.
    ///
    /// # Panics
    ///
    /// Panics if the sink's mutex is poisoned.
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }

    /// Create `path` for writing, truncating any existing content.
    fn create_truncated(path: &std::path::Path) -> std::io::Result<std::fs::File> {
        std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(path)
    }

    /// Path of the `rotation`-th rotated file, next to `base_path`:
    /// `<stem>-<rotation, zero-padded to 6 digits>.<ext>`.
    fn rotated_path(&self, rotation: u32) -> std::path::PathBuf {
        let stem = self
            .base_path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("event_log");
        let ext = self
            .base_path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("jsonl");
        self.base_path
            .with_file_name(format!("{stem}-{rotation:06}.{ext}"))
    }

    /// Switch to the next numbered file once the current one has
    /// reached `ROTATE_BYTES`; a no-op below the cap.
    ///
    /// The rotation counter and byte count are only updated after the
    /// new file has been opened. If the open fails, the sink keeps
    /// writing to the current file and the next attempt retries the
    /// same suffix, so the numbered sequence never has holes.
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        state.writer.flush()?;
        let next_rotation = state.rotation + 1;
        let file = Self::create_truncated(&self.rotated_path(next_rotation))?;
        // Commit only now that the new file exists.
        state.writer = std::io::BufWriter::new(file);
        state.rotation = next_rotation;
        state.bytes_written = 0;
        Ok(())
    }
}
312
313impl AgentEventSink for JsonlEventSink {
314    fn handle_event(&self, event: &AgentEvent) {
315        use std::io::Write as _;
316        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
317        let index = state.index;
318        state.index += 1;
319        let emitted_at_ms = std::time::SystemTime::now()
320            .duration_since(std::time::UNIX_EPOCH)
321            .map(|d| d.as_millis() as i64)
322            .unwrap_or(0);
323        let envelope = PersistedAgentEvent {
324            index,
325            emitted_at_ms,
326            frame_depth: None,
327            event: event.clone(),
328        };
329        if let Ok(line) = serde_json::to_string(&envelope) {
330            // One line, newline-terminated — JSON Lines spec.
331            // Errors here are swallowed on purpose; a failing write
332            // must never crash the agent loop, and the run record
333            // itself is a secondary artifact.
334            let _ = state.writer.write_all(line.as_bytes());
335            let _ = state.writer.write_all(b"\n");
336            state.bytes_written += line.len() as u64 + 1;
337            let _ = self.rotate_if_needed(&mut state);
338        }
339    }
340}
341
342impl Drop for JsonlEventSink {
343    fn drop(&mut self) {
344        if let Ok(mut state) = self.state.lock() {
345            use std::io::Write as _;
346            let _ = state.writer.flush();
347        }
348    }
349}
350
/// Fan-out helper for composing multiple external sinks.
///
/// The sink list sits behind a `Mutex` so sinks can be added through a
/// shared reference (`push` takes `&self`).
pub struct MultiSink {
    /// Registered sinks in insertion order, which is also the order in
    /// which `handle_event` invokes them.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
355
356impl MultiSink {
357    pub fn new() -> Self {
358        Self {
359            sinks: Mutex::new(Vec::new()),
360        }
361    }
362    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
363        self.sinks.lock().expect("sink mutex poisoned").push(sink);
364    }
365    pub fn len(&self) -> usize {
366        self.sinks.lock().expect("sink mutex poisoned").len()
367    }
368    pub fn is_empty(&self) -> bool {
369        self.len() == 0
370    }
371}
372
impl Default for MultiSink {
    /// Equivalent to `MultiSink::new()`: a fan-out with no sinks.
    fn default() -> Self {
        Self::new()
    }
}
378
379impl AgentEventSink for MultiSink {
380    fn handle_event(&self, event: &AgentEvent) {
381        // Deliberate: snapshot then release the lock before invoking sink
382        // callbacks. Sinks can re-enter the event system (e.g. a host
383        // sink that logs to another AgentEvent path), so holding the
384        // mutex across the callback would risk self-deadlock. Arc clones
385        // are refcount bumps — cheap.
386        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
387        for sink in sinks {
388            sink.handle_event(event);
389        }
390    }
391}
392
393type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
394
395fn external_sinks() -> &'static ExternalSinkRegistry {
396    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
397    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
398}
399
400pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
401    let session_id = session_id.into();
402    let mut reg = external_sinks().write().expect("sink registry poisoned");
403    reg.entry(session_id).or_default().push(sink);
404}
405
406/// Remove all external sinks registered for `session_id`. Does NOT
407/// close the session itself — subscribers and transcript survive, so a
408/// later `agent_loop` call with the same id continues the conversation.
409pub fn clear_session_sinks(session_id: &str) {
410    external_sinks()
411        .write()
412        .expect("sink registry poisoned")
413        .remove(session_id);
414}
415
416pub fn reset_all_sinks() {
417    external_sinks()
418        .write()
419        .expect("sink registry poisoned")
420        .clear();
421    crate::agent_sessions::reset_session_store();
422}
423
424/// Emit an event to external sinks registered for this session. Pipeline
425/// closure subscribers are NOT called by this function — the agent
426/// loop owns that path because it needs its async VM context.
427pub fn emit_event(event: &AgentEvent) {
428    let sinks: Vec<Arc<dyn AgentEventSink>> = {
429        let reg = external_sinks().read().expect("sink registry poisoned");
430        reg.get(event.session_id()).cloned().unwrap_or_default()
431    };
432    for sink in sinks {
433        sink.handle_event(event);
434    }
435}
436
437pub fn session_external_sink_count(session_id: &str) -> usize {
438    external_sinks()
439        .read()
440        .expect("sink registry poisoned")
441        .get(session_id)
442        .map(|v| v.len())
443        .unwrap_or(0)
444}
445
/// Number of closure subscribers for `session_id`, as reported by
/// `crate::agent_sessions::subscriber_count`.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
449
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that counts how many events it has received.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Sink that appends its label to a shared log on every event, so a
    /// test can assert the order in which sinks were invoked.
    struct LabelSink {
        label: &'static str,
        log: Arc<Mutex<Vec<&'static str>>>,
    }
    impl AgentEventSink for LabelSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.log.lock().unwrap().push(self.label);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        assert!(multi.is_empty());
        let log = Arc::new(Mutex::new(Vec::new()));
        multi.push(Arc::new(LabelSink {
            label: "a",
            log: log.clone(),
        }));
        multi.push(Arc::new(LabelSink {
            label: "b",
            log: log.clone(),
        }));
        assert_eq!(multi.len(), 2);
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        // Each sink saw the event exactly once, in push order.
        assert_eq!(*log.lock().unwrap(), vec!["a", "b"]);
    }

    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        // Read back + assert monotonic indices + non-decreasing timestamps.
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // Event payload flattened — type tag must survive.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
        // Close the log before cleaning up, and remove the whole temp
        // directory rather than just the file so nothing is left behind.
        drop(sink);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}