1use std::{
16 collections::HashMap,
17 path::{Path, PathBuf},
18};
19
20use chrono::{DateTime, Datelike, SecondsFormat, Utc};
21use serde_json::{Value, json};
22
23use crate::{
24 sessions::IngestEvent,
25 wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
26};
27
28use super::{
29 Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
30 RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
31 empty_options,
32 extract::{Extracted, extract_compact_repr, extract_raw_record, extract_self_str, extract_str},
33 extracted_text,
34 jsonl::{BoundedRow, JsonlTree, TAIL_CAP, jsonl_tree_discover, jsonl_tree_events, read_tail},
35 jsonl_bytes, part_id, part_ordinal, raw_record,
36};
37
/// Adapter identifier: returned from the `name()` methods in this file and
/// passed to the shared helpers (`config_path`, `jsonl_bytes`,
/// `AdapterError::schema`) so they can attribute their output to this adapter.
const NAME: &str = "codex-cli";
39
/// Stateless factory for the `codex-cli` adapter; all behavior lives in its
/// `AdapterFactory` implementation.
pub struct CodexCliFactory;
43
44impl AdapterFactory for CodexCliFactory {
45 fn name(&self) -> &'static str {
46 NAME
47 }
48
49 fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50 Ok(Box::new(CodexCliAdapter::new(config_path(NAME, config)?)))
51 }
52
53 fn probe_default(&self, env: &Env) -> Option<Value> {
54 let path = env.home.join(".codex").join("sessions");
55 path.exists().then(|| json!({ "path": path }))
56 }
57
58 fn serialize(
59 &self,
60 session: &crate::sessions::SessionWithMessages,
61 fidelity: RestoreFidelity,
62 ) -> Result<Vec<RestoredFile>, AdapterError> {
63 serialize_session(session, fidelity)
64 }
65}
66
67fn serialize_session(
68 session: &crate::sessions::SessionWithMessages,
69 fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71 let mut records = Vec::new();
76 if fidelity == RestoreFidelity::Native
77 && let Some(raw) = raw_record(&session.session.options)
78 {
79 records.push(raw);
80 } else {
81 records.push(codex_session_meta(session));
82 }
83 let mut messages = session.messages.clone();
84 messages.sort_by(by_timestamp_then_id);
85 for message in &messages {
86 if fidelity == RestoreFidelity::Native
87 && let Some(raw) = raw_record(message.message.options())
88 {
89 records.push(raw);
90 continue;
91 }
92 if matches!(message.message, Message::System { .. }) {
97 continue;
98 }
99 records.push(codex_response_item(message));
100 }
101 Ok(vec![RestoredFile::new(
102 codex_relative_path(session),
103 jsonl_bytes(NAME, &records)?,
104 fidelity,
105 )])
106}
107
108fn codex_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
109 let ts = session.session.created_at;
110 let filename_ts = ts.format("%Y-%m-%dT%H-%M-%S");
111 PathBuf::from("sessions")
112 .join(format!("{:04}", ts.year()))
113 .join(format!("{:02}", ts.month()))
114 .join(format!("{:02}", ts.day()))
115 .join(format!(
116 "rollout-{filename_ts}-{}.jsonl",
117 session.session.id
118 ))
119}
120
121fn codex_session_meta(session: &crate::sessions::SessionWithMessages) -> Value {
122 json!({
123 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
124 "type": "session_meta",
125 "payload": {
126 "id": session.session.id,
127 "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
128 "cwd": &*session.session.project,
129 }
130 })
131}
132
133fn codex_response_item(message: &crate::sessions::MessageWithParts) -> Value {
134 json!({
135 "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
136 "type": "response_item",
137 "payload": codex_payload(message),
138 })
139}
140
141fn codex_payload(message: &crate::sessions::MessageWithParts) -> Value {
142 if let Some(part) = message.parts.first() {
143 match &part.kind {
144 PartKind::ToolCall {
145 call_id,
146 name,
147 params,
148 ..
149 } if matches!(message.message, Message::Assistant { .. }) => {
150 return json!({
151 "type": "function_call",
152 "call_id": extracted_text(call_id),
153 "name": extracted_text(name),
154 "arguments": compact_json(params),
155 });
156 }
157 PartKind::ToolResult {
158 call_id, result, ..
159 } if matches!(message.message, Message::Tool { .. }) => {
160 return json!({
161 "type": "function_call_output",
162 "call_id": extracted_text(call_id),
163 "output": result,
164 });
165 }
166 PartKind::Reasoning { text }
167 if matches!(message.message, Message::Assistant { .. }) =>
168 {
169 if let Some(text) = text
170 && let Ok(value) = serde_json::from_str::<Value>(text.as_ref())
171 {
172 return value;
173 }
174 return json!({
175 "type": "reasoning",
176 "summary": [{"type": "summary_text", "text": extracted_text(text)}],
177 });
178 }
179 _ => {}
180 }
181 }
182 let is_assistant = matches!(message.message, Message::Assistant { .. });
183 json!({
184 "type": "message",
185 "role": match message.message.role() {
186 crate::wire::Role::System => "developer",
187 crate::wire::Role::User => "user",
188 crate::wire::Role::Assistant => "assistant",
189 crate::wire::Role::Tool => "tool",
190 },
191 "content": message
192 .parts
193 .iter()
194 .map(|part| codex_content_part(part, is_assistant))
195 .collect::<Vec<_>>(),
196 })
197}
198
199fn codex_content_part(part: &Part, is_assistant: bool) -> Value {
200 let text_type = if is_assistant {
204 "output_text"
205 } else {
206 "input_text"
207 };
208 match &part.kind {
209 PartKind::Text { text } => json!({
210 "type": text_type,
211 "text": extracted_text(text),
212 }),
213 PartKind::File { data, .. } => json!({
214 "type": text_type,
215 "text": match data {
216 crate::wire::FileData::String(value) => value.clone(),
217 crate::wire::FileData::Bytes(value) => format!("<{} bytes>", value.len()),
218 crate::wire::FileData::Url(value) => value.clone(),
219 },
220 }),
221 other => json!({
222 "type": text_type,
223 "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
224 }),
225 }
226}
227
/// Adapter over a directory tree of codex-cli JSONL rollout files.
#[derive(Debug, Clone)]
pub struct CodexCliAdapter {
    /// Directory handed back by `JsonlTree::root`.
    root: PathBuf,
}

impl CodexCliAdapter {
    /// Creates an adapter rooted at `root`; the path is stored as given and
    /// not touched on disk.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }
}
238
/// `Adapter` implementation that delegates both operations to the shared
/// JSONL-tree helpers, which take this adapter as their `JsonlTree`.
impl Adapter for CodexCliAdapter {
    /// Delegates discovery to `jsonl_tree_discover`.
    fn discover(&self) -> DiscoverFuture<'_> {
        jsonl_tree_discover(self)
    }

    /// Delegates event streaming to `jsonl_tree_events`, forwarding `oracle`.
    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
        jsonl_tree_events(self, oracle)
    }
}
248
249impl JsonlTree for CodexCliAdapter {
250 type State = HashMap<String, Extracted<String>>;
251
252 fn name(&self) -> &'static str {
253 NAME
254 }
255
256 fn root(&self) -> &Path {
257 &self.root
258 }
259
260 fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
261 let row: Value = serde_json::from_str(first_line).ok()?;
262 if row.get("type").and_then(Value::as_str) == Some("session_meta") {
263 row.get("payload")?
264 .get("id")?
265 .as_str()
266 .map(ToOwned::to_owned)
267 } else if is_legacy_session_row(&row) {
268 row.get("id")?.as_str().map(ToOwned::to_owned)
269 } else {
270 None
271 }
272 }
273
274 fn peek_last_ts(&self, path: &Path) -> Option<i64> {
275 let tail = read_tail(path, TAIL_CAP)?;
282 tail.split(|&byte| byte == b'\n')
283 .rev()
284 .filter_map(|line| serde_json::from_slice::<Value>(line).ok())
285 .find(|row| row.get("type").and_then(Value::as_str) == Some("response_item"))
286 .and_then(|row| {
287 let text = row.get("timestamp").and_then(Value::as_str)?;
288 Some(
289 DateTime::parse_from_rfc3339(text)
290 .ok()?
291 .with_timezone(&Utc)
292 .timestamp_micros(),
293 )
294 })
295 }
296
297 fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
298 session_from_rows(path, rows)
299 }
300
301 fn events_from_row(
302 &self,
303 session: &Session,
304 row: &BoundedRow,
305 state: &mut Self::State,
306 ) -> Result<Vec<IngestEvent>, String> {
307 capture_tool_call_name(&row.value, state);
308 events_from_row(&session.id, row.line, &row.value, session.created_at, state)
309 }
310}
311
312fn is_legacy_session_row(row: &Value) -> bool {
317 row.get("type").is_none() && row.get("id").is_some()
318}
319
320fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
321 let path_display = path.display().to_string();
322 let first = rows
323 .first()
324 .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
325 let row = &first.value;
326 let at_first = format!("{path_display}:{}", first.line);
327 let payload = if row.get("type").and_then(Value::as_str) == Some("session_meta") {
331 row.get("payload").cloned().unwrap_or(Value::Null)
332 } else if is_legacy_session_row(row) {
333 row.clone()
334 } else {
335 return Err(AdapterError::schema(
336 NAME,
337 at_first,
338 "first row must be session_meta",
339 ));
340 };
341 let id = payload
342 .get("id")
343 .and_then(Value::as_str)
344 .ok_or_else(|| {
345 AdapterError::schema(NAME, at_first.clone(), "session_meta missing payload.id")
346 })?
347 .to_owned();
348 let created_at = payload
349 .get("timestamp")
350 .and_then(Value::as_str)
351 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
352 .map(|dt| dt.with_timezone(&Utc))
353 .or_else(|| {
354 row.get("timestamp")
355 .and_then(Value::as_str)
356 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
357 .map(|dt| dt.with_timezone(&Utc))
358 })
359 .ok_or_else(|| {
360 AdapterError::schema(NAME, at_first, "session_meta has no parseable timestamp")
361 })?;
362 let project = match extract_str(&payload, "cwd") {
363 Some(value) => value,
364 None => {
365 let path_str = path
366 .file_name()
367 .and_then(|n| n.to_str())
368 .unwrap_or(path_display.as_str())
369 .to_owned();
370 extract_self_str(&Value::String(path_str)).ok_or_else(|| {
371 AdapterError::schema(
372 NAME,
373 path_display.clone(),
374 "internal: Value::String produced None from Source::as_str",
375 )
376 })?
377 }
378 };
379 let mut options = ProviderOptions::new();
380 options.insert(
381 "source".to_owned(),
382 json!({
383 "adapter": "codex-cli",
384 "originator": payload.get("originator"),
385 "cli_version": payload.get("cli_version"),
386 "model_provider": payload.get("model_provider"),
387 "git": payload.get("git"),
388 "base_instructions": payload.get("base_instructions"),
389 "instructions": payload.get("instructions"),
390 "source": payload.get("source"),
391 "raw_record": extract_raw_record(row),
392 }),
393 );
394
395 Ok(Session {
396 id,
397 parent_session_id: None,
398 parent_message_id: None,
399 source_agent: "codex-cli".to_owned(),
400 created_at,
401 project,
402 options,
403 })
404}
405
406fn events_from_row(
415 session_id: &str,
416 line: usize,
417 row: &Value,
418 default_timestamp: DateTime<Utc>,
419 tool_call_names: &HashMap<String, Extracted<String>>,
420) -> Result<Vec<IngestEvent>, String> {
421 let kind = row.get("type").and_then(Value::as_str);
422 if kind == Some("session_meta")
426 || is_legacy_session_row(row)
427 || (kind.is_none() && row.get("record_type").is_some())
428 {
429 return Ok(Vec::new());
430 }
431 let (payload, timestamp) = if kind == Some("response_item") {
435 let timestamp = row
436 .get("timestamp")
437 .and_then(Value::as_str)
438 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
439 .map(|dt| dt.with_timezone(&Utc))
440 .unwrap_or(default_timestamp);
441 (row.get("payload").unwrap_or(&Value::Null), timestamp)
442 } else {
443 (row, default_timestamp)
444 };
445 let payload_type = payload.get("type").and_then(Value::as_str).unwrap_or("");
446 let message_id = format!("{session_id}:{line:06}");
447
448 match payload_type {
449 "message" => message_events(session_id, &message_id, timestamp, payload, row),
450 "function_call" => Ok(tool_call_events(
451 session_id,
452 &message_id,
453 timestamp,
454 payload,
455 row,
456 )),
457 "function_call_output" => Ok(tool_result_events(
458 session_id,
459 &message_id,
460 timestamp,
461 payload,
462 row,
463 tool_call_names,
464 )),
465 "reasoning" => Ok(reasoning_events(
466 session_id,
467 &message_id,
468 timestamp,
469 payload,
470 row,
471 )),
472 "custom_tool_call" => Ok(custom_tool_call_events(
473 session_id,
474 &message_id,
475 timestamp,
476 payload,
477 row,
478 )),
479 "custom_tool_call_output" => Ok(custom_tool_result_events(
480 session_id,
481 &message_id,
482 timestamp,
483 payload,
484 row,
485 )),
486 _ => Ok(vec![raw_carrier_event(session_id, line, row, timestamp)]),
487 }
488}
489
490fn row_options(row: &Value) -> ProviderOptions {
491 let mut options = ProviderOptions::new();
492 options.insert(
493 "source".to_owned(),
494 json!({ "raw_record": extract_raw_record(row) }),
495 );
496 options
497}
498
499fn raw_carrier_event(
500 session_id: &str,
501 line: usize,
502 row: &Value,
503 timestamp: DateTime<Utc>,
504) -> IngestEvent {
505 IngestEvent::Message(Message::System {
506 id: row
507 .get("id")
508 .and_then(Value::as_str)
509 .map_or_else(|| format!("{session_id}:{line:06}:raw"), ToOwned::to_owned),
510 session_id: session_id.to_owned(),
511 timestamp: row
512 .get("timestamp")
513 .and_then(Value::as_str)
514 .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
515 .map(|dt| dt.with_timezone(&Utc))
516 .unwrap_or(timestamp),
517 content: None,
518 options: row_options(row),
519 })
520}
521
522fn capture_tool_call_name(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
526 let payload = match row.get("type").and_then(Value::as_str) {
529 Some("response_item") => row.get("payload"),
530 Some(_) => Some(row),
531 None => None,
532 };
533 let Some(payload) = payload else {
534 return;
535 };
536 if payload.get("type").and_then(Value::as_str) != Some("function_call") {
537 return;
538 }
539 let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
540 return;
541 };
542 let Some(name) = extract_str(payload, "name") else {
543 return;
544 };
545 map.insert(call_id.to_owned(), name);
546}
547
548fn message_events(
549 session_id: &str,
550 message_id: &str,
551 timestamp: DateTime<Utc>,
552 payload: &Value,
553 row: &Value,
554) -> Result<Vec<IngestEvent>, String> {
555 let role = payload
556 .get("role")
557 .and_then(Value::as_str)
558 .ok_or_else(|| "message missing role".to_owned())?;
559 let Some(content) = payload.get("content").and_then(Value::as_array) else {
560 return Ok(vec![message_raw_carrier_event(
561 session_id, message_id, row, timestamp,
562 )]);
563 };
564 let provenance = message_provenance(role, content);
569 let mut parts = Vec::with_capacity(content.len());
570 for (ordinal, item) in content.iter().enumerate() {
571 let text = extract_str(item, "text").or_else(|| Some(extract_compact_repr(item)));
576 parts.push(Part {
577 session_id: session_id.to_owned(),
578 id: part_id(message_id, ordinal),
579 message_id: message_id.to_owned(),
580 ordinal: part_ordinal(ordinal),
581 provenance,
582 options: empty_options(),
583 kind: PartKind::Text { text },
584 });
585 }
586
587 let (message, keep_parts) = match role {
588 "user" => (
589 Message::User {
590 id: message_id.to_owned(),
591 session_id: session_id.to_owned(),
592 timestamp,
593 options: row_options(row),
594 },
595 true,
596 ),
597 "assistant" => (
598 Message::Assistant {
599 id: message_id.to_owned(),
600 session_id: session_id.to_owned(),
601 timestamp,
602 options: row_options(row),
603 },
604 true,
605 ),
606 "developer" | "system" => (
609 Message::System {
610 id: message_id.to_owned(),
611 session_id: session_id.to_owned(),
612 timestamp,
613 content: None,
614 options: row_options(row),
615 },
616 true,
617 ),
618 _ => {
619 return Ok(vec![message_raw_carrier_event(
620 session_id, message_id, row, timestamp,
621 )]);
622 }
623 };
624
625 let mut events = Vec::with_capacity(parts.len() + 1);
626 events.push(IngestEvent::Message(message));
627 if keep_parts {
628 events.extend(parts.into_iter().map(IngestEvent::Part));
629 }
630 Ok(events)
631}
632
633fn message_raw_carrier_event(
634 session_id: &str,
635 message_id: &str,
636 row: &Value,
637 timestamp: DateTime<Utc>,
638) -> IngestEvent {
639 IngestEvent::Message(Message::System {
640 id: message_id.to_owned(),
641 session_id: session_id.to_owned(),
642 timestamp,
643 content: row
644 .get("payload")
645 .and_then(|payload| payload.get("role"))
646 .or_else(|| row.get("role"))
647 .and_then(Value::as_str)
648 .and_then(|role| extract_self_str(&Value::String(role.to_owned()))),
649 options: row_options(row),
650 })
651}
652
653fn message_provenance(role: &str, content: &[Value]) -> Provenance {
659 if role == "developer" || role == "system" {
660 return Provenance::Injected;
661 }
662 if role == "user" {
663 let injected = content.iter().any(|item| {
664 item.get("text")
665 .and_then(Value::as_str)
666 .is_some_and(is_injected_user_text)
667 });
668 if injected {
669 return Provenance::Injected;
670 }
671 }
672 Provenance::Conversational
673}
674
/// Returns `true` when `text`, ignoring leading whitespace, starts with
/// `<environment_context>`, `<user_instructions>`, or `# AGENTS.md`.
fn is_injected_user_text(text: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "<environment_context>",
        "<user_instructions>",
        "# AGENTS.md",
    ];
    let body = text.trim_start();
    MARKERS.iter().any(|marker| body.starts_with(marker))
}
682
683fn tool_call_events(
684 session_id: &str,
685 message_id: &str,
686 timestamp: DateTime<Utc>,
687 payload: &Value,
688 row: &Value,
689) -> Vec<IngestEvent> {
690 let call_id = extract_str(payload, "call_id");
691 let name = extract_str(payload, "name");
692 let params = match payload.get("arguments") {
693 Some(Value::String(text)) => {
694 serde_json::from_str::<Value>(text).unwrap_or_else(|_| Value::String(text.clone()))
695 }
696 Some(other) => other.clone(),
697 None => Value::Null,
698 };
699 let part = Part {
700 session_id: session_id.to_owned(),
701 id: part_id(message_id, 0),
702 message_id: message_id.to_owned(),
703 ordinal: 0,
704 provenance: Provenance::Conversational,
706 options: empty_options(),
707 kind: PartKind::ToolCall {
708 call_id,
709 name,
710 params,
711 provider_executed: false,
712 },
713 };
714 vec![
715 IngestEvent::Message(Message::Assistant {
716 id: message_id.to_owned(),
717 session_id: session_id.to_owned(),
718 timestamp,
719 options: row_options(row),
720 }),
721 IngestEvent::Part(part),
722 ]
723}
724
725fn custom_tool_call_events(
726 session_id: &str,
727 message_id: &str,
728 timestamp: DateTime<Utc>,
729 payload: &Value,
730 row: &Value,
731) -> Vec<IngestEvent> {
732 let part = Part {
733 session_id: session_id.to_owned(),
734 id: part_id(message_id, 0),
735 message_id: message_id.to_owned(),
736 ordinal: 0,
737 provenance: Provenance::Conversational,
739 options: empty_options(),
740 kind: PartKind::ToolCall {
741 call_id: extract_str(payload, "call_id"),
742 name: extract_str(payload, "name"),
743 params: payload.get("input").cloned().unwrap_or(Value::Null),
744 provider_executed: true,
745 },
746 };
747 vec![
748 IngestEvent::Message(Message::Assistant {
749 id: message_id.to_owned(),
750 session_id: session_id.to_owned(),
751 timestamp,
752 options: row_options(row),
753 }),
754 IngestEvent::Part(part),
755 ]
756}
757
758fn custom_tool_result_events(
759 session_id: &str,
760 message_id: &str,
761 timestamp: DateTime<Utc>,
762 payload: &Value,
763 row: &Value,
764) -> Vec<IngestEvent> {
765 let part = Part {
766 session_id: session_id.to_owned(),
767 id: part_id(message_id, 0),
768 message_id: message_id.to_owned(),
769 ordinal: 0,
770 provenance: Provenance::Injected,
772 options: empty_options(),
773 kind: PartKind::ToolResult {
774 call_id: extract_str(payload, "call_id"),
775 name: extract_str(payload, "name"),
776 is_failure: false,
777 result: payload.get("output").cloned().unwrap_or(Value::Null),
778 },
779 };
780 vec![
781 IngestEvent::Message(Message::Tool {
782 id: message_id.to_owned(),
783 session_id: session_id.to_owned(),
784 timestamp,
785 options: row_options(row),
786 }),
787 IngestEvent::Part(part),
788 ]
789}
790
791fn tool_result_events(
792 session_id: &str,
793 message_id: &str,
794 timestamp: DateTime<Utc>,
795 payload: &Value,
796 row: &Value,
797 tool_call_names: &HashMap<String, Extracted<String>>,
798) -> Vec<IngestEvent> {
799 let call_id = extract_str(payload, "call_id");
800 let name = call_id
804 .as_ref()
805 .and_then(|id| tool_call_names.get(id.as_str()))
806 .cloned();
807 let result = payload.get("output").cloned().unwrap_or(Value::Null);
808 let part = Part {
809 session_id: session_id.to_owned(),
810 id: part_id(message_id, 0),
811 message_id: message_id.to_owned(),
812 ordinal: 0,
813 provenance: Provenance::Injected,
815 options: empty_options(),
816 kind: PartKind::ToolResult {
817 call_id,
818 name,
819 is_failure: false,
820 result,
821 },
822 };
823 vec![
824 IngestEvent::Message(Message::Tool {
825 id: message_id.to_owned(),
826 session_id: session_id.to_owned(),
827 timestamp,
828 options: row_options(row),
829 }),
830 IngestEvent::Part(part),
831 ]
832}
833
834fn reasoning_events(
835 session_id: &str,
836 message_id: &str,
837 timestamp: DateTime<Utc>,
838 payload: &Value,
839 row: &Value,
840) -> Vec<IngestEvent> {
841 let summary = payload
845 .get("summary")
846 .and_then(Value::as_array)
847 .and_then(|items| {
848 let joined = items
849 .iter()
850 .filter_map(|item| extract_str(item, "text"))
851 .map(|e| (*e).clone())
852 .collect::<Vec<_>>()
853 .join("\n");
854 if joined.is_empty() {
855 None
856 } else {
857 Some(extract_compact_repr(payload))
858 }
859 });
860 let part = Part {
861 session_id: session_id.to_owned(),
862 id: part_id(message_id, 0),
863 message_id: message_id.to_owned(),
864 ordinal: 0,
865 provenance: Provenance::Conversational,
867 options: empty_options(),
868 kind: PartKind::Reasoning { text: summary },
869 };
870 vec![
871 IngestEvent::Message(Message::Assistant {
872 id: message_id.to_owned(),
873 session_id: session_id.to_owned(),
874 timestamp,
875 options: row_options(row),
876 }),
877 IngestEvent::Part(part),
878 ]
879}
880
/// Unit and fixture-backed tests for the codex-cli adapter.
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
    use tempfile::TempDir;

    /// Root of the fixture corpus, resolved relative to the crate manifest.
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/codex_cli/sessions"
    );

    /// Runs the shared probe assertion against the `.codex/sessions` path
    /// components.
    #[test]
    fn probe_default_finds_codex_sessions_under_home() -> anyhow::Result<()> {
        crate::adapter::test_support::assert_probe_default(
            &CodexCliFactory,
            &[".codex", "sessions"],
        )
    }

    /// A trailing `event_msg` row with a later timestamp must not win: the
    /// expected value is the timestamp of the last `response_item` row.
    #[test]
    fn peek_last_ts_targets_last_response_item_ignoring_trailing_event_msg() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("rollout.jsonl");
        let lines = [
            r#"{"type":"session_meta","timestamp":"2026-03-20T03:00:00.000Z","payload":{"id":"sess-x","timestamp":"2026-03-20T03:00:00.000Z"}}"#,
            r#"{"type":"response_item","timestamp":"2026-03-20T03:10:00.000Z","payload":{"type":"message","role":"user","content":[{"type":"input_text","text":"hi"}]}}"#,
            r#"{"type":"response_item","timestamp":"2026-03-20T03:20:30.500Z","payload":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"yo"}]}}"#,
            r#"{"type":"event_msg","timestamp":"2026-03-20T03:59:59.000Z","payload":{"type":"token_count","info":{}}}"#,
        ];
        std::fs::write(&path, lines.join("\n") + "\n").unwrap();

        let adapter = CodexCliAdapter::new(dir.path());
        let expected = DateTime::parse_from_rfc3339("2026-03-20T03:20:30.500Z")
            .unwrap()
            .with_timezone(&Utc)
            .timestamp_micros();
        assert_eq!(adapter.peek_last_ts(&path), Some(expected));
    }

    /// Runs the shared native-restore assertion over the fixture corpus; the
    /// corpus root passed in is the parent directory of `FIXTURES`.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        let adapter = CodexCliAdapter::new(FIXTURES);
        crate::adapter::test_support::assert_native_restore(
            &CodexCliFactory,
            &adapter,
            std::path::Path::new(FIXTURES)
                .parent()
                .expect("FIXTURES is nested under a corpus root"),
        )
        .await
    }

    /// Ingests the whole fixture corpus into a fresh local store and checks
    /// that nothing is dropped or skipped, that sessions/messages/parts all
    /// exist, and that at least one Text part was stored.
    #[tokio::test(flavor = "multi_thread")]
    async fn codex_cli_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);

        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert!(summary.accepted() > 0, "ingest must accept rows");
        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
        assert_eq!(
            summary.dropped_sessions, 0,
            "no session-level rejections expected"
        );
        assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");

        let (sessions, messages, parts) = store.row_counts().await?;
        assert!(sessions > 0, "at least one codex-cli session");
        assert!(messages > 0, "at least one codex-cli message");
        assert!(parts > 0, "at least one codex-cli Part");

        // Walk every stored session to confirm the source agent and to look
        // for at least one Text part across the corpus.
        let mut saw_text_part = false;
        for session_id in store.session_ids().await? {
            let session = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            assert_eq!(session.session.source_agent, "codex-cli");
            assert!(
                !session.messages.is_empty(),
                "session {session_id} must carry messages",
            );
            for stored in &session.messages {
                for part in &stored.parts {
                    if matches!(part.kind, PartKind::Text { .. }) {
                        saw_text_part = true;
                    }
                }
            }
        }
        assert!(
            saw_text_part,
            "codex-cli corpus must contain at least one Text Part",
        );
        Ok(())
    }

    /// Plain user and assistant content is conversational; developer content
    /// and user content starting with `<environment_context>` are injected.
    #[test]
    fn message_provenance_separates_prompts_from_harness_records() {
        let prompt = vec![json!({"type": "input_text", "text": "refactor this"})];
        assert_eq!(
            message_provenance("user", &prompt),
            Provenance::Conversational,
        );
        assert_eq!(
            message_provenance("assistant", &[]),
            Provenance::Conversational,
        );

        let developer = vec![json!({"type": "input_text", "text": "you are an agent"})];
        assert_eq!(
            message_provenance("developer", &developer),
            Provenance::Injected,
        );

        let env = vec![json!({
            "type": "input_text",
            "text": "<environment_context>cwd=/tmp</environment_context>",
        })];
        assert_eq!(message_provenance("user", &env), Provenance::Injected);
    }

    /// Legacy header and `record_type` rows yield no events, while a legacy
    /// top-level `message` row yields a user message plus one Text part.
    #[test]
    fn legacy_rows_normalize_to_payloads() {
        let ts = Utc::now();
        let map: HashMap<String, Extracted<String>> = HashMap::new();

        let first = json!({"id": "s1", "timestamp": "2025-09-13T04:30:17.447Z"});
        let state = json!({"record_type": "state"});
        assert!(
            events_from_row("s1", 1, &first, ts, &map)
                .expect("legacy first row parses")
                .is_empty(),
        );
        assert!(
            events_from_row("s1", 2, &state, ts, &map)
                .expect("state marker parses")
                .is_empty(),
        );

        let message = json!({
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hi"}],
        });
        let events = events_from_row("s1", 3, &message, ts, &map).expect("legacy message parses");
        assert_eq!(events.len(), 2, "message + one Text Part");
        assert!(matches!(
            events[0],
            IngestEvent::Message(Message::User { .. })
        ));
        assert!(matches!(
            &events[1],
            IngestEvent::Part(part) if matches!(part.kind, PartKind::Text { .. }),
        ));
    }

    /// A message with an unrecognized role becomes a single system carrier
    /// message whose id is the line-derived id and whose content is the role.
    #[test]
    fn unknown_message_role_becomes_lossless_carrier() {
        let ts = Utc::now();
        let map: HashMap<String, Extracted<String>> = HashMap::new();
        let row = json!({
            "type": "response_item",
            "timestamp": "2026-06-01T00:00:00Z",
            "payload": {
                "type": "message",
                "role": "future_role",
                "content": [{"type": "input_text", "text": "keep me"}],
            },
        });

        let events = events_from_row("s1", 4, &row, ts, &map).expect("carrier is valid");
        assert_eq!(events.len(), 1);
        assert!(matches!(
            &events[0],
            IngestEvent::Message(Message::System { id, content, .. })
                if id == "s1:000004" && content.as_deref().map(String::as_str) == Some("future_role")
        ));
    }

    /// Ingests the corpus and inspects one specific legacy-format session:
    /// its creation time, its message count, and that at least one tool
    /// result carries a resolved tool name.
    #[tokio::test(flavor = "multi_thread")]
    async fn legacy_rollout_ingests_into_canonical_shape() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;

        let session = store
            .get_session("67c52f3f-d25e-4194-a006-93de58f28d7c")
            .await?
            .expect("legacy rollout ingests as a session");
        assert_eq!(session.session.source_agent, "codex-cli");
        assert_eq!(
            session
                .session
                .created_at
                .to_rfc3339_opts(SecondsFormat::Millis, true),
            "2025-09-13T04:30:17.447Z",
        );
        assert_eq!(session.messages.len(), 11, "every legacy data row ingests");
        let resolved = session.messages.iter().any(|message| {
            message
                .parts
                .iter()
                .any(|part| matches!(&part.kind, PartKind::ToolResult { name: Some(_), .. }))
        });
        assert!(
            resolved,
            "legacy function_call_output resolves its tool name"
        );
        Ok(())
    }
}