1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use super::{AsStatusLabel, ScopedObserver, StatusObserver};
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

enum ObserverGuardState {
    Waiting,
    Running,
}

struct ObserverGuard<O: ScopedObserver> {
    state: ObserverGuardState,
    observer: O,
}

impl<O: ScopedObserver> ObserverGuard<O> {
    fn new(observer: O) -> Self {
        Self {
            state: ObserverGuardState::Waiting,
            observer,
        }
    }

    fn start_if_first_time(&mut self) {
        if matches!(self.state, ObserverGuardState::Waiting) {
            self.observer.start();
            self.state = ObserverGuardState::Running;
        }
    }
}

impl<O: ScopedObserver> Drop for ObserverGuard<O> {
    fn drop(&mut self) {
        if matches!(self.state, ObserverGuardState::Running) {
            self.observer.stop();
        }
    }
}

pub struct MonitorFuture<O: ScopedObserver, F> {
    guard: ObserverGuard<O>,
    inner: F,
}

impl<O: ScopedObserver, F> MonitorFuture<O, F> {
    fn new(observer: O, inner: F) -> Self {
        Self {
            guard: ObserverGuard::new(observer),
            inner,
        }
    }
}

impl<O: ScopedObserver, F: Future> Future for MonitorFuture<O, F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Safety: We're not moving both the inner future and the guard.
        let this = unsafe { self.get_unchecked_mut() };
        let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
        this.guard.start_if_first_time();
        inner.poll(cx)
    }
}

pub struct RecordFuture<O, F> {
    observer: O,
    inner: F,
}

impl<O, F> RecordFuture<O, F> {
    fn new(observer: O, inner: F) -> Self {
        Self { observer, inner }
    }
}

impl<O, F> Future for RecordFuture<O, F>
where
    O: StatusObserver,
    F: Future,
    F::Output: AsStatusLabel,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Safety: We're not moving both the inner future and the guard.
        let this = unsafe { self.get_unchecked_mut() };
        let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
        match inner.poll(cx) {
            Poll::Ready(output) => {
                this.observer.record(&output);
                Poll::Ready(output)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

// TODO: remove and use metaprogramming for only .monitor() func
pub trait MetricsFutureExt: Future + Sized {
    fn monitor<O: ScopedObserver>(self, observer: O) -> MonitorFuture<O, Self> {
        MonitorFuture::new(observer, self)
    }

    fn record<O>(self, observer: O) -> RecordFuture<O, Self>
    where
        O: StatusObserver,
        Self::Output: AsStatusLabel,
    {
        RecordFuture::new(observer, self)
    }
}

impl<F: Future + Sized> MetricsFutureExt for F {}