1use std::{
2 collections::BTreeMap,
3 io::{BufRead, BufReader},
4 path::{Path, PathBuf},
5};
6
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, thiserror::Error)]
11pub enum RolloutJsonlError {
12 #[error("failed to parse codex rollout JSONL record: {source}: `{line}`")]
13 Parse {
14 line: String,
15 #[source]
16 source: serde_json::Error,
17 },
18 #[error("codex rollout JSONL record missing required field: {message}: `{line}`")]
19 Normalize { line: String, message: String },
20 #[error("failed to read codex rollout JSONL: {source}")]
21 Io {
22 #[source]
23 source: std::io::Error,
24 },
25}
26
/// Line-oriented parser for codex rollout JSONL records.
///
/// The type carries no fields; it exists so that parsing is exposed through
/// a value with methods.
#[derive(Debug, Default, Clone)]
pub struct RolloutJsonlParser;
29
30impl RolloutJsonlParser {
31 pub fn new() -> Self {
32 Self
33 }
34
35 pub fn parse_line(&mut self, line: &str) -> Result<Option<RolloutEvent>, RolloutJsonlError> {
41 let line = line.strip_suffix('\r').unwrap_or(line);
42 if line.chars().all(|ch| ch.is_whitespace()) {
43 return Ok(None);
44 }
45
46 let raw: RawRolloutLine =
47 serde_json::from_str(line).map_err(|source| RolloutJsonlError::Parse {
48 line: line.to_string(),
49 source,
50 })?;
51
52 let record_type = raw
53 .record_type
54 .ok_or_else(|| RolloutJsonlError::Normalize {
55 line: line.to_string(),
56 message: "record missing `type`".to_string(),
57 })?;
58
59 let payload = raw.payload.unwrap_or(serde_json::Value::Null);
60 let event = match record_type.as_str() {
61 "session_meta" => RolloutEvent::SessionMeta(RolloutSessionMeta {
62 timestamp: raw.timestamp,
63 payload: serde_json::from_value(payload).map_err(|source| {
64 RolloutJsonlError::Parse {
65 line: line.to_string(),
66 source,
67 }
68 })?,
69 extra: raw.extra,
70 }),
71 "event_msg" => RolloutEvent::EventMsg(RolloutEventMsg {
72 timestamp: raw.timestamp,
73 payload: serde_json::from_value(payload).map_err(|source| {
74 RolloutJsonlError::Parse {
75 line: line.to_string(),
76 source,
77 }
78 })?,
79 extra: raw.extra,
80 }),
81 "response_item" => RolloutEvent::ResponseItem(RolloutResponseItem {
82 timestamp: raw.timestamp,
83 payload: serde_json::from_value(payload).map_err(|source| {
84 RolloutJsonlError::Parse {
85 line: line.to_string(),
86 source,
87 }
88 })?,
89 extra: raw.extra,
90 }),
91 _ => RolloutEvent::Unknown(RolloutUnknown {
92 timestamp: raw.timestamp,
93 record_type,
94 payload,
95 extra: raw.extra,
96 }),
97 };
98
99 Ok(Some(event))
100 }
101}
102
103#[derive(Clone, Debug, Deserialize)]
104struct RawRolloutLine {
105 #[serde(default)]
106 timestamp: Option<String>,
107 #[serde(rename = "type")]
108 record_type: Option<String>,
109 #[serde(default)]
110 payload: Option<serde_json::Value>,
111 #[serde(flatten)]
112 extra: BTreeMap<String, serde_json::Value>,
113}
114
115#[derive(Clone, Debug, Deserialize, Serialize)]
116pub enum RolloutEvent {
117 SessionMeta(RolloutSessionMeta),
118 EventMsg(RolloutEventMsg),
119 ResponseItem(RolloutResponseItem),
120 Unknown(RolloutUnknown),
121}
122
123#[derive(Clone, Debug, Deserialize, Serialize)]
124pub struct RolloutSessionMeta {
125 pub timestamp: Option<String>,
126 pub payload: RolloutSessionMetaPayload,
127 #[serde(flatten)]
128 pub extra: BTreeMap<String, serde_json::Value>,
129}
130
131#[derive(Clone, Debug, Deserialize, Serialize)]
132pub struct RolloutEventMsg {
133 pub timestamp: Option<String>,
134 pub payload: RolloutEventMsgPayload,
135 #[serde(flatten)]
136 pub extra: BTreeMap<String, serde_json::Value>,
137}
138
139#[derive(Clone, Debug, Deserialize, Serialize)]
140pub struct RolloutResponseItem {
141 pub timestamp: Option<String>,
142 pub payload: RolloutResponseItemPayload,
143 #[serde(flatten)]
144 pub extra: BTreeMap<String, serde_json::Value>,
145}
146
147#[derive(Clone, Debug, Deserialize, Serialize)]
148pub struct RolloutUnknown {
149 pub timestamp: Option<String>,
150 #[serde(rename = "type")]
151 pub record_type: String,
152 pub payload: serde_json::Value,
153 #[serde(flatten)]
154 pub extra: BTreeMap<String, serde_json::Value>,
155}
156
157#[derive(Clone, Debug, Default, Deserialize, Serialize)]
158pub struct RolloutSessionMetaPayload {
159 pub id: Option<String>,
160 pub timestamp: Option<String>,
161 pub cwd: Option<String>,
162 pub originator: Option<String>,
163 pub cli_version: Option<String>,
164 pub source: Option<String>,
165 pub model_provider: Option<String>,
166 pub base_instructions: Option<RolloutBaseInstructions>,
167 #[serde(flatten)]
168 pub extra: BTreeMap<String, serde_json::Value>,
169}
170
171#[derive(Clone, Debug, Default, Deserialize, Serialize)]
172pub struct RolloutBaseInstructions {
173 pub text: Option<String>,
174 #[serde(flatten)]
175 pub extra: BTreeMap<String, serde_json::Value>,
176}
177
178#[derive(Clone, Debug, Default, Deserialize, Serialize)]
179pub struct RolloutEventMsgPayload {
180 #[serde(rename = "type")]
181 pub kind: Option<String>,
182 #[serde(flatten)]
183 pub extra: BTreeMap<String, serde_json::Value>,
184}
185
186#[derive(Clone, Debug, Default, Deserialize, Serialize)]
187pub struct RolloutResponseItemPayload {
188 #[serde(rename = "type")]
189 pub kind: Option<String>,
190
191 pub role: Option<String>,
192 pub content: Option<Vec<RolloutContentPart>>,
193 pub summary: Option<Vec<RolloutContentPart>>,
194
195 pub name: Option<String>,
196 pub arguments: Option<String>,
197 pub call_id: Option<String>,
198 pub output: Option<String>,
199 pub encrypted_content: Option<String>,
200
201 #[serde(flatten)]
202 pub extra: BTreeMap<String, serde_json::Value>,
203}
204
205#[derive(Clone, Debug, Default, Deserialize, Serialize)]
206pub struct RolloutContentPart {
207 #[serde(rename = "type")]
208 pub kind: Option<String>,
209 pub text: Option<String>,
210 #[serde(flatten)]
211 pub extra: BTreeMap<String, serde_json::Value>,
212}
213
214#[derive(Debug)]
215pub struct RolloutJsonlRecord {
216 pub line_number: usize,
218 pub outcome: Result<RolloutEvent, RolloutJsonlError>,
220}
221
222pub struct RolloutJsonlReader<R: BufRead> {
223 reader: R,
224 parser: RolloutJsonlParser,
225 line_number: usize,
226 buffer: String,
227 done: bool,
228}
229
230impl<R: BufRead> RolloutJsonlReader<R> {
231 pub fn new(reader: R) -> Self {
232 Self {
233 reader,
234 parser: RolloutJsonlParser::new(),
235 line_number: 0,
236 buffer: String::new(),
237 done: false,
238 }
239 }
240}
241
242impl<R: BufRead> Iterator for RolloutJsonlReader<R> {
243 type Item = RolloutJsonlRecord;
244
245 fn next(&mut self) -> Option<Self::Item> {
246 if self.done {
247 return None;
248 }
249
250 loop {
251 self.buffer.clear();
252 let line_number = self.line_number.saturating_add(1);
253 match self.reader.read_line(&mut self.buffer) {
254 Ok(0) => {
255 self.done = true;
256 return None;
257 }
258 Ok(_) => {
259 self.line_number = line_number;
260 if self.buffer.ends_with('\n') {
261 self.buffer.pop();
262 }
263
264 match self.parser.parse_line(&self.buffer) {
265 Ok(None) => continue,
266 Ok(Some(event)) => {
267 return Some(RolloutJsonlRecord {
268 line_number,
269 outcome: Ok(event),
270 });
271 }
272 Err(err) => {
273 return Some(RolloutJsonlRecord {
274 line_number,
275 outcome: Err(err),
276 });
277 }
278 }
279 }
280 Err(err) => {
281 self.done = true;
282 self.line_number = line_number;
283 return Some(RolloutJsonlRecord {
284 line_number,
285 outcome: Err(RolloutJsonlError::Io { source: err }),
286 });
287 }
288 }
289 }
290 }
291}
292
293pub type RolloutJsonlFileReader = RolloutJsonlReader<std::io::BufReader<std::fs::File>>;
294
295pub fn rollout_jsonl_reader<R: BufRead>(reader: R) -> RolloutJsonlReader<R> {
296 RolloutJsonlReader::new(reader)
297}
298
299pub fn rollout_jsonl_file(
300 path: impl AsRef<Path>,
301) -> Result<RolloutJsonlFileReader, RolloutJsonlError> {
302 let file =
303 std::fs::File::open(path.as_ref()).map_err(|source| RolloutJsonlError::Io { source })?;
304 Ok(RolloutJsonlReader::new(std::io::BufReader::new(file)))
305}
306
/// Collects every rollout file below `<root>/sessions`.
///
/// The `sessions` directory is walked recursively. A file is included when
/// its name is valid UTF-8, starts with `rollout-` and ends with `.jsonl`.
/// Directories are detected with [`Path::is_dir`], which follows symbolic
/// links.
///
/// Discovery is best-effort: a missing `sessions` directory, unreadable
/// directories and unreadable entries are skipped silently, so the result
/// may be empty or partial rather than an error.
///
/// The returned paths are sorted, so the result does not depend on the
/// order in which the operating system enumerates directory entries.
pub fn find_rollout_files(root: impl AsRef<Path>) -> Vec<PathBuf> {
    let mut found = Vec::new();
    let mut pending = vec![root.as_ref().join("sessions")];

    while let Some(dir) = pending.pop() {
        // Covers a missing `sessions` directory as well as unreadable ones.
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };

        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                pending.push(path);
                continue;
            }

            let is_rollout = path
                .file_name()
                .and_then(|name| name.to_str())
                .map_or(false, |name| {
                    name.starts_with("rollout-") && name.ends_with(".jsonl")
                });
            if is_rollout {
                found.push(path);
            }
        }
    }

    // `read_dir` order is platform-dependent; sort for reproducible output.
    found.sort_unstable();
    found
}
337
338pub fn find_rollout_file_by_id(root: impl AsRef<Path>, id: &str) -> Option<PathBuf> {
339 let root = root.as_ref();
340 let needle = id.strip_prefix("urn:uuid:").unwrap_or(id);
341 let files = find_rollout_files(root);
342
343 for path in &files {
344 if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
345 if name.contains(needle) {
346 return Some(path.clone());
347 }
348 }
349 }
350
351 for path in files {
352 let file = std::fs::File::open(&path).ok()?;
353 let mut reader = BufReader::new(file);
354 let mut line = String::new();
355 for _ in 0..32 {
356 line.clear();
357 let n = reader.read_line(&mut line).ok()?;
358 if n == 0 {
359 break;
360 }
361 if line.ends_with('\n') {
362 line.pop();
363 }
364 let logical = line.strip_suffix('\r').unwrap_or(&line);
365 if logical.chars().all(|ch| ch.is_whitespace()) {
366 continue;
367 }
368
369 let value: serde_json::Value = match serde_json::from_str(logical) {
370 Ok(value) => value,
371 Err(_) => continue,
372 };
373 if value.get("type").and_then(|v| v.as_str()) != Some("session_meta") {
374 continue;
375 }
376 let Some(session_id) = value
377 .get("payload")
378 .and_then(|p| p.get("id"))
379 .and_then(|v| v.as_str())
380 else {
381 continue;
382 };
383 let session_id = session_id.strip_prefix("urn:uuid:").unwrap_or(session_id);
384 if session_id == needle {
385 return Some(path);
386 }
387 }
388 }
389
390 None
391}