// xurl_core/provider/codex.rs

1use std::cmp::Reverse;
2use std::collections::BTreeMap;
3use std::fs;
4use std::io::{BufReader, Read};
5use std::path::{Path, PathBuf};
6use std::process::{Command, Stdio};
7use std::time::SystemTime;
8
9use rusqlite::{Connection, OpenFlags, OptionalExtension};
10use serde_json::Value;
11use toml::Table as TomlTable;
12use toml::Value as TomlValue;
13use walkdir::WalkDir;
14
15use crate::error::{Result, XurlError};
16use crate::jsonl;
17use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
18use crate::provider::{Provider, WriteEventSink, append_passthrough_args};
19
/// Provider for Codex CLI threads stored under a single Codex home directory.
#[derive(Clone, Debug)]
pub struct CodexProvider {
    /// Codex home directory. The provider resolves `sessions/`, `archived_sessions/`,
    /// `state*.sqlite` and `config.toml` relative to it.
    root: PathBuf,
}
24
/// One row of a Codex sqlite `threads` index, as read by the provider.
#[derive(Clone, Debug)]
struct SqliteThreadRecord {
    /// Rollout file location recorded in the index; the file may no longer exist.
    rollout_path: PathBuf,
    /// `true` when the row's `archived` column is non-zero.
    archived: bool,
}
30
31impl CodexProvider {
32    pub fn new(root: impl Into<PathBuf>) -> Self {
33        Self { root: root.into() }
34    }
35
36    fn sessions_root(&self) -> PathBuf {
37        self.root.join("sessions")
38    }
39
40    fn archived_root(&self) -> PathBuf {
41        self.root.join("archived_sessions")
42    }
43
44    fn state_db_paths(&self) -> Vec<PathBuf> {
45        let mut paths = if let Ok(entries) = fs::read_dir(&self.root) {
46            entries
47                .filter_map(std::result::Result::ok)
48                .filter_map(|entry| {
49                    let path = entry.path();
50                    let name = path.file_name()?.to_str()?;
51                    let is_state_db = name == "state.sqlite"
52                        || (name.starts_with("state_") && name.ends_with(".sqlite"));
53                    if is_state_db && path.is_file() {
54                        Some(path)
55                    } else {
56                        None
57                    }
58                })
59                .collect::<Vec<_>>()
60        } else {
61            Vec::new()
62        };
63
64        paths.sort_by_key(|path| {
65            let version = path
66                .file_name()
67                .and_then(|name| name.to_str())
68                .and_then(|name| {
69                    name.strip_prefix("state_")
70                        .and_then(|name| name.strip_suffix(".sqlite"))
71                })
72                .and_then(|raw| raw.parse::<u32>().ok())
73                .unwrap_or(0);
74            let modified = fs::metadata(path)
75                .and_then(|meta| meta.modified())
76                .unwrap_or(SystemTime::UNIX_EPOCH);
77            (Reverse(version), Reverse(modified))
78        });
79
80        paths
81    }
82
83    fn query_thread_record(
84        db_path: &Path,
85        session_id: &str,
86    ) -> std::result::Result<Option<SqliteThreadRecord>, rusqlite::Error> {
87        let conn = Connection::open_with_flags(db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
88        let mut stmt =
89            conn.prepare("SELECT rollout_path, archived FROM threads WHERE id = ?1 LIMIT 1")?;
90        let row = stmt
91            .query_row([session_id], |row| {
92                Ok(SqliteThreadRecord {
93                    rollout_path: PathBuf::from(row.get::<_, String>(0)?),
94                    archived: row.get::<_, i64>(1)? != 0,
95                })
96            })
97            .optional()?;
98        Ok(row)
99    }
100
101    fn lookup_thread_from_state_db(
102        state_dbs: &[PathBuf],
103        session_id: &str,
104        warnings: &mut Vec<String>,
105    ) -> Option<SqliteThreadRecord> {
106        for db_path in state_dbs {
107            match Self::query_thread_record(db_path, session_id) {
108                Ok(Some(record)) => return Some(record),
109                Ok(None) => continue,
110                Err(err) => warnings.push(format!(
111                    "failed reading sqlite thread index {}: {err}",
112                    db_path.display()
113                )),
114            }
115        }
116
117        None
118    }
119
120    fn find_candidates(root: &Path, session_id: &str) -> Vec<PathBuf> {
121        let needle = format!("{session_id}.jsonl");
122        if !root.exists() {
123            return Vec::new();
124        }
125
126        WalkDir::new(root)
127            .into_iter()
128            .filter_map(std::result::Result::ok)
129            .filter(|entry| entry.file_type().is_file())
130            .map(|entry| entry.into_path())
131            .filter(|path| {
132                path.file_name()
133                    .and_then(|name| name.to_str())
134                    .is_some_and(|name| name.starts_with("rollout-") && name.ends_with(&needle))
135            })
136            .collect()
137    }
138
139    fn choose_latest(paths: Vec<PathBuf>) -> Option<(PathBuf, usize)> {
140        if paths.is_empty() {
141            return None;
142        }
143
144        let mut scored = paths
145            .into_iter()
146            .map(|path| {
147                let modified = fs::metadata(&path)
148                    .and_then(|meta| meta.modified())
149                    .unwrap_or(SystemTime::UNIX_EPOCH);
150                (path, modified)
151            })
152            .collect::<Vec<_>>();
153
154        scored.sort_by_key(|(_, modified)| Reverse(*modified));
155        let count = scored.len();
156        scored.into_iter().next().map(|(path, _)| (path, count))
157    }
158
159    fn codex_bin() -> String {
160        std::env::var("XURL_CODEX_BIN").unwrap_or_else(|_| "codex".to_string())
161    }
162
163    fn config_path(&self) -> PathBuf {
164        self.root.join("config.toml")
165    }
166
167    fn load_role_overrides(&self, role: &str) -> Result<Vec<(String, String)>> {
168        let config_path = self.config_path();
169        let raw = fs::read_to_string(&config_path).map_err(|source| XurlError::Io {
170            path: config_path.clone(),
171            source,
172        })?;
173        let config = toml::from_str::<TomlTable>(&raw).map_err(|err| {
174            XurlError::InvalidMode(format!(
175                "failed parsing codex config {}: {err}",
176                config_path.display()
177            ))
178        })?;
179        let role_config = config
180            .get("agents")
181            .and_then(TomlValue::as_table)
182            .and_then(|agents| agents.get(role))
183            .and_then(TomlValue::as_table)
184            .ok_or_else(|| {
185                XurlError::InvalidMode(format!(
186                    "codex role `{role}` is not defined in {}",
187                    config_path.display()
188                ))
189            })?;
190
191        let mut merged = BTreeMap::<String, String>::new();
192        if let Some(config_file) = role_config.get("config_file").and_then(TomlValue::as_str) {
193            let config_file_path = if Path::new(config_file).is_absolute() {
194                PathBuf::from(config_file)
195            } else {
196                self.root.join(config_file)
197            };
198            let raw = fs::read_to_string(&config_file_path).map_err(|source| XurlError::Io {
199                path: config_file_path.clone(),
200                source,
201            })?;
202            let config = toml::from_str::<TomlTable>(&raw).map_err(|err| {
203                XurlError::InvalidMode(format!(
204                    "failed parsing codex role config {}: {err}",
205                    config_file_path.display()
206                ))
207            })?;
208            for (key, value) in config {
209                Self::flatten_codex_config(&key, &value, &mut merged);
210            }
211        }
212
213        for (key, value) in role_config {
214            if key == "description" || key == "config_file" {
215                continue;
216            }
217            Self::flatten_codex_config(key, value, &mut merged);
218        }
219
220        if merged.is_empty() {
221            return Err(XurlError::InvalidMode(format!(
222                "codex role `{role}` does not define writable config overrides"
223            )));
224        }
225
226        Ok(merged.into_iter().collect())
227    }
228
229    fn flatten_codex_config(
230        prefix: &str,
231        value: &TomlValue,
232        output: &mut BTreeMap<String, String>,
233    ) {
234        if let TomlValue::Table(table) = value {
235            for (key, child) in table {
236                let next_prefix = format!("{prefix}.{key}");
237                Self::flatten_codex_config(&next_prefix, child, output);
238            }
239            return;
240        }
241
242        output.insert(prefix.to_string(), Self::encode_codex_config_value(value));
243    }
244
245    fn encode_codex_config_value(value: &TomlValue) -> String {
246        match value {
247            TomlValue::String(text) => text.clone(),
248            TomlValue::Integer(number) => number.to_string(),
249            TomlValue::Float(number) => number.to_string(),
250            TomlValue::Boolean(flag) => flag.to_string(),
251            TomlValue::Datetime(datetime) => datetime.to_string(),
252            TomlValue::Array(_) | TomlValue::Table(_) => {
253                serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
254            }
255        }
256    }
257
258    fn spawn_codex_command(args: &[String]) -> Result<std::process::Child> {
259        let bin = Self::codex_bin();
260        let mut command = Command::new(&bin);
261        command
262            .args(args)
263            .stdin(Stdio::null())
264            .stdout(Stdio::piped())
265            .stderr(Stdio::piped());
266        command.spawn().map_err(|source| {
267            if source.kind() == std::io::ErrorKind::NotFound {
268                XurlError::CommandNotFound { command: bin }
269            } else {
270                XurlError::Io {
271                    path: PathBuf::from(bin),
272                    source,
273                }
274            }
275        })
276    }
277
278    fn run_write(
279        &self,
280        args: &[String],
281        req: &WriteRequest,
282        sink: &mut dyn WriteEventSink,
283        warnings: Vec<String>,
284    ) -> Result<WriteResult> {
285        let mut child = Self::spawn_codex_command(args)?;
286        let stdout = child.stdout.take().ok_or_else(|| {
287            XurlError::WriteProtocol("codex stdout pipe is unavailable".to_string())
288        })?;
289        let stderr = child.stderr.take().ok_or_else(|| {
290            XurlError::WriteProtocol("codex stderr pipe is unavailable".to_string())
291        })?;
292        let stderr_handle = std::thread::spawn(move || {
293            let mut reader = BufReader::new(stderr);
294            let mut content = String::new();
295            let _ = reader.read_to_string(&mut content);
296            content
297        });
298
299        let mut session_id = req.session_id.clone();
300        let mut final_text = None::<String>;
301        let stream_path = Path::new("<codex:stdout>");
302        let reader = BufReader::new(stdout);
303        jsonl::parse_jsonl_reader(stream_path, reader, |_, value| {
304            let Some(event_type) = value.get("type").and_then(Value::as_str) else {
305                return Ok(());
306            };
307
308            if event_type == "thread.started" {
309                if let Some(thread_id) = value.get("thread_id").and_then(Value::as_str) {
310                    sink.on_session_ready(ProviderKind::Codex, thread_id)?;
311                    session_id = Some(thread_id.to_string());
312                }
313                return Ok(());
314            }
315
316            if event_type != "item.completed" {
317                return Ok(());
318            }
319
320            let Some(item) = value.get("item") else {
321                return Ok(());
322            };
323            if item.get("type").and_then(Value::as_str) != Some("agent_message") {
324                return Ok(());
325            }
326
327            if let Some(text) = item.get("text").and_then(Value::as_str) {
328                sink.on_text_delta(text)?;
329                final_text = Some(text.to_string());
330            }
331            Ok(())
332        })?;
333
334        let status = child.wait().map_err(|source| XurlError::Io {
335            path: PathBuf::from(Self::codex_bin()),
336            source,
337        })?;
338        let stderr_content = stderr_handle.join().unwrap_or_default();
339
340        if !status.success() {
341            return Err(XurlError::CommandFailed {
342                command: format!("{} {}", Self::codex_bin(), args.join(" ")),
343                code: status.code(),
344                stderr: stderr_content.trim().to_string(),
345            });
346        }
347
348        let session_id = if let Some(session_id) = session_id {
349            session_id
350        } else {
351            return Err(XurlError::WriteProtocol(
352                "missing thread id in codex event stream".to_string(),
353            ));
354        };
355
356        Ok(WriteResult {
357            provider: ProviderKind::Codex,
358            session_id,
359            final_text,
360            warnings,
361        })
362    }
363}
364
365impl Provider for CodexProvider {
366    fn kind(&self) -> ProviderKind {
367        ProviderKind::Codex
368    }
369
370    fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
371        let sessions = self.sessions_root();
372        let archived = self.archived_root();
373        let state_dbs = self.state_db_paths();
374        let mut warnings = Vec::new();
375        let sqlite_record =
376            Self::lookup_thread_from_state_db(&state_dbs, session_id, &mut warnings);
377
378        if let Some(record) = sqlite_record.as_ref().filter(|record| !record.archived) {
379            if record.rollout_path.exists() {
380                return Ok(ResolvedThread {
381                    provider: ProviderKind::Codex,
382                    session_id: session_id.to_string(),
383                    path: record.rollout_path.clone(),
384                    metadata: ResolutionMeta {
385                        source: "codex:sqlite:sessions".to_string(),
386                        candidate_count: 1,
387                        warnings,
388                    },
389                });
390            }
391
392            warnings.push(format!(
393                "sqlite thread index points to a missing rollout for session_id={session_id}: {}",
394                record.rollout_path.display()
395            ));
396        }
397
398        let active_candidates = Self::find_candidates(&sessions, session_id);
399        if let Some((selected, count)) = Self::choose_latest(active_candidates) {
400            if count > 1 {
401                warnings.push(format!(
402                    "multiple matches found ({count}) for session_id={session_id}; selected latest: {}",
403                    selected.display()
404                ));
405            }
406
407            let meta = ResolutionMeta {
408                source: "codex:sessions".to_string(),
409                candidate_count: count,
410                warnings,
411            };
412
413            return Ok(ResolvedThread {
414                provider: ProviderKind::Codex,
415                session_id: session_id.to_string(),
416                path: selected,
417                metadata: meta,
418            });
419        }
420
421        if let Some(record) = sqlite_record.as_ref().filter(|record| record.archived) {
422            if record.rollout_path.exists() {
423                return Ok(ResolvedThread {
424                    provider: ProviderKind::Codex,
425                    session_id: session_id.to_string(),
426                    path: record.rollout_path.clone(),
427                    metadata: ResolutionMeta {
428                        source: "codex:sqlite:archived_sessions".to_string(),
429                        candidate_count: 1,
430                        warnings,
431                    },
432                });
433            }
434
435            warnings.push(format!(
436                "sqlite thread index points to a missing archived rollout for session_id={session_id}: {}",
437                record.rollout_path.display()
438            ));
439        }
440
441        let archived_candidates = Self::find_candidates(&archived, session_id);
442        if let Some((selected, count)) = Self::choose_latest(archived_candidates) {
443            if count > 1 {
444                warnings.push(format!(
445                    "multiple archived matches found ({count}) for session_id={session_id}; selected latest: {}",
446                    selected.display()
447                ));
448            }
449
450            let meta = ResolutionMeta {
451                source: "codex:archived_sessions".to_string(),
452                candidate_count: count,
453                warnings,
454            };
455
456            return Ok(ResolvedThread {
457                provider: ProviderKind::Codex,
458                session_id: session_id.to_string(),
459                path: selected,
460                metadata: meta,
461            });
462        }
463
464        Err(XurlError::ThreadNotFound {
465            provider: ProviderKind::Codex.to_string(),
466            session_id: session_id.to_string(),
467            searched_roots: vec![sessions, archived]
468                .into_iter()
469                .chain(state_dbs)
470                .collect(),
471        })
472    }
473
474    fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
475        let warnings = Vec::new();
476        let role_overrides = if let Some(role) = req.options.role.as_deref() {
477            self.load_role_overrides(role)?
478        } else {
479            Vec::new()
480        };
481        let mut args = Vec::new();
482        args.push("exec".to_string());
483
484        if let Some(session_id) = req.session_id.as_deref() {
485            args.push("resume".to_string());
486            args.push("--json".to_string());
487            append_passthrough_args(&mut args, &req.options.params);
488            for (key, value) in &role_overrides {
489                args.push("--config".to_string());
490                args.push(format!("{key}={value}"));
491            }
492            args.push(session_id.to_string());
493            args.push(req.prompt.clone());
494            self.run_write(&args, req, sink, warnings)
495        } else {
496            args.push("--json".to_string());
497            append_passthrough_args(&mut args, &req.options.params);
498            for (key, value) in &role_overrides {
499                args.push("--config".to_string());
500                args.push(format!("{key}={value}"));
501            }
502            args.push(req.prompt.clone());
503            self.run_write(&args, req, sink, warnings)
504        }
505    }
506}
507
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::Path;

    use rusqlite::Connection;
    use tempfile::tempdir;

    use crate::provider::Provider;
    use crate::provider::codex::CodexProvider;

    /// Creates a sqlite file at `path` holding the `threads` table shape the provider queries.
    fn prepare_state_db(path: &Path) -> Connection {
        const SCHEMA: &str = "
            CREATE TABLE threads (
                id TEXT PRIMARY KEY,
                rollout_path TEXT NOT NULL,
                archived INTEGER NOT NULL DEFAULT 0
            );
            ";
        let conn = Connection::open(path).expect("open sqlite");
        conn.execute_batch(SCHEMA).expect("create schema");
        conn
    }

    /// Creates the parent directories of `path` and writes a one-line rollout there.
    fn write_rollout(path: &Path) {
        fs::create_dir_all(path.parent().expect("parent")).expect("mkdir");
        fs::write(path, "{}\n").expect("write");
    }

    /// Adds a `threads` row pointing `session_id` at `rollout`.
    fn insert_thread(conn: &Connection, session_id: &str, rollout: &Path, archived: bool) {
        let sql = if archived {
            "INSERT INTO threads (id, rollout_path, archived) VALUES (?1, ?2, 1)"
        } else {
            "INSERT INTO threads (id, rollout_path, archived) VALUES (?1, ?2, 0)"
        };
        conn.execute(sql, (session_id, rollout.display().to_string()))
            .expect("insert thread");
    }

    #[test]
    fn resolves_from_sessions() {
        let temp = tempdir().expect("tempdir");
        let path = temp.path().join(
            "sessions/2026/02/23/rollout-2026-02-23T04-48-50-019c871c-b1f9-7f60-9c4f-87ed09f13592.jsonl",
        );
        write_rollout(&path);

        let resolved = CodexProvider::new(temp.path())
            .resolve("019c871c-b1f9-7f60-9c4f-87ed09f13592")
            .expect("resolve should succeed");
        assert_eq!(resolved.path, path);
    }

    #[test]
    fn resolves_from_archived_when_not_in_sessions() {
        let temp = tempdir().expect("tempdir");
        let path = temp.path().join(
            "archived_sessions/rollout-2026-02-22T01-05-36-019c8129-f668-7951-8d56-cc5513541c26.jsonl",
        );
        write_rollout(&path);

        let resolved = CodexProvider::new(temp.path())
            .resolve("019c8129-f668-7951-8d56-cc5513541c26")
            .expect("resolve should succeed");
        assert_eq!(resolved.path, path);
        assert_eq!(resolved.metadata.source, "codex:archived_sessions");
    }

    #[test]
    fn returns_not_found_when_missing() {
        let temp = tempdir().expect("tempdir");
        let err = CodexProvider::new(temp.path())
            .resolve("019c8129-f668-7951-8d56-cc5513541c26")
            .expect_err("should fail");
        assert!(err.to_string().contains("thread not found"));
    }

    #[test]
    fn resolves_from_sqlite_state_index() {
        let temp = tempdir().expect("tempdir");
        let conn = prepare_state_db(&temp.path().join("state_5.sqlite"));

        let session_id = "019c871c-b1f9-7f60-9c4f-87ed09f13592";
        let rollout = temp.path().join("sessions/custom/path/thread.jsonl");
        write_rollout(&rollout);
        insert_thread(&conn, session_id, &rollout, false);

        let resolved = CodexProvider::new(temp.path())
            .resolve(session_id)
            .expect("resolve should succeed");
        assert_eq!(resolved.path, rollout);
        assert_eq!(resolved.metadata.source, "codex:sqlite:sessions");
    }

    #[test]
    fn resolves_archived_from_sqlite_state_index() {
        let temp = tempdir().expect("tempdir");
        let conn = prepare_state_db(&temp.path().join("state.sqlite"));

        let session_id = "019c8129-f668-7951-8d56-cc5513541c26";
        let rollout = temp.path().join("archived_sessions/custom/path/thread.jsonl");
        write_rollout(&rollout);
        insert_thread(&conn, session_id, &rollout, true);

        let resolved = CodexProvider::new(temp.path())
            .resolve(session_id)
            .expect("resolve should succeed");
        assert_eq!(resolved.path, rollout);
        assert_eq!(resolved.metadata.source, "codex:sqlite:archived_sessions");
    }

    #[test]
    fn falls_back_to_filesystem_when_sqlite_rollout_missing() {
        let temp = tempdir().expect("tempdir");
        let conn = prepare_state_db(&temp.path().join("state_5.sqlite"));

        let session_id = "019c871c-b1f9-7f60-9c4f-87ed09f13592";
        let stale_rollout = temp.path().join("sessions/stale/path/thread.jsonl");
        insert_thread(&conn, session_id, &stale_rollout, false);

        let fs_rollout = temp.path().join(
            "sessions/2026/02/23/rollout-2026-02-23T04-48-50-019c871c-b1f9-7f60-9c4f-87ed09f13592.jsonl",
        );
        write_rollout(&fs_rollout);

        let resolved = CodexProvider::new(temp.path())
            .resolve(session_id)
            .expect("resolve should succeed");
        let warnings = &resolved.metadata.warnings;
        assert_eq!(resolved.path, fs_rollout);
        assert_eq!(resolved.metadata.source, "codex:sessions");
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("missing rollout"));
    }

    #[test]
    fn loads_role_overrides_from_main_and_config_file() {
        const MAIN_CONFIG: &str = r#"
[agents.reviewer]
description = "review role"
config_file = "agents/reviewer.toml"
model_reasoning_effort = "high"
developer_instructions = "Focus on high priority issues."
"#;
        const ROLE_CONFIG: &str = r#"
model = "gpt-5.3-codex"
"#;

        let temp = tempdir().expect("tempdir");
        fs::write(temp.path().join("config.toml"), MAIN_CONFIG).expect("write config");
        let role_config_dir = temp.path().join("agents");
        fs::create_dir_all(&role_config_dir).expect("mkdir role config");
        fs::write(role_config_dir.join("reviewer.toml"), ROLE_CONFIG)
            .expect("write role config");

        let overrides = CodexProvider::new(temp.path())
            .load_role_overrides("reviewer")
            .expect("must load role");

        let expected = [
            ("developer_instructions", "Focus on high priority issues."),
            ("model", "gpt-5.3-codex"),
            ("model_reasoning_effort", "high"),
        ]
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect::<Vec<_>>();
        assert_eq!(overrides, expected);
    }

    #[test]
    fn missing_role_override_returns_error() {
        const MAIN_CONFIG: &str = r#"
[agents.default]
description = "default role"
"#;

        let temp = tempdir().expect("tempdir");
        fs::write(temp.path().join("config.toml"), MAIN_CONFIG).expect("write config");

        let err = CodexProvider::new(temp.path())
            .load_role_overrides("reviewer")
            .expect_err("must fail");
        assert!(err.to_string().contains("is not defined"));
    }
}