Skip to main content

issundb_core/graph/
algo.rs

1use super::*;
2
3impl Graph {
4    // ------------------------------------------------------------------
5    // Graph algorithms
6    // ------------------------------------------------------------------
7
8    /// Depth-first search outward from `start` up to `hops` levels deep.
9    pub fn dfs(&self, start: NodeId, hops: u8) -> Result<Vec<NodeId>, Error> {
10        self.ensure_csr_fresh()?;
11        let guard = self.matrices.read();
12        let m = guard
13            .as_ref()
14            .ok_or(Error::Corrupt("matrices not initialized"))?;
15        let snap = self.csr_cache.snapshot.load();
16        self.dfs_graphblas(m, &snap, start, hops)
17    }
18
19    /// Counts variable assignments of the directed triangle pattern
20    /// `(a)-[t1]->(b)-[t2]->(c)-[t3]->(a)` under `spec`'s per-hop relationship
21    /// types and per-variable labels.
22    ///
23    /// The count follows Cypher MATCH semantics: each distinct assignment of
24    /// `(a, b, c, e1, e2, e3)` is one match, so a single 3-cycle of distinct
25    /// nodes counts once per rotation of `a` (three when all hops share one
26    /// type), parallel edges multiply, and the three relationships must be
27    /// pairwise distinct (relationship uniqueness), which only constrains
28    /// self-loop assignments where `a == b == c`.
29    pub fn count_triangle_cycles(&self, spec: &TriangleCountSpec) -> Result<u64, Error> {
30        self.ensure_csr_fresh()?;
31        let snap = self.csr_cache.snapshot.load();
32        let n = snap.dense_to_id.len();
33        if n == 0 {
34            return Ok(0);
35        }
36
37        // A named but unregistered relationship type matches nothing.
38        let mut type_ids: [Option<TypeId>; 3] = [None; 3];
39        {
40            let rtxn = self.storage.env.read_txn()?;
41            for (i, name) in spec.rel_types.iter().enumerate() {
42                if let Some(name) = name {
43                    match get_type(&self.storage, &rtxn, name)? {
44                        Some(tid) => type_ids[i] = Some(tid),
45                        None => return Ok(0),
46                    }
47                }
48            }
49        }
50
51        // Dense-index masks for the per-variable labels; `None` means
52        // unconstrained. An unknown label yields an all-false mask, which
53        // counts zero without a special case.
54        let mut masks: [Option<Vec<bool>>; 3] = [None, None, None];
55        for (i, label) in spec.labels.iter().enumerate() {
56            if let Some(name) = label {
57                let mut mask = vec![false; n];
58                for id in self.nodes_by_label(name)? {
59                    if let Some(&d) = snap.id_to_dense.get(&id) {
60                        mask[d as usize] = true;
61                    }
62                }
63                masks[i] = Some(mask);
64            }
65        }
66        let label_ok = |mask: &Option<Vec<bool>>, d: usize| mask.as_ref().is_none_or(|m| m[d]);
67
68        // Sorted typed adjacency for each hop: hop 1 and hop 2 read forward
69        // rows, hop 3 reads the transpose (edges into `a`). Hop 2 reuses the
70        // hop-1 view when the types coincide.
71        let out1 = typed_out_sorted(&snap, type_ids[0]);
72        let out2_built = if type_ids[1] == type_ids[0] {
73            None
74        } else {
75            Some(typed_out_sorted(&snap, type_ids[1]))
76        };
77        let out2 = out2_built.as_ref().unwrap_or(&out1);
78        let in3 = typed_in_sorted(&snap, type_ids[2]);
79
80        let mut total: u64 = 0;
81        for a in 0..n {
82            if !label_ok(&masks[0], a) {
83                continue;
84            }
85            let in3_row = in3.row(a);
86            if in3_row.is_empty() {
87                continue;
88            }
89            let out1_row = out1.row(a);
90
91            let mut i = 0;
92            while i < out1_row.len() {
93                let b = out1_row[i].0 as usize;
94                let run1_start = i;
95                while i < out1_row.len() && out1_row[i].0 as usize == b {
96                    i += 1;
97                }
98                if !label_ok(&masks[1], b) {
99                    continue;
100                }
101                let m1 = (i - run1_start) as u64;
102                let out2_row = out2.row(b);
103
104                // Sorted merge of the hop-2 candidates from `b` against the
105                // hop-3 sources into `a`; equal runs give parallel-edge
106                // multiplicities.
107                let (mut j, mut k) = (0, 0);
108                let mut pair_count: u64 = 0;
109                while j < out2_row.len() && k < in3_row.len() {
110                    let c2 = out2_row[j].0;
111                    let c3 = in3_row[k].0;
112                    match c2.cmp(&c3) {
113                        std::cmp::Ordering::Less => j += 1,
114                        std::cmp::Ordering::Greater => k += 1,
115                        std::cmp::Ordering::Equal => {
116                            let c = c2 as usize;
117                            let j0 = j;
118                            while j < out2_row.len() && out2_row[j].0 as usize == c {
119                                j += 1;
120                            }
121                            let k0 = k;
122                            while k < in3_row.len() && in3_row[k].0 as usize == c {
123                                k += 1;
124                            }
125                            if !label_ok(&masks[2], c) {
126                                continue;
127                            }
128                            if a == b && c == a {
129                                // Every hop is a self-loop at `a`, the one shape
130                                // where two hops can bind the same relationship.
131                                // Enumerate ordered triples of pairwise-distinct
132                                // edge IDs explicitly; this term replaces the
133                                // multiplicity product for this cell, so it is
134                                // not scaled by `m1`.
135                                for &(_, e1) in &out1_row[run1_start..run1_start + m1 as usize] {
136                                    for &(_, e2) in &out2_row[j0..j] {
137                                        if e2 == e1 {
138                                            continue;
139                                        }
140                                        for &(_, e3) in &in3_row[k0..k] {
141                                            if e3 != e1 && e3 != e2 {
142                                                total += 1;
143                                            }
144                                        }
145                                    }
146                                }
147                            } else {
148                                pair_count += ((j - j0) * (k - k0)) as u64;
149                            }
150                        }
151                    }
152                }
153                total += m1 * pair_count;
154            }
155        }
156        Ok(total)
157    }
158
159    /// Detects if there is at least one directed cycle in the graph.
160    pub fn detect_cycle(&self) -> Result<bool, Error> {
161        self.ensure_csr_fresh()?;
162        let guard = self.matrices.read();
163        let m = guard
164            .as_ref()
165            .ok_or(Error::Corrupt("matrices not initialized"))?;
166        let snap = self.csr_cache.snapshot.load();
167        self.detect_cycle_graphblas(m, &snap)
168    }
169
170    /// Returns directed neighbor entries for all outgoing and incoming edges of `node`.
171    pub fn all_neighbors(&self, node: NodeId) -> Result<Vec<DirectedNeighborEntry>, Error> {
172        let rtxn = self.storage.env.read_txn()?;
173        let mut neighbors = Vec::new();
174        for ne in self.out_neighbors_impl(&rtxn, node)? {
175            neighbors.push(DirectedNeighborEntry {
176                node: ne.node,
177                edge: ne.edge,
178                edge_type: ne.edge_type,
179                outgoing: true,
180            });
181        }
182        for ne in self.in_neighbors_impl(&rtxn, node)? {
183            neighbors.push(DirectedNeighborEntry {
184                node: ne.node,
185                edge: ne.edge,
186                edge_type: ne.edge_type,
187                outgoing: false,
188            });
189        }
190        Ok(neighbors)
191    }
192
193    /// Returns all simple paths (no repeated nodes) between `src` and `dst`.
194    pub fn all_paths(&self, src: NodeId, dst: NodeId) -> Result<Vec<Vec<NodeId>>, Error> {
195        self.ensure_csr_fresh()?;
196        let guard = self.matrices.read();
197        let m = guard
198            .as_ref()
199            .ok_or(Error::Corrupt("matrices not initialized"))?;
200        let snap = self.csr_cache.snapshot.load();
201        self.all_paths_graphblas(m, &snap, src, dst)
202    }
203
204    /// Returns all unweighted shortest paths between `src` and `dst`.
205    pub fn all_shortest_paths(&self, src: NodeId, dst: NodeId) -> Result<Vec<Vec<NodeId>>, Error> {
206        self.ensure_csr_fresh()?;
207        let guard = self.matrices.read();
208        let m = guard
209            .as_ref()
210            .ok_or(Error::Corrupt("matrices not initialized"))?;
211        let snap = self.csr_cache.snapshot.load();
212        self.all_shortest_paths_graphblas(m, &snap, src, dst)
213    }
214
215    /// Returns the longest simple path (no repeated nodes) between `src` and `dst`.
216    pub fn longest_path(&self, src: NodeId, dst: NodeId) -> Result<Option<Vec<NodeId>>, Error> {
217        self.ensure_csr_fresh()?;
218        let guard = self.matrices.read();
219        let m = guard
220            .as_ref()
221            .ok_or(Error::Corrupt("matrices not initialized"))?;
222        let snap = self.csr_cache.snapshot.load();
223        self.longest_path_graphblas(m, &snap, src, dst)
224    }
225
226    /// Computes the weighted shortest path between `src` and `dst` using Dijkstra's algorithm.
227    ///
228    /// Edge weights come from the materialized CSR snapshot, which reads the
229    /// first present of the `weight`, `cost`, `capacity`, or `cap` edge
230    /// properties, defaulting to `1.0`. The weight source is fixed: unlike
231    /// `shortest_path_top_k` and `spanning_forest`, this method does not take a
232    /// weight-property argument.
233    pub fn shortest_path_dijkstra(
234        &self,
235        src: NodeId,
236        dst: NodeId,
237    ) -> Result<Option<WeightedPath>, Error> {
238        self.ensure_csr_fresh()?;
239        let guard = self.matrices.read();
240        let m = guard
241            .as_ref()
242            .ok_or(Error::Corrupt("matrices not initialized"))?;
243        let snap = self.csr_cache.snapshot.load();
244        self.shortest_path_dijkstra_graphblas(m, &snap, src, dst)
245    }
246
247    /// Computes the Minimum or Maximum Spanning Forest (MSF) of the graph.
248    pub fn spanning_forest(
249        &self,
250        weight_property: &str,
251        maximum: bool,
252    ) -> Result<Vec<EdgeId>, Error> {
253        self.ensure_csr_fresh()?;
254        let guard = self.matrices.read();
255        let m = guard
256            .as_ref()
257            .ok_or(Error::Corrupt("matrices not initialized"))?;
258        let snap = self.csr_cache.snapshot.load();
259        self.spanning_forest_graphblas(m, &snap, weight_property, maximum)
260    }
261
262    /// Computes community detection on the graph using the Label Propagation Algorithm (LPA / CDLP).
263    pub fn label_propagation(&self, max_iterations: usize) -> Result<HashMap<NodeId, u64>, Error> {
264        self.ensure_csr_fresh()?;
265        let guard = self.matrices.read();
266        let m = guard
267            .as_ref()
268            .ok_or(Error::Corrupt("matrices not initialized"))?;
269        let snap = self.csr_cache.snapshot.load();
270        self.label_propagation_graphblas(m, &snap, max_iterations)
271    }
272
273    /// Computes the harmonic closeness centrality for all nodes in the graph.
274    pub fn harmonic_centrality(&self) -> Result<HashMap<NodeId, f64>, Error> {
275        self.ensure_csr_fresh()?;
276        let guard = self.matrices.read();
277        let m = guard
278            .as_ref()
279            .ok_or(Error::Corrupt("matrices not initialized"))?;
280        let snap = self.csr_cache.snapshot.load();
281        self.harmonic_centrality_graphblas(m, &snap)
282    }
283
284    /// Computes the betweenness centrality for all nodes in the graph.
285    pub fn betweenness_centrality(&self) -> Result<HashMap<NodeId, f64>, Error> {
286        self.ensure_csr_fresh()?;
287        let guard = self.matrices.read();
288        let m = guard
289            .as_ref()
290            .ok_or(Error::Corrupt("matrices not initialized"))?;
291        let snap = self.csr_cache.snapshot.load();
292        self.betweenness_centrality_graphblas(m, &snap)
293    }
294
295    /// Computes the strongly connected components (SCC) of the graph using Tarjan's algorithm.
296    pub fn strongly_connected_components(&self) -> Result<HashMap<NodeId, u64>, Error> {
297        self.ensure_csr_fresh()?;
298        let guard = self.matrices.read();
299        let m = guard
300            .as_ref()
301            .ok_or(Error::Corrupt("matrices not initialized"))?;
302        let snap = self.csr_cache.snapshot.load();
303        self.strongly_connected_components_graphblas(m, &snap)
304    }
305
306    /// Computes the degree centrality for all nodes in the graph based on the specified direction.
307    pub fn degree_centrality(
308        &self,
309        direction: DegreeDirection,
310    ) -> Result<HashMap<NodeId, u64>, Error> {
311        self.ensure_matrix_view()?;
312        let guard = self.matrices.read();
313        let m = guard
314            .as_ref()
315            .ok_or(Error::Corrupt("matrices not initialized"))?;
316        self.degree_centrality_graphblas(m, direction)
317    }
318
319    /// Computes the maximum flow from a source node to a sink node.
320    pub fn maximum_flow(
321        &self,
322        source: NodeId,
323        sink: NodeId,
324        capacity_property: &str,
325    ) -> Result<f64, Error> {
326        self.ensure_csr_fresh()?;
327        let guard = self.matrices.read();
328        let m = guard
329            .as_ref()
330            .ok_or(Error::Corrupt("matrices not initialized"))?;
331        let snap = self.csr_cache.snapshot.load();
332        self.maximum_flow_graphblas(m, &snap, source, sink, capacity_property)
333    }
334
335    /// Computes the K shortest paths from a source node to a destination node using Yen's algorithm.
336    pub fn shortest_path_top_k(
337        &self,
338        src: NodeId,
339        dst: NodeId,
340        k: usize,
341        weight_property: &str,
342    ) -> Result<Vec<WeightedPath>, Error> {
343        self.ensure_csr_fresh()?;
344        let guard = self.matrices.read();
345        let m = guard
346            .as_ref()
347            .ok_or(Error::Corrupt("matrices not initialized"))?;
348        let snap = self.csr_cache.snapshot.load();
349        let paths = self.shortest_path_top_k_graphblas(m, &snap, src, dst, k, weight_property)?;
350        Ok(paths
351            .into_iter()
352            .map(|(nodes, total_weight)| WeightedPath {
353                nodes,
354                total_weight,
355            })
356            .collect())
357    }
358
359    /// Breadth-first search outward from `start` up to `hops` levels deep.
360    pub fn bfs(&self, start: NodeId, hops: u8) -> Result<Vec<NodeId>, Error> {
361        self.ensure_matrix_view()?;
362        self.bfs_graphblas(start, hops)
363    }
364
365    /// Unweighted shortest path from `src` to `dst` by BFS.
366    pub fn shortest_path(&self, src: NodeId, dst: NodeId) -> Result<Option<Vec<NodeId>>, Error> {
367        self.ensure_csr_fresh()?;
368        self.shortest_path_graphblas(src, dst)
369    }
370
371    /// Iterative PageRank over the current CSR snapshot.
372    pub fn page_rank(&self, iterations: u32, damping: f32) -> Result<HashMap<NodeId, f32>, Error> {
373        self.ensure_csr_fresh()?;
374        self.page_rank_graphblas(iterations, damping)
375    }
376
377    /// Freshness gate for consumers that read the CSR snapshot: the native-CSR
378    /// algorithms (`dfs`, `strongly_connected_components`, `maximum_flow`,
379    /// `spanning_forest`, `shortest_path_top_k`, `all_paths`, `longest_path`,
380    /// `detect_cycle`) and the hybrid SpMV-plus-path-reconstruction algorithms
381    /// (`shortest_path_dijkstra`, `betweenness_centrality`, `harmonic_centrality`,
382    /// `all_shortest_paths`, `page_rank`). A full rebuild refreshes both the
383    /// snapshot and all matrices. Gated by the write generation, so it catches
384    /// edge-only drift, not just node-count changes.
385    pub(crate) fn ensure_csr_fresh(&self) -> Result<(), Error> {
386        if self.matrices.read().is_none() || self.csr_cache.snapshot_is_stale() {
387            self.rebuild_csr()?;
388        } else {
389            // A snapshot-only refresh (`ensure_snapshot_fresh`) leaves the
390            // structural delta pending, so a fresh snapshot generation does
391            // not imply fresh matrices; drain the delta into them.
392            self.ensure_matrix_view()?;
393        }
394        Ok(())
395    }
396
397    /// Freshness gate for consumers that read only the CSR snapshot (typed
398    /// expansion). Rebuilds the snapshot alone when it lags committed writes,
399    /// skipping GraphBLAS matrix materialization; the pending structural delta
400    /// stays in place for `ensure_matrix_view` to drain later.
401    pub(crate) fn ensure_snapshot_fresh(&self) -> Result<(), Error> {
402        if self.csr_cache.snapshot_is_stale() {
403            let built_gen = self.csr_cache.current_gen();
404            let snap = CsrSnapshot::build(&self.storage)?;
405            self.csr_cache.install_snapshot(snap, built_gen);
406        }
407        Ok(())
408    }
409
410    /// Freshness gate for the pure-adjacency consumers (`bfs`,
411    /// `bfs_multi_source`, untyped `expand`, `degree_centrality`,
412    /// `connected_components`), which read only `adjacency`/`adjacency_t` and the
413    /// dense mapping carried on `MatrixSet`. Applies the pending structural delta
414    /// to the cached matrices in place (resize plus per-element set/drop) in
415    /// O(delta), falling back to a full rebuild when a node was deleted (the
416    /// dense-index mapping is reshuffled) or the matrices are not yet
417    /// materialized. The take-and-apply runs under the matrices write lock, so a
418    /// reader's subsequent `matrices.read()` never observes a partial apply.
419    pub(crate) fn ensure_matrix_view(&self) -> Result<(), Error> {
420        // A node deletion or an unmaterialized matrix set needs a full rebuild,
421        // which refreshes the snapshot and all matrices from LMDB.
422        if self.matrices.read().is_none() || self.csr_cache.pending_force_full() {
423            return self.rebuild_csr();
424        }
425        // Cheap pre-check: skip the exclusive lock when nothing is pending.
426        if !self.csr_cache.has_pending() {
427            return Ok(());
428        }
429
430        let mut guard = self.matrices.write();
431        let delta = self.csr_cache.take_delta();
432        if delta.force_full {
433            // A node deletion raced in after the peek above. Drop the guard
434            // (rebuild_csr re-acquires the write lock) and rebuild from LMDB; the
435            // taken delta is superseded.
436            drop(guard);
437            return self.rebuild_csr();
438        }
439        if delta.is_empty() {
440            return Ok(());
441        }
442
443        // A removed edge clears the boolean adjacency bit only when no parallel
444        // edge between the same endpoints remains. LMDB is the fresh truth.
445        let mut clear_edges = Vec::new();
446        {
447            let rtxn = self.storage.env.read_txn()?;
448            for &(src, dst) in &delta.removed_edges {
449                let still_connected = self
450                    .out_neighbors_impl(&rtxn, src)?
451                    .into_iter()
452                    .any(|ne| ne.node == dst);
453                if !still_connected {
454                    clear_edges.push((src, dst));
455                }
456            }
457        }
458
459        if let Some(m) = guard.as_mut() {
460            m.apply_delta(&delta.added_nodes, &delta.added_edges, &clear_edges)?;
461        }
462        Ok(())
463    }
464
465    /// Returns all node IDs in the graph in ascending order.
466    pub fn all_nodes(&self) -> Result<Vec<NodeId>, Error> {
467        let rtxn = self.storage.env.read_txn()?;
468        self.all_nodes_impl(&rtxn)
469    }
470
471    pub(super) fn all_nodes_impl(&self, rtxn: &heed::RoTxn) -> Result<Vec<NodeId>, Error> {
472        let mut ids = self
473            .storage
474            .nodes
475            .iter(rtxn)?
476            .map(|r| r.map(|(k, _)| k))
477            .collect::<Result<Vec<_>, _>>()?;
478        ids.sort_unstable();
479        Ok(ids)
480    }
481
482    /// Weakly connected components via BFS treating all edges as undirected.
483    ///
484    /// Returns a map from each node ID to a component ID. Component IDs are
485    /// assigned in ascending order of first discovery and have no guaranteed
486    /// relationship to node IDs.
487    pub fn connected_components(&self) -> Result<HashMap<NodeId, u64>, Error> {
488        self.ensure_matrix_view()?;
489        {
490            let guard = self.matrices.read();
491            if let Some(m) = guard.as_ref() {
492                if m.n_nodes > 0 {
493                    return self.connected_components_graphblas(m);
494                }
495            }
496        }
497        let nodes: Vec<NodeId> = {
498            let rtxn = self.storage.env.read_txn()?;
499            self.storage
500                .nodes
501                .iter(&rtxn)?
502                .map(|r| r.map(|(k, _)| k))
503                .collect::<Result<Vec<_>, _>>()?
504        };
505
506        let mut component: HashMap<NodeId, u64> = HashMap::with_capacity(nodes.len());
507        let mut next_id: u64 = 0;
508
509        for &start in &nodes {
510            if component.contains_key(&start) {
511                continue;
512            }
513            let comp_id = next_id;
514            next_id += 1;
515            component.insert(start, comp_id);
516            let mut queue = vec![start];
517            while let Some(node) = queue.pop() {
518                for ne in self.out_neighbors(node)? {
519                    if component.insert(ne.node, comp_id).is_none() {
520                        queue.push(ne.node);
521                    }
522                }
523                for ne in self.in_neighbors(node)? {
524                    if component.insert(ne.node, comp_id).is_none() {
525                        queue.push(ne.node);
526                    }
527                }
528            }
529        }
530
531        Ok(component)
532    }
533
534    // ------------------------------------------------------------------
535    // Internals
536    // ------------------------------------------------------------------
537
538    /// Increment the dirty counter and, if the threshold is crossed and no
539    /// rebuild is already running, spawn a background thread to rebuild the
540    /// CSR snapshot from LMDB.
541    pub(super) fn maybe_spawn_rebuild(&self) {
542        self.maybe_spawn_rebuild_n(1);
543    }
544
545    pub(super) fn maybe_spawn_rebuild_n(&self, count: usize) {
546        if self.csr_cache.mark_dirty_n(count as u64) {
547            let cache = Arc::clone(&self.csr_cache);
548            let storage = Arc::clone(&self.storage);
549            let matrices = Arc::clone(&self.matrices);
550            let thread_count = Arc::clone(&self.n_threads);
551            std::thread::spawn(move || {
552                // Rebuild until the dirty count drops below the threshold: writes
553                // that commit while a rebuild runs keep the count above zero, and
554                // `install` retains the claim and asks for another pass so the
555                // snapshot does not silently lag behind LMDB.
556                loop {
557                    // Capture the generation before reading LMDB; writes that
558                    // commit during the build leave the snapshot stale until the
559                    // next pass, which the dirty-count loop already drives.
560                    let built_gen = cache.current_gen();
561                    // Clear before reading LMDB so writes during the build are
562                    // retained in the emptied delta for a later incremental apply.
563                    cache.clear_delta();
564                    match CsrSnapshot::build(&storage) {
565                        Ok(snap) => {
566                            if let Ok(m) = MatrixSet::materialize(
567                                &snap,
568                                thread_count.load(std::sync::atomic::Ordering::Acquire),
569                            ) {
570                                *matrices.write() = Some(m);
571                            }
572                            if !cache.install(snap, built_gen) {
573                                break;
574                            }
575                        }
576                        Err(_) => {
577                            cache.cancel_rebuild();
578                            break;
579                        }
580                    }
581                }
582            });
583        }
584    }
585
586    /// Append one `AdjEntry` as a new LMDB duplicate value: O(log n), no blob read.
587    pub(super) fn append_adj(
588        &self,
589        wtxn: &mut heed::RwTxn,
590        node: NodeId,
591        other: NodeId,
592        edge_type: u32,
593        edge_id: EdgeId,
594        outgoing: bool,
595    ) -> Result<(), Error> {
596        let entry = AdjEntry {
597            edge_type,
598            other,
599            edge_id,
600        };
601        let db = if outgoing {
602            &self.storage.out_adj
603        } else {
604            &self.storage.in_adj
605        };
606        db.put(wtxn, &node, entry.as_bytes())?;
607        Ok(())
608    }
609
610    /// Iterate all duplicate `AdjEntry` values for `node` via LMDB cursor.
611    pub(super) fn adj_entries(
612        &self,
613        node: NodeId,
614        outgoing: bool,
615    ) -> Result<Vec<NeighborEntry>, Error> {
616        let rtxn = self.storage.env.read_txn()?;
617        self.adj_entries_impl(&rtxn, node, outgoing)
618    }
619
620    pub(super) fn adj_entries_impl(
621        &self,
622        rtxn: &heed::RoTxn,
623        node: NodeId,
624        outgoing: bool,
625    ) -> Result<Vec<NeighborEntry>, Error> {
626        let db = if outgoing {
627            &self.storage.out_adj
628        } else {
629            &self.storage.in_adj
630        };
631
632        let iter = match db.get_duplicates(rtxn, &node)? {
633            Some(iter) => iter,
634            None => return Ok(vec![]),
635        };
636
637        let mut out = Vec::new();
638        for result in iter {
639            let (_, bytes) = result?;
640            let entry = AdjEntry::read_from_bytes(bytes)
641                .ok()
642                .ok_or(Error::Corrupt("AdjEntry value is not exactly 20 bytes"))?;
643            out.push(NeighborEntry {
644                node: entry.other,
645                edge: entry.edge_id,
646                edge_type: entry.edge_type,
647            });
648        }
649        Ok(out)
650    }
651}
652
653/// Per-row adjacency restricted to one relationship type, with each row
654/// sorted by `(neighbor, edge id)` so intersections run as sorted merges and
655/// parallel edges form contiguous runs.
656struct TypedSortedAdj {
657    ptr: Vec<usize>,
658    adj: Vec<(u32, EdgeId)>,
659}
660
661impl TypedSortedAdj {
662    fn row(&self, d: usize) -> &[(u32, EdgeId)] {
663        &self.adj[self.ptr[d]..self.ptr[d + 1]]
664    }
665}
666
667/// Forward adjacency from the CSR snapshot filtered to `type_id` (`None`
668/// keeps every edge), rows sorted by `(dst, edge id)`.
669fn typed_out_sorted(snap: &CsrSnapshot, type_id: Option<TypeId>) -> TypedSortedAdj {
670    let n = snap.dense_to_id.len();
671    let keep = |idx: usize| type_id.is_none_or(|t| snap.edge_type[idx] == t);
672
673    let mut ptr = vec![0usize; n + 1];
674    for row in 0..n {
675        let mut count = 0;
676        for idx in snap.row_ptr[row]..snap.row_ptr[row + 1] {
677            if keep(idx) {
678                count += 1;
679            }
680        }
681        ptr[row + 1] = ptr[row] + count;
682    }
683
684    let mut adj = vec![(0u32, 0u64); ptr[n]];
685    for row in 0..n {
686        let mut at = ptr[row];
687        for idx in snap.row_ptr[row]..snap.row_ptr[row + 1] {
688            if keep(idx) {
689                adj[at] = (snap.col_idx[idx], snap.edge_id[idx]);
690                at += 1;
691            }
692        }
693        adj[ptr[row]..at].sort_unstable();
694    }
695    TypedSortedAdj { ptr, adj }
696}
697
698/// Transposed adjacency (edges grouped by destination) filtered to
699/// `type_id`, rows sorted by `(src, edge id)`.
700fn typed_in_sorted(snap: &CsrSnapshot, type_id: Option<TypeId>) -> TypedSortedAdj {
701    let n = snap.dense_to_id.len();
702    let keep = |idx: usize| type_id.is_none_or(|t| snap.edge_type[idx] == t);
703
704    let mut ptr = vec![0usize; n + 1];
705    for idx in 0..snap.col_idx.len() {
706        if keep(idx) {
707            ptr[snap.col_idx[idx] as usize + 1] += 1;
708        }
709    }
710    for d in 0..n {
711        ptr[d + 1] += ptr[d];
712    }
713
714    let mut at = ptr.clone();
715    let mut adj = vec![(0u32, 0u64); ptr[n]];
716    for row in 0..n {
717        for idx in snap.row_ptr[row]..snap.row_ptr[row + 1] {
718            if keep(idx) {
719                let dst = snap.col_idx[idx] as usize;
720                adj[at[dst]] = (row as u32, snap.edge_id[idx]);
721                at[dst] += 1;
722            }
723        }
724    }
725    for d in 0..n {
726        adj[ptr[d]..ptr[d + 1]].sort_unstable();
727    }
728    TypedSortedAdj { ptr, adj }
729}
730
#[cfg(test)]
mod incremental_matrix_tests {
    use std::collections::{BTreeMap, HashMap};

    use issundb_graphblas::Matrix;
    use serde_json::json;
    use tempfile::TempDir;

    use crate::Graph;
    use crate::graph::DegreeDirection;
    use crate::schema::NodeId;

    /// The matrix-view state that the incremental path keeps up to date.
    struct MatrixView {
        /// Sorted, deduplicated coordinates of the adjacency matrix.
        adjacency: Vec<(usize, usize)>,
        /// Sorted, deduplicated coordinates of the transposed adjacency matrix.
        adjacency_t: Vec<(usize, usize)>,
        /// Copy of the matrices' `dense_to_id` mapping.
        dense_to_id: Vec<NodeId>,
    }

    /// Reduces a component map to the partition it describes: every node is
    /// mapped to the smallest node id sharing its component id. Two results
    /// then compare equal even when their component ids are numbered
    /// differently.
    fn canonical_partition(cc: &HashMap<NodeId, u64>) -> BTreeMap<NodeId, NodeId> {
        // First pass: smallest member seen for each component id.
        let mut smallest: HashMap<u64, NodeId> = HashMap::new();
        for (&node, &comp) in cc {
            smallest
                .entry(comp)
                .and_modify(|rep| {
                    if node < *rep {
                        *rep = node;
                    }
                })
                .or_insert(node);
        }
        // Second pass: label every node with its component's representative.
        cc.iter()
            .map(|(&node, comp)| (node, smallest[comp]))
            .collect()
    }

    /// Returns the `(row, col)` coordinates stored in `m`, sorted and with
    /// duplicates removed, so callers can compare matrices as sets without
    /// depending on storage order.
    fn matrix_coords(m: &Matrix<i32>) -> Vec<(usize, usize)> {
        let mut coords = Vec::new();
        for (row, col, _value) in m.triples().expect("triples") {
            coords.push((row, col));
        }
        coords.sort_unstable();
        coords.dedup();
        coords
    }

    /// Captures the current matrix-view state of `graph`: adjacency
    /// coordinates, transpose coordinates, and the dense-index mapping.
    /// Panics if the matrices have not been materialized.
    fn snapshot_view(graph: &Graph) -> MatrixView {
        let guard = graph.matrices.read();
        let matrices = guard.as_ref().expect("matrices materialized");
        MatrixView {
            adjacency: matrix_coords(&matrices.adjacency),
            adjacency_t: matrix_coords(&matrices.adjacency_t),
            dense_to_id: matrices.dense_to_id.clone(),
        }
    }

    /// Asserts that an incrementally refreshed view and a fully rebuilt view
    /// agree on all three components.
    #[track_caller]
    fn assert_same_view(incremental: &MatrixView, full: &MatrixView) {
        assert_eq!(
            incremental.adjacency, full.adjacency,
            "adjacency element sets differ"
        );
        assert_eq!(
            incremental.adjacency_t, full.adjacency_t,
            "adjacency_t element sets differ"
        );
        assert_eq!(
            incremental.dense_to_id, full.dense_to_id,
            "dense-index mapping differs"
        );
    }

    /// Adds `count` nodes labelled `"N"` with property `v` set to their
    /// creation index, returning the ids in creation order.
    fn add_nodes(g: &Graph, count: usize) -> Vec<NodeId> {
        let mut ids = Vec::with_capacity(count);
        for v in 0..count {
            ids.push(g.add_node("N", &json!({ "v": v })).unwrap());
        }
        ids
    }

    /// Connects `ids` into a directed ring of `"R"` edges: each node points at
    /// its successor and the last node points back at the first.
    fn link_ring(g: &Graph, ids: &[NodeId]) {
        for (i, &src) in ids.iter().enumerate() {
            let dst = ids[(i + 1) % ids.len()];
            g.add_edge(src, dst, "R", &json!({})).unwrap();
        }
    }

    /// After a battery of edge and node additions plus edge deletions, the
    /// incrementally refreshed matrices must hold the same element sets and
    /// dense mapping as a full rebuild over the same final LMDB state. Since
    /// the two agree, a consumer of the incremental matrices sees every
    /// committed mutation, so this doubles as the freshness check.
    #[test]
    fn incremental_matrices_match_full_rebuild() {
        const RING: usize = 20;

        let dir = TempDir::new().unwrap();
        let g = Graph::open(dir.path(), 1).unwrap();
        let edge = |src: NodeId, dst: NodeId| g.add_edge(src, dst, "R", &json!({})).unwrap();

        // Base graph: a ring over RING nodes; the edge ids are kept so one of
        // them can be deleted below.
        let ids = add_nodes(&g, RING);
        let base_edges: Vec<_> = (0..RING)
            .map(|i| edge(ids[i], ids[(i + 1) % RING]))
            .collect();
        // Build the base matrices and clear the pending delta.
        g.rebuild_csr().unwrap();

        // From here on, every mutation is recorded into the delta.
        // (1) Fresh edges between nodes that already exist.
        edge(ids[0], ids[5]);
        edge(ids[3], ids[10]);
        // (2) A parallel pair; one is removed later and the bit must survive.
        let par_a = edge(ids[2], ids[4]);
        let _par_b = edge(ids[2], ids[4]);
        // (3) Two new nodes with edges, forcing the matrix to grow.
        let n20 = g.add_node("N", &json!({ "v": 20 })).unwrap();
        let n21 = g.add_node("N", &json!({ "v": 21 })).unwrap();
        edge(n20, n21);
        edge(ids[1], n20);
        // (4) Deleting an edge that has no parallel must clear its bit.
        g.delete_edge(base_edges[7]).unwrap();
        // (5) Deleting half of the parallel pair leaves the pair connected.
        g.delete_edge(par_a).unwrap();

        // Snapshot after the incremental refresh...
        g.ensure_matrix_view().unwrap();
        let incremental = snapshot_view(&g);

        // ...and again after a full rebuild over the same LMDB state.
        g.rebuild_csr().unwrap();
        let full = snapshot_view(&g);

        assert_same_view(&incremental, &full);
    }

    /// Deleting a node reshuffles dense indices, so the refresh has to fall
    /// back to a full rebuild; its result must still match an explicit one.
    #[test]
    fn node_deletion_forces_full_rebuild_and_matches() {
        let dir = TempDir::new().unwrap();
        let g = Graph::open(dir.path(), 1).unwrap();
        let ids = add_nodes(&g, 10);
        link_ring(&g, &ids);
        g.rebuild_csr().unwrap();

        // Remove a node (its edges cascade) and then add a new edge.
        g.delete_node(ids[3]).unwrap();
        g.add_edge(ids[5], ids[7], "R", &json!({})).unwrap();

        g.ensure_matrix_view().unwrap();
        let incremental = snapshot_view(&g);
        g.rebuild_csr().unwrap();
        let full = snapshot_view(&g);

        assert_same_view(&incremental, &full);
    }

    /// Go/no-go measurement, ignored by default because building the graph
    /// dominates the runtime. Run with:
    /// `cargo test -p issundb-core --release incremental_apply_cost -- --ignored --nocapture`
    #[test]
    #[ignore = "measurement: prints incremental-apply vs full-rebuild timings"]
    fn incremental_apply_cost() {
        use std::time::{Duration, Instant};

        /// Times one incremental apply of `k_added` edges against the best of
        /// three full rebuilds on a graph of `n_nodes` nodes with `out_degree`
        /// outgoing edges each, and prints both timings.
        fn measure(n_nodes: usize, out_degree: usize, k_added: usize) {
            let dir = TempDir::new().unwrap();
            let g = Graph::open(dir.path(), 4).unwrap();
            // One batched transaction for the base graph: committing each
            // write separately would dominate the runtime and swamp the
            // measurement.
            let ids: Vec<NodeId> = g
                .update(|txn| {
                    let mut created: Vec<NodeId> = Vec::with_capacity(n_nodes);
                    for v in 0..n_nodes {
                        created.push(txn.add_node("N", &json!({ "v": v })).unwrap());
                    }
                    for (i, &src) in created.iter().enumerate() {
                        for k in 0..out_degree {
                            let dst = created[(i + 1 + k * 7) % n_nodes];
                            txn.add_edge(src, dst, "R", &json!({})).unwrap();
                        }
                    }
                    Ok(created)
                })
                .unwrap();
            g.rebuild_csr().unwrap();

            // Stage `k_added` edges between existing nodes, then time the
            // incremental apply of exactly that delta.
            for j in 0..k_added {
                let src = ids[(j * 31) % n_nodes];
                let dst = ids[(j * 97 + 5) % n_nodes];
                g.add_edge(src, dst, "R", &json!({})).unwrap();
            }
            let started = Instant::now();
            g.ensure_matrix_view().unwrap();
            let incr = started.elapsed();

            // The full rebuild does not depend on the delta size; it is the
            // cost the incremental path replaces. Keep the fastest of three.
            let best_full = (0..3).fold(Duration::from_secs(3600), |best, _| {
                let started = Instant::now();
                g.rebuild_csr().unwrap();
                best.min(started.elapsed())
            });

            let n_edges = n_nodes * out_degree + k_added;
            let incr_ms = incr.as_secs_f64() * 1e3;
            let full_ms = best_full.as_secs_f64() * 1e3;
            println!(
                "{:>7} nodes, {:>9} edges: incremental apply of {} edges = {:>8.3} ms; full rebuild = {:>8.2} ms",
                n_nodes, n_edges, k_added, incr_ms, full_ms,
            );
        }

        for n_nodes in [10_000, 50_000, 100_000] {
            measure(n_nodes, 5, 1_000);
        }
    }

    /// End-to-end differential check of the migrated matrix-view consumers:
    /// `bfs`, `degree_centrality` and `connected_components` must give the
    /// same answers after an incremental refresh as after a forced full
    /// rebuild. The mutation battery includes a new node reached through a
    /// new edge.
    #[test]
    fn incremental_consumers_match_full_rebuild() {
        let dir = TempDir::new().unwrap();
        let g = Graph::open(dir.path(), 1).unwrap();
        let ids = add_nodes(&g, 15);
        link_ring(&g, &ids);
        g.rebuild_csr().unwrap();

        // Mutations go into the delta; no rebuild happens in between.
        g.add_edge(ids[0], ids[7], "R", &json!({})).unwrap();
        let n15 = g.add_node("N", &json!({ "v": 15 })).unwrap();
        g.add_edge(ids[2], n15, "R", &json!({})).unwrap();
        g.add_edge(n15, ids[5], "R", &json!({})).unwrap();

        // Runs the three consumers in a fixed order and normalizes their
        // output for comparison.
        let observe = || {
            let mut reached = g.bfs(ids[0], 3).unwrap();
            reached.sort_unstable();
            let degrees = g.degree_centrality(DegreeDirection::Both).unwrap();
            let components = canonical_partition(&g.connected_components().unwrap());
            (reached, degrees, components)
        };

        // First through the incremental matrix-view path...
        let (bfs_incr, deg_incr, cc_incr) = observe();
        // ...then after a forced full rebuild over the same LMDB state.
        g.rebuild_csr().unwrap();
        let (bfs_full, deg_full, cc_full) = observe();

        assert_eq!(bfs_incr, bfs_full, "bfs: incremental vs full rebuild");
        assert_eq!(deg_incr, deg_full, "degree: incremental vs full rebuild");
        assert_eq!(cc_incr, cc_full, "components: incremental vs full rebuild");
    }

    /// Freshness: with no explicit `rebuild_csr` between write and read, a
    /// matrix-view consumer must see both a new edge and a brand-new node
    /// reached through a new edge. This is the edge-drift bug the migration
    /// closes.
    #[test]
    fn matrix_view_consumers_reflect_writes_without_rebuild() {
        let dir = TempDir::new().unwrap();
        let g = Graph::open(dir.path(), 1).unwrap();
        let reaches = |from: NodeId, hops: u8, target: NodeId| {
            g.bfs(from, hops).unwrap().contains(&target)
        };

        let a = g.add_node("N", &json!({})).unwrap();
        let b = g.add_node("N", &json!({})).unwrap();
        g.rebuild_csr().unwrap();
        assert!(!reaches(a, 5, b), "b is unreachable before the edge exists");

        // An edge between existing nodes is visible without a rebuild.
        g.add_edge(a, b, "R", &json!({})).unwrap();
        assert!(
            reaches(a, 1, b),
            "b reachable from a after the edge, without a rebuild"
        );

        // Still no rebuild: a new node behind a new edge exercises the matrix
        // resize and the dense-mapping extension end to end.
        let c = g.add_node("N", &json!({})).unwrap();
        g.add_edge(b, c, "R", &json!({})).unwrap();
        assert!(
            reaches(a, 2, c),
            "new node c reachable two hops from a, without a rebuild"
        );
    }

    /// Freshness for the CSR-snapshot consumers: thanks to the
    /// generation-gated rebuild, a native-CSR algorithm (`all_paths`) sees an
    /// edge that was added with no explicit `rebuild_csr`.
    #[test]
    fn csr_consumers_reflect_writes_without_rebuild() {
        let dir = TempDir::new().unwrap();
        let g = Graph::open(dir.path(), 1).unwrap();
        let has_path = |from: NodeId, to: NodeId| !g.all_paths(from, to).unwrap().is_empty();

        let a = g.add_node("N", &json!({})).unwrap();
        let b = g.add_node("N", &json!({})).unwrap();
        let c = g.add_node("N", &json!({})).unwrap();
        g.add_edge(a, b, "R", &json!({})).unwrap();
        g.rebuild_csr().unwrap();
        assert!(!has_path(a, c), "no path a..c before the edge exists");

        // Adding b->c without a rebuild: the write-generation gate forces a
        // refresh on the next read.
        g.add_edge(b, c, "R", &json!({})).unwrap();
        assert!(
            has_path(a, c),
            "path a->b->c reflected without an explicit rebuild"
        );
    }

    /// Regression test for a lazy-completion race. Following a write,
    /// `ensure_matrix_view` applies the delta through `GrB_Matrix_setElement`
    /// (lazy in non-blocking mode) and then releases the write lock. Several
    /// `bfs` calls afterwards hold the shared `matrices.read()` lock and run
    /// `mxv` concurrently. Were the pending operations not materialized under
    /// the write lock, the first `mxv` would trigger GraphBLAS lazy
    /// completion and mutate the shared matrix's internal representation
    /// while other readers race on it: undefined behavior. With the fix
    /// (`apply_delta` materializes the adjacency matrices before the write
    /// lock is released), every concurrent `bfs` deterministically returns
    /// the full reachable set.
    #[test]
    fn concurrent_bfs_after_incremental_write_is_consistent() {
        use std::sync::Barrier;
        use std::thread;

        const N: usize = 30;
        const THREADS: usize = 6;
        const ROUNDS: usize = 200;

        let dir = TempDir::new().unwrap();
        let g = Graph::open(dir.path(), 1).unwrap();

        // Chain 0 -> 1 -> ... -> 29, so bfs from node 0 reaches all 30 nodes.
        let start = g.add_node("N", &json!({ "v": 0 })).unwrap();
        let _tail = (1..N).fold(start, |prev, i| {
            let node = g.add_node("N", &json!({ "v": i })).unwrap();
            g.add_edge(prev, node, "R", &json!({})).unwrap();
            node
        });
        g.rebuild_csr().unwrap();

        for round in 0..ROUNDS {
            // Hang a fresh leaf directly off `start`. The edge start -> leaf
            // is a brand-new matrix coordinate, so `apply_delta` records a
            // pending `setElement` (lazy in non-blocking mode) and re-opens
            // the lazy-completion race window. The reachable set grows by
            // exactly one per round, which keeps the expected count
            // deterministic.
            let leaf = g.add_node("N", &json!({ "leaf": round })).unwrap();
            g.add_edge(start, leaf, "R", &json!({})).unwrap();
            let expected = N + round + 1;

            let barrier = Barrier::new(THREADS);
            thread::scope(|s| {
                for _ in 0..THREADS {
                    s.spawn(|| {
                        // Line the threads up so they hit the shared-read
                        // `mxv` together, maximizing overlap on the pending
                        // matrix.
                        barrier.wait();
                        let reached = g.bfs(start, u8::MAX).unwrap();
                        assert_eq!(
                            reached.len(),
                            expected,
                            "concurrent bfs saw a partially materialized matrix"
                        );
                    });
                }
            });
        }
    }
}
1107
#[cfg(test)]
mod triangle_cycle_count_tests {
    use serde_json::json;
    use tempfile::TempDir;

    use crate::{Graph, TriangleCountSpec};

    /// Opens a graph inside a new temporary directory and returns both, so a
    /// test can hold on to the directory (bound as `_dir`) while it runs.
    fn open_tmp() -> (TempDir, Graph) {
        let dir = TempDir::new().unwrap();
        let graph = Graph::open(dir.path(), 1).unwrap();
        (dir, graph)
    }

    /// Builds a spec that requires relationship type `rel` on all three hops
    /// and label `label` on all three variables.
    fn spec_all<'a>(rel: &'a str, label: &'a str) -> TriangleCountSpec<'a> {
        let rel_types = [Some(rel); 3];
        let labels = [Some(label); 3];
        TriangleCountSpec { rel_types, labels }
    }

    /// Counts triangle cycles under the uniform spec `spec_all(rel, label)`,
    /// panicking if the count fails.
    fn count_all(g: &Graph, rel: &str, label: &str) -> u64 {
        g.count_triangle_cycles(&spec_all(rel, label)).unwrap()
    }

    /// Counts triangle cycles whose hops are all `KNOWS` and whose variables
    /// are all `Person`.
    fn count_knows_people(g: &Graph) -> u64 {
        count_all(g, "KNOWS", "Person")
    }

    /// A single directed 3-cycle over distinct nodes matches once for each
    /// rotation of `a`, i.e. three assignments, as MATCH row semantics give.
    #[test]
    fn single_cycle_counts_one_per_rotation() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b, c) = (person(), person(), person());
        for (src, dst) in [(a, b), (b, c), (c, a)] {
            g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
        }

        assert_eq!(count_knows_people(&g), 3);
    }

    /// A triangle with two edges leaving the same node is not a directed
    /// cycle, so it contributes nothing.
    #[test]
    fn non_cyclic_orientation_does_not_count() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b, c) = (person(), person(), person());
        for (src, dst) in [(a, b), (b, c), (a, c)] {
            g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
        }

        assert_eq!(count_knows_people(&g), 0);
    }

    /// Parallel edges count as distinct relationships: duplicating one hop
    /// doubles every assignment that goes through it.
    #[test]
    fn parallel_edges_multiply() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b, c) = (person(), person(), person());
        for (src, dst) in [(a, b), (a, b), (b, c), (c, a)] {
            g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
        }

        assert_eq!(count_knows_people(&g), 6);
    }

    /// Hop types are matched by position: when the third edge of a cycle has
    /// a different type, only the rotation whose hop order lines up with the
    /// spec matches.
    #[test]
    fn per_hop_types_are_positional() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b, c) = (person(), person(), person());
        for (src, dst, rel) in [(a, b, "KNOWS"), (b, c, "KNOWS"), (c, a, "LIKES")] {
            g.add_edge(src, dst, rel, &json!({})).unwrap();
        }

        assert_eq!(count_knows_people(&g), 0);

        let mixed_spec = TriangleCountSpec {
            rel_types: [Some("KNOWS"), Some("KNOWS"), Some("LIKES")],
            labels: [Some("Person"); 3],
        };
        assert_eq!(g.count_triangle_cycles(&mixed_spec).unwrap(), 1);
    }

    /// A hop with no type in the spec accepts any relationship type.
    #[test]
    fn untyped_hops_match_any_type() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b, c) = (person(), person(), person());
        for (src, dst, rel) in [(a, b, "KNOWS"), (b, c, "LIKES"), (c, a, "FOLLOWS")] {
            g.add_edge(src, dst, rel, &json!({})).unwrap();
        }

        let untyped_spec = TriangleCountSpec {
            rel_types: [None; 3],
            labels: [Some("Person"); 3],
        };
        assert_eq!(g.count_triangle_cycles(&untyped_spec).unwrap(), 3);
    }

    /// A node that lacks the required label rules out every assignment that
    /// binds it, while a node carrying several labels still qualifies.
    #[test]
    fn label_filter_applies_per_variable() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b) = (person(), person());
        let c = g.add_node("Robot", &json!({})).unwrap();
        for (src, dst) in [(a, b), (b, c), (c, a)] {
            g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
        }

        assert_eq!(count_knows_people(&g), 0);

        // Once the label is added, the node has both labels and qualifies.
        g.add_label(c, "Person").unwrap();
        assert_eq!(count_knows_people(&g), 3);

        let unlabeled_spec = TriangleCountSpec {
            rel_types: [Some("KNOWS"); 3],
            labels: [None; 3],
        };
        assert_eq!(g.count_triangle_cycles(&unlabeled_spec).unwrap(), 3);
    }

    /// Relationship uniqueness: when `a == b == c` every hop is a self-loop,
    /// so a match is an ordered triple of pairwise-distinct self-loop edges.
    /// Two self-loops yield no match; three yield 3! = 6.
    #[test]
    fn self_loop_assignments_respect_relationship_uniqueness() {
        let (_dir, g) = open_tmp();
        let a = g.add_node("Person", &json!({})).unwrap();
        for _ in 0..2 {
            g.add_edge(a, a, "KNOWS", &json!({})).unwrap();
        }
        assert_eq!(count_knows_people(&g), 0);

        g.add_edge(a, a, "KNOWS", &json!({})).unwrap();
        assert_eq!(count_knows_people(&g), 6);
    }

    /// A self-loop together with a 2-cycle gives one assignment for each
    /// choice of the variable bound to the looped node: a=b, b=c, or c=a.
    #[test]
    fn self_loop_with_two_cycle_counts_each_position() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (x, y) = (person(), person());
        for (src, dst) in [(x, x), (x, y), (y, x)] {
            g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
        }

        assert_eq!(count_knows_people(&g), 3);
    }

    /// An unknown relationship type or label matches nothing rather than
    /// raising an error: the query layer maps absent registry entries to
    /// empty scans.
    #[test]
    fn unknown_type_or_label_counts_zero() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b, c) = (person(), person(), person());
        for (src, dst) in [(a, b), (b, c), (c, a)] {
            g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
        }

        assert_eq!(count_all(&g, "NOPE", "Person"), 0);
        assert_eq!(count_all(&g, "KNOWS", "Ghost"), 0);
    }

    /// Committed writes must show up in the count with no explicit
    /// `rebuild_csr`: the freshness gate covers this consumer too.
    #[test]
    fn count_is_fresh_after_writes() {
        let (_dir, g) = open_tmp();
        let person = || g.add_node("Person", &json!({})).unwrap();
        let (a, b, c) = (person(), person(), person());
        for (src, dst) in [(a, b), (b, c)] {
            g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
        }
        assert_eq!(count_knows_people(&g), 0);

        // Closing the cycle must be visible on the very next count.
        g.add_edge(c, a, "KNOWS", &json!({})).unwrap();
        assert_eq!(count_knows_people(&g), 3);
    }

    /// On an empty graph the count is zero, with no error from unmaterialized
    /// state.
    #[test]
    fn empty_graph_counts_zero() {
        let (_dir, g) = open_tmp();
        assert_eq!(count_knows_people(&g), 0);
    }
}