tokio_metrics/runtime/metrics_rs_integration.rs
1use std::{fmt, time::Duration};
2
3use tokio::runtime::Handle;
4
5use super::{RuntimeIntervals, RuntimeMetrics, RuntimeMonitor};
6use crate::metrics_rs::{metric_refs, DEFAULT_METRIC_SAMPLING_INTERVAL};
7
8/// A builder for the [`RuntimeMetricsReporter`] that wraps the RuntimeMonitor, periodically
9/// reporting RuntimeMetrics to any configured [metrics-rs] recorder.
10///
11/// ### Published Metrics
12///
13/// The published metrics are the fields of [RuntimeMetrics], but with the
14/// `tokio_` prefix added, for example, `tokio_workers_count`. If desired, you
15/// can use the [`with_metrics_transformer`] function to customize the metric names.
16///
17/// ### Usage
18///
19/// To upload metrics via [metrics-rs], you need to set up a reporter, which
20/// is actually what exports the metrics outside of the program. You must set
21/// up the reporter before you call [`describe_and_run`].
22///
23/// You can find exporters within the [metrics-rs] docs. One such reporter
24/// is the [metrics_exporter_prometheus] reporter, which makes metrics visible
25/// through Prometheus.
26///
27/// You can use it for example to export Prometheus metrics by listening on a local Unix socket
28/// called `prometheus.sock`, which you can access for debugging by
29/// `curl --unix-socket prometheus.sock localhost`, as follows:
30///
31/// ```
32/// use std::time::Duration;
33///
34/// #[tokio::main]
35/// async fn main() {
36/// metrics_exporter_prometheus::PrometheusBuilder::new()
37/// .with_http_uds_listener("prometheus.sock")
38/// .install()
39/// .unwrap();
40/// tokio::task::spawn(
41/// tokio_metrics::RuntimeMetricsReporterBuilder::default()
42/// // the default metric sampling interval is 30 seconds, which is
43/// // too long for quick tests, so have it be 1 second.
44/// .with_interval(std::time::Duration::from_secs(1))
45/// .describe_and_run(),
46/// );
47/// // Run some code
48/// tokio::task::spawn(async move {
49/// for _ in 0..1000 {
50/// tokio::time::sleep(Duration::from_millis(10)).await;
51/// }
52/// })
53/// .await
54/// .unwrap();
55/// }
56/// ```
57///
58/// [`describe_and_run`]: RuntimeMetricsReporterBuilder::describe_and_run
59/// [`with_metrics_transformer`]: RuntimeMetricsReporterBuilder::with_metrics_transformer
60/// [metrics-rs]: metrics
61/// [metrics_exporter_prometheus]: https://docs.rs/metrics_exporter_prometheus
62pub struct RuntimeMetricsReporterBuilder {
63 interval: Duration,
64 metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key + Send>,
65}
66
67impl fmt::Debug for RuntimeMetricsReporterBuilder {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.debug_struct("RuntimeMetricsReporterBuilder")
70 .field("interval", &self.interval)
71 // skip metrics_transformer field
72 .finish()
73 }
74}
75
76impl Default for RuntimeMetricsReporterBuilder {
77 fn default() -> Self {
78 RuntimeMetricsReporterBuilder {
79 interval: DEFAULT_METRIC_SAMPLING_INTERVAL,
80 metrics_transformer: Box::new(metrics::Key::from_static_name),
81 }
82 }
83}
84
85impl RuntimeMetricsReporterBuilder {
86 /// Set the metric sampling interval, default: 30 seconds.
87 ///
88 /// Note that this is the interval on which metrics are *sampled* from
89 /// the Tokio runtime and then set on the [metrics-rs] reporter. Uploading the
90 /// metrics upstream is controlled by the reporter set up in the
91 /// application, and is normally controlled by a different period.
92 ///
93 /// For example, if metrics are exported via Prometheus, that
94 /// normally operates at a pull-based fashion, and the actual collection
95 /// period is controlled by the Prometheus server, which periodically polls the
96 /// application's Prometheus exporter to get the latest value of the metrics.
97 ///
98 /// [metrics-rs]: metrics
99 pub fn with_interval(mut self, interval: Duration) -> Self {
100 self.interval = interval;
101 self
102 }
103
104 /// Set a custom "metrics transformer", which is used during `build` to transform the metric
105 /// names into metric keys, for example to add dimensions. The string metric names used by this reporter
106 /// all start with `tokio_`. The default transformer is just [`metrics::Key::from_static_name`]
107 ///
108 /// For example, to attach a dimension named "application" with value "my_app", and to replace
109 /// `tokio_` with `my_app_`
110 /// ```
111 /// # use metrics::Key;
112 ///
113 /// #[tokio::main]
114 /// async fn main() {
115 /// metrics_exporter_prometheus::PrometheusBuilder::new()
116 /// .with_http_uds_listener("prometheus.sock")
117 /// .install()
118 /// .unwrap();
119 /// tokio::task::spawn(
120 /// tokio_metrics::RuntimeMetricsReporterBuilder::default().with_metrics_transformer(|name| {
121 /// let name = name.replacen("tokio_", "my_app_", 1);
122 /// Key::from_parts(name, &[("application", "my_app")])
123 /// })
124 /// .describe_and_run()
125 /// );
126 /// }
127 /// ```
128 pub fn with_metrics_transformer(
129 mut self,
130 transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static,
131 ) -> Self {
132 self.metrics_transformer = Box::new(transformer);
133 self
134 }
135
136 /// Build the [`RuntimeMetricsReporter`] for the current Tokio runtime. This function will capture
137 /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter,
138 /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`] with it.
139 ///
140 /// For example:
141 /// ```
142 /// # use std::sync::Arc;
143 ///
144 /// #[tokio::main]
145 /// async fn main() {
146 /// let builder = tokio_metrics::RuntimeMetricsReporterBuilder::default();
147 /// let recorder = Arc::new(metrics_util::debugging::DebuggingRecorder::new());
148 /// let metrics_reporter = metrics::with_local_recorder(&recorder, || builder.describe().build());
149 ///
150 /// // no need to wrap `run()`, since the metrics are already captured
151 /// tokio::task::spawn(metrics_reporter.run());
152 /// }
153 /// ```
154 ///
155 ///
156 /// [`Counter`]: metrics::Counter
157 /// [`Gauge`]: metrics::Counter
158 /// [`Histogram`]: metrics::Counter
159 /// [metrics-rs]: metrics
160 /// [`with_local_recorder`]: metrics::with_local_recorder
161 /// [`describe`]: Self::describe
162 #[must_use = "reporter does nothing unless run"]
163 pub fn build(self) -> RuntimeMetricsReporter {
164 self.build_with_monitor(RuntimeMonitor::new(&Handle::current()))
165 }
166
167 /// Build the [`RuntimeMetricsReporter`] with a specific [`RuntimeMonitor`]. This function will capture
168 /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter,
169 /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`]
170 /// with it.
171 ///
172 /// [`Counter`]: metrics::Counter
173 /// [`Gauge`]: metrics::Counter
174 /// [`Histogram`]: metrics::Counter
175 /// [metrics-rs]: metrics
176 /// [`with_local_recorder`]: metrics::with_local_recorder
177 /// [`describe`]: Self::describe
178 #[must_use = "reporter does nothing unless run"]
179 pub fn build_with_monitor(mut self, monitor: RuntimeMonitor) -> RuntimeMetricsReporter {
180 RuntimeMetricsReporter {
181 interval: self.interval,
182 intervals: monitor.intervals(),
183 emitter: RuntimeMetricRefs::capture(&mut self.metrics_transformer),
184 }
185 }
186
187 /// Call [`describe_counter`] etc. to describe the emitted metrics.
188 ///
189 /// Describing metrics makes the reporter attach descriptions and units to them,
190 /// which makes them easier to use. However, some reporters don't support
191 /// describing the same metric name more than once, so it is generally a good
192 /// idea to only call this function once per metric reporter.
193 ///
194 /// [`describe_counter`]: metrics::describe_counter
195 /// [metrics-rs]: metrics
196 pub fn describe(mut self) -> Self {
197 RuntimeMetricRefs::describe(&mut self.metrics_transformer);
198 self
199 }
200
201 /// Runs the reporter (within the returned future), [describing] the metrics beforehand.
202 ///
203 /// Describing metrics makes the reporter attach descriptions and units to them,
204 /// which makes them easier to use. However, some reporters don't support
205 /// describing the same metric name more than once. If you are emitting multiple
206 /// metrics via a single reporter, try to call [`describe`] once and [`run`] for each
207 /// runtime metrics reporter.
208 ///
209 /// ### Working with a custom reporter
210 ///
211 /// If you want to set a local metrics reporter, you shouldn't be calling this method,
212 /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
213 /// call `run` (see the docs on [`build`]).
214 ///
215 /// [describing]: Self::describe
216 /// [`describe`]: Self::describe
217 /// [`build`]: Self::build.
218 /// [`run`]: RuntimeMetricsReporter::run
219 /// [`with_local_recorder`]: metrics::with_local_recorder
220 pub async fn describe_and_run(self) {
221 self.describe().build().run().await;
222 }
223
224 /// Runs the reporter (within the returned future), not describing the metrics beforehand.
225 ///
226 /// ### Working with a custom reporter
227 ///
228 /// If you want to set a local metrics reporter, you shouldn't be calling this method,
229 /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
230 /// call [`run`] (see the docs on [`build`]).
231 ///
232 /// [`build`]: Self::build
233 /// [`run`]: RuntimeMetricsReporter::run
234 /// [`with_local_recorder`]: metrics::with_local_recorder
235 pub async fn run_without_describing(self) {
236 self.build().run().await;
237 }
238}
239
240/// Collects metrics from a Tokio runtime and uploads them to [metrics_rs](metrics).
241pub struct RuntimeMetricsReporter {
242 interval: Duration,
243 intervals: RuntimeIntervals,
244 emitter: RuntimeMetricRefs,
245}
246
247metric_refs! {
248 [RuntimeMetricRefs] [elapsed] [RuntimeMetrics] [&tokio::runtime::RuntimeMetrics] {
249 stable {
250 /// The number of worker threads used by the runtime
251 workers_count: Gauge<Count> [],
252 /// The number of times worker threads parked
253 max_park_count: Gauge<Count> [],
254 /// The minimum number of times any worker thread parked
255 min_park_count: Gauge<Count> [],
256 /// The number of times worker threads parked
257 total_park_count: Gauge<Count> [],
258 /// The amount of time worker threads were busy
259 total_busy_duration: Counter<Microseconds> [],
260 /// The maximum amount of time a worker thread was busy
261 max_busy_duration: Counter<Microseconds> [],
262 /// The minimum amount of time a worker thread was busy
263 min_busy_duration: Counter<Microseconds> [],
264 /// The number of tasks currently scheduled in the runtime's global queue
265 global_queue_depth: Gauge<Count> [],
266 }
267 unstable {
268 /// The average duration of a single invocation of poll on a task
269 mean_poll_duration: Gauge<Microseconds> [],
270 /// The average duration of a single invocation of poll on a task on the worker with the lowest value
271 mean_poll_duration_worker_min: Gauge<Microseconds> [],
272 /// The average duration of a single invocation of poll on a task on the worker with the highest value
273 mean_poll_duration_worker_max: Gauge<Microseconds> [],
274 /// A histogram of task polls since the previous probe grouped by poll times
275 poll_time_histogram: PollTimeHistogram<Microseconds> [],
276 /// The number of times worker threads unparked but performed no work before parking again
277 total_noop_count: Counter<Count> [],
278 /// The maximum number of times any worker thread unparked but performed no work before parking again
279 max_noop_count: Counter<Count> [],
280 /// The minimum number of times any worker thread unparked but performed no work before parking again
281 min_noop_count: Counter<Count> [],
282 /// The number of tasks worker threads stole from another worker thread
283 total_steal_count: Counter<Count> [],
284 /// The maximum number of tasks any worker thread stole from another worker thread.
285 max_steal_count: Counter<Count> [],
286 /// The minimum number of tasks any worker thread stole from another worker thread
287 min_steal_count: Counter<Count> [],
288 /// The number of times worker threads stole tasks from another worker thread
289 total_steal_operations: Counter<Count> [],
290 /// The maximum number of times any worker thread stole tasks from another worker thread
291 max_steal_operations: Counter<Count> [],
292 /// The minimum number of times any worker thread stole tasks from another worker thread
293 min_steal_operations: Counter<Count> [],
294 /// The number of tasks scheduled from **outside** of the runtime
295 num_remote_schedules: Counter<Count> [],
296 /// The number of tasks scheduled from worker threads
297 total_local_schedule_count: Counter<Count> [],
298 /// The maximum number of tasks scheduled from any one worker thread
299 max_local_schedule_count: Counter<Count> [],
300 /// The minimum number of tasks scheduled from any one worker thread
301 min_local_schedule_count: Counter<Count> [],
302 /// The number of times worker threads saturated their local queues
303 total_overflow_count: Counter<Count> [],
304 /// The maximum number of times any one worker saturated its local queue
305 max_overflow_count: Counter<Count> [],
306 /// The minimum number of times any one worker saturated its local queue
307 min_overflow_count: Counter<Count> [],
308 /// The number of tasks that have been polled across all worker threads
309 total_polls_count: Counter<Count> [],
310 /// The maximum number of tasks that have been polled in any worker thread
311 max_polls_count: Counter<Count> [],
312 /// The minimum number of tasks that have been polled in any worker thread
313 min_polls_count: Counter<Count> [],
314 /// The total number of tasks currently scheduled in workers' local queues
315 total_local_queue_depth: Gauge<Count> [],
316 /// The maximum number of tasks currently scheduled any worker's local queue
317 max_local_queue_depth: Gauge<Count> [],
318 /// The minimum number of tasks currently scheduled any worker's local queue
319 min_local_queue_depth: Gauge<Count> [],
320 /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
321 blocking_queue_depth: Gauge<Count> [],
322 /// The current number of alive tasks in the runtime.
323 live_tasks_count: Gauge<Count> [],
324 /// The number of additional threads spawned by the runtime.
325 blocking_threads_count: Gauge<Count> [],
326 /// The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls.
327 idle_blocking_threads_count: Gauge<Count> [],
328 /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets
329 budget_forced_yield_count: Counter<Count> [],
330 /// Returns the number of ready events processed by the runtime’s I/O driver
331 io_driver_ready_count: Counter<Count> [],
332 }
333 }
334}
335
336impl fmt::Debug for RuntimeMetricsReporter {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 f.debug_struct("RuntimeMetricsReporter")
339 .field("interval", &self.interval)
340 // skip intervals field
341 .finish()
342 }
343}
344
345impl RuntimeMetricsReporter {
346 /// Collect and publish metrics once to the configured [metrics_rs](metrics) reporter.
347 pub fn run_once(&mut self) {
348 let metrics = self
349 .intervals
350 .next()
351 .expect("RuntimeIntervals::next never returns None");
352 self.emitter.emit(metrics, &self.intervals.runtime);
353 }
354
355 /// Collect and publish metrics periodically to the configured [metrics_rs](metrics) reporter.
356 ///
357 /// You probably want to run this within its own task (using [`tokio::task::spawn`])
358 pub async fn run(mut self) {
359 loop {
360 self.run_once();
361 tokio::time::sleep(self.interval).await;
362 }
363 }
364}