use anyhow::{Context, Result};
use chrono::{DateTime, Duration, Utc};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use walkdir::WalkDir;
use crate::{codex, db::Db, ingest, paths, pricing::Pricing};
/// Counters accumulated by a backfill run and printed in its one-line summary.
#[derive(Debug, Default)]
pub struct BackfillStats {
/// Transcript files processed, including ones with no new bytes past their cursor.
/// Files that cannot be opened/stat'ed or fail to parse are not counted.
pub files_scanned: u64,
/// Bytes read from transcripts: for Claude Code, the bytes after the stored
/// cursor offset; for Codex, the whole file length of each re-parsed rollout.
pub bytes_read: u64,
/// Rows for which `ingest::insert_row` reported a new insertion.
pub rows_inserted: u64,
/// Claude Code transcripts compressed into the archive directory and removed.
pub files_archived: u64,
}
// Which transcript source(s) a backfill run ingests; parsed from the command
// line via clap's `ValueEnum` derive.
// NOTE: plain `//` comments are used on purpose — clap's derive may turn `///`
// doc comments on a `ValueEnum` into CLI help text.
#[derive(Clone, Copy, Debug, clap::ValueEnum)]
pub enum Source {
// Backfill both Claude Code and Codex transcripts.
All,
// Backfill only Claude Code transcripts.
ClaudeCode,
// Backfill only Codex rollout transcripts.
Codex,
}
/// Entry point of the backfill command.
///
/// Ingests transcripts for the selected `source` (Claude Code, Codex, or both),
/// then, if `archive_older_than_days` is set, archives old Claude Code
/// transcripts. Archiving is independent of `source`: it also runs when only
/// Codex was backfilled. Finishes by printing a one-line summary; the reported
/// duration starts after the database and pricing table have been loaded.
///
/// # Errors
/// Propagates any failure from opening the database, loading pricing, either
/// backfill pass, or archiving.
pub fn run(source: Source, archive_older_than_days: Option<u32>) -> Result<()> {
    let mut db = Db::open()?;
    let pricing = Pricing::load()?;
    let mut total = BackfillStats::default();
    let clock = std::time::Instant::now();

    // Exhaustive so that adding a `Source` variant forces a decision here.
    let (do_claude, do_codex) = match source {
        Source::All => (true, true),
        Source::ClaudeCode => (true, false),
        Source::Codex => (false, true),
    };
    if do_claude {
        merge(&mut total, &backfill_claude(&mut db, &pricing)?);
    }
    if do_codex {
        merge(&mut total, &backfill_codex(&mut db, &pricing)?);
    }
    if let Some(days) = archive_older_than_days {
        total.files_archived = archive_claude(days)?;
    }

    let seconds = clock.elapsed().as_secs_f32();
    let size_note = humanize_bytes(total.bytes_read);
    let archived_note = if total.files_archived == 0 {
        String::new()
    } else {
        format!(", archived: {}", total.files_archived)
    };
    println!(
        "Backfill done in {:.1}s — files scanned: {}, new rows: {}, bytes read:{}{}",
        seconds, total.files_scanned, total.rows_inserted, size_note, archived_note
    );
    Ok(())
}
fn merge(a: &mut BackfillStats, b: &BackfillStats) {
a.files_scanned += b.files_scanned;
a.bytes_read += b.bytes_read;
a.rows_inserted += b.rows_inserted;
a.files_archived += b.files_archived;
}
/// Incrementally ingests Claude Code `.jsonl` transcripts under the Claude
/// projects directory.
///
/// For every transcript, reading resumes at the byte offset stored in its
/// cursor, or at 0 when the file is now shorter than that offset. Only
/// complete, newline-terminated lines are parsed; a trailing partial line is
/// left for a later run. Parsed rows are inserted in one transaction per file.
/// After that commit, the cursor is advanced past the last consumed newline and
/// records the uuid of the last parsed row (or keeps the previous one).
///
/// The cursor update is not part of the insert transaction, so a failure
/// between the two re-reads those lines next time. Presumably `insert_row`
/// deduplicates (it reports whether a row was new) — confirm.
///
/// # Errors
/// Returns an error if the projects directory cannot be resolved, or if a
/// cursor lookup, metadata/seek/read on an opened file, transaction, insert or
/// cursor update fails. Files that cannot be opened, non-UTF-8 paths or lines,
/// and lines that do not parse are skipped.
fn backfill_claude(db: &mut Db, pricing: &Pricing) -> Result<BackfillStats> {
    let root = paths::claude_projects_dir()?;
    if !root.exists() {
        return Ok(BackfillStats::default());
    }
    println!("Scanning Claude Code transcripts at {} ...", root.display());

    let mut stats = BackfillStats::default();
    let transcripts = WalkDir::new(&root)
        .into_iter()
        .filter_map(std::result::Result::ok)
        .filter(|e| e.file_type().is_file())
        .filter(|e| e.path().extension().and_then(|ext| ext.to_str()) == Some("jsonl"));

    for entry in transcripts {
        let path = entry.path();
        let Some(transcript_path) = path.to_str() else {
            continue;
        };
        let cursor = db.cursor_for(transcript_path)?;
        let mut file = match std::fs::File::open(path) {
            Ok(file) => file,
            Err(_) => continue,
        };
        let file_len = file.metadata()?.len();
        // An offset past EOF means the file shrank (rewritten/truncated): start over.
        let resume_at = if cursor.byte_offset <= file_len {
            cursor.byte_offset
        } else {
            0
        };
        if resume_at == file_len {
            // Nothing new since the last run.
            stats.files_scanned += 1;
            continue;
        }

        file.seek(SeekFrom::Start(resume_at))?;
        let mut fresh = Vec::new();
        file.read_to_end(&mut fresh)?;
        stats.bytes_read += fresh.len() as u64;

        // Consume only through the final newline; bytes after it may be a line
        // still being written and are re-read on the next run.
        let consumed = fresh
            .iter()
            .rposition(|&byte| byte == b'\n')
            .map_or(0, |idx| idx + 1);

        let project_path = paths::project_from_transcript(transcript_path);
        let ctx = ingest::FileCtx {
            transcript_path,
            fallback_session_id: "",
            fallback_cwd: None,
            project_path: &project_path,
        };

        let tx = db.conn.transaction()?;
        let mut last_uuid: Option<String> = cursor.last_uuid.clone();
        let complete_lines = fresh[..consumed]
            .split(|&byte| byte == b'\n')
            .filter(|line| !line.is_empty())
            .filter_map(|line| std::str::from_utf8(line).ok());
        for text in complete_lines {
            let Ok(Some(row)) = ingest::parse_claude_line(text, &ctx) else {
                continue;
            };
            last_uuid = Some(row.uuid.clone());
            if ingest::insert_row(&tx, &row, pricing)? {
                stats.rows_inserted += 1;
            }
        }
        tx.commit()?;

        db.set_cursor(transcript_path, resume_at + consumed as u64, last_uuid.as_deref())?;
        stats.files_scanned += 1;
    }
    Ok(stats)
}
/// Ingests Codex rollout transcripts found by `codex::enumerate_files`.
///
/// A rollout whose current size equals its cursor offset is treated as
/// unchanged and only counted. Otherwise the whole file is re-parsed with
/// `codex::parse_rollout(.., None)`, every row goes through `ingest::insert_row`
/// in one transaction, and the cursor is set to the size observed before
/// parsing. Rollouts whose metadata cannot be read or that fail to parse are
/// skipped without being counted.
///
/// # Errors
/// Returns an error if the sessions directory or file list cannot be obtained,
/// or if a cursor lookup, transaction, insert or cursor update fails.
fn backfill_codex(db: &mut Db, pricing: &Pricing) -> Result<BackfillStats> {
    let root = codex::sessions_dir()?;
    if !root.exists() {
        return Ok(BackfillStats::default());
    }
    println!("Scanning Codex transcripts at {} ...", root.display());

    let mut stats = BackfillStats::default();
    for rollout in codex::enumerate_files()? {
        let key = rollout.to_string_lossy().into_owned();
        let cursor = db.cursor_for(&key)?;
        let Ok(meta) = std::fs::metadata(&rollout) else {
            continue;
        };
        let size = meta.len();
        // An offset past EOF means the rollout shrank; treat it as unseen.
        let offset = if cursor.byte_offset <= size {
            cursor.byte_offset
        } else {
            0
        };
        if offset == size {
            // Unchanged since the last run.
            stats.files_scanned += 1;
            continue;
        }
        let Ok(rows) = codex::parse_rollout(&rollout, None) else {
            continue;
        };
        stats.bytes_read += size;

        let tx = db.conn.transaction()?;
        for row in rows {
            stats.rows_inserted += u64::from(ingest::insert_row(&tx, &row, pricing)?);
        }
        tx.commit()?;

        db.set_cursor(&key, size, None)?;
        stats.files_scanned += 1;
    }
    Ok(stats)
}
fn archive_claude(days: u32) -> Result<u64> {
let root = paths::claude_projects_dir()?;
let archive = paths::archive_dir()?;
std::fs::create_dir_all(&archive).with_context(|| format!("creating {}", archive.display()))?;
let cutoff: DateTime<Utc> = Utc::now() - Duration::days(i64::from(days));
let mut count = 0u64;
for entry in WalkDir::new(&root)
.into_iter()
.filter_map(std::result::Result::ok)
.filter(|e| e.file_type().is_file())
{
let p = entry.path();
if p.extension().and_then(|s| s.to_str()) != Some("jsonl") {
continue;
}
let mtime = match entry.metadata() {
Ok(meta) => match meta.modified() {
Ok(t) => DateTime::<Utc>::from(t),
Err(_) => continue,
},
Err(_) => continue,
};
if mtime > cutoff {
continue;
}
let rel = p.strip_prefix(&root).unwrap_or(p);
let mut out_path = archive.join(rel);
if let Some(parent) = out_path.parent() {
std::fs::create_dir_all(parent)?;
}
out_path.set_extension("jsonl.gz");
let in_bytes = std::fs::read(p)?;
let mut tmp_os = out_path.clone().into_os_string();
tmp_os.push(".part");
let tmp_path = PathBuf::from(tmp_os);
{
let f = std::fs::File::create(&tmp_path)?;
let mut enc = flate2::write::GzEncoder::new(f, flate2::Compression::default());
enc.write_all(&in_bytes)?;
enc.finish()?;
}
std::fs::rename(&tmp_path, &out_path)?;
std::fs::remove_file(p)?;
count += 1;
}
Ok(count)
}
/// Renders a byte count as a parenthesised human-readable size with a leading
/// space, e.g. `" (512 B)"`, `" (1.5 KiB)"`, `" (3.00 GiB)"`.
///
/// Uses binary (1024-based) units: whole bytes below 1 KiB, one decimal for
/// KiB and MiB, two decimals for GiB and above.
fn humanize_bytes(n: u64) -> String {
    const STEP: f64 = 1024.0;
    let (kib, mib, gib) = (STEP, STEP * STEP, STEP * STEP * STEP);
    match n as f64 {
        size if size < kib => format!(" ({} B)", n),
        size if size < mib => format!(" ({:.1} KiB)", size / kib),
        size if size < gib => format!(" ({:.1} MiB)", size / mib),
        size => format!(" ({:.2} GiB)", size / gib),
    }
}