ralph_workflow/json_parser/codex/mod.rs
//! Codex CLI JSON parser.
//!
//! Parses NDJSON output from the `OpenAI` Codex CLI and formats it for display.
//!
//! # Streaming Output Behavior
//!
//! This parser implements real-time streaming output for text deltas. When content
//! arrives in multiple chunks (via `item.started` events with `agent_message` type),
//! the parser:
//!
//! 1. **Accumulates** text deltas from each chunk into a buffer
//! 2. **Displays** the accumulated text after each chunk
//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
//!    creating an updating effect that shows the content building up in real time
//! 4. **Shows the prefix on every delta**, rewriting the entire line each time (industry standard)
//!
//! Example output sequence for streaming "Hello World" in two chunks:
//! ```text
//! [Codex] Hello\r (first chunk with prefix, no newline)
//! \x1b[2K\r[Codex] Hello World\r (second chunk clears line, rewrites with accumulated)
//! [Codex] Hello World\n (item.completed shows final result with prefix)
//! ```
//!
//! # Single-Line Pattern
//!
//! The renderer uses a single-line pattern with carriage return for in-place updates.
//! This is the industry standard for streaming CLIs (used by Rich, Ink, and Bubble Tea).
//!
//! Each delta rewrites the entire line with the prefix, ensuring that:
//! - The user always sees the prefix
//! - Content updates in place without visual artifacts
//! - Terminal state is clean and predictable
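//!
//! # Examples
//!
//! A minimal sketch of the line-rewrite technique in isolation (generic code, not
//! the parser's internals; `prefix` and `accumulated` are illustrative names):
//!
//! ```rust,ignore
//! use std::io::Write;
//!
//! fn redraw_line(out: &mut impl Write, prefix: &str, accumulated: &str) -> std::io::Result<()> {
//!     // Clear the current line, return to column 0, then rewrite prefix + content.
//!     write!(out, "\x1b[2K\r[{prefix}] {accumulated}")?;
//!     out.flush()
//! }
//! ```
//!
//! Driving the parser over an NDJSON stream (construction of `colors` and `verbosity`
//! depends on the surrounding crate and is omitted; `source` is any `Read` value):
//!
//! ```rust,ignore
//! let parser = CodexParser::new(colors, verbosity).with_display_name("Codex");
//! parser.parse_stream(std::io::BufReader::new(source))?;
//! ```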

mod event_handlers;

use crate::config::Verbosity;
use crate::logger::Colors;
use std::cell::RefCell;
use std::io::{self, BufRead, Write};
use std::rc::Rc;

use super::health::HealthMonitor;
#[cfg(feature = "test-utils")]
use super::health::StreamingQualityMetrics;
use super::printer::SharedPrinter;
use super::streaming_state::StreamingSession;
use super::terminal::TerminalMode;
use super::types::{format_unknown_json_event, CodexEvent};

use event_handlers::{
    handle_error, handle_item_completed, handle_item_started, handle_thread_started,
    handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
};

/// Codex event parser
pub struct CodexParser {
    colors: Colors,
    verbosity: Verbosity,
    log_file: Option<String>,
    display_name: String,
    /// Unified streaming session for state tracking
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Delta accumulator for reasoning content (which uses special display)
    /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
    reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
    /// Turn counter for generating synthetic turn IDs
    turn_counter: Rc<RefCell<u64>>,
    /// Terminal mode for output formatting
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}

impl CodexParser {
    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
    }

    /// Create a new `CodexParser` with a custom printer.
    ///
    /// # Arguments
    ///
    /// * `colors` - Colors for terminal output
    /// * `verbosity` - Verbosity level for output
    /// * `printer` - Shared printer for output
    ///
    /// # Returns
    ///
    /// A new `CodexParser` instance
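    ///
    /// Illustrative call (not a doctest): this is what `CodexParser::new` does
    /// internally, using the default stdout printer.
    ///
    /// ```rust,ignore
    /// let parser = CodexParser::with_printer(colors, verbosity, super::printer::shared_stdout());
    /// ```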
    pub(crate) fn with_printer(
        colors: Colors,
        verbosity: Verbosity,
        printer: SharedPrinter,
    ) -> Self {
        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);

        // Use the printer's is_terminal method to validate it's connected correctly
        let _printer_is_terminal = printer.borrow().is_terminal();

        Self {
            colors,
            verbosity,
            log_file: None,
            display_name: "Codex".to_string(),
            streaming_session: Rc::new(RefCell::new(streaming_session)),
            reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
            turn_counter: Rc::new(RefCell::new(0)),
            terminal_mode: RefCell::new(TerminalMode::detect()),
            show_streaming_metrics: false,
            printer,
        }
    }

    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
        self.show_streaming_metrics = show;
        self
    }

    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
        self.display_name = display_name.to_string();
        self
    }

    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
        self.log_file = Some(path.to_string());
        self
    }

    #[cfg(any(test, feature = "test-utils"))]
    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
        *self.terminal_mode.borrow_mut() = mode;
        self
    }

    // ===== Test utilities (available with test-utils feature) =====

    /// Create a new parser with a custom printer (for testing).
    ///
    /// This method is public when the `test-utils` feature is enabled,
    /// allowing integration tests to create parsers with custom printers.
    #[cfg(feature = "test-utils")]
    pub fn with_printer_for_test(
        colors: Colors,
        verbosity: Verbosity,
        printer: SharedPrinter,
    ) -> Self {
        Self::with_printer(colors, verbosity, printer)
    }

    /// Set the log file path (for testing).
    ///
    /// This method is public when the `test-utils` feature is enabled,
    /// allowing integration tests to configure log file output.
    #[cfg(feature = "test-utils")]
    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
        self.log_file = Some(path.to_string());
        self
    }

    /// Parse a stream of JSON events (for testing).
    ///
    /// This method is public when the `test-utils` feature is enabled,
    /// allowing integration tests to invoke parsing.
    #[cfg(feature = "test-utils")]
    pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
        self.parse_stream(reader)
    }

    /// Get a shared reference to the printer.
    ///
    /// This allows tests, monitoring, and other code to access the printer after parsing
    /// to verify output content, check for duplicates, or capture output for analysis.
    /// Only available with the `test-utils` feature.
    ///
    /// # Returns
    ///
    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
    #[cfg(feature = "test-utils")]
    pub fn printer(&self) -> SharedPrinter {
        Rc::clone(&self.printer)
    }

    /// Get streaming quality metrics from the current session.
    ///
    /// This provides insight into the deduplication and streaming quality of the
    /// parsing session. Only available with the `test-utils` feature.
    ///
    /// # Returns
    ///
    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
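    ///
    /// Illustrative test usage (not a doctest): feed captured NDJSON through
    /// `parse_stream_for_test`, then inspect the metrics. `ndjson_bytes` is an
    /// assumed in-memory capture.
    ///
    /// ```rust,ignore
    /// parser.parse_stream_for_test(std::io::Cursor::new(ndjson_bytes))?;
    /// let metrics = parser.streaming_metrics();
    /// ```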
    #[cfg(feature = "test-utils")]
    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
        self.streaming_session
            .borrow()
            .get_streaming_quality_metrics()
    }

    /// Convert output string to Option, returning None if empty.
    #[inline]
    fn optional_output(output: String) -> Option<String> {
        if output.is_empty() {
            None
        } else {
            Some(output)
        }
    }

    /// Parse and display a single Codex JSON event.
    ///
    /// Returns `Some(formatted_output)` for valid events and for meaningful
    /// non-JSON text (which is passed through as-is). Returns `None` for:
    /// - Malformed JSON (lines that start with `{` but fail to parse)
    /// - Unknown event types
    /// - Empty or whitespace-only output
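    ///
    /// Illustrative pass-through of a non-JSON line (not a doctest; assumes a
    /// constructed `parser`):
    ///
    /// ```rust,ignore
    /// assert_eq!(parser.parse_event("plain text"), Some("plain text\n".to_string()));
    /// ```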
    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
        let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
            e
        } else {
            // Non-JSON line - pass through as-is if meaningful
            let trimmed = line.trim();
            if !trimmed.is_empty() && !trimmed.starts_with('{') {
                return Some(format!("{trimmed}\n"));
            }
            return None;
        };

        let ctx = EventHandlerContext {
            colors: &self.colors,
            verbosity: self.verbosity,
            display_name: &self.display_name,
            streaming_session: &self.streaming_session,
            reasoning_accumulator: &self.reasoning_accumulator,
            terminal_mode: *self.terminal_mode.borrow(),
            show_streaming_metrics: self.show_streaming_metrics,
        };

        match event {
            CodexEvent::ThreadStarted { thread_id } => {
                Self::optional_output(handle_thread_started(&ctx, thread_id))
            }
            CodexEvent::TurnStarted {} => {
                // Generate and set synthetic turn ID for duplicate detection
                let turn_id = {
                    let mut counter = self.turn_counter.borrow_mut();
                    let id = format!("turn-{}", *counter);
                    *counter += 1;
                    id
                };
                Self::optional_output(handle_turn_started(&ctx, turn_id))
            }
            CodexEvent::TurnCompleted { usage } => {
                Self::optional_output(handle_turn_completed(&ctx, usage))
            }
            CodexEvent::TurnFailed { error } => {
                Self::optional_output(handle_turn_failed(&ctx, error))
            }
            CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
            CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
            CodexEvent::Error { message, error } => {
                Self::optional_output(handle_error(&ctx, message, error))
            }
            CodexEvent::Result { result } => self.format_result_event(result),
            CodexEvent::Unknown => {
                let output = format_unknown_json_event(
                    line,
                    &self.display_name,
                    self.colors,
                    self.verbosity.is_verbose(),
                );
                Self::optional_output(output)
            }
        }
    }

    /// Format a Result event for display.
    ///
    /// Result events are synthetic control events that are written to the log file
    /// by `process_event_line`. In debug mode, this method also formats them for
    /// console output to help with troubleshooting.
    fn format_result_event(&self, result: Option<String>) -> Option<String> {
        if !self.verbosity.is_debug() {
            return None;
        }
        result.map(|content| {
            let limit = self.verbosity.truncate_limit("result");
            let preview = crate::common::truncate_text(&content, limit);
            format!(
                "{}[{}]{} {}Result:{} {}{}{}\n",
                self.colors.dim(),
                self.display_name,
                self.colors.reset(),
                self.colors.green(),
                self.colors.reset(),
                self.colors.dim(),
                preview,
                self.colors.reset()
            )
        })
    }

    /// Check if a Codex event is a control event (state management with no user output)
    ///
    /// Control events are valid JSON that represent state transitions rather than
    /// user-facing content. They should be tracked separately from "ignored" events
    /// to avoid false health warnings.
    fn is_control_event(event: &CodexEvent) -> bool {
        match event {
            // Turn lifecycle events are control events
            CodexEvent::ThreadStarted { .. }
            | CodexEvent::TurnStarted { .. }
            | CodexEvent::TurnCompleted { .. }
            | CodexEvent::TurnFailed { .. }
            | CodexEvent::Result { .. } => true,
            // Item started/completed events are control events for certain item types
            CodexEvent::ItemStarted { item } => {
                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
            }
            CodexEvent::ItemCompleted { item } => {
                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
            }
            _ => false,
        }
    }

    /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
    ///
    /// Partial events represent streaming content deltas (agent messages, reasoning)
    /// that are shown to the user in real time. These should be tracked separately
    /// to avoid inflating "ignored" percentages.
    fn is_partial_event(event: &CodexEvent) -> bool {
        match event {
            // Item started events for agent_message and reasoning produce streaming content
            CodexEvent::ItemStarted { item: Some(item) } => matches!(
                item.item_type.as_deref(),
                Some("agent_message" | "reasoning")
            ),
            _ => false,
        }
    }

    /// Write a synthetic result event to the log file with accumulated content.
    ///
    /// This is called when a `TurnCompleted` event is encountered to ensure
    /// that the extraction process can find the aggregated content.
    ///
    /// # Persistence Guarantees
    ///
    /// This function flushes the writer after the event is written; callers then
    /// call `sync_all()` on the underlying file to commit the data to physical
    /// storage. Errors are propagated so the result event is actually persisted
    /// before parsing continues.
    fn write_synthetic_result_event(
        file: &mut impl std::io::Write,
        accumulated: &str,
    ) -> io::Result<()> {
        let result_event = CodexEvent::Result {
            result: Some(accumulated.to_string()),
        };
        let json = serde_json::to_string(&result_event)?;
        writeln!(file, "{json}")?;
        file.flush()?;
        Ok(())
    }

    /// Process a single JSON event line during parsing.
    ///
    /// This helper method handles the common logic for processing parsed JSON events,
    /// including debug output, event parsing, health monitoring, and log writing.
    /// It is used both for events from the streaming parser and for any remaining
    /// buffered data at the end of the stream.
    ///
    /// # Arguments
    ///
    /// * `line` - The JSON line to process
    /// * `monitor` - The health monitor used to record parsing metrics
    /// * `log_writer` - Optional log file writer
    ///
    /// # Returns
    ///
    /// `Ok(true)` if the line was successfully processed, `Ok(false)` if the line
    /// was empty or skipped, or `Err` if an IO error occurred.
    fn process_event_line(
        &self,
        line: &str,
        monitor: &HealthMonitor,
        log_writer: &mut Option<std::io::BufWriter<std::fs::File>>,
    ) -> io::Result<bool> {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            return Ok(false);
        }

        if self.verbosity.is_debug() {
            let mut printer = self.printer.borrow_mut();
            writeln!(
                printer,
                "{}[DEBUG]{} {}{}{}",
                self.colors.dim(),
                self.colors.reset(),
                self.colors.dim(),
                line,
                self.colors.reset()
            )?;
            printer.flush()?;
        }

        // Parse the event once for both display/logic and synthetic result writing
        let parsed_event = if trimmed.starts_with('{') {
            serde_json::from_str::<CodexEvent>(trimmed).ok()
        } else {
            None
        };

        // Check if this is a turn.completed event using the parsed event
        let is_turn_completed = parsed_event
            .as_ref()
            .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));

        match self.parse_event(line) {
            Some(output) => {
                if let Some(event) = &parsed_event {
                    if Self::is_partial_event(event) {
                        monitor.record_partial_event();
                    } else {
                        monitor.record_parsed();
                    }
                } else {
                    monitor.record_parsed();
                }
                let mut printer = self.printer.borrow_mut();
                write!(printer, "{output}")?;
                printer.flush()?;
            }
            None => {
                if let Some(event) = &parsed_event {
                    if Self::is_control_event(event) {
                        monitor.record_control_event();
                    } else {
                        monitor.record_unknown_event();
                    }
                } else {
                    monitor.record_ignored();
                }
            }
        }

        if let Some(ref mut file) = log_writer {
            writeln!(file, "{line}")?;
            // Write synthetic result event on turn.completed to ensure content is captured
            // This handles the normal case where the stream completes properly
            if is_turn_completed {
                if let Some(accumulated) = self
                    .streaming_session
                    .borrow()
                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
                {
                    // Propagate the error to ensure the result event is written
                    Self::write_synthetic_result_event(file, accumulated)?;
                    // Sync to disk to ensure result event is persisted immediately
                    file.get_mut().sync_all()?;
                }
            }
        }

        Ok(true)
    }

    /// Parse a stream of Codex NDJSON events
    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
        use super::incremental_parser::IncrementalNdjsonParser;

        let monitor = HealthMonitor::new("Codex");
        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
            std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(log_path)
                .ok()
                .map(std::io::BufWriter::new)
        });

        let mut incremental_parser = IncrementalNdjsonParser::new();
        let mut byte_buffer = Vec::new();
        // Track whether we've written a synthetic result event for the current turn
        let mut result_written_for_current_turn = false;

        loop {
            byte_buffer.clear();
            let chunk = reader.fill_buf()?;
            if chunk.is_empty() {
                break;
            }
            let consumed = chunk.len();
            byte_buffer.extend_from_slice(chunk);
            reader.consume(consumed);

            for line in incremental_parser.feed(&byte_buffer) {
                // Check if this is a turn.completed or turn.started event before processing
                let is_turn_completed = line.trim().starts_with('{')
                    && serde_json::from_str::<CodexEvent>(line.trim())
                        .ok()
                        .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
                let is_turn_started = line.trim().starts_with('{')
                    && serde_json::from_str::<CodexEvent>(line.trim())
                        .ok()
                        .is_some_and(|e| matches!(e, CodexEvent::TurnStarted { .. }));

                self.process_event_line(&line, &monitor, &mut log_writer)?;

                // Track result event writes - reset flag when new turn starts
                if is_turn_started {
                    result_written_for_current_turn = false;
                } else if is_turn_completed {
                    result_written_for_current_turn = true;
                }
            }
        }

        // Handle any remaining buffered data when the stream ends.
        // Only process if it's valid JSON - incomplete buffered data should be skipped.
        if let Some(remaining) = incremental_parser.finish() {
            // Only process if it's valid JSON to avoid processing incomplete buffered data
            if remaining.starts_with('{') && serde_json::from_str::<CodexEvent>(&remaining).is_ok()
            {
                self.process_event_line(&remaining, &monitor, &mut log_writer)?;
            }
        }

        // Ensure accumulated content is written even if turn.completed was not received
        // This handles the case where the stream ends unexpectedly
        if let Some(ref mut file) = log_writer {
            if !result_written_for_current_turn {
                if let Some(accumulated) = self
                    .streaming_session
                    .borrow()
                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
                {
                    // Write the synthetic result event for any accumulated content
                    Self::write_synthetic_result_event(file, accumulated)?;
                }
            }
            file.flush()?;
        }
        // Ensure data is written to disk before continuing
        // This prevents race conditions where extraction runs before OS commits writes
        if let Some(ref mut file) = log_writer {
            // Propagate sync_all errors to ensure all data is committed to disk
            file.get_mut().sync_all()?;
        }
        if let Some(warning) = monitor.check_and_warn(self.colors) {
            let mut printer = self.printer.borrow_mut();
            writeln!(printer, "{warning}")?;
        }
        Ok(())
    }
}
557}