Skip to main content

mati_core/graph/
graph.rs

1use std::collections::{HashMap, HashSet};
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use anyhow::Result;
5use petgraph::graph::{DiGraph, NodeIndex};
6use petgraph::visit::EdgeRef;
7use petgraph::Direction;
8
9use super::edges::{Edge, EdgeKind};
10use crate::store::Store;
11
/// Key prefix that `Graph::load` scans to find persisted edge records.
const EDGE_PREFIX: &str = "graph:edge:";
13
14/// In-memory directed knowledge graph backed by SurrealKV.
15///
16/// Use `Graph::load` to restore the full graph from disk, or `Graph::empty`
17/// when the caller only needs store access (e.g. warm re-init with no new edges).
18/// Every mutation writes through to SurrealKV immediately — no edges are lost
19/// on restart. Traversal is fully in-memory (<1 ms).
20pub struct Graph {
21    /// petgraph directed graph. Node weights are namespaced record keys.
22    inner: DiGraph<String, EdgeKind>,
23    /// Maps a record key → its NodeIndex for O(1) lookup.
24    node_index: HashMap<String, NodeIndex>,
25    /// Tracks existing (from, kind, to) triples for O(1) duplicate detection.
26    /// Avoids an O(E) scan on every `insert_edge_in_memory` call.
27    edge_set: HashSet<(NodeIndex, NodeIndex, EdgeKind)>,
28    /// Handle to the store for write-through persistence. Private — callers
29    /// must go through Graph methods to keep in-memory and persisted state in sync.
30    store: Store,
31}
32
33impl Graph {
34    /// Wrap a Store without scanning any keys. All collections are empty.
35    /// Use when the caller needs store access but has no edges to add or read
36    /// (e.g. warm re-init with zero new edges — skips the 14k-key prefix scan).
37    pub fn empty(store: Store) -> Self {
38        Graph {
39            inner: DiGraph::new(),
40            node_index: HashMap::new(),
41            edge_set: HashSet::new(),
42            store,
43        }
44    }
45
46    /// Load the full graph from SurrealKV by scanning `graph:edge:*`.
47    pub async fn load(store: Store) -> Result<Self> {
48        let keys = store.scan_keys(EDGE_PREFIX).await?;
49        // Pre-size all collections to avoid incremental resizes during bulk insert.
50        // Each edge connects 2 nodes; upper bound is 2 × edge count unique nodes.
51        let n_edges = keys.len();
52        let n_nodes = n_edges * 2;
53        let mut g = Graph {
54            inner: DiGraph::with_capacity(n_nodes, n_edges),
55            node_index: HashMap::with_capacity(n_nodes),
56            edge_set: HashSet::with_capacity(n_edges),
57            store,
58        };
59        for key in keys {
60            if let Some(edge) = Edge::from_key(&key) {
61                g.insert_edge_in_memory(&edge);
62            }
63        }
64        Ok(g)
65    }
66
67    /// Add an edge and persist it to SurrealKV. Idempotent — skips the store
68    /// write entirely if the edge already exists in the in-memory set.
69    pub async fn add_edge(&mut self, from: &str, kind: EdgeKind, to: &str) -> Result<()> {
70        let edge = Edge::new(from, kind, to);
71        // Check before persisting: if both nodes exist and the edge is already
72        // in the set, skip the store write. Avoids bumping updated_at on every
73        // duplicate call and prevents unnecessary fsync on hot paths.
74        if let (Some(&fi), Some(&ti)) = (
75            self.node_index.get(&edge.from),
76            self.node_index.get(&edge.to),
77        ) {
78            if self.edge_set.contains(&(fi, ti, edge.kind.clone())) {
79                return Ok(());
80            }
81        }
82        persist_edge(&self.store, &edge).await?;
83        self.insert_edge_in_memory(&edge);
84        Ok(())
85    }
86
87    /// Add many edges in a single SurrealKV transaction (1 fsync for the whole
88    /// batch). Use this for Layer 0 bulk inserts — never call `add_edge` in a
89    /// loop for large graphs.
90    ///
91    /// Edges already present in `edge_set` are skipped (idempotent).
92    /// The batch is applied atomically to the store: either all new edges land
93    /// or none do (on write failure).
94    pub async fn add_edges_batch(&mut self, edges: &[(String, EdgeKind, String)]) -> Result<()> {
95        // Filter to only edges that don't already exist.
96        let new_edges: Vec<Edge> = edges
97            .iter()
98            .filter(|(from, kind, to)| {
99                match (self.node_index.get(from), self.node_index.get(to)) {
100                    (Some(&fi), Some(&ti)) => !self.edge_set.contains(&(fi, ti, kind.clone())),
101                    _ => true, // at least one node is new → edge definitely doesn't exist
102                }
103            })
104            .map(|(from, kind, to)| Edge::new(from.as_str(), kind.clone(), to.as_str()))
105            .collect();
106
107        if new_edges.is_empty() {
108            return Ok(());
109        }
110
111        // Pre-size in-memory collections to absorb the whole batch without resize.
112        let n = new_edges.len();
113        self.inner.reserve_nodes(n * 2);
114        self.inner.reserve_edges(n);
115        self.node_index.reserve(n * 2);
116        self.edge_set.reserve(n);
117
118        // Pre-compute timestamp once for the whole batch (not per-edge).
119        let now = SystemTime::now()
120            .duration_since(UNIX_EPOCH)
121            .unwrap_or_default()
122            .as_secs()
123            .to_le_bytes();
124
125        let keys: Vec<String> = new_edges.iter().map(|e| e.to_key()).collect();
126        let pairs: Vec<(&str, &[u8])> = keys.iter().map(|k| (k.as_str(), now.as_ref())).collect();
127        self.store.put_batch_raw(&pairs).await?;
128
129        // Update in-memory state only after the write succeeds.
130        for edge in &new_edges {
131            self.insert_edge_in_memory(edge);
132        }
133        Ok(())
134    }
135
136    /// Remove an edge from the in-memory graph and delete it from SurrealKV.
137    pub async fn remove_edge(&mut self, from: &str, kind: &EdgeKind, to: &str) -> Result<()> {
138        let edge = Edge::new(from, kind.clone(), to);
139        self.store.delete(&edge.to_key()).await?;
140        let from_idx = match self.node_index.get(from) {
141            Some(&i) => i,
142            None => return Ok(()),
143        };
144        let to_idx = match self.node_index.get(to) {
145            Some(&i) => i,
146            None => return Ok(()),
147        };
148        self.edge_set.remove(&(from_idx, to_idx, kind.clone()));
149        let to_remove: Vec<_> = self
150            .inner
151            .edges_connecting(from_idx, to_idx)
152            .filter(|e| e.weight() == kind)
153            .map(|e| e.id())
154            .collect();
155        for eid in to_remove {
156            self.inner.remove_edge(eid);
157        }
158        Ok(())
159    }
160
161    /// BFS traversal from `seed` following `edge_kind` edges up to `depth` hops.
162    /// Returns node keys reachable (seed itself excluded).
163    pub fn traverse(&self, seed: &str, edge_kind: &EdgeKind, depth: usize) -> Vec<String> {
164        if depth == 0 {
165            return vec![];
166        }
167        let Some(&start) = self.node_index.get(seed) else {
168            return vec![];
169        };
170        let mut visited = HashSet::new();
171        let mut queue = std::collections::VecDeque::new();
172        queue.push_back((start, 0usize));
173        visited.insert(start);
174        let mut results = vec![];
175        while let Some((node, d)) = queue.pop_front() {
176            if d >= depth {
177                continue;
178            }
179            for e in self.inner.edges(node) {
180                if e.weight() != edge_kind {
181                    continue;
182                }
183                let target = e.target();
184                if visited.insert(target) {
185                    results.push(self.inner[target].clone());
186                    queue.push_back((target, d + 1));
187                }
188            }
189        }
190        results
191    }
192
193    /// Immediate neighbors via `kind` edges (depth = 1).
194    pub fn neighbors(&self, node: &str, kind: &EdgeKind) -> Vec<String> {
195        self.traverse(node, kind, 1)
196    }
197
198    /// BFS traversal following incoming `edge_kind` edges up to `depth` hops.
199    /// Returns node keys that have a path *to* `seed` (i.e. sources, not targets).
200    /// Used for queries like "which files import this file?" (reverse Imports).
201    pub fn traverse_incoming(&self, seed: &str, edge_kind: &EdgeKind, depth: usize) -> Vec<String> {
202        if depth == 0 {
203            return vec![];
204        }
205        let Some(&start) = self.node_index.get(seed) else {
206            return vec![];
207        };
208        let mut visited = HashSet::new();
209        let mut queue = std::collections::VecDeque::new();
210        queue.push_back((start, 0usize));
211        visited.insert(start);
212        let mut results = vec![];
213        while let Some((node, d)) = queue.pop_front() {
214            if d >= depth {
215                continue;
216            }
217            for e in self.inner.edges_directed(node, Direction::Incoming) {
218                if e.weight() != edge_kind {
219                    continue;
220                }
221                let source = e.source();
222                if visited.insert(source) {
223                    results.push(self.inner[source].clone());
224                    queue.push_back((source, d + 1));
225                }
226            }
227        }
228        results
229    }
230
231    /// Immediate incoming neighbors via `kind` edges (depth = 1).
232    pub fn neighbors_incoming(&self, node: &str, kind: &EdgeKind) -> Vec<String> {
233        self.traverse_incoming(node, kind, 1)
234    }
235
236    /// Borrow the underlying store for read-only operations.
237    ///
238    /// MCP tools need both Store reads and Graph traversal. This accessor
239    /// avoids splitting ownership — the Graph owns the Store, and callers
240    /// borrow it through this method.
241    /// Build a reverse adjacency list for the given edge kind.
242    /// Maps target node key → list of source node keys.
243    /// Used by `BlastRadius::compute_all` to avoid repeated graph lookups.
244    pub fn reverse_adjacency(&self, kind: &EdgeKind) -> HashMap<String, Vec<String>> {
245        let mut adj: HashMap<String, Vec<String>> = HashMap::new();
246        for edge in self.inner.edge_references() {
247            if edge.weight() == kind {
248                let source = self.inner[edge.source()].clone();
249                let target = self.inner[edge.target()].clone();
250                adj.entry(target).or_default().push(source);
251            }
252        }
253        adj
254    }
255
256    pub fn store(&self) -> &Store {
257        &self.store
258    }
259
260    /// Flush pending writes and close the underlying store.
261    pub async fn close(self) -> Result<()> {
262        self.store.close().await
263    }
264
265    /// Number of nodes in the graph.
266    pub fn node_count(&self) -> usize {
267        self.inner.node_count()
268    }
269
270    /// Number of edges in the graph.
271    pub fn edge_count(&self) -> usize {
272        self.inner.edge_count()
273    }
274
275    // ── Private helpers ──────────────────────────────────────────────────────
276
277    fn get_or_insert_node(&mut self, key: &str) -> NodeIndex {
278        if let Some(&idx) = self.node_index.get(key) {
279            return idx;
280        }
281        let idx = self.inner.add_node(key.to_owned());
282        self.node_index.insert(key.to_owned(), idx);
283        idx
284    }
285
286    fn insert_edge_in_memory(&mut self, edge: &Edge) {
287        let from_idx = self.get_or_insert_node(&edge.from);
288        let to_idx = self.get_or_insert_node(&edge.to);
289        // O(1) duplicate check via the edge set.
290        if self.edge_set.insert((from_idx, to_idx, edge.kind.clone())) {
291            self.inner.add_edge(from_idx, to_idx, edge.kind.clone());
292        }
293    }
294}
295
296/// Persist a single edge as an 8-byte creation timestamp.
297///
298/// The key encodes the full edge (`graph:edge:<from>:<kind>:<to>`), so the
299/// stored value only needs to be a non-empty sentinel. An 8-byte LE timestamp
300/// is stored for debuggability; `Graph::load` never reads it.
301async fn persist_edge(store: &Store, edge: &Edge) -> Result<()> {
302    let now = SystemTime::now()
303        .duration_since(UNIX_EPOCH)
304        .unwrap_or_default()
305        .as_secs()
306        .to_le_bytes();
307    store.put_raw(&edge.to_key(), &now).await
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use crate::store::Record;
314    use tempfile::TempDir;
315
316    async fn temp_graph() -> (Graph, TempDir) {
317        let dir = TempDir::new().unwrap();
318        let store = Store::open(dir.path()).await.unwrap();
319        let graph = Graph::load(store).await.unwrap();
320        (graph, dir)
321    }
322
323    fn make_edge_stub_record(key: &str) -> Record {
324        use crate::store::record::*;
325        let now = 0u64;
326        Record {
327            key: key.to_owned(),
328            value: String::new(),
329            category: Category::Stage,
330            priority: Priority::Normal,
331            tags: vec![],
332            created_at: now,
333            updated_at: now,
334            ref_url: None,
335            staleness: StalenessScore::fresh(),
336            lifecycle: RecordLifecycle::Active,
337            version: RecordVersion {
338                device_id: uuid::Uuid::nil(),
339                logical_clock: 0,
340                wall_clock: now,
341            },
342            quality: QualityScore {
343                value: 1.0,
344                tier: QualityTier::Good,
345                signals: vec![],
346                computed_at: now,
347            },
348            access_count: 0,
349            last_accessed: 0,
350            source: RecordSource::StaticAnalysis,
351            confidence: ConfidenceScore {
352                value: 1.0,
353                confirmation_count: 0,
354                contributor_count: 0,
355                last_challenged: None,
356                challenge_count: 0,
357            },
358            gap_analysis_score: 0.0,
359            payload: None,
360        }
361    }
362
363    #[tokio::test]
364    async fn load_empty_store_gives_empty_graph() {
365        let (g, _dir) = temp_graph().await;
366        assert_eq!(g.node_count(), 0);
367        assert_eq!(g.edge_count(), 0);
368    }
369
370    #[tokio::test]
371    async fn add_edge_increases_counts() {
372        let (mut g, _dir) = temp_graph().await;
373        g.add_edge("file:src/main.rs", EdgeKind::HasGotcha, "gotcha:write-txn")
374            .await
375            .unwrap();
376        assert_eq!(g.node_count(), 2);
377        assert_eq!(g.edge_count(), 1);
378    }
379
380    #[tokio::test]
381    async fn add_edge_is_idempotent() {
382        let (mut g, _dir) = temp_graph().await;
383        for _ in 0..3 {
384            g.add_edge("file:src/a.rs", EdgeKind::Imports, "file:src/b.rs")
385                .await
386                .unwrap();
387        }
388        assert_eq!(g.edge_count(), 1);
389    }
390
391    /// Duplicate add_edge must not write to the store a second time.
392    /// Verified by scanning the store prefix and checking exactly 1 record exists.
393    #[tokio::test]
394    async fn add_edge_duplicate_does_not_write_store() {
395        let dir = TempDir::new().unwrap();
396        let store = Store::open(dir.path()).await.unwrap();
397        let mut g = Graph::load(store).await.unwrap();
398        g.add_edge("file:a", EdgeKind::Imports, "file:b")
399            .await
400            .unwrap();
401        g.add_edge("file:a", EdgeKind::Imports, "file:b")
402            .await
403            .unwrap();
404        g.add_edge("file:a", EdgeKind::Imports, "file:b")
405            .await
406            .unwrap();
407        let keys = g.store.scan_keys("graph:edge:").await.unwrap();
408        assert_eq!(keys.len(), 1, "store must contain exactly 1 edge record");
409    }
410
411    #[tokio::test]
412    async fn edges_survive_reload() {
413        let dir = TempDir::new().unwrap();
414        {
415            let store = Store::open(dir.path()).await.unwrap();
416            let mut g = Graph::load(store).await.unwrap();
417            g.add_edge("file:src/a.rs", EdgeKind::CoChanges, "file:src/b.rs")
418                .await
419                .unwrap();
420            g.close().await.unwrap();
421        }
422        let store2 = Store::open(dir.path()).await.unwrap();
423        let g2 = Graph::load(store2).await.unwrap();
424        assert_eq!(g2.edge_count(), 1);
425        let neighbors = g2.neighbors("file:src/a.rs", &EdgeKind::CoChanges);
426        assert_eq!(neighbors, vec!["file:src/b.rs"]);
427    }
428
429    #[tokio::test]
430    async fn traverse_two_hops() {
431        let (mut g, _dir) = temp_graph().await;
432        g.add_edge("file:a", EdgeKind::Imports, "file:b")
433            .await
434            .unwrap();
435        g.add_edge("file:b", EdgeKind::Imports, "file:c")
436            .await
437            .unwrap();
438        g.add_edge("file:c", EdgeKind::Imports, "file:d")
439            .await
440            .unwrap();
441        let two_hop = g.traverse("file:a", &EdgeKind::Imports, 2);
442        assert!(two_hop.contains(&"file:b".to_string()));
443        assert!(two_hop.contains(&"file:c".to_string()));
444        assert!(
445            !two_hop.contains(&"file:d".to_string()),
446            "depth=2 must not reach d"
447        );
448    }
449
450    #[tokio::test]
451    async fn traverse_unknown_node_returns_empty() {
452        let (g, _dir) = temp_graph().await;
453        assert!(g
454            .traverse("file:nonexistent", &EdgeKind::Imports, 5)
455            .is_empty());
456    }
457
458    #[tokio::test]
459    async fn neighbors_returns_direct_targets_only() {
460        let (mut g, _dir) = temp_graph().await;
461        g.add_edge("file:a", EdgeKind::HasGotcha, "gotcha:x")
462            .await
463            .unwrap();
464        g.add_edge("file:a", EdgeKind::HasGotcha, "gotcha:y")
465            .await
466            .unwrap();
467        g.add_edge("file:a", EdgeKind::Imports, "file:b")
468            .await
469            .unwrap();
470        let gotchas = g.neighbors("file:a", &EdgeKind::HasGotcha);
471        assert_eq!(gotchas.len(), 2);
472        assert!(gotchas.contains(&"gotcha:x".to_string()));
473        assert!(gotchas.contains(&"gotcha:y".to_string()));
474        let imports = g.neighbors("file:a", &EdgeKind::Imports);
475        assert_eq!(imports, vec!["file:b"]);
476    }
477
478    #[tokio::test]
479    async fn traverse_does_not_cross_edge_kinds() {
480        let (mut g, _dir) = temp_graph().await;
481        g.add_edge("file:a", EdgeKind::Imports, "file:b")
482            .await
483            .unwrap();
484        g.add_edge("file:b", EdgeKind::HasGotcha, "gotcha:x")
485            .await
486            .unwrap();
487        let result = g.traverse("file:a", &EdgeKind::Imports, 5);
488        assert!(result.contains(&"file:b".to_string()));
489        assert!(!result.contains(&"gotcha:x".to_string()));
490    }
491
492    #[tokio::test]
493    async fn remove_edge_works() {
494        let (mut g, _dir) = temp_graph().await;
495        g.add_edge("file:a", EdgeKind::Imports, "file:b")
496            .await
497            .unwrap();
498        assert_eq!(g.edge_count(), 1);
499        g.remove_edge("file:a", &EdgeKind::Imports, "file:b")
500            .await
501            .unwrap();
502        assert_eq!(g.edge_count(), 0);
503        assert!(g.neighbors("file:a", &EdgeKind::Imports).is_empty());
504    }
505
506    /// remove_edge must work on edges that were loaded from the store (not just
507    /// added in the current session). After reload the edge_set is rebuilt from
508    /// scan_prefix — this test confirms remove operates on that rebuilt state.
509    #[tokio::test]
510    async fn remove_edge_after_reload() {
511        let dir = TempDir::new().unwrap();
512        {
513            let store = Store::open(dir.path()).await.unwrap();
514            let mut g = Graph::load(store).await.unwrap();
515            g.add_edge("file:a", EdgeKind::HasGotcha, "gotcha:x")
516                .await
517                .unwrap();
518            g.add_edge("file:a", EdgeKind::HasGotcha, "gotcha:y")
519                .await
520                .unwrap();
521            g.close().await.unwrap();
522        }
523        // Reload and remove one of the edges.
524        let store2 = Store::open(dir.path()).await.unwrap();
525        let mut g2 = Graph::load(store2).await.unwrap();
526        assert_eq!(g2.edge_count(), 2);
527        g2.remove_edge("file:a", &EdgeKind::HasGotcha, "gotcha:x")
528            .await
529            .unwrap();
530        assert_eq!(g2.edge_count(), 1);
531        assert!(g2
532            .neighbors("file:a", &EdgeKind::HasGotcha)
533            .iter()
534            .all(|n| n != "gotcha:x"));
535        assert!(g2
536            .neighbors("file:a", &EdgeKind::HasGotcha)
537            .contains(&"gotcha:y".to_string()));
538
539        // Reload again — the removed edge must not come back.
540        g2.close().await.unwrap();
541        let store3 = Store::open(dir.path()).await.unwrap();
542        let g3 = Graph::load(store3).await.unwrap();
543        assert_eq!(g3.edge_count(), 1);
544        assert!(g3
545            .neighbors("file:a", &EdgeKind::HasGotcha)
546            .contains(&"gotcha:y".to_string()));
547    }
548
549    #[tokio::test]
550    async fn remove_nonexistent_edge_is_noop() {
551        let (mut g, _dir) = temp_graph().await;
552        g.remove_edge("file:a", &EdgeKind::Imports, "file:b")
553            .await
554            .unwrap();
555        assert_eq!(g.edge_count(), 0);
556    }
557
558    #[tokio::test]
559    async fn ten_node_chain_correct_traversal() {
560        let (mut g, _dir) = temp_graph().await;
561        for i in 0..9usize {
562            g.add_edge(
563                &format!("file:n{i}"),
564                EdgeKind::Imports,
565                &format!("file:n{}", i + 1),
566            )
567            .await
568            .unwrap();
569        }
570        assert_eq!(g.node_count(), 10);
571        assert_eq!(g.edge_count(), 9);
572        let two_hop = g.traverse("file:n0", &EdgeKind::Imports, 2);
573        assert_eq!(two_hop.len(), 2);
574        assert!(two_hop.contains(&"file:n1".to_string()));
575        assert!(two_hop.contains(&"file:n2".to_string()));
576        let full = g.traverse("file:n0", &EdgeKind::Imports, 10);
577        assert_eq!(full.len(), 9);
578    }
579
580    /// Cycle a → b → a must not cause infinite BFS.
581    #[tokio::test]
582    async fn traverse_cycle_terminates() {
583        let (mut g, _dir) = temp_graph().await;
584        g.add_edge("file:a", EdgeKind::Imports, "file:b")
585            .await
586            .unwrap();
587        g.add_edge("file:b", EdgeKind::Imports, "file:a")
588            .await
589            .unwrap();
590        let result = g.traverse("file:a", &EdgeKind::Imports, 10);
591        // Only one unique non-seed node reachable.
592        assert_eq!(result.len(), 1);
593        assert!(result.contains(&"file:b".to_string()));
594    }
595
596    /// Diamond: a→b, a→c, b→d, c→d — d should appear exactly once.
597    #[tokio::test]
598    async fn traverse_diamond_no_duplicates() {
599        let (mut g, _dir) = temp_graph().await;
600        g.add_edge("file:a", EdgeKind::Imports, "file:b")
601            .await
602            .unwrap();
603        g.add_edge("file:a", EdgeKind::Imports, "file:c")
604            .await
605            .unwrap();
606        g.add_edge("file:b", EdgeKind::Imports, "file:d")
607            .await
608            .unwrap();
609        g.add_edge("file:c", EdgeKind::Imports, "file:d")
610            .await
611            .unwrap();
612        let result = g.traverse("file:a", &EdgeKind::Imports, 3);
613        assert_eq!(result.len(), 3, "b, c, d — no duplicates");
614        assert!(result.contains(&"file:b".to_string()));
615        assert!(result.contains(&"file:c".to_string()));
616        assert!(result.contains(&"file:d".to_string()));
617    }
618
619    /// depth=0 always returns empty, even when the seed has outgoing edges.
620    #[tokio::test]
621    async fn traverse_depth_zero_returns_empty() {
622        let (mut g, _dir) = temp_graph().await;
623        g.add_edge("file:a", EdgeKind::Imports, "file:b")
624            .await
625            .unwrap();
626        assert!(g.traverse("file:a", &EdgeKind::Imports, 0).is_empty());
627    }
628
629    /// Multiple edge kinds between the same pair are stored independently.
630    /// Removing one must not affect the other.
631    #[tokio::test]
632    async fn multiple_kinds_between_same_pair_are_independent() {
633        let (mut g, _dir) = temp_graph().await;
634        g.add_edge("file:a", EdgeKind::Imports, "file:b")
635            .await
636            .unwrap();
637        g.add_edge("file:a", EdgeKind::CoChanges, "file:b")
638            .await
639            .unwrap();
640        assert_eq!(g.edge_count(), 2);
641
642        g.remove_edge("file:a", &EdgeKind::Imports, "file:b")
643            .await
644            .unwrap();
645        assert_eq!(g.edge_count(), 1);
646        assert!(g.neighbors("file:a", &EdgeKind::Imports).is_empty());
647        assert_eq!(g.neighbors("file:a", &EdgeKind::CoChanges), vec!["file:b"]);
648    }
649
650    /// remove then add the same edge — edge_set must let the re-add through.
651    #[tokio::test]
652    async fn remove_then_readd_edge_works() {
653        let (mut g, _dir) = temp_graph().await;
654        g.add_edge("file:a", EdgeKind::Imports, "file:b")
655            .await
656            .unwrap();
657        g.remove_edge("file:a", &EdgeKind::Imports, "file:b")
658            .await
659            .unwrap();
660        assert_eq!(g.edge_count(), 0);
661        g.add_edge("file:a", EdgeKind::Imports, "file:b")
662            .await
663            .unwrap();
664        assert_eq!(g.edge_count(), 1);
665        assert_eq!(g.neighbors("file:a", &EdgeKind::Imports), vec!["file:b"]);
666    }
667
668    /// Removing edges does not shrink node_count — petgraph keeps orphan nodes.
669    #[tokio::test]
670    async fn node_count_stable_after_edge_removal() {
671        let (mut g, _dir) = temp_graph().await;
672        g.add_edge("file:a", EdgeKind::Imports, "file:b")
673            .await
674            .unwrap();
675        assert_eq!(g.node_count(), 2);
676        g.remove_edge("file:a", &EdgeKind::Imports, "file:b")
677            .await
678            .unwrap();
679        assert_eq!(g.node_count(), 2, "nodes are never removed from the graph");
680    }
681
682    /// Two disconnected components must not bleed into each other's traversal.
683    /// Graph is directed — traversal from the target of an edge must not
684    /// reach the source unless a reverse edge is also present.
685    #[tokio::test]
686    async fn traverse_incoming_finds_sources() {
687        let (mut g, _dir) = temp_graph().await;
688        g.add_edge("file:a", EdgeKind::Imports, "file:b")
689            .await
690            .unwrap();
691        g.add_edge("file:c", EdgeKind::Imports, "file:b")
692            .await
693            .unwrap();
694        // Two files import b — traverse_incoming from b should find both.
695        let sources = g.traverse_incoming("file:b", &EdgeKind::Imports, 1);
696        assert_eq!(sources.len(), 2);
697        assert!(sources.contains(&"file:a".to_string()));
698        assert!(sources.contains(&"file:c".to_string()));
699    }
700
701    #[tokio::test]
702    async fn traverse_incoming_does_not_return_outgoing() {
703        let (mut g, _dir) = temp_graph().await;
704        g.add_edge("file:a", EdgeKind::Imports, "file:b")
705            .await
706            .unwrap();
707        // traverse outgoing from a gives b; traverse_incoming from a gives nothing.
708        assert!(g
709            .traverse_incoming("file:a", &EdgeKind::Imports, 5)
710            .is_empty());
711    }
712
713    #[tokio::test]
714    async fn traverse_incoming_multi_hop() {
715        let (mut g, _dir) = temp_graph().await;
716        // Chain: c → b → a  (c imports b, b imports a)
717        g.add_edge("file:b", EdgeKind::Imports, "file:a")
718            .await
719            .unwrap();
720        g.add_edge("file:c", EdgeKind::Imports, "file:b")
721            .await
722            .unwrap();
723        // 1 hop from a: only b
724        let one = g.traverse_incoming("file:a", &EdgeKind::Imports, 1);
725        assert_eq!(one, vec!["file:b"]);
726        // 2 hops from a: b and c
727        let two = g.traverse_incoming("file:a", &EdgeKind::Imports, 2);
728        assert!(two.contains(&"file:b".to_string()));
729        assert!(two.contains(&"file:c".to_string()));
730    }
731
732    #[tokio::test]
733    async fn neighbors_incoming_returns_direct_sources_only() {
734        let (mut g, _dir) = temp_graph().await;
735        g.add_edge("file:x", EdgeKind::HasGotcha, "gotcha:y")
736            .await
737            .unwrap();
738        g.add_edge("file:z", EdgeKind::HasGotcha, "gotcha:y")
739            .await
740            .unwrap();
741        let sources = g.neighbors_incoming("gotcha:y", &EdgeKind::HasGotcha);
742        assert_eq!(sources.len(), 2);
743        assert!(sources.contains(&"file:x".to_string()));
744        assert!(sources.contains(&"file:z".to_string()));
745    }
746
747    #[tokio::test]
748    async fn traverse_is_directed() {
749        let (mut g, _dir) = temp_graph().await;
750        g.add_edge("file:a", EdgeKind::Imports, "file:b")
751            .await
752            .unwrap();
753        // Forward: a can reach b.
754        assert!(!g.traverse("file:a", &EdgeKind::Imports, 1).is_empty());
755        // Reverse: b cannot reach a — no back edge exists.
756        assert!(g.traverse("file:b", &EdgeKind::Imports, 5).is_empty());
757    }
758
759    /// Adding the same edge in two separate sessions must not duplicate it in
760    /// the store. The second session loads it from SurrealKV, detects it in
761    /// edge_set, and skips the write.
762    #[tokio::test]
763    async fn add_edge_idempotent_across_sessions() {
764        let dir = TempDir::new().unwrap();
765        for _ in 0..2 {
766            let store = Store::open(dir.path()).await.unwrap();
767            let mut g = Graph::load(store).await.unwrap();
768            g.add_edge("file:a", EdgeKind::Imports, "file:b")
769                .await
770                .unwrap();
771            g.close().await.unwrap();
772        }
773        let store = Store::open(dir.path()).await.unwrap();
774        let g = Graph::load(store).await.unwrap();
775        assert_eq!(g.edge_count(), 1);
776        // Store should also contain exactly one edge record.
777        let keys = g.store.scan_keys("graph:edge:").await.unwrap();
778        assert_eq!(keys.len(), 1);
779    }
780
781    #[tokio::test]
782    async fn disconnected_components_do_not_bleed() {
783        let (mut g, _dir) = temp_graph().await;
784        // Component 1: a → b → c
785        g.add_edge("file:a", EdgeKind::Imports, "file:b")
786            .await
787            .unwrap();
788        g.add_edge("file:b", EdgeKind::Imports, "file:c")
789            .await
790            .unwrap();
791        // Component 2: x → y
792        g.add_edge("file:x", EdgeKind::Imports, "file:y")
793            .await
794            .unwrap();
795
796        let from_a = g.traverse("file:a", &EdgeKind::Imports, 10);
797        assert!(!from_a.contains(&"file:x".to_string()));
798        assert!(!from_a.contains(&"file:y".to_string()));
799
800        let from_x = g.traverse("file:x", &EdgeKind::Imports, 10);
801        assert!(!from_x.contains(&"file:a".to_string()));
802        assert!(!from_x.contains(&"file:b".to_string()));
803    }
804
805    /// graph:edge:* keys that don't parse as valid edges (corrupt or manually
806    /// inserted records) must be silently skipped during load — not panic or error.
807    #[tokio::test]
808    async fn load_skips_unparseable_edge_keys() {
809        let dir = TempDir::new().unwrap();
810        {
811            let store = Store::open(dir.path()).await.unwrap();
812            let mut g = Graph::load(store).await.unwrap();
813
814            // Add one legitimate edge.
815            g.add_edge("file:a", EdgeKind::Imports, "file:b")
816                .await
817                .unwrap();
818
819            // Manually insert a record whose key has the graph:edge: prefix
820            // but is not a valid edge key (no parseable from/kind/to triple).
821            let corrupt_key = "graph:edge:this_is_not_parseable";
822            let record = make_edge_stub_record(corrupt_key);
823            g.store.put(corrupt_key, &record).await.unwrap();
824
825            g.close().await.unwrap();
826        }
827        // Reload — must not panic, must skip the corrupt key.
828        let store2 = Store::open(dir.path()).await.unwrap();
829        let g2 = Graph::load(store2).await.unwrap();
830        assert_eq!(g2.edge_count(), 1, "only the valid edge should be loaded");
831        assert_eq!(g2.neighbors("file:a", &EdgeKind::Imports), vec!["file:b"]);
832    }
833
834    /// Corrupt edge keys whose `to` endpoint is not a valid node key must be
835    /// skipped during load instead of creating phantom graph nodes.
836    #[tokio::test]
837    async fn load_skips_edge_keys_with_invalid_to_endpoint() {
838        let dir = TempDir::new().unwrap();
839        {
840            let store = Store::open(dir.path()).await.unwrap();
841            let mut g = Graph::load(store).await.unwrap();
842
843            g.add_edge("file:a", EdgeKind::Imports, "file:b")
844                .await
845                .unwrap();
846
847            let corrupt_key = "graph:edge:file:a:imports:unknown_ns:target";
848            let record = make_edge_stub_record(corrupt_key);
849            g.store.put(corrupt_key, &record).await.unwrap();
850
851            g.close().await.unwrap();
852        }
853
854        let store2 = Store::open(dir.path()).await.unwrap();
855        let g2 = Graph::load(store2).await.unwrap();
856        assert_eq!(g2.edge_count(), 1, "corrupt edge key must be skipped");
857        assert_eq!(g2.neighbors("file:a", &EdgeKind::Imports), vec!["file:b"]);
858        assert!(
859            !g2.node_index.contains_key("unknown_ns:target"),
860            "invalid to endpoint must not create a phantom node"
861        );
862    }
863
864    /// Load 100 pre-persisted edges from SurrealKV — validates the bulk load path.
865    #[tokio::test]
866    async fn load_100_edges_from_store() {
867        let dir = TempDir::new().unwrap();
868        let n = 100usize;
869        {
870            let store = Store::open(dir.path()).await.unwrap();
871            let mut g = Graph::load(store).await.unwrap();
872            for i in 0..n {
873                g.add_edge(
874                    &format!("file:src/mod{i}.rs"),
875                    EdgeKind::Imports,
876                    &format!("file:src/dep{i}.rs"),
877                )
878                .await
879                .unwrap();
880            }
881            g.close().await.unwrap();
882        }
883        let store2 = Store::open(dir.path()).await.unwrap();
884        let g2 = Graph::load(store2).await.unwrap();
885        assert_eq!(g2.edge_count(), n);
886        assert_eq!(g2.node_count(), n * 2);
887        assert_eq!(
888            g2.neighbors("file:src/mod0.rs", &EdgeKind::Imports),
889            vec!["file:src/dep0.rs"]
890        );
891        assert_eq!(
892            g2.neighbors(&format!("file:src/mod{}.rs", n - 1), &EdgeKind::Imports),
893            vec![format!("file:src/dep{}.rs", n - 1)]
894        );
895    }
896
897    #[tokio::test]
898    async fn add_edges_batch_correctness() {
899        let (mut g, _dir) = temp_graph().await;
900        let batch = vec![
901            (
902                "file:a".to_string(),
903                EdgeKind::Imports,
904                "file:b".to_string(),
905            ),
906            (
907                "file:a".to_string(),
908                EdgeKind::HasGotcha,
909                "gotcha:x".to_string(),
910            ),
911            (
912                "file:b".to_string(),
913                EdgeKind::CoChanges,
914                "file:c".to_string(),
915            ),
916        ];
917        g.add_edges_batch(&batch).await.unwrap();
918        assert_eq!(g.edge_count(), 3);
919        assert_eq!(g.neighbors("file:a", &EdgeKind::Imports), vec!["file:b"]);
920        assert_eq!(
921            g.neighbors("file:a", &EdgeKind::HasGotcha),
922            vec!["gotcha:x"]
923        );
924        assert_eq!(g.neighbors("file:b", &EdgeKind::CoChanges), vec!["file:c"]);
925    }
926
927    #[tokio::test]
928    async fn add_edges_batch_is_idempotent() {
929        let (mut g, _dir) = temp_graph().await;
930        let batch = vec![(
931            "file:a".to_string(),
932            EdgeKind::Imports,
933            "file:b".to_string(),
934        )];
935        g.add_edges_batch(&batch).await.unwrap();
936        g.add_edges_batch(&batch).await.unwrap();
937        g.add_edges_batch(&batch).await.unwrap();
938        assert_eq!(g.edge_count(), 1);
939        let keys = g.store.scan_keys("graph:edge:").await.unwrap();
940        assert_eq!(
941            keys.len(),
942            1,
943            "store must have exactly 1 record after duplicate batches"
944        );
945    }
946
947    #[tokio::test]
948    async fn add_edges_batch_survives_reload() {
949        let dir = TempDir::new().unwrap();
950        {
951            let store = Store::open(dir.path()).await.unwrap();
952            let mut g = Graph::load(store).await.unwrap();
953            let batch: Vec<(String, EdgeKind, String)> = (0..50)
954                .map(|i| {
955                    (
956                        format!("file:a{i}"),
957                        EdgeKind::Imports,
958                        format!("file:b{i}"),
959                    )
960                })
961                .collect();
962            g.add_edges_batch(&batch).await.unwrap();
963            g.close().await.unwrap();
964        }
965        let store2 = Store::open(dir.path()).await.unwrap();
966        let g2 = Graph::load(store2).await.unwrap();
967        assert_eq!(g2.edge_count(), 50);
968    }
969
970    /// Perf regression guard — flaky on CI runners with variable load.
971    /// Run with: `cargo test --lib add_edges_batch_faster -- --ignored`
972    #[tokio::test]
973    #[ignore]
974    async fn add_edges_batch_faster_than_sequential() {
975        use std::time::Instant;
976        let edges: Vec<(String, EdgeKind, String)> = (0..500)
977            .map(|i| {
978                (
979                    format!("file:m{i}"),
980                    EdgeKind::Imports,
981                    format!("file:d{i}"),
982                )
983            })
984            .collect();
985
986        // Sequential baseline.
987        let dir_seq = TempDir::new().unwrap();
988        let store_seq = Store::open(dir_seq.path()).await.unwrap();
989        let mut g_seq = Graph::load(store_seq).await.unwrap();
990        let seq_start = Instant::now();
991        for (from, kind, to) in &edges {
992            g_seq.add_edge(from, kind.clone(), to).await.unwrap();
993        }
994        let seq_ms = seq_start.elapsed().as_millis();
995
996        // Batch.
997        let dir_bat = TempDir::new().unwrap();
998        let store_bat = Store::open(dir_bat.path()).await.unwrap();
999        let mut g_bat = Graph::load(store_bat).await.unwrap();
1000        let bat_start = Instant::now();
1001        g_bat.add_edges_batch(&edges).await.unwrap();
1002        let bat_ms = bat_start.elapsed().as_millis();
1003
1004        assert!(
1005            bat_ms < seq_ms,
1006            "add_edges_batch ({bat_ms}ms) was not faster than sequential add_edge ({seq_ms}ms)"
1007        );
1008        assert_eq!(g_bat.edge_count(), 500);
1009    }
1010
1011    /// Stress test: 1,200 edges across all 10 EdgeKind variants — matching the
1012    /// upper bound of what Layer 0 static analysis produces on a real repo.
1013    /// Verifies correctness of edge_count, traversal, reverse traversal,
1014    /// duplicate rejection, and full persist + reload cycle.
1015    #[tokio::test]
1016    async fn stress_1200_edges_layer0_scale() {
1017        use std::time::Instant;
1018
1019        let dir = TempDir::new().unwrap();
1020        let n_files = 120usize; // 120 files × 10 kinds = 1,200 edges
1021        let all_kinds = [
1022            EdgeKind::HasGotcha,
1023            EdgeKind::Imports,
1024            EdgeKind::AffectedBy,
1025            EdgeKind::HasNote,
1026            EdgeKind::DiscoveredIn,
1027            EdgeKind::CausedBy,
1028            EdgeKind::Supersedes,
1029            EdgeKind::Touched,
1030            EdgeKind::DependencyAffects,
1031            EdgeKind::CoChanges,
1032        ];
1033
1034        // ── Session 1: build the graph via add_edges_batch ──────────────────
1035        let build_start = Instant::now();
1036        {
1037            let store = Store::open(dir.path()).await.unwrap();
1038            let mut g = Graph::load(store).await.unwrap();
1039            let batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1040                .flat_map(|i| {
1041                    all_kinds.iter().map(move |kind| {
1042                        (
1043                            format!("file:src/module{i}.rs"),
1044                            kind.clone(),
1045                            format!("file:src/target{i}.rs"),
1046                        )
1047                    })
1048                })
1049                .collect();
1050            g.add_edges_batch(&batch).await.unwrap();
1051            assert_eq!(g.edge_count(), 1_200);
1052            assert_eq!(g.node_count(), n_files * 2);
1053            g.close().await.unwrap();
1054        }
1055        let build_ms = build_start.elapsed().as_millis();
1056
1057        // ── Session 2: reload and verify ─────────────────────────────────────
1058        let load_start = Instant::now();
1059        let store2 = Store::open(dir.path()).await.unwrap();
1060        let g2 = Graph::load(store2).await.unwrap();
1061        let load_ms = load_start.elapsed().as_millis();
1062
1063        assert_eq!(g2.edge_count(), 1_200, "all edges must survive reload");
1064        assert_eq!(g2.node_count(), n_files * 2);
1065
1066        // Spot-check every kind on a mid-range file.
1067        let mid = n_files / 2;
1068        for kind in &all_kinds {
1069            let fwd = g2.neighbors(&format!("file:src/module{mid}.rs"), kind);
1070            assert_eq!(fwd.len(), 1, "forward: {kind:?}");
1071            assert_eq!(fwd[0], format!("file:src/target{mid}.rs"));
1072
1073            let rev = g2.neighbors_incoming(&format!("file:src/target{mid}.rs"), kind);
1074            assert_eq!(rev.len(), 1, "reverse: {kind:?}");
1075            assert_eq!(rev[0], format!("file:src/module{mid}.rs"));
1076        }
1077        g2.close().await.unwrap();
1078
1079        // ── Duplicate rejection at scale ─────────────────────────────────────
1080        // Re-adding all 1,200 edges must be a no-op — edge_count stays at 1,200.
1081        let store3 = Store::open(dir.path()).await.unwrap();
1082        let mut g3 = Graph::load(store3).await.unwrap();
1083        for i in 0..n_files {
1084            for kind in &all_kinds {
1085                g3.add_edge(
1086                    &format!("file:src/module{i}.rs"),
1087                    kind.clone(),
1088                    &format!("file:src/target{i}.rs"),
1089                )
1090                .await
1091                .unwrap();
1092            }
1093        }
1094        assert_eq!(
1095            g3.edge_count(),
1096            1_200,
1097            "duplicate adds must not grow the graph"
1098        );
1099        let store_keys = g3.store.scan_keys("graph:edge:").await.unwrap();
1100        assert_eq!(
1101            store_keys.len(),
1102            1_200,
1103            "store must not contain duplicate records"
1104        );
1105
1106        // ── Traversal correctness on a hub node ──────────────────────────────
1107        // Connect all 120 source files to a single hub via CoChanges.
1108        for i in 0..n_files {
1109            g3.add_edge(
1110                &format!("file:src/module{i}.rs"),
1111                EdgeKind::CoChanges,
1112                "file:src/hub.rs",
1113            )
1114            .await
1115            .unwrap();
1116        }
1117        let hub_sources = g3.neighbors_incoming("file:src/hub.rs", &EdgeKind::CoChanges);
1118        assert_eq!(
1119            hub_sources.len(),
1120            n_files,
1121            "hub must have {n_files} incoming CoChanges"
1122        );
1123
1124        // ── Timing assertions ─────────────────────────────────────────────────
1125        // Build uses add_edges_batch → 1 fsync. Load is sequential reads.
1126        // Both should be well under 1s in debug, <50ms in release.
1127        // Bounds are intentionally generous for slow CI machines.
1128        println!("stress_1200  build={build_ms}ms  load={load_ms}ms");
1129        assert!(
1130            load_ms < 500,
1131            "graph load took {load_ms}ms — expected <500ms for 1,200 edges"
1132        );
1133        assert!(
1134            build_ms < 5_000,
1135            "graph build took {build_ms}ms — expected <5s for 1,200 edges (1 fsync via batch)"
1136        );
1137    }
1138
1139    /// Stress test: 15,000 edges — realistic medium-large production repo.
1140    /// 1,500 files × 10 EdgeKind variants; exercises batch write, key-only
1141    /// scan on reload, and duplicate rejection at production scale.
1142    #[tokio::test]
1143    async fn stress_15000_edges_production_scale() {
1144        use std::time::Instant;
1145
1146        let dir = TempDir::new().unwrap();
1147        let n_files = 1_500usize; // 1,500 files × 10 kinds = 15,000 edges
1148        let all_kinds = [
1149            EdgeKind::HasGotcha,
1150            EdgeKind::Imports,
1151            EdgeKind::AffectedBy,
1152            EdgeKind::HasNote,
1153            EdgeKind::DiscoveredIn,
1154            EdgeKind::CausedBy,
1155            EdgeKind::Supersedes,
1156            EdgeKind::Touched,
1157            EdgeKind::DependencyAffects,
1158            EdgeKind::CoChanges,
1159        ];
1160
1161        // ── Session 1: build the graph via add_edges_batch ──────────────────
1162        let build_start = Instant::now();
1163        {
1164            let store = Store::open(dir.path()).await.unwrap();
1165            let mut g = Graph::load(store).await.unwrap();
1166            let batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1167                .flat_map(|i| {
1168                    all_kinds.iter().map(move |kind| {
1169                        (
1170                            format!("file:src/module{i}.rs"),
1171                            kind.clone(),
1172                            format!("file:src/target{i}.rs"),
1173                        )
1174                    })
1175                })
1176                .collect();
1177            g.add_edges_batch(&batch).await.unwrap();
1178            assert_eq!(g.edge_count(), 15_000);
1179            assert_eq!(g.node_count(), n_files * 2);
1180            g.close().await.unwrap();
1181        }
1182        let build_ms = build_start.elapsed().as_millis();
1183
1184        // ── Session 2: reload and verify ─────────────────────────────────────
1185        let load_start = Instant::now();
1186        let store2 = Store::open(dir.path()).await.unwrap();
1187        let g2 = Graph::load(store2).await.unwrap();
1188        let load_ms = load_start.elapsed().as_millis();
1189
1190        assert_eq!(g2.edge_count(), 15_000, "all edges must survive reload");
1191        assert_eq!(g2.node_count(), n_files * 2);
1192
1193        // Spot-check every kind on a mid-range file.
1194        let mid = n_files / 2;
1195        for kind in &all_kinds {
1196            let fwd = g2.neighbors(&format!("file:src/module{mid}.rs"), kind);
1197            assert_eq!(fwd.len(), 1, "forward: {kind:?}");
1198            assert_eq!(fwd[0], format!("file:src/target{mid}.rs"));
1199
1200            let rev = g2.neighbors_incoming(&format!("file:src/target{mid}.rs"), kind);
1201            assert_eq!(rev.len(), 1, "reverse: {kind:?}");
1202            assert_eq!(rev[0], format!("file:src/module{mid}.rs"));
1203        }
1204        g2.close().await.unwrap();
1205
1206        // ── Duplicate rejection at scale ─────────────────────────────────────
1207        // Re-adding the same batch must be a no-op.
1208        let store3 = Store::open(dir.path()).await.unwrap();
1209        let mut g3 = Graph::load(store3).await.unwrap();
1210        let dup_batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1211            .flat_map(|i| {
1212                all_kinds.iter().map(move |kind| {
1213                    (
1214                        format!("file:src/module{i}.rs"),
1215                        kind.clone(),
1216                        format!("file:src/target{i}.rs"),
1217                    )
1218                })
1219            })
1220            .collect();
1221        g3.add_edges_batch(&dup_batch).await.unwrap();
1222        assert_eq!(
1223            g3.edge_count(),
1224            15_000,
1225            "duplicate adds must not grow the graph"
1226        );
1227        let store_keys = g3.store.scan_keys("graph:edge:").await.unwrap();
1228        assert_eq!(
1229            store_keys.len(),
1230            15_000,
1231            "store must not contain duplicate records"
1232        );
1233
1234        // ── Traversal correctness on a hub node ──────────────────────────────
1235        // Connect all 1,500 source files to a single hub via CoChanges.
1236        let hub_batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1237            .map(|i| {
1238                (
1239                    format!("file:src/module{i}.rs"),
1240                    EdgeKind::CoChanges,
1241                    "file:src/hub.rs".to_string(),
1242                )
1243            })
1244            .collect();
1245        g3.add_edges_batch(&hub_batch).await.unwrap();
1246        let hub_sources = g3.neighbors_incoming("file:src/hub.rs", &EdgeKind::CoChanges);
1247        assert_eq!(
1248            hub_sources.len(),
1249            n_files,
1250            "hub must have {n_files} incoming CoChanges"
1251        );
1252
1253        // ── Timing assertions ─────────────────────────────────────────────────
1254        println!("stress_15000  build={build_ms}ms  load={load_ms}ms");
1255        assert!(
1256            load_ms < 2_000,
1257            "graph load took {load_ms}ms — expected <2s for 15,000 edges"
1258        );
1259        assert!(
1260            build_ms < 10_000,
1261            "graph build took {build_ms}ms — expected <10s for 15,000 edges (1 fsync via batch)"
1262        );
1263    }
1264
1265    /// Stress test: 100,000 edges — monorepo scale ceiling.
1266    /// 10,000 files × 10 EdgeKind variants; this is the upper bound for any
1267    /// real-world repo mati will encounter. Validates that batch write, key-only
1268    /// scan, and in-memory petgraph all hold up without degradation.
1269    #[tokio::test]
1270    async fn stress_100000_edges_monorepo_scale() {
1271        use std::time::Instant;
1272
1273        let dir = TempDir::new().unwrap();
1274        let n_files = 10_000usize; // 10,000 files × 10 kinds = 100,000 edges
1275        let all_kinds = [
1276            EdgeKind::HasGotcha,
1277            EdgeKind::Imports,
1278            EdgeKind::AffectedBy,
1279            EdgeKind::HasNote,
1280            EdgeKind::DiscoveredIn,
1281            EdgeKind::CausedBy,
1282            EdgeKind::Supersedes,
1283            EdgeKind::Touched,
1284            EdgeKind::DependencyAffects,
1285            EdgeKind::CoChanges,
1286        ];
1287
1288        // ── Session 1: build the graph via add_edges_batch ──────────────────
1289        let build_start = Instant::now();
1290        {
1291            let store = Store::open(dir.path()).await.unwrap();
1292            let mut g = Graph::load(store).await.unwrap();
1293            let batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1294                .flat_map(|i| {
1295                    all_kinds.iter().map(move |kind| {
1296                        (
1297                            format!("file:src/module{i}.rs"),
1298                            kind.clone(),
1299                            format!("file:src/target{i}.rs"),
1300                        )
1301                    })
1302                })
1303                .collect();
1304            g.add_edges_batch(&batch).await.unwrap();
1305            assert_eq!(g.edge_count(), 100_000);
1306            assert_eq!(g.node_count(), n_files * 2);
1307            g.close().await.unwrap();
1308        }
1309        let build_ms = build_start.elapsed().as_millis();
1310
1311        // ── Session 2: reload and verify ─────────────────────────────────────
1312        let load_start = Instant::now();
1313        let store2 = Store::open(dir.path()).await.unwrap();
1314        let g2 = Graph::load(store2).await.unwrap();
1315        let load_ms = load_start.elapsed().as_millis();
1316
1317        assert_eq!(g2.edge_count(), 100_000, "all edges must survive reload");
1318        assert_eq!(g2.node_count(), n_files * 2);
1319
1320        // Spot-check every kind on a mid-range file.
1321        let mid = n_files / 2;
1322        for kind in &all_kinds {
1323            let fwd = g2.neighbors(&format!("file:src/module{mid}.rs"), kind);
1324            assert_eq!(fwd.len(), 1, "forward: {kind:?}");
1325            assert_eq!(fwd[0], format!("file:src/target{mid}.rs"));
1326
1327            let rev = g2.neighbors_incoming(&format!("file:src/target{mid}.rs"), kind);
1328            assert_eq!(rev.len(), 1, "reverse: {kind:?}");
1329            assert_eq!(rev[0], format!("file:src/module{mid}.rs"));
1330        }
1331        g2.close().await.unwrap();
1332
1333        // ── Duplicate rejection at scale ─────────────────────────────────────
1334        let store3 = Store::open(dir.path()).await.unwrap();
1335        let mut g3 = Graph::load(store3).await.unwrap();
1336        let dup_batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1337            .flat_map(|i| {
1338                all_kinds.iter().map(move |kind| {
1339                    (
1340                        format!("file:src/module{i}.rs"),
1341                        kind.clone(),
1342                        format!("file:src/target{i}.rs"),
1343                    )
1344                })
1345            })
1346            .collect();
1347        g3.add_edges_batch(&dup_batch).await.unwrap();
1348        assert_eq!(
1349            g3.edge_count(),
1350            100_000,
1351            "duplicate adds must not grow the graph"
1352        );
1353        let store_keys = g3.store.scan_keys("graph:edge:").await.unwrap();
1354        assert_eq!(
1355            store_keys.len(),
1356            100_000,
1357            "store must not contain duplicate records"
1358        );
1359
1360        // ── Hub traversal: 10,000 incoming edges on a single node ─────────────
1361        let hub_batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1362            .map(|i| {
1363                (
1364                    format!("file:src/module{i}.rs"),
1365                    EdgeKind::CoChanges,
1366                    "file:src/hub.rs".to_string(),
1367                )
1368            })
1369            .collect();
1370        g3.add_edges_batch(&hub_batch).await.unwrap();
1371        let hub_sources = g3.neighbors_incoming("file:src/hub.rs", &EdgeKind::CoChanges);
1372        assert_eq!(
1373            hub_sources.len(),
1374            n_files,
1375            "hub must have {n_files} incoming CoChanges"
1376        );
1377
1378        // ── Timing assertions ─────────────────────────────────────────────────
1379        println!("stress_100000  build={build_ms}ms  load={load_ms}ms");
1380        assert!(
1381            load_ms < 5_000,
1382            "graph load took {load_ms}ms — expected <5s for 100,000 edges"
1383        );
1384        assert!(
1385            build_ms < 30_000,
1386            "graph build took {build_ms}ms — expected <30s for 100,000 edges (1 fsync via batch)"
1387        );
1388    }
1389
1390    /// Stress test: 700,000 edges — Linux kernel scale.
1391    /// The Linux kernel has ~70,000 source + header files. At 10 EdgeKind
1392    /// variants each this represents the absolute ceiling of what mati will
1393    /// ever index. If this passes, mati handles any real-world repo.
1394    ///
1395    /// Ignored by default — running alongside the rest of the test suite
1396    /// saturates APFS fsync + logd firehose on macOS (kernel watchdog
1397    /// panic). Run explicitly:
1398    ///   cargo test stress_700000 -- --ignored --test-threads=1 --nocapture
1399    #[tokio::test]
1400    #[ignore = "linux-scale stress: run with `cargo test stress_700000 -- --ignored --test-threads=1`"]
1401    async fn stress_700000_edges_linux_kernel_scale() {
1402        use std::time::Instant;
1403
1404        let dir = TempDir::new().unwrap();
1405        let n_files = 70_000usize; // 70,000 files × 10 kinds = 700,000 edges
1406        let all_kinds = [
1407            EdgeKind::HasGotcha,
1408            EdgeKind::Imports,
1409            EdgeKind::AffectedBy,
1410            EdgeKind::HasNote,
1411            EdgeKind::DiscoveredIn,
1412            EdgeKind::CausedBy,
1413            EdgeKind::Supersedes,
1414            EdgeKind::Touched,
1415            EdgeKind::DependencyAffects,
1416            EdgeKind::CoChanges,
1417        ];
1418
1419        // ── Step 1: batch Vec construction ──────────────────────────────────
1420        let t0 = Instant::now();
1421        let batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1422            .flat_map(|i| {
1423                all_kinds.iter().map(move |kind| {
1424                    (
1425                        format!("file:src/module{i}.rs"),
1426                        kind.clone(),
1427                        format!("file:src/target{i}.rs"),
1428                    )
1429                })
1430            })
1431            .collect();
1432        let batch_construct_ms = t0.elapsed().as_millis();
1433
1434        // ── Step 2: Store::open + Graph::load (empty) ───────────────────────
1435        let t1 = Instant::now();
1436        let store = Store::open(dir.path()).await.unwrap();
1437        let mut g = Graph::load(store).await.unwrap();
1438        let open_ms = t1.elapsed().as_millis();
1439
1440        // ── Step 3: add_edges_batch (SurrealKV writes + in-memory insert) ───
1441        let t2 = Instant::now();
1442        g.add_edges_batch(&batch).await.unwrap();
1443        let add_batch_ms = t2.elapsed().as_millis();
1444
1445        // ── Step 4: g.close() (flush / drop SurrealKV) ──────────────────────
1446        let t3 = Instant::now();
1447        assert_eq!(g.edge_count(), 700_000);
1448        g.close().await.unwrap();
1449        let close_ms = t3.elapsed().as_millis();
1450
1451        // ── Step 5: Store::open (cold reopen) ───────────────────────────────
1452        let t4 = Instant::now();
1453        let store2 = Store::open(dir.path()).await.unwrap();
1454        let reopen_ms = t4.elapsed().as_millis();
1455
1456        // ── Step 6: Graph::load (scan_keys + petgraph rebuild) ───────────────
1457        let t5 = Instant::now();
1458        let g2 = Graph::load(store2).await.unwrap();
1459        let graph_load_ms = t5.elapsed().as_millis();
1460
1461        assert_eq!(g2.edge_count(), 700_000, "all edges must survive reload");
1462        assert_eq!(g2.node_count(), n_files * 2);
1463
1464        // ── Step 7: traversal (neighbors on hub) ─────────────────────────────
1465        // Simulates a widely-included header (e.g. linux/types.h).
1466        // Close g2 first — SurrealKV only allows one writer per db path.
1467        // Spot-check before closing.
1468        let mid = n_files / 2;
1469        for kind in &all_kinds {
1470            let fwd = g2.neighbors(&format!("file:src/module{mid}.rs"), kind);
1471            assert_eq!(fwd.len(), 1, "forward: {kind:?}");
1472            let rev = g2.neighbors_incoming(&format!("file:src/target{mid}.rs"), kind);
1473            assert_eq!(rev.len(), 1, "reverse: {kind:?}");
1474        }
1475        g2.close().await.unwrap();
1476
1477        let store3 = Store::open(dir.path()).await.unwrap();
1478        let mut g3 = Graph::load(store3).await.unwrap();
1479        let hub_batch: Vec<(String, EdgeKind, String)> = (0..n_files)
1480            .map(|i| {
1481                (
1482                    format!("file:src/module{i}.rs"),
1483                    EdgeKind::Imports,
1484                    "file:include/linux/types.h".to_string(),
1485                )
1486            })
1487            .collect();
1488        g3.add_edges_batch(&hub_batch).await.unwrap();
1489        let t6 = Instant::now();
1490        let hub_sources = g3.neighbors_incoming("file:include/linux/types.h", &EdgeKind::Imports);
1491        let traversal_us = t6.elapsed().as_micros();
1492        assert_eq!(hub_sources.len(), n_files);
1493
1494        // ── Breakdown ─────────────────────────────────────────────────────────
1495        let build_ms = batch_construct_ms + open_ms + add_batch_ms + close_ms;
1496        let load_ms = reopen_ms + graph_load_ms;
1497        println!("\nstress_700000  Linux kernel scale — step breakdown");
1498        println!("  BUILD  total={build_ms}ms");
1499        println!("    batch Vec construction : {batch_construct_ms}ms");
1500        println!("    Store::open (empty)    : {open_ms}ms");
1501        println!(
1502            "    add_edges_batch        : {add_batch_ms}ms  ← SurrealKV writes + in-mem insert"
1503        );
1504        println!("    g.close / flush        : {close_ms}ms");
1505        println!("  LOAD   total={load_ms}ms");
1506        println!("    Store::open (cold)     : {reopen_ms}ms");
1507        println!(
1508            "    Graph::load (scan+petgraph): {graph_load_ms}ms  ← scan_keys + DiGraph rebuild"
1509        );
1510        println!("  TRAVERSAL");
1511        println!("    neighbors_incoming (70k edges): {traversal_us}µs  ← pure RAM");
1512
1513        assert!(build_ms < 60_000, "build took {build_ms}ms");
1514        assert!(load_ms < 30_000, "load took {load_ms}ms");
1515    }
1516}