ralph_workflow/json_parser/codex/
mod.rs

1//! Codex CLI JSON parser.
2//!
3//! Parses NDJSON output from `OpenAI` Codex CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `item.started` events with `agent_message` type),
9//! the parser:
10//!
11//! 1. **Accumulates** text deltas from each chunk into a buffer
12//! 2. **Displays** the accumulated text after each chunk
13//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
14//!    creating an updating effect that shows the content building up in real-time
15//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
16//!
17//! Example output sequence for streaming "Hello World" in two chunks:
18//! ```text
19//! [Codex] Hello\r          (first chunk with prefix, no newline)
20//! \x1b[2K\r[Codex] Hello World\r  (second chunk clears line, rewrites with accumulated)
21//! [Codex] Hello World\n     (item.completed shows final result with prefix)
22//! ```
23//!
24//! # Single-Line Pattern
25//!
26//! The renderer uses a single-line pattern with carriage return for in-place updates.
27//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
28//!
29//! Each delta rewrites the entire line with prefix, ensuring that:
30//! - The user always sees the prefix
31//! - Content updates in-place without visual artifacts
32//! - Terminal state is clean and predictable
33
34mod event_handlers;
35
36use crate::config::Verbosity;
37use crate::logger::Colors;
38use std::cell::RefCell;
39use std::io::{self, BufRead, Write};
40use std::rc::Rc;
41
42use super::health::HealthMonitor;
43#[cfg(feature = "test-utils")]
44use super::health::StreamingQualityMetrics;
45use super::printer::SharedPrinter;
46use super::streaming_state::StreamingSession;
47use super::terminal::TerminalMode;
48use super::types::{format_unknown_json_event, CodexEvent};
49
50use event_handlers::{
51    handle_error, handle_item_completed, handle_item_started, handle_thread_started,
52    handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
53};
54
/// Codex event parser.
///
/// Decodes each NDJSON line from the Codex CLI as a `CodexEvent`, renders it
/// through the `event_handlers` functions, and writes the result to `printer`.
/// When a log file is configured, every raw line is appended to it as well,
/// together with synthetic `Result` events that carry the accumulated agent
/// message text.
pub struct CodexParser {
    /// Color codes used when formatting output.
    colors: Colors,
    /// Verbosity level; debug verbosity also echoes every raw input line and
    /// displays `Result` events.
    verbosity: Verbosity,
    /// Optional path of the log file that `parse_stream` opens in append mode.
    log_file: Option<String>,
    /// Name used in output prefixes such as `[Codex]` (defaults to "Codex").
    display_name: String,
    /// Unified streaming session for state tracking. It is shared with the event
    /// handlers and queried for the accumulated `agent_msg` text when synthetic
    /// result events are written.
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Delta accumulator for reasoning content (which uses special display)
    /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
    reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
    /// Counter used to build synthetic `turn-N` IDs for `TurnStarted` events.
    turn_counter: Rc<RefCell<u64>>,
    /// Terminal mode passed to the event handlers. It is detected at
    /// construction and can be overridden in tests.
    terminal_mode: RefCell<TerminalMode>,
    /// Whether the event handlers should show streaming quality metrics
    /// (off by default).
    show_streaming_metrics: bool,
    /// Destination for all console output (stdout via `new`, or any shared
    /// printer via `with_printer`, e.g. one that captures output).
    printer: SharedPrinter,
}
75
76impl CodexParser {
77    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
78        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
79    }
80
81    /// Create a new `CodexParser` with a custom printer.
82    ///
83    /// # Arguments
84    ///
85    /// * `colors` - Colors for terminal output
86    /// * `verbosity` - Verbosity level for output
87    /// * `printer` - Shared printer for output
88    ///
89    /// # Returns
90    ///
91    /// A new `CodexParser` instance
92    pub(crate) fn with_printer(
93        colors: Colors,
94        verbosity: Verbosity,
95        printer: SharedPrinter,
96    ) -> Self {
97        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
98        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
99
100        // Use the printer's is_terminal method to validate it's connected correctly
101        let _printer_is_terminal = printer.borrow().is_terminal();
102
103        Self {
104            colors,
105            verbosity,
106            log_file: None,
107            display_name: "Codex".to_string(),
108            streaming_session: Rc::new(RefCell::new(streaming_session)),
109            reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
110            turn_counter: Rc::new(RefCell::new(0)),
111            terminal_mode: RefCell::new(TerminalMode::detect()),
112            show_streaming_metrics: false,
113            printer,
114        }
115    }
116
117    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
118        self.show_streaming_metrics = show;
119        self
120    }
121
122    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
123        self.display_name = display_name.to_string();
124        self
125    }
126
127    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
128        self.log_file = Some(path.to_string());
129        self
130    }
131
132    #[cfg(any(test, feature = "test-utils"))]
133    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
134        *self.terminal_mode.borrow_mut() = mode;
135        self
136    }
137
138    // ===== Test utilities (available with test-utils feature) =====
139
140    /// Create a new parser with a custom printer (for testing).
141    ///
142    /// This method is public when the `test-utils` feature is enabled,
143    /// allowing integration tests to create parsers with custom printers.
144    #[cfg(feature = "test-utils")]
145    pub fn with_printer_for_test(
146        colors: Colors,
147        verbosity: Verbosity,
148        printer: SharedPrinter,
149    ) -> Self {
150        Self::with_printer(colors, verbosity, printer)
151    }
152
153    /// Set the log file path (for testing).
154    ///
155    /// This method is public when the `test-utils` feature is enabled,
156    /// allowing integration tests to configure log file output.
157    #[cfg(feature = "test-utils")]
158    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
159        self.log_file = Some(path.to_string());
160        self
161    }
162
163    /// Parse a stream of JSON events (for testing).
164    ///
165    /// This method is public when the `test-utils` feature is enabled,
166    /// allowing integration tests to invoke parsing.
167    #[cfg(feature = "test-utils")]
168    pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
169        self.parse_stream(reader)
170    }
171
172    /// Get a shared reference to the printer.
173    ///
174    /// This allows tests, monitoring, and other code to access the printer after parsing
175    /// to verify output content, check for duplicates, or capture output for analysis.
176    /// Only available with the `test-utils` feature.
177    ///
178    /// # Returns
179    ///
180    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
181    #[cfg(feature = "test-utils")]
182    pub fn printer(&self) -> SharedPrinter {
183        Rc::clone(&self.printer)
184    }
185
186    /// Get streaming quality metrics from the current session.
187    ///
188    /// This provides insight into the deduplication and streaming quality of the
189    /// parsing session. Only available with the `test-utils` feature.
190    ///
191    /// # Returns
192    ///
193    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
194    #[cfg(feature = "test-utils")]
195    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
196        self.streaming_session
197            .borrow()
198            .get_streaming_quality_metrics()
199    }
200
201    /// Convert output string to Option, returning None if empty.
202    #[inline]
203    fn optional_output(output: String) -> Option<String> {
204        if output.is_empty() {
205            None
206        } else {
207            Some(output)
208        }
209    }
210
211    /// Parse and display a single Codex JSON event
212    ///
213    /// Returns `Some(formatted_output)` for valid events, or None for:
214    /// - Malformed JSON (non-JSON text passed through if meaningful)
215    /// - Unknown event types
216    /// - Empty or whitespace-only output
217    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
218        let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
219            e
220        } else {
221            // Non-JSON line - pass through as-is if meaningful
222            let trimmed = line.trim();
223            if !trimmed.is_empty() && !trimmed.starts_with('{') {
224                return Some(format!("{trimmed}\n"));
225            }
226            return None;
227        };
228
229        let ctx = EventHandlerContext {
230            colors: &self.colors,
231            verbosity: self.verbosity,
232            display_name: &self.display_name,
233            streaming_session: &self.streaming_session,
234            reasoning_accumulator: &self.reasoning_accumulator,
235            terminal_mode: *self.terminal_mode.borrow(),
236            show_streaming_metrics: self.show_streaming_metrics,
237        };
238
239        match event {
240            CodexEvent::ThreadStarted { thread_id } => {
241                Self::optional_output(handle_thread_started(&ctx, thread_id))
242            }
243            CodexEvent::TurnStarted {} => {
244                // Generate and set synthetic turn ID for duplicate detection
245                let turn_id = {
246                    let mut counter = self.turn_counter.borrow_mut();
247                    let id = format!("turn-{}", *counter);
248                    *counter += 1;
249                    id
250                };
251                Self::optional_output(handle_turn_started(&ctx, turn_id))
252            }
253            CodexEvent::TurnCompleted { usage } => {
254                Self::optional_output(handle_turn_completed(&ctx, usage))
255            }
256            CodexEvent::TurnFailed { error } => {
257                Self::optional_output(handle_turn_failed(&ctx, error))
258            }
259            CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
260            CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
261            CodexEvent::Error { message, error } => {
262                Self::optional_output(handle_error(&ctx, message, error))
263            }
264            CodexEvent::Result { result } => self.format_result_event(result),
265            CodexEvent::Unknown => {
266                let output = format_unknown_json_event(
267                    line,
268                    &self.display_name,
269                    self.colors,
270                    self.verbosity.is_verbose(),
271                );
272                Self::optional_output(output)
273            }
274        }
275    }
276
277    /// Format a Result event for display.
278    ///
279    /// Result events are synthetic control events that are written to the log file
280    /// by `process_event_line`. In debug mode, this method also formats them for
281    /// console output to help with troubleshooting.
282    fn format_result_event(&self, result: Option<String>) -> Option<String> {
283        if !self.verbosity.is_debug() {
284            return None;
285        }
286        result.map(|content| {
287            let limit = self.verbosity.truncate_limit("result");
288            let preview = crate::common::truncate_text(&content, limit);
289            format!(
290                "{}[{}]{} {}Result:{} {}{}{}\n",
291                self.colors.dim(),
292                self.display_name,
293                self.colors.reset(),
294                self.colors.green(),
295                self.colors.reset(),
296                self.colors.dim(),
297                preview,
298                self.colors.reset()
299            )
300        })
301    }
302
303    /// Check if a Codex event is a control event (state management with no user output)
304    ///
305    /// Control events are valid JSON that represent state transitions rather than
306    /// user-facing content. They should be tracked separately from "ignored" events
307    /// to avoid false health warnings.
308    fn is_control_event(event: &CodexEvent) -> bool {
309        match event {
310            // Turn lifecycle events are control events
311            CodexEvent::ThreadStarted { .. }
312            | CodexEvent::TurnStarted { .. }
313            | CodexEvent::TurnCompleted { .. }
314            | CodexEvent::TurnFailed { .. }
315            | CodexEvent::Result { .. } => true,
316            // Item started/completed events are control events for certain item types
317            CodexEvent::ItemStarted { item } => {
318                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
319            }
320            CodexEvent::ItemCompleted { item } => {
321                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
322            }
323            _ => false,
324        }
325    }
326
327    /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
328    ///
329    /// Partial events represent streaming content deltas (agent messages, reasoning)
330    /// that are shown to the user in real-time. These should be tracked separately
331    /// to avoid inflating "ignored" percentages.
332    fn is_partial_event(event: &CodexEvent) -> bool {
333        match event {
334            // Item started events for agent_message and reasoning produce streaming content
335            CodexEvent::ItemStarted { item: Some(item) } => matches!(
336                item.item_type.as_deref(),
337                Some("agent_message" | "reasoning")
338            ),
339            _ => false,
340        }
341    }
342
343    /// Write a synthetic result event to the log file with accumulated content.
344    ///
345    /// This is called when a `TurnCompleted` event is encountered to ensure
346    /// that the extraction process can find the aggregated content.
347    ///
348    /// # Persistence Guarantees
349    ///
350    /// This function flushes the `BufWriter` after writing and calls `sync_all()`
351    /// to ensure data is committed to physical storage. Errors are propagated
352    /// to ensure the result event is actually persisted before continuing.
353    fn write_synthetic_result_event(
354        file: &mut impl std::io::Write,
355        accumulated: &str,
356    ) -> io::Result<()> {
357        let result_event = CodexEvent::Result {
358            result: Some(accumulated.to_string()),
359        };
360        let json = serde_json::to_string(&result_event)?;
361        writeln!(file, "{json}")?;
362        file.flush()?;
363        Ok(())
364    }
365
366    /// Process a single JSON event line during parsing.
367    ///
368    /// This helper method handles the common logic for processing parsed JSON events,
369    /// including debug output, event parsing, health monitoring, and log writing.
370    /// It's used both for events from the streaming parser and for any remaining
371    /// buffered data at the end of the stream.
372    ///
373    /// # Arguments
374    ///
375    /// * `line` - The JSON line to process
376    /// * `monitor` - The health monitor to record parsing metrics (mut needed for `record_*` methods)
377    /// * `log_writer` - Optional log file writer
378    ///
379    /// # Returns
380    ///
381    /// `Ok(true)` if the line was successfully processed, `Ok(false)` if the line
382    /// was empty or skipped, or `Err` if an IO error occurred.
383    fn process_event_line(
384        &self,
385        line: &str,
386        monitor: &HealthMonitor,
387        log_writer: &mut Option<std::io::BufWriter<std::fs::File>>,
388    ) -> io::Result<bool> {
389        let trimmed = line.trim();
390        if trimmed.is_empty() {
391            return Ok(false);
392        }
393
394        if self.verbosity.is_debug() {
395            let mut printer = self.printer.borrow_mut();
396            writeln!(
397                printer,
398                "{}[DEBUG]{} {}{}{}",
399                self.colors.dim(),
400                self.colors.reset(),
401                self.colors.dim(),
402                line,
403                self.colors.reset()
404            )?;
405            printer.flush()?;
406        }
407
408        // Parse the event once for both display/logic and synthetic result writing
409        let parsed_event = if trimmed.starts_with('{') {
410            serde_json::from_str::<CodexEvent>(trimmed).ok()
411        } else {
412            None
413        };
414
415        // Check if this is a turn.completed event using the parsed event
416        let is_turn_completed = parsed_event
417            .as_ref()
418            .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
419
420        match self.parse_event(line) {
421            Some(output) => {
422                if let Some(event) = &parsed_event {
423                    if Self::is_partial_event(event) {
424                        monitor.record_partial_event();
425                    } else {
426                        monitor.record_parsed();
427                    }
428                } else {
429                    monitor.record_parsed();
430                }
431                let mut printer = self.printer.borrow_mut();
432                write!(printer, "{output}")?;
433                printer.flush()?;
434            }
435            None => {
436                if let Some(event) = &parsed_event {
437                    if Self::is_control_event(event) {
438                        monitor.record_control_event();
439                    } else {
440                        monitor.record_unknown_event();
441                    }
442                } else {
443                    monitor.record_ignored();
444                }
445            }
446        }
447
448        if let Some(ref mut file) = log_writer {
449            writeln!(file, "{line}")?;
450            // Write synthetic result event on turn.completed to ensure content is captured
451            // This handles the normal case where the stream completes properly
452            if is_turn_completed {
453                if let Some(accumulated) = self
454                    .streaming_session
455                    .borrow()
456                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
457                {
458                    // Propagate the error to ensure the result event is written
459                    Self::write_synthetic_result_event(file, accumulated)?;
460                    // Sync to disk to ensure result event is persisted immediately
461                    file.get_mut().sync_all()?;
462                }
463            }
464        }
465
466        Ok(true)
467    }
468
469    /// Parse a stream of Codex NDJSON events
470    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
471        use super::incremental_parser::IncrementalNdjsonParser;
472
473        let monitor = HealthMonitor::new("Codex");
474        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
475            std::fs::OpenOptions::new()
476                .create(true)
477                .append(true)
478                .open(log_path)
479                .ok()
480                .map(std::io::BufWriter::new)
481        });
482
483        let mut incremental_parser = IncrementalNdjsonParser::new();
484        let mut byte_buffer = Vec::new();
485        // Track whether we've written a synthetic result event for the current turn
486        let mut result_written_for_current_turn = false;
487
488        loop {
489            byte_buffer.clear();
490            let chunk = reader.fill_buf()?;
491            if chunk.is_empty() {
492                break;
493            }
494            let consumed = chunk.len();
495            byte_buffer.extend_from_slice(chunk);
496            reader.consume(consumed);
497
498            for line in incremental_parser.feed(&byte_buffer) {
499                // Check if this is a turn.completed or turn.started event before processing
500                let is_turn_completed = line.trim().starts_with('{')
501                    && serde_json::from_str::<CodexEvent>(line.trim())
502                        .ok()
503                        .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
504                let is_turn_started = line.trim().starts_with('{')
505                    && serde_json::from_str::<CodexEvent>(line.trim())
506                        .ok()
507                        .is_some_and(|e| matches!(e, CodexEvent::TurnStarted { .. }));
508
509                self.process_event_line(&line, &monitor, &mut log_writer)?;
510
511                // Track result event writes - reset flag when new turn starts
512                if is_turn_started {
513                    result_written_for_current_turn = false;
514                } else if is_turn_completed {
515                    result_written_for_current_turn = true;
516                }
517            }
518        }
519
520        // Handle any remaining buffered data when the stream ends.
521        // Only process if it's valid JSON - incomplete buffered data should be skipped.
522        if let Some(remaining) = incremental_parser.finish() {
523            // Only process if it's valid JSON to avoid processing incomplete buffered data
524            if remaining.starts_with('{') && serde_json::from_str::<CodexEvent>(&remaining).is_ok()
525            {
526                self.process_event_line(&remaining, &monitor, &mut log_writer)?;
527            }
528        }
529
530        // Ensure accumulated content is written even if turn.completed was not received
531        // This handles the case where the stream ends unexpectedly
532        if let Some(ref mut file) = log_writer {
533            if !result_written_for_current_turn {
534                if let Some(accumulated) = self
535                    .streaming_session
536                    .borrow()
537                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
538                {
539                    // Write the synthetic result event for any accumulated content
540                    Self::write_synthetic_result_event(file, accumulated)?;
541                }
542            }
543            file.flush()?;
544        }
545        // Ensure data is written to disk before continuing
546        // This prevents race conditions where extraction runs before OS commits writes
547        if let Some(ref mut file) = log_writer {
548            // Propagate sync_all errors to ensure all data is committed to disk
549            file.get_mut().sync_all()?;
550        }
551        if let Some(warning) = monitor.check_and_warn(self.colors) {
552            let mut printer = self.printer.borrow_mut();
553            writeln!(printer, "{warning}")?;
554        }
555        Ok(())
556    }
557}