use super::*;
const STALE_POINT_EXPAND_MAX: usize = 64;
impl Graph {
#[doc(hidden)]
pub fn bfs_graphblas(&self, start: NodeId, hops: u8) -> Result<Vec<NodeId>, Error> {
use issundb_graphblas::{Descriptor, Monoid, Semiring, Vector, ewise_add, mxv};
let guard = self.matrices.read();
let m = match guard.as_ref() {
Some(m) => m,
None => return self.bfs(start, hops),
};
let n = m.n_nodes;
if n == 0 {
return Ok(vec![]);
}
let start_dense = match m.id_to_dense.get(&start) {
Some(&d) => d as usize,
None => return self.bfs(start, hops),
};
let mut level = Vector::<i32>::new(m.context.clone(), n)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
level
.set(start_dense, 0)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
let opts_next = Descriptor::new(false, true, true, true);
for _ in 0..hops {
let mut next = Vector::<i32>::new(m.context.clone(), n)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
mxv(
&mut next,
Some(&level),
Semiring::MinPlus,
&m.adjacency,
&level,
opts_next,
)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
if next.nvals().map_err(|e| Error::GraphBLAS(e.to_string()))? == 0 {
break;
}
let mut merged = Vector::<i32>::new(m.context.clone(), n)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
ewise_add(
&mut merged,
None,
Monoid::Plus,
&level,
&next,
Descriptor::NULL,
)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
level = merged;
}
let dense_indices: Vec<usize> = level
.indices()
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
Ok(dense_indices
.into_iter()
.filter_map(|d| m.dense_to_id.get(d).copied())
.collect())
}
#[doc(hidden)]
pub fn bfs_multi_source_graphblas(
&self,
seeds: &[NodeId],
hops: u8,
max_nodes: Option<usize>,
) -> Result<Vec<NodeId>, Error> {
use issundb_graphblas::{Descriptor, Monoid, Semiring, Vector, ewise_add, mxv};
self.ensure_matrix_view()?;
let guard = self.matrices.read();
let m = match guard.as_ref() {
Some(m) => m,
None => return Ok(vec![]),
};
let n = m.n_nodes;
if seeds.is_empty() || n == 0 {
return Ok(vec![]);
}
let mut level = Vector::<i32>::new(m.context.clone(), n)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
let mut seeds_added: usize = 0;
for &start in seeds {
if max_nodes.is_some_and(|max| seeds_added >= max) {
break;
}
if let Some(&d) = m.id_to_dense.get(&start) {
level
.set(d as usize, 0)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
seeds_added += 1;
}
}
if seeds_added == 0 {
return Ok(vec![]);
}
let opts_next = Descriptor::new(false, true, true, true);
let mut current_hop = 0;
for _ in 0..hops {
current_hop += 1;
let mut next = Vector::<i32>::new(m.context.clone(), n)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
mxv(
&mut next,
Some(&level),
Semiring::MinPlus,
&m.adjacency,
&level,
opts_next,
)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
let next_count = next.nvals().map_err(|e| Error::GraphBLAS(e.to_string()))?;
if next_count == 0 {
break;
}
let current_count = level.nvals().map_err(|e| Error::GraphBLAS(e.to_string()))?;
if let Some(max) = max_nodes {
if current_count >= max {
break;
}
if current_count + next_count > max {
let allowed = max - current_count;
let next_indices: Vec<usize> = next
.indices()
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
for &idx in next_indices.iter().take(allowed) {
level
.set(idx, current_hop)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
}
break;
}
}
let mut merged = Vector::<i32>::new(m.context.clone(), n)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
ewise_add(
&mut merged,
None,
Monoid::Plus,
&level,
&next,
Descriptor::NULL,
)
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
level = merged;
}
let dense_indices: Vec<usize> = level
.indices()
.map_err(|e| Error::GraphBLAS(e.to_string()))?;
Ok(dense_indices
.into_iter()
.filter_map(|d| m.dense_to_id.get(d).copied())
.collect())
}
#[doc(hidden)]
pub fn expand_spmv_graphblas(
&self,
src_nodes: &[NodeId],
rel_type: Option<&str>,
is_incoming: bool,
) -> Result<Vec<(NodeId, EdgeId, NodeId)>, Error> {
let type_id = if let Some(t) = rel_type {
let rtxn = self.storage.env.read_txn()?;
match get_type(&self.storage, &rtxn, t)? {
Some(id) => Some(id),
None => return Ok(vec![]),
}
} else {
None
};
if self.csr_cache.snapshot_is_stale() && src_nodes.len() <= STALE_POINT_EXPAND_MAX {
let mut results = Vec::new();
for &src in src_nodes {
let neighbors = if is_incoming {
self.in_neighbors(src)?
} else {
self.out_neighbors(src)?
};
for ne in neighbors {
if let Some(tid) = type_id {
if ne.edge_type == tid {
results.push((src, ne.edge, ne.node));
}
} else {
results.push((src, ne.edge, ne.node));
}
}
}
return Ok(results);
}
self.ensure_snapshot_fresh()?;
let snap = self.csr_cache.snapshot.load();
let (row_ptr, col_idx, edge_type, edge_id) = if is_incoming {
(
&snap.in_row_ptr,
&snap.in_col_idx,
&snap.in_edge_type,
&snap.in_edge_id,
)
} else {
(&snap.row_ptr, &snap.col_idx, &snap.edge_type, &snap.edge_id)
};
let mut results = Vec::new();
for &src in src_nodes {
let d = match snap.id_to_dense.get(&src) {
Some(&d) => d as usize,
None => continue,
};
for k in row_ptr[d]..row_ptr[d + 1] {
if let Some(tid) = type_id {
if edge_type[k] == tid {
results.push((src, edge_id[k], snap.dense_to_id[col_idx[k] as usize]));
}
} else {
results.push((src, edge_id[k], snap.dense_to_id[col_idx[k] as usize]));
}
}
}
Ok(results)
}
}
#[cfg(test)]
mod tests {
use tempfile::TempDir;
use crate::Graph;
fn open_tmp() -> (TempDir, Graph) {
let dir = TempDir::new().unwrap();
let g = Graph::open(dir.path(), 1).unwrap();
(dir, g)
}
#[test]
fn typed_expand_sees_writes_without_matrix_refresh() {
let (_dir, g) = open_tmp();
let a = g.add_node("person", &()).unwrap();
let b = g.add_node("person", &()).unwrap();
let e = g.add_edge(a, b, "knows", &()).unwrap();
let out = g.expand_spmv_graphblas(&[a], Some("knows"), false).unwrap();
assert_eq!(out, vec![(a, e, b)]);
let incoming = g.expand_spmv_graphblas(&[b], Some("knows"), true).unwrap();
assert_eq!(incoming, vec![(b, e, a)]);
}
#[test]
fn typed_expand_unknown_type_is_empty() {
let (_dir, g) = open_tmp();
let a = g.add_node("person", &()).unwrap();
let b = g.add_node("person", &()).unwrap();
g.add_edge(a, b, "knows", &()).unwrap();
let out = g.expand_spmv_graphblas(&[a], Some("likes"), false).unwrap();
assert!(out.is_empty());
}
#[test]
fn typed_expand_reads_the_csr_when_fresh() {
let (_dir, g) = open_tmp();
let a = g.add_node("person", &()).unwrap();
let b = g.add_node("person", &()).unwrap();
let e_ab = g.add_edge(a, b, "knows", &()).unwrap();
let e_ab2 = g.add_edge(a, b, "knows", &()).unwrap();
let e_aa = g.add_edge(a, a, "knows", &()).unwrap();
g.add_edge(a, b, "likes", &()).unwrap();
g.rebuild_csr().unwrap();
let out = g.expand_spmv_graphblas(&[a], Some("knows"), false).unwrap();
assert_eq!(out, vec![(a, e_ab, b), (a, e_ab2, b), (a, e_aa, a)]);
let incoming = g.expand_spmv_graphblas(&[b], Some("knows"), true).unwrap();
assert_eq!(incoming, vec![(b, e_ab, a), (b, e_ab2, a)]);
}
#[test]
fn bulk_typed_expand_over_a_stale_snapshot_refreshes_it() {
let (_dir, g) = open_tmp();
let mut nodes = Vec::new();
for _ in 0..66 {
nodes.push(g.add_node("person", &()).unwrap());
}
let mut expected = Vec::new();
for w in nodes.windows(2) {
let e = g.add_edge(w[0], w[1], "knows", &()).unwrap();
expected.push((w[0], e, w[1]));
}
assert!(g.csr_cache.snapshot_is_stale());
let out = g
.expand_spmv_graphblas(&nodes, Some("knows"), false)
.unwrap();
assert_eq!(out, expected);
assert!(
!g.csr_cache.snapshot_is_stale(),
"a bulk typed expansion refreshes the snapshot"
);
}
#[test]
fn stale_point_expand_skips_the_snapshot_rebuild() {
let (_dir, g) = open_tmp();
let a = g.add_node("person", &()).unwrap();
let b = g.add_node("person", &()).unwrap();
let e = g.add_edge(a, b, "knows", &()).unwrap();
assert!(g.csr_cache.snapshot_is_stale());
let out = g.expand_spmv_graphblas(&[a], Some("knows"), false).unwrap();
assert_eq!(out, vec![(a, e, b)]);
assert!(g.csr_cache.snapshot_is_stale());
}
#[test]
fn hybrid_consumers_stay_fresh_after_a_snapshot_only_refresh() {
let (_dir, g) = open_tmp();
let mut nodes = Vec::new();
for _ in 0..66 {
nodes.push(g.add_node("person", &()).unwrap());
}
for w in nodes.windows(2) {
g.add_edge(w[0], w[1], "knows", &()).unwrap();
}
g.expand_spmv_graphblas(&nodes, Some("knows"), false)
.unwrap();
assert!(!g.csr_cache.snapshot_is_stale());
let reached = g.dfs(nodes[0], 1).unwrap();
assert_eq!(reached, vec![nodes[0], nodes[1]]);
}
#[test]
fn untyped_expand_preserves_parallel_edges_and_multiple_types() {
let (_dir, g) = open_tmp();
let a = g.add_node("person", &()).unwrap();
let b = g.add_node("person", &()).unwrap();
let e_ab = g.add_edge(a, b, "knows", &()).unwrap();
let e_ab2 = g.add_edge(a, b, "knows", &()).unwrap();
let e_likes = g.add_edge(a, b, "likes", &()).unwrap();
g.rebuild_csr().unwrap();
let out = g.expand_spmv_graphblas(&[a], None, false).unwrap();
assert_eq!(out, vec![(a, e_ab, b), (a, e_ab2, b), (a, e_likes, b)]);
}
}