ralph_workflow/json_parser/codex/mod.rs
1//! Codex CLI JSON parser.
2//!
3//! Parses NDJSON output from `OpenAI` Codex CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `item.started` events with `agent_message` type),
9//! the parser:
10//!
11//! 1. **Accumulates** text deltas from each chunk into a buffer
12//! 2. **Displays** the accumulated text after each chunk
13//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
14//! creating an updating effect that shows the content building up in real-time
15//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
16//!
17//! Example output sequence for streaming "Hello World" in two chunks:
18//! ```text
19//! [Codex] Hello\r (first chunk with prefix, no newline)
20//! \x1b[2K\r[Codex] Hello World\r (second chunk clears line, rewrites with accumulated)
21//! [Codex] Hello World\n (item.completed shows final result with prefix)
22//! ```
23//!
24//! # Single-Line Pattern
25//!
26//! The renderer uses a single-line pattern with carriage return for in-place updates.
27//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
28//!
29//! Each delta rewrites the entire line with prefix, ensuring that:
30//! - The user always sees the prefix
31//! - Content updates in-place without visual artifacts
32//! - Terminal state is clean and predictable
33
34mod event_handlers;
35
36use crate::config::Verbosity;
37use crate::logger::Colors;
38use crate::workspace::Workspace;
39use std::cell::RefCell;
40use std::io::{self, BufRead, Write};
41use std::path::PathBuf;
42use std::rc::Rc;
43
44use super::health::HealthMonitor;
45#[cfg(feature = "test-utils")]
46use super::health::StreamingQualityMetrics;
47use super::printer::SharedPrinter;
48use super::streaming_state::StreamingSession;
49use super::terminal::TerminalMode;
50use super::types::{format_unknown_json_event, CodexEvent};
51
52use event_handlers::{
53 handle_error, handle_item_completed, handle_item_started, handle_thread_started,
54 handle_turn_completed, handle_turn_failed, handle_turn_started, EventHandlerContext,
55};
56
57/// Codex event parser
58pub struct CodexParser {
59 colors: Colors,
60 verbosity: Verbosity,
61 /// Relative path to log file (if logging enabled)
62 log_path: Option<PathBuf>,
63 display_name: String,
64 /// Unified streaming session for state tracking
65 streaming_session: Rc<RefCell<StreamingSession>>,
66 /// Delta accumulator for reasoning content (which uses special display)
67 /// Note: We keep this for reasoning only, as it uses `DeltaDisplayFormatter`
68 reasoning_accumulator: Rc<RefCell<super::types::DeltaAccumulator>>,
69 /// Turn counter for generating synthetic turn IDs
70 turn_counter: Rc<RefCell<u64>>,
71 /// Terminal mode for output formatting
72 terminal_mode: RefCell<TerminalMode>,
73 /// Whether to show streaming quality metrics
74 show_streaming_metrics: bool,
75 /// Output printer for capturing or displaying output
76 printer: SharedPrinter,
77}
78
79impl CodexParser {
80 pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
81 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
82 }
83
84 /// Create a new `CodexParser` with a custom printer.
85 ///
86 /// # Arguments
87 ///
88 /// * `colors` - Colors for terminal output
89 /// * `verbosity` - Verbosity level for output
90 /// * `printer` - Shared printer for output
91 ///
92 /// # Returns
93 ///
94 /// A new `CodexParser` instance
95 pub(crate) fn with_printer(
96 colors: Colors,
97 verbosity: Verbosity,
98 printer: SharedPrinter,
99 ) -> Self {
100 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
101 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
102
103 // Use the printer's is_terminal method to validate it's connected correctly
104 let _printer_is_terminal = printer.borrow().is_terminal();
105
106 Self {
107 colors,
108 verbosity,
109 log_path: None,
110 display_name: "Codex".to_string(),
111 streaming_session: Rc::new(RefCell::new(streaming_session)),
112 reasoning_accumulator: Rc::new(RefCell::new(super::types::DeltaAccumulator::new())),
113 turn_counter: Rc::new(RefCell::new(0)),
114 terminal_mode: RefCell::new(TerminalMode::detect()),
115 show_streaming_metrics: false,
116 printer,
117 }
118 }
119
120 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
121 self.show_streaming_metrics = show;
122 self
123 }
124
125 pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
126 self.display_name = display_name.to_string();
127 self
128 }
129
130 /// Configure log file path.
131 ///
132 /// The workspace is passed to `parse_stream` separately.
133 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
134 self.log_path = Some(PathBuf::from(path));
135 self
136 }
137
138 #[cfg(any(test, feature = "test-utils"))]
139 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
140 *self.terminal_mode.borrow_mut() = mode;
141 self
142 }
143
144 // ===== Test utilities (available with test-utils feature) =====
145
146 /// Create a new parser with a custom printer (for testing).
147 ///
148 /// This method is public when the `test-utils` feature is enabled,
149 /// allowing integration tests to create parsers with custom printers.
150 #[cfg(feature = "test-utils")]
151 pub fn with_printer_for_test(
152 colors: Colors,
153 verbosity: Verbosity,
154 printer: SharedPrinter,
155 ) -> Self {
156 Self::with_printer(colors, verbosity, printer)
157 }
158
159 /// Set the log file path (for testing).
160 ///
161 /// This method is public when the `test-utils` feature is enabled,
162 /// allowing integration tests to configure log file path.
163 #[cfg(feature = "test-utils")]
164 pub fn with_log_file_for_test(mut self, path: &str) -> Self {
165 self.log_path = Some(PathBuf::from(path));
166 self
167 }
168
169 /// Parse a stream of JSON events (for testing).
170 ///
171 /// This method is public when the `test-utils` feature is enabled,
172 /// allowing integration tests to invoke parsing.
173 #[cfg(feature = "test-utils")]
174 pub fn parse_stream_for_test<R: std::io::BufRead>(
175 &self,
176 reader: R,
177 workspace: &dyn Workspace,
178 ) -> std::io::Result<()> {
179 self.parse_stream(reader, workspace)
180 }
181
182 /// Get a shared reference to the printer.
183 ///
184 /// This allows tests, monitoring, and other code to access the printer after parsing
185 /// to verify output content, check for duplicates, or capture output for analysis.
186 /// Only available with the `test-utils` feature.
187 ///
188 /// # Returns
189 ///
190 /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
191 #[cfg(feature = "test-utils")]
192 pub fn printer(&self) -> SharedPrinter {
193 Rc::clone(&self.printer)
194 }
195
196 /// Get streaming quality metrics from the current session.
197 ///
198 /// This provides insight into the deduplication and streaming quality of the
199 /// parsing session. Only available with the `test-utils` feature.
200 ///
201 /// # Returns
202 ///
203 /// A copy of the streaming quality metrics from the internal `StreamingSession`.
204 #[cfg(feature = "test-utils")]
205 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
206 self.streaming_session
207 .borrow()
208 .get_streaming_quality_metrics()
209 }
210
211 /// Convert output string to Option, returning None if empty.
212 #[inline]
213 fn optional_output(output: String) -> Option<String> {
214 if output.is_empty() {
215 None
216 } else {
217 Some(output)
218 }
219 }
220
221 /// Parse and display a single Codex JSON event
222 ///
223 /// Returns `Some(formatted_output)` for valid events, or None for:
224 /// - Malformed JSON (non-JSON text passed through if meaningful)
225 /// - Unknown event types
226 /// - Empty or whitespace-only output
227 pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
228 let event: CodexEvent = if let Ok(e) = serde_json::from_str(line) {
229 e
230 } else {
231 // Non-JSON line - pass through as-is if meaningful
232 let trimmed = line.trim();
233 if !trimmed.is_empty() && !trimmed.starts_with('{') {
234 return Some(format!("{trimmed}\n"));
235 }
236 return None;
237 };
238
239 let ctx = EventHandlerContext {
240 colors: &self.colors,
241 verbosity: self.verbosity,
242 display_name: &self.display_name,
243 streaming_session: &self.streaming_session,
244 reasoning_accumulator: &self.reasoning_accumulator,
245 terminal_mode: *self.terminal_mode.borrow(),
246 show_streaming_metrics: self.show_streaming_metrics,
247 };
248
249 match event {
250 CodexEvent::ThreadStarted { thread_id } => {
251 Self::optional_output(handle_thread_started(&ctx, thread_id))
252 }
253 CodexEvent::TurnStarted {} => {
254 // Generate and set synthetic turn ID for duplicate detection
255 let turn_id = {
256 let mut counter = self.turn_counter.borrow_mut();
257 let id = format!("turn-{}", *counter);
258 *counter += 1;
259 id
260 };
261 Self::optional_output(handle_turn_started(&ctx, turn_id))
262 }
263 CodexEvent::TurnCompleted { usage } => {
264 Self::optional_output(handle_turn_completed(&ctx, usage))
265 }
266 CodexEvent::TurnFailed { error } => {
267 Self::optional_output(handle_turn_failed(&ctx, error))
268 }
269 CodexEvent::ItemStarted { item } => handle_item_started(&ctx, item.as_ref()),
270 CodexEvent::ItemCompleted { item } => handle_item_completed(&ctx, item.as_ref()),
271 CodexEvent::Error { message, error } => {
272 Self::optional_output(handle_error(&ctx, message, error))
273 }
274 CodexEvent::Result { result } => self.format_result_event(result),
275 CodexEvent::Unknown => {
276 let output = format_unknown_json_event(
277 line,
278 &self.display_name,
279 self.colors,
280 self.verbosity.is_verbose(),
281 );
282 Self::optional_output(output)
283 }
284 }
285 }
286
287 /// Format a Result event for display.
288 ///
289 /// Result events are synthetic control events that are written to the log file
290 /// by `process_event_line`. In debug mode, this method also formats them for
291 /// console output to help with troubleshooting.
292 fn format_result_event(&self, result: Option<String>) -> Option<String> {
293 if !self.verbosity.is_debug() {
294 return None;
295 }
296 result.map(|content| {
297 let limit = self.verbosity.truncate_limit("result");
298 let preview = crate::common::truncate_text(&content, limit);
299 format!(
300 "{}[{}]{} {}Result:{} {}{}{}\n",
301 self.colors.dim(),
302 self.display_name,
303 self.colors.reset(),
304 self.colors.green(),
305 self.colors.reset(),
306 self.colors.dim(),
307 preview,
308 self.colors.reset()
309 )
310 })
311 }
312
313 /// Check if a Codex event is a control event (state management with no user output)
314 ///
315 /// Control events are valid JSON that represent state transitions rather than
316 /// user-facing content. They should be tracked separately from "ignored" events
317 /// to avoid false health warnings.
318 fn is_control_event(event: &CodexEvent) -> bool {
319 match event {
320 // Turn lifecycle events are control events
321 CodexEvent::ThreadStarted { .. }
322 | CodexEvent::TurnStarted { .. }
323 | CodexEvent::TurnCompleted { .. }
324 | CodexEvent::TurnFailed { .. }
325 | CodexEvent::Result { .. } => true,
326 // Item started/completed events are control events for certain item types
327 CodexEvent::ItemStarted { item } => {
328 item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
329 }
330 CodexEvent::ItemCompleted { item } => {
331 item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
332 }
333 _ => false,
334 }
335 }
336
337 /// Check if a Codex event is a partial/delta event (streaming content displayed incrementally)
338 ///
339 /// Partial events represent streaming content deltas (agent messages, reasoning)
340 /// that are shown to the user in real-time. These should be tracked separately
341 /// to avoid inflating "ignored" percentages.
342 fn is_partial_event(event: &CodexEvent) -> bool {
343 match event {
344 // Item started events for agent_message and reasoning produce streaming content
345 CodexEvent::ItemStarted { item: Some(item) } => matches!(
346 item.item_type.as_deref(),
347 Some("agent_message" | "reasoning")
348 ),
349 _ => false,
350 }
351 }
352
353 /// Write a synthetic result event to the log file with accumulated content.
354 ///
355 /// This is called when a `TurnCompleted` event is encountered to ensure
356 /// that the extraction process can find the aggregated content.
357 ///
358 /// # Persistence Guarantees
359 ///
360 /// This function flushes the writer after writing. Errors are propagated
361 /// to ensure the result event is actually persisted before continuing.
362 fn write_synthetic_result_event(
363 file: &mut impl std::io::Write,
364 accumulated: &str,
365 ) -> io::Result<()> {
366 let result_event = CodexEvent::Result {
367 result: Some(accumulated.to_string()),
368 };
369 let json = serde_json::to_string(&result_event)?;
370 writeln!(file, "{json}")?;
371 file.flush()?;
372 Ok(())
373 }
374
375 /// Write a synthetic result event to a byte buffer.
376 fn write_synthetic_result_to_buffer(buffer: &mut Vec<u8>, accumulated: &str) -> io::Result<()> {
377 Self::write_synthetic_result_event(buffer, accumulated)
378 }
379
380 /// Process a single JSON event line during parsing.
381 ///
382 /// This helper method handles the common logic for processing parsed JSON events,
383 /// including debug output, event parsing, health monitoring, and log writing.
384 /// It's used both for events from the streaming parser and for any remaining
385 /// buffered data at the end of the stream.
386 ///
387 /// # Arguments
388 ///
389 /// * `line` - The JSON line to process
390 /// * `monitor` - The health monitor to record parsing metrics (mut needed for `record_*` methods)
391 /// * `log_writer` - Optional log file writer
392 ///
393 /// # Returns
394 ///
395 /// `Ok(true)` if the line was successfully processed, `Ok(false)` if the line
396 /// was empty or skipped, or `Err` if an IO error occurred.
397 fn process_event_line_with_buffer(
398 &self,
399 line: &str,
400 monitor: &HealthMonitor,
401 logging_enabled: bool,
402 log_buffer: &mut Vec<u8>,
403 ) -> io::Result<bool> {
404 let trimmed = line.trim();
405 if trimmed.is_empty() {
406 return Ok(false);
407 }
408
409 if self.verbosity.is_debug() {
410 let mut printer = self.printer.borrow_mut();
411 writeln!(
412 printer,
413 "{}[DEBUG]{} {}{}{}",
414 self.colors.dim(),
415 self.colors.reset(),
416 self.colors.dim(),
417 line,
418 self.colors.reset()
419 )?;
420 printer.flush()?;
421 }
422
423 // Parse the event once for both display/logic and synthetic result writing
424 let parsed_event = if trimmed.starts_with('{') {
425 serde_json::from_str::<CodexEvent>(trimmed).ok()
426 } else {
427 None
428 };
429
430 // Check if this is a turn.completed event using the parsed event
431 let is_turn_completed = parsed_event
432 .as_ref()
433 .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
434
435 match self.parse_event(line) {
436 Some(output) => {
437 if let Some(event) = &parsed_event {
438 if Self::is_partial_event(event) {
439 monitor.record_partial_event();
440 } else {
441 monitor.record_parsed();
442 }
443 } else {
444 monitor.record_parsed();
445 }
446 let mut printer = self.printer.borrow_mut();
447 write!(printer, "{output}")?;
448 printer.flush()?;
449 }
450 None => {
451 if let Some(event) = &parsed_event {
452 if Self::is_control_event(event) {
453 monitor.record_control_event();
454 } else {
455 monitor.record_unknown_event();
456 }
457 } else {
458 monitor.record_ignored();
459 }
460 }
461 }
462
463 if logging_enabled {
464 writeln!(log_buffer, "{line}")?;
465 // Write synthetic result event on turn.completed to ensure content is captured
466 // This handles the normal case where the stream completes properly
467 if is_turn_completed {
468 if let Some(accumulated) = self
469 .streaming_session
470 .borrow()
471 .get_accumulated(super::types::ContentType::Text, "agent_msg")
472 {
473 Self::write_synthetic_result_to_buffer(log_buffer, accumulated)?;
474 }
475 }
476 }
477
478 Ok(true)
479 }
480
481 /// Parse a stream of Codex NDJSON events
482 pub(crate) fn parse_stream<R: BufRead>(
483 &self,
484 mut reader: R,
485 workspace: &dyn Workspace,
486 ) -> io::Result<()> {
487 use super::incremental_parser::IncrementalNdjsonParser;
488
489 let monitor = HealthMonitor::new("Codex");
490 // Accumulate log content in memory, write to workspace at the end
491 let logging_enabled = self.log_path.is_some();
492 let mut log_buffer: Vec<u8> = Vec::new();
493
494 let mut incremental_parser = IncrementalNdjsonParser::new();
495 let mut byte_buffer = Vec::new();
496 // Track whether we've written a synthetic result event for the current turn
497 let mut result_written_for_current_turn = false;
498
499 loop {
500 byte_buffer.clear();
501 let chunk = reader.fill_buf()?;
502 if chunk.is_empty() {
503 break;
504 }
505 let consumed = chunk.len();
506 byte_buffer.extend_from_slice(chunk);
507 reader.consume(consumed);
508
509 for line in incremental_parser.feed(&byte_buffer) {
510 // Check if this is a turn.completed or turn.started event before processing
511 let is_turn_completed = line.trim().starts_with('{')
512 && serde_json::from_str::<CodexEvent>(line.trim())
513 .ok()
514 .is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
515 let is_turn_started = line.trim().starts_with('{')
516 && serde_json::from_str::<CodexEvent>(line.trim())
517 .ok()
518 .is_some_and(|e| matches!(e, CodexEvent::TurnStarted { .. }));
519
520 self.process_event_line_with_buffer(
521 &line,
522 &monitor,
523 logging_enabled,
524 &mut log_buffer,
525 )?;
526
527 // Track result event writes - reset flag when new turn starts
528 if is_turn_started {
529 result_written_for_current_turn = false;
530 } else if is_turn_completed {
531 result_written_for_current_turn = true;
532 }
533 }
534 }
535
536 // Handle any remaining buffered data when the stream ends.
537 // Only process if it's valid JSON - incomplete buffered data should be skipped.
538 if let Some(remaining) = incremental_parser.finish() {
539 // Only process if it's valid JSON to avoid processing incomplete buffered data
540 if remaining.starts_with('{') && serde_json::from_str::<CodexEvent>(&remaining).is_ok()
541 {
542 self.process_event_line_with_buffer(
543 &remaining,
544 &monitor,
545 logging_enabled,
546 &mut log_buffer,
547 )?;
548 }
549 }
550
551 // Ensure accumulated content is written even if turn.completed was not received
552 // This handles the case where the stream ends unexpectedly
553 if logging_enabled && !result_written_for_current_turn {
554 if let Some(accumulated) = self
555 .streaming_session
556 .borrow()
557 .get_accumulated(super::types::ContentType::Text, "agent_msg")
558 {
559 // Write the synthetic result event for any accumulated content
560 Self::write_synthetic_result_to_buffer(&mut log_buffer, accumulated)?;
561 }
562 }
563
564 // Write accumulated log content to workspace
565 if let Some(log_path) = &self.log_path {
566 workspace.append_bytes(log_path, &log_buffer)?;
567 }
568
569 if let Some(warning) = monitor.check_and_warn(self.colors) {
570 let mut printer = self.printer.borrow_mut();
571 writeln!(printer, "{warning}")?;
572 }
573 Ok(())
574 }
575}