Skip to main content

lean_ctx/core/
index_orchestrator.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::{Arc, Mutex, OnceLock};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::Serialize;
7
8use crate::core::bm25_index::BM25Index;
9use crate::core::graph_index::{self, ProjectIndex};
10
/// Lifecycle of a single index component (graph or BM25) within a project build.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// No build of this component has been started yet.
    Idle,
    /// A background worker is currently building the component.
    Building,
    /// The most recent build finished without panicking.
    Ready,
    /// The most recent build panicked; the message is kept in `last_error`.
    Failed,
}
18
19#[derive(Debug, Clone)]
20struct Component {
21    state: State,
22    started_ms: Option<u64>,
23    finished_ms: Option<u64>,
24    duration_ms: Option<u64>,
25    last_error: Option<String>,
26}
27
28impl Component {
29    fn new() -> Self {
30        Self {
31            state: State::Idle,
32            started_ms: None,
33            finished_ms: None,
34            duration_ms: None,
35            last_error: None,
36        }
37    }
38}
39
40#[derive(Debug)]
41struct ProjectBuild {
42    worker_running: bool,
43    graph: Component,
44    bm25: Component,
45}
46
47impl ProjectBuild {
48    fn new() -> Self {
49        Self {
50            worker_running: false,
51            graph: Component::new(),
52            bm25: Component::new(),
53        }
54    }
55}
56
57// Lock ordering (see rust/LOCK_ORDERING.md):
58//   L1 = REGISTRY outer Mutex  (the HashMap guard)
59//   L2 = per-project Arc<Mutex<ProjectBuild>>  (inner guard)
60//
61// Invariant: L1 must NEVER be held while locking L2.
62// `entry_for()` enforces this by cloning the Arc and dropping L1 before
63// the caller acquires L2.
64static REGISTRY: OnceLock<Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>>> = OnceLock::new();
65
66fn registry() -> &'static Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>> {
67    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
68}
69
70fn entry_for(project_root: &str) -> Arc<Mutex<ProjectBuild>> {
71    let mut map = registry()
72        .lock()
73        .unwrap_or_else(std::sync::PoisonError::into_inner);
74    map.entry(project_root.to_string())
75        .or_insert_with(|| Arc::new(Mutex::new(ProjectBuild::new())))
76        .clone()
77}
78
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error.
fn now_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    elapsed.as_millis() as u64
}
85
86fn start_component(c: &mut Component) {
87    c.state = State::Building;
88    c.started_ms = Some(now_ms());
89    c.finished_ms = None;
90    c.duration_ms = None;
91    c.last_error = None;
92}
93
94fn finish_ok(c: &mut Component) {
95    c.state = State::Ready;
96    let end = now_ms();
97    c.finished_ms = Some(end);
98    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
99}
100
101fn finish_err(c: &mut Component, e: String) {
102    c.state = State::Failed;
103    let end = now_ms();
104    c.finished_ms = Some(end);
105    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
106    c.last_error = Some(e);
107}
108
109pub fn ensure_all_background(project_root: &str) {
110    let state = entry_for(project_root);
111    let should_spawn = {
112        let mut s = state
113            .lock()
114            .unwrap_or_else(std::sync::PoisonError::into_inner);
115        if s.worker_running {
116            false
117        } else {
118            s.worker_running = true;
119            true
120        }
121    };
122
123    if !should_spawn {
124        return;
125    }
126
127    let root = project_root.to_string();
128    std::thread::spawn(move || {
129        let state = entry_for(&root);
130
131        // Phase 1: Graph index — may produce a content cache from the file walk
132        {
133            let mut s = state
134                .lock()
135                .unwrap_or_else(std::sync::PoisonError::into_inner);
136            start_component(&mut s.graph);
137        }
138        let graph_result = std::panic::catch_unwind(|| {
139            let (idx, content_cache) = graph_index::scan_with_content_cache(&root);
140            // JSON index write is kept for backward compatibility with remaining
141            // direct ProjectIndex consumers. Will be removed when all consumers
142            // are migrated to GraphProvider/PropertyGraph. (OPT-14/15 Phase 6)
143            let _ = idx.save();
144            (idx, content_cache)
145        });
146        let content_cache = if let Ok((_idx, cache)) = graph_result {
147            let mut s = state
148                .lock()
149                .unwrap_or_else(std::sync::PoisonError::into_inner);
150            finish_ok(&mut s.graph);
151            cache
152        } else {
153            let mut s = state
154                .lock()
155                .unwrap_or_else(std::sync::PoisonError::into_inner);
156            finish_err(&mut s.graph, "graph index build panicked".to_string());
157            HashMap::new()
158        };
159
160        // Phase 2: BM25 index — reuses content from graph scan when available
161        {
162            let mut s = state
163                .lock()
164                .unwrap_or_else(std::sync::PoisonError::into_inner);
165            start_component(&mut s.bm25);
166        }
167        let bm = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
168            let root_pb = Path::new(&root);
169            let idx = if content_cache.is_empty() {
170                BM25Index::load_or_build(root_pb)
171            } else {
172                BM25Index::build_with_content_hint(root_pb, &content_cache)
173            };
174            let _ = idx.save(root_pb);
175        }));
176        if let Ok(()) = bm {
177            let mut s = state
178                .lock()
179                .unwrap_or_else(std::sync::PoisonError::into_inner);
180            finish_ok(&mut s.bm25);
181        } else {
182            let mut s = state
183                .lock()
184                .unwrap_or_else(std::sync::PoisonError::into_inner);
185            finish_err(&mut s.bm25, "bm25 build panicked".to_string());
186        }
187
188        let mut s = state
189            .lock()
190            .unwrap_or_else(std::sync::PoisonError::into_inner);
191        s.worker_running = false;
192    });
193}
194
195pub fn try_load_graph_index(project_root: &str) -> Option<ProjectIndex> {
196    ProjectIndex::load(project_root).filter(|idx| !idx.files.is_empty())
197}
198
199pub fn try_load_bm25_index(project_root: &str) -> Option<BM25Index> {
200    BM25Index::load(Path::new(project_root))
201}
202
203/// Returns true if any project is currently building its indices.
204pub fn is_building() -> bool {
205    let map = registry()
206        .lock()
207        .unwrap_or_else(std::sync::PoisonError::into_inner);
208    map.values().any(|entry| {
209        let s = entry
210            .lock()
211            .unwrap_or_else(std::sync::PoisonError::into_inner);
212        matches!(s.bm25.state, State::Building) || matches!(s.graph.state, State::Building)
213    })
214}
215
216#[derive(Debug, Serialize)]
217struct ComponentStatus<'a> {
218    state: &'a str,
219    started_ms: Option<u64>,
220    finished_ms: Option<u64>,
221    duration_ms: Option<u64>,
222    last_error: Option<&'a str>,
223}
224
225fn component_status(c: &Component) -> ComponentStatus<'_> {
226    ComponentStatus {
227        state: match c.state {
228            State::Idle => "idle",
229            State::Building => "building",
230            State::Ready => "ready",
231            State::Failed => "failed",
232        },
233        started_ms: c.started_ms,
234        finished_ms: c.finished_ms,
235        duration_ms: c.duration_ms,
236        last_error: c.last_error.as_deref(),
237    }
238}
239
240#[derive(Debug, Serialize)]
241struct StatusResponse<'a> {
242    project_root: &'a str,
243    graph_index: ComponentStatus<'a>,
244    bm25_index: ComponentStatus<'a>,
245    disk: DiskStatusAll,
246}
247
248#[derive(Debug, Serialize, Default)]
249pub struct DiskStatus {
250    pub exists: bool,
251    pub size_bytes: Option<u64>,
252    pub file_count: Option<u64>,
253    pub modified_at: Option<String>,
254}
255
256#[derive(Debug, Serialize, Default)]
257pub struct DiskStatusAll {
258    pub graph_index: DiskStatus,
259    pub bm25_index: DiskStatus,
260    pub code_graph: DiskStatus,
261}
262
263fn disk_status_for_graph(project_root: &str) -> DiskStatus {
264    let Some(dir) = graph_index::ProjectIndex::index_dir(project_root) else {
265        return DiskStatus::default();
266    };
267    let zst = dir.join("index.json.zst");
268    let json = dir.join("index.json");
269    let path = if zst.exists() {
270        zst
271    } else if json.exists() {
272        json
273    } else {
274        return DiskStatus::default();
275    };
276    let meta = std::fs::metadata(&path).ok();
277    let file_count =
278        graph_index::ProjectIndex::load(project_root).map(|idx| idx.files.len() as u64);
279    DiskStatus {
280        exists: true,
281        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
282        file_count,
283        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
284    }
285}
286
287fn disk_status_for_bm25(project_root: &str) -> DiskStatus {
288    let root = Path::new(project_root);
289    let path = BM25Index::index_file_path(root);
290    if !path.exists() {
291        return DiskStatus::default();
292    }
293    let meta = std::fs::metadata(&path).ok();
294    DiskStatus {
295        exists: true,
296        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
297        file_count: None,
298        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
299    }
300}
301
302fn disk_status_for_code_graph(project_root: &str) -> DiskStatus {
303    let dir = crate::core::property_graph::graph_dir(project_root);
304    let db_path = dir.join("graph.db");
305    if !db_path.exists() {
306        return DiskStatus::default();
307    }
308    let meta = std::fs::metadata(&db_path).ok();
309    let node_count = crate::core::property_graph::CodeGraph::open(project_root)
310        .ok()
311        .and_then(|g| {
312            g.connection()
313                .query_row("SELECT count(*) FROM nodes", [], |r| r.get::<_, i64>(0))
314                .ok()
315                .map(|c| c as u64)
316        });
317    DiskStatus {
318        exists: true,
319        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
320        file_count: node_count,
321        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
322    }
323}
324
325fn format_time(t: SystemTime) -> String {
326    let secs = t.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
327    let dt = chrono::DateTime::from_timestamp(secs as i64, 0);
328    dt.map_or_else(
329        || format!("{secs}"),
330        |d| d.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
331    )
332}
333
334pub fn disk_status(project_root: &str) -> DiskStatusAll {
335    DiskStatusAll {
336        graph_index: disk_status_for_graph(project_root),
337        bm25_index: disk_status_for_bm25(project_root),
338        code_graph: disk_status_for_code_graph(project_root),
339    }
340}
341
342pub fn status_json(project_root: &str) -> String {
343    let state = entry_for(project_root);
344    let s = state
345        .lock()
346        .unwrap_or_else(std::sync::PoisonError::into_inner);
347    let res = StatusResponse {
348        project_root,
349        graph_index: component_status(&s.graph),
350        bm25_index: component_status(&s.bm25),
351        disk: disk_status(project_root),
352    };
353    serde_json::to_string(&res).unwrap_or_else(|_| "{}".to_string())
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn status_json_is_valid_json() {
        let s = status_json("/tmp");
        let _: serde_json::Value = serde_json::from_str(&s).unwrap();
    }

    /// The status payload exposes both components and all disk artifacts,
    /// each with the expected field types.
    #[test]
    fn status_json_has_expected_shape() {
        let s = status_json("/tmp");
        let v: serde_json::Value = serde_json::from_str(&s).unwrap();
        assert_eq!(v["project_root"], "/tmp");
        for component in ["graph_index", "bm25_index"] {
            let state = v[component]["state"].as_str().expect("state label");
            assert!(
                ["idle", "building", "ready", "failed"].contains(&state),
                "unexpected state label {state:?}"
            );
        }
        for artifact in ["graph_index", "bm25_index", "code_graph"] {
            assert!(v["disk"][artifact]["exists"].is_boolean());
        }
    }

    #[test]
    fn fresh_component_reports_idle() {
        let status = component_status(&Component::new());
        assert_eq!(status.state, "idle");
        assert!(status.started_ms.is_none());
        assert!(status.last_error.is_none());
    }

    #[test]
    fn finish_err_records_error_and_duration() {
        let mut c = Component::new();
        start_component(&mut c);
        finish_err(&mut c, "boom".to_string());
        assert_eq!(c.state, State::Failed);
        assert_eq!(c.last_error.as_deref(), Some("boom"));
        assert!(c.finished_ms.is_some());
        assert!(c.duration_ms.is_some());
    }
}