use super::LpgStore;
use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
use std::sync::Arc;
use std::sync::atomic::Ordering;
impl LpgStore {
/// Returns a shared handle to the currently published statistics snapshot.
///
/// The read lock is held only while the `Arc` is cloned. The returned handle
/// keeps pointing at the snapshot it was cloned from; `compute_statistics`
/// publishes new data by storing a fresh `Arc`, not by mutating the old one.
#[must_use]
pub fn statistics(&self) -> Arc<Statistics> {
    let snapshot = self.statistics.read();
    Arc::clone(&snapshot)
}
/// Rebuilds the published statistics, choosing between a full scan and the
/// counter-based path.
///
/// The `needs_stats_recompute` flag is atomically cleared. When it was set,
/// `recompute_statistics_full` rebuilds the live counters from storage before
/// publishing; otherwise `compute_statistics` publishes directly from the
/// existing counters.
#[doc(hidden)]
pub fn ensure_statistics_fresh(&self) {
    let was_stale = self.needs_stats_recompute.swap(false, Ordering::Relaxed);
    if !was_stale {
        self.compute_statistics();
        return;
    }
    self.recompute_statistics_full();
}
/// Rebuilds the published [`Statistics`] snapshot from the live counters.
///
/// * Totals are read from `live_node_count` / `live_edge_count`; negative
///   counter values are treated as zero.
/// * Per-label node counts come from `label_index`, per-type edge counts from
///   `edge_type_live_counts`. Labels and edge types with a zero count are
///   left out of the snapshot.
/// * Degree figures are coarse ratios: every label receives the global
///   `total_edges / total_nodes`, every edge type receives
///   `type_count / total_nodes`, and the same value is passed for both degree
///   arguments. With zero nodes the ratio is `0.0`.
///
/// Each pair of index read guards is scoped to the loop that needs it, so no
/// index lock is still held when the `statistics` write lock is taken.
pub(crate) fn compute_statistics(&self) {
    // Counters are signed; anything below zero is reported as zero.
    let non_negative = |raw: i64| u64::try_from(raw).unwrap_or(0);

    let mut stats = Statistics::new();
    stats.total_nodes = non_negative(self.live_node_count.load(Ordering::Relaxed));
    stats.total_edges = non_negative(self.live_edge_count.load(Ordering::Relaxed));

    let total_nodes = stats.total_nodes;
    let per_node = |count: u64| {
        if total_nodes > 0 {
            count as f64 / total_nodes as f64
        } else {
            0.0
        }
    };

    // The label degree estimate does not depend on the label, so compute it once.
    let global_avg_degree = per_node(stats.total_edges);

    {
        let registry = self.label_registry.read();
        let label_index = self.label_index.read();
        for (label_id, label_name) in registry.names().iter().enumerate() {
            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
            if node_count == 0 {
                continue;
            }
            let label_stats = LabelStatistics::new(node_count)
                .with_degrees(global_avg_degree, global_avg_degree);
            stats.update_label(label_name.as_ref(), label_stats);
        }
    }

    {
        let id_to_edge_type = self.id_to_edge_type.read();
        let edge_type_counts = self.edge_type_live_counts.read();
        for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
            // A type id without a counter slot is treated as having no live edges.
            let count = edge_type_counts
                .get(type_id)
                .copied()
                .map_or(0, non_negative);
            if count == 0 {
                continue;
            }
            let avg_degree = per_node(count);
            let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
            stats.update_edge_type(type_name.as_ref(), edge_stats);
        }
    }

    *self.statistics.write() = Arc::new(stats);
}
#[cfg(not(feature = "tiered-storage"))]
fn recompute_statistics_full(&self) {
let epoch = self.current_epoch();
let total_nodes = self
.nodes
.read()
.values()
.filter_map(|chain| chain.visible_at(epoch))
.filter(|r| !r.is_deleted())
.count();
let edges = self.edges.read();
let mut total_edges: i64 = 0;
let id_to_edge_type = self.id_to_edge_type.read();
let mut type_counts = vec![0i64; id_to_edge_type.len()];
for chain in edges.values() {
if let Some(record) = chain.visible_at(epoch)
&& !record.is_deleted()
{
total_edges += 1;
if (record.type_id as usize) < type_counts.len() {
type_counts[record.type_id as usize] += 1;
}
}
}
self.live_node_count
.store(total_nodes as i64, Ordering::Relaxed);
self.live_edge_count.store(total_edges, Ordering::Relaxed);
*self.edge_type_live_counts.write() = type_counts;
drop(edges);
drop(id_to_edge_type);
self.compute_statistics();
}
/// Tiered-storage variant: rebuilds the live counters by walking the node
/// and edge version indexes, then republishes statistics.
///
/// For each index the version reference visible at the current epoch is
/// resolved through `read_node_record` / `read_edge_record`; an entry counts
/// as live when that lookup yields a record that is not deleted. Live edges
/// are also tallied per type id; an edge whose type id has no slot in
/// `id_to_edge_type` is included in the total but in no per-type count. The
/// results overwrite `live_node_count`, `live_edge_count` and
/// `edge_type_live_counts`, after which `compute_statistics` is called.
#[cfg(feature = "tiered-storage")]
fn recompute_statistics_full(&self) {
    let epoch = self.current_epoch();

    let node_indexes = self.node_versions.read();
    let live_nodes = node_indexes
        .iter()
        .filter(|(_, index)| {
            index.visible_at(epoch).map_or(false, |vref| {
                self.read_node_record(&vref)
                    .map_or(false, |rec| !rec.is_deleted())
            })
        })
        .count();
    // The node index guard is released before the edge indexes are locked.
    drop(node_indexes);

    let edge_indexes = self.edge_versions.read();
    let type_table = self.id_to_edge_type.read();
    let mut live_edges: i64 = 0;
    let mut per_type = vec![0i64; type_table.len()];
    for index in edge_indexes.values() {
        let Some(vref) = index.visible_at(epoch) else {
            continue;
        };
        let Some(record) = self.read_edge_record(&vref) else {
            continue;
        };
        if record.is_deleted() {
            continue;
        }
        live_edges += 1;
        // Type ids beyond the table are skipped for the per-type tally.
        if let Some(slot) = per_type.get_mut(record.type_id as usize) {
            *slot += 1;
        }
    }

    self.live_node_count
        .store(live_nodes as i64, Ordering::Relaxed);
    self.live_edge_count.store(live_edges, Ordering::Relaxed);
    *self.edge_type_live_counts.write() = per_type;

    // Release the guards before `compute_statistics`, which takes the
    // `id_to_edge_type` read lock itself.
    drop(edge_indexes);
    drop(type_table);
    self.compute_statistics();
}
/// Returns the label-cardinality estimate for `label` from the currently
/// published statistics snapshot.
///
/// This only reads the snapshot; it does not recompute statistics.
#[must_use]
pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
    let snapshot = self.statistics.read();
    snapshot.estimate_label_cardinality(label)
}
/// Returns the average-degree estimate for `edge_type` from the currently
/// published statistics snapshot.
///
/// `edge_type` and `outgoing` are forwarded unchanged to
/// `Statistics::estimate_avg_degree`. This only reads the snapshot; it does
/// not recompute statistics.
#[must_use]
pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
    let snapshot = self.statistics.read();
    snapshot.estimate_avg_degree(edge_type, outgoing)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Builds an empty store for a single test.
    fn make_store() -> LpgStore {
        LpgStore::new().unwrap()
    }

    /// An empty store publishes zero totals.
    #[test]
    fn compute_statistics_empty_store() {
        let store = make_store();
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!(snapshot.total_nodes, 0);
        assert_eq!(snapshot.total_edges, 0);
    }

    /// Totals reflect the nodes and edges created before the computation.
    #[test]
    fn compute_statistics_with_nodes_and_edges() {
        let store = make_store();
        let source = store.create_node(&["Person"]);
        let target = store.create_node(&["Person"]);
        store.create_edge(source, target, "KNOWS");
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!(snapshot.total_nodes, 2);
        assert_eq!(snapshot.total_edges, 1);
    }

    /// Calling `ensure_statistics_fresh` without touching the stale flag
    /// still reports the created node.
    #[test]
    fn ensure_statistics_fresh_uses_incremental_path_when_not_stale() {
        let store = make_store();
        store.create_node(&["X"]);

        store.ensure_statistics_fresh();

        assert_eq!(store.statistics().total_nodes, 1);
    }

    /// With the stale flag forced on, the refresh reports the node and
    /// leaves the flag cleared.
    #[test]
    fn ensure_statistics_fresh_does_full_recompute_when_stale() {
        let store = make_store();
        store.create_node(&["Y"]);
        store.needs_stats_recompute.store(true, Ordering::Relaxed);

        store.ensure_statistics_fresh();

        assert_eq!(store.statistics().total_nodes, 1);
        let still_stale = store.needs_stats_recompute.load(Ordering::Relaxed);
        assert!(!still_stale);
    }

    /// A label that has at least one node yields a positive estimate.
    #[test]
    fn estimate_label_cardinality_returns_nonzero_for_known_label() {
        let store = make_store();
        store.create_node(&["Doc"]);
        store.compute_statistics();

        let card = store.estimate_label_cardinality("Doc");
        assert!(card > 0.0, "cardinality should be positive, got {card}");
    }

    /// Only checks that the estimate for an unseen label is non-negative;
    /// the exact fallback value is decided by `Statistics`.
    #[test]
    fn estimate_label_cardinality_returns_default_for_unknown_label() {
        let store = make_store();
        store.compute_statistics();

        let estimate = store.estimate_label_cardinality("NeverSeen");
        assert!(estimate >= 0.0);
    }

    /// Only checks that the degree estimate for an existing edge type is
    /// non-negative.
    #[test]
    fn estimate_avg_degree_for_known_edge_type() {
        let store = make_store();
        let source = store.create_node(&[]);
        let target = store.create_node(&[]);
        store.create_edge(source, target, "FOLLOWS");
        store.compute_statistics();

        let degree = store.estimate_avg_degree("FOLLOWS", true);
        assert!(degree >= 0.0);
    }

    /// Computing statistics with zero nodes completes and publishes zero
    /// totals.
    #[test]
    fn compute_statistics_zero_nodes_gives_zero_degree() {
        let store = make_store();
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!(snapshot.total_nodes, 0);
        assert_eq!(snapshot.total_edges, 0);
    }
}