opentelemetry_configuration/
guard.rs

1//! OpenTelemetry provider lifecycle management.
2//!
3//! The [`OtelGuard`] manages the lifecycle of OpenTelemetry providers (traces,
4//! metrics, logs). When dropped, it automatically flushes pending data and
5//! shuts down providers gracefully.
6
7use crate::config::{ComputeEnvironment, OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::fallback::ExportFallback;
10use crate::rust_detector::RustResourceDetector;
11use opentelemetry::KeyValue;
12use opentelemetry::propagation::TextMapCompositePropagator;
13use opentelemetry::trace::TracerProvider as _;
14use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
15use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
16use opentelemetry_resource_detectors::{
17    HostResourceDetector, K8sResourceDetector, OsResourceDetector, ProcessResourceDetector,
18};
19use opentelemetry_sdk::Resource;
20use opentelemetry_sdk::logs::{
21    BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
22};
23use opentelemetry_sdk::metrics::SdkMeterProvider;
24use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
25use opentelemetry_sdk::resource::ResourceBuilder;
26use opentelemetry_sdk::trace::{
27    BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
28};
29use tonic::metadata::{MetadataKey, MetadataValue};
30use tracing_subscriber::EnvFilter;
31use tracing_subscriber::layer::SubscriberExt;
32use tracing_subscriber::util::SubscriberInitExt;
33
34/// Guard that manages OpenTelemetry provider lifecycle.
35///
36/// When this guard is dropped, it automatically flushes and shuts down all
37/// configured providers. This ensures telemetry is exported before the
38/// application exits or the execution environment freezes.
39///
40/// # Example
41///
42/// ```no_run
43/// use opentelemetry_configuration::{OtelSdkBuilder, SdkError};
44///
45/// fn main() -> Result<(), SdkError> {
46///     let _guard = OtelSdkBuilder::new()
47///         .service_name("my-lambda")
48///         .build()?;
49///
50///     tracing::info!("Application running");
51///
52///     // On drop, all providers are flushed and shut down
53///     Ok(())
54/// }
55/// ```
56pub struct OtelGuard {
57    tracer_provider: Option<SdkTracerProvider>,
58    meter_provider: Option<SdkMeterProvider>,
59    logger_provider: Option<SdkLoggerProvider>,
60    #[allow(dead_code)]
61    fallback: ExportFallback,
62}
63
64impl OtelGuard {
65    /// Creates an OtelGuard from configuration.
66    ///
67    /// This is typically called by [`OtelSdkBuilder::build`](super::OtelSdkBuilder::build).
68    pub(crate) fn from_config(
69        config: OtelSdkConfig,
70        fallback: ExportFallback,
71        custom_resource: Option<Resource>,
72    ) -> Result<Self, SdkError> {
73        let resource = custom_resource.unwrap_or_else(|| build_resource(&config));
74
75        let tracer_provider = if config.traces.enabled {
76            Some(build_tracer_provider(&config, resource.clone())?)
77        } else {
78            None
79        };
80
81        let meter_provider = if config.metrics.enabled {
82            Some(build_meter_provider(&config, resource.clone())?)
83        } else {
84            None
85        };
86
87        let logger_provider = if config.logs.enabled {
88            Some(build_logger_provider(&config, resource)?)
89        } else {
90            None
91        };
92
93        // Set global providers
94        if let Some(ref provider) = tracer_provider {
95            opentelemetry::global::set_tracer_provider(provider.clone());
96        }
97        if let Some(ref provider) = meter_provider {
98            opentelemetry::global::set_meter_provider(provider.clone());
99        }
100
101        // Set W3C propagators for trace context (traceparent) and baggage headers
102        let propagator = TextMapCompositePropagator::new(vec![
103            Box::new(TraceContextPropagator::new()),
104            Box::new(BaggagePropagator::new()),
105        ]);
106        opentelemetry::global::set_text_map_propagator(propagator);
107
108        // Initialise tracing subscriber if requested
109        if config.init_tracing_subscriber {
110            let scope_name = config
111                .instrumentation_scope_name
112                .clone()
113                .or_else(|| config.resource.service_name.clone())
114                .unwrap_or_else(|| "opentelemetry-configuration".to_string());
115            init_subscriber(&tracer_provider, &logger_provider, scope_name)?;
116        }
117
118        Ok(Self {
119            tracer_provider,
120            meter_provider,
121            logger_provider,
122            fallback,
123        })
124    }
125
126    /// Returns the tracer provider if configured.
127    pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
128        self.tracer_provider.as_ref()
129    }
130
131    /// Returns the meter provider if configured.
132    pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
133        self.meter_provider.as_ref()
134    }
135
136    /// Returns the logger provider if configured.
137    pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
138        self.logger_provider.as_ref()
139    }
140
141    /// Flushes all configured providers.
142    ///
143    /// This method is called automatically on drop, but can be called manually
144    /// if you need to ensure telemetry is exported at a specific point.
145    ///
146    /// Flush errors are logged via `tracing::warn!` with target `otel_lifecycle`.
147    /// To see these warnings, enable the target in your `RUST_LOG` filter:
148    /// `RUST_LOG=otel_lifecycle=warn`
149    pub fn flush(&self) {
150        if let Some(provider) = &self.tracer_provider
151            && let Err(e) = provider.force_flush()
152        {
153            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
154        }
155
156        if let Some(provider) = &self.meter_provider
157            && let Err(e) = provider.force_flush()
158        {
159            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
160        }
161
162        if let Some(provider) = &self.logger_provider
163            && let Err(e) = provider.force_flush()
164        {
165            tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
166        }
167    }
168
169    /// Shuts down all configured providers.
170    ///
171    /// This consumes the guard and shuts down all providers immediately.
172    /// Any further attempts to use the providers will fail.
173    ///
174    /// # Errors
175    ///
176    /// Returns the first error encountered during shutdown.
177    pub fn shutdown(mut self) -> Result<(), SdkError> {
178        if let Some(provider) = self.tracer_provider.take() {
179            provider.force_flush().map_err(SdkError::Flush)?;
180            provider.shutdown().map_err(SdkError::Shutdown)?;
181        }
182
183        if let Some(provider) = self.logger_provider.take() {
184            provider.force_flush().map_err(SdkError::Flush)?;
185            provider.shutdown().map_err(SdkError::Shutdown)?;
186        }
187
188        if let Some(provider) = self.meter_provider.take() {
189            provider.force_flush().map_err(SdkError::Flush)?;
190            provider.shutdown().map_err(SdkError::Shutdown)?;
191        }
192
193        Ok(())
194    }
195}
196
197impl Drop for OtelGuard {
198    fn drop(&mut self) {
199        if let Some(provider) = self.tracer_provider.take() {
200            let _ = provider.force_flush();
201            if let Err(e) = provider.shutdown() {
202                eprintln!("Error shutting down tracer provider: {e}");
203            }
204        }
205
206        if let Some(provider) = self.logger_provider.take() {
207            let _ = provider.force_flush();
208            if let Err(e) = provider.shutdown() {
209                eprintln!("Error shutting down logger provider: {e}");
210            }
211        }
212
213        if let Some(provider) = self.meter_provider.take() {
214            let _ = provider.force_flush();
215            if let Err(e) = provider.shutdown() {
216                eprintln!("Error shutting down meter provider: {e}");
217            }
218        }
219    }
220}
221
222fn build_resource(config: &OtelSdkConfig) -> Resource {
223    let mut builder = Resource::builder();
224
225    // Run detectors based on compute environment
226    match config.resource.compute_environment {
227        ComputeEnvironment::Auto => {
228            // Generic detectors (always useful)
229            builder = builder
230                .with_detector(Box::new(HostResourceDetector::default()))
231                .with_detector(Box::new(OsResourceDetector))
232                .with_detector(Box::new(ProcessResourceDetector))
233                .with_detector(Box::new(RustResourceDetector));
234
235            // Probe for Lambda
236            if std::env::var("AWS_LAMBDA_FUNCTION_NAME").is_ok() {
237                builder = add_lambda_attributes(builder);
238            }
239            // Probe for K8s
240            if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() {
241                builder = builder.with_detector(Box::new(K8sResourceDetector));
242            }
243        }
244        ComputeEnvironment::Lambda => {
245            builder = builder
246                .with_detector(Box::new(HostResourceDetector::default()))
247                .with_detector(Box::new(OsResourceDetector))
248                .with_detector(Box::new(ProcessResourceDetector))
249                .with_detector(Box::new(RustResourceDetector));
250            builder = add_lambda_attributes(builder);
251        }
252        ComputeEnvironment::Kubernetes => {
253            builder = builder
254                .with_detector(Box::new(HostResourceDetector::default()))
255                .with_detector(Box::new(OsResourceDetector))
256                .with_detector(Box::new(ProcessResourceDetector))
257                .with_detector(Box::new(RustResourceDetector))
258                .with_detector(Box::new(K8sResourceDetector));
259        }
260        ComputeEnvironment::None => {
261            // No automatic detection
262        }
263    }
264
265    // Add explicit attributes from config (these take precedence)
266    let mut attributes: Vec<KeyValue> = config
267        .resource
268        .attributes
269        .iter()
270        .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
271        .collect();
272
273    if let Some(name) = &config.resource.service_name {
274        attributes.push(KeyValue::new("service.name", name.clone()));
275    }
276
277    if let Some(version) = &config.resource.service_version {
278        attributes.push(KeyValue::new("service.version", version.clone()));
279    }
280
281    if let Some(env) = &config.resource.deployment_environment {
282        attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
283    }
284
285    builder.with_attributes(attributes).build()
286}
287
288fn add_lambda_attributes(builder: ResourceBuilder) -> ResourceBuilder {
289    let mut attrs = vec![KeyValue::new("cloud.provider", "aws")];
290
291    if let Ok(region) = std::env::var("AWS_REGION") {
292        attrs.push(KeyValue::new("cloud.region", region));
293    }
294    if let Ok(memory) = std::env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") {
295        attrs.push(KeyValue::new("faas.max_memory", memory));
296    }
297    if let Ok(instance) = std::env::var("AWS_LAMBDA_LOG_STREAM_NAME") {
298        attrs.push(KeyValue::new("faas.instance", instance));
299    }
300    if let Ok(name) = std::env::var("AWS_LAMBDA_FUNCTION_NAME") {
301        attrs.push(KeyValue::new("faas.name", name));
302    }
303    if let Ok(version) = std::env::var("AWS_LAMBDA_FUNCTION_VERSION") {
304        attrs.push(KeyValue::new("faas.version", version));
305    }
306
307    builder.with_attributes(attrs)
308}
309
310fn build_tracer_provider(
311    config: &OtelSdkConfig,
312    resource: Resource,
313) -> Result<SdkTracerProvider, SdkError> {
314    let exporter = match config.endpoint.protocol {
315        Protocol::Grpc => {
316            let endpoint = config.effective_endpoint();
317            let mut builder = opentelemetry_otlp::SpanExporter::builder()
318                .with_tonic()
319                .with_endpoint(&endpoint)
320                .with_timeout(config.endpoint.timeout);
321
322            if !config.endpoint.headers.is_empty() {
323                let mut metadata = tonic::metadata::MetadataMap::new();
324                for (key, value) in &config.endpoint.headers {
325                    if let (Ok(k), Ok(v)) = (
326                        key.parse::<MetadataKey<_>>(),
327                        value.parse::<MetadataValue<_>>(),
328                    ) {
329                        metadata.insert(k, v);
330                    }
331                }
332                builder = builder.with_metadata(metadata);
333            }
334
335            builder.build().map_err(SdkError::TraceExporter)?
336        }
337        Protocol::HttpBinary => {
338            let endpoint = config.signal_endpoint("/v1/traces");
339            let mut builder = opentelemetry_otlp::SpanExporter::builder()
340                .with_http()
341                .with_endpoint(&endpoint)
342                .with_timeout(config.endpoint.timeout)
343                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
344
345            if !config.endpoint.headers.is_empty() {
346                builder = builder.with_headers(config.endpoint.headers.clone());
347            }
348
349            builder.build().map_err(SdkError::TraceExporter)?
350        }
351        Protocol::HttpJson => {
352            let endpoint = config.signal_endpoint("/v1/traces");
353            let mut builder = opentelemetry_otlp::SpanExporter::builder()
354                .with_http()
355                .with_endpoint(&endpoint)
356                .with_timeout(config.endpoint.timeout)
357                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
358
359            if !config.endpoint.headers.is_empty() {
360                builder = builder.with_headers(config.endpoint.headers.clone());
361            }
362
363            builder.build().map_err(SdkError::TraceExporter)?
364        }
365    };
366
367    let batch_config = TraceBatchConfigBuilder::default()
368        .with_max_queue_size(config.traces.batch.max_queue_size)
369        .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
370        .with_scheduled_delay(config.traces.batch.scheduled_delay)
371        .build();
372
373    let span_processor = BatchSpanProcessor::builder(exporter)
374        .with_batch_config(batch_config)
375        .build();
376
377    Ok(SdkTracerProvider::builder()
378        .with_span_processor(span_processor)
379        .with_resource(resource)
380        .build())
381}
382
383fn build_meter_provider(
384    config: &OtelSdkConfig,
385    resource: Resource,
386) -> Result<SdkMeterProvider, SdkError> {
387    let exporter = match config.endpoint.protocol {
388        Protocol::Grpc => {
389            let endpoint = config.effective_endpoint();
390            let mut builder = opentelemetry_otlp::MetricExporter::builder()
391                .with_tonic()
392                .with_endpoint(&endpoint)
393                .with_timeout(config.endpoint.timeout);
394
395            if !config.endpoint.headers.is_empty() {
396                let mut metadata = tonic::metadata::MetadataMap::new();
397                for (key, value) in &config.endpoint.headers {
398                    if let (Ok(k), Ok(v)) = (
399                        key.parse::<MetadataKey<_>>(),
400                        value.parse::<MetadataValue<_>>(),
401                    ) {
402                        metadata.insert(k, v);
403                    }
404                }
405                builder = builder.with_metadata(metadata);
406            }
407
408            builder.build().map_err(SdkError::MetricExporter)?
409        }
410        Protocol::HttpBinary => {
411            let endpoint = config.signal_endpoint("/v1/metrics");
412            let mut builder = opentelemetry_otlp::MetricExporter::builder()
413                .with_http()
414                .with_endpoint(&endpoint)
415                .with_timeout(config.endpoint.timeout)
416                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
417
418            if !config.endpoint.headers.is_empty() {
419                builder = builder.with_headers(config.endpoint.headers.clone());
420            }
421
422            builder.build().map_err(SdkError::MetricExporter)?
423        }
424        Protocol::HttpJson => {
425            let endpoint = config.signal_endpoint("/v1/metrics");
426            let mut builder = opentelemetry_otlp::MetricExporter::builder()
427                .with_http()
428                .with_endpoint(&endpoint)
429                .with_timeout(config.endpoint.timeout)
430                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
431
432            if !config.endpoint.headers.is_empty() {
433                builder = builder.with_headers(config.endpoint.headers.clone());
434            }
435
436            builder.build().map_err(SdkError::MetricExporter)?
437        }
438    };
439
440    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
441        .with_interval(config.metrics.batch.scheduled_delay)
442        .build();
443
444    Ok(SdkMeterProvider::builder()
445        .with_reader(reader)
446        .with_resource(resource)
447        .build())
448}
449
450fn build_logger_provider(
451    config: &OtelSdkConfig,
452    resource: Resource,
453) -> Result<SdkLoggerProvider, SdkError> {
454    let exporter = match config.endpoint.protocol {
455        Protocol::Grpc => {
456            let endpoint = config.effective_endpoint();
457            let mut builder = opentelemetry_otlp::LogExporter::builder()
458                .with_tonic()
459                .with_endpoint(&endpoint)
460                .with_timeout(config.endpoint.timeout);
461
462            if !config.endpoint.headers.is_empty() {
463                let mut metadata = tonic::metadata::MetadataMap::new();
464                for (key, value) in &config.endpoint.headers {
465                    if let (Ok(k), Ok(v)) = (
466                        key.parse::<MetadataKey<_>>(),
467                        value.parse::<MetadataValue<_>>(),
468                    ) {
469                        metadata.insert(k, v);
470                    }
471                }
472                builder = builder.with_metadata(metadata);
473            }
474
475            builder.build().map_err(SdkError::LogExporter)?
476        }
477        Protocol::HttpBinary => {
478            let endpoint = config.signal_endpoint("/v1/logs");
479            let mut builder = opentelemetry_otlp::LogExporter::builder()
480                .with_http()
481                .with_endpoint(&endpoint)
482                .with_timeout(config.endpoint.timeout)
483                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
484
485            if !config.endpoint.headers.is_empty() {
486                builder = builder.with_headers(config.endpoint.headers.clone());
487            }
488
489            builder.build().map_err(SdkError::LogExporter)?
490        }
491        Protocol::HttpJson => {
492            let endpoint = config.signal_endpoint("/v1/logs");
493            let mut builder = opentelemetry_otlp::LogExporter::builder()
494                .with_http()
495                .with_endpoint(&endpoint)
496                .with_timeout(config.endpoint.timeout)
497                .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
498
499            if !config.endpoint.headers.is_empty() {
500                builder = builder.with_headers(config.endpoint.headers.clone());
501            }
502
503            builder.build().map_err(SdkError::LogExporter)?
504        }
505    };
506
507    let batch_config = LogBatchConfigBuilder::default()
508        .with_max_queue_size(config.logs.batch.max_queue_size)
509        .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
510        .with_scheduled_delay(config.logs.batch.scheduled_delay)
511        .build();
512
513    let log_processor = BatchLogProcessor::builder(exporter)
514        .with_batch_config(batch_config)
515        .build();
516
517    Ok(SdkLoggerProvider::builder()
518        .with_log_processor(log_processor)
519        .with_resource(resource)
520        .build())
521}
522
523fn init_subscriber(
524    tracer_provider: &Option<SdkTracerProvider>,
525    logger_provider: &Option<SdkLoggerProvider>,
526    scope_name: String,
527) -> Result<(), SdkError> {
528    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
529
530    let fmt_layer = tracing_subscriber::fmt::layer()
531        .with_target(true)
532        .without_time();
533
534    let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
535
536    match (tracer_provider, logger_provider) {
537        (Some(tp), Some(lp)) => {
538            let tracer = tp.tracer(scope_name);
539            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
540            let log_layer = OpenTelemetryTracingBridge::new(lp);
541            registry.with(telemetry_layer).with(log_layer).try_init()?;
542        }
543        (Some(tp), None) => {
544            let tracer = tp.tracer(scope_name);
545            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
546            registry.with(telemetry_layer).try_init()?;
547        }
548        (None, Some(lp)) => {
549            let log_layer = OpenTelemetryTracingBridge::new(lp);
550            registry.with(log_layer).try_init()?;
551        }
552        (None, None) => {
553            registry.try_init()?;
554        }
555    }
556
557    Ok(())
558}