Skip to main content

harn_vm/
run_events.rs

//! Run-event sink: an in-process bus the CLI installs to capture every
//! observable side effect of a `harn run` invocation as a single
//! ordered stream.
//!
//! Concrete sinks live in `harn-cli` (`harn run --json` writes them as
//! NDJSON). The VM only knows it should call [`emit`] from a handful of
//! observability checkpoints; sinks fan out from there.
//!
//! Variants intentionally mirror the surface area of the run command
//! rather than the on-disk event log. Stdout/stderr writes are captured
//! here because they never enter the event log; transcript / persona /
//! hook / tool events are forwarded here in addition to their existing
//! persistent topics so a single subscriber can see the whole run
//! without joining across topics.
15
16use std::sync::{Arc, OnceLock, RwLock};
17
18use serde::Serialize;
19
20/// One observable event from a running pipeline. Variants are
21/// `#[serde(tag = "event_type")]` so wire consumers (notably the CLI
22/// `--json` NDJSON stream) can discriminate without inspecting the
23/// payload shape.
24#[derive(Clone, Debug, Serialize)]
25#[serde(tag = "event_type", rename_all = "snake_case")]
26pub enum RunEvent {
27    /// Bytes written to stdout (raw, including any trailing newlines).
28    Stdout { payload: String },
29    /// Bytes written to stderr (raw, including any trailing newlines).
30    Stderr { payload: String },
31    /// One append on a transcript stream (`agent.transcript.llm` topic).
32    /// `kind` mirrors the transcript entry's `type` field.
33    Transcript {
34        #[serde(skip_serializing_if = "Option::is_none")]
35        agent_id: Option<String>,
36        kind: String,
37        payload: serde_json::Value,
38    },
39    /// A model-issued tool call. `call_id` matches the transcript
40    /// `call_id`; agents reconcile [`Self::ToolCall`] /
41    /// [`Self::ToolResult`] pairs by it.
42    ToolCall {
43        call_id: String,
44        name: String,
45        args: serde_json::Value,
46        /// RFC 3339 timestamp captured at emission.
47        started_at: String,
48    },
49    /// Outcome of a tool call.
50    ToolResult {
51        call_id: String,
52        ok: bool,
53        result: serde_json::Value,
54    },
55    /// A workflow hook fired during the run.
56    Hook {
57        name: String,
58        phase: String,
59        #[serde(skip_serializing_if = "serde_json::Value::is_null")]
60        payload: serde_json::Value,
61    },
62    /// A persona-stage transition. Mirrors the `persona.runtime.events`
63    /// topic; the `transition` field captures the stage state change
64    /// (`"started"`, `"completed"`, `"handoff"`, ...).
65    PersonaStage {
66        persona: String,
67        stage: String,
68        transition: String,
69    },
70    /// `harn run <bundle.harnpack>` resolved a pack to execute. Carries
71    /// the verified bundle hash, whether the embedded Ed25519 signature
72    /// verified end-to-end, the signing key fingerprint (when signed),
73    /// and whether the unpacked archive came from the content-addressed
74    /// cache or was extracted fresh on this run.
75    PackRun {
76        bundle_hash: String,
77        signature_verified: bool,
78        #[serde(skip_serializing_if = "Option::is_none")]
79        key_id: Option<String>,
80        cache_hit: bool,
81        dry_run_verify: bool,
82    },
83}
84
85/// Receiver of [`RunEvent`]s. Implementations must be cheap (the VM
86/// calls [`emit`] on hot paths like every `println`).
87pub trait RunEventSink: Send + Sync {
88    fn emit(&self, event: RunEvent);
89}
90
91fn active_slot() -> &'static RwLock<Option<Arc<dyn RunEventSink>>> {
92    static SLOT: OnceLock<RwLock<Option<Arc<dyn RunEventSink>>>> = OnceLock::new();
93    SLOT.get_or_init(|| RwLock::new(None))
94}
95
96/// Install `sink` as the process-wide run-event sink. Returns the
97/// previous sink (if any) so callers can chain. Installs are
98/// exclusive — there is one active sink at a time — to keep ordering
99/// trivially serial.
100pub fn install_sink(sink: Arc<dyn RunEventSink>) -> Option<Arc<dyn RunEventSink>> {
101    let mut guard = active_slot()
102        .write()
103        .unwrap_or_else(|poisoned| poisoned.into_inner());
104    guard.replace(sink)
105}
106
107/// Remove the active sink. No-op when none is installed.
108pub fn clear_sink() {
109    let mut guard = active_slot()
110        .write()
111        .unwrap_or_else(|poisoned| poisoned.into_inner());
112    *guard = None;
113}
114
115/// Whether a sink is currently installed. Useful as a fast-path gate
116/// for callers that would otherwise build a payload speculatively.
117pub fn sink_active() -> bool {
118    active_slot()
119        .read()
120        .map(|guard| guard.is_some())
121        .unwrap_or(false)
122}
123
124/// Emit `event` to the active sink. No-op when no sink is installed,
125/// so it is safe to call from every hook point unconditionally.
126pub fn emit(event: RunEvent) {
127    let guard = match active_slot().read() {
128        Ok(guard) => guard,
129        Err(poisoned) => poisoned.into_inner(),
130    };
131    if let Some(sink) = guard.as_ref() {
132        sink.emit(event);
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Serializes every test that installs or clears the sink: the slot
    /// is process-global, and the test runner may execute tests on
    /// several threads at once.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Sink that records each event it receives, in arrival order.
    struct CapturingSink {
        events: Mutex<Vec<RunEvent>>,
    }

    impl RunEventSink for CapturingSink {
        fn emit(&self, event: RunEvent) {
            self.events.lock().unwrap().push(event);
        }
    }

    /// Install a sink, emit once, clear, emit again: only the first
    /// event may reach the sink.
    #[test]
    fn install_emit_clear_round_trip() {
        // A failure in another test must not wedge this one, so a
        // poisoned test lock is recovered.
        let _serial = match TEST_LOCK.lock() {
            Ok(held) => held,
            Err(poisoned) => poisoned.into_inner(),
        };

        let recorder = Arc::new(CapturingSink {
            events: Mutex::new(Vec::new()),
        });
        let previous = install_sink(recorder.clone());
        assert!(sink_active());

        let first = RunEvent::Stdout {
            payload: "hi\n".into(),
        };
        emit(first);

        clear_sink();
        assert!(!sink_active());

        let late = RunEvent::Stdout {
            payload: "after-clear".into(),
        };
        emit(late);

        let captured = recorder.events.lock().unwrap();
        assert_eq!(captured.len(), 1, "events after clear must be dropped");
        match &captured[0] {
            RunEvent::Stdout { payload } => assert_eq!(payload, "hi\n"),
            other => panic!("unexpected event {other:?}"),
        }

        // Put back whatever sink was installed before this test ran.
        if let Some(previous) = previous {
            install_sink(previous);
        }
    }
}