tokio_metrics/runtime/metrics_rs_integration.rs
use std::{fmt, time::Duration};

use tokio::runtime::Handle;

use super::{RuntimeIntervals, RuntimeMetrics, RuntimeMonitor};

/// A builder for a [`RuntimeMetricsReporter`] that wraps the [`RuntimeMonitor`], periodically
/// reporting [`RuntimeMetrics`] to any configured [metrics-rs] recorder.
///
/// ### Published Metrics
///
/// The published metrics are the fields of [`RuntimeMetrics`], but with the
/// `tokio_` prefix added, for example, `tokio_workers_count`. If desired, you
/// can use the [`with_metrics_transformer`] function to customize the metric names.
///
/// ### Usage
///
/// To upload metrics via [metrics-rs], you need to set up a reporter, which
/// is what actually exports the metrics outside of the program. You must set
/// up the reporter before you call [`describe_and_run`].
///
/// You can find exporters within the [metrics-rs] docs. One such reporter
/// is [metrics_exporter_prometheus], which makes metrics visible
/// through Prometheus.
///
/// For example, you can use it to export Prometheus metrics by listening on a local Unix socket
/// called `prometheus.sock`, which you can query for debugging with
/// `curl --unix-socket prometheus.sock localhost`, as follows:
///
/// ```
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
///     metrics_exporter_prometheus::PrometheusBuilder::new()
///         .with_http_uds_listener("prometheus.sock")
///         .install()
///         .unwrap();
///     tokio::task::spawn(
///         tokio_metrics::RuntimeMetricsReporterBuilder::default()
///             // the default metric sampling interval is 30 seconds, which is
///             // too long for quick tests, so have it be 1 second.
///             .with_interval(std::time::Duration::from_secs(1))
///             .describe_and_run(),
///     );
///     // Run some code
///     tokio::task::spawn(async move {
///         for _ in 0..1000 {
///             tokio::time::sleep(Duration::from_millis(10)).await;
///         }
///     })
///     .await
///     .unwrap();
/// }
/// ```
///
/// [`describe_and_run`]: RuntimeMetricsReporterBuilder::describe_and_run
/// [`with_metrics_transformer`]: RuntimeMetricsReporterBuilder::with_metrics_transformer
/// [metrics-rs]: metrics
/// [metrics_exporter_prometheus]: https://docs.rs/metrics_exporter_prometheus
pub struct RuntimeMetricsReporterBuilder {
    interval: Duration,
    metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key + Send>,
}

impl fmt::Debug for RuntimeMetricsReporterBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RuntimeMetricsReporterBuilder")
            .field("interval", &self.interval)
            // skip metrics_transformer field
            .finish()
    }
}

const DEFAULT_METRIC_SAMPLING_INTERVAL: Duration = Duration::from_secs(30);

impl Default for RuntimeMetricsReporterBuilder {
    fn default() -> Self {
        RuntimeMetricsReporterBuilder {
            interval: DEFAULT_METRIC_SAMPLING_INTERVAL,
            metrics_transformer: Box::new(metrics::Key::from_static_name),
        }
    }
}

impl RuntimeMetricsReporterBuilder {
    /// Set the metric sampling interval, default: 30 seconds.
    ///
    /// Note that this is the interval at which metrics are *sampled* from
    /// the Tokio runtime and then set on the [metrics-rs] reporter. Uploading the
    /// metrics upstream is controlled by the reporter set up in the
    /// application, and normally happens on a different period.
    ///
    /// For example, if metrics are exported via Prometheus, export
    /// normally operates in a pull-based fashion, and the actual collection
    /// period is controlled by the Prometheus server, which periodically polls the
    /// application's Prometheus exporter to get the latest value of the metrics.
    ///
    /// [metrics-rs]: metrics
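    ///
    /// For example, to sample every 10 seconds (the value here is purely illustrative):
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// let builder = tokio_metrics::RuntimeMetricsReporterBuilder::default()
    ///     .with_interval(Duration::from_secs(10));
    /// ```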
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Set a custom "metrics transformer", which is used during `build` to transform the metric
    /// names into metric keys, for example to add dimensions. The string metric names used by this reporter
    /// all start with `tokio_`. The default transformer is just [`metrics::Key::from_static_name`].
    ///
    /// For example, to attach a dimension named "application" with value "my_app", and to replace
    /// `tokio_` with `my_app_`:
    ///
    /// ```
    /// # use metrics::Key;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     metrics_exporter_prometheus::PrometheusBuilder::new()
    ///         .with_http_uds_listener("prometheus.sock")
    ///         .install()
    ///         .unwrap();
    ///     tokio::task::spawn(
    ///         tokio_metrics::RuntimeMetricsReporterBuilder::default()
    ///             .with_metrics_transformer(|name| {
    ///                 let name = name.replacen("tokio_", "my_app_", 1);
    ///                 Key::from_parts(name, &[("application", "my_app")])
    ///             })
    ///             .describe_and_run(),
    ///     );
    /// }
    /// ```
    pub fn with_metrics_transformer(
        mut self,
        transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static,
    ) -> Self {
        self.metrics_transformer = Box::new(transformer);
        self
    }

    /// Build the [`RuntimeMetricsReporter`] for the current Tokio runtime. This function will capture
    /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] recorder,
    /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`] with it.
    ///
    /// For example:
    /// ```
    /// # use std::sync::Arc;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let builder = tokio_metrics::RuntimeMetricsReporterBuilder::default();
    ///     let recorder = Arc::new(metrics_util::debugging::DebuggingRecorder::new());
    ///     let metrics_reporter = metrics::with_local_recorder(&recorder, || builder.describe().build());
    ///
    ///     // no need to wrap `run()`, since the metrics are already captured
    ///     tokio::task::spawn(metrics_reporter.run());
    /// }
    /// ```
    ///
    /// [`Counter`]: metrics::Counter
    /// [`Gauge`]: metrics::Gauge
    /// [`Histogram`]: metrics::Histogram
    /// [metrics-rs]: metrics
    /// [`with_local_recorder`]: metrics::with_local_recorder
    /// [`describe`]: Self::describe
    #[must_use = "reporter does nothing unless run"]
    pub fn build(self) -> RuntimeMetricsReporter {
        self.build_with_monitor(RuntimeMonitor::new(&Handle::current()))
    }

    /// Build the [`RuntimeMetricsReporter`] with a specific [`RuntimeMonitor`]. This function will capture
    /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] recorder,
    /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`]
    /// with it.
    ///
    /// [`Counter`]: metrics::Counter
    /// [`Gauge`]: metrics::Gauge
    /// [`Histogram`]: metrics::Histogram
    /// [metrics-rs]: metrics
    /// [`with_local_recorder`]: metrics::with_local_recorder
    /// [`describe`]: Self::describe
    #[must_use = "reporter does nothing unless run"]
    pub fn build_with_monitor(mut self, monitor: RuntimeMonitor) -> RuntimeMetricsReporter {
        RuntimeMetricsReporter {
            interval: self.interval,
            intervals: monitor.intervals(),
            emitter: RuntimeMetricRefs::capture(&mut self.metrics_transformer),
        }
    }

    /// Call [`describe_counter`] etc. to describe the emitted metrics.
    ///
    /// Describing metrics makes the reporter attach descriptions and units to them,
    /// which makes them easier to use. However, some reporters don't support
    /// describing the same metric name more than once, so it is generally a good
    /// idea to only call this function once per metric reporter.
    ///
    /// [`describe_counter`]: metrics::describe_counter
    /// [metrics-rs]: metrics
    pub fn describe(mut self) -> Self {
        RuntimeMetricRefs::describe(&mut self.metrics_transformer);
        self
    }

    /// Runs the reporter (within the returned future), [describing] the metrics beforehand.
    ///
    /// Describing metrics makes the reporter attach descriptions and units to them,
    /// which makes them easier to use. However, some reporters don't support
    /// describing the same metric name more than once. If you are emitting metrics
    /// from multiple runtime metrics reporters via a single reporter, call [`describe`]
    /// only once and use [`run`] for each runtime metrics reporter.
    ///
    /// ### Working with a custom reporter
    ///
    /// If you want to set a local metrics recorder, you shouldn't be calling this method;
    /// instead, call `.describe().build()` within [`with_local_recorder`] and then
    /// call `run` (see the docs on [`build`]).
    ///
    /// [describing]: Self::describe
    /// [`describe`]: Self::describe
    /// [`build`]: Self::build
    /// [`run`]: RuntimeMetricsReporter::run
    /// [`with_local_recorder`]: metrics::with_local_recorder
    pub async fn describe_and_run(self) {
        self.describe().build().run().await;
    }

    /// Runs the reporter (within the returned future), without describing the metrics beforehand.
    ///
    /// ### Working with a custom reporter
    ///
    /// If you want to set a local metrics recorder, you shouldn't be calling this method;
    /// instead, call `.describe().build()` within [`with_local_recorder`] and then
    /// call [`run`] (see the docs on [`build`]).
    ///
    /// [`build`]: Self::build
    /// [`run`]: RuntimeMetricsReporter::run
    /// [`with_local_recorder`]: metrics::with_local_recorder
    pub async fn run_without_describing(self) {
        self.build().run().await;
    }
}

/// Collects metrics from a Tokio runtime and uploads them to [metrics-rs](metrics).
pub struct RuntimeMetricsReporter {
    interval: Duration,
    intervals: RuntimeIntervals,
    emitter: RuntimeMetricRefs,
}

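// Maps a metric "kind" identifier (Counter / Gauge / Histogram) to the
// corresponding metrics-rs handle type.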
macro_rules! kind_to_type {
    (Counter) => {
        metrics::Counter
    };
    (Gauge) => {
        metrics::Gauge
    };
    (Histogram) => {
        metrics::Histogram
    };
}

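// Builds the metrics-rs `Key` for a field by prefixing the field name with
// `tokio_` and passing it through the configured transformer.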
macro_rules! metric_key {
    ($transform_fn:ident, $name:ident) => {
        $transform_fn(concat!("tokio_", stringify!($name)))
    };
}

// calling `trim` since /// inserts spaces into docs
macro_rules! describe_metric_ref {
    ($transform_fn:ident, $doc:expr, $name:ident: Counter<$unit:ident> []) => {
        metrics::describe_counter!(
            metric_key!($transform_fn, $name).name().to_owned(),
            metrics::Unit::$unit,
            $doc.trim()
        )
    };
    ($transform_fn:ident, $doc:expr, $name:ident: Gauge<$unit:ident> []) => {
        metrics::describe_gauge!(
            metric_key!($transform_fn, $name).name().to_owned(),
            metrics::Unit::$unit,
            $doc.trim()
        )
    };
    ($transform_fn:ident, $doc:expr, $name:ident: Histogram<$unit:ident> []) => {
        metrics::describe_histogram!(
            metric_key!($transform_fn, $name).name().to_owned(),
            metrics::Unit::$unit,
            $doc.trim()
        )
    };
}

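// Registers the metric under its transformed key with the current metrics-rs
// recorder and returns the handle used later when emitting sampled values.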
macro_rules! capture_metric_ref {
    ($transform_fn:ident, $name:ident: Counter []) => {{
        let (name, labels) = metric_key!($transform_fn, $name).into_parts();
        metrics::counter!(name, labels)
    }};
    ($transform_fn:ident, $name:ident: Gauge []) => {{
        let (name, labels) = metric_key!($transform_fn, $name).into_parts();
        metrics::gauge!(name, labels)
    }};
    ($transform_fn:ident, $name:ident: Histogram []) => {{
        let (name, labels) = metric_key!($transform_fn, $name).into_parts();
        metrics::histogram!(name, labels)
    }};
}

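// Generates a struct with one metrics-rs handle per listed RuntimeMetrics field,
// together with `capture` (register handles), `emit` (publish one sample), and
// `describe` (attach units and descriptions) helpers, plus a test checking that
// every RuntimeMetrics field is either listed or explicitly ignored.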
macro_rules! metric_refs {
    (
        [$struct_name:ident] [$($ignore:ident),* $(,)?] {
            stable {
                $(
                    #[doc = $doc:tt]
                    $name:ident: $kind:tt <$unit:ident> $opts:tt
                ),*
                $(,)?
            }
            unstable {
                $(
                    #[doc = $unstable_doc:tt]
                    $unstable_name:ident: $unstable_kind:tt <$unstable_unit:ident> $unstable_opts:tt
                ),*
                $(,)?
            }
        }
    ) => {
        struct $struct_name {
            $(
                $name: kind_to_type!($kind),
            )*
            $(
                #[cfg(tokio_unstable)]
                $unstable_name: kind_to_type!($unstable_kind),
            )*
        }

        impl $struct_name {
            fn capture(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) -> Self {
                Self {
                    $(
                        $name: capture_metric_ref!(transform_fn, $name: $kind $opts),
                    )*
                    $(
                        #[cfg(tokio_unstable)]
                        $unstable_name: capture_metric_ref!(transform_fn, $unstable_name: $unstable_kind $unstable_opts),
                    )*
                }
            }

            fn emit(&self, metrics: RuntimeMetrics, tokio: &tokio::runtime::RuntimeMetrics) {
                $(
                    MyMetricOp::op((&self.$name, metrics.$name), tokio);
                )*
                $(
                    #[cfg(tokio_unstable)]
                    MyMetricOp::op((&self.$unstable_name, metrics.$unstable_name), tokio);
                )*
            }

            fn describe(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) {
                $(
                    describe_metric_ref!(transform_fn, $doc, $name: $kind<$unit> $opts);
                )*
                $(
                    #[cfg(tokio_unstable)]
                    describe_metric_ref!(transform_fn, $unstable_doc, $unstable_name: $unstable_kind<$unstable_unit> $unstable_opts);
                )*
            }
        }

        #[test]
        fn test_no_fields_missing() {
            // test that no fields are missing. We can't use exhaustive matching here
            // since RuntimeMetrics is #[non_exhaustive], so use a debug impl
            let debug = format!("{:#?}", RuntimeMetrics::default());
            for line in debug.lines() {
                if line == "RuntimeMetrics {" || line == "}" {
                    continue
                }
                $(
                    let expected = format!(" {}:", stringify!($ignore));
                    if line.contains(&expected) {
                        continue
                    }
                );*
                $(
                    let expected = format!(" {}:", stringify!($name));
                    eprintln!("{}", expected);
                    if line.contains(&expected) {
                        continue
                    }
                );*
                $(
                    let expected = format!(" {}:", stringify!($unstable_name));
                    eprintln!("{}", expected);
                    if line.contains(&expected) {
                        continue
                    }
                );*
                panic!("missing metric {:?}", line);
            }
        }
    }
}

metric_refs! {
    [RuntimeMetricRefs] [elapsed] {
        stable {
            /// The number of worker threads used by the runtime
            workers_count: Gauge<Count> [],
            /// The maximum number of times any worker thread parked
            max_park_count: Gauge<Count> [],
            /// The minimum number of times any worker thread parked
            min_park_count: Gauge<Count> [],
            /// The number of times worker threads parked
            total_park_count: Gauge<Count> [],
            /// The amount of time worker threads were busy
            total_busy_duration: Counter<Microseconds> [],
            /// The maximum amount of time a worker thread was busy
            max_busy_duration: Counter<Microseconds> [],
            /// The minimum amount of time a worker thread was busy
            min_busy_duration: Counter<Microseconds> [],
            /// The number of tasks currently scheduled in the runtime's global queue
            global_queue_depth: Gauge<Count> [],
        }
        unstable {
            /// The average duration of a single invocation of poll on a task
            mean_poll_duration: Gauge<Microseconds> [],
            /// The average duration of a single invocation of poll on a task on the worker with the lowest value
            mean_poll_duration_worker_min: Gauge<Microseconds> [],
            /// The average duration of a single invocation of poll on a task on the worker with the highest value
            mean_poll_duration_worker_max: Gauge<Microseconds> [],
            /// A histogram of task polls since the previous probe grouped by poll times
            poll_time_histogram: Histogram<Microseconds> [],
            /// The number of times worker threads unparked but performed no work before parking again
            total_noop_count: Counter<Count> [],
            /// The maximum number of times any worker thread unparked but performed no work before parking again
            max_noop_count: Counter<Count> [],
            /// The minimum number of times any worker thread unparked but performed no work before parking again
            min_noop_count: Counter<Count> [],
            /// The number of tasks worker threads stole from another worker thread
            total_steal_count: Counter<Count> [],
            /// The maximum number of tasks any worker thread stole from another worker thread
            max_steal_count: Counter<Count> [],
            /// The minimum number of tasks any worker thread stole from another worker thread
            min_steal_count: Counter<Count> [],
            /// The number of times worker threads stole tasks from another worker thread
            total_steal_operations: Counter<Count> [],
            /// The maximum number of times any worker thread stole tasks from another worker thread
            max_steal_operations: Counter<Count> [],
            /// The minimum number of times any worker thread stole tasks from another worker thread
            min_steal_operations: Counter<Count> [],
            /// The number of tasks scheduled from **outside** of the runtime
            num_remote_schedules: Counter<Count> [],
            /// The number of tasks scheduled from worker threads
            total_local_schedule_count: Counter<Count> [],
            /// The maximum number of tasks scheduled from any one worker thread
            max_local_schedule_count: Counter<Count> [],
            /// The minimum number of tasks scheduled from any one worker thread
            min_local_schedule_count: Counter<Count> [],
            /// The number of times worker threads saturated their local queues
            total_overflow_count: Counter<Count> [],
            /// The maximum number of times any one worker saturated its local queue
            max_overflow_count: Counter<Count> [],
            /// The minimum number of times any one worker saturated its local queue
            min_overflow_count: Counter<Count> [],
            /// The number of tasks that have been polled across all worker threads
            total_polls_count: Counter<Count> [],
            /// The maximum number of tasks that have been polled in any worker thread
            max_polls_count: Counter<Count> [],
            /// The minimum number of tasks that have been polled in any worker thread
            min_polls_count: Counter<Count> [],
            /// The total number of tasks currently scheduled in workers' local queues
            total_local_queue_depth: Gauge<Count> [],
            /// The maximum number of tasks currently scheduled in any worker's local queue
            max_local_queue_depth: Gauge<Count> [],
            /// The minimum number of tasks currently scheduled in any worker's local queue
            min_local_queue_depth: Gauge<Count> [],
            /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool
            blocking_queue_depth: Gauge<Count> [],
            /// The current number of alive tasks in the runtime
            live_tasks_count: Gauge<Count> [],
            /// The number of additional threads spawned by the runtime
            blocking_threads_count: Gauge<Count> [],
            /// The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls
            idle_blocking_threads_count: Gauge<Count> [],
            /// The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets
            budget_forced_yield_count: Counter<Count> [],
            /// The number of ready events processed by the runtime’s I/O driver
            io_driver_ready_count: Counter<Count> [],
        }
    }
}
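
/// Dispatches one sampled metric value onto the matching metrics-rs handle
/// (counter increment, gauge set, or histogram record), converting units where needed.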
trait MyMetricOp {
    fn op(self, tokio: &tokio::runtime::RuntimeMetrics);
}

impl MyMetricOp for (&metrics::Counter, Duration) {
    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
        self.0
            .increment(self.1.as_micros().try_into().unwrap_or(u64::MAX));
    }
}

impl MyMetricOp for (&metrics::Counter, u64) {
    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
        self.0.increment(self.1);
    }
}

impl MyMetricOp for (&metrics::Gauge, Duration) {
    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
        self.0.set(self.1.as_micros() as f64);
    }
}

impl MyMetricOp for (&metrics::Gauge, u64) {
    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
        self.0.set(self.1 as f64);
    }
}

impl MyMetricOp for (&metrics::Gauge, usize) {
    fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) {
        self.0.set(self.1 as f64);
    }
}

#[cfg(tokio_unstable)]
impl MyMetricOp for (&metrics::Histogram, Vec<u64>) {
    fn op(self, tokio: &tokio::runtime::RuntimeMetrics) {
        for (i, bucket) in self.1.iter().enumerate() {
            let range = tokio.poll_time_histogram_bucket_range(i);
            if *bucket > 0 {
                // emit using range.start to avoid very large numbers for open bucket
                // FIXME: do we want to do something else here?
                self.0
                    .record_many(range.start.as_micros() as f64, *bucket as usize);
            }
        }
    }
}

impl fmt::Debug for RuntimeMetricsReporter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RuntimeMetricsReporter")
            .field("interval", &self.interval)
            // skip intervals field
            .finish()
    }
}

impl RuntimeMetricsReporter {
    /// Collect and publish metrics once to the configured [metrics-rs](metrics) reporter.
    pub fn run_once(&mut self) {
        let metrics = self
            .intervals
            .next()
            .expect("RuntimeIntervals::next never returns None");
        self.emitter.emit(metrics, &self.intervals.runtime);
    }

    /// Collect and publish metrics periodically to the configured [metrics-rs](metrics) reporter.
    ///
    /// You probably want to run this within its own task (using [`tokio::task::spawn`]).
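    ///
    /// For example (a minimal sketch; configure the builder as needed):
    ///
    /// ```
    /// #[tokio::main]
    /// async fn main() {
    ///     let reporter = tokio_metrics::RuntimeMetricsReporterBuilder::default()
    ///         .describe()
    ///         .build();
    ///     tokio::task::spawn(reporter.run());
    /// }
    /// ```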
    pub async fn run(mut self) {
        loop {
            self.run_once();
            tokio::time::sleep(self.interval).await;
        }
    }
570}