tokio_metrics_collector/
task.rs

1use lazy_static::lazy_static;
2use parking_lot::RwLock;
3use std::error::Error;
4use std::fmt;
5
6use prometheus::{
7    core::Desc,
8    core::{Collector, Opts},
9    proto, CounterVec, IntCounterVec, IntGaugeVec,
10};
11use std::collections::HashMap;
12
13use tokio_metrics::TaskMetrics as TaskMetricsData;
14use tokio_metrics::TaskMonitor;
15
16const TASK_LABEL: &str = "task";
17#[allow(unused)]
18const METRICS_COUNT: usize = 19;
19
/// Error reported when a monitor is added under a label that is already taken.
#[derive(Debug)]
pub struct LabelAlreadyExists {
    /// The label that was already in use.
    label: String,
}

impl LabelAlreadyExists {
    /// Builds the error for the conflicting `label`.
    fn new(label: String) -> Self {
        LabelAlreadyExists { label }
    }
}

impl fmt::Display for LabelAlreadyExists {
    /// Renders the error as `label '<label>' already exists`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { label } = self;
        f.write_fmt(format_args!("label '{}' already exists", label))
    }
}

// No underlying cause to expose, so the default `Error` methods are enough.
impl Error for LabelAlreadyExists {}
38
39// Reference: https://docs.rs/tokio-metrics/latest/tokio_metrics/struct.RuntimeMetrics.html
40#[derive(Debug)]
41struct TaskMetrics {
42    instrumented_count: IntGaugeVec,
43    dropped_count: IntGaugeVec,
44    first_poll_count: IntGaugeVec,
45    total_first_poll_delay: CounterVec,
46    total_idled_count: IntCounterVec,
47    total_idle_duration: CounterVec,
48    total_scheduled_count: IntCounterVec,
49    total_scheduled_duration: CounterVec,
50    total_poll_count: IntCounterVec,
51    total_poll_duration: CounterVec,
52    total_first_poll_count: IntCounterVec,
53    total_fast_poll_count: IntCounterVec,
54    total_fast_poll_duration: CounterVec,
55    total_slow_poll_count: IntCounterVec,
56    total_slow_poll_duration: CounterVec,
57    total_short_delay_count: IntCounterVec,
58    total_long_delay_count: IntCounterVec,
59    total_short_delay_duration: CounterVec,
60    total_long_delay_duration: CounterVec,
61}
62
63impl TaskMetrics {
64    fn new<S: Into<String>>(namespace: S) -> Self {
65        let namespace = namespace.into();
66        let instrumented_count = IntGaugeVec::new(
67            Opts::new(
68                "tokio_task_instrumented_count",
69                r#"The number of tasks instrumented."#,
70            )
71            .namespace(namespace.clone()),
72            &[TASK_LABEL],
73        )
74        .unwrap();
75
76        let dropped_count = IntGaugeVec::new(
77            Opts::new(
78                "tokio_task_dropped_count",
79                r#"The number of tasks dropped."#,
80            )
81            .namespace(namespace.clone()),
82            &[TASK_LABEL],
83        )
84        .unwrap();
85
86        let first_poll_count = IntGaugeVec::new(
87            Opts::new(
88                "tokio_task_first_poll_count",
89                r#"The number of tasks polled for the first time."#,
90            )
91            .namespace(namespace.clone()),
92            &[TASK_LABEL],
93        )
94        .unwrap();
95
96        let total_first_poll_delay = CounterVec::new(
97            Opts::new(
98                "tokio_task_total_first_poll_delay",
99                r#"The total duration elapsed between the instant tasks are instrumented, and the instant they are first polled."#,
100            )
101            .namespace(namespace.clone()),
102            &[TASK_LABEL]
103        )
104        .unwrap();
105
106        let total_idled_count = IntCounterVec::new(
107            Opts::new(
108                "tokio_task_total_idled_count",
109                r#"The total number of times that tasks idled, waiting to be awoken."#,
110            )
111            .namespace(namespace.clone()),
112            &[TASK_LABEL],
113        )
114        .unwrap();
115
116        let total_idle_duration = CounterVec::new(
117            Opts::new(
118                "tokio_task_total_idle_duration",
119                r#"The total duration that tasks idled."#,
120            )
121            .namespace(namespace.clone()),
122            &[TASK_LABEL],
123        )
124        .unwrap();
125
126        let total_scheduled_count = IntCounterVec::new(
127            Opts::new(
128                "tokio_task_total_scheduled_count",
129                r#"The total number of times that tasks were awoken (and then, presumably, scheduled for execution)."#,
130            )
131            .namespace(namespace.clone()),
132            &[TASK_LABEL]
133        )
134        .unwrap();
135
136        let total_scheduled_duration = CounterVec::new(
137            Opts::new(
138                "tokio_task_total_scheduled_duration",
139                r#"The total duration that tasks spent waiting to be polled after awakening."#,
140            )
141            .namespace(namespace.clone()),
142            &[TASK_LABEL],
143        )
144        .unwrap();
145
146        let total_poll_count = IntCounterVec::new(
147            Opts::new(
148                "tokio_task_total_poll_count",
149                r#"The total number of times that tasks were polled."#,
150            )
151            .namespace(namespace.clone()),
152            &[TASK_LABEL],
153        )
154        .unwrap();
155
156        let total_poll_duration = CounterVec::new(
157            Opts::new(
158                "tokio_task_total_poll_duration",
159                r#"The total duration elapsed during polls."#,
160            )
161            .namespace(namespace.clone()),
162            &[TASK_LABEL],
163        )
164        .unwrap();
165
166        let total_fast_poll_count = IntCounterVec::new(
167            Opts::new(
168                "tokio_task_total_fast_poll_count",
169                r#"The amount of time worker threads were busy."#,
170            )
171            .namespace(namespace.clone()),
172            &[TASK_LABEL],
173        )
174        .unwrap();
175
176        let total_first_poll_count = IntCounterVec::new(
177            Opts::new(
178                "tokio_task_total_first_poll_count",
179                r#"The total number of times that tasks were polled for the first time."#,
180            )
181            .namespace(namespace.clone()),
182            &[TASK_LABEL],
183        )
184        .unwrap();
185
186        let total_fast_poll_duration = CounterVec::new(
187            Opts::new(
188                "tokio_task_total_fast_poll_duration",
189                r#"The total duration of fast polls."#,
190            )
191            .namespace(namespace.clone()),
192            &[TASK_LABEL],
193        )
194        .unwrap();
195
196        let total_slow_poll_count = IntCounterVec::new(
197            Opts::new(
198                "tokio_task_total_slow_poll_count",
199                r#"The total number of times that polling tasks completed slowly."#,
200            )
201            .namespace(namespace.clone()),
202            &[TASK_LABEL],
203        )
204        .unwrap();
205
206        let total_slow_poll_duration = CounterVec::new(
207            Opts::new(
208                "tokio_task_total_slow_poll_duration",
209                r#"The total duration of slow polls."#,
210            )
211            .namespace(namespace.clone()),
212            &[TASK_LABEL],
213        )
214        .unwrap();
215
216        let total_short_delay_count = IntCounterVec::new(
217            Opts::new(
218                "tokio_task_total_short_delay_count",
219                r#"The total count of tasks with short scheduling delays."#,
220            )
221            .namespace(namespace.clone()),
222            &[TASK_LABEL],
223        )
224        .unwrap();
225
226        let total_long_delay_count = IntCounterVec::new(
227            Opts::new(
228                "tokio_task_total_long_delay_count",
229                r#"The total count of tasks with long scheduling delays."#,
230            )
231            .namespace(namespace.clone()),
232            &[TASK_LABEL],
233        )
234        .unwrap();
235
236        let total_short_delay_duration = CounterVec::new(
237            Opts::new(
238                "tokio_task_total_short_delay_duration",
239                r#"The total duration of tasks with short scheduling delays."#,
240            )
241            .namespace(namespace.clone()),
242            &[TASK_LABEL],
243        )
244        .unwrap();
245
246        let total_long_delay_duration = CounterVec::new(
247            Opts::new(
248                "tokio_task_total_long_delay_duration",
249                r#"The total number of times that a task had a long scheduling duration."#,
250            )
251            .namespace(namespace.clone()),
252            &[TASK_LABEL],
253        )
254        .unwrap();
255
256        Self {
257            instrumented_count,
258            dropped_count,
259            first_poll_count,
260            total_first_poll_delay,
261            total_idled_count,
262            total_idle_duration,
263            total_scheduled_count,
264            total_scheduled_duration,
265            total_poll_count,
266            total_poll_duration,
267            total_first_poll_count,
268            total_fast_poll_count,
269            total_fast_poll_duration,
270            total_slow_poll_count,
271            total_slow_poll_duration,
272            total_short_delay_count,
273            total_long_delay_count,
274            total_short_delay_duration,
275            total_long_delay_duration,
276        }
277    }
278
279    fn update(&self, label: &str, data: TaskMetricsData) {
280        macro_rules! update_counter {
281            ( $field:ident,  "int" ) => {{
282                let new = data.$field as u64;
283                self.$field.with_label_values(&[label]).inc_by(new);
284            }};
285            ( $field:ident, $metrics_field:ident,  "int" ) => {{
286                let new = data.$metrics_field as u64;
287                self.$field.with_label_values(&[label]).inc_by(new);
288            }};
289            ( $field:ident,  "duration" ) => {{
290                let new = data.$field.as_secs_f64();
291                self.$field.with_label_values(&[label]).inc_by(new);
292            }};
293        }
294
295        macro_rules! update_gauge {
296            ( $field:ident) => {
297                self.$field
298                    .with_label_values(&[label])
299                    .set(data.$field as i64);
300            };
301        }
302
303        update_gauge!(instrumented_count);
304        update_gauge!(dropped_count);
305        update_gauge!(first_poll_count);
306
307        update_counter!(total_first_poll_delay, "duration");
308        update_counter!(total_idled_count, "int");
309        update_counter!(total_idle_duration, "duration");
310        update_counter!(total_scheduled_count, "int");
311        update_counter!(total_scheduled_duration, "duration");
312        update_counter!(total_poll_count, "int");
313        update_counter!(total_poll_duration, "duration");
314        update_counter!(total_first_poll_count, first_poll_count, "int");
315        update_counter!(total_fast_poll_count, "int");
316        update_counter!(total_fast_poll_duration, "duration");
317        update_counter!(total_slow_poll_count, "int");
318        update_counter!(total_slow_poll_duration, "duration");
319        update_counter!(total_short_delay_count, "int");
320        update_counter!(total_long_delay_count, "int");
321        update_counter!(total_short_delay_duration, "duration");
322        update_counter!(total_long_delay_duration, "duration");
323    }
324
325    fn to_desc(&self) -> Vec<&Desc> {
326        let mut desc = vec![];
327        desc.extend(self.instrumented_count.desc());
328        desc.extend(self.dropped_count.desc());
329        desc.extend(self.first_poll_count.desc());
330        desc.extend(self.total_first_poll_delay.desc());
331        desc.extend(self.total_idled_count.desc());
332        desc.extend(self.total_idle_duration.desc());
333        desc.extend(self.total_scheduled_count.desc());
334        desc.extend(self.total_scheduled_duration.desc());
335        desc.extend(self.total_poll_count.desc());
336        desc.extend(self.total_poll_duration.desc());
337        desc.extend(self.total_first_poll_count.desc());
338        desc.extend(self.total_fast_poll_count.desc());
339        desc.extend(self.total_fast_poll_duration.desc());
340        desc.extend(self.total_slow_poll_count.desc());
341        desc.extend(self.total_slow_poll_duration.desc());
342        desc.extend(self.total_short_delay_count.desc());
343        desc.extend(self.total_long_delay_count.desc());
344        desc.extend(self.total_short_delay_duration.desc());
345        desc.extend(self.total_long_delay_duration.desc());
346
347        assert_eq!(desc.len(), 19);
348        desc
349    }
350
351    fn to_metrics(&self) -> Vec<proto::MetricFamily> {
352        let mut metrics = vec![];
353        metrics.extend(self.instrumented_count.collect());
354        metrics.extend(self.dropped_count.collect());
355        metrics.extend(self.first_poll_count.collect());
356        metrics.extend(self.total_first_poll_delay.collect());
357        metrics.extend(self.total_idled_count.collect());
358        metrics.extend(self.total_idle_duration.collect());
359        metrics.extend(self.total_scheduled_count.collect());
360        metrics.extend(self.total_scheduled_duration.collect());
361        metrics.extend(self.total_poll_count.collect());
362        metrics.extend(self.total_poll_duration.collect());
363        metrics.extend(self.total_first_poll_count.collect());
364        metrics.extend(self.total_fast_poll_count.collect());
365        metrics.extend(self.total_fast_poll_duration.collect());
366        metrics.extend(self.total_slow_poll_count.collect());
367        metrics.extend(self.total_slow_poll_duration.collect());
368        metrics.extend(self.total_short_delay_count.collect());
369        metrics.extend(self.total_long_delay_count.collect());
370        metrics.extend(self.total_short_delay_duration.collect());
371        metrics.extend(self.total_long_delay_duration.collect());
372
373        assert_eq!(metrics.len(), 19);
374        metrics
375    }
376}
377
378/// TaskCollector
379pub struct TaskCollector {
380    metrics: TaskMetrics,
381    producer:
382        RwLock<HashMap<String, Box<dyn Iterator<Item = tokio_metrics::TaskMetrics> + Send + Sync>>>,
383}
384
385impl TaskCollector {
386    /// Create a [`TaskCollector`] in namespace.
387    pub fn new<S: Into<String>>(namespace: S) -> Self {
388        let producer = RwLock::new(HashMap::new());
389        let metrics = TaskMetrics::new(namespace);
390
391        Self { metrics, producer }
392    }
393
394    /// Add a [`TaskMonitor`] to collector.
395    /// If the label is already used by another monitor, an error will be thrown.
396    pub fn add(&self, label: &str, monitor: TaskMonitor) -> Result<(), LabelAlreadyExists> {
397        if self.producer.read().contains_key(label) {
398            return Err(LabelAlreadyExists::new(label.into()));
399        }
400        self.producer
401            .write()
402            .insert(label.to_string(), Box::new(monitor.intervals()));
403
404        Ok(())
405    }
406
407    /// Remove a [`TaskMonitor`] from collector.
408    pub fn remove(&mut self, label: &str) {
409        self.producer.write().remove(label);
410    }
411
412    fn get_metrics_data_by_label(&self, label: &str) -> TaskMetricsData {
413        let data = self.producer.write().get_mut(label).unwrap().next();
414        data.unwrap()
415    }
416}
417
418impl Collector for TaskCollector {
419    fn desc(&self) -> Vec<&Desc> {
420        self.metrics.to_desc()
421    }
422
423    fn collect(&self) -> Vec<proto::MetricFamily> {
424        let mut labels = vec![];
425
426        {
427            let producer = self.producer.read();
428
429            for (label, _) in producer.iter() {
430                labels.push(label.to_string());
431            }
432        }
433
434        for label in labels {
435            let data = self.get_metrics_data_by_label(&label);
436            self.metrics.update(&label, data);
437        }
438        self.metrics.to_metrics()
439    }
440}
441
442impl Collector for &TaskCollector {
443    fn desc(&self) -> Vec<&Desc> {
444        self.metrics.to_desc()
445    }
446
447    fn collect(&self) -> Vec<proto::MetricFamily> {
448        let mut labels = vec![];
449
450        {
451            let producer = self.producer.read();
452
453            for (label, _) in producer.iter() {
454                labels.push(label.to_string());
455            }
456        }
457
458        for label in labels {
459            let data = self.get_metrics_data_by_label(&label);
460            self.metrics.update(&label, data);
461        }
462        self.metrics.to_metrics()
463    }
464}
465
466lazy_static! {
467    static ref DEFAULT_COLLECTOR: TaskCollector = {
468        let collector = TaskCollector::new("");
469
470        collector
471    };
472}
473
474/// Get the global [`TaskCollector`], the namespace is under `""`.
475pub fn default_collector() -> &'static TaskCollector {
476    lazy_static::initialize(&DEFAULT_COLLECTOR);
477    &DEFAULT_COLLECTOR
478}
479
#[cfg(test)]
mod tests {
    use super::*;

    /// The collector describes all metrics, starting with the instrumented count.
    #[test]
    fn test_task_collector_descs() {
        let tc = TaskCollector::new("");

        let descs = tc.desc();
        assert_eq!(descs.len(), METRICS_COUNT);
        assert_eq!(descs[0].fq_name, "tokio_task_instrumented_count");
        assert_eq!(descs[0].help, "The number of tasks instrumented.");
        assert_eq!(descs[0].variable_labels.len(), 1);
    }

    /// Adding a second monitor under an existing label is rejected.
    #[test]
    fn test_task_collector_add() {
        let monitor = tokio_metrics::TaskMonitor::new();
        let tc = TaskCollector::new("");

        let res = tc.add("custom", monitor.clone());
        assert!(res.is_ok());

        let res2 = tc.add("custom", monitor);
        assert!(res2.is_err());
        assert_eq!(
            res2.unwrap_err().to_string(),
            "label 'custom' already exists"
        );
    }

    /// Collecting reports the instrumented task under the registered label.
    #[tokio::test]
    async fn test_task_collector_metrics() {
        let monitor = tokio_metrics::TaskMonitor::new();
        let tc = TaskCollector::new("");

        tc.add("custom", monitor.clone()).unwrap();

        monitor.instrument(tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_secs(2)).await
        }));

        let metrics = tc.collect();
        assert_eq!(metrics.len(), METRICS_COUNT);
        assert_eq!(metrics[0].name(), "tokio_task_instrumented_count");
        assert_eq!(metrics[0].help(), "The number of tasks instrumented.");
        assert_eq!(metrics[0].get_metric().len(), 1);
        assert_eq!(metrics[0].get_metric()[0].get_gauge().value(), 1.0);
    }

    /// The default collector exposes all metric families but no samples,
    /// since no monitor is registered on it here.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_default() {
        let collector = default_collector();
        assert_eq!(collector.desc().len(), METRICS_COUNT);
        let metrics = collector.collect();
        assert_eq!(metrics.len(), METRICS_COUNT);
        assert_eq!(metrics[0].name(), "tokio_task_instrumented_count");
        assert_eq!(metrics[0].help(), "The number of tasks instrumented.");
        assert_eq!(metrics[0].get_metric().len(), 0);
    }

    /// The collector can be registered in a registry and text-encoded.
    #[tokio::test]
    async fn test_integrated_with_prometheus() {
        use prometheus::Encoder;

        let tc = TaskCollector::new("");

        let monitor = tokio_metrics::TaskMonitor::new();
        tc.add("custom", monitor.clone()).unwrap();

        prometheus::default_registry()
            .register(Box::new(tc))
            .unwrap();

        monitor.instrument(tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_secs(2)).await
        }));

        let encoder = prometheus::TextEncoder::new();

        let mut buffer = Vec::new();
        encoder
            .encode(&prometheus::default_registry().gather(), &mut buffer)
            .expect("Failed to encode");
        let output = String::from_utf8(buffer).expect("Failed to convert to string.");
        // The registered monitor must show up in the encoded output.
        assert!(output.contains("tokio_task_instrumented_count"));
    }

    /// The first-poll gauge follows the latest sample while the matching
    /// `total_` counter accumulates across collections.
    #[tokio::test]
    async fn test_task_first_poll_count() {
        let monitor = tokio_metrics::TaskMonitor::new();
        let tc = TaskCollector::new("");

        tc.add("custom", monitor.clone()).unwrap();

        let mut interval = monitor.intervals();
        let mut next_interval = || interval.next().unwrap();

        // no tasks have been constructed, instrumented, and polled at least once
        assert_eq!(next_interval().first_poll_count, 0);

        let task = monitor.instrument(async {});

        // `task` has been constructed and instrumented, but has not yet been polled
        assert_eq!(next_interval().first_poll_count, 0);

        // poll `task` to completion
        task.await;

        // `task` has been constructed, instrumented, and polled at least once
        assert_eq!(next_interval().first_poll_count, 1);

        let metrics = tc.collect();
        let gauge_index = metrics
            .iter()
            .position(|m| m.name() == "tokio_task_first_poll_count")
            .unwrap();

        let counter_index = metrics
            .iter()
            .position(|m| m.name() == "tokio_task_total_first_poll_count")
            .unwrap();

        assert_eq!(
            metrics[gauge_index].get_metric()[0].get_gauge().value(),
            1.0
        );
        assert_eq!(
            metrics[counter_index].get_metric()[0].get_counter().value(),
            1.0
        );

        let task2 = monitor.instrument(async {});
        task2.await;

        let metrics = tc.collect();
        assert_eq!(
            metrics[gauge_index].get_metric()[0].get_gauge().value(),
            1.0
        );
        // check total counter - 2
        assert_eq!(
            metrics[counter_index].get_metric()[0].get_counter().value(),
            2.0
        );
    }

    /// Both the collector type and the global static must be `Send`.
    #[test]
    fn test_send() {
        fn assert_send<C: Send>() {}
        // Checking only the lazy-static wrapper would say nothing about the
        // collector itself, so the concrete type is checked as well.
        assert_send::<TaskCollector>();
        assert_send::<DEFAULT_COLLECTOR>();
    }

    /// Both the collector type and the global static must be `Sync`.
    #[test]
    fn test_sync() {
        fn assert_sync<C: Sync>() {}
        assert_sync::<TaskCollector>();
        assert_sync::<DEFAULT_COLLECTOR>();
    }
}