1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
//! Utilities for instrumenting asynchronous code.
//
//  `dyn_drop` is explicitly allowed in this module. As this lint indicates, `Drop` bounds are not
//  a proper heuristic for checking that a type can be trivially dropped. In our case, an
//  instrumented future is responsible for dealing with resource guards whose primary functionality
//  /is/ their drop implementations.
#![allow(dyn_drop)]

use super::{GuardedGauge, IntCounterWithLabels, Labels};
use pin_project::pin_project;
use prometheus::core::{Atomic, GenericCounter};
use std::{any::Any, future, ops::Deref, pin::Pin, task};

/// An instrumented [`Future`][std-future].
///
/// `InstrumentedFuture` provides a transparent observability layer for futures. An instrumented
/// future is not created directly. Rather, an instrumented future is created _from_ an existing
/// future using [`IntoInstrumentedFuture::into_instrumented_future`][into-fut].
///
/// Most importantly, the [`with_count_gauge`][count-gauge] method allows callers to increment a
/// [`GuardedGauge`][guarded-gauge] when the future is first polled, and then decrement the gauge
/// once the future has resolved.
///
/// [count-gauge]: struct.InstrumentedFuture.html#method.with_count_gauge
/// [guarded-gauge]: trait.GuardedGauge.html
/// [into-fut]: trait.IntoInstrumentedFuture.html#tymethod.into_instrumented_future
/// [std-future]: https://doc.rust-lang.org/std/future/trait.Future.html
///
/// # Examples
///
/// ```no_run
/// use prometheus_utils::IntoInstrumentedFuture;
/// use tokio::time::{sleep, Duration};
///
/// #[tokio::main]
/// async fn main() {
///     let fut = sleep(Duration::from_millis(100));
///     let instrumented = fut.into_instrumented_future();
///     let _res = instrumented.await;
/// }
/// ```
#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct InstrumentedFuture<F: future::Future> {
    /// The inner [`Future`][std-future].
    ///
    /// ## Structural Pinning
    ///
    /// Pinning is structural for `inner`. Because we delegate to its
    /// [`Future::poll`][fut-poll] implementation, the inner future *must* be pinned if the outer
    /// `InstrumentedFuture` is pinned.
    ///
    /// See [`std::pin`][projections] and [`pin-project`][pin-project] for more information on
    /// structural pinning.
    ///
    /// [fut-poll]: https://doc.rust-lang.org/stable/core/future/trait.Future.html#tymethod.poll
    /// [pin-project]: https://docs.rs/pin-project/latest/pin_project/
    /// [projections]: https://doc.rust-lang.org/stable/std/pin/index.html#projections-and-structural-pinning
    /// [std-future]: https://doc.rust-lang.org/std/future/trait.Future.html
    #[pin]
    inner: F,
    /// Closures to call before polling the inner `Future`. These may, but are not required to,
    /// return items to be `Drop`ped when the inner `Future` completes or is cancelled.
    ///
    /// The list is drained the first time the wrapper is polled, so each closure runs at most
    /// once.
    ///
    /// In practice, this holds a list of Prometheus counters or gauges to increment when the inner
    /// `Future` starts, returning a list of guards to decrement once the inner `Future` completes.
    pre_polls: Vec<Box<dyn FnOnce() -> Option<Box<dyn Any + Send>> + Send>>,
    /// RAII guards that will be dropped once the future has resolved (or when the
    /// `InstrumentedFuture` itself is dropped, whichever happens first).
    ///
    /// In practice, this is used to hold values like [`IntGaugeGuard`][int-guard] and
    /// [`GaugeGuard`][float-guard], so that Prometheus metrics are properly decremented once the
    /// underlying future has been polled to completion.
    ///
    /// See [`with_count_gauge`][count-gauge] for more information.
    ///
    /// [count-gauge]: struct.InstrumentedFuture.html#method.with_count_gauge
    /// [float-guard]: type.GaugeGuard.html
    /// [int-guard]: type.IntGaugeGuard.html
    #[allow(dyn_drop)]
    resource_guards: Vec<Box<dyn Any + Send>>,
}

/// Convert a [`Future`][future::Future] into an instrumented future.
///
/// See the [`InstrumentedFuture`] documentation for more information.
pub trait IntoInstrumentedFuture {
    /// The underlying future type to be instrumented.
    type Future: future::Future;
    /// Convert this future into an [`InstrumentedFuture`], consuming `self`.
    fn into_instrumented_future(self) -> InstrumentedFuture<Self::Future>;
}

/// Blanket implementation: any [`Future`][future::Future] can be wrapped in an
/// [`InstrumentedFuture`].
impl<F> IntoInstrumentedFuture for F
where
    F: future::Future,
{
    type Future = F;

    /// Wrap `self` in an [`InstrumentedFuture`] that starts out with no queued pre-poll closures
    /// and no held guards.
    fn into_instrumented_future(self) -> InstrumentedFuture<F> {
        let pre_polls = Vec::new();
        let resource_guards = Vec::new();
        InstrumentedFuture {
            inner: self,
            pre_polls,
            resource_guards,
        }
    }
}

impl<F> InstrumentedFuture<F>
where
    F: future::Future,
{
    /// Queue `guard_fn` to run the first time this future is polled.
    ///
    /// If `guard_fn` returns `Some(guard)`, the guard is retained and dropped once the inner
    /// future completes (or when this `InstrumentedFuture` is dropped). Returning `None` simply
    /// runs the closure for its side effects.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use prometheus_utils::IntoInstrumentedFuture;
    /// use tokio::time::{sleep, Duration};
    ///
    /// /// An RAII guard that will be attached to the future.
    /// struct FutureGuard;
    ///
    /// impl Drop for FutureGuard {
    ///     fn drop(&mut self) {
    ///         // This code will be run when the future is ready.
    ///         println!("100ms have elapsed!");
    ///     }
    /// }
    ///
    /// /// An asynchronous function.
    /// async fn do_work() {
    ///     sleep(Duration::from_millis(100)).await;
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     do_work()
    ///         .into_instrumented_future()
    ///         .with_guard(|| {
    ///             // This code will be run once the future is polled.
    ///             println!("future was polled!");
    ///             Some(Box::new(FutureGuard))
    ///         })
    ///         .await;
    /// }
    /// ```
    pub fn with_guard<GuardFn>(mut self, guard_fn: GuardFn) -> Self
    where
        GuardFn: FnOnce() -> Option<Box<dyn Any + Send>> + Send + 'static,
    {
        // Type-erase the closure so differently-typed closures can share one queue.
        let queued: Box<dyn FnOnce() -> Option<Box<dyn Any + Send>> + Send> = Box::new(guard_fn);
        self.pre_polls.push(queued);
        self
    }

    /// Increment a Prometheus counter when the future is first polled.
    ///
    /// Nothing is counted at call time; the increment is queued and only happens once the
    /// future actually starts running. No guard is retained.
    pub fn with_count<P: Atomic + 'static>(self, counter: &'static GenericCounter<P>) -> Self {
        self.with_guard(move || {
            counter.inc();
            None
        })
    }

    /// Increment a labeled Prometheus counter when the future is first polled.
    ///
    /// `labels` is moved into the queued closure and used for the single increment. No guard
    /// is retained.
    pub fn with_count_labeled<C, L>(self, counter: &'static C, labels: L) -> Self
    where
        C: Deref<Target = IntCounterWithLabels<L>> + Sync,
        L: Labels + Sync + Send + 'static,
    {
        self.with_guard(move || {
            counter.inc(&labels);
            None
        })
    }

    /// Increment a Prometheus gauge until this future has resolved.
    ///
    /// The gauge is not touched when this method is called. Instead, the first time the future
    /// is polled the gauge is incremented via the
    /// [`GuardedGauge::guarded_inc`][guarded-inc] trait method, and the guard it returns is held
    /// by this future. That guard is dropped once this future's [`Future::poll`][fut-poll]
    /// implementation returns a [`Poll::Ready`][poll-ready] value, or earlier if the future
    /// itself is dropped.
    ///
    /// See the [`GenericGaugeGuard`][gauge-guard] documentation for more information about RAII
    /// guards for Prometheus metrics.
    ///
    /// [fut-poll]: https://doc.rust-lang.org/stable/core/future/trait.Future.html#tymethod.poll
    /// [gauge-guard]: struct.GenericGaugeGuard.html
    /// [guarded-inc]: trait.GuardedGauge.html#tymethod.guarded_inc
    /// [poll-ready]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Ready
    pub fn with_count_gauge<G, T, P>(self, gauge: &'static G) -> Self
    where
        G: Deref<Target = T> + Sync,
        T: GuardedGauge<P> + 'static,
        P: Atomic + 'static,
    {
        self.with_guard(move || {
            let guard = gauge.deref().guarded_inc();
            Some(Box::new(guard))
        })
    }
}

impl<F> future::Future for InstrumentedFuture<F>
where
    F: future::Future,
{
    /// The wrapper resolves to exactly the value its inner future resolves to.
    type Output = F::Output;

    /// Polls the inner future.
    ///
    /// Before delegating, every queued pre-poll closure is run (the queue is emptied, so this
    /// only does work on the first poll) and any guards they return are stored.
    ///
    /// If the inner future's [`Future::poll`][fut-poll] implementation returns a
    /// [`Poll::Ready`][poll-ready] value, the stored guards are dropped before the result is
    /// handed back, so Prometheus gauges will be decremented accordingly.
    ///
    /// [fut-poll]: https://doc.rust-lang.org/stable/core/future/trait.Future.html#tymethod.poll
    /// [poll-ready]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Ready
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
        let this = self.project();

        // Run the queued closures in registration order, keeping only the guards they produce.
        let new_guards = this.pre_polls.drain(..).filter_map(|pre_poll| pre_poll());
        this.resource_guards.extend(new_guards);

        let outcome = this.inner.poll(cx);
        if outcome.is_ready() {
            // The inner future resolved: release every attached resource guard before returning.
            this.resource_guards.clear();
        }
        outcome
    }
}

/// Checks that instrumentation is lazy: building an instrumented future touches neither the
/// counter nor the gauge, running it bumps both, and completing it releases the gauge again.
#[test]
fn counters_increment_only_when_futures_run() {
    use lazy_static::lazy_static;
    use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
    use std::sync::{
        atomic::{AtomicU8, Ordering},
        Arc, Mutex,
    };
    use std::{thread, time::Duration};

    lazy_static! {
        static ref WORK_COUNTER: IntCounter = register_int_counter!(opts!(
            "work_counter",
            "the number of times `work()` has been called"
        ))
        .unwrap();
        static ref WORK_GAUGE: IntGauge =
            register_int_gauge!(opts!("work_gauge", "the number `work()` currently running"))
                .unwrap();
        static ref CAN_MEASURE: AtomicU8 = AtomicU8::new(0);
    }

    // The instrumented workload: signal that it has started, then block on the shared mutex
    // until the test releases it, and finally record a recognisable value.
    async fn work(shared: Arc<Mutex<usize>>) {
        CAN_MEASURE.store(1, Ordering::SeqCst);
        *shared.lock().unwrap() = 4;
    }

    let shared_value = Arc::new(Mutex::new(0));
    let task_copy = Arc::clone(&shared_value);

    // Hold the lock so that `work` cannot finish until we explicitly let it.
    let held_lock = shared_value.lock().unwrap();

    // Build the instrumented future, but do not run it yet.
    let instrumented = work(task_copy)
        .into_instrumented_future()
        .with_count(&WORK_COUNTER)
        .with_count_gauge(&WORK_GAUGE);

    // Nothing has been polled, so nothing has been recorded.
    assert_eq!(WORK_COUNTER.get(), 0);
    assert_eq!(WORK_GAUGE.get(), 0);

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .build()
        .expect("can build runtime");
    let join_handle = runtime.spawn(instrumented);

    // Wait until `work` reports that it has started; from then on the metrics can be sampled.
    while CAN_MEASURE.load(Ordering::SeqCst) == 0 {
        thread::sleep(Duration::from_millis(10));
    }

    // `work` is running but is parked on the mutex we still hold, so the counter has been
    // bumped and the gauge is still raised.
    assert_eq!(WORK_COUNTER.get(), 1);
    assert_eq!(WORK_GAUGE.get(), 1);

    // Let `work` proceed and wait for it to finish.
    drop(held_lock);
    runtime.block_on(join_handle).expect("can block on f");

    // The future has completed: the counter keeps its value, the gauge is back to 0.
    assert_eq!(WORK_COUNTER.get(), 1);
    assert_eq!(WORK_GAUGE.get(), 0);

    // And the workload really did run to the end.
    assert_eq!(*shared_value.lock().unwrap(), 4);
}