Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
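//! Every phase of the turn loop emits an `AgentEvent`. The first five
//! variants (message and thought chunks, tool calls, tool-call updates,
//! and plans) map 1:1 onto ACP `SessionUpdate` values; the remaining
//! harn-internal variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`,
//! plus the budget, stuck-loop, watchdog, skill, tool-search, and
//! transcript-compaction events) let pipelines react to loop milestones
//! that don't have a direct ACP counterpart.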
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
28/// Typed worker lifecycle events emitted by delegated/background agent
29/// execution. Bridge-facing worker updates still derive a string status
30/// from these variants, but the runtime no longer passes raw status
31/// strings around internally.
32#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
33pub enum WorkerEvent {
34    WorkerSpawned,
35    WorkerCompleted,
36    WorkerFailed,
37    WorkerCancelled,
38}
39
40impl WorkerEvent {
41    pub fn as_status(self) -> &'static str {
42        match self {
43            Self::WorkerSpawned => "running",
44            Self::WorkerCompleted => "completed",
45            Self::WorkerFailed => "failed",
46            Self::WorkerCancelled => "cancelled",
47        }
48    }
49
50    pub fn as_str(self) -> &'static str {
51        match self {
52            Self::WorkerSpawned => "WorkerSpawned",
53            Self::WorkerCompleted => "WorkerCompleted",
54            Self::WorkerFailed => "WorkerFailed",
55            Self::WorkerCancelled => "WorkerCancelled",
56        }
57    }
58}
59
60/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
61#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum ToolCallStatus {
64    /// Dispatched by the model but not yet started.
65    Pending,
66    /// Dispatch is actively running.
67    InProgress,
68    /// Finished successfully.
69    Completed,
70    /// Finished with an error.
71    Failed,
72}
73
74/// Events emitted by the agent loop. The first five variants map 1:1
75/// to ACP `sessionUpdate` variants; the last three are harn-internal.
76#[derive(Clone, Debug, Serialize, Deserialize)]
77#[serde(tag = "type", rename_all = "snake_case")]
78pub enum AgentEvent {
79    AgentMessageChunk {
80        session_id: String,
81        content: String,
82    },
83    AgentThoughtChunk {
84        session_id: String,
85        content: String,
86    },
87    ToolCall {
88        session_id: String,
89        tool_call_id: String,
90        tool_name: String,
91        kind: Option<ToolKind>,
92        status: ToolCallStatus,
93        raw_input: serde_json::Value,
94    },
95    ToolCallUpdate {
96        session_id: String,
97        tool_call_id: String,
98        tool_name: String,
99        status: ToolCallStatus,
100        raw_output: Option<serde_json::Value>,
101        error: Option<String>,
102    },
103    Plan {
104        session_id: String,
105        plan: serde_json::Value,
106    },
107    TurnStart {
108        session_id: String,
109        iteration: usize,
110    },
111    TurnEnd {
112        session_id: String,
113        iteration: usize,
114        turn_info: serde_json::Value,
115    },
116    FeedbackInjected {
117        session_id: String,
118        kind: String,
119        content: String,
120    },
121    /// Emitted when the agent loop exhausts `max_iterations` without any
122    /// explicit break condition firing. Distinct from a natural "done" or
123    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
124    BudgetExhausted {
125        session_id: String,
126        max_iterations: usize,
127    },
128    /// Emitted when the loop breaks because consecutive text-only turns
129    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
130    /// hosts that key off agent-terminal events.
131    LoopStuck {
132        session_id: String,
133        max_nudges: usize,
134        last_iteration: usize,
135        tail_excerpt: String,
136    },
137    /// Emitted when the daemon idle-wait loop trips its watchdog because
138    /// every configured wake source returned `None` for N consecutive
139    /// attempts. Exists so a broken daemon doesn't hang the session
140    /// silently.
141    DaemonWatchdogTripped {
142        session_id: String,
143        attempts: usize,
144        elapsed_ms: u64,
145    },
146    /// Emitted when a skill is activated. Carries the match reason so
147    /// replayers can reconstruct *why* a given skill took effect at
148    /// this iteration.
149    SkillActivated {
150        session_id: String,
151        skill_name: String,
152        iteration: usize,
153        reason: String,
154    },
155    /// Emitted when a previously-active skill is deactivated because
156    /// the reassess phase no longer matches it.
157    SkillDeactivated {
158        session_id: String,
159        skill_name: String,
160        iteration: usize,
161    },
162    /// Emitted once per activation when the skill's `allowed_tools` filter
163    /// narrows the effective tool surface exposed to the model.
164    SkillScopeTools {
165        session_id: String,
166        skill_name: String,
167        allowed_tools: Vec<String>,
168    },
169    /// Emitted when a `tool_search` query is issued by the model. Carries
170    /// the raw query args, the configured strategy, and a `mode` tag
171    /// distinguishing the client-executed fallback (`"client"`) from
172    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
173    /// transcript event shape so IDEs can render a search-in-progress
174    /// chip in real time — the replay path walks the transcript after
175    /// the turn, which is too late for live UX.
176    ToolSearchQuery {
177        session_id: String,
178        tool_use_id: String,
179        name: String,
180        query: serde_json::Value,
181        strategy: String,
182        mode: String,
183    },
184    /// Emitted when `tool_search` resolves — carries the list of tool
185    /// names newly promoted into the model's effective surface for the
186    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
187    ToolSearchResult {
188        session_id: String,
189        tool_use_id: String,
190        promoted: Vec<String>,
191        strategy: String,
192        mode: String,
193    },
194    TranscriptCompacted {
195        session_id: String,
196        mode: String,
197        strategy: String,
198        archived_messages: usize,
199        estimated_tokens_before: usize,
200        estimated_tokens_after: usize,
201        snapshot_asset_id: Option<String>,
202    },
203}
204
205impl AgentEvent {
206    pub fn session_id(&self) -> &str {
207        match self {
208            Self::AgentMessageChunk { session_id, .. }
209            | Self::AgentThoughtChunk { session_id, .. }
210            | Self::ToolCall { session_id, .. }
211            | Self::ToolCallUpdate { session_id, .. }
212            | Self::Plan { session_id, .. }
213            | Self::TurnStart { session_id, .. }
214            | Self::TurnEnd { session_id, .. }
215            | Self::FeedbackInjected { session_id, .. }
216            | Self::BudgetExhausted { session_id, .. }
217            | Self::LoopStuck { session_id, .. }
218            | Self::DaemonWatchdogTripped { session_id, .. }
219            | Self::SkillActivated { session_id, .. }
220            | Self::SkillDeactivated { session_id, .. }
221            | Self::SkillScopeTools { session_id, .. }
222            | Self::ToolSearchQuery { session_id, .. }
223            | Self::ToolSearchResult { session_id, .. }
224            | Self::TranscriptCompacted { session_id, .. } => session_id,
225        }
226    }
227}
228
229/// External consumers of the event stream (e.g. the harn-cli ACP server,
230/// which translates events into JSON-RPC notifications).
231pub trait AgentEventSink: Send + Sync {
232    fn handle_event(&self, event: &AgentEvent);
233}
234
235/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
236/// `AgentEvent` with monotonic index + timestamp + frame depth so
237/// replay engines can reconstruct paused state at any event index,
238/// and scrubber UIs can bucket events by time. The envelope is the
239/// on-disk shape; the wire format for live consumers is still the
240/// raw `AgentEvent` so existing sinks don't churn.
241#[derive(Clone, Debug, Serialize, Deserialize)]
242pub struct PersistedAgentEvent {
243    /// Monotonic per-session index starting at 0. Unique within a
244    /// session; gaps never happen even under load because the sink
245    /// owns the counter under a mutex.
246    pub index: u64,
247    /// Milliseconds since the Unix epoch, captured when the sink
248    /// received the event. Not the event's emission time — that
249    /// would require threading a clock through every emit site.
250    pub emitted_at_ms: i64,
251    /// Call-stack depth at the moment of emission, when the caller
252    /// can supply it. `None` for events emitted from a context where
253    /// the VM frame stack isn't available.
254    pub frame_depth: Option<u32>,
255    /// The raw event, flattened so `jq '.type'` works as expected.
256    #[serde(flatten)]
257    pub event: AgentEvent,
258}
259
/// Append-only JSON Lines writer for one session's event stream (#103).
///
/// One writer per session. When the active file reaches `ROTATE_BYTES`
/// (100 MB today — long chat sessions rarely exceed 5 MB, so rotation
/// almost never fires), output continues in a numbered sibling file.
pub struct JsonlEventSink {
    /// Path of the first log file; rotated files are named after it.
    base_path: std::path::PathBuf,
    /// Writer plus counters, kept under one lock so an index is assigned
    /// and its record written without interleaving with other emitters.
    state: Mutex<JsonlEventSinkState>,
}

/// Mutable state guarded by [`JsonlEventSink`]'s mutex.
struct JsonlEventSinkState {
    /// Buffered writer for the currently active log file.
    writer: std::io::BufWriter<std::fs::File>,
    /// Next event index to hand out.
    index: u64,
    /// Bytes written to the active file; reset to 0 on rotation.
    bytes_written: u64,
    /// Number of rotations so far; used as the rotated file's suffix.
    rotation: u32,
}
275
276impl JsonlEventSink {
277    /// Hard cap past which the current file rotates to a numbered
278    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
279    /// sessions don't produce unreadable multi-GB logs.
280    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
281
282    /// Open a new sink writing to `base_path`. Creates parent dirs
283    /// if missing. Overwrites an existing file so each fresh session
284    /// starts from index 0.
285    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
286        let base_path = base_path.into();
287        if let Some(parent) = base_path.parent() {
288            std::fs::create_dir_all(parent)?;
289        }
290        let file = std::fs::OpenOptions::new()
291            .create(true)
292            .truncate(true)
293            .write(true)
294            .open(&base_path)?;
295        Ok(Arc::new(Self {
296            state: Mutex::new(JsonlEventSinkState {
297                writer: std::io::BufWriter::new(file),
298                index: 0,
299                bytes_written: 0,
300                rotation: 0,
301            }),
302            base_path,
303        }))
304    }
305
306    /// Flush any buffered writes. Called on session shutdown; the
307    /// Drop impl calls this too but on early panic it may not run.
308    pub fn flush(&self) -> std::io::Result<()> {
309        use std::io::Write as _;
310        self.state
311            .lock()
312            .expect("jsonl sink mutex poisoned")
313            .writer
314            .flush()
315    }
316
317    /// Current event index — primarily for tests and the "how many
318    /// events are in this run" run-record summary.
319    pub fn event_count(&self) -> u64 {
320        self.state.lock().expect("jsonl sink mutex poisoned").index
321    }
322
323    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
324        use std::io::Write as _;
325        if state.bytes_written < Self::ROTATE_BYTES {
326            return Ok(());
327        }
328        state.writer.flush()?;
329        state.rotation += 1;
330        let suffix = format!("-{:06}", state.rotation);
331        let rotated = self.base_path.with_file_name({
332            let stem = self
333                .base_path
334                .file_stem()
335                .and_then(|s| s.to_str())
336                .unwrap_or("event_log");
337            let ext = self
338                .base_path
339                .extension()
340                .and_then(|e| e.to_str())
341                .unwrap_or("jsonl");
342            format!("{stem}{suffix}.{ext}")
343        });
344        let file = std::fs::OpenOptions::new()
345            .create(true)
346            .truncate(true)
347            .write(true)
348            .open(&rotated)?;
349        state.writer = std::io::BufWriter::new(file);
350        state.bytes_written = 0;
351        Ok(())
352    }
353}
354
355impl AgentEventSink for JsonlEventSink {
356    fn handle_event(&self, event: &AgentEvent) {
357        use std::io::Write as _;
358        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
359        let index = state.index;
360        state.index += 1;
361        let emitted_at_ms = std::time::SystemTime::now()
362            .duration_since(std::time::UNIX_EPOCH)
363            .map(|d| d.as_millis() as i64)
364            .unwrap_or(0);
365        let envelope = PersistedAgentEvent {
366            index,
367            emitted_at_ms,
368            frame_depth: None,
369            event: event.clone(),
370        };
371        if let Ok(line) = serde_json::to_string(&envelope) {
372            // One line, newline-terminated — JSON Lines spec.
373            // Errors here are swallowed on purpose; a failing write
374            // must never crash the agent loop, and the run record
375            // itself is a secondary artifact.
376            let _ = state.writer.write_all(line.as_bytes());
377            let _ = state.writer.write_all(b"\n");
378            state.bytes_written += line.len() as u64 + 1;
379            let _ = self.rotate_if_needed(&mut state);
380        }
381    }
382}
383
384impl Drop for JsonlEventSink {
385    fn drop(&mut self) {
386        if let Ok(mut state) = self.state.lock() {
387            use std::io::Write as _;
388            let _ = state.writer.flush();
389        }
390    }
391}
392
393/// Fan-out helper for composing multiple external sinks.
394pub struct MultiSink {
395    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
396}
397
398impl MultiSink {
399    pub fn new() -> Self {
400        Self {
401            sinks: Mutex::new(Vec::new()),
402        }
403    }
404    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
405        self.sinks.lock().expect("sink mutex poisoned").push(sink);
406    }
407    pub fn len(&self) -> usize {
408        self.sinks.lock().expect("sink mutex poisoned").len()
409    }
410    pub fn is_empty(&self) -> bool {
411        self.len() == 0
412    }
413}
414
415impl Default for MultiSink {
416    fn default() -> Self {
417        Self::new()
418    }
419}
420
421impl AgentEventSink for MultiSink {
422    fn handle_event(&self, event: &AgentEvent) {
423        // Deliberate: snapshot then release the lock before invoking sink
424        // callbacks. Sinks can re-enter the event system (e.g. a host
425        // sink that logs to another AgentEvent path), so holding the
426        // mutex across the callback would risk self-deadlock. Arc clones
427        // are refcount bumps — cheap.
428        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
429        for sink in sinks {
430            sink.handle_event(event);
431        }
432    }
433}
434
435type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
436
437fn external_sinks() -> &'static ExternalSinkRegistry {
438    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
439    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
440}
441
442pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
443    let session_id = session_id.into();
444    let mut reg = external_sinks().write().expect("sink registry poisoned");
445    reg.entry(session_id).or_default().push(sink);
446}
447
448/// Remove all external sinks registered for `session_id`. Does NOT
449/// close the session itself — subscribers and transcript survive, so a
450/// later `agent_loop` call with the same id continues the conversation.
451pub fn clear_session_sinks(session_id: &str) {
452    external_sinks()
453        .write()
454        .expect("sink registry poisoned")
455        .remove(session_id);
456}
457
458pub fn reset_all_sinks() {
459    external_sinks()
460        .write()
461        .expect("sink registry poisoned")
462        .clear();
463    crate::agent_sessions::reset_session_store();
464}
465
466/// Emit an event to external sinks registered for this session. Pipeline
467/// closure subscribers are NOT called by this function — the agent
468/// loop owns that path because it needs its async VM context.
469pub fn emit_event(event: &AgentEvent) {
470    let sinks: Vec<Arc<dyn AgentEventSink>> = {
471        let reg = external_sinks().read().expect("sink registry poisoned");
472        reg.get(event.session_id()).cloned().unwrap_or_default()
473    };
474    for sink in sinks {
475        sink.handle_event(event);
476    }
477}
478
479pub fn session_external_sink_count(session_id: &str) -> usize {
480    external_sinks()
481        .read()
482        .expect("sink registry poisoned")
483        .get(session_id)
484        .map(|v| v.len())
485        .unwrap_or(0)
486}
487
488pub fn session_closure_subscriber_count(session_id: &str) -> usize {
489    crate::agent_sessions::subscriber_count(session_id)
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Counts how many events it has received.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Appends its `tag` to a shared log on every event, so tests can
    /// check the order in which a fan-out reaches its children.
    struct RecordingSink {
        tag: &'static str,
        log: Arc<Mutex<Vec<&'static str>>>,
    }
    impl AgentEventSink for RecordingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.log.lock().unwrap().push(self.tag);
        }
    }

    /// Scratch directory under the system temp dir that is removed,
    /// contents and all, when dropped — including when a failing
    /// assertion unwinds the test.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn new(label: &str) -> Self {
            let dir = std::env::temp_dir().join(format!("harn-{label}-{}", std::process::id()));
            // Clear leftovers from an earlier run that happened to reuse this pid.
            let _ = std::fs::remove_dir_all(&dir);
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        assert!(multi.is_empty());
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        let log = Arc::new(Mutex::new(Vec::new()));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(RecordingSink {
            tag: "first",
            log: log.clone(),
        }));
        multi.push(Arc::new(CountingSink(b.clone())));
        multi.push(Arc::new(RecordingSink {
            tag: "second",
            log: log.clone(),
        }));
        assert_eq!(multi.len(), 4);
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        assert_eq!(*log.lock().unwrap(), vec!["first", "second"]);
    }

    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        // Declared before the sink so it is dropped after it: the directory
        // is only removed once the sink has closed its file handle (an open
        // handle blocks deletion on Windows).
        let scratch = ScratchDir::new("event-log");
        let path = scratch.0.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();
        drop(sink);

        // Read back + assert monotonic indices + non-decreasing timestamps.
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // Event payload flattened — type tag must survive.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
    }

    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }

    #[test]
    fn worker_event_string_forms() {
        let cases = [
            (WorkerEvent::WorkerSpawned, "WorkerSpawned", "running"),
            (WorkerEvent::WorkerCompleted, "WorkerCompleted", "completed"),
            (WorkerEvent::WorkerFailed, "WorkerFailed", "failed"),
            (WorkerEvent::WorkerCancelled, "WorkerCancelled", "cancelled"),
        ];
        for (event, name, status) in cases {
            assert_eq!(event.as_str(), name);
            assert_eq!(event.as_status(), status);
        }
    }

    #[test]
    fn session_id_reads_every_variant_shape() {
        let events = [
            AgentEvent::AgentMessageChunk {
                session_id: "msg".into(),
                content: "hi".into(),
            },
            AgentEvent::ToolCallUpdate {
                session_id: "tool".into(),
                tool_call_id: "call-1".into(),
                tool_name: "read".into(),
                status: ToolCallStatus::Completed,
                raw_output: None,
                error: None,
            },
            AgentEvent::TranscriptCompacted {
                session_id: "compact".into(),
                mode: "auto".into(),
                strategy: "summarize".into(),
                archived_messages: 3,
                estimated_tokens_before: 100,
                estimated_tokens_after: 40,
                snapshot_asset_id: None,
            },
        ];
        let ids: Vec<&str> = events.iter().map(AgentEvent::session_id).collect();
        assert_eq!(ids, ["msg", "tool", "compact"]);
    }
}