future_metrics/
lib.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::{Duration, Instant},
6};
7
/// Execution stats collected for a single task.
///
/// All fields are plain timestamps and counters, so the value is cheap to
/// copy and can be compared for equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ExecutionStats {
    /// Task creation timestamp.
    pub created: Instant,

    /// Timestamp of the first poll, or `None` if the task was never polled.
    pub started: Option<Instant>,

    /// Timestamp of the poll that returned `Poll::Ready`, or `None` if no
    /// poll has returned `Poll::Ready`.
    pub finished: Option<Instant>,

    /// Total time spent polling, summed over all polls.
    pub poll_duration: Duration,

    /// Longest time spent in a single call to the poll method.
    pub poll_duration_max: Duration,

    /// Number of times the task was polled during execution.
    pub poll_entries: usize,
}
29
30/// Trait for tracking task execution stats with [`MetricsFuture`].
31pub trait Recorder {
32    /// Reports that a task was created.
33    fn task_created(&self);
34
35    /// Reports task execution stats when it's dropped.
36    fn task_destroyed(&self, stats: ExecutionStats);
37}
38
39/// Convenience trait that simplifies the construction of [`MetricsFuture`].
40pub trait FutureExt {
41    type Future;
42
43    /// Consumes the future, returning a new future that records the execution
44    /// stats.
45    fn with_metrics<R: Recorder>(self, recorder: R) -> MetricsFuture<Self::Future, R>;
46}
47
48struct State<R: Recorder> {
49    created: Instant,
50    started: Option<Instant>,
51    finished: Option<Instant>,
52    poll_duration: Duration,
53    poll_duration_max: Duration,
54    poll_entries: usize,
55    recorder: R,
56}
57
58impl<R: Recorder> State<R> {
59    fn new(recorder: R) -> Self {
60        recorder.task_created();
61
62        Self {
63            created: Instant::now(),
64            started: None,
65            finished: None,
66            poll_duration: Duration::ZERO,
67            poll_duration_max: Duration::ZERO,
68            poll_entries: 0,
69            recorder,
70        }
71    }
72}
73
74impl<R: Recorder> Drop for State<R> {
75    fn drop(&mut self) {
76        self.recorder.task_destroyed(ExecutionStats {
77            created: self.created,
78            started: self.started,
79            finished: self.finished,
80            poll_duration: self.poll_duration,
81            poll_duration_max: self.poll_duration_max,
82            poll_entries: self.poll_entries,
83        });
84    }
85}
86
87#[pin_project::pin_project]
88#[must_use = "futures do nothing unless you `.await` or poll them"]
89pub struct MetricsFuture<F, R: Recorder> {
90    #[pin]
91    inner: F,
92    state: State<R>,
93}
94
95impl<F, R: Recorder> MetricsFuture<F, R> {
96    pub fn new(inner: F, recorder: R) -> Self {
97        Self {
98            inner,
99            state: State::new(recorder),
100        }
101    }
102}
103
104impl<F, R> Future for MetricsFuture<F, R>
105where
106    F: Future,
107    R: Recorder,
108{
109    type Output = F::Output;
110
111    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
112        let this = self.project();
113
114        let poll_start = Instant::now();
115        let result = this.inner.poll(cx);
116        let poll_end = Instant::now();
117
118        let state = this.state;
119
120        if state.started.is_none() {
121            state.started = Some(poll_start);
122        }
123
124        if result.is_ready() && state.finished.is_none() {
125            state.finished = Some(poll_end);
126        }
127
128        let poll_duration = poll_end - poll_start;
129
130        state.poll_duration += poll_duration;
131        state.poll_duration_max = state.poll_duration_max.max(poll_duration);
132        state.poll_entries += 1;
133
134        result
135    }
136}
137
138impl<T: Future> FutureExt for T {
139    type Future = T;
140
141    fn with_metrics<R: Recorder>(self, recorder: R) -> MetricsFuture<Self::Future, R> {
142        MetricsFuture::new(self, recorder)
143    }
144}