Skip to main content

lean_ctx/core/
index_orchestrator.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::{Arc, Mutex, OnceLock};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::Serialize;
7
8use crate::core::bm25_index::BM25Index;
9use crate::core::graph_index::{self, ProjectIndex};
10
/// Lifecycle of one index component within a background build.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum State {
    /// Initial state: no build has been started for this component.
    #[default]
    Idle,
    /// A build is in progress.
    Building,
    /// The most recent build finished without panicking.
    Ready,
    /// The most recent build panicked; details are in `Component::last_error`.
    Failed,
}

/// Timing and outcome of the most recent build of one index component.
///
/// Timestamps are milliseconds since the Unix epoch.
#[derive(Debug, Clone, Default)]
struct Component {
    state: State,
    started_ms: Option<u64>,
    finished_ms: Option<u64>,
    duration_ms: Option<u64>,
    last_error: Option<String>,
}

impl Component {
    /// Creates an idle component with no recorded timings and no error.
    fn new() -> Self {
        Self::default()
    }
}
39
40#[derive(Debug)]
41struct ProjectBuild {
42    worker_running: bool,
43    graph: Component,
44    bm25: Component,
45}
46
47impl ProjectBuild {
48    fn new() -> Self {
49        Self {
50            worker_running: false,
51            graph: Component::new(),
52            bm25: Component::new(),
53        }
54    }
55}
56
57// Lock ordering (see rust/LOCK_ORDERING.md):
58//   L1 = REGISTRY outer Mutex  (the HashMap guard)
59//   L2 = per-project Arc<Mutex<ProjectBuild>>  (inner guard)
60//
61// Invariant: L1 must NEVER be held while locking L2.
62// `entry_for()` enforces this by cloning the Arc and dropping L1 before
63// the caller acquires L2.
64static REGISTRY: OnceLock<Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>>> = OnceLock::new();
65
66fn registry() -> &'static Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>> {
67    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
68}
69
70fn entry_for(project_root: &str) -> Arc<Mutex<ProjectBuild>> {
71    let mut map = registry()
72        .lock()
73        .unwrap_or_else(std::sync::PoisonError::into_inner);
74    map.entry(project_root.to_string())
75        .or_insert_with(|| Arc::new(Mutex::new(ProjectBuild::new())))
76        .clone()
77}
78
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. The `u128`
/// from `as_millis()` is converted with saturation instead of a truncating
/// `as` cast.
fn now_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
85
86fn start_component(c: &mut Component) {
87    c.state = State::Building;
88    c.started_ms = Some(now_ms());
89    c.finished_ms = None;
90    c.duration_ms = None;
91    c.last_error = None;
92}
93
94fn finish_ok(c: &mut Component) {
95    c.state = State::Ready;
96    let end = now_ms();
97    c.finished_ms = Some(end);
98    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
99}
100
101fn finish_err(c: &mut Component, e: String) {
102    c.state = State::Failed;
103    let end = now_ms();
104    c.finished_ms = Some(end);
105    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
106    c.last_error = Some(e);
107}
108
109pub fn ensure_all_background(project_root: &str) {
110    let state = entry_for(project_root);
111    let should_spawn = {
112        let mut s = state
113            .lock()
114            .unwrap_or_else(std::sync::PoisonError::into_inner);
115        if s.worker_running {
116            false
117        } else {
118            s.worker_running = true;
119            true
120        }
121    };
122
123    if !should_spawn {
124        return;
125    }
126
127    let root = project_root.to_string();
128    std::thread::spawn(move || {
129        let state = entry_for(&root);
130
131        // Pre-warm the resident line-search index in parallel (own thread,
132        // deduped internally) so the first ctx_search hits the fast path.
133        crate::core::search_index::ensure_background(&root, true, false);
134
135        // Phase 1: Graph index — may produce a content cache from the file walk
136        {
137            let mut s = state
138                .lock()
139                .unwrap_or_else(std::sync::PoisonError::into_inner);
140            start_component(&mut s.graph);
141        }
142        let graph_result = std::panic::catch_unwind(|| {
143            let (idx, content_cache) = graph_index::scan_with_content_cache(&root);
144            // JSON index write is kept for backward compatibility with remaining
145            // direct ProjectIndex consumers. Will be removed when all consumers
146            // are migrated to GraphProvider/PropertyGraph. (OPT-14/15 Phase 6)
147            let _ = idx.save();
148            (idx, content_cache)
149        });
150        let content_cache = if let Ok((_idx, cache)) = graph_result {
151            let mut s = state
152                .lock()
153                .unwrap_or_else(std::sync::PoisonError::into_inner);
154            finish_ok(&mut s.graph);
155            cache
156        } else {
157            let mut s = state
158                .lock()
159                .unwrap_or_else(std::sync::PoisonError::into_inner);
160            finish_err(&mut s.graph, "graph index build panicked".to_string());
161            HashMap::new()
162        };
163
164        // Phase 2: BM25 index — reuses content from graph scan when available
165        {
166            let mut s = state
167                .lock()
168                .unwrap_or_else(std::sync::PoisonError::into_inner);
169            start_component(&mut s.bm25);
170        }
171        let bm = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
172            let root_pb = Path::new(&root);
173            let idx = if content_cache.is_empty() {
174                BM25Index::load_or_build(root_pb)
175            } else {
176                BM25Index::build_with_content_hint(root_pb, &content_cache)
177            };
178            let _ = idx.save(root_pb);
179        }));
180        if let Ok(()) = bm {
181            let mut s = state
182                .lock()
183                .unwrap_or_else(std::sync::PoisonError::into_inner);
184            finish_ok(&mut s.bm25);
185        } else {
186            let mut s = state
187                .lock()
188                .unwrap_or_else(std::sync::PoisonError::into_inner);
189            finish_err(&mut s.bm25, "bm25 build panicked".to_string());
190        }
191
192        let mut s = state
193            .lock()
194            .unwrap_or_else(std::sync::PoisonError::into_inner);
195        s.worker_running = false;
196    });
197}
198
199pub fn try_load_graph_index(project_root: &str) -> Option<ProjectIndex> {
200    // Resident cache: avoids re-reading + zstd-decompressing + serde-parsing the
201    // on-disk index on every graph-touching query. Returns an in-memory clone.
202    crate::core::graph_cache::get_cached(project_root).map(|arc| (*arc).clone())
203}
204
205pub fn try_load_bm25_index(project_root: &str) -> Option<BM25Index> {
206    BM25Index::load(Path::new(project_root))
207}
208
209/// Returns true if any project is currently building its indices.
210pub fn is_building() -> bool {
211    let map = registry()
212        .lock()
213        .unwrap_or_else(std::sync::PoisonError::into_inner);
214    map.values().any(|entry| {
215        let s = entry
216            .lock()
217            .unwrap_or_else(std::sync::PoisonError::into_inner);
218        matches!(s.bm25.state, State::Building) || matches!(s.graph.state, State::Building)
219    })
220}
221
222#[derive(Debug, Serialize)]
223struct ComponentStatus<'a> {
224    state: &'a str,
225    started_ms: Option<u64>,
226    finished_ms: Option<u64>,
227    duration_ms: Option<u64>,
228    last_error: Option<&'a str>,
229}
230
231fn component_status(c: &Component) -> ComponentStatus<'_> {
232    ComponentStatus {
233        state: match c.state {
234            State::Idle => "idle",
235            State::Building => "building",
236            State::Ready => "ready",
237            State::Failed => "failed",
238        },
239        started_ms: c.started_ms,
240        finished_ms: c.finished_ms,
241        duration_ms: c.duration_ms,
242        last_error: c.last_error.as_deref(),
243    }
244}
245
246#[derive(Debug, Serialize)]
247struct StatusResponse<'a> {
248    project_root: &'a str,
249    graph_index: ComponentStatus<'a>,
250    bm25_index: ComponentStatus<'a>,
251    disk: DiskStatusAll,
252}
253
254#[derive(Debug, Serialize, Default)]
255pub struct DiskStatus {
256    pub exists: bool,
257    pub size_bytes: Option<u64>,
258    pub file_count: Option<u64>,
259    pub modified_at: Option<String>,
260}
261
262#[derive(Debug, Serialize, Default)]
263pub struct DiskStatusAll {
264    pub graph_index: DiskStatus,
265    pub bm25_index: DiskStatus,
266    pub code_graph: DiskStatus,
267}
268
269fn disk_status_for_graph(project_root: &str) -> DiskStatus {
270    let Some(dir) = graph_index::ProjectIndex::index_dir(project_root) else {
271        return DiskStatus::default();
272    };
273    let zst = dir.join("index.json.zst");
274    let json = dir.join("index.json");
275    let path = if zst.exists() {
276        zst
277    } else if json.exists() {
278        json
279    } else {
280        return DiskStatus::default();
281    };
282    let meta = std::fs::metadata(&path).ok();
283    let file_count =
284        graph_index::ProjectIndex::load(project_root).map(|idx| idx.files.len() as u64);
285    DiskStatus {
286        exists: true,
287        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
288        file_count,
289        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
290    }
291}
292
293fn disk_status_for_bm25(project_root: &str) -> DiskStatus {
294    let root = Path::new(project_root);
295    let path = BM25Index::index_file_path(root);
296    if !path.exists() {
297        return DiskStatus::default();
298    }
299    let meta = std::fs::metadata(&path).ok();
300    DiskStatus {
301        exists: true,
302        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
303        file_count: None,
304        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
305    }
306}
307
308fn disk_status_for_code_graph(project_root: &str) -> DiskStatus {
309    let dir = crate::core::property_graph::graph_dir(project_root);
310    let db_path = dir.join("graph.db");
311    if !db_path.exists() {
312        return DiskStatus::default();
313    }
314    let meta = std::fs::metadata(&db_path).ok();
315    let node_count = crate::core::property_graph::CodeGraph::open(project_root)
316        .ok()
317        .and_then(|g| {
318            g.connection()
319                .query_row("SELECT count(*) FROM nodes", [], |r| r.get::<_, i64>(0))
320                .ok()
321                .map(|c| c as u64)
322        });
323    DiskStatus {
324        exists: true,
325        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
326        file_count: node_count,
327        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
328    }
329}
330
331fn format_time(t: SystemTime) -> String {
332    let secs = t.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
333    let dt = chrono::DateTime::from_timestamp(secs as i64, 0);
334    dt.map_or_else(
335        || format!("{secs}"),
336        |d| d.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
337    )
338}
339
340pub fn disk_status(project_root: &str) -> DiskStatusAll {
341    DiskStatusAll {
342        graph_index: disk_status_for_graph(project_root),
343        bm25_index: disk_status_for_bm25(project_root),
344        code_graph: disk_status_for_code_graph(project_root),
345    }
346}
347
348pub fn status_json(project_root: &str) -> String {
349    let state = entry_for(project_root);
350    let s = state
351        .lock()
352        .unwrap_or_else(std::sync::PoisonError::into_inner);
353    let res = StatusResponse {
354        project_root,
355        graph_index: component_status(&s.graph),
356        bm25_index: component_status(&s.bm25),
357        disk: disk_status(project_root),
358    };
359    serde_json::to_string(&res).unwrap_or_else(|_| "{}".to_string())
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn status_json_is_valid_json() {
        let s = status_json("/tmp");
        let v: serde_json::Value =
            serde_json::from_str(&s).expect("status_json must return valid JSON");
        // `status_json` falls back to "{}" when serialization fails, so being
        // valid JSON is not enough: the expected top-level fields must be there.
        assert_eq!(v["project_root"], "/tmp");
        for key in ["graph_index", "bm25_index", "disk"] {
            assert!(v.get(key).is_some(), "missing `{key}` in {s}");
        }
    }
}