ralph_workflow/json_parser/codex/
mod.rs

//! Codex CLI JSON parser.
//!
//! Parses NDJSON output from the `OpenAI` Codex CLI and formats it for display.
//!
//! # Streaming Output Behavior
//!
//! This parser implements real-time streaming output for text deltas. When content
//! arrives in multiple chunks (via `item.started` events with the `agent_message` type),
//! the parser:
//!
//! 1. **Accumulates** the text delta from each chunk into a buffer
//! 2. **Displays** the accumulated text after each chunk
//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
//!    creating an updating effect that shows the content building up in real time
//! 4. **Shows the prefix on every delta**, rewriting the entire line each time (industry standard)
//!
//! Example output sequence for streaming "Hello World" in two chunks:
//! ```text
//! [Codex] Hello\r          (first chunk with prefix, no newline)
//! \x1b[2K\r[Codex] Hello World\r  (second chunk clears the line, rewrites it with the accumulated text)
//! [Codex] Hello World\n     (item.completed shows the final result with prefix)
//! ```
//!
//! # Single-Line Pattern
//!
//! The renderer uses a single-line pattern with carriage return for in-place updates.
//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
//!
//! Each delta rewrites the entire line with the prefix, ensuring that:
//! - The user always sees the prefix
//! - Content updates in place without visual artifacts
//! - Terminal state is clean and predictable

34mod event_handlers;
35
36use crate::config::Verbosity;
37use crate::logger::Colors;
38use std::cell::RefCell;
39use std::io::{self, BufRead, Write};
40use std::rc::Rc;
41
42use super::health::HealthMonitor;
43use super::health::StreamingQualityMetrics;
44use super::printer::SharedPrinter;
45use super::streaming_state::StreamingSession;
46use super::terminal::TerminalMode;
47use super::types::{format_unknown_json_event, CodexEvent};
48
49use event_handlers::{
50    handle_error, handle_item_completed, handle_item_started, handle_thread_started,
51    handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
52};
53
/// Codex event parser
///
/// Converts Codex CLI NDJSON events into display text and writes it to a shared
/// printer. State that must survive across events (streaming session, reasoning
/// accumulator, turn counter) lives behind `Rc<RefCell<_>>` so event handlers can
/// borrow it while the parser itself is only borrowed immutably.
pub struct CodexParser {
    /// Colors used when formatting output
    colors: Colors,
    /// Output verbosity; also controls whether raw JSON is echoed in debug mode
    verbosity: Verbosity,
    /// Optional path of a file that receives every raw event line (append mode)
    log_file: Option<String>,
    /// Name passed to event handlers and the unknown-event formatter (defaults to "Codex")
    display_name: String,
    /// Unified streaming session for state tracking
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Delta accumulator for reasoning content (which uses special display)
    /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
    reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
    /// Turn counter for generating synthetic turn IDs ("turn-0", "turn-1", ...)
    turn_counter: Rc<RefCell<u64>>,
    /// Terminal mode for output formatting (detected at construction, overridable in tests)
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
74
75impl CodexParser {
76    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
77        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
78    }
79
80    /// Create a new `CodexParser` with a custom printer.
81    ///
82    /// # Arguments
83    ///
84    /// * `colors` - Colors for terminal output
85    /// * `verbosity` - Verbosity level for output
86    /// * `printer` - Shared printer for output
87    ///
88    /// # Returns
89    ///
90    /// A new `CodexParser` instance
91    pub(crate) fn with_printer(
92        colors: Colors,
93        verbosity: Verbosity,
94        printer: SharedPrinter,
95    ) -> Self {
96        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
97        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
98
99        // Use the printer's is_terminal method to validate it's connected correctly
100        let _printer_is_terminal = printer.borrow().is_terminal();
101
102        Self {
103            colors,
104            verbosity,
105            log_file: None,
106            display_name: "Codex".to_string(),
107            streaming_session: Rc::new(RefCell::new(streaming_session)),
108            reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
109            turn_counter: Rc::new(RefCell::new(0)),
110            terminal_mode: RefCell::new(TerminalMode::detect()),
111            show_streaming_metrics: false,
112            printer,
113        }
114    }
115
116    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
117        self.show_streaming_metrics = show;
118        self
119    }
120
121    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
122        self.display_name = display_name.to_string();
123        self
124    }
125
126    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
127        self.log_file = Some(path.to_string());
128        self
129    }
130
131    #[cfg(test)]
132    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
133        *self.terminal_mode.borrow_mut() = mode;
134        self
135    }
136
137    /// Get a shared reference to the printer.
138    ///
139    /// This allows tests, monitoring, and other code to access the printer after parsing
140    /// to verify output content, check for duplicates, or capture output for analysis.
141    ///
142    /// # Returns
143    ///
144    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
145    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
146    pub fn printer(&self) -> SharedPrinter {
147        Rc::clone(&self.printer)
148    }
149
150    /// Get streaming quality metrics from the current session.
151    ///
152    /// This provides insight into the deduplication and streaming quality of the
153    /// parsing session.
154    ///
155    /// # Returns
156    ///
157    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
158    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
159    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
160        self.streaming_session
161            .borrow()
162            .get_streaming_quality_metrics()
163    }
164
165    /// Parse and display a single Codex JSON event
166    ///
167    /// Returns `Some(formatted_output)` for valid events, or None for:
168    /// - Malformed JSON (non-JSON text passed through if meaningful)
169    /// - Unknown event types
170    /// - Empty or whitespace-only output
171    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
172        let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
173            e
174        } else {
175            // Non-JSON line - pass through as-is if meaningful
176            let trimmed = line.trim();
177            if !trimmed.is_empty() && !trimmed.starts_with('{') {
178                return Some(format!("{trimmed}\n"));
179            }
180            return None;
181        };
182
183        let ctx = EventHandlerContext {
184            colors: &self.colors,
185            verbosity: self.verbosity,
186            display_name: &self.display_name,
187            streaming_session: &self.streaming_session,
188            reasoning_accumulator: &self.reasoning_accumulator,
189            terminal_mode: *self.terminal_mode.borrow(),
190            show_streaming_metrics: self.show_streaming_metrics,
191        };
192
193        match event {
194            CodexEvent::ThreadStarted { thread_id } => {
195                let output = handle_thread_started(&ctx, thread_id);
196                if output.is_empty() {
197                    None
198                } else {
199                    Some(output)
200                }
201            }
202            CodexEvent::TurnStarted {} => {
203                // Generate and set synthetic turn ID for duplicate detection
204                let turn_id = {
205                    let mut counter = self.turn_counter.borrow_mut();
206                    let id = format!("turn-{}", *counter);
207                    *counter += 1;
208                    id
209                };
210                let output = handle_turn_started(&ctx, turn_id);
211                if output.is_empty() {
212                    None
213                } else {
214                    Some(output)
215                }
216            }
217            CodexEvent::TurnCompleted { usage } => {
218                let output = handle_turn_completed(&ctx, usage);
219                if output.is_empty() {
220                    None
221                } else {
222                    Some(output)
223                }
224            }
225            CodexEvent::TurnFailed { error } => {
226                let output = handle_turn_failed(&ctx, error);
227                if output.is_empty() {
228                    None
229                } else {
230                    Some(output)
231                }
232            }
233            CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
234            CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
235            CodexEvent::Error { message, error } => {
236                let output = handle_error(&ctx, message, error);
237                if output.is_empty() {
238                    None
239                } else {
240                    Some(output)
241                }
242            }
243            CodexEvent::Unknown => {
244                // Use the generic unknown event formatter for consistent handling
245                let output = format_unknown_json_event(
246                    line,
247                    &self.display_name,
248                    self.colors,
249                    self.verbosity.is_verbose(),
250                );
251                if output.is_empty() {
252                    None
253                } else {
254                    Some(output)
255                }
256            }
257        }
258    }
259
260    /// Check if a Codex event is a control event (state management with no user output)
261    ///
262    /// Control events are valid JSON that represent state transitions rather than
263    /// user-facing content. They should be tracked separately from "ignored" events
264    /// to avoid false health warnings.
265    fn is_control_event(event: &CodexEvent) -> bool {
266        match event {
267            // Turn lifecycle events are control events
268            CodexEvent::ThreadStarted { .. }
269            | CodexEvent::TurnStarted { .. }
270            | CodexEvent::TurnCompleted { .. }
271            | CodexEvent::TurnFailed { .. } => true,
272            // Item started/completed events are control events for certain item types
273            CodexEvent::ItemStarted { item } => {
274                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
275            }
276            CodexEvent::ItemCompleted { item } => {
277                item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
278            }
279            _ => false,
280        }
281    }
282
283    /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
284    ///
285    /// Partial events represent streaming content deltas (agent messages, reasoning)
286    /// that are shown to the user in real-time. These should be tracked separately
287    /// to avoid inflating "ignored" percentages.
288    fn is_partial_event(event: &CodexEvent) -> bool {
289        match event {
290            // Item started events for agent_message and reasoning produce streaming content
291            CodexEvent::ItemStarted { item: Some(item) } => matches!(
292                item.item_type.as_deref(),
293                Some("agent_message" | "reasoning")
294            ),
295            _ => false,
296        }
297    }
298
299    /// Parse a stream of Codex NDJSON events
300    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
301        use super::incremental_parser::IncrementalNdjsonParser;
302
303        let c = &self.colors;
304        let monitor = HealthMonitor::new("Codex");
305        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
306            std::fs::OpenOptions::new()
307                .create(true)
308                .append(true)
309                .open(log_path)
310                .ok()
311                .map(std::io::BufWriter::new)
312        });
313
314        // Use incremental parser for true real-time streaming
315        // This processes JSON as soon as it's complete, not waiting for newlines
316        let mut incremental_parser = IncrementalNdjsonParser::new();
317        let mut byte_buffer = Vec::new();
318
319        loop {
320            // Read available bytes
321            byte_buffer.clear();
322            let chunk = reader.fill_buf()?;
323            if chunk.is_empty() {
324                break;
325            }
326
327            // Process all bytes immediately
328            byte_buffer.extend_from_slice(chunk);
329            let consumed = chunk.len();
330            reader.consume(consumed);
331
332            // Feed bytes to incremental parser
333            let json_events = incremental_parser.feed(&byte_buffer);
334
335            // Process each complete JSON event immediately
336            for line in json_events {
337                let trimmed = line.trim();
338                if trimmed.is_empty() {
339                    continue;
340                }
341
342                // In debug mode, also show the raw JSON
343                if self.verbosity.is_debug() {
344                    let mut printer = self.printer.borrow_mut();
345                    writeln!(
346                        printer,
347                        "{}[DEBUG]{} {}{}{}",
348                        c.dim(),
349                        c.reset(),
350                        c.dim(),
351                        &line,
352                        c.reset()
353                    )?;
354                    printer.flush()?;
355                }
356
357                // Parse the event once - parse_event handles malformed JSON by returning None
358                match self.parse_event(&line) {
359                    Some(output) => {
360                        // Check if this is a partial/delta event (streaming content)
361                        if trimmed.starts_with('{') {
362                            if let Ok(event) = serde_json::from_str::<CodexEvent>(&line) {
363                                if Self::is_partial_event(&event) {
364                                    monitor.record_partial_event();
365                                } else {
366                                    monitor.record_parsed();
367                                }
368                            } else {
369                                monitor.record_parsed();
370                            }
371                        } else {
372                            monitor.record_parsed();
373                        }
374                        // Write output to printer
375                        let mut printer = self.printer.borrow_mut();
376                        write!(printer, "{output}")?;
377                        printer.flush()?;
378                    }
379                    None => {
380                        // Check if this was a control event (state management with no user output)
381                        if trimmed.starts_with('{') {
382                            if let Ok(event) = serde_json::from_str::<CodexEvent>(&line) {
383                                if Self::is_control_event(&event) {
384                                    monitor.record_control_event();
385                                } else {
386                                    // Valid JSON but not a control event - track as unknown
387                                    monitor.record_unknown_event();
388                                }
389                            } else {
390                                // Failed to deserialize - track as parse error
391                                monitor.record_parse_error();
392                            }
393                        } else {
394                            monitor.record_ignored();
395                        }
396                    }
397                }
398
399                // Log raw JSON to file if configured
400                if let Some(ref mut file) = log_writer {
401                    writeln!(file, "{line}")?;
402                }
403            }
404        }
405
406        if let Some(ref mut file) = log_writer {
407            file.flush()?;
408        }
409        if let Some(warning) = monitor.check_and_warn(*c) {
410            let mut printer = self.printer.borrow_mut();
411            writeln!(printer, "{warning}")?;
412        }
413        Ok(())
414    }
415}