ralph_workflow/json_parser/codex/mod.rs
1//! Codex CLI JSON parser.
2//!
3//! Parses NDJSON output from `OpenAI` Codex CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `item.started` events with `agent_message` type),
9//! the parser:
10//!
11//! 1. **Accumulates** text deltas from each chunk into a buffer
12//! 2. **Displays** the accumulated text after each chunk
13//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
14//! creating an updating effect that shows the content building up in real-time
15//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
16//!
17//! Example output sequence for streaming "Hello World" in two chunks:
18//! ```text
19//! [Codex] Hello\r (first chunk with prefix, no newline)
20//! \x1b[2K\r[Codex] Hello World\r (second chunk clears line, rewrites with accumulated)
21//! [Codex] Hello World\n (item.completed shows final result with prefix)
22//! ```
23//!
24//! # Single-Line Pattern
25//!
26//! The renderer uses a single-line pattern with carriage return for in-place updates.
27//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
28//!
29//! Each delta rewrites the entire line with prefix, ensuring that:
30//! - The user always sees the prefix
31//! - Content updates in-place without visual artifacts
32//! - Terminal state is clean and predictable
33
34mod event_handlers;
35
36use crate::config::Verbosity;
37use crate::logger::Colors;
38use std::cell::RefCell;
39use std::io::{self, BufRead, Write};
40use std::rc::Rc;
41
42use super::health::HealthMonitor;
43use super::health::StreamingQualityMetrics;
44use super::printer::SharedPrinter;
45use super::streaming_state::StreamingSession;
46use super::terminal::TerminalMode;
47use super::types::{format_unknown_json_event, CodexEvent};
48
49use event_handlers::{
50 handle_error, handle_item_completed, handle_item_started, handle_thread_started,
51 handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
52};
53
/// Codex event parser
///
/// Parses NDJSON events emitted by the Codex CLI and renders them through the
/// configured printer. Fields use `Rc<RefCell<..>>` because parsing methods
/// take `&self` yet still need to update streaming state (see `parse_event`).
pub struct CodexParser {
    /// Color palette used for all formatted terminal output
    colors: Colors,
    /// Verbosity level; controls debug echoing and truncation limits
    verbosity: Verbosity,
    /// Optional path of the NDJSON log file that raw events are appended to
    log_file: Option<String>,
    /// Name shown in output prefixes (defaults to "Codex")
    display_name: String,
    /// Unified streaming session for state tracking
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Delta accumulator for reasoning content (which uses special display)
    /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
    reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
    /// Turn counter for generating synthetic turn IDs
    turn_counter: Rc<RefCell<u64>>,
    /// Terminal mode for output formatting
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
74
75impl CodexParser {
    /// Create a parser that renders to the process-wide shared stdout printer.
    ///
    /// Convenience wrapper around [`Self::with_printer`].
    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
    }
79
80 /// Create a new `CodexParser` with a custom printer.
81 ///
82 /// # Arguments
83 ///
84 /// * `colors` - Colors for terminal output
85 /// * `verbosity` - Verbosity level for output
86 /// * `printer` - Shared printer for output
87 ///
88 /// # Returns
89 ///
90 /// A new `CodexParser` instance
91 pub(crate) fn with_printer(
92 colors: Colors,
93 verbosity: Verbosity,
94 printer: SharedPrinter,
95 ) -> Self {
96 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
97 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
98
99 // Use the printer's is_terminal method to validate it's connected correctly
100 let _printer_is_terminal = printer.borrow().is_terminal();
101
102 Self {
103 colors,
104 verbosity,
105 log_file: None,
106 display_name: "Codex".to_string(),
107 streaming_session: Rc::new(RefCell::new(streaming_session)),
108 reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
109 turn_counter: Rc::new(RefCell::new(0)),
110 terminal_mode: RefCell::new(TerminalMode::detect()),
111 show_streaming_metrics: false,
112 printer,
113 }
114 }
115
116 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
117 self.show_streaming_metrics = show;
118 self
119 }
120
121 pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
122 self.display_name = display_name.to_string();
123 self
124 }
125
126 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
127 self.log_file = Some(path.to_string());
128 self
129 }
130
    /// Override the auto-detected terminal mode (test builds only).
    #[cfg(test)]
    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
        *self.terminal_mode.borrow_mut() = mode;
        self
    }
136
137 // ===== Test utilities (available with test-utils feature) =====
138
    /// Create a new parser with a custom printer (for testing).
    ///
    /// This method is public when the `test-utils` feature is enabled,
    /// allowing integration tests to create parsers with custom printers.
    /// It is a thin wrapper over [`Self::with_printer`].
    #[cfg(feature = "test-utils")]
    pub fn with_printer_for_test(
        colors: Colors,
        verbosity: Verbosity,
        printer: SharedPrinter,
    ) -> Self {
        Self::with_printer(colors, verbosity, printer)
    }
151
152 /// Set the log file path (for testing).
153 ///
154 /// This method is public when the `test-utils` feature is enabled,
155 /// allowing integration tests to configure log file output.
156 #[cfg(feature = "test-utils")]
157 pub fn with_log_file_for_test(mut self, path: &str) -> Self {
158 self.log_file = Some(path.to_string());
159 self
160 }
161
    /// Parse a stream of JSON events (for testing).
    ///
    /// This method is public when the `test-utils` feature is enabled,
    /// allowing integration tests to invoke parsing.
    /// Delegates directly to the private [`Self::parse_stream`].
    #[cfg(feature = "test-utils")]
    pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
        self.parse_stream(reader)
    }
170
    /// Get a shared reference to the printer.
    ///
    /// This allows tests, monitoring, and other code to access the printer after parsing
    /// to verify output content, check for duplicates, or capture output for analysis.
    ///
    /// # Returns
    ///
    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
    // NOTE(review): the cfg_attr silences dead_code only for debug/test/monitoring
    // builds — release builds without "monitoring" will warn if unused. Confirm
    // that asymmetry is intended.
    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
    pub fn printer(&self) -> SharedPrinter {
        Rc::clone(&self.printer)
    }
183
184 /// Get streaming quality metrics from the current session.
185 ///
186 /// This provides insight into the deduplication and streaming quality of the
187 /// parsing session.
188 ///
189 /// # Returns
190 ///
191 /// A copy of the streaming quality metrics from the internal `StreamingSession`.
192 #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
193 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
194 self.streaming_session
195 .borrow()
196 .get_streaming_quality_metrics()
197 }
198
199 /// Convert output string to Option, returning None if empty.
200 #[inline]
201 fn optional_output(output: String) -> Option<String> {
202 if output.is_empty() {
203 None
204 } else {
205 Some(output)
206 }
207 }
208
    /// Parse and display a single Codex JSON event
    ///
    /// Returns `Some(formatted_output)` for valid events, or None for:
    /// - Malformed JSON (non-JSON text passed through if meaningful)
    /// - Unknown event types
    /// - Empty or whitespace-only output
    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
        let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
            e
        } else {
            // Non-JSON line - pass through as-is if meaningful
            // Lines that start with '{' but failed to parse are dropped: they
            // are most likely truncated fragments of a real JSON event.
            let trimmed = line.trim();
            if !trimmed.is_empty() && !trimmed.starts_with('{') {
                return Some(format!("{trimmed}\n"));
            }
            return None;
        };

        // Bundle parser state into a borrowed context for the event handlers.
        let ctx = EventHandlerContext {
            colors: &self.colors,
            verbosity: self.verbosity,
            display_name: &self.display_name,
            streaming_session: &self.streaming_session,
            reasoning_accumulator: &self.reasoning_accumulator,
            terminal_mode: *self.terminal_mode.borrow(),
            show_streaming_metrics: self.show_streaming_metrics,
        };

        match event {
            CodexEvent::ThreadStarted { thread_id } => {
                Self::optional_output(handle_thread_started(&ctx, thread_id))
            }
            CodexEvent::TurnStarted {} => {
                // Generate and set synthetic turn ID for duplicate detection
                // (the turn.started event itself carries no fields, hence no ID).
                let turn_id = {
                    let mut counter = self.turn_counter.borrow_mut();
                    let id = format!("turn-{}", *counter);
                    *counter += 1;
                    id
                };
                Self::optional_output(handle_turn_started(&ctx, turn_id))
            }
            CodexEvent::TurnCompleted { usage } => {
                Self::optional_output(handle_turn_completed(&ctx, usage))
            }
            CodexEvent::TurnFailed { error } => {
                Self::optional_output(handle_turn_failed(&ctx, error))
            }
            // Item handlers return Option directly; None means nothing to show.
            CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
            CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
            CodexEvent::Error { message, error } => {
                Self::optional_output(handle_error(&ctx, message, error))
            }
            // Synthetic control event written to the log; only echoed in debug mode.
            CodexEvent::Result { result } => self.format_result_event(result),
            CodexEvent::Unknown => {
                let output = format_unknown_json_event(
                    line,
                    &self.display_name,
                    self.colors,
                    self.verbosity.is_verbose(),
                );
                Self::optional_output(output)
            }
        }
    }
274
275 /// Format a Result event for display.
276 ///
277 /// Result events are synthetic control events that are written to the log file
278 /// by `process_event_line`. In debug mode, this method also formats them for
279 /// console output to help with troubleshooting.
280 fn format_result_event(&self, result: Option<String>) -> Option<String> {
281 if !self.verbosity.is_debug() {
282 return None;
283 }
284 result.map(|content| {
285 let limit = self.verbosity.truncate_limit("result");
286 let preview = crate::common::truncate_text(&content, limit);
287 format!(
288 "{}[{}]{} {}Result:{} {}{}{}\n",
289 self.colors.dim(),
290 self.display_name,
291 self.colors.reset(),
292 self.colors.green(),
293 self.colors.reset(),
294 self.colors.dim(),
295 preview,
296 self.colors.reset()
297 )
298 })
299 }
300
301 /// Check if a Codex event is a control event (state management with no user output)
302 ///
303 /// Control events are valid JSON that represent state transitions rather than
304 /// user-facing content. They should be tracked separately from "ignored" events
305 /// to avoid false health warnings.
306 fn is_control_event(event: &CodexEvent) -> bool {
307 match event {
308 // Turn lifecycle events are control events
309 CodexEvent::ThreadStarted { .. }
310 | CodexEvent::TurnStarted { .. }
311 | CodexEvent::TurnCompleted { .. }
312 | CodexEvent::TurnFailed { .. }
313 | CodexEvent::Result { .. } => true,
314 // Item started/completed events are control events for certain item types
315 CodexEvent::ItemStarted { item } => {
316 item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
317 }
318 CodexEvent::ItemCompleted { item } => {
319 item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
320 }
321 _ => false,
322 }
323 }
324
325 /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
326 ///
327 /// Partial events represent streaming content deltas (agent messages, reasoning)
328 /// that are shown to the user in real-time. These should be tracked separately
329 /// to avoid inflating "ignored" percentages.
330 fn is_partial_event(event: &CodexEvent) -> bool {
331 match event {
332 // Item started events for agent_message and reasoning produce streaming content
333 CodexEvent::ItemStarted { item: Some(item) } => matches!(
334 item.item_type.as_deref(),
335 Some("agent_message" | "reasoning")
336 ),
337 _ => false,
338 }
339 }
340
    /// Write a synthetic result event to the log file with accumulated content.
    ///
    /// This is called when a `TurnCompleted` event is encountered (and again at
    /// end-of-stream if no turn completion arrived) to ensure that the
    /// extraction process can find the aggregated content.
    ///
    /// # Persistence Guarantees
    ///
    /// This function flushes the writer after writing but does NOT call
    /// `sync_all()` itself — callers that need durability sync the underlying
    /// file afterwards (see `process_event_line` and `parse_stream`). Errors
    /// are propagated so the result event is known to be written (or not)
    /// before continuing.
    fn write_synthetic_result_event(
        file: &mut impl std::io::Write,
        accumulated: &str,
    ) -> io::Result<()> {
        let result_event = CodexEvent::Result {
            result: Some(accumulated.to_string()),
        };
        // serde_json::Error converts into io::Error via `?`.
        let json = serde_json::to_string(&result_event)?;
        writeln!(file, "{json}")?;
        file.flush()?;
        Ok(())
    }
363
    /// Process a single JSON event line during parsing.
    ///
    /// This helper method handles the common logic for processing parsed JSON events,
    /// including debug output, event parsing, health monitoring, and log writing.
    /// It's used both for events from the streaming parser and for any remaining
    /// buffered data at the end of the stream.
    ///
    /// # Arguments
    ///
    /// * `line` - The JSON line to process
    /// * `monitor` - The health monitor to record parsing metrics (taken by shared
    ///   reference; the `record_*` methods rely on interior mutability)
    /// * `log_writer` - Optional log file writer
    ///
    /// # Returns
    ///
    /// `Ok(true)` if the line was successfully processed, `Ok(false)` if the line
    /// was empty or skipped, or `Err` if an IO error occurred.
    fn process_event_line(
        &self,
        line: &str,
        monitor: &HealthMonitor,
        log_writer: &mut Option<std::io::BufWriter<std::fs::File>>,
    ) -> io::Result<bool> {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            return Ok(false);
        }

        // At debug verbosity, echo the raw line (dimmed) before any formatting.
        if self.verbosity.is_debug() {
            let mut printer = self.printer.borrow_mut();
            writeln!(
                printer,
                "{}[DEBUG]{} {}{}{}",
                self.colors.dim(),
                self.colors.reset(),
                self.colors.dim(),
                line,
                self.colors.reset()
            )?;
            printer.flush()?;
        }

        // Parse the event once for both display/logic and synthetic result writing
        // NOTE(review): `parse_event` below deserializes the line again; the two
        // parses could be unified to halve the serde work per line.
        let parsed_event = if trimmed.starts_with('{') {
            serde_json::from_str::<CodexEvent>(trimmed).ok()
        } else {
            None
        };

        // Check if this is a turn.completed event using the parsed event
        let is_turn_completed = parsed_event
            .as_ref()
            .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));

        match self.parse_event(line) {
            Some(output) => {
                // Classify for health metrics: streaming deltas are counted as
                // partial; everything else (including non-JSON passthrough,
                // where parsed_event is None) counts as parsed.
                if let Some(event) = &parsed_event {
                    if Self::is_partial_event(event) {
                        monitor.record_partial_event();
                    } else {
                        monitor.record_parsed();
                    }
                } else {
                    monitor.record_parsed();
                }
                let mut printer = self.printer.borrow_mut();
                write!(printer, "{output}")?;
                printer.flush()?;
            }
            None => {
                // No output: distinguish state-only control events from truly
                // unknown or ignored (unparseable) lines.
                if let Some(event) = &parsed_event {
                    if Self::is_control_event(event) {
                        monitor.record_control_event();
                    } else {
                        monitor.record_unknown_event();
                    }
                } else {
                    monitor.record_ignored();
                }
            }
        }

        if let Some(ref mut file) = log_writer {
            // The raw line is always logged verbatim, regardless of display outcome.
            writeln!(file, "{line}")?;
            // Write synthetic result event on turn.completed to ensure content is captured
            // This handles the normal case where the stream completes properly
            if is_turn_completed {
                if let Some(accumulated) = self
                    .streaming_session
                    .borrow()
                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
                {
                    // Propagate the error to ensure the result event is written
                    Self::write_synthetic_result_event(file, accumulated)?;
                    // Sync to disk to ensure result event is persisted immediately
                    file.get_mut().sync_all()?;
                }
            }
        }

        Ok(true)
    }
466
    /// Parse a stream of Codex NDJSON events
    ///
    /// Reads the stream in chunks, splits it into complete JSON lines via the
    /// incremental NDJSON parser, renders each event, and (if a log file is
    /// configured) appends the raw lines plus synthetic `result` events.
    /// Finishes by flushing/syncing the log and printing any health warning.
    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
        use super::incremental_parser::IncrementalNdjsonParser;

        let monitor = HealthMonitor::new("Codex");
        // Failure to open the log file is swallowed (`.ok()`): logging is
        // best-effort and must not abort parsing.
        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
            std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(log_path)
                .ok()
                .map(std::io::BufWriter::new)
        });

        let mut incremental_parser = IncrementalNdjsonParser::new();
        let mut byte_buffer = Vec::new();
        // Track whether we've written a synthetic result event for the current turn
        let mut result_written_for_current_turn = false;

        loop {
            byte_buffer.clear();
            // fill_buf/consume (rather than line-based reads) lets the
            // incremental parser stitch together lines split across chunks.
            let chunk = reader.fill_buf()?;
            if chunk.is_empty() {
                // EOF: fill_buf yields an empty slice once the source is drained.
                break;
            }
            let consumed = chunk.len();
            // Copy the chunk out so the borrow on `reader` ends before `consume`.
            byte_buffer.extend_from_slice(chunk);
            reader.consume(consumed);

            for line in incremental_parser.feed(&byte_buffer) {
                // Check if this is a turn.completed or turn.started event before processing
                // NOTE(review): these probes re-deserialize lines that
                // `process_event_line` will parse again — a single shared parse
                // would avoid the repeated serde work.
                let is_turn_completed = line.trim().starts_with('{')
                    && serde_json::from_str::<CodexEvent>(line.trim())
                        .ok()
                        .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
                let is_turn_started = line.trim().starts_with('{')
                    && serde_json::from_str::<CodexEvent>(line.trim())
                        .ok()
                        .is_some_and(|e| matches!(e, CodexEvent::TurnStarted { .. }));

                self.process_event_line(&line, &monitor, &mut log_writer)?;

                // Track result event writes - reset flag when new turn starts
                // NOTE(review): the flag is set on turn.completed even if no
                // accumulated content existed to write at that moment — confirm
                // that skipping the end-of-stream fallback is intended then.
                if is_turn_started {
                    result_written_for_current_turn = false;
                } else if is_turn_completed {
                    result_written_for_current_turn = true;
                }
            }
        }

        // Handle any remaining buffered data when the stream ends.
        // Only process if it's valid JSON - incomplete buffered data should be skipped.
        if let Some(remaining) = incremental_parser.finish() {
            // Only process if it's valid JSON to avoid processing incomplete buffered data
            if remaining.starts_with('{') && serde_json::from_str::<CodexEvent>(&remaining).is_ok()
            {
                self.process_event_line(&remaining, &monitor, &mut log_writer)?;
            }
        }

        // Ensure accumulated content is written even if turn.completed was not received
        // This handles the case where the stream ends unexpectedly
        if let Some(ref mut file) = log_writer {
            if !result_written_for_current_turn {
                if let Some(accumulated) = self
                    .streaming_session
                    .borrow()
                    .get_accumulated(super::types::ContentType::Text, "agent_msg")
                {
                    // Write the synthetic result event for any accumulated content
                    Self::write_synthetic_result_event(file, accumulated)?;
                }
            }
            file.flush()?;
        }
        // Ensure data is written to disk before continuing
        // This prevents race conditions where extraction runs before OS commits writes
        if let Some(ref mut file) = log_writer {
            // Propagate sync_all errors to ensure all data is committed to disk
            file.get_mut().sync_all()?;
        }
        if let Some(warning) = monitor.check_and_warn(self.colors) {
            let mut printer = self.printer.borrow_mut();
            writeln!(printer, "{warning}")?;
        }
        Ok(())
    }
555}