1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 time::{Duration, Instant},
6};
7
/// Snapshot of the timing data gathered for one instrumented future.
///
/// A value of this type is passed to [`Recorder::task_destroyed`] when the
/// instrumented future is dropped. Snapshots compare by value, which makes
/// them easy to assert on in recorder tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExecutionStats {
    /// Moment the future was wrapped for instrumentation.
    pub created: Instant,

    /// Start of the first poll, or `None` if the future was never polled.
    pub started: Option<Instant>,

    /// End of the poll that returned `Poll::Ready`, or `None` if the future
    /// never completed.
    pub finished: Option<Instant>,

    /// Total time spent inside `poll`, summed over every poll.
    pub poll_duration: Duration,

    /// Duration of the longest single poll.
    pub poll_duration_max: Duration,

    /// Number of times the future was polled.
    pub poll_entries: usize,
}
29
30pub trait Recorder {
32 fn task_created(&self);
34
35 fn task_destroyed(&self, stats: ExecutionStats);
37}
38
39pub trait FutureExt {
41 type Future;
42
43 fn with_metrics<R: Recorder>(self, recorder: R) -> MetricsFuture<Self::Future, R>;
46}
47
48struct State<R: Recorder> {
49 created: Instant,
50 started: Option<Instant>,
51 finished: Option<Instant>,
52 poll_duration: Duration,
53 poll_duration_max: Duration,
54 poll_entries: usize,
55 recorder: R,
56}
57
58impl<R: Recorder> State<R> {
59 fn new(recorder: R) -> Self {
60 recorder.task_created();
61
62 Self {
63 created: Instant::now(),
64 started: None,
65 finished: None,
66 poll_duration: Duration::ZERO,
67 poll_duration_max: Duration::ZERO,
68 poll_entries: 0,
69 recorder,
70 }
71 }
72}
73
74impl<R: Recorder> Drop for State<R> {
75 fn drop(&mut self) {
76 self.recorder.task_destroyed(ExecutionStats {
77 created: self.created,
78 started: self.started,
79 finished: self.finished,
80 poll_duration: self.poll_duration,
81 poll_duration_max: self.poll_duration_max,
82 poll_entries: self.poll_entries,
83 });
84 }
85}
86
87#[pin_project::pin_project]
88#[must_use = "futures do nothing unless you `.await` or poll them"]
89pub struct MetricsFuture<F, R: Recorder> {
90 #[pin]
91 inner: F,
92 state: State<R>,
93}
94
95impl<F, R: Recorder> MetricsFuture<F, R> {
96 pub fn new(inner: F, recorder: R) -> Self {
97 Self {
98 inner,
99 state: State::new(recorder),
100 }
101 }
102}
103
104impl<F, R> Future for MetricsFuture<F, R>
105where
106 F: Future,
107 R: Recorder,
108{
109 type Output = F::Output;
110
111 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
112 let this = self.project();
113
114 let poll_start = Instant::now();
115 let result = this.inner.poll(cx);
116 let poll_end = Instant::now();
117
118 let state = this.state;
119
120 if state.started.is_none() {
121 state.started = Some(poll_start);
122 }
123
124 if result.is_ready() && state.finished.is_none() {
125 state.finished = Some(poll_end);
126 }
127
128 let poll_duration = poll_end - poll_start;
129
130 state.poll_duration += poll_duration;
131 state.poll_duration_max = state.poll_duration_max.max(poll_duration);
132 state.poll_entries += 1;
133
134 result
135 }
136}
137
138impl<T: Future> FutureExt for T {
139 type Future = T;
140
141 fn with_metrics<R: Recorder>(self, recorder: R) -> MetricsFuture<Self::Future, R> {
142 MetricsFuture::new(self, recorder)
143 }
144}