apollo-opentelemetry 0.8.0

OpenTelemetry configuration types for the Apollo platform
Documentation
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project_lite::pin_project;

pin_project! {
    /// A future that drops a guard at the point it resolves, or when cancelled.
    ///
    /// Wraps an inner future together with an arbitrary guard value. The wrapper resolves to
    /// the inner future's output; the guard is carried along only so that it is dropped at a
    /// well-defined time: as soon as the inner future reports `Poll::Ready`, or together with
    /// this wrapper if it is dropped before that happens.
    ///
    /// Created by [`FutureExt::with_guard`].
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct WithGuard<F, G> {
        // The wrapped future. Marked `#[pin]` so it can be polled through `Pin<&mut Self>`.
        #[pin]
        future: F,
        // Wrapped in Option so we can drop it eagerly when the future resolves rather than
        // waiting until WithGuard itself is dropped. On cancellation the Option is still Some
        // and the guard drops with the struct.
        guard: Option<G>,
    }
}

impl<F: Future, G> Future for WithGuard<F, G> {
    /// Resolves to whatever the wrapped future resolves to.
    type Output = F::Output;

    /// Polls the wrapped future and releases the guard the moment it completes.
    ///
    /// While the inner future is pending the guard is left untouched. Once the inner future
    /// yields a value, the guard is dropped *before* that value is handed back to the caller.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        match projected.future.poll(cx) {
            Poll::Ready(output) => {
                // Release the guard right here instead of leaving it to WithGuard's own drop:
                // whatever the guard tracks (e.g. an active-request counter) should reflect
                // completion when the result is produced, not when the caller is done with it.
                drop(projected.guard.take());
                Poll::Ready(output)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Extension trait for attaching a guard to a future.
///
/// The guard can be any type — it is held purely for its `Drop` side-effect. The trait is
/// implemented for every [`Future`], so `with_guard` can be called on any future once the
/// trait is in scope.
///
/// The guard is released in one of two ways:
///
/// - **Resolved**: the guard is dropped at the moment the future produces its result, before
///   control returns to the caller. This means an active-request counter, for example,
///   decrements as soon as the response is ready — not when the caller's scope exits.
/// - **Cancelled**: if the future is dropped before resolving, the guard drops with it.
///
/// # Examples
///
/// ```
/// use opentelemetry::metrics::UpDownCounter;
/// use apollo_opentelemetry::metrics::{FutureExt, UpDownCounterExt};
/// # async fn process() {}
///
/// async fn handle(counter: UpDownCounter<i64>) {
///     process()
///         .with_guard(counter.track([]))
///         .await;
/// }
/// # fn main() {}
/// ```
pub trait FutureExt: Future + Sized {
    /// Attaches `guard` to this future, returning a [`WithGuard`] that resolves to the same
    /// output as `self`.
    ///
    /// `guard` can be any type; it is dropped when the future resolves (at the point the
    /// result is produced) or when the returned future is dropped before resolving
    /// (cancellation). The returned future must still be awaited or polled to make progress.
    fn with_guard<G>(self, guard: G) -> WithGuard<Self, G>;
}

impl<F: Future> FutureExt for F {
    /// Bundles this future with `guard` into a [`WithGuard`].
    ///
    /// Nothing is polled and nothing is dropped here: the guard is simply stored (as `Some`)
    /// next to the future until the wrapper resolves or is itself dropped.
    fn with_guard<G>(self, guard: G) -> WithGuard<Self, G> {
        let guard = Some(guard);
        WithGuard { future: self, guard }
    }
}

// Snapshot tests for `WithGuard`: one covers the guard being released when the wrapped future
// completes, the other covers the guard being released when the wrapper is dropped early.
#[cfg(test)]
mod tests {
    use apollo_opentelemetry_test::{TelemetryContext, assert_metrics_snapshot};
    use opentelemetry::global;

    use crate::metrics::{FutureExt, UpDownCounterExt};

    // Completion path: the snapshot taken inside the wrapped future expects the counter at 1,
    // and the snapshot taken after `.await` returns expects it back at 0.
    #[tokio::test]
    async fn guard_drops_on_future_completion() {
        // NOTE(review): `TelemetryContext` is defined in `apollo_opentelemetry_test`;
        // presumably it captures metrics recorded through the global meter provider so the
        // snapshot macro can read them — confirm against that crate.
        let ctx = TelemetryContext::new();
        let counter = global::meter_provider()
            .meter("test")
            .i64_up_down_counter("test.active")
            .build();

        // NOTE(review): `track` comes from `UpDownCounterExt` (not visible here); the snapshots
        // below imply it bumps the counter to 1 and returns a guard that undoes that on drop.
        let guard = counter.track([]);

        async {
            // Runs while the guard is still held by the enclosing `WithGuard`.
            assert_metrics_snapshot!(ctx, @r"
            - name: test.active
              data:
                type: Sum
                data_points:
                  - value: 1
                is_monotonic: false
                temporality: Cumulative
            ");
        }
        .with_guard(guard)
        .await;

        // The wrapped future has resolved, so the guard is expected to be gone by now.
        assert_metrics_snapshot!(ctx, @r"
        - name: test.active
          data:
            type: Sum
            data_points:
              - value: 0
            is_monotonic: false
            temporality: Cumulative
        ");
    }

    // Cancellation path: the wrapper is dropped without ever resolving, and the counter is
    // expected to return to 0 anyway. No async runtime is needed because nothing is polled.
    #[test]
    fn guard_drops_on_future_cancellation() {
        let ctx = TelemetryContext::new();
        let counter = global::meter_provider()
            .meter("test")
            .i64_up_down_counter("test.active")
            .build();

        let guard = counter.track([]);

        // Baseline: the guard is alive, so the counter is expected to read 1.
        assert_metrics_snapshot!(ctx, @r"
        - name: test.active
          data:
            type: Sum
            data_points:
              - value: 1
            is_monotonic: false
            temporality: Cumulative
        ");

        // Drop the WithGuard future without polling it to completion (simulates cancellation).
        // `pending()` never resolves, so the only way the guard can be released is the drop.
        let future = std::future::pending::<()>().with_guard(guard);
        drop(future);

        assert_metrics_snapshot!(ctx, @r"
        - name: test.active
          data:
            type: Sum
            data_points:
              - value: 0
            is_monotonic: false
            temporality: Cumulative
        ");
    }
}