prometheus_utils/instrumented_future.rs
1//! Utilities for instrumenting asynchronous code.
2//
3// `dyn_drop` is explicitly allowed in this module. As this lint indicates, `Drop` bounds are not
4// a proper heuristic for checking that a type can be trivially dropped. In our case, an
5// instrumented future is responsible for dealing with resource guards whose primary functionality
6// /is/ their drop implementations.
7#![allow(dyn_drop)]
8
9use super::{GuardedGauge, IntCounterWithLabels, Labels};
10use pin_project::pin_project;
11use prometheus::core::{Atomic, GenericCounter};
12use std::{any::Any, future, ops::Deref, pin::Pin, task};
13
14/// An instrumented [`Future`][std-future].
15///
16/// `InstrumentedFuture` provides a transparent observability layer for futures. An instrumented
17/// future is not created directly. Rather, an instrumented future is created _from_ an existing
18/// future using [`IntoInstrumentedFuture::into_instrumented_future`][into-fut].
19///
20/// Most importantly, the [`increment_until_resolved`][incr-until] method allows callers to
21/// increment a [`GuardedGauge`][guarded-gauge], and then decrement the gauge once the future has
22/// resolved.
23///
24/// [guarded-gauge]: trait.GuardedGauge.html
25/// [incr-until]: struct.InstrumentedFuture.html#method.increment_until_resolved
26/// [into-fut]: trait.IntoInstrumentedFuture.html#tymethod.into_instrumented_future
27/// [std-future]: https://doc.rust-lang.org/std/future/trait.Future.html
28///
29/// # Examples
30///
31/// ```no_run
32/// use prometheus_utils::IntoInstrumentedFuture;
33/// use tokio::time::{sleep, Duration};
34///
35/// #[tokio::main]
36/// async fn main() {
37/// let fut = sleep(Duration::from_millis(100));
38/// let instrumented = fut.into_instrumented_future();
39/// let _res = instrumented.await;
40/// }
41/// ```
42#[pin_project]
43#[must_use = "futures do nothing unless you `.await` or poll them"]
44pub struct InstrumentedFuture<F: future::Future> {
45 /// The inner [`Future`][std-future].
46 ///
47 /// ## Structural Pinning
48 ///
49 /// Pinning is structural for `fut`. Because we delegate to it's
50 /// [`Future::poll`][fut-poll] implementation, the inner future *must* be pinned if the outer
51 /// `InstrumentedFuture` is pinned.
52 ///
53 /// See [`std::pin`][projections] and [`pin-project`][pin-project] for more information on
54 /// structural pinning.
55 ///
56 /// [fut-poll]: https://doc.rust-lang.org/stable/core/future/trait.Future.html#tymethod.poll
57 /// [pin-project]: https://docs.rs/pin-project/latest/pin_project/
58 /// [projections]: https://doc.rust-lang.org/stable/std/pin/index.html#projections-and-structural-pinning
59 /// [std-future]: https://doc.rust-lang.org/std/future/trait.Future.html
60 #[pin]
61 inner: F,
62 /// Closures to call before polling the inner `Future`. These may, but are not required to,
63 /// return items to be `Drop`ped when the inner `Future` completes or is cancelled.
64 ///
65 /// In practice, this holds a list of Prometheus counters or gauges to increment when the inner
66 /// `Future` starts, returning a list of guards to decrement once the inner `Future` completes.
67 pre_polls: Vec<Box<dyn FnOnce() -> Option<Box<dyn Any + Send>> + Send>>,
68 /// RAII guards that will be dropped once the future has resolved.
69 ///
70 /// In practice, this is used to hold values like [`IntGaugeGuard`][int-guard] and
71 /// [`GaugeGuard`][float-guard], so that Prometheus metrics are properly decremented once the
72 /// underlying future has been polled to completion.
73 ///
74 /// See [`increment_until_resolved`][inc-until] for more information.
75 ///
76 /// [float-guard]: type.GaugeGuard.html
77 /// [int-guard]: type.IntGaugeGuard.html
78 /// [inc-until]: struct.InstrumentedFuture.html#method.increment_until_resolved
79 #[allow(dyn_drop)]
80 resource_guards: Vec<Box<dyn Any + Send>>,
81}
82
83/// Convert a [`Future`][future::Future] into an instrumented future.
84///
85/// See the [`InstrumentedFuture`] documentation for more information.
86pub trait IntoInstrumentedFuture {
87 /// The underlying to be instrumented.
88 type Future: future::Future;
89 /// Convert this future into an [`InstrumentedFuture`].
90 fn into_instrumented_future(self) -> InstrumentedFuture<Self::Future>;
91}
92
93impl<F: future::Future> IntoInstrumentedFuture for F {
94 type Future = Self;
95 fn into_instrumented_future(self) -> InstrumentedFuture<Self> {
96 InstrumentedFuture {
97 inner: self,
98 pre_polls: vec![],
99 resource_guards: vec![],
100 }
101 }
102}
103
104impl<F: future::Future> InstrumentedFuture<F> {
105 /// Queue `guard_fn` to execute when the future is polled, retaining the returned value until
106 /// the future completes.
107 ///
108 /// # Examples
109 ///
110 /// ```no_run
111 /// use prometheus_utils::IntoInstrumentedFuture;
112 /// use tokio::time::{sleep, Duration};
113 ///
114 /// /// An RAII guard that will be attached to the future.
115 /// struct FutureGuard;
116 ///
117 /// impl Drop for FutureGuard {
118 /// fn drop(&mut self) {
119 /// // This code will be run when the future is ready.
120 /// println!("100ms have elapsed!");
121 /// }
122 /// }
123 ///
124 /// /// An asynchronous function.
125 /// async fn do_work() {
126 /// sleep(Duration::from_millis(100)).await;
127 /// }
128 ///
129 /// #[tokio::main]
130 /// async fn main() {
131 /// do_work()
132 /// .into_instrumented_future()
133 /// .with_guard(|| {
134 /// // This code will be run once the future is polled.
135 /// println!("future was polled!");
136 /// Some(Box::new(FutureGuard))
137 /// })
138 /// .await;
139 /// }
140 /// ```
141 pub fn with_guard<GuardFn: FnOnce() -> Option<Box<dyn Any + Send>> + Send + 'static>(
142 mut self,
143 guard_fn: GuardFn,
144 ) -> Self {
145 self.pre_polls.push(Box::new(guard_fn));
146 self
147 }
148
149 /// Increment a Prometheus counter immediately.
150 pub fn with_count<P: Atomic + 'static>(mut self, counter: &'static GenericCounter<P>) -> Self {
151 self.pre_polls.push(Box::new(move || {
152 counter.inc();
153 None
154 }));
155 self
156 }
157
158 /// Increment a labeled Prometheus counter when the future is polled.
159 pub fn with_count_labeled<C, L>(mut self, counter: &'static C, labels: L) -> Self
160 where
161 C: Deref<Target = IntCounterWithLabels<L>> + Sync,
162 L: Labels + Sync + Send + 'static,
163 {
164 self.pre_polls.push(Box::new(move || {
165 counter.inc(&labels);
166 None
167 }));
168 self
169 }
170
171 /// Increment a Prometheus gauge until this future has resolved.
172 ///
173 /// When called, this method will immediately increment the given gauge using the
174 /// [`GuardedGauge::gaurded_inc`][gaurded-inc] trait method. This gauge will then be
175 /// decremented once this future's [`Future::poll`][fut-poll] implementation returns a
176 /// [`Poll::Ready`][poll-ready] value.
177 ///
178 /// See the [`GenericGaugeGuard`][gauge-guard] documentation for more information about RAII
179 /// guards for Prometheus metrics.
180 ///
181 /// [fut-poll]: https://doc.rust-lang.org/stable/core/future/trait.Future.html#tymethod.poll
182 /// [gauge-guard]: struct.GenericGaugeGuard.html
183 /// [gaurded-inc]: trait.GuardedGauge.html#tymethod.guarded_inc
184 /// [poll-ready]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Ready
185 pub fn with_count_gauge<G, T, P>(mut self, gauge: &'static G) -> Self
186 where
187 G: Deref<Target = T> + Sync,
188 T: GuardedGauge<P> + 'static,
189 P: Atomic + 'static,
190 {
191 self.pre_polls.push(Box::new(move || {
192 Some(Box::new(gauge.deref().guarded_inc()))
193 }));
194 self
195 }
196}
197
198impl<F: future::Future> future::Future for InstrumentedFuture<F> {
199 /// An instrumented future returns the same type as its inner future.
200 type Output = <F as future::Future>::Output;
201 /// Polls the inner future.
202 ///
203 /// If the inner future's [`Future::poll`][fut-poll] implementation returns a
204 /// [`Poll::Ready`][poll-ready] value, Prometheus gauges will be decremented accordingly.
205 fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
206 use task::Poll::{Pending, Ready};
207 let pin_projection = self.project();
208 for pre_poll in pin_projection.pre_polls.drain(..) {
209 if let Some(droppable) = pre_poll() {
210 pin_projection.resource_guards.push(droppable);
211 }
212 }
213 match pin_projection.inner.poll(cx) {
214 // The inner future is still pending...
215 p @ Pending => p,
216 // If we are here, the inner future resolved! Before returning we should drop any
217 // resource guards that may have been attached to this future.
218 out @ Ready(_) => {
219 pin_projection.resource_guards.clear();
220 out
221 }
222 }
223 }
224}
225
/// Checks that a counter and a gauge attached to an instrumented future are untouched until the
/// future actually runs, that the gauge stays incremented while the future is in flight, and
/// that the gauge drops back once the future completes.
#[test]
fn counters_increment_only_when_futures_run() {
    use lazy_static::lazy_static;
    use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
    use std::sync::{
        atomic::{AtomicU8, Ordering},
        Arc, Mutex,
    };

    lazy_static! {
        static ref WORK_COUNTER: IntCounter = register_int_counter!(opts!(
            "work_counter",
            "the number of times `work()` has been called"
        ))
        .unwrap();
        static ref WORK_GAUGE: IntGauge =
            register_int_gauge!(opts!("work_gauge", "the number `work()` currently running"))
                .unwrap();
        static ref CAN_MEASURE: AtomicU8 = AtomicU8::new(0);
    }

    /// Signals that it has started, then blocks on the shared mutex before writing to it.
    async fn work(stop_ref: Arc<Mutex<usize>>) {
        CAN_MEASURE.store(1, Ordering::SeqCst);
        *stop_ref.lock().unwrap() = 4;
    }

    let shared_value = Arc::new(Mutex::new(0));
    let value_for_task = Arc::clone(&shared_value);

    // Hold the lock on this thread so that `work` cannot finish until we release it.
    let held_guard = shared_value.lock().unwrap();

    // Build the instrumented future, but do not run it yet.
    let instrumented = work(value_for_task)
        .into_instrumented_future()
        .with_count(&WORK_COUNTER)
        .with_count_gauge(&WORK_GAUGE);

    // Nothing has been polled, so neither metric may have moved.
    assert_eq!(WORK_COUNTER.get(), 0);
    assert_eq!(WORK_GAUGE.get(), 0);

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .build()
        .expect("can build runtime");
    let join_handle = runtime.spawn(instrumented);

    // Wait until `work` reports that it has started; from then on the metrics can be sampled.
    while CAN_MEASURE.load(Ordering::SeqCst) == 0 {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    // `work` is running but is stuck waiting for the lock we hold, so the counter has been
    // bumped and the gauge must still read 1.
    assert_eq!(WORK_COUNTER.get(), 1);
    assert_eq!(WORK_GAUGE.get(), 1);

    // Release the lock so `work` can complete.
    drop(held_guard);

    runtime.block_on(join_handle).expect("can block on f");

    // The future has completed: the counter is unchanged and the gauge is back to 0.
    assert_eq!(WORK_COUNTER.get(), 1);
    assert_eq!(WORK_GAUGE.get(), 0);

    // Finally, confirm that `work` really did write through the mutex.
    assert_eq!(*shared_value.lock().unwrap(), 4);
}