harn_vm/run_events.rs
1//! Run-event sink: an in-process bus the CLI installs to capture every
2//! observable side effect of a `harn run` invocation as a single
3//! ordered stream.
4//!
5//! Concrete sinks live in `harn-cli` (`harn run --json` writes them as
6//! NDJSON). The VM only knows it should call [`emit`] from a handful of
7//! observability checkpoints; sinks fan-out from there.
8//!
9//! Variants intentionally mirror the surface area of the run command
10//! rather than the on-disk event log. Stdout/stderr writes are captured
11//! here because they never enter the event log; transcript / persona /
12//! hook / tool events are forwarded here in addition to their existing
13//! persistent topics so a single subscriber can see the whole run
14//! without joining across topics.
15
16use std::sync::{Arc, OnceLock, RwLock};
17
18use serde::Serialize;
19
20/// One observable event from a running pipeline. Variants are
21/// `#[serde(tag = "event_type")]` so wire consumers (notably the CLI
22/// `--json` NDJSON stream) can discriminate without inspecting the
23/// payload shape.
24#[derive(Clone, Debug, Serialize)]
25#[serde(tag = "event_type", rename_all = "snake_case")]
26pub enum RunEvent {
27 /// Bytes written to stdout (raw, including any trailing newlines).
28 Stdout { payload: String },
29 /// Bytes written to stderr (raw, including any trailing newlines).
30 Stderr { payload: String },
31 /// One append on a transcript stream (`agent.transcript.llm` topic).
32 /// `kind` mirrors the transcript entry's `type` field.
33 Transcript {
34 #[serde(skip_serializing_if = "Option::is_none")]
35 agent_id: Option<String>,
36 kind: String,
37 payload: serde_json::Value,
38 },
39 /// A model-issued tool call. `call_id` matches the transcript
40 /// `call_id`; agents reconcile [`Self::ToolCall`] /
41 /// [`Self::ToolResult`] pairs by it.
42 ToolCall {
43 call_id: String,
44 name: String,
45 args: serde_json::Value,
46 /// RFC 3339 timestamp captured at emission.
47 started_at: String,
48 },
49 /// Outcome of a tool call.
50 ToolResult {
51 call_id: String,
52 ok: bool,
53 result: serde_json::Value,
54 },
55 /// A workflow hook fired during the run.
56 Hook {
57 name: String,
58 phase: String,
59 #[serde(skip_serializing_if = "serde_json::Value::is_null")]
60 payload: serde_json::Value,
61 },
62 /// A persona-stage transition. Mirrors the `persona.runtime.events`
63 /// topic; the `transition` field captures the stage state change
64 /// (`"started"`, `"completed"`, `"handoff"`, ...).
65 PersonaStage {
66 persona: String,
67 stage: String,
68 transition: String,
69 },
70 /// `harn run <bundle.harnpack>` resolved a pack to execute. Carries
71 /// the verified bundle hash, whether the embedded Ed25519 signature
72 /// verified end-to-end, the signing key fingerprint (when signed),
73 /// and whether the unpacked archive came from the content-addressed
74 /// cache or was extracted fresh on this run.
75 PackRun {
76 bundle_hash: String,
77 signature_verified: bool,
78 #[serde(skip_serializing_if = "Option::is_none")]
79 key_id: Option<String>,
80 cache_hit: bool,
81 dry_run_verify: bool,
82 },
83}
84
85/// Receiver of [`RunEvent`]s. Implementations must be cheap (the VM
86/// calls [`emit`] on hot paths like every `println`).
87pub trait RunEventSink: Send + Sync {
88 fn emit(&self, event: RunEvent);
89}
90
91fn active_slot() -> &'static RwLock<Option<Arc<dyn RunEventSink>>> {
92 static SLOT: OnceLock<RwLock<Option<Arc<dyn RunEventSink>>>> = OnceLock::new();
93 SLOT.get_or_init(|| RwLock::new(None))
94}
95
96/// Install `sink` as the process-wide run-event sink. Returns the
97/// previous sink (if any) so callers can chain. Installs are
98/// exclusive — there is one active sink at a time — to keep ordering
99/// trivially serial.
100pub fn install_sink(sink: Arc<dyn RunEventSink>) -> Option<Arc<dyn RunEventSink>> {
101 let mut guard = active_slot()
102 .write()
103 .unwrap_or_else(|poisoned| poisoned.into_inner());
104 guard.replace(sink)
105}
106
107/// Remove the active sink. No-op when none is installed.
108pub fn clear_sink() {
109 let mut guard = active_slot()
110 .write()
111 .unwrap_or_else(|poisoned| poisoned.into_inner());
112 *guard = None;
113}
114
115/// Whether a sink is currently installed. Useful as a fast-path gate
116/// for callers that would otherwise build a payload speculatively.
117pub fn sink_active() -> bool {
118 active_slot()
119 .read()
120 .map(|guard| guard.is_some())
121 .unwrap_or(false)
122}
123
124/// Emit `event` to the active sink. No-op when no sink is installed,
125/// so it is safe to call from every hook point unconditionally.
126pub fn emit(event: RunEvent) {
127 let guard = match active_slot().read() {
128 Ok(guard) => guard,
129 Err(poisoned) => poisoned.into_inner(),
130 };
131 if let Some(sink) = guard.as_ref() {
132 sink.emit(redact_run_event(event));
133 }
134}
135
136/// Scrub the secret-bearing JSON field of a `RunEvent` once, centrally, before
137/// it reaches any sink — so emitters can't forget (the `Hook` payload did, while
138/// the transcript/tool variants were only clean because `agent_observe`
139/// pre-scrubbed them). A second idempotent pass over a pre-scrubbed payload is
140/// cheap and makes the bus correct-by-construction for every future variant.
141/// `Stdout`/`Stderr` are the program's own raw output and pass through
142/// unredacted; they also take the fast path (no policy lookup) since `emit` runs
143/// on every `println`.
144fn redact_run_event(mut event: RunEvent) -> RunEvent {
145 let payload = match &mut event {
146 RunEvent::Transcript { payload, .. } => payload,
147 RunEvent::ToolCall { args, .. } => args,
148 RunEvent::ToolResult { result, .. } => result,
149 RunEvent::Hook { payload, .. } => payload,
150 RunEvent::Stdout { .. }
151 | RunEvent::Stderr { .. }
152 | RunEvent::PersonaStage { .. }
153 | RunEvent::PackRun { .. } => return event,
154 };
155 crate::redact::current_policy().redact_json_in_place(payload);
156 event
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Serializes every test that installs or clears the sink: the slot
    /// is process-global, so such tests must not overlap even when the
    /// harness runs tests on several threads.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Sink that records every event it receives, in arrival order.
    #[derive(Default)]
    struct CapturingSink {
        events: Mutex<Vec<RunEvent>>,
    }

    impl RunEventSink for CapturingSink {
        fn emit(&self, event: RunEvent) {
            let mut recorded = self.events.lock().unwrap();
            recorded.push(event);
        }
    }

    /// Events reach the sink while it is installed and are dropped once
    /// it has been cleared.
    #[test]
    fn install_emit_clear_round_trip() {
        let _serial = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());

        let sink = Arc::new(CapturingSink::default());
        let previous = install_sink(sink.clone());
        assert!(sink_active());

        emit(RunEvent::Stdout {
            payload: "hi\n".into(),
        });

        clear_sink();
        assert!(!sink_active());

        emit(RunEvent::Stdout {
            payload: "after-clear".into(),
        });

        {
            let captured = sink.events.lock().unwrap();
            assert_eq!(captured.len(), 1, "events after clear must be dropped");
            let first = &captured[0];
            if let RunEvent::Stdout { payload } = first {
                assert_eq!(payload, "hi\n");
            } else {
                panic!("unexpected event {first:?}");
            }
        }

        // Put back whatever sink was installed before this test ran.
        if let Some(previous) = previous {
            install_sink(previous);
        }
    }
}