telemetry_rust/future.rs
1//! Future instrumentation utilities for async operation monitoring.
2//!
3//! This module provides wrapper types and traits for instrumenting async operations
4//! with callbacks that execute when futures complete, enabling monitoring and
5//! metrics collection for async workloads.
6
7use pin_project_lite::pin_project;
8use std::{
9 future::Future,
10 pin::Pin,
11 task::{Context as TaskContext, Poll, ready},
12};
13
/// Completion hook for an instrumented future.
///
/// An implementor carries whatever state it needs (counters, span handles,
/// labels, ...) and is handed a shared reference to the future's output once
/// that output is available. The type parameter `T` is the output type of the
/// future being observed.
///
/// The hook takes `self` by value, so a given context value can be invoked at
/// most once; anything it owns is released when `on_result` returns.
pub trait InstrumentedFutureContext<T> {
    /// Consumes the context and observes the value the future resolved to.
    ///
    /// # Arguments
    ///
    /// - `result`: Borrowed view of the future's output; the output itself is
    ///   not consumed by this call
    fn on_result(self, result: &T);
}
27
28pin_project! {
29 /// A future wrapper that provides instrumentation hooks for result handling.
30 ///
31 /// This future wrapper allows for instrumentation of async operations by providing
32 /// a context that is called when the future completes. It ensures that the context
33 /// callback is invoked exactly once when the future produces its result.
34 ///
35 /// # State Management
36 ///
37 /// The future maintains two states:
38 /// - `Pending`: The wrapped future is still executing and contains the future and context
39 /// - `Complete`: The future has completed and the context has been invoked
40 ///
41 /// # Generic Parameters
42 ///
43 /// - `F`: The wrapped future type
44 /// - `C`: The context type that implements [`InstrumentedFutureContext`]
45 ///
46 /// # Fields
47 ///
48 /// The `Pending` variant contains the future being instrumented and the context
49 /// that will be called when it completes. Field documentation is not possible
50 /// within pin_project macros.
51 #[project = InstrumentedFutureProj]
52 #[project_replace = InstrumentedFutureOwn]
53 #[allow(missing_docs)]
54 pub enum InstrumentedFuture<F, C>
55 where
56 F: Future,
57 C: InstrumentedFutureContext<F::Output>,
58 {
59 /// Future is currently executing and waiting for completion
60 Pending {
61 #[pin]
62 future: F,
63 context: C,
64 },
65 /// Future has completed and context has been invoked
66 Complete,
67 }
68}
69
70impl<F, C> InstrumentedFuture<F, C>
71where
72 F: Future,
73 C: InstrumentedFutureContext<F::Output>,
74{
75 /// Creates a new instrumented future with the given future and context.
76 ///
77 /// # Arguments
78 ///
79 /// - `future`: The future to instrument
80 /// - `context`: The context that will be called when the future completes
81 ///
82 /// # Returns
83 ///
84 /// A new [`InstrumentedFuture`] in the `Pending` state
85 ///
86 /// # Examples
87 ///
88 /// ```rust
89 /// use telemetry_rust::future::{InstrumentedFuture, InstrumentedFutureContext};
90 ///
91 /// struct MyContext;
92 /// impl InstrumentedFutureContext<i32> for MyContext {
93 /// fn on_result(self, result: &i32) {
94 /// println!("Future completed with result: {}", result);
95 /// }
96 /// }
97 ///
98 /// let future = async { 42 };
99 /// let instrumented = InstrumentedFuture::new(future, MyContext);
100 /// ```
101 pub fn new(future: F, context: C) -> Self {
102 Self::Pending { future, context }
103 }
104}
105
106impl<F, C> Future for InstrumentedFuture<F, C>
107where
108 F: Future,
109 C: InstrumentedFutureContext<F::Output>,
110{
111 type Output = F::Output;
112
113 fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
114 // First, try to get the ready value of the future
115 let ready = match self.as_mut().project() {
116 InstrumentedFutureProj::Pending { future, context: _ } => {
117 ready!(future.poll(cx))
118 }
119 InstrumentedFutureProj::Complete => panic!("future polled after completion"),
120 };
121
122 // If we got the ready value, we first drop the future: this ensures that the
123 // OpenTelemetry span attached to it is closed and included in the subsequent flush.
124 let context = match self.project_replace(InstrumentedFuture::Complete) {
125 InstrumentedFutureOwn::Pending { future: _, context } => context,
126 InstrumentedFutureOwn::Complete => unreachable!("future already completed"),
127 };
128
129 context.on_result(&ready);
130 Poll::Ready(ready)
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Context that checks both *when* it is invoked (relative to other
    /// contexts sharing the same counter) and *what* result it observes.
    struct TestContext<'a> {
        /// Shared invocation counter, bumped once per callback.
        calls: &'a AtomicUsize,
        /// Number of callbacks expected to have run before this one.
        expected_prior_calls: usize,
        /// Value the instrumented future is expected to resolve to.
        expected_result: i32,
    }

    impl InstrumentedFutureContext<i32> for TestContext<'_> {
        fn on_result(self, result: &i32) {
            let prior_calls = self.calls.fetch_add(1, Ordering::AcqRel);
            assert!(prior_calls == self.expected_prior_calls);
            assert!(result == &self.expected_result);
        }
    }

    /// Nested wrappers invoke their callbacks innermost-first, exactly once
    /// each, and pass the inner output through unchanged.
    #[tokio::test]
    async fn test_hooked_future() {
        let calls = AtomicUsize::new(0);

        let inner = InstrumentedFuture::new(
            async { 42 },
            TestContext {
                calls: &calls,
                expected_prior_calls: 0,
                expected_result: 42,
            },
        );
        let outer = InstrumentedFuture::new(
            inner,
            TestContext {
                calls: &calls,
                expected_prior_calls: 1,
                expected_result: 42,
            },
        );

        // Constructing the wrappers must not trigger any callback.
        assert!(calls.load(Ordering::Acquire) == 0);

        let output = outer.await;

        assert!(calls.load(Ordering::Acquire) == 2);
        assert!(output == 42);
    }
}
163}