opentelemetry_lambda_tower/
future.rs

1//! Future implementation that manages span lifecycle and flushing.
2
3use opentelemetry_sdk::logs::SdkLoggerProvider;
4use opentelemetry_sdk::trace::SdkTracerProvider;
5use opentelemetry_semantic_conventions::attribute::{ERROR_MESSAGE, OTEL_STATUS_CODE};
6use pin_project::pin_project;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11use std::time::Duration;
12use tracing::Span;
13
14/// Future that wraps an instrumented handler and manages span lifecycle.
15///
16/// This future:
17/// 1. Polls the inner future until completion
18/// 2. Records success/error status on the span
19/// 3. Ends the span
20/// 4. Optionally flushes the tracer provider before returning
21///
22/// The flush ensures spans are exported before Lambda freezes the process.
23/// Critically, the future does NOT return until the flush completes (or times
24/// out), ensuring spans are not lost due to Lambda freezing the execution
25/// environment.
26#[pin_project]
27pub struct OtelTracingFuture<F, T, E> {
28    #[pin]
29    inner: F,
30    #[pin]
31    flush_future: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
32    span: Option<Span>,
33    tracer_provider: Option<Arc<SdkTracerProvider>>,
34    logger_provider: Option<Arc<SdkLoggerProvider>>,
35    flush_on_end: bool,
36    flush_timeout: Duration,
37    pending_result: Option<Result<T, E>>,
38}
39
40impl<F, T, E> OtelTracingFuture<F, T, E> {
41    /// Creates a new tracing future wrapping the given future.
42    pub(crate) fn new(
43        inner: F,
44        span: Span,
45        tracer_provider: Option<Arc<SdkTracerProvider>>,
46        logger_provider: Option<Arc<SdkLoggerProvider>>,
47        flush_on_end: bool,
48        flush_timeout: Duration,
49    ) -> Self {
50        Self {
51            inner,
52            flush_future: None,
53            span: Some(span),
54            tracer_provider,
55            logger_provider,
56            flush_on_end,
57            flush_timeout,
58            pending_result: None,
59        }
60    }
61}
62
63impl<F, T, E> Future for OtelTracingFuture<F, T, E>
64where
65    F: Future<Output = Result<T, E>>,
66    E: std::fmt::Display,
67{
68    type Output = Result<T, E>;
69
70    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71        let mut this = self.project();
72
73        if this.pending_result.is_none() {
74            // Enter the span while polling the inner future so that any
75            // child spans created during the poll have the correct parent.
76            let poll_result = if let Some(span) = this.span.as_ref() {
77                let _guard = span.enter();
78                this.inner.poll(cx)
79            } else {
80                this.inner.poll(cx)
81            };
82
83            match poll_result {
84                Poll::Ready(result) => {
85                    if let Some(span) = this.span.as_ref() {
86                        match &result {
87                            Ok(_) => {
88                                span.record(OTEL_STATUS_CODE, "OK");
89                            }
90                            Err(e) => {
91                                span.record(OTEL_STATUS_CODE, "ERROR");
92                                span.record(ERROR_MESSAGE, e.to_string().as_str());
93                            }
94                        }
95                    }
96
97                    // Drop the span to close it before flushing. This is critical
98                    // because OpenTelemetry only exports spans after they're closed.
99                    let _ = this.span.take();
100
101                    let tracer_provider = this.tracer_provider.take();
102                    let logger_provider = this.logger_provider.take();
103
104                    if *this.flush_on_end
105                        && (tracer_provider.is_some() || logger_provider.is_some())
106                    {
107                        let timeout = *this.flush_timeout;
108                        let flush_future = Box::pin(async move {
109                            let _ = tokio::time::timeout(
110                                timeout,
111                                flush_providers(tracer_provider, logger_provider),
112                            )
113                            .await;
114                        });
115                        *this.flush_future = Some(flush_future);
116                        *this.pending_result = Some(result);
117                    } else {
118                        return Poll::Ready(result);
119                    }
120                }
121                Poll::Pending => return Poll::Pending,
122            }
123        }
124
125        if let Some(flush_fut) = this.flush_future.as_mut().as_pin_mut() {
126            match flush_fut.poll(cx) {
127                Poll::Ready(()) => {
128                    *this.flush_future = None;
129                    return Poll::Ready(
130                        this.pending_result
131                            .take()
132                            .expect("pending_result should be set when flushing"),
133                    );
134                }
135                Poll::Pending => return Poll::Pending,
136            }
137        }
138
139        Poll::Ready(
140            this.pending_result
141                .take()
142                .expect("pending_result should be set"),
143        )
144    }
145}
146
147/// Flushes both tracer and logger providers to ensure telemetry is exported.
148async fn flush_providers(
149    tracer_provider: Option<Arc<SdkTracerProvider>>,
150    logger_provider: Option<Arc<SdkLoggerProvider>>,
151) {
152    if let Some(Err(e)) = tracer_provider.map(|p| p.force_flush()) {
153        tracing::warn!(target: "otel_lifecycle", error = ?e, "Failed to flush tracer provider");
154    }
155
156    if let Some(Err(e)) = logger_provider.map(|p| p.force_flush()) {
157        tracing::warn!(target: "otel_lifecycle", error = ?e, "Failed to flush logger provider");
158    }
159}