Skip to main content

dynamo_runtime/metrics/
work_handler_pool.rs

// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Worker-pool saturation metrics for the shared TCP server (backend side).
//!
//! These metrics expose queue buildup between `work_tx.send()` and dispatcher
//! pickup, and permit starvation in the bounded worker pool.
8
9use once_cell::sync::{Lazy, OnceCell};
10use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge};
11
12use super::prometheus_names::{name_prefix, work_handler};
13use crate::MetricsRegistry;
14
15fn work_handler_metric_name(suffix: &str) -> String {
16    format!("{}_{}", name_prefix::WORK_HANDLER, suffix)
17}
18
19/// Current items sitting in the bounded mpsc work queue awaiting dispatcher
20/// pickup. Incremented on successful `work_tx.send()` and decremented immediately
21/// after `work_rx.recv()`. Permit-acquire wait is NOT counted here — see
22/// `WORK_HANDLER_PERMIT_WAIT_SECONDS`.
23pub static WORK_HANDLER_QUEUE_DEPTH: Lazy<IntGauge> = Lazy::new(|| {
24    IntGauge::new(
25        work_handler_metric_name(work_handler::QUEUE_DEPTH),
26        "Current items in the bounded work queue awaiting dispatcher pickup",
27    )
28    .expect("work_handler_queue_depth gauge")
29});
30
31/// Configured capacity of the bounded work queue. Static; set once at server init.
32pub static WORK_HANDLER_QUEUE_CAPACITY: Lazy<IntGauge> = Lazy::new(|| {
33    IntGauge::new(
34        work_handler_metric_name(work_handler::QUEUE_CAPACITY),
35        "Configured capacity of the bounded work queue",
36    )
37    .expect("work_handler_queue_capacity gauge")
38});
39
40/// Total times `work_tx.send().await` returned an error, which for tokio's
41/// bounded mpsc only happens when the receiver (dispatcher task) is gone — the
42/// channel applies backpressure on "full" rather than returning an error.
43pub static WORK_HANDLER_ENQUEUE_REJECTED_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
44    IntCounter::new(
45        work_handler_metric_name(work_handler::ENQUEUE_REJECTED_TOTAL),
46        "Times enqueuing work failed because the dispatcher channel was closed",
47    )
48    .expect("work_handler_enqueue_rejected_total counter")
49});
50
51/// Time spent waiting to acquire a worker-pool permit. Normal operation is
52/// sub-millisecond; saturation pushes p99 into seconds.
53pub static WORK_HANDLER_PERMIT_WAIT_SECONDS: Lazy<Histogram> = Lazy::new(|| {
54    Histogram::with_opts(
55        HistogramOpts::new(
56            work_handler_metric_name(work_handler::PERMIT_WAIT_SECONDS),
57            "Time spent waiting for a worker-pool permit (seconds)",
58        )
59        .buckets(vec![
60            0.0001, 0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0,
61        ]),
62    )
63    .expect("work_handler_permit_wait_seconds histogram")
64});
65
66/// Current number of active worker-pool tasks (permits in use).
67pub static WORK_HANDLER_POOL_ACTIVE_TASKS: Lazy<IntGauge> = Lazy::new(|| {
68    IntGauge::new(
69        work_handler_metric_name(work_handler::POOL_ACTIVE_TASKS),
70        "Current number of active worker-pool tasks (permits in use)",
71    )
72    .expect("work_handler_pool_active_tasks gauge")
73});
74
75/// Configured worker-pool capacity (total permits). Static; set once at server init.
76pub static WORK_HANDLER_POOL_CAPACITY: Lazy<IntGauge> = Lazy::new(|| {
77    IntGauge::new(
78        work_handler_metric_name(work_handler::POOL_CAPACITY),
79        "Configured worker-pool capacity (total permits)",
80    )
81    .expect("work_handler_pool_capacity gauge")
82});
83
84/// Guards idempotency for the `MetricsRegistry` registration path.
85static METRICS_REGISTERED: OnceCell<()> = OnceCell::new();
86
87/// Register worker-pool saturation metrics with the given registry. Idempotent.
88pub fn ensure_work_handler_pool_metrics_registered(registry: &MetricsRegistry) {
89    let _ = METRICS_REGISTERED.get_or_init(|| {
90        registry.add_metric_or_warn(
91            Box::new(WORK_HANDLER_QUEUE_DEPTH.clone()),
92            "work_handler_queue_depth",
93        );
94        registry.add_metric_or_warn(
95            Box::new(WORK_HANDLER_QUEUE_CAPACITY.clone()),
96            "work_handler_queue_capacity",
97        );
98        registry.add_metric_or_warn(
99            Box::new(WORK_HANDLER_ENQUEUE_REJECTED_TOTAL.clone()),
100            "work_handler_enqueue_rejected_total",
101        );
102        registry.add_metric_or_warn(
103            Box::new(WORK_HANDLER_PERMIT_WAIT_SECONDS.clone()),
104            "work_handler_permit_wait_seconds",
105        );
106        registry.add_metric_or_warn(
107            Box::new(WORK_HANDLER_POOL_ACTIVE_TASKS.clone()),
108            "work_handler_pool_active_tasks",
109        );
110        registry.add_metric_or_warn(
111            Box::new(WORK_HANDLER_POOL_CAPACITY.clone()),
112            "work_handler_pool_capacity",
113        );
114    });
115}