1use crate::parser::parse_session_file;
2use crate::store::{build_session_key, Store};
3use anyhow::{Context, Result};
4use std::fs;
5use std::io::ErrorKind;
6use std::path::{Path, PathBuf};
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
/// Running totals for one indexing pass.
///
/// A snapshot is passed to the progress callback of `index_sources_with_progress`
/// and the final value is returned to the caller. `Default` yields an all-zero
/// report with no current file.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IndexReport {
    /// Number of `.jsonl` files discovered before processing started.
    pub files_total: usize,
    /// Files handled so far, whether they were indexed or skipped.
    pub files_seen: usize,
    /// Files that did not yield an indexed session; each one is also counted in
    /// exactly one of the `skipped_*` fields.
    pub files_skipped: usize,
    /// Files the store reported as already current for their mtime and size.
    pub skipped_unchanged: usize,
    /// Files that were discovered but no longer existed when stat'ed or parsed.
    pub skipped_missing: usize,
    /// Files for which the parser returned no session.
    pub skipped_non_session: usize,
    /// Sessions written to the store during this pass.
    pub sessions_indexed: usize,
    /// Sum of the event counts of all sessions indexed during this pass.
    pub events_indexed: usize,
    /// Combined size of the discovered files whose metadata could be read up front.
    pub bytes_total: u64,
    /// Combined size of the files stat'ed so far during processing.
    pub bytes_seen: u64,
    /// The file most recently picked up for processing, if any.
    pub current_file: Option<PathBuf>,
}
23
/// Result of a read-only scan that classifies source files without indexing them.
///
/// Produced by `scan_sources_for_pending`. `Default` yields an empty report.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SourceScanReport {
    /// Number of `.jsonl` files found under the sources that exist.
    pub files_total: usize,
    /// Files the store did not report as current (every readable file when no
    /// store was supplied).
    pub pending_files: usize,
    /// Combined size in bytes of the pending files.
    pub pending_bytes: u64,
    /// Pending files whose last modification is at least the quiet period old.
    pub stable_pending_files: usize,
    /// Pending files that were modified too recently to count as stable.
    pub waiting_files: usize,
    /// Configured sources that did not exist at scan time.
    pub missing_sources: Vec<PathBuf>,
}
33
34pub fn index_sources(store: &Store, sources: &[PathBuf]) -> Result<IndexReport> {
35 index_sources_with_progress(store, sources, |_| {})
36}
37
38pub fn scan_sources_for_pending(
39 store: Option<&Store>,
40 sources: &[PathBuf],
41 quiet_for: Duration,
42) -> Result<SourceScanReport> {
43 let now = SystemTime::now();
44 let mut report = SourceScanReport {
45 files_total: 0,
46 pending_files: 0,
47 pending_bytes: 0,
48 stable_pending_files: 0,
49 waiting_files: 0,
50 missing_sources: Vec::new(),
51 };
52
53 for source in sources {
54 if !source.exists() {
55 report.missing_sources.push(source.clone());
56 continue;
57 }
58
59 for path in jsonl_files(source)? {
60 report.files_total += 1;
61 let file_state = match FileState::from_path(&path) {
62 Ok(file_state) => file_state,
63 Err(error) if is_not_found_error(&error) => continue,
64 Err(error) => return Err(error),
65 };
66 let is_current = if let Some(store) = store {
67 store.is_source_current(
68 &path,
69 file_state.source_file_mtime_ns,
70 file_state.source_file_size,
71 )?
72 } else {
73 false
74 };
75
76 if is_current {
77 continue;
78 }
79
80 report.pending_files += 1;
81 report.pending_bytes = report
82 .pending_bytes
83 .saturating_add(file_state.source_file_size as u64);
84 if is_stable(now, file_state.modified, quiet_for) {
85 report.stable_pending_files += 1;
86 } else {
87 report.waiting_files += 1;
88 }
89 }
90 }
91
92 Ok(report)
93}
94
95pub fn index_sources_with_progress<F>(
96 store: &Store,
97 sources: &[PathBuf],
98 mut on_progress: F,
99) -> Result<IndexReport>
100where
101 F: FnMut(&IndexReport),
102{
103 let mut files = Vec::new();
104 for source in sources {
105 files.extend(jsonl_files(source)?);
106 }
107 files.sort();
108
109 let mut report = IndexReport {
110 files_total: files.len(),
111 files_seen: 0,
112 files_skipped: 0,
113 skipped_unchanged: 0,
114 skipped_missing: 0,
115 skipped_non_session: 0,
116 sessions_indexed: 0,
117 events_indexed: 0,
118 bytes_total: total_known_bytes(&files),
119 bytes_seen: 0,
120 current_file: None,
121 };
122
123 on_progress(&report);
124
125 for path in files {
126 report.current_file = Some(path.clone());
127
128 let file_state = match FileState::from_path(&path) {
129 Ok(file_state) => file_state,
130 Err(error) if is_not_found_error(&error) => {
131 report.files_seen += 1;
132 report.files_skipped += 1;
133 report.skipped_missing += 1;
134 if should_report_after_file(&report) {
135 on_progress(&report);
136 }
137 continue;
138 }
139 Err(error) => return Err(error),
140 };
141 report.bytes_seen = report
142 .bytes_seen
143 .saturating_add(file_state.source_file_size as u64);
144
145 if store.is_source_current(
146 &path,
147 file_state.source_file_mtime_ns,
148 file_state.source_file_size,
149 )? {
150 report.files_seen += 1;
151 report.files_skipped += 1;
152 report.skipped_unchanged += 1;
153 if should_report_after_file(&report) {
154 on_progress(&report);
155 }
156 continue;
157 }
158
159 on_progress(&report);
160
161 let parsed = match parse_session_file(&path) {
162 Ok(parsed) => parsed,
163 Err(error) if is_not_found_error(&error) => {
164 report.files_seen += 1;
165 report.files_skipped += 1;
166 report.skipped_missing += 1;
167 if should_report_after_file(&report) {
168 on_progress(&report);
169 }
170 continue;
171 }
172 Err(error) => return Err(error),
173 };
174
175 if let Some(parsed) = parsed {
176 let session_key =
177 build_session_key(&parsed.session.id, &parsed.session.source_file_path);
178 report.events_indexed += parsed.events.len();
179 store.index_session(&parsed)?;
180 store.mark_source_indexed(
181 &path,
182 file_state.source_file_mtime_ns,
183 file_state.source_file_size,
184 Some(&parsed.session.id),
185 Some(&session_key),
186 )?;
187 report.sessions_indexed += 1;
188 } else {
189 store.mark_source_indexed(
190 &path,
191 file_state.source_file_mtime_ns,
192 file_state.source_file_size,
193 None,
194 None,
195 )?;
196 report.files_skipped += 1;
197 report.skipped_non_session += 1;
198 }
199
200 report.files_seen += 1;
201 if should_report_after_file(&report) {
202 on_progress(&report);
203 }
204 }
205
206 Ok(report)
207}
208
209fn should_report_after_file(report: &IndexReport) -> bool {
210 report.files_seen == 1
211 || report.files_seen.is_multiple_of(25)
212 || report.files_seen == report.files_total
213}
214
/// Adds up the sizes of `files` as reported by the filesystem.
///
/// Files whose metadata cannot be read (for example because they no longer
/// exist) contribute nothing to the total.
fn total_known_bytes(files: &[PathBuf]) -> u64 {
    let mut total = 0u64;
    for path in files {
        if let Ok(metadata) = fs::metadata(path) {
            total += metadata.len();
        }
    }
    total
}
222
223fn jsonl_files(root: &Path) -> Result<Vec<PathBuf>> {
224 let mut files = Vec::new();
225 collect_jsonl_files(root, &mut files)?;
226 files.sort();
227 Ok(files)
228}
229
230fn collect_jsonl_files(path: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
231 if !path.exists() {
232 return Ok(());
233 }
234
235 if path.is_file() {
236 if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
237 files.push(path.to_path_buf());
238 }
239 return Ok(());
240 }
241
242 for entry in fs::read_dir(path)? {
243 let entry = entry?;
244 let path = entry.path();
245 if path.is_dir() {
246 collect_jsonl_files(&path, files)?;
247 } else if path.extension().and_then(|ext| ext.to_str()) == Some("jsonl") {
248 files.push(path);
249 }
250 }
251
252 Ok(())
253}
254
255struct FileState {
256 source_file_mtime_ns: i64,
257 source_file_size: i64,
258 modified: SystemTime,
259}
260
261impl FileState {
262 fn from_path(path: &Path) -> Result<Self> {
263 let metadata = fs::metadata(path).with_context(|| format!("stat {}", path.display()))?;
264 let modified = metadata
265 .modified()
266 .with_context(|| format!("read mtime {}", path.display()))?;
267 let source_file_mtime_ns = modified
268 .duration_since(UNIX_EPOCH)
269 .with_context(|| format!("mtime before unix epoch {}", path.display()))?
270 .as_nanos() as i64;
271 let source_file_size = metadata.len() as i64;
272
273 Ok(Self {
274 source_file_mtime_ns,
275 source_file_size,
276 modified,
277 })
278 }
279}
280
/// Reports whether a file last changed at `modified` has been quiet long enough.
///
/// A zero `quiet_for` disables the check and always yields `true`. Otherwise the
/// file is stable when `now - modified` is at least `quiet_for`; a modification
/// time later than `now` is never stable.
fn is_stable(now: SystemTime, modified: SystemTime, quiet_for: Duration) -> bool {
    if quiet_for.is_zero() {
        return true;
    }

    match now.duration_since(modified) {
        Ok(age) => age >= quiet_for,
        // `modified` lies in the future relative to `now`.
        Err(_) => false,
    }
}
287
288fn is_not_found_error(error: &anyhow::Error) -> bool {
289 error.chain().any(|cause| {
290 cause
291 .downcast_ref::<std::io::Error>()
292 .is_some_and(|io_error| io_error.kind() == ErrorKind::NotFound)
293 })
294}
295
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test scratch directory that is deleted again when the guard is dropped.
    ///
    /// The directory name contains the process id, so without cleanup every test
    /// run would leave a fresh directory behind in the system temp dir.
    struct TempRoot(PathBuf);

    impl TempRoot {
        /// Creates an empty scratch directory for the test called `name`.
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "codex-recall-indexer-test-{}-{}",
                std::process::id(),
                name
            ));
            let _ = std::fs::remove_dir_all(&dir);
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        /// Returns the path of `name` inside the scratch directory.
        fn join(&self, name: &str) -> PathBuf {
            self.0.join(name)
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not turn a passing test into a panic.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Writes a two-line session file: a `session_meta` record followed by one
    /// user message containing `message`.
    fn write_session(path: &Path, message: &str) {
        std::fs::write(
            path,
            format!(
                r#"{{"timestamp":"2026-04-13T01:00:00Z","type":"session_meta","payload":{{"id":"session-1","timestamp":"2026-04-13T01:00:00Z","cwd":"/tmp"}}}}
{{"timestamp":"2026-04-13T01:00:01Z","type":"event_msg","payload":{{"type":"user_message","message":"{message}"}}}}
"#
            ),
        )
        .unwrap();
    }

    #[test]
    fn skips_unchanged_files_on_second_index_run() {
        let root = TempRoot::new("incremental");
        let source = root.join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        let session_file = source.join("session.jsonl");
        write_session(&session_file, "First message");
        // Declared after `root` so the database is closed before the directory is removed.
        let store = Store::open(root.join("index.sqlite")).unwrap();

        let first = index_sources(&store, std::slice::from_ref(&source)).unwrap();
        let second = index_sources(&store, &[source]).unwrap();

        assert_eq!(first.files_seen, 1);
        assert_eq!(first.files_total, 1);
        assert!(first.bytes_total > 0);
        assert_eq!(first.bytes_seen, first.bytes_total);
        assert_eq!(first.current_file, Some(session_file));
        assert_eq!(first.files_skipped, 0);
        assert_eq!(first.sessions_indexed, 1);
        assert_eq!(second.files_seen, 1);
        assert_eq!(second.files_total, 1);
        assert_eq!(second.files_skipped, 1);
        assert_eq!(second.skipped_unchanged, 1);
        assert_eq!(second.skipped_missing, 0);
        assert_eq!(second.skipped_non_session, 0);
        assert_eq!(second.sessions_indexed, 0);
    }

    #[test]
    fn skips_files_removed_after_scan() {
        let root = TempRoot::new("removed-after-scan");
        let source = root.join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        for index in 0..99 {
            write_session(
                &source.join(format!("a-{index:03}.jsonl")),
                &format!("Message {index}"),
            );
        }
        // Sorts last, so it is processed only after the other 99 files.
        let disappearing = source.join("z-disappearing.jsonl");
        write_session(&disappearing, "This file disappears during indexing");
        let store = Store::open(root.join("index.sqlite")).unwrap();

        let mut removed = false;
        let report = index_sources_with_progress(&store, &[source], |report| {
            // The first callback fires after discovery but before any file is processed.
            if report.files_seen == 0 && !removed {
                std::fs::remove_file(&disappearing).unwrap();
                removed = true;
            }
        })
        .unwrap();

        assert_eq!(report.files_seen, 100);
        assert_eq!(report.files_total, 100);
        assert_eq!(report.files_skipped, 1);
        assert_eq!(report.skipped_missing, 1);
        assert_eq!(report.sessions_indexed, 99);
    }

    #[test]
    fn reports_current_file_before_processing() {
        let root = TempRoot::new("current-before-processing");
        let source = root.join("sessions");
        std::fs::create_dir_all(&source).unwrap();
        let session_file = source.join("session.jsonl");
        write_session(&session_file, "Current file");
        let store = Store::open(root.join("index.sqlite")).unwrap();

        let mut saw_current_before_index = false;
        index_sources_with_progress(&store, &[source], |report| {
            if report.current_file.as_ref() == Some(&session_file) && report.sessions_indexed == 0 {
                saw_current_before_index = true;
            }
        })
        .unwrap();

        assert!(saw_current_before_index);
    }
}