// ralph_workflow/json_parser/codex/mod.rs

//! Codex CLI JSON parser.
//!
//! Parses NDJSON output from the `OpenAI` Codex CLI and formats it for display.
//!
//! # Streaming Output Behavior
//!
//! This parser implements real-time streaming output for text deltas. When content
//! arrives in multiple chunks (via `item.started` events with the `agent_message` type),
//! the parser:
//!
//! 1. **Accumulates** text deltas from each chunk into a buffer
//! 2. **Displays** the accumulated text after each chunk
//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
//!    creating an updating effect that shows the content building up in real time
//! 4. **Shows the prefix on every delta**, rewriting the entire line each time (industry standard)
//!
//! Example output sequence for streaming "Hello World" in two chunks:
//! ```text
//! [Codex] Hello\r                  (first chunk with prefix, no newline)
//! \x1b[2K\r[Codex] Hello World\r   (second chunk clears the line, rewrites the accumulated text)
//! [Codex] Hello World\n            (item.completed shows the final result with prefix)
//! ```
//!
//! # Single-Line Pattern
//!
//! The renderer uses a single-line pattern with carriage return for in-place updates.
//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
//!
//! Each delta rewrites the entire line with the prefix, ensuring that:
//! - The user always sees the prefix
//! - Content updates in place without visual artifacts
//! - Terminal state is clean and predictable

mod event_handlers;

use crate::config::Verbosity;
use crate::logger::Colors;
use std::cell::RefCell;
use std::fs::{File, OpenOptions};
use std::io::{self, BufRead, BufWriter, Write};
use std::rc::Rc;

use super::health::HealthMonitor;
use super::health::StreamingQualityMetrics;
use super::printer::SharedPrinter;
use super::streaming_state::StreamingSession;
use super::terminal::TerminalMode;
use super::types::{format_unknown_json_event, CodexEvent};

use event_handlers::{
    handle_error, handle_item_completed, handle_item_started, handle_thread_started,
    handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
};

/// Codex event parser.
///
/// Decodes each NDJSON line emitted by the Codex CLI into a [`CodexEvent`], renders it
/// through the shared printer, classifies it for the health monitor, and (when a log
/// file is configured) appends the raw line to that file.
pub struct CodexParser {
    /// Color palette used when formatting output.
    colors: Colors,
    /// Verbosity level controlling how much detail is rendered (debug mode also echoes raw lines).
    verbosity: Verbosity,
    /// Optional path of a log file that raw event lines are appended to.
    log_file: Option<String>,
    /// Label shown in output prefixes such as `[Codex]`.
    display_name: String,
    /// Unified streaming session for state tracking; also holds the accumulated
    /// agent message text used for synthetic result events.
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Delta accumulator for reasoning content (which uses special display)
    /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
    reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
    /// Counter used to generate synthetic turn IDs (`turn-0`, `turn-1`, ...).
    turn_counter: Rc<RefCell<u64>>,
    /// Terminal mode for output formatting (detected at construction, overridable in tests).
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics.
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output.
    printer: SharedPrinter,
}

75impl CodexParser {
76    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
77        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
78    }
79
80    /// Create a new `CodexParser` with a custom printer.
81    ///
82    /// # Arguments
83    ///
84    /// * `colors` - Colors for terminal output
85    /// * `verbosity` - Verbosity level for output
86    /// * `printer` - Shared printer for output
87    ///
88    /// # Returns
89    ///
90    /// A new `CodexParser` instance
91    pub(crate) fn with_printer(
92        colors: Colors,
93        verbosity: Verbosity,
94        printer: SharedPrinter,
95    ) -> Self {
96        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
97        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
98
99        // Use the printer's is_terminal method to validate it's connected correctly
100        let _printer_is_terminal = printer.borrow().is_terminal();
101
102        Self {
103            colors,
104            verbosity,
105            log_file: None,
106            display_name: "Codex".to_string(),
107            streaming_session: Rc::new(RefCell::new(streaming_session)),
108            reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
109            turn_counter: Rc::new(RefCell::new(0)),
110            terminal_mode: RefCell::new(TerminalMode::detect()),
111            show_streaming_metrics: false,
112            printer,
113        }
114    }
115
116    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
117        self.show_streaming_metrics = show;
118        self
119    }
120
121    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
122        self.display_name = display_name.to_string();
123        self
124    }
125
126    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
127        self.log_file = Some(path.to_string());
128        self
129    }
130
131    #[cfg(test)]
132    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
133        *self.terminal_mode.borrow_mut() = mode;
134        self
135    }
136
137    // ===== Test utilities (available with test-utils feature) =====
138
139    /// Create a new parser with a custom printer (for testing).
140    ///
141    /// This method is public when the `test-utils` feature is enabled,
142    /// allowing integration tests to create parsers with custom printers.
143    #[cfg(feature = "test-utils")]
144    pub fn with_printer_for_test(
145        colors: Colors,
146        verbosity: Verbosity,
147        printer: SharedPrinter,
148    ) -> Self {
149        Self::with_printer(colors, verbosity, printer)
150    }
151
152    /// Set the log file path (for testing).
153    ///
154    /// This method is public when the `test-utils` feature is enabled,
155    /// allowing integration tests to configure log file output.
156    #[cfg(feature = "test-utils")]
157    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
158        self.log_file = Some(path.to_string());
159        self
160    }
161
162    /// Parse a stream of JSON events (for testing).
163    ///
164    /// This method is public when the `test-utils` feature is enabled,
165    /// allowing integration tests to invoke parsing.
166    #[cfg(feature = "test-utils")]
167    pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
168        self.parse_stream(reader)
169    }
170
171    /// Get a shared reference to the printer.
172    ///
173    /// This allows tests, monitoring, and other code to access the printer after parsing
174    /// to verify output content, check for duplicates, or capture output for analysis.
175    ///
176    /// # Returns
177    ///
178    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
179    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
180    pub fn printer(&self) -> SharedPrinter {
181        Rc::clone(&self.printer)
182    }
183
184    /// Get streaming quality metrics from the current session.
185    ///
186    /// This provides insight into the deduplication and streaming quality of the
187    /// parsing session.
188    ///
189    /// # Returns
190    ///
191    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
192    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
193    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
194        self.streaming_session
195            .borrow()
196            .get_streaming_quality_metrics()
197    }
198
199    /// Convert output string to Option, returning None if empty.
200    #[inline]
201    fn optional_output(output: String) -> Option<String> {
202        if output.is_empty() {
203            None
204        } else {
205            Some(output)
206        }
207    }
208
209    /// Parse and display a single Codex JSON event
210    ///
211    /// Returns `Some(formatted_output)` for valid events, or None for:
212    /// - Malformed JSON (non-JSON text passed through if meaningful)
213    /// - Unknown event types
214    /// - Empty or whitespace-only output
215    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
216        let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
217            e
218        } else {
219            // Non-JSON line - pass through as-is if meaningful
220            let trimmed = line.trim();
221            if !trimmed.is_empty() && !trimmed.starts_with('{') {
222                return Some(format!("{trimmed}\n"));
223            }
224            return None;
225        };
226
227        let ctx = EventHandlerContext {
228            colors: &self.colors,
229            verbosity: self.verbosity,
230            display_name: &self.display_name,
231            streaming_session: &self.streaming_session,
232            reasoning_accumulator: &self.reasoning_accumulator,
233            terminal_mode: *self.terminal_mode.borrow(),
234            show_streaming_metrics: self.show_streaming_metrics,
235        };
236
237        match event {
238            CodexEvent::ThreadStarted { thread_id } => {
239                Self::optional_output(handle_thread_started(&ctx, thread_id))
240            }
241            CodexEvent::TurnStarted {} => {
242                // Generate and set synthetic turn ID for duplicate detection
243                let turn_id = {
244                    let mut counter = self.turn_counter.borrow_mut();
245                    let id = format!("turn-{}", *counter);
246                    *counter += 1;
247                    id
248                };
249                Self::optional_output(handle_turn_started(&ctx, turn_id))
250            }
251            CodexEvent::TurnCompleted { usage } => {
252                Self::optional_output(handle_turn_completed(&ctx, usage))
253            }
254            CodexEvent::TurnFailed { error } => {
255                Self::optional_output(handle_turn_failed(&ctx, error))
256            }
257            CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
258            CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
259            CodexEvent::Error { message, error } => {
260                Self::optional_output(handle_error(&ctx, message, error))
261            }
262            CodexEvent::Result { result } => self.format_result_event(result),
263            CodexEvent::Unknown => {
264                let output = format_unknown_json_event(
265                    line,
266                    &self.display_name,
267                    self.colors,
268                    self.verbosity.is_verbose(),
269                );
270                Self::optional_output(output)
271            }
272        }
273    }
274
275    /// Format a Result event for display.
276    ///
277    /// Result events are synthetic control events that are written to the log file
278    /// by `process_event_line`. In debug mode, this method also formats them for
279    /// console output to help with troubleshooting.
280    fn format_result_event(&self, result: Option<String>) -> Option<String> {
281        if !self.verbosity.is_debug() {
282            return None;
283        }
284        result.map(|content| {
285            let limit = self.verbosity.truncate_limit("result");
286            let preview = crate::common::truncate_text(&content, limit);
287            format!(
288                "{}[{}]{} {}Result:{} {}{}{}\n",
289                self.colors.dim(),
290                self.display_name,
291                self.colors.reset(),
292                self.colors.green(),
293                self.colors.reset(),
294                self.colors.dim(),
295                preview,
296                self.colors.reset()
297            )
298        })
299    }
300
301    /// Check if a Codex event is a control event (state management with no user output)
302    ///
303    /// Control events are valid JSON that represent state transitions rather than
304    /// user-facing content. They should be tracked separately from "ignored" events
305    /// to avoid false health warnings.
306    fn is_control_event(event: &CodexEvent) -> bool {
307        match event {
308            // Turn lifecycle events are control events
309            CodexEvent::ThreadStarted { .. }
310            | CodexEvent::TurnStarted { .. }
311            | CodexEvent::TurnCompleted { .. }
312            | CodexEvent::TurnFailed { .. }
313            | CodexEvent::Result { .. } => true,
314            // Item started/completed events are control events for certain item types
315            CodexEvent::ItemStarted { item } => {
316                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
317            }
318            CodexEvent::ItemCompleted { item } => {
319                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
320            }
321            _ => false,
322        }
323    }
324
325    /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
326    ///
327    /// Partial events represent streaming content deltas (agent messages, reasoning)
328    /// that are shown to the user in real-time. These should be tracked separately
329    /// to avoid inflating "ignored" percentages.
330    fn is_partial_event(event: &CodexEvent) -> bool {
331        match event {
332            // Item started events for agent_message and reasoning produce streaming content
333            CodexEvent::ItemStarted { item: Some(item) } => matches!(
334                item.item_type.as_deref(),
335                Some("agent_message" | "reasoning")
336            ),
337            _ => false,
338        }
339    }
340
341    /// Write a synthetic result event to the log file with accumulated content.
342    ///
343    /// This is called when a `TurnCompleted` event is encountered to ensure
344    /// that the extraction process can find the aggregated content.
345    ///
346    /// # Persistence Guarantees
347    ///
348    /// This function flushes the `BufWriter` after writing and calls `sync_all()`
349    /// to ensure data is committed to physical storage. Errors are propagated
350    /// to ensure the result event is actually persisted before continuing.
351    fn write_synthetic_result_event(
352        file: &mut impl std::io::Write,
353        accumulated: &str,
354    ) -> io::Result<()> {
355        let result_event = CodexEvent::Result {
356            result: Some(accumulated.to_string()),
357        };
358        let json = serde_json::to_string(&result_event)?;
359        writeln!(file, "{json}")?;
360        file.flush()?;
361        Ok(())
362    }
363
364    /// Process a single JSON event line during parsing.
365    ///
366    /// This helper method handles the common logic for processing parsed JSON events,
367    /// including debug output, event parsing, health monitoring, and log writing.
368    /// It's used both for events from the streaming parser and for any remaining
369    /// buffered data at the end of the stream.
370    ///
371    /// # Arguments
372    ///
373    /// * `line` - The JSON line to process
374    /// * `monitor` - The health monitor to record parsing metrics (mut needed for `record_*` methods)
375    /// * `log_writer` - Optional log file writer
376    ///
377    /// # Returns
378    ///
379    /// `Ok(true)` if the line was successfully processed, `Ok(false)` if the line
380    /// was empty or skipped, or `Err` if an IO error occurred.
381    fn process_event_line(
382        &self,
383        line: &str,
384        monitor: &HealthMonitor,
385        log_writer: &mut Option<std::io::BufWriter<std::fs::File>>,
386    ) -> io::Result<bool> {
387        let trimmed = line.trim();
388        if trimmed.is_empty() {
389            return Ok(false);
390        }
391
392        if self.verbosity.is_debug() {
393            let mut printer = self.printer.borrow_mut();
394            writeln!(
395                printer,
396                "{}[DEBUG]{} {}{}{}",
397                self.colors.dim(),
398                self.colors.reset(),
399                self.colors.dim(),
400                line,
401                self.colors.reset()
402            )?;
403            printer.flush()?;
404        }
405
406        // Parse the event once for both display/logic and synthetic result writing
407        let parsed_event = if trimmed.starts_with('{') {
408            serde_json::from_str::<CodexEvent>(trimmed).ok()
409        } else {
410            None
411        };
412
413        // Check if this is a turn.completed event using the parsed event
414        let is_turn_completed = parsed_event
415            .as_ref()
416            .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
417
418        match self.parse_event(line) {
419            Some(output) => {
420                if let Some(event) = &parsed_event {
421                    if Self::is_partial_event(event) {
422                        monitor.record_partial_event();
423                    } else {
424                        monitor.record_parsed();
425                    }
426                } else {
427                    monitor.record_parsed();
428                }
429                let mut printer = self.printer.borrow_mut();
430                write!(printer, "{output}")?;
431                printer.flush()?;
432            }
433            None => {
434                if let Some(event) = &parsed_event {
435                    if Self::is_control_event(event) {
436                        monitor.record_control_event();
437                    } else {
438                        monitor.record_unknown_event();
439                    }
440                } else {
441                    monitor.record_ignored();
442                }
443            }
444        }
445
446        if let Some(ref mut file) = log_writer {
447            writeln!(file, "{line}")?;
448            // Write synthetic result event on turn.completed to ensure content is captured
449            // This handles the normal case where the stream completes properly
450            if is_turn_completed {
451                if let Some(accumulated) = self
452                    .streaming_session
453                    .borrow()
454                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
455                {
456                    // Propagate the error to ensure the result event is written
457                    Self::write_synthetic_result_event(file, accumulated)?;
458                    // Sync to disk to ensure result event is persisted immediately
459                    file.get_mut().sync_all()?;
460                }
461            }
462        }
463
464        Ok(true)
465    }
466
467    /// Parse a stream of Codex NDJSON events
468    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
469        use super::incremental_parser::IncrementalNdjsonParser;
470
471        let monitor = HealthMonitor::new("Codex");
472        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
473            std::fs::OpenOptions::new()
474                .create(true)
475                .append(true)
476                .open(log_path)
477                .ok()
478                .map(std::io::BufWriter::new)
479        });
480
481        let mut incremental_parser = IncrementalNdjsonParser::new();
482        let mut byte_buffer = Vec::new();
483        // Track whether we've written a synthetic result event for the current turn
484        let mut result_written_for_current_turn = false;
485
486        loop {
487            byte_buffer.clear();
488            let chunk = reader.fill_buf()?;
489            if chunk.is_empty() {
490                break;
491            }
492            let consumed = chunk.len();
493            byte_buffer.extend_from_slice(chunk);
494            reader.consume(consumed);
495
496            for line in incremental_parser.feed(&byte_buffer) {
497                // Check if this is a turn.completed or turn.started event before processing
498                let is_turn_completed = line.trim().starts_with('{')
499                    && serde_json::from_str::<CodexEvent>(line.trim())
500                        .ok()
501                        .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
502                let is_turn_started = line.trim().starts_with('{')
503                    && serde_json::from_str::<CodexEvent>(line.trim())
504                        .ok()
505                        .is_some_and(|e| matches!(e, CodexEvent::TurnStarted { .. }));
506
507                self.process_event_line(&line, &monitor, &mut log_writer)?;
508
509                // Track result event writes - reset flag when new turn starts
510                if is_turn_started {
511                    result_written_for_current_turn = false;
512                } else if is_turn_completed {
513                    result_written_for_current_turn = true;
514                }
515            }
516        }
517
518        // Handle any remaining buffered data when the stream ends.
519        // Only process if it's valid JSON - incomplete buffered data should be skipped.
520        if let Some(remaining) = incremental_parser.finish() {
521            // Only process if it's valid JSON to avoid processing incomplete buffered data
522            if remaining.starts_with('{') && serde_json::from_str::<CodexEvent>(&remaining).is_ok()
523            {
524                self.process_event_line(&remaining, &monitor, &mut log_writer)?;
525            }
526        }
527
528        // Ensure accumulated content is written even if turn.completed was not received
529        // This handles the case where the stream ends unexpectedly
530        if let Some(ref mut file) = log_writer {
531            if !result_written_for_current_turn {
532                if let Some(accumulated) = self
533                    .streaming_session
534                    .borrow()
535                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
536                {
537                    // Write the synthetic result event for any accumulated content
538                    Self::write_synthetic_result_event(file, accumulated)?;
539                }
540            }
541            file.flush()?;
542        }
543        // Ensure data is written to disk before continuing
544        // This prevents race conditions where extraction runs before OS commits writes
545        if let Some(ref mut file) = log_writer {
546            // Propagate sync_all errors to ensure all data is committed to disk
547            file.get_mut().sync_all()?;
548        }
549        if let Some(warning) = monitor.check_and_warn(self.colors) {
550            let mut printer = self.printer.borrow_mut();
551            writeln!(printer, "{warning}")?;
552        }
553        Ok(())
554    }
555}