use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Inclusive upper bounds, in milliseconds, of the executor transaction
/// duration histogram buckets. Must be strictly ascending (enforced at
/// compile time right below).
///
/// Observations above the last bound are counted in one extra overflow slot,
/// exported as the `le="+Inf"` bucket.
pub const EXECUTOR_TXN_DURATION_BUCKETS: &[u64] = &[1, 5, 10, 50, 100, 500, 1000, 5000, 30_000];

// Bucket selection picks the first bound `>= ms`, which is only correct for
// ascending bounds; a mis-ordered edit to the list fails the build instead of
// silently mis-bucketing observations.
const _: () = {
    let bounds = EXECUTOR_TXN_DURATION_BUCKETS;
    let mut i = 1;
    while i < bounds.len() {
        assert!(
            bounds[i - 1] < bounds[i],
            "EXECUTOR_TXN_DURATION_BUCKETS must be strictly ascending"
        );
        i += 1;
    }
};

/// Atomic counters, a gauge and a latency histogram for the Calvin scheduler,
/// exported via `render_prometheus` in the Prometheus text format.
///
/// Every field is an independent atomic, so a reader observes a best-effort
/// snapshot rather than a mutually consistent one.
pub struct SchedulerMetrics {
    /// Transactions dispatched to the executor.
    pub dispatch_count: AtomicU64,
    /// Transactions that blocked on lock acquisition.
    pub blocked_count: AtomicU64,
    /// Sum of lock-wait time, in milliseconds, across all transactions.
    pub lock_wait_ms_total: AtomicU64,
    /// Executor responses received.
    pub completed_count: AtomicU64,
    /// Executor infrastructure errors.
    pub executor_error_count: AtomicU64,
    /// Highest epoch reported as applied.
    pub last_applied_epoch: AtomicU64,
    /// Per-bucket (non-cumulative) observation counts. Slot `i` pairs with
    /// `EXECUTOR_TXN_DURATION_BUCKETS[i]`; the final slot is the overflow
    /// bucket. Sized from the bounds list so the two can never drift apart.
    pub executor_txn_duration_buckets: [AtomicU64; EXECUTOR_TXN_DURATION_BUCKETS.len() + 1],
    /// Sum of all recorded executor transaction durations, in milliseconds.
    pub executor_txn_duration_sum_ms: AtomicU64,
    /// Abort counts indexed by the `infra_abort_reason` constants. Sized from
    /// `infra_abort_reason::LABELS` so every reason has exactly one counter.
    pub infra_abort_counts: [AtomicU64; infra_abort_reason::LABELS.len()],
}

/// Index constants for `SchedulerMetrics::infra_abort_counts`, plus the
/// matching Prometheus `reason` label values.
pub mod infra_abort_reason {
    pub const WAL_CRC_ERROR: usize = 0;
    pub const IO_ERROR: usize = 1;
    pub const OOM: usize = 2;
    pub const DISK_FULL: usize = 3;
    pub const CORRUPTION_DETECTED: usize = 4;
    pub const PASSIVE_PARTICIPANT_TIMEOUT: usize = 5;

    /// `reason` label values; `LABELS[X]` is the label for index constant `X`.
    pub const LABELS: &[&str] = &[
        "wal_crc_error",
        "io_error",
        "oom",
        "disk_full",
        "corruption_detected",
        "passive_participant_timeout",
    ];

    // Keep the index constants and `LABELS` in lockstep: the highest index
    // must address the last label, so no index lacks a label and vice versa.
    const _: () = assert!(PASSIVE_PARTICIPANT_TIMEOUT + 1 == LABELS.len());
}
impl SchedulerMetrics {
    /// Creates a zeroed metrics set wrapped in an [`Arc`] for shared ownership.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Counts one transaction dispatched to the executor.
    pub fn record_dispatch(&self) {
        self.dispatch_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one transaction that blocked on lock acquisition.
    pub fn record_blocked(&self) {
        self.blocked_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `ms` milliseconds to the running lock-wait total.
    pub fn record_lock_wait_ms(&self, ms: u64) {
        self.lock_wait_ms_total.fetch_add(ms, Ordering::Relaxed);
    }

    /// Counts one executor response.
    pub fn record_completed(&self) {
        self.completed_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one executor infrastructure error.
    pub fn record_executor_error(&self) {
        self.executor_error_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one infrastructure-level abort.
    ///
    /// `reason` is expected to be one of the [`infra_abort_reason`] index
    /// constants. Out-of-range values are ignored (best-effort) rather than
    /// panicking on a metrics path.
    pub fn record_infra_abort(&self, reason: usize) {
        if let Some(counter) = self.infra_abort_counts.get(reason) {
            counter.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Records one executor transaction that took `ms` milliseconds.
    ///
    /// The observation lands in the first bucket whose bound in
    /// [`EXECUTOR_TXN_DURATION_BUCKETS`] is `>= ms` (bounds are inclusive),
    /// or in the trailing overflow slot when `ms` exceeds every bound.
    pub fn record_executor_txn_duration_ms(&self, ms: u64) {
        self.executor_txn_duration_sum_ms
            .fetch_add(ms, Ordering::Relaxed);
        // Bounds are ascending, so the number of bounds strictly below `ms` is
        // the index of the first bound `>= ms`, or `len()` (the overflow slot)
        // when there is none.
        let bucket_idx = EXECUTOR_TXN_DURATION_BUCKETS.partition_point(|&bound| bound < ms);
        self.executor_txn_duration_buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Raises `last_applied_epoch` to `epoch`; it never moves backwards.
    pub fn update_last_applied_epoch(&self, epoch: u64) {
        // A single atomic read-modify-write that keeps the larger value,
        // replacing a hand-written compare-exchange retry loop.
        self.last_applied_epoch.fetch_max(epoch, Ordering::Relaxed);
    }

    /// Renders every metric in the Prometheus text exposition format, with
    /// each sample labelled `vshard="<vshard_id>"`.
    ///
    /// Each atomic is loaded independently with `Relaxed` ordering, so the
    /// output is a best-effort snapshot. For example, the duration `_sum` may
    /// already include an observation whose bucket increment is not yet
    /// visible.
    pub fn render_prometheus(&self, vshard_id: u32) -> String {
        use std::fmt::Write as _;

        let label = format!("vshard=\"{vshard_id}\"");
        // The full exposition is a few KiB; reserve once up front.
        let mut out = String::with_capacity(4 * 1024);

        // Single-sample series: (metric name, Prometheus type, HELP text, value).
        let scalars: [(&str, &str, &str, u64); 6] = [
            (
                "nodedb_calvin_scheduler_dispatch_total",
                "counter",
                "Total txns dispatched to executor",
                self.dispatch_count.load(Ordering::Relaxed),
            ),
            (
                "nodedb_calvin_scheduler_blocked_total",
                "counter",
                "Total txns blocked on lock acquisition",
                self.blocked_count.load(Ordering::Relaxed),
            ),
            (
                "nodedb_calvin_scheduler_lock_wait_ms_total",
                "counter",
                "Sum of lock-wait ms across all txns",
                self.lock_wait_ms_total.load(Ordering::Relaxed),
            ),
            (
                "nodedb_calvin_scheduler_completed_total",
                "counter",
                "Total executor responses received",
                self.completed_count.load(Ordering::Relaxed),
            ),
            (
                "nodedb_calvin_scheduler_executor_errors_total",
                "counter",
                "Executor infrastructure errors",
                self.executor_error_count.load(Ordering::Relaxed),
            ),
            (
                "nodedb_calvin_scheduler_last_applied_epoch",
                "gauge",
                "Last applied epoch number",
                self.last_applied_epoch.load(Ordering::Relaxed),
            ),
        ];
        for &(name, kind, help, value) in &scalars {
            let _ = writeln!(out, "# HELP {name} {help}");
            let _ = writeln!(out, "# TYPE {name} {kind}");
            let _ = writeln!(out, "{name}{{{label}}} {value}");
        }

        // Histogram: slots are stored per bucket, but Prometheus buckets are
        // cumulative, so counts are accumulated while emitting.
        let _ = writeln!(
            out,
            "# HELP nodedb_calvin_executor_txn_duration_ms \
             End-to-end executor transaction duration in milliseconds."
        );
        let _ = writeln!(out, "# TYPE nodedb_calvin_executor_txn_duration_ms histogram");
        let mut cumulative: u64 = 0;
        for (boundary, slot) in EXECUTOR_TXN_DURATION_BUCKETS
            .iter()
            .zip(&self.executor_txn_duration_buckets)
        {
            cumulative += slot.load(Ordering::Relaxed);
            let _ = writeln!(
                out,
                "nodedb_calvin_executor_txn_duration_ms_bucket{{{label},le=\"{boundary}\"}} \
                 {cumulative}"
            );
        }
        // The slot after the last finite bound holds observations above it.
        cumulative += self.executor_txn_duration_buckets[EXECUTOR_TXN_DURATION_BUCKETS.len()]
            .load(Ordering::Relaxed);
        let _ = writeln!(
            out,
            "nodedb_calvin_executor_txn_duration_ms_bucket{{{label},le=\"+Inf\"}} {cumulative}"
        );
        let _ = writeln!(
            out,
            "nodedb_calvin_executor_txn_duration_ms_sum{{{label}}} {}",
            self.executor_txn_duration_sum_ms.load(Ordering::Relaxed)
        );
        // `_count` is by definition the `+Inf` bucket value.
        let _ = writeln!(
            out,
            "nodedb_calvin_executor_txn_duration_ms_count{{{label}}} {cumulative}"
        );

        let _ = writeln!(
            out,
            "# HELP nodedb_calvin_infra_abort_total \
             Total infrastructure-level aborts during Calvin execution."
        );
        let _ = writeln!(out, "# TYPE nodedb_calvin_infra_abort_total counter");
        for (reason, counter) in infra_abort_reason::LABELS
            .iter()
            .zip(&self.infra_abort_counts)
        {
            let _ = writeln!(
                out,
                "nodedb_calvin_infra_abort_total{{{label},reason=\"{reason}\"}} {}",
                counter.load(Ordering::Relaxed)
            );
        }
        out
    }
}
impl Default for SchedulerMetrics {
    /// Builds a metrics set whose every counter, gauge and histogram slot
    /// starts at zero.
    fn default() -> Self {
        // One constructor for every atomic keeps the zero-initialisation uniform.
        let zeroed = || AtomicU64::new(0);
        Self {
            dispatch_count: zeroed(),
            blocked_count: zeroed(),
            lock_wait_ms_total: zeroed(),
            completed_count: zeroed(),
            executor_error_count: zeroed(),
            last_applied_epoch: zeroed(),
            executor_txn_duration_buckets: std::array::from_fn(|_| zeroed()),
            executor_txn_duration_sum_ms: zeroed(),
            infra_abort_counts: std::array::from_fn(|_| zeroed()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn dispatch_count_increments() {
        let m = SchedulerMetrics::new();
        assert_eq!(m.dispatch_count.load(Ordering::Relaxed), 0);
        m.record_dispatch();
        m.record_dispatch();
        assert_eq!(m.dispatch_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn lock_wait_ms_records_correctly() {
        let m = SchedulerMetrics::new();
        m.record_lock_wait_ms(10);
        m.record_lock_wait_ms(25);
        assert_eq!(m.lock_wait_ms_total.load(Ordering::Relaxed), 35);
    }

    #[test]
    fn last_applied_epoch_never_moves_backwards() {
        let m = SchedulerMetrics::new();
        m.update_last_applied_epoch(5);
        m.update_last_applied_epoch(3);
        assert_eq!(m.last_applied_epoch.load(Ordering::Relaxed), 5);
        m.update_last_applied_epoch(9);
        assert_eq!(m.last_applied_epoch.load(Ordering::Relaxed), 9);
    }

    #[test]
    fn txn_duration_bounds_are_inclusive_with_overflow_slot() {
        let m = SchedulerMetrics::new();
        for ms in [0, 1, 2, 5, 30_000, 30_001] {
            m.record_executor_txn_duration_ms(ms);
        }
        let counts: Vec<u64> = m
            .executor_txn_duration_buckets
            .iter()
            .map(|slot| slot.load(Ordering::Relaxed))
            .collect();
        // 0 and 1 -> le=1; 2 and 5 -> le=5; 30_000 -> le=30000; 30_001 -> overflow.
        assert_eq!(counts, vec![2, 2, 0, 0, 0, 0, 0, 0, 1, 1]);
        assert_eq!(m.executor_txn_duration_sum_ms.load(Ordering::Relaxed), 60_009);
    }

    #[test]
    fn unknown_infra_abort_reason_is_ignored() {
        let m = SchedulerMetrics::new();
        m.record_infra_abort(usize::MAX);
        assert!(m
            .infra_abort_counts
            .iter()
            .all(|c| c.load(Ordering::Relaxed) == 0));
    }

    #[test]
    fn rendered_histogram_is_cumulative() {
        let m = SchedulerMetrics::new();
        m.record_executor_txn_duration_ms(3);
        m.record_executor_txn_duration_ms(40_000);
        let out = m.render_prometheus(7);
        for line in [
            "nodedb_calvin_executor_txn_duration_ms_bucket{vshard=\"7\",le=\"1\"} 0\n",
            "nodedb_calvin_executor_txn_duration_ms_bucket{vshard=\"7\",le=\"5\"} 1\n",
            "nodedb_calvin_executor_txn_duration_ms_bucket{vshard=\"7\",le=\"30000\"} 1\n",
            "nodedb_calvin_executor_txn_duration_ms_bucket{vshard=\"7\",le=\"+Inf\"} 2\n",
            "nodedb_calvin_executor_txn_duration_ms_sum{vshard=\"7\"} 40003\n",
            "nodedb_calvin_executor_txn_duration_ms_count{vshard=\"7\"} 2\n",
        ] {
            assert!(out.contains(line), "missing {line:?} in:\n{out}");
        }
    }

    #[test]
    fn rendered_infra_aborts_cover_every_reason() {
        let m = SchedulerMetrics::new();
        let out = m.render_prometheus(0);
        let samples = out
            .lines()
            .filter(|l| l.starts_with("nodedb_calvin_infra_abort_total{"))
            .count();
        assert_eq!(samples, 6);
        assert!(out.contains(
            "nodedb_calvin_infra_abort_total{vshard=\"0\",reason=\"passive_participant_timeout\"} 0\n"
        ));
    }
}