1use std::{
16 collections::HashMap,
17 path::{Path, PathBuf},
18};
19
20use chrono::{DateTime, Datelike, SecondsFormat, Utc};
21use serde_json::{Value, json};
22
23use crate::{
24 sessions::IngestEvent,
25 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
26};
27
28use super::{
29 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
30 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
31 empty_options,
32 extract::{Extracted, extract_compact_repr, extract_raw_record, extract_self_str, extract_str},
33 extracted_text,
34 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events},
35 jsonl_bytes, part_id, raw_record,
36};
37
38const NAME: &str = "codex-cli";
39
40pub struct CodexCliFactory;
43
44impl AdapterFactory for CodexCliFactory {
45 fn name(&self) -> &'static str {
46 NAME
47 }
48
49 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50 Ok(Box::new(CodexCliAdapter::new(config_path(NAME, config)?)))
51 }
52
53 fn probe_default(&self, env: &Env) -> Option<Value> {
54 let path = env.home.join(".codex").join("sessions");
55 path.exists().then(|| json!({ "path": path }))
56 }
57
58 fn serialize(
59 &self,
60 session: &crate::sessions::SessionWithMessages,
61 fidelity: RestoreFidelity,
62 ) -> Result<Vec<RestoredFile>, AdapterError> {
63 serialize_session(session, fidelity)
64 }
65}
66
67fn serialize_session(
68 session: &crate::sessions::SessionWithMessages,
69 fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71 let mut records = Vec::new();
76 if fidelity == RestoreFidelity::Native
77 && let Some(raw) = raw_record(&session.session.options)
78 {
79 records.push(raw);
80 } else {
81 records.push(codex_session_meta(session));
82 }
83 let mut messages = session.messages.clone();
84 messages.sort_by(by_timestamp_then_id);
85 for message in &messages {
86 if fidelity == RestoreFidelity::Native
87 && let Some(raw) = raw_record(message.message.options())
88 {
89 records.push(raw);
90 continue;
91 }
92 if matches!(message.message, Message::System { .. }) {
97 continue;
98 }
99 records.push(codex_response_item(message));
100 }
101 Ok(vec![RestoredFile {
102 relative_path: codex_relative_path(session),
103 bytes: jsonl_bytes(NAME, &records)?,
104 }])
105}
106
107fn codex_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
108 let ts = session.session.created_at;
109 let filename_ts = ts.format("%Y-%m-%dT%H-%M-%S");
110 PathBuf::from("sessions")
111 .join(format!("{:04}", ts.year()))
112 .join(format!("{:02}", ts.month()))
113 .join(format!("{:02}", ts.day()))
114 .join(format!(
115 "rollout-{filename_ts}-{}.jsonl",
116 session.session.id
117 ))
118}
119
120fn codex_session_meta(session: &crate::sessions::SessionWithMessages) -> Value {
121 json!({
122 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
123 "type": "session_meta",
124 "payload": {
125 "id": session.session.id,
126 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
127 "cwd": &*session.session.project,
128 }
129 })
130}
131
132fn codex_response_item(message: &crate::sessions::MessageWithParts) -> Value {
133 json!({
134 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
135 "type": "response_item",
136 "payload": codex_payload(message),
137 })
138}
139
140fn codex_payload(message: &crate::sessions::MessageWithParts) -> Value {
141 if let Some(part) = message.parts.first() {
142 match &part.kind {
143 PartKind::ToolCall {
144 call_id,
145 name,
146 params,
147 ..
148 } if matches!(message.message, Message::Assistant { .. }) => {
149 return json!({
150 "type": "function_call",
151 "call_id": extracted_text(call_id),
152 "name": extracted_text(name),
153 "arguments": compact_json(params),
154 });
155 }
156 PartKind::ToolResult {
157 call_id, result, ..
158 } if matches!(message.message, Message::Tool { .. }) => {
159 return json!({
160 "type": "function_call_output",
161 "call_id": extracted_text(call_id),
162 "output": result,
163 });
164 }
165 PartKind::Reasoning { text }
166 if matches!(message.message, Message::Assistant { .. }) =>
167 {
168 if let Some(text) = text
169 && let Ok(value) = serde_json::from_str::<Value>(text.as_ref())
170 {
171 return value;
172 }
173 return json!({
174 "type": "reasoning",
175 "summary": [{"type": "summary_text", "text": extracted_text(text)}],
176 });
177 }
178 _ => {}
179 }
180 }
181 let is_assistant = matches!(message.message, Message::Assistant { .. });
182 json!({
183 "type": "message",
184 "role": match message.message.role() {
185 crate::wire::Role::System => "developer",
186 crate::wire::Role::User => "user",
187 crate::wire::Role::Assistant => "assistant",
188 crate::wire::Role::Tool => "tool",
189 },
190 "content": message
191 .parts
192 .iter()
193 .map(|part| codex_content_part(part, is_assistant))
194 .collect::<Vec<_>>(),
195 })
196}
197
198fn codex_content_part(part: &Part, is_assistant: bool) -> Value {
199 let text_type = if is_assistant {
203 "output_text"
204 } else {
205 "input_text"
206 };
207 match &part.kind {
208 PartKind::Text { text } => json!({
209 "type": text_type,
210 "text": extracted_text(text),
211 }),
212 PartKind::File { data, .. } => json!({
213 "type": text_type,
214 "text": match data {
215 crate::wire::FileData::String(value) => value.clone(),
216 crate::wire::FileData::Bytes(value) => format!("<{} bytes>", value.len()),
217 crate::wire::FileData::Url(value) => value.clone(),
218 },
219 }),
220 other => json!({
221 "type": text_type,
222 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
223 }),
224 }
225}
226
/// Adapter over a directory tree of Codex CLI rollout files.
#[derive(Debug, Clone)]
pub struct CodexCliAdapter {
    /// Directory the adapter reports as its root.
    root: PathBuf,
}

impl CodexCliAdapter {
    /// Creates an adapter rooted at `root`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
237
238impl Adapter for CodexCliAdapter {
239 fn discover(&self) -> DiscoverFuture<'_> {
240 jsonl_tree_discover(self)
241 }
242
243 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
244 jsonl_tree_events(self, oracle)
245 }
246}
247
248impl JsonlTree for CodexCliAdapter {
249 type State = HashMap<String, Extracted<String>>;
250
251 fn name(&self) -> &'static str {
252 NAME
253 }
254
255 fn root(&self) -> &Path {
256 &self.root
257 }
258
259 fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
260 let row: Value = serde_json::from_str(first_line).ok()?;
261 if row.get("type").and_then(Value::as_str) == Some("session_meta") {
262 row.get("payload")?
263 .get("id")?
264 .as_str()
265 .map(ToOwned::to_owned)
266 } else if is_legacy_session_row(&row) {
267 row.get("id")?.as_str().map(ToOwned::to_owned)
268 } else {
269 None
270 }
271 }
272
273 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
274 session_from_rows(path, rows)
275 }
276
277 fn events_from_row(
278 &self,
279 session: &Session,
280 row: &BoundedRow,
281 state: &mut Self::State,
282 ) -> Result<Vec<IngestEvent>, String> {
283 capture_tool_call_name(&row.value, state);
284 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
285 }
286}
287
288fn is_legacy_session_row(row: &Value) -> bool {
293 row.get("type").is_none() && row.get("id").is_some()
294}
295
296fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
297 let path_display = path.display().to_string();
298 let first = rows
299 .first()
300 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
301 let row = &first.value;
302 let at_first = format!("{path_display}:{}", first.line);
303 let payload = if row.get("type").and_then(Value::as_str) == Some("session_meta") {
307 row.get("payload").cloned().unwrap_or(Value::Null)
308 } else if is_legacy_session_row(row) {
309 row.clone()
310 } else {
311 return Err(AdapterError::schema(
312 NAME,
313 at_first,
314 "first row must be session_meta",
315 ));
316 };
317 let id = payload
318 .get("id")
319 .and_then(Value::as_str)
320 .ok_or_else(|| {
321 AdapterError::schema(NAME, at_first.clone(), "session_meta missing payload.id")
322 })?
323 .to_owned();
324 let created_at = payload
325 .get("timestamp")
326 .and_then(Value::as_str)
327 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
328 .map(|dt| dt.with_timezone(&Utc))
329 .or_else(|| {
330 row.get("timestamp")
331 .and_then(Value::as_str)
332 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
333 .map(|dt| dt.with_timezone(&Utc))
334 })
335 .ok_or_else(|| {
336 AdapterError::schema(NAME, at_first, "session_meta has no parseable timestamp")
337 })?;
338 let project = match extract_str(&payload, "cwd") {
339 Some(value) => value,
340 None => {
341 let path_str = path
342 .file_name()
343 .and_then(|n| n.to_str())
344 .unwrap_or(path_display.as_str())
345 .to_owned();
346 extract_self_str(&Value::String(path_str)).ok_or_else(|| {
347 AdapterError::schema(
348 NAME,
349 path_display.clone(),
350 "internal: Value::String produced None from Source::as_str",
351 )
352 })?
353 }
354 };
355 let mut options = ProviderOptions::new();
356 options.insert(
357 "source".to_owned(),
358 json!({
359 "adapter": "codex-cli",
360 "originator": payload.get("originator"),
361 "cli_version": payload.get("cli_version"),
362 "model_provider": payload.get("model_provider"),
363 "git": payload.get("git"),
364 "base_instructions": payload.get("base_instructions"),
365 "instructions": payload.get("instructions"),
366 "source": payload.get("source"),
367 "raw_record": extract_raw_record(row),
368 }),
369 );
370
371 Ok(Session {
372 id,
373 parent_session_id: None,
374 parent_message_id: None,
375 source_agent: "codex-cli".to_owned(),
376 created_at,
377 project,
378 options,
379 })
380}
381
382fn events_from_row(
391 session_id: &str,
392 line: usize,
393 row: &Value,
394 default_timestamp: DateTime<Utc>,
395 tool_call_names: &HashMap<String, Extracted<String>>,
396) -> Result<Vec<IngestEvent>, String> {
397 let kind = row.get("type").and_then(Value::as_str);
398 if kind == Some("session_meta")
402 || is_legacy_session_row(row)
403 || (kind.is_none() && row.get("record_type").is_some())
404 {
405 return Ok(Vec::new());
406 }
407 let (payload, timestamp) = if kind == Some("response_item") {
411 let timestamp = row
412 .get("timestamp")
413 .and_then(Value::as_str)
414 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
415 .map(|dt| dt.with_timezone(&Utc))
416 .unwrap_or(default_timestamp);
417 (row.get("payload").unwrap_or(&Value::Null), timestamp)
418 } else {
419 (row, default_timestamp)
420 };
421 let payload_type = payload.get("type").and_then(Value::as_str).unwrap_or("");
422 let message_id = format!("{session_id}:{line:06}");
423
424 match payload_type {
425 "message" => message_events(session_id, &message_id, timestamp, payload, row),
426 "function_call" => Ok(tool_call_events(
427 session_id,
428 &message_id,
429 timestamp,
430 payload,
431 row,
432 )),
433 "function_call_output" => Ok(tool_result_events(
434 session_id,
435 &message_id,
436 timestamp,
437 payload,
438 row,
439 tool_call_names,
440 )),
441 "reasoning" => Ok(reasoning_events(
442 session_id,
443 &message_id,
444 timestamp,
445 payload,
446 row,
447 )),
448 "custom_tool_call" => Ok(custom_tool_call_events(
449 session_id,
450 &message_id,
451 timestamp,
452 payload,
453 row,
454 )),
455 "custom_tool_call_output" => Ok(custom_tool_result_events(
456 session_id,
457 &message_id,
458 timestamp,
459 payload,
460 row,
461 )),
462 _ => Ok(vec![raw_carrier_event(session_id, line, row, timestamp)]),
463 }
464}
465
466fn row_options(row: &Value) -> ProviderOptions {
467 let mut options = ProviderOptions::new();
468 options.insert(
469 "source".to_owned(),
470 json!({ "raw_record": extract_raw_record(row) }),
471 );
472 options
473}
474
475fn raw_carrier_event(
476 session_id: &str,
477 line: usize,
478 row: &Value,
479 timestamp: DateTime<Utc>,
480) -> IngestEvent {
481 IngestEvent::Message(Message::System {
482 id: row
483 .get("id")
484 .and_then(Value::as_str)
485 .map_or_else(|| format!("{session_id}:{line:06}:raw"), ToOwned::to_owned),
486 session_id: session_id.to_owned(),
487 timestamp: row
488 .get("timestamp")
489 .and_then(Value::as_str)
490 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
491 .map(|dt| dt.with_timezone(&Utc))
492 .unwrap_or(timestamp),
493 content: None,
494 options: row_options(row),
495 })
496}
497
498fn capture_tool_call_name(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
502 let payload = match row.get("type").and_then(Value::as_str) {
505 Some("response_item") => row.get("payload"),
506 Some(_) => Some(row),
507 None => None,
508 };
509 let Some(payload) = payload else {
510 return;
511 };
512 if payload.get("type").and_then(Value::as_str) != Some("function_call") {
513 return;
514 }
515 let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
516 return;
517 };
518 let Some(name) = extract_str(payload, "name") else {
519 return;
520 };
521 map.insert(call_id.to_owned(), name);
522}
523
524fn message_events(
525 session_id: &str,
526 message_id: &str,
527 timestamp: DateTime<Utc>,
528 payload: &Value,
529 row: &Value,
530) -> Result<Vec<IngestEvent>, String> {
531 let role = payload
532 .get("role")
533 .and_then(Value::as_str)
534 .ok_or_else(|| "message missing role".to_owned())?;
535 let content = payload
536 .get("content")
537 .and_then(Value::as_array)
538 .cloned()
539 .unwrap_or_default();
540 let provenance = message_provenance(role, &content);
545 let mut parts = Vec::with_capacity(content.len());
546 for (ordinal, item) in content.iter().enumerate() {
547 let text = extract_str(item, "text").or_else(|| Some(extract_compact_repr(item)));
552 parts.push(Part {
553 session_id: session_id.to_owned(),
554 id: part_id(message_id, ordinal),
555 message_id: message_id.to_owned(),
556 ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
557 provenance,
558 options: empty_options(),
559 kind: PartKind::Text { text },
560 });
561 }
562
563 let (message, keep_parts) = match role {
564 "user" => (
565 Message::User {
566 id: message_id.to_owned(),
567 session_id: session_id.to_owned(),
568 timestamp,
569 options: row_options(row),
570 },
571 true,
572 ),
573 "assistant" => (
574 Message::Assistant {
575 id: message_id.to_owned(),
576 session_id: session_id.to_owned(),
577 timestamp,
578 options: row_options(row),
579 },
580 true,
581 ),
582 "developer" | "system" => (
585 Message::System {
586 id: message_id.to_owned(),
587 session_id: session_id.to_owned(),
588 timestamp,
589 content: None,
590 options: row_options(row),
591 },
592 true,
593 ),
594 other => return Err(format!("unsupported codex-cli role {other}")),
595 };
596
597 let mut events = Vec::with_capacity(parts.len() + 1);
598 events.push(IngestEvent::Message(message));
599 if keep_parts {
600 events.extend(parts.into_iter().map(IngestEvent::Part));
601 }
602 Ok(events)
603}
604
605fn message_provenance(role: &str, content: &[Value]) -> Provenance {
611 if role == "developer" || role == "system" {
612 return Provenance::Injected;
613 }
614 if role == "user" {
615 let injected = content.iter().any(|item| {
616 item.get("text")
617 .and_then(Value::as_str)
618 .is_some_and(is_injected_user_text)
619 });
620 if injected {
621 return Provenance::Injected;
622 }
623 }
624 Provenance::Conversational
625}
626
/// Reports whether `text`, ignoring leading whitespace, starts with one of the markers this
/// adapter treats as injected rather than typed by the user.
fn is_injected_user_text(text: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "<environment_context>",
        "<user_instructions>",
        "# AGENTS.md",
    ];
    let body = text.trim_start();
    MARKERS.iter().any(|marker| body.starts_with(*marker))
}
634
635fn tool_call_events(
636 session_id: &str,
637 message_id: &str,
638 timestamp: DateTime<Utc>,
639 payload: &Value,
640 row: &Value,
641) -> Vec<IngestEvent> {
642 let call_id = extract_str(payload, "call_id");
643 let name = extract_str(payload, "name");
644 let params = match payload.get("arguments") {
645 Some(Value::String(text)) => {
646 serde_json::from_str::<Value>(text).unwrap_or_else(|_| Value::String(text.clone()))
647 }
648 Some(other) => other.clone(),
649 None => Value::Null,
650 };
651 let part = Part {
652 session_id: session_id.to_owned(),
653 id: part_id(message_id, 0),
654 message_id: message_id.to_owned(),
655 ordinal: 0,
656 provenance: Provenance::Conversational,
658 options: empty_options(),
659 kind: PartKind::ToolCall {
660 call_id,
661 name,
662 params,
663 provider_executed: false,
664 },
665 };
666 vec![
667 IngestEvent::Message(Message::Assistant {
668 id: message_id.to_owned(),
669 session_id: session_id.to_owned(),
670 timestamp,
671 options: row_options(row),
672 }),
673 IngestEvent::Part(part),
674 ]
675}
676
677fn custom_tool_call_events(
678 session_id: &str,
679 message_id: &str,
680 timestamp: DateTime<Utc>,
681 payload: &Value,
682 row: &Value,
683) -> Vec<IngestEvent> {
684 let part = Part {
685 session_id: session_id.to_owned(),
686 id: part_id(message_id, 0),
687 message_id: message_id.to_owned(),
688 ordinal: 0,
689 provenance: Provenance::Conversational,
691 options: empty_options(),
692 kind: PartKind::ToolCall {
693 call_id: extract_str(payload, "call_id"),
694 name: extract_str(payload, "name"),
695 params: payload.get("input").cloned().unwrap_or(Value::Null),
696 provider_executed: true,
697 },
698 };
699 vec![
700 IngestEvent::Message(Message::Assistant {
701 id: message_id.to_owned(),
702 session_id: session_id.to_owned(),
703 timestamp,
704 options: row_options(row),
705 }),
706 IngestEvent::Part(part),
707 ]
708}
709
710fn custom_tool_result_events(
711 session_id: &str,
712 message_id: &str,
713 timestamp: DateTime<Utc>,
714 payload: &Value,
715 row: &Value,
716) -> Vec<IngestEvent> {
717 let part = Part {
718 session_id: session_id.to_owned(),
719 id: part_id(message_id, 0),
720 message_id: message_id.to_owned(),
721 ordinal: 0,
722 provenance: Provenance::Injected,
724 options: empty_options(),
725 kind: PartKind::ToolResult {
726 call_id: extract_str(payload, "call_id"),
727 name: extract_str(payload, "name"),
728 is_failure: false,
729 result: payload.get("output").cloned().unwrap_or(Value::Null),
730 },
731 };
732 vec![
733 IngestEvent::Message(Message::Tool {
734 id: message_id.to_owned(),
735 session_id: session_id.to_owned(),
736 timestamp,
737 options: row_options(row),
738 }),
739 IngestEvent::Part(part),
740 ]
741}
742
743fn tool_result_events(
744 session_id: &str,
745 message_id: &str,
746 timestamp: DateTime<Utc>,
747 payload: &Value,
748 row: &Value,
749 tool_call_names: &HashMap<String, Extracted<String>>,
750) -> Vec<IngestEvent> {
751 let call_id = extract_str(payload, "call_id");
752 let name = call_id
756 .as_ref()
757 .and_then(|id| tool_call_names.get(id.as_str()))
758 .cloned();
759 let result = payload.get("output").cloned().unwrap_or(Value::Null);
760 let part = Part {
761 session_id: session_id.to_owned(),
762 id: part_id(message_id, 0),
763 message_id: message_id.to_owned(),
764 ordinal: 0,
765 provenance: Provenance::Injected,
767 options: empty_options(),
768 kind: PartKind::ToolResult {
769 call_id,
770 name,
771 is_failure: false,
772 result,
773 },
774 };
775 vec![
776 IngestEvent::Message(Message::Tool {
777 id: message_id.to_owned(),
778 session_id: session_id.to_owned(),
779 timestamp,
780 options: row_options(row),
781 }),
782 IngestEvent::Part(part),
783 ]
784}
785
786fn reasoning_events(
787 session_id: &str,
788 message_id: &str,
789 timestamp: DateTime<Utc>,
790 payload: &Value,
791 row: &Value,
792) -> Vec<IngestEvent> {
793 let summary = payload
797 .get("summary")
798 .and_then(Value::as_array)
799 .and_then(|items| {
800 let joined = items
801 .iter()
802 .filter_map(|item| extract_str(item, "text"))
803 .map(|e| (*e).clone())
804 .collect::<Vec<_>>()
805 .join("\n");
806 if joined.is_empty() {
807 None
808 } else {
809 Some(extract_compact_repr(payload))
810 }
811 });
812 let part = Part {
813 session_id: session_id.to_owned(),
814 id: part_id(message_id, 0),
815 message_id: message_id.to_owned(),
816 ordinal: 0,
817 provenance: Provenance::Conversational,
819 options: empty_options(),
820 kind: PartKind::Reasoning { text: summary },
821 };
822 vec![
823 IngestEvent::Message(Message::Assistant {
824 id: message_id.to_owned(),
825 session_id: session_id.to_owned(),
826 timestamp,
827 options: row_options(row),
828 }),
829 IngestEvent::Part(part),
830 ]
831}
832
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
    use tempfile::TempDir;

    /// Directory of rollout fixtures that the adapter under test is rooted at.
    const FIXTURES: &str = "tests/fixtures/adapter/codex_cli/sessions";

    /// Runs the shared native-restore assertion over the fixture corpus.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        let corpus_root = std::path::Path::new(FIXTURES)
            .parent()
            .expect("FIXTURES is nested under a corpus root");
        let adapter = CodexCliAdapter::new(FIXTURES);
        crate::adapter::test_support::assert_native_restore(
            &CodexCliFactory,
            &adapter,
            corpus_root,
        )
        .await
    }

    /// Ingests the whole corpus and checks that nothing is dropped or skipped, that every
    /// session is attributed to this adapter and has messages, and that at least one text
    /// part was stored.
    #[tokio::test(flavor = "multi_thread")]
    async fn codex_cli_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
        let workdir = TempDir::new()?;
        let store = Store::open_local(workdir.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);

        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert!(summary.accepted() > 0, "ingest must accept rows");
        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
        assert_eq!(
            summary.dropped_sessions, 0,
            "no session-level rejections expected"
        );
        assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");

        let (sessions, messages, parts) = store.row_counts().await?;
        assert!(sessions > 0, "at least one codex-cli session");
        assert!(messages > 0, "at least one codex-cli message");
        assert!(parts > 0, "at least one codex-cli Part");

        let mut saw_text_part = false;
        for session_id in store.session_ids().await? {
            let session = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            assert_eq!(session.session.source_agent, "codex-cli");
            assert!(
                !session.messages.is_empty(),
                "session {session_id} must carry messages",
            );
            saw_text_part |= session
                .messages
                .iter()
                .flat_map(|stored| stored.parts.iter())
                .any(|part| matches!(part.kind, PartKind::Text { .. }));
        }
        assert!(
            saw_text_part,
            "codex-cli corpus must contain at least one Text Part",
        );
        Ok(())
    }

    /// Ordinary prompts and assistant turns are conversational; developer messages and user
    /// messages that open with an environment-context marker are injected.
    #[test]
    fn message_provenance_separates_prompts_from_harness_records() {
        let prompt = [json!({"type": "input_text", "text": "refactor this"})];
        assert_eq!(
            message_provenance("user", &prompt),
            Provenance::Conversational,
        );
        assert_eq!(
            message_provenance("assistant", &[]),
            Provenance::Conversational,
        );

        let developer = [json!({"type": "input_text", "text": "you are an agent"})];
        assert_eq!(
            message_provenance("developer", &developer),
            Provenance::Injected,
        );

        let env = [json!({
            "type": "input_text",
            "text": "<environment_context>cwd=/tmp</environment_context>",
        })];
        assert_eq!(message_provenance("user", &env), Provenance::Injected);
    }

    /// Legacy header and state rows yield no events, while an untyped-wrapper legacy message
    /// row is translated directly into a user message plus one text part.
    #[test]
    fn legacy_rows_normalize_to_payloads() {
        let now = Utc::now();
        let names: HashMap<String, Extracted<String>> = HashMap::new();

        let header = json!({"id": "s1", "timestamp": "2025-09-13T04:30:17.447Z"});
        let state = json!({"record_type": "state"});
        let header_events =
            events_from_row("s1", 1, &header, now, &names).expect("legacy first row parses");
        assert!(header_events.is_empty());
        let state_events =
            events_from_row("s1", 2, &state, now, &names).expect("state marker parses");
        assert!(state_events.is_empty());

        let message = json!({
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hi"}],
        });
        let events =
            events_from_row("s1", 3, &message, now, &names).expect("legacy message parses");
        assert_eq!(events.len(), 2, "message + one Text Part");
        assert!(matches!(
            events[0],
            IngestEvent::Message(Message::User { .. })
        ));
        assert!(matches!(
            &events[1],
            IngestEvent::Part(part) if matches!(part.kind, PartKind::Text { .. }),
        ));
    }

    /// The legacy rollout fixture ingests as a full session: its creation time comes from
    /// the header row, all eleven data rows become messages, and at least one tool result
    /// carries a resolved tool name.
    #[tokio::test(flavor = "multi_thread")]
    async fn legacy_rollout_ingests_into_canonical_shape() -> anyhow::Result<()> {
        let workdir = TempDir::new()?;
        let store = Store::open_local(workdir.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;

        let session = store
            .get_session("67c52f3f-d25e-4194-a006-93de58f28d7c")
            .await?
            .expect("legacy rollout ingests as a session");
        assert_eq!(session.session.source_agent, "codex-cli");

        let created = session
            .session
            .created_at
            .to_rfc3339_opts(SecondsFormat::Millis, true);
        assert_eq!(created, "2025-09-13T04:30:17.447Z");
        assert_eq!(session.messages.len(), 11, "every legacy data row ingests");

        let resolved = session
            .messages
            .iter()
            .flat_map(|message| message.parts.iter())
            .any(|part| matches!(&part.kind, PartKind::ToolResult { name: Some(_), .. }));
        assert!(
            resolved,
            "legacy function_call_output resolves its tool name"
        );
        Ok(())
    }
}