use super::fields::SystemMetrics;
/// Prometheus text-format export and per-database bookkeeping.
///
/// Every label value written by the exporters below goes through
/// `push_escaped_label`, which applies the escaping the Prometheus text
/// exposition format requires. A database, stream or collection name that
/// contains `"`, `\` or a newline therefore cannot corrupt the scrape output.
///
/// Poisoned locks are recovered everywhere, in readers and writers alike, so a
/// panic in some other lock holder neither hides metrics from scrapes nor
/// silently stops them from updating.
impl SystemMetrics {
    /// Renders all metrics in the Prometheus text exposition format.
    ///
    /// Families are appended in a fixed order: core, engines, catalog sanity,
    /// shutdown phases, CDC stream drops, backpressure, per-database metrics,
    /// purge, and I/O.
    pub fn to_prometheus(&self) -> String {
        // Sizing hint only; the buffer grows if the output is larger.
        let mut out = String::with_capacity(8192);
        self.prometheus_core(&mut out);
        self.prometheus_engines(&mut out);
        self.prometheus_catalog_sanity(&mut out);
        self.prometheus_shutdown_phases(&mut out);
        self.prometheus_cdc_stream_drops(&mut out);
        self.prometheus_backpressure(&mut out);
        self.prometheus_database_metrics(&mut out);
        self.purge.write_prometheus(&mut out);
        self.io_metrics.write_prometheus(&mut out);
        out
    }

    /// Appends the per-database families, each labelled by `database`:
    /// queries, errors, collections, tenants, memory bytes and storage bytes.
    ///
    /// A family is omitted entirely while its map is empty. Each map's lock is
    /// held only while that family is being rendered.
    ///
    /// NOTE(review): `nodedb_database_collections_total` and
    /// `nodedb_database_tenants_total` are gauges but carry the `_total`
    /// suffix that Prometheus conventions reserve for counters. The names are
    /// kept as-is because dashboards may depend on them.
    pub(super) fn prometheus_database_metrics(&self, out: &mut String) {
        write_metric_family(
            out,
            "nodedb_database_queries_total",
            "Queries executed against each database",
            "counter",
            self.database_queries_by_name
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, db| push_escaped_label(buf, "database", db),
        );
        write_metric_family(
            out,
            "nodedb_database_errors_total",
            "Query errors against each database",
            "counter",
            self.database_errors_by_name
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, db| push_escaped_label(buf, "database", db),
        );
        write_metric_family(
            out,
            "nodedb_database_collections_total",
            "Collections registered in each database",
            "gauge",
            self.database_collections_by_name
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, db| push_escaped_label(buf, "database", db),
        );
        write_metric_family(
            out,
            "nodedb_database_tenants_total",
            "Tenants registered in each database",
            "gauge",
            self.database_tenants_by_name
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, db| push_escaped_label(buf, "database", db),
        );
        write_metric_family(
            out,
            "nodedb_database_memory_bytes",
            "Memory used by each database (bytes)",
            "gauge",
            self.database_memory_bytes_by_name
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, db| push_escaped_label(buf, "database", db),
        );
        write_metric_family(
            out,
            "nodedb_database_storage_bytes",
            "Storage used by each database (bytes)",
            "gauge",
            self.database_storage_bytes_by_name
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, db| push_escaped_label(buf, "database", db),
        );
    }

    /// Increments the query counter for `db_name`.
    ///
    /// The map key is only allocated the first time a database is seen, so
    /// the common path does no allocation.
    pub fn record_database_query(&self, db_name: &str) {
        let mut counts = self
            .database_queries_by_name
            .write()
            .unwrap_or_else(|p| p.into_inner());
        if let Some(count) = counts.get_mut(db_name) {
            *count += 1;
        } else {
            counts.insert(db_name.to_owned(), 1);
        }
    }

    /// Increments the query-error counter for `db_name`.
    ///
    /// The map key is only allocated the first time a database is seen, so
    /// the common path does no allocation.
    pub fn record_database_error(&self, db_name: &str) {
        let mut counts = self
            .database_errors_by_name
            .write()
            .unwrap_or_else(|p| p.into_inner());
        if let Some(count) = counts.get_mut(db_name) {
            *count += 1;
        } else {
            counts.insert(db_name.to_owned(), 1);
        }
    }

    /// Sets the collection-count gauge for `db_name`, replacing any previous value.
    pub fn set_database_collections(&self, db_name: &str, count: u64) {
        self.database_collections_by_name
            .write()
            .unwrap_or_else(|p| p.into_inner())
            .insert(db_name.to_owned(), count);
    }

    /// Sets the tenant-count gauge for `db_name`, replacing any previous value.
    pub fn set_database_tenants(&self, db_name: &str, count: u64) {
        self.database_tenants_by_name
            .write()
            .unwrap_or_else(|p| p.into_inner())
            .insert(db_name.to_owned(), count);
    }

    /// Sets the memory-usage gauge (bytes) for `db_name`, replacing any previous value.
    pub fn set_database_memory_bytes(&self, db_name: &str, bytes: u64) {
        self.database_memory_bytes_by_name
            .write()
            .unwrap_or_else(|p| p.into_inner())
            .insert(db_name.to_owned(), bytes);
    }

    /// Sets the storage-usage gauge (bytes) for `db_name`, replacing any previous value.
    pub fn set_database_storage_bytes(&self, db_name: &str, bytes: u64) {
        self.database_storage_bytes_by_name
            .write()
            .unwrap_or_else(|p| p.into_inner())
            .insert(db_name.to_owned(), bytes);
    }

    /// Returns the last recorded memory usage (bytes) for `db_name`, or 0 if none.
    pub fn database_memory_bytes(&self, db_name: &str) -> u64 {
        self.database_memory_bytes_by_name
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .get(db_name)
            .copied()
            .unwrap_or(0)
    }

    /// Returns the last recorded storage usage (bytes) for `db_name`, or 0 if none.
    pub fn database_storage_bytes(&self, db_name: &str) -> u64 {
        self.database_storage_bytes_by_name
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .get(db_name)
            .copied()
            .unwrap_or(0)
    }

    /// Returns the number of queries recorded for `db_name`, or 0 if none.
    pub fn database_queries_total(&self, db_name: &str) -> u64 {
        self.database_queries_by_name
            .read()
            .unwrap_or_else(|p| p.into_inner())
            .get(db_name)
            .copied()
            .unwrap_or(0)
    }

    /// Appends the backpressure counters labelled by `engine`: the Critical
    /// family first, then the Emergency family.
    ///
    /// Each lock is taken on its own, so the two maps are never held together.
    pub(super) fn prometheus_backpressure(&self, out: &mut String) {
        write_metric_family(
            out,
            "nodedb_backpressure_critical_total",
            "Write handlers that entered Critical-pressure flush path",
            "counter",
            self.backpressure_critical_by_engine
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, engine| push_escaped_label(buf, "engine", engine),
        );
        write_metric_family(
            out,
            "nodedb_backpressure_emergency_total",
            "Write handlers rejected by Emergency-pressure",
            "counter",
            self.backpressure_emergency_by_engine
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, engine| push_escaped_label(buf, "engine", engine),
        );
    }

    /// Appends CDC overflow drops, labelled by `tenant` and `stream`.
    pub(super) fn prometheus_cdc_stream_drops(&self, out: &mut String) {
        write_metric_family(
            out,
            "nodedb_cdc_events_dropped_total",
            "CDC events dropped from stream buffers due to overflow",
            "counter",
            self.cdc_events_dropped_by_stream
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, (tenant_id, stream_name)| {
                push_escaped_label(buf, "tenant", tenant_id);
                buf.push(',');
                push_escaped_label(buf, "stream", stream_name);
            },
        );
    }

    /// Appends per-phase shutdown durations labelled by `phase`.
    ///
    /// Values are stored in milliseconds and converted to seconds here.
    pub(super) fn prometheus_shutdown_phases(&self, out: &mut String) {
        write_metric_family(
            out,
            "nodedb_shutdown_phase_duration_seconds",
            "Duration of each shutdown phase in the last graceful shutdown",
            "gauge",
            self.shutdown_phase_durations_ms
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter()
                .map(|(phase, ms)| (phase, *ms as f64 / 1_000.0)),
            |buf, phase| push_escaped_label(buf, "phase", phase),
        );
    }

    /// Appends catalog sanity-check outcome counts, labelled by `registry`
    /// and `outcome`.
    pub(super) fn prometheus_catalog_sanity(&self, out: &mut String) {
        write_metric_family(
            out,
            "nodedb_catalog_sanity_check_total",
            "Catalog sanity check outcomes per registry",
            "counter",
            self.catalog_sanity_check_totals
                .read()
                .unwrap_or_else(|p| p.into_inner())
                .iter(),
            |buf, (registry, outcome)| {
                push_escaped_label(buf, "registry", registry);
                buf.push(',');
                push_escaped_label(buf, "outcome", outcome);
            },
        );
    }

    /// Appends segment-quarantine metrics from `active_counts`, which is keyed
    /// by `(engine, collection)`.
    ///
    /// Two families are written: the `nodedb_segments_quarantined_active`
    /// gauge and the `nodedb_segments_quarantined_total` counter. Nothing is
    /// written when `active_counts` is empty.
    ///
    /// NOTE(review): the `_total` counter is emitted from the same *active*
    /// counts as the gauge, so it can decrease when segments leave quarantine,
    /// which a Prometheus counter must not do. A true cumulative count needs
    /// its own source; confirm whether this is intended.
    pub fn prometheus_segment_quarantine_active(
        out: &mut String,
        active_counts: &std::collections::HashMap<(String, String), u64>,
    ) {
        fn engine_and_collection(buf: &mut String, (engine, collection): &(String, String)) {
            push_escaped_label(buf, "engine", engine);
            buf.push(',');
            push_escaped_label(buf, "collection", collection);
        }
        write_metric_family(
            out,
            "nodedb_segments_quarantined_active",
            "Currently-quarantined segment count per engine and collection",
            "gauge",
            active_counts,
            engine_and_collection,
        );
        write_metric_family(
            out,
            "nodedb_segments_quarantined_total",
            "Cumulative segments quarantined due to repeated CRC failures",
            "counter",
            active_counts,
            engine_and_collection,
        );
    }
}

/// Appends one Prometheus metric family to `out`.
///
/// Writes the `# HELP` and `# TYPE` lines, then one sample per entry, sorted by
/// key so successive scrapes list samples in a stable order. When `entries` is
/// empty nothing is written, not even the header.
///
/// * `name` – metric name, used in the header and in every sample line.
/// * `help` – text for the `# HELP` line.
/// * `kind` – Prometheus metric type for the `# TYPE` line (`"counter"` or `"gauge"`).
/// * `entries` – `(key, value)` pairs; each value is written with `Display`.
/// * `write_labels` – renders the label set for one key, meaning the text
///   between `{` and `}`. It should emit each label with `push_escaped_label`.
fn write_metric_family<'a, K, V>(
    out: &mut String,
    name: &str,
    help: &str,
    kind: &str,
    entries: impl IntoIterator<Item = (&'a K, V)>,
    write_labels: impl Fn(&mut String, &K),
) where
    K: Ord + 'a,
    V: std::fmt::Display,
{
    use std::fmt::Write as _;
    let mut samples: Vec<(&'a K, V)> = entries.into_iter().collect();
    if samples.is_empty() {
        return;
    }
    // All callers pass map entries, whose keys are unique, so an unstable
    // sort produces the same order a stable one would.
    samples.sort_unstable_by(|a, b| a.0.cmp(b.0));
    let _ = writeln!(out, "# HELP {name} {help}");
    let _ = writeln!(out, "# TYPE {name} {kind}");
    for (key, value) in samples {
        out.push_str(name);
        out.push('{');
        write_labels(out, key);
        let _ = writeln!(out, "}} {value}");
    }
}

/// Appends `name="value"` to `out`, escaping `value` as the Prometheus text
/// exposition format requires for label values: a backslash becomes `\\`, a
/// double quote becomes `\"`, and a line feed becomes `\n`.
fn push_escaped_label(out: &mut String, name: &str, value: impl std::fmt::Display) {
    use std::fmt::Write as _;
    out.push_str(name);
    out.push_str("=\"");
    let start = out.len();
    let _ = write!(out, "{value}");
    // Most values need no escaping; only re-copy the value when one does.
    if out[start..].contains(|c: char| matches!(c, '\\' | '"' | '\n')) {
        let raw = out.split_off(start);
        for c in raw.chars() {
            match c {
                '\\' => out.push_str("\\\\"),
                '"' => out.push_str("\\\""),
                '\n' => out.push_str("\\n"),
                other => out.push(other),
            }
        }
    }
    out.push('"');
}