use redb::{ReadableTable, WriteTransaction};
use crate::engine::graph::edge_store::store::{EDGES, redb_err};
use crate::engine::graph::edge_store::temporal::keys::{
edge_version_prefix, is_sentinel, parse_versioned_edge_key,
};
use super::table::{GRAPH_STATS, LabelRow, NodeRow, SummaryRow, label_key, node_key, summary_key};
pub fn increment_for_insert(
write_txn: &WriteTransaction,
tid: u64,
collection: &str,
label: &str,
src: &str,
dst: &str,
current_system_from: i64,
) -> crate::Result<()> {
if prior_live_exists(
write_txn,
tid,
collection,
src,
label,
dst,
current_system_from,
)? {
return Ok(());
}
let mut stats = write_txn
.open_table(GRAPH_STATS)
.map_err(|e| redb_err("open graph_stats (increment)", e))?;
let skey = summary_key(collection);
let mut summary = read_summary(&stats, tid, &skey)?;
summary.edge_count = summary.edge_count.saturating_add(1);
let lkey = label_key(collection, label);
let lrow = read_label_opt(&stats, tid, &lkey)?;
let new_label_count = match lrow {
None => {
summary.distinct_label_count = summary.distinct_label_count.saturating_add(1);
1u64
}
Some(r) => r.count.saturating_add(1),
};
stats
.insert(
(tid, lkey.as_str()),
LabelRow {
count: new_label_count,
}
.encode()?
.as_slice(),
)
.map_err(|e| redb_err("insert label row", e))?;
increment_node(&mut stats, tid, collection, src, &mut summary)?;
if src != dst {
increment_node(&mut stats, tid, collection, dst, &mut summary)?;
}
stats
.insert((tid, skey.as_str()), summary.encode()?.as_slice())
.map_err(|e| redb_err("insert summary row", e))?;
Ok(())
}
pub fn decrement_for_delete(
write_txn: &WriteTransaction,
tid: u64,
collection: &str,
label: &str,
src: &str,
dst: &str,
sentinel_system_from: i64,
) -> crate::Result<()> {
if !prior_live_exists(
write_txn,
tid,
collection,
src,
label,
dst,
sentinel_system_from,
)? {
return Ok(());
}
let mut stats = write_txn
.open_table(GRAPH_STATS)
.map_err(|e| redb_err("open graph_stats (decrement)", e))?;
let skey = summary_key(collection);
let mut summary = read_summary(&stats, tid, &skey)?;
summary.edge_count = summary.edge_count.saturating_sub(1);
let lkey = label_key(collection, label);
let lrow = read_label_opt(&stats, tid, &lkey)?;
if let Some(r) = lrow {
let new_count = r.count.saturating_sub(1);
if new_count == 0 {
stats
.remove((tid, lkey.as_str()))
.map_err(|e| redb_err("remove label row", e))?;
summary.distinct_label_count = summary.distinct_label_count.saturating_sub(1);
} else {
stats
.insert(
(tid, lkey.as_str()),
LabelRow { count: new_count }.encode()?.as_slice(),
)
.map_err(|e| redb_err("update label row", e))?;
}
}
decrement_node(&mut stats, tid, collection, src, &mut summary)?;
if src != dst {
decrement_node(&mut stats, tid, collection, dst, &mut summary)?;
}
stats
.insert((tid, skey.as_str()), summary.encode()?.as_slice())
.map_err(|e| redb_err("update summary row", e))?;
Ok(())
}
fn prior_live_exists(
write_txn: &WriteTransaction,
tid: u64,
collection: &str,
src: &str,
label: &str,
dst: &str,
exclude_system_from: i64,
) -> crate::Result<bool> {
let prefix = edge_version_prefix(collection, src, label, dst);
let edges = write_txn
.open_table(EDGES)
.map_err(|e| redb_err("open edges (prior_live probe)", e))?;
let range = edges
.range((tid, prefix.as_str())..)
.map_err(|e| redb_err("prior_live range", e))?;
for entry in range {
let (k, v) = entry.map_err(|e| redb_err("prior_live iter", e))?;
let (kt, composite) = k.value();
if kt != tid || !composite.starts_with(&prefix) {
break;
}
let Some((_c, _s, _l, _d, sys)) = parse_versioned_edge_key(composite) else {
continue;
};
if sys >= exclude_system_from {
continue;
}
if !is_sentinel(v.value()) {
return Ok(true);
}
return Ok(false);
}
Ok(false)
}
fn read_summary(
stats: &redb::Table<'_, (u64, &str), &[u8]>,
tid: u64,
skey: &str,
) -> crate::Result<SummaryRow> {
match stats
.get((tid, skey))
.map_err(|e| redb_err("read summary row", e))?
{
Some(g) => SummaryRow::decode(g.value()),
None => Ok(SummaryRow::zero()),
}
}
fn read_label_opt(
stats: &redb::Table<'_, (u64, &str), &[u8]>,
tid: u64,
lkey: &str,
) -> crate::Result<Option<LabelRow>> {
match stats
.get((tid, lkey))
.map_err(|e| redb_err("read label row", e))?
{
Some(g) => Ok(Some(LabelRow::decode(g.value())?)),
None => Ok(None),
}
}
fn increment_node(
stats: &mut redb::Table<'_, (u64, &str), &[u8]>,
tid: u64,
collection: &str,
node_id: &str,
summary: &mut SummaryRow,
) -> crate::Result<()> {
let nkey = node_key(collection, node_id);
let existing_bytes: Option<Vec<u8>> = {
let guard = stats
.get((tid, nkey.as_str()))
.map_err(|e| redb_err("read node row", e))?;
guard.map(|g| g.value().to_vec())
};
let new_refcount = match existing_bytes {
None => {
summary.distinct_node_count = summary.distinct_node_count.saturating_add(1);
1u32
}
Some(b) => NodeRow::decode(&b)?.refcount.saturating_add(1),
};
stats
.insert(
(tid, nkey.as_str()),
NodeRow {
refcount: new_refcount,
}
.encode()?
.as_slice(),
)
.map_err(|e| redb_err("insert node row", e))?;
Ok(())
}
fn decrement_node(
stats: &mut redb::Table<'_, (u64, &str), &[u8]>,
tid: u64,
collection: &str,
node_id: &str,
summary: &mut SummaryRow,
) -> crate::Result<()> {
let nkey = node_key(collection, node_id);
let existing_bytes: Option<Vec<u8>> = {
let guard = stats
.get((tid, nkey.as_str()))
.map_err(|e| redb_err("read node row", e))?;
guard.map(|g| g.value().to_vec())
};
if let Some(b) = existing_bytes {
let current = NodeRow::decode(&b)?.refcount;
if current <= 1 {
stats
.remove((tid, nkey.as_str()))
.map_err(|e| redb_err("remove node row", e))?;
summary.distinct_node_count = summary.distinct_node_count.saturating_sub(1);
} else {
stats
.insert(
(tid, nkey.as_str()),
NodeRow {
refcount: current - 1,
}
.encode()?
.as_slice(),
)
.map_err(|e| redb_err("update node row", e))?;
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use nodedb_types::TenantId;
use crate::engine::graph::edge_store::EdgeStore;
use crate::engine::graph::edge_store::stats::table::CollectionStats;
use crate::engine::graph::edge_store::temporal::keys::versioned_edge_key;
use crate::engine::graph::edge_store::temporal::payload::EdgeValuePayload;
const T1: TenantId = TenantId::new(1);
const T2: TenantId = TenantId::new(2);
const COLL: &str = "people";
fn make_store() -> (EdgeStore, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let store = EdgeStore::open(&dir.path().join("graph.redb")).unwrap();
(store, dir)
}
fn put(store: &EdgeStore, src: &str, label: &str, dst: &str, sys: i64) {
store
.put_edge_versioned(
crate::engine::graph::edge_store::temporal::keys::EdgeRef::new(
T1, COLL, src, label, dst,
),
b"props",
sys,
0,
i64::MAX,
)
.unwrap();
}
fn delete(store: &EdgeStore, src: &str, label: &str, dst: &str, sys: i64) {
store
.soft_delete_edge(
crate::engine::graph::edge_store::temporal::keys::EdgeRef::new(
T1, COLL, src, label, dst,
),
sys,
)
.unwrap();
}
fn stats(store: &EdgeStore) -> CollectionStats {
store.collection_stats(T1, COLL, None).unwrap()
}
#[test]
fn single_put_increments_counters() {
let (store, _dir) = make_store();
put(&store, "a", "L", "b", 100);
let s = stats(&store);
assert_eq!(s.edge_count, 1);
assert_eq!(s.distinct_node_count, 2);
assert_eq!(s.distinct_label_count, 1);
assert_eq!(s.labels, vec![("L".to_string(), 1)]);
}
#[test]
fn update_same_edge_no_double_count() {
let (store, _dir) = make_store();
put(&store, "a", "L", "b", 100);
put(&store, "a", "L", "b", 200);
let s = stats(&store);
assert_eq!(s.edge_count, 1);
assert_eq!(s.distinct_node_count, 2);
assert_eq!(s.distinct_label_count, 1);
}
#[test]
fn two_edges_same_label() {
let (store, _dir) = make_store();
put(&store, "a", "L", "b", 100);
put(&store, "c", "L", "d", 110);
let s = stats(&store);
assert_eq!(s.edge_count, 2);
assert_eq!(s.distinct_node_count, 4);
assert_eq!(s.distinct_label_count, 1);
assert_eq!(s.labels, vec![("L".to_string(), 2)]);
}
#[test]
fn two_edges_same_nodes_different_labels() {
let (store, _dir) = make_store();
put(&store, "a", "L1", "b", 100);
put(&store, "a", "L2", "b", 110);
let s = stats(&store);
assert_eq!(s.edge_count, 2);
assert_eq!(s.distinct_node_count, 2);
assert_eq!(s.distinct_label_count, 2);
let mut labels = s.labels.clone();
labels.sort();
assert_eq!(labels, vec![("L1".to_string(), 1), ("L2".to_string(), 1)]);
}
#[test]
fn self_loop_counts_one_node() {
let (store, _dir) = make_store();
put(&store, "a", "L", "a", 100);
let s = stats(&store);
assert_eq!(s.edge_count, 1);
assert_eq!(s.distinct_node_count, 1);
assert_eq!(s.distinct_label_count, 1);
}
#[test]
fn put_then_delete_zeroes_counters() {
let (store, _dir) = make_store();
put(&store, "a", "L", "b", 100);
delete(&store, "a", "L", "b", 200);
let s = stats(&store);
assert_eq!(s.edge_count, 0);
assert_eq!(s.distinct_node_count, 0);
assert_eq!(s.distinct_label_count, 0);
assert!(s.labels.is_empty());
}
#[test]
fn delete_without_prior_live_is_noop() {
let (store, _dir) = make_store();
delete(&store, "x", "L", "y", 100);
let s = stats(&store);
assert_eq!(s.edge_count, 0);
assert_eq!(s.distinct_node_count, 0);
assert_eq!(s.distinct_label_count, 0);
}
#[test]
fn cold_start_rebuild() {
let (store, _dir) = make_store();
let key = versioned_edge_key(COLL, "a", "L", "b", 100).unwrap();
let payload = EdgeValuePayload::new(0, i64::MAX, b"p".to_vec())
.encode()
.unwrap();
store.put_edge_raw(T1, &key, &payload).unwrap();
let s1 = store.collection_stats(T1, COLL, None).unwrap();
assert_eq!(s1.edge_count, 1);
assert_eq!(s1.distinct_node_count, 2);
assert_eq!(s1.distinct_label_count, 1);
let s2 = store.collection_stats(T1, COLL, None).unwrap();
assert_eq!(s2, s1);
}
#[test]
fn multi_tenant_isolation() {
let (store, _dir) = make_store();
store
.put_edge_versioned(
crate::engine::graph::edge_store::temporal::keys::EdgeRef::new(
T1, COLL, "a", "L", "b",
),
b"t1",
100,
0,
i64::MAX,
)
.unwrap();
store
.put_edge_versioned(
crate::engine::graph::edge_store::temporal::keys::EdgeRef::new(
T2, COLL, "x", "M", "y",
),
b"t2",
100,
0,
i64::MAX,
)
.unwrap();
let s1 = store.collection_stats(T1, COLL, None).unwrap();
let s2 = store.collection_stats(T2, COLL, None).unwrap();
assert_eq!(s1.edge_count, 1);
assert_eq!(s1.labels, vec![("L".to_string(), 1)]);
assert_eq!(s2.edge_count, 1);
assert_eq!(s2.labels, vec![("M".to_string(), 1)]);
}
#[test]
fn tenant_stats_one_entry_per_collection() {
let (store, _dir) = make_store();
store
.put_edge_versioned(
crate::engine::graph::edge_store::temporal::keys::EdgeRef::new(
T1, "alpha", "a", "L", "b",
),
b"p",
100,
0,
i64::MAX,
)
.unwrap();
store
.put_edge_versioned(
crate::engine::graph::edge_store::temporal::keys::EdgeRef::new(
T1, "beta", "c", "L", "d",
),
b"p",
100,
0,
i64::MAX,
)
.unwrap();
let mut all = store.tenant_stats(T1, None).unwrap();
all.sort_by(|a, b| a.collection.cmp(&b.collection));
assert_eq!(all.len(), 2);
assert_eq!(all[0].collection, "alpha");
assert_eq!(all[1].collection, "beta");
assert_eq!(all[0].edge_count, 1);
assert_eq!(all[1].edge_count, 1);
}
}