//! Exports Tokio runtime metrics, sampled with `tokio-metrics`, to a
//! `prometheus-client` registry.
//!
//! The `Runtime` exporter is available when the `rt` feature is enabled. The
//! crate must also be built with `RUSTFLAGS='--cfg tokio_unstable'`, because
//! the underlying Tokio runtime metrics are unstable. Enabling `rt` without
//! that flag is a compile error.
#![deny(rust_2018_idioms, missing_docs, warnings)]
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(all(feature = "rt", tokio_unstable))]
pub use self::rt::Runtime;

// Fail loudly instead of silently exporting nothing when the unstable Tokio
// APIs this feature depends on are unavailable.
#[cfg(all(feature = "rt", not(tokio_unstable)))]
compile_error!("RUSTFLAGS='--cfg tokio_unstable' must be set to use `tokio-metrics/rt`");
#[cfg(all(feature = "rt", tokio_unstable))]
mod rt {
    use prometheus_client::{
        metrics::{counter::Counter, gauge::Gauge},
        registry::{Registry, Unit},
    };
    use tokio::time;
    use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};

    /// Tokio runtime metrics exported through a `prometheus-client` [`Registry`].
    ///
    /// Create one with [`Runtime::register`], then drive it with
    /// [`Runtime::updated`], which samples the runtime on every tick of an
    /// interval.
    ///
    /// This type only exists when the `rt` feature is enabled and the crate is
    /// built with `RUSTFLAGS="--cfg tokio_unstable"`.
    #[derive(Debug)]
    pub struct Runtime {
        /// Handle to the runtime being observed.
        runtime: tokio::runtime::Handle,
        /// Metric handles whose clones were registered by `register`.
        metrics: Metrics,
    }

    /// The metric handles updated by `Metrics::probe`.
    ///
    /// `Runtime::register` registers a clone of each field, so updates made
    /// through these handles are what the registry exports.
    #[derive(Debug, Default)]
    struct Metrics {
        // Instantaneous values.
        workers: Gauge,
        // Accumulated from per-interval samples.
        park: Counter,
        noop: Counter,
        steal: Counter,
        steal_operations: Counter,
        remote_schedule: Counter,
        local_schedule: Counter,
        overflow: Counter,
        polls: Counter,
        busy: Counter<f64>,
        // Instantaneous values.
        injection_queue_depth: Gauge,
        local_queue_depth: Gauge,
        // Tracked against the sampled total; see `Metrics::probe`.
        budget_forced_yield: Counter,
        io_driver_ready: Counter,
    }

    impl Runtime {
        /// Registers Tokio runtime metrics with `reg` and returns a [`Runtime`]
        /// that updates them from `runtime`.
        ///
        /// Metric names are registered exactly as written here (e.g. `workers`,
        /// `park`). No prefix is added, so pass a prefixed sub-registry if the
        /// names need namespacing.
        ///
        /// The metrics keep their default values until [`Runtime::updated`] is
        /// polled.
        pub fn register(reg: &mut Registry, runtime: tokio::runtime::Handle) -> Self {
            let metrics = Metrics::default();

            reg.register(
                "workers",
                "The number of worker threads used by the runtime",
                metrics.workers.clone(),
            );

            reg.register(
                "park",
                "Total number of times worker threads parked",
                metrics.park.clone(),
            );
            reg.register(
                "noop",
                "Number of times workers unparked but found no new work",
                metrics.noop.clone(),
            );
            reg.register(
                "steal",
                "Number of tasks stolen by workers from others",
                metrics.steal.clone(),
            );
            reg.register(
                "steal_operations",
                // The original help text was truncated ("...from other").
                "Number of times workers stole tasks from other workers",
                metrics.steal_operations.clone(),
            );

            reg.register(
                "remote_schedule",
                "Total number of remote schedule operations",
                metrics.remote_schedule.clone(),
            );
            reg.register(
                "local_schedule",
                "Total number of local schedule operations",
                metrics.local_schedule.clone(),
            );

            reg.register(
                "overflow",
                "Total number of overflow operations",
                metrics.overflow.clone(),
            );
            reg.register(
                "polls",
                "The number of tasks that have been polled across all worker threads",
                metrics.polls.clone(),
            );
            reg.register_with_unit(
                "busy",
                "Total duration of time when worker threads were busy processing tasks",
                Unit::Seconds,
                metrics.busy.clone(),
            );

            reg.register(
                "injection_queue_depth",
                "The number of tasks currently scheduled in the runtime's injection queue",
                metrics.injection_queue_depth.clone(),
            );
            reg.register(
                "local_queue_depth",
                "The total number of tasks currently scheduled in workers' local queues",
                metrics.local_queue_depth.clone(),
            );

            reg.register(
                "budget_forced_yield",
                "Number of times a worker thread was forced to yield due to budget exhaustion",
                metrics.budget_forced_yield.clone(),
            );
            reg.register(
                "io_driver_ready",
                "Number of times the IO driver was woken up",
                metrics.io_driver_ready.clone(),
            );

            Self { runtime, metrics }
        }

        /// Samples the runtime once per tick of `interval` and updates the
        /// registered metrics.
        ///
        /// This future never completes. Run it concurrently with the rest of the
        /// application, for example in a spawned task that owns this `Runtime`.
        ///
        /// # Panics
        ///
        /// Panics if the `tokio-metrics` sampling iterator ever yields `None`.
        pub async fn updated(&self, interval: &mut time::Interval) -> ! {
            let mut probes = RuntimeMonitor::new(&self.runtime).intervals();
            loop {
                interval.tick().await;
                self.metrics.probe(&mut probes);
            }
        }
    }

    impl Metrics {
        /// Takes one sample from `probes` and applies it to the metrics.
        ///
        /// # Panics
        ///
        /// Panics if `probes` yields `None`.
        #[tracing::instrument(skip_all, ret, level = tracing::Level::TRACE)]
        fn probe(&self, probes: &mut RuntimeIntervals) {
            let probe = probes.next().expect("runtime metrics stream must not end");

            // Each interval sample carries these values for the period since the
            // previous sample, so they are added onto cumulative counters.
            self.park.inc_by(probe.total_park_count);
            self.noop.inc_by(probe.total_noop_count);
            self.steal.inc_by(probe.total_steal_count);
            self.steal_operations.inc_by(probe.total_steal_operations);
            self.remote_schedule.inc_by(probe.num_remote_schedules);
            self.local_schedule.inc_by(probe.total_local_schedule_count);
            self.overflow.inc_by(probe.total_overflow_count);
            self.polls.inc_by(probe.total_polls_count);
            self.busy.inc_by(probe.total_busy_duration.as_secs_f64());
            self.io_driver_ready.inc_by(probe.io_driver_ready_count);

            // Instantaneous values are exported as gauges. The `usize -> i64`
            // casts cannot realistically wrap: worker counts and queue depths are
            // far below `i64::MAX`.
            self.workers.set(probe.workers_count as i64);
            // Previously this read `total_local_queue_depth`, which duplicated the
            // local-queue gauge and never reported the injection queue.
            self.injection_queue_depth
                .set(probe.injection_queue_depth as i64);
            self.local_queue_depth
                .set(probe.total_local_queue_depth as i64);

            // This treats `budget_forced_yield_count` as a running total and adds
            // only the increase since the last sample.
            // NOTE(review): if the pinned `tokio-metrics` version reports this
            // field per interval (like the counters above), this undercounts and
            // hits the `else` branch. Confirm against that version's
            // `RuntimeMetrics` docs.
            if let Some(delta) = probe
                .budget_forced_yield_count
                .checked_sub(self.budget_forced_yield.get())
            {
                self.budget_forced_yield.inc_by(delta);
            } else {
                tracing::trace!("budget_forced_yield_count overflow");
            }
        }
    }
}