1use lazy_static::lazy_static;
2use parking_lot::RwLock;
3use std::error::Error;
4use std::fmt;
5
6use prometheus::{
7 core::Desc,
8 core::{Collector, Opts},
9 proto, CounterVec, IntCounterVec, IntGaugeVec,
10};
11use std::collections::HashMap;
12
13use tokio_metrics::TaskMetrics as TaskMetricsData;
14use tokio_metrics::TaskMonitor;
15
/// Name of the variable label attached to every metric vector in this module.
const TASK_LABEL: &str = "task";

/// Number of metric families exported by a collector (one per `TaskMetrics` field).
// Depending on the build configuration this constant may have no users outside of
// test code, hence the explicit lint allowance.
#[allow(dead_code)]
const METRICS_COUNT: usize = 19;
19
/// Error reported when a label is registered a second time.
///
/// The `Display` output has the form `label '<label>' already exists`.
#[derive(Debug)]
pub struct LabelAlreadyExists {
    /// The label that was already present.
    label: String,
}

impl LabelAlreadyExists {
    /// Builds the error for the given, already registered, `label`.
    fn new(label: String) -> Self {
        LabelAlreadyExists { label }
    }
}

impl fmt::Display for LabelAlreadyExists {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { label } = self;
        write!(f, "label '{}' already exists", label)
    }
}

// No underlying cause to expose, so the default `Error` methods are sufficient.
impl Error for LabelAlreadyExists {}
38
39#[derive(Debug)]
41struct TaskMetrics {
42 instrumented_count: IntGaugeVec,
43 dropped_count: IntGaugeVec,
44 first_poll_count: IntGaugeVec,
45 total_first_poll_delay: CounterVec,
46 total_idled_count: IntCounterVec,
47 total_idle_duration: CounterVec,
48 total_scheduled_count: IntCounterVec,
49 total_scheduled_duration: CounterVec,
50 total_poll_count: IntCounterVec,
51 total_poll_duration: CounterVec,
52 total_first_poll_count: IntCounterVec,
53 total_fast_poll_count: IntCounterVec,
54 total_fast_poll_duration: CounterVec,
55 total_slow_poll_count: IntCounterVec,
56 total_slow_poll_duration: CounterVec,
57 total_short_delay_count: IntCounterVec,
58 total_long_delay_count: IntCounterVec,
59 total_short_delay_duration: CounterVec,
60 total_long_delay_duration: CounterVec,
61}
62
63impl TaskMetrics {
64 fn new<S: Into<String>>(namespace: S) -> Self {
65 let namespace = namespace.into();
66 let instrumented_count = IntGaugeVec::new(
67 Opts::new(
68 "tokio_task_instrumented_count",
69 r#"The number of tasks instrumented."#,
70 )
71 .namespace(namespace.clone()),
72 &[TASK_LABEL],
73 )
74 .unwrap();
75
76 let dropped_count = IntGaugeVec::new(
77 Opts::new(
78 "tokio_task_dropped_count",
79 r#"The number of tasks dropped."#,
80 )
81 .namespace(namespace.clone()),
82 &[TASK_LABEL],
83 )
84 .unwrap();
85
86 let first_poll_count = IntGaugeVec::new(
87 Opts::new(
88 "tokio_task_first_poll_count",
89 r#"The number of tasks polled for the first time."#,
90 )
91 .namespace(namespace.clone()),
92 &[TASK_LABEL],
93 )
94 .unwrap();
95
96 let total_first_poll_delay = CounterVec::new(
97 Opts::new(
98 "tokio_task_total_first_poll_delay",
99 r#"The total duration elapsed between the instant tasks are instrumented, and the instant they are first polled."#,
100 )
101 .namespace(namespace.clone()),
102 &[TASK_LABEL]
103 )
104 .unwrap();
105
106 let total_idled_count = IntCounterVec::new(
107 Opts::new(
108 "tokio_task_total_idled_count",
109 r#"The total number of times that tasks idled, waiting to be awoken."#,
110 )
111 .namespace(namespace.clone()),
112 &[TASK_LABEL],
113 )
114 .unwrap();
115
116 let total_idle_duration = CounterVec::new(
117 Opts::new(
118 "tokio_task_total_idle_duration",
119 r#"The total duration that tasks idled."#,
120 )
121 .namespace(namespace.clone()),
122 &[TASK_LABEL],
123 )
124 .unwrap();
125
126 let total_scheduled_count = IntCounterVec::new(
127 Opts::new(
128 "tokio_task_total_scheduled_count",
129 r#"The total number of times that tasks were awoken (and then, presumably, scheduled for execution)."#,
130 )
131 .namespace(namespace.clone()),
132 &[TASK_LABEL]
133 )
134 .unwrap();
135
136 let total_scheduled_duration = CounterVec::new(
137 Opts::new(
138 "tokio_task_total_scheduled_duration",
139 r#"The total duration that tasks spent waiting to be polled after awakening."#,
140 )
141 .namespace(namespace.clone()),
142 &[TASK_LABEL],
143 )
144 .unwrap();
145
146 let total_poll_count = IntCounterVec::new(
147 Opts::new(
148 "tokio_task_total_poll_count",
149 r#"The total number of times that tasks were polled."#,
150 )
151 .namespace(namespace.clone()),
152 &[TASK_LABEL],
153 )
154 .unwrap();
155
156 let total_poll_duration = CounterVec::new(
157 Opts::new(
158 "tokio_task_total_poll_duration",
159 r#"The total duration elapsed during polls."#,
160 )
161 .namespace(namespace.clone()),
162 &[TASK_LABEL],
163 )
164 .unwrap();
165
166 let total_fast_poll_count = IntCounterVec::new(
167 Opts::new(
168 "tokio_task_total_fast_poll_count",
169 r#"The amount of time worker threads were busy."#,
170 )
171 .namespace(namespace.clone()),
172 &[TASK_LABEL],
173 )
174 .unwrap();
175
176 let total_first_poll_count = IntCounterVec::new(
177 Opts::new(
178 "tokio_task_total_first_poll_count",
179 r#"The total number of times that tasks were polled for the first time."#,
180 )
181 .namespace(namespace.clone()),
182 &[TASK_LABEL],
183 )
184 .unwrap();
185
186 let total_fast_poll_duration = CounterVec::new(
187 Opts::new(
188 "tokio_task_total_fast_poll_duration",
189 r#"The total duration of fast polls."#,
190 )
191 .namespace(namespace.clone()),
192 &[TASK_LABEL],
193 )
194 .unwrap();
195
196 let total_slow_poll_count = IntCounterVec::new(
197 Opts::new(
198 "tokio_task_total_slow_poll_count",
199 r#"The total number of times that polling tasks completed slowly."#,
200 )
201 .namespace(namespace.clone()),
202 &[TASK_LABEL],
203 )
204 .unwrap();
205
206 let total_slow_poll_duration = CounterVec::new(
207 Opts::new(
208 "tokio_task_total_slow_poll_duration",
209 r#"The total duration of slow polls."#,
210 )
211 .namespace(namespace.clone()),
212 &[TASK_LABEL],
213 )
214 .unwrap();
215
216 let total_short_delay_count = IntCounterVec::new(
217 Opts::new(
218 "tokio_task_total_short_delay_count",
219 r#"The total count of tasks with short scheduling delays."#,
220 )
221 .namespace(namespace.clone()),
222 &[TASK_LABEL],
223 )
224 .unwrap();
225
226 let total_long_delay_count = IntCounterVec::new(
227 Opts::new(
228 "tokio_task_total_long_delay_count",
229 r#"The total count of tasks with long scheduling delays."#,
230 )
231 .namespace(namespace.clone()),
232 &[TASK_LABEL],
233 )
234 .unwrap();
235
236 let total_short_delay_duration = CounterVec::new(
237 Opts::new(
238 "tokio_task_total_short_delay_duration",
239 r#"The total duration of tasks with short scheduling delays."#,
240 )
241 .namespace(namespace.clone()),
242 &[TASK_LABEL],
243 )
244 .unwrap();
245
246 let total_long_delay_duration = CounterVec::new(
247 Opts::new(
248 "tokio_task_total_long_delay_duration",
249 r#"The total number of times that a task had a long scheduling duration."#,
250 )
251 .namespace(namespace.clone()),
252 &[TASK_LABEL],
253 )
254 .unwrap();
255
256 Self {
257 instrumented_count,
258 dropped_count,
259 first_poll_count,
260 total_first_poll_delay,
261 total_idled_count,
262 total_idle_duration,
263 total_scheduled_count,
264 total_scheduled_duration,
265 total_poll_count,
266 total_poll_duration,
267 total_first_poll_count,
268 total_fast_poll_count,
269 total_fast_poll_duration,
270 total_slow_poll_count,
271 total_slow_poll_duration,
272 total_short_delay_count,
273 total_long_delay_count,
274 total_short_delay_duration,
275 total_long_delay_duration,
276 }
277 }
278
279 fn update(&self, label: &str, data: TaskMetricsData) {
280 macro_rules! update_counter {
281 ( $field:ident, "int" ) => {{
282 let new = data.$field as u64;
283 self.$field.with_label_values(&[label]).inc_by(new);
284 }};
285 ( $field:ident, $metrics_field:ident, "int" ) => {{
286 let new = data.$metrics_field as u64;
287 self.$field.with_label_values(&[label]).inc_by(new);
288 }};
289 ( $field:ident, "duration" ) => {{
290 let new = data.$field.as_secs_f64();
291 self.$field.with_label_values(&[label]).inc_by(new);
292 }};
293 }
294
295 macro_rules! update_gauge {
296 ( $field:ident) => {
297 self.$field
298 .with_label_values(&[label])
299 .set(data.$field as i64);
300 };
301 }
302
303 update_gauge!(instrumented_count);
304 update_gauge!(dropped_count);
305 update_gauge!(first_poll_count);
306
307 update_counter!(total_first_poll_delay, "duration");
308 update_counter!(total_idled_count, "int");
309 update_counter!(total_idle_duration, "duration");
310 update_counter!(total_scheduled_count, "int");
311 update_counter!(total_scheduled_duration, "duration");
312 update_counter!(total_poll_count, "int");
313 update_counter!(total_poll_duration, "duration");
314 update_counter!(total_first_poll_count, first_poll_count, "int");
315 update_counter!(total_fast_poll_count, "int");
316 update_counter!(total_fast_poll_duration, "duration");
317 update_counter!(total_slow_poll_count, "int");
318 update_counter!(total_slow_poll_duration, "duration");
319 update_counter!(total_short_delay_count, "int");
320 update_counter!(total_long_delay_count, "int");
321 update_counter!(total_short_delay_duration, "duration");
322 update_counter!(total_long_delay_duration, "duration");
323 }
324
325 fn to_desc(&self) -> Vec<&Desc> {
326 let mut desc = vec![];
327 desc.extend(self.instrumented_count.desc());
328 desc.extend(self.dropped_count.desc());
329 desc.extend(self.first_poll_count.desc());
330 desc.extend(self.total_first_poll_delay.desc());
331 desc.extend(self.total_idled_count.desc());
332 desc.extend(self.total_idle_duration.desc());
333 desc.extend(self.total_scheduled_count.desc());
334 desc.extend(self.total_scheduled_duration.desc());
335 desc.extend(self.total_poll_count.desc());
336 desc.extend(self.total_poll_duration.desc());
337 desc.extend(self.total_first_poll_count.desc());
338 desc.extend(self.total_fast_poll_count.desc());
339 desc.extend(self.total_fast_poll_duration.desc());
340 desc.extend(self.total_slow_poll_count.desc());
341 desc.extend(self.total_slow_poll_duration.desc());
342 desc.extend(self.total_short_delay_count.desc());
343 desc.extend(self.total_long_delay_count.desc());
344 desc.extend(self.total_short_delay_duration.desc());
345 desc.extend(self.total_long_delay_duration.desc());
346
347 assert_eq!(desc.len(), 19);
348 desc
349 }
350
351 fn to_metrics(&self) -> Vec<proto::MetricFamily> {
352 let mut metrics = vec![];
353 metrics.extend(self.instrumented_count.collect());
354 metrics.extend(self.dropped_count.collect());
355 metrics.extend(self.first_poll_count.collect());
356 metrics.extend(self.total_first_poll_delay.collect());
357 metrics.extend(self.total_idled_count.collect());
358 metrics.extend(self.total_idle_duration.collect());
359 metrics.extend(self.total_scheduled_count.collect());
360 metrics.extend(self.total_scheduled_duration.collect());
361 metrics.extend(self.total_poll_count.collect());
362 metrics.extend(self.total_poll_duration.collect());
363 metrics.extend(self.total_first_poll_count.collect());
364 metrics.extend(self.total_fast_poll_count.collect());
365 metrics.extend(self.total_fast_poll_duration.collect());
366 metrics.extend(self.total_slow_poll_count.collect());
367 metrics.extend(self.total_slow_poll_duration.collect());
368 metrics.extend(self.total_short_delay_count.collect());
369 metrics.extend(self.total_long_delay_count.collect());
370 metrics.extend(self.total_short_delay_duration.collect());
371 metrics.extend(self.total_long_delay_duration.collect());
372
373 assert_eq!(metrics.len(), 19);
374 metrics
375 }
376}
377
378pub struct TaskCollector {
380 metrics: TaskMetrics,
381 producer:
382 RwLock<HashMap<String, Box<dyn Iterator<Item = tokio_metrics::TaskMetrics> + Send + Sync>>>,
383}
384
385impl TaskCollector {
386 pub fn new<S: Into<String>>(namespace: S) -> Self {
388 let producer = RwLock::new(HashMap::new());
389 let metrics = TaskMetrics::new(namespace);
390
391 Self { metrics, producer }
392 }
393
394 pub fn add(&self, label: &str, monitor: TaskMonitor) -> Result<(), LabelAlreadyExists> {
397 if self.producer.read().contains_key(label) {
398 return Err(LabelAlreadyExists::new(label.into()));
399 }
400 self.producer
401 .write()
402 .insert(label.to_string(), Box::new(monitor.intervals()));
403
404 Ok(())
405 }
406
407 pub fn remove(&mut self, label: &str) {
409 self.producer.write().remove(label);
410 }
411
412 fn get_metrics_data_by_label(&self, label: &str) -> TaskMetricsData {
413 let data = self.producer.write().get_mut(label).unwrap().next();
414 data.unwrap()
415 }
416}
417
418impl Collector for TaskCollector {
419 fn desc(&self) -> Vec<&Desc> {
420 self.metrics.to_desc()
421 }
422
423 fn collect(&self) -> Vec<proto::MetricFamily> {
424 let mut labels = vec![];
425
426 {
427 let producer = self.producer.read();
428
429 for (label, _) in producer.iter() {
430 labels.push(label.to_string());
431 }
432 }
433
434 for label in labels {
435 let data = self.get_metrics_data_by_label(&label);
436 self.metrics.update(&label, data);
437 }
438 self.metrics.to_metrics()
439 }
440}
441
442impl Collector for &TaskCollector {
443 fn desc(&self) -> Vec<&Desc> {
444 self.metrics.to_desc()
445 }
446
447 fn collect(&self) -> Vec<proto::MetricFamily> {
448 let mut labels = vec![];
449
450 {
451 let producer = self.producer.read();
452
453 for (label, _) in producer.iter() {
454 labels.push(label.to_string());
455 }
456 }
457
458 for label in labels {
459 let data = self.get_metrics_data_by_label(&label);
460 self.metrics.update(&label, data);
461 }
462 self.metrics.to_metrics()
463 }
464}
465
466lazy_static! {
467 static ref DEFAULT_COLLECTOR: TaskCollector = {
468 let collector = TaskCollector::new("");
469
470 collector
471 };
472}
473
474pub fn default_collector() -> &'static TaskCollector {
476 lazy_static::initialize(&DEFAULT_COLLECTOR);
477 &DEFAULT_COLLECTOR
478}
479
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the first exported metric family.
    const FIRST_NAME: &str = "tokio_task_instrumented_count";
    /// Help text of the first exported metric family.
    const FIRST_HELP: &str = "The number of tasks instrumented.";

    /// The collector describes every metric, starting with the instrumented count.
    #[test]
    fn test_task_collector_descs() {
        let tc = TaskCollector::new("");

        let descs = tc.desc();
        assert_eq!(descs.len(), METRICS_COUNT);

        let first = descs[0];
        assert_eq!(first.fq_name, FIRST_NAME);
        assert_eq!(first.help, FIRST_HELP);
        assert_eq!(first.variable_labels.len(), 1);
    }

    /// A label can be registered once; a second registration is rejected.
    #[test]
    fn test_task_collector_add() {
        let monitor = tokio_metrics::TaskMonitor::new();
        let tc = TaskCollector::new("");

        assert!(tc.add("custom", monitor.clone()).is_ok());

        let err = tc
            .add("custom", monitor)
            .expect_err("registering the same label twice must fail");
        assert_eq!(err.to_string(), "label 'custom' already exists");
    }

    /// Collecting after instrumenting one task reports one instrumented task.
    #[tokio::test]
    async fn test_task_collector_metrics() {
        let monitor = tokio_metrics::TaskMonitor::new();
        let tc = TaskCollector::new("");

        tc.add("custom", monitor.clone()).unwrap();

        // Only the act of instrumenting matters here; the wrapper is never polled.
        drop(monitor.instrument(tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_secs(2)).await
        })));

        let metrics = tc.collect();
        assert_eq!(metrics.len(), METRICS_COUNT);

        let first = &metrics[0];
        assert_eq!(first.name(), FIRST_NAME);
        assert_eq!(first.help(), FIRST_HELP);
        assert_eq!(first.get_metric().len(), 1);
        assert_eq!(first.get_metric()[0].get_gauge().value(), 1.0);
    }

    /// The default collector exports all families, each without any labelled series.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_default() {
        let collector = default_collector();
        assert_eq!(collector.desc().len(), METRICS_COUNT);

        let metrics = collector.collect();
        assert_eq!(metrics.len(), METRICS_COUNT);

        let first = &metrics[0];
        assert_eq!(first.name(), FIRST_NAME);
        assert_eq!(first.help(), FIRST_HELP);
        assert_eq!(first.get_metric().len(), 0);
    }

    /// The collector can be registered with Prometheus and its output encoded as text.
    #[tokio::test]
    async fn test_integrated_with_prometheus() {
        use prometheus::Encoder;

        let tc = TaskCollector::new("");

        let monitor = tokio_metrics::TaskMonitor::new();
        tc.add("custom", monitor.clone()).unwrap();

        prometheus::default_registry()
            .register(Box::new(tc))
            .unwrap();

        // Only the act of instrumenting matters here; the wrapper is never polled.
        drop(monitor.instrument(tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_secs(2)).await
        })));

        let encoder = prometheus::TextEncoder::new();

        let mut buffer = Vec::new();
        encoder
            .encode(&prometheus::default_registry().gather(), &mut buffer)
            .expect("Failed to encode");

        let text = String::from_utf8(buffer).expect("Failed to convert to string.");
        assert!(text.contains(FIRST_NAME));
    }

    /// The first-poll gauge shows the latest sample while the counter accumulates.
    #[tokio::test]
    async fn test_task_first_poll_count() {
        let monitor = tokio_metrics::TaskMonitor::new();
        let tc = TaskCollector::new("");

        tc.add("custom", monitor.clone()).unwrap();

        // An independent sample stream used to observe the monitor directly.
        let mut interval = monitor.intervals();
        let mut next_interval = || interval.next().unwrap();

        // Nothing has been instrumented yet.
        assert_eq!(next_interval().first_poll_count, 0);

        let task = monitor.instrument(async {});

        // Instrumented, but not polled yet.
        assert_eq!(next_interval().first_poll_count, 0);

        task.await;

        // Awaiting the task polled it for the first time.
        assert_eq!(next_interval().first_poll_count, 1);

        let metrics = tc.collect();
        let gauge_index = metrics
            .iter()
            .position(|m| m.name() == "tokio_task_first_poll_count")
            .unwrap();
        let counter_index = metrics
            .iter()
            .position(|m| m.name() == "tokio_task_total_first_poll_count")
            .unwrap();

        assert_eq!(
            metrics[gauge_index].get_metric()[0].get_gauge().value(),
            1.0
        );
        assert_eq!(
            metrics[counter_index].get_metric()[0].get_counter().value(),
            1.0
        );

        let task2 = monitor.instrument(async {});
        task2.await;

        // Second sample: the gauge is overwritten, the counter keeps growing.
        let metrics = tc.collect();
        assert_eq!(
            metrics[gauge_index].get_metric()[0].get_gauge().value(),
            1.0
        );
        assert_eq!(
            metrics[counter_index].get_metric()[0].get_counter().value(),
            2.0
        );
    }

    /// `TaskCollector` itself (not just the lazy wrapper around it) must be `Send`.
    #[test]
    fn test_send() {
        fn test<C: Send>() {}
        test::<TaskCollector>();
    }

    /// `TaskCollector` itself (not just the lazy wrapper around it) must be `Sync`.
    #[test]
    fn test_sync() {
        fn test<C: Sync>() {}
        test::<TaskCollector>();
    }
}