telemetry_rust/
future.rs

1//! Future instrumentation utilities for async operation monitoring.
2//!
3//! This module provides wrapper types and traits for instrumenting async operations
4//! with callbacks that execute when futures complete, enabling monitoring and
5//! metrics collection for async workloads.
6
7use pin_project_lite::pin_project;
8use std::{
9    future::Future,
10    pin::Pin,
11    task::{Context as TaskContext, Poll, ready},
12};
13
14/// Trait for handling the completion of instrumented futures.
15///
16/// This trait provides a callback mechanism to perform actions when an instrumented
17/// future completes with a result. It's typically used for recording metrics,
18/// logging outcomes, or other side effects based on the future's result.
19pub trait InstrumentedFutureContext<T> {
20    /// Called when the instrumented future completes with a result.
21    ///
22    /// # Arguments
23    ///
24    /// - `result`: Reference to the result produced by the future
25    fn on_result(self, result: &T);
26}
27
28pin_project! {
29    /// A future wrapper that provides instrumentation hooks for result handling.
30    ///
31    /// This future wrapper allows for instrumentation of async operations by providing
32    /// a context that is called when the future completes. It ensures that the context
33    /// callback is invoked exactly once when the future produces its result.
34    ///
35    /// # State Management
36    ///
37    /// The future maintains two states:
38    /// - `Pending`: The wrapped future is still executing and contains the future and context
39    /// - `Complete`: The future has completed and the context has been invoked
40    ///
41    /// # Generic Parameters
42    ///
43    /// - `F`: The wrapped future type
44    /// - `C`: The context type that implements [`InstrumentedFutureContext`]
45    ///
46    /// # Fields
47    ///
48    /// The `Pending` variant contains the future being instrumented and the context
49    /// that will be called when it completes. Field documentation is not possible
50    /// within pin_project macros.
51    #[project = InstrumentedFutureProj]
52    #[project_replace = InstrumentedFutureOwn]
53    #[allow(missing_docs)]
54    pub enum InstrumentedFuture<F, C>
55    where
56        F: Future,
57        C: InstrumentedFutureContext<F::Output>,
58    {
59        /// Future is currently executing and waiting for completion
60        Pending {
61            #[pin]
62            future: F,
63            context: C,
64        },
65        /// Future has completed and context has been invoked
66        Complete,
67    }
68}
69
70impl<F, C> InstrumentedFuture<F, C>
71where
72    F: Future,
73    C: InstrumentedFutureContext<F::Output>,
74{
75    /// Creates a new instrumented future with the given future and context.
76    ///
77    /// # Arguments
78    ///
79    /// - `future`: The future to instrument
80    /// - `context`: The context that will be called when the future completes
81    ///
82    /// # Returns
83    ///
84    /// A new [`InstrumentedFuture`] in the `Pending` state
85    ///
86    /// # Examples
87    ///
88    /// ```rust
89    /// use telemetry_rust::future::{InstrumentedFuture, InstrumentedFutureContext};
90    ///
91    /// struct MyContext;
92    /// impl InstrumentedFutureContext<i32> for MyContext {
93    ///     fn on_result(self, result: &i32) {
94    ///         println!("Future completed with result: {}", result);
95    ///     }
96    /// }
97    ///
98    /// let future = async { 42 };
99    /// let instrumented = InstrumentedFuture::new(future, MyContext);
100    /// ```
101    pub fn new(future: F, context: C) -> Self {
102        Self::Pending { future, context }
103    }
104}
105
106impl<F, C> Future for InstrumentedFuture<F, C>
107where
108    F: Future,
109    C: InstrumentedFutureContext<F::Output>,
110{
111    type Output = F::Output;
112
113    fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
114        // First, try to get the ready value of the future
115        let ready = match self.as_mut().project() {
116            InstrumentedFutureProj::Pending { future, context: _ } => {
117                ready!(future.poll(cx))
118            }
119            InstrumentedFutureProj::Complete => panic!("future polled after completion"),
120        };
121
122        // If we got the ready value, we first drop the future: this ensures that the
123        // OpenTelemetry span attached to it is closed and included in the subsequent flush.
124        let context = match self.project_replace(InstrumentedFuture::Complete) {
125            InstrumentedFutureOwn::Pending { future: _, context } => context,
126            InstrumentedFutureOwn::Complete => unreachable!("future already completed"),
127        };
128
129        context.on_result(&ready);
130        Poll::Ready(ready)
131    }
132}
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137    use assert2::assert;
138    use std::sync::atomic::{AtomicUsize, Ordering};
139
140    struct TestContext<'a>(&'a AtomicUsize, usize, i32);
141
142    impl InstrumentedFutureContext<i32> for TestContext<'_> {
143        fn on_result(self, result: &i32) {
144            let Self(counter, expected_count, expected_result) = self;
145            assert!(counter.fetch_add(1, Ordering::AcqRel) == expected_count);
146            assert!(result == &expected_result);
147        }
148    }
149
150    #[tokio::test]
151    async fn test_hooked_future() {
152        let hook_called = AtomicUsize::new(0);
153        let fut1 = async { 42 };
154        let fut2 = InstrumentedFuture::new(fut1, TestContext(&hook_called, 0, 42));
155        let fut3 = InstrumentedFuture::new(fut2, TestContext(&hook_called, 1, 42));
156
157        assert!(hook_called.load(Ordering::Acquire) == 0);
158        let res = fut3.await;
159
160        assert!(hook_called.load(Ordering::Acquire) == 2);
161        assert!(res == 42);
162    }
163}