use std::collections::HashMap;
use nodedb_types::TenantId;
use crate::engine::graph::edge_store::store::{EDGES, EdgeStore, redb_err};
use crate::engine::graph::edge_store::temporal::keys::{is_sentinel, parse_versioned_edge_key};
use super::table::{
CollectionStats, GRAPH_STATS, LabelRow, SummaryRow, label_key, label_prefix, summary_key,
};
impl EdgeStore {
/// Returns edge statistics for `collection` within tenant `tid`.
///
/// When `as_of` is `None` the live path (`collection_stats_live`) answers the
/// query; when it is `Some(ts)` the statistics are computed by
/// `collection_stats_historical` with `ts` as the cut-off.
///
/// # Errors
/// Propagates the error returned by whichever path is selected.
pub fn collection_stats(
    &self,
    tid: TenantId,
    collection: &str,
    as_of: Option<i64>,
) -> crate::Result<CollectionStats> {
    if let Some(ts) = as_of {
        return self.collection_stats_historical(tid, collection, ts);
    }
    self.collection_stats_live(tid, collection)
}
/// Returns statistics for the collections of tenant `tid`.
///
/// `as_of == None` delegates to `tenant_stats_live`; `Some(ts)` delegates to
/// `tenant_stats_historical` with `ts` as the cut-off.
///
/// # Errors
/// Propagates the error returned by whichever path is selected.
pub fn tenant_stats(
    &self,
    tid: TenantId,
    as_of: Option<i64>,
) -> crate::Result<Vec<CollectionStats>> {
    let Some(ts) = as_of else {
        return self.tenant_stats_live(tid);
    };
    self.tenant_stats_historical(tid, ts)
}
/// Current statistics for one collection, served from `GRAPH_STATS` when possible.
///
/// The summary row and the label rows are read inside the same read transaction.
/// If no summary row is stored, the read transaction is released and:
///
/// * when `EDGES` holds no key for this collection, zeroed statistics are returned
///   and nothing is written;
/// * otherwise the statistics are recomputed from `EDGES` (cut-off `i64::MAX`),
///   persisted via `write_stats_atomically`, and returned.
///
/// # Errors
/// Returns any redb error (wrapped by `redb_err`) or row decode error encountered.
fn collection_stats_live(
    &self,
    tid: TenantId,
    collection: &str,
) -> crate::Result<CollectionStats> {
    let tenant = tid.as_u64();
    let summary_row_key = summary_key(collection);

    let txn = self
        .db
        .begin_read()
        .map_err(|e| redb_err("begin_read (collection_stats)", e))?;
    let table = txn
        .open_table(GRAPH_STATS)
        .map_err(|e| redb_err("open graph_stats (read)", e))?;

    // Decode inside the closure so the access guard is released before the label scan.
    let stored = table
        .get((tenant, summary_row_key.as_str()))
        .map_err(|e| redb_err("read summary", e))?
        .map(|guard| SummaryRow::decode(guard.value()))
        .transpose()?;

    if let Some(summary) = stored {
        let labels = read_labels_from_table(&table, tenant, collection)?;
        return Ok(CollectionStats {
            collection: collection.to_string(),
            edge_count: summary.edge_count,
            distinct_node_count: summary.distinct_node_count,
            distinct_label_count: summary.distinct_label_count,
            labels,
        });
    }

    // No stored summary: let go of the read transaction before the rebuild path,
    // which opens its own transactions.
    drop(table);
    drop(txn);

    let prefix = format!("{collection}\x00");
    if !self.collection_prefix_has_entries(tid, &prefix)? {
        return Ok(CollectionStats::zero(collection.to_string()));
    }

    let rebuilt = self.collection_stats_historical(tid, collection, i64::MAX)?;
    self.write_stats_atomically(tid, collection, &rebuilt)?;
    Ok(rebuilt)
}
/// Current statistics for every collection of tenant `tid` that has a summary row.
///
/// Scans the tenant's keys in `GRAPH_STATS`, collects the collection name from each
/// key ending in `"\x00summary"`, and resolves each one through
/// `collection_stats_live`. The result is sorted by collection name.
///
/// If the tenant has no summary row at all, the answer is computed from `EDGES`
/// via `tenant_stats_historical(tid, i64::MAX)` and is not persisted here.
///
/// NOTE(review): a collection that has edges but no summary row is omitted as soon
/// as any other collection of the tenant has one — confirm this is intended.
///
/// # Errors
/// Returns any redb error (wrapped by `redb_err`) or any error from the delegated calls.
fn tenant_stats_live(&self, tid: TenantId) -> crate::Result<Vec<CollectionStats>> {
    let t = tid.as_u64();
    let read_txn = self
        .db
        .begin_read()
        .map_err(|e| redb_err("begin_read (tenant_stats)", e))?;
    let stats_table = read_txn
        .open_table(GRAPH_STATS)
        .map_err(|e| redb_err("open graph_stats (tenant read)", e))?;

    let mut collections: Vec<String> = Vec::new();
    // Open-ended range plus an explicit tenant check: an exclusive upper bound of
    // `t + 1` would overflow for tenant id `u64::MAX`.
    let range = stats_table
        .range((t, "")..)
        .map_err(|e| redb_err("tenant_stats range", e))?;
    for entry in range {
        let (k, _v) = entry.map_err(|e| redb_err("tenant_stats iter", e))?;
        let (kt, row_key) = k.value();
        if kt != t {
            break;
        }
        // `strip_suffix` removes the marker exactly once; `trim_end_matches` would
        // also eat a name that itself ends with the marker.
        if let Some(collection) = row_key.strip_suffix("\x00summary") {
            collections.push(collection.to_string());
        }
    }
    drop(stats_table);
    drop(read_txn);

    if collections.is_empty() {
        return self.tenant_stats_historical(tid, i64::MAX);
    }

    let mut result = collections
        .iter()
        .map(|coll| self.collection_stats_live(tid, coll))
        .collect::<crate::Result<Vec<_>>>()?;
    result.sort_by(|a, b| a.collection.cmp(&b.collection));
    Ok(result)
}
/// Computes statistics for `collection` as of system time `as_of` directly from `EDGES`.
///
/// Opens a read transaction and hands the `EDGES` table to
/// `materialise_collection_stats`, restricted to keys starting with
/// `"{collection}\x00"` for this tenant.
///
/// # Errors
/// Returns any redb error (wrapped by `redb_err`) or any error from the materialisation.
fn collection_stats_historical(
    &self,
    tid: TenantId,
    collection: &str,
    as_of: i64,
) -> crate::Result<CollectionStats> {
    let txn = self
        .db
        .begin_read()
        .map_err(|e| redb_err("begin_read (historical)", e))?;
    let edge_table = txn
        .open_table(EDGES)
        .map_err(|e| redb_err("open edges (historical)", e))?;

    let key_prefix = format!("{collection}\x00");
    materialise_collection_stats(collection, &edge_table, tid.as_u64(), &key_prefix, as_of)
}
fn tenant_stats_historical(
&self,
tid: TenantId,
as_of: i64,
) -> crate::Result<Vec<CollectionStats>> {
let t = tid.as_u64();
let read_txn = self
.db
.begin_read()
.map_err(|e| redb_err("begin_read (tenant_stats_hist)", e))?;
let edges = read_txn
.open_table(EDGES)
.map_err(|e| redb_err("open edges (tenant_stats_hist)", e))?;
let mut per_coll: HashMap<String, CollectionAccum> = HashMap::new();
let range = edges
.range((t, "")..(t + 1, ""))
.map_err(|e| redb_err("tenant_stats_hist range", e))?;
for entry in range {
let (k, v) = entry.map_err(|e| redb_err("tenant_stats_hist iter", e))?;
let (_kt, composite) = k.value();
let Some((coll, src, label, dst, sys)) = parse_versioned_edge_key(composite) else {
continue;
};
if sys > as_of {
continue;
}
let bytes = v.value();
let accum = per_coll.entry(coll.to_string()).or_default();
let base = format!("{src}\x00{label}\x00{dst}");
let entry = accum.bases.entry(base).or_default();
if sys > entry.latest_sys {
entry.latest_sys = sys;
entry.is_sentinel = is_sentinel(bytes);
entry.label = label.to_string();
entry.src = src.to_string();
entry.dst = dst.to_string();
}
}
let mut result = Vec::with_capacity(per_coll.len());
for (coll, accum) in per_coll {
let stats = accum.into_collection_stats(coll);
result.push(stats);
}
result.sort_by(|a, b| a.collection.cmp(&b.collection));
Ok(result)
}
/// Reports whether `EDGES` contains at least one key of tenant `tid` whose
/// composite part starts with `prefix`.
///
/// Only the first key at or after `(tenant, prefix)` is inspected.
///
/// # Errors
/// Returns any redb error, wrapped by `redb_err`.
fn collection_prefix_has_entries(&self, tid: TenantId, prefix: &str) -> crate::Result<bool> {
    let tenant = tid.as_u64();
    let txn = self
        .db
        .begin_read()
        .map_err(|e| redb_err("begin_read (prefix check)", e))?;
    let edge_table = txn
        .open_table(EDGES)
        .map_err(|e| redb_err("open edges (prefix check)", e))?;
    let mut cursor = edge_table
        .range((tenant, prefix)..)
        .map_err(|e| redb_err("prefix range", e))?;

    // Nothing at or after the seek position: the collection has no keys.
    let Some(first) = cursor.next() else {
        return Ok(false);
    };
    let (key, _value) = first.map_err(|e| redb_err("prefix range first", e))?;
    let (key_tenant, composite) = key.value();
    Ok(key_tenant == tenant && composite.starts_with(prefix))
}
/// Persists `stats` for `collection` into `GRAPH_STATS` in one write transaction.
///
/// Writes the summary row and then one label row per entry of `stats.labels`;
/// the transaction is committed only after every insert succeeded.
///
/// NOTE(review): label rows already stored for labels that are absent from
/// `stats.labels` are not removed here — confirm callers never need that.
///
/// # Errors
/// Returns any redb error (wrapped by `redb_err`) or row encode error.
fn write_stats_atomically(
    &self,
    tid: TenantId,
    collection: &str,
    stats: &CollectionStats,
) -> crate::Result<()> {
    let tenant = tid.as_u64();
    let txn = self
        .db
        .begin_write()
        .map_err(|e| redb_err("begin_write (rebuild)", e))?;
    let mut table = txn
        .open_table(GRAPH_STATS)
        .map_err(|e| redb_err("open graph_stats (rebuild)", e))?;

    let summary_row_key = summary_key(collection);
    let summary_bytes = SummaryRow {
        edge_count: stats.edge_count,
        distinct_node_count: stats.distinct_node_count,
        distinct_label_count: stats.distinct_label_count,
    }
    .encode()?;
    table
        .insert((tenant, summary_row_key.as_str()), summary_bytes.as_slice())
        .map_err(|e| redb_err("insert rebuilt summary", e))?;

    for (label, count) in &stats.labels {
        let row_key = label_key(collection, label);
        let row_bytes = LabelRow { count: *count }.encode()?;
        table
            .insert((tenant, row_key.as_str()), row_bytes.as_slice())
            .map_err(|e| redb_err("insert rebuilt label", e))?;
    }

    // The table handle must be gone before the transaction can be committed.
    drop(table);
    txn.commit().map_err(|e| redb_err("commit rebuild", e))
}
}
/// Reads every label row of `collection` for tenant `t` from an open stats table.
///
/// Starts at `(t, label_prefix(collection))` and stops at the first key that belongs
/// to another tenant or no longer starts with that prefix. The label name is the
/// part of the row key after the prefix.
///
/// Returns `(label, count)` pairs sorted by label.
///
/// # Errors
/// Returns any redb error (wrapped by `redb_err`) or `LabelRow` decode error.
fn read_labels_from_table(
    stats_table: &redb::ReadOnlyTable<(u64, &str), &[u8]>,
    t: u64,
    collection: &str,
) -> crate::Result<Vec<(String, u64)>> {
    let prefix = label_prefix(collection);
    let rows = stats_table
        .range((t, prefix.as_str())..)
        .map_err(|e| redb_err("label scan range", e))?;

    let mut labels = Vec::new();
    for row in rows {
        let (key, value) = row.map_err(|e| redb_err("label scan iter", e))?;
        let (key_tenant, row_key) = key.value();
        if key_tenant != t {
            break;
        }
        // Leaving the prefix means the label rows of this collection are exhausted.
        let Some(label_name) = row_key.strip_prefix(prefix.as_str()) else {
            break;
        };
        let decoded = LabelRow::decode(value.value())?;
        labels.push((label_name.to_string(), decoded.count));
    }

    labels.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));
    Ok(labels)
}
/// Rebuilds the statistics of one collection from the versioned keys in `edges`.
///
/// Scans keys of tenant `t` starting at `prefix` and stops at the first key that
/// belongs to another tenant or no longer starts with `prefix`. Keys rejected by
/// `parse_versioned_edge_key` and versions with a system timestamp greater than
/// `as_of` are skipped.
///
/// For every `(src, label, dst)` only the version with the greatest remaining
/// timestamp is kept; edges whose kept version is a sentinel (per `is_sentinel`)
/// are excluded. From the surviving edges the function derives the edge count, the
/// number of distinct endpoints, the number of distinct labels, and the per-label
/// counts sorted by label.
///
/// # Errors
/// Returns any redb error, wrapped by `redb_err`.
fn materialise_collection_stats(
    collection: &str,
    edges: &redb::ReadOnlyTable<(u64, &str), &[u8]>,
    t: u64,
    prefix: &str,
    as_of: i64,
) -> crate::Result<CollectionStats> {
    use std::collections::hash_map::Entry;
    use std::collections::HashSet;

    /// Latest qualifying version of one `(src, label, dst)` edge.
    struct BaseEntry {
        latest_sys: i64,
        is_sentinel: bool,
        label: String,
        src: String,
        dst: String,
    }

    let mut bases: HashMap<String, BaseEntry> = HashMap::new();
    let range = edges
        .range((t, prefix)..)
        .map_err(|e| redb_err("materialise range", e))?;
    for entry in range {
        let (k, v) = entry.map_err(|e| redb_err("materialise iter", e))?;
        let (kt, composite) = k.value();
        if kt != t || !composite.starts_with(prefix) {
            break;
        }
        let Some((_c, src, label, dst, sys)) = parse_versioned_edge_key(composite) else {
            continue;
        };
        if sys > as_of {
            continue;
        }
        let base = format!("{src}\x00{label}\x00{dst}");
        match bases.entry(base) {
            // The first version seen is recorded as-is, so no magic seed timestamp is
            // needed and a version stamped `i64::MIN` is not silently ignored.
            Entry::Vacant(vacant) => {
                vacant.insert(BaseEntry {
                    latest_sys: sys,
                    is_sentinel: is_sentinel(v.value()),
                    label: label.to_string(),
                    src: src.to_string(),
                    dst: dst.to_string(),
                });
            }
            Entry::Occupied(mut occupied) => {
                let current = occupied.get_mut();
                if sys > current.latest_sys {
                    current.latest_sys = sys;
                    current.is_sentinel = is_sentinel(v.value());
                    current.label = label.to_string();
                    current.src = src.to_string();
                    current.dst = dst.to_string();
                }
            }
        }
    }

    // Only the number of distinct endpoints is reported, so a borrowed set is enough;
    // no per-node counter and no string clones are required.
    let mut edge_count = 0u64;
    let mut label_counts: HashMap<&str, u64> = HashMap::new();
    let mut nodes: HashSet<&str> = HashSet::new();
    for live in bases.values().filter(|b| !b.is_sentinel) {
        edge_count += 1;
        *label_counts.entry(live.label.as_str()).or_insert(0) += 1;
        nodes.insert(live.src.as_str());
        nodes.insert(live.dst.as_str());
    }

    let distinct_node_count = nodes.len() as u64;
    let distinct_label_count = label_counts.len() as u64;
    let mut labels: Vec<(String, u64)> = label_counts
        .into_iter()
        .map(|(name, count)| (name.to_string(), count))
        .collect();
    labels.sort_by(|a, b| a.0.cmp(&b.0));

    Ok(CollectionStats {
        collection: collection.to_string(),
        edge_count,
        distinct_node_count,
        distinct_label_count,
        labels,
    })
}
/// Per-collection accumulator filled while scanning a tenant's edge versions.
#[derive(Default)]
struct CollectionAccum {
/// Latest version seen so far for each edge, keyed by `"{src}\x00{label}\x00{dst}"`.
bases: HashMap<String, BaseVersionEntry>,
}
/// Latest known version of one `(src, label, dst)` edge during a scan.
struct BaseVersionEntry {
    /// System timestamp of the newest version recorded so far.
    latest_sys: i64,
    /// Whether that newest version is a sentinel; sentinel entries are not counted.
    is_sentinel: bool,
    /// Edge label of the recorded version.
    label: String,
    /// Source endpoint of the recorded version.
    src: String,
    /// Destination endpoint of the recorded version.
    dst: String,
}

impl Default for BaseVersionEntry {
    /// An entry that has not recorded any version yet.
    ///
    /// The timestamp is `i64::MIN` so that any later version compares as newer, and
    /// the entry is flagged as a sentinel so that it is never counted as a live
    /// edge while unpopulated. The derived default (`0` / `false`) would make a
    /// placeholder look like a live edge with empty label and endpoints whenever
    /// every version of the edge has a timestamp `<= 0`.
    fn default() -> Self {
        Self {
            latest_sys: i64::MIN,
            is_sentinel: true,
            label: String::new(),
            src: String::new(),
            dst: String::new(),
        }
    }
}
impl CollectionAccum {
    /// Folds the accumulated edge versions into the final statistics of `collection`.
    ///
    /// Entries flagged `is_sentinel` are ignored. Every remaining entry counts as one
    /// edge, contributes to its label's count, and contributes its `src` and `dst` to
    /// the set of distinct nodes. The returned `labels` are sorted by label.
    fn into_collection_stats(self, collection: String) -> CollectionStats {
        use std::collections::HashSet;

        // Only the number of distinct endpoints is reported, so a borrowed set is
        // enough; no per-node counter and no string clones are required.
        let mut edge_count = 0u64;
        let mut label_counts: HashMap<&str, u64> = HashMap::new();
        let mut nodes: HashSet<&str> = HashSet::new();
        for live in self.bases.values().filter(|entry| !entry.is_sentinel) {
            edge_count += 1;
            *label_counts.entry(live.label.as_str()).or_insert(0) += 1;
            nodes.insert(live.src.as_str());
            nodes.insert(live.dst.as_str());
        }

        let distinct_node_count = nodes.len() as u64;
        let distinct_label_count = label_counts.len() as u64;
        let mut labels: Vec<(String, u64)> = label_counts
            .into_iter()
            .map(|(name, count)| (name.to_string(), count))
            .collect();
        labels.sort_by(|a, b| a.0.cmp(&b.0));

        CollectionStats {
            collection,
            edge_count,
            distinct_node_count,
            distinct_label_count,
            labels,
        }
    }
}