integrationos_domain/algebra/
timed.rs

1use futures::Future;
2use pin_project::pin_project;
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6    time::{Duration, Instant},
7};
8
9pub trait TimedExt: Sized + Future {
10    fn timed<F>(self, f: F) -> Timed<Self, F>
11    where
12        F: FnMut(&Self::Output, Duration),
13    {
14        Timed {
15            inner: self,
16            f,
17            start: None,
18        }
19    }
20}
21
22#[pin_project]
23pub struct Timed<Fut, F>
24where
25    Fut: Future,
26    F: FnMut(&Fut::Output, Duration),
27{
28    #[pin]
29    inner: Fut,
30    f: F,
31    start: Option<Instant>,
32}
33
34impl<Fut, F> Future for Timed<Fut, F>
35where
36    Fut: Future,
37    F: FnMut(&Fut::Output, Duration),
38{
39    type Output = Fut::Output;
40
41    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
42        let this = self.project();
43
44        match this.start {
45            Some(start) => match this.inner.poll(cx) {
46                Poll::Ready(output) => {
47                    let elapsed = start.elapsed();
48                    (this.f)(&output, elapsed);
49                    Poll::Ready(output)
50                }
51                Poll::Pending => Poll::Pending,
52            },
53            None => {
54                *this.start = Some(Instant::now());
55                // Continue polling after setting the start time.
56                cx.waker().wake_by_ref();
57                Poll::Pending
58            }
59        }
60    }
61}
62
63impl<F: Future> TimedExt for F {}
64
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::ready;

    /// A timed future must pass its output through unchanged and invoke the
    /// callback with that output and a measured duration.
    #[tokio::test]
    async fn test_timed() {
        let mut recorded = None;

        let timed_future = ready(42).timed(|value, took| {
            recorded = Some(took);
            assert_eq!(*value, 42);
        });
        let result = timed_future.await;

        assert_eq!(result, 42);
        assert!(recorded.is_some());
    }
}