use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use nodedb_types::DatabaseId;
#[derive(Debug, Default)]
pub struct DatabaseCounters {
pub qps_total: AtomicU64,
pub memory_bytes: AtomicU64,
pub storage_bytes: AtomicU64,
pub connections: AtomicU64,
pub bridge_queue_depth: AtomicU64,
pub wal_commit_latency_p99_us: AtomicU64,
pub maintenance_cpu_seconds_total: AtomicU64,
pub mirror_lag_ms: AtomicU64,
}
#[derive(Debug, Default)]
pub struct DatabaseMetricsRegistry {
counters: RwLock<HashMap<String, Arc<DatabaseCounters>>>,
}
impl DatabaseMetricsRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn get_or_create(&self, db_name: &str) -> Arc<DatabaseCounters> {
{
let r = self.counters.read().unwrap_or_else(|p| p.into_inner());
if let Some(c) = r.get(db_name) {
return Arc::clone(c);
}
}
let mut w = self.counters.write().unwrap_or_else(|p| p.into_inner());
w.entry(db_name.to_string())
.or_insert_with(|| Arc::new(DatabaseCounters::default()))
.clone()
}
pub fn record_qps(&self, db_name: &str) {
self.get_or_create(db_name)
.qps_total
.fetch_add(1, Ordering::Relaxed);
}
pub fn set_memory_bytes(&self, db_name: &str, bytes: u64) {
self.get_or_create(db_name)
.memory_bytes
.store(bytes, Ordering::Relaxed);
}
pub fn set_storage_bytes(&self, db_name: &str, bytes: u64) {
self.get_or_create(db_name)
.storage_bytes
.store(bytes, Ordering::Relaxed);
}
pub fn set_connections(&self, db_name: &str, count: u64) {
self.get_or_create(db_name)
.connections
.store(count, Ordering::Relaxed);
}
pub fn set_bridge_queue_depth(&self, db_name: &str, depth: u64) {
self.get_or_create(db_name)
.bridge_queue_depth
.store(depth, Ordering::Relaxed);
}
pub fn set_wal_latency_p99(&self, db_name: &str, us: u64) {
self.get_or_create(db_name)
.wal_commit_latency_p99_us
.store(us, Ordering::Relaxed);
}
pub fn set_mirror_lag_ms(&self, db_name: &str, lag_ms: u64) {
self.get_or_create(db_name)
.mirror_lag_ms
.store(lag_ms, Ordering::Relaxed);
}
pub fn add_maintenance_cpu_secs(&self, db_name: &str, secs: f64) {
let us = if secs.is_finite() && secs > 0.0 {
(secs * 1_000_000.0).round() as u64
} else {
0
};
if us == 0 {
return;
}
self.get_or_create(db_name)
.maintenance_cpu_seconds_total
.fetch_add(us, Ordering::Relaxed);
}
pub fn render_prometheus(&self, out: &mut String) {
use std::fmt::Write as _;
let r = self.counters.read().unwrap_or_else(|p| p.into_inner());
if r.is_empty() {
return;
}
let mut pairs: Vec<_> = r.iter().collect();
pairs.sort_by(|a, b| a.0.cmp(b.0));
macro_rules! emit_counter {
($name:literal, $help:literal, $field:ident) => {
let _ = write!(
out,
"# HELP {} {}\n# TYPE {} counter\n",
$name, $help, $name
);
for (db, c) in &pairs {
let _ = writeln!(
out,
r#"{}{{database="{}"}} {}"#,
$name,
db,
c.$field.load(Ordering::Relaxed)
);
}
out.push('\n');
};
}
macro_rules! emit_gauge {
($name:literal, $help:literal, $field:ident) => {
let _ = write!(out, "# HELP {} {}\n# TYPE {} gauge\n", $name, $help, $name);
for (db, c) in &pairs {
let _ = writeln!(
out,
r#"{}{{database="{}"}} {}"#,
$name,
db,
c.$field.load(Ordering::Relaxed)
);
}
out.push('\n');
};
}
emit_counter!(
"nodedb_database_qps_total",
"Cumulative requests admitted per database",
qps_total
);
emit_gauge!(
"nodedb_database_memory_used_bytes",
"Current memory usage per database in bytes",
memory_bytes
);
emit_gauge!(
"nodedb_database_storage_used_bytes",
"Current storage usage per database in bytes",
storage_bytes
);
emit_gauge!(
"nodedb_database_connections",
"Active connections per database",
connections
);
emit_gauge!(
"nodedb_database_bridge_queue_depth",
"SPSC bridge virtual-queue depth per database",
bridge_queue_depth
);
emit_gauge!(
"nodedb_database_wal_commit_latency_p99_us",
"WAL commit latency P99 in microseconds per database",
wal_commit_latency_p99_us
);
emit_counter!(
"nodedb_database_maintenance_cpu_us_total",
"Cumulative maintenance CPU microseconds consumed per database",
maintenance_cpu_seconds_total
);
emit_gauge!(
"nodedb_database_mirror_lag_ms",
"Mirror replication lag in milliseconds (wall-clock now minus last apply timestamp); \
zero for non-mirror or promoted databases",
mirror_lag_ms
);
}
}
#[derive(Debug, Clone)]
pub struct DatabaseQuotaMetrics {
pub database_id: DatabaseId,
pub database_name: String,
pub qps_total: u64,
pub memory_bytes_used: u64,
pub memory_bytes_limit: u64,
pub storage_bytes_used: u64,
pub storage_bytes_limit: u64,
pub connections_active: u64,
pub connections_limit: u64,
pub bridge_queue_depth: u64,
pub wal_commit_latency_p99_us: u64,
pub maintenance_cpu_seconds_used: f64,
pub maintenance_cpu_pct_limit: u8,
}
impl DatabaseQuotaMetrics {
pub fn is_over_quota(&self) -> bool {
(self.memory_bytes_limit > 0 && self.memory_bytes_used > self.memory_bytes_limit)
|| (self.storage_bytes_limit > 0 && self.storage_bytes_used > self.storage_bytes_limit)
|| (self.connections_limit > 0 && self.connections_active > self.connections_limit)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn registry_get_or_create_idempotent() {
let reg = DatabaseMetricsRegistry::new();
let a = reg.get_or_create("mydb");
let b = reg.get_or_create("mydb");
assert!(Arc::ptr_eq(&a, &b));
}
#[test]
fn record_qps_increments() {
let reg = DatabaseMetricsRegistry::new();
reg.record_qps("db1");
reg.record_qps("db1");
let c = reg.get_or_create("db1");
assert_eq!(c.qps_total.load(Ordering::Relaxed), 2);
}
#[test]
fn render_prometheus_contains_labels() {
let reg = DatabaseMetricsRegistry::new();
reg.record_qps("alpha");
reg.record_qps("beta");
let mut out = String::new();
reg.render_prometheus(&mut out);
assert!(out.contains(r#"database="alpha""#));
assert!(out.contains(r#"database="beta""#));
assert!(out.contains("nodedb_database_qps_total"));
}
#[test]
fn over_quota_detection() {
let m = DatabaseQuotaMetrics {
database_id: DatabaseId::DEFAULT,
database_name: "test".into(),
qps_total: 0,
memory_bytes_used: 200,
memory_bytes_limit: 100,
storage_bytes_used: 0,
storage_bytes_limit: 0,
connections_active: 0,
connections_limit: 0,
bridge_queue_depth: 0,
wal_commit_latency_p99_us: 0,
maintenance_cpu_seconds_used: 0.0,
maintenance_cpu_pct_limit: 25,
};
assert!(m.is_over_quota());
}
}