Skip to main content

lean_ctx/core/
index_orchestrator.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::{Arc, Mutex, OnceLock};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::Serialize;
7
8use crate::core::bm25_index::BM25Index;
9use crate::core::graph_index::{self, ProjectIndex};
10
/// Lifecycle of a single index component (graph or BM25) within one project.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Initial state: no build has been started for this component yet.
    Idle,
    /// A build has been started and has not finished.
    Building,
    /// The most recent build finished successfully.
    Ready,
    /// The most recent build finished with an error.
    Failed,
}
18
19#[derive(Debug, Clone)]
20struct Component {
21    state: State,
22    started_ms: Option<u64>,
23    finished_ms: Option<u64>,
24    duration_ms: Option<u64>,
25    last_error: Option<String>,
26    /// Human-readable outcome detail surfaced to operators (e.g. doc count +
27    /// persisted size, or the "not persisted: too large …" remedy). Independent
28    /// of `last_error` so a *successful* build can still carry a warning note.
29    note: Option<String>,
30}
31
32impl Component {
33    fn new() -> Self {
34        Self {
35            state: State::Idle,
36            started_ms: None,
37            finished_ms: None,
38            duration_ms: None,
39            last_error: None,
40            note: None,
41        }
42    }
43}
44
45#[derive(Debug)]
46struct ProjectBuild {
47    worker_running: bool,
48    graph: Component,
49    bm25: Component,
50}
51
52impl ProjectBuild {
53    fn new() -> Self {
54        Self {
55            worker_running: false,
56            graph: Component::new(),
57            bm25: Component::new(),
58        }
59    }
60}
61
62// Lock ordering (see rust/LOCK_ORDERING.md):
63//   L1 = REGISTRY outer Mutex  (the HashMap guard)
64//   L2 = per-project Arc<Mutex<ProjectBuild>>  (inner guard)
65//
66// Invariant: L1 must NEVER be held while locking L2.
67// `entry_for()` enforces this by cloning the Arc and dropping L1 before
68// the caller acquires L2.
69static REGISTRY: OnceLock<Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>>> = OnceLock::new();
70
71fn registry() -> &'static Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>> {
72    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
73}
74
75fn entry_for(project_root: &str) -> Arc<Mutex<ProjectBuild>> {
76    let mut map = registry()
77        .lock()
78        .unwrap_or_else(std::sync::PoisonError::into_inner);
79    map.entry(project_root.to_string())
80        .or_insert_with(|| Arc::new(Mutex::new(ProjectBuild::new())))
81        .clone()
82}
83
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
90
91fn start_component(c: &mut Component) {
92    c.state = State::Building;
93    c.started_ms = Some(now_ms());
94    c.finished_ms = None;
95    c.duration_ms = None;
96    c.last_error = None;
97    c.note = None;
98}
99
100fn finish_ok(c: &mut Component) {
101    c.state = State::Ready;
102    let end = now_ms();
103    c.finished_ms = Some(end);
104    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
105}
106
107fn finish_err(c: &mut Component, e: String) {
108    c.state = State::Failed;
109    let end = now_ms();
110    c.finished_ms = Some(end);
111    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
112    c.last_error = Some(e);
113}
114
115pub fn ensure_all_background(project_root: &str) {
116    let state = entry_for(project_root);
117    let should_spawn = {
118        let mut s = state
119            .lock()
120            .unwrap_or_else(std::sync::PoisonError::into_inner);
121        if s.worker_running {
122            false
123        } else {
124            s.worker_running = true;
125            true
126        }
127    };
128
129    if !should_spawn {
130        return;
131    }
132
133    let root = project_root.to_string();
134    std::thread::spawn(move || {
135        let state = entry_for(&root);
136
137        // Pre-warm the resident line-search index in parallel (own thread,
138        // deduped internally) so the first ctx_search hits the fast path.
139        crate::core::search_index::ensure_background(&root, true, false);
140
141        // Phase 1: Graph index — may produce a content cache from the file walk
142        {
143            let mut s = state
144                .lock()
145                .unwrap_or_else(std::sync::PoisonError::into_inner);
146            start_component(&mut s.graph);
147        }
148        let graph_result = std::panic::catch_unwind(|| {
149            let (idx, content_cache) = graph_index::scan_with_content_cache(&root);
150            // JSON index write is kept for backward compatibility with remaining
151            // direct ProjectIndex consumers. Will be removed when all consumers
152            // are migrated to GraphProvider/PropertyGraph. (OPT-14/15 Phase 6)
153            let _ = idx.save();
154            (idx, content_cache)
155        });
156        let content_cache = if let Ok((_idx, cache)) = graph_result {
157            let mut s = state
158                .lock()
159                .unwrap_or_else(std::sync::PoisonError::into_inner);
160            finish_ok(&mut s.graph);
161            cache
162        } else {
163            let mut s = state
164                .lock()
165                .unwrap_or_else(std::sync::PoisonError::into_inner);
166            finish_err(&mut s.graph, "graph index build panicked".to_string());
167            HashMap::new()
168        };
169
170        // Phase 2: BM25 index — reuses content from graph scan when available
171        {
172            let mut s = state
173                .lock()
174                .unwrap_or_else(std::sync::PoisonError::into_inner);
175            start_component(&mut s.bm25);
176        }
177        let bm = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
178            let root_pb = Path::new(&root);
179            let idx = if content_cache.is_empty() {
180                BM25Index::load_or_build(root_pb)
181            } else {
182                BM25Index::build_with_content_hint(root_pb, &content_cache)
183            };
184            let outcome = idx.save(root_pb);
185            (idx.doc_count, outcome)
186        }));
187        if let Ok((doc_count, save_res)) = bm {
188            let mut s = state
189                .lock()
190                .unwrap_or_else(std::sync::PoisonError::into_inner);
191            finish_ok(&mut s.bm25);
192            s.bm25.note = Some(bm25_build_note(doc_count, &save_res));
193        } else {
194            let mut s = state
195                .lock()
196                .unwrap_or_else(std::sync::PoisonError::into_inner);
197            finish_err(&mut s.bm25, "bm25 build panicked".to_string());
198        }
199
200        let mut s = state
201            .lock()
202            .unwrap_or_else(std::sync::PoisonError::into_inner);
203        s.worker_running = false;
204    });
205}
206
207/// Ensure background indexing for all extra roots (in addition to the primary).
208/// Each extra root that is not a subdirectory of `primary_root` gets its own
209/// graph + BM25 index. Capped at `MAX_EXTRA_ROOT_BUILDS` to prevent runaway.
210const MAX_EXTRA_ROOT_BUILDS: usize = 8;
211
212pub fn ensure_extra_roots_background(primary_root: &str, extra_roots: &[String]) {
213    let primary = Path::new(primary_root);
214    let mut built = 0;
215    for root in extra_roots {
216        if built >= MAX_EXTRA_ROOT_BUILDS {
217            break;
218        }
219        let rp = Path::new(root);
220        if !rp.is_dir() {
221            continue;
222        }
223        // Skip if extra_root is inside primary (already indexed by the primary scan)
224        if rp.starts_with(primary) {
225            continue;
226        }
227        // Skip if primary is inside this extra_root (avoid double-indexing the parent)
228        if primary.starts_with(rp) {
229            continue;
230        }
231        ensure_all_background(root);
232        built += 1;
233    }
234}
235
236/// Build a human-readable outcome note for a finished BM25 build, including the
237/// indexed chunk count and whether the index was persisted to disk. A
238/// "too large" refusal carries the exact remedy so the operator (or agent) is
239/// never left guessing why search/ranking stays cold (issue #249).
240fn bm25_build_note(
241    doc_count: usize,
242    save: &std::io::Result<crate::core::bm25_index::SaveOutcome>,
243) -> String {
244    use crate::core::bm25_index::SaveOutcome;
245    match save {
246        Ok(SaveOutcome::Persisted { compressed_bytes }) => format!(
247            "indexed {doc_count} chunks, {:.1} MB persisted",
248            *compressed_bytes as f64 / 1_048_576.0
249        ),
250        Ok(SaveOutcome::SkippedTooLarge {
251            compressed_bytes,
252            limit_bytes,
253        }) => format!(
254            "indexed {doc_count} chunks but NOT persisted to disk: compressed {:.1} MB exceeds the {:.0} MB cap. \
255             Raise it via LEAN_CTX_BM25_MAX_CACHE_MB (or bm25_max_cache_mb in config) or add extra_ignore_patterns, \
256             then run `lean-ctx reindex`. Until then the index is rebuilt from scratch on every cold start.",
257            *compressed_bytes as f64 / 1_048_576.0,
258            *limit_bytes as f64 / 1_048_576.0
259        ),
260        Err(e) => format!("indexed {doc_count} chunks but persisting failed: {e}"),
261    }
262}
263
/// Small snapshot of the BM25 component, meant for in-call composer/search
/// messaging where the heavier [`disk_status`] walk is not wanted.
#[derive(Debug, Clone)]
pub struct Bm25Summary {
    /// Lifecycle label: one of `"idle"`, `"building"`, `"ready"`, `"failed"`.
    pub state: &'static str,
    /// Time elapsed so far while a build is running; otherwise the duration of
    /// the last build.
    pub elapsed_ms: Option<u64>,
    /// Outcome note of the last build, if one was recorded.
    pub note: Option<String>,
    /// Error message of the last build, if it failed.
    pub last_error: Option<String>,
}
274
275pub fn bm25_summary(project_root: &str) -> Bm25Summary {
276    let entry = entry_for(project_root);
277    let s = entry
278        .lock()
279        .unwrap_or_else(std::sync::PoisonError::into_inner);
280    let c = &s.bm25;
281    let elapsed_ms = if matches!(c.state, State::Building) {
282        c.started_ms.map(|start| now_ms().saturating_sub(start))
283    } else {
284        c.duration_ms
285    };
286    Bm25Summary {
287        state: match c.state {
288            State::Idle => "idle",
289            State::Building => "building",
290            State::Ready => "ready",
291            State::Failed => "failed",
292        },
293        elapsed_ms,
294        note: c.note.clone(),
295        last_error: c.last_error.clone(),
296    }
297}
298
299pub fn try_load_graph_index(project_root: &str) -> Option<ProjectIndex> {
300    // Resident cache: avoids re-reading + zstd-decompressing + serde-parsing the
301    // on-disk index on every graph-touching query. Returns an in-memory clone.
302    crate::core::graph_cache::get_cached(project_root).map(|arc| (*arc).clone())
303}
304
305pub fn try_load_bm25_index(project_root: &str) -> Option<BM25Index> {
306    BM25Index::load(Path::new(project_root))
307}
308
309/// Returns true if any project is currently building its indices.
310pub fn is_building() -> bool {
311    let map = registry()
312        .lock()
313        .unwrap_or_else(std::sync::PoisonError::into_inner);
314    map.values().any(|entry| {
315        let s = entry
316            .lock()
317            .unwrap_or_else(std::sync::PoisonError::into_inner);
318        matches!(s.bm25.state, State::Building) || matches!(s.graph.state, State::Building)
319    })
320}
321
322#[derive(Debug, Serialize)]
323struct ComponentStatus<'a> {
324    state: &'a str,
325    started_ms: Option<u64>,
326    finished_ms: Option<u64>,
327    duration_ms: Option<u64>,
328    last_error: Option<&'a str>,
329    #[serde(skip_serializing_if = "Option::is_none")]
330    note: Option<&'a str>,
331}
332
333fn component_status(c: &Component) -> ComponentStatus<'_> {
334    ComponentStatus {
335        state: match c.state {
336            State::Idle => "idle",
337            State::Building => "building",
338            State::Ready => "ready",
339            State::Failed => "failed",
340        },
341        started_ms: c.started_ms,
342        finished_ms: c.finished_ms,
343        duration_ms: c.duration_ms,
344        last_error: c.last_error.as_deref(),
345        note: c.note.as_deref(),
346    }
347}
348
349#[derive(Debug, Serialize)]
350struct StatusResponse<'a> {
351    project_root: &'a str,
352    graph_index: ComponentStatus<'a>,
353    bm25_index: ComponentStatus<'a>,
354    disk: DiskStatusAll,
355}
356
357#[derive(Debug, Serialize, Default)]
358pub struct DiskStatus {
359    pub exists: bool,
360    pub size_bytes: Option<u64>,
361    pub file_count: Option<u64>,
362    pub modified_at: Option<String>,
363}
364
365#[derive(Debug, Serialize, Default)]
366pub struct DiskStatusAll {
367    pub graph_index: DiskStatus,
368    pub bm25_index: DiskStatus,
369    pub code_graph: DiskStatus,
370}
371
372fn disk_status_for_graph(project_root: &str) -> DiskStatus {
373    let Some(dir) = graph_index::ProjectIndex::index_dir(project_root) else {
374        return DiskStatus::default();
375    };
376    let zst = dir.join("index.json.zst");
377    let json = dir.join("index.json");
378    let path = if zst.exists() {
379        zst
380    } else if json.exists() {
381        json
382    } else {
383        return DiskStatus::default();
384    };
385    let meta = std::fs::metadata(&path).ok();
386    let file_count =
387        graph_index::ProjectIndex::load(project_root).map(|idx| idx.files.len() as u64);
388    DiskStatus {
389        exists: true,
390        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
391        file_count,
392        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
393    }
394}
395
396fn disk_status_for_bm25(project_root: &str) -> DiskStatus {
397    let root = Path::new(project_root);
398    let path = BM25Index::index_file_path(root);
399    if !path.exists() {
400        return DiskStatus::default();
401    }
402    let meta = std::fs::metadata(&path).ok();
403    DiskStatus {
404        exists: true,
405        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
406        file_count: None,
407        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
408    }
409}
410
411fn disk_status_for_code_graph(project_root: &str) -> DiskStatus {
412    let dir = crate::core::property_graph::graph_dir(project_root);
413    let db_path = dir.join("graph.db");
414    if !db_path.exists() {
415        return DiskStatus::default();
416    }
417    let meta = std::fs::metadata(&db_path).ok();
418    let node_count = crate::core::property_graph::CodeGraph::open(project_root)
419        .ok()
420        .and_then(|g| {
421            g.connection()
422                .query_row("SELECT count(*) FROM nodes", [], |r| r.get::<_, i64>(0))
423                .ok()
424                .map(|c| c as u64)
425        });
426    DiskStatus {
427        exists: true,
428        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
429        file_count: node_count,
430        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
431    }
432}
433
434fn format_time(t: SystemTime) -> String {
435    let secs = t.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
436    let dt = chrono::DateTime::from_timestamp(secs as i64, 0);
437    dt.map_or_else(
438        || format!("{secs}"),
439        |d| d.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
440    )
441}
442
443pub fn disk_status(project_root: &str) -> DiskStatusAll {
444    DiskStatusAll {
445        graph_index: disk_status_for_graph(project_root),
446        bm25_index: disk_status_for_bm25(project_root),
447        code_graph: disk_status_for_code_graph(project_root),
448    }
449}
450
451pub fn status_json(project_root: &str) -> String {
452    let state = entry_for(project_root);
453    let s = state
454        .lock()
455        .unwrap_or_else(std::sync::PoisonError::into_inner);
456    let res = StatusResponse {
457        project_root,
458        graph_index: component_status(&s.graph),
459        bm25_index: component_status(&s.bm25),
460        disk: disk_status(project_root),
461    };
462    serde_json::to_string(&res).unwrap_or_else(|_| "{}".to_string())
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// Whatever the state of the indices, the status payload must parse as JSON.
    #[test]
    fn status_json_is_valid_json() {
        let s = status_json("/tmp");
        let _: serde_json::Value = serde_json::from_str(&s).unwrap();
    }

    #[test]
    fn build_note_persisted_reports_size() {
        let note = bm25_build_note(
            42,
            &Ok(crate::core::bm25_index::SaveOutcome::Persisted {
                compressed_bytes: 3 * 1024 * 1024,
            }),
        );
        assert!(
            note.contains("42 chunks"),
            "note should report chunk count: {note}"
        );
        assert!(
            note.contains("persisted"),
            "note should report persistence: {note}"
        );
    }

    #[test]
    fn build_note_too_large_carries_remedy() {
        let note = bm25_build_note(
            1000,
            &Ok(crate::core::bm25_index::SaveOutcome::SkippedTooLarge {
                compressed_bytes: 600 * 1024 * 1024,
                limit_bytes: 512 * 1024 * 1024,
            }),
        );
        assert!(
            note.contains("NOT persisted"),
            "must flag non-persistence: {note}"
        );
        assert!(
            note.contains("LEAN_CTX_BM25_MAX_CACHE_MB") && note.contains("reindex"),
            "too-large note must carry an actionable remedy: {note}"
        );
    }

    #[test]
    fn build_note_persist_error_is_reported() {
        let note = bm25_build_note(7, &Err(std::io::Error::other("disk full")));
        assert!(note.contains("persisting failed"), "note: {note}");
        assert!(
            note.contains("disk full"),
            "note should include the io error: {note}"
        );
    }

    #[test]
    fn bm25_summary_unknown_project_is_idle() {
        let tmp = tempfile::tempdir().unwrap();
        let summary = bm25_summary(tmp.path().to_string_lossy().as_ref());
        assert_eq!(summary.state, "idle");
        assert!(summary.note.is_none());
        assert!(summary.last_error.is_none());
    }

    #[test]
    fn extra_roots_skips_subdirs_of_primary() {
        let tmp = tempfile::tempdir().unwrap();
        let primary = tmp.path().join("primary");
        std::fs::create_dir_all(&primary).unwrap();
        let sub = primary.join("subdir");
        std::fs::create_dir_all(&sub).unwrap();
        let external = tmp.path().join("external");
        std::fs::create_dir_all(&external).unwrap();

        let primary_str = primary.to_string_lossy().to_string();
        let extra = vec![
            sub.to_string_lossy().to_string(),
            external.to_string_lossy().to_string(),
        ];

        ensure_extra_roots_background(&primary_str, &extra);

        // A root is registered synchronously when its build is requested, so
        // the registry shows exactly which roots were accepted.
        let map = registry()
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let sub_seen = map.contains_key(extra[0].as_str());
        let external_seen = map.contains_key(extra[1].as_str());
        drop(map);

        assert!(!sub_seen, "a subdirectory of the primary must be skipped");
        assert!(external_seen, "an external root must be picked up");
    }

    #[test]
    fn extra_roots_caps_at_max() {
        let tmp = tempfile::tempdir().unwrap();
        let primary = tmp.path().join("primary");
        std::fs::create_dir_all(&primary).unwrap();

        let mut extra = Vec::new();
        for i in 0..20 {
            let d = tmp.path().join(format!("ext-{i}"));
            std::fs::create_dir_all(&d).unwrap();
            extra.push(d.to_string_lossy().to_string());
        }

        let primary_str = primary.to_string_lossy().to_string();
        ensure_extra_roots_background(&primary_str, &extra);

        // Every candidate qualifies, so exactly the cap's worth of roots must
        // have been registered and the rest left untouched.
        let map = registry()
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let started = extra
            .iter()
            .filter(|root| map.contains_key(root.as_str()))
            .count();
        drop(map);

        assert_eq!(
            started, MAX_EXTRA_ROOT_BUILDS,
            "must start builds for exactly MAX_EXTRA_ROOT_BUILDS extra roots"
        );
    }
}