// opentelemetry_lambda_tower/future.rs
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_semantic_conventions::attribute::{ERROR_MESSAGE, OTEL_STATUS_CODE};
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::Span;
13
14#[pin_project]
27pub struct OtelTracingFuture<F, T, E> {
28 #[pin]
29 inner: F,
30 #[pin]
31 flush_future: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
32 span: Option<Span>,
33 tracer_provider: Option<Arc<SdkTracerProvider>>,
34 logger_provider: Option<Arc<SdkLoggerProvider>>,
35 flush_on_end: bool,
36 flush_timeout: Duration,
37 pending_result: Option<Result<T, E>>,
38}
39
40impl<F, T, E> OtelTracingFuture<F, T, E> {
41 pub(crate) fn new(
43 inner: F,
44 span: Span,
45 tracer_provider: Option<Arc<SdkTracerProvider>>,
46 logger_provider: Option<Arc<SdkLoggerProvider>>,
47 flush_on_end: bool,
48 flush_timeout: Duration,
49 ) -> Self {
50 Self {
51 inner,
52 flush_future: None,
53 span: Some(span),
54 tracer_provider,
55 logger_provider,
56 flush_on_end,
57 flush_timeout,
58 pending_result: None,
59 }
60 }
61}
62
63impl<F, T, E> Future for OtelTracingFuture<F, T, E>
64where
65 F: Future<Output = Result<T, E>>,
66 E: std::fmt::Display,
67{
68 type Output = Result<T, E>;
69
70 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71 let mut this = self.project();
72
73 if this.pending_result.is_none() {
74 let poll_result = if let Some(span) = this.span.as_ref() {
77 let _guard = span.enter();
78 this.inner.poll(cx)
79 } else {
80 this.inner.poll(cx)
81 };
82
83 match poll_result {
84 Poll::Ready(result) => {
85 if let Some(span) = this.span.as_ref() {
86 match &result {
87 Ok(_) => {
88 span.record(OTEL_STATUS_CODE, "OK");
89 }
90 Err(e) => {
91 span.record(OTEL_STATUS_CODE, "ERROR");
92 span.record(ERROR_MESSAGE, e.to_string().as_str());
93 }
94 }
95 }
96
97 let _ = this.span.take();
100
101 let tracer_provider = this.tracer_provider.take();
102 let logger_provider = this.logger_provider.take();
103
104 if *this.flush_on_end
105 && (tracer_provider.is_some() || logger_provider.is_some())
106 {
107 let timeout = *this.flush_timeout;
108 let flush_future = Box::pin(async move {
109 let _ = tokio::time::timeout(
110 timeout,
111 flush_providers(tracer_provider, logger_provider),
112 )
113 .await;
114 });
115 *this.flush_future = Some(flush_future);
116 *this.pending_result = Some(result);
117 } else {
118 return Poll::Ready(result);
119 }
120 }
121 Poll::Pending => return Poll::Pending,
122 }
123 }
124
125 if let Some(flush_fut) = this.flush_future.as_mut().as_pin_mut() {
126 match flush_fut.poll(cx) {
127 Poll::Ready(()) => {
128 *this.flush_future = None;
129 return Poll::Ready(
130 this.pending_result
131 .take()
132 .expect("pending_result should be set when flushing"),
133 );
134 }
135 Poll::Pending => return Poll::Pending,
136 }
137 }
138
139 Poll::Ready(
140 this.pending_result
141 .take()
142 .expect("pending_result should be set"),
143 )
144 }
145}
146
147async fn flush_providers(
149 tracer_provider: Option<Arc<SdkTracerProvider>>,
150 logger_provider: Option<Arc<SdkLoggerProvider>>,
151) {
152 if let Some(Err(e)) = tracer_provider.map(|p| p.force_flush()) {
153 tracing::warn!(target: "otel_lifecycle", error = ?e, "Failed to flush tracer provider");
154 }
155
156 if let Some(Err(e)) = logger_provider.map(|p| p.force_flush()) {
157 tracing::warn!(target: "otel_lifecycle", error = ?e, "Failed to flush logger provider");
158 }
159}