prometheus_utils/
instrumented_future.rs

1//! Utilities for instrumenting asynchronous code.
2//
3//  `dyn_drop` is explicitly allowed in this module. As this lint indicates, `Drop` bounds are not
4//  a proper heuristic for checking that a type can be trivially dropped. In our case, an
5//  instrumented future is responsible for dealing with resource guards whose primary functionality
6//  /is/ their drop implementations.
7#![allow(dyn_drop)]
8
9use super::{GuardedGauge, IntCounterWithLabels, Labels};
10use pin_project::pin_project;
11use prometheus::core::{Atomic, GenericCounter};
12use std::{any::Any, future, ops::Deref, pin::Pin, task};
13
14/// An instrumented [`Future`][std-future].
15///
16/// `InstrumentedFuture` provides a transparent observability layer for futures.  An instrumented
17/// future is not created directly. Rather, an instrumented future is created _from_ an existing
18/// future using [`IntoInstrumentedFuture::into_instrumented_future`][into-fut].
19///
20/// Most importantly, the [`increment_until_resolved`][incr-until] method allows callers to
21/// increment a [`GuardedGauge`][guarded-gauge], and then decrement the gauge once the future has
22/// resolved.
23///
24/// [guarded-gauge]: trait.GuardedGauge.html
25/// [incr-until]: struct.InstrumentedFuture.html#method.increment_until_resolved
26/// [into-fut]: trait.IntoInstrumentedFuture.html#tymethod.into_instrumented_future
27/// [std-future]: https://doc.rust-lang.org/std/future/trait.Future.html
28///
29/// # Examples
30///
31/// ```no_run
32/// use prometheus_utils::IntoInstrumentedFuture;
33/// use tokio::time::{sleep, Duration};
34///
35/// #[tokio::main]
36/// async fn main() {
37///     let fut = sleep(Duration::from_millis(100));
38///     let instrumented = fut.into_instrumented_future();
39///     let _res = instrumented.await;
40/// }
41/// ```
42#[pin_project]
43#[must_use = "futures do nothing unless you `.await` or poll them"]
44pub struct InstrumentedFuture<F: future::Future> {
45    /// The inner [`Future`][std-future].
46    ///
47    /// ## Structural Pinning
48    ///
49    /// Pinning is structural for `fut`. Because we delegate to it's
50    /// [`Future::poll`][fut-poll] implementation, the inner future *must* be pinned if the outer
51    /// `InstrumentedFuture` is pinned.
52    ///
53    /// See [`std::pin`][projections] and [`pin-project`][pin-project] for more information on
54    /// structural pinning.
55    ///
56    /// [fut-poll]: https://doc.rust-lang.org/stable/core/future/trait.Future.html#tymethod.poll
57    /// [pin-project]: https://docs.rs/pin-project/latest/pin_project/
58    /// [projections]: https://doc.rust-lang.org/stable/std/pin/index.html#projections-and-structural-pinning
59    /// [std-future]: https://doc.rust-lang.org/std/future/trait.Future.html
60    #[pin]
61    inner: F,
62    /// Closures to call before polling the inner `Future`. These may, but are not required to,
63    /// return items to be `Drop`ped when the inner `Future` completes or is cancelled.
64    ///
65    /// In practice, this holds a list of Prometheus counters or gauges to increment when the inner
66    /// `Future` starts, returning a list of guards to decrement once the inner `Future` completes.
67    pre_polls: Vec<Box<dyn FnOnce() -> Option<Box<dyn Any + Send>> + Send>>,
68    /// RAII guards that will be dropped once the future has resolved.
69    ///
70    /// In practice, this is used to hold values like [`IntGaugeGuard`][int-guard] and
71    /// [`GaugeGuard`][float-guard], so that Prometheus metrics are properly decremented once the
72    /// underlying future has been polled to completion.
73    ///
74    /// See [`increment_until_resolved`][inc-until] for more information.
75    ///
76    /// [float-guard]: type.GaugeGuard.html
77    /// [int-guard]: type.IntGaugeGuard.html
78    /// [inc-until]: struct.InstrumentedFuture.html#method.increment_until_resolved
79    #[allow(dyn_drop)]
80    resource_guards: Vec<Box<dyn Any + Send>>,
81}
82
83/// Convert a [`Future`][future::Future] into an instrumented future.
84///
85/// See the [`InstrumentedFuture`] documentation for more information.
86pub trait IntoInstrumentedFuture {
87    /// The underlying  to be instrumented.
88    type Future: future::Future;
89    /// Convert this future into an [`InstrumentedFuture`].
90    fn into_instrumented_future(self) -> InstrumentedFuture<Self::Future>;
91}
92
93impl<F: future::Future> IntoInstrumentedFuture for F {
94    type Future = Self;
95    fn into_instrumented_future(self) -> InstrumentedFuture<Self> {
96        InstrumentedFuture {
97            inner: self,
98            pre_polls: vec![],
99            resource_guards: vec![],
100        }
101    }
102}
103
104impl<F: future::Future> InstrumentedFuture<F> {
105    /// Queue `guard_fn` to execute when the future is polled, retaining the returned value until
106    /// the future completes.
107    ///
108    /// # Examples
109    ///
110    /// ```no_run
111    /// use prometheus_utils::IntoInstrumentedFuture;
112    /// use tokio::time::{sleep, Duration};
113    ///
114    /// /// An RAII guard that will be attached to the future.
115    /// struct FutureGuard;
116    ///
117    /// impl Drop for FutureGuard {
118    ///     fn drop(&mut self) {
119    ///         // This code will be run when the future is ready.
120    ///         println!("100ms have elapsed!");
121    ///     }
122    /// }
123    ///
124    /// /// An asynchronous function.
125    /// async fn do_work() {
126    ///     sleep(Duration::from_millis(100)).await;
127    /// }
128    ///
129    /// #[tokio::main]
130    /// async fn main() {
131    ///     do_work()
132    ///         .into_instrumented_future()
133    ///         .with_guard(|| {
134    ///             // This code will be run once the future is polled.
135    ///             println!("future was polled!");
136    ///             Some(Box::new(FutureGuard))
137    ///         })
138    ///         .await;
139    /// }
140    /// ```
141    pub fn with_guard<GuardFn: FnOnce() -> Option<Box<dyn Any + Send>> + Send + 'static>(
142        mut self,
143        guard_fn: GuardFn,
144    ) -> Self {
145        self.pre_polls.push(Box::new(guard_fn));
146        self
147    }
148
149    /// Increment a Prometheus counter immediately.
150    pub fn with_count<P: Atomic + 'static>(mut self, counter: &'static GenericCounter<P>) -> Self {
151        self.pre_polls.push(Box::new(move || {
152            counter.inc();
153            None
154        }));
155        self
156    }
157
158    /// Increment a labeled Prometheus counter when the future is polled.
159    pub fn with_count_labeled<C, L>(mut self, counter: &'static C, labels: L) -> Self
160    where
161        C: Deref<Target = IntCounterWithLabels<L>> + Sync,
162        L: Labels + Sync + Send + 'static,
163    {
164        self.pre_polls.push(Box::new(move || {
165            counter.inc(&labels);
166            None
167        }));
168        self
169    }
170
171    /// Increment a Prometheus gauge until this future has resolved.
172    ///
173    /// When called, this method will immediately increment the given gauge using the
174    /// [`GuardedGauge::gaurded_inc`][gaurded-inc] trait method. This gauge will then be
175    /// decremented once this future's [`Future::poll`][fut-poll] implementation returns a
176    /// [`Poll::Ready`][poll-ready] value.
177    ///
178    /// See the [`GenericGaugeGuard`][gauge-guard] documentation for more information about RAII
179    /// guards for Prometheus metrics.
180    ///
181    /// [fut-poll]: https://doc.rust-lang.org/stable/core/future/trait.Future.html#tymethod.poll
182    /// [gauge-guard]: struct.GenericGaugeGuard.html
183    /// [gaurded-inc]: trait.GuardedGauge.html#tymethod.guarded_inc
184    /// [poll-ready]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Ready
185    pub fn with_count_gauge<G, T, P>(mut self, gauge: &'static G) -> Self
186    where
187        G: Deref<Target = T> + Sync,
188        T: GuardedGauge<P> + 'static,
189        P: Atomic + 'static,
190    {
191        self.pre_polls.push(Box::new(move || {
192            Some(Box::new(gauge.deref().guarded_inc()))
193        }));
194        self
195    }
196}
197
198impl<F: future::Future> future::Future for InstrumentedFuture<F> {
199    /// An instrumented future returns the same type as its inner future.
200    type Output = <F as future::Future>::Output;
201    /// Polls the inner future.
202    ///
203    /// If the inner future's [`Future::poll`][fut-poll] implementation returns a
204    /// [`Poll::Ready`][poll-ready] value, Prometheus gauges will be decremented accordingly.
205    fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
206        use task::Poll::{Pending, Ready};
207        let pin_projection = self.project();
208        for pre_poll in pin_projection.pre_polls.drain(..) {
209            if let Some(droppable) = pre_poll() {
210                pin_projection.resource_guards.push(droppable);
211            }
212        }
213        match pin_projection.inner.poll(cx) {
214            // The inner future is still pending...
215            p @ Pending => p,
216            // If we are here, the inner future resolved! Before returning we should drop any
217            // resource guards that may have been attached to this future.
218            out @ Ready(_) => {
219                pin_projection.resource_guards.clear();
220                out
221            }
222        }
223    }
224}
225
226#[test]
227fn counters_increment_only_when_futures_run() {
228    use lazy_static::lazy_static;
229    use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
230    use std::sync::{atomic::AtomicU8, atomic::Ordering, Arc, Mutex};
231    lazy_static! {
232        static ref WORK_COUNTER: IntCounter = register_int_counter!(opts!(
233            "work_counter",
234            "the number of times `work()` has been called"
235        ))
236        .unwrap();
237        static ref WORK_GAUGE: IntGauge =
238            register_int_gauge!(opts!("work_gauge", "the number `work()` currently running"))
239                .unwrap();
240        static ref CAN_MEASURE: AtomicU8 = AtomicU8::new(0);
241    }
242
243    let work_stoppage = Arc::new(Mutex::new(0));
244
245    async fn work(stop_ref: Arc<Mutex<usize>>) {
246        CAN_MEASURE.store(1, Ordering::SeqCst);
247        *stop_ref.lock().unwrap() = 4;
248    }
249
250    let stop_ref = Arc::clone(&work_stoppage);
251
252    let value_lock = work_stoppage.lock().unwrap();
253
254    // create a future to do some work, but don't run it yet
255    let f = work(stop_ref)
256        .into_instrumented_future()
257        .with_count(&WORK_COUNTER)
258        .with_count_gauge(&WORK_GAUGE);
259
260    assert_eq!(WORK_COUNTER.get(), 0);
261    assert_eq!(WORK_GAUGE.get(), 0);
262
263    let rt = tokio::runtime::Builder::new_multi_thread()
264        .build()
265        .expect("can build runtime");
266    let handle = rt.spawn(f);
267
268    while CAN_MEASURE.load(Ordering::SeqCst) == 0 {
269        // wait for a future point where we know we can sample the counters
270        std::thread::sleep(std::time::Duration::from_millis(10));
271    }
272
273    // we have started `f`, and so we have started `work`, but we have not allowed `work` to
274    // complete, so the gauge should still be 1.
275    assert_eq!(WORK_COUNTER.get(), 1);
276    assert_eq!(WORK_GAUGE.get(), 1);
277
278    std::mem::drop(value_lock);
279
280    rt.block_on(handle).expect("can block on f");
281
282    // now `f` is complete, so the gauge should once again be 0.
283    assert_eq!(WORK_COUNTER.get(), 1);
284    assert_eq!(WORK_GAUGE.get(), 0);
285
286    // and confirm the mutex has been work'd
287    assert_eq!(*work_stoppage.lock().unwrap(), 4);
288}