mm1_common/
metrics.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3use std::time::{Duration, Instant};
4
5use metrics::{Counter, Histogram};
6
/// Builds a `Metrics` bundle whose five instruments share the prefix `$name`.
///
/// The instruments are registered under `<name>_poll_count`, `<name>_done_count`,
/// `<name>_busy_sec`, `<name>_wait_sec` and `<name>_wall_sec`. Any
/// `"key" => value` pairs that follow the name are attached as labels to every
/// one of them.
///
/// Each label value expression is expanded once per instrument (five times in
/// total), so it should be cheap and free of side effects.
///
/// ```ignore
/// let metrics = make_metrics!("fetch", "peer" => peer_name.clone());
/// ```
#[macro_export]
macro_rules! make_metrics {
    ($name:literal $(, $k:literal => $v:expr )* $(,)?) => {{
        // `concat!` accepts any literal; this typed constant restricts `$name`
        // to string literals at compile time.
        const _: &str = $name;

        // Metric names are assembled at compile time from the shared prefix.
        const POLL_COUNT: &str = concat!($name, "_poll_count");
        const DONE_COUNT: &str = concat!($name, "_done_count");
        const BUSY_TIME: &str = concat!($name, "_busy_sec");
        const WAIT_TIME: &str = concat!($name, "_wait_sec");
        const WALL_TIME: &str = concat!($name, "_wall_sec");

        // Field initialisers run in the order written, so the handles are
        // obtained in the sequence poll, done, busy, wait, wall.
        $crate::metrics::Metrics {
            poll_count: ::metrics::counter!(POLL_COUNT $(, $k => $v )*),
            done_count: ::metrics::counter!(DONE_COUNT $(, $k => $v )*),
            busy_time: ::metrics::histogram!(BUSY_TIME $(, $k => $v )*),
            wait_time: ::metrics::histogram!(WAIT_TIME $(, $k => $v )*),
            wall_time: ::metrics::histogram!(WALL_TIME $(, $k => $v )*),
        }
    }};
}
32
33pub trait MeasuredFutureExt: Future + Sized {
34    fn measured(self, metrics: Metrics) -> MeasuredFuture<Self> {
35        MeasuredFuture::new(self, metrics)
36    }
37}
38impl<F> MeasuredFutureExt for F where F: Future + Sized {}
39
40#[pin_project::pin_project]
41#[derive(derive_more::Debug)]
42pub struct MeasuredFuture<F> {
43    #[debug(skip)]
44    #[pin]
45    inner: F,
46
47    metrics:      Metrics,
48    measurements: Measurements,
49}
50
51#[derive(Debug)]
52pub struct Metrics {
53    pub poll_count: Counter,
54    pub done_count: Counter,
55    pub busy_time:  Histogram,
56    pub wait_time:  Histogram,
57    pub wall_time:  Histogram,
58}
59
/// Per-future bookkeeping carried from one poll to the next.
///
/// The default value (no instants recorded, zero durations) is the state of a
/// future that has not been polled yet.
#[derive(Default, Debug)]
struct Measurements {
    /// Start of the first poll; `None` until the future has been polled.
    first_poll: Option<Instant>,
    /// Reference instant for the idle gap before the next poll; while `None`,
    /// no wait time is added.
    last_poll:  Option<Instant>,
    /// Sum of the time spent inside the inner future's `poll` calls.
    busy_time:  Duration,
    /// Sum of the idle gaps measured between polls.
    wait_time:  Duration,
}
67
68impl<F> MeasuredFuture<F> {
69    pub fn new(inner: F, metrics: Metrics) -> Self {
70        Self {
71            inner,
72            metrics,
73            measurements: Default::default(),
74        }
75    }
76}
77
78impl<F> Future for MeasuredFuture<F>
79where
80    F: Future,
81{
82    type Output = F::Output;
83
84    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85        let this = self.project();
86
87        this.metrics.poll_count.increment(1);
88
89        let t_start = Instant::now();
90        if let Some(last_poll) = this.measurements.last_poll {
91            this.measurements.wait_time += t_start.duration_since(last_poll);
92        }
93        let first_poll = this.measurements.first_poll.get_or_insert(t_start);
94
95        let poll = this.inner.poll(cx);
96
97        let t_end = Instant::now();
98        this.measurements.busy_time += t_end.duration_since(t_start);
99
100        if poll.is_ready() {
101            this.metrics
102                .wall_time
103                .record(t_end.duration_since(*first_poll));
104            this.metrics.busy_time.record(this.measurements.busy_time);
105            this.metrics.wait_time.record(this.measurements.wait_time);
106
107            this.metrics.done_count.increment(1);
108        }
109
110        poll
111    }
112}