Skip to main content

harn_cli/commands/run/
json_events.rs

1//! `harn run --json`: NDJSON event-stream emitter.
2//!
3//! Each line is a [`JsonEnvelope`] wrapping a [`RunEventWire`]. Wire
4//! events tag themselves with `event_type` for cheap discrimination
5//! by `jq`-style consumers and carry a strictly monotonic `seq`
6//! starting at `1`. See issue #1755 / epic #1753.
7
8use std::io::Write;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11
12use harn_vm::run_events::{RunEvent, RunEventSink};
13use serde::Serialize;
14
15use crate::json_envelope::{JsonEnvelope, JsonError};
16
/// Version number stamped on every envelope of the `harn run --json`
/// stream.
///
/// Increase it whenever the wire shape changes incompatibly; consuming
/// agents use the value to decide whether they understand the stream.
pub const RUN_JSON_SCHEMA_VERSION: u32 = 1;
21
22/// Wire form of a single event emitted by `harn run --json`. The
23/// `event_type` tag is flat so consumers can `jq '.data.event_type'`.
24/// `seq` is monotonic and process-local — the first event in a run is
25/// `seq=1`.
26#[derive(Debug, Clone, Serialize)]
27#[serde(tag = "event_type", rename_all = "snake_case")]
28pub enum RunEventWire {
29    Stdout {
30        seq: u64,
31        payload: String,
32    },
33    Stderr {
34        seq: u64,
35        payload: String,
36    },
37    Transcript {
38        seq: u64,
39        #[serde(skip_serializing_if = "Option::is_none")]
40        agent_id: Option<String>,
41        kind: String,
42        payload: serde_json::Value,
43    },
44    ToolCall {
45        seq: u64,
46        call_id: String,
47        name: String,
48        args: serde_json::Value,
49        started_at: String,
50    },
51    ToolResult {
52        seq: u64,
53        call_id: String,
54        ok: bool,
55        result: serde_json::Value,
56    },
57    Hook {
58        seq: u64,
59        name: String,
60        phase: String,
61        #[serde(skip_serializing_if = "serde_json::Value::is_null")]
62        payload: serde_json::Value,
63    },
64    PersonaStage {
65        seq: u64,
66        persona: String,
67        stage: String,
68        transition: String,
69    },
70    Result {
71        seq: u64,
72        value: serde_json::Value,
73        exit_code: i32,
74    },
75    Error {
76        seq: u64,
77        error: JsonError,
78    },
79}
80
81impl RunEventWire {
82    /// The monotonic sequence number assigned at emission.
83    pub fn seq(&self) -> u64 {
84        match self {
85            Self::Stdout { seq, .. }
86            | Self::Stderr { seq, .. }
87            | Self::Transcript { seq, .. }
88            | Self::ToolCall { seq, .. }
89            | Self::ToolResult { seq, .. }
90            | Self::Hook { seq, .. }
91            | Self::PersonaStage { seq, .. }
92            | Self::Result { seq, .. }
93            | Self::Error { seq, .. } => *seq,
94        }
95    }
96}
97
98/// Writer that drains [`RunEvent`]s, assigns monotonic seq numbers,
99/// wraps them in [`JsonEnvelope`]s, and emits one NDJSON line per
100/// event. Lines are flushed per event so streaming consumers see them
101/// as the run progresses.
102pub struct NdjsonEmitter {
103    inner: Arc<NdjsonEmitterInner>,
104}
105
/// State shared between an [`NdjsonEmitter`] and the sinks it hands out.
struct NdjsonEmitterInner {
    /// Sequence counter; the next event is numbered one above the
    /// current value.
    seq: AtomicU64,
    /// When set, `Stdout` and `Stderr` events are not written.
    quiet: bool,
    /// Destination for the NDJSON lines. The Mutex keeps each line
    /// atomic when several emitters write at once; a line write is
    /// small, so the lock is held only briefly.
    out: Mutex<Box<dyn Write + Send>>,
}
114
115impl NdjsonEmitter {
116    /// Build an emitter that writes to `out`. `quiet` suppresses
117    /// `Stdout` and `Stderr` events (transcript/tool/hook/persona/
118    /// result events still flow).
119    pub fn new(out: Box<dyn Write + Send>, quiet: bool) -> Self {
120        Self {
121            inner: Arc::new(NdjsonEmitterInner {
122                seq: AtomicU64::new(0),
123                quiet,
124                out: Mutex::new(out),
125            }),
126        }
127    }
128
129    /// Build a thread-safe sink that forwards [`RunEvent`]s into this
130    /// emitter, applying seq numbering and `quiet` filtering.
131    pub fn sink(&self) -> Arc<dyn RunEventSink> {
132        Arc::new(NdjsonSink {
133            inner: self.inner.clone(),
134        })
135    }
136
137    /// Next monotonic seq value. The first event in a run is `seq=1`.
138    fn next_seq(&self) -> u64 {
139        self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1
140    }
141
142    fn write_envelope(inner: &NdjsonEmitterInner, event: RunEventWire) {
143        let envelope = JsonEnvelope::ok(RUN_JSON_SCHEMA_VERSION, event);
144        let line = serde_json::to_string(&envelope)
145            .unwrap_or_else(|_| r#"{"schemaVersion":1,"ok":false}"#.to_string());
146        if let Ok(mut out) = inner.out.lock() {
147            let _ = writeln!(out, "{line}");
148            let _ = out.flush();
149        }
150    }
151
152    /// Emit the terminal `Result` event for a run.
153    pub fn emit_result(&self, value: serde_json::Value, exit_code: i32) {
154        let event = RunEventWire::Result {
155            seq: self.next_seq(),
156            value,
157            exit_code,
158        };
159        Self::write_envelope(&self.inner, event);
160    }
161
162    /// Emit the terminal `Error` event for a fatal run failure (e.g.
163    /// compile error before the VM started).
164    pub fn emit_error(&self, code: impl Into<String>, message: impl Into<String>) {
165        let event = RunEventWire::Error {
166            seq: self.next_seq(),
167            error: JsonError {
168                code: code.into(),
169                message: message.into(),
170                details: serde_json::Value::Null,
171            },
172        };
173        Self::write_envelope(&self.inner, event);
174    }
175}
176
177struct NdjsonSink {
178    inner: Arc<NdjsonEmitterInner>,
179}
180
181impl RunEventSink for NdjsonSink {
182    fn emit(&self, event: RunEvent) {
183        let seq = self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1;
184        let wire = match event {
185            RunEvent::Stdout { payload } => {
186                if self.inner.quiet {
187                    // Undo the seq bump so monotonicity stays tight.
188                    self.inner.seq.fetch_sub(1, Ordering::SeqCst);
189                    return;
190                }
191                RunEventWire::Stdout { seq, payload }
192            }
193            RunEvent::Stderr { payload } => {
194                if self.inner.quiet {
195                    self.inner.seq.fetch_sub(1, Ordering::SeqCst);
196                    return;
197                }
198                RunEventWire::Stderr { seq, payload }
199            }
200            RunEvent::Transcript {
201                agent_id,
202                kind,
203                payload,
204            } => RunEventWire::Transcript {
205                seq,
206                agent_id,
207                kind,
208                payload,
209            },
210            RunEvent::ToolCall {
211                call_id,
212                name,
213                args,
214                started_at,
215            } => RunEventWire::ToolCall {
216                seq,
217                call_id,
218                name,
219                args,
220                started_at,
221            },
222            RunEvent::ToolResult {
223                call_id,
224                ok,
225                result,
226            } => RunEventWire::ToolResult {
227                seq,
228                call_id,
229                ok,
230                result,
231            },
232            RunEvent::Hook {
233                name,
234                phase,
235                payload,
236            } => RunEventWire::Hook {
237                seq,
238                name,
239                phase,
240                payload,
241            },
242            RunEvent::PersonaStage {
243                persona,
244                stage,
245                transition,
246            } => RunEventWire::PersonaStage {
247                seq,
248                persona,
249                stage,
250                transition,
251            },
252        };
253        NdjsonEmitter::write_envelope(&self.inner, wire);
254    }
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// The sink installed by `harn_vm::run_events::install_sink` lives in
    /// a process-global slot, so tests that swap it take this lock to
    /// avoid interfering with each other.
    static SINK_LOCK: Mutex<()> = Mutex::new(());

    /// `Write` target that appends to a shared byte vector, so a test can
    /// read the output back after the emitter has taken the writer.
    struct BufWriter(Arc<Mutex<Vec<u8>>>);

    impl Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut bytes = self.0.lock().unwrap();
            bytes.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Install a fresh emitter's sink globally, push `events` through it,
    /// clear the sink, emit a terminal null/0 `Result`, restore whatever
    /// sink was installed before, and return everything written.
    fn capture(quiet: bool, events: Vec<RunEvent>) -> String {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let emitter = NdjsonEmitter::new(Box::new(BufWriter(buf.clone())), quiet);
        let prior = harn_vm::run_events::install_sink(emitter.sink());
        for event in events {
            harn_vm::run_events::emit(event);
        }
        harn_vm::run_events::clear_sink();
        emitter.emit_result(serde_json::Value::Null, 0);
        if let Some(prior) = prior {
            harn_vm::run_events::install_sink(prior);
        }

        let bytes = buf.lock().unwrap().clone();
        String::from_utf8(bytes).expect("utf8")
    }

    /// Split captured output into its non-empty lines.
    fn ndjson_lines(raw: &str) -> Vec<&str> {
        raw.lines().filter(|line| !line.is_empty()).collect()
    }

    /// Parse each line as JSON and apply `pick` to the parsed value.
    fn pluck<T>(lines: &[&str], pick: impl Fn(&serde_json::Value) -> T) -> Vec<T> {
        let mut picked = Vec::with_capacity(lines.len());
        for line in lines {
            let parsed: serde_json::Value = serde_json::from_str(line).expect("valid json");
            picked.push(pick(&parsed));
        }
        picked
    }

    #[test]
    fn emits_monotonic_seq_across_events() {
        let _guard = SINK_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let raw = capture(
            false,
            vec![
                RunEvent::Stdout {
                    payload: "hello\n".into(),
                },
                RunEvent::Stderr {
                    payload: "warn\n".into(),
                },
            ],
        );

        let lines = ndjson_lines(&raw);
        assert_eq!(lines.len(), 3, "expected 3 NDJSON lines, got:\n{raw}");

        let seqs = pluck(&lines, |v| v["data"]["seq"].as_u64().expect("seq present"));
        assert_eq!(seqs, vec![1, 2, 3]);

        let types = pluck(&lines, |v| {
            v["data"]["event_type"].as_str().expect("type").to_string()
        });
        assert_eq!(types, vec!["stdout", "stderr", "result"]);
    }

    #[test]
    fn quiet_drops_stdout_and_stderr_without_gaps() {
        let _guard = SINK_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let raw = capture(
            true,
            vec![
                RunEvent::Stdout {
                    payload: "ignored\n".into(),
                },
                RunEvent::Hook {
                    name: "PreRun".into(),
                    phase: "allow".into(),
                    payload: serde_json::Value::Null,
                },
            ],
        );

        let lines = ndjson_lines(&raw);
        // The stdout event is filtered out, leaving the hook and the
        // terminal result.
        assert_eq!(lines.len(), 2, "raw:\n{raw}");

        let seqs = pluck(&lines, |v| v["data"]["seq"].as_u64().expect("seq"));
        assert_eq!(
            seqs,
            vec![1, 2],
            "seq must stay contiguous after quiet filtering"
        );
    }
}