use std::io::Write;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use harn_vm::run_events::{RunEvent, RunEventSink};
use serde::Serialize;
use crate::json_envelope::{JsonEnvelope, JsonError};
/// Schema version passed to `JsonEnvelope::ok` for every NDJSON line this
/// module writes.
pub const RUN_JSON_SCHEMA_VERSION: u32 = 1;
/// Wire form of a single run event.
///
/// Serialized with an `event_type` field holding the snake_case variant name
/// (for example `stdout`, `tool_call`, `persona_stage`), alongside the
/// variant's own fields. Every variant carries a `seq` number assigned by the
/// emitter; the first event of an emitter gets `seq == 1`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum RunEventWire {
/// Built from `RunEvent::Stdout`; not emitted by the sink when the emitter
/// is quiet.
Stdout {
seq: u64,
payload: String,
},
/// Built from `RunEvent::Stderr`; not emitted by the sink when the emitter
/// is quiet.
Stderr {
seq: u64,
payload: String,
},
/// Built from `RunEvent::Transcript`.
Transcript {
seq: u64,
/// Left out of the JSON entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
agent_id: Option<String>,
kind: String,
payload: serde_json::Value,
},
/// Built from `RunEvent::ToolCall`.
ToolCall {
seq: u64,
call_id: String,
name: String,
args: serde_json::Value,
// NOTE(review): carried as an opaque string; the timestamp format is not
// visible in this file.
started_at: String,
},
/// Built from `RunEvent::ToolResult`.
ToolResult {
seq: u64,
call_id: String,
ok: bool,
result: serde_json::Value,
},
/// Built from `RunEvent::Hook`.
Hook {
seq: u64,
name: String,
phase: String,
/// Left out of the JSON entirely when the value is JSON `null`.
#[serde(skip_serializing_if = "serde_json::Value::is_null")]
payload: serde_json::Value,
},
/// Built from `RunEvent::PersonaStage`.
PersonaStage {
seq: u64,
persona: String,
stage: String,
transition: String,
},
/// Written by `NdjsonEmitter::emit_result`.
Result {
seq: u64,
value: serde_json::Value,
exit_code: i32,
},
/// Written by `NdjsonEmitter::emit_error`.
Error {
seq: u64,
error: JsonError,
},
}
impl RunEventWire {
    /// Returns the sequence number stored in this event, regardless of variant.
    pub fn seq(&self) -> u64 {
        // Every variant carries a `seq` field, so this or-pattern is
        // irrefutable. A future variant that is not listed here makes the
        // pattern refutable and fails to compile, just as a non-exhaustive
        // `match` would.
        let (Self::Stdout { seq, .. }
        | Self::Stderr { seq, .. }
        | Self::Transcript { seq, .. }
        | Self::ToolCall { seq, .. }
        | Self::ToolResult { seq, .. }
        | Self::Hook { seq, .. }
        | Self::PersonaStage { seq, .. }
        | Self::Result { seq, .. }
        | Self::Error { seq, .. }) = self;
        *seq
    }
}
/// Writes run events as newline-delimited JSON envelopes.
///
/// Holds the shared emitter state; `sink()` hands out `RunEventSink` handles
/// backed by the same state, and `emit_result` / `emit_error` write `Result`
/// and `Error` events directly.
pub struct NdjsonEmitter {
inner: Arc<NdjsonEmitterInner>,
}
/// State shared between an `NdjsonEmitter` and the sinks it creates.
struct NdjsonEmitterInner {
// Last sequence number handed out; starts at 0, so the first event gets 1.
seq: AtomicU64,
// When true, the sink does not emit `Stdout` / `Stderr` events.
quiet: bool,
// Destination writer; locked for the duration of each line write + flush.
out: Mutex<Box<dyn Write + Send>>,
}
impl NdjsonEmitter {
    /// Creates an emitter that writes one JSON envelope per line to `out`.
    ///
    /// When `quiet` is true, sinks obtained from [`NdjsonEmitter::sink`] do not
    /// emit `Stdout` / `Stderr` events. Sequence numbering starts at 1.
    pub fn new(out: Box<dyn Write + Send>, quiet: bool) -> Self {
        Self {
            inner: Arc::new(NdjsonEmitterInner {
                seq: AtomicU64::new(0),
                quiet,
                out: Mutex::new(out),
            }),
        }
    }

    /// Returns a `RunEventSink` that shares this emitter's sequence counter,
    /// quiet flag and output writer.
    pub fn sink(&self) -> Arc<dyn RunEventSink> {
        Arc::new(NdjsonSink {
            inner: Arc::clone(&self.inner),
        })
    }

    /// Allocates the next sequence number (the first call returns 1).
    fn next_seq(&self) -> u64 {
        self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Wraps `event` in an ok-envelope, serializes it and writes it as a single
    /// line followed by a flush.
    ///
    /// Output is best-effort: write and flush errors are deliberately ignored
    /// so that a broken output stream never aborts the run.
    fn write_envelope(inner: &NdjsonEmitterInner, event: RunEventWire) {
        let envelope = JsonEnvelope::ok(RUN_JSON_SCHEMA_VERSION, event);
        // If serialization fails, still emit a well-formed line. The version
        // comes from the constant so the fallback cannot drift from the
        // schema version used for regular envelopes.
        let line = serde_json::to_string(&envelope).unwrap_or_else(|_| {
            format!(r#"{{"schemaVersion":{RUN_JSON_SCHEMA_VERSION},"ok":false}}"#)
        });
        // A poisoned mutex only means another thread panicked while writing.
        // Recover the guard instead of silently discarding every later event
        // (including the final result or error line).
        let mut out = inner
            .out
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let _ = writeln!(out, "{line}");
        let _ = out.flush();
    }

    /// Writes a `Result` event carrying `value` and `exit_code`, stamped with
    /// the next sequence number.
    pub fn emit_result(&self, value: serde_json::Value, exit_code: i32) {
        let event = RunEventWire::Result {
            seq: self.next_seq(),
            value,
            exit_code,
        };
        Self::write_envelope(&self.inner, event);
    }

    /// Writes an `Error` event with the given `code` and `message` (details
    /// are always JSON `null`), stamped with the next sequence number.
    pub fn emit_error(&self, code: impl Into<String>, message: impl Into<String>) {
        let event = RunEventWire::Error {
            seq: self.next_seq(),
            error: JsonError {
                code: code.into(),
                message: message.into(),
                details: serde_json::Value::Null,
            },
        };
        Self::write_envelope(&self.inner, event);
    }
}
/// `RunEventSink` implementation that shares state with the `NdjsonEmitter`
/// that created it.
struct NdjsonSink {
inner: Arc<NdjsonEmitterInner>,
}
impl RunEventSink for NdjsonSink {
    /// Converts `event` to its wire form, stamps it with the next sequence
    /// number and writes it as one NDJSON line.
    ///
    /// In quiet mode, `Stdout` and `Stderr` events are discarded without
    /// consuming a sequence number, so the emitted `seq` values stay
    /// contiguous.
    fn emit(&self, event: RunEvent) {
        // Decide on suppression BEFORE touching the counter. Allocating a
        // number and rolling it back with `fetch_sub` is not atomic: another
        // thread can take a number in between, which produces duplicate and
        // skipped sequence numbers.
        let suppressed = self.inner.quiet
            && matches!(event, RunEvent::Stdout { .. } | RunEvent::Stderr { .. });
        if suppressed {
            return;
        }

        // NOTE: the number is allocated before the output lock is taken, so
        // concurrent emitters may write lines in a different order than their
        // `seq` values.
        let seq = self.inner.seq.fetch_add(1, Ordering::SeqCst) + 1;
        let wire = match event {
            RunEvent::Stdout { payload } => RunEventWire::Stdout { seq, payload },
            RunEvent::Stderr { payload } => RunEventWire::Stderr { seq, payload },
            RunEvent::Transcript {
                agent_id,
                kind,
                payload,
            } => RunEventWire::Transcript {
                seq,
                agent_id,
                kind,
                payload,
            },
            RunEvent::ToolCall {
                call_id,
                name,
                args,
                started_at,
            } => RunEventWire::ToolCall {
                seq,
                call_id,
                name,
                args,
                started_at,
            },
            RunEvent::ToolResult {
                call_id,
                ok,
                result,
            } => RunEventWire::ToolResult {
                seq,
                call_id,
                ok,
                result,
            },
            RunEvent::Hook {
                name,
                phase,
                payload,
            } => RunEventWire::Hook {
                seq,
                name,
                phase,
                payload,
            },
            RunEvent::PersonaStage {
                persona,
                stage,
                transition,
            } => RunEventWire::PersonaStage {
                seq,
                persona,
                stage,
                transition,
            },
        };
        NdjsonEmitter::write_envelope(&self.inner, wire);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use harn_vm::run_events;

    // Both tests install and clear the `harn_vm::run_events` sink, so they
    // hold this lock to avoid interfering with each other.
    // NOTE(review): presumably the sink is shared across test threads — confirm.
    static SINK_LOCK: Mutex<()> = Mutex::new(());

    /// `Write` implementation that appends every write into a shared byte
    /// vector the test can inspect afterwards.
    struct BufWriter(Arc<Mutex<Vec<u8>>>);

    impl Write for BufWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut bytes = self.0.lock().unwrap();
            bytes.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Stdout, stderr and the final result are numbered 1, 2, 3 in order.
    #[test]
    fn emits_monotonic_seq_across_events() {
        let _guard = SINK_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let captured = Arc::new(Mutex::new(Vec::<u8>::new()));
        let emitter = NdjsonEmitter::new(Box::new(BufWriter(Arc::clone(&captured))), false);
        let previous = run_events::install_sink(emitter.sink());

        run_events::emit(RunEvent::Stdout {
            payload: "hello\n".into(),
        });
        run_events::emit(RunEvent::Stderr {
            payload: "warn\n".into(),
        });
        run_events::clear_sink();
        emitter.emit_result(serde_json::Value::Null, 0);
        if let Some(previous) = previous {
            run_events::install_sink(previous);
        }

        let raw = String::from_utf8(captured.lock().unwrap().clone()).expect("utf8");
        let lines: Vec<&str> = raw.lines().filter(|line| !line.is_empty()).collect();
        assert_eq!(lines.len(), 3, "expected 3 NDJSON lines, got:\n{raw}");

        // Parse each line once and read both fields from the parsed value.
        let parsed: Vec<serde_json::Value> = lines
            .iter()
            .map(|line| serde_json::from_str::<serde_json::Value>(line).expect("valid json"))
            .collect();

        let seqs: Vec<u64> = parsed
            .iter()
            .map(|value| value["data"]["seq"].as_u64().expect("seq present"))
            .collect();
        assert_eq!(seqs, vec![1, 2, 3]);

        let types: Vec<String> = parsed
            .iter()
            .map(|value| {
                value["data"]["event_type"]
                    .as_str()
                    .expect("type")
                    .to_string()
            })
            .collect();
        assert_eq!(types, vec!["stdout", "stderr", "result"]);
    }

    /// With `quiet` set, the dropped stdout event must not leave a hole in the
    /// sequence numbers of the remaining events.
    #[test]
    fn quiet_drops_stdout_and_stderr_without_gaps() {
        let _guard = SINK_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let captured = Arc::new(Mutex::new(Vec::<u8>::new()));
        let emitter = NdjsonEmitter::new(Box::new(BufWriter(Arc::clone(&captured))), true);
        let previous = run_events::install_sink(emitter.sink());

        run_events::emit(RunEvent::Stdout {
            payload: "ignored\n".into(),
        });
        run_events::emit(RunEvent::Hook {
            name: "PreRun".into(),
            phase: "allow".into(),
            payload: serde_json::Value::Null,
        });
        run_events::clear_sink();
        emitter.emit_result(serde_json::Value::Null, 0);
        if let Some(previous) = previous {
            run_events::install_sink(previous);
        }

        let raw = String::from_utf8(captured.lock().unwrap().clone()).expect("utf8");
        let lines: Vec<&str> = raw.lines().filter(|line| !line.is_empty()).collect();
        assert_eq!(lines.len(), 2, "raw:\n{raw}");

        let seqs: Vec<u64> = lines
            .iter()
            .map(|line| serde_json::from_str::<serde_json::Value>(line).expect("valid json"))
            .map(|value| value["data"]["seq"].as_u64().expect("seq"))
            .collect();
        assert_eq!(
            seqs,
            vec![1, 2],
            "seq must stay contiguous after quiet filtering"
        );
    }
}