use std::collections::BTreeMap;
use std::fmt::Write as _;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConflictKey {
pub tenant: u64,
pub engine: &'static str,
pub collection: String,
}
pub const EPOCH_DURATION_BUCKETS: &[u64] = &[1, 5, 10, 20, 50, 100, 250, 500, 1000];
pub struct SequencerMetrics {
pub epochs_total: AtomicU64,
pub admitted_total: AtomicU64,
pub rejected_conflict_total: AtomicU64,
pub rejected_txn_too_large: AtomicU64,
pub rejected_fanout_too_wide: AtomicU64,
pub rejected_inbox_full: AtomicU64,
pub rejected_not_leader: AtomicU64,
pub rejected_tenant_quota_exceeded: AtomicU64,
pub conflict_by_context: Mutex<BTreeMap<ConflictKey, u64>>,
pub epoch_duration_buckets: [AtomicU64; 10],
pub epoch_duration_sum_ms: AtomicU64,
pub inbox_depth: AtomicU64,
}
impl SequencerMetrics {
pub fn new() -> std::sync::Arc<Self> {
std::sync::Arc::new(Self::default())
}
pub fn record_conflict(&self, key: ConflictKey) {
let mut map = self
.conflict_by_context
.lock()
.unwrap_or_else(|p| p.into_inner());
*map.entry(key).or_insert(0) += 1;
}
pub fn record_epoch_duration_ms(&self, ms: u64) {
self.epoch_duration_sum_ms.fetch_add(ms, Ordering::Relaxed);
let bucket_idx = EPOCH_DURATION_BUCKETS
.iter()
.position(|&b| ms <= b)
.unwrap_or(EPOCH_DURATION_BUCKETS.len());
self.epoch_duration_buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
}
pub fn render_prometheus(&self) -> String {
let mut out = String::new();
macro_rules! counter {
($name:expr, $help:expr, $value:expr) => {
let _ = writeln!(out, "# HELP {} {}", $name, $help);
let _ = writeln!(out, "# TYPE {} counter", $name);
let _ = writeln!(out, "{} {}", $name, $value);
};
}
counter!(
"nodedb_sequencer_epochs_total",
"Total epochs processed by the sequencer leader.",
self.epochs_total.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"# HELP nodedb_sequencer_admitted_txns_total \
Total transactions processed by the sequencer, by outcome."
);
let _ = writeln!(out, "# TYPE nodedb_sequencer_admitted_txns_total counter");
let _ = writeln!(
out,
"nodedb_sequencer_admitted_txns_total{{outcome=\"admitted\"}} {}",
self.admitted_total.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"nodedb_sequencer_admitted_txns_total{{outcome=\"rejected_conflict\"}} {}",
self.rejected_conflict_total.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"nodedb_sequencer_admitted_txns_total{{outcome=\"rejected_inbox_full\"}} {}",
self.rejected_inbox_full.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"nodedb_sequencer_admitted_txns_total{{outcome=\"rejected_txn_too_large\"}} {}",
self.rejected_txn_too_large.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"nodedb_sequencer_admitted_txns_total{{outcome=\"rejected_fanout_too_wide\"}} {}",
self.rejected_fanout_too_wide.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"nodedb_sequencer_admitted_txns_total{{outcome=\"rejected_tenant_quota\"}} {}",
self.rejected_tenant_quota_exceeded.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"nodedb_sequencer_admitted_txns_total{{outcome=\"rejected_not_leader\"}} {}",
self.rejected_not_leader.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"# HELP nodedb_sequencer_inbox_depth Current number of transactions queued in the inbox."
);
let _ = writeln!(out, "# TYPE nodedb_sequencer_inbox_depth gauge");
let _ = writeln!(
out,
"nodedb_sequencer_inbox_depth {}",
self.inbox_depth.load(Ordering::Relaxed)
);
let _ = writeln!(
out,
"# HELP nodedb_sequencer_epoch_duration_ms \
Duration of each sequencer epoch tick in milliseconds."
);
let _ = writeln!(out, "# TYPE nodedb_sequencer_epoch_duration_ms histogram");
let mut cumulative: u64 = 0;
for (i, &boundary) in EPOCH_DURATION_BUCKETS.iter().enumerate() {
cumulative += self.epoch_duration_buckets[i].load(Ordering::Relaxed);
let _ = writeln!(
out,
"nodedb_sequencer_epoch_duration_ms_bucket{{le=\"{boundary}\"}} {cumulative}"
);
}
cumulative +=
self.epoch_duration_buckets[EPOCH_DURATION_BUCKETS.len()].load(Ordering::Relaxed);
let _ = writeln!(
out,
"nodedb_sequencer_epoch_duration_ms_bucket{{le=\"+Inf\"}} {cumulative}"
);
let _ = writeln!(
out,
"nodedb_sequencer_epoch_duration_ms_sum {}",
self.epoch_duration_sum_ms.load(Ordering::Relaxed)
);
let _ = writeln!(out, "nodedb_sequencer_epoch_duration_ms_count {cumulative}");
let _ = writeln!(
out,
"# HELP nodedb_sequencer_conflict_by_context_total \
Conflict rejections keyed by tenant, engine, and collection."
);
let _ = writeln!(
out,
"# TYPE nodedb_sequencer_conflict_by_context_total counter"
);
let map = self
.conflict_by_context
.lock()
.unwrap_or_else(|p| p.into_inner());
if map.is_empty() {
let _ = writeln!(
out,
"nodedb_sequencer_conflict_by_context_total\
{{tenant=\"\",engine=\"\",collection=\"\"}} 0"
);
} else {
for (key, count) in map.iter() {
let _ = writeln!(
out,
"nodedb_sequencer_conflict_by_context_total\
{{tenant=\"{}\",engine=\"{}\",collection=\"{}\"}} {}",
key.tenant, key.engine, key.collection, count
);
}
}
out
}
}
impl Default for SequencerMetrics {
fn default() -> Self {
Self {
epochs_total: AtomicU64::new(0),
admitted_total: AtomicU64::new(0),
rejected_conflict_total: AtomicU64::new(0),
rejected_txn_too_large: AtomicU64::new(0),
rejected_fanout_too_wide: AtomicU64::new(0),
rejected_inbox_full: AtomicU64::new(0),
rejected_not_leader: AtomicU64::new(0),
rejected_tenant_quota_exceeded: AtomicU64::new(0),
conflict_by_context: Mutex::new(BTreeMap::new()),
epoch_duration_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
epoch_duration_sum_ms: AtomicU64::new(0),
inbox_depth: AtomicU64::new(0),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rejection_counters_increment() {
let m = SequencerMetrics::default();
m.rejected_txn_too_large.fetch_add(1, Ordering::Relaxed);
m.rejected_fanout_too_wide.fetch_add(2, Ordering::Relaxed);
m.rejected_inbox_full.fetch_add(3, Ordering::Relaxed);
m.rejected_not_leader.fetch_add(4, Ordering::Relaxed);
m.rejected_tenant_quota_exceeded
.fetch_add(5, Ordering::Relaxed);
assert_eq!(m.rejected_txn_too_large.load(Ordering::Relaxed), 1);
assert_eq!(m.rejected_fanout_too_wide.load(Ordering::Relaxed), 2);
assert_eq!(m.rejected_inbox_full.load(Ordering::Relaxed), 3);
assert_eq!(m.rejected_not_leader.load(Ordering::Relaxed), 4);
assert_eq!(m.rejected_tenant_quota_exceeded.load(Ordering::Relaxed), 5);
}
#[test]
fn record_conflict_increments_map() {
let m = SequencerMetrics::default();
let key = ConflictKey {
tenant: 42,
engine: "document",
collection: "orders".to_owned(),
};
m.record_conflict(key.clone());
m.record_conflict(key.clone());
let map = m
.conflict_by_context
.lock()
.unwrap_or_else(|p| p.into_inner());
assert_eq!(*map.get(&key).unwrap(), 2);
}
#[test]
fn render_prometheus_includes_conflict_labels() {
let m = SequencerMetrics::default();
m.record_conflict(ConflictKey {
tenant: 1,
engine: "kv",
collection: "cache".to_owned(),
});
let output = m.render_prometheus();
assert!(
output.contains("tenant=\"1\",engine=\"kv\",collection=\"cache\""),
"expected conflict label in output:\n{output}"
);
}
#[test]
fn render_prometheus_uses_nodedb_prefix() {
let m = SequencerMetrics::default();
let output = m.render_prometheus();
for line in output.lines() {
if line.starts_with('#') || line.is_empty() {
continue;
}
assert!(
line.starts_with("nodedb_"),
"metric line missing nodedb_ prefix: {line}"
);
}
}
#[test]
fn epoch_duration_histogram_records_into_correct_bucket() {
let m = SequencerMetrics::default();
m.record_epoch_duration_ms(10); m.record_epoch_duration_ms(1000); m.record_epoch_duration_ms(2000); let output = m.render_prometheus();
assert!(
output.contains("nodedb_sequencer_epoch_duration_ms_bucket"),
"histogram output missing: {output}"
);
assert!(
output.contains("nodedb_sequencer_epoch_duration_ms_sum 3010"),
"sum incorrect in: {output}"
);
}
#[test]
fn admitted_txns_total_has_outcome_labels() {
let m = SequencerMetrics::default();
m.admitted_total.fetch_add(5, Ordering::Relaxed);
m.rejected_conflict_total.fetch_add(2, Ordering::Relaxed);
let output = m.render_prometheus();
assert!(
output.contains("outcome=\"admitted\"} 5"),
"admitted outcome missing: {output}"
);
assert!(
output.contains("outcome=\"rejected_conflict\"} 2"),
"rejected_conflict outcome missing: {output}"
);
}
}