1use std::io::Write;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex};
11
12use harn_vm::run_events::{RunEvent, RunEventSink};
13use serde::Serialize;
14
15use crate::json_envelope::{JsonEnvelope, JsonError};
16
/// Schema version handed to [`JsonEnvelope::ok`] for every event line written by this module.
///
/// NOTE(review): presumably bumped on breaking changes to the wire format — confirm the policy.
pub const RUN_JSON_SCHEMA_VERSION: u32 = 1;
21
/// Serializable form of a run event, one value per NDJSON line.
///
/// Serialized as an internally tagged object: the variant name is written under the
/// `event_type` key in snake_case (for example `tool_call`, `persona_stage`), alongside the
/// variant's own fields. Every variant carries a `seq` number.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum RunEventWire {
    /// Standard-output text forwarded from `RunEvent::Stdout`.
    Stdout {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Text carried by the source event.
        payload: String,
    },
    /// Standard-error text forwarded from `RunEvent::Stderr`.
    Stderr {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Text carried by the source event.
        payload: String,
    },
    /// Transcript entry forwarded from `RunEvent::Transcript`.
    Transcript {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Optional agent identifier; the key is omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        agent_id: Option<String>,
        /// Entry kind, forwarded unchanged from the source event.
        kind: String,
        /// Arbitrary JSON payload, forwarded unchanged from the source event.
        payload: serde_json::Value,
    },
    /// Tool invocation forwarded from `RunEvent::ToolCall`.
    ToolCall {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Call identifier; presumably matches the `call_id` of the later `ToolResult` — confirm.
        call_id: String,
        /// Tool name.
        name: String,
        /// Tool arguments as JSON.
        args: serde_json::Value,
        /// Start marker forwarded as a string; its format is not visible in this file.
        started_at: String,
    },
    /// Tool outcome forwarded from `RunEvent::ToolResult`.
    ToolResult {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Call identifier forwarded from the source event.
        call_id: String,
        /// Success flag forwarded from the source event.
        ok: bool,
        /// Result value as JSON.
        result: serde_json::Value,
    },
    /// Hook notification forwarded from `RunEvent::Hook`.
    Hook {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Hook name.
        name: String,
        /// Hook phase, forwarded unchanged from the source event.
        phase: String,
        /// Hook payload; the key is omitted from the JSON when the value is `null`.
        #[serde(skip_serializing_if = "serde_json::Value::is_null")]
        payload: serde_json::Value,
    },
    /// Persona stage change forwarded from `RunEvent::PersonaStage`.
    PersonaStage {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Persona name.
        persona: String,
        /// Stage name.
        stage: String,
        /// Transition label, forwarded unchanged from the source event.
        transition: String,
    },
    /// Run result, produced by `NdjsonEmitter::emit_result`.
    Result {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Result value as JSON.
        value: serde_json::Value,
        /// Exit code supplied by the caller of `emit_result`.
        exit_code: i32,
    },
    /// Run failure, produced by `NdjsonEmitter::emit_error`.
    Error {
        /// Sequence number assigned by the emitter.
        seq: u64,
        /// Error code, message and details.
        error: JsonError,
    },
}
80
81impl RunEventWire {
82 pub fn seq(&self) -> u64 {
84 match self {
85 Self::Stdout { seq, .. }
86 | Self::Stderr { seq, .. }
87 | Self::Transcript { seq, .. }
88 | Self::ToolCall { seq, .. }
89 | Self::ToolResult { seq, .. }
90 | Self::Hook { seq, .. }
91 | Self::PersonaStage { seq, .. }
92 | Self::Result { seq, .. }
93 | Self::Error { seq, .. } => *seq,
94 }
95 }
96}
97
/// Writes run events as newline-delimited JSON envelopes to a single output writer.
///
/// The state lives behind an `Arc` so sinks returned by `sink()` share the same sequence
/// counter, quiet flag and writer as the emitter that created them.
pub struct NdjsonEmitter {
    /// Shared state: sequence counter, quiet flag and output writer.
    inner: Arc<NdjsonEmitterInner>,
}
105
/// State shared between an `NdjsonEmitter` and the sinks created from it.
struct NdjsonEmitterInner {
    /// Last sequence number handed out; starts at 0, so the first event is numbered 1.
    seq: AtomicU64,
    /// When true, the sink drops `Stdout` and `Stderr` events instead of writing them.
    quiet: bool,
    /// Destination for the NDJSON lines, locked for the duration of each write.
    out: Mutex<Box<dyn Write + Send>>,
}
114
115impl NdjsonEmitter {
116 pub fn new(out: Box<dyn Write + Send>, quiet: bool) -> Self {
120 Self {
121 inner: Arc::new(NdjsonEmitterInner {
122 seq: AtomicU64::new(0),
123 quiet,
124 out: Mutex::new(out),
125 }),
126 }
127 }
128
129 pub fn sink(&self) -> Arc<dyn RunEventSink> {
132 Arc::new(NdjsonSink {
133 inner: self.inner.clone(),
134 })
135 }
136
137 fn next_seq(&self) -> u64 {
139 self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1
140 }
141
142 fn write_envelope(inner: &NdjsonEmitterInner, event: RunEventWire) {
143 let envelope = JsonEnvelope::ok(RUN_JSON_SCHEMA_VERSION, event);
144 let line = serde_json::to_string(&envelope)
145 .unwrap_or_else(|_| r#"{"schemaVersion":1,"ok":false}"#.to_string());
146 if let Ok(mut out) = inner.out.lock() {
147 let _ = writeln!(out, "{line}");
148 let _ = out.flush();
149 }
150 }
151
152 pub fn emit_result(&self, value: serde_json::Value, exit_code: i32) {
154 let event = RunEventWire::Result {
155 seq: self.next_seq(),
156 value,
157 exit_code,
158 };
159 Self::write_envelope(&self.inner, event);
160 }
161
162 pub fn emit_error(&self, code: impl Into<String>, message: impl Into<String>) {
165 let event = RunEventWire::Error {
166 seq: self.next_seq(),
167 error: JsonError {
168 code: code.into(),
169 message: message.into(),
170 details: serde_json::Value::Null,
171 },
172 };
173 Self::write_envelope(&self.inner, event);
174 }
175}
176
/// `RunEventSink` implementation that forwards events to the state of the `NdjsonEmitter`
/// it was created from.
struct NdjsonSink {
    /// State shared with the originating emitter.
    inner: Arc<NdjsonEmitterInner>,
}
180
181impl RunEventSink for NdjsonSink {
182 fn emit(&self, event: RunEvent) {
183 let seq = self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1;
184 let wire = match event {
185 RunEvent::Stdout { payload } => {
186 if self.inner.quiet {
187 self.inner.seq.fetch_sub(1, Ordering::SeqCst);
189 return;
190 }
191 RunEventWire::Stdout { seq, payload }
192 }
193 RunEvent::Stderr { payload } => {
194 if self.inner.quiet {
195 self.inner.seq.fetch_sub(1, Ordering::SeqCst);
196 return;
197 }
198 RunEventWire::Stderr { seq, payload }
199 }
200 RunEvent::Transcript {
201 agent_id,
202 kind,
203 payload,
204 } => RunEventWire::Transcript {
205 seq,
206 agent_id,
207 kind,
208 payload,
209 },
210 RunEvent::ToolCall {
211 call_id,
212 name,
213 args,
214 started_at,
215 } => RunEventWire::ToolCall {
216 seq,
217 call_id,
218 name,
219 args,
220 started_at,
221 },
222 RunEvent::ToolResult {
223 call_id,
224 ok,
225 result,
226 } => RunEventWire::ToolResult {
227 seq,
228 call_id,
229 ok,
230 result,
231 },
232 RunEvent::Hook {
233 name,
234 phase,
235 payload,
236 } => RunEventWire::Hook {
237 seq,
238 name,
239 phase,
240 payload,
241 },
242 RunEvent::PersonaStage {
243 persona,
244 stage,
245 transition,
246 } => RunEventWire::PersonaStage {
247 seq,
248 persona,
249 stage,
250 transition,
251 },
252 };
253 NdjsonEmitter::write_envelope(&self.inner, wire);
254 }
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests below, which all swap the sink installed through
    /// `harn_vm::run_events::install_sink` (presumably shared global state — hence the lock).
    static SINK_LOCK: Mutex<()> = Mutex::new(());

    /// In-memory `Write` target whose bytes stay readable through the shared buffer.
    struct BufWriter(Arc<Mutex<Vec<u8>>>);

    impl Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Installs a fresh emitter's sink, emits `events` through `harn_vm::run_events::emit`,
    /// clears the sink, emits a final `result` event, restores any previously installed sink,
    /// and returns everything the emitter wrote.
    fn capture(quiet: bool, events: Vec<RunEvent>) -> String {
        // Recover from poisoning so one failed test does not cascade into the others.
        let _guard = SINK_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let emitter = NdjsonEmitter::new(Box::new(BufWriter(buf.clone())), quiet);
        let prior = harn_vm::run_events::install_sink(emitter.sink());
        for event in events {
            harn_vm::run_events::emit(event);
        }
        harn_vm::run_events::clear_sink();
        emitter.emit_result(serde_json::Value::Null, 0);
        if let Some(prior) = prior {
            harn_vm::run_events::install_sink(prior);
        }

        let bytes = buf.lock().unwrap().clone();
        String::from_utf8(bytes).expect("utf8")
    }

    /// Parses each non-empty line once and returns the envelope's `data` object.
    fn parse_events(raw: &str) -> Vec<serde_json::Value> {
        raw.lines()
            .filter(|line| !line.is_empty())
            .map(|line| {
                let envelope: serde_json::Value = serde_json::from_str(line).expect("valid json");
                envelope["data"].clone()
            })
            .collect()
    }

    /// Extracts the `seq` of every event, in output order.
    fn seqs(events: &[serde_json::Value]) -> Vec<u64> {
        events
            .iter()
            .map(|event| event["seq"].as_u64().expect("seq present"))
            .collect()
    }

    /// Extracts the `event_type` of every event, in output order.
    fn event_types(events: &[serde_json::Value]) -> Vec<String> {
        events
            .iter()
            .map(|event| event["event_type"].as_str().expect("type").to_string())
            .collect()
    }

    #[test]
    fn emits_monotonic_seq_across_events() {
        let raw = capture(
            false,
            vec![
                RunEvent::Stdout {
                    payload: "hello\n".into(),
                },
                RunEvent::Stderr {
                    payload: "warn\n".into(),
                },
            ],
        );

        let events = parse_events(&raw);
        assert_eq!(events.len(), 3, "expected 3 NDJSON lines, got:\n{raw}");
        assert_eq!(seqs(&events), vec![1, 2, 3]);
        assert_eq!(event_types(&events), vec!["stdout", "stderr", "result"]);
    }

    #[test]
    fn quiet_drops_stdout_and_stderr_without_gaps() {
        let raw = capture(
            true,
            vec![
                RunEvent::Stdout {
                    payload: "ignored\n".into(),
                },
                // Exercise the stderr path as well, as the test name promises.
                RunEvent::Stderr {
                    payload: "ignored too\n".into(),
                },
                RunEvent::Hook {
                    name: "PreRun".into(),
                    phase: "allow".into(),
                    payload: serde_json::Value::Null,
                },
            ],
        );

        let events = parse_events(&raw);
        // Only the hook and the final result survive quiet filtering.
        assert_eq!(events.len(), 2, "raw:\n{raw}");
        assert_eq!(event_types(&events), vec!["hook", "result"]);
        assert_eq!(
            seqs(&events),
            vec![1, 2],
            "seq must stay contiguous after quiet filtering"
        );
    }
}