kubert_prometheus_tokio/
lib.rs

1//! A `prometheus-client` exporter for `tokio-metrics`.
2
3#![deny(rust_2018_idioms, missing_docs, warnings)]
4#![forbid(unsafe_code)]
5#![cfg_attr(docsrs, feature(doc_cfg))]
6
// Re-export the runtime metrics exporter only when it can actually be built:
// the `rt` feature must be enabled AND the crate must be compiled with
// `--cfg tokio_unstable`, because the runtime metrics it reads rely on
// unstable tokio functionality.
#[cfg(all(feature = "rt", tokio_unstable))]
pub use self::rt::Runtime;

// Enabling `rt` without `tokio_unstable` is a configuration error: fail the
// build with an actionable message instead of silently exporting nothing.
#[cfg(all(feature = "rt", not(tokio_unstable)))]
compile_error!("RUSTFLAGS='--cfg tokio_unstable' must be set to use `tokio-metrics/rt`");
12
#[cfg(all(feature = "rt", tokio_unstable))]
mod rt {
    use prometheus_client::{
        metrics::{counter::Counter, gauge::Gauge},
        registry::{Registry, Unit},
    };
    use tokio::time;
    use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};

    /// Tokio runtime metrics.
    ///
    /// NOTE that this type relies on unstable tokio runtime metrics, which are
    /// only available when the crate is compiled with the `tokio_unstable` cfg
    /// flag. Enabling the `rt` feature without that flag is a compile error.
    ///
    /// `RUSTFLAGS="--cfg tokio_unstable"` must be set at build-time to use this feature.
    #[derive(Debug)]
    pub struct Runtime {
        /// Handle to the runtime whose metrics are sampled by [`Runtime::updated`].
        runtime: tokio::runtime::Handle,
        /// The metric handles registered by [`Runtime::register`]. The registry
        /// holds clones of these same handles, so updates here are what gets
        /// exported.
        metrics: Metrics,
    }

    /// Prometheus metric handles updated from `tokio-metrics` samples.
    #[derive(Debug, Default)]
    struct Metrics {
        /// Number of worker threads (instantaneous).
        workers: Gauge,
        /// Total worker park events.
        park: Counter,
        /// Total unparks that found no new work.
        noop: Counter,
        /// Total tasks stolen between workers.
        steal: Counter,
        /// Total steal operations between workers.
        steal_operations: Counter,
        /// Total remote schedule operations.
        remote_schedule: Counter,
        /// Total local schedule operations.
        local_schedule: Counter,
        /// Total local queue overflow operations.
        overflow: Counter,
        /// Total task polls across all workers.
        polls: Counter,
        /// Total time workers spent busy, in seconds.
        busy: Counter<f64>,
        /// Tasks currently in the injection queue (instantaneous).
        injection_queue_depth: Gauge,
        /// Tasks currently in all workers' local queues (instantaneous).
        local_queue_depth: Gauge,
        /// Total forced yields due to task budget exhaustion.
        budget_forced_yield: Counter,
        /// Total IO driver wake-ups.
        io_driver_ready: Counter,
        // TODO poll_count_histogram requires configuration
    }

    impl Runtime {
        /// Registers Tokio runtime metrics with the given registry. Note that
        /// metrics are NOT prefixed.
        ///
        /// Callers that want a prefix should pass a sub-registry, for example
        /// one obtained from `Registry::sub_registry_with_prefix`. The returned
        /// value does nothing until [`Runtime::updated`] is driven.
        pub fn register(reg: &mut Registry, runtime: tokio::runtime::Handle) -> Self {
            let metrics = Metrics::default();

            reg.register(
                "workers",
                "The number of worker threads used by the runtime",
                metrics.workers.clone(),
            );

            reg.register(
                "park",
                "Total number of times worker threads parked",
                metrics.park.clone(),
            );
            reg.register(
                "noop",
                "Number of times workers unparked but found no new work",
                metrics.noop.clone(),
            );
            reg.register(
                "steal",
                "Number of tasks stolen by workers from others",
                metrics.steal.clone(),
            );
            reg.register(
                "steal_operations",
                "Number of times workers stole tasks from other workers",
                metrics.steal_operations.clone(),
            );

            reg.register(
                "remote_schedule",
                "Total number of remote schedule operations",
                metrics.remote_schedule.clone(),
            );
            reg.register(
                "local_schedule",
                "Total number of local schedule operations",
                metrics.local_schedule.clone(),
            );

            reg.register(
                "overflow",
                "Total number of overflow operations",
                metrics.overflow.clone(),
            );
            reg.register(
                "polls",
                "The number of tasks that have been polled across all worker threads",
                metrics.polls.clone(),
            );
            reg.register_with_unit(
                "busy",
                "Total duration of time when worker threads were busy processing tasks",
                Unit::Seconds,
                metrics.busy.clone(),
            );

            reg.register(
                "injection_queue_depth",
                "The number of tasks currently scheduled in the runtime's injection queue",
                metrics.injection_queue_depth.clone(),
            );
            reg.register(
                "local_queue_depth",
                "The total number of tasks currently scheduled in workers' local queues",
                metrics.local_queue_depth.clone(),
            );

            reg.register(
                "budget_forced_yield",
                "Number of times a worker thread was forced to yield due to budget exhaustion",
                metrics.budget_forced_yield.clone(),
            );
            reg.register(
                "io_driver_ready",
                "Number of times the IO driver was woken up",
                metrics.io_driver_ready.clone(),
            );

            Self { runtime, metrics }
        }

        /// Drives metrics updates for a runtime according to a fixed interval.
        ///
        /// On every tick of `interval`, the runtime is sampled once and the
        /// sample is folded into the registered metrics. This future never
        /// completes (its return type is `!`), so callers are expected to run
        /// it as a background task.
        pub async fn updated(&self, interval: &mut time::Interval) -> ! {
            let mut probes = RuntimeMonitor::new(&self.runtime).intervals();
            loop {
                interval.tick().await;
                self.metrics.probe(&mut probes);
            }
        }
    }

    impl Metrics {
        /// Takes the next sample from `probes` and applies it to the metrics.
        ///
        /// # Panics
        ///
        /// Panics if the `tokio-metrics` interval iterator ends, which this
        /// code treats as an invariant violation.
        #[tracing::instrument(skip_all, ret, level = tracing::Level::TRACE)]
        fn probe(&self, probes: &mut RuntimeIntervals) {
            let probe = probes.next().expect("runtime metrics stream must not end");

            // Tokio-metrics tracks all of these values as rates so we have
            // to turn them back into absolute counters:
            self.park.inc_by(probe.total_park_count);
            self.noop.inc_by(probe.total_noop_count);
            self.steal.inc_by(probe.total_steal_count);
            self.steal_operations.inc_by(probe.total_steal_operations);
            self.remote_schedule.inc_by(probe.num_remote_schedules);
            self.local_schedule.inc_by(probe.total_local_schedule_count);
            self.overflow.inc_by(probe.total_overflow_count);
            self.polls.inc_by(probe.total_polls_count);
            self.busy.inc_by(probe.total_busy_duration.as_secs_f64());
            self.io_driver_ready.inc_by(probe.io_driver_ready_count);

            // Instantaneous gauges. Each gauge must read its own field; the
            // injection queue and the workers' local queues are distinct.
            self.workers.set(probe.workers_count as i64);
            self.injection_queue_depth
                .set(probe.injection_queue_depth as i64);
            self.local_queue_depth
                .set(probe.total_local_queue_depth as i64);

            // Absolute counters need to be incremented by the delta: the probe
            // value is treated here as a running total, so only the increase
            // over what the counter already holds is added.
            // NOTE(review): `io_driver_ready_count` above is treated as a
            // per-interval delta; confirm against the tokio-metrics version in
            // use that `budget_forced_yield_count` really is cumulative.
            if let Some(delta) = probe
                .budget_forced_yield_count
                .checked_sub(self.budget_forced_yield.get())
            {
                self.budget_forced_yield.inc_by(delta);
            } else {
                tracing::trace!("budget_forced_yield_count overflow");
            }
        }
    }
}