opentelemetry_configuration/
guard.rs

1//! OpenTelemetry provider lifecycle management.
2//!
3//! The [`OtelGuard`] manages the lifecycle of OpenTelemetry providers (traces,
4//! metrics, logs). When dropped, it automatically flushes pending data and
5//! shuts down providers gracefully.
6
7use crate::config::{ComputeEnvironment, OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::rust_detector::RustResourceDetector;
10use opentelemetry::KeyValue;
11use opentelemetry::propagation::TextMapCompositePropagator;
12use opentelemetry::trace::TracerProvider as _;
13use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
14use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
15use opentelemetry_resource_detectors::{
16    HostResourceDetector, K8sResourceDetector, OsResourceDetector, ProcessResourceDetector,
17};
18use opentelemetry_sdk::Resource;
19use opentelemetry_sdk::logs::{
20    BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
21};
22use opentelemetry_sdk::metrics::SdkMeterProvider;
23use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
24use opentelemetry_sdk::resource::ResourceBuilder;
25use opentelemetry_sdk::trace::{
26    BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
27};
28use std::collections::HashMap;
29use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
30use tracing_subscriber::EnvFilter;
31use tracing_subscriber::layer::SubscriberExt;
32use tracing_subscriber::util::SubscriberInitExt;
33
34/// Guard that manages OpenTelemetry provider lifecycle.
35///
36/// On drop, flushes pending telemetry and shuts down providers.
37/// Use [`shutdown()`](Self::shutdown) for explicit error handling.
38#[allow(clippy::struct_field_names)] // Fields follow OpenTelemetry SDK naming conventions
39pub struct OtelGuard {
40    tracer_provider: Option<SdkTracerProvider>,
41    meter_provider: Option<SdkMeterProvider>,
42    logger_provider: Option<SdkLoggerProvider>,
43}
44
45impl OtelGuard {
46    /// Creates an `OtelGuard` from configuration.
47    ///
48    /// This is typically called by [`OtelSdkBuilder::build`](super::OtelSdkBuilder::build).
49    pub(crate) fn from_config(
50        config: &OtelSdkConfig,
51        custom_resource: Option<Resource>,
52    ) -> Result<Self, SdkError> {
53        let resource = custom_resource.unwrap_or_else(|| build_resource(config));
54
55        let tracer_provider = if config.traces.enabled {
56            Some(build_tracer_provider(config, resource.clone())?)
57        } else {
58            None
59        };
60
61        let meter_provider = if config.metrics.enabled {
62            Some(build_meter_provider(config, resource.clone())?)
63        } else {
64            None
65        };
66
67        let logger_provider = if config.logs.enabled {
68            Some(build_logger_provider(config, resource)?)
69        } else {
70            None
71        };
72
73        if let Some(ref provider) = tracer_provider {
74            opentelemetry::global::set_tracer_provider(provider.clone());
75        }
76        if let Some(ref provider) = meter_provider {
77            opentelemetry::global::set_meter_provider(provider.clone());
78        }
79
80        let propagator = TextMapCompositePropagator::new(vec![
81            Box::new(TraceContextPropagator::new()),
82            Box::new(BaggagePropagator::new()),
83        ]);
84        opentelemetry::global::set_text_map_propagator(propagator);
85
86        if config.init_tracing_subscriber {
87            let scope_name = config
88                .instrumentation_scope_name
89                .clone()
90                .or_else(|| config.resource.service_name.clone())
91                .unwrap_or_else(|| "opentelemetry-configuration".to_string());
92            init_subscriber(
93                tracer_provider.as_ref(),
94                logger_provider.as_ref(),
95                scope_name,
96            )?;
97        }
98
99        Ok(Self {
100            tracer_provider,
101            meter_provider,
102            logger_provider,
103        })
104    }
105
106    /// Returns the tracer provider if configured.
107    #[must_use]
108    pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
109        self.tracer_provider.as_ref()
110    }
111
112    /// Returns the meter provider if configured.
113    #[must_use]
114    pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
115        self.meter_provider.as_ref()
116    }
117
118    /// Returns the logger provider if configured.
119    #[must_use]
120    pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
121        self.logger_provider.as_ref()
122    }
123
124    /// Flushes all configured providers. Errors are logged but not returned.
125    pub fn flush(&self) {
126        if let Some(provider) = &self.tracer_provider
127            && let Err(e) = provider.force_flush()
128        {
129            tracing::error!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
130        }
131
132        if let Some(provider) = &self.meter_provider
133            && let Err(e) = provider.force_flush()
134        {
135            tracing::error!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
136        }
137
138        if let Some(provider) = &self.logger_provider
139            && let Err(e) = provider.force_flush()
140        {
141            tracing::error!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
142        }
143    }
144
145    /// Shuts down all configured providers, returning the first error if any.
146    ///
147    /// # Errors
148    ///
149    /// Returns [`SdkError::Flush`] if flushing a provider fails, or
150    /// [`SdkError::Shutdown`] if shutting down a provider fails.
151    pub fn shutdown(mut self) -> Result<(), SdkError> {
152        if let Some(provider) = self.tracer_provider.take() {
153            provider.force_flush().map_err(SdkError::Flush)?;
154            provider.shutdown().map_err(SdkError::Shutdown)?;
155        }
156
157        if let Some(provider) = self.logger_provider.take() {
158            provider.force_flush().map_err(SdkError::Flush)?;
159            provider.shutdown().map_err(SdkError::Shutdown)?;
160        }
161
162        if let Some(provider) = self.meter_provider.take() {
163            provider.force_flush().map_err(SdkError::Flush)?;
164            provider.shutdown().map_err(SdkError::Shutdown)?;
165        }
166
167        Ok(())
168    }
169}
170
171impl Drop for OtelGuard {
172    fn drop(&mut self) {
173        if let Some(provider) = self.tracer_provider.take() {
174            let _ = provider.force_flush();
175            if let Err(e) = provider.shutdown() {
176                tracing::error!(target: "otel_lifecycle", error = %e, "Failed to shut down tracer provider");
177            }
178        }
179
180        if let Some(provider) = self.logger_provider.take() {
181            let _ = provider.force_flush();
182            if let Err(e) = provider.shutdown() {
183                tracing::error!(target: "otel_lifecycle", error = %e, "Failed to shut down logger provider");
184            }
185        }
186
187        if let Some(provider) = self.meter_provider.take() {
188            let _ = provider.force_flush();
189            if let Err(e) = provider.shutdown() {
190                tracing::error!(target: "otel_lifecycle", error = %e, "Failed to shut down meter provider");
191            }
192        }
193    }
194}
195
196fn build_resource(config: &OtelSdkConfig) -> Resource {
197    let mut builder = Resource::builder();
198
199    match config.resource.compute_environment {
200        ComputeEnvironment::Auto => {
201            builder = builder
202                .with_detector(Box::new(HostResourceDetector::default()))
203                .with_detector(Box::new(OsResourceDetector))
204                .with_detector(Box::new(ProcessResourceDetector))
205                .with_detector(Box::new(RustResourceDetector));
206
207            if std::env::var("AWS_LAMBDA_FUNCTION_NAME").is_ok() {
208                builder = add_lambda_attributes(builder);
209            }
210
211            if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() {
212                builder = builder.with_detector(Box::new(K8sResourceDetector));
213            }
214        }
215        ComputeEnvironment::Lambda => {
216            builder = builder
217                .with_detector(Box::new(HostResourceDetector::default()))
218                .with_detector(Box::new(OsResourceDetector))
219                .with_detector(Box::new(ProcessResourceDetector))
220                .with_detector(Box::new(RustResourceDetector));
221            builder = add_lambda_attributes(builder);
222        }
223        ComputeEnvironment::Kubernetes => {
224            builder = builder
225                .with_detector(Box::new(HostResourceDetector::default()))
226                .with_detector(Box::new(OsResourceDetector))
227                .with_detector(Box::new(ProcessResourceDetector))
228                .with_detector(Box::new(RustResourceDetector))
229                .with_detector(Box::new(K8sResourceDetector));
230        }
231        ComputeEnvironment::None => {}
232    }
233
234    let mut attributes: Vec<KeyValue> = config
235        .resource
236        .attributes
237        .iter()
238        .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
239        .collect();
240
241    if let Some(name) = &config.resource.service_name {
242        attributes.push(KeyValue::new("service.name", name.clone()));
243    }
244
245    if let Some(version) = &config.resource.service_version {
246        attributes.push(KeyValue::new("service.version", version.clone()));
247    }
248
249    if let Some(env) = &config.resource.deployment_environment {
250        attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
251    }
252
253    builder.with_attributes(attributes).build()
254}
255
256fn add_lambda_attributes(builder: ResourceBuilder) -> ResourceBuilder {
257    let mut attrs = vec![KeyValue::new("cloud.provider", "aws")];
258
259    if let Ok(region) = std::env::var("AWS_REGION") {
260        attrs.push(KeyValue::new("cloud.region", region));
261    }
262    if let Ok(memory) = std::env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") {
263        attrs.push(KeyValue::new("faas.max_memory", memory));
264    }
265    if let Ok(instance) = std::env::var("AWS_LAMBDA_LOG_STREAM_NAME") {
266        attrs.push(KeyValue::new("faas.instance", instance));
267    }
268    if let Ok(name) = std::env::var("AWS_LAMBDA_FUNCTION_NAME") {
269        attrs.push(KeyValue::new("faas.name", name));
270    }
271    if let Ok(version) = std::env::var("AWS_LAMBDA_FUNCTION_VERSION") {
272        attrs.push(KeyValue::new("faas.version", version));
273    }
274
275    builder.with_attributes(attrs)
276}
277
278fn build_tonic_metadata(headers: &HashMap<String, String>) -> MetadataMap {
279    let mut metadata = MetadataMap::new();
280    for (key, value) in headers {
281        if let (Ok(k), Ok(v)) = (
282            key.parse::<MetadataKey<_>>(),
283            value.parse::<MetadataValue<_>>(),
284        ) {
285            metadata.insert(k, v);
286        }
287    }
288    metadata
289}
290
/// Builds an OTLP exporter of type `$exporter_type` for the transport chosen
/// in `$config.endpoint.protocol`.
///
/// * gRPC uses `$config.effective_endpoint()` and sends the configured
///   headers as metadata.
/// * Both HTTP variants use `$config.signal_endpoint($signal_path)` and differ
///   only in the wire encoding passed to `with_protocol`.
///
/// Headers are attached only when at least one is configured. A build failure
/// is wrapped in `SdkError::$error_variant` and propagated with `?`, so the
/// macro must be expanded inside a function returning `Result<_, SdkError>`.
macro_rules! build_exporter {
    ($config:expr, $exporter_type:ident, $signal_path:expr, $error_variant:ident) => {{
        match $config.endpoint.protocol {
            Protocol::Grpc => {
                let target = $config.effective_endpoint();
                let base = opentelemetry_otlp::$exporter_type::builder()
                    .with_tonic()
                    .with_endpoint(&target)
                    .with_timeout($config.endpoint.timeout);

                let configured = if $config.endpoint.headers.is_empty() {
                    base
                } else {
                    base.with_metadata(build_tonic_metadata(&$config.endpoint.headers))
                };

                configured.build().map_err(SdkError::$error_variant)?
            }
            Protocol::HttpBinary | Protocol::HttpJson => {
                let wire_format = if matches!($config.endpoint.protocol, Protocol::HttpJson) {
                    opentelemetry_otlp::Protocol::HttpJson
                } else {
                    opentelemetry_otlp::Protocol::HttpBinary
                };

                let target = $config.signal_endpoint($signal_path);
                let base = opentelemetry_otlp::$exporter_type::builder()
                    .with_http()
                    .with_endpoint(&target)
                    .with_timeout($config.endpoint.timeout)
                    .with_protocol(wire_format);

                let configured = if $config.endpoint.headers.is_empty() {
                    base
                } else {
                    base.with_headers($config.endpoint.headers.clone())
                };

                configured.build().map_err(SdkError::$error_variant)?
            }
        }
    }};
}
339
340fn build_tracer_provider(
341    config: &OtelSdkConfig,
342    resource: Resource,
343) -> Result<SdkTracerProvider, SdkError> {
344    let exporter = build_exporter!(config, SpanExporter, "/v1/traces", TraceExporter);
345
346    let batch_config = TraceBatchConfigBuilder::default()
347        .with_max_queue_size(config.traces.batch.max_queue_size)
348        .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
349        .with_scheduled_delay(config.traces.batch.scheduled_delay)
350        .build();
351
352    let span_processor = BatchSpanProcessor::builder(exporter)
353        .with_batch_config(batch_config)
354        .build();
355
356    Ok(SdkTracerProvider::builder()
357        .with_span_processor(span_processor)
358        .with_resource(resource)
359        .build())
360}
361
362fn build_meter_provider(
363    config: &OtelSdkConfig,
364    resource: Resource,
365) -> Result<SdkMeterProvider, SdkError> {
366    let exporter = build_exporter!(config, MetricExporter, "/v1/metrics", MetricExporter);
367
368    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
369        .with_interval(config.metrics.batch.scheduled_delay)
370        .build();
371
372    Ok(SdkMeterProvider::builder()
373        .with_reader(reader)
374        .with_resource(resource)
375        .build())
376}
377
378fn build_logger_provider(
379    config: &OtelSdkConfig,
380    resource: Resource,
381) -> Result<SdkLoggerProvider, SdkError> {
382    let exporter = build_exporter!(config, LogExporter, "/v1/logs", LogExporter);
383
384    let batch_config = LogBatchConfigBuilder::default()
385        .with_max_queue_size(config.logs.batch.max_queue_size)
386        .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
387        .with_scheduled_delay(config.logs.batch.scheduled_delay)
388        .build();
389
390    let log_processor = BatchLogProcessor::builder(exporter)
391        .with_batch_config(batch_config)
392        .build();
393
394    Ok(SdkLoggerProvider::builder()
395        .with_log_processor(log_processor)
396        .with_resource(resource)
397        .build())
398}
399
400fn init_subscriber(
401    tracer_provider: Option<&SdkTracerProvider>,
402    logger_provider: Option<&SdkLoggerProvider>,
403    scope_name: String,
404) -> Result<(), SdkError> {
405    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
406
407    let fmt_layer = tracing_subscriber::fmt::layer()
408        .with_target(true)
409        .without_time();
410
411    let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
412
413    match (tracer_provider, logger_provider) {
414        (Some(tp), Some(lp)) => {
415            let tracer = tp.tracer(scope_name);
416            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
417            let log_layer = OpenTelemetryTracingBridge::new(lp);
418            registry.with(telemetry_layer).with(log_layer).try_init()?;
419        }
420        (Some(tp), None) => {
421            let tracer = tp.tracer(scope_name);
422            let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
423            registry.with(telemetry_layer).try_init()?;
424        }
425        (None, Some(lp)) => {
426            let log_layer = OpenTelemetryTracingBridge::new(lp);
427            registry.with(log_layer).try_init()?;
428        }
429        (None, None) => {
430            registry.try_init()?;
431        }
432    }
433
434    Ok(())
435}
436
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the string form of the resource attribute `key`, if present.
    fn attribute(resource: &Resource, key: &str) -> Option<String> {
        resource
            .iter()
            .find(|(k, _)| k.as_str() == key)
            .map(|(_, v)| v.to_string())
    }

    /// Wraps a resource configuration in an otherwise default SDK config.
    fn config_with(resource: crate::config::ResourceConfig) -> OtelSdkConfig {
        OtelSdkConfig {
            resource,
            ..Default::default()
        }
    }

    #[test]
    fn build_resource_with_auto_environment_includes_rust_detector() {
        let config = config_with(crate::config::ResourceConfig {
            service_name: Some("test-service".to_string()),
            compute_environment: ComputeEnvironment::Auto,
            ..Default::default()
        });

        let resource = build_resource(&config);

        assert!(
            attribute(&resource, "process.runtime.name").is_some(),
            "Auto environment should include Rust detector"
        );
    }

    #[test]
    fn build_resource_with_none_environment_excludes_detectors() {
        let config = config_with(crate::config::ResourceConfig {
            service_name: Some("test-service".to_string()),
            compute_environment: ComputeEnvironment::None,
            ..Default::default()
        });

        let resource = build_resource(&config);

        assert!(
            attribute(&resource, "process.runtime.name").is_none(),
            "None environment should not run detectors"
        );
    }

    #[test]
    fn build_resource_includes_service_name() {
        let config = config_with(crate::config::ResourceConfig {
            service_name: Some("my-test-service".to_string()),
            compute_environment: ComputeEnvironment::None,
            ..Default::default()
        });

        let resource = build_resource(&config);

        assert_eq!(
            attribute(&resource, "service.name").as_deref(),
            Some("my-test-service")
        );
    }

    #[test]
    fn build_resource_includes_custom_attributes() {
        let mut attributes = HashMap::new();
        attributes.insert("custom.key".to_string(), "custom-value".to_string());

        let config = config_with(crate::config::ResourceConfig {
            attributes,
            compute_environment: ComputeEnvironment::None,
            ..Default::default()
        });

        let resource = build_resource(&config);

        assert_eq!(
            attribute(&resource, "custom.key").as_deref(),
            Some("custom-value")
        );
    }

    #[test]
    fn build_tonic_metadata_parses_valid_headers() {
        let mut headers = HashMap::new();
        headers.insert("authorization".to_string(), "Bearer token123".to_string());
        headers.insert("x-custom-header".to_string(), "value".to_string());

        let metadata = build_tonic_metadata(&headers);

        assert_eq!(metadata.len(), 2);
        assert_eq!(
            metadata.get("authorization").and_then(|v| v.to_str().ok()),
            Some("Bearer token123")
        );
        assert_eq!(
            metadata
                .get("x-custom-header")
                .and_then(|v| v.to_str().ok()),
            Some("value")
        );
    }

    #[test]
    fn build_tonic_metadata_handles_empty_headers() {
        let headers = HashMap::new();
        let metadata = build_tonic_metadata(&headers);
        assert_eq!(metadata.len(), 0);
    }

    #[test]
    fn build_tonic_metadata_skips_invalid_keys() {
        let mut headers = HashMap::new();
        headers.insert("valid-key".to_string(), "value".to_string());
        headers.insert("invalid key with spaces".to_string(), "value".to_string());

        let metadata = build_tonic_metadata(&headers);

        assert_eq!(metadata.len(), 1);
        assert!(metadata.get("valid-key").is_some());
    }

    #[test]
    fn build_tonic_metadata_skips_invalid_values() {
        let mut headers = HashMap::new();
        headers.insert("valid-key".to_string(), "valid-value".to_string());
        headers.insert(
            "invalid-value-key".to_string(),
            "value\0with\0nulls".to_string(),
        );

        let metadata = build_tonic_metadata(&headers);

        assert_eq!(metadata.len(), 1);
        assert!(metadata.get("valid-key").is_some());
    }

    #[test]
    fn build_resource_auto_detects_lambda_from_environment() {
        temp_env::with_vars(
            [
                ("AWS_LAMBDA_FUNCTION_NAME", Some("test-lambda")),
                ("AWS_REGION", Some("us-east-1")),
            ],
            || {
                let config = config_with(crate::config::ResourceConfig {
                    compute_environment: ComputeEnvironment::Auto,
                    ..Default::default()
                });

                let resource = build_resource(&config);

                assert_eq!(
                    attribute(&resource, "faas.name").as_deref(),
                    Some("test-lambda")
                );
                assert_eq!(
                    attribute(&resource, "cloud.provider").as_deref(),
                    Some("aws")
                );
            },
        );
    }

    #[test]
    fn add_lambda_attributes_handles_missing_optional_vars() {
        // Explicitly unset every optional variable so the test is hermetic
        // even when it runs inside a real Lambda-like environment.
        temp_env::with_vars(
            [
                ("AWS_LAMBDA_FUNCTION_NAME", Some("minimal-lambda")),
                ("AWS_REGION", None),
                ("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", None),
                ("AWS_LAMBDA_LOG_STREAM_NAME", None),
                ("AWS_LAMBDA_FUNCTION_VERSION", None),
                ("OTEL_RESOURCE_ATTRIBUTES", None),
            ],
            || {
                let config = config_with(crate::config::ResourceConfig {
                    compute_environment: ComputeEnvironment::Lambda,
                    ..Default::default()
                });

                let resource = build_resource(&config);

                assert_eq!(
                    attribute(&resource, "cloud.provider").as_deref(),
                    Some("aws"),
                    "cloud.provider should always be set for Lambda environment"
                );
                assert_eq!(
                    attribute(&resource, "faas.name").as_deref(),
                    Some("minimal-lambda")
                );

                for key in [
                    "cloud.region",
                    "faas.max_memory",
                    "faas.instance",
                    "faas.version",
                ] {
                    assert!(
                        attribute(&resource, key).is_none(),
                        "{key} should be omitted when its environment variable is unset"
                    );
                }
            },
        );
    }
}