use std::collections::{HashMap, hash_map::Entry};
use crate::engine::graph::edge_store::{Direction, EdgeStore};
use super::weights::extract_weight_from_properties;
/// Adjacency index over a directed, labelled graph, keyed by dense integer ids.
///
/// Node names and edge labels are interned to `u32` / `u16` ids. Edges are kept
/// in two layers: flat, offset-indexed arrays (the "dense" part) and per-node
/// append buffers that receive newly added edges. Edges that are present in the
/// dense arrays but have been removed are masked through `deleted_edges`.
pub struct CsrIndex {
/// Node name -> dense node id.
pub(super) node_to_id: HashMap<String, u32>,
/// Dense node id -> node name (inverse of `node_to_id`).
pub(super) id_to_node: Vec<String>,
/// Edge label -> interned label id.
pub(super) label_to_id: HashMap<String, u16>,
/// Interned label id -> edge label (inverse of `label_to_id`).
pub(super) id_to_label: Vec<String>,
/// Dense out-edge offsets: node `i` owns the index range
/// `out_offsets[i]..out_offsets[i + 1]` of `out_targets` / `out_labels`.
/// Starts as `[0]` and gains one entry per registered node.
pub(super) out_offsets: Vec<u32>,
/// Destination node id of each dense out-edge.
pub(super) out_targets: Vec<u32>,
/// Label id of each dense out-edge, parallel to `out_targets`.
pub(super) out_labels: Vec<u16>,
/// Weights of the dense out-edges; `None` for an index created by `new`.
/// NOTE(review): presumably parallel to `out_targets` when present — confirm
/// against the compaction code.
pub(super) out_weights: Option<Vec<f64>>,
/// Dense in-edge offsets: node `i` owns the index range
/// `in_offsets[i]..in_offsets[i + 1]` of `in_targets` / `in_labels`.
pub(super) in_offsets: Vec<u32>,
/// Source node id of each dense in-edge.
pub(super) in_targets: Vec<u32>,
/// Label id of each dense in-edge, parallel to `in_targets`.
pub(super) in_labels: Vec<u16>,
/// Weights of the dense in-edges; `None` for an index created by `new`.
/// NOTE(review): presumably parallel to `in_targets` when present — confirm.
pub(super) in_weights: Option<Vec<f64>>,
/// Per-node buffer of added out-edges as `(label id, destination id)`.
pub(super) buffer_out: Vec<Vec<(u16, u32)>>,
/// Per-node buffer of added in-edges as `(label id, source id)`.
pub(super) buffer_in: Vec<Vec<(u16, u32)>>,
/// Weights pushed in lock-step with `buffer_out` while `has_weights` is set.
pub(super) buffer_out_weights: Vec<Vec<f64>>,
/// Weights pushed in lock-step with `buffer_in` while `has_weights` is set.
pub(super) buffer_in_weights: Vec<Vec<f64>>,
/// `(source id, label id, destination id)` triples that edge iteration hides
/// from the dense arrays.
pub(super) deleted_edges: std::collections::HashSet<(u32, u16, u32)>,
/// Whether weights are being tracked for buffered edges.
pub(super) has_weights: bool,
/// One counter per node, created at zero when the node is registered.
/// NOTE(review): presumably bumped by `record_access`, defined elsewhere — confirm.
pub(super) access_counts: Vec<std::cell::Cell<u32>>,
/// Starts at 0; not read or written in this file.
/// NOTE(review): purpose not visible here — confirm against its users.
pub(super) query_epoch: u64,
}
impl Default for CsrIndex {
fn default() -> Self {
Self::new()
}
}
impl CsrIndex {
/// Creates an empty index: no nodes, no labels, no edges, weights disabled.
///
/// Both offset arrays start with a single `0` so that they always hold one
/// more entry than there are nodes.
pub fn new() -> Self {
    Self {
        // Name <-> id interning tables.
        node_to_id: HashMap::new(),
        id_to_node: Vec::new(),
        label_to_id: HashMap::new(),
        id_to_label: Vec::new(),

        // Dense arrays, outgoing direction.
        out_offsets: Vec::from([0]),
        out_targets: Vec::new(),
        out_labels: Vec::new(),
        out_weights: None,

        // Dense arrays, incoming direction.
        in_offsets: Vec::from([0]),
        in_targets: Vec::new(),
        in_labels: Vec::new(),
        in_weights: None,

        // Per-node buffers for added edges and their weights.
        buffer_out: Vec::new(),
        buffer_in: Vec::new(),
        buffer_out_weights: Vec::new(),
        buffer_in_weights: Vec::new(),

        // Bookkeeping.
        deleted_edges: std::collections::HashSet::new(),
        has_weights: false,
        access_counts: Vec::new(),
        query_epoch: 0,
    }
}
/// Builds a fresh index from every edge in `store` and compacts it.
///
/// Each edge's weight is read from its serialized properties with
/// `extract_weight_from_properties`. Endpoints are interned as a side effect
/// of adding the edge, in scan order (source, then destination), so no
/// separate node-registration pass is needed. `add_edge_weighted` already
/// treats a weight of `1.0` as "unweighted", so every edge goes through it.
///
/// # Errors
///
/// Returns the error from `EdgeStore::scan_all_edges` if the scan fails.
pub fn rebuild_from(store: &EdgeStore) -> crate::Result<Self> {
    let mut csr = Self::new();
    let all_edges = store.scan_all_edges()?;
    for edge in &all_edges {
        let weight = extract_weight_from_properties(&edge.properties);
        csr.add_edge_weighted(&edge.src_id, &edge.label, &edge.dst_id, weight);
    }
    csr.compact();
    Ok(csr)
}
/// Returns the dense id of `node`, registering the node first if it is new.
///
/// A known node is looked up without allocating. A new node receives the next
/// free id and one fresh slot in every per-node structure: an empty dense
/// range in both offset arrays, empty edge and weight buffers, and a zeroed
/// access counter.
///
/// # Panics
///
/// Panics if the number of nodes no longer fits in a `u32`, instead of
/// silently wrapping the id and aliasing an existing node.
pub(super) fn ensure_node(&mut self, node: &str) -> u32 {
    // Fast path: this runs for both endpoints of every added edge, so a hit
    // must not pay for a `String` allocation.
    if let Some(&id) = self.node_to_id.get(node) {
        return id;
    }

    let id = u32::try_from(self.id_to_node.len())
        .expect("node id space exhausted: more than u32::MAX nodes");
    self.node_to_id.insert(node.to_owned(), id);
    self.id_to_node.push(node.to_owned());

    // Repeat the last offset so the new node owns an empty dense range.
    let out_end = self.out_offsets.last().copied().unwrap_or(0);
    self.out_offsets.push(out_end);
    let in_end = self.in_offsets.last().copied().unwrap_or(0);
    self.in_offsets.push(in_end);

    self.buffer_out.push(Vec::new());
    self.buffer_in.push(Vec::new());
    self.buffer_out_weights.push(Vec::new());
    self.buffer_in_weights.push(Vec::new());
    self.access_counts.push(std::cell::Cell::new(0));
    id
}
/// Returns the interned id of `label`, assigning the next free id if the
/// label has not been seen before.
///
/// # Panics
///
/// Panics when a new label would need an id beyond `u16::MAX`. A plain `as`
/// cast would wrap to an id that is already in use and silently merge two
/// distinct labels.
fn ensure_label(&mut self, label: &str) -> u16 {
    match self.label_to_id.entry(label.to_string()) {
        Entry::Occupied(slot) => *slot.get(),
        Entry::Vacant(slot) => {
            let id = u16::try_from(self.id_to_label.len()).expect(
                "edge label id space exhausted: at most 65536 distinct labels are supported",
            );
            slot.insert(id);
            self.id_to_label.push(label.to_string());
            id
        }
    }
}
/// Adds the unweighted edge `src -[label]-> dst`.
///
/// Equivalent to adding the edge with the default weight of `1.0`, which
/// never switches weight tracking on.
pub fn add_edge(&mut self, src: &str, label: &str, dst: &str) {
    self.add_edge_weighted(src, label, dst, 1.0);
}
/// Adds the edge `src -[label]-> dst` carrying `weight`.
///
/// Weight tracking is requested only when `weight` differs from the default
/// `1.0`.
pub fn add_edge_weighted(&mut self, src: &str, label: &str, dst: &str, weight: f64) {
    let needs_weight_tracking = weight != 1.0;
    self.add_edge_internal(src, label, dst, weight, needs_weight_tracking);
}
/// Shared implementation behind `add_edge` and `add_edge_weighted`.
///
/// Interns both endpoints and the label, then:
///
/// * does nothing if the edge is already present in the source's buffer;
/// * if the edge is stored in the dense arrays, clears any tombstone for it
///   and returns, so an edge that was removed and is added again becomes
///   visible again instead of staying hidden;
/// * otherwise appends the edge to the out-buffer of `src` and the in-buffer
///   of `dst`, together with `weight` when weights are tracked.
///
/// When `force_weights` is set and weights are not tracked yet, weight
/// tracking is enabled before the edge is buffered.
///
/// NOTE(review): a revived dense edge keeps whatever weight the dense arrays
/// hold for it; the `weight` passed to this call is not applied in that case,
/// matching how a duplicate add of a live edge is ignored.
fn add_edge_internal(
    &mut self,
    src: &str,
    label: &str,
    dst: &str,
    weight: f64,
    force_weights: bool,
) {
    let src_id = self.ensure_node(src);
    let dst_id = self.ensure_node(dst);
    let label_id = self.ensure_label(label);
    let edge_key = (src_id, label_id, dst_id);

    let already_buffered = self.buffer_out[src_id as usize]
        .iter()
        .any(|&(l, d)| l == label_id && d == dst_id);
    if already_buffered {
        return;
    }

    if self.dense_has_edge(src_id, label_id, dst_id, true) {
        // The dense copy is authoritative; dropping the tombstone (if any) is
        // what makes a removed edge reappear.
        self.deleted_edges.remove(&edge_key);
        return;
    }

    if force_weights && !self.has_weights {
        self.enable_weights();
    }
    self.buffer_out[src_id as usize].push((label_id, dst_id));
    self.buffer_in[dst_id as usize].push((label_id, src_id));
    if self.has_weights {
        self.buffer_out_weights[src_id as usize].push(weight);
        self.buffer_in_weights[dst_id as usize].push(weight);
    }
    // Tombstones can also exist for edges that were only ever buffered (node
    // removal records one per edge); clear it so the bookkeeping stays exact.
    self.deleted_edges.remove(&edge_key);
}
/// Removes the edge `src -[label]-> dst` if it exists.
///
/// Unknown node names or an unknown label make this a no-op. A buffered copy
/// of the edge is dropped from both the out-buffer of `src` and the in-buffer
/// of `dst` (with its weight when weights are tracked). If the dense arrays
/// hold the edge, a tombstone is recorded so iteration skips it.
pub fn remove_edge(&mut self, src: &str, label: &str, dst: &str) {
    let Some(&src_id) = self.node_to_id.get(src) else {
        return;
    };
    let Some(&dst_id) = self.node_to_id.get(dst) else {
        return;
    };
    let Some(&label_id) = self.label_to_id.get(label) else {
        return;
    };
    let src_slot = src_id as usize;
    let dst_slot = dst_id as usize;

    // Forward direction. `swap_remove` is fine because buffer order carries
    // no meaning; the weight buffer is permuted the same way.
    let forward = self.buffer_out[src_slot]
        .iter()
        .position(|&entry| entry == (label_id, dst_id));
    if let Some(at) = forward {
        self.buffer_out[src_slot].swap_remove(at);
        if self.has_weights {
            self.buffer_out_weights[src_slot].swap_remove(at);
        }
    }

    // Reverse direction.
    let backward = self.buffer_in[dst_slot]
        .iter()
        .position(|&entry| entry == (label_id, src_id));
    if let Some(at) = backward {
        self.buffer_in[dst_slot].swap_remove(at);
        if self.has_weights {
            self.buffer_in_weights[dst_slot].swap_remove(at);
        }
    }

    // A dense copy cannot be edited in place here; mask it instead.
    if self.dense_has_edge(src_id, label_id, dst_id, true) {
        self.deleted_edges.insert((src_id, label_id, dst_id));
    }
}
/// Removes every edge touching `node` and returns how many were removed.
///
/// Outgoing edges are handled first, then incoming ones. For each edge the
/// mirrored entry in the other endpoint's buffer is dropped and a tombstone
/// is recorded; finally the node's own buffers are cleared. The node itself
/// stays registered. Returns 0 for an unknown node.
pub fn remove_node_edges(&mut self, node: &str) -> usize {
    let Some(&node_id) = self.node_to_id.get(node) else {
        return 0;
    };
    let node_slot = node_id as usize;
    let track_weights = self.has_weights;

    // Snapshot the live outgoing edges before mutating anything.
    let outgoing: Vec<(u16, u32)> = self.iter_out_edges(node_id).collect();
    for &(label_id, dst_id) in &outgoing {
        let dst_slot = dst_id as usize;
        let mirrored = self.buffer_in[dst_slot]
            .iter()
            .position(|&(l, s)| l == label_id && s == node_id);
        if let Some(at) = mirrored {
            self.buffer_in[dst_slot].swap_remove(at);
            if track_weights {
                self.buffer_in_weights[dst_slot].swap_remove(at);
            }
        }
        self.deleted_edges.insert((node_id, label_id, dst_id));
    }
    self.buffer_out[node_slot].clear();
    if track_weights {
        self.buffer_out_weights[node_slot].clear();
    }

    // Incoming edges are snapshotted only now, after the tombstones and
    // buffer removals above have taken effect.
    let incoming: Vec<(u16, u32)> = self.iter_in_edges(node_id).collect();
    for &(label_id, src_id) in &incoming {
        let src_slot = src_id as usize;
        let mirrored = self.buffer_out[src_slot]
            .iter()
            .position(|&(l, d)| l == label_id && d == node_id);
        if let Some(at) = mirrored {
            self.buffer_out[src_slot].swap_remove(at);
            if track_weights {
                self.buffer_out_weights[src_slot].swap_remove(at);
            }
        }
        self.deleted_edges.insert((src_id, label_id, node_id));
    }
    self.buffer_in[node_slot].clear();
    if track_weights {
        self.buffer_in_weights[node_slot].clear();
    }

    outgoing.len() + incoming.len()
}
/// Lists the neighbours of `node` as `(label, neighbour name)` pairs.
///
/// * `label_filter` — when `Some`, only edges with exactly that label are
///   returned. A label that has never been interned cannot match any edge,
///   so the result is empty (it is not treated as "no filter").
/// * `direction` — `Out` yields destinations of outgoing edges, `In` yields
///   sources of incoming edges, `Both` yields outgoing followed by incoming.
///
/// Returns an empty vector for an unknown node. The access is recorded for
/// any known node, including when the label filter matches nothing.
pub fn neighbors(
    &self,
    node: &str,
    label_filter: Option<&str>,
    direction: Direction,
) -> Vec<(String, String)> {
    let Some(&node_id) = self.node_to_id.get(node) else {
        return Vec::new();
    };
    self.record_access(node_id);

    let label_id = match label_filter {
        None => None,
        Some(name) => match self.label_to_id.get(name) {
            Some(&id) => Some(id),
            // Unknown label: no edge can carry it.
            None => return Vec::new(),
        },
    };
    let wanted = |lid: u16| label_id.is_none_or(|filter| filter == lid);
    let describe = |lid: u16, other: u32| {
        (
            self.id_to_label[lid as usize].clone(),
            self.id_to_node[other as usize].clone(),
        )
    };

    let mut result = Vec::new();
    if matches!(direction, Direction::Out | Direction::Both) {
        result.extend(
            self.iter_out_edges(node_id)
                .filter(|&(lid, _)| wanted(lid))
                .map(|(lid, dst)| describe(lid, dst)),
        );
    }
    if matches!(direction, Direction::In | Direction::Both) {
        result.extend(
            self.iter_in_edges(node_id)
                .filter(|&(lid, _)| wanted(lid))
                .map(|(lid, src)| describe(lid, src)),
        );
    }
    result
}
pub fn node_count(&self) -> usize {
self.id_to_node.len()
}
/// Whether a node named `node` has been registered.
pub fn contains_node(&self, node: &str) -> bool {
    self.node_id(node).is_some()
}
/// Name of the node with the given dense id.
///
/// # Panics
///
/// Panics if `dense_id` does not belong to a registered node.
pub fn node_name(&self, dense_id: u32) -> &str {
    self.id_to_node[dense_id as usize].as_str()
}
/// Dense id of the node called `name`, or `None` if it is not registered.
pub fn node_id(&self, name: &str) -> Option<u32> {
    let id = self.node_to_id.get(name)?;
    Some(*id)
}
pub fn add_node(&mut self, name: &str) -> u32 {
self.ensure_node(name)
}
/// Label text for an interned label id.
///
/// # Panics
///
/// Panics if `label_id` has not been assigned to a label.
pub fn label_name(&self, label_id: u16) -> &str {
    self.id_to_label[label_id as usize].as_str()
}
/// Interned id of the label `name`, or `None` if the label is unknown.
pub fn label_id(&self, name: &str) -> Option<u16> {
    let id = self.label_to_id.get(name)?;
    Some(*id)
}
pub fn node_to_id_map(&self) -> &HashMap<String, u32> {
&self.node_to_id
}
/// Node names indexed by dense node id.
pub fn id_to_node_list(&self) -> &[String] {
    self.id_to_node.as_slice()
}
pub fn label_to_id_map(&self) -> &HashMap<String, u16> {
&self.label_to_id
}
/// Label names indexed by label id.
pub fn id_to_label_list(&self) -> &[String] {
    self.id_to_label.as_slice()
}
/// Dense out-edge offsets (one entry per node plus a trailing end offset).
pub fn out_offsets_slice(&self) -> &[u32] {
    self.out_offsets.as_slice()
}
/// Destination ids of the dense out-edges.
pub fn out_targets_slice(&self) -> &[u32] {
    self.out_targets.as_slice()
}
/// Label ids of the dense out-edges.
pub fn out_labels_slice(&self) -> &[u16] {
    self.out_labels.as_slice()
}
/// Weights of the dense out-edges, or `None` when no weight array is stored.
pub fn out_weights_slice(&self) -> Option<&[f64]> {
    let weights = self.out_weights.as_ref()?;
    Some(weights.as_slice())
}
/// Dense in-edge offsets (one entry per node plus a trailing end offset).
pub fn in_offsets_slice(&self) -> &[u32] {
    self.in_offsets.as_slice()
}
/// Source ids of the dense in-edges.
pub fn in_targets_slice(&self) -> &[u32] {
    self.in_targets.as_slice()
}
/// Label ids of the dense in-edges.
pub fn in_labels_slice(&self) -> &[u16] {
    self.in_labels.as_slice()
}
/// Weights of the dense in-edges, or `None` when no weight array is stored.
pub fn in_weights_slice(&self) -> Option<&[f64]> {
    let weights = self.in_weights.as_ref()?;
    Some(weights.as_slice())
}
/// Flattens per-node adjacency lists into offset-indexed arrays.
///
/// `edges[i]` holds the `(label id, target id)` pairs of node `i`. Returns
/// `(offsets, targets, labels)` where `offsets` has `edges.len() + 1` entries
/// and node `i` owns `targets[offsets[i]..offsets[i + 1]]` (same range in
/// `labels`). Edge order within a node is preserved.
pub(super) fn build_dense(edges: &[Vec<(u16, u32)>]) -> (Vec<u32>, Vec<u32>, Vec<u16>) {
    let edge_total: usize = edges.iter().map(Vec::len).sum();
    let mut offsets = Vec::with_capacity(edges.len() + 1);
    let mut targets = Vec::with_capacity(edge_total);
    let mut labels = Vec::with_capacity(edge_total);

    // Running end offset: the entry pushed after node `i` is both the end of
    // node `i` and the start of node `i + 1`.
    let mut running = 0u32;
    offsets.push(running);
    for adjacency in edges {
        targets.extend(adjacency.iter().map(|&(_, target)| target));
        labels.extend(adjacency.iter().map(|&(lid, _)| lid));
        running += adjacency.len() as u32;
        offsets.push(running);
    }
    (offsets, targets, labels)
}
/// Whether the dense arrays contain the edge `src -[label]-> dst`.
///
/// `check_out` selects which side of the index is consulted: `true` scans the
/// dense out-edges of `src`, `false` scans the dense in-edges of `dst`.
/// Tombstones in `deleted_edges` are deliberately ignored; this reports what
/// is physically stored.
fn dense_has_edge(&self, src: u32, label: u16, dst: u32, check_out: bool) -> bool {
    if check_out {
        self.dense_out_edges(src)
            .any(|(lid, target)| lid == label && target == dst)
    } else {
        self.dense_in_edges(dst)
            .any(|(lid, source)| lid == label && source == src)
    }
}
/// Iterates the `(label id, destination id)` pairs stored for `node` in the
/// dense out-edge arrays, without consulting buffers or tombstones.
///
/// Yields nothing when `node` has no entry in the offset array. The iterator
/// borrows the arrays directly, so no per-call allocation takes place.
///
/// # Panics
///
/// Panics if the offsets recorded for `node` do not describe a valid range of
/// the label/target arrays.
pub(super) fn dense_out_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
    let idx = node as usize;
    let (start, end) = match (self.out_offsets.get(idx), self.out_offsets.get(idx + 1)) {
        (Some(&start), Some(&end)) => (start as usize, end as usize),
        // No offset pair for this node: fall back to an empty range.
        _ => (0, 0),
    };
    let labels = self.out_labels[start..end].iter().copied();
    let targets = self.out_targets[start..end].iter().copied();
    labels.zip(targets)
}
/// Iterates the `(label id, source id)` pairs stored for `node` in the dense
/// in-edge arrays, without consulting buffers or tombstones.
///
/// Yields nothing when `node` has no entry in the offset array. The iterator
/// borrows the arrays directly, so no per-call allocation takes place.
///
/// # Panics
///
/// Panics if the offsets recorded for `node` do not describe a valid range of
/// the label/target arrays.
pub(super) fn dense_in_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
    let idx = node as usize;
    let (start, end) = match (self.in_offsets.get(idx), self.in_offsets.get(idx + 1)) {
        (Some(&start), Some(&end)) => (start as usize, end as usize),
        // No offset pair for this node: fall back to an empty range.
        _ => (0, 0),
    };
    let labels = self.in_labels[start..end].iter().copied();
    let sources = self.in_targets[start..end].iter().copied();
    labels.zip(sources)
}
/// Iterates the live outgoing edges of `node` as `(label id, destination id)`.
///
/// Dense edges come first, skipping any that are tombstoned in
/// `deleted_edges`, followed by the node's buffered edges. The buffer is
/// iterated in place rather than cloned. A `node` without a buffer slot
/// contributes no buffered edges.
pub fn iter_out_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
    let dense = self
        .dense_out_edges(node)
        .filter(move |&(lid, dst)| !self.deleted_edges.contains(&(node, lid, dst)));
    // `Option<&Vec<_>>` iterates zero or one buffers; flattening walks its entries.
    let buffered = self
        .buffer_out
        .get(node as usize)
        .into_iter()
        .flatten()
        .copied();
    dense.chain(buffered)
}
/// Number of live outgoing edges of `node_id` (dense minus tombstoned, plus
/// buffered).
pub fn out_degree(&self, node_id: u32) -> usize {
    let outgoing = self.iter_out_edges(node_id);
    outgoing.count()
}
/// Number of live incoming edges of `node_id` (dense minus tombstoned, plus
/// buffered).
pub fn in_degree(&self, node_id: u32) -> usize {
    let incoming = self.iter_in_edges(node_id);
    incoming.count()
}
/// Total number of live edges, computed as the sum of every node's
/// out-degree.
pub fn edge_count(&self) -> usize {
    let mut total = 0usize;
    for slot in 0..self.id_to_node.len() {
        total += self.out_degree(slot as u32);
    }
    total
}
/// Iterates the live incoming edges of `node` as `(label id, source id)`.
///
/// Dense edges come first, skipping any that are tombstoned in
/// `deleted_edges`, followed by the node's buffered edges. The buffer is
/// iterated in place rather than cloned. A `node` without a buffer slot
/// contributes no buffered edges.
pub fn iter_in_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
    let dense = self
        .dense_in_edges(node)
        .filter(move |&(lid, src)| !self.deleted_edges.contains(&(src, lid, node)));
    // `Option<&Vec<_>>` iterates zero or one buffers; flattening walks its entries.
    let buffered = self
        .buffer_in
        .get(node as usize)
        .into_iter()
        .flatten()
        .copied();
    dense.chain(buffered)
}
}
// Unit tests for `CsrIndex`: neighbour lookup, incremental add/remove,
// compaction, weight handling, degree/edge counts and weight extraction from
// serialized edge properties.
#[cfg(test)]
mod tests {
use super::*;
// Shared fixture: a -KNOWS-> b -KNOWS-> c -KNOWS-> d, plus a -WORKS-> e.
fn make_csr() -> CsrIndex {
let mut csr = CsrIndex::new();
csr.add_edge("a", "KNOWS", "b");
csr.add_edge("b", "KNOWS", "c");
csr.add_edge("c", "KNOWS", "d");
csr.add_edge("a", "WORKS", "e");
csr
}
// Without a label filter, both outgoing edges of `a` are returned.
#[test]
fn neighbors_out() {
let csr = make_csr();
let n = csr.neighbors("a", None, Direction::Out);
assert_eq!(n.len(), 2);
let dsts: Vec<&str> = n.iter().map(|(_, d)| d.as_str()).collect();
assert!(dsts.contains(&"b"));
assert!(dsts.contains(&"e"));
}
// A label filter restricts the result to edges carrying that label.
#[test]
fn neighbors_filtered() {
let csr = make_csr();
let n = csr.neighbors("a", Some("KNOWS"), Direction::Out);
assert_eq!(n.len(), 1);
assert_eq!(n[0].1, "b");
}
// Incoming direction reports the source of the edge.
#[test]
fn neighbors_in() {
let csr = make_csr();
let n = csr.neighbors("b", None, Direction::In);
assert_eq!(n.len(), 1);
assert_eq!(n[0].1, "a");
}
// Removing a buffered edge makes it disappear from lookups immediately.
#[test]
fn incremental_remove() {
let mut csr = make_csr();
assert_eq!(csr.neighbors("a", Some("KNOWS"), Direction::Out).len(), 1);
csr.remove_edge("a", "KNOWS", "b");
assert_eq!(csr.neighbors("a", Some("KNOWS"), Direction::Out).len(), 0);
}
// Adding the same edge twice stores it once.
#[test]
fn duplicate_add_is_idempotent() {
let mut csr = CsrIndex::new();
csr.add_edge("a", "L", "b");
csr.add_edge("a", "L", "b");
assert_eq!(csr.neighbors("a", None, Direction::Out).len(), 1);
}
// Rebuilding from an on-disk edge store yields a traversable index.
#[test]
fn rebuild_from_edge_store() {
let dir = tempfile::tempdir().unwrap();
let store = EdgeStore::open(&dir.path().join("graph.redb")).unwrap();
store.put_edge("x", "REL", "y", b"").unwrap();
store.put_edge("y", "REL", "z", b"").unwrap();
let csr = CsrIndex::rebuild_from(&store).unwrap();
assert_eq!(csr.node_count(), 3);
let mut result = csr.traverse_bfs(&["x"], Some("REL"), Direction::Out, 3, 100_000);
result.sort();
assert_eq!(result, vec!["x", "y", "z"]);
}
// After compaction the out-buffers are empty but the edges are still visible.
#[test]
fn compact_merges_buffer_into_dense() {
let mut csr = CsrIndex::new();
csr.add_edge("a", "L", "b");
csr.add_edge("b", "L", "c");
assert_eq!(csr.neighbors("a", None, Direction::Out).len(), 1);
csr.compact();
assert!(csr.buffer_out.iter().all(|b| b.is_empty()));
assert_eq!(csr.neighbors("a", None, Direction::Out).len(), 1);
assert_eq!(csr.neighbors("b", None, Direction::Out).len(), 1);
}
// An edge removed after compaction stays hidden, before and after the next
// compaction.
#[test]
fn compact_handles_deletes() {
let mut csr = CsrIndex::new();
csr.add_edge("a", "L", "b");
csr.add_edge("a", "L", "c");
csr.compact();
csr.remove_edge("a", "L", "b");
assert_eq!(csr.neighbors("a", None, Direction::Out).len(), 1);
csr.compact();
assert_eq!(csr.neighbors("a", None, Direction::Out).len(), 1);
assert_eq!(csr.neighbors("a", None, Direction::Out)[0].1, "c");
}
// A label used by many edges is interned exactly once.
#[test]
fn label_interning_reduces_memory() {
let mut csr = CsrIndex::new();
for i in 0..100 {
csr.add_edge(&format!("n{i}"), "FOLLOWS", &format!("n{}", i + 1));
}
assert_eq!(csr.id_to_label.len(), 1);
assert_eq!(csr.id_to_label[0], "FOLLOWS");
}
// A graph built only with `add_edge` carries no weight arrays.
#[test]
fn unweighted_graph_has_no_weight_arrays() {
let csr = make_csr();
assert!(!csr.has_weights());
assert!(csr.out_weights.is_none());
assert!(csr.in_weights.is_none());
}
// Weighted and unweighted edges can be mixed; unweighted ones report 1.0 and
// a missing edge reports `None`.
#[test]
fn weighted_edge_basic() {
let mut csr = CsrIndex::new();
csr.add_edge_weighted("a", "ROAD", "b", 5.0);
csr.add_edge_weighted("b", "ROAD", "c", 3.0);
csr.add_edge("c", "ROAD", "d");
assert!(csr.has_weights());
assert_eq!(csr.edge_weight("a", "ROAD", "b"), Some(5.0));
assert_eq!(csr.edge_weight("b", "ROAD", "c"), Some(3.0));
assert_eq!(csr.edge_weight("c", "ROAD", "d"), Some(1.0));
assert_eq!(csr.edge_weight("a", "ROAD", "c"), None); }
// Weights are preserved across compaction.
#[test]
fn weighted_edges_survive_compaction() {
let mut csr = CsrIndex::new();
csr.add_edge_weighted("a", "R", "b", 2.5);
csr.add_edge_weighted("b", "R", "c", 7.0);
csr.add_edge("c", "R", "d");
csr.compact();
assert!(csr.has_weights());
assert_eq!(csr.edge_weight("a", "R", "b"), Some(2.5));
assert_eq!(csr.edge_weight("b", "R", "c"), Some(7.0));
assert_eq!(csr.edge_weight("c", "R", "d"), Some(1.0));
}
// Removing a buffered edge must not shift the weights of its siblings.
#[test]
fn weighted_edge_remove_keeps_weights_consistent() {
let mut csr = CsrIndex::new();
csr.add_edge_weighted("a", "R", "b", 2.0);
csr.add_edge_weighted("a", "R", "c", 3.0);
csr.add_edge_weighted("a", "R", "d", 4.0);
csr.remove_edge("a", "R", "c");
assert_eq!(csr.edge_weight("a", "R", "b"), Some(2.0));
assert_eq!(csr.edge_weight("a", "R", "c"), None);
assert_eq!(csr.edge_weight("a", "R", "d"), Some(4.0));
}
// The weighted iterator yields each edge together with its weight.
#[test]
fn iter_out_edges_weighted_returns_weights() {
let mut csr = CsrIndex::new();
csr.add_edge_weighted("a", "R", "b", 2.5);
csr.add_edge_weighted("a", "R", "c", 7.0);
csr.compact();
let edges: Vec<(u16, u32, f64)> = csr.iter_out_edges_weighted(0).collect();
assert_eq!(edges.len(), 2);
let weights: Vec<f64> = edges.iter().map(|e| e.2).collect();
assert!(weights.contains(&2.5));
assert!(weights.contains(&7.0));
}
// Enabling weights late gives the earlier, unweighted edges a weight of 1.0.
#[test]
fn mixed_weighted_unweighted_backfill() {
let mut csr = CsrIndex::new();
csr.add_edge("a", "L", "b");
csr.add_edge("b", "L", "c");
assert!(!csr.has_weights());
csr.add_edge_weighted("c", "L", "d", 5.0);
assert!(csr.has_weights());
assert_eq!(csr.edge_weight("a", "L", "b"), Some(1.0));
assert_eq!(csr.edge_weight("c", "L", "d"), Some(5.0));
}
// Degrees count buffered edges in the respective direction.
#[test]
fn out_degree_and_in_degree() {
let mut csr = CsrIndex::new();
csr.add_edge("a", "L", "b");
csr.add_edge("a", "L", "c");
csr.add_edge("d", "L", "b");
let a_id = *csr.node_to_id.get("a").unwrap();
let b_id = *csr.node_to_id.get("b").unwrap();
assert_eq!(csr.out_degree(a_id), 2);
assert_eq!(csr.in_degree(b_id), 2);
}
// The fixture holds four edges in total.
#[test]
fn edge_count_total() {
let csr = make_csr();
assert_eq!(csr.edge_count(), 4);
}
// A float `weight` property in a MessagePack map is extracted as-is.
#[test]
fn extract_weight_from_msgpack() {
let props = rmpv::Value::Map(vec![(
rmpv::Value::String("weight".into()),
rmpv::Value::F64(0.75),
)]);
let mut buf = Vec::new();
rmpv::encode::write_value(&mut buf, &props).unwrap();
assert_eq!(extract_weight_from_properties(&buf), 0.75);
}
// Empty properties fall back to the default weight of 1.0.
#[test]
fn extract_weight_from_empty_properties() {
assert_eq!(extract_weight_from_properties(b""), 1.0);
}
// An integer `weight` property is converted to a float.
#[test]
fn extract_weight_integer() {
let props = rmpv::Value::Map(vec![(
rmpv::Value::String("weight".into()),
rmpv::Value::Integer(rmpv::Integer::from(42)),
)]);
let mut buf = Vec::new();
rmpv::encode::write_value(&mut buf, &props).unwrap();
assert_eq!(extract_weight_from_properties(&buf), 42.0);
}
// Properties without a `weight` key fall back to 1.0.
#[test]
fn extract_weight_missing_key() {
let props = rmpv::Value::Map(vec![(
rmpv::Value::String("color".into()),
rmpv::Value::String("red".into()),
)]);
let mut buf = Vec::new();
rmpv::encode::write_value(&mut buf, &props).unwrap();
assert_eq!(extract_weight_from_properties(&buf), 1.0);
}
// Rebuilding from a store picks up per-edge weights and defaults the rest.
#[test]
fn rebuild_from_weighted_edges() {
const TEST_WEIGHT: f64 = 0.75;
let dir = tempfile::tempdir().unwrap();
let store = EdgeStore::open(&dir.path().join("graph.redb")).unwrap();
let props = rmpv::Value::Map(vec![(
rmpv::Value::String("weight".into()),
rmpv::Value::F64(TEST_WEIGHT),
)]);
let mut buf = Vec::new();
rmpv::encode::write_value(&mut buf, &props).unwrap();
store.put_edge("x", "R", "y", &buf).unwrap();
store.put_edge("y", "R", "z", b"").unwrap();
let csr = CsrIndex::rebuild_from(&store).unwrap();
assert!(csr.has_weights());
assert_eq!(csr.edge_weight("x", "R", "y"), Some(TEST_WEIGHT));
assert_eq!(csr.edge_weight("y", "R", "z"), Some(1.0));
}
}