opentelemetry_configuration/
guard.rs1use crate::config::{ComputeEnvironment, OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::rust_detector::RustResourceDetector;
10use opentelemetry::KeyValue;
11use opentelemetry::propagation::TextMapCompositePropagator;
12use opentelemetry::trace::TracerProvider as _;
13use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
14use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
15use opentelemetry_resource_detectors::{
16 HostResourceDetector, K8sResourceDetector, OsResourceDetector, ProcessResourceDetector,
17};
18use opentelemetry_sdk::Resource;
19use opentelemetry_sdk::logs::{
20 BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
21};
22use opentelemetry_sdk::metrics::SdkMeterProvider;
23use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
24use opentelemetry_sdk::resource::ResourceBuilder;
25use opentelemetry_sdk::trace::{
26 BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
27};
28use std::collections::HashMap;
29use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
30use tracing_subscriber::EnvFilter;
31use tracing_subscriber::layer::SubscriberExt;
32use tracing_subscriber::util::SubscriberInitExt;
33
34#[allow(clippy::struct_field_names)] pub struct OtelGuard {
40 tracer_provider: Option<SdkTracerProvider>,
41 meter_provider: Option<SdkMeterProvider>,
42 logger_provider: Option<SdkLoggerProvider>,
43}
44
45impl OtelGuard {
46 pub(crate) fn from_config(
50 config: &OtelSdkConfig,
51 custom_resource: Option<Resource>,
52 ) -> Result<Self, SdkError> {
53 let resource = custom_resource.unwrap_or_else(|| build_resource(config));
54
55 let tracer_provider = if config.traces.enabled {
56 Some(build_tracer_provider(config, resource.clone())?)
57 } else {
58 None
59 };
60
61 let meter_provider = if config.metrics.enabled {
62 Some(build_meter_provider(config, resource.clone())?)
63 } else {
64 None
65 };
66
67 let logger_provider = if config.logs.enabled {
68 Some(build_logger_provider(config, resource)?)
69 } else {
70 None
71 };
72
73 if let Some(ref provider) = tracer_provider {
74 opentelemetry::global::set_tracer_provider(provider.clone());
75 }
76 if let Some(ref provider) = meter_provider {
77 opentelemetry::global::set_meter_provider(provider.clone());
78 }
79
80 let propagator = TextMapCompositePropagator::new(vec![
81 Box::new(TraceContextPropagator::new()),
82 Box::new(BaggagePropagator::new()),
83 ]);
84 opentelemetry::global::set_text_map_propagator(propagator);
85
86 if config.init_tracing_subscriber {
87 let scope_name = config
88 .instrumentation_scope_name
89 .clone()
90 .or_else(|| config.resource.service_name.clone())
91 .unwrap_or_else(|| "opentelemetry-configuration".to_string());
92 init_subscriber(
93 tracer_provider.as_ref(),
94 logger_provider.as_ref(),
95 scope_name,
96 )?;
97 }
98
99 Ok(Self {
100 tracer_provider,
101 meter_provider,
102 logger_provider,
103 })
104 }
105
106 #[must_use]
108 pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
109 self.tracer_provider.as_ref()
110 }
111
112 #[must_use]
114 pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
115 self.meter_provider.as_ref()
116 }
117
118 #[must_use]
120 pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
121 self.logger_provider.as_ref()
122 }
123
124 pub fn flush(&self) {
126 if let Some(provider) = &self.tracer_provider
127 && let Err(e) = provider.force_flush()
128 {
129 tracing::error!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
130 }
131
132 if let Some(provider) = &self.meter_provider
133 && let Err(e) = provider.force_flush()
134 {
135 tracing::error!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
136 }
137
138 if let Some(provider) = &self.logger_provider
139 && let Err(e) = provider.force_flush()
140 {
141 tracing::error!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
142 }
143 }
144
145 pub fn shutdown(mut self) -> Result<(), SdkError> {
152 if let Some(provider) = self.tracer_provider.take() {
153 provider.force_flush().map_err(SdkError::Flush)?;
154 provider.shutdown().map_err(SdkError::Shutdown)?;
155 }
156
157 if let Some(provider) = self.logger_provider.take() {
158 provider.force_flush().map_err(SdkError::Flush)?;
159 provider.shutdown().map_err(SdkError::Shutdown)?;
160 }
161
162 if let Some(provider) = self.meter_provider.take() {
163 provider.force_flush().map_err(SdkError::Flush)?;
164 provider.shutdown().map_err(SdkError::Shutdown)?;
165 }
166
167 Ok(())
168 }
169}
170
171impl Drop for OtelGuard {
172 fn drop(&mut self) {
173 if let Some(provider) = self.tracer_provider.take() {
174 let _ = provider.force_flush();
175 if let Err(e) = provider.shutdown() {
176 tracing::error!(target: "otel_lifecycle", error = %e, "Failed to shut down tracer provider");
177 }
178 }
179
180 if let Some(provider) = self.logger_provider.take() {
181 let _ = provider.force_flush();
182 if let Err(e) = provider.shutdown() {
183 tracing::error!(target: "otel_lifecycle", error = %e, "Failed to shut down logger provider");
184 }
185 }
186
187 if let Some(provider) = self.meter_provider.take() {
188 let _ = provider.force_flush();
189 if let Err(e) = provider.shutdown() {
190 tracing::error!(target: "otel_lifecycle", error = %e, "Failed to shut down meter provider");
191 }
192 }
193 }
194}
195
196fn build_resource(config: &OtelSdkConfig) -> Resource {
197 let mut builder = Resource::builder();
198
199 match config.resource.compute_environment {
200 ComputeEnvironment::Auto => {
201 builder = builder
202 .with_detector(Box::new(HostResourceDetector::default()))
203 .with_detector(Box::new(OsResourceDetector))
204 .with_detector(Box::new(ProcessResourceDetector))
205 .with_detector(Box::new(RustResourceDetector));
206
207 if std::env::var("AWS_LAMBDA_FUNCTION_NAME").is_ok() {
208 builder = add_lambda_attributes(builder);
209 }
210
211 if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() {
212 builder = builder.with_detector(Box::new(K8sResourceDetector));
213 }
214 }
215 ComputeEnvironment::Lambda => {
216 builder = builder
217 .with_detector(Box::new(HostResourceDetector::default()))
218 .with_detector(Box::new(OsResourceDetector))
219 .with_detector(Box::new(ProcessResourceDetector))
220 .with_detector(Box::new(RustResourceDetector));
221 builder = add_lambda_attributes(builder);
222 }
223 ComputeEnvironment::Kubernetes => {
224 builder = builder
225 .with_detector(Box::new(HostResourceDetector::default()))
226 .with_detector(Box::new(OsResourceDetector))
227 .with_detector(Box::new(ProcessResourceDetector))
228 .with_detector(Box::new(RustResourceDetector))
229 .with_detector(Box::new(K8sResourceDetector));
230 }
231 ComputeEnvironment::None => {}
232 }
233
234 let mut attributes: Vec<KeyValue> = config
235 .resource
236 .attributes
237 .iter()
238 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
239 .collect();
240
241 if let Some(name) = &config.resource.service_name {
242 attributes.push(KeyValue::new("service.name", name.clone()));
243 }
244
245 if let Some(version) = &config.resource.service_version {
246 attributes.push(KeyValue::new("service.version", version.clone()));
247 }
248
249 if let Some(env) = &config.resource.deployment_environment {
250 attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
251 }
252
253 builder.with_attributes(attributes).build()
254}
255
256fn add_lambda_attributes(builder: ResourceBuilder) -> ResourceBuilder {
257 let mut attrs = vec![KeyValue::new("cloud.provider", "aws")];
258
259 if let Ok(region) = std::env::var("AWS_REGION") {
260 attrs.push(KeyValue::new("cloud.region", region));
261 }
262 if let Ok(memory) = std::env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") {
263 attrs.push(KeyValue::new("faas.max_memory", memory));
264 }
265 if let Ok(instance) = std::env::var("AWS_LAMBDA_LOG_STREAM_NAME") {
266 attrs.push(KeyValue::new("faas.instance", instance));
267 }
268 if let Ok(name) = std::env::var("AWS_LAMBDA_FUNCTION_NAME") {
269 attrs.push(KeyValue::new("faas.name", name));
270 }
271 if let Ok(version) = std::env::var("AWS_LAMBDA_FUNCTION_VERSION") {
272 attrs.push(KeyValue::new("faas.version", version));
273 }
274
275 builder.with_attributes(attrs)
276}
277
278fn build_tonic_metadata(headers: &HashMap<String, String>) -> MetadataMap {
279 let mut metadata = MetadataMap::new();
280 for (key, value) in headers {
281 if let (Ok(k), Ok(v)) = (
282 key.parse::<MetadataKey<_>>(),
283 value.parse::<MetadataValue<_>>(),
284 ) {
285 metadata.insert(k, v);
286 }
287 }
288 metadata
289}
290
/// Builds an OTLP exporter for the protocol selected in the configuration.
///
/// Arguments:
/// - `$config`: expression yielding the SDK configuration;
/// - `$exporter_type`: name of the exporter type inside `opentelemetry_otlp`;
/// - `$signal_path`: path handed to `signal_endpoint` for the HTTP protocols;
/// - `$error_variant`: `SdkError` variant that wraps a build failure.
///
/// gRPC uses `effective_endpoint()` and sends the configured headers as tonic
/// metadata; both HTTP protocols use `signal_endpoint($signal_path)` and send
/// the headers as-is. Headers are only attached when at least one is
/// configured.
///
/// The expansion uses `?`, so it must be invoked inside a function whose
/// error type is `SdkError`.
macro_rules! build_exporter {
    ($config:expr, $exporter_type:ident, $signal_path:expr, $error_variant:ident) => {{
        match $config.endpoint.protocol {
            Protocol::Grpc => {
                let endpoint = $config.effective_endpoint();
                let grpc = opentelemetry_otlp::$exporter_type::builder()
                    .with_tonic()
                    .with_endpoint(&endpoint)
                    .with_timeout($config.endpoint.timeout);

                let grpc = if $config.endpoint.headers.is_empty() {
                    grpc
                } else {
                    grpc.with_metadata(build_tonic_metadata(&$config.endpoint.headers))
                };

                grpc.build().map_err(SdkError::$error_variant)?
            }
            Protocol::HttpBinary | Protocol::HttpJson => {
                // The two HTTP flavours differ only in the wire format.
                let wire_format = if matches!($config.endpoint.protocol, Protocol::HttpJson) {
                    opentelemetry_otlp::Protocol::HttpJson
                } else {
                    opentelemetry_otlp::Protocol::HttpBinary
                };

                let endpoint = $config.signal_endpoint($signal_path);
                let http = opentelemetry_otlp::$exporter_type::builder()
                    .with_http()
                    .with_endpoint(&endpoint)
                    .with_timeout($config.endpoint.timeout)
                    .with_protocol(wire_format);

                let http = if $config.endpoint.headers.is_empty() {
                    http
                } else {
                    http.with_headers($config.endpoint.headers.clone())
                };

                http.build().map_err(SdkError::$error_variant)?
            }
        }
    }};
}
339
340fn build_tracer_provider(
341 config: &OtelSdkConfig,
342 resource: Resource,
343) -> Result<SdkTracerProvider, SdkError> {
344 let exporter = build_exporter!(config, SpanExporter, "/v1/traces", TraceExporter);
345
346 let batch_config = TraceBatchConfigBuilder::default()
347 .with_max_queue_size(config.traces.batch.max_queue_size)
348 .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
349 .with_scheduled_delay(config.traces.batch.scheduled_delay)
350 .build();
351
352 let span_processor = BatchSpanProcessor::builder(exporter)
353 .with_batch_config(batch_config)
354 .build();
355
356 Ok(SdkTracerProvider::builder()
357 .with_span_processor(span_processor)
358 .with_resource(resource)
359 .build())
360}
361
362fn build_meter_provider(
363 config: &OtelSdkConfig,
364 resource: Resource,
365) -> Result<SdkMeterProvider, SdkError> {
366 let exporter = build_exporter!(config, MetricExporter, "/v1/metrics", MetricExporter);
367
368 let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
369 .with_interval(config.metrics.batch.scheduled_delay)
370 .build();
371
372 Ok(SdkMeterProvider::builder()
373 .with_reader(reader)
374 .with_resource(resource)
375 .build())
376}
377
378fn build_logger_provider(
379 config: &OtelSdkConfig,
380 resource: Resource,
381) -> Result<SdkLoggerProvider, SdkError> {
382 let exporter = build_exporter!(config, LogExporter, "/v1/logs", LogExporter);
383
384 let batch_config = LogBatchConfigBuilder::default()
385 .with_max_queue_size(config.logs.batch.max_queue_size)
386 .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
387 .with_scheduled_delay(config.logs.batch.scheduled_delay)
388 .build();
389
390 let log_processor = BatchLogProcessor::builder(exporter)
391 .with_batch_config(batch_config)
392 .build();
393
394 Ok(SdkLoggerProvider::builder()
395 .with_log_processor(log_processor)
396 .with_resource(resource)
397 .build())
398}
399
400fn init_subscriber(
401 tracer_provider: Option<&SdkTracerProvider>,
402 logger_provider: Option<&SdkLoggerProvider>,
403 scope_name: String,
404) -> Result<(), SdkError> {
405 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
406
407 let fmt_layer = tracing_subscriber::fmt::layer()
408 .with_target(true)
409 .without_time();
410
411 let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
412
413 match (tracer_provider, logger_provider) {
414 (Some(tp), Some(lp)) => {
415 let tracer = tp.tracer(scope_name);
416 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
417 let log_layer = OpenTelemetryTracingBridge::new(lp);
418 registry.with(telemetry_layer).with(log_layer).try_init()?;
419 }
420 (Some(tp), None) => {
421 let tracer = tp.tracer(scope_name);
422 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
423 registry.with(telemetry_layer).try_init()?;
424 }
425 (None, Some(lp)) => {
426 let log_layer = OpenTelemetryTracingBridge::new(lp);
427 registry.with(log_layer).try_init()?;
428 }
429 (None, None) => {
430 registry.try_init()?;
431 }
432 }
433
434 Ok(())
435}
436
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the resource for an otherwise default SDK configuration whose
    /// `resource` section is `resource`.
    fn resource_for(resource: crate::config::ResourceConfig) -> Resource {
        build_resource(&OtelSdkConfig {
            resource,
            ..Default::default()
        })
    }

    /// Looks up `key` on `resource` and renders its value as a string.
    fn attribute(resource: &Resource, key: &str) -> Option<String> {
        resource
            .iter()
            .find(|(k, _)| k.as_str() == key)
            .map(|(_, v)| v.to_string())
    }

    /// Builds an owned header map from string-literal pairs.
    fn headers_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| ((*key).to_string(), (*value).to_string()))
            .collect()
    }

    #[test]
    fn build_resource_with_auto_environment_includes_rust_detector() {
        let resource = resource_for(crate::config::ResourceConfig {
            service_name: Some("test-service".to_string()),
            compute_environment: ComputeEnvironment::Auto,
            ..Default::default()
        });

        assert!(
            attribute(&resource, "process.runtime.name").is_some(),
            "Auto environment should include Rust detector"
        );
    }

    #[test]
    fn build_resource_with_none_environment_excludes_detectors() {
        let resource = resource_for(crate::config::ResourceConfig {
            service_name: Some("test-service".to_string()),
            compute_environment: ComputeEnvironment::None,
            ..Default::default()
        });

        assert!(
            attribute(&resource, "process.runtime.name").is_none(),
            "None environment should not run detectors"
        );
    }

    #[test]
    fn build_resource_includes_service_name() {
        let resource = resource_for(crate::config::ResourceConfig {
            service_name: Some("my-test-service".to_string()),
            compute_environment: ComputeEnvironment::None,
            ..Default::default()
        });

        assert_eq!(
            attribute(&resource, "service.name").as_deref(),
            Some("my-test-service")
        );
    }

    #[test]
    fn build_resource_includes_custom_attributes() {
        let resource = resource_for(crate::config::ResourceConfig {
            attributes: headers_of(&[("custom.key", "custom-value")]),
            compute_environment: ComputeEnvironment::None,
            ..Default::default()
        });

        assert_eq!(
            attribute(&resource, "custom.key").as_deref(),
            Some("custom-value")
        );
    }

    #[test]
    fn build_tonic_metadata_parses_valid_headers() {
        let headers = headers_of(&[
            ("authorization", "Bearer token123"),
            ("x-custom-header", "value"),
        ]);

        let metadata = build_tonic_metadata(&headers);

        assert_eq!(metadata.len(), 2);
        assert_eq!(
            metadata.get("authorization").and_then(|v| v.to_str().ok()),
            Some("Bearer token123")
        );
        assert_eq!(
            metadata
                .get("x-custom-header")
                .and_then(|v| v.to_str().ok()),
            Some("value")
        );
    }

    #[test]
    fn build_tonic_metadata_handles_empty_headers() {
        let metadata = build_tonic_metadata(&HashMap::new());
        assert_eq!(metadata.len(), 0);
    }

    #[test]
    fn build_tonic_metadata_skips_invalid_keys() {
        let headers = headers_of(&[
            ("valid-key", "value"),
            ("invalid key with spaces", "value"),
        ]);

        let metadata = build_tonic_metadata(&headers);

        assert_eq!(metadata.len(), 1);
        assert!(metadata.get("valid-key").is_some());
    }

    #[test]
    fn build_tonic_metadata_skips_invalid_values() {
        let headers = headers_of(&[
            ("valid-key", "valid-value"),
            ("invalid-value-key", "value\0with\0nulls"),
        ]);

        let metadata = build_tonic_metadata(&headers);

        assert_eq!(metadata.len(), 1);
        assert!(metadata.get("valid-key").is_some());
    }

    #[test]
    fn build_resource_auto_detects_lambda_from_environment() {
        temp_env::with_vars(
            [
                ("AWS_LAMBDA_FUNCTION_NAME", Some("test-lambda")),
                ("AWS_REGION", Some("us-east-1")),
            ],
            || {
                let resource = resource_for(crate::config::ResourceConfig {
                    compute_environment: ComputeEnvironment::Auto,
                    ..Default::default()
                });

                assert_eq!(
                    attribute(&resource, "faas.name").as_deref(),
                    Some("test-lambda")
                );
                assert_eq!(
                    attribute(&resource, "cloud.provider").as_deref(),
                    Some("aws")
                );
            },
        );
    }

    #[test]
    fn add_lambda_attributes_handles_missing_optional_vars() {
        temp_env::with_var("AWS_LAMBDA_FUNCTION_NAME", Some("minimal-lambda"), || {
            let resource = resource_for(crate::config::ResourceConfig {
                compute_environment: ComputeEnvironment::Lambda,
                ..Default::default()
            });

            assert_eq!(
                attribute(&resource, "cloud.provider").as_deref(),
                Some("aws"),
                "cloud.provider should always be set for Lambda environment"
            );
            assert_eq!(
                attribute(&resource, "faas.name").as_deref(),
                Some("minimal-lambda")
            );
        });
    }
}
646}