opentelemetry_configuration/
guard.rs

1//! OpenTelemetry provider lifecycle management.
2//!
3//! The [`OtelGuard`] manages the lifecycle of OpenTelemetry providers (traces,
4//! metrics, logs). When dropped, it automatically flushes pending data and
5//! shuts down providers gracefully.
6
7use crate::config::{OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::fallback::ExportFallback;
10use opentelemetry::KeyValue;
11use opentelemetry::propagation::TextMapCompositePropagator;
12use opentelemetry::trace::TracerProvider as _;
13use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
14use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
15use opentelemetry_sdk::Resource;
16use opentelemetry_sdk::logs::{
17    BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
18};
19use opentelemetry_sdk::metrics::SdkMeterProvider;
20use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
21use opentelemetry_sdk::trace::{
22    BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
23};
24use tonic::metadata::{MetadataKey, MetadataValue};
25use tracing_subscriber::EnvFilter;
26use tracing_subscriber::layer::SubscriberExt;
27use tracing_subscriber::util::SubscriberInitExt;
28
29/// Guard that manages OpenTelemetry provider lifecycle.
30///
31/// When this guard is dropped, it automatically flushes and shuts down all
32/// configured providers. This ensures telemetry is exported before the Lambda
33/// execution environment freezes.
34///
35/// # Example
36///
37/// ```no_run
38/// use opentelemetry_configuration::{OtelSdkBuilder, SdkError};
39///
40/// fn main() -> Result<(), SdkError> {
41///     let _guard = OtelSdkBuilder::new()
42///         .service_name("my-lambda")
43///         .build()?;
44///
45///     tracing::info!("Application running");
46///
47///     // On drop, all providers are flushed and shut down
48///     Ok(())
49/// }
50/// ```
51pub struct OtelGuard {
52    tracer_provider: Option<SdkTracerProvider>,
53    meter_provider: Option<SdkMeterProvider>,
54    logger_provider: Option<SdkLoggerProvider>,
55    #[allow(dead_code)]
56    fallback: ExportFallback,
57}
58
59impl OtelGuard {
60    /// Creates an OtelGuard from configuration.
61    ///
62    /// This is typically called by [`OtelSdkBuilder::build`](super::OtelSdkBuilder::build).
63    pub(crate) fn from_config(
64        config: OtelSdkConfig,
65        fallback: ExportFallback,
66        custom_resource: Option<Resource>,
67    ) -> Result<Self, SdkError> {
68        let resource = custom_resource.unwrap_or_else(|| build_resource(&config));
69
70        let tracer_provider = if config.traces.enabled {
71            Some(build_tracer_provider(&config, resource.clone())?)
72        } else {
73            None
74        };
75
76        let meter_provider = if config.metrics.enabled {
77            Some(build_meter_provider(&config, resource.clone())?)
78        } else {
79            None
80        };
81
82        let logger_provider = if config.logs.enabled {
83            Some(build_logger_provider(&config, resource)?)
84        } else {
85            None
86        };
87
88        // Set global providers
89        if let Some(ref provider) = tracer_provider {
90            opentelemetry::global::set_tracer_provider(provider.clone());
91        }
92        if let Some(ref provider) = meter_provider {
93            opentelemetry::global::set_meter_provider(provider.clone());
94        }
95
96        // Set W3C propagators for trace context (traceparent) and baggage headers
97        let propagator = TextMapCompositePropagator::new(vec![
98            Box::new(TraceContextPropagator::new()),
99            Box::new(BaggagePropagator::new()),
100        ]);
101        opentelemetry::global::set_text_map_propagator(propagator);
102
103        // Initialise tracing subscriber if requested
104        if config.init_tracing_subscriber {
105            init_subscriber(&tracer_provider, &logger_provider)?;
106        }
107
108        Ok(Self {
109            tracer_provider,
110            meter_provider,
111            logger_provider,
112            fallback,
113        })
114    }
115
116    /// Returns the tracer provider if configured.
117    pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
118        self.tracer_provider.as_ref()
119    }
120
121    /// Returns the meter provider if configured.
122    pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
123        self.meter_provider.as_ref()
124    }
125
126    /// Returns the logger provider if configured.
127    pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
128        self.logger_provider.as_ref()
129    }
130
131    /// Flushes all configured providers.
132    ///
133    /// This method is called automatically on drop, but can be called manually
134    /// if you need to ensure telemetry is exported at a specific point.
135    ///
136    /// Flush errors are logged via `tracing::warn!` with target `otel_lifecycle`.
137    /// To see these warnings, enable the target in your `RUST_LOG` filter:
138    /// `RUST_LOG=otel_lifecycle=warn`
139    pub fn flush(&self) {
140        if let Some(provider) = &self.tracer_provider
141            && let Err(e) = provider.force_flush()
142        {
143            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
144        }
145
146        if let Some(provider) = &self.meter_provider
147            && let Err(e) = provider.force_flush()
148        {
149            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
150        }
151
152        if let Some(provider) = &self.logger_provider
153            && let Err(e) = provider.force_flush()
154        {
155            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
156        }
157    }
158
159    /// Shuts down all configured providers.
160    ///
161    /// This consumes the guard and shuts down all providers immediately.
162    /// Any further attempts to use the providers will fail.
163    ///
164    /// # Errors
165    ///
166    /// Returns the first error encountered during shutdown.
167    pub fn shutdown(mut self) -> Result<(), SdkError> {
168        if let Some(provider) = self.tracer_provider.take() {
169            provider.force_flush().map_err(SdkError::Flush)?;
170            provider.shutdown().map_err(SdkError::Shutdown)?;
171        }
172
173        if let Some(provider) = self.logger_provider.take() {
174            provider.force_flush().map_err(SdkError::Flush)?;
175            provider.shutdown().map_err(SdkError::Shutdown)?;
176        }
177
178        if let Some(provider) = self.meter_provider.take() {
179            provider.force_flush().map_err(SdkError::Flush)?;
180            provider.shutdown().map_err(SdkError::Shutdown)?;
181        }
182
183        Ok(())
184    }
185}
186
187impl Drop for OtelGuard {
188    fn drop(&mut self) {
189        if let Some(provider) = self.tracer_provider.take() {
190            let _ = provider.force_flush();
191            if let Err(e) = provider.shutdown() {
192                eprintln!("Error shutting down tracer provider: {e}");
193            }
194        }
195
196        if let Some(provider) = self.logger_provider.take() {
197            let _ = provider.force_flush();
198            if let Err(e) = provider.shutdown() {
199                eprintln!("Error shutting down logger provider: {e}");
200            }
201        }
202
203        if let Some(provider) = self.meter_provider.take() {
204            let _ = provider.force_flush();
205            if let Err(e) = provider.shutdown() {
206                eprintln!("Error shutting down meter provider: {e}");
207            }
208        }
209    }
210}
211
212fn build_resource(config: &OtelSdkConfig) -> Resource {
213    let mut attributes: Vec<KeyValue> = config
214        .resource
215        .attributes
216        .iter()
217        .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
218        .collect();
219
220    if let Some(name) = &config.resource.service_name {
221        attributes.push(KeyValue::new("service.name", name.clone()));
222    }
223
224    if let Some(version) = &config.resource.service_version {
225        attributes.push(KeyValue::new("service.version", version.clone()));
226    }
227
228    if let Some(env) = &config.resource.deployment_environment {
229        attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
230    }
231
232    Resource::builder().with_attributes(attributes).build()
233}
234
235fn build_tracer_provider(
236    config: &OtelSdkConfig,
237    resource: Resource,
238) -> Result<SdkTracerProvider, SdkError> {
239    let exporter = match config.endpoint.protocol {
240        Protocol::Grpc => {
241            let endpoint = config.effective_endpoint();
242            let mut builder = opentelemetry_otlp::SpanExporter::builder()
243                .with_tonic()
244                .with_endpoint(&endpoint)
245                .with_timeout(config.endpoint.timeout);
246
247            if !config.endpoint.headers.is_empty() {
248                let mut metadata = tonic::metadata::MetadataMap::new();
249                for (key, value) in &config.endpoint.headers {
250                    if let (Ok(k), Ok(v)) = (
251                        key.parse::<MetadataKey<_>>(),
252                        value.parse::<MetadataValue<_>>(),
253                    ) {
254                        metadata.insert(k, v);
255                    }
256                }
257                builder = builder.with_metadata(metadata);
258            }
259
260            builder.build().map_err(SdkError::TraceExporter)?
261        }
262        Protocol::HttpBinary => {
263            let endpoint = config.signal_endpoint("/v1/traces");
264            let mut builder = opentelemetry_otlp::SpanExporter::builder()
265                .with_http()
266                .with_endpoint(&endpoint)
267                .with_timeout(config.endpoint.timeout)
268                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
269
270            if !config.endpoint.headers.is_empty() {
271                builder = builder.with_headers(config.endpoint.headers.clone());
272            }
273
274            builder.build().map_err(SdkError::TraceExporter)?
275        }
276        Protocol::HttpJson => {
277            let endpoint = config.signal_endpoint("/v1/traces");
278            let mut builder = opentelemetry_otlp::SpanExporter::builder()
279                .with_http()
280                .with_endpoint(&endpoint)
281                .with_timeout(config.endpoint.timeout)
282                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
283
284            if !config.endpoint.headers.is_empty() {
285                builder = builder.with_headers(config.endpoint.headers.clone());
286            }
287
288            builder.build().map_err(SdkError::TraceExporter)?
289        }
290    };
291
292    let batch_config = TraceBatchConfigBuilder::default()
293        .with_max_queue_size(config.traces.batch.max_queue_size)
294        .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
295        .with_scheduled_delay(config.traces.batch.scheduled_delay)
296        .build();
297
298    let span_processor = BatchSpanProcessor::builder(exporter)
299        .with_batch_config(batch_config)
300        .build();
301
302    Ok(SdkTracerProvider::builder()
303        .with_span_processor(span_processor)
304        .with_resource(resource)
305        .build())
306}
307
308fn build_meter_provider(
309    config: &OtelSdkConfig,
310    resource: Resource,
311) -> Result<SdkMeterProvider, SdkError> {
312    let exporter = match config.endpoint.protocol {
313        Protocol::Grpc => {
314            let endpoint = config.effective_endpoint();
315            let mut builder = opentelemetry_otlp::MetricExporter::builder()
316                .with_tonic()
317                .with_endpoint(&endpoint)
318                .with_timeout(config.endpoint.timeout);
319
320            if !config.endpoint.headers.is_empty() {
321                let mut metadata = tonic::metadata::MetadataMap::new();
322                for (key, value) in &config.endpoint.headers {
323                    if let (Ok(k), Ok(v)) = (
324                        key.parse::<MetadataKey<_>>(),
325                        value.parse::<MetadataValue<_>>(),
326                    ) {
327                        metadata.insert(k, v);
328                    }
329                }
330                builder = builder.with_metadata(metadata);
331            }
332
333            builder.build().map_err(SdkError::MetricExporter)?
334        }
335        Protocol::HttpBinary => {
336            let endpoint = config.signal_endpoint("/v1/metrics");
337            let mut builder = opentelemetry_otlp::MetricExporter::builder()
338                .with_http()
339                .with_endpoint(&endpoint)
340                .with_timeout(config.endpoint.timeout)
341                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
342
343            if !config.endpoint.headers.is_empty() {
344                builder = builder.with_headers(config.endpoint.headers.clone());
345            }
346
347            builder.build().map_err(SdkError::MetricExporter)?
348        }
349        Protocol::HttpJson => {
350            let endpoint = config.signal_endpoint("/v1/metrics");
351            let mut builder = opentelemetry_otlp::MetricExporter::builder()
352                .with_http()
353                .with_endpoint(&endpoint)
354                .with_timeout(config.endpoint.timeout)
355                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
356
357            if !config.endpoint.headers.is_empty() {
358                builder = builder.with_headers(config.endpoint.headers.clone());
359            }
360
361            builder.build().map_err(SdkError::MetricExporter)?
362        }
363    };
364
365    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
366        .with_interval(config.metrics.batch.scheduled_delay)
367        .build();
368
369    Ok(SdkMeterProvider::builder()
370        .with_reader(reader)
371        .with_resource(resource)
372        .build())
373}
374
375fn build_logger_provider(
376    config: &OtelSdkConfig,
377    resource: Resource,
378) -> Result<SdkLoggerProvider, SdkError> {
379    let exporter = match config.endpoint.protocol {
380        Protocol::Grpc => {
381            let endpoint = config.effective_endpoint();
382            let mut builder = opentelemetry_otlp::LogExporter::builder()
383                .with_tonic()
384                .with_endpoint(&endpoint)
385                .with_timeout(config.endpoint.timeout);
386
387            if !config.endpoint.headers.is_empty() {
388                let mut metadata = tonic::metadata::MetadataMap::new();
389                for (key, value) in &config.endpoint.headers {
390                    if let (Ok(k), Ok(v)) = (
391                        key.parse::<MetadataKey<_>>(),
392                        value.parse::<MetadataValue<_>>(),
393                    ) {
394                        metadata.insert(k, v);
395                    }
396                }
397                builder = builder.with_metadata(metadata);
398            }
399
400            builder.build().map_err(SdkError::LogExporter)?
401        }
402        Protocol::HttpBinary => {
403            let endpoint = config.signal_endpoint("/v1/logs");
404            let mut builder = opentelemetry_otlp::LogExporter::builder()
405                .with_http()
406                .with_endpoint(&endpoint)
407                .with_timeout(config.endpoint.timeout)
408                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
409
410            if !config.endpoint.headers.is_empty() {
411                builder = builder.with_headers(config.endpoint.headers.clone());
412            }
413
414            builder.build().map_err(SdkError::LogExporter)?
415        }
416        Protocol::HttpJson => {
417            let endpoint = config.signal_endpoint("/v1/logs");
418            let mut builder = opentelemetry_otlp::LogExporter::builder()
419                .with_http()
420                .with_endpoint(&endpoint)
421                .with_timeout(config.endpoint.timeout)
422                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
423
424            if !config.endpoint.headers.is_empty() {
425                builder = builder.with_headers(config.endpoint.headers.clone());
426            }
427
428            builder.build().map_err(SdkError::LogExporter)?
429        }
430    };
431
432    let batch_config = LogBatchConfigBuilder::default()
433        .with_max_queue_size(config.logs.batch.max_queue_size)
434        .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
435        .with_scheduled_delay(config.logs.batch.scheduled_delay)
436        .build();
437
438    let log_processor = BatchLogProcessor::builder(exporter)
439        .with_batch_config(batch_config)
440        .build();
441
442    Ok(SdkLoggerProvider::builder()
443        .with_log_processor(log_processor)
444        .with_resource(resource)
445        .build())
446}
447
448fn init_subscriber(
449    tracer_provider: &Option<SdkTracerProvider>,
450    logger_provider: &Option<SdkLoggerProvider>,
451) -> Result<(), SdkError> {
452    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
453
454    let fmt_layer = tracing_subscriber::fmt::layer()
455        .with_target(true)
456        .without_time();
457
458    let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
459
460    match (tracer_provider, logger_provider) {
461        (Some(tp), Some(lp)) => {
462            let tracer = tp.tracer("lambda-otel-extension");
463            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
464            let log_layer = OpenTelemetryTracingBridge::new(lp);
465            registry.with(telemetry_layer).with(log_layer).try_init()?;
466        }
467        (Some(tp), None) => {
468            let tracer = tp.tracer("lambda-otel-extension");
469            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
470            registry.with(telemetry_layer).try_init()?;
471        }
472        (None, Some(lp)) => {
473            let log_layer = OpenTelemetryTracingBridge::new(lp);
474            registry.with(log_layer).try_init()?;
475        }
476        (None, None) => {
477            registry.try_init()?;
478        }
479    }
480
481    Ok(())
482}