Skip to main content

pond/adapter/
codex_cli.rs

1//! OpenAI Codex CLI adapter.
2//!
3//! Source path: `~/.codex/sessions/<year>/<month>/<day>/rollout-<ts>-<uuid>.jsonl`.
4//! Each line is an envelope `{timestamp, type, payload}`. Top-level types:
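//! `session_meta` (consumed up front for Session), `event_msg` /
//! `turn_context` (transport noise: not interpreted, but each row is kept as
//! a content-less System message carrying its raw record so native restore
//! can replay it), `response_item` (the per-turn model interaction: subtypes
//! `message`, `reasoning`, `function_call`, `function_call_output`,
//! `custom_tool_call`, `custom_tool_call_output`).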
9//!
10//! Pre-Oct-2025 legacy rollouts (spec.md#adapters) predate the envelope: the
11//! first row is a bare metadata object and each data row is an un-enveloped
12//! payload, interleaved with `{record_type:"state"}` noise. The adapter
13//! accepts both shapes.
14
15use std::{
16    collections::HashMap,
17    path::{Path, PathBuf},
18};
19
20use chrono::{DateTime, Datelike, SecondsFormat, Utc};
21use serde_json::{Value, json};
22
23use crate::{
24    sessions::IngestEvent,
25    wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
26};
27
28use super::{
29    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
30    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
31    empty_options,
32    extract::{Extracted, extract_compact_repr, extract_raw_record, extract_self_str, extract_str},
33    extracted_text,
34    jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events},
35    jsonl_bytes, part_id, raw_record,
36};
37
38const NAME: &str = "codex-cli";
39
40/// Stateless factory: opens [`CodexCliAdapter`] instances and probes for the
41/// canonical install location under `~/.codex/sessions`.
42pub struct CodexCliFactory;
43
44impl AdapterFactory for CodexCliFactory {
45    fn name(&self) -> &'static str {
46        NAME
47    }
48
49    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50        Ok(Box::new(CodexCliAdapter::new(config_path(NAME, config)?)))
51    }
52
53    fn probe_default(&self, env: &Env) -> Option<Value> {
54        let path = env.home.join(".codex").join("sessions");
55        path.exists().then(|| json!({ "path": path }))
56    }
57
58    fn serialize(
59        &self,
60        session: &crate::sessions::SessionWithMessages,
61        fidelity: RestoreFidelity,
62    ) -> Result<Vec<RestoredFile>, AdapterError> {
63        serialize_session(session, fidelity)
64    }
65}
66
67fn serialize_session(
68    session: &crate::sessions::SessionWithMessages,
69    fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71    // Native replays verbatim `options.source.raw_record` rows (session_meta,
72    // then one per message); `codex_session_meta` / `codex_response_item` below
73    // are foreign-only. Replay echoes a frozen snapshot - safe only while
74    // canonical is append-only (spec.md#adapter-integrity-additive-sync).
75    let mut records = Vec::new();
76    if fidelity == RestoreFidelity::Native
77        && let Some(raw) = raw_record(&session.session.options)
78    {
79        records.push(raw);
80    } else {
81        records.push(codex_session_meta(session));
82    }
83    let mut messages = session.messages.clone();
84    messages.sort_by(by_timestamp_then_id);
85    for message in &messages {
86        if fidelity == RestoreFidelity::Native
87            && let Some(raw) = raw_record(message.message.options())
88        {
89            records.push(raw);
90            continue;
91        }
92        // Foreign restore: a System message (a rule-3 carrier, or a source's
93        // own system/developer turn) has no idiomatic home in another
94        // client's transcript - drop it; the content stays in canonical
95        // (spec.md#adapter-native-restore-lossless, foreign clause).
96        if matches!(message.message, Message::System { .. }) {
97            continue;
98        }
99        records.push(codex_response_item(message));
100    }
101    Ok(vec![RestoredFile {
102        relative_path: codex_relative_path(session),
103        bytes: jsonl_bytes(NAME, &records)?,
104    }])
105}
106
107fn codex_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
108    let ts = session.session.created_at;
109    let filename_ts = ts.format("%Y-%m-%dT%H-%M-%S");
110    PathBuf::from("sessions")
111        .join(format!("{:04}", ts.year()))
112        .join(format!("{:02}", ts.month()))
113        .join(format!("{:02}", ts.day()))
114        .join(format!(
115            "rollout-{filename_ts}-{}.jsonl",
116            session.session.id
117        ))
118}
119
120fn codex_session_meta(session: &crate::sessions::SessionWithMessages) -> Value {
121    json!({
122        "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
123        "type": "session_meta",
124        "payload": {
125            "id": session.session.id,
126            "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
127            "cwd": &*session.session.project,
128        }
129    })
130}
131
132fn codex_response_item(message: &crate::sessions::MessageWithParts) -> Value {
133    json!({
134        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
135        "type": "response_item",
136        "payload": codex_payload(message),
137    })
138}
139
140fn codex_payload(message: &crate::sessions::MessageWithParts) -> Value {
141    if let Some(part) = message.parts.first() {
142        match &part.kind {
143            PartKind::ToolCall {
144                call_id,
145                name,
146                params,
147                ..
148            } if matches!(message.message, Message::Assistant { .. }) => {
149                return json!({
150                    "type": "function_call",
151                    "call_id": extracted_text(call_id),
152                    "name": extracted_text(name),
153                    "arguments": compact_json(params),
154                });
155            }
156            PartKind::ToolResult {
157                call_id, result, ..
158            } if matches!(message.message, Message::Tool { .. }) => {
159                return json!({
160                    "type": "function_call_output",
161                    "call_id": extracted_text(call_id),
162                    "output": result,
163                });
164            }
165            PartKind::Reasoning { text }
166                if matches!(message.message, Message::Assistant { .. }) =>
167            {
168                if let Some(text) = text
169                    && let Ok(value) = serde_json::from_str::<Value>(text.as_ref())
170                {
171                    return value;
172                }
173                return json!({
174                    "type": "reasoning",
175                    "summary": [{"type": "summary_text", "text": extracted_text(text)}],
176                });
177            }
178            _ => {}
179        }
180    }
181    let is_assistant = matches!(message.message, Message::Assistant { .. });
182    json!({
183        "type": "message",
184        "role": match message.message.role() {
185            crate::wire::Role::System => "developer",
186            crate::wire::Role::User => "user",
187            crate::wire::Role::Assistant => "assistant",
188            crate::wire::Role::Tool => "tool",
189        },
190        "content": message
191            .parts
192            .iter()
193            .map(|part| codex_content_part(part, is_assistant))
194            .collect::<Vec<_>>(),
195    })
196}
197
198fn codex_content_part(part: &Part, is_assistant: bool) -> Value {
199    // Codex tags an assistant turn's content `output_text` and a user or
200    // developer turn's content `input_text` - the discriminator is the
201    // owning message's role, not the part.
202    let text_type = if is_assistant {
203        "output_text"
204    } else {
205        "input_text"
206    };
207    match &part.kind {
208        PartKind::Text { text } => json!({
209            "type": text_type,
210            "text": extracted_text(text),
211        }),
212        PartKind::File { data, .. } => json!({
213            "type": text_type,
214            "text": match data {
215                crate::wire::FileData::String(value) => value.clone(),
216                crate::wire::FileData::Bytes(value) => format!("<{} bytes>", value.len()),
217                crate::wire::FileData::Url(value) => value.clone(),
218            },
219        }),
220        other => json!({
221            "type": text_type,
222            "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
223        }),
224    }
225}
226
/// Adapter over a codex-cli rollout tree rooted at a directory.
///
/// Discovery and event streaming go through the shared `jsonl_tree_discover`
/// / `jsonl_tree_events` helpers.
#[derive(Debug, Clone)]
pub struct CodexCliAdapter {
    root: PathBuf,
}

impl CodexCliAdapter {
    /// Create an adapter rooted at `root`. The factory's default probe
    /// suggests `~/.codex/sessions`.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root.into();
        Self { root }
    }
}
237
238impl Adapter for CodexCliAdapter {
239    fn discover(&self) -> DiscoverFuture<'_> {
240        jsonl_tree_discover(self)
241    }
242
243    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
244        jsonl_tree_events(self, oracle)
245    }
246}
247
248impl JsonlTree for CodexCliAdapter {
249    type State = HashMap<String, Extracted<String>>;
250
251    fn name(&self) -> &'static str {
252        NAME
253    }
254
255    fn root(&self) -> &Path {
256        &self.root
257    }
258
259    fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
260        let row: Value = serde_json::from_str(first_line).ok()?;
261        if row.get("type").and_then(Value::as_str) == Some("session_meta") {
262            row.get("payload")?
263                .get("id")?
264                .as_str()
265                .map(ToOwned::to_owned)
266        } else if is_legacy_session_row(&row) {
267            row.get("id")?.as_str().map(ToOwned::to_owned)
268        } else {
269            None
270        }
271    }
272
273    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
274        session_from_rows(path, rows)
275    }
276
277    fn events_from_row(
278        &self,
279        session: &Session,
280        row: &BoundedRow,
281        state: &mut Self::State,
282    ) -> Result<Vec<IngestEvent>, String> {
283        capture_tool_call_name(&row.value, state);
284        events_from_row(&session.id, row.line, &row.value, session.created_at, state)
285    }
286}
287
288/// True for a pre-Oct-2025 legacy rollout's bare first row: session metadata
289/// (`id`/`timestamp`/`git`/`instructions`) at the top level with no `type`
290/// envelope. spec.md#adapters: legacy rollouts predate the `session_meta`
291/// wrapper, so the first row IS the payload.
292fn is_legacy_session_row(row: &Value) -> bool {
293    row.get("type").is_none() && row.get("id").is_some()
294}
295
296fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
297    let path_display = path.display().to_string();
298    let first = rows
299        .first()
300        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
301    let row = &first.value;
302    let at_first = format!("{path_display}:{}", first.line);
303    // The current rollout wraps session metadata in a `session_meta` envelope;
304    // a legacy rollout (spec.md#adapters) has none - the first row is a bare
305    // metadata object. Either way, read fields from `payload`.
306    let payload = if row.get("type").and_then(Value::as_str) == Some("session_meta") {
307        row.get("payload").cloned().unwrap_or(Value::Null)
308    } else if is_legacy_session_row(row) {
309        row.clone()
310    } else {
311        return Err(AdapterError::schema(
312            NAME,
313            at_first,
314            "first row must be session_meta",
315        ));
316    };
317    let id = payload
318        .get("id")
319        .and_then(Value::as_str)
320        .ok_or_else(|| {
321            AdapterError::schema(NAME, at_first.clone(), "session_meta missing payload.id")
322        })?
323        .to_owned();
324    let created_at = payload
325        .get("timestamp")
326        .and_then(Value::as_str)
327        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
328        .map(|dt| dt.with_timezone(&Utc))
329        .or_else(|| {
330            row.get("timestamp")
331                .and_then(Value::as_str)
332                .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
333                .map(|dt| dt.with_timezone(&Utc))
334        })
335        .ok_or_else(|| {
336            AdapterError::schema(NAME, at_first, "session_meta has no parseable timestamp")
337        })?;
338    let project = match extract_str(&payload, "cwd") {
339        Some(value) => value,
340        None => {
341            let path_str = path
342                .file_name()
343                .and_then(|n| n.to_str())
344                .unwrap_or(path_display.as_str())
345                .to_owned();
346            extract_self_str(&Value::String(path_str)).ok_or_else(|| {
347                AdapterError::schema(
348                    NAME,
349                    path_display.clone(),
350                    "internal: Value::String produced None from Source::as_str",
351                )
352            })?
353        }
354    };
355    let mut options = ProviderOptions::new();
356    options.insert(
357        "source".to_owned(),
358        json!({
359            "adapter": "codex-cli",
360            "originator": payload.get("originator"),
361            "cli_version": payload.get("cli_version"),
362            "model_provider": payload.get("model_provider"),
363            "git": payload.get("git"),
364            "base_instructions": payload.get("base_instructions"),
365            "instructions": payload.get("instructions"),
366            "source": payload.get("source"),
367            "raw_record": extract_raw_record(row),
368        }),
369    );
370
371    Ok(Session {
372        id,
373        parent_session_id: None,
374        parent_message_id: None,
375        source_agent: "codex-cli".to_owned(),
376        created_at,
377        project,
378        options,
379    })
380}
381
382/// Map one codex-cli JSONL record into zero-or-more `IngestEvent`s. Records pond
383/// keeps: `response_item` with `payload.type = "message"` (User/Assistant/
384/// System message + text Parts), `function_call` (Assistant + ToolCall),
385/// `function_call_output` (Tool + ToolResult), `reasoning` (Assistant +
386/// Reasoning Part). `session_meta` is consumed up front; `event_msg` and
387/// `turn_context` are transport noise. Legacy rows (spec.md#adapters) carry
388/// the same payload shapes un-enveloped; `{record_type:"state"}` markers and
389/// the bare first row are eventless.
390fn events_from_row(
391    session_id: &str,
392    line: usize,
393    row: &Value,
394    default_timestamp: DateTime<Utc>,
395    tool_call_names: &HashMap<String, Extracted<String>>,
396) -> Result<Vec<IngestEvent>, String> {
397    let kind = row.get("type").and_then(Value::as_str);
398    // Eventless rows: `session_meta` (current) and the legacy bare first row
399    // are both consumed up front by session_meta(); legacy
400    // `{record_type:"state"}` markers are transport noise (spec.md#adapters).
401    if kind == Some("session_meta")
402        || is_legacy_session_row(row)
403        || (kind.is_none() && row.get("record_type").is_some())
404    {
405        return Ok(Vec::new());
406    }
407    // Normalize to the per-turn payload. A current row wraps it in a
408    // `response_item` envelope carrying its own timestamp; a legacy data row
409    // IS the payload (spec.md#adapters) and inherits the session timestamp.
410    let (payload, timestamp) = if kind == Some("response_item") {
411        let timestamp = row
412            .get("timestamp")
413            .and_then(Value::as_str)
414            .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
415            .map(|dt| dt.with_timezone(&Utc))
416            .unwrap_or(default_timestamp);
417        (row.get("payload").unwrap_or(&Value::Null), timestamp)
418    } else {
419        (row, default_timestamp)
420    };
421    let payload_type = payload.get("type").and_then(Value::as_str).unwrap_or("");
422    let message_id = format!("{session_id}:{line:06}");
423
424    match payload_type {
425        "message" => message_events(session_id, &message_id, timestamp, payload, row),
426        "function_call" => Ok(tool_call_events(
427            session_id,
428            &message_id,
429            timestamp,
430            payload,
431            row,
432        )),
433        "function_call_output" => Ok(tool_result_events(
434            session_id,
435            &message_id,
436            timestamp,
437            payload,
438            row,
439            tool_call_names,
440        )),
441        "reasoning" => Ok(reasoning_events(
442            session_id,
443            &message_id,
444            timestamp,
445            payload,
446            row,
447        )),
448        "custom_tool_call" => Ok(custom_tool_call_events(
449            session_id,
450            &message_id,
451            timestamp,
452            payload,
453            row,
454        )),
455        "custom_tool_call_output" => Ok(custom_tool_result_events(
456            session_id,
457            &message_id,
458            timestamp,
459            payload,
460            row,
461        )),
462        _ => Ok(vec![raw_carrier_event(session_id, line, row, timestamp)]),
463    }
464}
465
466fn row_options(row: &Value) -> ProviderOptions {
467    let mut options = ProviderOptions::new();
468    options.insert(
469        "source".to_owned(),
470        json!({ "raw_record": extract_raw_record(row) }),
471    );
472    options
473}
474
475fn raw_carrier_event(
476    session_id: &str,
477    line: usize,
478    row: &Value,
479    timestamp: DateTime<Utc>,
480) -> IngestEvent {
481    IngestEvent::Message(Message::System {
482        id: row
483            .get("id")
484            .and_then(Value::as_str)
485            .map_or_else(|| format!("{session_id}:{line:06}:raw"), ToOwned::to_owned),
486        session_id: session_id.to_owned(),
487        timestamp: row
488            .get("timestamp")
489            .and_then(Value::as_str)
490            .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
491            .map(|dt| dt.with_timezone(&Utc))
492            .unwrap_or(timestamp),
493        content: None,
494        options: row_options(row),
495    })
496}
497
498/// Stash one row's `function_call` (call_id -> name) into the per-file
499/// map so the matching `function_call_output` row downstream can resolve
500/// the tool name rather than fall back to a sentinel.
501fn capture_tool_call_name(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
502    // Mirror events_from_row's payload normalization: a current row wraps the
503    // payload under `response_item`, a legacy row IS the payload.
504    let payload = match row.get("type").and_then(Value::as_str) {
505        Some("response_item") => row.get("payload"),
506        Some(_) => Some(row),
507        None => None,
508    };
509    let Some(payload) = payload else {
510        return;
511    };
512    if payload.get("type").and_then(Value::as_str) != Some("function_call") {
513        return;
514    }
515    let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
516        return;
517    };
518    let Some(name) = extract_str(payload, "name") else {
519        return;
520    };
521    map.insert(call_id.to_owned(), name);
522}
523
524fn message_events(
525    session_id: &str,
526    message_id: &str,
527    timestamp: DateTime<Utc>,
528    payload: &Value,
529    row: &Value,
530) -> Result<Vec<IngestEvent>, String> {
531    let role = payload
532        .get("role")
533        .and_then(Value::as_str)
534        .ok_or_else(|| "message missing role".to_owned())?;
535    let content = payload
536        .get("content")
537        .and_then(Value::as_array)
538        .cloned()
539        .unwrap_or_default();
540    // spec.md#model-part-provenance: a `developer` record is a harness instruction
541    // block; a `user`-slot record whose body is `<environment_context>` or an
542    // `# AGENTS.md instructions` blob is injected context, not a genuine
543    // prompt. Everything else in a message record is conversation.
544    let provenance = message_provenance(role, &content);
545    let mut parts = Vec::with_capacity(content.len());
546    for (ordinal, item) in content.iter().enumerate() {
547        // Faithful encoding of one content item: prefer the raw `text`
548        // field when present; otherwise compact-encode the structured
549        // body as a JSON string. The fallback is lossless (preserves the
550        // item bytes) and explicit (not a synthesised "unknown" or "").
551        let text = extract_str(item, "text").or_else(|| Some(extract_compact_repr(item)));
552        parts.push(Part {
553            session_id: session_id.to_owned(),
554            id: part_id(message_id, ordinal),
555            message_id: message_id.to_owned(),
556            ordinal: i32::try_from(ordinal).unwrap_or(i32::MAX),
557            provenance,
558            options: empty_options(),
559            kind: PartKind::Text { text },
560        });
561    }
562
563    let (message, keep_parts) = match role {
564        "user" => (
565            Message::User {
566                id: message_id.to_owned(),
567                session_id: session_id.to_owned(),
568                timestamp,
569                options: row_options(row),
570            },
571            true,
572        ),
573        "assistant" => (
574            Message::Assistant {
575                id: message_id.to_owned(),
576                session_id: session_id.to_owned(),
577                timestamp,
578                options: row_options(row),
579            },
580            true,
581        ),
582        // `developer` rows are codex-cli's system-prompt frames; map to System
583        // with `content: None` and let the inner Text Parts carry the body.
584        "developer" | "system" => (
585            Message::System {
586                id: message_id.to_owned(),
587                session_id: session_id.to_owned(),
588                timestamp,
589                content: None,
590                options: row_options(row),
591            },
592            true,
593        ),
594        other => return Err(format!("unsupported codex-cli role {other}")),
595    };
596
597    let mut events = Vec::with_capacity(parts.len() + 1);
598    events.push(IngestEvent::Message(message));
599    if keep_parts {
600        events.extend(parts.into_iter().map(IngestEvent::Part));
601    }
602    Ok(events)
603}
604
605/// Provenance of a codex `message` record (spec.md#model-part-provenance). A
606/// `developer` record is a harness instruction block; a `user`-slot record
607/// whose only content is `<environment_context>` or `# AGENTS.md instructions`
608/// is injected context rather than a typed prompt. v1 codex never interleaves
609/// authored and injected content within one record.
610fn message_provenance(role: &str, content: &[Value]) -> Provenance {
611    if role == "developer" || role == "system" {
612        return Provenance::Injected;
613    }
614    if role == "user" {
615        let injected = content.iter().any(|item| {
616            item.get("text")
617                .and_then(Value::as_str)
618                .is_some_and(is_injected_user_text)
619        });
620        if injected {
621            return Provenance::Injected;
622        }
623    }
624    Provenance::Conversational
625}
626
/// Whether a `user`-slot text is harness-injected context rather than a
/// typed prompt.
///
/// True when, after leading whitespace, the text opens with
/// `<environment_context>`, `<user_instructions>`, or `# AGENTS.md`.
fn is_injected_user_text(text: &str) -> bool {
    const MARKERS: [&str; 3] = ["<environment_context>", "<user_instructions>", "# AGENTS.md"];
    let body = text.trim_start();
    MARKERS.iter().any(|&marker| body.starts_with(marker))
}
634
635fn tool_call_events(
636    session_id: &str,
637    message_id: &str,
638    timestamp: DateTime<Utc>,
639    payload: &Value,
640    row: &Value,
641) -> Vec<IngestEvent> {
642    let call_id = extract_str(payload, "call_id");
643    let name = extract_str(payload, "name");
644    let params = match payload.get("arguments") {
645        Some(Value::String(text)) => {
646            serde_json::from_str::<Value>(text).unwrap_or_else(|_| Value::String(text.clone()))
647        }
648        Some(other) => other.clone(),
649        None => Value::Null,
650    };
651    let part = Part {
652        session_id: session_id.to_owned(),
653        id: part_id(message_id, 0),
654        message_id: message_id.to_owned(),
655        ordinal: 0,
656        // spec.md#model-part-provenance: the model authored the tool call.
657        provenance: Provenance::Conversational,
658        options: empty_options(),
659        kind: PartKind::ToolCall {
660            call_id,
661            name,
662            params,
663            provider_executed: false,
664        },
665    };
666    vec![
667        IngestEvent::Message(Message::Assistant {
668            id: message_id.to_owned(),
669            session_id: session_id.to_owned(),
670            timestamp,
671            options: row_options(row),
672        }),
673        IngestEvent::Part(part),
674    ]
675}
676
677fn custom_tool_call_events(
678    session_id: &str,
679    message_id: &str,
680    timestamp: DateTime<Utc>,
681    payload: &Value,
682    row: &Value,
683) -> Vec<IngestEvent> {
684    let part = Part {
685        session_id: session_id.to_owned(),
686        id: part_id(message_id, 0),
687        message_id: message_id.to_owned(),
688        ordinal: 0,
689        // spec.md#model-part-provenance: the model authored the tool call.
690        provenance: Provenance::Conversational,
691        options: empty_options(),
692        kind: PartKind::ToolCall {
693            call_id: extract_str(payload, "call_id"),
694            name: extract_str(payload, "name"),
695            params: payload.get("input").cloned().unwrap_or(Value::Null),
696            provider_executed: true,
697        },
698    };
699    vec![
700        IngestEvent::Message(Message::Assistant {
701            id: message_id.to_owned(),
702            session_id: session_id.to_owned(),
703            timestamp,
704            options: row_options(row),
705        }),
706        IngestEvent::Part(part),
707    ]
708}
709
710fn custom_tool_result_events(
711    session_id: &str,
712    message_id: &str,
713    timestamp: DateTime<Utc>,
714    payload: &Value,
715    row: &Value,
716) -> Vec<IngestEvent> {
717    let part = Part {
718        session_id: session_id.to_owned(),
719        id: part_id(message_id, 0),
720        message_id: message_id.to_owned(),
721        ordinal: 0,
722        // spec.md#model-part-provenance: tool output is runtime-produced.
723        provenance: Provenance::Injected,
724        options: empty_options(),
725        kind: PartKind::ToolResult {
726            call_id: extract_str(payload, "call_id"),
727            name: extract_str(payload, "name"),
728            is_failure: false,
729            result: payload.get("output").cloned().unwrap_or(Value::Null),
730        },
731    };
732    vec![
733        IngestEvent::Message(Message::Tool {
734            id: message_id.to_owned(),
735            session_id: session_id.to_owned(),
736            timestamp,
737            options: row_options(row),
738        }),
739        IngestEvent::Part(part),
740    ]
741}
742
743fn tool_result_events(
744    session_id: &str,
745    message_id: &str,
746    timestamp: DateTime<Utc>,
747    payload: &Value,
748    row: &Value,
749    tool_call_names: &HashMap<String, Extracted<String>>,
750) -> Vec<IngestEvent> {
751    let call_id = extract_str(payload, "call_id");
752    // Resolve tool name from the earlier `function_call` row via the
753    // per-file `call_id -> name` map. Misses (e.g. compaction pruned the
754    // originating call) yield `None`, a faithful "unresolved" value.
755    let name = call_id
756        .as_ref()
757        .and_then(|id| tool_call_names.get(id.as_str()))
758        .cloned();
759    let result = payload.get("output").cloned().unwrap_or(Value::Null);
760    let part = Part {
761        session_id: session_id.to_owned(),
762        id: part_id(message_id, 0),
763        message_id: message_id.to_owned(),
764        ordinal: 0,
765        // spec.md#model-part-provenance: tool output is runtime-produced.
766        provenance: Provenance::Injected,
767        options: empty_options(),
768        kind: PartKind::ToolResult {
769            call_id,
770            name,
771            is_failure: false,
772            result,
773        },
774    };
775    vec![
776        IngestEvent::Message(Message::Tool {
777            id: message_id.to_owned(),
778            session_id: session_id.to_owned(),
779            timestamp,
780            options: row_options(row),
781        }),
782        IngestEvent::Part(part),
783    ]
784}
785
786fn reasoning_events(
787    session_id: &str,
788    message_id: &str,
789    timestamp: DateTime<Utc>,
790    payload: &Value,
791    row: &Value,
792) -> Vec<IngestEvent> {
793    // The source `summary` array is the only place reasoning text lives in
794    // codex-cli's format. Empty array (or missing field) -> `None`. Joined
795    // text -> `Some(...)`. Don't synthesize an empty string.
796    let summary = payload
797        .get("summary")
798        .and_then(Value::as_array)
799        .and_then(|items| {
800            let joined = items
801                .iter()
802                .filter_map(|item| extract_str(item, "text"))
803                .map(|e| (*e).clone())
804                .collect::<Vec<_>>()
805                .join("\n");
806            if joined.is_empty() {
807                None
808            } else {
809                Some(extract_compact_repr(payload))
810            }
811        });
812    let part = Part {
813        session_id: session_id.to_owned(),
814        id: part_id(message_id, 0),
815        message_id: message_id.to_owned(),
816        ordinal: 0,
817        // spec.md#model-part-provenance: model-authored reasoning.
818        provenance: Provenance::Conversational,
819        options: empty_options(),
820        kind: PartKind::Reasoning { text: summary },
821    };
822    vec![
823        IngestEvent::Message(Message::Assistant {
824            id: message_id.to_owned(),
825            session_id: session_id.to_owned(),
826            timestamp,
827            options: row_options(row),
828        }),
829        IngestEvent::Part(part),
830    ]
831}
832
#[cfg(test)]
mod tests {
    //! Tests for the codex-cli adapter.
    //!
    //! The async tests ingest the committed rollout fixtures under
    //! `tests/fixtures/adapter/codex_cli/` into a scratch [`Store`] and check the
    //! resulting Session/Message/Part rows. The sync tests call
    //! `message_provenance` and `events_from_row` directly.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
    use tempfile::TempDir;

    /// Directory holding the committed codex-cli rollout fixtures.
    const FIXTURES: &str = "tests/fixtures/adapter/codex_cli/sessions";

    /// Runs the shared `assert_native_restore` check against the fixture corpus.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        let adapter = CodexCliAdapter::new(FIXTURES);
        // Rollout paths include the `sessions/` segment themselves, so the
        // corpus root is the directory that contains FIXTURES.
        let corpus_root = std::path::Path::new(FIXTURES)
            .parent()
            .expect("FIXTURES is nested under a corpus root");
        crate::adapter::test_support::assert_native_restore(
            &CodexCliFactory,
            &adapter,
            corpus_root,
        )
        .await
    }

    /// Ingests the whole fixture corpus and checks four things:
    /// - nothing is dropped or skipped;
    /// - every stored session is attributed to codex-cli;
    /// - every stored session carries messages;
    /// - at least one Text Part appears somewhere in the corpus.
    #[tokio::test(flavor = "multi_thread")]
    async fn codex_cli_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
        let scratch = TempDir::new()?;
        let store = Store::open_local(scratch.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);

        let report = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert!(report.accepted() > 0, "ingest must accept rows");
        assert_eq!(report.dropped_events, 0, "no per-event drops expected");
        assert_eq!(
            report.dropped_sessions, 0,
            "no session-level rejections expected"
        );
        assert_eq!(report.skipped_files, 0, "no whole-file skips expected");

        let (session_rows, message_rows, part_rows) = store.row_counts().await?;
        assert!(session_rows > 0, "at least one codex-cli session");
        assert!(message_rows > 0, "at least one codex-cli message");
        assert!(part_rows > 0, "at least one codex-cli Part");

        let mut saw_text_part = false;
        for session_id in store.session_ids().await? {
            let stored_session = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            assert_eq!(stored_session.session.source_agent, "codex-cli");
            assert!(
                !stored_session.messages.is_empty(),
                "session {session_id} must carry messages",
            );
            // Flatten every message's parts and look for any Text Part.
            saw_text_part |= stored_session
                .messages
                .iter()
                .flat_map(|message| message.parts.iter())
                .any(|part| matches!(part.kind, PartKind::Text { .. }));
        }
        assert!(
            saw_text_part,
            "codex-cli corpus must contain at least one Text Part",
        );
        Ok(())
    }

    /// spec.md#model-part-provenance.
    ///
    /// These records are harness-injected:
    /// - any `developer` record;
    /// - a `user`-slot record whose text is an `<environment_context>` block.
    ///
    /// A real user prompt and an assistant message are conversational.
    #[test]
    fn message_provenance_separates_prompts_from_harness_records() {
        let prompt = vec![json!({"type": "input_text", "text": "refactor this"})];
        let developer = vec![json!({"type": "input_text", "text": "you are an agent"})];
        let env = vec![json!({
            "type": "input_text",
            "text": "<environment_context>cwd=/tmp</environment_context>",
        })];

        // (role, content, expected provenance)
        let cases: [(&str, &[Value], Provenance); 4] = [
            ("user", &prompt, Provenance::Conversational),
            ("assistant", &[], Provenance::Conversational),
            ("developer", &developer, Provenance::Injected),
            ("user", &env, Provenance::Injected),
        ];
        for (role, content, expected) in cases {
            assert_eq!(message_provenance(role, content), expected);
        }
    }

    /// Legacy (pre-envelope) rows are normalized as follows:
    /// - the bare metadata row yields no events;
    /// - `state` markers yield no events;
    /// - an un-enveloped `message` row yields a Message followed by a Text Part.
    #[test]
    fn legacy_rows_normalize_to_payloads() {
        let now = Utc::now();
        let empty_map: HashMap<String, Extracted<String>> = HashMap::new();

        // The bare first row and `{record_type:"state"}` markers produce nothing.
        let bare_first_row = json!({"id": "s1", "timestamp": "2025-09-13T04:30:17.447Z"});
        let state_marker = json!({"record_type": "state"});
        let first_events = events_from_row("s1", 1, &bare_first_row, now, &empty_map)
            .expect("legacy first row parses");
        assert!(first_events.is_empty());
        let state_events = events_from_row("s1", 2, &state_marker, now, &empty_map)
            .expect("state marker parses");
        assert!(state_events.is_empty());

        // An un-enveloped legacy `message` row becomes a Message plus a Text Part.
        let legacy_message = json!({
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hi"}],
        });
        let events = events_from_row("s1", 3, &legacy_message, now, &empty_map)
            .expect("legacy message parses");
        assert_eq!(events.len(), 2, "message + one Text Part");
        assert!(matches!(
            events.first(),
            Some(IngestEvent::Message(Message::User { .. }))
        ));
        assert!(matches!(
            events.get(1),
            Some(IngestEvent::Part(part)) if matches!(part.kind, PartKind::Text { .. })
        ));
    }

    /// Ingests the corpus and checks the legacy rollout end to end.
    ///
    /// - Its Session comes from the bare first row.
    /// - Every data row becomes a message.
    /// - The tool result recovers its tool name.
    #[tokio::test(flavor = "multi_thread")]
    async fn legacy_rollout_ingests_into_canonical_shape() -> anyhow::Result<()> {
        let scratch = TempDir::new()?;
        let store = Store::open_local(scratch.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;

        // The bare first row of the legacy fixture supplies the Session id and
        // timestamp directly, without any `session_meta` envelope.
        let legacy = store
            .get_session("67c52f3f-d25e-4194-a006-93de58f28d7c")
            .await?
            .expect("legacy rollout ingests as a session");
        assert_eq!(legacy.session.source_agent, "codex-cli");
        let created_at = legacy
            .session
            .created_at
            .to_rfc3339_opts(SecondsFormat::Millis, true);
        assert_eq!(created_at, "2025-09-13T04:30:17.447Z");

        // Eleven un-enveloped data rows -> eleven messages.
        assert_eq!(legacy.messages.len(), 11, "every legacy data row ingests");

        // The legacy `function_call_output` picks up its tool name from the
        // earlier legacy `function_call` row through the per-file call_id map.
        let resolved = legacy
            .messages
            .iter()
            .flat_map(|message| message.parts.iter())
            .any(|part| matches!(&part.kind, PartKind::ToolResult { name: Some(_), .. }));
        assert!(
            resolved,
            "legacy function_call_output resolves its tool name"
        );
        Ok(())
    }
}