impl CodexParser {
fn is_control_event(event: &CodexEvent) -> bool {
match event {
CodexEvent::ThreadStarted { .. }
| CodexEvent::TurnStarted { .. }
| CodexEvent::TurnCompleted { .. }
| CodexEvent::TurnFailed { .. }
| CodexEvent::Result { .. } => true,
CodexEvent::ItemStarted { item } => {
item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
}
CodexEvent::ItemCompleted { item } => {
item.as_ref().and_then(|i| i.item_type.as_deref()) == Some("plan_update")
}
_ => false,
}
}
fn is_partial_event(event: &CodexEvent) -> bool {
match event {
CodexEvent::ItemStarted { item: Some(item) } => matches!(
item.item_type.as_deref(),
Some("agent_message" | "reasoning")
),
_ => false,
}
}
fn write_synthetic_result_event(
file: &mut impl std::io::Write,
accumulated: &str,
) -> std::io::Result<()> {
let result_event = CodexEvent::Result {
result: Some(accumulated.to_string()),
};
let json = serde_json::to_string(&result_event)?;
writeln!(file, "{json}")?;
file.flush()?;
Ok(())
}
fn write_synthetic_result_to_buffer(
buffer: &mut Vec<u8>,
accumulated: &str,
) -> std::io::Result<()> {
Self::write_synthetic_result_event(buffer, accumulated)
}
fn print_debug_line(&mut self, line: &str) -> std::io::Result<()> {
let colors = self.colors;
self.with_printer_mut(|printer| {
writeln!(printer, "{}[DEBUG]{} {}{}{}", colors.dim(), colors.reset(), colors.dim(), line, colors.reset())?;
printer.flush()
})
}
fn log_event_line(
&self,
line: &str,
is_turn_completed: bool,
log_buffer: &mut Vec<u8>,
) -> std::io::Result<()> {
writeln!(log_buffer, "{line}")?;
if is_turn_completed {
if let Some(acc) = self.state.streaming_session.borrow()
.get_accumulated(super::types::ContentType::Text, "agent_msg")
{
Self::write_synthetic_result_to_buffer(log_buffer, acc)?;
}
}
Ok(())
}
fn dispatch_parsed_event(&mut self, line: &str, parsed_event: &Option<CodexEvent>, monitor: &HealthMonitor) -> std::io::Result<()> {
let output = self.parse_event(line);
record_event_monitor_outcome(parsed_event, output.as_deref(), monitor);
print_event_output_if_present(self, output)
}
fn process_event_line_with_buffer(&mut self, line: &str, monitor: &HealthMonitor, logging_enabled: bool, log_buffer: &mut Vec<u8>) -> std::io::Result<bool> {
let trimmed = line.trim();
if trimmed.is_empty() { return Ok(false); }
if self.verbosity.is_debug() { self.print_debug_line(line)?; }
let parsed_event = parse_codex_event_if_json(trimmed);
let is_turn_completed = parsed_event.as_ref().is_some_and(|e| matches!(e, CodexEvent::TurnCompleted { .. }));
self.dispatch_parsed_event(line, &parsed_event, monitor)?;
if logging_enabled { self.log_event_line(line, is_turn_completed, log_buffer)?; }
Ok(true)
}
pub(crate) fn parse_stream<R: BufRead>(
&mut self,
mut reader: R,
workspace: &dyn Workspace,
) -> std::io::Result<()> {
use crate::json_parser::incremental_parser::IncrementalNdjsonParser;
let monitor = HealthMonitor::new("Codex");
let logging_enabled = self.log_path.is_some();
let mut log_buffer: Vec<u8> = Vec::new();
let mut incremental_parser = IncrementalNdjsonParser::new();
let result_written = std::cell::Cell::new(false);
self.run_codex_stream_loop(
&mut reader,
&mut incremental_parser,
&monitor,
&mut log_buffer,
logging_enabled,
&result_written,
)?;
self.finalize_codex_stream(
workspace, &monitor, &mut log_buffer, logging_enabled, incremental_parser, &result_written,
)
}
fn run_codex_stream_loop<R: BufRead>(
&mut self, reader: &mut R,
incremental_parser: &mut crate::json_parser::incremental_parser::IncrementalNdjsonParser,
monitor: &HealthMonitor, log_buffer: &mut Vec<u8>,
logging_enabled: bool, result_written: &std::cell::Cell<bool>,
) -> std::io::Result<()> {
loop {
let chunk = reader.fill_buf()?;
if chunk.is_empty() { break; }
let data = chunk.to_vec(); reader.consume(data.len());
let (new_parser, batch) = std::mem::take(incremental_parser).feed_and_get_events(&data);
*incremental_parser = new_parser;
batch.into_iter().try_for_each(|line| { self.process_stream_line_with_turn_tracking(&line, monitor, logging_enabled, log_buffer, result_written) })?;
}
Ok(())
}
fn process_stream_line_with_turn_tracking(
&mut self,
line: &str,
monitor: &HealthMonitor,
logging_enabled: bool,
log_buffer: &mut Vec<u8>,
result_written: &std::cell::Cell<bool>,
) -> std::io::Result<()> {
let trimmed = line.trim();
let is_turn_completed = is_codex_turn_event::<true>(trimmed);
let is_turn_started = is_codex_turn_event::<false>(trimmed);
self.process_event_line_with_buffer(line, monitor, logging_enabled, log_buffer)?;
update_turn_tracking(is_turn_started, is_turn_completed, result_written);
Ok(())
}
fn process_remaining_input(&mut self, remaining: &str, monitor: &HealthMonitor, logging_enabled: bool, log_buffer: &mut Vec<u8>) -> std::io::Result<()> {
if remaining.starts_with('{') && serde_json::from_str::<CodexEvent>(remaining).is_ok() {
self.process_event_line_with_buffer(remaining, monitor, logging_enabled, log_buffer)?;
}
Ok(())
}
fn flush_unwritten_result(&self, log_buffer: &mut Vec<u8>, result_written: &std::cell::Cell<bool>) -> std::io::Result<()> {
if result_written.get() { return Ok(()); }
if let Some(acc) = self.state.streaming_session.borrow().get_accumulated(super::types::ContentType::Text, "agent_msg") {
Self::write_synthetic_result_to_buffer(log_buffer, acc)?;
}
Ok(())
}
fn print_monitor_warning(&mut self, monitor: &HealthMonitor) {
if let Some(warning) = monitor.check_and_warn(self.colors) {
self.with_printer_mut(|printer| { writeln!(printer, "{warning}").ok(); });
}
}
fn finalize_codex_stream(
&mut self, workspace: &dyn Workspace, monitor: &HealthMonitor,
log_buffer: &mut Vec<u8>, logging_enabled: bool,
incremental_parser: crate::json_parser::incremental_parser::IncrementalNdjsonParser,
result_written: &std::cell::Cell<bool>,
) -> std::io::Result<()> {
if let Some(remaining) = incremental_parser.finish() { self.process_remaining_input(&remaining, monitor, logging_enabled, log_buffer)?; }
self.tool_activity_tracker.reset(); if logging_enabled { self.flush_unwritten_result(log_buffer, result_written)?; }
if let Some(log_path) = &self.log_path { workspace.append_bytes(log_path, log_buffer)?; }
self.print_monitor_warning(monitor);
Ok(())
}
}
fn update_turn_tracking(
is_turn_started: bool,
is_turn_completed: bool,
result_written: &std::cell::Cell<bool>,
) {
if is_turn_started {
result_written.set(false);
} else if is_turn_completed {
result_written.set(true);
}
}
fn is_codex_turn_event<const IS_COMPLETED: bool>(trimmed: &str) -> bool {
if !trimmed.starts_with('{') { return false; }
serde_json::from_str::<CodexEvent>(trimmed)
.ok()
.is_some_and(|e| if IS_COMPLETED {
matches!(e, CodexEvent::TurnCompleted { .. })
} else {
matches!(e, CodexEvent::TurnStarted { .. })
})
}
fn parse_codex_event_if_json(trimmed: &str) -> Option<CodexEvent> {
if trimmed.starts_with('{') {
serde_json::from_str::<CodexEvent>(trimmed).ok()
} else {
None
}
}
fn record_event_monitor_outcome(parsed_event: &Option<CodexEvent>, output: Option<&str>, monitor: &HealthMonitor) {
if output.is_some() {
record_codex_monitor_parsed(parsed_event, monitor);
} else {
record_codex_monitor_no_output(parsed_event, monitor);
}
}
fn print_event_output_if_present(parser: &mut CodexParser, output: Option<String>) -> std::io::Result<()> {
output.map_or(Ok(()), |text| {
parser.with_printer_mut(|printer| { write!(printer, "{text}")?; printer.flush() })
})
}
fn record_codex_monitor_parsed(parsed_event: &Option<CodexEvent>, monitor: &HealthMonitor) {
match parsed_event {
Some(event) if CodexParser::is_partial_event(event) => monitor.record_partial_event(),
Some(_) => monitor.record_parsed(),
None => monitor.record_parsed(),
}
}
fn record_codex_monitor_no_output(parsed_event: &Option<CodexEvent>, monitor: &HealthMonitor) {
match parsed_event {
Some(event) if CodexParser::is_control_event(event) => monitor.record_control_event(),
Some(_) => monitor.record_unknown_event(),
None => monitor.record_ignored(),
}
}