opentelemetry_configuration/
guard.rs1use crate::config::{ComputeEnvironment, OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::fallback::ExportFallback;
10use crate::rust_detector::RustResourceDetector;
11use opentelemetry::KeyValue;
12use opentelemetry::propagation::TextMapCompositePropagator;
13use opentelemetry::trace::TracerProvider as _;
14use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
15use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
16use opentelemetry_resource_detectors::{
17 HostResourceDetector, K8sResourceDetector, OsResourceDetector, ProcessResourceDetector,
18};
19use opentelemetry_sdk::Resource;
20use opentelemetry_sdk::logs::{
21 BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
22};
23use opentelemetry_sdk::metrics::SdkMeterProvider;
24use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
25use opentelemetry_sdk::resource::ResourceBuilder;
26use opentelemetry_sdk::trace::{
27 BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
28};
29use tonic::metadata::{MetadataKey, MetadataValue};
30use tracing_subscriber::EnvFilter;
31use tracing_subscriber::layer::SubscriberExt;
32use tracing_subscriber::util::SubscriberInitExt;
33
34pub struct OtelGuard {
57 tracer_provider: Option<SdkTracerProvider>,
58 meter_provider: Option<SdkMeterProvider>,
59 logger_provider: Option<SdkLoggerProvider>,
60 #[allow(dead_code)]
61 fallback: ExportFallback,
62}
63
64impl OtelGuard {
65 pub(crate) fn from_config(
69 config: OtelSdkConfig,
70 fallback: ExportFallback,
71 custom_resource: Option<Resource>,
72 ) -> Result<Self, SdkError> {
73 let resource = custom_resource.unwrap_or_else(|| build_resource(&config));
74
75 let tracer_provider = if config.traces.enabled {
76 Some(build_tracer_provider(&config, resource.clone())?)
77 } else {
78 None
79 };
80
81 let meter_provider = if config.metrics.enabled {
82 Some(build_meter_provider(&config, resource.clone())?)
83 } else {
84 None
85 };
86
87 let logger_provider = if config.logs.enabled {
88 Some(build_logger_provider(&config, resource)?)
89 } else {
90 None
91 };
92
93 if let Some(ref provider) = tracer_provider {
95 opentelemetry::global::set_tracer_provider(provider.clone());
96 }
97 if let Some(ref provider) = meter_provider {
98 opentelemetry::global::set_meter_provider(provider.clone());
99 }
100
101 let propagator = TextMapCompositePropagator::new(vec![
103 Box::new(TraceContextPropagator::new()),
104 Box::new(BaggagePropagator::new()),
105 ]);
106 opentelemetry::global::set_text_map_propagator(propagator);
107
108 if config.init_tracing_subscriber {
110 let scope_name = config
111 .instrumentation_scope_name
112 .clone()
113 .or_else(|| config.resource.service_name.clone())
114 .unwrap_or_else(|| "opentelemetry-configuration".to_string());
115 init_subscriber(&tracer_provider, &logger_provider, scope_name)?;
116 }
117
118 Ok(Self {
119 tracer_provider,
120 meter_provider,
121 logger_provider,
122 fallback,
123 })
124 }
125
126 pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
128 self.tracer_provider.as_ref()
129 }
130
131 pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
133 self.meter_provider.as_ref()
134 }
135
136 pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
138 self.logger_provider.as_ref()
139 }
140
141 pub fn flush(&self) {
150 if let Some(provider) = &self.tracer_provider
151 && let Err(e) = provider.force_flush()
152 {
153 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
154 }
155
156 if let Some(provider) = &self.meter_provider
157 && let Err(e) = provider.force_flush()
158 {
159 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
160 }
161
162 if let Some(provider) = &self.logger_provider
163 && let Err(e) = provider.force_flush()
164 {
165 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
166 }
167 }
168
169 pub fn shutdown(mut self) -> Result<(), SdkError> {
178 if let Some(provider) = self.tracer_provider.take() {
179 provider.force_flush().map_err(SdkError::Flush)?;
180 provider.shutdown().map_err(SdkError::Shutdown)?;
181 }
182
183 if let Some(provider) = self.logger_provider.take() {
184 provider.force_flush().map_err(SdkError::Flush)?;
185 provider.shutdown().map_err(SdkError::Shutdown)?;
186 }
187
188 if let Some(provider) = self.meter_provider.take() {
189 provider.force_flush().map_err(SdkError::Flush)?;
190 provider.shutdown().map_err(SdkError::Shutdown)?;
191 }
192
193 Ok(())
194 }
195}
196
197impl Drop for OtelGuard {
198 fn drop(&mut self) {
199 if let Some(provider) = self.tracer_provider.take() {
200 let _ = provider.force_flush();
201 if let Err(e) = provider.shutdown() {
202 eprintln!("Error shutting down tracer provider: {e}");
203 }
204 }
205
206 if let Some(provider) = self.logger_provider.take() {
207 let _ = provider.force_flush();
208 if let Err(e) = provider.shutdown() {
209 eprintln!("Error shutting down logger provider: {e}");
210 }
211 }
212
213 if let Some(provider) = self.meter_provider.take() {
214 let _ = provider.force_flush();
215 if let Err(e) = provider.shutdown() {
216 eprintln!("Error shutting down meter provider: {e}");
217 }
218 }
219 }
220}
221
222fn build_resource(config: &OtelSdkConfig) -> Resource {
223 let mut builder = Resource::builder();
224
225 match config.resource.compute_environment {
227 ComputeEnvironment::Auto => {
228 builder = builder
230 .with_detector(Box::new(HostResourceDetector::default()))
231 .with_detector(Box::new(OsResourceDetector))
232 .with_detector(Box::new(ProcessResourceDetector))
233 .with_detector(Box::new(RustResourceDetector));
234
235 if std::env::var("AWS_LAMBDA_FUNCTION_NAME").is_ok() {
237 builder = add_lambda_attributes(builder);
238 }
239 if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() {
241 builder = builder.with_detector(Box::new(K8sResourceDetector));
242 }
243 }
244 ComputeEnvironment::Lambda => {
245 builder = builder
246 .with_detector(Box::new(HostResourceDetector::default()))
247 .with_detector(Box::new(OsResourceDetector))
248 .with_detector(Box::new(ProcessResourceDetector))
249 .with_detector(Box::new(RustResourceDetector));
250 builder = add_lambda_attributes(builder);
251 }
252 ComputeEnvironment::Kubernetes => {
253 builder = builder
254 .with_detector(Box::new(HostResourceDetector::default()))
255 .with_detector(Box::new(OsResourceDetector))
256 .with_detector(Box::new(ProcessResourceDetector))
257 .with_detector(Box::new(RustResourceDetector))
258 .with_detector(Box::new(K8sResourceDetector));
259 }
260 ComputeEnvironment::None => {
261 }
263 }
264
265 let mut attributes: Vec<KeyValue> = config
267 .resource
268 .attributes
269 .iter()
270 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
271 .collect();
272
273 if let Some(name) = &config.resource.service_name {
274 attributes.push(KeyValue::new("service.name", name.clone()));
275 }
276
277 if let Some(version) = &config.resource.service_version {
278 attributes.push(KeyValue::new("service.version", version.clone()));
279 }
280
281 if let Some(env) = &config.resource.deployment_environment {
282 attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
283 }
284
285 builder.with_attributes(attributes).build()
286}
287
288fn add_lambda_attributes(builder: ResourceBuilder) -> ResourceBuilder {
289 let mut attrs = vec![KeyValue::new("cloud.provider", "aws")];
290
291 if let Ok(region) = std::env::var("AWS_REGION") {
292 attrs.push(KeyValue::new("cloud.region", region));
293 }
294 if let Ok(memory) = std::env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") {
295 attrs.push(KeyValue::new("faas.max_memory", memory));
296 }
297 if let Ok(instance) = std::env::var("AWS_LAMBDA_LOG_STREAM_NAME") {
298 attrs.push(KeyValue::new("faas.instance", instance));
299 }
300 if let Ok(name) = std::env::var("AWS_LAMBDA_FUNCTION_NAME") {
301 attrs.push(KeyValue::new("faas.name", name));
302 }
303 if let Ok(version) = std::env::var("AWS_LAMBDA_FUNCTION_VERSION") {
304 attrs.push(KeyValue::new("faas.version", version));
305 }
306
307 builder.with_attributes(attrs)
308}
309
310fn build_tracer_provider(
311 config: &OtelSdkConfig,
312 resource: Resource,
313) -> Result<SdkTracerProvider, SdkError> {
314 let exporter = match config.endpoint.protocol {
315 Protocol::Grpc => {
316 let endpoint = config.effective_endpoint();
317 let mut builder = opentelemetry_otlp::SpanExporter::builder()
318 .with_tonic()
319 .with_endpoint(&endpoint)
320 .with_timeout(config.endpoint.timeout);
321
322 if !config.endpoint.headers.is_empty() {
323 let mut metadata = tonic::metadata::MetadataMap::new();
324 for (key, value) in &config.endpoint.headers {
325 if let (Ok(k), Ok(v)) = (
326 key.parse::<MetadataKey<_>>(),
327 value.parse::<MetadataValue<_>>(),
328 ) {
329 metadata.insert(k, v);
330 }
331 }
332 builder = builder.with_metadata(metadata);
333 }
334
335 builder.build().map_err(SdkError::TraceExporter)?
336 }
337 Protocol::HttpBinary => {
338 let endpoint = config.signal_endpoint("/v1/traces");
339 let mut builder = opentelemetry_otlp::SpanExporter::builder()
340 .with_http()
341 .with_endpoint(&endpoint)
342 .with_timeout(config.endpoint.timeout)
343 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
344
345 if !config.endpoint.headers.is_empty() {
346 builder = builder.with_headers(config.endpoint.headers.clone());
347 }
348
349 builder.build().map_err(SdkError::TraceExporter)?
350 }
351 Protocol::HttpJson => {
352 let endpoint = config.signal_endpoint("/v1/traces");
353 let mut builder = opentelemetry_otlp::SpanExporter::builder()
354 .with_http()
355 .with_endpoint(&endpoint)
356 .with_timeout(config.endpoint.timeout)
357 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
358
359 if !config.endpoint.headers.is_empty() {
360 builder = builder.with_headers(config.endpoint.headers.clone());
361 }
362
363 builder.build().map_err(SdkError::TraceExporter)?
364 }
365 };
366
367 let batch_config = TraceBatchConfigBuilder::default()
368 .with_max_queue_size(config.traces.batch.max_queue_size)
369 .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
370 .with_scheduled_delay(config.traces.batch.scheduled_delay)
371 .build();
372
373 let span_processor = BatchSpanProcessor::builder(exporter)
374 .with_batch_config(batch_config)
375 .build();
376
377 Ok(SdkTracerProvider::builder()
378 .with_span_processor(span_processor)
379 .with_resource(resource)
380 .build())
381}
382
383fn build_meter_provider(
384 config: &OtelSdkConfig,
385 resource: Resource,
386) -> Result<SdkMeterProvider, SdkError> {
387 let exporter = match config.endpoint.protocol {
388 Protocol::Grpc => {
389 let endpoint = config.effective_endpoint();
390 let mut builder = opentelemetry_otlp::MetricExporter::builder()
391 .with_tonic()
392 .with_endpoint(&endpoint)
393 .with_timeout(config.endpoint.timeout);
394
395 if !config.endpoint.headers.is_empty() {
396 let mut metadata = tonic::metadata::MetadataMap::new();
397 for (key, value) in &config.endpoint.headers {
398 if let (Ok(k), Ok(v)) = (
399 key.parse::<MetadataKey<_>>(),
400 value.parse::<MetadataValue<_>>(),
401 ) {
402 metadata.insert(k, v);
403 }
404 }
405 builder = builder.with_metadata(metadata);
406 }
407
408 builder.build().map_err(SdkError::MetricExporter)?
409 }
410 Protocol::HttpBinary => {
411 let endpoint = config.signal_endpoint("/v1/metrics");
412 let mut builder = opentelemetry_otlp::MetricExporter::builder()
413 .with_http()
414 .with_endpoint(&endpoint)
415 .with_timeout(config.endpoint.timeout)
416 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
417
418 if !config.endpoint.headers.is_empty() {
419 builder = builder.with_headers(config.endpoint.headers.clone());
420 }
421
422 builder.build().map_err(SdkError::MetricExporter)?
423 }
424 Protocol::HttpJson => {
425 let endpoint = config.signal_endpoint("/v1/metrics");
426 let mut builder = opentelemetry_otlp::MetricExporter::builder()
427 .with_http()
428 .with_endpoint(&endpoint)
429 .with_timeout(config.endpoint.timeout)
430 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
431
432 if !config.endpoint.headers.is_empty() {
433 builder = builder.with_headers(config.endpoint.headers.clone());
434 }
435
436 builder.build().map_err(SdkError::MetricExporter)?
437 }
438 };
439
440 let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
441 .with_interval(config.metrics.batch.scheduled_delay)
442 .build();
443
444 Ok(SdkMeterProvider::builder()
445 .with_reader(reader)
446 .with_resource(resource)
447 .build())
448}
449
450fn build_logger_provider(
451 config: &OtelSdkConfig,
452 resource: Resource,
453) -> Result<SdkLoggerProvider, SdkError> {
454 let exporter = match config.endpoint.protocol {
455 Protocol::Grpc => {
456 let endpoint = config.effective_endpoint();
457 let mut builder = opentelemetry_otlp::LogExporter::builder()
458 .with_tonic()
459 .with_endpoint(&endpoint)
460 .with_timeout(config.endpoint.timeout);
461
462 if !config.endpoint.headers.is_empty() {
463 let mut metadata = tonic::metadata::MetadataMap::new();
464 for (key, value) in &config.endpoint.headers {
465 if let (Ok(k), Ok(v)) = (
466 key.parse::<MetadataKey<_>>(),
467 value.parse::<MetadataValue<_>>(),
468 ) {
469 metadata.insert(k, v);
470 }
471 }
472 builder = builder.with_metadata(metadata);
473 }
474
475 builder.build().map_err(SdkError::LogExporter)?
476 }
477 Protocol::HttpBinary => {
478 let endpoint = config.signal_endpoint("/v1/logs");
479 let mut builder = opentelemetry_otlp::LogExporter::builder()
480 .with_http()
481 .with_endpoint(&endpoint)
482 .with_timeout(config.endpoint.timeout)
483 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
484
485 if !config.endpoint.headers.is_empty() {
486 builder = builder.with_headers(config.endpoint.headers.clone());
487 }
488
489 builder.build().map_err(SdkError::LogExporter)?
490 }
491 Protocol::HttpJson => {
492 let endpoint = config.signal_endpoint("/v1/logs");
493 let mut builder = opentelemetry_otlp::LogExporter::builder()
494 .with_http()
495 .with_endpoint(&endpoint)
496 .with_timeout(config.endpoint.timeout)
497 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
498
499 if !config.endpoint.headers.is_empty() {
500 builder = builder.with_headers(config.endpoint.headers.clone());
501 }
502
503 builder.build().map_err(SdkError::LogExporter)?
504 }
505 };
506
507 let batch_config = LogBatchConfigBuilder::default()
508 .with_max_queue_size(config.logs.batch.max_queue_size)
509 .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
510 .with_scheduled_delay(config.logs.batch.scheduled_delay)
511 .build();
512
513 let log_processor = BatchLogProcessor::builder(exporter)
514 .with_batch_config(batch_config)
515 .build();
516
517 Ok(SdkLoggerProvider::builder()
518 .with_log_processor(log_processor)
519 .with_resource(resource)
520 .build())
521}
522
523fn init_subscriber(
524 tracer_provider: &Option<SdkTracerProvider>,
525 logger_provider: &Option<SdkLoggerProvider>,
526 scope_name: String,
527) -> Result<(), SdkError> {
528 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
529
530 let fmt_layer = tracing_subscriber::fmt::layer()
531 .with_target(true)
532 .without_time();
533
534 let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
535
536 match (tracer_provider, logger_provider) {
537 (Some(tp), Some(lp)) => {
538 let tracer = tp.tracer(scope_name);
539 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
540 let log_layer = OpenTelemetryTracingBridge::new(lp);
541 registry.with(telemetry_layer).with(log_layer).try_init()?;
542 }
543 (Some(tp), None) => {
544 let tracer = tp.tracer(scope_name);
545 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
546 registry.with(telemetry_layer).try_init()?;
547 }
548 (None, Some(lp)) => {
549 let log_layer = OpenTelemetryTracingBridge::new(lp);
550 registry.with(log_layer).try_init()?;
551 }
552 (None, None) => {
553 registry.try_init()?;
554 }
555 }
556
557 Ok(())
558}