// tokio_metrics/runtime/metrics_rs_integration.rs

1use std::{fmt, time::Duration};
2
3use tokio::runtime::Handle;
4
5use super::{RuntimeIntervals, RuntimeMetrics, RuntimeMonitor};
6
7/// A builder for the a [`RuntimeMetricsReporter`] that wraps the RuntimeMonitor, periodically
8/// reporting RuntimeMetrics to any configured [metrics-rs] recorder.
9///
10/// ### Published Metrics
11///
12/// The published metrics are the fields of [RuntimeMetrics], but with the
13/// `tokio_` prefix added, for example, `tokio_workers_count`. If desired, you
14/// can use the [`with_metrics_transformer`] function to customize the metric names.
15///
16/// ### Usage
17///
18/// To upload metrics via [metrics-rs], you need to set up a reporter, which
19/// is actually what exports the metrics outside of the program. You must set
20/// up the reporter before you call [`describe_and_run`].
21///
22/// You can find exporters within the [metrics-rs] docs. One such reporter
23/// is the [metrics_exporter_prometheus] reporter, which makes metrics visible
24/// through Prometheus.
25///
26/// You can use it for exampleto  export Prometheus metrics by listening on a local Unix socket
27/// called `prometheus.sock`, which you can access for debugging by
28/// `curl --unix-socket prometheus.sock localhost`, as follows:
29///
30/// ```
31/// use std::time::Duration;
32///
33/// #[tokio::main]
34/// async fn main() {
35///     metrics_exporter_prometheus::PrometheusBuilder::new()
36///         .with_http_uds_listener("prometheus.sock")
37///         .install()
38///         .unwrap();
39///     tokio::task::spawn(
40///         tokio_metrics::RuntimeMetricsReporterBuilder::default()
41///             // the default metric sampling interval is 30 seconds, which is
42///             // too long for quick tests, so have it be 1 second.
43///             .with_interval(std::time::Duration::from_secs(1))
44///             .describe_and_run(),
45///     );
46///     // Run some code
47///     tokio::task::spawn(async move {
48///         for _ in 0..1000 {
49///             tokio::time::sleep(Duration::from_millis(10)).await;
50///         }
51///     })
52///     .await
53///     .unwrap();
54/// }
55/// ```
56///
57/// [`describe_and_run`]: RuntimeMetricsReporterBuilder::describe_and_run
58/// [`with_metrics_transformer`]: RuntimeMetricsReporterBuilder::with_metrics_transformer
59/// [metrics-rs]: metrics
60/// [metrics_exporter_prometheus]: https://docs.rs/metrics_exporter_prometheus
61pub struct RuntimeMetricsReporterBuilder {
62    interval: Duration,
63    metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key + Send>,
64}
65
66impl fmt::Debug for RuntimeMetricsReporterBuilder {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        f.debug_struct("RuntimeMetricsReporterBuilder")
69            .field("interval", &self.interval)
70            // skip metrics_transformer field
71            .finish()
72    }
73}
74
75const DEFAULT_METRIC_SAMPLING_INTERVAL: Duration = Duration::from_secs(30);
76
77impl Default for RuntimeMetricsReporterBuilder {
78    fn default() -> Self {
79        RuntimeMetricsReporterBuilder {
80            interval: DEFAULT_METRIC_SAMPLING_INTERVAL,
81            metrics_transformer: Box::new(metrics::Key::from_static_name),
82        }
83    }
84}
85
86impl RuntimeMetricsReporterBuilder {
87    /// Set the metric sampling interval, default: 30 seconds.
88    ///
89    /// Note that this is the interval on which metrics are *sampled* from
90    /// the Tokio runtime and then set on the [metrics-rs] reporter. Uploading the
91    /// metrics upstream is controlled by the reporter set up in the
92    /// application, and is normally controlled by a different period.
93    ///
94    /// For example, if metrics are exported via Prometheus, that
95    /// normally operates at a pull-based fashion, and the actual collection
96    /// period is controlled by the Prometheus server, which periodically polls the
97    /// application's Prometheus exporter to get the latest value of the metrics.
98    ///
99    /// [metrics-rs]: metrics
100    pub fn with_interval(mut self, interval: Duration) -> Self {
101        self.interval = interval;
102        self
103    }
104
105    /// Set a custom "metrics transformer", which is used during `build` to transform the metric
106    /// names into metric keys, for example to add dimensions. The string metric names used by this reporter
107    /// all start with `tokio_`. The default transformer is just [`metrics::Key::from_static_name`]
108    ///
109    /// For example, to attach a dimension named "application" with value "my_app", and to replace
110    /// `tokio_` with `my_app_`
111    /// ```
112    /// # use metrics::Key;
113    ///
114    /// #[tokio::main]
115    /// async fn main() {
116    ///     metrics_exporter_prometheus::PrometheusBuilder::new()
117    ///         .with_http_uds_listener("prometheus.sock")
118    ///         .install()
119    ///         .unwrap();
120    ///     tokio::task::spawn(
121    ///         tokio_metrics::RuntimeMetricsReporterBuilder::default().with_metrics_transformer(|name| {
122    ///             let name = name.replacen("tokio_", "my_app_", 1);
123    ///             Key::from_parts(name, &[("application", "my_app")])
124    ///         })
125    ///         .describe_and_run()
126    ///     );
127    /// }
128    /// ```
129    pub fn with_metrics_transformer(
130        mut self,
131        transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static,
132    ) -> Self {
133        self.metrics_transformer = Box::new(transformer);
134        self
135    }
136
137    /// Build the [`RuntimeMetricsReporter`] for the current Tokio runtime. This function will capture
138    /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter,
139    /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`] with it.
140    ///
141    /// For example:
142    /// ```
143    /// # use std::sync::Arc;
144    ///
145    /// #[tokio::main]
146    /// async fn main() {
147    ///     let builder = tokio_metrics::RuntimeMetricsReporterBuilder::default();
148    ///     let recorder = Arc::new(metrics_util::debugging::DebuggingRecorder::new());
149    ///     let metrics_reporter = metrics::with_local_recorder(&recorder, || builder.describe().build());
150    ///
151    ///     // no need to wrap `run()`, since the metrics are already captured
152    ///     tokio::task::spawn(metrics_reporter.run());
153    /// }
154    /// ```
155    ///
156    ///
157    /// [`Counter`]: metrics::Counter
158    /// [`Gauge`]: metrics::Counter
159    /// [`Histogram`]: metrics::Counter
160    /// [metrics-rs]: metrics
161    /// [`with_local_recorder`]: metrics::with_local_recorder
162    /// [`describe`]: Self::describe
163    #[must_use = "reporter does nothing unless run"]
164    pub fn build(self) -> RuntimeMetricsReporter {
165        self.build_with_monitor(RuntimeMonitor::new(&Handle::current()))
166    }
167
168    /// Build the [`RuntimeMetricsReporter`] with a specific [`RuntimeMonitor`]. This function will capture
169    /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter,
170    /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`]
171    /// with it.
172    ///
173    /// [`Counter`]: metrics::Counter
174    /// [`Gauge`]: metrics::Counter
175    /// [`Histogram`]: metrics::Counter
176    /// [metrics-rs]: metrics
177    /// [`with_local_recorder`]: metrics::with_local_recorder
178    /// [`describe`]: Self::describe
179    #[must_use = "reporter does nothing unless run"]
180    pub fn build_with_monitor(mut self, monitor: RuntimeMonitor) -> RuntimeMetricsReporter {
181        RuntimeMetricsReporter {
182            interval: self.interval,
183            intervals: monitor.intervals(),
184            emitter: RuntimeMetricRefs::capture(&mut self.metrics_transformer),
185        }
186    }
187
188    /// Call [`describe_counter`] etc. to describe the emitted metrics.
189    ///
190    /// Describing metrics makes the reporter attach descriptions and units to them,
191    /// which makes them easier to use. However, some reporters don't support
192    /// describing the same metric name more than once, so it is generally a good
193    /// idea to only call this function once per metric reporter.
194    ///
195    /// [`describe_counter`]: metrics::describe_counter
196    /// [metrics-rs]: metrics
197    pub fn describe(mut self) -> Self {
198        RuntimeMetricRefs::describe(&mut self.metrics_transformer);
199        self
200    }
201
202    /// Runs the reporter (within the returned future), [describing] the metrics beforehand.
203    ///
204    /// Describing metrics makes the reporter attach descriptions and units to them,
205    /// which makes them easier to use. However, some reporters don't support
206    /// describing the same metric name more than once. If you are emitting multiple
207    /// metrics via a single reporter, try to call [`describe`] once and [`run`] for each
208    /// runtime metrics reporter.
209    ///
210    /// ### Working with a custom reporter
211    ///
212    /// If you want to set a local metrics reporter, you shouldn't be calling this method,
213    /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
214    /// call `run` (see the docs on [`build`]).
215    ///
216    /// [describing]: Self::describe
217    /// [`describe`]: Self::describe
218    /// [`build`]: Self::build.
219    /// [`run`]: RuntimeMetricsReporter::run
220    /// [`with_local_recorder`]: metrics::with_local_recorder
221    pub async fn describe_and_run(self) {
222        self.describe().build().run().await;
223    }
224
225    /// Runs the reporter (within the returned future), not describing the metrics beforehand.
226    ///
227    /// ### Working with a custom reporter
228    ///
229    /// If you want to set a local metrics reporter, you shouldn't be calling this method,
230    /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
231    /// call [`run`] (see the docs on [`build`]).
232    ///
233    /// [`build`]: Self::build
234    /// [`run`]: RuntimeMetricsReporter::run
235    /// [`with_local_recorder`]: metrics::with_local_recorder
236    pub async fn run_without_describing(self) {
237        self.build().run().await;
238    }
239}
240
241/// Collects metrics from a Tokio runtime and uploads them to [metrics_rs](metrics).
242pub struct RuntimeMetricsReporter {
243    interval: Duration,
244    intervals: RuntimeIntervals,
245    emitter: RuntimeMetricRefs,
246}
247
// Maps a metric-kind keyword used in `metric_refs!` (`Counter`, `Gauge` or
// `Histogram`) to the matching metrics-rs handle type.
macro_rules! kind_to_type {
    (Counter) => (metrics::Counter);
    (Gauge) => (metrics::Gauge);
    (Histogram) => (metrics::Histogram);
}
259
// Builds the metric key for a `RuntimeMetrics` field: the field identifier is
// prefixed with `tokio_` and the resulting `&'static str` is handed to the
// transformer function/closure named by the first argument.
macro_rules! metric_key {
    ($transformer:ident, $field:ident) => (
        $transformer(concat!("tokio_", stringify!($field)))
    );
}
265
// Describes one metric (name, unit and description) to the current recorder.
// The description is the entry's `///` doc text, which is `trim`med because the
// doc text keeps the space that follows `///`.
macro_rules! describe_metric_ref {
    ($transformer:ident, $text:expr, $field:ident: Counter<$u:ident> []) => {
        metrics::describe_counter!(
            metric_key!($transformer, $field).name().to_owned(),
            metrics::Unit::$u,
            $text.trim()
        )
    };
    ($transformer:ident, $text:expr, $field:ident: Gauge<$u:ident> []) => {
        metrics::describe_gauge!(
            metric_key!($transformer, $field).name().to_owned(),
            metrics::Unit::$u,
            $text.trim()
        )
    };
    ($transformer:ident, $text:expr, $field:ident: Histogram<$u:ident> []) => {
        metrics::describe_histogram!(
            metric_key!($transformer, $field).name().to_owned(),
            metrics::Unit::$u,
            $text.trim()
        )
    };
}
290
// Registers one metric handle with the current recorder. The key produced by the
// transformer is split into its name and labels, which are passed to the
// corresponding metrics-rs registration macro.
macro_rules! capture_metric_ref {
    ($transformer:ident, $field:ident: Counter []) => {{
        let key = metric_key!($transformer, $field);
        let (key_name, key_labels) = key.into_parts();
        metrics::counter!(key_name, key_labels)
    }};
    ($transformer:ident, $field:ident: Gauge []) => {{
        let key = metric_key!($transformer, $field);
        let (key_name, key_labels) = key.into_parts();
        metrics::gauge!(key_name, key_labels)
    }};
    ($transformer:ident, $field:ident: Histogram []) => {{
        let key = metric_key!($transformer, $field);
        let (key_name, key_labels) = key.into_parts();
        metrics::histogram!(key_name, key_labels)
    }};
}
305
// Generates the `$struct_name` struct, which holds one metrics-rs handle per
// reported field of `RuntimeMetrics`, together with:
//
// - `capture`: registers every handle with the current recorder, naming each
//   metric `tokio_<field>` passed through the transformer;
// - `emit`: writes one `RuntimeMetrics` sample into the handles via `MyMetricOp`;
// - `describe`: registers the unit and description (the entry's `///` doc) of
//   every metric;
// - `test_no_fields_missing`: checks that every field of `RuntimeMetrics` is
//   either listed here or named in the ignore list.
//
// Entries in the `unstable` section are only compiled with `--cfg tokio_unstable`.
macro_rules! metric_refs {
    (
        [$struct_name:ident] [$($ignore:ident),* $(,)?] {
            stable {
                $(
                    #[doc = $doc:tt]
                    $name:ident: $kind:tt <$unit:ident> $opts:tt
                ),*
                $(,)?
            }
            unstable {
                $(
                    #[doc = $unstable_doc:tt]
                    $unstable_name:ident: $unstable_kind:tt <$unstable_unit:ident> $unstable_opts:tt
                ),*
                $(,)?
            }
        }
    ) => {
        struct $struct_name {
            $(
                $name: kind_to_type!($kind),
            )*
            $(
                #[cfg(tokio_unstable)]
                $unstable_name: kind_to_type!($unstable_kind),
            )*
        }

        impl $struct_name {
            fn capture(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) -> Self {
                Self {
                    $(
                        $name: capture_metric_ref!(transform_fn, $name: $kind $opts),
                    )*
                    $(
                        #[cfg(tokio_unstable)]
                        $unstable_name: capture_metric_ref!(transform_fn, $unstable_name: $unstable_kind $unstable_opts),
                    )*
                }
            }

            fn emit(&self, metrics: RuntimeMetrics, tokio: &tokio::runtime::RuntimeMetrics) {
                $(
                    MyMetricOp::op((&self.$name, metrics.$name), tokio);
                )*
                $(
                    #[cfg(tokio_unstable)]
                    MyMetricOp::op((&self.$unstable_name, metrics.$unstable_name), tokio);
                )*
            }

            fn describe(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) {
                $(
                    describe_metric_ref!(transform_fn, $doc, $name: $kind<$unit> $opts);
                )*
                $(
                    #[cfg(tokio_unstable)]
                    describe_metric_ref!(transform_fn, $unstable_doc, $unstable_name: $unstable_kind<$unstable_unit> $unstable_opts);
                )*
            }
        }

        #[test]
        fn test_no_fields_missing() {
            // We can't use exhaustive matching here since RuntimeMetrics is
            // #[non_exhaustive], so inspect its pretty-printed Debug output instead.
            let known: &[&str] = &[
                $(stringify!($ignore),)*
                $(stringify!($name),)*
                $(stringify!($unstable_name),)*
            ];
            let debug = format!("{:#?}", RuntimeMetrics::default());
            for line in debug.lines() {
                // Top-level fields print as `    name: value` (exactly four spaces of
                // indentation); the struct header/footer and any nested, deeper-indented
                // output (e.g. elements of a non-empty Vec) are skipped.
                let field = match line
                    .strip_prefix("    ")
                    .and_then(|rest| rest.split_once(':'))
                {
                    Some((field, _)) if !field.starts_with(' ') => field,
                    _ => continue,
                };
                assert!(known.contains(&field), "missing metric {:?}", line);
            }
        }
    };
}
403
404metric_refs! {
405    [RuntimeMetricRefs] [elapsed] {
406        stable {
407            /// The number of worker threads used by the runtime
408            workers_count: Gauge<Count> [],
409            /// The number of times worker threads parked
410            max_park_count: Gauge<Count> [],
411            /// The minimum number of times any worker thread parked
412            min_park_count: Gauge<Count> [],
413            /// The number of times worker threads parked
414            total_park_count: Gauge<Count> [],
415            /// The amount of time worker threads were busy
416            total_busy_duration: Counter<Microseconds> [],
417            /// The maximum amount of time a worker thread was busy
418            max_busy_duration: Counter<Microseconds> [],
419            /// The minimum amount of time a worker thread was busy
420            min_busy_duration: Counter<Microseconds> [],
421            /// The number of tasks currently scheduled in the runtime's global queue
422            global_queue_depth: Gauge<Count> [],
423        }
424        unstable {
425            /// The average duration of a single invocation of poll on a task
426            mean_poll_duration: Gauge<Microseconds> [],
427            /// The average duration of a single invocation of poll on a task on the worker with the lowest value
428            mean_poll_duration_worker_min: Gauge<Microseconds> [],
429            /// The average duration of a single invocation of poll on a task on the worker with the highest value
430            mean_poll_duration_worker_max: Gauge<Microseconds> [],
431            /// A histogram of task polls since the previous probe grouped by poll times
432            poll_time_histogram: Histogram<Microseconds> [],
433            /// The number of times worker threads unparked but performed no work before parking again
434            total_noop_count: Counter<Count> [],
435            /// The maximum number of times any worker thread unparked but performed no work before parking again
436            max_noop_count: Counter<Count> [],
437            /// The minimum number of times any worker thread unparked but performed no work before parking again
438            min_noop_count: Counter<Count> [],
439            /// The number of tasks worker threads stole from another worker thread
440            total_steal_count: Counter<Count> [],
441            /// The maximum number of tasks any worker thread stole from another worker thread.
442            max_steal_count: Counter<Count> [],
443            /// The minimum number of tasks any worker thread stole from another worker thread
444            min_steal_count: Counter<Count> [],
445            /// The number of times worker threads stole tasks from another worker thread
446            total_steal_operations: Counter<Count> [],
447            /// The maximum number of times any worker thread stole tasks from another worker thread
448            max_steal_operations: Counter<Count> [],
449            /// The minimum number of times any worker thread stole tasks from another worker thread
450            min_steal_operations: Counter<Count> [],
451            /// The number of tasks scheduled from **outside** of the runtime
452            num_remote_schedules: Counter<Count> [],
453            /// The number of tasks scheduled from worker threads
454            total_local_schedule_count: Counter<Count> [],
455            /// The maximum number of tasks scheduled from any one worker thread
456            max_local_schedule_count: Counter<Count> [],
457            /// The minimum number of tasks scheduled from any one worker thread
458            min_local_schedule_count: Counter<Count> [],
459            /// The number of times worker threads saturated their local queues
460            total_overflow_count: Counter<Count> [],
461            /// The maximum number of times any one worker saturated its local queue
462            max_overflow_count: Counter<Count> [],
463            /// The minimum number of times any one worker saturated its local queue
464            min_overflow_count: Counter<Count> [],
465            /// The number of tasks that have been polled across all worker threads
466            total_polls_count: Counter<Count> [],
467            /// The maximum number of tasks that have been polled in any worker thread
468            max_polls_count: Counter<Count> [],
469            /// The minimum number of tasks that have been polled in any worker thread
470            min_polls_count: Counter<Count> [],
471            /// The total number of tasks currently scheduled in workers' local queues
472            total_local_queue_depth: Gauge<Count> [],
473            /// The maximum number of tasks currently scheduled any worker's local queue
474            max_local_queue_depth: Gauge<Count> [],
475            /// The minimum number of tasks currently scheduled any worker's local queue
476            min_local_queue_depth: Gauge<Count> [],
477            /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
478            blocking_queue_depth: Gauge<Count> [],
479            /// The current number of alive tasks in the runtime.
480            live_tasks_count: Gauge<Count> [],
481            /// The number of additional threads spawned by the runtime.
482            blocking_threads_count: Gauge<Count> [],
483            /// The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls.
484            idle_blocking_threads_count: Gauge<Count> [],
485            /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets
486            budget_forced_yield_count: Counter<Count> [],
487            /// Returns the number of ready events processed by the runtime’s I/O driver
488            io_driver_ready_count: Counter<Count> [],
489        }
490    }
491}
492trait MyMetricOp {
493    fn op(self, tokio: &tokio::runtime::RuntimeMetrics);
494}
495
496impl MyMetricOp for (&metrics::Counter, Duration) {
497    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
498        self.0
499            .increment(self.1.as_micros().try_into().unwrap_or(u64::MAX));
500    }
501}
502
503impl MyMetricOp for (&metrics::Counter, u64) {
504    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
505        self.0.increment(self.1);
506    }
507}
508
509impl MyMetricOp for (&metrics::Gauge, Duration) {
510    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
511        self.0.set(self.1.as_micros() as f64);
512    }
513}
514
515impl MyMetricOp for (&metrics::Gauge, u64) {
516    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
517        self.0.set(self.1 as f64);
518    }
519}
520
521impl MyMetricOp for (&metrics::Gauge, usize) {
522    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
523        self.0.set(self.1 as f64);
524    }
525}
526
/// Records one interval's poll-time histogram into a metrics-rs histogram.
///
/// Element `i` of the vector is treated as the sample count of the runtime's
/// poll-time bucket `i` (NOTE(review): assumes the vector is indexed like
/// `poll_time_histogram_bucket_range`); every non-empty bucket is recorded
/// `count` times at its lower bound.
#[cfg(tokio_unstable)]
impl MyMetricOp for (&metrics::Histogram, Vec<u64>) {
    fn op(self, tokio: &tokio::runtime::RuntimeMetrics) {
        let (histogram, buckets) = self;
        for (i, count) in buckets.into_iter().enumerate() {
            if count == 0 {
                continue;
            }
            // Only look up the bucket's range for buckets that actually hold samples.
            let range = tokio.poll_time_histogram_bucket_range(i);
            // Emit using range.start to avoid very large numbers for the open-ended bucket.
            // FIXME: do we want to do something else here?
            histogram.record_many(
                range.start.as_micros() as f64,
                // Saturate rather than silently truncate where usize is narrower than u64.
                usize::try_from(count).unwrap_or(usize::MAX),
            );
        }
    }
}
541
542impl fmt::Debug for RuntimeMetricsReporter {
543    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544        f.debug_struct("RuntimeMetricsReporter")
545            .field("interval", &self.interval)
546            // skip intervals field
547            .finish()
548    }
549}
550
551impl RuntimeMetricsReporter {
552    /// Collect and publish metrics once to the configured [metrics_rs](metrics) reporter.
553    pub fn run_once(&mut self) {
554        let metrics = self
555            .intervals
556            .next()
557            .expect("RuntimeIntervals::next never returns None");
558        self.emitter.emit(metrics, &self.intervals.runtime);
559    }
560
561    /// Collect and publish metrics periodically to the configured [metrics_rs](metrics) reporter.
562    ///
563    /// You probably want to run this within its own task (using [`tokio::task::spawn`])
564    pub async fn run(mut self) {
565        loop {
566            self.run_once();
567            tokio::time::sleep(self.interval).await;
568        }
569    }
570}