trait_net/metrics/
future_ext.rs

1use crate::metrics::Observer;
2use std::{
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
/// Lifecycle marker used by `ObservedFuture`.
///
/// Only two states are tracked: whether the wrapped future is currently
/// "in flight" (polled at least once and not yet resolved) or not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Not in flight: either never polled yet, or the wrapped future has
    /// already returned `Poll::Ready`.
    Passive,
    /// Polled at least once and not yet resolved. Dropping the wrapper in
    /// this state is reported to the observer as `on_finish(None)`.
    Polling,
}
12
13pub struct ObservedFuture<Obs, Fut>
14where
15    Obs: Observer<Fut::Output>,
16    Fut: Future,
17{
18    state: State,
19    observer: Obs,
20    inner: Fut,
21}
22
23impl<Obs, Fut> Future for ObservedFuture<Obs, Fut>
24where
25    Obs: Observer<Fut::Output>,
26    Fut: Future,
27{
28    type Output = Fut::Output;
29
30    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
31        // Safety: We're not moving both the inner future and the observer.
32        let this = unsafe { self.get_unchecked_mut() };
33        let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
34        if matches!(this.state, State::Passive) {
35            // Interface allows us to call this code twice, but since the user
36            // is required to make observer behavior unspecified but safe in
37            // this cases, we're ok with just two states.
38            this.state = State::Polling;
39            this.observer.on_first_poll();
40        }
41        this.observer.on_poll();
42        match inner.poll(cx) {
43            Poll::Ready(output) => {
44                this.state = State::Passive;
45                this.observer.on_poll_ready(&output);
46                this.observer.on_finish(Some(&output));
47                Poll::Ready(output)
48            }
49            Poll::Pending => Poll::Pending,
50        }
51    }
52}
53
54impl<Obs, Fut> Drop for ObservedFuture<Obs, Fut>
55where
56    Obs: Observer<Fut::Output>,
57    Fut: Future,
58{
59    fn drop(&mut self) {
60        if matches!(self.state, State::Polling) {
61            self.observer.on_finish(None);
62        }
63        self.observer.on_drop();
64    }
65}
66
67pub trait ObservedFutureExt: Future + Sized {
68    fn observe<Obs: Observer<Self::Output>>(self, observer: Obs) -> ObservedFuture<Obs, Self> {
69        ObservedFuture {
70            state: State::Passive,
71            observer,
72            inner: self,
73        }
74    }
75}
76
77impl<F: Future + Sized> ObservedFutureExt for F {}