// codemem_engine/file_indexing.rs

1use crate::index::{self, IndexAndResolveResult, Indexer};
2use crate::patterns;
3use crate::CodememEngine;
4use codemem_core::{CodememError, DetectedPattern, GraphBackend, MemoryNode, VectorBackend};
5use std::collections::HashSet;
6use std::path::Path;
7use std::sync::atomic::Ordering;
8
9impl CodememEngine {
10    // ── Index Persistence ────────────────────────────────────────────────
11
12    /// Save the vector and BM25 indexes to disk if a db_path is configured.
13    /// Compacts the HNSW index if ghost entries exceed 20% of live entries.
14    /// Always clears the dirty flag so `flush_if_dirty()` won't double-save.
15    pub fn save_index(&self) {
16        if let Some(ref db_path) = self.db_path {
17            let idx_path = db_path.with_extension("idx");
18            if let Ok(mut vi) = self.lock_vector() {
19                // Compact HNSW if ghost entries exceed threshold
20                if vi.needs_compaction() {
21                    let ghost = vi.ghost_count();
22                    let live = vi.stats().count;
23                    tracing::info!(
24                        "HNSW ghost compaction: {ghost} ghosts / {live} live entries, rebuilding..."
25                    );
26                    if let Ok(embeddings) = self.storage.list_all_embeddings() {
27                        if let Err(e) = vi.rebuild_from_entries(&embeddings) {
28                            tracing::warn!("HNSW compaction failed: {e}");
29                        }
30                    }
31                }
32                if let Err(e) = vi.save(&idx_path) {
33                    tracing::warn!("Failed to save vector index: {e}");
34                }
35            }
36
37            // Persist BM25 index alongside the vector index
38            let bm25_path = db_path.with_extension("bm25");
39            if let Ok(bm25) = self.lock_bm25() {
40                if bm25.needs_save() {
41                    let data = bm25.serialize();
42                    let tmp_path = db_path.with_extension("bm25.tmp");
43                    if let Err(e) = std::fs::write(&tmp_path, &data)
44                        .and_then(|_| std::fs::rename(&tmp_path, &bm25_path))
45                    {
46                        tracing::warn!("Failed to save BM25 index: {e}");
47                    }
48                }
49            }
50        }
51        self.dirty.store(false, Ordering::Release);
52    }
53
54    /// Reload the in-memory graph from the database.
55    pub fn reload_graph(&self) -> Result<(), CodememError> {
56        let new_graph = codemem_storage::graph::GraphEngine::from_storage(&*self.storage)?;
57        let mut graph = self.lock_graph()?;
58        *graph = new_graph;
59        graph.recompute_centrality();
60        Ok(())
61    }
62
63    // ── A2: File Watcher Event Processing ───────────────────────────────
64
65    /// Process a single file watcher event by re-indexing changed/created files
66    /// or cleaning up deleted file nodes.
67    ///
68    /// Call this from a watcher event loop:
69    /// ```ignore
70    /// while let Ok(event) = watcher.receiver().recv() {
71    ///     engine.process_watch_event(&event, namespace, Some(root));
72    /// }
73    /// ```
74    pub fn process_watch_event(
75        &self,
76        event: &crate::watch::WatchEvent,
77        namespace: Option<&str>,
78        project_root: Option<&Path>,
79    ) -> Result<(), CodememError> {
80        match event {
81            crate::watch::WatchEvent::FileChanged(path)
82            | crate::watch::WatchEvent::FileCreated(path) => {
83                self.index_single_file(path, namespace, project_root)?;
84            }
85            crate::watch::WatchEvent::FileDeleted(path) => {
86                // Relativize the deleted path so the node ID matches what was indexed.
87                let rel = if let Some(root) = project_root {
88                    path.strip_prefix(root)
89                        .unwrap_or(path)
90                        .to_string_lossy()
91                        .to_string()
92                } else {
93                    path.to_string_lossy().to_string()
94                };
95                self.cleanup_file_nodes(&rel)?;
96            }
97        }
98        Ok(())
99    }
100
101    /// Index (or re-index) a single file: parse it, persist nodes/edges/embeddings,
102    /// and update the index cache.
103    ///
104    /// `project_root` is used to relativize the absolute `path` so node IDs are
105    /// portable. If `None`, the path is stored as-is (absolute).
106    fn index_single_file(
107        &self,
108        path: &Path,
109        namespace: Option<&str>,
110        project_root: Option<&Path>,
111    ) -> Result<(), CodememError> {
112        let content = std::fs::read(path)?;
113
114        let path_str = if let Some(root) = project_root {
115            path.strip_prefix(root)
116                .unwrap_or(path)
117                .to_string_lossy()
118                .to_string()
119        } else {
120            path.to_string_lossy().to_string()
121        };
122        let parser = index::CodeParser::new();
123
124        let parse_result = match parser.parse_file(&path_str, &content) {
125            Some(pr) => pr,
126            None => return Ok(()), // Unsupported file type or parse failure
127        };
128
129        // Build a minimal IndexAndResolveResult for this single file
130        let mut file_paths = HashSet::new();
131        file_paths.insert(parse_result.file_path.clone());
132
133        let mut resolver = index::ReferenceResolver::new();
134        resolver.add_symbols(&parse_result.symbols);
135        let edges = resolver.resolve_all(&parse_result.references);
136
137        let results = IndexAndResolveResult {
138            index: index::IndexResult {
139                files_scanned: 1,
140                files_parsed: 1,
141                files_skipped: 0,
142                total_symbols: parse_result.symbols.len(),
143                total_references: parse_result.references.len(),
144                total_chunks: parse_result.chunks.len(),
145                parse_results: Vec::new(),
146            },
147            symbols: parse_result.symbols,
148            references: parse_result.references,
149            chunks: parse_result.chunks,
150            file_paths,
151            edges,
152            root_path: project_root
153                .map(|p| p.to_path_buf())
154                .unwrap_or_else(|| path.to_path_buf()),
155        };
156
157        self.persist_index_results(&results, namespace)?;
158        Ok(())
159    }
160
161    // ── A3: File Deletion Cleanup ───────────────────────────────────────
162
163    /// Remove graph nodes, edges, and embeddings for a single deleted file.
164    fn cleanup_file_nodes(&self, file_path: &str) -> Result<(), CodememError> {
165        let file_node_id = format!("file:{file_path}");
166
167        // Remove all chunk nodes for this file
168        let chunk_prefix = format!("chunk:{file_path}:");
169        if let Err(e) = self.storage.delete_graph_nodes_by_prefix(&chunk_prefix) {
170            tracing::warn!("Failed to delete chunk nodes for {file_path}: {e}");
171        }
172
173        // Remove symbol nodes for this file by checking graph
174        let graph = self.lock_graph()?;
175        let sym_ids: Vec<String> = graph
176            .get_all_nodes()
177            .into_iter()
178            .filter(|n| {
179                n.id.starts_with("sym:")
180                    && n.payload.get("file_path").and_then(|v| v.as_str()) == Some(file_path)
181            })
182            .map(|n| n.id.clone())
183            .collect();
184        drop(graph);
185
186        for sym_id in &sym_ids {
187            if let Err(e) = self.storage.delete_graph_edges_for_node(sym_id) {
188                tracing::warn!("Failed to delete graph edges for {sym_id}: {e}");
189            }
190            if let Err(e) = self.storage.delete_graph_node(sym_id) {
191                tracing::warn!("Failed to delete graph node {sym_id}: {e}");
192            }
193            if let Err(e) = self.storage.delete_embedding(sym_id) {
194                tracing::warn!("Failed to delete embedding {sym_id}: {e}");
195            }
196        }
197
198        // Remove file node itself
199        if let Err(e) = self.storage.delete_graph_edges_for_node(&file_node_id) {
200            tracing::warn!("Failed to delete graph edges for {file_node_id}: {e}");
201        }
202        if let Err(e) = self.storage.delete_graph_node(&file_node_id) {
203            tracing::warn!("Failed to delete graph node {file_node_id}: {e}");
204        }
205
206        // Clean up in-memory graph
207        let mut graph = self.lock_graph()?;
208        for sym_id in &sym_ids {
209            if let Err(e) = graph.remove_node(sym_id) {
210                tracing::warn!("Failed to remove {sym_id} from in-memory graph: {e}");
211            }
212        }
213        // Remove chunk nodes from in-memory graph
214        let chunk_ids: Vec<String> = graph
215            .get_all_nodes()
216            .into_iter()
217            .filter(|n| n.id.starts_with(&format!("chunk:{file_path}:")))
218            .map(|n| n.id.clone())
219            .collect();
220        for chunk_id in &chunk_ids {
221            if let Err(e) = graph.remove_node(chunk_id) {
222                tracing::warn!("Failed to remove {chunk_id} from in-memory graph: {e}");
223            }
224        }
225        if let Err(e) = graph.remove_node(&file_node_id) {
226            tracing::warn!("Failed to remove {file_node_id} from in-memory graph: {e}");
227        }
228        drop(graph);
229
230        // Remove stale embeddings from vector index
231        let mut vec = self.lock_vector()?;
232        for sym_id in &sym_ids {
233            if let Err(e) = vec.remove(sym_id) {
234                tracing::warn!("Failed to remove {sym_id} from vector index: {e}");
235            }
236        }
237        for chunk_id in &chunk_ids {
238            if let Err(e) = vec.remove(chunk_id) {
239                tracing::warn!("Failed to remove {chunk_id} from vector index: {e}");
240            }
241        }
242        drop(vec);
243
244        self.save_index();
245        Ok(())
246    }
247
248    // ── A4: Unified Analyze Pipeline ────────────────────────────────────
249
250    /// Full analysis pipeline: index → persist → enrich → recompute centrality.
251    ///
252    /// This is the single entry point for all callers (CLI, MCP, API).
253    /// Supports incremental indexing via `ChangeDetector`, progress callbacks,
254    /// and returns comprehensive results.
255    pub fn analyze(&self, options: AnalyzeOptions<'_>) -> Result<AnalyzeResult, CodememError> {
256        let root = options.path;
257
258        // 1. Index
259        let mut indexer = match options.change_detector {
260            Some(cd) => Indexer::with_change_detector(cd),
261            None => Indexer::new(),
262        };
263        let resolved = indexer.index_and_resolve(root)?;
264
265        // 2. Persist (with or without progress callback)
266        let persist = if let Some(ref on_progress) = options.progress {
267            self.persist_index_results_with_progress(
268                &resolved,
269                Some(options.namespace),
270                |done, total| {
271                    on_progress(AnalyzeProgress::Embedding { done, total });
272                },
273            )?
274        } else {
275            self.persist_index_results(&resolved, Some(options.namespace))?
276        };
277
278        // Cache results for structural queries
279        {
280            if let Ok(mut cache) = self.lock_index_cache() {
281                *cache = Some(crate::IndexCache {
282                    symbols: resolved.symbols,
283                    chunks: resolved.chunks,
284                    root_path: root.to_string_lossy().to_string(),
285                });
286            }
287        }
288
289        // 3. Enrich
290        let path_str = root.to_str().unwrap_or("");
291        let enrichment = self.run_enrichments(
292            path_str,
293            &[],
294            options.git_days,
295            Some(options.namespace),
296            None,
297        );
298
299        // 4. Recompute centrality
300        self.lock_graph()?.recompute_centrality();
301
302        // 5. Compute summary stats
303        let top_nodes = self.find_important_nodes(10, 0.85).unwrap_or_default();
304        let community_count = self.louvain_communities(1.0).map(|c| c.len()).unwrap_or(0);
305
306        // 6. Save indexes
307        self.save_index();
308
309        // Save incremental state
310        indexer.change_detector().save_to_storage(self.storage())?;
311
312        Ok(AnalyzeResult {
313            files_parsed: resolved.index.files_parsed,
314            files_skipped: resolved.index.files_skipped,
315            symbols_found: resolved.index.total_symbols,
316            edges_resolved: persist.edges_resolved,
317            chunks_stored: persist.chunks_stored,
318            symbols_embedded: persist.symbols_embedded,
319            chunks_embedded: persist.chunks_embedded,
320            chunks_pruned: persist.chunks_pruned,
321            symbols_pruned: persist.symbols_pruned,
322            enrichment_results: enrichment.results,
323            total_insights: enrichment.total_insights,
324            top_nodes,
325            community_count,
326        })
327    }
328
329    // ── A8: Session Context Synthesis ───────────────────────────────────
330
331    /// Synthesize context for a new session: recent memories, pending analyses,
332    /// active patterns, and last session summary.
333    pub fn session_context(&self, namespace: Option<&str>) -> Result<SessionContext, CodememError> {
334        let now = chrono::Utc::now();
335        let cutoff_24h = now - chrono::Duration::hours(24);
336
337        // 1. Recent memories (last 24h)
338        let ids = match namespace {
339            Some(ns) => self.storage.list_memory_ids_for_namespace(ns)?,
340            None => self.storage.list_memory_ids()?,
341        };
342
343        let mut recent_memories = Vec::new();
344        let mut pending_analyses = Vec::new();
345
346        for id in ids.iter().rev().take(200) {
347            if let Ok(Some(m)) = self.storage.get_memory_no_touch(id) {
348                // Collect pending analyses
349                if m.tags.contains(&"pending-analysis".to_string()) {
350                    pending_analyses.push(m.clone());
351                }
352                // Collect recent memories from last 24h
353                if m.created_at >= cutoff_24h {
354                    recent_memories.push(m);
355                }
356                if recent_memories.len() >= 50 && pending_analyses.len() >= 10 {
357                    break;
358                }
359            }
360        }
361
362        // 2. Active patterns
363        let session_count = self.storage.session_count(namespace).unwrap_or(1).max(1);
364        let active_patterns = patterns::detect_patterns(
365            &*self.storage,
366            namespace,
367            2, // min_frequency
368            session_count,
369        )
370        .unwrap_or_default();
371
372        // 3. Last session summary
373        let last_session_summary = self
374            .storage
375            .list_sessions(namespace, 1)?
376            .into_iter()
377            .next()
378            .and_then(|s| s.summary);
379
380        Ok(SessionContext {
381            recent_memories,
382            pending_analyses,
383            active_patterns,
384            last_session_summary,
385        })
386    }
387}
388
389// ── Result Types ────────────────────────────────────────────────────────────
390
/// Options for the unified `analyze()` pipeline.
pub struct AnalyzeOptions<'a> {
    /// Root directory to index.
    pub path: &'a Path,
    /// Namespace the results are persisted under.
    pub namespace: &'a str,
    /// Window in days forwarded to the git-enrichment step.
    pub git_days: u64,
    /// Enables incremental indexing when provided; full re-index otherwise.
    pub change_detector: Option<index::incremental::ChangeDetector>,
    /// Optional callback invoked with [`AnalyzeProgress`] events.
    pub progress: Option<Box<dyn Fn(AnalyzeProgress) + Send + 'a>>,
}
399
/// Progress events emitted during analysis.
#[derive(Debug, Clone)]
pub enum AnalyzeProgress {
    /// Embedding progress: `done` of `total` items processed so far.
    Embedding { done: usize, total: usize },
}
405
/// Result of the unified `analyze()` pipeline.
#[derive(Debug)]
pub struct AnalyzeResult {
    /// Files successfully parsed during indexing.
    pub files_parsed: usize,
    /// Files skipped (e.g. unchanged under incremental indexing).
    pub files_skipped: usize,
    /// Total symbols discovered across parsed files.
    pub symbols_found: usize,
    /// Reference edges resolved and persisted.
    pub edges_resolved: usize,
    /// Code chunks written to storage.
    pub chunks_stored: usize,
    /// Symbols that received embeddings.
    pub symbols_embedded: usize,
    /// Chunks that received embeddings.
    pub chunks_embedded: usize,
    /// Stale chunks removed during persistence.
    pub chunks_pruned: usize,
    /// Stale symbols removed during persistence.
    pub symbols_pruned: usize,
    /// Raw JSON output of the enrichment passes.
    pub enrichment_results: serde_json::Value,
    /// Total insights produced by enrichment.
    pub total_insights: usize,
    /// Top-ranked nodes by importance (up to 10, damping 0.85).
    pub top_nodes: Vec<crate::graph_ops::RankedNode>,
    /// Number of Louvain communities detected in the graph.
    pub community_count: usize,
}
423
/// Session context synthesized at session start by `session_context()`.
#[derive(Debug)]
pub struct SessionContext {
    /// Memories created in the last 24 hours (newest-first scan, soft cap 50).
    pub recent_memories: Vec<MemoryNode>,
    /// Memories tagged `pending-analysis` awaiting code-mapper review.
    pub pending_analyses: Vec<MemoryNode>,
    /// Cross-session patterns detected with sufficient frequency.
    pub active_patterns: Vec<DetectedPattern>,
    /// Summary text from the most recent session (if any).
    pub last_session_summary: Option<String>,
}