Skip to main content

lean_ctx/core/
index_orchestrator.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::{Arc, Mutex, OnceLock};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::Serialize;
7
8use crate::core::bm25_index::BM25Index;
9use crate::core::graph_index::{self, ProjectIndex};
10
/// Lifecycle phase of a single index component.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// No build has been started yet (the initial phase).
    Idle,
    /// A build has been started and has not finished.
    Building,
    /// The latest build finished without panicking.
    Ready,
    /// The latest build ended with an error.
    Failed,
}
18
19#[derive(Debug, Clone)]
20struct Component {
21    state: State,
22    started_ms: Option<u64>,
23    finished_ms: Option<u64>,
24    duration_ms: Option<u64>,
25    last_error: Option<String>,
26}
27
28impl Component {
29    fn new() -> Self {
30        Self {
31            state: State::Idle,
32            started_ms: None,
33            finished_ms: None,
34            duration_ms: None,
35            last_error: None,
36        }
37    }
38}
39
40#[derive(Debug)]
41struct ProjectBuild {
42    worker_running: bool,
43    graph: Component,
44    bm25: Component,
45}
46
47impl ProjectBuild {
48    fn new() -> Self {
49        Self {
50            worker_running: false,
51            graph: Component::new(),
52            bm25: Component::new(),
53        }
54    }
55}
56
57// Lock ordering (see rust/LOCK_ORDERING.md):
58//   L1 = REGISTRY outer Mutex  (the HashMap guard)
59//   L2 = per-project Arc<Mutex<ProjectBuild>>  (inner guard)
60//
61// Invariant: L1 must NEVER be held while locking L2.
62// `entry_for()` enforces this by cloning the Arc and dropping L1 before
63// the caller acquires L2.
64static REGISTRY: OnceLock<Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>>> = OnceLock::new();
65
66fn registry() -> &'static Mutex<HashMap<String, Arc<Mutex<ProjectBuild>>>> {
67    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
68}
69
70fn entry_for(project_root: &str) -> Arc<Mutex<ProjectBuild>> {
71    let mut map = registry()
72        .lock()
73        .unwrap_or_else(std::sync::PoisonError::into_inner);
74    map.entry(project_root.to_string())
75        .or_insert_with(|| Arc::new(Mutex::new(ProjectBuild::new())))
76        .clone()
77}
78
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
85
86fn start_component(c: &mut Component) {
87    c.state = State::Building;
88    c.started_ms = Some(now_ms());
89    c.finished_ms = None;
90    c.duration_ms = None;
91    c.last_error = None;
92}
93
94fn finish_ok(c: &mut Component) {
95    c.state = State::Ready;
96    let end = now_ms();
97    c.finished_ms = Some(end);
98    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
99}
100
101fn finish_err(c: &mut Component, e: String) {
102    c.state = State::Failed;
103    let end = now_ms();
104    c.finished_ms = Some(end);
105    c.duration_ms = c.started_ms.map(|s| end.saturating_sub(s));
106    c.last_error = Some(e);
107}
108
109pub fn ensure_all_background(project_root: &str) {
110    let state = entry_for(project_root);
111    let should_spawn = {
112        let mut s = state
113            .lock()
114            .unwrap_or_else(std::sync::PoisonError::into_inner);
115        if s.worker_running {
116            false
117        } else {
118            s.worker_running = true;
119            true
120        }
121    };
122
123    if !should_spawn {
124        return;
125    }
126
127    let root = project_root.to_string();
128    std::thread::spawn(move || {
129        let state = entry_for(&root);
130
131        {
132            let mut s = state
133                .lock()
134                .unwrap_or_else(std::sync::PoisonError::into_inner);
135            start_component(&mut s.graph);
136        }
137        let idx = std::panic::catch_unwind(|| {
138            let idx = graph_index::load_or_build(&root);
139            let _ = idx.save();
140            idx
141        });
142        if let Ok(_i) = idx {
143            let mut s = state
144                .lock()
145                .unwrap_or_else(std::sync::PoisonError::into_inner);
146            finish_ok(&mut s.graph);
147        } else {
148            let mut s = state
149                .lock()
150                .unwrap_or_else(std::sync::PoisonError::into_inner);
151            finish_err(&mut s.graph, "graph index build panicked".to_string());
152        }
153
154        {
155            let mut s = state
156                .lock()
157                .unwrap_or_else(std::sync::PoisonError::into_inner);
158            start_component(&mut s.bm25);
159        }
160        let bm = std::panic::catch_unwind(|| {
161            let root_pb = Path::new(&root);
162            let idx = BM25Index::load_or_build(root_pb);
163            let _ = idx.save(root_pb);
164        });
165        if let Ok(()) = bm {
166            let mut s = state
167                .lock()
168                .unwrap_or_else(std::sync::PoisonError::into_inner);
169            finish_ok(&mut s.bm25);
170        } else {
171            let mut s = state
172                .lock()
173                .unwrap_or_else(std::sync::PoisonError::into_inner);
174            finish_err(&mut s.bm25, "bm25 build panicked".to_string());
175        }
176
177        let mut s = state
178            .lock()
179            .unwrap_or_else(std::sync::PoisonError::into_inner);
180        s.worker_running = false;
181    });
182}
183
184pub fn try_load_graph_index(project_root: &str) -> Option<ProjectIndex> {
185    ProjectIndex::load(project_root).filter(|idx| !idx.files.is_empty())
186}
187
188pub fn try_load_bm25_index(project_root: &str) -> Option<BM25Index> {
189    BM25Index::load(Path::new(project_root))
190}
191
192#[derive(Debug, Serialize)]
193struct ComponentStatus<'a> {
194    state: &'a str,
195    started_ms: Option<u64>,
196    finished_ms: Option<u64>,
197    duration_ms: Option<u64>,
198    last_error: Option<&'a str>,
199}
200
201fn component_status(c: &Component) -> ComponentStatus<'_> {
202    ComponentStatus {
203        state: match c.state {
204            State::Idle => "idle",
205            State::Building => "building",
206            State::Ready => "ready",
207            State::Failed => "failed",
208        },
209        started_ms: c.started_ms,
210        finished_ms: c.finished_ms,
211        duration_ms: c.duration_ms,
212        last_error: c.last_error.as_deref(),
213    }
214}
215
216#[derive(Debug, Serialize)]
217struct StatusResponse<'a> {
218    project_root: &'a str,
219    graph_index: ComponentStatus<'a>,
220    bm25_index: ComponentStatus<'a>,
221    disk: DiskStatusAll,
222}
223
224#[derive(Debug, Serialize, Default)]
225pub struct DiskStatus {
226    pub exists: bool,
227    pub size_bytes: Option<u64>,
228    pub file_count: Option<u64>,
229    pub modified_at: Option<String>,
230}
231
232#[derive(Debug, Serialize, Default)]
233pub struct DiskStatusAll {
234    pub graph_index: DiskStatus,
235    pub bm25_index: DiskStatus,
236    pub code_graph: DiskStatus,
237}
238
239fn disk_status_for_graph(project_root: &str) -> DiskStatus {
240    let Some(dir) = graph_index::ProjectIndex::index_dir(project_root) else {
241        return DiskStatus::default();
242    };
243    let zst = dir.join("index.json.zst");
244    let json = dir.join("index.json");
245    let path = if zst.exists() {
246        zst
247    } else if json.exists() {
248        json
249    } else {
250        return DiskStatus::default();
251    };
252    let meta = std::fs::metadata(&path).ok();
253    let file_count =
254        graph_index::ProjectIndex::load(project_root).map(|idx| idx.files.len() as u64);
255    DiskStatus {
256        exists: true,
257        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
258        file_count,
259        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
260    }
261}
262
263fn disk_status_for_bm25(project_root: &str) -> DiskStatus {
264    let root = Path::new(project_root);
265    let path = BM25Index::index_file_path(root);
266    if !path.exists() {
267        return DiskStatus::default();
268    }
269    let meta = std::fs::metadata(&path).ok();
270    DiskStatus {
271        exists: true,
272        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
273        file_count: None,
274        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
275    }
276}
277
278fn disk_status_for_code_graph(project_root: &str) -> DiskStatus {
279    let dir = crate::core::property_graph::graph_dir(project_root);
280    let db_path = dir.join("graph.db");
281    if !db_path.exists() {
282        return DiskStatus::default();
283    }
284    let meta = std::fs::metadata(&db_path).ok();
285    let node_count = crate::core::property_graph::CodeGraph::open(project_root)
286        .ok()
287        .and_then(|g| {
288            g.connection()
289                .query_row("SELECT count(*) FROM nodes", [], |r| r.get::<_, i64>(0))
290                .ok()
291                .map(|c| c as u64)
292        });
293    DiskStatus {
294        exists: true,
295        size_bytes: meta.as_ref().map(std::fs::Metadata::len),
296        file_count: node_count,
297        modified_at: meta.and_then(|m| m.modified().ok()).map(format_time),
298    }
299}
300
301fn format_time(t: SystemTime) -> String {
302    let secs = t.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
303    let dt = chrono::DateTime::from_timestamp(secs as i64, 0);
304    dt.map_or_else(
305        || format!("{secs}"),
306        |d| d.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
307    )
308}
309
310pub fn disk_status(project_root: &str) -> DiskStatusAll {
311    DiskStatusAll {
312        graph_index: disk_status_for_graph(project_root),
313        bm25_index: disk_status_for_bm25(project_root),
314        code_graph: disk_status_for_code_graph(project_root),
315    }
316}
317
318pub fn status_json(project_root: &str) -> String {
319    let state = entry_for(project_root);
320    let s = state
321        .lock()
322        .unwrap_or_else(std::sync::PoisonError::into_inner);
323    let res = StatusResponse {
324        project_root,
325        graph_index: component_status(&s.graph),
326        bm25_index: component_status(&s.bm25),
327        disk: disk_status(project_root),
328    };
329    serde_json::to_string(&res).unwrap_or_else(|_| "{}".to_string())
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// `status_json` must produce parseable JSON with the expected top-level
    /// shape.
    #[test]
    fn status_json_is_valid_json() {
        let s = status_json("/tmp");
        let v: serde_json::Value = serde_json::from_str(&s).unwrap();

        // The `"{}"` fallback is valid JSON too, so parsing alone cannot detect
        // a serialization failure; check the real payload shape as well.
        assert_eq!(v["project_root"].as_str(), Some("/tmp"));
        for key in ["graph_index", "bm25_index"] {
            assert!(v[key]["state"].is_string(), "missing state for {key}");
        }
        assert!(v["disk"].is_object());
    }
}
341}