1use std::{
2 future::Future,
3 io::{self as stdio, BufRead, Write},
4 path::{Path, PathBuf},
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures_core::Stream;
10use tokio::{
11 fs,
12 fs::OpenOptions,
13 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader, BufWriter},
14 sync::mpsc,
15 task, time,
16};
17
18use crate::{CodexError, ExecStreamError, ThreadEvent};
19
/// Identifiers carried from one JSONL line to the next so that events which omit
/// `thread_id` / `turn_id` can be completed from earlier events.
#[derive(Clone, Debug, Default)]
pub(crate) struct StreamContext {
    /// Thread id recorded by the most recent `thread.*` or `turn.*` event, if any.
    current_thread_id: Option<String>,
    /// Turn id of the turn currently in progress; cleared by `thread.started` /
    /// `thread.resumed` and by `turn.completed` / `turn.failed`.
    current_turn_id: Option<String>,
    /// Counter used to build `synthetic-turn-N` ids when `turn.started` carries no id;
    /// the default of 0 is treated as 1.
    next_synthetic_turn: u32,
}
26
/// Incremental parser that turns individual JSONL lines into [`ThreadEvent`]s,
/// carrying thread/turn context from one line to the next.
#[derive(Clone, Debug, Default)]
pub struct JsonlThreadEventParser {
    context: StreamContext,
}
34
35impl JsonlThreadEventParser {
36 pub fn new() -> Self {
38 Self::default()
39 }
40
41 pub fn reset(&mut self) {
43 self.context = StreamContext::default();
44 }
45
46 pub fn parse_line(&mut self, line: &str) -> Result<Option<ThreadEvent>, ExecStreamError> {
52 let line = line.strip_suffix('\r').unwrap_or(line);
53 if line.chars().all(|ch| ch.is_whitespace()) {
54 return Ok(None);
55 }
56
57 normalize_thread_event(line, &mut self.context).map(Some)
58 }
59}
60
/// One non-blank input line as produced by [`ThreadEventJsonlReader`].
#[derive(Debug)]
pub struct ThreadEventJsonlRecord {
    /// 1-based line number in the input; skipped blank lines are still counted.
    pub line_number: usize,
    /// The parsed event, or the parse / normalization / read error for this line.
    pub outcome: Result<ThreadEvent, ExecStreamError>,
}
68
69impl Clone for ThreadEventJsonlRecord {
70 fn clone(&self) -> Self {
71 Self {
72 line_number: self.line_number,
73 outcome: match &self.outcome {
74 Ok(event) => Ok(event.clone()),
75 Err(err) => Err(clone_exec_stream_error(err)),
76 },
77 }
78 }
79}
80
81fn clone_exec_stream_error(err: &ExecStreamError) -> ExecStreamError {
82 match err {
83 ExecStreamError::Codex(source) => ExecStreamError::Codex(clone_codex_error(source)),
84 ExecStreamError::Parse { line, source } => ExecStreamError::Parse {
85 line: line.clone(),
86 source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
87 },
88 ExecStreamError::Normalize { line, message } => ExecStreamError::Normalize {
89 line: line.clone(),
90 message: message.clone(),
91 },
92 ExecStreamError::IdleTimeout { idle_for } => ExecStreamError::IdleTimeout {
93 idle_for: *idle_for,
94 },
95 ExecStreamError::ChannelClosed => ExecStreamError::ChannelClosed,
96 }
97}
98
/// Produces an owned copy of a `CodexError`; needed because several wrapped error
/// types are not `Clone`.
///
/// The copy is best-effort and can be lossy:
/// - wrapped `std::io::Error`s are rebuilt with `clone_io_error`;
/// - wrapped `serde_json::Error`s are rebuilt from their rendered message with
///   `serde::de::Error::custom`, so only the text survives;
/// - `InvalidUtf8` and `Join` are NOT copied as themselves: they become `CaptureIo`
///   errors (kind `InvalidData` and `Other` respectively) carrying the original
///   message, so callers matching on the variant will see a different one.
fn clone_codex_error(err: &CodexError) -> CodexError {
    match err {
        CodexError::Spawn { binary, source } => CodexError::Spawn {
            binary: binary.clone(),
            source: clone_io_error(source),
        },
        CodexError::Wait { source } => CodexError::Wait {
            source: clone_io_error(source),
        },
        CodexError::Timeout { timeout } => CodexError::Timeout { timeout: *timeout },
        CodexError::NonZeroExit { status, stderr } => CodexError::NonZeroExit {
            status: *status,
            stderr: stderr.clone(),
        },
        // Lossy: re-expressed as an I/O error with the original message.
        CodexError::InvalidUtf8(source) => {
            let io_err = std::io::Error::new(std::io::ErrorKind::InvalidData, source.to_string());
            CodexError::CaptureIo(io_err)
        }
        CodexError::JsonParse {
            context,
            stdout,
            source,
        } => CodexError::JsonParse {
            context,
            stdout: stdout.clone(),
            source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
        },
        CodexError::ExecPolicyParse { stdout, source } => CodexError::ExecPolicyParse {
            stdout: stdout.clone(),
            source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
        },
        CodexError::FeatureListParse { reason, stdout } => CodexError::FeatureListParse {
            reason: reason.clone(),
            stdout: stdout.clone(),
        },
        CodexError::ResponsesApiProxyInfoRead { path, source } => {
            CodexError::ResponsesApiProxyInfoRead {
                path: path.clone(),
                source: clone_io_error(source),
            }
        }
        CodexError::ResponsesApiProxyInfoParse { path, source } => {
            CodexError::ResponsesApiProxyInfoParse {
                path: path.clone(),
                source: <serde_json::Error as serde::de::Error>::custom(source.to_string()),
            }
        }
        CodexError::EmptyPrompt => CodexError::EmptyPrompt,
        CodexError::EmptySandboxCommand => CodexError::EmptySandboxCommand,
        CodexError::EmptyExecPolicyCommand => CodexError::EmptyExecPolicyCommand,
        CodexError::EmptyApiKey => CodexError::EmptyApiKey,
        CodexError::EmptyTaskId => CodexError::EmptyTaskId,
        CodexError::EmptyEnvId => CodexError::EmptyEnvId,
        CodexError::EmptyMcpServerName => CodexError::EmptyMcpServerName,
        CodexError::EmptyMcpCommand => CodexError::EmptyMcpCommand,
        CodexError::EmptyMcpUrl => CodexError::EmptyMcpUrl,
        CodexError::EmptySocketPath => CodexError::EmptySocketPath,
        CodexError::TempDir(source) => CodexError::TempDir(clone_io_error(source)),
        CodexError::WorkingDirectory { source } => CodexError::WorkingDirectory {
            source: clone_io_error(source),
        },
        CodexError::PrepareOutputDirectory { path, source } => CodexError::PrepareOutputDirectory {
            path: path.clone(),
            source: clone_io_error(source),
        },
        CodexError::PrepareCodexHome { path, source } => CodexError::PrepareCodexHome {
            path: path.clone(),
            source: clone_io_error(source),
        },
        CodexError::StdoutUnavailable => CodexError::StdoutUnavailable,
        CodexError::StderrUnavailable => CodexError::StderrUnavailable,
        CodexError::StdinUnavailable => CodexError::StdinUnavailable,
        CodexError::CaptureIo(source) => CodexError::CaptureIo(clone_io_error(source)),
        CodexError::StdinWrite(source) => CodexError::StdinWrite(clone_io_error(source)),
        // Lossy: the join error cannot be reproduced, so keep only its message.
        CodexError::Join(source) => {
            let io_err = std::io::Error::other(source.to_string());
            CodexError::CaptureIo(io_err)
        }
    }
}
179
/// Produces an owned copy of `err` (`std::io::Error` is not `Clone`).
///
/// Errors that came from the operating system are rebuilt from their raw error
/// code, so the copy keeps `raw_os_error()`, `kind()` and the same `Display` text.
/// Any other error is rebuilt from its kind and rendered message; a wrapped inner
/// error value is not preserved, only its text.
fn clone_io_error(err: &std::io::Error) -> std::io::Error {
    match err.raw_os_error() {
        Some(code) => std::io::Error::from_raw_os_error(code),
        None => std::io::Error::new(err.kind(), err.to_string()),
    }
}
183
/// Iterator over [`ThreadEventJsonlRecord`]s read line by line from a `BufRead` source.
pub struct ThreadEventJsonlReader<R: BufRead> {
    reader: R,
    parser: JsonlThreadEventParser,
    /// Number of the last line consumed (0 before anything has been read).
    line_number: usize,
    /// Line buffer reused across `read_line` calls.
    buffer: String,
    /// Set at end of input or after a read error; `next()` then always returns `None`.
    done: bool,
}
191
192impl<R: BufRead> ThreadEventJsonlReader<R> {
193 pub fn new(reader: R) -> Self {
195 Self {
196 reader,
197 parser: JsonlThreadEventParser::new(),
198 line_number: 0,
199 buffer: String::new(),
200 done: false,
201 }
202 }
203
204 pub fn into_inner(self) -> R {
206 self.reader
207 }
208}
209
210impl<R: BufRead> Iterator for ThreadEventJsonlReader<R> {
211 type Item = ThreadEventJsonlRecord;
212
213 fn next(&mut self) -> Option<Self::Item> {
214 if self.done {
215 return None;
216 }
217
218 loop {
219 self.buffer.clear();
220 let line_number = self.line_number.saturating_add(1);
221
222 match self.reader.read_line(&mut self.buffer) {
223 Ok(0) => {
224 self.done = true;
225 return None;
226 }
227 Ok(_) => {
228 self.line_number = line_number;
229 if self.buffer.ends_with('\n') {
230 self.buffer.pop();
231 }
232
233 match self.parser.parse_line(&self.buffer) {
234 Ok(None) => continue,
235 Ok(Some(event)) => {
236 return Some(ThreadEventJsonlRecord {
237 line_number,
238 outcome: Ok(event),
239 });
240 }
241 Err(err) => {
242 return Some(ThreadEventJsonlRecord {
243 line_number,
244 outcome: Err(err),
245 });
246 }
247 }
248 }
249 Err(err) => {
250 self.done = true;
251 self.line_number = line_number;
252 return Some(ThreadEventJsonlRecord {
253 line_number,
254 outcome: Err(ExecStreamError::from(CodexError::CaptureIo(err))),
255 });
256 }
257 }
258 }
259 }
260}
261
/// A [`ThreadEventJsonlReader`] over a buffered `std::fs::File`, as returned by
/// [`thread_event_jsonl_file`].
pub type ThreadEventJsonlFileReader = ThreadEventJsonlReader<std::io::BufReader<std::fs::File>>;
263
264pub fn thread_event_jsonl_reader<R: BufRead>(reader: R) -> ThreadEventJsonlReader<R> {
266 ThreadEventJsonlReader::new(reader)
267}
268
269pub fn thread_event_jsonl_file(
271 path: impl AsRef<Path>,
272) -> Result<ThreadEventJsonlFileReader, ExecStreamError> {
273 let file = std::fs::File::open(path.as_ref())
274 .map_err(|err| ExecStreamError::from(CodexError::CaptureIo(err)))?;
275 Ok(ThreadEventJsonlReader::new(std::io::BufReader::new(file)))
276}
277
278pub(crate) async fn prepare_json_log(
279 path: Option<PathBuf>,
280) -> Result<Option<JsonLogSink>, ExecStreamError> {
281 match path {
282 Some(path) => {
283 let sink = JsonLogSink::new(path)
284 .await
285 .map_err(|err| ExecStreamError::from(CodexError::CaptureIo(err)))?;
286 Ok(Some(sink))
287 }
288 None => Ok(None),
289 }
290}
291
/// Append-mode file that receives a copy of every raw (non-blank) JSONL line.
#[derive(Debug)]
pub(crate) struct JsonLogSink {
    writer: BufWriter<fs::File>,
}
296
297impl JsonLogSink {
298 pub(crate) async fn new(path: PathBuf) -> Result<Self, std::io::Error> {
299 if let Some(parent) = path.parent() {
300 if !parent.as_os_str().is_empty() {
301 fs::create_dir_all(parent).await?;
302 }
303 }
304
305 let file = OpenOptions::new()
306 .create(true)
307 .append(true)
308 .open(&path)
309 .await?;
310
311 Ok(Self {
312 writer: BufWriter::new(file),
313 })
314 }
315
316 async fn write_line(&mut self, line: &str) -> Result<(), std::io::Error> {
317 self.writer.write_all(line.as_bytes()).await?;
318 self.writer.write_all(b"\n").await?;
319 self.writer.flush().await
320 }
321}
322
/// `Stream` adapter over a channel of thread events that can additionally yield
/// `ExecStreamError::IdleTimeout` when no event arrives within `idle_timeout`.
pub(crate) struct EventChannelStream {
    rx: mpsc::Receiver<Result<ThreadEvent, ExecStreamError>>,
    /// Maximum quiet period before an idle timeout is reported; `None` disables it.
    idle_timeout: Option<std::time::Duration>,
    /// Currently armed idle deadline, if any; re-armed whenever an item is received.
    idle_timer: Option<Pin<Box<time::Sleep>>>,
}
328
329impl EventChannelStream {
330 pub(crate) fn new(
331 rx: mpsc::Receiver<Result<ThreadEvent, ExecStreamError>>,
332 idle_timeout: Option<std::time::Duration>,
333 ) -> Self {
334 Self {
335 rx,
336 idle_timeout,
337 idle_timer: None,
338 }
339 }
340
341 fn reset_timer(&mut self) {
342 self.idle_timer = self
343 .idle_timeout
344 .map(|duration| Box::pin(time::sleep(duration)));
345 }
346}
347
348impl Stream for EventChannelStream {
349 type Item = Result<ThreadEvent, ExecStreamError>;
350
351 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352 let this = self.get_mut();
353
354 if let Some(timer) = this.idle_timer.as_mut() {
355 if let Poll::Ready(()) = timer.as_mut().poll(cx) {
356 let idle_for = this.idle_timeout.expect("idle_timer implies timeout");
357 this.idle_timer = None;
358 return Poll::Ready(Some(Err(ExecStreamError::IdleTimeout { idle_for })));
359 }
360 }
361
362 match this.rx.poll_recv(cx) {
363 Poll::Ready(Some(item)) => {
364 if this.idle_timeout.is_some() {
365 this.reset_timer();
366 }
367 Poll::Ready(Some(item))
368 }
369 Poll::Ready(None) => Poll::Ready(None),
370 Poll::Pending => {
371 if this.idle_timer.is_none() {
372 if let Some(duration) = this.idle_timeout {
373 let mut sleep = Box::pin(time::sleep(duration));
374 if let Poll::Ready(()) = sleep.as_mut().poll(cx) {
375 return Poll::Ready(Some(Err(ExecStreamError::IdleTimeout {
376 idle_for: duration,
377 })));
378 }
379 this.idle_timer = Some(sleep);
380 }
381 }
382 Poll::Pending
383 }
384 }
385 }
386}
387
388pub(crate) async fn forward_json_events<R>(
389 reader: R,
390 sender: mpsc::Sender<Result<ThreadEvent, ExecStreamError>>,
391 mirror_stdout: bool,
392 mut log: Option<JsonLogSink>,
393) -> Result<(), ExecStreamError>
394where
395 R: AsyncRead + Unpin,
396{
397 let mut lines = BufReader::new(reader).lines();
398 let mut context = StreamContext::default();
399 loop {
400 let line = match lines.next_line().await {
401 Ok(Some(line)) => line,
402 Ok(None) => break,
403 Err(err) => {
404 return Err(CodexError::CaptureIo(err).into());
405 }
406 };
407
408 if line.trim().is_empty() {
409 continue;
410 }
411
412 if let Some(sink) = log.as_mut() {
413 sink.write_line(&line)
414 .await
415 .map_err(|err| ExecStreamError::from(CodexError::CaptureIo(err)))?;
416 }
417
418 if mirror_stdout {
419 if let Err(err) = task::block_in_place(|| {
420 let mut out = stdio::stdout();
421 out.write_all(line.as_bytes())?;
422 out.write_all(b"\n")?;
423 out.flush()
424 }) {
425 return Err(CodexError::CaptureIo(err).into());
426 }
427 }
428
429 let event = normalize_thread_event(&line, &mut context);
430 if sender.send(event).await.is_err() {
431 break;
432 }
433 }
434
435 Ok(())
436}
437
/// Parses one JSONL line into a [`ThreadEvent`]. Thread/turn ids that the line
/// omits are filled in from `context`, and `context` is updated as events arrive.
///
/// Handling by event `type`:
/// - `thread.started` / `thread.resumed`: the thread id (the first non-blank of
///   `thread_id`, `conversation_id`, `id`) becomes the current thread, and the current
///   turn is cleared. The JSON payload itself is not modified.
/// - `turn.started`: the turn id comes from `turn_id`/`id`; otherwise a
///   `synthetic-turn-N` id is generated (N starts at 1). The thread id comes from
///   `thread_id`/`conversation_id`, otherwise from the current thread. Both ids are
///   written into the payload and recorded as current.
/// - `turn.completed` / `turn.failed`: ids are resolved the same way, but fall back to
///   the current turn instead of a synthetic id. They are written into the payload, and
///   the current turn is cleared.
/// - `item.*`: a nested `item` object is flattened into the event, and `item.delta` /
///   `item.updated` deltas are reshaped. Then `turn_id`/`thread_id` are filled in from
///   context when absent.
/// - any other type is deserialized as-is.
///
/// Ids read from the payload are trimmed, and blank values are treated as missing.
///
/// # Errors
///
/// - `ExecStreamError::Parse` if the line is not valid JSON, or if the normalized
///   value does not deserialize into a `ThreadEvent`.
/// - `ExecStreamError::Normalize` if `type` is missing or not a string, or if a
///   required id is absent and cannot be inferred from `context`.
pub(crate) fn normalize_thread_event(
    line: &str,
    context: &mut StreamContext,
) -> Result<ThreadEvent, ExecStreamError> {
    let mut value: serde_json::Value =
        serde_json::from_str(line).map_err(|source| ExecStreamError::Parse {
            line: line.to_string(),
            source,
        })?;

    // Owned copy so `value` can be mutated below while the type is still needed.
    let event_type = value
        .get("type")
        .and_then(|t| t.as_str())
        .map(|s| s.to_string())
        .ok_or_else(|| ExecStreamError::Normalize {
            line: line.to_string(),
            message: "event missing `type`".to_string(),
        })?;

    match event_type.as_str() {
        "thread.started" | "thread.resumed" => {
            // NOTE(review): unlike the turn/item arms, the resolved id is not written
            // back with `set_str`. A line carrying only `conversation_id`/`id` therefore
            // relies on `ThreadEvent`'s serde definition to accept it — confirm.
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id", "id"])
                .ok_or_else(|| missing(&event_type, "thread_id", line))?;
            context.current_thread_id = Some(thread_id.to_string());
            context.current_turn_id = None;
        }
        "turn.started" => {
            let turn_id = extract_str_from_keys(&value, &["turn_id", "id"])
                .map(|s| s.to_string())
                .unwrap_or_else(|| {
                    // The counter advances here even if the thread-id lookup below
                    // fails and the event is rejected.
                    let next = context.next_synthetic_turn.max(1);
                    let id = format!("synthetic-turn-{next}");
                    context.next_synthetic_turn = next.saturating_add(1);
                    id
                });
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_thread_id.clone())
                .ok_or_else(|| missing("turn.started", "thread_id", line))?;
            set_str(&mut value, "turn_id", turn_id.clone());
            set_str(&mut value, "thread_id", thread_id.clone());
            context.current_thread_id = Some(thread_id);
            context.current_turn_id = Some(turn_id);
        }
        "turn.completed" | "turn.failed" => {
            let turn_id = extract_str_from_keys(&value, &["turn_id", "id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_turn_id.clone())
                .ok_or_else(|| missing(&event_type, "turn_id", line))?;
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_thread_id.clone())
                .ok_or_else(|| missing(&event_type, "thread_id", line))?;
            set_str(&mut value, "turn_id", turn_id.clone());
            set_str(&mut value, "thread_id", thread_id.clone());
            context.current_turn_id = None;
            context.current_thread_id = Some(thread_id);
        }
        t if t.starts_with("item.") => {
            normalize_item_payload(&mut value);
            if event_type == "item.delta" || event_type == "item.updated" {
                normalize_item_delta_payload(&mut value);
            }
            // Only `turn_id` is consulted (not `id`): after flattening, a top-level
            // `id` presumably belongs to the item rather than the turn.
            let turn_id = extract_str(&value, "turn_id")
                .map(|s| s.to_string())
                .or_else(|| context.current_turn_id.clone())
                .ok_or_else(|| missing(&event_type, "turn_id", line))?;
            let thread_id = extract_str_from_keys(&value, &["thread_id", "conversation_id"])
                .map(|s| s.to_string())
                .or_else(|| context.current_thread_id.clone())
                .ok_or_else(|| missing(&event_type, "thread_id", line))?;
            set_str(&mut value, "turn_id", turn_id);
            set_str(&mut value, "thread_id", thread_id);
        }
        _ => {}
    }

    serde_json::from_value::<ThreadEvent>(value).map_err(|source| ExecStreamError::Parse {
        line: line.to_string(),
        source,
    })
}
520
521fn extract_str<'a>(value: &'a serde_json::Value, key: &str) -> Option<&'a str> {
522 value
523 .get(key)
524 .and_then(|v| v.as_str())
525 .map(|s| s.trim())
526 .filter(|s| !s.is_empty())
527}
528
529fn extract_str_from_keys<'a>(value: &'a serde_json::Value, keys: &[&str]) -> Option<&'a str> {
530 for key in keys {
531 if let Some(found) = extract_str(value, key) {
532 return Some(found);
533 }
534 }
535 None
536}
537
538fn set_str(value: &mut serde_json::Value, key: &str, new_value: String) {
539 if let Some(map) = value.as_object_mut() {
540 map.insert(key.to_string(), serde_json::Value::String(new_value));
541 }
542}
543
544fn normalize_item_delta_payload(value: &mut serde_json::Value) {
545 let Some(map) = value.as_object_mut() else {
546 return;
547 };
548
549 if !map.contains_key("delta") {
550 if let Some(content) = map.remove("content") {
551 map.insert("delta".to_string(), content);
552 }
553 }
554
555 let Some(item_type) = map.get("item_type").and_then(|value| value.as_str()) else {
556 return;
557 };
558
559 if !matches!(item_type, "agent_message" | "reasoning") {
560 return;
561 }
562
563 let Some(delta) = map.get_mut("delta") else {
564 return;
565 };
566
567 if let Some(text_delta) = delta.as_str() {
568 *delta = serde_json::json!({ "text_delta": text_delta });
569 }
570}
571
572fn normalize_item_payload(value: &mut serde_json::Value) {
573 let mut item_object = match value
574 .get_mut("item")
575 .and_then(|item| item.as_object_mut())
576 .map(|map| map.clone())
577 {
578 Some(map) => map,
579 None => return,
580 };
581
582 if !item_object.contains_key("item_type") {
583 if let Some(item_type) = item_object.remove("type") {
584 item_object.insert("item_type".to_string(), item_type);
585 }
586 }
587
588 if !item_object.contains_key("content") {
589 let mut content: Option<serde_json::Value> = None;
590 if let Some(text) = item_object.remove("text") {
591 if let Some(text_str) = text.as_str() {
592 content = Some(serde_json::json!({ "text": text_str }));
593 } else {
594 content = Some(text);
595 }
596 } else if let Some(command) = item_object.get("command").cloned() {
597 let mut map = serde_json::Map::new();
598 map.insert("command".to_string(), command);
599 if let Some(stdout) = item_object.remove("aggregated_output") {
600 map.insert("stdout".to_string(), stdout);
601 }
602 if let Some(exit_code) = item_object.remove("exit_code") {
603 map.insert("exit_code".to_string(), exit_code);
604 }
605 if let Some(stderr) = item_object.remove("stderr") {
606 map.insert("stderr".to_string(), stderr);
607 }
608 content = Some(serde_json::Value::Object(map));
609 }
610
611 if let Some(content_value) = content {
612 item_object.insert("content".to_string(), content_value);
613 }
614 }
615
616 let item_type = item_object
617 .get("item_type")
618 .and_then(|value| value.as_str())
619 .or_else(|| item_object.get("type").and_then(|value| value.as_str()))
620 .map(|value| value.to_string());
621
622 if matches!(item_type.as_deref(), Some("agent_message" | "reasoning")) {
623 if let Some(content) = item_object.get_mut("content") {
624 if let Some(text) = content.as_str() {
625 *content = serde_json::json!({ "text": text });
626 }
627 }
628 }
629
630 if let Some(root) = value.as_object_mut() {
631 for (mut key, mut v) in item_object {
632 if key == "type" {
633 key = "item_type".to_string();
634 }
635 root.insert(key, v.take());
636 }
637 root.remove("item");
638 }
639}
640
641fn missing(event: &str, field: &str, line: &str) -> ExecStreamError {
642 ExecStreamError::Normalize {
643 line: line.to_string(),
644 message: format!("{event} missing `{field}` and no prior context to infer it"),
645 }
646}