use std::collections::HashMap;
use std::fmt::Write;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use super::histogram::AtomicHistogram;
/// Maximum number of distinct tenants labelled individually in each metric
/// family; any further tenants are aggregated under [`TENANT_OVERFLOW_BUCKET`]
/// so the exported label cardinality stays bounded.
const MAX_PROM_TENANTS: usize = 256;
/// `tenant` label value of the aggregated series that collects every tenant
/// beyond the first [`MAX_PROM_TENANTS`].
const TENANT_OVERFLOW_BUCKET: &str = "__overflow__";
/// Per-tenant metrics for collection purging (hard-delete of soft-deleted
/// collections) and the L2 cleanup queue, rendered in Prometheus text format.
///
/// Each map sits behind its own [`RwLock`]: the mutating methods take the
/// write lock, rendering takes read locks, and poisoned locks are recovered
/// rather than propagated.
#[derive(Debug, Default)]
pub struct PurgeMetrics {
/// Tenant -> soft-deleted collections still inside the retention window.
/// Replaced wholesale by `set_pending_by_tenant`.
pub pending_by_tenant: RwLock<HashMap<u64, u64>>,
/// (tenant, engine) -> histogram of purge durations, fed in microseconds by
/// `record_purge_duration`.
pub purge_duration_us: RwLock<HashMap<(u64, String), AtomicHistogram>>,
/// (tenant, engine, tier) -> cumulative bytes reclaimed, incremented by
/// `add_bytes_reclaimed`.
pub bytes_reclaimed: RwLock<HashMap<(u64, String, &'static str), AtomicU64>>,
/// Tenant -> pending L2 cleanup (S3 delete) operations. Replaced wholesale by
/// `set_l2_cleanup_queue_depth`.
pub l2_cleanup_queue_depth: RwLock<HashMap<u64, u64>>,
}
impl PurgeMetrics {
    /// Creates an empty set of purge metrics; equivalent to `PurgeMetrics::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the pending-purge gauge data with `snapshot` (tenant -> count).
    ///
    /// The previous map is discarded entirely, so tenants missing from
    /// `snapshot` stop being reported. A poisoned lock is recovered rather than
    /// turned into a panic.
    pub fn set_pending_by_tenant(&self, snapshot: HashMap<u64, u64>) {
        let mut m = self
            .pending_by_tenant
            .write()
            .unwrap_or_else(|p| p.into_inner());
        *m = snapshot;
    }

    /// Records one purge duration sample, in microseconds, for `(tenant, engine)`.
    ///
    /// The histogram for the pair is created on first use.
    pub fn record_purge_duration(&self, tenant: u64, engine: &str, duration_us: u64) {
        let key = (tenant, engine.to_string());
        let mut m = self
            .purge_duration_us
            .write()
            .unwrap_or_else(|p| p.into_inner());
        m.entry(key).or_default().observe(duration_us);
    }

    /// Adds `bytes` to the reclaimed-bytes counter for `(tenant, engine, tier)`,
    /// creating the counter at zero on first use.
    pub fn add_bytes_reclaimed(
        &self,
        tenant: u64,
        engine: &str,
        tier: &'static str,
        bytes: u64,
    ) {
        let key = (tenant, engine.to_string(), tier);
        let mut m = self
            .bytes_reclaimed
            .write()
            .unwrap_or_else(|p| p.into_inner());
        m.entry(key)
            .or_default()
            .fetch_add(bytes, Ordering::Relaxed);
    }

    /// Replaces the L2 cleanup queue-depth gauge data with `snapshot`
    /// (tenant -> depth). Tenants missing from `snapshot` stop being reported.
    pub fn set_l2_cleanup_queue_depth(&self, snapshot: HashMap<u64, u64>) {
        let mut m = self
            .l2_cleanup_queue_depth
            .write()
            .unwrap_or_else(|p| p.into_inner());
        *m = snapshot;
    }

    /// Appends every purge metric family to `out` in Prometheus text format.
    ///
    /// Families are written in a fixed order: pending, durations, bytes
    /// reclaimed, then L2 queue. A family with no data is omitted entirely,
    /// including its `# HELP`/`# TYPE` header. Each family labels at most
    /// `MAX_PROM_TENANTS` tenants individually; the rest are aggregated under
    /// the overflow tenant label.
    pub fn write_prometheus(&self, out: &mut String) {
        self.write_pending(out);
        self.write_durations(out);
        self.write_bytes_reclaimed(out);
        self.write_l2_queue(out);
    }

    /// Escapes a label value as required by the Prometheus text exposition
    /// format: backslash, double-quote and line feed become `\\`, `\"` and `\n`.
    ///
    /// Borrows the input unchanged in the common case where nothing needs escaping.
    fn escape_label_value(raw: &str) -> std::borrow::Cow<'_, str> {
        if !raw.contains(|c: char| matches!(c, '\\' | '"' | '\n')) {
            return std::borrow::Cow::Borrowed(raw);
        }
        let mut escaped = String::with_capacity(raw.len() + 4);
        for c in raw.chars() {
            match c {
                '\\' => escaped.push_str("\\\\"),
                '"' => escaped.push_str("\\\""),
                '\n' => escaped.push_str("\\n"),
                other => escaped.push(other),
            }
        }
        std::borrow::Cow::Owned(escaped)
    }

    /// Writes the `nodedb_deactivated_collections_pending_purge` gauge family.
    fn write_pending(&self, out: &mut String) {
        let m = self
            .pending_by_tenant
            .read()
            .unwrap_or_else(|p| p.into_inner());
        if m.is_empty() {
            return;
        }
        let _ = out.write_str(
            "# HELP nodedb_deactivated_collections_pending_purge Soft-deleted collections inside the retention window\n\
             # TYPE nodedb_deactivated_collections_pending_purge gauge\n",
        );
        // Tenant labels are decimal ids or the fixed overflow bucket name, so
        // they never need escaping.
        let rows = cap_simple_tenant_map(&m);
        for (label, count) in rows {
            let _ = writeln!(
                out,
                r#"nodedb_deactivated_collections_pending_purge{{tenant="{label}"}} {count}"#
            );
        }
    }

    /// Writes the `nodedb_collection_purge_duration_seconds` histogram family.
    ///
    /// NOTE(review): samples are recorded in microseconds while the family is
    /// named `_seconds`; confirm that `AtomicHistogram::write_prometheus`
    /// performs the unit conversion and correctly handles a series name that
    /// already carries a `{...}` label set (its empty third argument is
    /// defined by `AtomicHistogram`, not visible here).
    fn write_durations(&self, out: &mut String) {
        let m = self
            .purge_duration_us
            .read()
            .unwrap_or_else(|p| p.into_inner());
        if m.is_empty() {
            return;
        }
        let _ = out.write_str(
            "# HELP nodedb_collection_purge_duration_seconds End-to-end hard-delete duration per collection\n\
             # TYPE nodedb_collection_purge_duration_seconds histogram\n",
        );
        let rows = cap_duration_tenant_map(&m);
        for (label, engine, hist) in rows {
            // `engine` comes from callers as an arbitrary string; escape it so a
            // stray quote or newline cannot break the exposition output.
            let engine = Self::escape_label_value(&engine);
            let name = format!(
                r#"nodedb_collection_purge_duration_seconds{{tenant="{label}",engine="{engine}"}}"#
            );
            hist.write_prometheus(out, &name, "");
        }
    }

    /// Writes the `nodedb_collection_purge_bytes_reclaimed_total` counter family.
    fn write_bytes_reclaimed(&self, out: &mut String) {
        let m = self
            .bytes_reclaimed
            .read()
            .unwrap_or_else(|p| p.into_inner());
        if m.is_empty() {
            return;
        }
        let _ = out.write_str(
            "# HELP nodedb_collection_purge_bytes_reclaimed_total Bytes reclaimed during hard-delete per tier\n\
             # TYPE nodedb_collection_purge_bytes_reclaimed_total counter\n",
        );
        let rows = cap_bytes_tenant_map(&m);
        for (label, engine, tier, v) in rows {
            let engine = Self::escape_label_value(&engine);
            let tier = Self::escape_label_value(tier);
            let _ = writeln!(
                out,
                r#"nodedb_collection_purge_bytes_reclaimed_total{{tenant="{label}",engine="{engine}",tier="{tier}"}} {v}"#
            );
        }
    }

    /// Writes the `nodedb_l2_cleanup_queue_depth` gauge family.
    fn write_l2_queue(&self, out: &mut String) {
        let m = self
            .l2_cleanup_queue_depth
            .read()
            .unwrap_or_else(|p| p.into_inner());
        if m.is_empty() {
            return;
        }
        let _ = out.write_str(
            "# HELP nodedb_l2_cleanup_queue_depth Pending S3 delete operations per tenant\n\
             # TYPE nodedb_l2_cleanup_queue_depth gauge\n",
        );
        let rows = cap_simple_tenant_map(&m);
        for (label, depth) in rows {
            let _ = writeln!(
                out,
                r#"nodedb_l2_cleanup_queue_depth{{tenant="{label}"}} {depth}"#
            );
        }
    }
}
/// Converts a `tenant -> value` map into `(label, value)` rows while labelling
/// at most [`MAX_PROM_TENANTS`] tenants individually.
///
/// Over the cap, the tenants with the largest values win (ties go to the lower
/// tenant id). Everything else is summed into one [`TENANT_OVERFLOW_BUCKET`]
/// row appended last. Individual rows are ordered by label string, which is
/// lexicographic (`"10"` sorts before `"2"`).
fn cap_simple_tenant_map(map: &HashMap<u64, u64>) -> Vec<(String, u64)> {
    let mut entries: Vec<(u64, u64)> = map.iter().map(|(&tenant, &value)| (tenant, value)).collect();

    // Only rank when the cap is actually exceeded; the tail past the cap is
    // what gets folded into the overflow row.
    let spilled = if entries.len() > MAX_PROM_TENANTS {
        entries.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
        Some(entries.split_off(MAX_PROM_TENANTS))
    } else {
        None
    };

    let mut rows: Vec<(String, u64)> = entries
        .into_iter()
        .map(|(tenant, value)| (tenant.to_string(), value))
        .collect();
    rows.sort_by(|(left, _), (right, _)| left.cmp(right));

    if let Some(rest) = spilled {
        let overflow_total: u64 = rest.iter().map(|&(_, value)| value).sum();
        rows.push((TENANT_OVERFLOW_BUCKET.to_string(), overflow_total));
    }
    rows
}
/// Selects which `(tenant, engine)` duration histograms to export, labelling
/// at most [`MAX_PROM_TENANTS`] tenants individually.
///
/// Tenants are ranked by their total observation count across engines (ties go
/// to the lower tenant id). Kept series are snapshotted. Histograms of all
/// other tenants are merged per engine into a [`TENANT_OVERFLOW_BUCKET`]
/// series. The result is ordered by `(label string, engine)`.
fn cap_duration_tenant_map(
    map: &HashMap<(u64, String), AtomicHistogram>,
) -> Vec<(String, String, AtomicHistogram)> {
    let observations = map.iter().fold(
        HashMap::<u64, u64>::new(),
        |mut acc, ((tenant, _), hist)| {
            *acc.entry(*tenant).or_default() += hist.count();
            acc
        },
    );

    let survivors: std::collections::HashSet<u64> = if observations.len() > MAX_PROM_TENANTS {
        let mut ranked: Vec<(u64, u64)> = observations.into_iter().collect();
        ranked.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
        ranked.truncate(MAX_PROM_TENANTS);
        ranked.into_iter().map(|(tenant, _)| tenant).collect()
    } else {
        observations.into_keys().collect()
    };

    let mut rows: Vec<(String, String, AtomicHistogram)> = Vec::new();
    let mut spilled: HashMap<String, AtomicHistogram> = HashMap::new();
    for ((tenant, engine), hist) in map.iter() {
        if !survivors.contains(tenant) {
            spilled.entry(engine.clone()).or_default().merge(hist);
            continue;
        }
        rows.push((tenant.to_string(), engine.clone(), hist.snapshot()));
    }
    rows.extend(
        spilled
            .into_iter()
            .map(|(engine, hist)| (TENANT_OVERFLOW_BUCKET.to_string(), engine, hist)),
    );
    rows.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));
    rows
}
/// Selects which `(tenant, engine, tier)` byte counters to export, labelling
/// at most [`MAX_PROM_TENANTS`] tenants individually.
///
/// Tenants are ranked by their total reclaimed bytes across all engines and
/// tiers (ties go to the lower tenant id). Counters of the remaining tenants
/// are summed per `(engine, tier)` into [`TENANT_OVERFLOW_BUCKET`] rows. The
/// result is ordered by `(label string, engine, tier)`.
fn cap_bytes_tenant_map(
    map: &HashMap<(u64, String, &'static str), AtomicU64>,
) -> Vec<(String, String, &'static str, u64)> {
    let per_tenant = map.iter().fold(
        HashMap::<u64, u64>::new(),
        |mut acc, ((tenant, _, _), counter)| {
            *acc.entry(*tenant).or_default() += counter.load(Ordering::Relaxed);
            acc
        },
    );

    let survivors: std::collections::HashSet<u64> = if per_tenant.len() > MAX_PROM_TENANTS {
        let mut ranked: Vec<(u64, u64)> = per_tenant.into_iter().collect();
        ranked.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
        ranked.truncate(MAX_PROM_TENANTS);
        ranked.into_iter().map(|(tenant, _)| tenant).collect()
    } else {
        per_tenant.into_keys().collect()
    };

    let mut rows: Vec<(String, String, &'static str, u64)> = Vec::new();
    let mut spilled: HashMap<(String, &'static str), u64> = HashMap::new();
    for ((tenant, engine, tier), counter) in map.iter() {
        let bytes = counter.load(Ordering::Relaxed);
        if !survivors.contains(tenant) {
            *spilled.entry((engine.clone(), *tier)).or_default() += bytes;
            continue;
        }
        rows.push((tenant.to_string(), engine.clone(), *tier, bytes));
    }
    rows.extend(
        spilled
            .into_iter()
            .map(|((engine, tier), bytes)| (TENANT_OVERFLOW_BUCKET.to_string(), engine, tier, bytes)),
    );
    rows.sort_by(|a, b| (&a.0, &a.1, a.2).cmp(&(&b.0, &b.1, b.2)));
    rows
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pending_renders_per_tenant() {
        let m = PurgeMetrics::new();
        let mut snap = HashMap::new();
        snap.insert(1_u64, 3);
        snap.insert(7_u64, 0);
        m.set_pending_by_tenant(snap);
        let mut out = String::new();
        m.write_prometheus(&mut out);
        assert!(out.contains(r#"nodedb_deactivated_collections_pending_purge{tenant="1"} 3"#));
        assert!(out.contains(r#"nodedb_deactivated_collections_pending_purge{tenant="7"} 0"#));
    }

    #[test]
    fn bytes_reclaimed_accumulates_per_tier() {
        let m = PurgeMetrics::new();
        m.add_bytes_reclaimed(2_u64, "columnar", "l1", 1_000);
        m.add_bytes_reclaimed(2_u64, "columnar", "l1", 500);
        m.add_bytes_reclaimed(2_u64, "columnar", "l2", 9_000);
        let mut out = String::new();
        m.write_prometheus(&mut out);
        assert!(out.contains(
            r#"nodedb_collection_purge_bytes_reclaimed_total{tenant="2",engine="columnar",tier="l1"} 1500"#
        ));
        assert!(out.contains(
            r#"nodedb_collection_purge_bytes_reclaimed_total{tenant="2",engine="columnar",tier="l2"} 9000"#
        ));
    }

    #[test]
    fn l2_queue_gauge_renders() {
        let m = PurgeMetrics::new();
        let mut snap = HashMap::new();
        snap.insert(4_u64, 17);
        m.set_l2_cleanup_queue_depth(snap);
        let mut out = String::new();
        m.write_prometheus(&mut out);
        assert!(out.contains(r#"nodedb_l2_cleanup_queue_depth{tenant="4"} 17"#));
    }

    #[test]
    fn empty_renders_nothing() {
        let m = PurgeMetrics::new();
        let mut out = String::new();
        m.write_prometheus(&mut out);
        assert!(out.is_empty());
    }

    #[test]
    fn duration_histogram_emits() {
        let m = PurgeMetrics::new();
        m.record_purge_duration(1_u64, "document_schemaless", 12_000);
        m.record_purge_duration(1_u64, "document_schemaless", 34_000);
        let mut out = String::new();
        m.write_prometheus(&mut out);
        assert!(out.contains(r#"tenant="1""#));
        assert!(out.contains(r#"engine="document_schemaless""#));
    }

    #[test]
    fn pending_under_cap_emits_all_rows() {
        let m = PurgeMetrics::new();
        let snap: HashMap<u64, u64> = (0..MAX_PROM_TENANTS as u64).map(|i| (i, i)).collect();
        m.set_pending_by_tenant(snap);
        let mut out = String::new();
        m.write_prometheus(&mut out);
        let individual = out
            .lines()
            .filter(|l| {
                l.starts_with("nodedb_deactivated_collections_pending_purge")
                    && !l.contains(TENANT_OVERFLOW_BUCKET)
                    && !l.starts_with('#')
            })
            .count();
        assert_eq!(individual, MAX_PROM_TENANTS);
        assert!(!out.contains(TENANT_OVERFLOW_BUCKET));
    }

    #[test]
    fn pending_over_cap_produces_overflow_row() {
        let m = PurgeMetrics::new();
        let total = 300_u64;
        let snap: HashMap<u64, u64> = (0..total).map(|i| (i, i + 1)).collect();
        m.set_pending_by_tenant(snap);
        let mut out = String::new();
        m.write_prometheus(&mut out);
        let individual: Vec<&str> = out
            .lines()
            .filter(|l| {
                l.starts_with("nodedb_deactivated_collections_pending_purge")
                    && !l.contains(TENANT_OVERFLOW_BUCKET)
                    && !l.starts_with('#')
            })
            .collect();
        assert_eq!(
            individual.len(),
            MAX_PROM_TENANTS,
            "expected 256 individual rows"
        );
        let overflow_line = out
            .lines()
            .find(|l| {
                l.starts_with("nodedb_deactivated_collections_pending_purge")
                    && l.contains(TENANT_OVERFLOW_BUCKET)
            })
            .expect("overflow row missing");
        let expected_overflow: u64 = (1..=44).sum();
        assert!(
            overflow_line.ends_with(&format!(" {expected_overflow}")),
            "overflow value wrong: {overflow_line}"
        );
    }

    #[test]
    fn overflow_value_equals_sum_of_dropped_tenants() {
        let total: u64 = 300;
        let map: HashMap<u64, u64> = (0..total).map(|i| (i, i + 1)).collect();
        let rows = cap_simple_tenant_map(&map);
        assert_eq!(rows.len(), MAX_PROM_TENANTS + 1);
        let overflow_row = rows
            .iter()
            .find(|(label, _)| label == TENANT_OVERFLOW_BUCKET)
            .expect("no overflow row");
        let expected: u64 = (1..=44).sum();
        assert_eq!(overflow_row.1, expected);
    }

    /// The bytes-reclaimed family must apply the same tenant cap as the gauges,
    /// folding dropped tenants into one overflow row per (engine, tier).
    #[test]
    fn bytes_over_cap_folds_dropped_tenants_into_overflow() {
        let m = PurgeMetrics::new();
        // Tenant `t` reclaims `t + 1` bytes, so the ten smallest tenants
        // (ids 0..=9, contributing 1..=10 bytes) fall outside the cap.
        let total = MAX_PROM_TENANTS as u64 + 10;
        for t in 0..total {
            m.add_bytes_reclaimed(t, "columnar", "l1", t + 1);
        }
        let mut out = String::new();
        m.write_prometheus(&mut out);
        let individual = out
            .lines()
            .filter(|l| {
                l.starts_with("nodedb_collection_purge_bytes_reclaimed_total")
                    && !l.contains(TENANT_OVERFLOW_BUCKET)
            })
            .count();
        assert_eq!(individual, MAX_PROM_TENANTS);
        let expected_overflow: u64 = (1..=10).sum();
        let expected_line = format!(
            r#"nodedb_collection_purge_bytes_reclaimed_total{{tenant="{TENANT_OVERFLOW_BUCKET}",engine="columnar",tier="l1"}} {expected_overflow}"#
        );
        assert!(
            out.contains(&expected_line),
            "overflow row missing or wrong:\n{out}"
        );
        assert!(!out.contains(r#"tenant="9","#));
        assert!(out.contains(r#"tenant="10","#));
    }
}