// harn_vm/agent_events.rs
//! Agent event stream — the ACP-aligned observation surface for the
//! agent loop.
//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; three internal
//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
//! react to loop milestones that don't have a direct ACP counterpart.
//!
//! There are two subscription paths, both keyed on session id so two
//! concurrent sessions never cross-talk:
//!
//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
//!    like the harn-cli ACP server. Invoked synchronously by the loop.
//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
//! 2. **Closure subscribers** — `.harn` closures registered via the
//!    `agent_subscribe(session_id, callback)` host builtin. These live
//!    on the session's `SessionState.subscribers` in
//!    `crate::agent_sessions`, because sessions are the single source
//!    of truth for session-scoped VM state.

21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::HandoffArtifact;
28use crate::tool_annotations::ToolKind;
29
/// Typed worker lifecycle events emitted by delegated/background agent
/// execution. Bridge-facing worker updates still derive a string status
/// from these variants (see `as_status`), but the runtime no longer
/// passes raw status strings around internally.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Worker was spawned; maps to the `"running"` status string.
    WorkerSpawned,
    /// Worker finished; maps to `"completed"`.
    WorkerCompleted,
    /// Worker ended in error; maps to `"failed"`.
    WorkerFailed,
    /// Worker was cancelled; maps to `"cancelled"`.
    WorkerCancelled,
}
41
42impl WorkerEvent {
43    pub fn as_status(self) -> &'static str {
44        match self {
45            Self::WorkerSpawned => "running",
46            Self::WorkerCompleted => "completed",
47            Self::WorkerFailed => "failed",
48            Self::WorkerCancelled => "cancelled",
49        }
50    }
51
52    pub fn as_str(self) -> &'static str {
53        match self {
54            Self::WorkerSpawned => "WorkerSpawned",
55            Self::WorkerCompleted => "WorkerCompleted",
56            Self::WorkerFailed => "WorkerFailed",
57            Self::WorkerCancelled => "WorkerCancelled",
58        }
59    }
60}
61
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
/// Serialized in `snake_case` (`"pending"`, `"in_progress"`, ...).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
75
/// Events emitted by the agent loop. The first five variants map 1:1
/// to ACP `sessionUpdate` variants; the remaining variants are
/// harn-internal (loop milestones, skills, tool search, compaction,
/// handoff). Serialized internally tagged: `{"type": "turn_start", ...}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// ACP-mapped: a chunk of agent message text.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// ACP-mapped: a chunk of agent reasoning ("thought") text.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// ACP-mapped: a tool call dispatched by the model.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        /// Optional tool classification (`crate::tool_annotations::ToolKind`).
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        /// Raw model-supplied arguments, passed through unmodified.
        raw_input: serde_json::Value,
    },
    /// ACP-mapped: a status/result update for a prior `ToolCall`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        /// Output of the call, if any was produced.
        raw_output: Option<serde_json::Value>,
        /// Error text, if any.
        error: Option<String>,
    },
    /// ACP-mapped: an agent plan update.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Internal: a loop iteration is starting.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Internal: a loop iteration finished. `turn_info` is a
    /// loop-defined JSON summary of the turn.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Internal: feedback content was injected into the conversation;
    /// `kind` distinguishes the feedback source.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so hosts can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Internal: the transcript was compacted; carries before/after
    /// token estimates and, optionally, the archived-snapshot asset id.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// Internal: a handoff artifact was produced. Boxed because
    /// `HandoffArtifact` would otherwise dominate the enum's size.
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
}
211
212impl AgentEvent {
213    pub fn session_id(&self) -> &str {
214        match self {
215            Self::AgentMessageChunk { session_id, .. }
216            | Self::AgentThoughtChunk { session_id, .. }
217            | Self::ToolCall { session_id, .. }
218            | Self::ToolCallUpdate { session_id, .. }
219            | Self::Plan { session_id, .. }
220            | Self::TurnStart { session_id, .. }
221            | Self::TurnEnd { session_id, .. }
222            | Self::FeedbackInjected { session_id, .. }
223            | Self::BudgetExhausted { session_id, .. }
224            | Self::LoopStuck { session_id, .. }
225            | Self::DaemonWatchdogTripped { session_id, .. }
226            | Self::SkillActivated { session_id, .. }
227            | Self::SkillDeactivated { session_id, .. }
228            | Self::SkillScopeTools { session_id, .. }
229            | Self::ToolSearchQuery { session_id, .. }
230            | Self::ToolSearchResult { session_id, .. }
231            | Self::TranscriptCompacted { session_id, .. }
232            | Self::Handoff { session_id, .. } => session_id,
233        }
234    }
235}
236
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
pub trait AgentEventSink: Send + Sync {
    /// Receive one event. Invoked synchronously on the emitting thread
    /// (see `emit_event` / `MultiSink::handle_event`).
    fn handle_event(&self, event: &AgentEvent);
}
242
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session; gaps never happen even under load because the sink
    /// owns the counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected
    /// (the event's own `type` tag lands at the envelope's top level).
    #[serde(flatten)]
    pub event: AgentEvent,
}
267
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    // Mutable writer state behind a mutex: the sink is shared via
    // `Arc` and `handle_event` takes `&self`.
    state: Mutex<JsonlEventSinkState>,
    // Path of the initial (un-rotated) file; rotated siblings derive
    // their names from its stem/extension.
    base_path: std::path::PathBuf,
}

/// Mutable innards of `JsonlEventSink`, guarded by its mutex.
struct JsonlEventSinkState {
    // Buffered writer over the currently-active log file.
    writer: std::io::BufWriter<std::fs::File>,
    // Next index to stamp onto an envelope; also reported by
    // `event_count`.
    index: u64,
    // Bytes written to the *current* file; reset to 0 after rotation.
    bytes_written: u64,
    // Number of rotations performed so far; feeds the filename suffix.
    rotation: u32,
}
283
284impl JsonlEventSink {
285    /// Hard cap past which the current file rotates to a numbered
286    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
287    /// sessions don't produce unreadable multi-GB logs.
288    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
289
290    /// Open a new sink writing to `base_path`. Creates parent dirs
291    /// if missing. Overwrites an existing file so each fresh session
292    /// starts from index 0.
293    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
294        let base_path = base_path.into();
295        if let Some(parent) = base_path.parent() {
296            std::fs::create_dir_all(parent)?;
297        }
298        let file = std::fs::OpenOptions::new()
299            .create(true)
300            .truncate(true)
301            .write(true)
302            .open(&base_path)?;
303        Ok(Arc::new(Self {
304            state: Mutex::new(JsonlEventSinkState {
305                writer: std::io::BufWriter::new(file),
306                index: 0,
307                bytes_written: 0,
308                rotation: 0,
309            }),
310            base_path,
311        }))
312    }
313
314    /// Flush any buffered writes. Called on session shutdown; the
315    /// Drop impl calls this too but on early panic it may not run.
316    pub fn flush(&self) -> std::io::Result<()> {
317        use std::io::Write as _;
318        self.state
319            .lock()
320            .expect("jsonl sink mutex poisoned")
321            .writer
322            .flush()
323    }
324
325    /// Current event index — primarily for tests and the "how many
326    /// events are in this run" run-record summary.
327    pub fn event_count(&self) -> u64 {
328        self.state.lock().expect("jsonl sink mutex poisoned").index
329    }
330
331    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
332        use std::io::Write as _;
333        if state.bytes_written < Self::ROTATE_BYTES {
334            return Ok(());
335        }
336        state.writer.flush()?;
337        state.rotation += 1;
338        let suffix = format!("-{:06}", state.rotation);
339        let rotated = self.base_path.with_file_name({
340            let stem = self
341                .base_path
342                .file_stem()
343                .and_then(|s| s.to_str())
344                .unwrap_or("event_log");
345            let ext = self
346                .base_path
347                .extension()
348                .and_then(|e| e.to_str())
349                .unwrap_or("jsonl");
350            format!("{stem}{suffix}.{ext}")
351        });
352        let file = std::fs::OpenOptions::new()
353            .create(true)
354            .truncate(true)
355            .write(true)
356            .open(&rotated)?;
357        state.writer = std::io::BufWriter::new(file);
358        state.bytes_written = 0;
359        Ok(())
360    }
361}
362
/// Event-log-backed sink for a single session's agent event stream.
/// Uses the generalized append-only event log when one is installed for
/// the current VM thread and falls back to `JsonlEventSink` only for
/// older env-driven workflows.
pub struct EventLogSink {
    // Shared handle to the append-only event log.
    log: Arc<AnyEventLog>,
    // Per-session topic: `observability.agent_events.<sanitized id>`.
    topic: Topic,
    // Session this sink serves; stamped into payload and headers.
    session_id: String,
}

impl EventLogSink {
    /// Build a sink for `session_id` writing into `log`.
    ///
    /// # Panics
    /// Panics if the sanitized session id still fails `Topic::new` —
    /// treated as a broken invariant of the sanitizer.
    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
        let session_id = session_id.into();
        // Sanitize so arbitrary session ids can't produce an invalid
        // topic component.
        let topic = Topic::new(format!(
            "observability.agent_events.{}",
            crate::event_log::sanitize_topic_component(&session_id)
        ))
        .expect("session id should sanitize to a valid topic");
        Arc::new(Self {
            log,
            topic,
            session_id,
        })
    }
}
388
389impl AgentEventSink for JsonlEventSink {
390    fn handle_event(&self, event: &AgentEvent) {
391        use std::io::Write as _;
392        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
393        let index = state.index;
394        state.index += 1;
395        let emitted_at_ms = std::time::SystemTime::now()
396            .duration_since(std::time::UNIX_EPOCH)
397            .map(|d| d.as_millis() as i64)
398            .unwrap_or(0);
399        let envelope = PersistedAgentEvent {
400            index,
401            emitted_at_ms,
402            frame_depth: None,
403            event: event.clone(),
404        };
405        if let Ok(line) = serde_json::to_string(&envelope) {
406            // One line, newline-terminated — JSON Lines spec.
407            // Errors here are swallowed on purpose; a failing write
408            // must never crash the agent loop, and the run record
409            // itself is a secondary artifact.
410            let _ = state.writer.write_all(line.as_bytes());
411            let _ = state.writer.write_all(b"\n");
412            state.bytes_written += line.len() as u64 + 1;
413            let _ = self.rotate_if_needed(&mut state);
414        }
415    }
416}
417
impl AgentEventSink for EventLogSink {
    /// Serialize `event` and append it to the session's event-log topic.
    ///
    /// Best-effort: a serialization failure silently drops the event,
    /// and append errors are ignored — logging must never take down the
    /// agent loop.
    fn handle_event(&self, event: &AgentEvent) {
        let event_json = match serde_json::to_value(event) {
            Ok(value) => value,
            Err(_) => return,
        };
        // The serde `type` tag doubles as the log record kind; fall
        // back to a generic kind if it's somehow absent.
        let event_kind = event_json
            .get("type")
            .and_then(|value| value.as_str())
            .unwrap_or("agent_event")
            .to_string();
        // `index_hint` is a receipt timestamp, not a monotonic index.
        let payload = serde_json::json!({
            "index_hint": now_ms(),
            "session_id": self.session_id,
            "event": event_json,
        });
        let mut headers = std::collections::BTreeMap::new();
        headers.insert("session_id".to_string(), self.session_id.clone());
        let log = self.log.clone();
        let topic = self.topic.clone();
        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            // On a Tokio runtime: append on a detached task so the
            // (synchronous) sink call returns immediately.
            // NOTE(review): spawned appends carry no ordering guarantee
            // relative to each other — confirm the log tolerates that.
            handle.spawn(async move {
                let _ = log.append(&topic, record).await;
            });
        } else {
            // No runtime available: block this thread until the append
            // future resolves.
            let _ = futures::executor::block_on(log.append(&topic, record));
        }
    }
}
448
449impl Drop for JsonlEventSink {
450    fn drop(&mut self) {
451        if let Ok(mut state) = self.state.lock() {
452            use std::io::Write as _;
453            let _ = state.writer.flush();
454        }
455    }
456}
457
/// Fan-out helper for composing multiple external sinks.
pub struct MultiSink {
    // Registered sinks, in push order; mutex-guarded so `push` can
    // take `&self` while the sink is shared.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
462
463impl MultiSink {
464    pub fn new() -> Self {
465        Self {
466            sinks: Mutex::new(Vec::new()),
467        }
468    }
469    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
470        self.sinks.lock().expect("sink mutex poisoned").push(sink);
471    }
472    pub fn len(&self) -> usize {
473        self.sinks.lock().expect("sink mutex poisoned").len()
474    }
475    pub fn is_empty(&self) -> bool {
476        self.len() == 0
477    }
478}
479
480impl Default for MultiSink {
481    fn default() -> Self {
482        Self::new()
483    }
484}
485
486impl AgentEventSink for MultiSink {
487    fn handle_event(&self, event: &AgentEvent) {
488        // Deliberate: snapshot then release the lock before invoking sink
489        // callbacks. Sinks can re-enter the event system (e.g. a host
490        // sink that logs to another AgentEvent path), so holding the
491        // mutex across the callback would risk self-deadlock. Arc clones
492        // are refcount bumps — cheap.
493        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
494        for sink in sinks {
495            sink.handle_event(event);
496        }
497    }
498}
499
500type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
501
502fn external_sinks() -> &'static ExternalSinkRegistry {
503    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
504    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
505}
506
507pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
508    let session_id = session_id.into();
509    let mut reg = external_sinks().write().expect("sink registry poisoned");
510    reg.entry(session_id).or_default().push(sink);
511}
512
513/// Remove all external sinks registered for `session_id`. Does NOT
514/// close the session itself — subscribers and transcript survive, so a
515/// later `agent_loop` call with the same id continues the conversation.
516pub fn clear_session_sinks(session_id: &str) {
517    external_sinks()
518        .write()
519        .expect("sink registry poisoned")
520        .remove(session_id);
521}
522
/// Drop every registered external sink for every session, then also
/// reset the session store (`crate::agent_sessions::reset_session_store`)
/// — i.e. this clears both subscription paths, not just sinks.
pub fn reset_all_sinks() {
    external_sinks()
        .write()
        .expect("sink registry poisoned")
        .clear();
    crate::agent_sessions::reset_session_store();
}
530
531/// Emit an event to external sinks registered for this session. Pipeline
532/// closure subscribers are NOT called by this function — the agent
533/// loop owns that path because it needs its async VM context.
534pub fn emit_event(event: &AgentEvent) {
535    let sinks: Vec<Arc<dyn AgentEventSink>> = {
536        let reg = external_sinks().read().expect("sink registry poisoned");
537        reg.get(event.session_id()).cloned().unwrap_or_default()
538    };
539    for sink in sinks {
540        sink.handle_event(event);
541    }
542}
543
/// Milliseconds since the Unix epoch; 0 if the system clock reads
/// earlier than the epoch (`duration_since` fails).
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |duration| duration.as_millis() as i64)
}
550
551pub fn session_external_sink_count(session_id: &str) -> usize {
552    external_sinks()
553        .read()
554        .expect("sink registry poisoned")
555        .get(session_id)
556        .map(|v| v.len())
557        .unwrap_or(0)
558}
559
/// Number of `.harn` closure subscribers for `session_id`. Pure
/// delegation: closure subscribers live on the session store
/// (`crate::agent_sessions`), not in this module's sink registry.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
563
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal sink that just counts how many events it received.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Every sink pushed into a `MultiSink` receives each event once.
    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    /// Events route only to sinks registered under the matching
    /// session id; clearing one session leaves the other intact.
    // NOTE(review): this test mutates the process-global registry
    // (reset at both ends) — confirm it doesn't race other tests that
    // touch the registry when run in parallel.
    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    /// End-to-end check of the JSONL envelope: contiguous indices,
    /// non-decreasing timestamps, and the flattened `type` tag.
    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        // Read back + assert monotonic indices + non-decreasing timestamps.
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // Event payload flattened — type tag must survive.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
        let _ = std::fs::remove_file(&path);
    }

    /// Pins the `snake_case` wire form of every `ToolCallStatus` variant.
    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}