Skip to main content

ralph_workflow/json_parser/codex/
mod.rs

1//! Codex CLI JSON parser.
2//!
3//! Parses NDJSON output from `OpenAI` Codex CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `item.started` events with `agent_message` type),
9//! the parser:
10//!
11//! 1. **Accumulates** text deltas from each chunk into a buffer
12//! 2. **Displays** the accumulated text after each chunk
13//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
14//!    creating an updating effect that shows the content building up in real-time
15//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
16//!
17//! Example output sequence for streaming "Hello World" in two chunks:
18//! ```text
19//! [Codex] Hello\r          (first chunk with prefix, no newline)
20//! \x1b[2K\r[Codex] Hello World\r  (second chunk clears line, rewrites with accumulated)
21//! [Codex] Hello World\n     (item.completed shows final result with prefix)
22//! ```
23//!
24//! # Single-Line Pattern
25//!
26//! The renderer uses a single-line pattern with carriage return for in-place updates.
27//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
28//!
29//! Each delta rewrites the entire line with prefix, ensuring that:
30//! - The user always sees the prefix
31//! - Content updates in-place without visual artifacts
32//! - Terminal state is clean and predictable
33
34mod event_handlers;
35
36use crate::config::Verbosity;
37use crate::logger::Colors;
38use crate::workspace::Workspace;
39use std::cell::RefCell;
40use std::io::{self, BufRead, Write};
41use std::path::PathBuf;
42use std::rc::Rc;
43
44use super::health::HealthMonitor;
45#[cfg(feature = "test-utils")]
46use super::health::StreamingQualityMetrics;
47use super::printer::SharedPrinter;
48use super::streaming_state::StreamingSession;
49use super::terminal::TerminalMode;
50use super::types::{format_unknown_json_event, CodexEvent};
51
52use event_handlers::{
53    handle_error, handle_item_completed, handle_item_started, handle_thread_started,
54    handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
55};
56
57/// Codex event parser
58pub struct CodexParser {
59    colors: Colors,
60    verbosity: Verbosity,
61    /// Relative path to log file (if logging enabled)
62    log_path: Option<PathBuf>,
63    display_name: String,
64    /// Unified streaming session for state tracking
65    streaming_session: Rc<RefCell<StreamingSession>>,
66    /// Delta accumulator for reasoning content (which uses special display)
67    /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
68    reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
69    /// Turn counter for generating synthetic turn IDs
70    turn_counter: Rc<RefCell<u64>>,
71    /// Terminal mode for output formatting
72    terminal_mode: RefCell<TerminalMode>,
73    /// Whether to show streaming quality metrics
74    show_streaming_metrics: bool,
75    /// Output printer for capturing or displaying output
76    printer: SharedPrinter,
77}
78
79impl CodexParser {
80    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
81        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
82    }
83
84    /// Create a new `CodexParser` with a custom printer.
85    ///
86    /// # Arguments
87    ///
88    /// * `colors` - Colors for terminal output
89    /// * `verbosity` - Verbosity level for output
90    /// * `printer` - Shared printer for output
91    ///
92    /// # Returns
93    ///
94    /// A new `CodexParser` instance
95    pub(crate) fn with_printer(
96        colors: Colors,
97        verbosity: Verbosity,
98        printer: SharedPrinter,
99    ) -> Self {
100        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
101        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
102
103        // Use the printer's is_terminal method to validate it's connected correctly
104        let _printer_is_terminal = printer.borrow().is_terminal();
105
106        Self {
107            colors,
108            verbosity,
109            log_path: None,
110            display_name: "Codex".to_string(),
111            streaming_session: Rc::new(RefCell::new(streaming_session)),
112            reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
113            turn_counter: Rc::new(RefCell::new(0)),
114            terminal_mode: RefCell::new(TerminalMode::detect()),
115            show_streaming_metrics: false,
116            printer,
117        }
118    }
119
120    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
121        self.show_streaming_metrics = show;
122        self
123    }
124
125    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
126        self.display_name = display_name.to_string();
127        self
128    }
129
130    /// Configure log file path.
131    ///
132    /// The workspace is passed to `parse_stream` separately.
133    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
134        self.log_path = Some(PathBuf::from(path));
135        self
136    }
137
138    #[cfg(any(test, feature = "test-utils"))]
139    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
140        *self.terminal_mode.borrow_mut() = mode;
141        self
142    }
143
144    // ===== Test utilities (available with test-utils feature) =====
145
146    /// Create a new parser with a custom printer (for testing).
147    ///
148    /// This method is public when the `test-utils` feature is enabled,
149    /// allowing integration tests to create parsers with custom printers.
150    #[cfg(feature = "test-utils")]
151    pub fn with_printer_for_test(
152        colors: Colors,
153        verbosity: Verbosity,
154        printer: SharedPrinter,
155    ) -> Self {
156        Self::with_printer(colors, verbosity, printer)
157    }
158
159    /// Set the log file path (for testing).
160    ///
161    /// This method is public when the `test-utils` feature is enabled,
162    /// allowing integration tests to configure log file path.
163    #[cfg(feature = "test-utils")]
164    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
165        self.log_path = Some(PathBuf::from(path));
166        self
167    }
168
169    /// Parse a stream of JSON events (for testing).
170    ///
171    /// This method is public when the `test-utils` feature is enabled,
172    /// allowing integration tests to invoke parsing.
173    #[cfg(feature = "test-utils")]
174    pub fn parse_stream_for_test<R: std::io::BufRead>(
175        &self,
176        reader: R,
177        workspace: &dyn Workspace,
178    ) -> std::io::Result<()> {
179        self.parse_stream(reader, workspace)
180    }
181
182    /// Get a shared reference to the printer.
183    ///
184    /// This allows tests, monitoring, and other code to access the printer after parsing
185    /// to verify output content, check for duplicates, or capture output for analysis.
186    /// Only available with the `test-utils` feature.
187    ///
188    /// # Returns
189    ///
190    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
191    #[cfg(feature = "test-utils")]
192    pub fn printer(&self) -> SharedPrinter {
193        Rc::clone(&self.printer)
194    }
195
196    /// Get streaming quality metrics from the current session.
197    ///
198    /// This provides insight into the deduplication and streaming quality of the
199    /// parsing session. Only available with the `test-utils` feature.
200    ///
201    /// # Returns
202    ///
203    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
204    #[cfg(feature = "test-utils")]
205    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
206        self.streaming_session
207            .borrow()
208            .get_streaming_quality_metrics()
209    }
210
211    /// Convert output string to Option, returning None if empty.
212    #[inline]
213    fn optional_output(output: String) -> Option<String> {
214        if output.is_empty() {
215            None
216        } else {
217            Some(output)
218        }
219    }
220
221    /// Parse and display a single Codex JSON event
222    ///
223    /// Returns `Some(formatted_output)` for valid events, or None for:
224    /// - Malformed JSON (non-JSON text passed through if meaningful)
225    /// - Unknown event types
226    /// - Empty or whitespace-only output
227    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
228        let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
229            e
230        } else {
231            // Non-JSON line - pass through as-is if meaningful
232            let trimmed = line.trim();
233            if !trimmed.is_empty() && !trimmed.starts_with('{') {
234                return Some(format!("{trimmed}\n"));
235            }
236            return None;
237        };
238
239        let ctx = EventHandlerContext {
240            colors: &self.colors,
241            verbosity: self.verbosity,
242            display_name: &self.display_name,
243            streaming_session: &self.streaming_session,
244            reasoning_accumulator: &self.reasoning_accumulator,
245            terminal_mode: *self.terminal_mode.borrow(),
246            show_streaming_metrics: self.show_streaming_metrics,
247        };
248
249        match event {
250            CodexEvent::ThreadStarted { thread_id } => {
251                Self::optional_output(handle_thread_started(&ctx, thread_id))
252            }
253            CodexEvent::TurnStarted {} => {
254                // Generate and set synthetic turn ID for duplicate detection
255                let turn_id = {
256                    let mut counter = self.turn_counter.borrow_mut();
257                    let id = format!("turn-{}", *counter);
258                    *counter += 1;
259                    id
260                };
261                Self::optional_output(handle_turn_started(&ctx, turn_id))
262            }
263            CodexEvent::TurnCompleted { usage } => {
264                Self::optional_output(handle_turn_completed(&ctx, usage))
265            }
266            CodexEvent::TurnFailed { error } => {
267                Self::optional_output(handle_turn_failed(&ctx, error))
268            }
269            CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
270            CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
271            CodexEvent::Error { message, error } => {
272                Self::optional_output(handle_error(&ctx, message, error))
273            }
274            CodexEvent::Result { result } => self.format_result_event(result),
275            CodexEvent::Unknown => {
276                let output = format_unknown_json_event(
277                    line,
278                    &self.display_name,
279                    self.colors,
280                    self.verbosity.is_verbose(),
281                );
282                Self::optional_output(output)
283            }
284        }
285    }
286
287    /// Format a Result event for display.
288    ///
289    /// Result events are synthetic control events that are written to the log file
290    /// by `process_event_line`. In debug mode, this method also formats them for
291    /// console output to help with troubleshooting.
292    fn format_result_event(&self, result: Option<String>) -> Option<String> {
293        if !self.verbosity.is_debug() {
294            return None;
295        }
296        result.map(|content| {
297            let limit = self.verbosity.truncate_limit("result");
298            let preview = crate::common::truncate_text(&content, limit);
299            format!(
300                "{}[{}]{} {}Result:{} {}{}{}\n",
301                self.colors.dim(),
302                self.display_name,
303                self.colors.reset(),
304                self.colors.green(),
305                self.colors.reset(),
306                self.colors.dim(),
307                preview,
308                self.colors.reset()
309            )
310        })
311    }
312
313    /// Check if a Codex event is a control event (state management with no user output)
314    ///
315    /// Control events are valid JSON that represent state transitions rather than
316    /// user-facing content. They should be tracked separately from "ignored" events
317    /// to avoid false health warnings.
318    fn is_control_event(event: &CodexEvent) -> bool {
319        match event {
320            // Turn lifecycle events are control events
321            CodexEvent::ThreadStarted { .. }
322            | CodexEvent::TurnStarted { .. }
323            | CodexEvent::TurnCompleted { .. }
324            | CodexEvent::TurnFailed { .. }
325            | CodexEvent::Result { .. } => true,
326            // Item started/completed events are control events for certain item types
327            CodexEvent::ItemStarted { item } => {
328                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
329            }
330            CodexEvent::ItemCompleted { item } => {
331                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
332            }
333            _ => false,
334        }
335    }
336
337    /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
338    ///
339    /// Partial events represent streaming content deltas (agent messages, reasoning)
340    /// that are shown to the user in real-time. These should be tracked separately
341    /// to avoid inflating "ignored" percentages.
342    fn is_partial_event(event: &CodexEvent) -> bool {
343        match event {
344            // Item started events for agent_message and reasoning produce streaming content
345            CodexEvent::ItemStarted { item: Some(item) } => matches!(
346                item.item_type.as_deref(),
347                Some("agent_message" | "reasoning")
348            ),
349            _ => false,
350        }
351    }
352
353    /// Write a synthetic result event to the log file with accumulated content.
354    ///
355    /// This is called when a `TurnCompleted` event is encountered to ensure
356    /// that the extraction process can find the aggregated content.
357    ///
358    /// # Persistence Guarantees
359    ///
360    /// This function flushes the writer after writing. Errors are propagated
361    /// to ensure the result event is actually persisted before continuing.
362    fn write_synthetic_result_event(
363        file: &mut impl std::io::Write,
364        accumulated: &str,
365    ) -> io::Result<()> {
366        let result_event = CodexEvent::Result {
367            result: Some(accumulated.to_string()),
368        };
369        let json = serde_json::to_string(&result_event)?;
370        writeln!(file, "{json}")?;
371        file.flush()?;
372        Ok(())
373    }
374
375    /// Write a synthetic result event to a byte buffer.
376    fn write_synthetic_result_to_buffer(buffer: &mut Vec<u8>, accumulated: &str) -> io::Result<()> {
377        Self::write_synthetic_result_event(buffer, accumulated)
378    }
379
380    /// Process a single JSON event line during parsing.
381    ///
382    /// This helper method handles the common logic for processing parsed JSON events,
383    /// including debug output, event parsing, health monitoring, and log writing.
384    /// It's used both for events from the streaming parser and for any remaining
385    /// buffered data at the end of the stream.
386    ///
387    /// # Arguments
388    ///
389    /// * `line` - The JSON line to process
390    /// * `monitor` - The health monitor to record parsing metrics (mut needed for `record_*` methods)
391    /// * `log_writer` - Optional log file writer
392    ///
393    /// # Returns
394    ///
395    /// `Ok(true)` if the line was successfully processed, `Ok(false)` if the line
396    /// was empty or skipped, or `Err` if an IO error occurred.
397    fn process_event_line_with_buffer(
398        &self,
399        line: &str,
400        monitor: &HealthMonitor,
401        logging_enabled: bool,
402        log_buffer: &mut Vec<u8>,
403    ) -> io::Result<bool> {
404        let trimmed = line.trim();
405        if trimmed.is_empty() {
406            return Ok(false);
407        }
408
409        if self.verbosity.is_debug() {
410            let mut printer = self.printer.borrow_mut();
411            writeln!(
412                printer,
413                "{}[DEBUG]{} {}{}{}",
414                self.colors.dim(),
415                self.colors.reset(),
416                self.colors.dim(),
417                line,
418                self.colors.reset()
419            )?;
420            printer.flush()?;
421        }
422
423        // Parse the event once for both display/logic and synthetic result writing
424        let parsed_event = if trimmed.starts_with('{') {
425            serde_json::from_str::<CodexEvent>(trimmed).ok()
426        } else {
427            None
428        };
429
430        // Check if this is a turn.completed event using the parsed event
431        let is_turn_completed = parsed_event
432            .as_ref()
433            .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
434
435        match self.parse_event(line) {
436            Some(output) => {
437                if let Some(event) = &parsed_event {
438                    if Self::is_partial_event(event) {
439                        monitor.record_partial_event();
440                    } else {
441                        monitor.record_parsed();
442                    }
443                } else {
444                    monitor.record_parsed();
445                }
446                let mut printer = self.printer.borrow_mut();
447                write!(printer, "{output}")?;
448                printer.flush()?;
449            }
450            None => {
451                if let Some(event) = &parsed_event {
452                    if Self::is_control_event(event) {
453                        monitor.record_control_event();
454                    } else {
455                        monitor.record_unknown_event();
456                    }
457                } else {
458                    monitor.record_ignored();
459                }
460            }
461        }
462
463        if logging_enabled {
464            writeln!(log_buffer, "{line}")?;
465            // Write synthetic result event on turn.completed to ensure content is captured
466            // This handles the normal case where the stream completes properly
467            if is_turn_completed {
468                if let Some(accumulated) = self
469                    .streaming_session
470                    .borrow()
471                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
472                {
473                    Self::write_synthetic_result_to_buffer(log_buffer, accumulated)?;
474                }
475            }
476        }
477
478        Ok(true)
479    }
480
481    /// Parse a stream of Codex NDJSON events
482    pub(crate) fn parse_stream<R: BufRead>(
483        &self,
484        mut reader: R,
485        workspace: &dyn Workspace,
486    ) -> io::Result<()> {
487        use super::incremental_parser::IncrementalNdjsonParser;
488
489        let monitor = HealthMonitor::new("Codex");
490        // Accumulate log content in memory, write to workspace at the end
491        let logging_enabled = self.log_path.is_some();
492        let mut log_buffer: Vec<u8> = Vec::new();
493
494        let mut incremental_parser = IncrementalNdjsonParser::new();
495        let mut byte_buffer = Vec::new();
496        // Track whether we've written a synthetic result event for the current turn
497        let mut result_written_for_current_turn = false;
498
499        loop {
500            byte_buffer.clear();
501            let chunk = reader.fill_buf()?;
502            if chunk.is_empty() {
503                break;
504            }
505            let consumed = chunk.len();
506            byte_buffer.extend_from_slice(chunk);
507            reader.consume(consumed);
508
509            for line in incremental_parser.feed(&byte_buffer) {
510                // Check if this is a turn.completed or turn.started event before processing
511                let is_turn_completed = line.trim().starts_with('{')
512                    && serde_json::from_str::<CodexEvent>(line.trim())
513                        .ok()
514                        .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
515                let is_turn_started = line.trim().starts_with('{')
516                    && serde_json::from_str::<CodexEvent>(line.trim())
517                        .ok()
518                        .is_some_and(|e| matches!(e, CodexEvent::TurnStarted { .. }));
519
520                self.process_event_line_with_buffer(
521                    &line,
522                    &monitor,
523                    logging_enabled,
524                    &mut log_buffer,
525                )?;
526
527                // Track result event writes - reset flag when new turn starts
528                if is_turn_started {
529                    result_written_for_current_turn = false;
530                } else if is_turn_completed {
531                    result_written_for_current_turn = true;
532                }
533            }
534        }
535
536        // Handle any remaining buffered data when the stream ends.
537        // Only process if it's valid JSON - incomplete buffered data should be skipped.
538        if let Some(remaining) = incremental_parser.finish() {
539            // Only process if it's valid JSON to avoid processing incomplete buffered data
540            if remaining.starts_with('{') && serde_json::from_str::<CodexEvent>(&remaining).is_ok()
541            {
542                self.process_event_line_with_buffer(
543                    &remaining,
544                    &monitor,
545                    logging_enabled,
546                    &mut log_buffer,
547                )?;
548            }
549        }
550
551        // Ensure accumulated content is written even if turn.completed was not received
552        // This handles the case where the stream ends unexpectedly
553        if logging_enabled && !result_written_for_current_turn {
554            if let Some(accumulated) = self
555                .streaming_session
556                .borrow()
557                .get_accumulated(super::types::ContentType::Text, "agent_msg")
558            {
559                // Write the synthetic result event for any accumulated content
560                Self::write_synthetic_result_to_buffer(&mut log_buffer, accumulated)?;
561            }
562        }
563
564        // Write accumulated log content to workspace
565        if let Some(log_path) = &self.log_path {
566            workspace.append_bytes(log_path, &log_buffer)?;
567        }
568
569        if let Some(warning) = monitor.check_and_warn(self.colors) {
570            let mut printer = self.printer.borrow_mut();
571            writeln!(printer, "{warning}")?;
572        }
573        Ok(())
574    }
575}