use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounterVec, IntGaugeVec, Opts};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::runtime::Handle;
use tokio_util::sync::CancellationToken;
use super::prometheus_names::{frontend_perf, name_prefix, tokio_perf as names};
use crate::MetricsRegistry;
fn tokio_metric_name(suffix: &str) -> String {
format!("{}_{}", name_prefix::TOKIO, suffix)
}
pub static TOKIO_GLOBAL_QUEUE_DEPTH: Lazy<Gauge> = Lazy::new(|| {
Gauge::new(
tokio_metric_name(names::GLOBAL_QUEUE_DEPTH),
"Number of tasks in the runtime global queue",
)
.expect("tokio global_queue_depth gauge")
});
pub static TOKIO_BUDGET_FORCED_YIELD_TOTAL: Lazy<Counter> = Lazy::new(|| {
Counter::new(
tokio_metric_name(names::BUDGET_FORCED_YIELD_TOTAL),
"Number of times tasks were forced to yield after exhausting budget",
)
.expect("tokio budget_forced_yield_total counter")
});
pub static TOKIO_BLOCKING_THREADS: Lazy<Gauge> = Lazy::new(|| {
Gauge::new(
tokio_metric_name(names::BLOCKING_THREADS),
"Number of blocking threads",
)
.expect("tokio blocking_threads gauge")
});
pub static TOKIO_BLOCKING_IDLE_THREADS: Lazy<Gauge> = Lazy::new(|| {
Gauge::new(
tokio_metric_name(names::BLOCKING_IDLE_THREADS),
"Number of idle blocking threads",
)
.expect("tokio blocking_idle_threads gauge")
});
pub static TOKIO_BLOCKING_QUEUE_DEPTH: Lazy<Gauge> = Lazy::new(|| {
Gauge::new(
tokio_metric_name(names::BLOCKING_QUEUE_DEPTH),
"Number of tasks in the blocking thread pool queue",
)
.expect("tokio blocking_queue_depth gauge")
});
pub static TOKIO_ALIVE_TASKS: Lazy<Gauge> = Lazy::new(|| {
Gauge::new(
tokio_metric_name(names::ALIVE_TASKS),
"Number of alive tasks in the runtime",
)
.expect("tokio alive_tasks gauge")
});
pub static TOKIO_WORKER_MEAN_POLL_TIME_NS: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
tokio_metric_name(names::WORKER_MEAN_POLL_TIME_NS),
"Worker mean task poll time (nanoseconds)",
),
&["worker"],
)
.expect("tokio worker_mean_poll_time_ns gauge vec")
});
pub static TOKIO_WORKER_BUSY_RATIO_VEC: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
tokio_metric_name(names::WORKER_BUSY_RATIO),
"Worker busy ratio (0-1) as integer mill ratio; >950 = saturated",
),
&["worker"],
)
.expect("tokio worker_busy_ratio vec")
});
pub static TOKIO_WORKER_PARK_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
tokio_metric_name(names::WORKER_PARK_COUNT_TOTAL),
"Total number of times worker has parked",
),
&["worker"],
)
.expect("tokio worker_park_count_total")
});
pub static TOKIO_WORKER_LOCAL_QUEUE_DEPTH: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Opts::new(
tokio_metric_name(names::WORKER_LOCAL_QUEUE_DEPTH),
"Number of tasks in worker local queue",
),
&["worker"],
)
.expect("tokio worker_local_queue_depth")
});
pub static TOKIO_WORKER_STEAL_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
tokio_metric_name(names::WORKER_STEAL_COUNT_TOTAL),
"Total number of tasks stolen by worker",
),
&["worker"],
)
.expect("tokio worker_steal_count_total")
});
pub static TOKIO_WORKER_OVERFLOW_COUNT_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
IntCounterVec::new(
Opts::new(
tokio_metric_name(names::WORKER_OVERFLOW_COUNT_TOTAL),
"Total number of times worker local queue overflowed",
),
&["worker"],
)
.expect("tokio worker_overflow_count_total")
});
pub static EVENT_LOOP_DELAY_SECONDS: Lazy<Histogram> = Lazy::new(|| {
Histogram::with_opts(
HistogramOpts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_perf::EVENT_LOOP_DELAY_SECONDS
),
"Event loop delay canary: drift from 10ms sleep (seconds)",
)
.buckets(vec![
0.0, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
]),
)
.expect("event_loop_delay_seconds histogram")
});
pub static EVENT_LOOP_STALL_TOTAL: Lazy<Counter> = Lazy::new(|| {
Counter::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_perf::EVENT_LOOP_STALL_TOTAL
),
"Number of event loop stalls (delay > 5ms)",
)
.expect("event_loop_stall_total counter")
});
static REGISTERED: OnceCell<()> = OnceCell::new();
static PROMETHEUS_REGISTERED: OnceCell<()> = OnceCell::new();
pub fn ensure_tokio_perf_metrics_registered(registry: &MetricsRegistry) {
let _ = REGISTERED.get_or_init(|| {
registry
.add_metric(Box::new(TOKIO_GLOBAL_QUEUE_DEPTH.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BUDGET_FORCED_YIELD_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BLOCKING_THREADS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BLOCKING_IDLE_THREADS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_BLOCKING_QUEUE_DEPTH.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_ALIVE_TASKS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_MEAN_POLL_TIME_NS.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_BUSY_RATIO_VEC.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_PARK_COUNT_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_LOCAL_QUEUE_DEPTH.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_STEAL_COUNT_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(TOKIO_WORKER_OVERFLOW_COUNT_TOTAL.clone()))
.ok();
registry
.add_metric(Box::new(EVENT_LOOP_DELAY_SECONDS.clone()))
.ok();
registry
.add_metric(Box::new(EVENT_LOOP_STALL_TOTAL.clone()))
.ok();
});
}
pub fn ensure_tokio_perf_metrics_registered_prometheus(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
if PROMETHEUS_REGISTERED.get().is_some() {
return Ok(());
}
registry.register(Box::new(TOKIO_GLOBAL_QUEUE_DEPTH.clone()))?;
registry.register(Box::new(TOKIO_BUDGET_FORCED_YIELD_TOTAL.clone()))?;
registry.register(Box::new(TOKIO_BLOCKING_THREADS.clone()))?;
registry.register(Box::new(TOKIO_BLOCKING_IDLE_THREADS.clone()))?;
registry.register(Box::new(TOKIO_BLOCKING_QUEUE_DEPTH.clone()))?;
registry.register(Box::new(TOKIO_ALIVE_TASKS.clone()))?;
registry.register(Box::new(TOKIO_WORKER_MEAN_POLL_TIME_NS.clone()))?;
registry.register(Box::new(TOKIO_WORKER_BUSY_RATIO_VEC.clone()))?;
registry.register(Box::new(TOKIO_WORKER_PARK_COUNT_TOTAL.clone()))?;
registry.register(Box::new(TOKIO_WORKER_LOCAL_QUEUE_DEPTH.clone()))?;
registry.register(Box::new(TOKIO_WORKER_STEAL_COUNT_TOTAL.clone()))?;
registry.register(Box::new(TOKIO_WORKER_OVERFLOW_COUNT_TOTAL.clone()))?;
registry.register(Box::new(EVENT_LOOP_DELAY_SECONDS.clone()))?;
registry.register(Box::new(EVENT_LOOP_STALL_TOTAL.clone()))?;
let _ = PROMETHEUS_REGISTERED.set(());
Ok(())
}
pub async fn tokio_metrics_and_canary_loop(cancel: CancellationToken) {
let canary_interval = Duration::from_millis(10);
let stall_threshold = Duration::from_millis(5);
let collect_interval = Duration::from_secs(1);
let mut next_collect = Instant::now() + collect_interval;
let mut prev_counters = PrevWorkerCounters::new();
loop {
let start = Instant::now();
tokio::select! {
_ = tokio::time::sleep(canary_interval) => {}
_ = cancel.cancelled() => {
tracing::debug!("tokio metrics and canary loop shutting down");
return;
}
}
let delay = start.elapsed().saturating_sub(canary_interval);
EVENT_LOOP_DELAY_SECONDS.observe(delay.as_secs_f64());
if delay > stall_threshold {
EVENT_LOOP_STALL_TOTAL.inc();
}
if Instant::now() >= next_collect {
next_collect = Instant::now() + collect_interval;
sample_tokio_metrics(&mut prev_counters);
}
}
}
static PREV_BUDGET_FORCED_YIELD: AtomicU64 = AtomicU64::new(0);
struct PrevWorkerCounters {
park: Vec<u64>,
steal: Vec<u64>,
overflow: Vec<u64>,
}
impl PrevWorkerCounters {
fn new() -> Self {
Self {
park: Vec::new(),
steal: Vec::new(),
overflow: Vec::new(),
}
}
fn ensure_capacity(&mut self, num_workers: usize) {
if self.park.len() < num_workers {
self.park.resize(num_workers, 0);
self.steal.resize(num_workers, 0);
self.overflow.resize(num_workers, 0);
}
}
}
fn sample_tokio_metrics(prev: &mut PrevWorkerCounters) {
let metrics = Handle::current().metrics();
TOKIO_GLOBAL_QUEUE_DEPTH.set(metrics.global_queue_depth() as f64);
let budget = metrics.budget_forced_yield_count();
let prev_budget = PREV_BUDGET_FORCED_YIELD.swap(budget, Ordering::Relaxed);
TOKIO_BUDGET_FORCED_YIELD_TOTAL.inc_by((budget.saturating_sub(prev_budget)) as f64);
TOKIO_BLOCKING_THREADS.set(metrics.num_blocking_threads() as f64);
TOKIO_BLOCKING_IDLE_THREADS.set(metrics.num_idle_blocking_threads() as f64);
TOKIO_BLOCKING_QUEUE_DEPTH.set(metrics.blocking_queue_depth() as f64);
TOKIO_ALIVE_TASKS.set(metrics.num_alive_tasks() as f64);
let num_workers = metrics.num_workers();
prev.ensure_capacity(num_workers);
for w in 0..num_workers {
let worker_label = w.to_string();
let mean_poll = metrics.worker_mean_poll_time(w);
TOKIO_WORKER_MEAN_POLL_TIME_NS
.with_label_values(&[&worker_label])
.set(mean_poll.as_nanos() as i64);
TOKIO_WORKER_LOCAL_QUEUE_DEPTH
.with_label_values(&[&worker_label])
.set(metrics.worker_local_queue_depth(w) as i64);
let park = metrics.worker_park_count(w);
TOKIO_WORKER_PARK_COUNT_TOTAL
.with_label_values(&[&worker_label])
.inc_by(park.saturating_sub(prev.park[w]));
prev.park[w] = park;
let steal = metrics.worker_steal_count(w);
TOKIO_WORKER_STEAL_COUNT_TOTAL
.with_label_values(&[&worker_label])
.inc_by(steal.saturating_sub(prev.steal[w]));
prev.steal[w] = steal;
let overflow = metrics.worker_overflow_count(w);
TOKIO_WORKER_OVERFLOW_COUNT_TOTAL
.with_label_values(&[&worker_label])
.inc_by(overflow.saturating_sub(prev.overflow[w]));
prev.overflow[w] = overflow;
let busy_proxy = (mean_poll.as_secs_f64() / 0.001).min(1.0); TOKIO_WORKER_BUSY_RATIO_VEC
.with_label_values(&[&worker_label])
.set((busy_proxy * 1000.0) as i64);
}
}