Skip to main content

codemem_engine/
graph_ops.rs

1//! Engine facade methods for graph algorithms.
2//!
3//! These wrap lock acquisition + graph algorithm calls so the MCP/API
4//! transport layers don't interact with the graph mutex directly.
5
6use crate::CodememEngine;
7use chrono::{DateTime, Utc};
8use codemem_core::{CodememError, Edge, GraphNode, NodeKind, RelationshipType};
9use std::collections::{HashMap, HashSet, VecDeque};
10
11// ── Result Types ─────────────────────────────────────────────────────────────
12
/// A node with its PageRank score.
///
/// Produced by `CodememEngine::find_important_nodes`, which returns these
/// ordered by descending `score`.
#[derive(Debug, Clone)]
pub struct RankedNode {
    /// Graph node id.
    pub id: String,
    /// PageRank score computed for this node.
    pub score: f64,
    /// Node kind rendered with `to_string()`; `None` when the node could not be
    /// looked up in the graph.
    pub kind: Option<String>,
    /// Node label; `None` when the node could not be looked up in the graph.
    pub label: Option<String>,
}
21
/// In-memory graph statistics snapshot.
///
/// Returned by `CodememEngine::graph_stats`, which copies each field from the
/// graph backend's own `stats()` result.
#[derive(Debug, Clone)]
pub struct GraphStats {
    /// Total number of nodes in the in-memory graph.
    pub node_count: usize,
    /// Total number of edges in the in-memory graph.
    pub edge_count: usize,
    /// Node counts grouped by node kind, as reported by the graph backend.
    pub node_kind_counts: HashMap<String, usize>,
    /// Edge counts grouped by relationship type, as reported by the graph backend.
    pub relationship_type_counts: HashMap<String, usize>,
}
30
/// A commit entry in a temporal change report.
///
/// Built from a `Commit` graph node plus the `ModifiedBy` edges pointing at it
/// (see `CodememEngine::what_changed` and `CodememEngine::symbol_history`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct ChangeEntry {
    /// Graph node id of the commit.
    pub commit_id: String,
    /// Commit payload `hash` field, or `""` when absent.
    pub hash: String,
    /// Commit payload `author` field, or `""` when absent.
    pub author: String,
    /// Commit payload `date` field (kept as the raw string), or `""` when absent.
    pub date: String,
    /// Commit payload `subject` field, or `""` when absent.
    pub subject: String,
    /// Ids of `sym:`-prefixed nodes modified by the commit, prefix stripped.
    pub changed_symbols: Vec<String>,
    /// Ids of `file:`-prefixed nodes modified by the commit, prefix stripped.
    pub changed_files: Vec<String>,
}
42
/// Snapshot of the graph at a point in time.
///
/// Returned by `CodememEngine::graph_at_time`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TemporalSnapshot {
    /// The snapshot instant, formatted as RFC 3339.
    pub at: String,
    /// Number of nodes whose validity window contains `at`.
    pub live_nodes: usize,
    /// Number of edges whose validity window contains `at`.
    pub live_edges: usize,
    /// Live node counts keyed by `NodeKind::to_string()`.
    pub node_kind_counts: HashMap<String, usize>,
}
51
/// A file that may be stale (not modified recently but still referenced).
///
/// Returned by `CodememEngine::find_stale_files`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StaleFile {
    /// File path: the node id with its `file:` prefix stripped.
    pub file_path: String,
    /// The file node's `centrality` value.
    pub centrality: f64,
    /// RFC 3339 timestamp of the latest `ModifiedBy` edge for the file, if any.
    pub last_modified: Option<String>,
    /// Number of stored edges whose destination is this file node.
    pub incoming_edges: usize,
}
60
/// Result of test impact analysis.
///
/// Returned by `CodememEngine::test_impact`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TestImpactResult {
    /// Tests reached within 2 caller hops of a changed symbol.
    pub direct_tests: Vec<TestHit>,
    /// Tests reached at 3 or more caller hops.
    pub transitive_tests: Vec<TestHit>,
    /// The changed symbol ids that were analyzed, in input order.
    pub analyzed_symbols: Vec<String>,
}
68
/// A test discovered by impact analysis.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TestHit {
    /// Graph node id of the test.
    pub test_symbol: String,
    /// The test node's payload `file_path`, or `""` when absent.
    pub file_path: String,
    /// Number of caller (reverse `Calls`) hops from a changed symbol to this test.
    pub depth: usize,
}
76
/// Report on detected circular dependencies (strongly connected components).
///
/// Returned by `CodememEngine::detect_cycles`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CycleReport {
    /// Cycle groups, largest first.
    pub cycles: Vec<CycleGroup>,
    /// Number of entries in `cycles`.
    pub total_cycles: usize,
    /// Number of cycle groups whose severity is `"critical"`.
    pub critical_count: usize,
}
84
/// A single cycle group (SCC with >= 2 nodes).
#[derive(Debug, Clone, serde::Serialize)]
pub struct CycleGroup {
    /// Node ids that form the strongly connected component.
    pub nodes: Vec<String>,
    /// Number of nodes (`nodes.len()`).
    pub size: usize,
    /// `"critical"` for size >= 5, `"warning"` for 3-4, `"info"` for 2.
    pub severity: String,
}
92
/// Report on architectural drift between two time periods.
///
/// Returned by `CodememEngine::detect_drift`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DriftReport {
    /// `"<from> to <to>"`, both formatted as RFC 3339.
    pub period: String,
    /// Edges created in the period whose endpoints fall in different modules
    /// (excluding `ModifiedBy` and `PartOf` edges).
    pub new_cross_module_edges: usize,
    /// File nodes alive at `from` but not at `to`.
    pub removed_files: usize,
    /// File nodes alive at `to` but not at `from`.
    pub added_files: usize,
    /// Up to 10 file paths (`file:` prefix stripped) with the most `ModifiedBy`
    /// edges in the period, most-modified first.
    pub hotspot_files: Vec<String>,
    /// Up to 10 `(id_a, id_b, commit_count)` CoChanged pairs (with `id_a < id_b`),
    /// highest summed `commit_count` first.
    pub coupling_increases: Vec<(String, String, usize)>,
}
103
104// ── Engine Methods ───────────────────────────────────────────────────────────
105
106impl CodememEngine {
107    /// BFS or DFS traversal from a start node, with optional kind/relationship filters.
108    /// When `at_time` is provided, nodes/edges outside their validity window are excluded.
109    ///
110    /// Note: temporal filtering is post-hoc — the traversal runs on the full graph,
111    /// then expired nodes are removed. This means depth counts may include hops through
112    /// expired nodes. For precise temporal traversals, use `graph_at_time` instead.
113    pub fn graph_traverse(
114        &self,
115        start_id: &str,
116        depth: usize,
117        algorithm: &str,
118        exclude_kinds: &[NodeKind],
119        include_relationships: Option<&[RelationshipType]>,
120        at_time: Option<DateTime<Utc>>,
121    ) -> Result<Vec<GraphNode>, CodememError> {
122        let graph = self.lock_graph()?;
123        let has_filters = !exclude_kinds.is_empty() || include_relationships.is_some();
124
125        let mut nodes = if has_filters {
126            match algorithm {
127                "bfs" => graph.bfs_filtered(start_id, depth, exclude_kinds, include_relationships),
128                "dfs" => graph.dfs_filtered(start_id, depth, exclude_kinds, include_relationships),
129                _ => Err(CodememError::InvalidInput(format!(
130                    "Unknown algorithm: {algorithm}"
131                ))),
132            }
133        } else {
134            match algorithm {
135                "bfs" => graph.bfs(start_id, depth),
136                "dfs" => graph.dfs(start_id, depth),
137                _ => Err(CodememError::InvalidInput(format!(
138                    "Unknown algorithm: {algorithm}"
139                ))),
140            }
141        }?;
142
143        // Filter by temporal validity if at_time is provided
144        if let Some(at) = at_time {
145            nodes.retain(|n| {
146                n.valid_from.is_none_or(|vf| vf <= at) && n.valid_to.is_none_or(|vt| vt > at)
147            });
148        }
149
150        Ok(nodes)
151    }
152
153    /// Get in-memory graph statistics.
154    pub fn graph_stats(&self) -> Result<GraphStats, CodememError> {
155        let graph = self.lock_graph()?;
156        let stats = graph.stats();
157        Ok(GraphStats {
158            node_count: stats.node_count,
159            edge_count: stats.edge_count,
160            node_kind_counts: stats.node_kind_counts,
161            relationship_type_counts: stats.relationship_type_counts,
162        })
163    }
164
165    /// Get all edges for a node.
166    pub fn get_node_edges(&self, node_id: &str) -> Result<Vec<Edge>, CodememError> {
167        let graph = self.lock_graph()?;
168        graph.get_edges(node_id)
169    }
170
171    /// Run Louvain community detection at the given resolution.
172    pub fn louvain_communities(&self, resolution: f64) -> Result<Vec<Vec<String>>, CodememError> {
173        let graph = self.lock_graph()?;
174        Ok(graph.louvain_communities(resolution))
175    }
176
177    /// Compute PageRank and return the top-k nodes with their scores,
178    /// kinds, and labels.
179    ///
180    /// If `namespace` is provided, PageRank is computed only for nodes
181    /// in that namespace, preventing cross-project score pollution.
182    pub fn find_important_nodes(
183        &self,
184        top_k: usize,
185        damping: f64,
186        namespace: Option<&str>,
187    ) -> Result<Vec<RankedNode>, CodememError> {
188        let graph = self.lock_graph()?;
189        let scores = if let Some(ns) = namespace {
190            graph.pagerank_for_namespace(
191                ns,
192                damping,
193                codemem_core::PAGERANK_ITERATIONS_DEFAULT,
194                codemem_core::PAGERANK_TOLERANCE_DEFAULT,
195            )
196        } else {
197            graph.pagerank(
198                damping,
199                codemem_core::PAGERANK_ITERATIONS_DEFAULT,
200                codemem_core::PAGERANK_TOLERANCE_DEFAULT,
201            )
202        };
203
204        let mut sorted: Vec<(String, f64)> = scores.into_iter().collect();
205        sorted.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
206        sorted.truncate(top_k);
207
208        let results = sorted
209            .into_iter()
210            .map(|(id, score)| {
211                let node = graph.get_node(&id).ok().flatten();
212                RankedNode {
213                    id,
214                    score,
215                    kind: node.as_ref().map(|n| n.kind.to_string()),
216                    label: node.as_ref().map(|n| n.label.clone()),
217                }
218            })
219            .collect();
220
221        Ok(results)
222    }
223
224    // ── Cycle Detection ──────────────────────────────────────────────────
225
226    /// Detect circular dependencies using Tarjan's SCC algorithm.
227    /// Groups of >= 5 nodes are "critical", 3-4 are "warning", 2 are "info".
228    pub fn detect_cycles(&self) -> Result<CycleReport, CodememError> {
229        let graph = self.lock_graph()?;
230        let sccs = graph.strongly_connected_components();
231
232        let mut cycles: Vec<CycleGroup> = sccs
233            .into_iter()
234            .filter(|scc| scc.len() >= 2)
235            .map(|nodes| {
236                let size = nodes.len();
237                let severity = if size >= 5 {
238                    "critical"
239                } else if size >= 3 {
240                    "warning"
241                } else {
242                    "info"
243                }
244                .to_string();
245                CycleGroup {
246                    nodes,
247                    size,
248                    severity,
249                }
250            })
251            .collect();
252
253        cycles.sort_by(|a, b| b.size.cmp(&a.size));
254        let critical_count = cycles.iter().filter(|c| c.severity == "critical").count();
255        let total_cycles = cycles.len();
256
257        Ok(CycleReport {
258            cycles,
259            total_cycles,
260            critical_count,
261        })
262    }
263
264    // ── Temporal Queries ─────────────────────────────────────────────────
265
266    /// Return what changed between two timestamps: commits, their files, and symbols.
267    pub fn what_changed(
268        &self,
269        from: DateTime<Utc>,
270        to: DateTime<Utc>,
271        namespace: Option<&str>,
272    ) -> Result<Vec<ChangeEntry>, CodememError> {
273        if from > to {
274            return Err(CodememError::InvalidInput(
275                "'from' must be before 'to'".into(),
276            ));
277        }
278        let all_nodes = {
279            let graph = self.lock_graph()?;
280            graph.get_all_nodes()
281        };
282        let all_edges = self.storage.all_graph_edges()?;
283
284        // Find commit nodes in the time range
285        let commits: Vec<&GraphNode> = all_nodes
286            .iter()
287            .filter(|n| {
288                n.kind == NodeKind::Commit
289                    && !n.payload.contains_key("sentinel")
290                    && n.valid_from.is_some_and(|vf| vf >= from && vf <= to)
291                    && (namespace.is_none() || n.namespace.as_deref() == namespace)
292            })
293            .collect();
294
295        // Index ModifiedBy edges by dst (commit ID) for O(1) lookup
296        let mut edges_by_dst: HashMap<&str, Vec<&Edge>> = HashMap::new();
297        for edge in &all_edges {
298            if edge.relationship == RelationshipType::ModifiedBy {
299                edges_by_dst.entry(&edge.dst).or_default().push(edge);
300            }
301        }
302
303        let mut entries = Vec::new();
304        for commit in &commits {
305            let mut changed_files = Vec::new();
306            let mut changed_symbols = Vec::new();
307
308            if let Some(commit_edges) = edges_by_dst.get(commit.id.as_str()) {
309                for edge in commit_edges {
310                    if edge.src.starts_with("file:") {
311                        changed_files.push(
312                            edge.src
313                                .strip_prefix("file:")
314                                .unwrap_or(&edge.src)
315                                .to_string(),
316                        );
317                    } else if edge.src.starts_with("sym:") {
318                        changed_symbols.push(
319                            edge.src
320                                .strip_prefix("sym:")
321                                .unwrap_or(&edge.src)
322                                .to_string(),
323                        );
324                    }
325                }
326            }
327
328            entries.push(ChangeEntry {
329                commit_id: commit.id.clone(),
330                hash: commit
331                    .payload
332                    .get("hash")
333                    .and_then(|v| v.as_str())
334                    .unwrap_or("")
335                    .to_string(),
336                author: commit
337                    .payload
338                    .get("author")
339                    .and_then(|v| v.as_str())
340                    .unwrap_or("")
341                    .to_string(),
342                date: commit
343                    .payload
344                    .get("date")
345                    .and_then(|v| v.as_str())
346                    .unwrap_or("")
347                    .to_string(),
348                subject: commit
349                    .payload
350                    .get("subject")
351                    .and_then(|v| v.as_str())
352                    .unwrap_or("")
353                    .to_string(),
354                changed_symbols,
355                changed_files,
356            });
357        }
358
359        // Sort by date descending (newest first)
360        entries.sort_by(|a, b| b.date.cmp(&a.date));
361        Ok(entries)
362    }
363
364    /// Snapshot of the graph at a point in time.
365    ///
366    /// Counts nodes/edges that were valid at `at`: `valid_from <= at` and
367    /// (`valid_to` is None or `valid_to > at`). Nodes/edges with no temporal
368    /// fields are always counted (they predate the temporal layer).
369    pub fn graph_at_time(&self, at: DateTime<Utc>) -> Result<TemporalSnapshot, CodememError> {
370        let all_nodes = {
371            let graph = self.lock_graph()?;
372            graph.get_all_nodes()
373        };
374        let all_edges = self.storage.all_graph_edges()?;
375
376        let is_node_live = |n: &GraphNode| -> bool {
377            let after_start = n.valid_from.is_none_or(|vf| vf <= at);
378            let before_end = n.valid_to.is_none_or(|vt| vt > at);
379            after_start && before_end
380        };
381
382        let is_edge_live = |e: &Edge| -> bool {
383            let after_start = e.valid_from.is_none_or(|vf| vf <= at);
384            let before_end = e.valid_to.is_none_or(|vt| vt > at);
385            after_start && before_end
386        };
387
388        let live_nodes: Vec<&GraphNode> = all_nodes.iter().filter(|n| is_node_live(n)).collect();
389        let live_edges = all_edges.iter().filter(|e| is_edge_live(e)).count();
390
391        let mut kind_counts: HashMap<String, usize> = HashMap::new();
392        for node in &live_nodes {
393            *kind_counts.entry(node.kind.to_string()).or_default() += 1;
394        }
395
396        Ok(TemporalSnapshot {
397            at: at.to_rfc3339(),
398            live_nodes: live_nodes.len(),
399            live_edges,
400            node_kind_counts: kind_counts,
401        })
402    }
403
404    /// Find files that haven't been modified recently but have high centrality
405    /// or many incoming edges (potential stale code).
406    pub fn find_stale_files(
407        &self,
408        namespace: Option<&str>,
409        stale_days: u64,
410    ) -> Result<Vec<StaleFile>, CodememError> {
411        let all_nodes = {
412            let graph = self.lock_graph()?;
413            graph.get_all_nodes()
414        };
415        let all_edges = self.storage.all_graph_edges()?;
416        let stale_days = stale_days.min(3650); // Cap at 10 years
417        let cutoff = Utc::now() - chrono::Duration::days(stale_days as i64);
418
419        // Find the latest ModifiedBy edge date for each file
420        let mut file_last_modified: HashMap<&str, DateTime<Utc>> = HashMap::new();
421        for edge in &all_edges {
422            if edge.relationship == RelationshipType::ModifiedBy && edge.src.starts_with("file:") {
423                if let Some(vf) = edge.valid_from {
424                    let entry = file_last_modified.entry(&edge.src).or_insert(vf);
425                    if vf > *entry {
426                        *entry = vf;
427                    }
428                }
429            }
430        }
431
432        // Count incoming edges per file (how many things depend on it)
433        let mut incoming: HashMap<&str, usize> = HashMap::new();
434        for edge in &all_edges {
435            if edge.dst.starts_with("file:") {
436                *incoming.entry(&edge.dst).or_default() += 1;
437            }
438        }
439
440        let mut stale = Vec::new();
441        for node in &all_nodes {
442            if node.kind != NodeKind::File {
443                continue;
444            }
445            if node.valid_to.is_some() {
446                continue; // Already deleted
447            }
448            if namespace.is_some() && node.namespace.as_deref() != namespace {
449                continue;
450            }
451
452            let last_mod = file_last_modified.get(node.id.as_str()).copied();
453            // Only flag as stale if we have temporal data — files with no
454            // ModifiedBy edges haven't been through temporal ingestion yet,
455            // so we can't determine staleness.
456            let is_stale = last_mod.is_some_and(|lm| lm < cutoff);
457
458            if is_stale
459                && (node.centrality > 0.0
460                    || incoming.get(node.id.as_str()).copied().unwrap_or(0) > 0)
461            {
462                let file_path = node
463                    .id
464                    .strip_prefix("file:")
465                    .unwrap_or(&node.id)
466                    .to_string();
467                stale.push(StaleFile {
468                    file_path,
469                    centrality: node.centrality,
470                    last_modified: last_mod.map(|d| d.to_rfc3339()),
471                    incoming_edges: incoming.get(node.id.as_str()).copied().unwrap_or(0),
472                });
473            }
474        }
475
476        // Sort by centrality descending (most important stale files first)
477        stale.sort_by(|a, b| {
478            b.centrality
479                .partial_cmp(&a.centrality)
480                .unwrap_or(std::cmp::Ordering::Equal)
481        });
482        Ok(stale)
483    }
484
485    /// Detect architectural drift between two time periods.
486    ///
487    /// Compares the graph at `from` vs `to` to find: new cross-module edges,
488    /// files added/removed, hotspot files (most commits), and coupling increases
489    /// (module pairs with growing CoChanged counts).
490    pub fn detect_drift(
491        &self,
492        from: DateTime<Utc>,
493        to: DateTime<Utc>,
494        namespace: Option<&str>,
495    ) -> Result<DriftReport, CodememError> {
496        if from > to {
497            return Err(CodememError::InvalidInput(
498                "'from' must be before 'to'".into(),
499            ));
500        }
501        let all_nodes = {
502            let graph = self.lock_graph()?;
503            graph.get_all_nodes()
504        };
505        let all_edges = self.storage.all_graph_edges()?;
506
507        // Files alive at `from` vs `to`
508        let files_at = |at: DateTime<Utc>| -> HashSet<String> {
509            all_nodes
510                .iter()
511                .filter(|n| {
512                    n.kind == NodeKind::File
513                        && n.valid_from.is_none_or(|vf| vf <= at)
514                        && n.valid_to.is_none_or(|vt| vt > at)
515                        && (namespace.is_none() || n.namespace.as_deref() == namespace)
516                })
517                .map(|n| n.id.clone())
518                .collect()
519        };
520
521        let files_before = files_at(from);
522        let files_after = files_at(to);
523        let added_files = files_after.difference(&files_before).count();
524        let removed_files = files_before.difference(&files_after).count();
525
526        // Count cross-module edges added in the period
527        // A "cross-module" edge connects nodes in different top-level directories
528        let module_of = |id: &str| -> String {
529            let path = id
530                .strip_prefix("file:")
531                .or_else(|| id.strip_prefix("sym:"))
532                .unwrap_or(id);
533            path.split('/').next().unwrap_or("root").to_string()
534        };
535
536        // When namespace filtering, build the set of node IDs that belong to it
537        // so we can restrict edge-based metrics to relevant nodes.
538        let ns_node_ids: Option<HashSet<&str>> = namespace.map(|_| {
539            all_nodes
540                .iter()
541                .filter(|n| n.namespace.as_deref() == namespace)
542                .map(|n| n.id.as_str())
543                .collect()
544        });
545        let in_ns = |id: &str| -> bool { ns_node_ids.as_ref().is_none_or(|ids| ids.contains(id)) };
546
547        let new_cross_module = all_edges
548            .iter()
549            .filter(|e| {
550                e.valid_from.is_some_and(|vf| vf >= from && vf <= to)
551                    && module_of(&e.src) != module_of(&e.dst)
552                    && !matches!(
553                        e.relationship,
554                        RelationshipType::ModifiedBy | RelationshipType::PartOf
555                    )
556                    && in_ns(&e.src)
557            })
558            .count();
559
560        // Hotspot files: most ModifiedBy edges in the period
561        let mut file_commit_count: HashMap<String, usize> = HashMap::new();
562        for edge in &all_edges {
563            if edge.relationship == RelationshipType::ModifiedBy
564                && edge.src.starts_with("file:")
565                && edge.valid_from.is_some_and(|vf| vf >= from && vf <= to)
566                && in_ns(&edge.src)
567            {
568                *file_commit_count
569                    .entry(
570                        edge.src
571                            .strip_prefix("file:")
572                            .unwrap_or(&edge.src)
573                            .to_string(),
574                    )
575                    .or_default() += 1;
576            }
577        }
578        let mut hotspots: Vec<(String, usize)> = file_commit_count.into_iter().collect();
579        hotspots.sort_by(|a, b| b.1.cmp(&a.1));
580        let hotspot_files: Vec<String> = hotspots.into_iter().take(10).map(|(f, _)| f).collect();
581
582        // Coupling increases: CoChanged pairs with growing counts.
583        // CoChanged edges from enrich_git_history use created_at (not valid_from),
584        // so check both fields.
585        let mut coupling: HashMap<(String, String), usize> = HashMap::new();
586        for edge in &all_edges {
587            if edge.relationship == RelationshipType::CoChanged
588                && (edge.valid_from.is_some_and(|vf| vf >= from && vf <= to)
589                    || (edge.valid_from.is_none()
590                        && edge.created_at >= from
591                        && edge.created_at <= to))
592                && in_ns(&edge.src)
593            {
594                let pair = if edge.src < edge.dst {
595                    (edge.src.clone(), edge.dst.clone())
596                } else {
597                    (edge.dst.clone(), edge.src.clone())
598                };
599                let count = edge
600                    .properties
601                    .get("commit_count")
602                    .and_then(|v| v.as_u64())
603                    .unwrap_or(1) as usize;
604                *coupling.entry(pair).or_default() += count;
605            }
606        }
607        let mut coupling_vec: Vec<(String, String, usize)> =
608            coupling.into_iter().map(|((a, b), c)| (a, b, c)).collect();
609        coupling_vec.sort_by(|a, b| b.2.cmp(&a.2));
610        coupling_vec.truncate(10);
611
612        Ok(DriftReport {
613            period: format!("{} to {}", from.to_rfc3339(), to.to_rfc3339()),
614            new_cross_module_edges: new_cross_module,
615            removed_files,
616            added_files,
617            hotspot_files,
618            coupling_increases: coupling_vec,
619        })
620    }
621
622    /// Get the history of commits that modified a specific symbol or file.
623    pub fn symbol_history(&self, node_id: &str) -> Result<Vec<ChangeEntry>, CodememError> {
624        // Phase 1: Find commit IDs via targeted edge query (not full scan)
625        let node_edges = self.storage.get_edges_for_node(node_id)?;
626        let commit_ids: HashSet<String> = node_edges
627            .iter()
628            .filter(|e| e.src == node_id && e.relationship == RelationshipType::ModifiedBy)
629            .map(|e| e.dst.clone())
630            .collect();
631
632        if commit_ids.is_empty() {
633            return Ok(Vec::new());
634        }
635
636        // Batch-load commit nodes from graph (single lock acquisition)
637        let commit_nodes: Vec<GraphNode> = {
638            let graph = self.lock_graph()?;
639            graph
640                .get_all_nodes()
641                .into_iter()
642                .filter(|n| commit_ids.contains(&n.id))
643                .collect()
644        };
645
646        // Phase 2: For each commit, find sibling files/symbols via targeted query
647        let mut entries = Vec::new();
648        for node in &commit_nodes {
649            let commit_edges = self.storage.get_edges_for_node(&node.id)?;
650            let mut changed_files = Vec::new();
651            let mut changed_symbols = Vec::new();
652
653            for edge in &commit_edges {
654                if edge.relationship == RelationshipType::ModifiedBy && edge.dst == node.id {
655                    if edge.src.starts_with("file:") {
656                        changed_files.push(
657                            edge.src
658                                .strip_prefix("file:")
659                                .unwrap_or(&edge.src)
660                                .to_string(),
661                        );
662                    } else if edge.src.starts_with("sym:") {
663                        changed_symbols.push(
664                            edge.src
665                                .strip_prefix("sym:")
666                                .unwrap_or(&edge.src)
667                                .to_string(),
668                        );
669                    }
670                }
671            }
672
673            entries.push(ChangeEntry {
674                commit_id: node.id.clone(),
675                hash: node
676                    .payload
677                    .get("hash")
678                    .and_then(|v| v.as_str())
679                    .unwrap_or("")
680                    .to_string(),
681                author: node
682                    .payload
683                    .get("author")
684                    .and_then(|v| v.as_str())
685                    .unwrap_or("")
686                    .to_string(),
687                date: node
688                    .payload
689                    .get("date")
690                    .and_then(|v| v.as_str())
691                    .unwrap_or("")
692                    .to_string(),
693                subject: node
694                    .payload
695                    .get("subject")
696                    .and_then(|v| v.as_str())
697                    .unwrap_or("")
698                    .to_string(),
699                changed_symbols,
700                changed_files,
701            });
702        }
703
704        entries.sort_by(|a, b| b.date.cmp(&a.date));
705        Ok(entries)
706    }
707
708    // ── Test Impact Analysis ─────────────────────────────────────────
709
710    /// Find tests affected by changes to the given symbols.
711    ///
712    /// Performs manual BFS traversal of **callers** (reverse direction) from each
713    /// changed symbol, up to `max_depth` hops, following only `Calls` edges.
714    /// Discovered test nodes are split into direct (depth <= 2) and transitive
715    /// (depth 3+).
716    pub fn test_impact(
717        &self,
718        changed_symbol_ids: &[&str],
719        max_depth: usize,
720    ) -> Result<TestImpactResult, CodememError> {
721        let graph = self.lock_graph()?;
722
723        // Collect all edges once for efficient lookup.
724        // Build a reverse index: dst -> Vec<(src, relationship)>
725        let all_nodes = graph.get_all_nodes();
726        let mut callers_of: HashMap<&str, Vec<&str>> = HashMap::new();
727        // We need owned edges to build the index
728        let mut all_edges_vec: Vec<Edge> = Vec::new();
729        for node in &all_nodes {
730            if let Ok(edges) = graph.get_edges(&node.id) {
731                all_edges_vec.extend(edges);
732            }
733        }
734
735        // Build caller index: for each node, who calls it?
736        // Edge semantics: src CALLS dst, so callers of dst are src nodes.
737        for edge in &all_edges_vec {
738            if edge.relationship == RelationshipType::Calls {
739                callers_of
740                    .entry(edge.dst.as_str())
741                    .or_default()
742                    .push(edge.src.as_str());
743            }
744        }
745
746        // Build node lookup
747        let node_map: HashMap<&str, &GraphNode> =
748            all_nodes.iter().map(|n| (n.id.as_str(), n)).collect();
749
750        // Track best (minimum) depth for each discovered test
751        let mut test_depths: HashMap<String, usize> = HashMap::new();
752        let mut test_files: HashMap<String, String> = HashMap::new();
753
754        // Shared visited set across all changed symbols to avoid redundant
755        // traversal of common intermediate nodes.
756        let mut visited: HashSet<&str> = HashSet::new();
757
758        for &symbol_id in changed_symbol_ids {
759            // BFS from symbol_id, following callers (reverse Calls edges)
760            let mut queue: VecDeque<(&str, usize)> = VecDeque::new();
761
762            visited.insert(symbol_id);
763            queue.push_back((symbol_id, 0));
764
765            while let Some((current_id, depth)) = queue.pop_front() {
766                if depth >= max_depth {
767                    continue;
768                }
769
770                // Find all callers of current_id
771                if let Some(callers) = callers_of.get(current_id) {
772                    for &caller_id in callers {
773                        if visited.contains(caller_id) {
774                            continue;
775                        }
776                        visited.insert(caller_id);
777
778                        let next_depth = depth + 1;
779
780                        // Check if this node is a test
781                        if let Some(node) = node_map.get(caller_id) {
782                            if is_test_node(node) {
783                                let file_path = node
784                                    .payload
785                                    .get("file_path")
786                                    .and_then(|v| v.as_str())
787                                    .unwrap_or("")
788                                    .to_string();
789
790                                let entry = test_depths
791                                    .entry(caller_id.to_string())
792                                    .or_insert(next_depth);
793                                if next_depth < *entry {
794                                    *entry = next_depth;
795                                }
796                                test_files.entry(caller_id.to_string()).or_insert(file_path);
797                            }
798                        }
799
800                        // Continue BFS regardless of whether it's a test
801                        queue.push_back((caller_id, next_depth));
802                    }
803                }
804            }
805        }
806
807        // Split into direct (depth <= 2) and transitive (depth 3+)
808        let mut direct_tests = Vec::new();
809        let mut transitive_tests = Vec::new();
810
811        for (test_id, depth) in &test_depths {
812            let hit = TestHit {
813                test_symbol: test_id.clone(),
814                file_path: test_files.get(test_id).cloned().unwrap_or_default(),
815                depth: *depth,
816            };
817            if *depth <= 2 {
818                direct_tests.push(hit);
819            } else {
820                transitive_tests.push(hit);
821            }
822        }
823
824        // Sort by depth for deterministic output
825        direct_tests.sort_by_key(|h| h.depth);
826        transitive_tests.sort_by_key(|h| h.depth);
827
828        Ok(TestImpactResult {
829            direct_tests,
830            transitive_tests,
831            analyzed_symbols: changed_symbol_ids.iter().map(|s| s.to_string()).collect(),
832        })
833    }
834}
835
836/// Check if a graph node represents a test.
837fn is_test_node(node: &GraphNode) -> bool {
838    // Check node kind
839    if node.kind == NodeKind::Test {
840        return true;
841    }
842    // Check payload fields
843    if node.payload.get("is_test") == Some(&serde_json::json!(true)) {
844        return true;
845    }
846    if node.payload.get("kind").and_then(|v| v.as_str()) == Some("test") {
847        return true;
848    }
849    // Check file path from payload
850    if let Some(path) = node.payload.get("file_path").and_then(|v| v.as_str()) {
851        if is_test_file(path) {
852            return true;
853        }
854    }
855    // Check node label as file path fallback
856    if is_test_file(&node.label) {
857        return true;
858    }
859    false
860}
861
/// Check if a file path matches common test file patterns.
///
/// Matching is case-insensitive and treats `/` as the path separator. A path is
/// a test file when either:
/// - it lives in a `tests/` or `__tests__/` directory, at the start of the path
///   or nested anywhere below it; or
/// - its final component starts with `test_` (e.g. `test_api.py`), has a stem
///   (text before the last `.`) ending in `_test` (e.g. `foo_test.go`), or
///   contains `.test.` / `.spec.` (e.g. `app.test.ts`, `app.spec.js`).
///
/// Name patterns are anchored to the final component, so
/// `src/test_utils/helpers.rs` is not a test file. The `_test` rule checks only
/// the stem, so `src/load_test.config.yaml` is not one either.
fn is_test_file(path: &str) -> bool {
    let p = path.to_lowercase();

    // Directory-based: file lives inside a test directory.
    if p.starts_with("tests/")
        || p.starts_with("__tests__/")
        || p.contains("/tests/")
        || p.contains("/__tests__/")
    {
        return true;
    }

    // Filename-based: only the last path component is inspected.
    let filename = p.rsplit('/').next().unwrap_or(&p);

    // Require an extension so bare names such as `run_load_test` don't match,
    // and anchor `_test` to the end of the stem.
    let stem_is_test = filename
        .rsplit_once('.')
        .is_some_and(|(stem, _ext)| stem.ends_with("_test"));

    filename.starts_with("test_")
        || stem_is_test
        || filename.contains(".test.")
        || filename.contains(".spec.")
}