Skip to main content

pond/adapter/
codex_cli.rs

//! OpenAI Codex CLI adapter.
//!
//! Source path: `~/.codex/sessions/<year>/<month>/<day>/rollout-<ts>-<uuid>.jsonl`.
//! Each line is an envelope `{timestamp, type, payload}`. Top-level types:
//! `session_meta` (consumed up front for Session), `event_msg` /
//! `turn_context` (transport noise: not interpreted, preserved only as raw
//! carrier rows), `response_item` (the per-turn model interaction: subtypes
//! `message`, `reasoning`, `function_call`, `function_call_output`,
//! `custom_tool_call`, `custom_tool_call_output`).
//!
//! Pre-Oct-2025 legacy rollouts (spec.md#adapters) predate the envelope: the
//! first row is a bare metadata object and each data row is an un-enveloped
//! payload, interleaved with `{record_type:"state"}` noise. The adapter
//! accepts both shapes.
14
15use std::{
16    collections::HashMap,
17    path::{Path, PathBuf},
18};
19
20use chrono::{DateTime, Datelike, SecondsFormat, Utc};
21use serde_json::{Value, json};
22
23use crate::{
24    sessions::IngestEvent,
25    wire::{Message, Part, PartKind, Provenance, ProviderOptions, Session},
26};
27
28use super::{
29    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
30    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
31    empty_options,
32    extract::{Extracted, extract_compact_repr, extract_raw_record, extract_self_str, extract_str},
33    extracted_text,
34    jsonl::{BoundedRow, JsonlTree, TAIL_CAP, jsonl_tree_discover, jsonl_tree_events, read_tail},
35    jsonl_bytes, part_id, part_ordinal, raw_record,
36};
37
38const NAME: &str = "codex-cli";
39
40/// Stateless factory: opens [`CodexCliAdapter`] instances and probes for the
41/// canonical install location under `~/.codex/sessions`.
42pub struct CodexCliFactory;
43
44impl AdapterFactory for CodexCliFactory {
45    fn name(&self) -> &'static str {
46        NAME
47    }
48
49    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
50        Ok(Box::new(CodexCliAdapter::new(config_path(NAME, config)?)))
51    }
52
53    fn probe_default(&self, env: &Env) -> Option<Value> {
54        let path = env.home.join(".codex").join("sessions");
55        path.exists().then(|| json!({ "path": path }))
56    }
57
58    fn serialize(
59        &self,
60        session: &crate::sessions::SessionWithMessages,
61        fidelity: RestoreFidelity,
62    ) -> Result<Vec<RestoredFile>, AdapterError> {
63        serialize_session(session, fidelity)
64    }
65}
66
67fn serialize_session(
68    session: &crate::sessions::SessionWithMessages,
69    fidelity: RestoreFidelity,
70) -> Result<Vec<RestoredFile>, AdapterError> {
71    // Native replays verbatim `options.source.raw_record` rows (session_meta,
72    // then one per message); `codex_session_meta` / `codex_response_item` below
73    // are foreign-only. Replay echoes a frozen snapshot - safe only while
74    // canonical is append-only (spec.md#adapter-integrity-additive-sync).
75    let mut records = Vec::new();
76    if fidelity == RestoreFidelity::Native
77        && let Some(raw) = raw_record(&session.session.options)
78    {
79        records.push(raw);
80    } else {
81        records.push(codex_session_meta(session));
82    }
83    let mut messages = session.messages.clone();
84    messages.sort_by(by_timestamp_then_id);
85    for message in &messages {
86        if fidelity == RestoreFidelity::Native
87            && let Some(raw) = raw_record(message.message.options())
88        {
89            records.push(raw);
90            continue;
91        }
92        // Foreign restore: a System message (a rule-3 carrier, or a source's
93        // own system/developer turn) has no idiomatic home in another
94        // client's transcript - drop it; the content stays in canonical
95        // (spec.md#adapter-native-restore-lossless, foreign clause).
96        if matches!(message.message, Message::System { .. }) {
97            continue;
98        }
99        records.push(codex_response_item(message));
100    }
101    Ok(vec![RestoredFile::new(
102        codex_relative_path(session),
103        jsonl_bytes(NAME, &records)?,
104        fidelity,
105    )])
106}
107
108fn codex_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
109    let ts = session.session.created_at;
110    let filename_ts = ts.format("%Y-%m-%dT%H-%M-%S");
111    PathBuf::from("sessions")
112        .join(format!("{:04}", ts.year()))
113        .join(format!("{:02}", ts.month()))
114        .join(format!("{:02}", ts.day()))
115        .join(format!(
116            "rollout-{filename_ts}-{}.jsonl",
117            session.session.id
118        ))
119}
120
121fn codex_session_meta(session: &crate::sessions::SessionWithMessages) -> Value {
122    json!({
123        "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
124        "type": "session_meta",
125        "payload": {
126            "id": session.session.id,
127            "timestamp": session.session.created_at.to_rfc3339_opts(SecondsFormat::Millis, true),
128            "cwd": &*session.session.project,
129        }
130    })
131}
132
133fn codex_response_item(message: &crate::sessions::MessageWithParts) -> Value {
134    json!({
135        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
136        "type": "response_item",
137        "payload": codex_payload(message),
138    })
139}
140
141fn codex_payload(message: &crate::sessions::MessageWithParts) -> Value {
142    if let Some(part) = message.parts.first() {
143        match &part.kind {
144            PartKind::ToolCall {
145                call_id,
146                name,
147                params,
148                ..
149            } if matches!(message.message, Message::Assistant { .. }) => {
150                return json!({
151                    "type": "function_call",
152                    "call_id": extracted_text(call_id),
153                    "name": extracted_text(name),
154                    "arguments": compact_json(params),
155                });
156            }
157            PartKind::ToolResult {
158                call_id, result, ..
159            } if matches!(message.message, Message::Tool { .. }) => {
160                return json!({
161                    "type": "function_call_output",
162                    "call_id": extracted_text(call_id),
163                    "output": result,
164                });
165            }
166            PartKind::Reasoning { text }
167                if matches!(message.message, Message::Assistant { .. }) =>
168            {
169                if let Some(text) = text
170                    && let Ok(value) = serde_json::from_str::<Value>(text.as_ref())
171                {
172                    return value;
173                }
174                return json!({
175                    "type": "reasoning",
176                    "summary": [{"type": "summary_text", "text": extracted_text(text)}],
177                });
178            }
179            _ => {}
180        }
181    }
182    let is_assistant = matches!(message.message, Message::Assistant { .. });
183    json!({
184        "type": "message",
185        "role": match message.message.role() {
186            crate::wire::Role::System => "developer",
187            crate::wire::Role::User => "user",
188            crate::wire::Role::Assistant => "assistant",
189            crate::wire::Role::Tool => "tool",
190        },
191        "content": message
192            .parts
193            .iter()
194            .map(|part| codex_content_part(part, is_assistant))
195            .collect::<Vec<_>>(),
196    })
197}
198
199fn codex_content_part(part: &Part, is_assistant: bool) -> Value {
200    // Codex tags an assistant turn's content `output_text` and a user or
201    // developer turn's content `input_text` - the discriminator is the
202    // owning message's role, not the part.
203    let text_type = if is_assistant {
204        "output_text"
205    } else {
206        "input_text"
207    };
208    match &part.kind {
209        PartKind::Text { text } => json!({
210            "type": text_type,
211            "text": extracted_text(text),
212        }),
213        PartKind::File { data, .. } => json!({
214            "type": text_type,
215            "text": match data {
216                crate::wire::FileData::String(value) => value.clone(),
217                crate::wire::FileData::Bytes(value) => format!("<{} bytes>", value.len()),
218                crate::wire::FileData::Url(value) => value.clone(),
219            },
220        }),
221        other => json!({
222            "type": text_type,
223            "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null)),
224        }),
225    }
226}
227
228#[derive(Debug, Clone)]
229pub struct CodexCliAdapter {
230    root: PathBuf,
231}
232
233impl CodexCliAdapter {
234    pub fn new(root: impl Into<PathBuf>) -> Self {
235        Self { root: root.into() }
236    }
237}
238
239impl Adapter for CodexCliAdapter {
240    fn discover(&self) -> DiscoverFuture<'_> {
241        jsonl_tree_discover(self)
242    }
243
244    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
245        jsonl_tree_events(self, oracle)
246    }
247}
248
249impl JsonlTree for CodexCliAdapter {
250    type State = HashMap<String, Extracted<String>>;
251
252    fn name(&self) -> &'static str {
253        NAME
254    }
255
256    fn root(&self) -> &Path {
257        &self.root
258    }
259
260    fn peek_session_id(&self, _path: &Path, first_line: &str) -> Option<String> {
261        let row: Value = serde_json::from_str(first_line).ok()?;
262        if row.get("type").and_then(Value::as_str) == Some("session_meta") {
263            row.get("payload")?
264                .get("id")?
265                .as_str()
266                .map(ToOwned::to_owned)
267        } else if is_legacy_session_row(&row) {
268            row.get("id")?.as_str().map(ToOwned::to_owned)
269        } else {
270            None
271        }
272    }
273
274    fn peek_last_ts(&self, path: &Path) -> Option<i64> {
275        // Codex message ids are physical line numbers, not tail-recoverable on a
276        // multi-GB rollout, so freshness keys on the watermark timestamp instead.
277        // Pond stores the envelope `timestamp` only for `response_item` rows
278        // (`event_msg`/`turn_context` get the session-start default), so the
279        // session's max stored timestamp is its last response_item's. Scan a
280        // bounded tail backward for it - never the whole file.
281        let tail = read_tail(path, TAIL_CAP)?;
282        tail.split(|&byte| byte == b'\n')
283            .rev()
284            .filter_map(|line| serde_json::from_slice::<Value>(line).ok())
285            .find(|row| row.get("type").and_then(Value::as_str) == Some("response_item"))
286            .and_then(|row| {
287                let text = row.get("timestamp").and_then(Value::as_str)?;
288                Some(
289                    DateTime::parse_from_rfc3339(text)
290                        .ok()?
291                        .with_timezone(&Utc)
292                        .timestamp_micros(),
293                )
294            })
295    }
296
297    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
298        session_from_rows(path, rows)
299    }
300
301    fn events_from_row(
302        &self,
303        session: &Session,
304        row: &BoundedRow,
305        state: &mut Self::State,
306    ) -> Result<Vec<IngestEvent>, String> {
307        capture_tool_call_name(&row.value, state);
308        events_from_row(&session.id, row.line, &row.value, session.created_at, state)
309    }
310}
311
312/// True for a pre-Oct-2025 legacy rollout's bare first row: session metadata
313/// (`id`/`timestamp`/`git`/`instructions`) at the top level with no `type`
314/// envelope. spec.md#adapters: legacy rollouts predate the `session_meta`
315/// wrapper, so the first row IS the payload.
316fn is_legacy_session_row(row: &Value) -> bool {
317    row.get("type").is_none() && row.get("id").is_some()
318}
319
320fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
321    let path_display = path.display().to_string();
322    let first = rows
323        .first()
324        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
325    let row = &first.value;
326    let at_first = format!("{path_display}:{}", first.line);
327    // The current rollout wraps session metadata in a `session_meta` envelope;
328    // a legacy rollout (spec.md#adapters) has none - the first row is a bare
329    // metadata object. Either way, read fields from `payload`.
330    let payload = if row.get("type").and_then(Value::as_str) == Some("session_meta") {
331        row.get("payload").cloned().unwrap_or(Value::Null)
332    } else if is_legacy_session_row(row) {
333        row.clone()
334    } else {
335        return Err(AdapterError::schema(
336            NAME,
337            at_first,
338            "first row must be session_meta",
339        ));
340    };
341    let id = payload
342        .get("id")
343        .and_then(Value::as_str)
344        .ok_or_else(|| {
345            AdapterError::schema(NAME, at_first.clone(), "session_meta missing payload.id")
346        })?
347        .to_owned();
348    let created_at = payload
349        .get("timestamp")
350        .and_then(Value::as_str)
351        .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
352        .map(|dt| dt.with_timezone(&Utc))
353        .or_else(|| {
354            row.get("timestamp")
355                .and_then(Value::as_str)
356                .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
357                .map(|dt| dt.with_timezone(&Utc))
358        })
359        .ok_or_else(|| {
360            AdapterError::schema(NAME, at_first, "session_meta has no parseable timestamp")
361        })?;
362    let project = match extract_str(&payload, "cwd") {
363        Some(value) => value,
364        None => {
365            let path_str = path
366                .file_name()
367                .and_then(|n| n.to_str())
368                .unwrap_or(path_display.as_str())
369                .to_owned();
370            extract_self_str(&Value::String(path_str)).ok_or_else(|| {
371                AdapterError::schema(
372                    NAME,
373                    path_display.clone(),
374                    "internal: Value::String produced None from Source::as_str",
375                )
376            })?
377        }
378    };
379    let mut options = ProviderOptions::new();
380    options.insert(
381        "source".to_owned(),
382        json!({
383            "adapter": "codex-cli",
384            "originator": payload.get("originator"),
385            "cli_version": payload.get("cli_version"),
386            "model_provider": payload.get("model_provider"),
387            "git": payload.get("git"),
388            "base_instructions": payload.get("base_instructions"),
389            "instructions": payload.get("instructions"),
390            "source": payload.get("source"),
391            "raw_record": extract_raw_record(row),
392        }),
393    );
394
395    Ok(Session {
396        id,
397        parent_session_id: None,
398        parent_message_id: None,
399        source_agent: "codex-cli".to_owned(),
400        created_at,
401        project,
402        options,
403    })
404}
405
406/// Map one codex-cli JSONL record into zero-or-more `IngestEvent`s. Records pond
407/// keeps: `response_item` with `payload.type = "message"` (User/Assistant/
408/// System message + text Parts), `function_call` (Assistant + ToolCall),
409/// `function_call_output` (Tool + ToolResult), `reasoning` (Assistant +
410/// Reasoning Part). `session_meta` is consumed up front; `event_msg` and
411/// `turn_context` are transport noise. Legacy rows (spec.md#adapters) carry
412/// the same payload shapes un-enveloped; `{record_type:"state"}` markers and
413/// the bare first row are eventless.
414fn events_from_row(
415    session_id: &str,
416    line: usize,
417    row: &Value,
418    default_timestamp: DateTime<Utc>,
419    tool_call_names: &HashMap<String, Extracted<String>>,
420) -> Result<Vec<IngestEvent>, String> {
421    let kind = row.get("type").and_then(Value::as_str);
422    // Eventless rows: `session_meta` (current) and the legacy bare first row
423    // are both consumed up front by session_meta(); legacy
424    // `{record_type:"state"}` markers are transport noise (spec.md#adapters).
425    if kind == Some("session_meta")
426        || is_legacy_session_row(row)
427        || (kind.is_none() && row.get("record_type").is_some())
428    {
429        return Ok(Vec::new());
430    }
431    // Normalize to the per-turn payload. A current row wraps it in a
432    // `response_item` envelope carrying its own timestamp; a legacy data row
433    // IS the payload (spec.md#adapters) and inherits the session timestamp.
434    let (payload, timestamp) = if kind == Some("response_item") {
435        let timestamp = row
436            .get("timestamp")
437            .and_then(Value::as_str)
438            .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
439            .map(|dt| dt.with_timezone(&Utc))
440            .unwrap_or(default_timestamp);
441        (row.get("payload").unwrap_or(&Value::Null), timestamp)
442    } else {
443        (row, default_timestamp)
444    };
445    let payload_type = payload.get("type").and_then(Value::as_str).unwrap_or("");
446    let message_id = format!("{session_id}:{line:06}");
447
448    match payload_type {
449        "message" => message_events(session_id, &message_id, timestamp, payload, row),
450        "function_call" => Ok(tool_call_events(
451            session_id,
452            &message_id,
453            timestamp,
454            payload,
455            row,
456        )),
457        "function_call_output" => Ok(tool_result_events(
458            session_id,
459            &message_id,
460            timestamp,
461            payload,
462            row,
463            tool_call_names,
464        )),
465        "reasoning" => Ok(reasoning_events(
466            session_id,
467            &message_id,
468            timestamp,
469            payload,
470            row,
471        )),
472        "custom_tool_call" => Ok(custom_tool_call_events(
473            session_id,
474            &message_id,
475            timestamp,
476            payload,
477            row,
478        )),
479        "custom_tool_call_output" => Ok(custom_tool_result_events(
480            session_id,
481            &message_id,
482            timestamp,
483            payload,
484            row,
485        )),
486        _ => Ok(vec![raw_carrier_event(session_id, line, row, timestamp)]),
487    }
488}
489
490fn row_options(row: &Value) -> ProviderOptions {
491    let mut options = ProviderOptions::new();
492    options.insert(
493        "source".to_owned(),
494        json!({ "raw_record": extract_raw_record(row) }),
495    );
496    options
497}
498
499fn raw_carrier_event(
500    session_id: &str,
501    line: usize,
502    row: &Value,
503    timestamp: DateTime<Utc>,
504) -> IngestEvent {
505    IngestEvent::Message(Message::System {
506        id: row
507            .get("id")
508            .and_then(Value::as_str)
509            .map_or_else(|| format!("{session_id}:{line:06}:raw"), ToOwned::to_owned),
510        session_id: session_id.to_owned(),
511        timestamp: row
512            .get("timestamp")
513            .and_then(Value::as_str)
514            .and_then(|text| DateTime::parse_from_rfc3339(text).ok())
515            .map(|dt| dt.with_timezone(&Utc))
516            .unwrap_or(timestamp),
517        content: None,
518        options: row_options(row),
519    })
520}
521
522/// Stash one row's `function_call` (call_id -> name) into the per-file
523/// map so the matching `function_call_output` row downstream can resolve
524/// the tool name rather than fall back to a sentinel.
525fn capture_tool_call_name(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
526    // Mirror events_from_row's payload normalization: a current row wraps the
527    // payload under `response_item`, a legacy row IS the payload.
528    let payload = match row.get("type").and_then(Value::as_str) {
529        Some("response_item") => row.get("payload"),
530        Some(_) => Some(row),
531        None => None,
532    };
533    let Some(payload) = payload else {
534        return;
535    };
536    if payload.get("type").and_then(Value::as_str) != Some("function_call") {
537        return;
538    }
539    let Some(call_id) = payload.get("call_id").and_then(Value::as_str) else {
540        return;
541    };
542    let Some(name) = extract_str(payload, "name") else {
543        return;
544    };
545    map.insert(call_id.to_owned(), name);
546}
547
548fn message_events(
549    session_id: &str,
550    message_id: &str,
551    timestamp: DateTime<Utc>,
552    payload: &Value,
553    row: &Value,
554) -> Result<Vec<IngestEvent>, String> {
555    let role = payload
556        .get("role")
557        .and_then(Value::as_str)
558        .ok_or_else(|| "message missing role".to_owned())?;
559    let Some(content) = payload.get("content").and_then(Value::as_array) else {
560        return Ok(vec![message_raw_carrier_event(
561            session_id, message_id, row, timestamp,
562        )]);
563    };
564    // spec.md#model-part-provenance: a `developer` record is a harness instruction
565    // block; a `user`-slot record whose body is `<environment_context>` or an
566    // `# AGENTS.md instructions` blob is injected context, not a genuine
567    // prompt. Everything else in a message record is conversation.
568    let provenance = message_provenance(role, content);
569    let mut parts = Vec::with_capacity(content.len());
570    for (ordinal, item) in content.iter().enumerate() {
571        // Faithful encoding of one content item: prefer the raw `text`
572        // field when present; otherwise compact-encode the structured
573        // body as a JSON string. The fallback is lossless (preserves the
574        // item bytes) and explicit (not a synthesised "unknown" or "").
575        let text = extract_str(item, "text").or_else(|| Some(extract_compact_repr(item)));
576        parts.push(Part {
577            session_id: session_id.to_owned(),
578            id: part_id(message_id, ordinal),
579            message_id: message_id.to_owned(),
580            ordinal: part_ordinal(ordinal),
581            provenance,
582            options: empty_options(),
583            kind: PartKind::Text { text },
584        });
585    }
586
587    let (message, keep_parts) = match role {
588        "user" => (
589            Message::User {
590                id: message_id.to_owned(),
591                session_id: session_id.to_owned(),
592                timestamp,
593                options: row_options(row),
594            },
595            true,
596        ),
597        "assistant" => (
598            Message::Assistant {
599                id: message_id.to_owned(),
600                session_id: session_id.to_owned(),
601                timestamp,
602                options: row_options(row),
603            },
604            true,
605        ),
606        // `developer` rows are codex-cli's system-prompt frames; map to System
607        // with `content: None` and let the inner Text Parts carry the body.
608        "developer" | "system" => (
609            Message::System {
610                id: message_id.to_owned(),
611                session_id: session_id.to_owned(),
612                timestamp,
613                content: None,
614                options: row_options(row),
615            },
616            true,
617        ),
618        _ => {
619            return Ok(vec![message_raw_carrier_event(
620                session_id, message_id, row, timestamp,
621            )]);
622        }
623    };
624
625    let mut events = Vec::with_capacity(parts.len() + 1);
626    events.push(IngestEvent::Message(message));
627    if keep_parts {
628        events.extend(parts.into_iter().map(IngestEvent::Part));
629    }
630    Ok(events)
631}
632
633fn message_raw_carrier_event(
634    session_id: &str,
635    message_id: &str,
636    row: &Value,
637    timestamp: DateTime<Utc>,
638) -> IngestEvent {
639    IngestEvent::Message(Message::System {
640        id: message_id.to_owned(),
641        session_id: session_id.to_owned(),
642        timestamp,
643        content: row
644            .get("payload")
645            .and_then(|payload| payload.get("role"))
646            .or_else(|| row.get("role"))
647            .and_then(Value::as_str)
648            .and_then(|role| extract_self_str(&Value::String(role.to_owned()))),
649        options: row_options(row),
650    })
651}
652
653/// Provenance of a codex `message` record (spec.md#model-part-provenance). A
654/// `developer` record is a harness instruction block; a `user`-slot record
655/// whose only content is `<environment_context>` or `# AGENTS.md instructions`
656/// is injected context rather than a typed prompt. v1 codex never interleaves
657/// authored and injected content within one record.
658fn message_provenance(role: &str, content: &[Value]) -> Provenance {
659    if role == "developer" || role == "system" {
660        return Provenance::Injected;
661    }
662    if role == "user" {
663        let injected = content.iter().any(|item| {
664            item.get("text")
665                .and_then(Value::as_str)
666                .is_some_and(is_injected_user_text)
667        });
668        if injected {
669            return Provenance::Injected;
670        }
671    }
672    Provenance::Conversational
673}
674
/// Returns `true` when `text`, after leading whitespace is removed, starts
/// with one of the markers codex uses for harness-injected user-slot
/// content: `<environment_context>`, `<user_instructions>`, or `# AGENTS.md`.
fn is_injected_user_text(text: &str) -> bool {
    const INJECTED_PREFIXES: [&str; 3] = [
        "<environment_context>",
        "<user_instructions>",
        "# AGENTS.md",
    ];
    let body = text.trim_start();
    INJECTED_PREFIXES
        .iter()
        .any(|prefix| body.starts_with(prefix))
}
682
683fn tool_call_events(
684    session_id: &str,
685    message_id: &str,
686    timestamp: DateTime<Utc>,
687    payload: &Value,
688    row: &Value,
689) -> Vec<IngestEvent> {
690    let call_id = extract_str(payload, "call_id");
691    let name = extract_str(payload, "name");
692    let params = match payload.get("arguments") {
693        Some(Value::String(text)) => {
694            serde_json::from_str::<Value>(text).unwrap_or_else(|_| Value::String(text.clone()))
695        }
696        Some(other) => other.clone(),
697        None => Value::Null,
698    };
699    let part = Part {
700        session_id: session_id.to_owned(),
701        id: part_id(message_id, 0),
702        message_id: message_id.to_owned(),
703        ordinal: 0,
704        // spec.md#model-part-provenance: the model authored the tool call.
705        provenance: Provenance::Conversational,
706        options: empty_options(),
707        kind: PartKind::ToolCall {
708            call_id,
709            name,
710            params,
711            provider_executed: false,
712        },
713    };
714    vec![
715        IngestEvent::Message(Message::Assistant {
716            id: message_id.to_owned(),
717            session_id: session_id.to_owned(),
718            timestamp,
719            options: row_options(row),
720        }),
721        IngestEvent::Part(part),
722    ]
723}
724
725fn custom_tool_call_events(
726    session_id: &str,
727    message_id: &str,
728    timestamp: DateTime<Utc>,
729    payload: &Value,
730    row: &Value,
731) -> Vec<IngestEvent> {
732    let part = Part {
733        session_id: session_id.to_owned(),
734        id: part_id(message_id, 0),
735        message_id: message_id.to_owned(),
736        ordinal: 0,
737        // spec.md#model-part-provenance: the model authored the tool call.
738        provenance: Provenance::Conversational,
739        options: empty_options(),
740        kind: PartKind::ToolCall {
741            call_id: extract_str(payload, "call_id"),
742            name: extract_str(payload, "name"),
743            params: payload.get("input").cloned().unwrap_or(Value::Null),
744            provider_executed: true,
745        },
746    };
747    vec![
748        IngestEvent::Message(Message::Assistant {
749            id: message_id.to_owned(),
750            session_id: session_id.to_owned(),
751            timestamp,
752            options: row_options(row),
753        }),
754        IngestEvent::Part(part),
755    ]
756}
757
758fn custom_tool_result_events(
759    session_id: &str,
760    message_id: &str,
761    timestamp: DateTime<Utc>,
762    payload: &Value,
763    row: &Value,
764) -> Vec<IngestEvent> {
765    let part = Part {
766        session_id: session_id.to_owned(),
767        id: part_id(message_id, 0),
768        message_id: message_id.to_owned(),
769        ordinal: 0,
770        // spec.md#model-part-provenance: tool output is runtime-produced.
771        provenance: Provenance::Injected,
772        options: empty_options(),
773        kind: PartKind::ToolResult {
774            call_id: extract_str(payload, "call_id"),
775            name: extract_str(payload, "name"),
776            is_failure: false,
777            result: payload.get("output").cloned().unwrap_or(Value::Null),
778        },
779    };
780    vec![
781        IngestEvent::Message(Message::Tool {
782            id: message_id.to_owned(),
783            session_id: session_id.to_owned(),
784            timestamp,
785            options: row_options(row),
786        }),
787        IngestEvent::Part(part),
788    ]
789}
790
791fn tool_result_events(
792    session_id: &str,
793    message_id: &str,
794    timestamp: DateTime<Utc>,
795    payload: &Value,
796    row: &Value,
797    tool_call_names: &HashMap<String, Extracted<String>>,
798) -> Vec<IngestEvent> {
799    let call_id = extract_str(payload, "call_id");
800    // Resolve tool name from the earlier `function_call` row via the
801    // per-file `call_id -> name` map. Misses (e.g. compaction pruned the
802    // originating call) yield `None`, a faithful "unresolved" value.
803    let name = call_id
804        .as_ref()
805        .and_then(|id| tool_call_names.get(id.as_str()))
806        .cloned();
807    let result = payload.get("output").cloned().unwrap_or(Value::Null);
808    let part = Part {
809        session_id: session_id.to_owned(),
810        id: part_id(message_id, 0),
811        message_id: message_id.to_owned(),
812        ordinal: 0,
813        // spec.md#model-part-provenance: tool output is runtime-produced.
814        provenance: Provenance::Injected,
815        options: empty_options(),
816        kind: PartKind::ToolResult {
817            call_id,
818            name,
819            is_failure: false,
820            result,
821        },
822    };
823    vec![
824        IngestEvent::Message(Message::Tool {
825            id: message_id.to_owned(),
826            session_id: session_id.to_owned(),
827            timestamp,
828            options: row_options(row),
829        }),
830        IngestEvent::Part(part),
831    ]
832}
833
834fn reasoning_events(
835    session_id: &str,
836    message_id: &str,
837    timestamp: DateTime<Utc>,
838    payload: &Value,
839    row: &Value,
840) -> Vec<IngestEvent> {
841    // The source `summary` array is the only place reasoning text lives in
842    // codex-cli's format. Empty array (or missing field) -> `None`. Joined
843    // text -> `Some(...)`. Don't synthesize an empty string.
844    let summary = payload
845        .get("summary")
846        .and_then(Value::as_array)
847        .and_then(|items| {
848            let joined = items
849                .iter()
850                .filter_map(|item| extract_str(item, "text"))
851                .map(|e| (*e).clone())
852                .collect::<Vec<_>>()
853                .join("\n");
854            if joined.is_empty() {
855                None
856            } else {
857                Some(extract_compact_repr(payload))
858            }
859        });
860    let part = Part {
861        session_id: session_id.to_owned(),
862        id: part_id(message_id, 0),
863        message_id: message_id.to_owned(),
864        ordinal: 0,
865        // spec.md#model-part-provenance: model-authored reasoning.
866        provenance: Provenance::Conversational,
867        options: empty_options(),
868        kind: PartKind::Reasoning { text: summary },
869    };
870    vec![
871        IngestEvent::Message(Message::Assistant {
872            id: message_id.to_owned(),
873            session_id: session_id.to_owned(),
874            timestamp,
875            options: row_options(row),
876        }),
877        IngestEvent::Part(part),
878    ]
879}
880
#[cfg(test)]
mod tests {
    //! Tests for the codex-cli adapter. The end-to-end cases ingest the
    //! committed fixture corpus under `tests/fixtures/adapter/codex_cli/` and
    //! check that pond's canonical Session/Message/Part shape comes out; the
    //! remaining cases exercise single rows and helpers directly.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
    use tempfile::TempDir;

    // Anchored at the manifest dir so the tests never depend on the process
    // cwd (figment::Jail chdirs the whole test process while config tests run).
    const FIXTURES: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/fixtures/adapter/codex_cli/sessions"
    );

    /// The factory's default probe is checked against the `.codex/sessions`
    /// path segments via the shared test-support helper.
    #[test]
    fn probe_default_finds_codex_sessions_under_home() -> anyhow::Result<()> {
        crate::adapter::test_support::assert_probe_default(
            &CodexCliFactory,
            &[".codex", "sessions"],
        )
    }

    /// `peek_last_ts` is the freshness watermark for multi-GB rollouts, where
    /// the line-numbered message id cannot be recovered from the tail. It has
    /// to report the envelope timestamp of the last `response_item` (pond's
    /// stored max) and skip trailing `event_msg` noise (stored at the session
    /// default), while scanning only the tail of the file.
    #[test]
    fn peek_last_ts_targets_last_response_item_ignoring_trailing_event_msg() {
        let scratch = TempDir::new().unwrap();
        let rollout = scratch.path().join("rollout.jsonl");
        let rows = [
            r#"{"type":"session_meta","timestamp":"2026-03-20T03:00:00.000Z","payload":{"id":"sess-x","timestamp":"2026-03-20T03:00:00.000Z"}}"#,
            r#"{"type":"response_item","timestamp":"2026-03-20T03:10:00.000Z","payload":{"type":"message","role":"user","content":[{"type":"input_text","text":"hi"}]}}"#,
            r#"{"type":"response_item","timestamp":"2026-03-20T03:20:30.500Z","payload":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"yo"}]}}"#,
            // Token-count noise at the very end: a later wall-clock time, but
            // stored at the session default, so it must NOT become the
            // watermark.
            r#"{"type":"event_msg","timestamp":"2026-03-20T03:59:59.000Z","payload":{"type":"token_count","info":{}}}"#,
        ];
        // One row per line, each newline-terminated.
        let mut contents = String::new();
        for row in rows.iter() {
            contents.push_str(row);
            contents.push('\n');
        }
        std::fs::write(&rollout, contents).unwrap();

        let last_response_item = DateTime::parse_from_rfc3339("2026-03-20T03:20:30.500Z")
            .unwrap()
            .with_timezone(&Utc);
        let expected = last_response_item.timestamp_micros();

        let adapter = CodexCliAdapter::new(scratch.path());
        assert_eq!(adapter.peek_last_ts(&rollout), Some(expected));
    }

    /// Native restore of the fixture corpus is checked through the shared
    /// test-support helper.
    #[tokio::test(flavor = "multi_thread")]
    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
        // Rollout paths embed the `sessions/` segment, which makes the corpus
        // root the parent of FIXTURES rather than FIXTURES itself.
        let corpus_root = std::path::Path::new(FIXTURES)
            .parent()
            .expect("FIXTURES is nested under a corpus root");
        let adapter = CodexCliAdapter::new(FIXTURES);
        crate::adapter::test_support::assert_native_restore(
            &CodexCliFactory,
            &adapter,
            corpus_root,
        )
        .await
    }

    /// Ingesting the whole fixture corpus accepts rows without drops or skips,
    /// tags every session as `codex-cli`, and yields at least one Text part.
    #[tokio::test(flavor = "multi_thread")]
    async fn codex_cli_adapter_ingests_fixture_corpus_into_canonical_shape() -> anyhow::Result<()> {
        let workdir = TempDir::new()?;
        let store = Store::open_local(workdir.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);

        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
        assert!(summary.accepted() > 0, "ingest must accept rows");
        assert_eq!(summary.dropped_events, 0, "no per-event drops expected");
        assert_eq!(
            summary.dropped_sessions, 0,
            "no session-level rejections expected"
        );
        assert_eq!(summary.skipped_files, 0, "no whole-file skips expected");

        let (session_rows, message_rows, part_rows) = store.row_counts().await?;
        assert!(session_rows > 0, "at least one codex-cli session");
        assert!(message_rows > 0, "at least one codex-cli message");
        assert!(part_rows > 0, "at least one codex-cli Part");

        let mut saw_text_part = false;
        for session_id in store.session_ids().await? {
            let loaded = store
                .get_session(&session_id)
                .await?
                .expect("session round-trips");
            assert_eq!(loaded.session.source_agent, "codex-cli");
            assert!(
                !loaded.messages.is_empty(),
                "session {session_id} must carry messages",
            );
            // A single Text part anywhere in the corpus is enough.
            let has_text = loaded
                .messages
                .iter()
                .flat_map(|stored| stored.parts.iter())
                .any(|part| matches!(part.kind, PartKind::Text { .. }));
            if has_text {
                saw_text_part = true;
            }
        }
        assert!(
            saw_text_part,
            "codex-cli corpus must contain at least one Text Part",
        );
        Ok(())
    }

    /// spec.md#model-part-provenance: `developer` records, and `user`-slot
    /// records whose body is `<environment_context>`, are injected by the
    /// harness; a real user prompt and an assistant message are conversation.
    #[test]
    fn message_provenance_separates_prompts_from_harness_records() {
        // Content array holding one `input_text` item with the given text.
        let input_text = |text: &str| vec![json!({"type": "input_text", "text": text})];

        let prompt = input_text("refactor this");
        assert_eq!(
            message_provenance("user", &prompt),
            Provenance::Conversational,
        );
        assert_eq!(
            message_provenance("assistant", &[]),
            Provenance::Conversational,
        );

        let developer = input_text("you are an agent");
        assert_eq!(
            message_provenance("developer", &developer),
            Provenance::Injected,
        );

        let env = input_text("<environment_context>cwd=/tmp</environment_context>");
        assert_eq!(message_provenance("user", &env), Provenance::Injected);
    }

    /// Legacy (un-enveloped) rows: metadata and state rows produce no events,
    /// while a bare `message` row produces a user Message plus a Text part.
    #[test]
    fn legacy_rows_normalize_to_payloads() {
        let now = Utc::now();
        let call_names: HashMap<String, Extracted<String>> = HashMap::new();

        // Neither the bare first row nor a `{record_type:"state"}` marker
        // yields any event.
        let eventless = vec![
            (
                1,
                json!({"id": "s1", "timestamp": "2025-09-13T04:30:17.447Z"}),
                "legacy first row parses",
            ),
            (2, json!({"record_type": "state"}), "state marker parses"),
        ];
        for (line, row, why) in eventless {
            let produced = events_from_row("s1", line, &row, now, &call_names).expect(why);
            assert!(produced.is_empty());
        }

        // A legacy `message` row without an envelope -> Message + Text Part.
        let message = json!({
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "hi"}],
        });
        let events =
            events_from_row("s1", 3, &message, now, &call_names).expect("legacy message parses");
        assert_eq!(events.len(), 2, "message + one Text Part");
        assert!(matches!(
            events.first(),
            Some(IngestEvent::Message(Message::User { .. }))
        ));
        assert!(matches!(
            events.get(1),
            Some(IngestEvent::Part(part)) if matches!(part.kind, PartKind::Text { .. }),
        ));
    }

    /// A `message` row with an unrecognised role yields exactly one System
    /// message whose content is the role string.
    #[test]
    fn unknown_message_role_becomes_lossless_carrier() {
        let now = Utc::now();
        let call_names: HashMap<String, Extracted<String>> = HashMap::new();
        let payload = json!({
            "type": "message",
            "role": "future_role",
            "content": [{"type": "input_text", "text": "keep me"}],
        });
        let row = json!({
            "type": "response_item",
            "timestamp": "2026-06-01T00:00:00Z",
            "payload": payload,
        });

        let events = events_from_row("s1", 4, &row, now, &call_names).expect("carrier is valid");
        assert_eq!(events.len(), 1);
        assert!(matches!(
            events.first(),
            Some(IngestEvent::Message(Message::System { id, content, .. }))
                if id == "s1:000004" && content.as_deref().map(String::as_str) == Some("future_role")
        ));
    }

    /// The legacy fixture rollout ingests as a full session: identity and
    /// creation time from the bare first row, one message per data row, and a
    /// tool result whose name is resolved.
    #[tokio::test(flavor = "multi_thread")]
    async fn legacy_rollout_ingests_into_canonical_shape() -> anyhow::Result<()> {
        let workdir = TempDir::new()?;
        let store = Store::open_local(workdir.path()).await?;
        let adapter = CodexCliAdapter::new(FIXTURES);
        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;

        // With no `session_meta` envelope, the Session's id and timestamp come
        // from the top level of the legacy fixture's bare first row.
        let legacy = store
            .get_session("67c52f3f-d25e-4194-a006-93de58f28d7c")
            .await?
            .expect("legacy rollout ingests as a session");
        assert_eq!(legacy.session.source_agent, "codex-cli");
        let created_at = legacy
            .session
            .created_at
            .to_rfc3339_opts(SecondsFormat::Millis, true);
        assert_eq!(created_at, "2025-09-13T04:30:17.447Z");
        // Eleven un-enveloped data rows map to eleven messages.
        assert_eq!(legacy.messages.len(), 11, "every legacy data row ingests");
        // The per-file call_id map lets the legacy `function_call_output`
        // pick up its tool name from the earlier legacy `function_call` row.
        let resolved = legacy
            .messages
            .iter()
            .flat_map(|message| message.parts.iter())
            .any(|part| matches!(&part.kind, PartKind::ToolResult { name: Some(_), .. }));
        assert!(
            resolved,
            "legacy function_call_output resolves its tool name"
        );
        Ok(())
    }
}