metriki_tokio/
lib.rs

1//! # Metriki Tokio Instrument
2//!
3//! This library provides
4//! [tokio-metrics](https://github.com/tokio/tokio-metrics)
5//! integration for metriki.
6//!
7//! # Setup
8//!
9//! This library provides `RuntimeMetrics` by default. According to
10//! tokio-metrics, `tokio_unstable` is required for this feature. See
11//! the
12//! [doc](https://github.com/tokio-rs/tokio-metrics#getting-started-with-runtime-metrics)
13//! for steps to set up the cargo configuration.
14//!
15//! To disable unstable features, you can include this library with
16//! `default-features = false`.
17//!
18//! # Usage
19//!
20//! [An
21//! Example](https://github.com/sunng87/metriki/blob/master/metriki-tokio/examples/server.rs)
22//! is provided in the codebase.
23//!
24//! Check [docs of tokio-metrics](https://docs.rs/tokio-metrics/) for
25//! meaning of the metrics.
26//!
27use std::collections::HashMap;
28use std::fmt;
29use std::sync::{Arc, Mutex};
30
31use derive_builder::Builder;
32use metriki_core::metrics::{Metric, StaticGauge};
33use metriki_core::MetricsSet;
34
35#[cfg(feature = "rt")]
36use tokio_metrics::{RuntimeMetrics, RuntimeMonitor};
37use tokio_metrics::{TaskMetrics, TaskMonitor};
38
39/// A MetricsSet works with tokio_metrics `TaskMonitor`.
40///
41#[derive(Builder)]
42pub struct TokioTaskMetricsSet {
43    #[builder(setter(into))]
44    name: String,
45    #[builder(setter(custom))]
46    monitor: Arc<Mutex<dyn Iterator<Item = TaskMetrics> + Send>>,
47}
48
49impl fmt::Debug for TokioTaskMetricsSet {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
51        f.debug_struct("TokioTaskMetricsSet")
52            .field("name", &self.name)
53            .finish()
54    }
55}
56
57impl TokioTaskMetricsSet {
58    pub fn name(&self) -> &String {
59        &self.name
60    }
61}
62
63impl TokioTaskMetricsSetBuilder {
64    pub fn monitor(&mut self, monitor: &TaskMonitor) -> &Self {
65        self.monitor = Some(Arc::new(Mutex::new(monitor.intervals())));
66        self
67    }
68}
69
70impl MetricsSet for TokioTaskMetricsSet {
71    fn get_all(&self) -> HashMap<String, Metric> {
72        let metrics: TaskMetrics = self.monitor.lock().unwrap().next().unwrap();
73
74        let mut result = HashMap::new();
75        result.insert(
76            format!("{}.first_poll_count", self.name),
77            Metric::gauge(Box::new(StaticGauge(metrics.first_poll_count as f64))).into(),
78        );
79        result.insert(
80            format!("{}.instrumented_count", self.name),
81            Metric::gauge(Box::new(StaticGauge(metrics.instrumented_count as f64))).into(),
82        );
83        result.insert(
84            format!("{}.dropped_count", self.name),
85            Metric::gauge(Box::new(StaticGauge(metrics.dropped_count as f64))).into(),
86        );
87        result.insert(
88            format!("{}.total_poll_count", self.name),
89            Metric::gauge(Box::new(StaticGauge(metrics.total_poll_count as f64))).into(),
90        );
91        result.insert(
92            format!("{}.total_idled_count", self.name),
93            Metric::gauge(Box::new(StaticGauge(metrics.total_idled_count as f64))).into(),
94        );
95        result.insert(
96            format!("{}.total_scheduled_count", self.name),
97            Metric::gauge(Box::new(StaticGauge(metrics.total_scheduled_count as f64))).into(),
98        );
99        result.insert(
100            format!("{}.total_slow_poll_count", self.name),
101            Metric::gauge(Box::new(StaticGauge(metrics.total_slow_poll_count as f64))).into(),
102        );
103        result.insert(
104            format!("{}.total_fast_poll_count", self.name),
105            Metric::gauge(Box::new(StaticGauge(metrics.total_fast_poll_count as f64))).into(),
106        );
107
108        result.insert(
109            format!("{}.mean_poll_duration", self.name),
110            Metric::gauge(Box::new(StaticGauge(
111                metrics.mean_poll_duration().as_millis() as f64,
112            )))
113            .into(),
114        );
115
116        result.insert(
117            format!("{}.mean_first_poll_delay", self.name),
118            Metric::gauge(Box::new(StaticGauge(
119                metrics.mean_first_poll_delay().as_millis() as f64,
120            )))
121            .into(),
122        );
123
124        result.insert(
125            format!("{}.mean_scheduled_duration", self.name),
126            Metric::gauge(Box::new(StaticGauge(
127                metrics.mean_scheduled_duration().as_millis() as f64,
128            )))
129            .into(),
130        );
131
132        result
133    }
134}
135
/// A [`MetricsSet`] that works with tokio_metrics `RuntimeMonitor`.
///
/// Only available with the `rt` feature. Values of this type are assembled
/// through the derived `TokioRuntimeMetricsSetBuilder`.
#[cfg(feature = "rt")]
#[derive(Builder)]
pub struct TokioRuntimeMetricsSet {
    /// Prefix placed in front of every metric key reported by this set.
    #[builder(setter(into))]
    name: String,
    /// Lock-protected iterator of `RuntimeMetrics` samples; one sample is
    /// pulled each time the metrics are collected.
    #[builder(setter(custom))]
    monitor: Arc<Mutex<dyn Iterator<Item = RuntimeMetrics> + Send>>,
}
146
#[cfg(feature = "rt")]
impl TokioRuntimeMetricsSetBuilder {
    /// Sets the runtime monitor whose interval samples will back the
    /// metrics set.
    ///
    /// The iterator returned by `monitor.intervals()` is stored behind an
    /// `Arc<Mutex<..>>` so the built set can advance it from `&self`.
    ///
    /// Returns `&mut Self` so that further setters, which take `&mut self`,
    /// can be chained after this call. A `&mut Self` still coerces to the
    /// `&Self` this method used to return.
    pub fn monitor(&mut self, monitor: &RuntimeMonitor) -> &mut Self {
        self.monitor = Some(Arc::new(Mutex::new(monitor.intervals())));
        self
    }
}
154
#[cfg(feature = "rt")]
impl fmt::Debug for TokioRuntimeMetricsSet {
    /// Renders only the `name` field; the monitor iterator is a trait
    /// object without a `Debug` bound and is therefore left out.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = formatter.debug_struct("TokioRuntimeMetricsSet");
        repr.field("name", &self.name);
        repr.finish()
    }
}
163
#[cfg(feature = "rt")]
impl TokioRuntimeMetricsSet {
    /// Returns the name given to this set, which is also the prefix of
    /// every metric key it reports.
    pub fn name(&self) -> &String {
        let Self { name, .. } = self;
        name
    }
}
170
#[cfg(feature = "rt")]
impl MetricsSet for TokioRuntimeMetricsSet {
    /// Pulls the next `RuntimeMetrics` sample from the monitor iterator and
    /// reports each value as a static gauge.
    ///
    /// Keys have the form `<name>.<metric>`. Counters are reported as-is,
    /// `total_busy_duration` is reported in milliseconds (including the
    /// fractional part) and `busy_ratio` is passed through unchanged.
    ///
    /// Every call advances the shared iterator by one sample. See the
    /// tokio-metrics `RuntimeMonitor::intervals` documentation for what a
    /// single sample covers.
    ///
    /// # Panics
    ///
    /// Panics if the monitor lock is poisoned or if the interval iterator
    /// yields no further sample.
    fn get_all(&self) -> HashMap<String, Metric> {
        let metrics: RuntimeMetrics = self
            .monitor
            .lock()
            .expect("runtime monitor lock poisoned")
            .next()
            .expect("runtime monitor interval iterator yielded no sample");

        // 7 counters + busy duration + busy ratio.
        let mut result: HashMap<String, Metric> = HashMap::with_capacity(9);
        let mut gauge = |key: &str, value: f64| {
            result.insert(
                format!("{}.{}", self.name, key),
                Metric::gauge(Box::new(StaticGauge(value))).into(),
            );
        };

        gauge("total_polls_count", metrics.total_polls_count as f64);
        gauge("total_steal_count", metrics.total_steal_count as f64);
        gauge("total_park_count", metrics.total_park_count as f64);
        gauge("num_remote_schedules", metrics.num_remote_schedules as f64);
        gauge(
            "total_local_schedule_count",
            metrics.total_local_schedule_count as f64,
        );
        gauge("total_overflow_count", metrics.total_overflow_count as f64);
        gauge("total_noop_count", metrics.total_noop_count as f64);

        // Convert from nanoseconds instead of `as_millis()`: the latter
        // truncates to whole milliseconds and drops the sub-millisecond part.
        gauge(
            "total_busy_duration",
            metrics.total_busy_duration.as_nanos() as f64 / 1_000_000.0,
        );

        gauge("busy_ratio", metrics.busy_ratio());

        result
    }
}