codex-recall 0.1.2

Local search and recall for Codex session JSONL archives
Documentation
use crate::parser::parse_session_file;
use crate::store::{build_session_key, Store};
use anyhow::{Context, Result};
use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Counters describing the progress and outcome of an indexing run.
///
/// `index_sources_with_progress` passes snapshots of this report to its
/// progress callback while it works and returns the final value when done.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexReport {
    /// Number of `.jsonl` files discovered under the sources before processing starts.
    pub files_total: usize,
    /// Number of files that have been fully handled so far, whether indexed or skipped.
    pub files_seen: usize,
    /// Number of files that were not indexed as a session; incremented together with
    /// exactly one of the `skipped_*` counters below.
    pub files_skipped: usize,
    /// Files skipped because the store reported them as already current.
    pub skipped_unchanged: usize,
    /// Files skipped because they were no longer found when stat'ed or parsed.
    pub skipped_missing: usize,
    /// Files for which the parser returned no session.
    pub skipped_non_session: usize,
    /// Number of sessions written to the store during this run.
    pub sessions_indexed: usize,
    /// Total number of events belonging to the sessions handed to the store.
    pub events_indexed: usize,
    /// Combined size in bytes of the discovered files, measured once up front;
    /// files whose metadata could not be read contribute nothing.
    pub bytes_total: u64,
    /// Running sum of the sizes of the files whose metadata has been read during processing.
    pub bytes_seen: u64,
    /// Path of the file most recently picked up for processing; `None` before the first file.
    pub current_file: Option<PathBuf>,
}

/// Summary of how much un-indexed work exists under a set of sources, as
/// produced by `scan_sources_for_pending`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceScanReport {
    /// Number of `.jsonl` files found under the sources that exist, including
    /// files that vanish before their metadata can be read.
    pub files_total: usize,
    /// Files the store does not consider current (every inspected file when no
    /// store was supplied).
    pub pending_files: usize,
    /// Combined size in bytes of the pending files (saturating on overflow).
    pub pending_bytes: u64,
    /// Pending files whose last modification is at least the quiet period old
    /// (all pending files when the quiet period is zero).
    pub stable_pending_files: usize,
    /// Pending files that are not yet stable, i.e. modified more recently than
    /// the quiet period allows.
    pub waiting_files: usize,
    /// Source paths that did not exist at scan time.
    pub missing_sources: Vec<PathBuf>,
}

/// Indexes every session file found under `sources` into `store` without
/// surfacing intermediate progress.
///
/// This is `index_sources_with_progress` with a callback that discards every
/// update; the returned report and any error come straight from that call.
pub fn index_sources(store: &Store, sources: &[PathBuf]) -> Result<IndexReport> {
    let discard_progress = |_: &IndexReport| {};
    index_sources_with_progress(store, sources, discard_progress)
}

pub fn scan_sources_for_pending(
    store: Option<&Store>,
    sources: &[PathBuf],
    quiet_for: Duration,
) -> Result<SourceScanReport> {
    let now = SystemTime::now();
    let mut report = SourceScanReport {
        files_total: 0,
        pending_files: 0,
        pending_bytes: 0,
        stable_pending_files: 0,
        waiting_files: 0,
        missing_sources: Vec::new(),
    };

    for source in sources {
        if !source.exists() {
            report.missing_sources.push(source.clone());
            continue;
        }

        for path in jsonl_files(source)? {
            report.files_total += 1;
            let file_state = match FileState::from_path(&path) {
                Ok(file_state) => file_state,
                Err(error) if is_not_found_error(&error) => continue,
                Err(error) => return Err(error),
            };
            let is_current = if let Some(store) = store {
                store.is_source_current(
                    &path,
                    file_state.source_file_mtime_ns,
                    file_state.source_file_size,
                )?
            } else {
                false
            };

            if is_current {
                continue;
            }

            report.pending_files += 1;
            report.pending_bytes = report
                .pending_bytes
                .saturating_add(file_state.source_file_size as u64);
            if is_stable(now, file_state.modified, quiet_for) {
                report.stable_pending_files += 1;
            } else {
                report.waiting_files += 1;
            }
        }
    }

    Ok(report)
}

/// Indexes every session file found under `sources` into `store`, invoking
/// `on_progress` with the running report as work proceeds.
///
/// The callback fires once before any file is touched, once right before a
/// changed file is parsed, and after a file is finished whenever
/// `should_report_after_file` says an update is due.
///
/// Files the store already considers current are skipped, as are files that
/// disappear between discovery and processing. Files the parser does not
/// recognise as a session are recorded in the store as indexed (without a
/// session) so they are not re-parsed until they change.
///
/// # Errors
///
/// Propagates failures from directory traversal, from reading metadata or
/// parsing (other than "not found"), and from any store operation.
pub fn index_sources_with_progress<F>(
    store: &Store,
    sources: &[PathBuf],
    mut on_progress: F,
) -> Result<IndexReport>
where
    F: FnMut(&IndexReport),
{
    let mut files = Vec::new();
    for source in sources {
        files.extend(jsonl_files(source)?);
    }
    files.sort();
    // Overlapping or repeated sources list the same path more than once. Without
    // this, totals are inflated and the second visit of a freshly indexed file
    // is misreported as "unchanged".
    files.dedup();

    let mut report = IndexReport {
        files_total: files.len(),
        files_seen: 0,
        files_skipped: 0,
        skipped_unchanged: 0,
        skipped_missing: 0,
        skipped_non_session: 0,
        sessions_indexed: 0,
        events_indexed: 0,
        bytes_total: total_known_bytes(&files),
        bytes_seen: 0,
        current_file: None,
    };

    on_progress(&report);

    for path in files {
        report.current_file = Some(path.clone());

        let file_state = match FileState::from_path(&path) {
            Ok(file_state) => file_state,
            Err(error) if is_not_found_error(&error) => {
                record_missing_file(&mut report, &mut on_progress);
                continue;
            }
            Err(error) => return Err(error),
        };
        report.bytes_seen = report
            .bytes_seen
            .saturating_add(file_state.source_file_size as u64);

        if store.is_source_current(
            &path,
            file_state.source_file_mtime_ns,
            file_state.source_file_size,
        )? {
            report.files_skipped += 1;
            report.skipped_unchanged += 1;
            finish_file(&mut report, &mut on_progress);
            continue;
        }

        // Announce the file before the potentially slow parse so callers can
        // show what is currently being worked on.
        on_progress(&report);

        let parsed = match parse_session_file(&path) {
            Ok(parsed) => parsed,
            Err(error) if is_not_found_error(&error) => {
                record_missing_file(&mut report, &mut on_progress);
                continue;
            }
            Err(error) => return Err(error),
        };

        match parsed {
            Some(parsed) => {
                let session_key =
                    build_session_key(&parsed.session.id, &parsed.session.source_file_path);
                report.events_indexed += parsed.events.len();
                store.index_session(&parsed)?;
                store.mark_source_indexed(
                    &path,
                    file_state.source_file_mtime_ns,
                    file_state.source_file_size,
                    Some(&parsed.session.id),
                    Some(&session_key),
                )?;
                report.sessions_indexed += 1;
            }
            None => {
                // Remember the file's state even though it holds no session,
                // so it is treated as current on the next run.
                store.mark_source_indexed(
                    &path,
                    file_state.source_file_mtime_ns,
                    file_state.source_file_size,
                    None,
                    None,
                )?;
                report.files_skipped += 1;
                report.skipped_non_session += 1;
            }
        }

        finish_file(&mut report, &mut on_progress);
    }

    Ok(report)
}

/// Counts the current file as skipped because it vanished, then finishes it.
fn record_missing_file<F>(report: &mut IndexReport, on_progress: &mut F)
where
    F: FnMut(&IndexReport),
{
    report.files_skipped += 1;
    report.skipped_missing += 1;
    finish_file(report, on_progress);
}

/// Marks one more file as seen and emits a progress update when one is due.
fn finish_file<F>(report: &mut IndexReport, on_progress: &mut F)
where
    F: FnMut(&IndexReport),
{
    report.files_seen += 1;
    if should_report_after_file(&*report) {
        on_progress(&*report);
    }
}

/// Decides whether a progress update is due after finishing a file.
///
/// Updates are emitted for the first file, for every 25th file, and for the
/// final file of the run.
fn should_report_after_file(report: &IndexReport) -> bool {
    let seen = report.files_seen;
    let is_first = seen == 1;
    let on_interval = seen.is_multiple_of(25);
    let is_last = seen == report.files_total;
    is_first || on_interval || is_last
}

/// Sums the on-disk sizes of `files`.
///
/// Files whose metadata cannot be read (for example because they no longer
/// exist) are silently left out of the total.
fn total_known_bytes(files: &[PathBuf]) -> u64 {
    let mut total: u64 = 0;
    for path in files {
        if let Ok(metadata) = fs::metadata(path) {
            total += metadata.len();
        }
    }
    total
}

/// Returns every `.jsonl` file at or below `root`, sorted by path.
///
/// `root` may be a single file or a directory; discovery is delegated to
/// `collect_jsonl_files`, whose errors are propagated unchanged.
fn jsonl_files(root: &Path) -> Result<Vec<PathBuf>> {
    let mut discovered = Vec::new();
    collect_jsonl_files(root, &mut discovered)?;
    discovered.sort();
    Ok(discovered)
}

fn collect_jsonl_files(path: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
    if !path.exists() {
        return Ok(());
    }

    if path.is_file() {
        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            files.push(path.to_path_buf());
        }
        return Ok(());
    }

    for entry in fs::read_dir(path)? {
        let entry = entry?;
        let path = entry.path();
        if path.is_dir() {
            collect_jsonl_files(&path, files)?;
        } else if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
            files.push(path);
        }
    }

    Ok(())
}

/// Snapshot of a file's size and modification time, as read from filesystem
/// metadata by `FileState::from_path`.
struct FileState {
    /// Modification time expressed as nanoseconds since the Unix epoch.
    source_file_mtime_ns: i64,
    /// File length in bytes as reported by the filesystem metadata.
    source_file_size: i64,
    /// Modification time exactly as returned by the filesystem metadata.
    modified: SystemTime,
}

impl FileState {
    /// Reads the size and modification time of `path` from the filesystem.
    ///
    /// The mtime is converted to nanoseconds since the Unix epoch. Both the
    /// mtime and the size are stored as `i64`; values too large for `i64` are
    /// clamped to `i64::MAX` rather than being allowed to wrap.
    ///
    /// # Errors
    ///
    /// Fails when the metadata cannot be read, when the platform cannot supply
    /// a modification time, or when the modification time lies before the Unix
    /// epoch. Each error carries the offending path as context.
    fn from_path(path: &Path) -> Result<Self> {
        let metadata = fs::metadata(path).with_context(|| format!("stat {}", path.display()))?;
        let modified = metadata
            .modified()
            .with_context(|| format!("read mtime {}", path.display()))?;
        let since_epoch = modified
            .duration_since(UNIX_EPOCH)
            .with_context(|| format!("mtime before unix epoch {}", path.display()))?;

        // A bare `as i64` would silently wrap (possibly to a negative number)
        // for out-of-range inputs; clamp first so the cast is always lossless.
        let source_file_mtime_ns = since_epoch.as_nanos().min(i64::MAX as u128) as i64;
        let source_file_size = metadata.len().min(i64::MAX as u64) as i64;

        Ok(Self {
            source_file_mtime_ns,
            source_file_size,
            modified,
        })
    }
}

/// Reports whether a file last modified at `modified` has been left alone for
/// at least `quiet_for` as of `now`.
///
/// A zero `quiet_for` disables the check, so every file counts as stable. A
/// modification time later than `now` is never stable otherwise.
fn is_stable(now: SystemTime, modified: SystemTime, quiet_for: Duration) -> bool {
    if quiet_for.is_zero() {
        return true;
    }

    match now.duration_since(modified) {
        Ok(age) => age >= quiet_for,
        // `modified` lies in the future relative to `now`.
        Err(_) => false,
    }
}

/// Returns `true` when any error in `error`'s cause chain is a
/// `std::io::Error` of kind `NotFound`.
///
/// Walking the whole chain lets the check see through context layers wrapped
/// around the original I/O failure.
fn is_not_found_error(error: &anyhow::Error) -> bool {
    for cause in error.chain() {
        let not_found = matches!(
            cause.downcast_ref::<std::io::Error>(),
            Some(io_error) if io_error.kind() == ErrorKind::NotFound
        );
        if not_found {
            return true;
        }
    }
    false
}

// Tests that drive the indexer against real files in a temporary directory
// and a store opened at a path inside that directory.
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh, empty scratch directory under the system temp dir.
    ///
    /// The directory name combines the process id with `name`; any leftover
    /// directory of the same name is removed first.
    fn temp_root(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "codex-recall-indexer-test-{}-{}",
            std::process::id(),
            name
        ));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// Writes a two-line JSONL fixture to `path`: a `session_meta` record
    /// followed by one `user_message` event carrying `message`.
    fn write_session(path: &Path, message: &str) {
        std::fs::write(
            path,
            format!(
                r#"{{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{{"id":"session-1","timestamp":"2026-04-13T01:00:00Z","cwd":"/tmp"}}}}
{{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{{"type":"user_message","message":"{message}"}}}}
"#
            ),
        )
        .unwrap();
    }

    /// Indexing the same unchanged file twice indexes it once and reports it
    /// as skipped-unchanged on the second run.
    #[test]
    fn skips_unchanged_files_on_second_index_run() {
        let root = temp_root("incremental");
        let source = root.join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        let session_file = source.join("session.jsonl");
        write_session(&session_file, "First message");
        let store = Store::open(root.join("index.sqlite")).unwrap();

        let first = index_sources(&store, std::slice::from_ref(&source)).unwrap();
        let second = index_sources(&store, &[source]).unwrap();

        // First run: the single file is discovered, read in full, and indexed.
        assert_eq!(first.files_seen, 1);
        assert_eq!(first.files_total, 1);
        assert!(first.bytes_total > 0);
        assert_eq!(first.bytes_seen, first.bytes_total);
        assert_eq!(first.current_file, Some(session_file));
        assert_eq!(first.files_skipped, 0);
        assert_eq!(first.sessions_indexed, 1);
        // Second run: the file is seen again but skipped as unchanged.
        assert_eq!(second.files_seen, 1);
        assert_eq!(second.files_total, 1);
        assert_eq!(second.files_skipped, 1);
        assert_eq!(second.skipped_unchanged, 1);
        assert_eq!(second.skipped_missing, 0);
        assert_eq!(second.skipped_non_session, 0);
        assert_eq!(second.sessions_indexed, 0);
    }

    /// A file deleted after discovery but before processing is counted as
    /// skipped-missing instead of failing the run.
    #[test]
    fn skips_files_removed_after_scan() {
        let root = temp_root("removed-after-scan");
        let source = root.join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        for index in 0..99 {
            write_session(
                &source.join(format!("a-{index:03}.jsonl")),
                &format!("Message {index}"),
            );
        }
        // Named to sort after the `a-*` files so it is reached last.
        let disappearing = source.join("z-disappearing.jsonl");
        write_session(&disappearing, "This file disappears during indexing");
        let store = Store::open(root.join("index.sqlite")).unwrap();

        let mut removed = false;
        let report = index_sources_with_progress(&store, &[source], |report| {
            // The first callback with `files_seen == 0` arrives after the file
            // list has been collected, so the removal happens "after scan".
            if report.files_seen == 0 && !removed {
                std::fs::remove_file(&disappearing).unwrap();
                removed = true;
            }
        })
        .unwrap();

        assert_eq!(report.files_seen, 100);
        assert_eq!(report.files_total, 100);
        assert_eq!(report.files_skipped, 1);
        assert_eq!(report.skipped_missing, 1);
        assert_eq!(report.sessions_indexed, 99);
    }

    /// The progress callback observes `current_file` set to the file before
    /// that file's session has been counted as indexed.
    #[test]
    fn reports_current_file_before_processing() {
        let root = temp_root("current-before-processing");
        let source = root.join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        let session_file = source.join("session.jsonl");
        write_session(&session_file, "Current file");
        let store = Store::open(root.join("index.sqlite")).unwrap();

        let mut saw_current_before_index = false;
        index_sources_with_progress(&store, &[source], |report| {
            if report.current_file.as_ref() == Some(&session_file) && report.sessions_indexed == 0 {
                saw_current_before_index = true;
            }
        })
        .unwrap();

        assert!(saw_current_before_index);
    }
}