use anyhow::{Context, Result};
use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
use super::super::Manifest;
use super::{ArchiveOptions, ArchiveRequest, get_codex_dir, run};
#[derive(Debug, Default)]
pub struct BackfillReport {
pub vault_path: PathBuf,
pub vault_snapshots_walked: usize,
pub sessions_found: usize,
pub sessions_archived: usize,
pub sessions_skipped_already_archived: usize,
pub errors: Vec<(PathBuf, anyhow::Error)>,
}
pub fn run_backfill(vault_path: &Path, options: ArchiveOptions) -> Result<BackfillReport> {
let mut report = BackfillReport {
vault_path: vault_path.to_path_buf(),
..Default::default()
};
if !vault_path.exists() {
anyhow::bail!(
"vault path does not exist: {}. The vault may have already been removed.",
vault_path.display()
);
}
let snapshots = collect_snapshot_dirs(vault_path)
.with_context(|| format!("walking vault root {}", vault_path.display()))?;
if snapshots.is_empty() {
eprintln!(
"warning: vault path {} has no session-* snapshots; nothing to backfill",
vault_path.display()
);
return Ok(report);
}
let mut seen: HashSet<String> = match collect_archived_ids() {
Ok(ids) => ids,
Err(e) => {
eprintln!(
"warning: failed to scan codex for dedup set: {e}; \
backfill may produce duplicates"
);
let codex_dir = get_codex_dir().unwrap_or_else(|_| PathBuf::from("<codex>"));
report.errors.push((
codex_dir,
e.context("scanning codex for already-archived session_ids (dedup set)"),
));
HashSet::new()
}
};
let total_snapshots = snapshots.len();
for (idx, snapshot) in snapshots.iter().enumerate() {
report.vault_snapshots_walked += 1;
let sessions = match collect_session_jsonls(snapshot) {
Ok(v) => v,
Err(e) => {
eprintln!(
"warning: failed to walk vault snapshot {}: {}",
snapshot.display(),
e
);
report.errors.push((snapshot.clone(), e));
continue;
}
};
for session_path in sessions {
report.sessions_found += 1;
let session_id = match session_id_from_path(&session_path) {
Some(id) => id,
None => {
let err = anyhow::anyhow!(
"could not derive session_id from {}",
session_path.display()
);
report.errors.push((session_path, err));
continue;
}
};
if seen.contains(&session_id) {
report.sessions_skipped_already_archived += 1;
continue;
}
match run(
ArchiveRequest::Single(session_path.clone()),
options.clone(),
) {
Ok(_result) => {
report.sessions_archived += 1;
seen.insert(session_id);
}
Err(e) => {
eprintln!(
"warning: failed to archive {}: {}",
session_path.display(),
e
);
report.errors.push((session_path, e));
}
}
}
eprintln!(
"Backfilling vault: {}/{} snapshots, {}/{} sessions archived...",
idx + 1,
total_snapshots,
report.sessions_archived,
report.sessions_found
);
}
eprintln!(
"Backfill complete: {} vault snapshots, {} sessions found, {} archived, \
{} already in codex, {} errors.",
report.vault_snapshots_walked,
report.sessions_found,
report.sessions_archived,
report.sessions_skipped_already_archived,
report.errors.len()
);
Ok(report)
}
fn collect_snapshot_dirs(vault_path: &Path) -> Result<Vec<PathBuf>> {
let mut out = Vec::new();
for entry in fs::read_dir(vault_path)? {
let entry = entry?;
let path = entry.path();
if !path.is_dir() {
continue;
}
match path.file_name().and_then(|n| n.to_str()) {
Some(name) if name.starts_with("session-") => out.push(path),
_ => continue,
}
}
out.sort();
Ok(out)
}
fn collect_session_jsonls(snapshot_dir: &Path) -> Result<Vec<PathBuf>> {
let projects = snapshot_dir.join("projects");
if !projects.exists() {
return Ok(Vec::new());
}
let mut out = Vec::new();
for slug_entry in fs::read_dir(&projects)? {
let slug_entry = slug_entry?;
let slug_path = slug_entry.path();
if !slug_path.is_dir() {
continue;
}
for entry in fs::read_dir(&slug_path)? {
let entry = entry?;
let path = entry.path();
if !path.is_file() {
continue;
}
if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
continue;
}
if let Some(name) = path.file_name().and_then(|n| n.to_str())
&& name.starts_with("agent-")
{
continue;
}
out.push(path);
}
}
out.sort();
Ok(out)
}
fn collect_archived_ids() -> Result<HashSet<String>> {
let codex_dir = get_codex_dir()?;
let mut ids = HashSet::new();
if !codex_dir.exists() {
return Ok(ids);
}
for entry in fs::read_dir(&codex_dir)? {
let entry = entry?;
let manifest_path = entry.path().join("manifest.json");
if !manifest_path.exists() {
continue;
}
let content = match fs::read_to_string(&manifest_path) {
Ok(c) => c,
Err(_) => continue,
};
let manifest: Manifest = match serde_json::from_str(&content) {
Ok(m) => m,
Err(_) => continue,
};
ids.insert(manifest.session_id);
}
Ok(ids)
}
fn session_id_from_path(path: &Path) -> Option<String> {
path.file_stem()
.and_then(|s| s.to_str())
.map(|s| s.to_string())
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
use std::sync::Mutex;
static ENV_LOCK: Mutex<()> = Mutex::new(());
fn build_fake_vault(root: &Path, n_snapshots: usize, sessions_per_snapshot: usize) -> PathBuf {
let vault = root.join("vault-archives");
fs::create_dir_all(&vault).unwrap();
for s in 0..n_snapshots {
let snap = vault.join(format!("session-2026{:04}-100000-{:06}", s, s));
let slug = snap.join("projects").join("-home-charlie");
fs::create_dir_all(&slug).unwrap();
for k in 0..sessions_per_snapshot {
let uuid = format!(
"{:08x}-snap{:02}-sess{:02}-0000-000000000000",
s * 1000 + k,
s,
k
);
let session = slug.join(format!("{uuid}.jsonl"));
let line = format!(
"{{\"role\":\"user\",\"content\":\"hello\",\
\"timestamp\":\"2026-04-29T10:00:0{}Z\"}}\n",
k % 10
);
fs::write(&session, line).unwrap();
}
}
vault
}
fn drive_backfill(
n_snaps: usize,
sess_per_snap: usize,
) -> (BackfillReport, PathBuf, tempfile::TempDir) {
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let tmp = tempfile::tempdir().unwrap();
let codex_dir = tmp.path().join("codex");
fs::create_dir_all(&codex_dir).unwrap();
let vault = build_fake_vault(tmp.path(), n_snaps, sess_per_snap);
let prev = std::env::var("MX_CODEX_PATH").ok();
unsafe {
std::env::set_var("MX_CODEX_PATH", &codex_dir);
}
let options = ArchiveOptions::default();
let report = run_backfill(&vault, options).expect("backfill failed");
unsafe {
match prev {
Some(v) => std::env::set_var("MX_CODEX_PATH", v),
None => std::env::remove_var("MX_CODEX_PATH"),
}
}
(report, codex_dir, tmp)
}
#[cfg(unix)]
fn drop_unreadable_session(vault_path: &Path) -> PathBuf {
use std::os::unix::fs::PermissionsExt;
let snap = fs::read_dir(vault_path)
.unwrap()
.filter_map(|e| e.ok())
.find(|e| {
e.file_name()
.to_str()
.is_some_and(|n| n.starts_with("session-"))
})
.expect("at least one snapshot");
let slug = snap.path().join("projects").join("-home-charlie");
let path = slug.join("deadbeef-corrupt-uuid-0000-000000000000.jsonl");
fs::write(&path, "this content cannot be read because chmod 000").unwrap();
let mut perms = fs::metadata(&path).unwrap().permissions();
perms.set_mode(0o000);
fs::set_permissions(&path, perms).unwrap();
path
}
#[cfg(unix)]
#[test]
#[serial]
fn backfill_per_session_failure_is_non_fatal() {
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let tmp = tempfile::tempdir().unwrap();
let codex_dir = tmp.path().join("codex");
fs::create_dir_all(&codex_dir).unwrap();
let vault = build_fake_vault(tmp.path(), 3, 1);
let corrupt = drop_unreadable_session(&vault);
let prev = std::env::var("MX_CODEX_PATH").ok();
unsafe {
std::env::set_var("MX_CODEX_PATH", &codex_dir);
}
let report = run_backfill(&vault, ArchiveOptions::default())
.expect("backfill must return Ok even when a session fails");
use std::os::unix::fs::PermissionsExt;
if corrupt.exists() {
let mut perms = fs::metadata(&corrupt).unwrap().permissions();
perms.set_mode(0o644);
let _ = fs::set_permissions(&corrupt, perms);
}
unsafe {
match prev {
Some(v) => std::env::set_var("MX_CODEX_PATH", v),
None => std::env::remove_var("MX_CODEX_PATH"),
}
}
assert_eq!(report.sessions_found, 4);
assert_eq!(report.sessions_archived, 3);
assert_eq!(report.sessions_skipped_already_archived, 0);
assert_eq!(
report.errors.len(),
1,
"expected exactly one error (the corrupt session), got: {:?}",
report
.errors
.iter()
.map(|(p, e)| format!("{}: {e}", p.display()))
.collect::<Vec<_>>()
);
assert_eq!(report.errors[0].0, corrupt);
}
#[test]
#[serial]
fn backfill_walks_snapshots_and_archives_sessions() {
let (report, codex, _tmp) = drive_backfill(3, 2);
assert_eq!(report.vault_snapshots_walked, 3);
assert_eq!(report.sessions_found, 6);
assert_eq!(report.sessions_archived, 6);
assert_eq!(report.sessions_skipped_already_archived, 0);
assert!(report.errors.is_empty());
let manifests = fs::read_dir(&codex)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| e.path().join("manifest.json").exists())
.count();
assert_eq!(manifests, 6);
}
#[test]
#[serial]
fn backfill_is_idempotent_on_second_run() {
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let tmp = tempfile::tempdir().unwrap();
let codex_dir = tmp.path().join("codex");
fs::create_dir_all(&codex_dir).unwrap();
let vault = build_fake_vault(tmp.path(), 2, 2);
let prev = std::env::var("MX_CODEX_PATH").ok();
unsafe {
std::env::set_var("MX_CODEX_PATH", &codex_dir);
}
let r1 = run_backfill(&vault, ArchiveOptions::default()).unwrap();
assert_eq!(r1.sessions_archived, 4);
assert_eq!(r1.sessions_skipped_already_archived, 0);
let r2 = run_backfill(&vault, ArchiveOptions::default()).unwrap();
assert_eq!(r2.sessions_archived, 0);
assert_eq!(r2.sessions_skipped_already_archived, 4);
let manifests = fs::read_dir(&codex_dir)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| e.path().join("manifest.json").exists())
.count();
assert_eq!(manifests, 4);
unsafe {
match prev {
Some(v) => std::env::set_var("MX_CODEX_PATH", v),
None => std::env::remove_var("MX_CODEX_PATH"),
}
}
}
#[test]
fn backfill_missing_vault_path_errors() {
let tmp = tempfile::tempdir().unwrap();
let bogus = tmp.path().join("does-not-exist");
let err = run_backfill(&bogus, ArchiveOptions::default()).unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("does not exist"),
"expected helpful error, got: {msg}"
);
}
#[test]
#[serial]
fn backfill_surfaces_codex_scan_failure_on_report() {
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let tmp = tempfile::tempdir().unwrap();
let codex_path = tmp.path().join("codex-not-a-dir");
fs::write(&codex_path, "i am a file, not a directory").unwrap();
let vault = build_fake_vault(tmp.path(), 1, 1);
let prev = std::env::var("MX_CODEX_PATH").ok();
unsafe {
std::env::set_var("MX_CODEX_PATH", &codex_path);
}
let report = run_backfill(&vault, ArchiveOptions::default())
.expect("backfill must continue past a codex scan failure (non-fatal error model)");
unsafe {
match prev {
Some(v) => std::env::set_var("MX_CODEX_PATH", v),
None => std::env::remove_var("MX_CODEX_PATH"),
}
}
let scan_err_present = report.errors.iter().any(|(_, e)| {
let msg = format!("{e:#}");
msg.contains("scanning codex") || msg.contains("dedup set")
});
assert!(
scan_err_present,
"expected a codex-scan failure in report.errors, got: {:?}",
report
.errors
.iter()
.map(|(p, e)| format!("{}: {e}", p.display()))
.collect::<Vec<_>>()
);
}
#[test]
#[serial]
fn backfill_empty_vault_warns_and_reports_zeros() {
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let tmp = tempfile::tempdir().unwrap();
let vault = tmp.path().join("empty-vault");
fs::create_dir_all(&vault).unwrap();
let codex_dir = tmp.path().join("codex");
fs::create_dir_all(&codex_dir).unwrap();
let prev = std::env::var("MX_CODEX_PATH").ok();
unsafe {
std::env::set_var("MX_CODEX_PATH", &codex_dir);
}
let report = run_backfill(&vault, ArchiveOptions::default()).unwrap();
assert_eq!(report.vault_snapshots_walked, 0);
assert_eq!(report.sessions_found, 0);
assert_eq!(report.sessions_archived, 0);
unsafe {
match prev {
Some(v) => std::env::set_var("MX_CODEX_PATH", v),
None => std::env::remove_var("MX_CODEX_PATH"),
}
}
}
}