Skip to main content

codex/
rollout_jsonl.rs

1use std::{
2    collections::BTreeMap,
3    io::{BufRead, BufReader},
4    path::{Path, PathBuf},
5};
6
7use serde::{Deserialize, Serialize};
8
9/// Errors that may occur while parsing Codex CLI rollout JSONL logs (`rollout-*.jsonl`).
10#[derive(Debug, thiserror::Error)]
11pub enum RolloutJsonlError {
12    #[error("failed to parse codex rollout JSONL record: {source}: `{line}`")]
13    Parse {
14        line: String,
15        #[source]
16        source: serde_json::Error,
17    },
18    #[error("codex rollout JSONL record missing required field: {message}: `{line}`")]
19    Normalize { line: String, message: String },
20    #[error("failed to read codex rollout JSONL: {source}")]
21    Io {
22        #[source]
23        source: std::io::Error,
24    },
25}
26
/// Line-oriented parser for rollout JSONL records.
///
/// The type carries no fields; it exists so callers have a value to invoke
/// `parse_line` on.
#[derive(Debug, Default, Clone)]
pub struct RolloutJsonlParser;
29
30impl RolloutJsonlParser {
31    pub fn new() -> Self {
32        Self
33    }
34
35    /// Parses a single logical JSONL line.
36    ///
37    /// - Returns `Ok(None)` for empty / whitespace-only lines.
38    /// - Otherwise returns `Ok(Some(RolloutEvent))` on success.
39    /// - Returns `Err(RolloutJsonlError)` on JSON parse / typed parse failures.
40    pub fn parse_line(&mut self, line: &str) -> Result<Option<RolloutEvent>, RolloutJsonlError> {
41        let line = line.strip_suffix('\r').unwrap_or(line);
42        if line.chars().all(|ch| ch.is_whitespace()) {
43            return Ok(None);
44        }
45
46        let raw: RawRolloutLine =
47            serde_json::from_str(line).map_err(|source| RolloutJsonlError::Parse {
48                line: line.to_string(),
49                source,
50            })?;
51
52        let record_type = raw
53            .record_type
54            .ok_or_else(|| RolloutJsonlError::Normalize {
55                line: line.to_string(),
56                message: "record missing `type`".to_string(),
57            })?;
58
59        let payload = raw.payload.unwrap_or(serde_json::Value::Null);
60        let event = match record_type.as_str() {
61            "session_meta" => RolloutEvent::SessionMeta(RolloutSessionMeta {
62                timestamp: raw.timestamp,
63                payload: serde_json::from_value(payload).map_err(|source| {
64                    RolloutJsonlError::Parse {
65                        line: line.to_string(),
66                        source,
67                    }
68                })?,
69                extra: raw.extra,
70            }),
71            "event_msg" => RolloutEvent::EventMsg(RolloutEventMsg {
72                timestamp: raw.timestamp,
73                payload: serde_json::from_value(payload).map_err(|source| {
74                    RolloutJsonlError::Parse {
75                        line: line.to_string(),
76                        source,
77                    }
78                })?,
79                extra: raw.extra,
80            }),
81            "response_item" => RolloutEvent::ResponseItem(RolloutResponseItem {
82                timestamp: raw.timestamp,
83                payload: serde_json::from_value(payload).map_err(|source| {
84                    RolloutJsonlError::Parse {
85                        line: line.to_string(),
86                        source,
87                    }
88                })?,
89                extra: raw.extra,
90            }),
91            _ => RolloutEvent::Unknown(RolloutUnknown {
92                timestamp: raw.timestamp,
93                record_type,
94                payload,
95                extra: raw.extra,
96            }),
97        };
98
99        Ok(Some(event))
100    }
101}
102
103#[derive(Clone, Debug, Deserialize)]
104struct RawRolloutLine {
105    #[serde(default)]
106    timestamp: Option<String>,
107    #[serde(rename = "type")]
108    record_type: Option<String>,
109    #[serde(default)]
110    payload: Option<serde_json::Value>,
111    #[serde(flatten)]
112    extra: BTreeMap<String, serde_json::Value>,
113}
114
115#[derive(Clone, Debug, Deserialize, Serialize)]
116pub enum RolloutEvent {
117    SessionMeta(RolloutSessionMeta),
118    EventMsg(RolloutEventMsg),
119    ResponseItem(RolloutResponseItem),
120    Unknown(RolloutUnknown),
121}
122
123#[derive(Clone, Debug, Deserialize, Serialize)]
124pub struct RolloutSessionMeta {
125    pub timestamp: Option<String>,
126    pub payload: RolloutSessionMetaPayload,
127    #[serde(flatten)]
128    pub extra: BTreeMap<String, serde_json::Value>,
129}
130
131#[derive(Clone, Debug, Deserialize, Serialize)]
132pub struct RolloutEventMsg {
133    pub timestamp: Option<String>,
134    pub payload: RolloutEventMsgPayload,
135    #[serde(flatten)]
136    pub extra: BTreeMap<String, serde_json::Value>,
137}
138
139#[derive(Clone, Debug, Deserialize, Serialize)]
140pub struct RolloutResponseItem {
141    pub timestamp: Option<String>,
142    pub payload: RolloutResponseItemPayload,
143    #[serde(flatten)]
144    pub extra: BTreeMap<String, serde_json::Value>,
145}
146
147#[derive(Clone, Debug, Deserialize, Serialize)]
148pub struct RolloutUnknown {
149    pub timestamp: Option<String>,
150    #[serde(rename = "type")]
151    pub record_type: String,
152    pub payload: serde_json::Value,
153    #[serde(flatten)]
154    pub extra: BTreeMap<String, serde_json::Value>,
155}
156
157#[derive(Clone, Debug, Default, Deserialize, Serialize)]
158pub struct RolloutSessionMetaPayload {
159    pub id: Option<String>,
160    pub timestamp: Option<String>,
161    pub cwd: Option<String>,
162    pub originator: Option<String>,
163    pub cli_version: Option<String>,
164    pub source: Option<String>,
165    pub model_provider: Option<String>,
166    pub base_instructions: Option<RolloutBaseInstructions>,
167    #[serde(flatten)]
168    pub extra: BTreeMap<String, serde_json::Value>,
169}
170
171#[derive(Clone, Debug, Default, Deserialize, Serialize)]
172pub struct RolloutBaseInstructions {
173    pub text: Option<String>,
174    #[serde(flatten)]
175    pub extra: BTreeMap<String, serde_json::Value>,
176}
177
178#[derive(Clone, Debug, Default, Deserialize, Serialize)]
179pub struct RolloutEventMsgPayload {
180    #[serde(rename = "type")]
181    pub kind: Option<String>,
182    #[serde(flatten)]
183    pub extra: BTreeMap<String, serde_json::Value>,
184}
185
186#[derive(Clone, Debug, Default, Deserialize, Serialize)]
187pub struct RolloutResponseItemPayload {
188    #[serde(rename = "type")]
189    pub kind: Option<String>,
190
191    pub role: Option<String>,
192    pub content: Option<Vec<RolloutContentPart>>,
193    pub summary: Option<Vec<RolloutContentPart>>,
194
195    pub name: Option<String>,
196    pub arguments: Option<String>,
197    pub call_id: Option<String>,
198    pub output: Option<String>,
199    pub encrypted_content: Option<String>,
200
201    #[serde(flatten)]
202    pub extra: BTreeMap<String, serde_json::Value>,
203}
204
205#[derive(Clone, Debug, Default, Deserialize, Serialize)]
206pub struct RolloutContentPart {
207    #[serde(rename = "type")]
208    pub kind: Option<String>,
209    pub text: Option<String>,
210    #[serde(flatten)]
211    pub extra: BTreeMap<String, serde_json::Value>,
212}
213
214#[derive(Debug)]
215pub struct RolloutJsonlRecord {
216    /// 1-based line number in the underlying source (file/reader).
217    pub line_number: usize,
218    /// The parse outcome for this line (success or failure).
219    pub outcome: Result<RolloutEvent, RolloutJsonlError>,
220}
221
222pub struct RolloutJsonlReader<R: BufRead> {
223    reader: R,
224    parser: RolloutJsonlParser,
225    line_number: usize,
226    buffer: String,
227    done: bool,
228}
229
230impl<R: BufRead> RolloutJsonlReader<R> {
231    pub fn new(reader: R) -> Self {
232        Self {
233            reader,
234            parser: RolloutJsonlParser::new(),
235            line_number: 0,
236            buffer: String::new(),
237            done: false,
238        }
239    }
240}
241
242impl<R: BufRead> Iterator for RolloutJsonlReader<R> {
243    type Item = RolloutJsonlRecord;
244
245    fn next(&mut self) -> Option<Self::Item> {
246        if self.done {
247            return None;
248        }
249
250        loop {
251            self.buffer.clear();
252            let line_number = self.line_number.saturating_add(1);
253            match self.reader.read_line(&mut self.buffer) {
254                Ok(0) => {
255                    self.done = true;
256                    return None;
257                }
258                Ok(_) => {
259                    self.line_number = line_number;
260                    if self.buffer.ends_with('\n') {
261                        self.buffer.pop();
262                    }
263
264                    match self.parser.parse_line(&self.buffer) {
265                        Ok(None) => continue,
266                        Ok(Some(event)) => {
267                            return Some(RolloutJsonlRecord {
268                                line_number,
269                                outcome: Ok(event),
270                            });
271                        }
272                        Err(err) => {
273                            return Some(RolloutJsonlRecord {
274                                line_number,
275                                outcome: Err(err),
276                            });
277                        }
278                    }
279                }
280                Err(err) => {
281                    self.done = true;
282                    self.line_number = line_number;
283                    return Some(RolloutJsonlRecord {
284                        line_number,
285                        outcome: Err(RolloutJsonlError::Io { source: err }),
286                    });
287                }
288            }
289        }
290    }
291}
292
293pub type RolloutJsonlFileReader = RolloutJsonlReader<std::io::BufReader<std::fs::File>>;
294
295pub fn rollout_jsonl_reader<R: BufRead>(reader: R) -> RolloutJsonlReader<R> {
296    RolloutJsonlReader::new(reader)
297}
298
299pub fn rollout_jsonl_file(
300    path: impl AsRef<Path>,
301) -> Result<RolloutJsonlFileReader, RolloutJsonlError> {
302    let file =
303        std::fs::File::open(path.as_ref()).map_err(|source| RolloutJsonlError::Io { source })?;
304    Ok(RolloutJsonlReader::new(std::io::BufReader::new(file)))
305}
306
/// Collects every `rollout-*.jsonl` file found anywhere under `<root>/sessions`.
///
/// The search is best-effort: a missing `sessions` directory, unreadable directories
/// and unreadable entries are skipped silently. File names that are not valid UTF-8
/// are ignored.
///
/// # Returns
/// The matching paths in ascending path order, so the result is the same on every
/// run regardless of the order in which the platform lists directory entries. The
/// vector is empty when nothing matches.
pub fn find_rollout_files(root: impl AsRef<Path>) -> Vec<PathBuf> {
    let mut found = Vec::new();

    // Iterative depth-first walk; a directory that cannot be read (including a
    // missing `sessions` root) simply contributes nothing.
    let mut pending = vec![root.as_ref().join("sessions")];
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };

        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                pending.push(path);
                continue;
            }

            let is_rollout = path
                .file_name()
                .and_then(|name| name.to_str())
                .map_or(false, |name| {
                    name.starts_with("rollout-") && name.ends_with(".jsonl")
                });
            if is_rollout {
                found.push(path);
            }
        }
    }

    // `read_dir` order is platform- and filesystem-dependent; sort so callers that
    // take "the first match" behave reproducibly.
    found.sort();
    found
}
337
338pub fn find_rollout_file_by_id(root: impl AsRef<Path>, id: &str) -> Option<PathBuf> {
339    let root = root.as_ref();
340    let needle = id.strip_prefix("urn:uuid:").unwrap_or(id);
341    let files = find_rollout_files(root);
342
343    for path in &files {
344        if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
345            if name.contains(needle) {
346                return Some(path.clone());
347            }
348        }
349    }
350
351    for path in files {
352        let file = std::fs::File::open(&path).ok()?;
353        let mut reader = BufReader::new(file);
354        let mut line = String::new();
355        for _ in 0..32 {
356            line.clear();
357            let n = reader.read_line(&mut line).ok()?;
358            if n == 0 {
359                break;
360            }
361            if line.ends_with('\n') {
362                line.pop();
363            }
364            let logical = line.strip_suffix('\r').unwrap_or(&line);
365            if logical.chars().all(|ch| ch.is_whitespace()) {
366                continue;
367            }
368
369            let value: serde_json::Value = match serde_json::from_str(logical) {
370                Ok(value) => value,
371                Err(_) => continue,
372            };
373            if value.get("type").and_then(|v| v.as_str()) != Some("session_meta") {
374                continue;
375            }
376            let Some(session_id) = value
377                .get("payload")
378                .and_then(|p| p.get("id"))
379                .and_then(|v| v.as_str())
380            else {
381                continue;
382            };
383            let session_id = session_id.strip_prefix("urn:uuid:").unwrap_or(session_id);
384            if session_id == needle {
385                return Some(path);
386            }
387        }
388    }
389
390    None
391}