use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use futures::stream;
use nodedb_types::DatabaseId;
use nodedb_types::diagnostic::DiagnosticLayer;
use pgwire::api::Type;
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response};
use pgwire::error::PgWireResult;
use tracing::{Instrument, info_span};
/// Process-wide number of `show_graph_stats` invocations; incremented once at the start of
/// every call.
static GRAPH_STATS_CALLS: AtomicU64 = AtomicU64::new(0);

/// Returns how many times `show_graph_stats` has been invoked since the counter was created.
///
/// The counter is a standalone statistic that does not guard any other data, so a relaxed load
/// is sufficient.
// NOTE(review): no caller is visible in this file, hence the lint allowance; presumably read by
// metrics or tests elsewhere — confirm and drop the allow once it is wired up.
#[allow(dead_code)]
pub fn graph_stats_calls_total() -> u64 {
    AtomicU64::load(&GRAPH_STATS_CALLS, Ordering::Relaxed)
}
use crate::bridge::envelope::PhysicalPlan;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::server::broadcast::broadcast_to_all_cores;
use crate::control::server::pgwire::types::sqlstate_error;
use crate::control::state::SharedState;
use crate::engine::graph::edge_store::stats::CollectionStats;
use crate::types::TraceId;
use nodedb_physical::physical_plan::GraphOp;
pub async fn show_graph_stats(
state: &SharedState,
identity: &AuthenticatedIdentity,
collection: Option<String>,
verbose: bool,
as_of: Option<i64>,
) -> PgWireResult<Vec<Response>> {
GRAPH_STATS_CALLS.fetch_add(1, Ordering::Relaxed);
let scope = if collection.is_some() {
"collection"
} else {
"tenant"
};
let _span = info_span!(
"graph.stats",
layer = DiagnosticLayer::WritePath.as_str(),
tenant_id = identity.tenant_id.as_u64(),
scope = scope,
collection = ?collection,
verbose,
as_of = ?as_of,
);
if let Some(ref name) = collection {
let catalog = match state.credentials.catalog() {
Some(c) => c,
None => return Err(sqlstate_error("XX000", "catalog not available")),
};
match catalog.get_collection(DatabaseId::DEFAULT, identity.tenant_id.as_u64(), name) {
Ok(Some(c)) if c.is_active => {}
Ok(Some(_)) => {
return Err(sqlstate_error(
"42P01",
&format!("collection '{name}' is deactivated"),
));
}
_ => {
return Err(sqlstate_error(
"42P01",
&format!("collection '{name}' not found"),
));
}
}
}
let plan = PhysicalPlan::Graph(GraphOp::Stats {
collection: collection.clone(),
as_of,
});
let resp = broadcast_to_all_cores(state, identity.tenant_id, plan, TraceId::ZERO)
.await
.map_err(|e| sqlstate_error("58000", &format!("graph stats dispatch failed: {e}")))?;
let merged: Vec<CollectionStats> = decode_merged_stats(resp.payload.as_bytes())
.map_err(|e| sqlstate_error("XX000", &format!("graph stats decode failed: {e}")))?;
let aggregated = aggregate_by_collection(merged);
if verbose {
encode_verbose_response(aggregated)
} else {
encode_compact_response(aggregated)
}
}
fn decode_merged_stats(payload: &[u8]) -> crate::Result<Vec<CollectionStats>> {
if payload.is_empty() {
return Ok(Vec::new());
}
zerompk::from_msgpack(payload).map_err(|e| crate::Error::Serialization {
format: "msgpack".into(),
detail: e.to_string(),
})
}
/// Merges per-core stats entries into one `CollectionStats` per collection.
///
/// For each collection, `edge_count`, `distinct_node_count` and every label's count are summed
/// with saturating addition. `labels` becomes the merged label tally sorted by label name, and
/// `distinct_label_count` is its length. The result is ordered by collection name.
fn aggregate_by_collection(entries: Vec<CollectionStats>) -> Vec<CollectionStats> {
    // Per collection: running totals plus a label -> count tally (BTreeMap keeps labels sorted).
    let mut merged: BTreeMap<String, (CollectionStats, BTreeMap<String, u64>)> = BTreeMap::new();
    for entry in entries {
        let (totals, label_counts) = merged
            .entry(entry.collection.clone())
            .or_insert_with(|| (CollectionStats::zero(entry.collection.clone()), BTreeMap::new()));
        totals.edge_count = totals.edge_count.saturating_add(entry.edge_count);
        // NOTE(review): per-core distinct node counts are summed. This equals the true distinct
        // count only if no node is counted on more than one core — confirm against sharding.
        totals.distinct_node_count = totals
            .distinct_node_count
            .saturating_add(entry.distinct_node_count);
        for (label, count) in entry.labels {
            let tally = label_counts.entry(label).or_insert(0);
            *tally = tally.saturating_add(count);
        }
    }
    merged
        .into_values()
        .map(|(mut totals, label_counts)| {
            totals.labels = label_counts.into_iter().collect();
            totals.distinct_label_count = totals.labels.len() as u64;
            totals
        })
        .collect()
}
/// Builds a `TEXT` result column named `name`, sent in text format.
fn text_field(name: &str) -> FieldInfo {
    let column_name = String::from(name);
    FieldInfo::new(column_name, None, None, Type::TEXT, FieldFormat::Text)
}
/// Builds an `INT8` result column named `name`, sent in text format.
fn int8_field(name: &str) -> FieldInfo {
    let column_name = String::from(name);
    FieldInfo::new(column_name, None, None, Type::INT8, FieldFormat::Text)
}
/// Encodes one summary row per collection.
///
/// The columns are `collection` (TEXT), `node_count`, `edge_count`,
/// `distinct_label_count` (INT8) and `labels` (TEXT).
///
/// `labels` is a JSON array of `{"label": <name>, "count": <n>}` objects, in the order of
/// `CollectionStats::labels`. The INT8 columns are signed, so counts above `i64::MAX` are
/// clamped to `i64::MAX` rather than wrapping to a negative number. For example, a counter
/// saturated at `u64::MAX` would otherwise be shown as `-1`.
///
/// # Errors
///
/// Returns `XX000` if any field fails to encode.
fn encode_compact_response(rows: Vec<CollectionStats>) -> PgWireResult<Vec<Response>> {
    // `as i64` would silently wrap large u64 counts negative; clamp instead.
    let to_int8 = |v: u64| i64::try_from(v).unwrap_or(i64::MAX);
    let schema = Arc::new(vec![
        text_field("collection"),
        int8_field("node_count"),
        int8_field("edge_count"),
        int8_field("distinct_label_count"),
        text_field("labels"),
    ]);
    let mut data_rows = Vec::with_capacity(rows.len());
    for r in rows {
        let labels_json = serde_json::Value::Array(
            r.labels
                .iter()
                .map(|(name, count)| serde_json::json!({ "label": name, "count": count }))
                .collect(),
        )
        .to_string();
        let mut enc = DataRowEncoder::new(schema.clone());
        enc.encode_field(&r.collection)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        enc.encode_field(&to_int8(r.distinct_node_count))
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        enc.encode_field(&to_int8(r.edge_count))
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        enc.encode_field(&to_int8(r.distinct_label_count))
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        enc.encode_field(&labels_json)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        data_rows.push(Ok(enc.take_row()));
    }
    Ok(vec![Response::Query(QueryResponse::new(
        schema,
        stream::iter(data_rows),
    ))])
}
/// Encodes one row per (collection, label) pair.
///
/// The columns are `collection` (TEXT), `label` (TEXT) and `edge_count` (INT8), where
/// `edge_count` is the label's count. A collection with no labels contributes no rows. Counts
/// above `i64::MAX` are clamped to `i64::MAX` rather than wrapping to a negative number.
///
/// # Errors
///
/// Returns `XX000` if any field fails to encode.
fn encode_verbose_response(rows: Vec<CollectionStats>) -> PgWireResult<Vec<Response>> {
    // `as i64` would silently wrap large u64 counts negative; clamp instead.
    let to_int8 = |v: u64| i64::try_from(v).unwrap_or(i64::MAX);
    let schema = Arc::new(vec![
        text_field("collection"),
        text_field("label"),
        int8_field("edge_count"),
    ]);
    // Exactly one output row per label entry, so the final size is known up front.
    let row_count: usize = rows.iter().map(|r| r.labels.len()).sum();
    let mut data_rows = Vec::with_capacity(row_count);
    for r in &rows {
        for (label, count) in &r.labels {
            let mut enc = DataRowEncoder::new(schema.clone());
            enc.encode_field(&r.collection)
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            enc.encode_field(label)
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            enc.encode_field(&to_int8(*count))
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            data_rows.push(Ok(enc.take_row()));
        }
    }
    Ok(vec![Response::Query(QueryResponse::new(
        schema,
        stream::iter(data_rows),
    ))])
}