Skip to main content

zeph_core/
json_event_sink.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! `JsonEventSink`: the single stdout writer for `--json` mode.
5//!
6//! All JSON events in a `--json` session are emitted through a shared
7//! `Arc<JsonEventSink>`. The internal `Mutex<Stdout>` ensures that concurrent
8//! emitters from different tasks cannot interleave partial lines.
9//!
10//! # Ordering guarantee
11//!
12//! Events emitted from the same thread preserve their issuance order. Events
13//! from concurrent threads interleave in mutex-acquisition order. Within a
14//! single response, `response_chunk` events precede `response_end`. Tool events
15//! may interleave with chunks when tools run mid-stream (normal for agent loops).
16//!
17//! # Lock discipline
18//!
19//! `emit` holds the lock only for serialization + write + flush. It never
20//! `.await`s while holding the lock, satisfying invariant §10.
21
22use std::io::{self, Write};
23use std::sync::Mutex;
24
25use serde::Serialize;
26
27#[non_exhaustive]
28/// Structured event emitted on stdout in `--json` mode.
29///
30/// All variants are serialized as JSONL with a `"event"` discriminator field.
31#[derive(Serialize)]
32#[serde(tag = "event", rename_all = "snake_case")]
33pub enum JsonEvent<'a> {
34    /// Session boot banner emitted before the first prompt.
35    Boot {
36        version: &'a str,
37        bare: bool,
38        auto: bool,
39    },
40    /// User input received from stdin.
41    Query { text: &'a str, queue_len: usize },
42    /// Streaming assistant text chunk.
43    ResponseChunk { text: &'a str },
44    /// End-of-response marker.
45    ResponseEnd,
46    /// A tool invocation is about to run.
47    ToolCall {
48        tool: &'a str,
49        args: &'a serde_json::Value,
50        id: &'a str,
51    },
52    /// A tool returned a result.
53    ToolResult {
54        tool: &'a str,
55        id: &'a str,
56        output: &'a str,
57        is_error: bool,
58    },
59    /// Token counts and estimated cost summary.
60    Cost {
61        input_tokens: u64,
62        output_tokens: u64,
63        total_usd: f64,
64    },
65    /// Loop tick notification fired each `/loop` iteration.
66    LoopTick {
67        iteration: u64,
68        total_ticks: u64,
69        prompt_preview: &'a str,
70    },
71    /// Slash command acknowledgement — distinguishes `/loop start` confirmation
72    /// from regular assistant output in JSON streams.
73    CommandAck { command: &'a str, text: &'a str },
74    /// General status message (equivalent to spinner text in interactive channels).
75    Status { message: &'a str },
76    /// Terminal error emitted before the process exits.
77    Error { message: &'a str },
78}
79
/// Sole writer of JSON events in `--json` mode.
///
/// Intended to be wrapped in an `Arc` and shared between `JsonCliChannel` and
/// `JsonEventLayer`. Emission is synchronous: the internal lock is taken and
/// released inside `emit`, so it is never held across an `.await`.
pub struct JsonEventSink {
    /// Destination for serialized events. The mutex serializes access so that
    /// concurrent emitters cannot interleave partial lines.
    writer: Mutex<Box<dyn Write + Send + 'static>>,
}
87
88impl std::fmt::Debug for JsonEventSink {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("JsonEventSink").finish_non_exhaustive()
91    }
92}
93
94impl JsonEventSink {
95    /// Create a new sink that writes to the process's stdout.
96    #[must_use]
97    pub fn new() -> Self {
98        Self {
99            writer: Mutex::new(Box::new(io::stdout())),
100        }
101    }
102
103    /// Create a sink backed by an arbitrary [`Write`] implementation.
104    ///
105    /// Intended for testing: pass a type that implements [`Write`] + [`Send`] + `'static`,
106    /// such as [`std::io::Cursor`]`<Vec<u8>>`, to capture emitted JSONL lines in memory.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use std::io::Cursor;
112    /// use std::sync::Arc;
113    /// use zeph_core::json_event_sink::{JsonEvent, JsonEventSink};
114    ///
115    /// let sink = Arc::new(JsonEventSink::with_writer(Cursor::new(Vec::<u8>::new())));
116    /// sink.emit(&JsonEvent::Status { message: "hello" });
117    /// ```
118    #[must_use]
119    pub fn with_writer(w: impl Write + Send + 'static) -> Self {
120        Self {
121            writer: Mutex::new(Box::new(w)),
122        }
123    }
124
125    /// Serialize `event` as a JSON line and write it to the underlying writer.
126    ///
127    /// Silently drops the event when the mutex is poisoned or serialization fails.
128    /// This is intentional: a JSON output failure must not crash the agent.
129    pub fn emit(&self, event: &JsonEvent<'_>) {
130        let Ok(mut w) = self.writer.lock() else {
131            return;
132        };
133        if let Ok(line) = serde_json::to_string(event) {
134            let _ = writeln!(w, "{line}");
135            let _ = w.flush();
136        }
137    }
138}
139
140impl Default for JsonEventSink {
141    fn default() -> Self {
142        Self::new()
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::thread;

    /// In-memory writer whose buffer stays readable after a clone of it has
    /// been moved into a `JsonEventSink`.
    #[derive(Clone, Default)]
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);

    impl SharedBuf {
        /// Snapshot of everything written so far, decoded as UTF-8.
        fn contents(&self) -> String {
            let bytes = self.0.lock().unwrap().clone();
            String::from_utf8(bytes).unwrap()
        }
    }

    impl Write for SharedBuf {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn boot_event_serializes_correctly() {
        let event = JsonEvent::Boot {
            version: "0.1.0",
            bare: true,
            auto: false,
        };
        let s = serde_json::to_string(&event).unwrap();
        assert!(s.contains("\"event\":\"boot\""));
        assert!(s.contains("\"version\":\"0.1.0\""));
        assert!(s.contains("\"bare\":true"));
        assert!(s.contains("\"auto\":false"));
    }

    #[test]
    fn response_end_serializes_without_fields() {
        let event = JsonEvent::ResponseEnd;
        let s = serde_json::to_string(&event).unwrap();
        assert_eq!(s, r#"{"event":"response_end"}"#);
    }

    #[test]
    fn emit_writes_one_newline_terminated_line() {
        let buf = SharedBuf::default();
        let sink = JsonEventSink::with_writer(buf.clone());
        sink.emit(&JsonEvent::ResponseEnd);
        assert_eq!(buf.contents(), "{\"event\":\"response_end\"}\n");
    }

    #[test]
    fn emit_does_not_panic_on_concurrent_use() {
        const THREADS: usize = 4;
        const EVENTS_PER_THREAD: usize = 10;

        // Capture output in memory: keeps the test log clean and lets us verify
        // that concurrent emitters never interleave partial lines.
        let buf = SharedBuf::default();
        let sink = Arc::new(JsonEventSink::with_writer(buf.clone()));
        let handles: Vec<_> = (0..THREADS)
            .map(|i| {
                let s = Arc::clone(&sink);
                thread::spawn(move || {
                    let message = format!("thread {i}");
                    for _ in 0..EVENTS_PER_THREAD {
                        s.emit(&JsonEvent::Status { message: &message });
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }

        let output = buf.contents();
        let lines: Vec<&str> = output.lines().collect();
        assert_eq!(lines.len(), THREADS * EVENTS_PER_THREAD);
        for line in lines {
            // Each line must be a complete, standalone JSON object.
            let value: serde_json::Value = serde_json::from_str(line).unwrap();
            assert_eq!(value["event"], "status");
        }
    }
}