use std::collections::HashMap;
use crate::engine::graph::csr::CsrIndex;
use crate::engine::graph::edge_store::EdgeStore;
use nodedb_graph::LocalNodeId;
use nodedb_graph::csr::weights::extract_weight_from_properties;
use nodedb_types::TenantId;
pub struct CsrSnapshot {
node_to_id: HashMap<String, u32>,
id_to_node: Vec<String>,
label_to_id: HashMap<String, u32>,
id_to_label: Vec<String>,
out_offsets: Vec<u32>,
out_targets: Vec<u32>,
out_labels: Vec<u32>,
out_weights: Option<Vec<f64>>,
in_offsets: Vec<u32>,
in_targets: Vec<u32>,
in_labels: Vec<u32>,
in_weights: Option<Vec<f64>>,
has_weights: bool,
partition_tag: u32,
}
impl CsrSnapshot {
pub fn from_csr(csr: &mut CsrIndex) -> Self {
csr.compact().expect("no governor, cannot fail");
Self::snapshot_dense(csr)
}
pub fn from_csr_no_compact(csr: &CsrIndex) -> Self {
Self::snapshot_dense(csr)
}
pub fn from_edge_store_as_of(
edge_store: &EdgeStore,
tid: TenantId,
system_as_of: Option<i64>,
) -> crate::Result<Self> {
let mut csr = CsrIndex::new();
let records = edge_store.scan_all_edges_decoded(system_as_of)?;
for (rec_tid, _coll, src, _label, dst, _props) in &records {
if *rec_tid != tid {
continue;
}
csr.add_node(src).map_err(|e| crate::Error::Internal {
detail: format!("CsrSnapshot::from_edge_store_as_of: {e}"),
})?;
csr.add_node(dst).map_err(|e| crate::Error::Internal {
detail: format!("CsrSnapshot::from_edge_store_as_of: {e}"),
})?;
}
for (rec_tid, _coll, src, label, dst, props) in &records {
if *rec_tid != tid {
continue;
}
let weight = extract_weight_from_properties(props);
let res = if weight != 1.0 {
csr.add_edge_weighted(src, label, dst, weight)
} else {
csr.add_edge(src, label, dst)
};
res.map_err(|e| crate::Error::Internal {
detail: format!("CsrSnapshot::from_edge_store_as_of: {e}"),
})?;
}
csr.compact().expect("no governor, cannot fail");
Ok(Self::snapshot_dense(&csr))
}
fn snapshot_dense(csr: &CsrIndex) -> Self {
Self {
node_to_id: csr.node_to_id_map().clone(),
id_to_node: csr.id_to_node_list().to_vec(),
label_to_id: csr.label_to_id_map().clone(),
id_to_label: csr.id_to_label_list().to_vec(),
out_offsets: csr.out_offsets_slice().to_vec(),
out_targets: csr.out_targets_slice().to_vec(),
out_labels: csr.out_labels_slice().to_vec(),
out_weights: csr.out_weights_slice().map(|w| w.to_vec()),
in_offsets: csr.in_offsets_slice().to_vec(),
in_targets: csr.in_targets_slice().to_vec(),
in_labels: csr.in_labels_slice().to_vec(),
in_weights: csr.in_weights_slice().map(|w| w.to_vec()),
has_weights: csr.has_weights(),
partition_tag: csr.partition_tag(),
}
}
pub fn node_count(&self) -> usize {
self.id_to_node.len()
}
pub fn edge_count(&self) -> usize {
self.out_targets.len()
}
pub fn has_weights(&self) -> bool {
self.has_weights
}
pub fn label_name(&self, label_id: u32) -> &str {
&self.id_to_label[label_id as usize]
}
pub fn label_id(&self, name: &str) -> Option<u32> {
self.label_to_id.get(name).copied()
}
pub fn partition_tag(&self) -> u32 {
self.partition_tag
}
pub fn local(&self, raw: u32) -> LocalNodeId {
LocalNodeId::new(raw, self.partition_tag)
}
pub fn node_name(&self, id: LocalNodeId) -> &str {
&self.id_to_node[id.raw(self.partition_tag) as usize]
}
pub fn node_id(&self, name: &str) -> Option<LocalNodeId> {
self.node_to_id
.get(name)
.copied()
.map(|raw| LocalNodeId::new(raw, self.partition_tag))
}
pub fn out_degree(&self, node: LocalNodeId) -> usize {
self.out_degree_raw(node.raw(self.partition_tag))
}
pub fn in_degree(&self, node: LocalNodeId) -> usize {
self.in_degree_raw(node.raw(self.partition_tag))
}
pub fn iter_out_edges(
&self,
node: LocalNodeId,
) -> impl Iterator<Item = (u32, LocalNodeId)> + '_ {
let tag = self.partition_tag;
self.iter_out_edges_raw(node.raw(tag))
.map(move |(lid, dst)| (lid, LocalNodeId::new(dst, tag)))
}
pub fn iter_in_edges(
&self,
node: LocalNodeId,
) -> impl Iterator<Item = (u32, LocalNodeId)> + '_ {
let tag = self.partition_tag;
self.iter_in_edges_raw(node.raw(tag))
.map(move |(lid, src)| (lid, LocalNodeId::new(src, tag)))
}
pub fn iter_out_edges_weighted(
&self,
node: LocalNodeId,
) -> impl Iterator<Item = (u32, LocalNodeId, f64)> + '_ {
let tag = self.partition_tag;
self.iter_out_edges_weighted_raw(node.raw(tag))
.map(move |(lid, dst, w)| (lid, LocalNodeId::new(dst, tag), w))
}
pub fn node_name_raw(&self, dense_id: u32) -> &str {
&self.id_to_node[dense_id as usize]
}
pub fn node_id_raw(&self, name: &str) -> Option<u32> {
self.node_to_id.get(name).copied()
}
pub fn out_degree_raw(&self, node: u32) -> usize {
let idx = node as usize;
if idx + 1 >= self.out_offsets.len() {
return 0;
}
(self.out_offsets[idx + 1] - self.out_offsets[idx]) as usize
}
pub fn in_degree_raw(&self, node: u32) -> usize {
let idx = node as usize;
if idx + 1 >= self.in_offsets.len() {
return 0;
}
(self.in_offsets[idx + 1] - self.in_offsets[idx]) as usize
}
pub fn iter_out_edges_raw(&self, node: u32) -> impl Iterator<Item = (u32, u32)> + '_ {
let idx = node as usize;
let (start, end) = if idx + 1 < self.out_offsets.len() {
(
self.out_offsets[idx] as usize,
self.out_offsets[idx + 1] as usize,
)
} else {
(0, 0)
};
(start..end).map(move |i| (self.out_labels[i], self.out_targets[i]))
}
pub fn iter_in_edges_raw(&self, node: u32) -> impl Iterator<Item = (u32, u32)> + '_ {
let idx = node as usize;
let (start, end) = if idx + 1 < self.in_offsets.len() {
(
self.in_offsets[idx] as usize,
self.in_offsets[idx + 1] as usize,
)
} else {
(0, 0)
};
(start..end).map(move |i| (self.in_labels[i], self.in_targets[i]))
}
pub fn iter_out_edges_weighted_raw(
&self,
node: u32,
) -> impl Iterator<Item = (u32, u32, f64)> + '_ {
let idx = node as usize;
let (start, end) = if idx + 1 < self.out_offsets.len() {
(
self.out_offsets[idx] as usize,
self.out_offsets[idx + 1] as usize,
)
} else {
(0, 0)
};
(start..end).map(move |i| {
let w = self
.out_weights
.as_ref()
.and_then(|ws| ws.get(i).copied())
.unwrap_or(1.0);
(self.out_labels[i], self.out_targets[i], w)
})
}
pub fn estimated_memory_bytes(&self) -> usize {
let offsets = (self.out_offsets.len() + self.in_offsets.len()) * 4;
let targets = (self.out_targets.len() + self.in_targets.len()) * 4;
let labels = (self.out_labels.len() + self.in_labels.len()) * 2;
let weights = self.out_weights.as_ref().map_or(0, |w| w.len() * 8)
+ self.in_weights.as_ref().map_or(0, |w| w.len() * 8);
let interning = self.id_to_node.iter().map(|s| s.len() + 24).sum::<usize>()
+ self.id_to_label.iter().map(|s| s.len() + 24).sum::<usize>();
offsets + targets + labels + weights + interning
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_csr() -> CsrIndex {
let mut csr = CsrIndex::new();
csr.add_edge("a", "KNOWS", "b").unwrap();
csr.add_edge("b", "KNOWS", "c").unwrap();
csr.add_edge("a", "LIKES", "c").unwrap();
csr
}
#[test]
fn snapshot_captures_all_edges() {
let mut csr = make_csr();
let snap = CsrSnapshot::from_csr(&mut csr);
assert_eq!(snap.node_count(), 3);
assert_eq!(snap.edge_count(), 3);
}
#[test]
fn snapshot_immutable_while_csr_mutates() {
let mut csr = make_csr();
let snap = CsrSnapshot::from_csr(&mut csr);
csr.add_edge("c", "KNOWS", "d").unwrap();
assert_eq!(snap.edge_count(), 3);
assert_eq!(snap.node_count(), 3);
}
#[test]
fn snapshot_node_lookup() {
let mut csr = make_csr();
let snap = CsrSnapshot::from_csr(&mut csr);
assert_eq!(snap.node_id_raw("a"), Some(0));
assert_eq!(snap.node_name_raw(0), "a");
assert_eq!(snap.node_id_raw("nonexistent"), None);
}
#[test]
fn snapshot_edge_iteration() {
let mut csr = make_csr();
let snap = CsrSnapshot::from_csr(&mut csr);
let a_id = snap.node_id_raw("a").unwrap();
let out_edges: Vec<(u32, u32)> = snap.iter_out_edges_raw(a_id).collect();
assert_eq!(out_edges.len(), 2); }
#[test]
fn snapshot_degree() {
let mut csr = make_csr();
let snap = CsrSnapshot::from_csr(&mut csr);
let a_id = snap.node_id_raw("a").unwrap();
assert_eq!(snap.out_degree_raw(a_id), 2);
assert_eq!(snap.in_degree_raw(a_id), 0);
let b_id = snap.node_id_raw("b").unwrap();
assert_eq!(snap.out_degree_raw(b_id), 1);
assert_eq!(snap.in_degree_raw(b_id), 1);
}
#[test]
fn snapshot_weighted() {
let mut csr = CsrIndex::new();
csr.add_edge_weighted("a", "R", "b", 2.5).unwrap();
csr.add_edge_weighted("b", "R", "c", 7.0).unwrap();
let snap = CsrSnapshot::from_csr(&mut csr);
assert!(snap.has_weights());
let edges: Vec<(u32, u32, f64)> = snap.iter_out_edges_weighted_raw(0).collect();
assert_eq!(edges.len(), 1);
assert_eq!(edges[0].2, 2.5);
}
#[test]
fn snapshot_memory_estimate() {
let mut csr = make_csr();
let snap = CsrSnapshot::from_csr(&mut csr);
assert!(snap.estimated_memory_bytes() > 0);
}
#[test]
fn no_compact_snapshot() {
let mut csr = CsrIndex::new();
csr.add_edge("a", "L", "b").unwrap();
let snap = CsrSnapshot::from_csr_no_compact(&csr);
assert_eq!(snap.edge_count(), 0);
}
}