use std::time::Duration;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;
use tracing::trace;
/// Label set attached to the constant `1` build-info gauge.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
pub(crate) struct BuildInfoLabels {
    /// merod version string, as passed to `NodeMetrics::set_build_info`.
    pub(crate) version: String,
    /// Peer id, as passed to `NodeMetrics::set_build_info`.
    pub(crate) peer_id: String,
}
/// Label set slicing the DAG delta-apply outcome counter.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
pub(crate) struct DeltaApplyLabels {
    /// Outcome name supplied by the caller of `record_delta_outcome`; the registered help
    /// text lists `applied`, `pending`, `cascaded`, `duplicate` and `error`.
    pub(crate) outcome: String,
}
/// Label set slicing the per-delta B2 governance-pending drain outcome counter.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
pub(crate) struct GovernanceDrainLabels {
    /// Outcome name supplied by the caller of `record_governance_drain_outcome`.
    pub(crate) outcome: String,
}
/// Handles to every node-level metric, created and registered by `NodeMetrics::new`.
///
/// Gauges in the first group are refreshed periodically from a `NodeStateSnapshot`; counters
/// and the histogram are bumped by the `record_*` / `observe_*` helpers; the `process_*`
/// gauges are refreshed from procfs on Linux.
#[derive(Debug, Clone)]
pub(crate) struct NodeMetrics {
    /// Constant-1 gauge labeled with version and peer id.
    pub(crate) build_info: Family<BuildInfoLabels, Gauge>,

    // --- Snapshot gauges (set from `NodeStateSnapshot::publish`). ---
    /// Number of blobs in the in-memory blob cache.
    pub(crate) blob_cache_entries: Gauge,
    /// Sum of cached blob data lengths, in bytes.
    pub(crate) blob_cache_size_bytes: Gauge,
    /// Number of contexts with an in-memory delta store.
    pub(crate) delta_stores_count: Gauge,
    /// Number of contexts with an open sync session.
    pub(crate) sync_sessions_active: Gauge,
    /// Number of contexts with a non-empty governance-pending entry.
    pub(crate) governance_pending_contexts: Gauge,
    /// Total governance-pending buffer depth across contexts.
    pub(crate) governance_pending_queue_depth: Gauge,
    /// Number of in-flight specialized-node invite verifications.
    pub(crate) specialized_node_pending_invites: Gauge,

    // --- Event counters and histogram. ---
    /// Blob cache evictions due to age.
    pub(crate) blob_cache_evictions_age_total: Counter,
    /// Blob cache evictions due to the entry-count limit.
    pub(crate) blob_cache_evictions_count_total: Counter,
    /// Blob cache evictions due to the memory limit.
    pub(crate) blob_cache_evictions_memory_total: Counter,
    /// Delta-apply outcomes, one counter per outcome label.
    pub(crate) delta_outcomes_total: Family<DeltaApplyLabels, Counter>,
    /// Distribution of cascade sizes when pending deltas are applied.
    pub(crate) delta_cascade_size: Histogram,
    /// Missing-parent requests issued to peers.
    pub(crate) delta_missing_parents_total: Counter,
    /// Governance-pending drain outcomes, one counter per outcome label.
    pub(crate) governance_drain_outcomes_total: Family<GovernanceDrainLabels, Counter>,

    // --- Process gauges (Linux only; left at 0 elsewhere). ---
    /// Resident set size, in bytes.
    pub(crate) process_resident_memory_bytes: Gauge,
    /// Virtual memory size, in bytes.
    pub(crate) process_virtual_memory_bytes: Gauge,
    /// Thread count.
    pub(crate) process_threads: Gauge,
    /// Open file descriptor count.
    pub(crate) process_open_fds: Gauge,
}
impl NodeMetrics {
    /// Creates every node metric and registers it with `registry`.
    ///
    /// The registry receives a clone of each metric handle, and the returned struct keeps the
    /// original. This relies on `prometheus_client` handles sharing their underlying value
    /// across clones, so updates made through the struct are visible to the registry.
    ///
    /// Counter metrics (plain `Counter` and `Family<_, Counter>`) are registered WITHOUT a
    /// `_total` suffix. The `prometheus_client` text encoder appends `_total` to counter
    /// samples itself, so registering `blob_cache_evictions_age` exports
    /// `blob_cache_evictions_age_total`. Registering a name that already ends in `_total`
    /// would export `..._total_total`. The struct field names keep the `_total` suffix so they
    /// read like the exported series.
    pub(crate) fn new(registry: &mut Registry) -> Self {
        let build_info: Family<BuildInfoLabels, Gauge> = Family::default();
        registry.register(
            "merod_build_info",
            "Constant 1 gauge labeled with merod version and peer_id — \
             operators use it to verify the metrics pipeline end-to-end",
            build_info.clone(),
        );

        let blob_cache_entries = Gauge::default();
        registry.register(
            "blob_cache_entries",
            "Number of blobs currently held in the in-memory blob cache",
            blob_cache_entries.clone(),
        );

        let blob_cache_size_bytes = Gauge::default();
        registry.register(
            "blob_cache_size_bytes",
            "Total resident bytes across all blobs in the blob cache",
            blob_cache_size_bytes.clone(),
        );

        let delta_stores_count = Gauge::default();
        registry.register(
            "delta_stores_count",
            "Number of contexts with a live in-memory DeltaStore",
            delta_stores_count.clone(),
        );

        let sync_sessions_active = Gauge::default();
        registry.register(
            "sync_sessions_active",
            "Number of contexts with an open snapshot-sync session (buffering deltas)",
            sync_sessions_active.clone(),
        );

        let governance_pending_contexts = Gauge::default();
        registry.register(
            "governance_pending_contexts",
            "Number of contexts that currently have at least one delta in the \
             B2 governance-pending buffer",
            governance_pending_contexts.clone(),
        );

        let governance_pending_queue_depth = Gauge::default();
        registry.register(
            "governance_pending_queue_depth",
            "Sum of governance-pending buffer depths across all contexts — \
             monotonic growth indicates B2 buffer cannot drain",
            governance_pending_queue_depth.clone(),
        );

        let specialized_node_pending_invites = Gauge::default();
        registry.register(
            "specialized_node_pending_invites",
            "Number of in-flight specialized-node invite verifications",
            specialized_node_pending_invites.clone(),
        );

        // Counters: base names only; the encoder adds `_total`.
        let blob_cache_evictions_age_total = Counter::default();
        registry.register(
            "blob_cache_evictions_age",
            "Blob cache entries evicted because they exceeded MAX_BLOB_AGE_S",
            blob_cache_evictions_age_total.clone(),
        );

        let blob_cache_evictions_count_total = Counter::default();
        registry.register(
            "blob_cache_evictions_count",
            "Blob cache entries evicted to keep entry count under MAX_BLOB_CACHE_COUNT",
            blob_cache_evictions_count_total.clone(),
        );

        let blob_cache_evictions_memory_total = Counter::default();
        registry.register(
            "blob_cache_evictions_memory",
            "Blob cache entries evicted to keep resident bytes under MAX_BLOB_CACHE_SIZE_BYTES",
            blob_cache_evictions_memory_total.clone(),
        );

        let delta_outcomes_total: Family<DeltaApplyLabels, Counter> = Family::default();
        registry.register(
            "delta_outcomes",
            "DAG delta-apply outcomes, sliced by outcome \
             (applied, pending, cascaded, duplicate, error)",
            delta_outcomes_total.clone(),
        );

        // NOTE(review): buckets double from 1 to 64 and then jump straight to 256 (no 128
        // bucket) — confirm this is intended.
        let delta_cascade_size =
            Histogram::new([0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 256.0]);
        registry.register(
            "delta_cascade_size",
            "Number of pending deltas applied via cascade when a parent finally lands",
            delta_cascade_size.clone(),
        );

        let delta_missing_parents_total = Counter::default();
        registry.register(
            "delta_missing_parents",
            "Number of missing-parent requests issued to peers across all contexts",
            delta_missing_parents_total.clone(),
        );

        let governance_drain_outcomes_total: Family<GovernanceDrainLabels, Counter> =
            Family::default();
        registry.register(
            "governance_drain_outcomes",
            "B2 governance-pending drain outcomes per delta",
            governance_drain_outcomes_total.clone(),
        );

        let process_resident_memory_bytes = Gauge::default();
        registry.register(
            "process_resident_memory_bytes",
            "Resident set size of the merod process, in bytes (linux only; 0 elsewhere)",
            process_resident_memory_bytes.clone(),
        );

        let process_virtual_memory_bytes = Gauge::default();
        registry.register(
            "process_virtual_memory_bytes",
            "Virtual memory size of the merod process, in bytes (linux only; 0 elsewhere)",
            process_virtual_memory_bytes.clone(),
        );

        let process_threads = Gauge::default();
        registry.register(
            "process_threads",
            "Thread count of the merod process (linux only; 0 elsewhere)",
            process_threads.clone(),
        );

        let process_open_fds = Gauge::default();
        registry.register(
            "process_open_fds",
            "Open file descriptors of the merod process (linux only; 0 elsewhere)",
            process_open_fds.clone(),
        );

        Self {
            build_info,
            blob_cache_entries,
            blob_cache_size_bytes,
            delta_stores_count,
            sync_sessions_active,
            governance_pending_contexts,
            governance_pending_queue_depth,
            specialized_node_pending_invites,
            blob_cache_evictions_age_total,
            blob_cache_evictions_count_total,
            blob_cache_evictions_memory_total,
            delta_outcomes_total,
            delta_cascade_size,
            delta_missing_parents_total,
            governance_drain_outcomes_total,
            process_resident_memory_bytes,
            process_virtual_memory_bytes,
            process_threads,
            process_open_fds,
        }
    }

    /// Sets the build-info gauge to `1` for the given `version` / `peer_id` label pair.
    ///
    /// Each distinct pair gets its own series. Calling this again with different values does
    /// not remove the earlier series; it stays at `1`.
    pub(crate) fn set_build_info(&self, version: &str, peer_id: &str) {
        self.build_info
            .get_or_create(&BuildInfoLabels {
                version: version.to_owned(),
                peer_id: peer_id.to_owned(),
            })
            .set(1);
    }
}
/// Point-in-time sizes of the node's in-memory collections.
///
/// Filled by `NodeStateSnapshot::capture` and pushed into the matching gauges by
/// `NodeStateSnapshot::publish`.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct NodeStateSnapshot {
    /// Number of cached blobs.
    pub blob_cache_entries: usize,
    /// Sum of cached blob data lengths, in bytes.
    pub blob_cache_size_bytes: usize,
    /// Number of per-context delta stores.
    pub delta_stores_count: usize,
    /// Number of open sync sessions.
    pub sync_sessions_active: usize,
    /// Number of contexts present in the governance-pending map.
    pub governance_pending_contexts: usize,
    /// Total length of all governance-pending buffers.
    pub governance_pending_queue_depth: usize,
    /// Number of pending specialized-node invites.
    pub specialized_node_pending_invites: usize,
}
impl NodeStateSnapshot {
    /// Reads the current sizes of the node's in-memory collections from `state`.
    ///
    /// `blob_cache_size_bytes` sums `data.len()` over every cached blob. The two governance
    /// fields come from a single pass over `governance_pending`: the number of entries, and the
    /// summed length of their buffers.
    pub(crate) fn capture(state: &crate::state::NodeState) -> Self {
        let blobs = &state.blob_cache;
        let blob_cache_entries = blobs.len();
        let blob_cache_size_bytes = blobs.iter().map(|blob| blob.value().data.len()).sum();

        let (governance_pending_contexts, governance_pending_queue_depth) = state
            .governance_pending
            .iter()
            .fold((0, 0), |(contexts, depth), pending| {
                (contexts + 1, depth + pending.value().len())
            });

        Self {
            blob_cache_entries,
            blob_cache_size_bytes,
            delta_stores_count: state.delta_stores.len(),
            sync_sessions_active: state.sync_sessions.len(),
            governance_pending_contexts,
            governance_pending_queue_depth,
            specialized_node_pending_invites: state.pending_specialized_node_invites.len(),
        }
    }

    /// Writes every snapshot value into its matching gauge on `metrics`.
    ///
    /// Values are converted with `as i64`.
    pub(crate) fn publish(&self, metrics: &NodeMetrics) {
        let readings = [
            (&metrics.blob_cache_entries, self.blob_cache_entries),
            (&metrics.blob_cache_size_bytes, self.blob_cache_size_bytes),
            (&metrics.delta_stores_count, self.delta_stores_count),
            (&metrics.sync_sessions_active, self.sync_sessions_active),
            (&metrics.governance_pending_contexts, self.governance_pending_contexts),
            (&metrics.governance_pending_queue_depth, self.governance_pending_queue_depth),
            (&metrics.specialized_node_pending_invites, self.specialized_node_pending_invites),
        ];
        for (gauge, value) in readings {
            gauge.set(value as i64);
        }
    }
}
/// Period between refreshes of the snapshot and process gauges (30 seconds).
pub(crate) const METRICS_TICK_INTERVAL: Duration = Duration::from_millis(30_000);
/// Spawns the background task that periodically refreshes the node gauges.
///
/// On every tick it captures a `NodeStateSnapshot` from `state`, traces it, publishes it to
/// `metrics`, and then refreshes the process gauges. The interval's immediate first tick is
/// consumed up front, so the first refresh happens one `METRICS_TICK_INTERVAL` after spawning.
/// Missed ticks use `MissedTickBehavior::Delay`. The loop never exits on its own; it stops
/// only when the returned handle is aborted or the runtime shuts down.
pub(crate) fn spawn_metrics_tick(
    metrics: NodeMetrics,
    state: crate::state::NodeState,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(METRICS_TICK_INTERVAL);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        // `interval` completes its first tick immediately; discard it so the first refresh
        // lands a full period after startup.
        ticker.tick().await;

        loop {
            ticker.tick().await;

            let snapshot = NodeStateSnapshot::capture(&state);
            trace!(?snapshot, "node_metrics tick");
            snapshot.publish(&metrics);

            update_process_metrics(&metrics);
        }
    })
}
/// Refreshes the `process_*` gauges from procfs.
///
/// On Linux it reads `VmRSS`, `VmSize` and `Threads` from `/proc/self/status` and counts the
/// entries of `/proc/self/fd`. Every source is best-effort. If a file cannot be read, or a
/// field is missing or fails to parse, the corresponding gauge keeps its previous value. On
/// other platforms this does nothing, so the gauges remain at their default of 0.
fn update_process_metrics(metrics: &NodeMetrics) {
    #[cfg(target_os = "linux")]
    {
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                // The memory fields are reported in kB; convert to bytes without overflowing.
                if let Some(kb) = proc_status_field(line, "VmRSS:") {
                    metrics
                        .process_resident_memory_bytes
                        .set(kb.saturating_mul(1024));
                } else if let Some(kb) = proc_status_field(line, "VmSize:") {
                    metrics
                        .process_virtual_memory_bytes
                        .set(kb.saturating_mul(1024));
                } else if let Some(n) = proc_status_field(line, "Threads:") {
                    metrics.process_threads.set(n);
                }
            }
        }

        if let Ok(fd_dir) = std::fs::read_dir("/proc/self/fd") {
            // Subtract one entry: the directory handle held open by `read_dir` while listing
            // presumably appears in its own listing.
            let count = (fd_dir.filter_map(Result::ok).count() as i64).saturating_sub(1);
            metrics.process_open_fds.set(count);
        }
    }

    #[cfg(not(target_os = "linux"))]
    {
        let _ = metrics;
    }
}

/// Parses the value of a `/proc/self/status` line whose key is `prefix` (e.g. `"VmRSS:"`).
///
/// Returns the first whitespace-separated token after `prefix` as an `i64`. Returns `None`
/// when the line has a different key or the value does not parse.
#[cfg(target_os = "linux")]
fn proc_status_field(line: &str, prefix: &str) -> Option<i64> {
    line.strip_prefix(prefix)?
        .split_whitespace()
        .next()?
        .parse()
        .ok()
}
/// Process-wide metrics handle used by the `record_*` / `observe_*` helpers.
static GLOBAL: std::sync::OnceLock<NodeMetrics> = std::sync::OnceLock::new();

/// Installs `metrics` as the process-wide handle.
///
/// Only the first call takes effect. Later calls drop their argument and leave the installed
/// handle unchanged.
pub(crate) fn install_global(metrics: NodeMetrics) {
    GLOBAL.get_or_init(move || metrics);
}

/// Returns the process-wide metrics handle, or `None` if `install_global` has not run yet.
pub(crate) fn global() -> Option<&'static NodeMetrics> {
    let installed = GLOBAL.get();
    installed
}
/// Increments the delta-apply outcome counter for `outcome`.
///
/// Does nothing until the global metrics handle is installed.
pub(crate) fn record_delta_outcome(outcome: &str) {
    let Some(metrics) = global() else {
        return;
    };
    let labels = DeltaApplyLabels {
        outcome: outcome.to_owned(),
    };
    metrics.delta_outcomes_total.get_or_create(&labels).inc();
}
/// Records a cascade of `size` pending deltas in the cascade-size histogram.
///
/// Does nothing until the global metrics handle is installed.
pub(crate) fn observe_delta_cascade(size: usize) {
    let Some(metrics) = global() else {
        return;
    };
    let sample = size as f64;
    metrics.delta_cascade_size.observe(sample);
}
/// Adds `count` to the missing-parent request counter.
///
/// Does nothing until the global metrics handle is installed.
pub(crate) fn record_missing_parents_request(count: usize) {
    let Some(metrics) = global() else {
        return;
    };
    let requests = count as u64;
    metrics.delta_missing_parents_total.inc_by(requests);
}
/// Increments the governance-pending drain outcome counter for `outcome`.
///
/// Does nothing until the global metrics handle is installed.
pub(crate) fn record_governance_drain_outcome(outcome: &str) {
    let Some(metrics) = global() else {
        return;
    };
    let labels = GovernanceDrainLabels {
        outcome: outcome.to_owned(),
    };
    metrics
        .governance_drain_outcomes_total
        .get_or_create(&labels)
        .inc();
}
/// Adds `n` to the blob-cache eviction counter selected by `reason`.
///
/// Recognised reasons are `"age"`, `"count"` and `"memory"`. Any other reason is silently
/// ignored, as is every call made before the global metrics handle is installed.
pub(crate) fn record_blob_cache_eviction(reason: &str, n: u64) {
    let target = global().and_then(|metrics| match reason {
        "age" => Some(&metrics.blob_cache_evictions_age_total),
        "count" => Some(&metrics.blob_cache_evictions_count_total),
        "memory" => Some(&metrics.blob_cache_evictions_memory_total),
        _ => None,
    });
    if let Some(counter) = target {
        counter.inc_by(n);
    }
}