harn_vm/run_events.rs
1//! Run-event sink: an in-process bus the CLI installs to capture every
2//! observable side effect of a `harn run` invocation as a single
3//! ordered stream.
4//!
5//! Concrete sinks live in `harn-cli` (`harn run --json` writes them as
6//! NDJSON). The VM only knows it should call [`emit`] from a handful of
7//! observability checkpoints; sinks fan-out from there.
8//!
9//! Variants intentionally mirror the surface area of the run command
10//! rather than the on-disk event log. Stdout/stderr writes are captured
11//! here because they never enter the event log; transcript / persona /
12//! hook / tool events are forwarded here in addition to their existing
13//! persistent topics so a single subscriber can see the whole run
14//! without joining across topics.
15
16use std::sync::{Arc, OnceLock, RwLock};
17
18use serde::Serialize;
19
20/// One observable event from a running pipeline. Variants are
21/// `#[serde(tag = "event_type")]` so wire consumers (notably the CLI
22/// `--json` NDJSON stream) can discriminate without inspecting the
23/// payload shape.
24#[derive(Clone, Debug, Serialize)]
25#[serde(tag = "event_type", rename_all = "snake_case")]
26pub enum RunEvent {
27 /// Bytes written to stdout (raw, including any trailing newlines).
28 Stdout { payload: String },
29 /// Bytes written to stderr (raw, including any trailing newlines).
30 Stderr { payload: String },
31 /// One append on a transcript stream (`agent.transcript.llm` topic).
32 /// `kind` mirrors the transcript entry's `type` field.
33 Transcript {
34 #[serde(skip_serializing_if = "Option::is_none")]
35 agent_id: Option<String>,
36 kind: String,
37 payload: serde_json::Value,
38 },
39 /// A model-issued tool call. `call_id` matches the transcript
40 /// `call_id`; agents reconcile [`Self::ToolCall`] /
41 /// [`Self::ToolResult`] pairs by it.
42 ToolCall {
43 call_id: String,
44 name: String,
45 args: serde_json::Value,
46 /// RFC 3339 timestamp captured at emission.
47 started_at: String,
48 },
49 /// Outcome of a tool call.
50 ToolResult {
51 call_id: String,
52 ok: bool,
53 result: serde_json::Value,
54 },
55 /// A workflow hook fired during the run.
56 Hook {
57 name: String,
58 phase: String,
59 #[serde(skip_serializing_if = "serde_json::Value::is_null")]
60 payload: serde_json::Value,
61 },
62 /// A persona-stage transition. Mirrors the `persona.runtime.events`
63 /// topic; the `transition` field captures the stage state change
64 /// (`"started"`, `"completed"`, `"handoff"`, ...).
65 PersonaStage {
66 persona: String,
67 stage: String,
68 transition: String,
69 },
70 /// `harn run <bundle.harnpack>` resolved a pack to execute. Carries
71 /// the verified bundle hash, whether the embedded Ed25519 signature
72 /// verified end-to-end, the signing key fingerprint (when signed),
73 /// and whether the unpacked archive came from the content-addressed
74 /// cache or was extracted fresh on this run.
75 PackRun {
76 bundle_hash: String,
77 signature_verified: bool,
78 #[serde(skip_serializing_if = "Option::is_none")]
79 key_id: Option<String>,
80 cache_hit: bool,
81 dry_run_verify: bool,
82 },
83}
84
85/// Receiver of [`RunEvent`]s. Implementations must be cheap (the VM
86/// calls [`emit`] on hot paths like every `println`).
87pub trait RunEventSink: Send + Sync {
88 fn emit(&self, event: RunEvent);
89}
90
91fn active_slot() -> &'static RwLock<Option<Arc<dyn RunEventSink>>> {
92 static SLOT: OnceLock<RwLock<Option<Arc<dyn RunEventSink>>>> = OnceLock::new();
93 SLOT.get_or_init(|| RwLock::new(None))
94}
95
96/// Install `sink` as the process-wide run-event sink. Returns the
97/// previous sink (if any) so callers can chain. Installs are
98/// exclusive — there is one active sink at a time — to keep ordering
99/// trivially serial.
100pub fn install_sink(sink: Arc<dyn RunEventSink>) -> Option<Arc<dyn RunEventSink>> {
101 let mut guard = active_slot()
102 .write()
103 .unwrap_or_else(|poisoned| poisoned.into_inner());
104 guard.replace(sink)
105}
106
107/// Remove the active sink. No-op when none is installed.
108pub fn clear_sink() {
109 let mut guard = active_slot()
110 .write()
111 .unwrap_or_else(|poisoned| poisoned.into_inner());
112 *guard = None;
113}
114
115/// Whether a sink is currently installed. Useful as a fast-path gate
116/// for callers that would otherwise build a payload speculatively.
117pub fn sink_active() -> bool {
118 active_slot()
119 .read()
120 .map(|guard| guard.is_some())
121 .unwrap_or(false)
122}
123
124/// Emit `event` to the active sink. No-op when no sink is installed,
125/// so it is safe to call from every hook point unconditionally.
126pub fn emit(event: RunEvent) {
127 let guard = match active_slot().read() {
128 Ok(guard) => guard,
129 Err(poisoned) => poisoned.into_inner(),
130 };
131 if let Some(sink) = guard.as_ref() {
132 sink.emit(event);
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Serializes every test that installs or clears the sink: the slot
    /// is process-global, and nextest may fan tests out across threads.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Sink that records each event it receives, in arrival order.
    struct CapturingSink {
        events: Mutex<Vec<RunEvent>>,
    }

    impl RunEventSink for CapturingSink {
        fn emit(&self, event: RunEvent) {
            self.events.lock().unwrap().push(event);
        }
    }

    #[test]
    fn install_emit_clear_round_trip() {
        // Recover the guard even if an earlier test panicked while
        // holding it; the `()` payload cannot be left inconsistent.
        let _serial = match TEST_LOCK.lock() {
            Ok(held) => held,
            Err(poisoned) => poisoned.into_inner(),
        };

        let recorder = Arc::new(CapturingSink {
            events: Mutex::new(Vec::new()),
        });
        let previous = install_sink(recorder.clone());
        assert!(sink_active());

        // Delivered: the recorder is installed.
        emit(RunEvent::Stdout {
            payload: "hi\n".into(),
        });

        clear_sink();
        assert!(!sink_active());

        // Dropped: nothing is installed any more.
        emit(RunEvent::Stdout {
            payload: "after-clear".into(),
        });

        let seen = recorder.events.lock().unwrap();
        assert_eq!(seen.len(), 1, "events after clear must be dropped");
        match &seen[0] {
            RunEvent::Stdout { payload } => assert_eq!(payload, "hi\n"),
            other => panic!("unexpected event {other:?}"),
        }

        // Put back whatever sink was installed before this test ran.
        if let Some(earlier) = previous {
            install_sink(earlier);
        }
    }
}