Skip to main content

codex_recall/
indexer.rs

1use crate::parser::parse_session_file;
2use crate::store::{build_session_key, Store};
3use anyhow::{Context, Result};
4use std::fs;
5use std::io::ErrorKind;
6use std::path::{Path, PathBuf};
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
/// Counters describing the progress and outcome of an indexing run.
///
/// [`index_sources_with_progress`] hands a report to its progress callback
/// several times during a run and returns the final one when the run finishes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IndexReport {
    /// Number of `.jsonl` files discovered under the sources when the run started.
    pub files_total: usize,
    /// Files handled so far, whether they were indexed or skipped.
    pub files_seen: usize,
    /// Files that did not result in an indexed session; the sum of the
    /// `skipped_*` counters below.
    pub files_skipped: usize,
    /// Files skipped because the store already reports their mtime and size as current.
    pub skipped_unchanged: usize,
    /// Files that disappeared between discovery and being stat'ed or parsed.
    pub skipped_missing: usize,
    /// Files for which the parser returned no session; they are still marked as
    /// indexed in the store.
    pub skipped_non_session: usize,
    /// Sessions written to the store.
    pub sessions_indexed: usize,
    /// Events contained in the sessions written to the store.
    pub events_indexed: usize,
    /// Combined size in bytes of the discovered files that could be stat'ed at
    /// the start of the run.
    pub bytes_total: u64,
    /// Combined size in bytes of the files stat'ed so far, including unchanged ones.
    pub bytes_seen: u64,
    /// The file currently being handled; after the run, the last file handled.
    pub current_file: Option<PathBuf>,
}
23
/// Summary of how much work an indexing run over the sources would have to do.
///
/// Produced by [`scan_sources_for_pending`], which only queries the store.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SourceScanReport {
    /// Number of `.jsonl` files found under the sources that exist.
    pub files_total: usize,
    /// Files the store does not report as current (every stat-able file when no
    /// store is supplied).
    pub pending_files: usize,
    /// Combined size in bytes of the pending files.
    pub pending_bytes: u64,
    /// Pending files that have gone unmodified for at least the quiet period.
    pub stable_pending_files: usize,
    /// Pending files modified too recently to be considered stable yet.
    pub waiting_files: usize,
    /// Sources that did not exist when the scan ran.
    pub missing_sources: Vec<PathBuf>,
}
33
34pub fn index_sources(store: &Store, sources: &[PathBuf]) -> Result<IndexReport> {
35    index_sources_with_progress(store, sources, |_| {})
36}
37
38pub fn scan_sources_for_pending(
39    store: Option<&Store>,
40    sources: &[PathBuf],
41    quiet_for: Duration,
42) -> Result<SourceScanReport> {
43    let now = SystemTime::now();
44    let mut report = SourceScanReport {
45        files_total: 0,
46        pending_files: 0,
47        pending_bytes: 0,
48        stable_pending_files: 0,
49        waiting_files: 0,
50        missing_sources: Vec::new(),
51    };
52
53    for source in sources {
54        if !source.exists() {
55            report.missing_sources.push(source.clone());
56            continue;
57        }
58
59        for path in jsonl_files(source)? {
60            report.files_total += 1;
61            let file_state = match FileState::from_path(&path) {
62                Ok(file_state) => file_state,
63                Err(error) if is_not_found_error(&error) => continue,
64                Err(error) => return Err(error),
65            };
66            let is_current = if let Some(store) = store {
67                store.is_source_current(
68                    &path,
69                    file_state.source_file_mtime_ns,
70                    file_state.source_file_size,
71                )?
72            } else {
73                false
74            };
75
76            if is_current {
77                continue;
78            }
79
80            report.pending_files += 1;
81            report.pending_bytes = report
82                .pending_bytes
83                .saturating_add(file_state.source_file_size as u64);
84            if is_stable(now, file_state.modified, quiet_for) {
85                report.stable_pending_files += 1;
86            } else {
87                report.waiting_files += 1;
88            }
89        }
90    }
91
92    Ok(report)
93}
94
95pub fn index_sources_with_progress<F>(
96    store: &Store,
97    sources: &[PathBuf],
98    mut on_progress: F,
99) -> Result<IndexReport>
100where
101    F: FnMut(&IndexReport),
102{
103    let mut files = Vec::new();
104    for source in sources {
105        files.extend(jsonl_files(source)?);
106    }
107    files.sort();
108
109    let mut report = IndexReport {
110        files_total: files.len(),
111        files_seen: 0,
112        files_skipped: 0,
113        skipped_unchanged: 0,
114        skipped_missing: 0,
115        skipped_non_session: 0,
116        sessions_indexed: 0,
117        events_indexed: 0,
118        bytes_total: total_known_bytes(&files),
119        bytes_seen: 0,
120        current_file: None,
121    };
122
123    on_progress(&report);
124
125    for path in files {
126        report.current_file = Some(path.clone());
127
128        let file_state = match FileState::from_path(&path) {
129            Ok(file_state) => file_state,
130            Err(error) if is_not_found_error(&error) => {
131                report.files_seen += 1;
132                report.files_skipped += 1;
133                report.skipped_missing += 1;
134                if should_report_after_file(&report) {
135                    on_progress(&report);
136                }
137                continue;
138            }
139            Err(error) => return Err(error),
140        };
141        report.bytes_seen = report
142            .bytes_seen
143            .saturating_add(file_state.source_file_size as u64);
144
145        if store.is_source_current(
146            &path,
147            file_state.source_file_mtime_ns,
148            file_state.source_file_size,
149        )? {
150            report.files_seen += 1;
151            report.files_skipped += 1;
152            report.skipped_unchanged += 1;
153            if should_report_after_file(&report) {
154                on_progress(&report);
155            }
156            continue;
157        }
158
159        on_progress(&report);
160
161        let parsed = match parse_session_file(&path) {
162            Ok(parsed) => parsed,
163            Err(error) if is_not_found_error(&error) => {
164                report.files_seen += 1;
165                report.files_skipped += 1;
166                report.skipped_missing += 1;
167                if should_report_after_file(&report) {
168                    on_progress(&report);
169                }
170                continue;
171            }
172            Err(error) => return Err(error),
173        };
174
175        if let Some(parsed) = parsed {
176            let session_key =
177                build_session_key(&parsed.session.id, &parsed.session.source_file_path);
178            report.events_indexed += parsed.events.len();
179            store.index_session(&parsed)?;
180            store.mark_source_indexed(
181                &path,
182                file_state.source_file_mtime_ns,
183                file_state.source_file_size,
184                Some(&parsed.session.id),
185                Some(&session_key),
186            )?;
187            report.sessions_indexed += 1;
188        } else {
189            store.mark_source_indexed(
190                &path,
191                file_state.source_file_mtime_ns,
192                file_state.source_file_size,
193                None,
194                None,
195            )?;
196            report.files_skipped += 1;
197            report.skipped_non_session += 1;
198        }
199
200        report.files_seen += 1;
201        if should_report_after_file(&report) {
202            on_progress(&report);
203        }
204    }
205
206    Ok(report)
207}
208
209fn should_report_after_file(report: &IndexReport) -> bool {
210    report.files_seen == 1
211        || report.files_seen.is_multiple_of(25)
212        || report.files_seen == report.files_total
213}
214
/// Sums the sizes of the `files` that can currently be stat'ed.
///
/// Paths whose metadata cannot be read (for example because the file was removed
/// after discovery) contribute nothing, so the result is a best-effort total
/// used to seed `bytes_total` for progress reporting.
fn total_known_bytes(files: &[PathBuf]) -> u64 {
    let mut total: u64 = 0;
    for file in files {
        if let Ok(metadata) = fs::metadata(file) {
            total += metadata.len();
        }
    }
    total
}
222
223fn jsonl_files(root: &Path) -> Result<Vec<PathBuf>> {
224    let mut files = Vec::new();
225    collect_jsonl_files(root, &mut files)?;
226    files.sort();
227    Ok(files)
228}
229
230fn collect_jsonl_files(path: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
231    if !path.exists() {
232        return Ok(());
233    }
234
235    if path.is_file() {
236        if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
237            files.push(path.to_path_buf());
238        }
239        return Ok(());
240    }
241
242    for entry in fs::read_dir(path)? {
243        let entry = entry?;
244        let path = entry.path();
245        if path.is_dir() {
246            collect_jsonl_files(&path, files)?;
247        } else if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
248            files.push(path);
249        }
250    }
251
252    Ok(())
253}
254
255struct FileState {
256    source_file_mtime_ns: i64,
257    source_file_size: i64,
258    modified: SystemTime,
259}
260
261impl FileState {
262    fn from_path(path: &Path) -> Result<Self> {
263        let metadata = fs::metadata(path).with_context(|| format!("stat {}", path.display()))?;
264        let modified = metadata
265            .modified()
266            .with_context(|| format!("read mtime {}", path.display()))?;
267        let source_file_mtime_ns = modified
268            .duration_since(UNIX_EPOCH)
269            .with_context(|| format!("mtime before unix epoch {}", path.display()))?
270            .as_nanos() as i64;
271        let source_file_size = metadata.len() as i64;
272
273        Ok(Self {
274            source_file_mtime_ns,
275            source_file_size,
276            modified,
277        })
278    }
279}
280
/// Returns whether a file last modified at `modified` has been quiet for at
/// least `quiet_for` as of `now`.
///
/// A zero `quiet_for` makes every file stable. A modification time later than
/// `now` never counts as stable for a non-zero `quiet_for`.
fn is_stable(now: SystemTime, modified: SystemTime, quiet_for: Duration) -> bool {
    if quiet_for.is_zero() {
        return true;
    }
    match now.duration_since(modified) {
        Ok(age) => age >= quiet_for,
        // The mtime lies in the future, so the file cannot have been quiet yet.
        Err(_) => false,
    }
}
287
288fn is_not_found_error(error: &anyhow::Error) -> bool {
289    error.chain().any(|cause| {
290        cause
291            .downcast_ref::<std::io::Error>()
292            .is_some_and(|io_error| io_error.kind() == ErrorKind::NotFound)
293    })
294}
295
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-process scratch directory under the system temp dir, removed again
    /// when dropped so test runs do not leave leftovers behind.
    ///
    /// Declare it before anything that keeps files inside it open (such as the
    /// `Store`), so those values are dropped first.
    struct TempRoot(PathBuf);

    impl TempRoot {
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "codex-recall-indexer-test-{}-{}",
                std::process::id(),
                name
            ));
            // Clear anything left by an earlier run that happened to reuse this pid.
            let _ = std::fs::remove_dir_all(&dir);
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not turn a passing test into a panic.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Writes a minimal two-line session file (session metadata plus one user message).
    fn write_session(path: &Path, message: &str) {
        std::fs::write(
            path,
            format!(
                r#"{{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{{"id":"session-1","timestamp":"2026-04-13T01:00:00Z","cwd":"/tmp"}}}}
{{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{{"type":"user_message","message":"{message}"}}}}
"#
            ),
        )
        .unwrap();
    }

    #[test]
    fn skips_unchanged_files_on_second_index_run() {
        let root = TempRoot::new("incremental");
        let source = root.path().join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        let session_file = source.join("session.jsonl");
        write_session(&session_file, "First message");
        let store = Store::open(root.path().join("index.sqlite")).unwrap();

        let first = index_sources(&store, std::slice::from_ref(&source)).unwrap();
        let second = index_sources(&store, &[source]).unwrap();

        assert_eq!(first.files_seen, 1);
        assert_eq!(first.files_total, 1);
        assert!(first.bytes_total > 0);
        assert_eq!(first.bytes_seen, first.bytes_total);
        assert_eq!(first.current_file, Some(session_file));
        assert_eq!(first.files_skipped, 0);
        assert_eq!(first.sessions_indexed, 1);
        assert_eq!(second.files_seen, 1);
        assert_eq!(second.files_total, 1);
        assert_eq!(second.files_skipped, 1);
        assert_eq!(second.skipped_unchanged, 1);
        assert_eq!(second.skipped_missing, 0);
        assert_eq!(second.skipped_non_session, 0);
        assert_eq!(second.sessions_indexed, 0);
    }

    #[test]
    fn skips_files_removed_after_scan() {
        let root = TempRoot::new("removed-after-scan");
        let source = root.path().join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        for index in 0..99 {
            write_session(
                &source.join(format!("a-{index:03}.jsonl")),
                &format!("Message {index}"),
            );
        }
        // Sorts last, so it is deleted well before the indexer reaches it.
        let disappearing = source.join("z-disappearing.jsonl");
        write_session(&disappearing, "This file disappears during indexing");
        let store = Store::open(root.path().join("index.sqlite")).unwrap();

        let mut removed = false;
        // The first callback fires after discovery but before any file is handled.
        let report = index_sources_with_progress(&store, &[source], |report| {
            if report.files_seen == 0 && !removed {
                std::fs::remove_file(&disappearing).unwrap();
                removed = true;
            }
        })
        .unwrap();

        assert_eq!(report.files_seen, 100);
        assert_eq!(report.files_total, 100);
        assert_eq!(report.files_skipped, 1);
        assert_eq!(report.skipped_missing, 1);
        assert_eq!(report.sessions_indexed, 99);
    }

    #[test]
    fn reports_current_file_before_processing() {
        let root = TempRoot::new("current-before-processing");
        let source = root.path().join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        let session_file = source.join("session.jsonl");
        write_session(&session_file, "Current file");
        let store = Store::open(root.path().join("index.sqlite")).unwrap();

        let mut saw_current_before_index = false;
        index_sources_with_progress(&store, &[source], |report| {
            if report.current_file.as_ref() == Some(&session_file) && report.sessions_indexed == 0 {
                saw_current_before_index = true;
            }
        })
        .unwrap();

        assert!(saw_current_before_index);
    }
}