tokio_metrics/task/metrics_rs_integration.rs
use std::{fmt, time::Duration};

use super::{TaskIntervals, TaskMetrics, TaskMonitor};
use crate::metrics_rs::{metric_refs, DEFAULT_METRIC_SAMPLING_INTERVAL};

/// A builder for the [`TaskMetricsReporter`] that wraps the [`TaskMonitor`], periodically
/// reporting [`TaskMetrics`] to any configured [metrics-rs] recorder.
///
/// ### Published Metrics
///
/// The published metrics are the fields of [`TaskMetrics`], but with the
/// `tokio_` prefix added, for example, `tokio_instrumented_count`. If you have multiple
/// [`TaskMonitor`]s then it is strongly recommended to give each [`TaskMonitor`] a unique metric
/// name or dimension value.
///
/// ### Usage
///
/// To upload metrics via [metrics-rs], you need to set up a reporter, which
/// is what actually exports the metrics out of the program. You must set
/// up the reporter before you call [`describe_and_run`].
///
/// You can find exporters in the [metrics-rs] docs. One such reporter
/// is the [metrics_exporter_prometheus] reporter, which makes metrics visible
/// to Prometheus.
///
/// For example, you can export Prometheus metrics on a local Unix socket called
/// `prometheus.sock` (which you can query for debugging with
/// `curl --unix-socket prometheus.sock localhost`) as follows:
///
/// ```
/// use std::time::Duration;
///
/// use metrics::Key;
///
/// #[tokio::main]
/// async fn main() {
///     metrics_exporter_prometheus::PrometheusBuilder::new()
///         .with_http_uds_listener("prometheus.sock")
///         .install()
///         .unwrap();
///     let monitor = tokio_metrics::TaskMonitor::new();
///     tokio::task::spawn(
///         tokio_metrics::TaskMetricsReporterBuilder::new(|name| {
///             let name = name.replacen("tokio_", "my_task_", 1);
///             Key::from_parts(name, &[("application", "my_app")])
///         })
///         // the default metric sampling interval is 30 seconds, which is
///         // too long for quick tests, so have it be 1 second.
///         .with_interval(std::time::Duration::from_secs(1))
///         .describe_and_run(monitor.clone()),
///     );
///     // Run some code
///     tokio::task::spawn(monitor.instrument(async move {
///         for _ in 0..1000 {
///             tokio::time::sleep(Duration::from_millis(10)).await;
///         }
///     }))
///     .await
///     .unwrap();
/// }
/// ```
///
/// [`describe_and_run`]: TaskMetricsReporterBuilder::describe_and_run
/// [metrics-rs]: metrics
/// [metrics_exporter_prometheus]: https://docs.rs/metrics_exporter_prometheus
pub struct TaskMetricsReporterBuilder {
    interval: Duration,
    metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key + Send>,
}

impl fmt::Debug for TaskMetricsReporterBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TaskMetricsReporterBuilder")
            .field("interval", &self.interval)
            // skip metrics_transformer field
            .finish()
    }
}

impl TaskMetricsReporterBuilder {
    /// Creates a new [`TaskMetricsReporterBuilder`] with a custom "metrics transformer". The
    /// custom transformer is used during `build_with_monitor` to transform the metric names into
    /// metric keys, for example to add dimensions. The string metric names used by this reporter
    /// all start with `tokio_`. The default transformer is just [`metrics::Key::from_static_name`].
    ///
    /// For example, to attach a dimension named "application" with value "my_app", and to replace
    /// `tokio_` with `my_task_`:
    /// ```
    /// # use metrics::Key;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     metrics_exporter_prometheus::PrometheusBuilder::new()
    ///         .with_http_uds_listener("prometheus.sock")
    ///         .install()
    ///         .unwrap();
    ///     let monitor = tokio_metrics::TaskMonitor::new();
    ///     tokio::task::spawn(
    ///         tokio_metrics::TaskMetricsReporterBuilder::new(|name| {
    ///             let name = name.replacen("tokio_", "my_task_", 1);
    ///             Key::from_parts(name, &[("application", "my_app")])
    ///         })
    ///         .describe_and_run(monitor)
    ///     );
    /// }
    /// ```
    pub fn new(transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static) -> Self {
        TaskMetricsReporterBuilder {
            interval: DEFAULT_METRIC_SAMPLING_INTERVAL,
            metrics_transformer: Box::new(transformer),
        }
    }

    /// Set the metric sampling interval, default: 30 seconds.
    ///
    /// Note that this is the interval at which metrics are *sampled* from
    /// the Tokio task and then set on the [metrics-rs] reporter. Uploading the
    /// metrics upstream is handled by the reporter set up in the
    /// application, and normally happens on a different period.
    ///
    /// For example, if metrics are exported via Prometheus, collection
    /// normally operates in a pull-based fashion: the actual collection
    /// period is controlled by the Prometheus server, which periodically polls the
    /// application's Prometheus exporter to get the latest value of the metrics.
    ///
    /// [metrics-rs]: metrics
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Build the [`TaskMetricsReporter`] with a specific [`TaskMonitor`]. This function will
    /// capture the [`Counter`]s and [`Gauge`]s from the current [metrics-rs] reporter, so if you
    /// are using [`with_local_recorder`], you should wrap calls to this function and to
    /// [`describe`] in it.
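    ///
    /// For example, a minimal sketch of that pattern (for illustration it builds a
    /// `metrics_exporter_prometheus` recorder without installing it globally; any other
    /// [metrics-rs] recorder works the same way):
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// let recorder = metrics_exporter_prometheus::PrometheusBuilder::new().build_recorder();
    /// let monitor = tokio_metrics::TaskMonitor::new();
    /// // describe the metrics and capture their handles while the local recorder is set...
    /// let reporter = metrics::with_local_recorder(&recorder, || {
    ///     tokio_metrics::TaskMetricsReporterBuilder::new(metrics::Key::from_static_name)
    ///         .describe()
    ///         .build_with_monitor(monitor.clone())
    /// });
    /// // ...then run the reporter; `run` itself no longer needs the recorder to be set.
    /// tokio::task::spawn(reporter.run());
    /// # }
    /// ```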
    ///
    /// [`Counter`]: metrics::Counter
    /// [`Gauge`]: metrics::Gauge
    /// [`Histogram`]: metrics::Histogram
    /// [metrics-rs]: metrics
    /// [`with_local_recorder`]: metrics::with_local_recorder
    /// [`describe`]: Self::describe
    #[must_use = "reporter does nothing unless run"]
    pub fn build_with_monitor(mut self, monitor: TaskMonitor) -> TaskMetricsReporter {
        TaskMetricsReporter {
            interval: self.interval,
            intervals: monitor.intervals(),
            emitter: TaskMetricRefs::capture(&mut self.metrics_transformer),
        }
    }

    /// Call [`describe_counter`] etc. to describe the emitted metrics.
    ///
    /// Describing metrics makes the reporter attach descriptions and units to them,
    /// which makes them easier to use. However, some reporters don't support
    /// describing the same metric name more than once, so it is generally a good
    /// idea to only call this function once per metric reporter.
    ///
    /// [`describe_counter`]: metrics::describe_counter
    /// [metrics-rs]: metrics
    pub fn describe(mut self) -> Self {
        TaskMetricRefs::describe(&mut self.metrics_transformer);
        self
    }

    /// Runs the reporter (within the returned future), [describing] the metrics beforehand.
    ///
    /// Describing metrics makes the reporter attach descriptions and units to them,
    /// which makes them easier to use. However, some reporters don't support
    /// describing the same metric name more than once. If you are emitting metrics from
    /// multiple task metrics reporters via a single metrics-rs reporter, call [`describe`]
    /// once and [`run`] each of them.
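    ///
    /// For example, a minimal sketch of that pattern, assuming two monitors that share their
    /// metric names and are told apart by a (hypothetical) `task` label, so that the names only
    /// need to be described once:
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// let listener_monitor = tokio_metrics::TaskMonitor::new();
    /// let worker_monitor = tokio_metrics::TaskMonitor::new();
    /// // the first reporter describes the metric names and then runs...
    /// tokio::task::spawn(
    ///     tokio_metrics::TaskMetricsReporterBuilder::new(|name| {
    ///         metrics::Key::from_parts(name, &[("task", "listener")])
    ///     })
    ///     .describe_and_run(listener_monitor),
    /// );
    /// // ...the second one only runs, since the shared names are already described.
    /// tokio::task::spawn(
    ///     tokio_metrics::TaskMetricsReporterBuilder::new(|name| {
    ///         metrics::Key::from_parts(name, &[("task", "worker")])
    ///     })
    ///     .run_without_describing(worker_monitor),
    /// );
    /// # }
    /// ```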
    ///
    /// ### Working with a custom reporter
    ///
    /// If you want to use a local metrics recorder, you shouldn't call this method; instead,
    /// call `.describe().build_with_monitor(..)` within [`with_local_recorder`] and then call
    /// [`run`] (see the docs on [`build_with_monitor`]).
    ///
    /// [describing]: Self::describe
    /// [`describe`]: Self::describe
    /// [`build_with_monitor`]: Self::build_with_monitor
    /// [`run`]: TaskMetricsReporter::run
    /// [`with_local_recorder`]: metrics::with_local_recorder
    pub async fn describe_and_run(self, monitor: TaskMonitor) {
        self.describe().build_with_monitor(monitor).run().await;
    }

    /// Runs the reporter (within the returned future), without describing the metrics beforehand.
    ///
    /// ### Working with a custom reporter
    ///
    /// If you want to use a local metrics recorder, you shouldn't call this method; instead,
    /// call [`build_with_monitor`] within [`with_local_recorder`] and then
    /// call [`run`] (see the docs on [`build_with_monitor`]).
    ///
    /// [`build_with_monitor`]: Self::build_with_monitor
    /// [`run`]: TaskMetricsReporter::run
    /// [`with_local_recorder`]: metrics::with_local_recorder
    pub async fn run_without_describing(self, monitor: TaskMonitor) {
        self.build_with_monitor(monitor).run().await;
    }
}

/// Collects metrics from a Tokio task and uploads them to [metrics_rs](metrics).
pub struct TaskMetricsReporter {
    interval: Duration,
    intervals: TaskIntervals,
    emitter: TaskMetricRefs,
}

metric_refs! {
    [TaskMetricRefs] [elapsed] [TaskMetrics] [()] {
        stable {
            /// The number of tasks instrumented.
            instrumented_count: Gauge<Count> [],
            /// The number of tasks dropped.
            dropped_count: Gauge<Count> [],
            /// The number of tasks polled for the first time.
            first_poll_count: Gauge<Count> [],
            /// The total duration elapsed between the instant tasks are instrumented, and the instant they are first polled.
            total_first_poll_delay: Counter<Microseconds> [],
            /// The total number of times that tasks idled, waiting to be awoken.
            total_idled_count: Gauge<Count> [],
            /// The total duration that tasks idled.
            total_idle_duration: Counter<Microseconds> [],
            /// The maximum idle duration that a task took.
            max_idle_duration: Counter<Microseconds> [],
            /// The total number of times that tasks were awoken (and then, presumably, scheduled for execution).
            total_scheduled_count: Gauge<Count> [],
            /// The total duration that tasks spent waiting to be polled after awakening.
            total_scheduled_duration: Counter<Microseconds> [],
            /// The total number of times that tasks were polled.
            total_poll_count: Gauge<Count> [],
            /// The total duration elapsed during polls.
            total_poll_duration: Counter<Microseconds> [],
            /// The total number of times that polling tasks completed swiftly.
            total_fast_poll_count: Gauge<Count> [],
            /// The total duration of fast polls.
            total_fast_poll_duration: Counter<Microseconds> [],
            /// The total number of times that polling tasks completed slowly.
            total_slow_poll_count: Gauge<Count> [],
            /// The total duration of slow polls.
            total_slow_poll_duration: Counter<Microseconds> [],
            /// The total count of tasks with short scheduling delays.
            total_short_delay_count: Gauge<Count> [],
            /// The total count of tasks with long scheduling delays.
            total_long_delay_count: Gauge<Count> [],
            /// The total duration of tasks with short scheduling delays.
            total_short_delay_duration: Counter<Microseconds> [],
            /// The total duration of tasks with long scheduling delays.
            total_long_delay_duration: Counter<Microseconds> [],
        }
        unstable {}
    }
}

impl fmt::Debug for TaskMetricsReporter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TaskMetricsReporter")
            .field("interval", &self.interval)
            // skip intervals field
            .finish()
    }
}

impl TaskMetricsReporter {
    /// Collect and publish metrics once to the configured [metrics_rs](metrics) reporter.
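    ///
    /// For example, a minimal sketch of driving sampling manually instead of through `run`
    /// (the transformer and scheduling here are only illustrative):
    ///
    /// ```
    /// let monitor = tokio_metrics::TaskMonitor::new();
    /// let mut reporter =
    ///     tokio_metrics::TaskMetricsReporterBuilder::new(metrics::Key::from_static_name)
    ///         .build_with_monitor(monitor);
    /// // sample the next metrics interval and publish it to the configured recorder
    /// reporter.run_once();
    /// ```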
    pub fn run_once(&mut self) {
        let metrics = self
            .intervals
            .next()
            .expect("TaskIntervals::next never returns None");
        self.emitter.emit(metrics, ());
    }

    /// Collect and publish metrics periodically to the configured [metrics_rs](metrics) reporter.
    ///
    /// You probably want to run this within its own task (using [`tokio::task::spawn`]).
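    ///
    /// For example, a minimal sketch of spawning the run loop as its own task (the transformer
    /// here is only illustrative):
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// let monitor = tokio_metrics::TaskMonitor::new();
    /// let reporter =
    ///     tokio_metrics::TaskMetricsReporterBuilder::new(metrics::Key::from_static_name)
    ///         .build_with_monitor(monitor.clone());
    /// // sample and publish in the background; keep `monitor` to instrument your tasks with it
    /// tokio::task::spawn(reporter.run());
    /// # }
    /// ```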
    pub async fn run(mut self) {
        loop {
            self.run_once();
            tokio::time::sleep(self.interval).await;
        }
    }
}