use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use ahash::AHashMap;
use arc_swap::ArcSwap;
use crate::{
error::Error,
schema::{EdgeId, EdgeRecord, NodeId, TypeId},
storage::{lmdb::Storage, props},
};
/// Accumulated dirty-write count at which `CsrCache::mark_dirty_n` tries to
/// claim a rebuild, and at or above which `CsrCache::install` asks for a
/// follow-up rebuild.
pub const REBUILD_THRESHOLD: u64 = 1_000;
/// Compressed-sparse-row view of the graph in both directions.
///
/// The field descriptions below state what `CsrSnapshot::build` stores; the
/// struct itself does not enforce any of these invariants because every field
/// is public.
pub struct CsrSnapshot {
/// Outgoing row offsets, one entry per node plus a trailing total: the
/// out-edges of dense node `d` occupy slots `row_ptr[d]..row_ptr[d + 1]`.
pub row_ptr: Vec<usize>,
/// Dense index of the destination node for each outgoing edge slot.
pub col_idx: Vec<u32>,
/// Edge type for each outgoing edge slot (parallel to `col_idx`).
pub edge_type: Vec<TypeId>,
/// Edge id for each outgoing edge slot (parallel to `col_idx`).
pub edge_id: Vec<EdgeId>,
/// Weight for each outgoing edge slot (parallel to `col_idx`); `build` reads
/// it from the edge properties and falls back to `1.0`.
pub edge_weight: Vec<f64>,
/// Incoming row offsets: the in-edges of dense node `d` occupy slots
/// `in_row_ptr[d]..in_row_ptr[d + 1]`.
pub in_row_ptr: Vec<usize>,
/// Dense index of the source node for each incoming edge slot.
pub in_col_idx: Vec<u32>,
/// Edge type for each incoming edge slot (parallel to `in_col_idx`).
pub in_edge_type: Vec<TypeId>,
/// Edge id for each incoming edge slot (parallel to `in_col_idx`).
pub in_edge_id: Vec<EdgeId>,
/// Node id for each dense index; `build` stores the ids in ascending order.
pub dense_to_id: Vec<NodeId>,
/// Inverse of `dense_to_id`: node id to dense index.
pub id_to_dense: AHashMap<NodeId, u32>,
}
impl CsrSnapshot {
    /// Returns the snapshot of a graph with no nodes and no edges.
    ///
    /// Only compiled for tests. Both row-pointer arrays contain just the
    /// leading `0` offset; every other array and the id map are empty.
    #[cfg(test)]
    pub fn empty() -> Self {
        Self {
            row_ptr: vec![0],
            in_row_ptr: vec![0],
            col_idx: Vec::new(),
            edge_type: Vec::new(),
            edge_id: Vec::new(),
            edge_weight: Vec::new(),
            in_col_idx: Vec::new(),
            in_edge_type: Vec::new(),
            in_edge_id: Vec::new(),
            dense_to_id: Vec::new(),
            id_to_dense: AHashMap::new(),
        }
    }

    /// Reads every node and edge from `storage` inside one read transaction
    /// and assembles the outgoing and incoming CSR arrays.
    ///
    /// Nodes are numbered densely in ascending id order. An edge is kept only
    /// when both of its endpoints are present in the node table. Its weight is
    /// taken from the first of the property keys `weight`, `cost`, `capacity`,
    /// `cap` that is present; if that value is not numeric, if no key is
    /// present, or if the properties cannot be decoded, the weight is `1.0`.
    ///
    /// # Errors
    ///
    /// Fails if the read transaction cannot be opened, if iterating the node
    /// or edge table fails, or if an edge record cannot be decoded.
    pub fn build(storage: &Storage) -> Result<Self, Error> {
        let rtxn = storage.env.read_txn()?;

        // Dense numbering: position in the sorted id list is the dense index.
        let mut dense_to_id: Vec<NodeId> = Vec::new();
        for entry in storage.nodes.iter(&rtxn)? {
            let (node_id, _) = entry?;
            dense_to_id.push(node_id);
        }
        dense_to_id.sort_unstable();
        let node_count = dense_to_id.len();

        // NOTE: dense indices are stored as `u32`; the `as` cast does not
        // check that the node count fits.
        let id_to_dense: AHashMap<NodeId, u32> = dense_to_id
            .iter()
            .copied()
            .enumerate()
            .map(|(dense, id)| (id, dense as u32))
            .collect();

        // Bucket the edges by source node, in table iteration order.
        let mut out_lists: Vec<Vec<(u32, TypeId, EdgeId, f64)>> = vec![Vec::new(); node_count];
        for entry in storage.edges.iter(&rtxn)? {
            let (edge_key, bytes) = entry?;
            let rec: EdgeRecord = props::decode(bytes)?;
            let endpoints = id_to_dense
                .get(&rec.src)
                .zip(id_to_dense.get(&rec.dst));
            if let Some((&src_d, &dst_d)) = endpoints {
                // Undecodable properties are treated as "no properties".
                let parsed: serde_json::Value =
                    props::decode(&rec.props).unwrap_or(serde_json::Value::Null);
                let weight: f64 = ["weight", "cost", "capacity", "cap"]
                    .iter()
                    .find_map(|&key| parsed.get(key))
                    .and_then(|v| v.as_f64())
                    .unwrap_or(1.0);
                out_lists[src_d as usize].push((dst_d, rec.edge_type, edge_key, weight));
            }
        }

        // Outgoing row offsets: running total of the bucket sizes.
        let mut row_ptr: Vec<usize> = Vec::with_capacity(node_count + 1);
        let mut total = 0usize;
        row_ptr.push(total);
        for list in &out_lists {
            total += list.len();
            row_ptr.push(total);
        }

        // Flatten the buckets; row order then bucket order fixes every slot.
        let mut col_idx: Vec<u32> = Vec::with_capacity(total);
        let mut edge_type: Vec<TypeId> = Vec::with_capacity(total);
        let mut edge_id: Vec<EdgeId> = Vec::with_capacity(total);
        let mut edge_weight: Vec<f64> = Vec::with_capacity(total);
        for &(dst_d, etype, eid, weight) in out_lists.iter().flatten() {
            col_idx.push(dst_d);
            edge_type.push(etype);
            edge_id.push(eid);
            edge_weight.push(weight);
        }

        // Incoming row offsets: count edges per destination, then prefix-sum.
        let mut in_row_ptr = vec![0usize; node_count + 1];
        for &dst_d in &col_idx {
            in_row_ptr[dst_d as usize + 1] += 1;
        }
        let mut running = 0usize;
        for offset in in_row_ptr.iter_mut() {
            running += *offset;
            *offset = running;
        }

        // Scatter each outgoing slot into the next free slot of its
        // destination's incoming row.
        let mut in_col_idx = vec![0u32; total];
        let mut in_edge_type = vec![0u32; total];
        let mut in_edge_id = vec![0u64; total];
        let mut next_free = in_row_ptr.clone();
        for (src_d, bounds) in row_ptr.windows(2).enumerate() {
            for k in bounds[0]..bounds[1] {
                let dst = col_idx[k] as usize;
                let slot = next_free[dst];
                next_free[dst] = slot + 1;
                in_col_idx[slot] = src_d as u32;
                in_edge_type[slot] = edge_type[k];
                in_edge_id[slot] = edge_id[k];
            }
        }

        Ok(Self {
            row_ptr,
            col_idx,
            edge_type,
            edge_id,
            edge_weight,
            in_row_ptr,
            in_col_idx,
            in_edge_type,
            in_edge_id,
            dense_to_id,
            id_to_dense,
        })
    }
}
/// Set of graph changes recorded since the delta was last drained.
///
/// The default value has every list empty and `force_full` unset.
#[derive(Default)]
pub struct GraphDelta {
    /// Ids of nodes recorded as added.
    pub added_nodes: Vec<NodeId>,
    /// Ids of nodes recorded as updated.
    pub updated_nodes: Vec<NodeId>,
    /// `(src, dst)` endpoint pairs of edges recorded as added.
    pub added_edges: Vec<(NodeId, NodeId)>,
    /// `(src, dst)` endpoint pairs of edges recorded as removed.
    pub removed_edges: Vec<(NodeId, NodeId)>,
    /// Sticky flag; presumably asks the consumer to rebuild from scratch
    /// instead of applying the lists above (confirm against the consumer).
    pub force_full: bool,
}

impl GraphDelta {
    /// Returns `true` when the delta carries nothing: `force_full` is unset
    /// and all four change lists are empty.
    pub fn is_empty(&self) -> bool {
        if self.force_full {
            return false;
        }
        let has_node_changes = !(self.added_nodes.is_empty() && self.updated_nodes.is_empty());
        let has_edge_changes = !(self.added_edges.is_empty() && self.removed_edges.is_empty());
        !(has_node_changes || has_edge_changes)
    }
}
pub struct CsrCache {
pub snapshot: ArcSwap<CsrSnapshot>,
dirty: AtomicU64,
rebuilding: AtomicBool,
claimed: AtomicU64,
pending: parking_lot::Mutex<GraphDelta>,
write_gen: AtomicU64,
snapshot_gen: AtomicU64,
}
impl CsrCache {
pub fn new(initial: CsrSnapshot) -> Self {
Self {
snapshot: ArcSwap::from_pointee(initial),
dirty: AtomicU64::new(0),
rebuilding: AtomicBool::new(false),
claimed: AtomicU64::new(0),
pending: parking_lot::Mutex::new(GraphDelta::default()),
write_gen: AtomicU64::new(0),
snapshot_gen: AtomicU64::new(0),
}
}
pub fn current_gen(&self) -> u64 {
self.write_gen.load(Ordering::Acquire)
}
pub fn snapshot_is_stale(&self) -> bool {
self.write_gen.load(Ordering::Acquire) != self.snapshot_gen.load(Ordering::Acquire)
}
pub fn record_added_node(&self, node: NodeId) {
self.pending.lock().added_nodes.push(node);
}
pub fn record_added_edge(&self, src: NodeId, dst: NodeId) {
self.pending.lock().added_edges.push((src, dst));
}
pub fn record_removed_edge(&self, src: NodeId, dst: NodeId) {
self.pending.lock().removed_edges.push((src, dst));
}
pub fn mark_force_full(&self) {
self.pending.lock().force_full = true;
}
pub fn has_pending(&self) -> bool {
!self.pending.lock().is_empty()
}
pub fn pending_force_full(&self) -> bool {
self.pending.lock().force_full
}
pub fn record_batch(&self, batch: GraphDelta) {
if batch.is_empty() {
return;
}
let mut pending = self.pending.lock();
pending.force_full |= batch.force_full;
pending.added_nodes.extend(batch.added_nodes);
pending.added_edges.extend(batch.added_edges);
pending.removed_edges.extend(batch.removed_edges);
}
pub fn take_delta(&self) -> GraphDelta {
std::mem::take(&mut *self.pending.lock())
}
pub fn clear_delta(&self) {
*self.pending.lock() = GraphDelta::default();
}
pub fn mark_dirty_n(&self, count: u64) -> bool {
self.write_gen.fetch_add(count, Ordering::AcqRel);
let prev = self.dirty.fetch_add(count, Ordering::Relaxed);
let total = prev + count;
if total >= REBUILD_THRESHOLD && !self.rebuilding.swap(true, Ordering::AcqRel) {
self.claimed.store(total, Ordering::Release);
true
} else {
false
}
}
#[must_use]
pub fn install(&self, snap: CsrSnapshot, built_gen: u64) -> bool {
self.snapshot.store(Arc::new(snap));
self.snapshot_gen.store(built_gen, Ordering::Release);
let claimed = self.claimed.swap(0, Ordering::AcqRel);
let prev = self.dirty.fetch_sub(claimed, Ordering::AcqRel);
let remaining = prev.saturating_sub(claimed);
if remaining >= REBUILD_THRESHOLD {
self.claimed.store(remaining, Ordering::Release);
true
} else {
self.rebuilding.store(false, Ordering::Release);
false
}
}
pub fn install_snapshot(&self, snap: CsrSnapshot, built_gen: u64) {
self.snapshot.store(Arc::new(snap));
self.snapshot_gen.store(built_gen, Ordering::Release);
}
pub fn install_full(&self, snap: CsrSnapshot, built_gen: u64) {
self.snapshot.store(Arc::new(snap));
self.snapshot_gen.store(built_gen, Ordering::Release);
self.dirty.store(0, Ordering::Release);
self.claimed.store(0, Ordering::Release);
self.rebuilding.store(false, Ordering::Release);
}
pub fn cancel_rebuild(&self) {
self.claimed.store(0, Ordering::Release);
self.rebuilding.store(false, Ordering::Release);
}
}
// Tests for `CsrSnapshot::build` against a graph stored in a temporary directory.
#[cfg(test)]
mod snapshot_tests {
use tempfile::TempDir;
use super::*;
use crate::Graph;
// Checks that the incoming arrays are the transpose of the outgoing ones for a
// graph with parallel edges (two a->b), a second source (c->b) and a
// self-loop (a->a).
#[test]
fn build_transposes_incoming_adjacency() {
let dir = TempDir::new().unwrap();
let g = Graph::open(dir.path(), 1).unwrap();
let a = g.add_node("n", &()).unwrap();
let b = g.add_node("n", &()).unwrap();
let c = g.add_node("n", &()).unwrap();
let e_ab = g.add_edge(a, b, "t", &()).unwrap();
let e_cb = g.add_edge(c, b, "u", &()).unwrap();
let e_ab2 = g.add_edge(a, b, "t", &()).unwrap();
let e_aa = g.add_edge(a, a, "t", &()).unwrap();
let snap = CsrSnapshot::build(&g.storage).unwrap();
// Dense indices of the three nodes inside the snapshot.
let da = snap.id_to_dense[&a] as usize;
let db = snap.id_to_dense[&b] as usize;
let dc = snap.id_to_dense[&c] as usize;
// Shape: one incoming row offset per node plus the trailing total, and
// exactly one incoming slot per outgoing slot.
assert_eq!(snap.in_row_ptr.len(), snap.dense_to_id.len() + 1);
assert_eq!(snap.in_col_idx.len(), snap.col_idx.len());
assert_eq!(snap.in_edge_id.len(), snap.col_idx.len());
assert_eq!(snap.in_edge_type.len(), snap.col_idx.len());
// Collects the (source dense index, edge id) pairs of one incoming row.
let in_row = |d: usize| -> Vec<(u32, EdgeId)> {
(snap.in_row_ptr[d]..snap.in_row_ptr[d + 1])
.map(|k| (snap.in_col_idx[k], snap.in_edge_id[k]))
.collect()
};
// `a` is reached only by its own self-loop.
assert_eq!(in_row(da), vec![(da as u32, e_aa)]);
// `b` is reached by both parallel a->b edges and by c->b. The expected
// order lists the edges from `a` before the one from `c`, which presumes
// node and edge ids are handed out in increasing order (confirm in `Graph`).
assert_eq!(
in_row(db),
vec![(da as u32, e_ab), (da as u32, e_ab2), (dc as u32, e_cb)]
);
// `c` has no incoming edges.
assert_eq!(in_row(dc), vec![]);
// Every incoming slot must carry the same type as the outgoing slot that
// holds the same edge id.
let out_type: AHashMap<EdgeId, TypeId> = snap
.edge_id
.iter()
.zip(snap.edge_type.iter())
.map(|(&e, &t)| (e, t))
.collect();
for k in 0..snap.in_edge_id.len() {
assert_eq!(snap.in_edge_type[k], out_type[&snap.in_edge_id[k]]);
}
}
}
// Tests for the dirty-counter / rebuild-claim bookkeeping of `CsrCache`.
#[cfg(test)]
mod cache_tests {
use super::*;
// Writes counted while a rebuild is claimed must survive the install.
#[test]
fn install_retains_writes_during_rebuild() {
let cache = CsrCache::new(CsrSnapshot::empty());
assert!(
cache.mark_dirty_n(REBUILD_THRESHOLD),
"crossing claims a rebuild"
);
// Claim already held: five more writes are counted but do not claim again.
assert!(!cache.mark_dirty_n(5));
// Install subtracts only the claimed amount, leaving 5 dirty writes,
// which is below the threshold, so the claim is released.
assert!(!cache.install(CsrSnapshot::empty(), 0));
// 5 retained + (threshold - 5) reaches the threshold exactly and claims again.
assert!(cache.mark_dirty_n(REBUILD_THRESHOLD - 5));
}
// A full threshold of writes arriving during a rebuild makes `install`
// request another rebuild straight away.
#[test]
fn install_requests_followup_when_still_dirty() {
let cache = CsrCache::new(CsrSnapshot::empty());
assert!(cache.mark_dirty_n(REBUILD_THRESHOLD));
assert!(!cache.mark_dirty_n(REBUILD_THRESHOLD));
// Dirty is 2 * threshold, claimed is threshold: the remainder still
// reaches the threshold, so the claim is kept.
assert!(
cache.install(CsrSnapshot::empty(), 0),
"still dirty: rebuild again"
);
// The follow-up install subtracts the remainder and leaves nothing dirty.
assert!(!cache.install(CsrSnapshot::empty(), 0), "now caught up");
}
// `install_snapshot` updates the generation but must not drain the pending delta.
#[test]
fn install_snapshot_leaves_the_matrix_delta() {
let cache = CsrCache::new(CsrSnapshot::empty());
assert!(!cache.mark_dirty_n(1));
cache.record_added_edge(1, 2);
assert!(cache.snapshot_is_stale());
cache.install_snapshot(CsrSnapshot::empty(), cache.current_gen());
assert!(!cache.snapshot_is_stale());
assert!(
cache.has_pending(),
"the delta is drained by ensure_matrix_view, not by a snapshot install"
);
}
// `install_full` zeroes the dirty counter, so a single later write is far
// below the threshold and cannot claim a rebuild.
#[test]
fn install_full_clears_dirty_and_claim() {
let cache = CsrCache::new(CsrSnapshot::empty());
assert!(cache.mark_dirty_n(REBUILD_THRESHOLD));
cache.install_full(CsrSnapshot::empty(), 0);
assert!(
!cache.mark_dirty_n(1),
"counter was reset by the full rebuild"
);
}
}