ralph_workflow/json_parser/codex/mod.rs
1//! Codex CLI JSON parser.
2//!
3//! Parses NDJSON output from `OpenAI` Codex CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `item.started` events with `agent_message` type),
9//! the parser:
10//!
11//! 1. **Accumulates** text deltas from each chunk into a buffer
12//! 2. **Displays** the accumulated text after each chunk
13//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
14//! creating an updating effect that shows the content building up in real-time
15//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
16//!
17//! Example output sequence for streaming "Hello World" in two chunks:
18//! ```text
19//! [Codex] Hello\r (first chunk with prefix, no newline)
20//! \x1b[2K\r[Codex] Hello World\r (second chunk clears line, rewrites with accumulated)
21//! [Codex] Hello World\n (item.completed shows final result with prefix)
22//! ```
23//!
24//! # Single-Line Pattern
25//!
26//! The renderer uses a single-line pattern with carriage return for in-place updates.
27//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
28//!
29//! Each delta rewrites the entire line with prefix, ensuring that:
30//! - The user always sees the prefix
31//! - Content updates in-place without visual artifacts
32//! - Terminal state is clean and predictable
33
34mod event_handlers;
35
36use crate::config::Verbosity;
37use crate::logger::Colors;
38use std::cell::RefCell;
39use std::io::{self, BufRead, Write};
40use std::rc::Rc;
41
42use super::health::HealthMonitor;
43use super::health::StreamingQualityMetrics;
44use super::printer::SharedPrinter;
45use super::streaming_state::StreamingSession;
46use super::terminal::TerminalMode;
47use super::types::{format_unknown_json_event, CodexEvent};
48
49use event_handlers::{
50 handle_error, handle_item_completed, handle_item_started, handle_thread_started,
51 handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
52};
53
54/// Codex event parser
55pub struct CodexParser {
56 colors: Colors,
57 verbosity: Verbosity,
58 log_file: Option<String>,
59 display_name: String,
60 /// Unified streaming session for state tracking
61 streaming_session: Rc<RefCell<StreamingSession>>,
62 /// Delta accumulator for reasoning content (which uses special display)
63 /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
64 reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
65 /// Turn counter for generating synthetic turn IDs
66 turn_counter: Rc<RefCell<u64>>,
67 /// Terminal mode for output formatting
68 terminal_mode: RefCell<TerminalMode>,
69 /// Whether to show streaming quality metrics
70 show_streaming_metrics: bool,
71 /// Output printer for capturing or displaying output
72 printer: SharedPrinter,
73}
74
75impl CodexParser {
76 pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
77 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
78 }
79
80 /// Create a new `CodexParser` with a custom printer.
81 ///
82 /// # Arguments
83 ///
84 /// * `colors` - Colors for terminal output
85 /// * `verbosity` - Verbosity level for output
86 /// * `printer` - Shared printer for output
87 ///
88 /// # Returns
89 ///
90 /// A new `CodexParser` instance
91 pub(crate) fn with_printer(
92 colors: Colors,
93 verbosity: Verbosity,
94 printer: SharedPrinter,
95 ) -> Self {
96 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
97 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
98
99 // Use the printer's is_terminal method to validate it's connected correctly
100 let _printer_is_terminal = printer.borrow().is_terminal();
101
102 Self {
103 colors,
104 verbosity,
105 log_file: None,
106 display_name: "Codex".to_string(),
107 streaming_session: Rc::new(RefCell::new(streaming_session)),
108 reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
109 turn_counter: Rc::new(RefCell::new(0)),
110 terminal_mode: RefCell::new(TerminalMode::detect()),
111 show_streaming_metrics: false,
112 printer,
113 }
114 }
115
116 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
117 self.show_streaming_metrics = show;
118 self
119 }
120
121 pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
122 self.display_name = display_name.to_string();
123 self
124 }
125
126 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
127 self.log_file = Some(path.to_string());
128 self
129 }
130
/// Test-only builder: override the auto-detected terminal mode with `mode`.
#[cfg(test)]
pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
    {
        // Scope the mutable borrow so it is released before `self` is moved out.
        let mut current_mode = self.terminal_mode.borrow_mut();
        *current_mode = mode;
    }
    self
}
136
137 /// Get a shared reference to the printer.
138 ///
139 /// This allows tests, monitoring, and other code to access the printer after parsing
140 /// to verify output content, check for duplicates, or capture output for analysis.
141 ///
142 /// # Returns
143 ///
144 /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
145 #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
146 pub fn printer(&self) -> SharedPrinter {
147 Rc::clone(&self.printer)
148 }
149
150 /// Get streaming quality metrics from the current session.
151 ///
152 /// This provides insight into the deduplication and streaming quality of the
153 /// parsing session.
154 ///
155 /// # Returns
156 ///
157 /// A copy of the streaming quality metrics from the internal `StreamingSession`.
158 #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
159 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
160 self.streaming_session
161 .borrow()
162 .get_streaming_quality_metrics()
163 }
164
165 /// Parse and display a single Codex JSON event
166 ///
167 /// Returns `Some(formatted_output)` for valid events, or None for:
168 /// - Malformed JSON (non-JSON text passed through if meaningful)
169 /// - Unknown event types
170 /// - Empty or whitespace-only output
171 pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
172 let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
173 e
174 } else {
175 // Non-JSON line - pass through as-is if meaningful
176 let trimmed = line.trim();
177 if !trimmed.is_empty() && !trimmed.starts_with('{') {
178 return Some(format!("{trimmed}\n"));
179 }
180 return None;
181 };
182
183 let ctx = EventHandlerContext {
184 colors: &self.colors,
185 verbosity: self.verbosity,
186 display_name: &self.display_name,
187 streaming_session: &self.streaming_session,
188 reasoning_accumulator: &self.reasoning_accumulator,
189 terminal_mode: *self.terminal_mode.borrow(),
190 show_streaming_metrics: self.show_streaming_metrics,
191 };
192
193 match event {
194 CodexEvent::ThreadStarted { thread_id } => {
195 let output = handle_thread_started(&ctx, thread_id);
196 if output.is_empty() {
197 None
198 } else {
199 Some(output)
200 }
201 }
202 CodexEvent::TurnStarted {} => {
203 // Generate and set synthetic turn ID for duplicate detection
204 let turn_id = {
205 let mut counter = self.turn_counter.borrow_mut();
206 let id = format!("turn-{}", *counter);
207 *counter += 1;
208 id
209 };
210 let output = handle_turn_started(&ctx, turn_id);
211 if output.is_empty() {
212 None
213 } else {
214 Some(output)
215 }
216 }
217 CodexEvent::TurnCompleted { usage } => {
218 let output = handle_turn_completed(&ctx, usage);
219 if output.is_empty() {
220 None
221 } else {
222 Some(output)
223 }
224 }
225 CodexEvent::TurnFailed { error } => {
226 let output = handle_turn_failed(&ctx, error);
227 if output.is_empty() {
228 None
229 } else {
230 Some(output)
231 }
232 }
233 CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
234 CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
235 CodexEvent::Error { message, error } => {
236 let output = handle_error(&ctx, message, error);
237 if output.is_empty() {
238 None
239 } else {
240 Some(output)
241 }
242 }
243 CodexEvent::Unknown => {
244 // Use the generic unknown event formatter for consistent handling
245 let output = format_unknown_json_event(
246 line,
247 &self.display_name,
248 self.colors,
249 self.verbosity.is_verbose(),
250 );
251 if output.is_empty() {
252 None
253 } else {
254 Some(output)
255 }
256 }
257 }
258 }
259
260 /// Check if a Codex event is a control event (state management with no user output)
261 ///
262 /// Control events are valid JSON that represent state transitions rather than
263 /// user-facing content. They should be tracked separately from "ignored" events
264 /// to avoid false health warnings.
265 fn is_control_event(event: &CodexEvent) -> bool {
266 match event {
267 // Turn lifecycle events are control events
268 CodexEvent::ThreadStarted { .. }
269 | CodexEvent::TurnStarted { .. }
270 | CodexEvent::TurnCompleted { .. }
271 | CodexEvent::TurnFailed { .. } => true,
272 // Item started/completed events are control events for certain item types
273 CodexEvent::ItemStarted { item } => {
274 item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
275 }
276 CodexEvent::ItemCompleted { item } => {
277 item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
278 }
279 _ => false,
280 }
281 }
282
283 /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
284 ///
285 /// Partial events represent streaming content deltas (agent messages, reasoning)
286 /// that are shown to the user in real-time. These should be tracked separately
287 /// to avoid inflating "ignored" percentages.
288 fn is_partial_event(event: &CodexEvent) -> bool {
289 match event {
290 // Item started events for agent_message and reasoning produce streaming content
291 CodexEvent::ItemStarted { item: Some(item) } => matches!(
292 item.item_type.as_deref(),
293 Some("agent_message" | "reasoning")
294 ),
295 _ => false,
296 }
297 }
298
299 /// Parse a stream of Codex NDJSON events
300 pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
301 use super::incremental_parser::IncrementalNdjsonParser;
302
303 let c = &self.colors;
304 let monitor = HealthMonitor::new("Codex");
305 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
306 std::fs::OpenOptions::new()
307 .create(true)
308 .append(true)
309 .open(log_path)
310 .ok()
311 .map(std::io::BufWriter::new)
312 });
313
314 // Use incremental parser for true real-time streaming
315 // This processes JSON as soon as it's complete, not waiting for newlines
316 let mut incremental_parser = IncrementalNdjsonParser::new();
317 let mut byte_buffer = Vec::new();
318
319 loop {
320 // Read available bytes
321 byte_buffer.clear();
322 let chunk = reader.fill_buf()?;
323 if chunk.is_empty() {
324 break;
325 }
326
327 // Process all bytes immediately
328 byte_buffer.extend_from_slice(chunk);
329 let consumed = chunk.len();
330 reader.consume(consumed);
331
332 // Feed bytes to incremental parser
333 let json_events = incremental_parser.feed(&byte_buffer);
334
335 // Process each complete JSON event immediately
336 for line in json_events {
337 let trimmed = line.trim();
338 if trimmed.is_empty() {
339 continue;
340 }
341
342 // In debug mode, also show the raw JSON
343 if self.verbosity.is_debug() {
344 let mut printer = self.printer.borrow_mut();
345 writeln!(
346 printer,
347 "{}[DEBUG]{} {}{}{}",
348 c.dim(),
349 c.reset(),
350 c.dim(),
351 &line,
352 c.reset()
353 )?;
354 printer.flush()?;
355 }
356
357 // Parse the event once - parse_event handles malformed JSON by returning None
358 match self.parse_event(&line) {
359 Some(output) => {
360 // Check if this is a partial/delta event (streaming content)
361 if trimmed.starts_with('{') {
362 if let Ok(event) = serde_json::from_str::<CodexEvent>(&line) {
363 if Self::is_partial_event(&event) {
364 monitor.record_partial_event();
365 } else {
366 monitor.record_parsed();
367 }
368 } else {
369 monitor.record_parsed();
370 }
371 } else {
372 monitor.record_parsed();
373 }
374 // Write output to printer
375 let mut printer = self.printer.borrow_mut();
376 write!(printer, "{output}")?;
377 printer.flush()?;
378 }
379 None => {
380 // Check if this was a control event (state management with no user output)
381 if trimmed.starts_with('{') {
382 if let Ok(event) = serde_json::from_str::<CodexEvent>(&line) {
383 if Self::is_control_event(&event) {
384 monitor.record_control_event();
385 } else {
386 // Valid JSON but not a control event - track as unknown
387 monitor.record_unknown_event();
388 }
389 } else {
390 // Failed to deserialize - track as parse error
391 monitor.record_parse_error();
392 }
393 } else {
394 monitor.record_ignored();
395 }
396 }
397 }
398
399 // Log raw JSON to file if configured
400 if let Some(ref mut file) = log_writer {
401 writeln!(file, "{line}")?;
402 }
403 }
404 }
405
406 if let Some(ref mut file) = log_writer {
407 file.flush()?;
408 }
409 if let Some(warning) = monitor.check_and_warn(*c) {
410 let mut printer = self.printer.borrow_mut();
411 writeln!(printer, "{warning}")?;
412 }
413 Ok(())
414 }
415}