1use std::cmp::Reverse;
2use std::collections::BTreeMap;
3use std::fs;
4use std::io::{BufReader, Read};
5use std::path::{Path, PathBuf};
6use std::process::{Command, Stdio};
7use std::time::SystemTime;
8
9use rusqlite::{Connection, OpenFlags, OptionalExtension};
10use serde_json::Value;
11use toml::Table as TomlTable;
12use toml::Value as TomlValue;
13use walkdir::WalkDir;
14
15use crate::error::{Result, XurlError};
16use crate::jsonl;
17use crate::model::{ProviderKind, ResolutionMeta, ResolvedThread, WriteRequest, WriteResult};
18use crate::provider::{Provider, WriteEventSink, append_passthrough_args};
19
/// Provider that works against a local Codex home directory.
///
/// Every path this provider touches (`sessions/`, `archived_sessions/`,
/// `state*.sqlite`, `config.toml`) is derived from `root`.
#[derive(Clone, Debug)]
pub struct CodexProvider {
    /// Base directory that all provider paths are joined onto.
    root: PathBuf,
}
24
/// One row of the `threads` table read from a SQLite state database.
#[derive(Clone, Debug)]
struct SqliteThreadRecord {
    /// Location of the rollout file as stored in the `rollout_path` column.
    rollout_path: PathBuf,
    /// `true` when the `archived` column holds a non-zero value.
    archived: bool,
}
30
31impl CodexProvider {
32 pub fn new(root: impl Into<PathBuf>) -> Self {
33 Self { root: root.into() }
34 }
35
36 fn sessions_root(&self) -> PathBuf {
37 self.root.join("sessions")
38 }
39
40 fn archived_root(&self) -> PathBuf {
41 self.root.join("archived_sessions")
42 }
43
44 fn state_db_paths(&self) -> Vec<PathBuf> {
45 let mut paths = if let Ok(entries) = fs::read_dir(&self.root) {
46 entries
47 .filter_map(std::result::Result::ok)
48 .filter_map(|entry| {
49 let path = entry.path();
50 let name = path.file_name()?.to_str()?;
51 let is_state_db = name == "state.sqlite"
52 || (name.starts_with("state_") && name.ends_with(".sqlite"));
53 if is_state_db && path.is_file() {
54 Some(path)
55 } else {
56 None
57 }
58 })
59 .collect::<Vec<_>>()
60 } else {
61 Vec::new()
62 };
63
64 paths.sort_by_key(|path| {
65 let version = path
66 .file_name()
67 .and_then(|name| name.to_str())
68 .and_then(|name| {
69 name.strip_prefix("state_")
70 .and_then(|name| name.strip_suffix(".sqlite"))
71 })
72 .and_then(|raw| raw.parse::<u32>().ok())
73 .unwrap_or(0);
74 let modified = fs::metadata(path)
75 .and_then(|meta| meta.modified())
76 .unwrap_or(SystemTime::UNIX_EPOCH);
77 (Reverse(version), Reverse(modified))
78 });
79
80 paths
81 }
82
83 fn query_thread_record(
84 db_path: &Path,
85 session_id: &str,
86 ) -> std::result::Result<Option<SqliteThreadRecord>, rusqlite::Error> {
87 let conn = Connection::open_with_flags(db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
88 let mut stmt =
89 conn.prepare("SELECT rollout_path, archived FROM threads WHERE id = ?1 LIMIT 1")?;
90 let row = stmt
91 .query_row([session_id], |row| {
92 Ok(SqliteThreadRecord {
93 rollout_path: PathBuf::from(row.get::<_, String>(0)?),
94 archived: row.get::<_, i64>(1)? != 0,
95 })
96 })
97 .optional()?;
98 Ok(row)
99 }
100
101 fn lookup_thread_from_state_db(
102 state_dbs: &[PathBuf],
103 session_id: &str,
104 warnings: &mut Vec<String>,
105 ) -> Option<SqliteThreadRecord> {
106 for db_path in state_dbs {
107 match Self::query_thread_record(db_path, session_id) {
108 Ok(Some(record)) => return Some(record),
109 Ok(None) => continue,
110 Err(err) => warnings.push(format!(
111 "failed reading sqlite thread index {}: {err}",
112 db_path.display()
113 )),
114 }
115 }
116
117 None
118 }
119
120 fn find_candidates(root: &Path, session_id: &str) -> Vec<PathBuf> {
121 let needle = format!("{session_id}.jsonl");
122 if !root.exists() {
123 return Vec::new();
124 }
125
126 WalkDir::new(root)
127 .into_iter()
128 .filter_map(std::result::Result::ok)
129 .filter(|entry| entry.file_type().is_file())
130 .map(|entry| entry.into_path())
131 .filter(|path| {
132 path.file_name()
133 .and_then(|name| name.to_str())
134 .is_some_and(|name| name.starts_with("rollout-") && name.ends_with(&needle))
135 })
136 .collect()
137 }
138
139 fn choose_latest(paths: Vec<PathBuf>) -> Option<(PathBuf, usize)> {
140 if paths.is_empty() {
141 return None;
142 }
143
144 let mut scored = paths
145 .into_iter()
146 .map(|path| {
147 let modified = fs::metadata(&path)
148 .and_then(|meta| meta.modified())
149 .unwrap_or(SystemTime::UNIX_EPOCH);
150 (path, modified)
151 })
152 .collect::<Vec<_>>();
153
154 scored.sort_by_key(|(_, modified)| Reverse(*modified));
155 let count = scored.len();
156 scored.into_iter().next().map(|(path, _)| (path, count))
157 }
158
159 fn codex_bin() -> String {
160 std::env::var("XURL_CODEX_BIN").unwrap_or_else(|_| "codex".to_string())
161 }
162
163 fn config_path(&self) -> PathBuf {
164 self.root.join("config.toml")
165 }
166
167 fn load_role_overrides(&self, role: &str) -> Result<Vec<(String, String)>> {
168 let config_path = self.config_path();
169 let raw = fs::read_to_string(&config_path).map_err(|source| XurlError::Io {
170 path: config_path.clone(),
171 source,
172 })?;
173 let config = toml::from_str::<TomlTable>(&raw).map_err(|err| {
174 XurlError::InvalidMode(format!(
175 "failed parsing codex config {}: {err}",
176 config_path.display()
177 ))
178 })?;
179 let role_config = config
180 .get("agents")
181 .and_then(TomlValue::as_table)
182 .and_then(|agents| agents.get(role))
183 .and_then(TomlValue::as_table)
184 .ok_or_else(|| {
185 XurlError::InvalidMode(format!(
186 "codex role `{role}` is not defined in {}",
187 config_path.display()
188 ))
189 })?;
190
191 let mut merged = BTreeMap::<String, String>::new();
192 if let Some(config_file) = role_config.get("config_file").and_then(TomlValue::as_str) {
193 let config_file_path = if Path::new(config_file).is_absolute() {
194 PathBuf::from(config_file)
195 } else {
196 self.root.join(config_file)
197 };
198 let raw = fs::read_to_string(&config_file_path).map_err(|source| XurlError::Io {
199 path: config_file_path.clone(),
200 source,
201 })?;
202 let config = toml::from_str::<TomlTable>(&raw).map_err(|err| {
203 XurlError::InvalidMode(format!(
204 "failed parsing codex role config {}: {err}",
205 config_file_path.display()
206 ))
207 })?;
208 for (key, value) in config {
209 Self::flatten_codex_config(&key, &value, &mut merged);
210 }
211 }
212
213 for (key, value) in role_config {
214 if key == "description" || key == "config_file" {
215 continue;
216 }
217 Self::flatten_codex_config(key, value, &mut merged);
218 }
219
220 if merged.is_empty() {
221 return Err(XurlError::InvalidMode(format!(
222 "codex role `{role}` does not define writable config overrides"
223 )));
224 }
225
226 Ok(merged.into_iter().collect())
227 }
228
229 fn flatten_codex_config(
230 prefix: &str,
231 value: &TomlValue,
232 output: &mut BTreeMap<String, String>,
233 ) {
234 if let TomlValue::Table(table) = value {
235 for (key, child) in table {
236 let next_prefix = format!("{prefix}.{key}");
237 Self::flatten_codex_config(&next_prefix, child, output);
238 }
239 return;
240 }
241
242 output.insert(prefix.to_string(), Self::encode_codex_config_value(value));
243 }
244
245 fn encode_codex_config_value(value: &TomlValue) -> String {
246 match value {
247 TomlValue::String(text) => text.clone(),
248 TomlValue::Integer(number) => number.to_string(),
249 TomlValue::Float(number) => number.to_string(),
250 TomlValue::Boolean(flag) => flag.to_string(),
251 TomlValue::Datetime(datetime) => datetime.to_string(),
252 TomlValue::Array(_) | TomlValue::Table(_) => {
253 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
254 }
255 }
256 }
257
258 fn spawn_codex_command(args: &[String]) -> Result<std::process::Child> {
259 let bin = Self::codex_bin();
260 let mut command = Command::new(&bin);
261 command
262 .args(args)
263 .stdin(Stdio::null())
264 .stdout(Stdio::piped())
265 .stderr(Stdio::piped());
266 command.spawn().map_err(|source| {
267 if source.kind() == std::io::ErrorKind::NotFound {
268 XurlError::CommandNotFound { command: bin }
269 } else {
270 XurlError::Io {
271 path: PathBuf::from(bin),
272 source,
273 }
274 }
275 })
276 }
277
278 fn run_write(
279 &self,
280 args: &[String],
281 req: &WriteRequest,
282 sink: &mut dyn WriteEventSink,
283 warnings: Vec<String>,
284 ) -> Result<WriteResult> {
285 let mut child = Self::spawn_codex_command(args)?;
286 let stdout = child.stdout.take().ok_or_else(|| {
287 XurlError::WriteProtocol("codex stdout pipe is unavailable".to_string())
288 })?;
289 let stderr = child.stderr.take().ok_or_else(|| {
290 XurlError::WriteProtocol("codex stderr pipe is unavailable".to_string())
291 })?;
292 let stderr_handle = std::thread::spawn(move || {
293 let mut reader = BufReader::new(stderr);
294 let mut content = String::new();
295 let _ = reader.read_to_string(&mut content);
296 content
297 });
298
299 let mut session_id = req.session_id.clone();
300 let mut final_text = None::<String>;
301 let stream_path = Path::new("<codex:stdout>");
302 let reader = BufReader::new(stdout);
303 jsonl::parse_jsonl_reader(stream_path, reader, |_, value| {
304 let Some(event_type) = value.get("type").and_then(Value::as_str) else {
305 return Ok(());
306 };
307
308 if event_type == "thread.started" {
309 if let Some(thread_id) = value.get("thread_id").and_then(Value::as_str) {
310 sink.on_session_ready(ProviderKind::Codex, thread_id)?;
311 session_id = Some(thread_id.to_string());
312 }
313 return Ok(());
314 }
315
316 if event_type != "item.completed" {
317 return Ok(());
318 }
319
320 let Some(item) = value.get("item") else {
321 return Ok(());
322 };
323 if item.get("type").and_then(Value::as_str) != Some("agent_message") {
324 return Ok(());
325 }
326
327 if let Some(text) = item.get("text").and_then(Value::as_str) {
328 sink.on_text_delta(text)?;
329 final_text = Some(text.to_string());
330 }
331 Ok(())
332 })?;
333
334 let status = child.wait().map_err(|source| XurlError::Io {
335 path: PathBuf::from(Self::codex_bin()),
336 source,
337 })?;
338 let stderr_content = stderr_handle.join().unwrap_or_default();
339
340 if !status.success() {
341 return Err(XurlError::CommandFailed {
342 command: format!("{} {}", Self::codex_bin(), args.join(" ")),
343 code: status.code(),
344 stderr: stderr_content.trim().to_string(),
345 });
346 }
347
348 let session_id = if let Some(session_id) = session_id {
349 session_id
350 } else {
351 return Err(XurlError::WriteProtocol(
352 "missing thread id in codex event stream".to_string(),
353 ));
354 };
355
356 Ok(WriteResult {
357 provider: ProviderKind::Codex,
358 session_id,
359 final_text,
360 warnings,
361 })
362 }
363}
364
365impl Provider for CodexProvider {
366 fn kind(&self) -> ProviderKind {
367 ProviderKind::Codex
368 }
369
370 fn resolve(&self, session_id: &str) -> Result<ResolvedThread> {
371 let sessions = self.sessions_root();
372 let archived = self.archived_root();
373 let state_dbs = self.state_db_paths();
374 let mut warnings = Vec::new();
375 let sqlite_record =
376 Self::lookup_thread_from_state_db(&state_dbs, session_id, &mut warnings);
377
378 if let Some(record) = sqlite_record.as_ref().filter(|record| !record.archived) {
379 if record.rollout_path.exists() {
380 return Ok(ResolvedThread {
381 provider: ProviderKind::Codex,
382 session_id: session_id.to_string(),
383 path: record.rollout_path.clone(),
384 metadata: ResolutionMeta {
385 source: "codex:sqlite:sessions".to_string(),
386 candidate_count: 1,
387 warnings,
388 },
389 });
390 }
391
392 warnings.push(format!(
393 "sqlite thread index points to a missing rollout for session_id={session_id}: {}",
394 record.rollout_path.display()
395 ));
396 }
397
398 let active_candidates = Self::find_candidates(&sessions, session_id);
399 if let Some((selected, count)) = Self::choose_latest(active_candidates) {
400 if count > 1 {
401 warnings.push(format!(
402 "multiple matches found ({count}) for session_id={session_id}; selected latest: {}",
403 selected.display()
404 ));
405 }
406
407 let meta = ResolutionMeta {
408 source: "codex:sessions".to_string(),
409 candidate_count: count,
410 warnings,
411 };
412
413 return Ok(ResolvedThread {
414 provider: ProviderKind::Codex,
415 session_id: session_id.to_string(),
416 path: selected,
417 metadata: meta,
418 });
419 }
420
421 if let Some(record) = sqlite_record.as_ref().filter(|record| record.archived) {
422 if record.rollout_path.exists() {
423 return Ok(ResolvedThread {
424 provider: ProviderKind::Codex,
425 session_id: session_id.to_string(),
426 path: record.rollout_path.clone(),
427 metadata: ResolutionMeta {
428 source: "codex:sqlite:archived_sessions".to_string(),
429 candidate_count: 1,
430 warnings,
431 },
432 });
433 }
434
435 warnings.push(format!(
436 "sqlite thread index points to a missing archived rollout for session_id={session_id}: {}",
437 record.rollout_path.display()
438 ));
439 }
440
441 let archived_candidates = Self::find_candidates(&archived, session_id);
442 if let Some((selected, count)) = Self::choose_latest(archived_candidates) {
443 if count > 1 {
444 warnings.push(format!(
445 "multiple archived matches found ({count}) for session_id={session_id}; selected latest: {}",
446 selected.display()
447 ));
448 }
449
450 let meta = ResolutionMeta {
451 source: "codex:archived_sessions".to_string(),
452 candidate_count: count,
453 warnings,
454 };
455
456 return Ok(ResolvedThread {
457 provider: ProviderKind::Codex,
458 session_id: session_id.to_string(),
459 path: selected,
460 metadata: meta,
461 });
462 }
463
464 Err(XurlError::ThreadNotFound {
465 provider: ProviderKind::Codex.to_string(),
466 session_id: session_id.to_string(),
467 searched_roots: vec![sessions, archived]
468 .into_iter()
469 .chain(state_dbs)
470 .collect(),
471 })
472 }
473
474 fn write(&self, req: &WriteRequest, sink: &mut dyn WriteEventSink) -> Result<WriteResult> {
475 let warnings = Vec::new();
476 let role_overrides = if let Some(role) = req.options.role.as_deref() {
477 self.load_role_overrides(role)?
478 } else {
479 Vec::new()
480 };
481 let mut args = Vec::new();
482 args.push("exec".to_string());
483
484 if let Some(session_id) = req.session_id.as_deref() {
485 args.push("resume".to_string());
486 args.push("--json".to_string());
487 append_passthrough_args(&mut args, &req.options.params);
488 for (key, value) in &role_overrides {
489 args.push("--config".to_string());
490 args.push(format!("{key}={value}"));
491 }
492 args.push(session_id.to_string());
493 args.push(req.prompt.clone());
494 self.run_write(&args, req, sink, warnings)
495 } else {
496 args.push("--json".to_string());
497 append_passthrough_args(&mut args, &req.options.params);
498 for (key, value) in &role_overrides {
499 args.push("--config".to_string());
500 args.push(format!("{key}={value}"));
501 }
502 args.push(req.prompt.clone());
503 self.run_write(&args, req, sink, warnings)
504 }
505 }
506}
507
508#[cfg(test)]
509mod tests {
510 use std::fs;
511 use std::path::Path;
512
513 use rusqlite::Connection;
514 use tempfile::tempdir;
515
516 use crate::provider::Provider;
517 use crate::provider::codex::CodexProvider;
518
519 fn prepare_state_db(path: &Path) -> Connection {
520 let conn = Connection::open(path).expect("open sqlite");
521 conn.execute_batch(
522 "
523 CREATE TABLE threads (
524 id TEXT PRIMARY KEY,
525 rollout_path TEXT NOT NULL,
526 archived INTEGER NOT NULL DEFAULT 0
527 );
528 ",
529 )
530 .expect("create schema");
531 conn
532 }
533
534 #[test]
535 fn resolves_from_sessions() {
536 let temp = tempdir().expect("tempdir");
537 let path = temp
538 .path()
539 .join("sessions/2026/02/23/rollout-2026-02-23T04-48-50-019c871c-b1f9-7f60-9c4f-87ed09f13592.jsonl");
540 fs::create_dir_all(path.parent().expect("parent")).expect("mkdir");
541 fs::write(&path, "{}\n").expect("write");
542
543 let provider = CodexProvider::new(temp.path());
544 let resolved = provider
545 .resolve("019c871c-b1f9-7f60-9c4f-87ed09f13592")
546 .expect("resolve should succeed");
547 assert_eq!(resolved.path, path);
548 }
549
550 #[test]
551 fn resolves_from_archived_when_not_in_sessions() {
552 let temp = tempdir().expect("tempdir");
553 let path = temp
554 .path()
555 .join("archived_sessions/rollout-2026-02-22T01-05-36-019c8129-f668-7951-8d56-cc5513541c26.jsonl");
556 fs::create_dir_all(path.parent().expect("parent")).expect("mkdir");
557 fs::write(&path, "{}\n").expect("write");
558
559 let provider = CodexProvider::new(temp.path());
560 let resolved = provider
561 .resolve("019c8129-f668-7951-8d56-cc5513541c26")
562 .expect("resolve should succeed");
563 assert_eq!(resolved.path, path);
564 assert_eq!(resolved.metadata.source, "codex:archived_sessions");
565 }
566
567 #[test]
568 fn returns_not_found_when_missing() {
569 let temp = tempdir().expect("tempdir");
570 let provider = CodexProvider::new(temp.path());
571 let err = provider
572 .resolve("019c8129-f668-7951-8d56-cc5513541c26")
573 .expect_err("should fail");
574 assert!(format!("{err}").contains("thread not found"));
575 }
576
577 #[test]
578 fn resolves_from_sqlite_state_index() {
579 let temp = tempdir().expect("tempdir");
580 let state_db = temp.path().join("state_5.sqlite");
581 let conn = prepare_state_db(&state_db);
582
583 let session_id = "019c871c-b1f9-7f60-9c4f-87ed09f13592";
584 let rollout = temp.path().join("sessions/custom/path/thread.jsonl");
585 fs::create_dir_all(rollout.parent().expect("parent")).expect("mkdir");
586 fs::write(&rollout, "{}\n").expect("write");
587
588 conn.execute(
589 "INSERT INTO threads (id, rollout_path, archived) VALUES (?1, ?2, 0)",
590 (&session_id, rollout.display().to_string()),
591 )
592 .expect("insert thread");
593
594 let provider = CodexProvider::new(temp.path());
595 let resolved = provider
596 .resolve(session_id)
597 .expect("resolve should succeed");
598 assert_eq!(resolved.path, rollout);
599 assert_eq!(resolved.metadata.source, "codex:sqlite:sessions");
600 }
601
602 #[test]
603 fn resolves_archived_from_sqlite_state_index() {
604 let temp = tempdir().expect("tempdir");
605 let state_db = temp.path().join("state.sqlite");
606 let conn = prepare_state_db(&state_db);
607
608 let session_id = "019c8129-f668-7951-8d56-cc5513541c26";
609 let rollout = temp
610 .path()
611 .join("archived_sessions/custom/path/thread.jsonl");
612 fs::create_dir_all(rollout.parent().expect("parent")).expect("mkdir");
613 fs::write(&rollout, "{}\n").expect("write");
614
615 conn.execute(
616 "INSERT INTO threads (id, rollout_path, archived) VALUES (?1, ?2, 1)",
617 (&session_id, rollout.display().to_string()),
618 )
619 .expect("insert thread");
620
621 let provider = CodexProvider::new(temp.path());
622 let resolved = provider
623 .resolve(session_id)
624 .expect("resolve should succeed");
625 assert_eq!(resolved.path, rollout);
626 assert_eq!(resolved.metadata.source, "codex:sqlite:archived_sessions");
627 }
628
629 #[test]
630 fn falls_back_to_filesystem_when_sqlite_rollout_missing() {
631 let temp = tempdir().expect("tempdir");
632 let state_db = temp.path().join("state_5.sqlite");
633 let conn = prepare_state_db(&state_db);
634
635 let session_id = "019c871c-b1f9-7f60-9c4f-87ed09f13592";
636 let stale_rollout = temp.path().join("sessions/stale/path/thread.jsonl");
637 conn.execute(
638 "INSERT INTO threads (id, rollout_path, archived) VALUES (?1, ?2, 0)",
639 (&session_id, stale_rollout.display().to_string()),
640 )
641 .expect("insert thread");
642
643 let fs_rollout = temp.path().join(
644 "sessions/2026/02/23/rollout-2026-02-23T04-48-50-019c871c-b1f9-7f60-9c4f-87ed09f13592.jsonl",
645 );
646 fs::create_dir_all(fs_rollout.parent().expect("parent")).expect("mkdir");
647 fs::write(&fs_rollout, "{}\n").expect("write");
648
649 let provider = CodexProvider::new(temp.path());
650 let resolved = provider
651 .resolve(session_id)
652 .expect("resolve should succeed");
653 assert_eq!(resolved.path, fs_rollout);
654 assert_eq!(resolved.metadata.source, "codex:sessions");
655 assert_eq!(resolved.metadata.warnings.len(), 1);
656 assert!(resolved.metadata.warnings[0].contains("missing rollout"));
657 }
658
659 #[test]
660 fn loads_role_overrides_from_main_and_config_file() {
661 let temp = tempdir().expect("tempdir");
662 fs::write(
663 temp.path().join("config.toml"),
664 r#"
665[agents.reviewer]
666description = "review role"
667config_file = "agents/reviewer.toml"
668model_reasoning_effort = "high"
669developer_instructions = "Focus on high priority issues."
670"#,
671 )
672 .expect("write config");
673 let role_config_dir = temp.path().join("agents");
674 fs::create_dir_all(&role_config_dir).expect("mkdir role config");
675 fs::write(
676 role_config_dir.join("reviewer.toml"),
677 r#"
678model = "gpt-5.3-codex"
679"#,
680 )
681 .expect("write role config");
682
683 let provider = CodexProvider::new(temp.path());
684 let overrides = provider
685 .load_role_overrides("reviewer")
686 .expect("must load role");
687
688 assert_eq!(
689 overrides,
690 vec![
691 (
692 "developer_instructions".to_string(),
693 "Focus on high priority issues.".to_string(),
694 ),
695 ("model".to_string(), "gpt-5.3-codex".to_string()),
696 ("model_reasoning_effort".to_string(), "high".to_string()),
697 ]
698 );
699 }
700
701 #[test]
702 fn missing_role_override_returns_error() {
703 let temp = tempdir().expect("tempdir");
704 fs::write(
705 temp.path().join("config.toml"),
706 r#"
707[agents.default]
708description = "default role"
709"#,
710 )
711 .expect("write config");
712
713 let provider = CodexProvider::new(temp.path());
714 let err = provider
715 .load_role_overrides("reviewer")
716 .expect_err("must fail");
717 assert!(format!("{err}").contains("is not defined"));
718 }
719}