// harn_vm/agent_events.rs
//! Agent event stream — the ACP-aligned observation surface for the
//! agent loop.
//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; the remaining
//! harn-internal variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`,
//! plus the budget/stuck/watchdog, skill, tool-search, and compaction
//! events) let pipelines react to loop milestones that don't have a
//! direct ACP counterpart.
//!
//! There are two subscription paths, both keyed on session id so two
//! concurrent sessions never cross-talk:
//!
//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
//!    like the harn-cli ACP server. Invoked synchronously by the loop.
//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
//! 2. **Closure subscribers** — `.harn` closures registered via the
//!    `agent_subscribe(session_id, callback)` host builtin. These live
//!    on the session's `SessionState.subscribers` in
//!    `crate::agent_sessions`, because sessions are the single source
//!    of truth for session-scoped VM state.
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::tool_annotations::ToolKind;
28
/// Typed worker lifecycle events emitted by delegated/background agent
/// execution. Bridge-facing worker updates still derive a string status
/// from these variants (see `as_status`), but the runtime no longer
/// passes raw status strings around internally.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// A worker was started (maps to the "running" status string).
    WorkerSpawned,
    /// The worker finished successfully.
    WorkerCompleted,
    /// The worker finished with an error.
    WorkerFailed,
    /// The worker was cancelled before it finished.
    WorkerCancelled,
}
40
41impl WorkerEvent {
42    pub fn as_status(self) -> &'static str {
43        match self {
44            Self::WorkerSpawned => "running",
45            Self::WorkerCompleted => "completed",
46            Self::WorkerFailed => "failed",
47            Self::WorkerCancelled => "cancelled",
48        }
49    }
50
51    pub fn as_str(self) -> &'static str {
52        match self {
53            Self::WorkerSpawned => "WorkerSpawned",
54            Self::WorkerCompleted => "WorkerCompleted",
55            Self::WorkerFailed => "WorkerFailed",
56            Self::WorkerCancelled => "WorkerCancelled",
57        }
58    }
59}
60
/// Status of a tool call. Mirrors ACP's `toolCallStatus`; serializes as
/// snake_case strings ("pending", "in_progress", "completed", "failed").
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
74
/// Events emitted by the agent loop. The message/thought/tool-call/plan
/// variants map 1:1 to ACP `sessionUpdate` variants; every variant from
/// `TurnStart` onward is harn-internal loop telemetry with no direct
/// ACP counterpart.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Streaming chunk of agent message text (serializes with type tag
    /// `agent_message_chunk`).
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// Streaming chunk of agent reasoning text (type tag
    /// `agent_thought_chunk`).
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call surfaced by the model, with its initial status and
    /// raw input arguments.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
    },
    /// Status/output update for a previously emitted `ToolCall`
    /// carrying the same `tool_call_id`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
    },
    /// Agent plan update (ACP `plan`); the payload passes through as
    /// opaque JSON.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Internal: a loop iteration is starting.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Internal: a loop iteration finished; `turn_info` carries
    /// loop-specific diagnostics as opaque JSON.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Internal: the loop injected feedback of the given `kind`.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so IDEs can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Internal: the transcript was compacted. Token counts are
    /// estimates (see field names); `snapshot_asset_id` references the
    /// archived pre-compaction transcript when one was captured —
    /// NOTE(review): confirm against the compaction implementation.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
}
205
impl AgentEvent {
    /// Session this event belongs to. Every variant carries a
    /// `session_id`; the match is deliberately exhaustive (no `_` arm)
    /// so adding a variant without a session id is a compile error.
    pub fn session_id(&self) -> &str {
        match self {
            Self::AgentMessageChunk { session_id, .. }
            | Self::AgentThoughtChunk { session_id, .. }
            | Self::ToolCall { session_id, .. }
            | Self::ToolCallUpdate { session_id, .. }
            | Self::Plan { session_id, .. }
            | Self::TurnStart { session_id, .. }
            | Self::TurnEnd { session_id, .. }
            | Self::FeedbackInjected { session_id, .. }
            | Self::BudgetExhausted { session_id, .. }
            | Self::LoopStuck { session_id, .. }
            | Self::DaemonWatchdogTripped { session_id, .. }
            | Self::SkillActivated { session_id, .. }
            | Self::SkillDeactivated { session_id, .. }
            | Self::SkillScopeTools { session_id, .. }
            | Self::ToolSearchQuery { session_id, .. }
            | Self::ToolSearchResult { session_id, .. }
            | Self::TranscriptCompacted { session_id, .. } => session_id,
        }
    }
}
229
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
pub trait AgentEventSink: Send + Sync {
    /// Called synchronously by the agent loop for every event (see the
    /// module docs), so implementations should be fast and non-blocking.
    fn handle_event(&self, event: &AgentEvent);
}
235
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session; gaps never happen even under load because the sink
    /// owns the counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected.
    /// NOTE(review): because of `flatten`, an `AgentEvent` field named
    /// `index`, `emitted_at_ms`, or `frame_depth` would collide with
    /// the envelope's fields — keep variant field names clear of these.
    #[serde(flatten)]
    pub event: AgentEvent,
}
260
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; new events roll over to a numbered-suffix
/// file once the running file crosses `ROTATE_BYTES` (100 MB today —
/// long chat sessions rarely exceed 5 MB, so rotation almost never
/// fires).
pub struct JsonlEventSink {
    /// Writer, counters, and rotation bookkeeping, all guarded together
    /// so index assignment and the write are atomic per event.
    state: Mutex<JsonlEventSinkState>,
    /// Path the sink was opened with; rotated files derive from it.
    base_path: std::path::PathBuf,
}

struct JsonlEventSinkState {
    writer: std::io::BufWriter<std::fs::File>,
    /// Next event index (== number of events written so far).
    index: u64,
    /// Bytes written to the *current* file; reset on rotation.
    bytes_written: u64,
    /// How many times this sink has rotated (suffix counter).
    rotation: u32,
}

impl JsonlEventSink {
    /// Hard cap past which the current file rotates to a numbered
    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
    /// sessions don't produce unreadable multi-GB logs.
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;

    /// Open a new sink writing to `base_path`. Creates parent dirs
    /// if missing. Overwrites an existing file so each fresh session
    /// starts from index 0.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let writer = std::io::BufWriter::new(Self::fresh_file(&base_path)?);
        let state = JsonlEventSinkState {
            writer,
            index: 0,
            bytes_written: 0,
            rotation: 0,
        };
        Ok(Arc::new(Self {
            state: Mutex::new(state),
            base_path,
        }))
    }

    /// Create-or-truncate a writable file at `path`.
    fn fresh_file(path: &std::path::Path) -> std::io::Result<std::fs::File> {
        std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(path)
    }

    /// Flush any buffered writes. Called on session shutdown; the
    /// Drop impl calls this too but on early panic it may not run.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
        state.writer.flush()
    }

    /// Current event index — primarily for tests and the "how many
    /// events are in this run" run-record summary.
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }

    /// If the current file has crossed `ROTATE_BYTES`, flush it and
    /// redirect subsequent writes to a fresh numbered-suffix file.
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        state.writer.flush()?;
        state.rotation += 1;
        let stem = self
            .base_path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("event_log");
        let ext = self
            .base_path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("jsonl");
        let name = format!("{stem}-{:06}.{ext}", state.rotation);
        let rotated = self.base_path.with_file_name(name);
        state.writer = std::io::BufWriter::new(Self::fresh_file(&rotated)?);
        state.bytes_written = 0;
        Ok(())
    }
}
355
/// Event-log-backed sink for a single session's agent event stream.
/// Uses the generalized append-only event log when one is installed for
/// the current VM thread and falls back to `JsonlEventSink` only for
/// older env-driven workflows.
pub struct EventLogSink {
    /// Shared handle to the generalized append-only event log.
    log: Arc<AnyEventLog>,
    /// Session-scoped topic, derived from the session id at construction.
    topic: Topic,
    /// Session this sink serves; stamped into every record's payload
    /// and headers.
    session_id: String,
}
365
366impl EventLogSink {
367    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
368        let session_id = session_id.into();
369        let topic = Topic::new(format!(
370            "observability.agent_events.{}",
371            crate::event_log::sanitize_topic_component(&session_id)
372        ))
373        .expect("session id should sanitize to a valid topic");
374        Arc::new(Self {
375            log,
376            topic,
377            session_id,
378        })
379    }
380}
381
382impl AgentEventSink for JsonlEventSink {
383    fn handle_event(&self, event: &AgentEvent) {
384        use std::io::Write as _;
385        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
386        let index = state.index;
387        state.index += 1;
388        let emitted_at_ms = std::time::SystemTime::now()
389            .duration_since(std::time::UNIX_EPOCH)
390            .map(|d| d.as_millis() as i64)
391            .unwrap_or(0);
392        let envelope = PersistedAgentEvent {
393            index,
394            emitted_at_ms,
395            frame_depth: None,
396            event: event.clone(),
397        };
398        if let Ok(line) = serde_json::to_string(&envelope) {
399            // One line, newline-terminated — JSON Lines spec.
400            // Errors here are swallowed on purpose; a failing write
401            // must never crash the agent loop, and the run record
402            // itself is a secondary artifact.
403            let _ = state.writer.write_all(line.as_bytes());
404            let _ = state.writer.write_all(b"\n");
405            state.bytes_written += line.len() as u64 + 1;
406            let _ = self.rotate_if_needed(&mut state);
407        }
408    }
409}
410
impl AgentEventSink for EventLogSink {
    /// Append the event to the generalized event log under this sink's
    /// session topic. Serialization or append failures are dropped —
    /// observability must never crash the agent loop.
    fn handle_event(&self, event: &AgentEvent) {
        let event_json = match serde_json::to_value(event) {
            Ok(value) => value,
            Err(_) => return,
        };
        // `#[serde(tag = "type")]` on `AgentEvent` puts the variant name
        // here; fall back to a generic kind if the shape ever changes.
        let event_kind = event_json
            .get("type")
            .and_then(|value| value.as_str())
            .unwrap_or("agent_event")
            .to_string();
        let payload = serde_json::json!({
            // NOTE(review): despite the name, this is a millisecond
            // timestamp, not a sequence number — confirm consumers don't
            // expect contiguity.
            "index_hint": now_ms(),
            "session_id": self.session_id,
            "event": event_json,
        });
        let mut headers = std::collections::BTreeMap::new();
        headers.insert("session_id".to_string(), self.session_id.clone());
        let log = self.log.clone();
        let topic = self.topic.clone();
        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            // Inside a tokio runtime: fire-and-forget so the synchronous
            // caller isn't blocked on log I/O. Append errors are dropped,
            // and ordering relative to later events is not guaranteed.
            handle.spawn(async move {
                let _ = log.append(&topic, record).await;
            });
        } else {
            // No runtime on this thread: complete the append inline.
            let _ = futures::executor::block_on(log.append(&topic, record));
        }
    }
}
441
442impl Drop for JsonlEventSink {
443    fn drop(&mut self) {
444        if let Ok(mut state) = self.state.lock() {
445            use std::io::Write as _;
446            let _ = state.writer.flush();
447        }
448    }
449}
450
/// Fan-out helper for composing multiple external sinks.
pub struct MultiSink {
    // Mutex (not RwLock) because push/handle_event are both short and
    // the list is snapshotted before delivery — see handle_event.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
455
456impl MultiSink {
457    pub fn new() -> Self {
458        Self {
459            sinks: Mutex::new(Vec::new()),
460        }
461    }
462    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
463        self.sinks.lock().expect("sink mutex poisoned").push(sink);
464    }
465    pub fn len(&self) -> usize {
466        self.sinks.lock().expect("sink mutex poisoned").len()
467    }
468    pub fn is_empty(&self) -> bool {
469        self.len() == 0
470    }
471}
472
473impl Default for MultiSink {
474    fn default() -> Self {
475        Self::new()
476    }
477}
478
479impl AgentEventSink for MultiSink {
480    fn handle_event(&self, event: &AgentEvent) {
481        // Deliberate: snapshot then release the lock before invoking sink
482        // callbacks. Sinks can re-enter the event system (e.g. a host
483        // sink that logs to another AgentEvent path), so holding the
484        // mutex across the callback would risk self-deadlock. Arc clones
485        // are refcount bumps — cheap.
486        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
487        for sink in sinks {
488            sink.handle_event(event);
489        }
490    }
491}
492
493type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
494
495fn external_sinks() -> &'static ExternalSinkRegistry {
496    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
497    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
498}
499
500pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
501    let session_id = session_id.into();
502    let mut reg = external_sinks().write().expect("sink registry poisoned");
503    reg.entry(session_id).or_default().push(sink);
504}
505
506/// Remove all external sinks registered for `session_id`. Does NOT
507/// close the session itself — subscribers and transcript survive, so a
508/// later `agent_loop` call with the same id continues the conversation.
509pub fn clear_session_sinks(session_id: &str) {
510    external_sinks()
511        .write()
512        .expect("sink registry poisoned")
513        .remove(session_id);
514}
515
516pub fn reset_all_sinks() {
517    external_sinks()
518        .write()
519        .expect("sink registry poisoned")
520        .clear();
521    crate::agent_sessions::reset_session_store();
522}
523
524/// Emit an event to external sinks registered for this session. Pipeline
525/// closure subscribers are NOT called by this function — the agent
526/// loop owns that path because it needs its async VM context.
527pub fn emit_event(event: &AgentEvent) {
528    let sinks: Vec<Arc<dyn AgentEventSink>> = {
529        let reg = external_sinks().read().expect("sink registry poisoned");
530        reg.get(event.session_id()).cloned().unwrap_or_default()
531    };
532    for sink in sinks {
533        sink.handle_event(event);
534    }
535}
536
/// Milliseconds since the Unix epoch, or 0 if the system clock reports
/// a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
543
544pub fn session_external_sink_count(session_id: &str) -> usize {
545    external_sinks()
546        .read()
547        .expect("sink registry poisoned")
548        .get(session_id)
549        .map(|v| v.len())
550        .unwrap_or(0)
551}
552
/// Number of `.harn` closure subscribers for `session_id`. Thin
/// delegate: the session store owns subscriber state (module docs,
/// path 2).
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
556
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that just counts how many events it receives.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        // Both sinks must see exactly one delivery.
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    // NOTE: this test mutates the process-global sink registry, so it
    // resets it at both ends. Any new test that registers sinks needs
    // the same discipline (cargo runs tests in parallel by default).
    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        // Only session-a's sink fires; sessions never cross-talk.
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        // Clearing one session leaves the other's sinks intact.
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        // Read back + assert monotonic indices + non-decreasing timestamps.
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // Event payload flattened — type tag must survive.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
        let _ = std::fs::remove_file(&path);
    }

    // Pins the `rename_all = "snake_case"` wire format for all four
    // `ToolCallStatus` variants.
    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}