1use super::*;
2
3impl Graph {
4 pub fn dfs(&self, start: NodeId, hops: u8) -> Result<Vec<NodeId>, Error> {
10 self.ensure_csr_fresh()?;
11 let guard = self.matrices.read();
12 let m = guard
13 .as_ref()
14 .ok_or(Error::Corrupt("matrices not initialized"))?;
15 let snap = self.csr_cache.snapshot.load();
16 self.dfs_graphblas(m, &snap, start, hops)
17 }
18
19 pub fn count_triangle_cycles(&self, spec: &TriangleCountSpec) -> Result<u64, Error> {
30 self.ensure_csr_fresh()?;
31 let snap = self.csr_cache.snapshot.load();
32 let n = snap.dense_to_id.len();
33 if n == 0 {
34 return Ok(0);
35 }
36
37 let mut type_ids: [Option<TypeId>; 3] = [None; 3];
39 {
40 let rtxn = self.storage.env.read_txn()?;
41 for (i, name) in spec.rel_types.iter().enumerate() {
42 if let Some(name) = name {
43 match get_type(&self.storage, &rtxn, name)? {
44 Some(tid) => type_ids[i] = Some(tid),
45 None => return Ok(0),
46 }
47 }
48 }
49 }
50
51 let mut masks: [Option<Vec<bool>>; 3] = [None, None, None];
55 for (i, label) in spec.labels.iter().enumerate() {
56 if let Some(name) = label {
57 let mut mask = vec![false; n];
58 for id in self.nodes_by_label(name)? {
59 if let Some(&d) = snap.id_to_dense.get(&id) {
60 mask[d as usize] = true;
61 }
62 }
63 masks[i] = Some(mask);
64 }
65 }
66 let label_ok = |mask: &Option<Vec<bool>>, d: usize| mask.as_ref().is_none_or(|m| m[d]);
67
68 let out1 = typed_out_sorted(&snap, type_ids[0]);
72 let out2_built = if type_ids[1] == type_ids[0] {
73 None
74 } else {
75 Some(typed_out_sorted(&snap, type_ids[1]))
76 };
77 let out2 = out2_built.as_ref().unwrap_or(&out1);
78 let in3 = typed_in_sorted(&snap, type_ids[2]);
79
80 let mut total: u64 = 0;
81 for a in 0..n {
82 if !label_ok(&masks[0], a) {
83 continue;
84 }
85 let in3_row = in3.row(a);
86 if in3_row.is_empty() {
87 continue;
88 }
89 let out1_row = out1.row(a);
90
91 let mut i = 0;
92 while i < out1_row.len() {
93 let b = out1_row[i].0 as usize;
94 let run1_start = i;
95 while i < out1_row.len() && out1_row[i].0 as usize == b {
96 i += 1;
97 }
98 if !label_ok(&masks[1], b) {
99 continue;
100 }
101 let m1 = (i - run1_start) as u64;
102 let out2_row = out2.row(b);
103
104 let (mut j, mut k) = (0, 0);
108 let mut pair_count: u64 = 0;
109 while j < out2_row.len() && k < in3_row.len() {
110 let c2 = out2_row[j].0;
111 let c3 = in3_row[k].0;
112 match c2.cmp(&c3) {
113 std::cmp::Ordering::Less => j += 1,
114 std::cmp::Ordering::Greater => k += 1,
115 std::cmp::Ordering::Equal => {
116 let c = c2 as usize;
117 let j0 = j;
118 while j < out2_row.len() && out2_row[j].0 as usize == c {
119 j += 1;
120 }
121 let k0 = k;
122 while k < in3_row.len() && in3_row[k].0 as usize == c {
123 k += 1;
124 }
125 if !label_ok(&masks[2], c) {
126 continue;
127 }
128 if a == b && c == a {
129 for &(_, e1) in &out1_row[run1_start..run1_start + m1 as usize] {
136 for &(_, e2) in &out2_row[j0..j] {
137 if e2 == e1 {
138 continue;
139 }
140 for &(_, e3) in &in3_row[k0..k] {
141 if e3 != e1 && e3 != e2 {
142 total += 1;
143 }
144 }
145 }
146 }
147 } else {
148 pair_count += ((j - j0) * (k - k0)) as u64;
149 }
150 }
151 }
152 }
153 total += m1 * pair_count;
154 }
155 }
156 Ok(total)
157 }
158
159 pub fn detect_cycle(&self) -> Result<bool, Error> {
161 self.ensure_csr_fresh()?;
162 let guard = self.matrices.read();
163 let m = guard
164 .as_ref()
165 .ok_or(Error::Corrupt("matrices not initialized"))?;
166 let snap = self.csr_cache.snapshot.load();
167 self.detect_cycle_graphblas(m, &snap)
168 }
169
170 pub fn all_neighbors(&self, node: NodeId) -> Result<Vec<DirectedNeighborEntry>, Error> {
172 let rtxn = self.storage.env.read_txn()?;
173 let mut neighbors = Vec::new();
174 for ne in self.out_neighbors_impl(&rtxn, node)? {
175 neighbors.push(DirectedNeighborEntry {
176 node: ne.node,
177 edge: ne.edge,
178 edge_type: ne.edge_type,
179 outgoing: true,
180 });
181 }
182 for ne in self.in_neighbors_impl(&rtxn, node)? {
183 neighbors.push(DirectedNeighborEntry {
184 node: ne.node,
185 edge: ne.edge,
186 edge_type: ne.edge_type,
187 outgoing: false,
188 });
189 }
190 Ok(neighbors)
191 }
192
193 pub fn all_paths(&self, src: NodeId, dst: NodeId) -> Result<Vec<Vec<NodeId>>, Error> {
195 self.ensure_csr_fresh()?;
196 let guard = self.matrices.read();
197 let m = guard
198 .as_ref()
199 .ok_or(Error::Corrupt("matrices not initialized"))?;
200 let snap = self.csr_cache.snapshot.load();
201 self.all_paths_graphblas(m, &snap, src, dst)
202 }
203
204 pub fn all_shortest_paths(&self, src: NodeId, dst: NodeId) -> Result<Vec<Vec<NodeId>>, Error> {
206 self.ensure_csr_fresh()?;
207 let guard = self.matrices.read();
208 let m = guard
209 .as_ref()
210 .ok_or(Error::Corrupt("matrices not initialized"))?;
211 let snap = self.csr_cache.snapshot.load();
212 self.all_shortest_paths_graphblas(m, &snap, src, dst)
213 }
214
215 pub fn longest_path(&self, src: NodeId, dst: NodeId) -> Result<Option<Vec<NodeId>>, Error> {
217 self.ensure_csr_fresh()?;
218 let guard = self.matrices.read();
219 let m = guard
220 .as_ref()
221 .ok_or(Error::Corrupt("matrices not initialized"))?;
222 let snap = self.csr_cache.snapshot.load();
223 self.longest_path_graphblas(m, &snap, src, dst)
224 }
225
226 pub fn shortest_path_dijkstra(
234 &self,
235 src: NodeId,
236 dst: NodeId,
237 ) -> Result<Option<WeightedPath>, Error> {
238 self.ensure_csr_fresh()?;
239 let guard = self.matrices.read();
240 let m = guard
241 .as_ref()
242 .ok_or(Error::Corrupt("matrices not initialized"))?;
243 let snap = self.csr_cache.snapshot.load();
244 self.shortest_path_dijkstra_graphblas(m, &snap, src, dst)
245 }
246
247 pub fn spanning_forest(
249 &self,
250 weight_property: &str,
251 maximum: bool,
252 ) -> Result<Vec<EdgeId>, Error> {
253 self.ensure_csr_fresh()?;
254 let guard = self.matrices.read();
255 let m = guard
256 .as_ref()
257 .ok_or(Error::Corrupt("matrices not initialized"))?;
258 let snap = self.csr_cache.snapshot.load();
259 self.spanning_forest_graphblas(m, &snap, weight_property, maximum)
260 }
261
262 pub fn label_propagation(&self, max_iterations: usize) -> Result<HashMap<NodeId, u64>, Error> {
264 self.ensure_csr_fresh()?;
265 let guard = self.matrices.read();
266 let m = guard
267 .as_ref()
268 .ok_or(Error::Corrupt("matrices not initialized"))?;
269 let snap = self.csr_cache.snapshot.load();
270 self.label_propagation_graphblas(m, &snap, max_iterations)
271 }
272
273 pub fn harmonic_centrality(&self) -> Result<HashMap<NodeId, f64>, Error> {
275 self.ensure_csr_fresh()?;
276 let guard = self.matrices.read();
277 let m = guard
278 .as_ref()
279 .ok_or(Error::Corrupt("matrices not initialized"))?;
280 let snap = self.csr_cache.snapshot.load();
281 self.harmonic_centrality_graphblas(m, &snap)
282 }
283
284 pub fn betweenness_centrality(&self) -> Result<HashMap<NodeId, f64>, Error> {
286 self.ensure_csr_fresh()?;
287 let guard = self.matrices.read();
288 let m = guard
289 .as_ref()
290 .ok_or(Error::Corrupt("matrices not initialized"))?;
291 let snap = self.csr_cache.snapshot.load();
292 self.betweenness_centrality_graphblas(m, &snap)
293 }
294
295 pub fn strongly_connected_components(&self) -> Result<HashMap<NodeId, u64>, Error> {
297 self.ensure_csr_fresh()?;
298 let guard = self.matrices.read();
299 let m = guard
300 .as_ref()
301 .ok_or(Error::Corrupt("matrices not initialized"))?;
302 let snap = self.csr_cache.snapshot.load();
303 self.strongly_connected_components_graphblas(m, &snap)
304 }
305
306 pub fn degree_centrality(
308 &self,
309 direction: DegreeDirection,
310 ) -> Result<HashMap<NodeId, u64>, Error> {
311 self.ensure_matrix_view()?;
312 let guard = self.matrices.read();
313 let m = guard
314 .as_ref()
315 .ok_or(Error::Corrupt("matrices not initialized"))?;
316 self.degree_centrality_graphblas(m, direction)
317 }
318
319 pub fn maximum_flow(
321 &self,
322 source: NodeId,
323 sink: NodeId,
324 capacity_property: &str,
325 ) -> Result<f64, Error> {
326 self.ensure_csr_fresh()?;
327 let guard = self.matrices.read();
328 let m = guard
329 .as_ref()
330 .ok_or(Error::Corrupt("matrices not initialized"))?;
331 let snap = self.csr_cache.snapshot.load();
332 self.maximum_flow_graphblas(m, &snap, source, sink, capacity_property)
333 }
334
335 pub fn shortest_path_top_k(
337 &self,
338 src: NodeId,
339 dst: NodeId,
340 k: usize,
341 weight_property: &str,
342 ) -> Result<Vec<WeightedPath>, Error> {
343 self.ensure_csr_fresh()?;
344 let guard = self.matrices.read();
345 let m = guard
346 .as_ref()
347 .ok_or(Error::Corrupt("matrices not initialized"))?;
348 let snap = self.csr_cache.snapshot.load();
349 let paths = self.shortest_path_top_k_graphblas(m, &snap, src, dst, k, weight_property)?;
350 Ok(paths
351 .into_iter()
352 .map(|(nodes, total_weight)| WeightedPath {
353 nodes,
354 total_weight,
355 })
356 .collect())
357 }
358
359 pub fn bfs(&self, start: NodeId, hops: u8) -> Result<Vec<NodeId>, Error> {
361 self.ensure_matrix_view()?;
362 self.bfs_graphblas(start, hops)
363 }
364
365 pub fn shortest_path(&self, src: NodeId, dst: NodeId) -> Result<Option<Vec<NodeId>>, Error> {
367 self.ensure_csr_fresh()?;
368 self.shortest_path_graphblas(src, dst)
369 }
370
371 pub fn page_rank(&self, iterations: u32, damping: f32) -> Result<HashMap<NodeId, f32>, Error> {
373 self.ensure_csr_fresh()?;
374 self.page_rank_graphblas(iterations, damping)
375 }
376
377 pub(crate) fn ensure_csr_fresh(&self) -> Result<(), Error> {
386 if self.matrices.read().is_none() || self.csr_cache.snapshot_is_stale() {
387 self.rebuild_csr()?;
388 } else {
389 self.ensure_matrix_view()?;
393 }
394 Ok(())
395 }
396
397 pub(crate) fn ensure_snapshot_fresh(&self) -> Result<(), Error> {
402 if self.csr_cache.snapshot_is_stale() {
403 let built_gen = self.csr_cache.current_gen();
404 let snap = CsrSnapshot::build(&self.storage)?;
405 self.csr_cache.install_snapshot(snap, built_gen);
406 }
407 Ok(())
408 }
409
410 pub(crate) fn ensure_matrix_view(&self) -> Result<(), Error> {
420 if self.matrices.read().is_none() || self.csr_cache.pending_force_full() {
423 return self.rebuild_csr();
424 }
425 if !self.csr_cache.has_pending() {
427 return Ok(());
428 }
429
430 let mut guard = self.matrices.write();
431 let delta = self.csr_cache.take_delta();
432 if delta.force_full {
433 drop(guard);
437 return self.rebuild_csr();
438 }
439 if delta.is_empty() {
440 return Ok(());
441 }
442
443 let mut clear_edges = Vec::new();
446 {
447 let rtxn = self.storage.env.read_txn()?;
448 for &(src, dst) in &delta.removed_edges {
449 let still_connected = self
450 .out_neighbors_impl(&rtxn, src)?
451 .into_iter()
452 .any(|ne| ne.node == dst);
453 if !still_connected {
454 clear_edges.push((src, dst));
455 }
456 }
457 }
458
459 if let Some(m) = guard.as_mut() {
460 m.apply_delta(&delta.added_nodes, &delta.added_edges, &clear_edges)?;
461 }
462 Ok(())
463 }
464
465 pub fn all_nodes(&self) -> Result<Vec<NodeId>, Error> {
467 let rtxn = self.storage.env.read_txn()?;
468 self.all_nodes_impl(&rtxn)
469 }
470
471 pub(super) fn all_nodes_impl(&self, rtxn: &heed::RoTxn) -> Result<Vec<NodeId>, Error> {
472 let mut ids = self
473 .storage
474 .nodes
475 .iter(rtxn)?
476 .map(|r| r.map(|(k, _)| k))
477 .collect::<Result<Vec<_>, _>>()?;
478 ids.sort_unstable();
479 Ok(ids)
480 }
481
482 pub fn connected_components(&self) -> Result<HashMap<NodeId, u64>, Error> {
488 self.ensure_matrix_view()?;
489 {
490 let guard = self.matrices.read();
491 if let Some(m) = guard.as_ref() {
492 if m.n_nodes > 0 {
493 return self.connected_components_graphblas(m);
494 }
495 }
496 }
497 let nodes: Vec<NodeId> = {
498 let rtxn = self.storage.env.read_txn()?;
499 self.storage
500 .nodes
501 .iter(&rtxn)?
502 .map(|r| r.map(|(k, _)| k))
503 .collect::<Result<Vec<_>, _>>()?
504 };
505
506 let mut component: HashMap<NodeId, u64> = HashMap::with_capacity(nodes.len());
507 let mut next_id: u64 = 0;
508
509 for &start in &nodes {
510 if component.contains_key(&start) {
511 continue;
512 }
513 let comp_id = next_id;
514 next_id += 1;
515 component.insert(start, comp_id);
516 let mut queue = vec![start];
517 while let Some(node) = queue.pop() {
518 for ne in self.out_neighbors(node)? {
519 if component.insert(ne.node, comp_id).is_none() {
520 queue.push(ne.node);
521 }
522 }
523 for ne in self.in_neighbors(node)? {
524 if component.insert(ne.node, comp_id).is_none() {
525 queue.push(ne.node);
526 }
527 }
528 }
529 }
530
531 Ok(component)
532 }
533
534 pub(super) fn maybe_spawn_rebuild(&self) {
542 self.maybe_spawn_rebuild_n(1);
543 }
544
545 pub(super) fn maybe_spawn_rebuild_n(&self, count: usize) {
546 if self.csr_cache.mark_dirty_n(count as u64) {
547 let cache = Arc::clone(&self.csr_cache);
548 let storage = Arc::clone(&self.storage);
549 let matrices = Arc::clone(&self.matrices);
550 let thread_count = Arc::clone(&self.n_threads);
551 std::thread::spawn(move || {
552 loop {
557 let built_gen = cache.current_gen();
561 cache.clear_delta();
564 match CsrSnapshot::build(&storage) {
565 Ok(snap) => {
566 if let Ok(m) = MatrixSet::materialize(
567 &snap,
568 thread_count.load(std::sync::atomic::Ordering::Acquire),
569 ) {
570 *matrices.write() = Some(m);
571 }
572 if !cache.install(snap, built_gen) {
573 break;
574 }
575 }
576 Err(_) => {
577 cache.cancel_rebuild();
578 break;
579 }
580 }
581 }
582 });
583 }
584 }
585
586 pub(super) fn append_adj(
588 &self,
589 wtxn: &mut heed::RwTxn,
590 node: NodeId,
591 other: NodeId,
592 edge_type: u32,
593 edge_id: EdgeId,
594 outgoing: bool,
595 ) -> Result<(), Error> {
596 let entry = AdjEntry {
597 edge_type,
598 other,
599 edge_id,
600 };
601 let db = if outgoing {
602 &self.storage.out_adj
603 } else {
604 &self.storage.in_adj
605 };
606 db.put(wtxn, &node, entry.as_bytes())?;
607 Ok(())
608 }
609
610 pub(super) fn adj_entries(
612 &self,
613 node: NodeId,
614 outgoing: bool,
615 ) -> Result<Vec<NeighborEntry>, Error> {
616 let rtxn = self.storage.env.read_txn()?;
617 self.adj_entries_impl(&rtxn, node, outgoing)
618 }
619
620 pub(super) fn adj_entries_impl(
621 &self,
622 rtxn: &heed::RoTxn,
623 node: NodeId,
624 outgoing: bool,
625 ) -> Result<Vec<NeighborEntry>, Error> {
626 let db = if outgoing {
627 &self.storage.out_adj
628 } else {
629 &self.storage.in_adj
630 };
631
632 let iter = match db.get_duplicates(rtxn, &node)? {
633 Some(iter) => iter,
634 None => return Ok(vec![]),
635 };
636
637 let mut out = Vec::new();
638 for result in iter {
639 let (_, bytes) = result?;
640 let entry = AdjEntry::read_from_bytes(bytes)
641 .ok()
642 .ok_or(Error::Corrupt("AdjEntry value is not exactly 20 bytes"))?;
643 out.push(NeighborEntry {
644 node: entry.other,
645 edge: entry.edge_id,
646 edge_type: entry.edge_type,
647 });
648 }
649 Ok(out)
650 }
651}
652
653struct TypedSortedAdj {
657 ptr: Vec<usize>,
658 adj: Vec<(u32, EdgeId)>,
659}
660
661impl TypedSortedAdj {
662 fn row(&self, d: usize) -> &[(u32, EdgeId)] {
663 &self.adj[self.ptr[d]..self.ptr[d + 1]]
664 }
665}
666
667fn typed_out_sorted(snap: &CsrSnapshot, type_id: Option<TypeId>) -> TypedSortedAdj {
670 let n = snap.dense_to_id.len();
671 let keep = |idx: usize| type_id.is_none_or(|t| snap.edge_type[idx] == t);
672
673 let mut ptr = vec![0usize; n + 1];
674 for row in 0..n {
675 let mut count = 0;
676 for idx in snap.row_ptr[row]..snap.row_ptr[row + 1] {
677 if keep(idx) {
678 count += 1;
679 }
680 }
681 ptr[row + 1] = ptr[row] + count;
682 }
683
684 let mut adj = vec![(0u32, 0u64); ptr[n]];
685 for row in 0..n {
686 let mut at = ptr[row];
687 for idx in snap.row_ptr[row]..snap.row_ptr[row + 1] {
688 if keep(idx) {
689 adj[at] = (snap.col_idx[idx], snap.edge_id[idx]);
690 at += 1;
691 }
692 }
693 adj[ptr[row]..at].sort_unstable();
694 }
695 TypedSortedAdj { ptr, adj }
696}
697
698fn typed_in_sorted(snap: &CsrSnapshot, type_id: Option<TypeId>) -> TypedSortedAdj {
701 let n = snap.dense_to_id.len();
702 let keep = |idx: usize| type_id.is_none_or(|t| snap.edge_type[idx] == t);
703
704 let mut ptr = vec![0usize; n + 1];
705 for idx in 0..snap.col_idx.len() {
706 if keep(idx) {
707 ptr[snap.col_idx[idx] as usize + 1] += 1;
708 }
709 }
710 for d in 0..n {
711 ptr[d + 1] += ptr[d];
712 }
713
714 let mut at = ptr.clone();
715 let mut adj = vec![(0u32, 0u64); ptr[n]];
716 for row in 0..n {
717 for idx in snap.row_ptr[row]..snap.row_ptr[row + 1] {
718 if keep(idx) {
719 let dst = snap.col_idx[idx] as usize;
720 adj[at[dst]] = (row as u32, snap.edge_id[idx]);
721 at[dst] += 1;
722 }
723 }
724 }
725 for d in 0..n {
726 adj[ptr[d]..ptr[d + 1]].sort_unstable();
727 }
728 TypedSortedAdj { ptr, adj }
729}
730
731#[cfg(test)]
732mod incremental_matrix_tests {
733 use issundb_graphblas::Matrix;
734 use serde_json::json;
735 use tempfile::TempDir;
736
737 use std::collections::{BTreeMap, HashMap};
738
739 use crate::Graph;
740 use crate::graph::DegreeDirection;
741 use crate::schema::NodeId;
742
743 type MatrixView = (Vec<(usize, usize)>, Vec<(usize, usize)>, Vec<NodeId>);
746
747 fn canonical_partition(cc: &HashMap<NodeId, u64>) -> BTreeMap<NodeId, NodeId> {
751 let mut groups: HashMap<u64, Vec<NodeId>> = HashMap::new();
752 for (&node, &comp) in cc {
753 groups.entry(comp).or_default().push(node);
754 }
755 let mut out = BTreeMap::new();
756 for members in groups.into_values() {
757 let rep = *members.iter().min().unwrap();
758 for n in members {
759 out.insert(n, rep);
760 }
761 }
762 out
763 }
764
765 fn matrix_coords(m: &Matrix<i32>) -> Vec<(usize, usize)> {
768 let mut out: Vec<(usize, usize)> = m
769 .triples()
770 .expect("triples")
771 .into_iter()
772 .map(|(r, c, _)| (r, c))
773 .collect();
774 out.sort_unstable();
775 out.dedup();
776 out
777 }
778
779 fn extract(graph: &Graph) -> MatrixView {
782 let guard = graph.matrices.read();
783 let m = guard.as_ref().expect("matrices materialized");
784 (
785 matrix_coords(&m.adjacency),
786 matrix_coords(&m.adjacency_t),
787 m.dense_to_id.clone(),
788 )
789 }
790
/// After a mix of edge additions, node additions and edge deletions, the
/// matrices produced by `ensure_matrix_view` must equal the ones produced
/// by a subsequent `rebuild_csr`.
#[test]
fn incremental_matrices_match_full_rebuild() {
    let dir = TempDir::new().unwrap();
    let g = Graph::open(dir.path(), 1).unwrap();

    // Baseline: a 20-node ring, fully materialized.
    let ids: Vec<NodeId> = (0..20)
        .map(|i| g.add_node("N", &json!({ "v": i })).unwrap())
        .collect();
    let base_edges: Vec<_> = (0..20)
        .map(|i| {
            g.add_edge(ids[i], ids[(i + 1) % 20], "R", &json!({}))
                .unwrap()
        })
        .collect();
    g.rebuild_csr().unwrap();

    // Writes made after the baseline: chords, a pair of parallel edges,
    // two new nodes with edges, and two deletions.
    g.add_edge(ids[0], ids[5], "R", &json!({})).unwrap();
    g.add_edge(ids[3], ids[10], "R", &json!({})).unwrap();
    let parallel_first = g.add_edge(ids[2], ids[4], "R", &json!({})).unwrap();
    let _parallel_second = g.add_edge(ids[2], ids[4], "R", &json!({})).unwrap();
    let n20 = g.add_node("N", &json!({ "v": 20 })).unwrap();
    let n21 = g.add_node("N", &json!({ "v": 21 })).unwrap();
    g.add_edge(n20, n21, "R", &json!({})).unwrap();
    g.add_edge(ids[1], n20, "R", &json!({})).unwrap();
    g.delete_edge(base_edges[7]).unwrap();
    g.delete_edge(parallel_first).unwrap();

    g.ensure_matrix_view().unwrap();
    let (adj_incr, adj_t_incr, map_incr) = extract(&g);

    g.rebuild_csr().unwrap();
    let (adj_full, adj_t_full, map_full) = extract(&g);

    assert_eq!(adj_incr, adj_full, "adjacency element sets differ");
    assert_eq!(adj_t_incr, adj_t_full, "adjacency_t element sets differ");
    assert_eq!(map_incr, map_full, "dense-index mapping differs");
}
844
/// After a node deletion followed by an edge addition, the matrices seen
/// through `ensure_matrix_view` must equal those of an explicit
/// `rebuild_csr`.
#[test]
fn node_deletion_forces_full_rebuild_and_matches() {
    let dir = TempDir::new().unwrap();
    let g = Graph::open(dir.path(), 1).unwrap();

    // Baseline: a 10-node ring, fully materialized.
    let ids: Vec<NodeId> = (0..10)
        .map(|i| g.add_node("N", &json!({ "v": i })).unwrap())
        .collect();
    for (i, &src) in ids.iter().enumerate() {
        g.add_edge(src, ids[(i + 1) % 10], "R", &json!({}))
            .unwrap();
    }
    g.rebuild_csr().unwrap();

    g.delete_node(ids[3]).unwrap();
    g.add_edge(ids[5], ids[7], "R", &json!({})).unwrap();

    g.ensure_matrix_view().unwrap();
    let (adj_incr, adj_t_incr, map_incr) = extract(&g);
    g.rebuild_csr().unwrap();
    let (adj_full, adj_t_full, map_full) = extract(&g);

    assert_eq!(adj_incr, adj_full, "adjacency element sets differ");
    assert_eq!(adj_t_incr, adj_t_full, "adjacency_t element sets differ");
    assert_eq!(map_incr, map_full, "dense-index mapping differs");
}
873
/// Ignored measurement: prints how long `ensure_matrix_view` takes to
/// absorb a batch of added edges next to the best of three `rebuild_csr`
/// runs, for three graph sizes.
#[test]
#[ignore = "measurement: prints incremental-apply vs full-rebuild timings"]
fn incremental_apply_cost() {
    use std::time::{Duration, Instant};

    /// Times one configuration and prints a single result line.
    fn measure(n_nodes: usize, out_degree: usize, k_added: usize) {
        let dir = TempDir::new().unwrap();
        let g = Graph::open(dir.path(), 4).unwrap();

        // Bulk-load the base graph in one update, then materialize it.
        let ids: Vec<NodeId> = g
            .update(|txn| {
                let created: Vec<NodeId> = (0..n_nodes)
                    .map(|i| txn.add_node("N", &json!({ "v": i })).unwrap())
                    .collect();
                for i in 0..n_nodes {
                    for k in 0..out_degree {
                        let target = (i + 1 + k * 7) % n_nodes;
                        txn.add_edge(created[i], created[target], "R", &json!({}))
                            .unwrap();
                    }
                }
                Ok(created)
            })
            .unwrap();
        g.rebuild_csr().unwrap();

        // Extra edges that the timed incremental apply has to absorb.
        for j in 0..k_added {
            let src = ids[(j * 31) % n_nodes];
            let dst = ids[(j * 97 + 5) % n_nodes];
            g.add_edge(src, dst, "R", &json!({})).unwrap();
        }
        let started = Instant::now();
        g.ensure_matrix_view().unwrap();
        let incr = started.elapsed();

        // Best of three full rebuilds, capped at the initial one-hour bound.
        let best_full = (0..3).fold(Duration::from_secs(3600), |best, _| {
            let started = Instant::now();
            g.rebuild_csr().unwrap();
            best.min(started.elapsed())
        });

        let n_edges = n_nodes * out_degree + k_added;
        println!(
            "{:>7} nodes, {:>9} edges: incremental apply of {} edges = {:>8.3} ms; full rebuild = {:>8.2} ms",
            n_nodes,
            n_edges,
            k_added,
            incr.as_secs_f64() * 1e3,
            best_full.as_secs_f64() * 1e3,
        );
    }

    for n_nodes in [10_000, 50_000, 100_000] {
        measure(n_nodes, 5, 1_000);
    }
}
941
/// `bfs`, `degree_centrality` and `connected_components` must give the
/// same answers right after post-baseline writes as they do after an
/// explicit `rebuild_csr`.
#[test]
fn incremental_consumers_match_full_rebuild() {
    let dir = TempDir::new().unwrap();
    let g = Graph::open(dir.path(), 1).unwrap();

    // Baseline: a 15-node ring, fully materialized.
    let ids: Vec<NodeId> = (0..15)
        .map(|i| g.add_node("N", &json!({ "v": i })).unwrap())
        .collect();
    for (i, &src) in ids.iter().enumerate() {
        g.add_edge(src, ids[(i + 1) % 15], "R", &json!({}))
            .unwrap();
    }
    g.rebuild_csr().unwrap();

    // Writes made after the baseline: a chord and a new node wired in.
    g.add_edge(ids[0], ids[7], "R", &json!({})).unwrap();
    let n15 = g.add_node("N", &json!({ "v": 15 })).unwrap();
    g.add_edge(ids[2], n15, "R", &json!({})).unwrap();
    g.add_edge(n15, ids[5], "R", &json!({})).unwrap();

    let sorted_bfs = || {
        let mut reached = g.bfs(ids[0], 3).unwrap();
        reached.sort_unstable();
        reached
    };

    let bfs_incr = sorted_bfs();
    let deg_incr = g.degree_centrality(DegreeDirection::Both).unwrap();
    let cc_incr = canonical_partition(&g.connected_components().unwrap());

    g.rebuild_csr().unwrap();
    let bfs_full = sorted_bfs();
    let deg_full = g.degree_centrality(DegreeDirection::Both).unwrap();
    let cc_full = canonical_partition(&g.connected_components().unwrap());

    assert_eq!(bfs_incr, bfs_full, "bfs: incremental vs full rebuild");
    assert_eq!(deg_incr, deg_full, "degree: incremental vs full rebuild");
    assert_eq!(cc_incr, cc_full, "components: incremental vs full rebuild");
}
988
/// `bfs` must observe edges and nodes added after the last `rebuild_csr`
/// without another explicit rebuild.
#[test]
fn matrix_view_consumers_reflect_writes_without_rebuild() {
    let dir = TempDir::new().unwrap();
    let g = Graph::open(dir.path(), 1).unwrap();
    let [a, b] = [(); 2].map(|()| g.add_node("N", &json!({})).unwrap());
    g.rebuild_csr().unwrap();

    let before = g.bfs(a, 5).unwrap();
    assert!(
        !before.contains(&b),
        "b is unreachable before the edge exists"
    );

    g.add_edge(a, b, "R", &json!({})).unwrap();
    let one_hop = g.bfs(a, 1).unwrap();
    assert!(
        one_hop.contains(&b),
        "b reachable from a after the edge, without a rebuild"
    );

    let c = g.add_node("N", &json!({})).unwrap();
    g.add_edge(b, c, "R", &json!({})).unwrap();
    let two_hops = g.bfs(a, 2).unwrap();
    assert!(
        two_hops.contains(&c),
        "new node c reachable two hops from a, without a rebuild"
    );
}
1020
/// `all_paths` must observe an edge added after the last `rebuild_csr`
/// without another explicit rebuild.
#[test]
fn csr_consumers_reflect_writes_without_rebuild() {
    let dir = TempDir::new().unwrap();
    let g = Graph::open(dir.path(), 1).unwrap();
    let [a, b, c] = [(); 3].map(|()| g.add_node("N", &json!({})).unwrap());
    g.add_edge(a, b, "R", &json!({})).unwrap();
    g.rebuild_csr().unwrap();

    let before = g.all_paths(a, c).unwrap();
    assert!(before.is_empty(), "no path a..c before the edge exists");

    g.add_edge(b, c, "R", &json!({})).unwrap();
    let after = g.all_paths(a, c).unwrap();
    assert!(
        !after.is_empty(),
        "path a->b->c reflected without an explicit rebuild"
    );
}
1045
/// After each write, several threads released together by a barrier call
/// `bfs` at once; every one of them must reach exactly the number of nodes
/// connected to the start at that point.
#[test]
fn concurrent_bfs_after_incremental_write_is_consistent() {
    use std::sync::Barrier;

    const N: usize = 30;
    const THREADS: usize = 6;
    const ROUNDS: usize = 200;

    let dir = TempDir::new().unwrap();
    let g = Graph::open(dir.path(), 1).unwrap();

    // Baseline: a chain of N nodes starting at `start`, fully materialized.
    let start = g.add_node("N", &json!({ "v": 0 })).unwrap();
    let mut tail = start;
    for i in 1..N {
        let next = g.add_node("N", &json!({ "v": i })).unwrap();
        g.add_edge(tail, next, "R", &json!({})).unwrap();
        tail = next;
    }
    g.rebuild_csr().unwrap();

    for round in 0..ROUNDS {
        // One new leaf per round hangs directly off the start node.
        let leaf = g.add_node("N", &json!({ "leaf": round })).unwrap();
        g.add_edge(start, leaf, "R", &json!({})).unwrap();
        let expected = N + round + 1;

        let barrier = Barrier::new(THREADS);
        std::thread::scope(|scope| {
            for _ in 0..THREADS {
                let graph = &g;
                let gate = &barrier;
                scope.spawn(move || {
                    gate.wait();
                    let reached = graph.bfs(start, u8::MAX).unwrap();
                    assert_eq!(
                        reached.len(),
                        expected,
                        "concurrent bfs saw a partially materialized matrix"
                    );
                });
            }
        });
    }
}
1106}
1107
1108#[cfg(test)]
1109mod triangle_cycle_count_tests {
1110 use serde_json::json;
1111 use tempfile::TempDir;
1112
1113 use crate::{Graph, TriangleCountSpec};
1114
1115 fn open_tmp() -> (TempDir, Graph) {
1116 let dir = TempDir::new().unwrap();
1117 let g = Graph::open(dir.path(), 1).unwrap();
1118 (dir, g)
1119 }
1120
1121 fn spec_all<'a>(rel: &'a str, label: &'a str) -> TriangleCountSpec<'a> {
1122 TriangleCountSpec {
1123 rel_types: [Some(rel); 3],
1124 labels: [Some(label); 3],
1125 }
1126 }
1127
/// A single directed 3-cycle is counted once per starting node.
#[test]
fn single_cycle_counts_one_per_rotation() {
    let (_dir, g) = open_tmp();
    let [a, b, c] = [(); 3].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst) in [(a, b), (b, c), (c, a)] {
        g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
    }

    let count = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(count, 3);
}
1145
/// Three edges over three nodes that do not form a directed cycle count
/// as zero.
#[test]
fn non_cyclic_orientation_does_not_count() {
    let (_dir, g) = open_tmp();
    let [a, b, c] = [(); 3].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst) in [(a, b), (b, c), (a, c)] {
        g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
    }

    let count = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(count, 0);
}
1163
/// Doubling one edge of the cycle doubles the count (3 rotations x 2).
#[test]
fn parallel_edges_multiply() {
    let (_dir, g) = open_tmp();
    let [a, b, c] = [(); 3].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst) in [(a, b), (a, b), (b, c), (c, a)] {
        g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
    }

    let count = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(count, 6);
}
1182
/// Relationship types are matched per hop position: a KNOWS, KNOWS, LIKES
/// cycle matches only the spec that lists the types in that order.
#[test]
fn per_hop_types_are_positional() {
    let (_dir, g) = open_tmp();
    let [a, b, c] = [(); 3].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst, rel) in [(a, b, "KNOWS"), (b, c, "KNOWS"), (c, a, "LIKES")] {
        g.add_edge(src, dst, rel, &json!({})).unwrap();
    }

    let homogeneous = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(homogeneous, 0);

    let mixed_spec = TriangleCountSpec {
        rel_types: [Some("KNOWS"), Some("KNOWS"), Some("LIKES")],
        labels: [Some("Person"); 3],
    };
    let mixed = g.count_triangle_cycles(&mixed_spec).unwrap();
    assert_eq!(mixed, 1);
}
1208
/// With no relationship types in the spec, a cycle made of three
/// different types is still counted once per rotation.
#[test]
fn untyped_hops_match_any_type() {
    let (_dir, g) = open_tmp();
    let [a, b, c] = [(); 3].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst, rel) in [(a, b, "KNOWS"), (b, c, "LIKES"), (c, a, "FOLLOWS")] {
        g.add_edge(src, dst, rel, &json!({})).unwrap();
    }

    let any_type = TriangleCountSpec {
        rel_types: [None; 3],
        labels: [Some("Person"); 3],
    };
    let count = g.count_triangle_cycles(&any_type).unwrap();
    assert_eq!(count, 3);
}
1228
/// A node lacking the required label excludes every assignment through
/// it; adding the label, or dropping the label filter, restores the count.
#[test]
fn label_filter_applies_per_variable() {
    let (_dir, g) = open_tmp();
    let a = g.add_node("Person", &json!({})).unwrap();
    let b = g.add_node("Person", &json!({})).unwrap();
    let c = g.add_node("Robot", &json!({})).unwrap();
    for (src, dst) in [(a, b), (b, c), (c, a)] {
        g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
    }

    let strict = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(strict, 0);

    g.add_label(c, "Person").unwrap();
    let after = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(after, 3);

    let no_labels = TriangleCountSpec {
        rel_types: [Some("KNOWS"); 3],
        labels: [None; 3],
    };
    let unlabeled = g.count_triangle_cycles(&no_labels).unwrap();
    assert_eq!(unlabeled, 3);
}
1261
/// On a single node, two self-loops cannot fill three hops with distinct
/// edges (count 0); three self-loops give 3! = 6 orderings.
#[test]
fn self_loop_assignments_respect_relationship_uniqueness() {
    let (_dir, g) = open_tmp();
    let a = g.add_node("Person", &json!({})).unwrap();
    for _ in 0..2 {
        g.add_edge(a, a, "KNOWS", &json!({})).unwrap();
    }

    let with_two_loops = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(with_two_loops, 0);

    g.add_edge(a, a, "KNOWS", &json!({})).unwrap();
    let with_three_loops = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(with_three_loops, 6);
}
1283
/// A self-loop on `x` combined with the 2-cycle `x <-> y` yields three
/// matches, one for each hop position the self-loop can occupy.
#[test]
fn self_loop_with_two_cycle_counts_each_position() {
    let (_dir, g) = open_tmp();
    let [x, y] = [(); 2].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst) in [(x, x), (x, y), (y, x)] {
        g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
    }

    let count = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(count, 3);
}
1300
/// A relationship type or label that does not exist in the graph yields a
/// count of zero rather than an error.
#[test]
fn unknown_type_or_label_counts_zero() {
    let (_dir, g) = open_tmp();
    let [a, b, c] = [(); 3].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst) in [(a, b), (b, c), (c, a)] {
        g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
    }

    let unknown_type = g
        .count_triangle_cycles(&spec_all("NOPE", "Person"))
        .unwrap();
    assert_eq!(unknown_type, 0);

    let unknown_label = g
        .count_triangle_cycles(&spec_all("KNOWS", "Ghost"))
        .unwrap();
    assert_eq!(unknown_label, 0);
}
1324
/// The count reflects an edge added after an earlier count, with no
/// explicit rebuild in between.
#[test]
fn count_is_fresh_after_writes() {
    let (_dir, g) = open_tmp();
    let [a, b, c] = [(); 3].map(|()| g.add_node("Person", &json!({})).unwrap());
    for (src, dst) in [(a, b), (b, c)] {
        g.add_edge(src, dst, "KNOWS", &json!({})).unwrap();
    }

    let open_path = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(open_path, 0);

    g.add_edge(c, a, "KNOWS", &json!({})).unwrap();
    let closed_cycle = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(closed_cycle, 3);
}
1347
/// A graph with no nodes or edges yields a count of zero.
#[test]
fn empty_graph_counts_zero() {
    let (_dir, g) = open_tmp();
    let count = g
        .count_triangle_cycles(&spec_all("KNOWS", "Person"))
        .unwrap();
    assert_eq!(count, 0);
}
1358}