1use std::io::Write;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11
12use harn_vm::run_events::{RunEvent, RunEventSink};
13use serde::Serialize;
14
15use crate::json_envelope::{JsonEnvelope, JsonError};
16
17pub const RUN_JSON_SCHEMA_VERSION: u32 = 1;
21
22#[derive(Debug, Clone, Serialize)]
27#[serde(tag = "event_type", rename_all = "snake_case")]
28pub enum RunEventWire {
29 Stdout {
30 seq: u64,
31 payload: String,
32 },
33 Stderr {
34 seq: u64,
35 payload: String,
36 },
37 Transcript {
38 seq: u64,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 agent_id: Option<String>,
41 kind: String,
42 payload: serde_json::Value,
43 },
44 ToolCall {
45 seq: u64,
46 call_id: String,
47 name: String,
48 args: serde_json::Value,
49 started_at: String,
50 },
51 ToolResult {
52 seq: u64,
53 call_id: String,
54 ok: bool,
55 result: serde_json::Value,
56 },
57 Hook {
58 seq: u64,
59 name: String,
60 phase: String,
61 #[serde(skip_serializing_if = "serde_json::Value::is_null")]
62 payload: serde_json::Value,
63 },
64 PersonaStage {
65 seq: u64,
66 persona: String,
67 stage: String,
68 transition: String,
69 },
70 PackRun {
71 seq: u64,
72 bundle_hash: String,
73 signature_verified: bool,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 key_id: Option<String>,
76 cache_hit: bool,
77 dry_run_verify: bool,
78 },
79 Result {
80 seq: u64,
81 value: serde_json::Value,
82 exit_code: i32,
83 },
84 Error {
85 seq: u64,
86 error: JsonError,
87 },
88}
89
90impl RunEventWire {
91 pub fn seq(&self) -> u64 {
93 match self {
94 Self::Stdout { seq, .. }
95 | Self::Stderr { seq, .. }
96 | Self::Transcript { seq, .. }
97 | Self::ToolCall { seq, .. }
98 | Self::ToolResult { seq, .. }
99 | Self::Hook { seq, .. }
100 | Self::PersonaStage { seq, .. }
101 | Self::PackRun { seq, .. }
102 | Self::Result { seq, .. }
103 | Self::Error { seq, .. } => *seq,
104 }
105 }
106}
107
108pub struct NdjsonEmitter {
113 inner: Arc<NdjsonEmitterInner>,
114}
115
116struct NdjsonEmitterInner {
117 seq: AtomicU64,
118 quiet: bool,
119 out: Mutex<Box<dyn Write + Send>>,
123}
124
125impl NdjsonEmitter {
126 pub fn new(out: Box<dyn Write + Send>, quiet: bool) -> Self {
130 Self {
131 inner: Arc::new(NdjsonEmitterInner {
132 seq: AtomicU64::new(0),
133 quiet,
134 out: Mutex::new(out),
135 }),
136 }
137 }
138
139 pub fn sink(&self) -> Arc<dyn RunEventSink> {
142 Arc::new(NdjsonSink {
143 inner: self.inner.clone(),
144 })
145 }
146
147 fn next_seq(&self) -> u64 {
149 self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1
150 }
151
152 fn write_envelope(inner: &NdjsonEmitterInner, event: RunEventWire) {
153 let envelope = JsonEnvelope::ok(RUN_JSON_SCHEMA_VERSION, event);
154 let line = serde_json::to_string(&envelope)
155 .unwrap_or_else(|_| r#"{"schemaVersion":1,"ok":false}"#.to_string());
156 if let Ok(mut out) = inner.out.lock() {
157 let _ = writeln!(out, "{line}");
158 let _ = out.flush();
159 }
160 }
161
162 pub fn emit_result(&self, value: serde_json::Value, exit_code: i32) {
164 let event = RunEventWire::Result {
165 seq: self.next_seq(),
166 value,
167 exit_code,
168 };
169 Self::write_envelope(&self.inner, event);
170 }
171
172 pub fn emit_error(&self, code: impl Into<String>, message: impl Into<String>) {
175 let event = RunEventWire::Error {
176 seq: self.next_seq(),
177 error: JsonError {
178 code: code.into(),
179 message: message.into(),
180 details: serde_json::Value::Null,
181 },
182 };
183 Self::write_envelope(&self.inner, event);
184 }
185}
186
187struct NdjsonSink {
188 inner: Arc<NdjsonEmitterInner>,
189}
190
191impl RunEventSink for NdjsonSink {
192 fn emit(&self, event: RunEvent) {
193 let seq = self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1;
194 let wire = match event {
195 RunEvent::Stdout { payload } => {
196 if self.inner.quiet {
197 self.inner.seq.fetch_sub(1, Ordering::SeqCst);
199 return;
200 }
201 RunEventWire::Stdout { seq, payload }
202 }
203 RunEvent::Stderr { payload } => {
204 if self.inner.quiet {
205 self.inner.seq.fetch_sub(1, Ordering::SeqCst);
206 return;
207 }
208 RunEventWire::Stderr { seq, payload }
209 }
210 RunEvent::Transcript {
211 agent_id,
212 kind,
213 payload,
214 } => RunEventWire::Transcript {
215 seq,
216 agent_id,
217 kind,
218 payload,
219 },
220 RunEvent::ToolCall {
221 call_id,
222 name,
223 args,
224 started_at,
225 } => RunEventWire::ToolCall {
226 seq,
227 call_id,
228 name,
229 args,
230 started_at,
231 },
232 RunEvent::ToolResult {
233 call_id,
234 ok,
235 result,
236 } => RunEventWire::ToolResult {
237 seq,
238 call_id,
239 ok,
240 result,
241 },
242 RunEvent::Hook {
243 name,
244 phase,
245 payload,
246 } => RunEventWire::Hook {
247 seq,
248 name,
249 phase,
250 payload,
251 },
252 RunEvent::PersonaStage {
253 persona,
254 stage,
255 transition,
256 } => RunEventWire::PersonaStage {
257 seq,
258 persona,
259 stage,
260 transition,
261 },
262 RunEvent::PackRun {
263 bundle_hash,
264 signature_verified,
265 key_id,
266 cache_hit,
267 dry_run_verify,
268 } => RunEventWire::PackRun {
269 seq,
270 bundle_hash,
271 signature_verified,
272 key_id,
273 cache_hit,
274 dry_run_verify,
275 },
276 };
277 NdjsonEmitter::write_envelope(&self.inner, wire);
278 }
279}
280
281#[cfg(test)]
282mod tests {
283 use super::*;
284
285 static SINK_LOCK: Mutex<()> = Mutex::new(());
289
290 struct BufWriter(Arc<Mutex<Vec<u8>>>);
291 impl Write for BufWriter {
292 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
293 self.0.lock().unwrap().extend_from_slice(buf);
294 Ok(buf.len())
295 }
296 fn flush(&mut self) -> std::io::Result<()> {
297 Ok(())
298 }
299 }
300
301 #[test]
302 fn emits_monotonic_seq_across_events() {
303 let _guard = SINK_LOCK.lock().unwrap_or_else(|p| p.into_inner());
304 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
305 let emitter = NdjsonEmitter::new(Box::new(BufWriter(buf.clone())), false);
306 let sink = emitter.sink();
307 let prior = harn_vm::run_events::install_sink(sink);
308 harn_vm::run_events::emit(RunEvent::Stdout {
309 payload: "hello\n".into(),
310 });
311 harn_vm::run_events::emit(RunEvent::Stderr {
312 payload: "warn\n".into(),
313 });
314 harn_vm::run_events::clear_sink();
315 emitter.emit_result(serde_json::Value::Null, 0);
316 if let Some(prior) = prior {
317 harn_vm::run_events::install_sink(prior);
318 }
319
320 let raw = String::from_utf8(buf.lock().unwrap().clone()).expect("utf8");
321 let lines: Vec<&str> = raw.lines().filter(|line| !line.is_empty()).collect();
322 assert_eq!(lines.len(), 3, "expected 3 NDJSON lines, got:\n{raw}");
323 let seqs: Vec<u64> = lines
324 .iter()
325 .map(|line| {
326 let v: serde_json::Value = serde_json::from_str(line).expect("valid json");
327 v["data"]["seq"].as_u64().expect("seq present")
328 })
329 .collect();
330 assert_eq!(seqs, vec![1, 2, 3]);
331 let types: Vec<String> = lines
332 .iter()
333 .map(|line| {
334 let v: serde_json::Value = serde_json::from_str(line).expect("valid json");
335 v["data"]["event_type"].as_str().expect("type").to_string()
336 })
337 .collect();
338 assert_eq!(types, vec!["stdout", "stderr", "result"]);
339 }
340
341 #[test]
342 fn quiet_drops_stdout_and_stderr_without_gaps() {
343 let _guard = SINK_LOCK.lock().unwrap_or_else(|p| p.into_inner());
344 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
345 let emitter = NdjsonEmitter::new(Box::new(BufWriter(buf.clone())), true);
346 let sink = emitter.sink();
347 let prior = harn_vm::run_events::install_sink(sink);
348 harn_vm::run_events::emit(RunEvent::Stdout {
349 payload: "ignored\n".into(),
350 });
351 harn_vm::run_events::emit(RunEvent::Hook {
352 name: "PreRun".into(),
353 phase: "allow".into(),
354 payload: serde_json::Value::Null,
355 });
356 harn_vm::run_events::clear_sink();
357 emitter.emit_result(serde_json::Value::Null, 0);
358 if let Some(prior) = prior {
359 harn_vm::run_events::install_sink(prior);
360 }
361
362 let raw = String::from_utf8(buf.lock().unwrap().clone()).expect("utf8");
363 let lines: Vec<&str> = raw.lines().filter(|line| !line.is_empty()).collect();
364 assert_eq!(lines.len(), 2, "raw:\n{raw}");
366 let seqs: Vec<u64> = lines
367 .iter()
368 .map(|line| {
369 let v: serde_json::Value = serde_json::from_str(line).expect("valid json");
370 v["data"]["seq"].as_u64().expect("seq")
371 })
372 .collect();
373 assert_eq!(
374 seqs,
375 vec![1, 2],
376 "seq must stay contiguous after quiet filtering"
377 );
378 }
379}