measured_tokio/
lib.rs

//! Export the state of a tokio runtime as [`measured`] metrics.
//!
//! Use [`RuntimeCollector`] to export a single runtime, or [`NamedRuntimesCollector`] to export
//! several runtimes distinguished by a `runtime` label. Many of the exported metrics are only
//! available when building with `--cfg tokio_unstable`.
//!
//! # Usage
//!
//! ```
//! use measured::MetricGroup;
//!
//! #[derive(MetricGroup)]
//! #[metric(new())]
//! struct MyAppMetrics {
//!     #[cfg(tokio_unstable)]
//!     #[metric(namespace = "tokio")]
//!     #[metric(init = measured_tokio::RuntimeCollector::current())]
//!     tokio: measured_tokio::RuntimeCollector,
//!
//!     // other metrics
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let metrics = MyAppMetrics::new();
//!
//!     // Every call to `metrics.collect_group_into(...)` samples tokio for its current state.
//!
//!     # drop(metrics);
//! }
//! ```
28
29use std::{borrow::Cow, sync::RwLock};
30
31use measured::{
32    FixedCardinalityLabel, LabelGroup, MetricGroup,
33    label::{ComposedGroup, LabelGroupVisitor, LabelName, LabelValue, LabelVisitor, NoLabels},
34    metric::{
35        MetricEncoding,
36        counter::CounterState,
37        gauge::{FloatGaugeState, GaugeState},
38        group::Encoding,
39        name::MetricName,
40    },
41};
42use tokio::runtime::RuntimeMetrics;
43
44/// A collector which contains multiple named tokio runtimes
45pub struct NamedRuntimesCollector {
46    runtimes: RwLock<Vec<RuntimeCollector>>,
47}
48
49impl NamedRuntimesCollector {
50    /// Create a new empty `NamedRuntimesCollector`
51    pub fn new() -> Self {
52        Self {
53            runtimes: RwLock::new(vec![]),
54        }
55    }
56
57    /// Inserts a `RuntimeCollector` with the given runtime.
58    pub fn add(&self, rt: RuntimeMetrics, name: impl Into<Cow<'static, str>>) {
59        self.runtimes
60            .write()
61            .unwrap()
62            .push(RuntimeCollector::new(rt).with_name(name))
63    }
64
65    /// Inserts a `RuntimeCollector` for the current runtime.
66    ///
67    /// # Panics
68    ///
69    /// This will panic if called outside the context of a Tokio runtime. That means that you must
70    /// call this on one of the threads **being run by the runtime**, or from a thread with an active
71    /// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example)
72    /// will cause a panic unless that thread has an active `EnterGuard`.
73    pub fn add_current(&self, name: impl Into<Cow<'static, str>>) {
74        self.add(tokio::runtime::Handle::current().metrics(), name);
75    }
76}
77
78impl Default for NamedRuntimesCollector {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl<Enc: Encoding> MetricGroup<Enc> for NamedRuntimesCollector
85where
86    CounterState: MetricEncoding<Enc>,
87    GaugeState: MetricEncoding<Enc>,
88    FloatGaugeState: MetricEncoding<Enc>,
89{
90    fn collect_group_into(&self, enc: &mut Enc) -> Result<(), <Enc as Encoding>::Err> {
91        collect(&self.runtimes.read().unwrap(), enc)
92    }
93}
94
95/// A collector which exports the current state of tokio metrics
96pub struct RuntimeCollector {
97    runtime: RuntimeMetrics,
98    name: RuntimeName,
99}
100
101impl RuntimeCollector {
102    /// Create a `RuntimeCollector` with the given runtime.
103    pub fn new(runtime: RuntimeMetrics) -> Self {
104        RuntimeCollector {
105            runtime,
106            name: RuntimeName { name: None },
107        }
108    }
109
110    /// Return a `RuntimeCollector` for the current runtime.
111    ///
112    /// # Panics
113    ///
114    /// This will panic if called outside the context of a Tokio runtime. That means that you must
115    /// call this on one of the threads **being run by the runtime**, or from a thread with an active
116    /// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example)
117    /// will cause a panic unless that thread has an active `EnterGuard`.
118    pub fn current() -> Self {
119        RuntimeCollector::new(tokio::runtime::Handle::current().metrics())
120    }
121
122    pub fn with_name(self, name: impl Into<Cow<'static, str>>) -> Self {
123        Self {
124            runtime: self.runtime,
125            name: RuntimeName {
126                name: Some(name.into()),
127            },
128        }
129    }
130}
131
/// Build the `le` label for poll-time histogram bucket `bucket` of `rt`.
///
/// The label holds the bucket's upper bound in seconds. A bound of exactly `u64::MAX`
/// nanoseconds is treated as unbounded and reported as `f64::INFINITY` instead.
#[cfg(tokio_unstable)]
fn histogram_le(rt: &RuntimeMetrics, bucket: usize) -> HistogramLabelLe {
    const UNBOUNDED: std::time::Duration = std::time::Duration::from_nanos(u64::MAX);

    let upper = rt.poll_time_histogram_bucket_range(bucket).end;
    HistogramLabelLe {
        le: if upper == UNBOUNDED {
            f64::INFINITY
        } else {
            upper.as_secs_f64()
        },
    }
}
142
/// Write every tokio runtime metric for each collector in `runtimes` into `enc`.
///
/// Each metric family gets a single HELP entry, followed by the samples of every runtime in
/// `runtimes`. Each sample's labels are composed with that runtime's `RuntimeName` label group,
/// so named runtimes carry a `runtime` label and unnamed ones carry none.
///
/// Families marked `#[cfg(tokio_unstable)]` are only emitted when built with
/// `--cfg tokio_unstable`; the IO driver families additionally require the `net` feature.
///
/// # Errors
///
/// Returns the first error reported by `enc`; nothing further is written after it.
fn collect<Enc: Encoding>(runtimes: &[RuntimeCollector], enc: &mut Enc) -> Result<(), Enc::Err>
where
    CounterState: MetricEncoding<Enc>,
    GaugeState: MetricEncoding<Enc>,
    FloatGaugeState: MetricEncoding<Enc>,
{
    // `metric!(name, help, |rt| body)` writes the HELP entry for `name` once, then evaluates
    // `body` once per runtime with `rt` bound to that runtime's `RuntimeMetrics`.
    //
    // Inside `body`, `write_counter!`, `write_gauge!` and `write_float_gauge!` emit one sample of
    // the current metric:
    //   - `(labels, value)` writes under the bare metric name;
    //   - `(suffix, labels, value)` writes under the name with `suffix` appended
    //     (e.g. `Bucket`, `Count`, `Sum` for the poll-time histogram).
    // The runtime's name label group is composed in front of `labels` automatically.
    macro_rules! metric {
        ($name:literal, $help:literal, |$rt:ident| $expr:expr) => {{
            #![allow(unused_macros)]
            // ^ each invocation typically uses only some of the three writer macros below.
            const NAME: &MetricName = MetricName::from_str($name);
            enc.write_help(NAME, $help)?;
            for rt in runtimes {
                let rt_name = &rt.name;
                macro_rules! write_counter {
                    ($labels:expr, $val:expr) => {
                        measured::metric::counter::write_counter(
                            enc,
                            NAME,
                            ComposedGroup(rt_name, $labels),
                            $val,
                        )?
                    };
                    ($suffix:expr, $labels:expr, $val:expr) => {
                        measured::metric::counter::write_counter(
                            enc,
                            NAME.with_suffix($suffix),
                            ComposedGroup(rt_name, $labels),
                            $val,
                        )?
                    };
                }
                macro_rules! write_gauge {
                    ($labels:expr, $val:expr) => {
                        measured::metric::gauge::write_gauge(
                            enc,
                            NAME,
                            ComposedGroup(rt_name, $labels),
                            $val,
                        )?
                    };
                    ($suffix:expr, $labels:expr, $val:expr) => {
                        measured::metric::gauge::write_gauge(
                            enc,
                            NAME.with_suffix($suffix),
                            ComposedGroup(rt_name, $labels),
                            $val,
                        )?
                    };
                }
                macro_rules! write_float_gauge {
                    ($labels:expr, $val:expr) => {
                        measured::metric::gauge::write_float_gauge(
                            enc,
                            NAME,
                            ComposedGroup(rt_name, $labels),
                            $val,
                        )?
                    };
                    ($suffix:expr, $labels:expr, $val:expr) => {
                        measured::metric::gauge::write_float_gauge(
                            enc,
                            NAME.with_suffix($suffix),
                            ComposedGroup(rt_name, $labels),
                            $val,
                        )?
                    };
                }
                let $rt = &rt.runtime;
                ($expr)
            }
        }};
    }

    // Thread counts by `kind`: worker threads always; with `tokio_unstable`, the blocking pool is
    // additionally split into busy (`Blocking`) and idle (`BlockingIdle`) threads.
    metric!(
        "threads_total",
        "number of threads used by the runtime",
        |rt| {
            write_gauge!(ThreadKind::Worker, rt.num_workers() as i64);

            #[cfg(tokio_unstable)]
            let idle = rt.num_idle_blocking_threads();

            // we subtract here so that `sum(threads)` actually gives the total number of threads.
            #[cfg(tokio_unstable)]
            write_gauge!(
                ThreadKind::Blocking,
                rt.num_blocking_threads().saturating_sub(idle) as i64
            );

            #[cfg(tokio_unstable)]
            write_gauge!(ThreadKind::BlockingIdle, idle as i64);
        }
    );

    metric!(
        "alive_tasks",
        "number of live tasks spawned in the runtime",
        |rt| write_gauge!(NoLabels, rt.num_alive_tasks() as i64)
    );

    #[cfg(tokio_unstable)]
    metric!("tasks_total", "number of tasks", |rt| {
        write_counter!(NoLabels, rt.spawned_tasks_count());
    });

    // Queue depths labelled by `kind`: the global queue always; with `tokio_unstable`, also the
    // blocking queue and each worker's local queue (whose `kind` value is the worker index).
    metric!(
        "queued_tasks",
        "number of tasks currently in a queue",
        |rt| {
            #[cfg(tokio_unstable)]
            write_gauge!(QueueKind::Blocking, rt.blocking_queue_depth() as i64);

            write_gauge!(QueueKind::Global, rt.global_queue_depth() as i64);

            #[cfg(tokio_unstable)]
            for worker in 0..rt.num_workers() {
                let queue_depth = rt.worker_local_queue_depth(worker);
                write_gauge!(QueueKind::Worker(worker), queue_depth as i64);
            }
        }
    );

    // Per worker: `worker_local_schedule_count` is exported with `overflow="false"` and
    // `worker_overflow_count` with `overflow="true"`; remote schedules are exported under
    // `worker="remote"`, also with `overflow="true"`.
    // NOTE(review): confirm `worker_overflow_count` counts tasks (not overflow events) before
    // summing across the `overflow` label.
    #[cfg(tokio_unstable)]
    metric!(
        "scheduled_tasks_total",
        "total number of tasks scheduled into the runtime",
        |rt| {
            struct Overflow(bool);

            impl LabelGroup for Overflow {
                fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
                    const OVERFLOW: &LabelName = LabelName::from_str("overflow");
                    v.write_value(OVERFLOW, if self.0 { &Str("true") } else { &Str("false") });
                }
            }

            // Writes the `worker` label (despite the const's name) with the fixed value "remote".
            struct Remote;

            impl LabelGroup for Remote {
                fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
                    const LE: &LabelName = LabelName::from_str("worker");
                    v.write_value(LE, &Str("remote"));
                }
            }

            for worker in 0..rt.num_workers() {
                write_counter!(
                    Worker(worker).compose_with(Overflow(false)),
                    rt.worker_local_schedule_count(worker)
                );
                write_counter!(
                    Worker(worker).compose_with(Overflow(true)),
                    rt.worker_overflow_count(worker)
                );
            }
            write_counter!(
                Remote.compose_with(Overflow(true)),
                rt.remote_schedule_count()
            );
        }
    );

    #[cfg(tokio_unstable)]
    metric!(
        "budget_forced_yield_total",
        "number of tasks forced to yield after exhausting their budget",
        |rt| write_counter!(NoLabels, rt.budget_forced_yield_count())
    );

    #[cfg(tokio_unstable)]
    metric!(
        "worker_mean_poll_time_seconds",
        "estimated weighted moving average of the poll time for this worker",
        |rt| for worker in 0..rt.num_workers() {
            let poll_time = rt.worker_mean_poll_time(worker);
            write_float_gauge!(Worker(worker), poll_time.as_secs_f64());
        }
    );

    #[cfg(tokio_unstable)]
    metric!(
        "worker_noop_total",
        "number of times the given worker thread woke up with no work",
        |rt| for worker in 0..rt.num_workers() {
            let noops = rt.worker_noop_count(worker);
            write_counter!(Worker(worker), noops);
        }
    );

    metric!(
        "worker_park_total",
        "number of times the given worker thread has parked",
        |rt| for worker in 0..rt.num_workers() {
            let count = rt.worker_park_count(worker);
            write_counter!(Worker(worker), count);
        }
    );

    // NOTE(review): this family uses a `workers_` prefix, unlike every sibling `worker_*` family.
    // Renaming it would change the exported series name, so it is left unchanged here.
    metric!(
        "workers_park_unpark_total",
        "number of times the given worker thread has parked and unparked",
        |rt| for worker in 0..rt.num_workers() {
            let count = rt.worker_park_unpark_count(worker);
            write_counter!(Worker(worker), count);
        }
    );

    #[cfg(tokio_unstable)]
    metric!(
        "worker_steal_total",
        "number of tasks the given worker thread has stolen",
        |rt| for worker in 0..rt.num_workers() {
            let count = rt.worker_steal_count(worker);
            write_counter!(Worker(worker), count);
        }
    );

    #[cfg(tokio_unstable)]
    metric!(
        "worker_steal_operations_total",
        "number of times the given worker thread has attempted to steal tasks",
        |rt| for worker in 0..rt.num_workers() {
            let count = rt.worker_steal_operations(worker);
            write_counter!(Worker(worker), count);
        }
    );

    // Per-worker poll-time histogram:
    //   - `_bucket` samples are cumulative (`total` carries over between buckets) and are only
    //     written when tokio's poll-time histogram is enabled;
    //   - `_count` needs `tokio_unstable`;
    //   - `_sum` is always written.
    // NOTE(review): `_sum` comes from `worker_total_busy_duration`, which may cover more than time
    // spent polling tasks — confirm this approximation is intended.
    metric!(
        "worker_poll_time_seconds",
        "time this runtime thread has spent polling tasks",
        |rt| for worker in 0..rt.num_workers() {
            use measured::metric::name::Sum;

            let worker_label = Worker(worker);

            #[cfg(tokio_unstable)]
            {
                use measured::metric::name::{Bucket, Count};
                if rt.poll_time_histogram_enabled() {
                    let buckets = rt.poll_time_histogram_num_buckets();
                    let mut total = 0;
                    for bucket in 0..buckets {
                        let le = histogram_le(rt, bucket);
                        total += rt.poll_time_histogram_bucket_count(worker, bucket);
                        write_counter!(Bucket, worker_label.compose_with(le), total);
                    }
                }

                let count = rt.worker_poll_count(worker);
                write_counter!(Count, worker_label, count);
            }

            let busy = rt.worker_total_busy_duration(worker);
            write_float_gauge!(Sum, worker_label, busy.as_secs_f64());
        }
    );

    // IO driver counters: require both `tokio_unstable` and this crate's `net` feature.
    #[cfg(tokio_unstable)]
    #[cfg(feature = "net")]
    {
        metric!(
            "registered_fds_total",
            "total number of file descriptors that have been registered in the runtime",
            |rt| write_counter!(NoLabels, rt.io_driver_fd_registered_count())
        );
        metric!(
            "deregistered_fds_total",
            "total number of file descriptors that have been deregistered from the runtime",
            |rt| write_counter!(NoLabels, rt.io_driver_fd_deregistered_count())
        );
        metric!(
            "io_ready_events_total",
            "total number of ready events the runtime's IO driver has processed",
            |rt| write_counter!(NoLabels, rt.io_driver_ready_count())
        );
    }

    Ok(())
}
422
423impl<Enc: Encoding> MetricGroup<Enc> for RuntimeCollector
424where
425    CounterState: MetricEncoding<Enc>,
426    GaugeState: MetricEncoding<Enc>,
427    FloatGaugeState: MetricEncoding<Enc>,
428{
429    fn collect_group_into(&self, enc: &mut Enc) -> Result<(), Enc::Err> {
430        collect(std::slice::from_ref(self), enc)
431    }
432}
433
434struct I64(i64);
435
436impl LabelValue for I64 {
437    fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
438        v.write_int(self.0)
439    }
440}
441
/// Label value that is encoded as a float (used for histogram bucket bounds).
#[cfg(tokio_unstable)]
struct F64(f64);

#[cfg(tokio_unstable)]
impl LabelValue for F64 {
    fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
        let &F64(x) = self;
        v.write_float(x)
    }
}
451
452#[derive(Copy, Clone)]
453struct Worker(usize);
454
455impl LabelGroup for Worker {
456    fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
457        const LE: &LabelName = LabelName::from_str("worker");
458        v.write_value(LE, &I64(self.0 as i64));
459    }
460}
461
/// Label group carrying a histogram bucket's upper bound as the `le` label.
#[cfg(tokio_unstable)]
struct HistogramLabelLe {
    le: f64,
}

#[cfg(tokio_unstable)]
impl LabelGroup for HistogramLabelLe {
    fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
        const LE: &LabelName = LabelName::from_str("le");
        let Self { le: bound } = *self;
        v.write_value(LE, &F64(bound));
    }
}
474
475struct Str<'a>(&'a str);
476impl LabelValue for Str<'_> {
477    fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
478        v.write_str(self.0)
479    }
480}
481
482struct RuntimeName {
483    name: Option<Cow<'static, str>>,
484}
485
486impl LabelGroup for RuntimeName {
487    fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
488        const LE: &LabelName = LabelName::from_str("runtime");
489        if let Some(name) = self.name.as_deref() {
490            v.write_value(LE, &Str(name));
491        }
492    }
493}
494
/// Kind of thread counted by the `threads_total` metric, exported as the `kind` label.
#[derive(FixedCardinalityLabel, Clone, Copy)]
#[label(singleton = "kind")]
enum ThreadKind {
    /// Runtime worker threads.
    Worker,
    /// Blocking-pool threads that are currently idle.
    BlockingIdle,
    /// Blocking-pool threads that are not idle (total blocking threads minus idle ones).
    Blocking,
}
502
503#[allow(unused)]
504enum QueueKind {
505    Worker(usize),
506    Blocking,
507    Global,
508}
509
510#[automatically_derived]
511impl LabelValue for QueueKind {
512    fn visit<V: LabelVisitor>(&self, v: V) -> V::Output {
513        match self {
514            QueueKind::Worker(i) => v.write_str(itoa::Buffer::new().format(*i)),
515            QueueKind::Blocking => v.write_str("blocking"),
516            QueueKind::Global => v.write_str("global"),
517        }
518    }
519}
520
521impl LabelGroup for QueueKind {
522    fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
523        const NAME: &LabelName = LabelName::from_str("kind");
524        v.write_value(NAME, self);
525    }
526}
527
528// #[cfg(test)]
529// mod tests {
530//     use std::io::Write;
531
532//     use measured::{text::BufferedTextEncoder, MetricGroup};
533//     use tokio::task::JoinSet;
534
535//     use crate::{NamedRuntimesCollector, RuntimeCollector};
536
537//     #[test]
538//     fn demo() {
539//         let rt = tokio::runtime::Builder::new_multi_thread()
540//             .worker_threads(4)
541//             .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log)
542//             .enable_metrics_poll_count_histogram()
543//             .enable_all()
544//             .build()
545//             .unwrap();
546//         rt.block_on(async {
547//             let mut js = JoinSet::new();
548//             for _ in 0..100 {
549//                 js.spawn(async {
550//                     for _ in 0..100 {
551//                         tokio::task::yield_now().await;
552//                     }
553//                 });
554//             }
555//             while js.join_next().await.is_some() {}
556//         });
557
558//         let rt2 = tokio::runtime::Builder::new_multi_thread()
559//             .worker_threads(8)
560//             .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
561//             .enable_metrics_poll_count_histogram()
562//             .enable_all()
563//             .build()
564//             .unwrap();
565//         rt2.block_on(async {
566//             let mut js = JoinSet::new();
567//             for _ in 0..100 {
568//                 js.spawn(async {
569//                     for _ in 0..100 {
570//                         tokio::task::yield_now().await;
571//                     }
572//                 });
573//             }
574//             while js.join_next().await.is_some() {}
575//         });
576
577//         let collector = NamedRuntimesCollector::new();
578//         collector.add(rt.metrics(), "foo");
579//         collector.add(rt2.metrics(), "bar");
580
581//         let mut enc = BufferedTextEncoder::new();
582//         collector.collect_group_into(&mut enc).unwrap();
583//         std::io::stdout().write_all(&enc.finish()).unwrap();
584//     }
585// }