1use std::{
16 collections::HashMap,
17 path::{Path, PathBuf},
18};
19
20use chrono::{DateTime, Datelike, SecondsFormat, Utc};
21use serde_json::{Value, json};
22
23use crate::{
24 sessions::IngestEvent,
25 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
26};
27
28use super::{
29 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
30 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
31 empty_options,
32 extract::{Extracted, extract_compact_repr, extract_raw_record, extract_self_str, extract_str},
33 extracted_text,
34 jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events},
35 jsonl_bytes, part_id, part_ordinal, raw_record,
36};
37
/// Identifier for this adapter: returned by the factory's `name`, passed to
/// `config_path` / `jsonl_bytes`, and used as the adapter label in every
/// `AdapterError::schema` raised by this module.
const NAME: &str = "codex-cli";

/// Factory that opens [`CodexCliAdapter`]s and serializes stored sessions back
/// into Codex CLI rollout files.
pub struct CodexCliFactory;
43
44impl AdapterFactory for CodexCliFactory {
45 fn name(&self) -> &'static str {
46 NAME
47 }
48
49 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50 Ok(Box::new(CodexCliAdapter::new(config_path(NAME, config)?)))
51 }
52
53 fn probe_default(&self, env: &Env) -> Option<Value> {
54 let path = env.home.join(".codex").join("sessions");
55 path.exists().then(|| json!({ "path": path }))
56 }
57
58 fn serialize(
59 &self,
60 session: &crate::sessions::SessionWithMessages,
61 fidelity: RestoreFidelity,
62 ) -> Result<Vec<RestoredFile>, AdapterError> {
63 serialize_session(session, fidelity)
64 }
65}
66
67fn serialize_session(
68 session: &crate::sessions::SessionWithMessages,
69 fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71 let mut records = Vec::new();
76 if fidelity == RestoreFidelity::Native
77 && let Some(raw) = raw_record(&session.session.options)
78 {
79 records.push(raw);
80 } else {
81 records.push(codex_session_meta(session));
82 }
83 let mut messages = session.messages.clone();
84 messages.sort_by(by_timestamp_then_id);
85 for message in &messages {
86 if fidelity == RestoreFidelity::Native
87 && let Some(raw) = raw_record(message.message.options())
88 {
89 records.push(raw);
90 continue;
91 }
92 if matches!(message.message, Message::System { .. }) {
97 continue;
98 }
99 records.push(codex_response_item(message));
100 }
101 Ok(vec![RestoredFile::new(
102 codex_relative_path(session),
103 jsonl_bytes(NAME, &records)?,
104 fidelity,
105 )])
106}
107
108fn codex_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
109 let ts = session.session.created_at;
110 let filename_ts = ts.format("%Y-%m-%dT%H-%M-%S");
111 PathBuf::from("sessions")
112 .join(format!("{:04}", ts.year()))
113 .join(format!("{:02}", ts.month()))
114 .join(format!("{:02}", ts.day()))
115 .join(format!(
116 "rollout-{filename_ts}-{}.jsonl",
117 session.session.id
118 ))
119}
120
121fn codex_session_meta(session: &crate::sessions::SessionWithMessages) -> Value {
122 json!({
123 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
124 "type": "session_meta",
125 "payload": {
126 "id": session.session.id,
127 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
128 "cwd": &*session.session.project,
129 }
130 })
131}
132
133fn codex_response_item(message: &crate::sessions::MessageWithParts) -> Value {
134 json!({
135 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
136 "type": "response_item",
137 "payload": codex_payload(message),
138 })
139}
140
141fn codex_payload(message: &crate::sessions::MessageWithParts) -> Value {
142 if let Some(part) = message.parts.first() {
143 match &part.kind {
144 PartKind::ToolCall {
145 call_id,
146 name,
147 params,
148 ..
149 } if matches!(message.message, Message::Assistant { .. }) => {
150 return json!({
151 "type": "function_call",
152 "call_id": extracted_text(call_id),
153 "name": extracted_text(name),
154 "arguments": compact_json(params),
155 });
156 }
157 PartKind::ToolResult {
158 call_id, result, ..
159 } if matches!(message.message, Message::Tool { .. }) => {
160 return json!({
161 "type": "function_call_output",
162 "call_id": extracted_text(call_id),
163 "output": result,
164 });
165 }
166 PartKind::Reasoning { text }
167 if matches!(message.message, Message::Assistant { .. }) =>
168 {
169 if let Some(text) = text
170 && let Ok(value) = serde_json::from_str::<Value>(text.as_ref())
171 {
172 return value;
173 }
174 return json!({
175 "type": "reasoning",
176 "summary": [{"type": "summary_text", "text": extracted_text(text)}],
177 });
178 }
179 _ => {}
180 }
181 }
182 let is_assistant = matches!(message.message, Message::Assistant { .. });
183 json!({
184 "type": "message",
185 "role": match message.message.role() {
186 crate::wire::Role::System => "developer",
187 crate::wire::Role::User => "user",
188 crate::wire::Role::Assistant => "assistant",
189 crate::wire::Role::Tool => "tool",
190 },
191 "content": message
192 .parts
193 .iter()
194 .map(|part| codex_content_part(part, is_assistant))
195 .collect::<Vec<_>>(),
196 })
197}
198
199fn codex_content_part(part: &Part, is_assistant: bool) -> Value {
200 let text_type = if is_assistant {
204 "output_text"
205 } else {
206 "input_text"
207 };
208 match &part.kind {
209 PartKind::Text { text } => json!({
210 "type": text_type,
211 "text": extracted_text(text),
212 }),
213 PartKind::File { data, .. } => json!({
214 "type": text_type,
215 "text": match data {
216 crate::wire::FileData::String(value) => value.clone(),
217 crate::wire::FileData::Bytes(value) => format!("<{} bytes>", value.len()),
218 crate::wire::FileData::Url(value) => value.clone(),
219 },
220 }),
221 other => json!({
222 "type": text_type,
223 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
224 }),
225 }
226}
227
228#[derive(Debug, Clone)]
229pub struct CodexCliAdapter {
230 root: PathBuf,
231}
232
233impl CodexCliAdapter {
234 pub fn new(root: impl Into<PathBuf>) -> Self {
235 Self { root: root.into() }
236 }
237}
238
239impl Adapter for CodexCliAdapter {
240 fn discover(&self) -> DiscoverFuture<'_> {
241 jsonl_tree_discover(self)
242 }
243
244 fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
245 jsonl_tree_events(self, oracle)
246 }
247}
248
249impl JsonlTree for CodexCliAdapter {
250 type State = HashMap<String, Extracted<String>>;
251
252 fn name(&self) -> &'static str {
253 NAME
254 }
255
256 fn root(&self) -> &Path {
257 &self.root
258 }
259
260 fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
261 let row: Value = serde_json::from_str(first_line).ok()?;
262 if row.get("type").and_then(Value::as_str) == Some("session_meta") {
263 row.get("payload")?
264 .get("id")?
265 .as_str()
266 .map(ToOwned::to_owned)
267 } else if is_legacy_session_row(&row) {
268 row.get("id")?.as_str().map(ToOwned::to_owned)
269 } else {
270 None
271 }
272 }
273
274 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
275 session_from_rows(path, rows)
276 }
277
278 fn events_from_row(
279 &self,
280 session: &Session,
281 row: &BoundedRow,
282 state: &mut Self::State,
283 ) -> Result<Vec<IngestEvent>, String> {
284 capture_tool_call_name(&row.value, state);
285 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
286 }
287}
288
289fn is_legacy_session_row(row: &Value) -> bool {
294 row.get("type").is_none() && row.get("id").is_some()
295}
296
297fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
298 let path_display = path.display().to_string();
299 let first = rows
300 .first()
301 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
302 let row = &first.value;
303 let at_first = format!("{path_display}:{}", first.line);
304 let payload = if row.get("type").and_then(Value::as_str) == Some("session_meta") {
308 row.get("payload").cloned().unwrap_or(Value::Null)
309 } else if is_legacy_session_row(row) {
310 row.clone()
311 } else {
312 return Err(AdapterError::schema(
313 NAME,
314 at_first,
315 "first row must be session_meta",
316 ));
317 };
318 let id = payload
319 .get("id")
320 .and_then(Value::as_str)
321 .ok_or_else(|| {
322 AdapterError::schema(NAME, at_first.clone(), "session_meta missing payload.id")
323 })?
324 .to_owned();
325 let created_at = payload
326 .get("timestamp")
327 .and_then(Value::as_str)
328 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
329 .map(|dt| dt.with_timezone(&Utc))
330 .or_else(|| {
331 row.get("timestamp")
332 .and_then(Value::as_str)
333 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
334 .map(|dt| dt.with_timezone(&Utc))
335 })
336 .ok_or_else(|| {
337 AdapterError::schema(NAME, at_first, "session_meta has no parseable timestamp")
338 })?;
339 let project = match extract_str(&payload, "cwd") {
340 Some(value) => value,
341 None => {
342 let path_str = path
343 .file_name()
344 .and_then(|n| n.to_str())
345 .unwrap_or(path_display.as_str())
346 .to_owned();
347 extract_self_str(&Value::String(path_str)).ok_or_else(|| {
348 AdapterError::schema(
349 NAME,
350 path_display.clone(),
351 "internal: Value::String produced None from Source::as_str",
352 )
353 })?
354 }
355 };
356 let mut options = ProviderOptions::new();
357 options.insert(
358 "source".to_owned(),
359 json!({
360 "adapter": "codex-cli",
361 "originator": payload.get("originator"),
362 "cli_version": payload.get("cli_version"),
363 "model_provider": payload.get("model_provider"),
364 "git": payload.get("git"),
365 "base_instructions": payload.get("base_instructions"),
366 "instructions": payload.get("instructions"),
367 "source": payload.get("source"),
368 "raw_record": extract_raw_record(row),
369 }),
370 );
371
372 Ok(Session {
373 id,
374 parent_session_id: None,
375 parent_message_id: None,
376 source_agent: "codex-cli".to_owned(),
377 created_at,
378 project,
379 options,
380 })
381}
382
383fn events_from_row(
392 session_id: &str,
393 line: usize,
394 row: &Value,
395 default_timestamp: DateTime<Utc>,
396 tool_call_names: &HashMap<String, Extracted<String>>,
397) -> Result<Vec<IngestEvent>, String> {
398 let kind = row.get("type").and_then(Value::as_str);
399 if kind == Some("session_meta")
403 || is_legacy_session_row(row)
404 || (kind.is_none() && row.get("record_type").is_some())
405 {
406 return Ok(Vec::new());
407 }
408 let (payload, timestamp) = if kind == Some("response_item") {
412 let timestamp = row
413 .get("timestamp")
414 .and_then(Value::as_str)
415 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
416 .map(|dt| dt.with_timezone(&Utc))
417 .unwrap_or(default_timestamp);
418 (row.get("payload").unwrap_or(&Value::Null), timestamp)
419 } else {
420 (row, default_timestamp)
421 };
422 let payload_type = payload.get("type").and_then(Value::as_str).unwrap_or("");
423 let message_id = format!("{session_id}:{line:06}");
424
425 match payload_type {
426 "message" => message_events(session_id, &message_id, timestamp, payload, row),
427 "function_call" => Ok(tool_call_events(
428 session_id,
429 &message_id,
430 timestamp,
431 payload,
432 row,
433 )),
434 "function_call_output" => Ok(tool_result_events(
435 session_id,
436 &message_id,
437 timestamp,
438 payload,
439 row,
440 tool_call_names,
441 )),
442 "reasoning" => Ok(reasoning_events(
443 session_id,
444 &message_id,
445 timestamp,
446 payload,
447 row,
448 )),
449 "custom_tool_call" => Ok(custom_tool_call_events(
450 session_id,
451 &message_id,
452 timestamp,
453 payload,
454 row,
455 )),
456 "custom_tool_call_output" => Ok(custom_tool_result_events(
457 session_id,
458 &message_id,
459 timestamp,
460 payload,
461 row,
462 )),
463 _ => Ok(vec![raw_carrier_event(session_id, line, row, timestamp)]),
464 }
465}
466
467fn row_options(row: &Value) -> ProviderOptions {
468 let mut options = ProviderOptions::new();
469 options.insert(
470 "source".to_owned(),
471 json!({ "raw_record": extract_raw_record(row) }),
472 );
473 options
474}
475
476fn raw_carrier_event(
477 session_id: &str,
478 line: usize,
479 row: &Value,
480 timestamp: DateTime<Utc>,
481) -> IngestEvent {
482 IngestEvent::Message(Message::System {
483 id: row
484 .get("id")
485 .and_then(Value::as_str)
486 .map_or_else(|| format!("{session_id}:{line:06}:raw"), ToOwned::to_owned),
487 session_id: session_id.to_owned(),
488 timestamp: row
489 .get("timestamp")
490 .and_then(Value::as_str)
491 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
492 .map(|dt| dt.with_timezone(&Utc))
493 .unwrap_or(timestamp),
494 content: None,
495 options: row_options(row),
496 })
497}
498
499fn capture_tool_call_name(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
503 let payload = match row.get("type").and_then(Value::as_str) {
506 Some("response_item") => row.get("payload"),
507 Some(_) => Some(row),
508 None => None,
509 };
510 let Some(payload) = payload else {
511 return;
512 };
513 if payload.get("type").and_then(Value::as_str) != Some("function_call") {
514 return;
515 }
516 let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
517 return;
518 };
519 let Some(name) = extract_str(payload, "name") else {
520 return;
521 };
522 map.insert(call_id.to_owned(), name);
523}
524
525fn message_events(
526 session_id: &str,
527 message_id: &str,
528 timestamp: DateTime<Utc>,
529 payload: &Value,
530 row: &Value,
531) -> Result<Vec<IngestEvent>, String> {
532 let role = payload
533 .get("role")
534 .and_then(Value::as_str)
535 .ok_or_else(|| "message missing role".to_owned())?;
536 let content = payload
537 .get("content")
538 .and_then(Value::as_array)
539 .cloned()
540 .unwrap_or_default();
541 let provenance = message_provenance(role, &content);
546 let mut parts = Vec::with_capacity(content.len());
547 for (ordinal, item) in content.iter().enumerate() {
548 let text = extract_str(item, "text").or_else(|| Some(extract_compact_repr(item)));
553 parts.push(Part {
554 session_id: session_id.to_owned(),
555 id: part_id(message_id, ordinal),
556 message_id: message_id.to_owned(),
557 ordinal: part_ordinal(ordinal),
558 provenance,
559 options: empty_options(),
560 kind: PartKind::Text { text },
561 });
562 }
563
564 let (message, keep_parts) = match role {
565 "user" => (
566 Message::User {
567 id: message_id.to_owned(),
568 session_id: session_id.to_owned(),
569 timestamp,
570 options: row_options(row),
571 },
572 true,
573 ),
574 "assistant" => (
575 Message::Assistant {
576 id: message_id.to_owned(),
577 session_id: session_id.to_owned(),
578 timestamp,
579 options: row_options(row),
580 },
581 true,
582 ),
583 "developer" | "system" => (
586 Message::System {
587 id: message_id.to_owned(),
588 session_id: session_id.to_owned(),
589 timestamp,
590 content: None,
591 options: row_options(row),
592 },
593 true,
594 ),
595 other => return Err(format!("unsupported codex-cli role {other}")),
596 };
597
598 let mut events = Vec::with_capacity(parts.len() + 1);
599 events.push(IngestEvent::Message(message));
600 if keep_parts {
601 events.extend(parts.into_iter().map(IngestEvent::Part));
602 }
603 Ok(events)
604}
605
606fn message_provenance(role: &str, content: &[Value]) -> Provenance {
612 if role == "developer" || role == "system" {
613 return Provenance::Injected;
614 }
615 if role == "user" {
616 let injected = content.iter().any(|item| {
617 item.get("text")
618 .and_then(Value::as_str)
619 .is_some_and(is_injected_user_text)
620 });
621 if injected {
622 return Provenance::Injected;
623 }
624 }
625 Provenance::Conversational
626}
627
/// True when `text`, ignoring leading whitespace, begins with one of the
/// harness-generated markers that Codex places in `user` messages.
fn is_injected_user_text(text: &str) -> bool {
    const HARNESS_PREFIXES: [&str; 3] =
        ["<environment_context>", "<user_instructions>", "# AGENTS.md"];
    let body = text.trim_start();
    HARNESS_PREFIXES
        .iter()
        .any(|&prefix| body.starts_with(prefix))
}
635
636fn tool_call_events(
637 session_id: &str,
638 message_id: &str,
639 timestamp: DateTime<Utc>,
640 payload: &Value,
641 row: &Value,
642) -> Vec<IngestEvent> {
643 let call_id = extract_str(payload, "call_id");
644 let name = extract_str(payload, "name");
645 let params = match payload.get("arguments") {
646 Some(Value::String(text)) => {
647 serde_json::from_str::<Value>(text).unwrap_or_else(|_| Value::String(text.clone()))
648 }
649 Some(other) => other.clone(),
650 None => Value::Null,
651 };
652 let part = Part {
653 session_id: session_id.to_owned(),
654 id: part_id(message_id, 0),
655 message_id: message_id.to_owned(),
656 ordinal: 0,
657 provenance: Provenance::Conversational,
659 options: empty_options(),
660 kind: PartKind::ToolCall {
661 call_id,
662 name,
663 params,
664 provider_executed: false,
665 },
666 };
667 vec![
668 IngestEvent::Message(Message::Assistant {
669 id: message_id.to_owned(),
670 session_id: session_id.to_owned(),
671 timestamp,
672 options: row_options(row),
673 }),
674 IngestEvent::Part(part),
675 ]
676}
677
678fn custom_tool_call_events(
679 session_id: &str,
680 message_id: &str,
681 timestamp: DateTime<Utc>,
682 payload: &Value,
683 row: &Value,
684) -> Vec<IngestEvent> {
685 let part = Part {
686 session_id: session_id.to_owned(),
687 id: part_id(message_id, 0),
688 message_id: message_id.to_owned(),
689 ordinal: 0,
690 provenance: Provenance::Conversational,
692 options: empty_options(),
693 kind: PartKind::ToolCall {
694 call_id: extract_str(payload, "call_id"),
695 name: extract_str(payload, "name"),
696 params: payload.get("input").cloned().unwrap_or(Value::Null),
697 provider_executed: true,
698 },
699 };
700 vec![
701 IngestEvent::Message(Message::Assistant {
702 id: message_id.to_owned(),
703 session_id: session_id.to_owned(),
704 timestamp,
705 options: row_options(row),
706 }),
707 IngestEvent::Part(part),
708 ]
709}
710
711fn custom_tool_result_events(
712 session_id: &str,
713 message_id: &str,
714 timestamp: DateTime<Utc>,
715 payload: &Value,
716 row: &Value,
717) -> Vec<IngestEvent> {
718 let part = Part {
719 session_id: session_id.to_owned(),
720 id: part_id(message_id, 0),
721 message_id: message_id.to_owned(),
722 ordinal: 0,
723 provenance: Provenance::Injected,
725 options: empty_options(),
726 kind: PartKind::ToolResult {
727 call_id: extract_str(payload, "call_id"),
728 name: extract_str(payload, "name"),
729 is_failure: false,
730 result: payload.get("output").cloned().unwrap_or(Value::Null),
731 },
732 };
733 vec![
734 IngestEvent::Message(Message::Tool {
735 id: message_id.to_owned(),
736 session_id: session_id.to_owned(),
737 timestamp,
738 options: row_options(row),
739 }),
740 IngestEvent::Part(part),
741 ]
742}
743
744fn tool_result_events(
745 session_id: &str,
746 message_id: &str,
747 timestamp: DateTime<Utc>,
748 payload: &Value,
749 row: &Value,
750 tool_call_names: &HashMap<String, Extracted<String>>,
751) -> Vec<IngestEvent> {
752 let call_id = extract_str(payload, "call_id");
753 let name = call_id
757 .as_ref()
758 .and_then(|id| tool_call_names.get(id.as_str()))
759 .cloned();
760 let result = payload.get("output").cloned().unwrap_or(Value::Null);
761 let part = Part {
762 session_id: session_id.to_owned(),
763 id: part_id(message_id, 0),
764 message_id: message_id.to_owned(),
765 ordinal: 0,
766 provenance: Provenance::Injected,
768 options: empty_options(),
769 kind: PartKind::ToolResult {
770 call_id,
771 name,
772 is_failure: false,
773 result,
774 },
775 };
776 vec![
777 IngestEvent::Message(Message::Tool {
778 id: message_id.to_owned(),
779 session_id: session_id.to_owned(),
780 timestamp,
781 options: row_options(row),
782 }),
783 IngestEvent::Part(part),
784 ]
785}
786
787fn reasoning_events(
788 session_id: &str,
789 message_id: &str,
790 timestamp: DateTime<Utc>,
791 payload: &Value,
792 row: &Value,
793) -> Vec<IngestEvent> {
794 let summary = payload
798 .get("summary")
799 .and_then(Value::as_array)
800 .and_then(|items| {
801 let joined = items
802 .iter()
803 .filter_map(|item| extract_str(item, "text"))
804 .map(|e| (*e).clone())
805 .collect::<Vec<_>>()
806 .join("\n");
807 if joined.is_empty() {
808 None
809 } else {
810 Some(extract_compact_repr(payload))
811 }
812 });
813 let part = Part {
814 session_id: session_id.to_owned(),
815 id: part_id(message_id, 0),
816 message_id: message_id.to_owned(),
817 ordinal: 0,
818 provenance: Provenance::Conversational,
820 options: empty_options(),
821 kind: PartKind::Reasoning { text: summary },
822 };
823 vec![
824 IngestEvent::Message(Message::Assistant {
825 id: message_id.to_owned(),
826 session_id: session_id.to_owned(),
827 timestamp,
828 options: row_options(row),
829 }),
830 IngestEvent::Part(part),
831 ]
832}
833
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    // Panicking on unexpected fixture shapes is the intended failure signal here.

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
    use tempfile::TempDir;

    // Checked-in Codex CLI rollout corpus; the tests below use it as the adapter root.
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/codex_cli/sessions"
    );

    // `probe_default` should propose `<home>/.codex/sessions` when it exists.
    #[test]
    fn probe_default_finds_codex_sessions_under_home() -> anyhow::Result<()> {
        crate::adapter::test_support::assert_probe_default(
            &CodexCliFactory,
            &[".codex", "sessions"],
        )
    }

    // Ingesting the corpus and restoring it natively must reproduce the fixture
    // files (compared as JSON values, per the test name).
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        let adapter = CodexCliAdapter::new(FIXTURES);
        crate::adapter::test_support::assert_native_restore(
            &CodexCliFactory,
            &adapter,
            // Restored paths begin with `sessions/` (`codex_relative_path`), so the
            // comparison root is presumably the directory containing `FIXTURES`.
            std::path::Path::new(FIXTURES)
                .parent()
                .expect("FIXTURES is nested under a corpus root"),
        )
        .await
    }

    // End-to-end ingest of the corpus: no drops or skips, and every stored
    // session is attributed to codex-cli and carries messages.
    #[tokio::test(flavor = "multi_thread")]
    async fn codex_cli_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);

        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert!(summary.accepted() > 0, "ingest must accept rows");
        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
        assert_eq!(
            summary.dropped_sessions, 0,
            "no session-level rejections expected"
        );
        assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");

        let (sessions, messages, parts) = store.row_counts().await?;
        assert!(sessions > 0, "at least one codex-cli session");
        assert!(messages > 0, "at least one codex-cli message");
        assert!(parts > 0, "at least one codex-cli Part");

        // At least one `message` row must yield a Text part somewhere in the corpus.
        let mut saw_text_part = false;
        for session_id in store.session_ids().await? {
            let session = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            assert_eq!(session.session.source_agent, "codex-cli");
            assert!(
                !session.messages.is_empty(),
                "session {session_id} must carry messages",
            );
            for stored in &session.messages {
                for part in &stored.parts {
                    if matches!(part.kind, PartKind::Text { .. }) {
                        saw_text_part = true;
                    }
                }
            }
        }
        assert!(
            saw_text_part,
            "codex-cli corpus must contain at least one Text Part",
        );
        Ok(())
    }

    // Developer messages and harness-marked user text are Injected; ordinary
    // prompts and assistant messages are Conversational.
    #[test]
    fn message_provenance_separates_prompts_from_harness_records() {
        let prompt = vec![json!({"type": "input_text", "text": "refactor this"})];
        assert_eq!(
            message_provenance("user", &prompt),
            Provenance::Conversational,
        );
        assert_eq!(
            message_provenance("assistant", &[]),
            Provenance::Conversational,
        );

        let developer = vec![json!({"type": "input_text", "text": "you are an agent"})];
        assert_eq!(
            message_provenance("developer", &developer),
            Provenance::Injected,
        );

        let env = vec![json!({
            "type": "input_text",
            "text": "<environment_context>cwd=/tmp</environment_context>",
        })];
        assert_eq!(message_provenance("user", &env), Provenance::Injected);
    }

    // Legacy (unwrapped) rows: the bare header and `record_type` state rows emit
    // nothing, while a bare `message` row is handled like a response_item payload.
    #[test]
    fn legacy_rows_normalize_to_payloads() {
        let ts = Utc::now();
        let map: HashMap<String, Extracted<String>> = HashMap::new();

        let first = json!({"id": "s1", "timestamp": "2025-09-13T04:30:17.447Z"});
        let state = json!({"record_type": "state"});
        assert!(
            events_from_row("s1", 1, &first, ts, &map)
                .expect("legacy first row parses")
                .is_empty(),
        );
        assert!(
            events_from_row("s1", 2, &state, ts, &map)
                .expect("state marker parses")
                .is_empty(),
        );

        let message = json!({
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hi"}],
        });
        let events = events_from_row("s1", 3, &message, ts, &map).expect("legacy message parses");
        assert_eq!(events.len(), 2, "message + one Text Part");
        assert!(matches!(
            events[0],
            IngestEvent::Message(Message::User { .. })
        ));
        assert!(matches!(
            &events[1],
            IngestEvent::Part(part) if matches!(part.kind, PartKind::Text { .. }),
        ));
    }

    // The corpus contains a legacy-format rollout with this id. Its header
    // timestamp becomes `created_at`, and a function_call_output resolves its
    // tool name from the preceding function_call.
    #[tokio::test(flavor = "multi_thread")]
    async fn legacy_rollout_ingests_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;

        let session = store
            .get_session("67c52f3f-d25e-4194-a006-93de58f28d7c")
            .await?
            .expect("legacy rollout ingests as a session");
        assert_eq!(session.session.source_agent, "codex-cli");
        assert_eq!(
            session
                .session
                .created_at
                .to_rfc3339_opts(SecondsFormat::Millis, true),
            "2025-09-13T04:30:17.447Z",
        );
        assert_eq!(session.messages.len(), 11, "every legacy data row ingests");
        let resolved = session.messages.iter().any(|message| {
            message
                .parts
                .iter()
                .any(|part| matches!(&part.kind, PartKind::ToolResult { name: Some(_), .. }))
        });
        assert!(
            resolved,
            "legacy function_call_output resolves its tool name"
        );
        Ok(())
    }
}