opentelemetry_configuration/
guard.rs

1//! OpenTelemetry provider lifecycle management.
2//!
3//! The [`OtelGuard`] manages the lifecycle of OpenTelemetry providers (traces,
4//! metrics, logs). When dropped, it automatically flushes pending data and
5//! shuts down providers gracefully.
6
7use crate::config::{OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::fallback::ExportFallback;
10use opentelemetry::KeyValue;
11use opentelemetry::trace::TracerProvider as _;
12use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
13use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
14use opentelemetry_sdk::Resource;
15use opentelemetry_sdk::logs::{
16    BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
17};
18use opentelemetry_sdk::metrics::SdkMeterProvider;
19use opentelemetry_sdk::trace::{
20    BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
21};
22use tonic::metadata::{MetadataKey, MetadataValue};
23use tracing_subscriber::EnvFilter;
24use tracing_subscriber::layer::SubscriberExt;
25use tracing_subscriber::util::SubscriberInitExt;
26
27/// Guard that manages OpenTelemetry provider lifecycle.
28///
29/// When this guard is dropped, it automatically flushes and shuts down all
30/// configured providers. This ensures telemetry is exported before the Lambda
31/// execution environment freezes.
32///
33/// # Example
34///
35/// ```no_run
36/// use opentelemetry_configuration::{OtelSdkBuilder, SdkError};
37///
38/// fn main() -> Result<(), SdkError> {
39///     let _guard = OtelSdkBuilder::new()
40///         .service_name("my-lambda")
41///         .build()?;
42///
43///     tracing::info!("Application running");
44///
45///     // On drop, all providers are flushed and shut down
46///     Ok(())
47/// }
48/// ```
49pub struct OtelGuard {
50    tracer_provider: Option<SdkTracerProvider>,
51    meter_provider: Option<SdkMeterProvider>,
52    logger_provider: Option<SdkLoggerProvider>,
53    #[allow(dead_code)]
54    fallback: ExportFallback,
55}
56
57impl OtelGuard {
58    /// Creates an OtelGuard from configuration.
59    ///
60    /// This is typically called by [`OtelSdkBuilder::build`](super::OtelSdkBuilder::build).
61    pub(crate) fn from_config(
62        config: OtelSdkConfig,
63        fallback: ExportFallback,
64        custom_resource: Option<Resource>,
65    ) -> Result<Self, SdkError> {
66        let resource = custom_resource.unwrap_or_else(|| build_resource(&config));
67
68        let tracer_provider = if config.traces.enabled {
69            Some(build_tracer_provider(&config, resource.clone())?)
70        } else {
71            None
72        };
73
74        let meter_provider = if config.metrics.enabled {
75            Some(build_meter_provider(&config, resource.clone())?)
76        } else {
77            None
78        };
79
80        let logger_provider = if config.logs.enabled {
81            Some(build_logger_provider(&config, resource)?)
82        } else {
83            None
84        };
85
86        // Set global providers
87        if let Some(ref provider) = tracer_provider {
88            opentelemetry::global::set_tracer_provider(provider.clone());
89        }
90        if let Some(ref provider) = meter_provider {
91            opentelemetry::global::set_meter_provider(provider.clone());
92        }
93
94        // Initialise tracing subscriber if requested
95        if config.init_tracing_subscriber {
96            init_subscriber(&tracer_provider, &logger_provider)?;
97        }
98
99        Ok(Self {
100            tracer_provider,
101            meter_provider,
102            logger_provider,
103            fallback,
104        })
105    }
106
107    /// Returns the tracer provider if configured.
108    pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
109        self.tracer_provider.as_ref()
110    }
111
112    /// Returns the meter provider if configured.
113    pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
114        self.meter_provider.as_ref()
115    }
116
117    /// Returns the logger provider if configured.
118    pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
119        self.logger_provider.as_ref()
120    }
121
122    /// Flushes all configured providers.
123    ///
124    /// This method is called automatically on drop, but can be called manually
125    /// if you need to ensure telemetry is exported at a specific point.
126    ///
127    /// Flush errors are logged via `tracing::warn!` with target `otel_lifecycle`.
128    /// To see these warnings, enable the target in your `RUST_LOG` filter:
129    /// `RUST_LOG=otel_lifecycle=warn`
130    pub fn flush(&self) {
131        if let Some(provider) = &self.tracer_provider
132            && let Err(e) = provider.force_flush()
133        {
134            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
135        }
136
137        if let Some(provider) = &self.meter_provider
138            && let Err(e) = provider.force_flush()
139        {
140            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
141        }
142
143        if let Some(provider) = &self.logger_provider
144            && let Err(e) = provider.force_flush()
145        {
146            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
147        }
148    }
149
150    /// Shuts down all configured providers.
151    ///
152    /// This consumes the guard and shuts down all providers immediately.
153    /// Any further attempts to use the providers will fail.
154    ///
155    /// # Errors
156    ///
157    /// Returns the first error encountered during shutdown.
158    pub fn shutdown(mut self) -> Result<(), SdkError> {
159        if let Some(provider) = self.tracer_provider.take() {
160            provider.force_flush().map_err(SdkError::Flush)?;
161            provider.shutdown().map_err(SdkError::Shutdown)?;
162        }
163
164        if let Some(provider) = self.logger_provider.take() {
165            provider.force_flush().map_err(SdkError::Flush)?;
166            provider.shutdown().map_err(SdkError::Shutdown)?;
167        }
168
169        if let Some(provider) = self.meter_provider.take() {
170            provider.force_flush().map_err(SdkError::Flush)?;
171            provider.shutdown().map_err(SdkError::Shutdown)?;
172        }
173
174        Ok(())
175    }
176}
177
178impl Drop for OtelGuard {
179    fn drop(&mut self) {
180        if let Some(provider) = self.tracer_provider.take() {
181            let _ = provider.force_flush();
182            if let Err(e) = provider.shutdown() {
183                eprintln!("Error shutting down tracer provider: {e}");
184            }
185        }
186
187        if let Some(provider) = self.logger_provider.take() {
188            let _ = provider.force_flush();
189            if let Err(e) = provider.shutdown() {
190                eprintln!("Error shutting down logger provider: {e}");
191            }
192        }
193
194        if let Some(provider) = self.meter_provider.take() {
195            let _ = provider.force_flush();
196            if let Err(e) = provider.shutdown() {
197                eprintln!("Error shutting down meter provider: {e}");
198            }
199        }
200    }
201}
202
203fn build_resource(config: &OtelSdkConfig) -> Resource {
204    let mut attributes: Vec<KeyValue> = config
205        .resource
206        .attributes
207        .iter()
208        .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
209        .collect();
210
211    if let Some(name) = &config.resource.service_name {
212        attributes.push(KeyValue::new("service.name", name.clone()));
213    }
214
215    if let Some(version) = &config.resource.service_version {
216        attributes.push(KeyValue::new("service.version", version.clone()));
217    }
218
219    if let Some(env) = &config.resource.deployment_environment {
220        attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
221    }
222
223    Resource::builder().with_attributes(attributes).build()
224}
225
226fn build_tracer_provider(
227    config: &OtelSdkConfig,
228    resource: Resource,
229) -> Result<SdkTracerProvider, SdkError> {
230    let exporter = match config.endpoint.protocol {
231        Protocol::Grpc => {
232            let endpoint = config.effective_endpoint();
233            let mut builder = opentelemetry_otlp::SpanExporter::builder()
234                .with_tonic()
235                .with_endpoint(&endpoint)
236                .with_timeout(config.endpoint.timeout);
237
238            if !config.endpoint.headers.is_empty() {
239                let mut metadata = tonic::metadata::MetadataMap::new();
240                for (key, value) in &config.endpoint.headers {
241                    if let (Ok(k), Ok(v)) = (
242                        key.parse::<MetadataKey<_>>(),
243                        value.parse::<MetadataValue<_>>(),
244                    ) {
245                        metadata.insert(k, v);
246                    }
247                }
248                builder = builder.with_metadata(metadata);
249            }
250
251            builder.build().map_err(SdkError::TraceExporter)?
252        }
253        Protocol::HttpBinary => {
254            let endpoint = config.signal_endpoint("/v1/traces");
255            let mut builder = opentelemetry_otlp::SpanExporter::builder()
256                .with_http()
257                .with_endpoint(&endpoint)
258                .with_timeout(config.endpoint.timeout)
259                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
260
261            if !config.endpoint.headers.is_empty() {
262                builder = builder.with_headers(config.endpoint.headers.clone());
263            }
264
265            builder.build().map_err(SdkError::TraceExporter)?
266        }
267        Protocol::HttpJson => {
268            let endpoint = config.signal_endpoint("/v1/traces");
269            let mut builder = opentelemetry_otlp::SpanExporter::builder()
270                .with_http()
271                .with_endpoint(&endpoint)
272                .with_timeout(config.endpoint.timeout)
273                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
274
275            if !config.endpoint.headers.is_empty() {
276                builder = builder.with_headers(config.endpoint.headers.clone());
277            }
278
279            builder.build().map_err(SdkError::TraceExporter)?
280        }
281    };
282
283    let batch_config = TraceBatchConfigBuilder::default()
284        .with_max_queue_size(config.traces.batch.max_queue_size)
285        .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
286        .with_scheduled_delay(config.traces.batch.scheduled_delay)
287        .build();
288
289    let span_processor = BatchSpanProcessor::builder(exporter)
290        .with_batch_config(batch_config)
291        .build();
292
293    Ok(SdkTracerProvider::builder()
294        .with_span_processor(span_processor)
295        .with_resource(resource)
296        .build())
297}
298
299fn build_meter_provider(
300    config: &OtelSdkConfig,
301    resource: Resource,
302) -> Result<SdkMeterProvider, SdkError> {
303    let exporter = match config.endpoint.protocol {
304        Protocol::Grpc => {
305            let endpoint = config.effective_endpoint();
306            let mut builder = opentelemetry_otlp::MetricExporter::builder()
307                .with_tonic()
308                .with_endpoint(&endpoint)
309                .with_timeout(config.endpoint.timeout);
310
311            if !config.endpoint.headers.is_empty() {
312                let mut metadata = tonic::metadata::MetadataMap::new();
313                for (key, value) in &config.endpoint.headers {
314                    if let (Ok(k), Ok(v)) = (
315                        key.parse::<MetadataKey<_>>(),
316                        value.parse::<MetadataValue<_>>(),
317                    ) {
318                        metadata.insert(k, v);
319                    }
320                }
321                builder = builder.with_metadata(metadata);
322            }
323
324            builder.build().map_err(SdkError::MetricExporter)?
325        }
326        Protocol::HttpBinary => {
327            let endpoint = config.signal_endpoint("/v1/metrics");
328            let mut builder = opentelemetry_otlp::MetricExporter::builder()
329                .with_http()
330                .with_endpoint(&endpoint)
331                .with_timeout(config.endpoint.timeout)
332                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
333
334            if !config.endpoint.headers.is_empty() {
335                builder = builder.with_headers(config.endpoint.headers.clone());
336            }
337
338            builder.build().map_err(SdkError::MetricExporter)?
339        }
340        Protocol::HttpJson => {
341            let endpoint = config.signal_endpoint("/v1/metrics");
342            let mut builder = opentelemetry_otlp::MetricExporter::builder()
343                .with_http()
344                .with_endpoint(&endpoint)
345                .with_timeout(config.endpoint.timeout)
346                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
347
348            if !config.endpoint.headers.is_empty() {
349                builder = builder.with_headers(config.endpoint.headers.clone());
350            }
351
352            builder.build().map_err(SdkError::MetricExporter)?
353        }
354    };
355
356    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
357        .with_interval(config.metrics.batch.scheduled_delay)
358        .build();
359
360    Ok(SdkMeterProvider::builder()
361        .with_reader(reader)
362        .with_resource(resource)
363        .build())
364}
365
366fn build_logger_provider(
367    config: &OtelSdkConfig,
368    resource: Resource,
369) -> Result<SdkLoggerProvider, SdkError> {
370    let exporter = match config.endpoint.protocol {
371        Protocol::Grpc => {
372            let endpoint = config.effective_endpoint();
373            let mut builder = opentelemetry_otlp::LogExporter::builder()
374                .with_tonic()
375                .with_endpoint(&endpoint)
376                .with_timeout(config.endpoint.timeout);
377
378            if !config.endpoint.headers.is_empty() {
379                let mut metadata = tonic::metadata::MetadataMap::new();
380                for (key, value) in &config.endpoint.headers {
381                    if let (Ok(k), Ok(v)) = (
382                        key.parse::<MetadataKey<_>>(),
383                        value.parse::<MetadataValue<_>>(),
384                    ) {
385                        metadata.insert(k, v);
386                    }
387                }
388                builder = builder.with_metadata(metadata);
389            }
390
391            builder.build().map_err(SdkError::LogExporter)?
392        }
393        Protocol::HttpBinary => {
394            let endpoint = config.signal_endpoint("/v1/logs");
395            let mut builder = opentelemetry_otlp::LogExporter::builder()
396                .with_http()
397                .with_endpoint(&endpoint)
398                .with_timeout(config.endpoint.timeout)
399                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
400
401            if !config.endpoint.headers.is_empty() {
402                builder = builder.with_headers(config.endpoint.headers.clone());
403            }
404
405            builder.build().map_err(SdkError::LogExporter)?
406        }
407        Protocol::HttpJson => {
408            let endpoint = config.signal_endpoint("/v1/logs");
409            let mut builder = opentelemetry_otlp::LogExporter::builder()
410                .with_http()
411                .with_endpoint(&endpoint)
412                .with_timeout(config.endpoint.timeout)
413                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
414
415            if !config.endpoint.headers.is_empty() {
416                builder = builder.with_headers(config.endpoint.headers.clone());
417            }
418
419            builder.build().map_err(SdkError::LogExporter)?
420        }
421    };
422
423    let batch_config = LogBatchConfigBuilder::default()
424        .with_max_queue_size(config.logs.batch.max_queue_size)
425        .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
426        .with_scheduled_delay(config.logs.batch.scheduled_delay)
427        .build();
428
429    let log_processor = BatchLogProcessor::builder(exporter)
430        .with_batch_config(batch_config)
431        .build();
432
433    Ok(SdkLoggerProvider::builder()
434        .with_log_processor(log_processor)
435        .with_resource(resource)
436        .build())
437}
438
439fn init_subscriber(
440    tracer_provider: &Option<SdkTracerProvider>,
441    logger_provider: &Option<SdkLoggerProvider>,
442) -> Result<(), SdkError> {
443    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
444
445    let fmt_layer = tracing_subscriber::fmt::layer()
446        .with_target(true)
447        .without_time();
448
449    let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
450
451    match (tracer_provider, logger_provider) {
452        (Some(tp), Some(lp)) => {
453            let tracer = tp.tracer("lambda-otel-extension");
454            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
455            let log_layer = OpenTelemetryTracingBridge::new(lp);
456            registry.with(telemetry_layer).with(log_layer).try_init()?;
457        }
458        (Some(tp), None) => {
459            let tracer = tp.tracer("lambda-otel-extension");
460            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
461            registry.with(telemetry_layer).try_init()?;
462        }
463        (None, Some(lp)) => {
464            let log_layer = OpenTelemetryTracingBridge::new(lp);
465            registry.with(log_layer).try_init()?;
466        }
467        (None, None) => {
468            registry.try_init()?;
469        }
470    }
471
472    Ok(())
473}