Skip to main content

codex/
jsonl.rs

1use std::{
2    future::Future,
3    io::{self as stdio, BufRead, Write},
4    path::{Path, PathBuf},
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures_core::Stream;
10use tokio::{
11    fs,
12    fs::OpenOptions,
13    io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader, BufWriter},
14    sync::mpsc,
15    task, time,
16};
17
18use crate::{CodexError, ExecStreamError, ThreadEvent};
19
/// Thread/turn state carried from one JSONL line to the next while events are normalized.
#[derive(Clone, Debug, Default)]
pub(crate) struct StreamContext {
    /// Thread id recorded by the most recent thread or turn event that supplied or inherited one.
    current_thread_id: Option<String>,
    /// Id of the turn currently in progress; cleared when a thread starts/resumes and when a
    /// turn completes or fails.
    current_turn_id: Option<String>,
    /// Counter behind the `synthetic-turn-N` ids assigned to `turn.started` events that carry
    /// no id of their own.
    next_synthetic_turn: u32,
}
26
27/// Parses Codex `--json` JSONL logs into typed [`ThreadEvent`] values.
28///
29/// This API is synchronous and line-oriented (v1 contract).
30#[derive(Clone, Debug, Default)]
31pub struct JsonlThreadEventParser {
32    context: StreamContext,
33}
34
35impl JsonlThreadEventParser {
36    /// Constructs a new parser with no established context.
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Clears any thread/turn context and resets synthetic turn counters.
42    pub fn reset(&mut self) {
43        self.context = StreamContext::default();
44    }
45
46    /// Parses a single logical JSONL line.
47    ///
48    /// - Returns `Ok(None)` for empty / whitespace-only lines.
49    /// - Otherwise returns `Ok(Some(ThreadEvent))` on success.
50    /// - Returns `Err(ExecStreamError)` on JSON parse / normalization / typed parse failures.
51    pub fn parse_line(&mut self, line: &str) -> Result<Option<ThreadEvent>, ExecStreamError> {
52        let line = line.strip_suffix('\r').unwrap_or(line);
53        if line.chars().all(|ch| ch.is_whitespace()) {
54            return Ok(None);
55        }
56
57        normalize_thread_event(line, &mut self.context).map(Some)
58    }
59}
60
61#[derive(Debug)]
62pub struct ThreadEventJsonlRecord {
63    /// 1-based line number in the underlying source (file/reader).
64    pub line_number: usize,
65    /// The parse outcome for this line (success or failure).
66    pub outcome: Result<ThreadEvent, ExecStreamError>,
67}
68
69impl Clone for ThreadEventJsonlRecord {
70    fn clone(&self) -> Self {
71        Self {
72            line_number: self.line_number,
73            outcome: match &self.outcome {
74                Ok(event) => Ok(event.clone()),
75                Err(err) => Err(clone_exec_stream_error(err)),
76            },
77        }
78    }
79}
80
81fn clone_exec_stream_error(err: &ExecStreamError) -> ExecStreamError {
82    match err {
83        ExecStreamError::Codex(source) => ExecStreamError::Codex(clone_codex_error(source)),
84        ExecStreamError::Parse { line, source } => ExecStreamError::Parse {
85            line: line.clone(),
86            source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
87        },
88        ExecStreamError::Normalize { line, message } => ExecStreamError::Normalize {
89            line: line.clone(),
90            message: message.clone(),
91        },
92        ExecStreamError::IdleTimeout { idle_for } => ExecStreamError::IdleTimeout {
93            idle_for: *idle_for,
94        },
95        ExecStreamError::ChannelClosed => ExecStreamError::ChannelClosed,
96    }
97}
98
99fn clone_codex_error(err: &CodexError) -> CodexError {
100    match err {
101        CodexError::Spawn { binary, source } => CodexError::Spawn {
102            binary: binary.clone(),
103            source: clone_io_error(source),
104        },
105        CodexError::Wait { source } => CodexError::Wait {
106            source: clone_io_error(source),
107        },
108        CodexError::Timeout { timeout } => CodexError::Timeout { timeout: *timeout },
109        CodexError::NonZeroExit { status, stderr } => CodexError::NonZeroExit {
110            status: *status,
111            stderr: stderr.clone(),
112        },
113        CodexError::InvalidUtf8(source) => {
114            let io_err = std::io::Error::new(std::io::ErrorKind::InvalidData, source.to_string());
115            CodexError::CaptureIo(io_err)
116        }
117        CodexError::JsonParse {
118            context,
119            stdout,
120            source,
121        } => CodexError::JsonParse {
122            context,
123            stdout: stdout.clone(),
124            source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
125        },
126        CodexError::ExecPolicyParse { stdout, source } => CodexError::ExecPolicyParse {
127            stdout: stdout.clone(),
128            source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
129        },
130        CodexError::FeatureListParse { reason, stdout } => CodexError::FeatureListParse {
131            reason: reason.clone(),
132            stdout: stdout.clone(),
133        },
134        CodexError::ResponsesApiProxyInfoRead { path, source } => {
135            CodexError::ResponsesApiProxyInfoRead {
136                path: path.clone(),
137                source: clone_io_error(source),
138            }
139        }
140        CodexError::ResponsesApiProxyInfoParse { path, source } => {
141            CodexError::ResponsesApiProxyInfoParse {
142                path: path.clone(),
143                source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
144            }
145        }
146        CodexError::EmptyPrompt => CodexError::EmptyPrompt,
147        CodexError::EmptySandboxCommand => CodexError::EmptySandboxCommand,
148        CodexError::EmptyExecPolicyCommand => CodexError::EmptyExecPolicyCommand,
149        CodexError::EmptyApiKey => CodexError::EmptyApiKey,
150        CodexError::EmptyTaskId => CodexError::EmptyTaskId,
151        CodexError::EmptyEnvId => CodexError::EmptyEnvId,
152        CodexError::EmptyMcpServerName => CodexError::EmptyMcpServerName,
153        CodexError::EmptyMcpCommand => CodexError::EmptyMcpCommand,
154        CodexError::EmptyMcpUrl => CodexError::EmptyMcpUrl,
155        CodexError::EmptySocketPath => CodexError::EmptySocketPath,
156        CodexError::TempDir(source) => CodexError::TempDir(clone_io_error(source)),
157        CodexError::WorkingDirectory { source } => CodexError::WorkingDirectory {
158            source: clone_io_error(source),
159        },
160        CodexError::PrepareOutputDirectory { path, source } => CodexError::PrepareOutputDirectory {
161            path: path.clone(),
162            source: clone_io_error(source),
163        },
164        CodexError::PrepareCodexHome { path, source } => CodexError::PrepareCodexHome {
165            path: path.clone(),
166            source: clone_io_error(source),
167        },
168        CodexError::StdoutUnavailable => CodexError::StdoutUnavailable,
169        CodexError::StderrUnavailable => CodexError::StderrUnavailable,
170        CodexError::StdinUnavailable => CodexError::StdinUnavailable,
171        CodexError::CaptureIo(source) => CodexError::CaptureIo(clone_io_error(source)),
172        CodexError::StdinWrite(source) => CodexError::StdinWrite(clone_io_error(source)),
173        CodexError::Join(source) => {
174            let io_err = std::io::Error::other(source.to_string());
175            CodexError::CaptureIo(io_err)
176        }
177    }
178}
179
/// Builds an independent copy of an I/O error (`std::io::Error` does not implement `Clone`).
///
/// Errors that originate from the operating system are re-created from their raw OS code, so
/// the copy still answers `raw_os_error()` and reports the same kind and message. All other
/// errors are rebuilt from their kind plus display text.
fn clone_io_error(err: &std::io::Error) -> std::io::Error {
    match err.raw_os_error() {
        // Going through `Error::new` here would lose the OS code on the copy.
        Some(code) => std::io::Error::from_raw_os_error(code),
        None => std::io::Error::new(err.kind(), err.to_string()),
    }
}
183
184pub struct ThreadEventJsonlReader<R: BufRead> {
185    reader: R,
186    parser: JsonlThreadEventParser,
187    line_number: usize,
188    buffer: String,
189    done: bool,
190}
191
192impl<R: BufRead> ThreadEventJsonlReader<R> {
193    /// Creates a reader-backed iterator with a fresh parser.
194    pub fn new(reader: R) -> Self {
195        Self {
196            reader,
197            parser: JsonlThreadEventParser::new(),
198            line_number: 0,
199            buffer: String::new(),
200            done: false,
201        }
202    }
203
204    /// Consumes the iterator and returns the wrapped reader.
205    pub fn into_inner(self) -> R {
206        self.reader
207    }
208}
209
210impl<R: BufRead> Iterator for ThreadEventJsonlReader<R> {
211    type Item = ThreadEventJsonlRecord;
212
213    fn next(&mut self) -> Option<Self::Item> {
214        if self.done {
215            return None;
216        }
217
218        loop {
219            self.buffer.clear();
220            let line_number = self.line_number.saturating_add(1);
221
222            match self.reader.read_line(&mut self.buffer) {
223                Ok(0) => {
224                    self.done = true;
225                    return None;
226                }
227                Ok(_) => {
228                    self.line_number = line_number;
229                    if self.buffer.ends_with('\n') {
230                        self.buffer.pop();
231                    }
232
233                    match self.parser.parse_line(&self.buffer) {
234                        Ok(None) => continue,
235                        Ok(Some(event)) => {
236                            return Some(ThreadEventJsonlRecord {
237                                line_number,
238                                outcome: Ok(event),
239                            });
240                        }
241                        Err(err) => {
242                            return Some(ThreadEventJsonlRecord {
243                                line_number,
244                                outcome: Err(err),
245                            });
246                        }
247                    }
248                }
249                Err(err) => {
250                    self.done = true;
251                    self.line_number = line_number;
252                    return Some(ThreadEventJsonlRecord {
253                        line_number,
254                        outcome: Err(ExecStreamError::from(CodexError::CaptureIo(err))),
255                    });
256                }
257            }
258        }
259    }
260}
261
/// A [`ThreadEventJsonlReader`] over a buffered [`std::fs::File`].
pub type ThreadEventJsonlFileReader = ThreadEventJsonlReader<std::io::BufReader<std::fs::File>>;
263
264/// Convenience constructor for reader-backed parsing.
265pub fn thread_event_jsonl_reader<R: BufRead>(reader: R) -> ThreadEventJsonlReader<R> {
266    ThreadEventJsonlReader::new(reader)
267}
268
269/// Convenience constructor for file-backed parsing.
270pub fn thread_event_jsonl_file(
271    path: impl AsRef<Path>,
272) -> Result<ThreadEventJsonlFileReader, ExecStreamError> {
273    let file = std::fs::File::open(path.as_ref())
274        .map_err(|err| ExecStreamError::from(CodexError::CaptureIo(err)))?;
275    Ok(ThreadEventJsonlReader::new(std::io::BufReader::new(file)))
276}
277
278pub(crate) async fn prepare_json_log(
279    path: Option<PathBuf>,
280) -> Result<Option<JsonLogSink>, ExecStreamError> {
281    match path {
282        Some(path) => {
283            let sink = JsonLogSink::new(path)
284                .await
285                .map_err(|err| ExecStreamError::from(CodexError::CaptureIo(err)))?;
286            Ok(Some(sink))
287        }
288        None => Ok(None),
289    }
290}
291
292#[derive(Debug)]
293pub(crate) struct JsonLogSink {
294    writer: BufWriter<fs::File>,
295}
296
297impl JsonLogSink {
298    pub(crate) async fn new(path: PathBuf) -> Result<Self, std::io::Error> {
299        if let Some(parent) = path.parent() {
300            if !parent.as_os_str().is_empty() {
301                fs::create_dir_all(parent).await?;
302            }
303        }
304
305        let file = OpenOptions::new()
306            .create(true)
307            .append(true)
308            .open(&path)
309            .await?;
310
311        Ok(Self {
312            writer: BufWriter::new(file),
313        })
314    }
315
316    async fn write_line(&mut self, line: &str) -> Result<(), std::io::Error> {
317        self.writer.write_all(line.as_bytes()).await?;
318        self.writer.write_all(b"\n").await?;
319        self.writer.flush().await
320    }
321}
322
323pub(crate) struct EventChannelStream {
324    rx: mpsc::Receiver<Result<ThreadEvent, ExecStreamError>>,
325    idle_timeout: Option<std::time::Duration>,
326    idle_timer: Option<Pin<Box<time::Sleep>>>,
327}
328
329impl EventChannelStream {
330    pub(crate) fn new(
331        rx: mpsc::Receiver<Result<ThreadEvent, ExecStreamError>>,
332        idle_timeout: Option<std::time::Duration>,
333    ) -> Self {
334        Self {
335            rx,
336            idle_timeout,
337            idle_timer: None,
338        }
339    }
340
341    fn reset_timer(&mut self) {
342        self.idle_timer = self
343            .idle_timeout
344            .map(|duration| Box::pin(time::sleep(duration)));
345    }
346}
347
348impl Stream for EventChannelStream {
349    type Item = Result<ThreadEvent, ExecStreamError>;
350
351    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352        let this = self.get_mut();
353
354        if let Some(timer) = this.idle_timer.as_mut() {
355            if let Poll::Ready(()) = timer.as_mut().poll(cx) {
356                let idle_for = this.idle_timeout.expect("idle_timer implies timeout");
357                this.idle_timer = None;
358                return Poll::Ready(Some(Err(ExecStreamError::IdleTimeout { idle_for })));
359            }
360        }
361
362        match this.rx.poll_recv(cx) {
363            Poll::Ready(Some(item)) => {
364                if this.idle_timeout.is_some() {
365                    this.reset_timer();
366                }
367                Poll::Ready(Some(item))
368            }
369            Poll::Ready(None) => Poll::Ready(None),
370            Poll::Pending => {
371                if this.idle_timer.is_none() {
372                    if let Some(duration) = this.idle_timeout {
373                        let mut sleep = Box::pin(time::sleep(duration));
374                        if let Poll::Ready(()) = sleep.as_mut().poll(cx) {
375                            return Poll::Ready(Some(Err(ExecStreamError::IdleTimeout {
376                                idle_for: duration,
377                            })));
378                        }
379                        this.idle_timer = Some(sleep);
380                    }
381                }
382                Poll::Pending
383            }
384        }
385    }
386}
387
388pub(crate) async fn forward_json_events<R>(
389    reader: R,
390    sender: mpsc::Sender<Result<ThreadEvent, ExecStreamError>>,
391    mirror_stdout: bool,
392    mut log: Option<JsonLogSink>,
393) -> Result<(), ExecStreamError>
394where
395    R: AsyncRead + Unpin,
396{
397    let mut lines = BufReader::new(reader).lines();
398    let mut context = StreamContext::default();
399    loop {
400        let line = match lines.next_line().await {
401            Ok(Some(line)) => line,
402            Ok(None) => break,
403            Err(err) => {
404                return Err(CodexError::CaptureIo(err).into());
405            }
406        };
407
408        if line.trim().is_empty() {
409            continue;
410        }
411
412        if let Some(sink) = log.as_mut() {
413            sink.write_line(&line)
414                .await
415                .map_err(|err| ExecStreamError::from(CodexError::CaptureIo(err)))?;
416        }
417
418        if mirror_stdout {
419            if let Err(err) = task::block_in_place(|| {
420                let mut out = stdio::stdout();
421                out.write_all(line.as_bytes())?;
422                out.write_all(b"\n")?;
423                out.flush()
424            }) {
425                return Err(CodexError::CaptureIo(err).into());
426            }
427        }
428
429        let event = normalize_thread_event(&line, &mut context);
430        if sender.send(event).await.is_err() {
431            break;
432        }
433    }
434
435    Ok(())
436}
437
/// Parses one JSONL line into a [`ThreadEvent`], filling in thread/turn ids from `context`
/// where the raw event omits them.
///
/// The line is first parsed as generic JSON, then adjusted according to its `type` field, and
/// finally deserialized into the typed event. `context` is updated along the way so later
/// lines can inherit ids from earlier ones.
///
/// # Errors
///
/// - [`ExecStreamError::Parse`] when the line is not valid JSON or the adjusted value does
///   not deserialize into a [`ThreadEvent`].
/// - [`ExecStreamError::Normalize`] when `type` is missing or not a string, or when a required
///   id is absent from both the event and `context`.
pub(crate) fn normalize_thread_event(
    line: &str,
    context: &mut StreamContext,
) -> Result<ThreadEvent, ExecStreamError> {
    let mut value: serde_json::Value =
        serde_json::from_str(line).map_err(|source| ExecStreamError::Parse {
            line: line.to_string(),
            source,
        })?;

    // Copy the type out as an owned string so `value` can be mutated below.
    let event_type = value
        .get("type")
        .and_then(|t| t.as_str())
        .map(|s| s.to_string())
        .ok_or_else(|| ExecStreamError::Normalize {
            line: line.to_string(),
            message: "event missing `type`".to_string(),
        })?;

    match event_type.as_str() {
        "thread.started" | "thread.resumed" => {
            // The thread id must be present under one of these keys; it is recorded in the
            // context only (the JSON value itself is not rewritten in this branch).
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id", "id"])
                .ok_or_else(|| missing(&event_type, "thread_id", line))?;
            context.current_thread_id = Some(thread_id.to_string());
            context.current_turn_id = None;
        }
        "turn.started" => {
            // Without an explicit id the turn gets `synthetic-turn-N`, numbered from 1.
            let turn_id = extract_str_from_keys(&value, &["turn_id", "id"])
                .map(|s| s.to_string())
                .unwrap_or_else(|| {
                    let next = context.next_synthetic_turn.max(1);
                    let id = format!("synthetic-turn-{next}");
                    context.next_synthetic_turn = next.saturating_add(1);
                    id
                });
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_thread_id.clone())
                .ok_or_else(|| missing("turn.started", "thread_id", line))?;
            // Write the resolved ids back so the typed event always carries both.
            set_str(&mut value, "turn_id", turn_id.clone());
            set_str(&mut value, "thread_id", thread_id.clone());
            context.current_thread_id = Some(thread_id);
            context.current_turn_id = Some(turn_id);
        }
        "turn.completed" | "turn.failed" => {
            let turn_id = extract_str_from_keys(&value, &["turn_id", "id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_turn_id.clone())
                .ok_or_else(|| missing(&event_type, "turn_id", line))?;
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_thread_id.clone())
                .ok_or_else(|| missing(&event_type, "thread_id", line))?;
            set_str(&mut value, "turn_id", turn_id.clone());
            set_str(&mut value, "thread_id", thread_id.clone());
            // The turn is over: later events no longer inherit its id.
            context.current_turn_id = None;
            context.current_thread_id = Some(thread_id);
        }
        t if t.starts_with("item.") => {
            // Flatten a nested `item` object first so the lookups below see its fields.
            normalize_item_payload(&mut value);
            if event_type == "item.delta" || event_type == "item.updated" {
                normalize_item_delta_payload(&mut value);
            }
            // Only `turn_id` is consulted here (no `id` fallback, unlike the turn events).
            let turn_id = extract_str(&value, "turn_id")
                .map(|s| s.to_string())
                .or_else(|| context.current_turn_id.clone())
                .ok_or_else(|| missing(&event_type, "turn_id", line))?;
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_thread_id.clone())
                .ok_or_else(|| missing(&event_type, "thread_id", line))?;
            // Item events read the context but do not modify it.
            set_str(&mut value, "turn_id", turn_id);
            set_str(&mut value, "thread_id", thread_id);
        }
        // Any other event type is passed to the typed deserializer untouched.
        _ => {}
    }

    serde_json::from_value::<ThreadEvent>(value).map_err(|source| ExecStreamError::Parse {
        line: line.to_string(),
        source,
    })
}
520
521fn extract_str<'a>(value: &'a serde_json::Value, key: &str) -> Option<&'a str> {
522    value
523        .get(key)
524        .and_then(|v| v.as_str())
525        .map(|s| s.trim())
526        .filter(|s| !s.is_empty())
527}
528
529fn extract_str_from_keys<'a>(value: &'a serde_json::Value, keys: &[&str]) -> Option<&'a str> {
530    for key in keys {
531        if let Some(found) = extract_str(value, key) {
532            return Some(found);
533        }
534    }
535    None
536}
537
538fn set_str(value: &mut serde_json::Value, key: &str, new_value: String) {
539    if let Some(map) = value.as_object_mut() {
540        map.insert(key.to_string(), serde_json::Value::String(new_value));
541    }
542}
543
544fn normalize_item_delta_payload(value: &mut serde_json::Value) {
545    let Some(map) = value.as_object_mut() else {
546        return;
547    };
548
549    if !map.contains_key("delta") {
550        if let Some(content) = map.remove("content") {
551            map.insert("delta".to_string(), content);
552        }
553    }
554
555    let Some(item_type) = map.get("item_type").and_then(|value| value.as_str()) else {
556        return;
557    };
558
559    if !matches!(item_type, "agent_message" | "reasoning") {
560        return;
561    }
562
563    let Some(delta) = map.get_mut("delta") else {
564        return;
565    };
566
567    if let Some(text_delta) = delta.as_str() {
568        *delta = serde_json::json!({ "text_delta": text_delta });
569    }
570}
571
572fn normalize_item_payload(value: &mut serde_json::Value) {
573    let mut item_object = match value
574        .get_mut("item")
575        .and_then(|item| item.as_object_mut())
576        .map(|map| map.clone())
577    {
578        Some(map) => map,
579        None => return,
580    };
581
582    if !item_object.contains_key("item_type") {
583        if let Some(item_type) = item_object.remove("type") {
584            item_object.insert("item_type".to_string(), item_type);
585        }
586    }
587
588    if !item_object.contains_key("content") {
589        let mut content: Option<serde_json::Value> = None;
590        if let Some(text) = item_object.remove("text") {
591            if let Some(text_str) = text.as_str() {
592                content = Some(serde_json::json!({ "text": text_str }));
593            } else {
594                content = Some(text);
595            }
596        } else if let Some(command) = item_object.get("command").cloned() {
597            let mut map = serde_json::Map::new();
598            map.insert("command".to_string(), command);
599            if let Some(stdout) = item_object.remove("aggregated_output") {
600                map.insert("stdout".to_string(), stdout);
601            }
602            if let Some(exit_code) = item_object.remove("exit_code") {
603                map.insert("exit_code".to_string(), exit_code);
604            }
605            if let Some(stderr) = item_object.remove("stderr") {
606                map.insert("stderr".to_string(), stderr);
607            }
608            content = Some(serde_json::Value::Object(map));
609        }
610
611        if let Some(content_value) = content {
612            item_object.insert("content".to_string(), content_value);
613        }
614    }
615
616    let item_type = item_object
617        .get("item_type")
618        .and_then(|value| value.as_str())
619        .or_else(|| item_object.get("type").and_then(|value| value.as_str()))
620        .map(|value| value.to_string());
621
622    if matches!(item_type.as_deref(), Some("agent_message" | "reasoning")) {
623        if let Some(content) = item_object.get_mut("content") {
624            if let Some(text) = content.as_str() {
625                *content = serde_json::json!({ "text": text });
626            }
627        }
628    }
629
630    if let Some(root) = value.as_object_mut() {
631        for (mut key, mut v) in item_object {
632            if key == "type" {
633                key = "item_type".to_string();
634            }
635            root.insert(key, v.take());
636        }
637        root.remove("item");
638    }
639}
640
641fn missing(event: &str, field: &str, line: &str) -> ExecStreamError {
642    ExecStreamError::Normalize {
643        line: line.to_string(),
644        message: format!("{event} missing `{field}` and no prior context to infer it"),
645    }
646}