Skip to main content

codemem_engine/
graph_ops.rs

1//! Engine facade methods for graph algorithms.
2//!
3//! These wrap lock acquisition + graph algorithm calls so the MCP/API
4//! transport layers don't interact with the graph mutex directly.
5
6use crate::CodememEngine;
7use chrono::{DateTime, Utc};
8use codemem_core::{CodememError, Edge, GraphNode, NodeKind, RelationshipType};
9use std::collections::{HashMap, HashSet};
10
11// ── Result Types ─────────────────────────────────────────────────────────────
12
/// A node with its PageRank score, as returned by
/// [`CodememEngine::find_important_nodes`].
#[derive(Debug, Clone)]
pub struct RankedNode {
    /// Graph node id.
    pub id: String,
    /// PageRank score computed for this node.
    pub score: f64,
    /// The node's kind (its `Display` form), or `None` when the node could not
    /// be looked up in the graph.
    pub kind: Option<String>,
    /// The node's label, or `None` when the node could not be looked up.
    pub label: Option<String>,
}
21
/// In-memory graph statistics snapshot, copied field-for-field from the graph
/// backend's `stats()` result by [`CodememEngine::graph_stats`].
#[derive(Debug, Clone)]
pub struct GraphStats {
    /// Number of nodes in the in-memory graph.
    pub node_count: usize,
    /// Number of edges in the in-memory graph.
    pub edge_count: usize,
    /// Per-kind node counts, as reported by the backend.
    pub node_kind_counts: HashMap<String, usize>,
    /// Per-relationship-type edge counts, as reported by the backend.
    pub relationship_type_counts: HashMap<String, usize>,
}
30
/// A commit entry in a temporal change report, produced by
/// [`CodememEngine::what_changed`] and [`CodememEngine::symbol_history`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct ChangeEntry {
    /// Graph node id of the commit.
    pub commit_id: String,
    /// The commit node's `hash` payload value (empty if absent or not a string).
    pub hash: String,
    /// The commit node's `author` payload value (empty if absent or not a string).
    pub author: String,
    /// The commit node's `date` payload value, copied verbatim (empty if absent
    /// or not a string). The format is not validated here.
    pub date: String,
    /// The commit node's `subject` payload value (empty if absent or not a string).
    pub subject: String,
    /// Sources of the commit's `ModifiedBy` edges that start with `sym:`,
    /// with that prefix stripped.
    pub changed_symbols: Vec<String>,
    /// Sources of the commit's `ModifiedBy` edges that start with `file:`,
    /// with that prefix stripped.
    pub changed_files: Vec<String>,
}
42
/// Snapshot of the graph at a point in time, produced by
/// [`CodememEngine::graph_at_time`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct TemporalSnapshot {
    /// The snapshot instant, formatted as RFC 3339.
    pub at: String,
    /// Number of nodes whose validity window contains `at`.
    pub live_nodes: usize,
    /// Number of stored edges whose validity window contains `at`.
    pub live_edges: usize,
    /// Live-node counts keyed by the node kind's `Display` string.
    pub node_kind_counts: HashMap<String, usize>,
}
51
/// A file that may be stale: not modified recently but still referenced.
/// Produced by [`CodememEngine::find_stale_files`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct StaleFile {
    /// File node id with the `file:` prefix stripped.
    pub file_path: String,
    /// The file node's centrality score.
    pub centrality: f64,
    /// RFC 3339 timestamp of the newest `ModifiedBy` edge (`valid_from`) from
    /// this file. `find_stale_files` only reports files that have such an
    /// edge, so it always fills this in.
    pub last_modified: Option<String>,
    /// Number of stored edges whose destination is this file.
    pub incoming_edges: usize,
}
60
/// Report on architectural drift between two instants, produced by
/// [`CodememEngine::detect_drift`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct DriftReport {
    /// `"<from> to <to>"`, both formatted as RFC 3339.
    pub period: String,
    /// Edges whose `valid_from` falls in the period and whose endpoints lie in
    /// different top-level path segments (`ModifiedBy`/`PartOf` excluded).
    pub new_cross_module_edges: usize,
    /// Number of files live at the start of the period but not at the end.
    pub removed_files: usize,
    /// Number of files live at the end of the period but not at the start.
    pub added_files: usize,
    /// Up to 10 file paths (without `file:`) with the most `ModifiedBy` edges
    /// in the period, most-modified first.
    pub hotspot_files: Vec<String>,
    /// Up to 10 `(a, b, count)` triples for `CoChanged` pairs in the period,
    /// with `a < b` and `count` the summed `commit_count` edge property
    /// (1 per edge when missing); highest count first.
    pub coupling_increases: Vec<(String, String, usize)>,
}
71
72// ── Engine Methods ───────────────────────────────────────────────────────────
73
74impl CodememEngine {
75    /// BFS or DFS traversal from a start node, with optional kind/relationship filters.
76    /// When `at_time` is provided, nodes/edges outside their validity window are excluded.
77    ///
78    /// Note: temporal filtering is post-hoc — the traversal runs on the full graph,
79    /// then expired nodes are removed. This means depth counts may include hops through
80    /// expired nodes. For precise temporal traversals, use `graph_at_time` instead.
81    pub fn graph_traverse(
82        &self,
83        start_id: &str,
84        depth: usize,
85        algorithm: &str,
86        exclude_kinds: &[NodeKind],
87        include_relationships: Option<&[RelationshipType]>,
88        at_time: Option<DateTime<Utc>>,
89    ) -> Result<Vec<GraphNode>, CodememError> {
90        let graph = self.lock_graph()?;
91        let has_filters = !exclude_kinds.is_empty() || include_relationships.is_some();
92
93        let mut nodes = if has_filters {
94            match algorithm {
95                "bfs" => graph.bfs_filtered(start_id, depth, exclude_kinds, include_relationships),
96                "dfs" => graph.dfs_filtered(start_id, depth, exclude_kinds, include_relationships),
97                _ => Err(CodememError::InvalidInput(format!(
98                    "Unknown algorithm: {algorithm}"
99                ))),
100            }
101        } else {
102            match algorithm {
103                "bfs" => graph.bfs(start_id, depth),
104                "dfs" => graph.dfs(start_id, depth),
105                _ => Err(CodememError::InvalidInput(format!(
106                    "Unknown algorithm: {algorithm}"
107                ))),
108            }
109        }?;
110
111        // Filter by temporal validity if at_time is provided
112        if let Some(at) = at_time {
113            nodes.retain(|n| {
114                n.valid_from.is_none_or(|vf| vf <= at) && n.valid_to.is_none_or(|vt| vt > at)
115            });
116        }
117
118        Ok(nodes)
119    }
120
121    /// Get in-memory graph statistics.
122    pub fn graph_stats(&self) -> Result<GraphStats, CodememError> {
123        let graph = self.lock_graph()?;
124        let stats = graph.stats();
125        Ok(GraphStats {
126            node_count: stats.node_count,
127            edge_count: stats.edge_count,
128            node_kind_counts: stats.node_kind_counts,
129            relationship_type_counts: stats.relationship_type_counts,
130        })
131    }
132
133    /// Get all edges for a node.
134    pub fn get_node_edges(&self, node_id: &str) -> Result<Vec<Edge>, CodememError> {
135        let graph = self.lock_graph()?;
136        graph.get_edges(node_id)
137    }
138
139    /// Run Louvain community detection at the given resolution.
140    pub fn louvain_communities(&self, resolution: f64) -> Result<Vec<Vec<String>>, CodememError> {
141        let graph = self.lock_graph()?;
142        Ok(graph.louvain_communities(resolution))
143    }
144
145    /// Compute PageRank and return the top-k nodes with their scores,
146    /// kinds, and labels.
147    pub fn find_important_nodes(
148        &self,
149        top_k: usize,
150        damping: f64,
151    ) -> Result<Vec<RankedNode>, CodememError> {
152        let graph = self.lock_graph()?;
153        let scores = graph.pagerank(damping, 100, 1e-6);
154
155        let mut sorted: Vec<(String, f64)> = scores.into_iter().collect();
156        sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
157        sorted.truncate(top_k);
158
159        let results = sorted
160            .into_iter()
161            .map(|(id, score)| {
162                let node = graph.get_node(&id).ok().flatten();
163                RankedNode {
164                    id,
165                    score,
166                    kind: node.as_ref().map(|n| n.kind.to_string()),
167                    label: node.as_ref().map(|n| n.label.clone()),
168                }
169            })
170            .collect();
171
172        Ok(results)
173    }
174
175    // ── Temporal Queries ─────────────────────────────────────────────────
176
177    /// Return what changed between two timestamps: commits, their files, and symbols.
178    pub fn what_changed(
179        &self,
180        from: DateTime<Utc>,
181        to: DateTime<Utc>,
182        namespace: Option<&str>,
183    ) -> Result<Vec<ChangeEntry>, CodememError> {
184        if from > to {
185            return Err(CodememError::InvalidInput(
186                "'from' must be before 'to'".into(),
187            ));
188        }
189        let all_nodes = {
190            let graph = self.lock_graph()?;
191            graph.get_all_nodes()
192        };
193        let all_edges = self.storage.all_graph_edges()?;
194
195        // Find commit nodes in the time range
196        let commits: Vec<&GraphNode> = all_nodes
197            .iter()
198            .filter(|n| {
199                n.kind == NodeKind::Commit
200                    && !n.payload.contains_key("sentinel")
201                    && n.valid_from.is_some_and(|vf| vf >= from && vf <= to)
202                    && (namespace.is_none() || n.namespace.as_deref() == namespace)
203            })
204            .collect();
205
206        // Index ModifiedBy edges by dst (commit ID) for O(1) lookup
207        let mut edges_by_dst: HashMap<&str, Vec<&Edge>> = HashMap::new();
208        for edge in &all_edges {
209            if edge.relationship == RelationshipType::ModifiedBy {
210                edges_by_dst.entry(&edge.dst).or_default().push(edge);
211            }
212        }
213
214        let mut entries = Vec::new();
215        for commit in &commits {
216            let mut changed_files = Vec::new();
217            let mut changed_symbols = Vec::new();
218
219            if let Some(commit_edges) = edges_by_dst.get(commit.id.as_str()) {
220                for edge in commit_edges {
221                    if edge.src.starts_with("file:") {
222                        changed_files.push(
223                            edge.src
224                                .strip_prefix("file:")
225                                .unwrap_or(&edge.src)
226                                .to_string(),
227                        );
228                    } else if edge.src.starts_with("sym:") {
229                        changed_symbols.push(
230                            edge.src
231                                .strip_prefix("sym:")
232                                .unwrap_or(&edge.src)
233                                .to_string(),
234                        );
235                    }
236                }
237            }
238
239            entries.push(ChangeEntry {
240                commit_id: commit.id.clone(),
241                hash: commit
242                    .payload
243                    .get("hash")
244                    .and_then(|v| v.as_str())
245                    .unwrap_or("")
246                    .to_string(),
247                author: commit
248                    .payload
249                    .get("author")
250                    .and_then(|v| v.as_str())
251                    .unwrap_or("")
252                    .to_string(),
253                date: commit
254                    .payload
255                    .get("date")
256                    .and_then(|v| v.as_str())
257                    .unwrap_or("")
258                    .to_string(),
259                subject: commit
260                    .payload
261                    .get("subject")
262                    .and_then(|v| v.as_str())
263                    .unwrap_or("")
264                    .to_string(),
265                changed_symbols,
266                changed_files,
267            });
268        }
269
270        // Sort by date descending (newest first)
271        entries.sort_by(|a, b| b.date.cmp(&a.date));
272        Ok(entries)
273    }
274
275    /// Snapshot of the graph at a point in time.
276    ///
277    /// Counts nodes/edges that were valid at `at`: `valid_from <= at` and
278    /// (`valid_to` is None or `valid_to > at`). Nodes/edges with no temporal
279    /// fields are always counted (they predate the temporal layer).
280    pub fn graph_at_time(&self, at: DateTime<Utc>) -> Result<TemporalSnapshot, CodememError> {
281        let all_nodes = {
282            let graph = self.lock_graph()?;
283            graph.get_all_nodes()
284        };
285        let all_edges = self.storage.all_graph_edges()?;
286
287        let is_node_live = |n: &GraphNode| -> bool {
288            let after_start = n.valid_from.is_none_or(|vf| vf <= at);
289            let before_end = n.valid_to.is_none_or(|vt| vt > at);
290            after_start && before_end
291        };
292
293        let is_edge_live = |e: &Edge| -> bool {
294            let after_start = e.valid_from.is_none_or(|vf| vf <= at);
295            let before_end = e.valid_to.is_none_or(|vt| vt > at);
296            after_start && before_end
297        };
298
299        let live_nodes: Vec<&GraphNode> = all_nodes.iter().filter(|n| is_node_live(n)).collect();
300        let live_edges = all_edges.iter().filter(|e| is_edge_live(e)).count();
301
302        let mut kind_counts: HashMap<String, usize> = HashMap::new();
303        for node in &live_nodes {
304            *kind_counts.entry(node.kind.to_string()).or_default() += 1;
305        }
306
307        Ok(TemporalSnapshot {
308            at: at.to_rfc3339(),
309            live_nodes: live_nodes.len(),
310            live_edges,
311            node_kind_counts: kind_counts,
312        })
313    }
314
315    /// Find files that haven't been modified recently but have high centrality
316    /// or many incoming edges (potential stale code).
317    pub fn find_stale_files(
318        &self,
319        namespace: Option<&str>,
320        stale_days: u64,
321    ) -> Result<Vec<StaleFile>, CodememError> {
322        let all_nodes = {
323            let graph = self.lock_graph()?;
324            graph.get_all_nodes()
325        };
326        let all_edges = self.storage.all_graph_edges()?;
327        let stale_days = stale_days.min(3650); // Cap at 10 years
328        let cutoff = Utc::now() - chrono::Duration::days(stale_days as i64);
329
330        // Find the latest ModifiedBy edge date for each file
331        let mut file_last_modified: HashMap<&str, DateTime<Utc>> = HashMap::new();
332        for edge in &all_edges {
333            if edge.relationship == RelationshipType::ModifiedBy && edge.src.starts_with("file:") {
334                if let Some(vf) = edge.valid_from {
335                    let entry = file_last_modified.entry(&edge.src).or_insert(vf);
336                    if vf > *entry {
337                        *entry = vf;
338                    }
339                }
340            }
341        }
342
343        // Count incoming edges per file (how many things depend on it)
344        let mut incoming: HashMap<&str, usize> = HashMap::new();
345        for edge in &all_edges {
346            if edge.dst.starts_with("file:") {
347                *incoming.entry(&edge.dst).or_default() += 1;
348            }
349        }
350
351        let mut stale = Vec::new();
352        for node in &all_nodes {
353            if node.kind != NodeKind::File {
354                continue;
355            }
356            if node.valid_to.is_some() {
357                continue; // Already deleted
358            }
359            if namespace.is_some() && node.namespace.as_deref() != namespace {
360                continue;
361            }
362
363            let last_mod = file_last_modified.get(node.id.as_str()).copied();
364            // Only flag as stale if we have temporal data — files with no
365            // ModifiedBy edges haven't been through temporal ingestion yet,
366            // so we can't determine staleness.
367            let is_stale = last_mod.is_some_and(|lm| lm < cutoff);
368
369            if is_stale
370                && (node.centrality > 0.0
371                    || incoming.get(node.id.as_str()).copied().unwrap_or(0) > 0)
372            {
373                let file_path = node
374                    .id
375                    .strip_prefix("file:")
376                    .unwrap_or(&node.id)
377                    .to_string();
378                stale.push(StaleFile {
379                    file_path,
380                    centrality: node.centrality,
381                    last_modified: last_mod.map(|d| d.to_rfc3339()),
382                    incoming_edges: incoming.get(node.id.as_str()).copied().unwrap_or(0),
383                });
384            }
385        }
386
387        // Sort by centrality descending (most important stale files first)
388        stale.sort_by(|a, b| {
389            b.centrality
390                .partial_cmp(&a.centrality)
391                .unwrap_or(std::cmp::Ordering::Equal)
392        });
393        Ok(stale)
394    }
395
396    /// Detect architectural drift between two time periods.
397    ///
398    /// Compares the graph at `from` vs `to` to find: new cross-module edges,
399    /// files added/removed, hotspot files (most commits), and coupling increases
400    /// (module pairs with growing CoChanged counts).
401    pub fn detect_drift(
402        &self,
403        from: DateTime<Utc>,
404        to: DateTime<Utc>,
405        namespace: Option<&str>,
406    ) -> Result<DriftReport, CodememError> {
407        if from > to {
408            return Err(CodememError::InvalidInput(
409                "'from' must be before 'to'".into(),
410            ));
411        }
412        let all_nodes = {
413            let graph = self.lock_graph()?;
414            graph.get_all_nodes()
415        };
416        let all_edges = self.storage.all_graph_edges()?;
417
418        // Files alive at `from` vs `to`
419        let files_at = |at: DateTime<Utc>| -> HashSet<String> {
420            all_nodes
421                .iter()
422                .filter(|n| {
423                    n.kind == NodeKind::File
424                        && n.valid_from.is_none_or(|vf| vf <= at)
425                        && n.valid_to.is_none_or(|vt| vt > at)
426                        && (namespace.is_none() || n.namespace.as_deref() == namespace)
427                })
428                .map(|n| n.id.clone())
429                .collect()
430        };
431
432        let files_before = files_at(from);
433        let files_after = files_at(to);
434        let added_files = files_after.difference(&files_before).count();
435        let removed_files = files_before.difference(&files_after).count();
436
437        // Count cross-module edges added in the period
438        // A "cross-module" edge connects nodes in different top-level directories
439        let module_of = |id: &str| -> String {
440            let path = id
441                .strip_prefix("file:")
442                .or_else(|| id.strip_prefix("sym:"))
443                .unwrap_or(id);
444            path.split('/').next().unwrap_or("root").to_string()
445        };
446
447        let new_cross_module = all_edges
448            .iter()
449            .filter(|e| {
450                e.valid_from.is_some_and(|vf| vf >= from && vf <= to)
451                    && module_of(&e.src) != module_of(&e.dst)
452                    && !matches!(
453                        e.relationship,
454                        RelationshipType::ModifiedBy | RelationshipType::PartOf
455                    )
456            })
457            .count();
458
459        // Hotspot files: most ModifiedBy edges in the period
460        let mut file_commit_count: HashMap<String, usize> = HashMap::new();
461        for edge in &all_edges {
462            if edge.relationship == RelationshipType::ModifiedBy
463                && edge.src.starts_with("file:")
464                && edge.valid_from.is_some_and(|vf| vf >= from && vf <= to)
465            {
466                *file_commit_count
467                    .entry(
468                        edge.src
469                            .strip_prefix("file:")
470                            .unwrap_or(&edge.src)
471                            .to_string(),
472                    )
473                    .or_default() += 1;
474            }
475        }
476        let mut hotspots: Vec<(String, usize)> = file_commit_count.into_iter().collect();
477        hotspots.sort_by(|a, b| b.1.cmp(&a.1));
478        let hotspot_files: Vec<String> = hotspots.into_iter().take(10).map(|(f, _)| f).collect();
479
480        // Coupling increases: CoChanged pairs with growing counts.
481        // CoChanged edges from enrich_git_history use created_at (not valid_from),
482        // so check both fields.
483        let mut coupling: HashMap<(String, String), usize> = HashMap::new();
484        for edge in &all_edges {
485            if edge.relationship == RelationshipType::CoChanged
486                && (edge.valid_from.is_some_and(|vf| vf >= from && vf <= to)
487                    || (edge.valid_from.is_none()
488                        && edge.created_at >= from
489                        && edge.created_at <= to))
490            {
491                let pair = if edge.src < edge.dst {
492                    (edge.src.clone(), edge.dst.clone())
493                } else {
494                    (edge.dst.clone(), edge.src.clone())
495                };
496                let count = edge
497                    .properties
498                    .get("commit_count")
499                    .and_then(|v| v.as_u64())
500                    .unwrap_or(1) as usize;
501                *coupling.entry(pair).or_default() += count;
502            }
503        }
504        let mut coupling_vec: Vec<(String, String, usize)> =
505            coupling.into_iter().map(|((a, b), c)| (a, b, c)).collect();
506        coupling_vec.sort_by(|a, b| b.2.cmp(&a.2));
507        coupling_vec.truncate(10);
508
509        Ok(DriftReport {
510            period: format!("{} to {}", from.to_rfc3339(), to.to_rfc3339()),
511            new_cross_module_edges: new_cross_module,
512            removed_files,
513            added_files,
514            hotspot_files,
515            coupling_increases: coupling_vec,
516        })
517    }
518
519    /// Get the history of commits that modified a specific symbol or file.
520    pub fn symbol_history(&self, node_id: &str) -> Result<Vec<ChangeEntry>, CodememError> {
521        // Phase 1: Find commit IDs via targeted edge query (not full scan)
522        let node_edges = self.storage.get_edges_for_node(node_id)?;
523        let commit_ids: HashSet<String> = node_edges
524            .iter()
525            .filter(|e| e.src == node_id && e.relationship == RelationshipType::ModifiedBy)
526            .map(|e| e.dst.clone())
527            .collect();
528
529        if commit_ids.is_empty() {
530            return Ok(Vec::new());
531        }
532
533        // Batch-load commit nodes from graph (single lock acquisition)
534        let commit_nodes: Vec<GraphNode> = {
535            let graph = self.lock_graph()?;
536            graph
537                .get_all_nodes()
538                .into_iter()
539                .filter(|n| commit_ids.contains(&n.id))
540                .collect()
541        };
542
543        // Phase 2: For each commit, find sibling files/symbols via targeted query
544        let mut entries = Vec::new();
545        for node in &commit_nodes {
546            let commit_edges = self.storage.get_edges_for_node(&node.id)?;
547            let mut changed_files = Vec::new();
548            let mut changed_symbols = Vec::new();
549
550            for edge in &commit_edges {
551                if edge.relationship == RelationshipType::ModifiedBy && edge.dst == node.id {
552                    if edge.src.starts_with("file:") {
553                        changed_files.push(
554                            edge.src
555                                .strip_prefix("file:")
556                                .unwrap_or(&edge.src)
557                                .to_string(),
558                        );
559                    } else if edge.src.starts_with("sym:") {
560                        changed_symbols.push(
561                            edge.src
562                                .strip_prefix("sym:")
563                                .unwrap_or(&edge.src)
564                                .to_string(),
565                        );
566                    }
567                }
568            }
569
570            entries.push(ChangeEntry {
571                commit_id: node.id.clone(),
572                hash: node
573                    .payload
574                    .get("hash")
575                    .and_then(|v| v.as_str())
576                    .unwrap_or("")
577                    .to_string(),
578                author: node
579                    .payload
580                    .get("author")
581                    .and_then(|v| v.as_str())
582                    .unwrap_or("")
583                    .to_string(),
584                date: node
585                    .payload
586                    .get("date")
587                    .and_then(|v| v.as_str())
588                    .unwrap_or("")
589                    .to_string(),
590                subject: node
591                    .payload
592                    .get("subject")
593                    .and_then(|v| v.as_str())
594                    .unwrap_or("")
595                    .to_string(),
596                changed_symbols,
597                changed_files,
598            });
599        }
600
601        entries.sort_by(|a, b| b.date.cmp(&a.date));
602        Ok(entries)
603    }
604}