// tokio_metrics/task/metrics_rs_integration.rs

1use std::{fmt, time::Duration};
2
3use super::{TaskIntervals, TaskMetrics, TaskMonitor};
4use crate::metrics_rs::{metric_refs, DEFAULT_METRIC_SAMPLING_INTERVAL};
5
6/// A builder for the [`TaskMetricsReporter`] that wraps the [`TaskMonitor`], periodically
7/// reporting [`TaskMetrics`] to any configured [metrics-rs] recorder.
8///
9/// ### Published Metrics
10///
11/// The published metrics are the fields of [`TaskMetrics`], but with the
12/// `tokio_` prefix added, for example, `tokio_instrumented_count`. If you have multiple
13/// [`TaskMonitor`]s then it is strongly recommended to give each [`TaskMonitor`] a unique metric
14/// name or dimension value.
15///
16/// ### Usage
17///
18/// To upload metrics via [metrics-rs], you need to set up a reporter, which
19/// is actually what exports the metrics outside of the program. You must set
20/// up the reporter before you call [`describe_and_run`].
21///
22/// You can find exporters within the [metrics-rs] docs. One such reporter
23/// is the [metrics_exporter_prometheus] reporter, which makes metrics visible
24/// through Prometheus.
25///
26/// You can use it for example to export Prometheus metrics by listening on a local Unix socket
27/// called `prometheus.sock`, which you can access for debugging by
28/// `curl --unix-socket prometheus.sock localhost`, as follows:
29///
30/// ```
31/// use std::time::Duration;
32///
33/// use metrics::Key;
34///
35/// #[tokio::main]
36/// async fn main() {
37///     metrics_exporter_prometheus::PrometheusBuilder::new()
38///         .with_http_uds_listener("prometheus.sock")
39///         .install()
40///         .unwrap();
41///     let monitor = tokio_metrics::TaskMonitor::new();
42///     tokio::task::spawn(
43///         tokio_metrics::TaskMetricsReporterBuilder::new(|name| {
44///             let name = name.replacen("tokio_", "my_task_", 1);
45///             Key::from_parts(name, &[("application", "my_app")])
46///         })
47///         // the default metric sampling interval is 30 seconds, which is
48///         // too long for quick tests, so have it be 1 second.
49///         .with_interval(std::time::Duration::from_secs(1))
50///         .describe_and_run(monitor.clone()),
51///     );
52///     // Run some code
53///     tokio::task::spawn(monitor.instrument(async move {
54///         for _ in 0..1000 {
55///             tokio::time::sleep(Duration::from_millis(10)).await;
56///         }
57///     }))
58///     .await
59///     .unwrap();
60/// }
61/// ```
62///
63/// [`describe_and_run`]: TaskMetricsReporterBuilder::describe_and_run
64/// [metrics-rs]: metrics
65/// [metrics_exporter_prometheus]: https://docs.rs/metrics_exporter_prometheus
66pub struct TaskMetricsReporterBuilder {
67    interval: Duration,
68    metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key + Send>,
69}
70
71impl fmt::Debug for TaskMetricsReporterBuilder {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("TaskMetricsReporterBuilder")
74            .field("interval", &self.interval)
75            // skip metrics_transformer field
76            .finish()
77    }
78}
79
80impl TaskMetricsReporterBuilder {
81    /// Creates a new [`TaskMetricsReporterBuilder`] with a custom "metrics transformer". The custom
82    /// transformer is used during `build` to transform the metric names into metric keys, for
83    /// example to add dimensions. The string metric names used by this reporter all start with
84    /// `tokio_`. The default transformer is just [`metrics::Key::from_static_name`]
85    ///
86    /// For example, to attach a dimension named "application" with value "my_app", and to replace
87    /// `tokio_` with `my_task_`
88    /// ```
89    /// # use metrics::Key;
90    ///
91    /// #[tokio::main]
92    /// async fn main() {
93    ///     metrics_exporter_prometheus::PrometheusBuilder::new()
94    ///         .with_http_uds_listener("prometheus.sock")
95    ///         .install()
96    ///         .unwrap();
97    ///     let monitor = tokio_metrics::TaskMonitor::new();
98    ///     tokio::task::spawn(
99    ///         tokio_metrics::TaskMetricsReporterBuilder::new(|name| {
100    ///             let name = name.replacen("tokio_", "my_task_", 1);
101    ///             Key::from_parts(name, &[("application", "my_app")])
102    ///         })
103    ///         .describe_and_run(monitor)
104    ///     );
105    /// }
106    /// ```
107    pub fn new(transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static) -> Self {
108        TaskMetricsReporterBuilder {
109            interval: DEFAULT_METRIC_SAMPLING_INTERVAL,
110            metrics_transformer: Box::new(transformer),
111        }
112    }
113
114    /// Set the metric sampling interval, default: 30 seconds.
115    ///
116    /// Note that this is the interval on which metrics are *sampled* from
117    /// the Tokio task and then set on the [metrics-rs] reporter. Uploading the
118    /// metrics upstream is controlled by the reporter set up in the
119    /// application, and is normally controlled by a different period.
120    ///
121    /// For example, if metrics are exported via Prometheus, that
122    /// normally operates at a pull-based fashion, and the actual collection
123    /// period is controlled by the Prometheus server, which periodically polls the
124    /// application's Prometheus exporter to get the latest value of the metrics.
125    ///
126    /// [metrics-rs]: metrics
127    pub fn with_interval(mut self, interval: Duration) -> Self {
128        self.interval = interval;
129        self
130    }
131
132    /// Build the [`TaskMetricsReporter`] with a specific [`TaskMonitor`]. This function will capture
133    /// the [`Counter`]s and [`Gauge`]s from the current [metrics-rs] reporter,
134    /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`]
135    /// with it.
136    ///
137    /// [`Counter`]: metrics::Counter
138    /// [`Gauge`]: metrics::Counter
139    /// [`Histogram`]: metrics::Counter
140    /// [metrics-rs]: metrics
141    /// [`with_local_recorder`]: metrics::with_local_recorder
142    /// [`describe`]: Self::describe
143    #[must_use = "reporter does nothing unless run"]
144    pub fn build_with_monitor(mut self, monitor: TaskMonitor) -> TaskMetricsReporter {
145        TaskMetricsReporter {
146            interval: self.interval,
147            intervals: monitor.intervals(),
148            emitter: TaskMetricRefs::capture(&mut self.metrics_transformer),
149        }
150    }
151
152    /// Call [`describe_counter`] etc. to describe the emitted metrics.
153    ///
154    /// Describing metrics makes the reporter attach descriptions and units to them,
155    /// which makes them easier to use. However, some reporters don't support
156    /// describing the same metric name more than once, so it is generally a good
157    /// idea to only call this function once per metric reporter.
158    ///
159    /// [`describe_counter`]: metrics::describe_counter
160    /// [metrics-rs]: metrics
161    pub fn describe(mut self) -> Self {
162        TaskMetricRefs::describe(&mut self.metrics_transformer);
163        self
164    }
165
166    /// Runs the reporter (within the returned future), [describing] the metrics beforehand.
167    ///
168    /// Describing metrics makes the reporter attach descriptions and units to them,
169    /// which makes them easier to use. However, some reporters don't support
170    /// describing the same metric name more than once. If you are emitting multiple
171    /// metrics via a single reporter, try to call [`describe`] once and [`run`] for each
172    /// task metrics reporter.
173    ///
174    /// ### Working with a custom reporter
175    ///
176    /// If you want to set a local metrics reporter, you shouldn't be calling this method,
177    /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
178    /// call `run` (see the docs on [`build_with_monitor`]).
179    ///
180    /// [describing]: Self::describe
181    /// [`describe`]: Self::describe
182    /// [`build_with_monitor`]: Self::build_with_monitor.
183    /// [`run`]: TaskMetricsReporter::run
184    /// [`with_local_recorder`]: metrics::with_local_recorder
185    pub async fn describe_and_run(self, monitor: TaskMonitor) {
186        self.describe().build_with_monitor(monitor).run().await;
187    }
188
189    /// Runs the reporter (within the returned future), not describing the metrics beforehand.
190    ///
191    /// ### Working with a custom reporter
192    ///
193    /// If you want to set a local metrics reporter, you shouldn't be calling this method,
194    /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then
195    /// call [`run`] (see the docs on [`build_with_monitor`]).
196    ///
197    /// [`build_with_monitor`]: Self::build_with_monitor
198    /// [`run`]: TaskMetricsReporter::run
199    /// [`with_local_recorder`]: metrics::with_local_recorder
200    pub async fn run_without_describing(self, monitor: TaskMonitor) {
201        self.build_with_monitor(monitor).run().await;
202    }
203}
204
205/// Collects metrics from a Tokio task and uploads them to [metrics_rs](metrics).
206pub struct TaskMetricsReporter {
207    interval: Duration,
208    intervals: TaskIntervals,
209    emitter: TaskMetricRefs,
210}
211
212metric_refs! {
213    [TaskMetricRefs] [elapsed] [TaskMetrics] [()] {
214        stable {
215            /// The number of tasks instrumented.
216            instrumented_count: Gauge<Count> [],
217            /// The number of tasks dropped.
218            dropped_count: Gauge<Count> [],
219            /// The number of tasks polled for the first time.
220            first_poll_count: Gauge<Count> [],
221            /// The total duration elapsed between the instant tasks are instrumented, and the instant they are first polled.
222            total_first_poll_delay: Counter<Microseconds> [],
223            /// The total number of times that tasks idled, waiting to be awoken.
224            total_idled_count: Gauge<Count> [],
225            /// The total duration that tasks idled.
226            total_idle_duration: Counter<Microseconds> [],
227            /// The maximum idle duration that a task took.
228            max_idle_duration: Counter<Microseconds> [],
229            /// The total number of times that tasks were awoken (and then, presumably, scheduled for execution).
230            total_scheduled_count: Gauge<Count> [],
231            /// The total duration that tasks spent waiting to be polled after awakening.
232            total_scheduled_duration: Counter<Microseconds> [],
233            /// The total number of times that tasks were polled.
234            total_poll_count: Gauge<Count> [],
235            /// The total duration elapsed during polls.
236            total_poll_duration: Counter<Microseconds> [],
237            /// The total number of times that polling tasks completed swiftly.
238            total_fast_poll_count: Gauge<Count> [],
239            /// The total duration of fast polls.
240            total_fast_poll_duration: Counter<Microseconds> [],
241            /// The total number of times that polling tasks completed slowly.
242            total_slow_poll_count: Gauge<Count> [],
243            /// The total duration of slow polls.
244            total_slow_poll_duration: Counter<Microseconds> [],
245            /// The total count of tasks with short scheduling delays.
246            total_short_delay_count: Gauge<Count> [],
247            /// The total count of tasks with long scheduling delays.
248            total_long_delay_count: Gauge<Count> [],
249            /// The total duration of tasks with short scheduling delays.
250            total_short_delay_duration: Counter<Microseconds> [],
251            /// The total number of times that a task had a long scheduling duration.
252            total_long_delay_duration: Counter<Microseconds> [],
253        }
254        unstable {}
255    }
256}
257
258impl fmt::Debug for TaskMetricsReporter {
259    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260        f.debug_struct("TaskMetricsReporter")
261            .field("interval", &self.interval)
262            // skip intervals field
263            .finish()
264    }
265}
266
267impl TaskMetricsReporter {
268    /// Collect and publish metrics once to the configured [metrics_rs](metrics) reporter.
269    pub fn run_once(&mut self) {
270        let metrics = self
271            .intervals
272            .next()
273            .expect("TaskIntervals::next never returns None");
274        self.emitter.emit(metrics, ());
275    }
276
277    /// Collect and publish metrics periodically to the configured [metrics_rs](metrics) reporter.
278    ///
279    /// You probably want to run this within its own task (using [`tokio::task::spawn`])
280    pub async fn run(mut self) {
281        loop {
282            self.run_once();
283            tokio::time::sleep(self.interval).await;
284        }
285    }
286}