tokio_metrics/runtime/
metrics_rs_integration.rs

1use std::{fmt, time::Duration};
2
3use tokio::runtime::Handle;
4
5use super::{RuntimeIntervals, RuntimeMetrics, RuntimeMonitor};
6use crate::metrics_rs::{metric_refs, DEFAULT_METRIC_SAMPLING_INTERVAL};
7
/// A builder for the [`RuntimeMetricsReporter`] that wraps the RuntimeMonitor, periodically
/// reporting RuntimeMetrics to any configured [metrics-rs] recorder.
///
/// ### Published Metrics
///
/// The published metrics are the fields of [RuntimeMetrics], but with the
/// `tokio_` prefix added, for example, `tokio_workers_count`. If desired, you
/// can use the [`with_metrics_transformer`] function to customize the metric names.
///
/// ### Usage
///
/// To upload metrics via [metrics-rs], you need to set up a reporter, which
/// is actually what exports the metrics outside of the program. You must set
/// up the reporter before you call [`describe_and_run`].
///
/// You can find exporters within the [metrics-rs] docs. One such reporter
/// is the [metrics_exporter_prometheus] reporter, which makes metrics visible
/// through Prometheus.
///
/// You can use it for example to export Prometheus metrics by listening on a local Unix socket
/// called `prometheus.sock`, which you can access for debugging by
/// `curl --unix-socket prometheus.sock localhost`, as follows:
///
/// ```
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
///     metrics_exporter_prometheus::PrometheusBuilder::new()
///         .with_http_uds_listener("prometheus.sock")
///         .install()
///         .unwrap();
///     tokio::task::spawn(
///         tokio_metrics::RuntimeMetricsReporterBuilder::default()
///             // the default metric sampling interval is 30 seconds, which is
///             // too long for quick tests, so have it be 1 second.
///             .with_interval(std::time::Duration::from_secs(1))
///             .describe_and_run(),
///     );
///     // Run some code
///     tokio::task::spawn(async move {
///         for _ in 0..1000 {
///             tokio::time::sleep(Duration::from_millis(10)).await;
///         }
///     })
///     .await
///     .unwrap();
/// }
/// ```
///
/// [`describe_and_run`]: RuntimeMetricsReporterBuilder::describe_and_run
/// [`with_metrics_transformer`]: RuntimeMetricsReporterBuilder::with_metrics_transformer
/// [metrics-rs]: metrics
/// [metrics_exporter_prometheus]: https://docs.rs/metrics_exporter_prometheus
pub struct RuntimeMetricsReporterBuilder {
    /// Sampling interval handed to the built reporter; set via `with_interval`,
    /// defaults to `DEFAULT_METRIC_SAMPLING_INTERVAL`.
    interval: Duration,
    /// Maps each static metric name to the [`metrics::Key`] it is described and
    /// captured under; set via `with_metrics_transformer`.
    metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key + Send>,
}
66
67impl fmt::Debug for RuntimeMetricsReporterBuilder {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_struct("RuntimeMetricsReporterBuilder")
70            .field("interval", &self.interval)
71            // skip metrics_transformer field
72            .finish()
73    }
74}
75
76impl Default for RuntimeMetricsReporterBuilder {
77    fn default() -> Self {
78        RuntimeMetricsReporterBuilder {
79            interval: DEFAULT_METRIC_SAMPLING_INTERVAL,
80            metrics_transformer: Box::new(metrics::Key::from_static_name),
81        }
82    }
83}
84
85impl RuntimeMetricsReporterBuilder {
86    /// Set the metric sampling interval, default: 30 seconds.
87    ///
88    /// Note that this is the interval on which metrics are *sampled* from
89    /// the Tokio runtime and then set on the [metrics-rs] reporter. Uploading the
90    /// metrics upstream is controlled by the reporter set up in the
91    /// application, and is normally controlled by a different period.
92    ///
93    /// For example, if metrics are exported via Prometheus, that
94    /// normally operates at a pull-based fashion, and the actual collection
95    /// period is controlled by the Prometheus server, which periodically polls the
96    /// application's Prometheus exporter to get the latest value of the metrics.
97    ///
98    /// [metrics-rs]: metrics
99    pub fn with_interval(mut self, interval: Duration) -> Self {
100        self.interval = interval;
101        self
102    }
103
104    /// Set a custom "metrics transformer", which is used during `build` to transform the metric
105    /// names into metric keys, for example to add dimensions. The string metric names used by this reporter
106    /// all start with `tokio_`. The default transformer is just [`metrics::Key::from_static_name`]
107    ///
108    /// For example, to attach a dimension named "application" with value "my_app", and to replace
109    /// `tokio_` with `my_app_`
110    /// ```
111    /// # use metrics::Key;
112    ///
113    /// #[tokio::main]
114    /// async fn main() {
115    ///     metrics_exporter_prometheus::PrometheusBuilder::new()
116    ///         .with_http_uds_listener("prometheus.sock")
117    ///         .install()
118    ///         .unwrap();
119    ///     tokio::task::spawn(
120    ///         tokio_metrics::RuntimeMetricsReporterBuilder::default().with_metrics_transformer(|name| {
121    ///             let name = name.replacen("tokio_", "my_app_", 1);
122    ///             Key::from_parts(name, &[("application", "my_app")])
123    ///         })
124    ///         .describe_and_run()
125    ///     );
126    /// }
127    /// ```
128    pub fn with_metrics_transformer(
129        mut self,
130        transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static,
131    ) -> Self {
132        self.metrics_transformer = Box::new(transformer);
133        self
134    }
135
136    /// Build the [`RuntimeMetricsReporter`] for the current Tokio runtime. This function will capture
137    /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter,
138    /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`] with it.
139    ///
140    /// For example:
141    /// ```
142    /// # use std::sync::Arc;
143    ///
144    /// #[tokio::main]
145    /// async fn main() {
146    ///     let builder = tokio_metrics::RuntimeMetricsReporterBuilder::default();
147    ///     let recorder = Arc::new(metrics_util::debugging::DebuggingRecorder::new());
148    ///     let metrics_reporter = metrics::with_local_recorder(&recorder, || builder.describe().build());
149    ///
150    ///     // no need to wrap `run()`, since the metrics are already captured
151    ///     tokio::task::spawn(metrics_reporter.run());
152    /// }
153    /// ```
154    ///
155    ///
156    /// [`Counter`]: metrics::Counter
157    /// [`Gauge`]: metrics::Counter
158    /// [`Histogram`]: metrics::Counter
159    /// [metrics-rs]: metrics
160    /// [`with_local_recorder`]: metrics::with_local_recorder
161    /// [`describe`]: Self::describe
162    #[must_use = "reporter does nothing unless run"]
163    pub fn build(self) -> RuntimeMetricsReporter {
164        self.build_with_monitor(RuntimeMonitor::new(&Handle::current()))
165    }
166
167    /// Build the [`RuntimeMetricsReporter`] with a specific [`RuntimeMonitor`]. This function will capture
168    /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter,
169    /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`]
170    /// with it.
171    ///
172    /// [`Counter`]: metrics::Counter
173    /// [`Gauge`]: metrics::Counter
174    /// [`Histogram`]: metrics::Counter
175    /// [metrics-rs]: metrics
176    /// [`with_local_recorder`]: metrics::with_local_recorder
177    /// [`describe`]: Self::describe
178    #[must_use = "reporter does nothing unless run"]
179    pub fn build_with_monitor(mut self, monitor: RuntimeMonitor) -> RuntimeMetricsReporter {
180        RuntimeMetricsReporter {
181            interval: self.interval,
182            intervals: monitor.intervals(),
183            emitter: RuntimeMetricRefs::capture(&mut self.metrics_transformer),
184        }
185    }
186
187    /// Call [`describe_counter`] etc. to describe the emitted metrics.
188    ///
189    /// Describing metrics makes the reporter attach descriptions and units to them,
190    /// which makes them easier to use. However, some reporters don't support
191    /// describing the same metric name more than once, so it is generally a good
192    /// idea to only call this function once per metric reporter.
193    ///
194    /// [`describe_counter`]: metrics::describe_counter
195    /// [metrics-rs]: metrics
196    pub fn describe(mut self) -> Self {
197        RuntimeMetricRefs::describe(&mut self.metrics_transformer);
198        self
199    }
200
201    /// Runs the reporter (within the returned future), [describing] the metrics beforehand.
202    ///
203    /// Describing metrics makes the reporter attach descriptions and units to them,
204    /// which makes them easier to use. However, some reporters don't support
205    /// describing the same metric name more than once. If you are emitting multiple
206    /// metrics via a single reporter, try to call [`describe`] once and [`run`] for each
207    /// runtime metrics reporter.
208    ///
209    /// ### Working with a custom reporter
210    ///
211    /// If you want to set a local metrics reporter, you shouldn't be calling this method,
212    /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
213    /// call `run` (see the docs on [`build`]).
214    ///
215    /// [describing]: Self::describe
216    /// [`describe`]: Self::describe
217    /// [`build`]: Self::build.
218    /// [`run`]: RuntimeMetricsReporter::run
219    /// [`with_local_recorder`]: metrics::with_local_recorder
220    pub async fn describe_and_run(self) {
221        self.describe().build().run().await;
222    }
223
224    /// Runs the reporter (within the returned future), not describing the metrics beforehand.
225    ///
226    /// ### Working with a custom reporter
227    ///
228    /// If you want to set a local metrics reporter, you shouldn't be calling this method,
229    /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
230    /// call [`run`] (see the docs on [`build`]).
231    ///
232    /// [`build`]: Self::build
233    /// [`run`]: RuntimeMetricsReporter::run
234    /// [`with_local_recorder`]: metrics::with_local_recorder
235    pub async fn run_without_describing(self) {
236        self.build().run().await;
237    }
238}
239
/// Collects metrics from a Tokio runtime and uploads them to [metrics_rs](metrics).
///
/// Instances are created by `RuntimeMetricsReporterBuilder::build` or
/// `RuntimeMetricsReporterBuilder::build_with_monitor`.
pub struct RuntimeMetricsReporter {
    /// Delay slept between two consecutive samples in [`run`](Self::run).
    interval: Duration,
    /// Source of the metric samples, obtained from `RuntimeMonitor::intervals`.
    intervals: RuntimeIntervals,
    /// Metric handles captured when the reporter was built; each sample is emitted through them.
    emitter: RuntimeMetricRefs,
}
246
247metric_refs! {
248    [RuntimeMetricRefs] [elapsed] [RuntimeMetrics] [&tokio::runtime::RuntimeMetrics] {
249        stable {
250            /// The number of worker threads used by the runtime
251            workers_count: Gauge<Count> [],
252            /// The number of times worker threads parked
253            max_park_count: Gauge<Count> [],
254            /// The minimum number of times any worker thread parked
255            min_park_count: Gauge<Count> [],
256            /// The number of times worker threads parked
257            total_park_count: Gauge<Count> [],
258            /// The amount of time worker threads were busy
259            total_busy_duration: Counter<Microseconds> [],
260            /// The maximum amount of time a worker thread was busy
261            max_busy_duration: Counter<Microseconds> [],
262            /// The minimum amount of time a worker thread was busy
263            min_busy_duration: Counter<Microseconds> [],
264            /// The number of tasks currently scheduled in the runtime's global queue
265            global_queue_depth: Gauge<Count> [],
266        }
267        unstable {
268            /// The average duration of a single invocation of poll on a task
269            mean_poll_duration: Gauge<Microseconds> [],
270            /// The average duration of a single invocation of poll on a task on the worker with the lowest value
271            mean_poll_duration_worker_min: Gauge<Microseconds> [],
272            /// The average duration of a single invocation of poll on a task on the worker with the highest value
273            mean_poll_duration_worker_max: Gauge<Microseconds> [],
274            /// A histogram of task polls since the previous probe grouped by poll times
275            poll_time_histogram: PollTimeHistogram<Microseconds> [],
276            /// The number of times worker threads unparked but performed no work before parking again
277            total_noop_count: Counter<Count> [],
278            /// The maximum number of times any worker thread unparked but performed no work before parking again
279            max_noop_count: Counter<Count> [],
280            /// The minimum number of times any worker thread unparked but performed no work before parking again
281            min_noop_count: Counter<Count> [],
282            /// The number of tasks worker threads stole from another worker thread
283            total_steal_count: Counter<Count> [],
284            /// The maximum number of tasks any worker thread stole from another worker thread.
285            max_steal_count: Counter<Count> [],
286            /// The minimum number of tasks any worker thread stole from another worker thread
287            min_steal_count: Counter<Count> [],
288            /// The number of times worker threads stole tasks from another worker thread
289            total_steal_operations: Counter<Count> [],
290            /// The maximum number of times any worker thread stole tasks from another worker thread
291            max_steal_operations: Counter<Count> [],
292            /// The minimum number of times any worker thread stole tasks from another worker thread
293            min_steal_operations: Counter<Count> [],
294            /// The number of tasks scheduled from **outside** of the runtime
295            num_remote_schedules: Counter<Count> [],
296            /// The number of tasks scheduled from worker threads
297            total_local_schedule_count: Counter<Count> [],
298            /// The maximum number of tasks scheduled from any one worker thread
299            max_local_schedule_count: Counter<Count> [],
300            /// The minimum number of tasks scheduled from any one worker thread
301            min_local_schedule_count: Counter<Count> [],
302            /// The number of times worker threads saturated their local queues
303            total_overflow_count: Counter<Count> [],
304            /// The maximum number of times any one worker saturated its local queue
305            max_overflow_count: Counter<Count> [],
306            /// The minimum number of times any one worker saturated its local queue
307            min_overflow_count: Counter<Count> [],
308            /// The number of tasks that have been polled across all worker threads
309            total_polls_count: Counter<Count> [],
310            /// The maximum number of tasks that have been polled in any worker thread
311            max_polls_count: Counter<Count> [],
312            /// The minimum number of tasks that have been polled in any worker thread
313            min_polls_count: Counter<Count> [],
314            /// The total number of tasks currently scheduled in workers' local queues
315            total_local_queue_depth: Gauge<Count> [],
316            /// The maximum number of tasks currently scheduled any worker's local queue
317            max_local_queue_depth: Gauge<Count> [],
318            /// The minimum number of tasks currently scheduled any worker's local queue
319            min_local_queue_depth: Gauge<Count> [],
320            /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
321            blocking_queue_depth: Gauge<Count> [],
322            /// The current number of alive tasks in the runtime.
323            live_tasks_count: Gauge<Count> [],
324            /// The number of additional threads spawned by the runtime.
325            blocking_threads_count: Gauge<Count> [],
326            /// The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls.
327            idle_blocking_threads_count: Gauge<Count> [],
328            /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets
329            budget_forced_yield_count: Counter<Count> [],
330            /// Returns the number of ready events processed by the runtime’s I/O driver
331            io_driver_ready_count: Counter<Count> [],
332        }
333    }
334}
335
336impl fmt::Debug for RuntimeMetricsReporter {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        f.debug_struct("RuntimeMetricsReporter")
339            .field("interval", &self.interval)
340            // skip intervals field
341            .finish()
342    }
343}
344
345impl RuntimeMetricsReporter {
346    /// Collect and publish metrics once to the configured [metrics_rs](metrics) reporter.
347    pub fn run_once(&mut self) {
348        let metrics = self
349            .intervals
350            .next()
351            .expect("RuntimeIntervals::next never returns None");
352        self.emitter.emit(metrics, &self.intervals.runtime);
353    }
354
355    /// Collect and publish metrics periodically to the configured [metrics_rs](metrics) reporter.
356    ///
357    /// You probably want to run this within its own task (using [`tokio::task::spawn`])
358    pub async fn run(mut self) {
359        loop {
360            self.run_once();
361            tokio::time::sleep(self.interval).await;
362        }
363    }
364}