zeph_core/json_event_sink.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! `JsonEventSink`: the single stdout writer for `--json` mode.
5//!
6//! All JSON events in a `--json` session are emitted through a shared
//! `Arc<JsonEventSink>`. The internal `Mutex<Box<dyn Write + Send>>` (wrapping
//! stdout when the sink is built with `JsonEventSink::new`) ensures that
//! concurrent emitters from different tasks cannot interleave partial lines.
9//!
10//! # Ordering guarantee
11//!
12//! Events emitted from the same thread preserve their issuance order. Events
13//! from concurrent threads interleave in mutex-acquisition order. Within a
14//! single response, `response_chunk` events precede `response_end`. Tool events
15//! may interleave with chunks when tools run mid-stream (normal for agent loops).
16//!
17//! # Lock discipline
18//!
19//! `emit` holds the lock only for serialization + write + flush. It never
20//! `.await`s while holding the lock, satisfying invariant §10.
21
22use std::io::{self, Write};
23use std::sync::Mutex;
24
25use serde::Serialize;
26
27#[non_exhaustive]
28/// Structured event emitted on stdout in `--json` mode.
29///
30/// All variants are serialized as JSONL with a `"event"` discriminator field.
31#[derive(Serialize)]
32#[serde(tag = "event", rename_all = "snake_case")]
33pub enum JsonEvent<'a> {
34 /// Session boot banner emitted before the first prompt.
35 Boot {
36 version: &'a str,
37 bare: bool,
38 auto: bool,
39 },
40 /// User input received from stdin.
41 Query { text: &'a str, queue_len: usize },
42 /// Streaming assistant text chunk.
43 ResponseChunk { text: &'a str },
44 /// End-of-response marker.
45 ResponseEnd,
46 /// A tool invocation is about to run.
47 ToolCall {
48 tool: &'a str,
49 args: &'a serde_json::Value,
50 id: &'a str,
51 },
52 /// A tool returned a result.
53 ToolResult {
54 tool: &'a str,
55 id: &'a str,
56 output: &'a str,
57 is_error: bool,
58 },
59 /// Token counts and estimated cost summary.
60 Cost {
61 input_tokens: u64,
62 output_tokens: u64,
63 total_usd: f64,
64 },
65 /// Loop tick notification fired each `/loop` iteration.
66 LoopTick {
67 iteration: u64,
68 total_ticks: u64,
69 prompt_preview: &'a str,
70 },
71 /// Slash command acknowledgement — distinguishes `/loop start` confirmation
72 /// from regular assistant output in JSON streams.
73 CommandAck { command: &'a str, text: &'a str },
74 /// General status message (equivalent to spinner text in interactive channels).
75 Status { message: &'a str },
76 /// Terminal error emitted before the process exits.
77 Error { message: &'a str },
78}
79
/// Mutex-guarded line writer that every `--json` event goes through.
///
/// Put the sink behind an `Arc` and hand clones to `JsonCliChannel` and
/// `JsonEventLayer`. `emit` is fully synchronous: the lock is taken and
/// released inside a single call and is never held across an `.await`.
pub struct JsonEventSink {
    /// Destination for encoded events. Boxed as a trait object so stdout and
    /// caller-supplied writers share one concrete sink type.
    writer: Mutex<Box<dyn Write + Send>>,
}
87
88impl std::fmt::Debug for JsonEventSink {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("JsonEventSink").finish_non_exhaustive()
91 }
92}
93
94impl JsonEventSink {
95 /// Create a new sink that writes to the process's stdout.
96 #[must_use]
97 pub fn new() -> Self {
98 Self {
99 writer: Mutex::new(Box::new(io::stdout())),
100 }
101 }
102
103 /// Create a sink backed by an arbitrary [`Write`] implementation.
104 ///
105 /// Intended for testing: pass a type that implements [`Write`] + [`Send`] + `'static`,
106 /// such as [`std::io::Cursor`]`<Vec<u8>>`, to capture emitted JSONL lines in memory.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// use std::io::Cursor;
112 /// use std::sync::Arc;
113 /// use zeph_core::json_event_sink::{JsonEvent, JsonEventSink};
114 ///
115 /// let sink = Arc::new(JsonEventSink::with_writer(Cursor::new(Vec::<u8>::new())));
116 /// sink.emit(&JsonEvent::Status { message: "hello" });
117 /// ```
118 #[must_use]
119 pub fn with_writer(w: impl Write + Send + 'static) -> Self {
120 Self {
121 writer: Mutex::new(Box::new(w)),
122 }
123 }
124
125 /// Serialize `event` as a JSON line and write it to the underlying writer.
126 ///
127 /// Silently drops the event when the mutex is poisoned or serialization fails.
128 /// This is intentional: a JSON output failure must not crash the agent.
129 pub fn emit(&self, event: &JsonEvent<'_>) {
130 let Ok(mut w) = self.writer.lock() else {
131 return;
132 };
133 if let Ok(line) = serde_json::to_string(event) {
134 let _ = writeln!(w, "{line}");
135 let _ = w.flush();
136 }
137 }
138}
139
140impl Default for JsonEventSink {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
#[cfg(test)]
mod tests {
    use std::io::{self, Write};
    use std::sync::{Arc, Mutex};
    use std::thread;

    use super::*;

    /// In-memory writer whose buffer stays reachable after the sink has taken
    /// ownership of one clone, so tests can inspect what was emitted.
    #[derive(Clone, Default)]
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);

    impl SharedBuf {
        /// Everything written so far, decoded as UTF-8.
        fn contents(&self) -> String {
            String::from_utf8(self.0.lock().unwrap().clone()).unwrap()
        }
    }

    impl Write for SharedBuf {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn boot_event_serializes_correctly() {
        let event = JsonEvent::Boot {
            version: "0.1.0",
            bare: true,
            auto: false,
        };
        let s = serde_json::to_string(&event).unwrap();
        assert!(s.contains("\"event\":\"boot\""));
        assert!(s.contains("\"version\":\"0.1.0\""));
        assert!(s.contains("\"bare\":true"));
        assert!(s.contains("\"auto\":false"));
    }

    #[test]
    fn response_end_serializes_without_fields() {
        let event = JsonEvent::ResponseEnd;
        let s = serde_json::to_string(&event).unwrap();
        assert_eq!(s, r#"{"event":"response_end"}"#);
    }

    #[test]
    fn emit_writes_one_newline_terminated_line() {
        let buf = SharedBuf::default();
        let sink = JsonEventSink::with_writer(buf.clone());
        sink.emit(&JsonEvent::Status { message: "hello" });
        assert_eq!(
            buf.contents(),
            "{\"event\":\"status\",\"message\":\"hello\"}\n"
        );
    }

    #[test]
    fn debug_output_is_opaque() {
        // Also exercises the stdout-backed constructor without writing to stdout.
        assert_eq!(
            format!("{:?}", JsonEventSink::default()),
            "JsonEventSink { .. }"
        );
    }

    #[test]
    fn emit_does_not_panic_on_concurrent_use() {
        const THREADS: usize = 4;
        const EVENTS_PER_THREAD: usize = 10;

        // Capture into memory instead of the real stdout: direct stdout writes
        // bypass the test harness's capture and would pollute the test output.
        let buf = SharedBuf::default();
        let sink = Arc::new(JsonEventSink::with_writer(buf.clone()));
        let handles: Vec<_> = (0..THREADS)
            .map(|i| {
                let s = Arc::clone(&sink);
                thread::spawn(move || {
                    for _ in 0..EVENTS_PER_THREAD {
                        s.emit(&JsonEvent::Status {
                            message: &format!("thread {i}"),
                        });
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }

        // Every line must be a complete JSON object: no interleaved fragments.
        let captured = buf.contents();
        assert!(captured.ends_with('\n'));
        let lines: Vec<&str> = captured.lines().collect();
        assert_eq!(lines.len(), THREADS * EVENTS_PER_THREAD);
        for line in lines {
            let value: serde_json::Value = serde_json::from_str(line).unwrap();
            assert_eq!(value["event"], "status");
            assert!(value["message"].as_str().unwrap().starts_with("thread "));
        }
    }
}
192}