Skip to main content

harn_cli/commands/run/
json_events.rs

1//! `harn run --json`: NDJSON event-stream emitter.
2//!
3//! Each line is a [`JsonEnvelope`] wrapping a [`RunEventWire`]. Wire
4//! events tag themselves with `event_type` for cheap discrimination
5//! by `jq`-style consumers and carry a strictly monotonic `seq`
6//! starting at `1`. See issue #1755 / epic #1753.
7
8use std::io::Write;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11
12use harn_vm::run_events::{RunEvent, RunEventSink};
13use serde::Serialize;
14
15use crate::json_envelope::{JsonEnvelope, JsonError};
16
17/// Schema version for the `harn run --json` event stream. Bump on any
18/// breaking change to the wire shape; agents key off this to negotiate
19/// compatibility.
20pub const RUN_JSON_SCHEMA_VERSION: u32 = 1;
21
22/// Wire form of a single event emitted by `harn run --json`. The
23/// `event_type` tag is flat so consumers can `jq '.data.event_type'`.
24/// `seq` is monotonic and process-local — the first event in a run is
25/// `seq=1`.
26#[derive(Debug, Clone, Serialize)]
27#[serde(tag = "event_type", rename_all = "snake_case")]
28pub enum RunEventWire {
29    Stdout {
30        seq: u64,
31        payload: String,
32    },
33    Stderr {
34        seq: u64,
35        payload: String,
36    },
37    Transcript {
38        seq: u64,
39        #[serde(skip_serializing_if = "Option::is_none")]
40        agent_id: Option<String>,
41        kind: String,
42        payload: serde_json::Value,
43    },
44    ToolCall {
45        seq: u64,
46        call_id: String,
47        name: String,
48        args: serde_json::Value,
49        started_at: String,
50    },
51    ToolResult {
52        seq: u64,
53        call_id: String,
54        ok: bool,
55        result: serde_json::Value,
56    },
57    Hook {
58        seq: u64,
59        name: String,
60        phase: String,
61        #[serde(skip_serializing_if = "serde_json::Value::is_null")]
62        payload: serde_json::Value,
63    },
64    PersonaStage {
65        seq: u64,
66        persona: String,
67        stage: String,
68        transition: String,
69    },
70    PackRun {
71        seq: u64,
72        bundle_hash: String,
73        signature_verified: bool,
74        #[serde(skip_serializing_if = "Option::is_none")]
75        key_id: Option<String>,
76        cache_hit: bool,
77        dry_run_verify: bool,
78    },
79    Result {
80        seq: u64,
81        value: serde_json::Value,
82        exit_code: i32,
83    },
84    Error {
85        seq: u64,
86        error: JsonError,
87    },
88}
89
90impl RunEventWire {
91    /// The monotonic sequence number assigned at emission.
92    pub fn seq(&self) -> u64 {
93        match self {
94            Self::Stdout { seq, .. }
95            | Self::Stderr { seq, .. }
96            | Self::Transcript { seq, .. }
97            | Self::ToolCall { seq, .. }
98            | Self::ToolResult { seq, .. }
99            | Self::Hook { seq, .. }
100            | Self::PersonaStage { seq, .. }
101            | Self::PackRun { seq, .. }
102            | Self::Result { seq, .. }
103            | Self::Error { seq, .. } => *seq,
104        }
105    }
106}
107
108/// Writer that drains [`RunEvent`]s, assigns monotonic seq numbers,
109/// wraps them in [`JsonEnvelope`]s, and emits one NDJSON line per
110/// event. Lines are flushed per event so streaming consumers see them
111/// as the run progresses.
112pub struct NdjsonEmitter {
113    inner: Arc<NdjsonEmitterInner>,
114}
115
116struct NdjsonEmitterInner {
117    seq: AtomicU64,
118    quiet: bool,
119    /// Output sink. Behind a Mutex so concurrent emits stay
120    /// line-atomic; serde line writes are tiny so contention is
121    /// negligible.
122    out: Mutex<Box<dyn Write + Send>>,
123}
124
125impl NdjsonEmitter {
126    /// Build an emitter that writes to `out`. `quiet` suppresses
127    /// `Stdout` and `Stderr` events (transcript/tool/hook/persona/
128    /// result events still flow).
129    pub fn new(out: Box<dyn Write + Send>, quiet: bool) -> Self {
130        Self {
131            inner: Arc::new(NdjsonEmitterInner {
132                seq: AtomicU64::new(0),
133                quiet,
134                out: Mutex::new(out),
135            }),
136        }
137    }
138
139    /// Build a thread-safe sink that forwards [`RunEvent`]s into this
140    /// emitter, applying seq numbering and `quiet` filtering.
141    pub fn sink(&self) -> Arc<dyn RunEventSink> {
142        Arc::new(NdjsonSink {
143            inner: self.inner.clone(),
144        })
145    }
146
147    /// Next monotonic seq value. The first event in a run is `seq=1`.
148    fn next_seq(&self) -> u64 {
149        self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1
150    }
151
152    fn write_envelope(inner: &NdjsonEmitterInner, event: RunEventWire) {
153        let envelope = JsonEnvelope::ok(RUN_JSON_SCHEMA_VERSION, event);
154        let line = serde_json::to_string(&envelope)
155            .unwrap_or_else(|_| r#"{"schemaVersion":1,"ok":false}"#.to_string());
156        if let Ok(mut out) = inner.out.lock() {
157            let _ = writeln!(out, "{line}");
158            let _ = out.flush();
159        }
160    }
161
162    /// Emit the terminal `Result` event for a run.
163    pub fn emit_result(&self, value: serde_json::Value, exit_code: i32) {
164        let event = RunEventWire::Result {
165            seq: self.next_seq(),
166            value,
167            exit_code,
168        };
169        Self::write_envelope(&self.inner, event);
170    }
171
172    /// Emit the terminal `Error` event for a fatal run failure (e.g.
173    /// compile error before the VM started).
174    pub fn emit_error(&self, code: impl Into<String>, message: impl Into<String>) {
175        let event = RunEventWire::Error {
176            seq: self.next_seq(),
177            error: JsonError {
178                code: code.into(),
179                message: message.into(),
180                details: serde_json::Value::Null,
181            },
182        };
183        Self::write_envelope(&self.inner, event);
184    }
185}
186
187struct NdjsonSink {
188    inner: Arc<NdjsonEmitterInner>,
189}
190
191impl RunEventSink for NdjsonSink {
192    fn emit(&self, event: RunEvent) {
193        let seq = self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1;
194        let wire = match event {
195            RunEvent::Stdout { payload } => {
196                if self.inner.quiet {
197                    // Undo the seq bump so monotonicity stays tight.
198                    self.inner.seq.fetch_sub(1, Ordering::SeqCst);
199                    return;
200                }
201                RunEventWire::Stdout { seq, payload }
202            }
203            RunEvent::Stderr { payload } => {
204                if self.inner.quiet {
205                    self.inner.seq.fetch_sub(1, Ordering::SeqCst);
206                    return;
207                }
208                RunEventWire::Stderr { seq, payload }
209            }
210            RunEvent::Transcript {
211                agent_id,
212                kind,
213                payload,
214            } => RunEventWire::Transcript {
215                seq,
216                agent_id,
217                kind,
218                payload,
219            },
220            RunEvent::ToolCall {
221                call_id,
222                name,
223                args,
224                started_at,
225            } => RunEventWire::ToolCall {
226                seq,
227                call_id,
228                name,
229                args,
230                started_at,
231            },
232            RunEvent::ToolResult {
233                call_id,
234                ok,
235                result,
236            } => RunEventWire::ToolResult {
237                seq,
238                call_id,
239                ok,
240                result,
241            },
242            RunEvent::Hook {
243                name,
244                phase,
245                payload,
246            } => RunEventWire::Hook {
247                seq,
248                name,
249                phase,
250                payload,
251            },
252            RunEvent::PersonaStage {
253                persona,
254                stage,
255                transition,
256            } => RunEventWire::PersonaStage {
257                seq,
258                persona,
259                stage,
260                transition,
261            },
262            RunEvent::PackRun {
263                bundle_hash,
264                signature_verified,
265                key_id,
266                cache_hit,
267                dry_run_verify,
268            } => RunEventWire::PackRun {
269                seq,
270                bundle_hash,
271                signature_verified,
272                key_id,
273                cache_hit,
274                dry_run_verify,
275            },
276        };
277        NdjsonEmitter::write_envelope(&self.inner, wire);
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use super::*;
284
285    /// `harn_vm::run_events::install_sink` writes a process-global
286    /// slot, so any test that swaps the sink must serialize against
287    /// peers.
288    static SINK_LOCK: Mutex<()> = Mutex::new(());
289
290    struct BufWriter(Arc<Mutex<Vec<u8>>>);
291    impl Write for BufWriter {
292        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
293            self.0.lock().unwrap().extend_from_slice(buf);
294            Ok(buf.len())
295        }
296        fn flush(&mut self) -> std::io::Result<()> {
297            Ok(())
298        }
299    }
300
301    #[test]
302    fn emits_monotonic_seq_across_events() {
303        let _guard = SINK_LOCK.lock().unwrap_or_else(|p| p.into_inner());
304        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
305        let emitter = NdjsonEmitter::new(Box::new(BufWriter(buf.clone())), false);
306        let sink = emitter.sink();
307        let prior = harn_vm::run_events::install_sink(sink);
308        harn_vm::run_events::emit(RunEvent::Stdout {
309            payload: "hello\n".into(),
310        });
311        harn_vm::run_events::emit(RunEvent::Stderr {
312            payload: "warn\n".into(),
313        });
314        harn_vm::run_events::clear_sink();
315        emitter.emit_result(serde_json::Value::Null, 0);
316        if let Some(prior) = prior {
317            harn_vm::run_events::install_sink(prior);
318        }
319
320        let raw = String::from_utf8(buf.lock().unwrap().clone()).expect("utf8");
321        let lines: Vec<&str> = raw.lines().filter(|line| !line.is_empty()).collect();
322        assert_eq!(lines.len(), 3, "expected 3 NDJSON lines, got:\n{raw}");
323        let seqs: Vec<u64> = lines
324            .iter()
325            .map(|line| {
326                let v: serde_json::Value = serde_json::from_str(line).expect("valid json");
327                v["data"]["seq"].as_u64().expect("seq present")
328            })
329            .collect();
330        assert_eq!(seqs, vec![1, 2, 3]);
331        let types: Vec<String> = lines
332            .iter()
333            .map(|line| {
334                let v: serde_json::Value = serde_json::from_str(line).expect("valid json");
335                v["data"]["event_type"].as_str().expect("type").to_string()
336            })
337            .collect();
338        assert_eq!(types, vec!["stdout", "stderr", "result"]);
339    }
340
341    #[test]
342    fn quiet_drops_stdout_and_stderr_without_gaps() {
343        let _guard = SINK_LOCK.lock().unwrap_or_else(|p| p.into_inner());
344        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
345        let emitter = NdjsonEmitter::new(Box::new(BufWriter(buf.clone())), true);
346        let sink = emitter.sink();
347        let prior = harn_vm::run_events::install_sink(sink);
348        harn_vm::run_events::emit(RunEvent::Stdout {
349            payload: "ignored\n".into(),
350        });
351        harn_vm::run_events::emit(RunEvent::Hook {
352            name: "PreRun".into(),
353            phase: "allow".into(),
354            payload: serde_json::Value::Null,
355        });
356        harn_vm::run_events::clear_sink();
357        emitter.emit_result(serde_json::Value::Null, 0);
358        if let Some(prior) = prior {
359            harn_vm::run_events::install_sink(prior);
360        }
361
362        let raw = String::from_utf8(buf.lock().unwrap().clone()).expect("utf8");
363        let lines: Vec<&str> = raw.lines().filter(|line| !line.is_empty()).collect();
364        // stdout suppressed; hook + result remain.
365        assert_eq!(lines.len(), 2, "raw:\n{raw}");
366        let seqs: Vec<u64> = lines
367            .iter()
368            .map(|line| {
369                let v: serde_json::Value = serde_json::from_str(line).expect("valid json");
370                v["data"]["seq"].as_u64().expect("seq")
371            })
372            .collect();
373        assert_eq!(
374            seqs,
375            vec![1, 2],
376            "seq must stay contiguous after quiet filtering"
377        );
378    }
379}