Skip to main content

pond/adapter/
codex_cli.rs

1//! OpenAI Codex CLI adapter.
2//!
3//! Source path: `~/.codex/sessions/<year>/<month>/<day>/rollout-<ts>-<uuid>.jsonl`.
4//! Each line is an envelope `{timestamp, type, payload}`. Top-level types:
//! `session_meta` (consumed up front for Session), `event_msg` /
//! `turn_context` (transport noise: kept only as raw-record System carriers so
//! native restore can replay them), `response_item` (the per-turn model
//! interaction: subtypes `message`, `reasoning`, `function_call`,
//! `function_call_output`, `custom_tool_call`, `custom_tool_call_output`).
9//!
10//! Pre-Oct-2025 legacy rollouts (spec.md#adapters) predate the envelope: the
11//! first row is a bare metadata object and each data row is an un-enveloped
12//! payload, interleaved with `{record_type:"state"}` noise. The adapter
13//! accepts both shapes.
14
15use std::{
16    collections::HashMap,
17    path::{Path, PathBuf},
18};
19
20use chrono::{DateTime, Datelike, SecondsFormat, Utc};
21use serde_json::{Value, json};
22
23use crate::{
24    sessions::IngestEvent,
25    wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
26};
27
28use super::{
29    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
30    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
31    empty_options,
32    extract::{Extracted, extract_compact_repr, extract_raw_record, extract_self_str, extract_str},
33    extracted_text,
34    jsonl::{BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events},
35    jsonl_bytes, part_id, part_ordinal, raw_record,
36};
37
/// Adapter identifier: returned by `name()` and passed to `config_path`,
/// `jsonl_bytes`, and `AdapterError::schema`.
const NAME: &str = "codex-cli";
39
40/// Stateless factory: opens [`CodexCliAdapter`] instances and probes for the
41/// canonical install location under `~/.codex/sessions`.
42pub struct CodexCliFactory;
43
44impl AdapterFactory for CodexCliFactory {
45    fn name(&self) -> &'static str {
46        NAME
47    }
48
49    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50        Ok(Box::new(CodexCliAdapter::new(config_path(NAME, config)?)))
51    }
52
53    fn probe_default(&self, env: &Env) -> Option<Value> {
54        let path = env.home.join(".codex").join("sessions");
55        path.exists().then(|| json!({ "path": path }))
56    }
57
58    fn serialize(
59        &self,
60        session: &crate::sessions::SessionWithMessages,
61        fidelity: RestoreFidelity,
62    ) -> Result<Vec<RestoredFile>, AdapterError> {
63        serialize_session(session, fidelity)
64    }
65}
66
67fn serialize_session(
68    session: &crate::sessions::SessionWithMessages,
69    fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71    // Native replays verbatim `options.source.raw_record` rows (session_meta,
72    // then one per message); `codex_session_meta` / `codex_response_item` below
73    // are foreign-only. Replay echoes a frozen snapshot - safe only while
74    // canonical is append-only (spec.md#adapter-integrity-additive-sync).
75    let mut records = Vec::new();
76    if fidelity == RestoreFidelity::Native
77        && let Some(raw) = raw_record(&session.session.options)
78    {
79        records.push(raw);
80    } else {
81        records.push(codex_session_meta(session));
82    }
83    let mut messages = session.messages.clone();
84    messages.sort_by(by_timestamp_then_id);
85    for message in &messages {
86        if fidelity == RestoreFidelity::Native
87            && let Some(raw) = raw_record(message.message.options())
88        {
89            records.push(raw);
90            continue;
91        }
92        // Foreign restore: a System message (a rule-3 carrier, or a source's
93        // own system/developer turn) has no idiomatic home in another
94        // client's transcript - drop it; the content stays in canonical
95        // (spec.md#adapter-native-restore-lossless, foreign clause).
96        if matches!(message.message, Message::System { .. }) {
97            continue;
98        }
99        records.push(codex_response_item(message));
100    }
101    Ok(vec![RestoredFile::new(
102        codex_relative_path(session),
103        jsonl_bytes(NAME, &records)?,
104        fidelity,
105    )])
106}
107
108fn codex_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
109    let ts = session.session.created_at;
110    let filename_ts = ts.format("%Y-%m-%dT%H-%M-%S");
111    PathBuf::from("sessions")
112        .join(format!("{:04}", ts.year()))
113        .join(format!("{:02}", ts.month()))
114        .join(format!("{:02}", ts.day()))
115        .join(format!(
116            "rollout-{filename_ts}-{}.jsonl",
117            session.session.id
118        ))
119}
120
121fn codex_session_meta(session: &crate::sessions::SessionWithMessages) -> Value {
122    json!({
123        "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
124        "type": "session_meta",
125        "payload": {
126            "id": session.session.id,
127            "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
128            "cwd": &*session.session.project,
129        }
130    })
131}
132
133fn codex_response_item(message: &crate::sessions::MessageWithParts) -> Value {
134    json!({
135        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
136        "type": "response_item",
137        "payload": codex_payload(message),
138    })
139}
140
141fn codex_payload(message: &crate::sessions::MessageWithParts) -> Value {
142    if let Some(part) = message.parts.first() {
143        match &part.kind {
144            PartKind::ToolCall {
145                call_id,
146                name,
147                params,
148                ..
149            } if matches!(message.message, Message::Assistant { .. }) => {
150                return json!({
151                    "type": "function_call",
152                    "call_id": extracted_text(call_id),
153                    "name": extracted_text(name),
154                    "arguments": compact_json(params),
155                });
156            }
157            PartKind::ToolResult {
158                call_id, result, ..
159            } if matches!(message.message, Message::Tool { .. }) => {
160                return json!({
161                    "type": "function_call_output",
162                    "call_id": extracted_text(call_id),
163                    "output": result,
164                });
165            }
166            PartKind::Reasoning { text }
167                if matches!(message.message, Message::Assistant { .. }) =>
168            {
169                if let Some(text) = text
170                    && let Ok(value) = serde_json::from_str::<Value>(text.as_ref())
171                {
172                    return value;
173                }
174                return json!({
175                    "type": "reasoning",
176                    "summary": [{"type": "summary_text", "text": extracted_text(text)}],
177                });
178            }
179            _ => {}
180        }
181    }
182    let is_assistant = matches!(message.message, Message::Assistant { .. });
183    json!({
184        "type": "message",
185        "role": match message.message.role() {
186            crate::wire::Role::System => "developer",
187            crate::wire::Role::User => "user",
188            crate::wire::Role::Assistant => "assistant",
189            crate::wire::Role::Tool => "tool",
190        },
191        "content": message
192            .parts
193            .iter()
194            .map(|part| codex_content_part(part, is_assistant))
195            .collect::<Vec<_>>(),
196    })
197}
198
199fn codex_content_part(part: &Part, is_assistant: bool) -> Value {
200    // Codex tags an assistant turn's content `output_text` and a user or
201    // developer turn's content `input_text` - the discriminator is the
202    // owning message's role, not the part.
203    let text_type = if is_assistant {
204        "output_text"
205    } else {
206        "input_text"
207    };
208    match &part.kind {
209        PartKind::Text { text } => json!({
210            "type": text_type,
211            "text": extracted_text(text),
212        }),
213        PartKind::File { data, .. } => json!({
214            "type": text_type,
215            "text": match data {
216                crate::wire::FileData::String(value) => value.clone(),
217                crate::wire::FileData::Bytes(value) => format!("<{} bytes>", value.len()),
218                crate::wire::FileData::Url(value) => value.clone(),
219            },
220        }),
221        other => json!({
222            "type": text_type,
223            "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
224        }),
225    }
226}
227
228#[derive(Debug, Clone)]
229pub struct CodexCliAdapter {
230    root: PathBuf,
231}
232
233impl CodexCliAdapter {
234    pub fn new(root: impl Into<PathBuf>) -> Self {
235        Self { root: root.into() }
236    }
237}
238
239impl Adapter for CodexCliAdapter {
240    fn discover(&self) -> DiscoverFuture<'_> {
241        jsonl_tree_discover(self)
242    }
243
244    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
245        jsonl_tree_events(self, oracle)
246    }
247}
248
249impl JsonlTree for CodexCliAdapter {
250    type State = HashMap<String, Extracted<String>>;
251
252    fn name(&self) -> &'static str {
253        NAME
254    }
255
256    fn root(&self) -> &Path {
257        &self.root
258    }
259
260    fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
261        let row: Value = serde_json::from_str(first_line).ok()?;
262        if row.get("type").and_then(Value::as_str) == Some("session_meta") {
263            row.get("payload")?
264                .get("id")?
265                .as_str()
266                .map(ToOwned::to_owned)
267        } else if is_legacy_session_row(&row) {
268            row.get("id")?.as_str().map(ToOwned::to_owned)
269        } else {
270            None
271        }
272    }
273
274    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
275        session_from_rows(path, rows)
276    }
277
278    fn events_from_row(
279        &self,
280        session: &Session,
281        row: &BoundedRow,
282        state: &mut Self::State,
283    ) -> Result<Vec<IngestEvent>, String> {
284        capture_tool_call_name(&row.value, state);
285        events_from_row(&session.id, row.line, &row.value, session.created_at, state)
286    }
287}
288
289/// True for a pre-Oct-2025 legacy rollout's bare first row: session metadata
290/// (`id`/`timestamp`/`git`/`instructions`) at the top level with no `type`
291/// envelope. spec.md#adapters: legacy rollouts predate the `session_meta`
292/// wrapper, so the first row IS the payload.
293fn is_legacy_session_row(row: &Value) -> bool {
294    row.get("type").is_none() && row.get("id").is_some()
295}
296
297fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
298    let path_display = path.display().to_string();
299    let first = rows
300        .first()
301        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
302    let row = &first.value;
303    let at_first = format!("{path_display}:{}", first.line);
304    // The current rollout wraps session metadata in a `session_meta` envelope;
305    // a legacy rollout (spec.md#adapters) has none - the first row is a bare
306    // metadata object. Either way, read fields from `payload`.
307    let payload = if row.get("type").and_then(Value::as_str) == Some("session_meta") {
308        row.get("payload").cloned().unwrap_or(Value::Null)
309    } else if is_legacy_session_row(row) {
310        row.clone()
311    } else {
312        return Err(AdapterError::schema(
313            NAME,
314            at_first,
315            "first row must be session_meta",
316        ));
317    };
318    let id = payload
319        .get("id")
320        .and_then(Value::as_str)
321        .ok_or_else(|| {
322            AdapterError::schema(NAME, at_first.clone(), "session_meta missing payload.id")
323        })?
324        .to_owned();
325    let created_at = payload
326        .get("timestamp")
327        .and_then(Value::as_str)
328        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
329        .map(|dt| dt.with_timezone(&Utc))
330        .or_else(|| {
331            row.get("timestamp")
332                .and_then(Value::as_str)
333                .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
334                .map(|dt| dt.with_timezone(&Utc))
335        })
336        .ok_or_else(|| {
337            AdapterError::schema(NAME, at_first, "session_meta has no parseable timestamp")
338        })?;
339    let project = match extract_str(&payload, "cwd") {
340        Some(value) => value,
341        None => {
342            let path_str = path
343                .file_name()
344                .and_then(|n| n.to_str())
345                .unwrap_or(path_display.as_str())
346                .to_owned();
347            extract_self_str(&Value::String(path_str)).ok_or_else(|| {
348                AdapterError::schema(
349                    NAME,
350                    path_display.clone(),
351                    "internal: Value::String produced None from Source::as_str",
352                )
353            })?
354        }
355    };
356    let mut options = ProviderOptions::new();
357    options.insert(
358        "source".to_owned(),
359        json!({
360            "adapter": "codex-cli",
361            "originator": payload.get("originator"),
362            "cli_version": payload.get("cli_version"),
363            "model_provider": payload.get("model_provider"),
364            "git": payload.get("git"),
365            "base_instructions": payload.get("base_instructions"),
366            "instructions": payload.get("instructions"),
367            "source": payload.get("source"),
368            "raw_record": extract_raw_record(row),
369        }),
370    );
371
372    Ok(Session {
373        id,
374        parent_session_id: None,
375        parent_message_id: None,
376        source_agent: "codex-cli".to_owned(),
377        created_at,
378        project,
379        options,
380    })
381}
382
383/// Map one codex-cli JSONL record into zero-or-more `IngestEvent`s. Records pond
384/// keeps: `response_item` with `payload.type = "message"` (User/Assistant/
385/// System message + text Parts), `function_call` (Assistant + ToolCall),
386/// `function_call_output` (Tool + ToolResult), `reasoning` (Assistant +
387/// Reasoning Part). `session_meta` is consumed up front; `event_msg` and
388/// `turn_context` are transport noise. Legacy rows (spec.md#adapters) carry
389/// the same payload shapes un-enveloped; `{record_type:"state"}` markers and
390/// the bare first row are eventless.
391fn events_from_row(
392    session_id: &str,
393    line: usize,
394    row: &Value,
395    default_timestamp: DateTime<Utc>,
396    tool_call_names: &HashMap<String, Extracted<String>>,
397) -> Result<Vec<IngestEvent>, String> {
398    let kind = row.get("type").and_then(Value::as_str);
399    // Eventless rows: `session_meta` (current) and the legacy bare first row
400    // are both consumed up front by session_meta(); legacy
401    // `{record_type:"state"}` markers are transport noise (spec.md#adapters).
402    if kind == Some("session_meta")
403        || is_legacy_session_row(row)
404        || (kind.is_none() && row.get("record_type").is_some())
405    {
406        return Ok(Vec::new());
407    }
408    // Normalize to the per-turn payload. A current row wraps it in a
409    // `response_item` envelope carrying its own timestamp; a legacy data row
410    // IS the payload (spec.md#adapters) and inherits the session timestamp.
411    let (payload, timestamp) = if kind == Some("response_item") {
412        let timestamp = row
413            .get("timestamp")
414            .and_then(Value::as_str)
415            .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
416            .map(|dt| dt.with_timezone(&Utc))
417            .unwrap_or(default_timestamp);
418        (row.get("payload").unwrap_or(&Value::Null), timestamp)
419    } else {
420        (row, default_timestamp)
421    };
422    let payload_type = payload.get("type").and_then(Value::as_str).unwrap_or("");
423    let message_id = format!("{session_id}:{line:06}");
424
425    match payload_type {
426        "message" => message_events(session_id, &message_id, timestamp, payload, row),
427        "function_call" => Ok(tool_call_events(
428            session_id,
429            &message_id,
430            timestamp,
431            payload,
432            row,
433        )),
434        "function_call_output" => Ok(tool_result_events(
435            session_id,
436            &message_id,
437            timestamp,
438            payload,
439            row,
440            tool_call_names,
441        )),
442        "reasoning" => Ok(reasoning_events(
443            session_id,
444            &message_id,
445            timestamp,
446            payload,
447            row,
448        )),
449        "custom_tool_call" => Ok(custom_tool_call_events(
450            session_id,
451            &message_id,
452            timestamp,
453            payload,
454            row,
455        )),
456        "custom_tool_call_output" => Ok(custom_tool_result_events(
457            session_id,
458            &message_id,
459            timestamp,
460            payload,
461            row,
462        )),
463        _ => Ok(vec![raw_carrier_event(session_id, line, row, timestamp)]),
464    }
465}
466
467fn row_options(row: &Value) -> ProviderOptions {
468    let mut options = ProviderOptions::new();
469    options.insert(
470        "source".to_owned(),
471        json!({ "raw_record": extract_raw_record(row) }),
472    );
473    options
474}
475
476fn raw_carrier_event(
477    session_id: &str,
478    line: usize,
479    row: &Value,
480    timestamp: DateTime<Utc>,
481) -> IngestEvent {
482    IngestEvent::Message(Message::System {
483        id: row
484            .get("id")
485            .and_then(Value::as_str)
486            .map_or_else(|| format!("{session_id}:{line:06}:raw"), ToOwned::to_owned),
487        session_id: session_id.to_owned(),
488        timestamp: row
489            .get("timestamp")
490            .and_then(Value::as_str)
491            .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
492            .map(|dt| dt.with_timezone(&Utc))
493            .unwrap_or(timestamp),
494        content: None,
495        options: row_options(row),
496    })
497}
498
499/// Stash one row's `function_call` (call_id -> name) into the per-file
500/// map so the matching `function_call_output` row downstream can resolve
501/// the tool name rather than fall back to a sentinel.
502fn capture_tool_call_name(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
503    // Mirror events_from_row's payload normalization: a current row wraps the
504    // payload under `response_item`, a legacy row IS the payload.
505    let payload = match row.get("type").and_then(Value::as_str) {
506        Some("response_item") => row.get("payload"),
507        Some(_) => Some(row),
508        None => None,
509    };
510    let Some(payload) = payload else {
511        return;
512    };
513    if payload.get("type").and_then(Value::as_str) != Some("function_call") {
514        return;
515    }
516    let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
517        return;
518    };
519    let Some(name) = extract_str(payload, "name") else {
520        return;
521    };
522    map.insert(call_id.to_owned(), name);
523}
524
525fn message_events(
526    session_id: &str,
527    message_id: &str,
528    timestamp: DateTime<Utc>,
529    payload: &Value,
530    row: &Value,
531) -> Result<Vec<IngestEvent>, String> {
532    let role = payload
533        .get("role")
534        .and_then(Value::as_str)
535        .ok_or_else(|| "message missing role".to_owned())?;
536    let content = payload
537        .get("content")
538        .and_then(Value::as_array)
539        .cloned()
540        .unwrap_or_default();
541    // spec.md#model-part-provenance: a `developer` record is a harness instruction
542    // block; a `user`-slot record whose body is `<environment_context>` or an
543    // `# AGENTS.md instructions` blob is injected context, not a genuine
544    // prompt. Everything else in a message record is conversation.
545    let provenance = message_provenance(role, &content);
546    let mut parts = Vec::with_capacity(content.len());
547    for (ordinal, item) in content.iter().enumerate() {
548        // Faithful encoding of one content item: prefer the raw `text`
549        // field when present; otherwise compact-encode the structured
550        // body as a JSON string. The fallback is lossless (preserves the
551        // item bytes) and explicit (not a synthesised "unknown" or "").
552        let text = extract_str(item, "text").or_else(|| Some(extract_compact_repr(item)));
553        parts.push(Part {
554            session_id: session_id.to_owned(),
555            id: part_id(message_id, ordinal),
556            message_id: message_id.to_owned(),
557            ordinal: part_ordinal(ordinal),
558            provenance,
559            options: empty_options(),
560            kind: PartKind::Text { text },
561        });
562    }
563
564    let (message, keep_parts) = match role {
565        "user" => (
566            Message::User {
567                id: message_id.to_owned(),
568                session_id: session_id.to_owned(),
569                timestamp,
570                options: row_options(row),
571            },
572            true,
573        ),
574        "assistant" => (
575            Message::Assistant {
576                id: message_id.to_owned(),
577                session_id: session_id.to_owned(),
578                timestamp,
579                options: row_options(row),
580            },
581            true,
582        ),
583        // `developer` rows are codex-cli's system-prompt frames; map to System
584        // with `content: None` and let the inner Text Parts carry the body.
585        "developer" | "system" => (
586            Message::System {
587                id: message_id.to_owned(),
588                session_id: session_id.to_owned(),
589                timestamp,
590                content: None,
591                options: row_options(row),
592            },
593            true,
594        ),
595        other => return Err(format!("unsupported codex-cli role {other}")),
596    };
597
598    let mut events = Vec::with_capacity(parts.len() + 1);
599    events.push(IngestEvent::Message(message));
600    if keep_parts {
601        events.extend(parts.into_iter().map(IngestEvent::Part));
602    }
603    Ok(events)
604}
605
606/// Provenance of a codex `message` record (spec.md#model-part-provenance). A
607/// `developer` record is a harness instruction block; a `user`-slot record
608/// whose only content is `<environment_context>` or `# AGENTS.md instructions`
609/// is injected context rather than a typed prompt. v1 codex never interleaves
610/// authored and injected content within one record.
611fn message_provenance(role: &str, content: &[Value]) -> Provenance {
612    if role == "developer" || role == "system" {
613        return Provenance::Injected;
614    }
615    if role == "user" {
616        let injected = content.iter().any(|item| {
617            item.get("text")
618                .and_then(Value::as_str)
619                .is_some_and(is_injected_user_text)
620        });
621        if injected {
622            return Provenance::Injected;
623        }
624    }
625    Provenance::Conversational
626}
627
/// Whether a user-slot text is harness-injected context rather than a typed
/// prompt: after leading whitespace it starts with one of codex's injection
/// markers.
fn is_injected_user_text(text: &str) -> bool {
    const MARKERS: [&str; 3] = ["<environment_context>", "<user_instructions>", "# AGENTS.md"];
    let body = text.trim_start();
    MARKERS.iter().any(|marker| body.starts_with(marker))
}
635
636fn tool_call_events(
637    session_id: &str,
638    message_id: &str,
639    timestamp: DateTime<Utc>,
640    payload: &Value,
641    row: &Value,
642) -> Vec<IngestEvent> {
643    let call_id = extract_str(payload, "call_id");
644    let name = extract_str(payload, "name");
645    let params = match payload.get("arguments") {
646        Some(Value::String(text)) => {
647            serde_json::from_str::<Value>(text).unwrap_or_else(|_| Value::String(text.clone()))
648        }
649        Some(other) => other.clone(),
650        None => Value::Null,
651    };
652    let part = Part {
653        session_id: session_id.to_owned(),
654        id: part_id(message_id, 0),
655        message_id: message_id.to_owned(),
656        ordinal: 0,
657        // spec.md#model-part-provenance: the model authored the tool call.
658        provenance: Provenance::Conversational,
659        options: empty_options(),
660        kind: PartKind::ToolCall {
661            call_id,
662            name,
663            params,
664            provider_executed: false,
665        },
666    };
667    vec![
668        IngestEvent::Message(Message::Assistant {
669            id: message_id.to_owned(),
670            session_id: session_id.to_owned(),
671            timestamp,
672            options: row_options(row),
673        }),
674        IngestEvent::Part(part),
675    ]
676}
677
678fn custom_tool_call_events(
679    session_id: &str,
680    message_id: &str,
681    timestamp: DateTime<Utc>,
682    payload: &Value,
683    row: &Value,
684) -> Vec<IngestEvent> {
685    let part = Part {
686        session_id: session_id.to_owned(),
687        id: part_id(message_id, 0),
688        message_id: message_id.to_owned(),
689        ordinal: 0,
690        // spec.md#model-part-provenance: the model authored the tool call.
691        provenance: Provenance::Conversational,
692        options: empty_options(),
693        kind: PartKind::ToolCall {
694            call_id: extract_str(payload, "call_id"),
695            name: extract_str(payload, "name"),
696            params: payload.get("input").cloned().unwrap_or(Value::Null),
697            provider_executed: true,
698        },
699    };
700    vec![
701        IngestEvent::Message(Message::Assistant {
702            id: message_id.to_owned(),
703            session_id: session_id.to_owned(),
704            timestamp,
705            options: row_options(row),
706        }),
707        IngestEvent::Part(part),
708    ]
709}
710
711fn custom_tool_result_events(
712    session_id: &str,
713    message_id: &str,
714    timestamp: DateTime<Utc>,
715    payload: &Value,
716    row: &Value,
717) -> Vec<IngestEvent> {
718    let part = Part {
719        session_id: session_id.to_owned(),
720        id: part_id(message_id, 0),
721        message_id: message_id.to_owned(),
722        ordinal: 0,
723        // spec.md#model-part-provenance: tool output is runtime-produced.
724        provenance: Provenance::Injected,
725        options: empty_options(),
726        kind: PartKind::ToolResult {
727            call_id: extract_str(payload, "call_id"),
728            name: extract_str(payload, "name"),
729            is_failure: false,
730            result: payload.get("output").cloned().unwrap_or(Value::Null),
731        },
732    };
733    vec![
734        IngestEvent::Message(Message::Tool {
735            id: message_id.to_owned(),
736            session_id: session_id.to_owned(),
737            timestamp,
738            options: row_options(row),
739        }),
740        IngestEvent::Part(part),
741    ]
742}
743
744fn tool_result_events(
745    session_id: &str,
746    message_id: &str,
747    timestamp: DateTime<Utc>,
748    payload: &Value,
749    row: &Value,
750    tool_call_names: &HashMap<String, Extracted<String>>,
751) -> Vec<IngestEvent> {
752    let call_id = extract_str(payload, "call_id");
753    // Resolve tool name from the earlier `function_call` row via the
754    // per-file `call_id -> name` map. Misses (e.g. compaction pruned the
755    // originating call) yield `None`, a faithful "unresolved" value.
756    let name = call_id
757        .as_ref()
758        .and_then(|id| tool_call_names.get(id.as_str()))
759        .cloned();
760    let result = payload.get("output").cloned().unwrap_or(Value::Null);
761    let part = Part {
762        session_id: session_id.to_owned(),
763        id: part_id(message_id, 0),
764        message_id: message_id.to_owned(),
765        ordinal: 0,
766        // spec.md#model-part-provenance: tool output is runtime-produced.
767        provenance: Provenance::Injected,
768        options: empty_options(),
769        kind: PartKind::ToolResult {
770            call_id,
771            name,
772            is_failure: false,
773            result,
774        },
775    };
776    vec![
777        IngestEvent::Message(Message::Tool {
778            id: message_id.to_owned(),
779            session_id: session_id.to_owned(),
780            timestamp,
781            options: row_options(row),
782        }),
783        IngestEvent::Part(part),
784    ]
785}
786
787fn reasoning_events(
788    session_id: &str,
789    message_id: &str,
790    timestamp: DateTime<Utc>,
791    payload: &Value,
792    row: &Value,
793) -> Vec<IngestEvent> {
794    // The source `summary` array is the only place reasoning text lives in
795    // codex-cli's format. Empty array (or missing field) -> `None`. Joined
796    // text -> `Some(...)`. Don't synthesize an empty string.
797    let summary = payload
798        .get("summary")
799        .and_then(Value::as_array)
800        .and_then(|items| {
801            let joined = items
802                .iter()
803                .filter_map(|item| extract_str(item, "text"))
804                .map(|e| (*e).clone())
805                .collect::<Vec<_>>()
806                .join("\n");
807            if joined.is_empty() {
808                None
809            } else {
810                Some(extract_compact_repr(payload))
811            }
812        });
813    let part = Part {
814        session_id: session_id.to_owned(),
815        id: part_id(message_id, 0),
816        message_id: message_id.to_owned(),
817        ordinal: 0,
818        // spec.md#model-part-provenance: model-authored reasoning.
819        provenance: Provenance::Conversational,
820        options: empty_options(),
821        kind: PartKind::Reasoning { text: summary },
822    };
823    vec![
824        IngestEvent::Message(Message::Assistant {
825            id: message_id.to_owned(),
826            session_id: session_id.to_owned(),
827            timestamp,
828            options: row_options(row),
829        }),
830        IngestEvent::Part(part),
831    ]
832}
833
#[cfg(test)]
mod tests {
    //! Tests for the codex-cli adapter. They cover:
    //! - default-install probing;
    //! - native restore against the committed fixture corpus under
    //!   `tests/fixtures/adapter/codex_cli/`;
    //! - a full ingest of that corpus into stored Session/Message/Part rows;
    //! - the row-level helpers `message_provenance` and `events_from_row`.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
    use tempfile::TempDir;

    // Resolved from the crate manifest, never the process cwd: figment::Jail
    // changes the cwd of the whole test process while config tests run.
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/codex_cli/sessions"
    );

    #[test]
    fn probe_default_finds_codex_sessions_under_home() -> anyhow::Result<()> {
        let home_relative = &[".codex", "sessions"];
        crate::adapter::test_support::assert_probe_default(&CodexCliFactory, home_relative)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        let adapter = CodexCliAdapter::new(FIXTURES);
        // Rollout paths already include the `sessions/` segment, so restored
        // files are compared against the directory one level above FIXTURES.
        let corpus_root = std::path::Path::new(FIXTURES)
            .parent()
            .expect("FIXTURES is nested under a corpus root");
        crate::adapter::test_support::assert_native_restore(
            &CodexCliFactory,
            &adapter,
            corpus_root,
        )
        .await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn codex_cli_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
        let scratch = TempDir::new()?;
        let store = Store::open_local(scratch.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);

        // The ingest itself must accept rows and report no drops or skips.
        let report = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert!(report.accepted() > 0, "ingest must accept rows");
        assert_eq!(report.dropped_events, 0, "no per-event drops expected");
        assert_eq!(
            report.dropped_sessions, 0,
            "no session-level rejections expected"
        );
        assert_eq!(report.skipped_files, 0, "no whole-file skips expected");

        // Every table received at least one row.
        let (session_rows, message_rows, part_rows) = store.row_counts().await?;
        assert!(session_rows > 0, "at least one codex-cli session");
        assert!(message_rows > 0, "at least one codex-cli message");
        assert!(part_rows > 0, "at least one codex-cli Part");

        // Each stored session is attributed to codex-cli and carries messages;
        // across the whole corpus at least one Text Part must appear.
        let mut saw_text_part = false;
        for session_id in store.session_ids().await? {
            let loaded = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            assert_eq!(loaded.session.source_agent, "codex-cli");
            assert!(
                !loaded.messages.is_empty(),
                "session {session_id} must carry messages",
            );
            saw_text_part |= loaded
                .messages
                .iter()
                .flat_map(|stored| &stored.parts)
                .any(|part| matches!(part.kind, PartKind::Text { .. }));
        }
        assert!(
            saw_text_part,
            "codex-cli corpus must contain at least one Text Part",
        );
        Ok(())
    }

    /// spec.md#model-part-provenance. Two kinds of record count as
    /// harness-injected (`Injected`): anything in the `developer` role, and a
    /// `user`-role record whose text is an `<environment_context>` block. A real
    /// user prompt and an assistant message are `Conversational`.
    #[test]
    fn message_provenance_separates_prompts_from_harness_records() {
        let input_text = |body: &str| vec![json!({"type": "input_text", "text": body})];
        let cases = [
            ("user", input_text("refactor this"), Provenance::Conversational),
            ("assistant", Vec::new(), Provenance::Conversational),
            ("developer", input_text("you are an agent"), Provenance::Injected),
            (
                "user",
                input_text("<environment_context>cwd=/tmp</environment_context>"),
                Provenance::Injected,
            ),
        ];
        for (role, content, expected) in cases {
            assert_eq!(message_provenance(role, &content), expected);
        }
    }

    #[test]
    fn legacy_rows_normalize_to_payloads() {
        let now = Utc::now();
        let call_id_map: HashMap<String, Extracted<String>> = HashMap::new();

        // Two legacy rows produce no events: the bare metadata row that opens
        // a legacy rollout, and a `{record_type:"state"}` marker.
        let header = json!({"id": "s1", "timestamp": "2025-09-13T04:30:17.447Z"});
        let state = json!({"record_type": "state"});
        for (line, row, context) in [
            (1, &header, "legacy first row parses"),
            (2, &state, "state marker parses"),
        ] {
            let events = events_from_row("s1", line, row, now, &call_id_map).expect(context);
            assert!(events.is_empty());
        }

        // An un-enveloped legacy `message` row produces one Message followed by
        // one Text Part.
        let legacy_message = json!({
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hi"}],
        });
        let events = events_from_row("s1", 3, &legacy_message, now, &call_id_map)
            .expect("legacy message parses");
        assert_eq!(events.len(), 2, "message + one Text Part");
        assert!(matches!(
            &events[0],
            IngestEvent::Message(Message::User { .. })
        ));
        assert!(matches!(
            &events[1],
            IngestEvent::Part(part) if matches!(part.kind, PartKind::Text { .. }),
        ));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn legacy_rollout_ingests_into_canonical_shape() -> anyhow::Result<()> {
        const LEGACY_SESSION: &str = "67c52f3f-d25e-4194-a006-93de58f28d7c";

        let scratch = TempDir::new()?;
        let store = Store::open_local(scratch.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;

        // The legacy fixture has no `session_meta` envelope. The session id and
        // creation time are read from the top level of its bare first row.
        let legacy = store
            .get_session(LEGACY_SESSION)
            .await?
            .expect("legacy rollout ingests as a session");
        assert_eq!(legacy.session.source_agent, "codex-cli");
        let created = legacy
            .session
            .created_at
            .to_rfc3339_opts(SecondsFormat::Millis, true);
        assert_eq!(created, "2025-09-13T04:30:17.447Z");

        // The fixture holds eleven un-enveloped data rows, and each becomes
        // one message.
        assert_eq!(legacy.messages.len(), 11, "every legacy data row ingests");

        // The legacy `function_call_output` row takes its tool name from the
        // earlier legacy `function_call` row, via the per-file call_id map.
        let resolved = legacy
            .messages
            .iter()
            .flat_map(|message| message.parts.iter())
            .any(|part| matches!(&part.kind, PartKind::ToolResult { name: Some(_), .. }));
        assert!(
            resolved,
            "legacy function_call_output resolves its tool name"
        );
        Ok(())
    }
}