opentelemetry_configuration/
guard.rs1use crate::config::{OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::fallback::ExportFallback;
10use opentelemetry::KeyValue;
11use opentelemetry::propagation::TextMapCompositePropagator;
12use opentelemetry::trace::TracerProvider as _;
13use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
14use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
15use opentelemetry_sdk::Resource;
16use opentelemetry_sdk::logs::{
17 BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
18};
19use opentelemetry_sdk::metrics::SdkMeterProvider;
20use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
21use opentelemetry_sdk::trace::{
22 BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
23};
24use tonic::metadata::{MetadataKey, MetadataValue};
25use tracing_subscriber::EnvFilter;
26use tracing_subscriber::layer::SubscriberExt;
27use tracing_subscriber::util::SubscriberInitExt;
28
29pub struct OtelGuard {
52 tracer_provider: Option<SdkTracerProvider>,
53 meter_provider: Option<SdkMeterProvider>,
54 logger_provider: Option<SdkLoggerProvider>,
55 #[allow(dead_code)]
56 fallback: ExportFallback,
57}
58
59impl OtelGuard {
60 pub(crate) fn from_config(
64 config: OtelSdkConfig,
65 fallback: ExportFallback,
66 custom_resource: Option<Resource>,
67 ) -> Result<Self, SdkError> {
68 let resource = custom_resource.unwrap_or_else(|| build_resource(&config));
69
70 let tracer_provider = if config.traces.enabled {
71 Some(build_tracer_provider(&config, resource.clone())?)
72 } else {
73 None
74 };
75
76 let meter_provider = if config.metrics.enabled {
77 Some(build_meter_provider(&config, resource.clone())?)
78 } else {
79 None
80 };
81
82 let logger_provider = if config.logs.enabled {
83 Some(build_logger_provider(&config, resource)?)
84 } else {
85 None
86 };
87
88 if let Some(ref provider) = tracer_provider {
90 opentelemetry::global::set_tracer_provider(provider.clone());
91 }
92 if let Some(ref provider) = meter_provider {
93 opentelemetry::global::set_meter_provider(provider.clone());
94 }
95
96 let propagator = TextMapCompositePropagator::new(vec![
98 Box::new(TraceContextPropagator::new()),
99 Box::new(BaggagePropagator::new()),
100 ]);
101 opentelemetry::global::set_text_map_propagator(propagator);
102
103 if config.init_tracing_subscriber {
105 init_subscriber(&tracer_provider, &logger_provider)?;
106 }
107
108 Ok(Self {
109 tracer_provider,
110 meter_provider,
111 logger_provider,
112 fallback,
113 })
114 }
115
116 pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
118 self.tracer_provider.as_ref()
119 }
120
121 pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
123 self.meter_provider.as_ref()
124 }
125
126 pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
128 self.logger_provider.as_ref()
129 }
130
131 pub fn flush(&self) {
140 if let Some(provider) = &self.tracer_provider
141 && let Err(e) = provider.force_flush()
142 {
143 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
144 }
145
146 if let Some(provider) = &self.meter_provider
147 && let Err(e) = provider.force_flush()
148 {
149 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
150 }
151
152 if let Some(provider) = &self.logger_provider
153 && let Err(e) = provider.force_flush()
154 {
155 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
156 }
157 }
158
159 pub fn shutdown(mut self) -> Result<(), SdkError> {
168 if let Some(provider) = self.tracer_provider.take() {
169 provider.force_flush().map_err(SdkError::Flush)?;
170 provider.shutdown().map_err(SdkError::Shutdown)?;
171 }
172
173 if let Some(provider) = self.logger_provider.take() {
174 provider.force_flush().map_err(SdkError::Flush)?;
175 provider.shutdown().map_err(SdkError::Shutdown)?;
176 }
177
178 if let Some(provider) = self.meter_provider.take() {
179 provider.force_flush().map_err(SdkError::Flush)?;
180 provider.shutdown().map_err(SdkError::Shutdown)?;
181 }
182
183 Ok(())
184 }
185}
186
187impl Drop for OtelGuard {
188 fn drop(&mut self) {
189 if let Some(provider) = self.tracer_provider.take() {
190 let _ = provider.force_flush();
191 if let Err(e) = provider.shutdown() {
192 eprintln!("Error shutting down tracer provider: {e}");
193 }
194 }
195
196 if let Some(provider) = self.logger_provider.take() {
197 let _ = provider.force_flush();
198 if let Err(e) = provider.shutdown() {
199 eprintln!("Error shutting down logger provider: {e}");
200 }
201 }
202
203 if let Some(provider) = self.meter_provider.take() {
204 let _ = provider.force_flush();
205 if let Err(e) = provider.shutdown() {
206 eprintln!("Error shutting down meter provider: {e}");
207 }
208 }
209 }
210}
211
212fn build_resource(config: &OtelSdkConfig) -> Resource {
213 let mut attributes: Vec<KeyValue> = config
214 .resource
215 .attributes
216 .iter()
217 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
218 .collect();
219
220 if let Some(name) = &config.resource.service_name {
221 attributes.push(KeyValue::new("service.name", name.clone()));
222 }
223
224 if let Some(version) = &config.resource.service_version {
225 attributes.push(KeyValue::new("service.version", version.clone()));
226 }
227
228 if let Some(env) = &config.resource.deployment_environment {
229 attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
230 }
231
232 Resource::builder().with_attributes(attributes).build()
233}
234
235fn build_tracer_provider(
236 config: &OtelSdkConfig,
237 resource: Resource,
238) -> Result<SdkTracerProvider, SdkError> {
239 let exporter = match config.endpoint.protocol {
240 Protocol::Grpc => {
241 let endpoint = config.effective_endpoint();
242 let mut builder = opentelemetry_otlp::SpanExporter::builder()
243 .with_tonic()
244 .with_endpoint(&endpoint)
245 .with_timeout(config.endpoint.timeout);
246
247 if !config.endpoint.headers.is_empty() {
248 let mut metadata = tonic::metadata::MetadataMap::new();
249 for (key, value) in &config.endpoint.headers {
250 if let (Ok(k), Ok(v)) = (
251 key.parse::<MetadataKey<_>>(),
252 value.parse::<MetadataValue<_>>(),
253 ) {
254 metadata.insert(k, v);
255 }
256 }
257 builder = builder.with_metadata(metadata);
258 }
259
260 builder.build().map_err(SdkError::TraceExporter)?
261 }
262 Protocol::HttpBinary => {
263 let endpoint = config.signal_endpoint("/v1/traces");
264 let mut builder = opentelemetry_otlp::SpanExporter::builder()
265 .with_http()
266 .with_endpoint(&endpoint)
267 .with_timeout(config.endpoint.timeout)
268 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
269
270 if !config.endpoint.headers.is_empty() {
271 builder = builder.with_headers(config.endpoint.headers.clone());
272 }
273
274 builder.build().map_err(SdkError::TraceExporter)?
275 }
276 Protocol::HttpJson => {
277 let endpoint = config.signal_endpoint("/v1/traces");
278 let mut builder = opentelemetry_otlp::SpanExporter::builder()
279 .with_http()
280 .with_endpoint(&endpoint)
281 .with_timeout(config.endpoint.timeout)
282 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
283
284 if !config.endpoint.headers.is_empty() {
285 builder = builder.with_headers(config.endpoint.headers.clone());
286 }
287
288 builder.build().map_err(SdkError::TraceExporter)?
289 }
290 };
291
292 let batch_config = TraceBatchConfigBuilder::default()
293 .with_max_queue_size(config.traces.batch.max_queue_size)
294 .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
295 .with_scheduled_delay(config.traces.batch.scheduled_delay)
296 .build();
297
298 let span_processor = BatchSpanProcessor::builder(exporter)
299 .with_batch_config(batch_config)
300 .build();
301
302 Ok(SdkTracerProvider::builder()
303 .with_span_processor(span_processor)
304 .with_resource(resource)
305 .build())
306}
307
308fn build_meter_provider(
309 config: &OtelSdkConfig,
310 resource: Resource,
311) -> Result<SdkMeterProvider, SdkError> {
312 let exporter = match config.endpoint.protocol {
313 Protocol::Grpc => {
314 let endpoint = config.effective_endpoint();
315 let mut builder = opentelemetry_otlp::MetricExporter::builder()
316 .with_tonic()
317 .with_endpoint(&endpoint)
318 .with_timeout(config.endpoint.timeout);
319
320 if !config.endpoint.headers.is_empty() {
321 let mut metadata = tonic::metadata::MetadataMap::new();
322 for (key, value) in &config.endpoint.headers {
323 if let (Ok(k), Ok(v)) = (
324 key.parse::<MetadataKey<_>>(),
325 value.parse::<MetadataValue<_>>(),
326 ) {
327 metadata.insert(k, v);
328 }
329 }
330 builder = builder.with_metadata(metadata);
331 }
332
333 builder.build().map_err(SdkError::MetricExporter)?
334 }
335 Protocol::HttpBinary => {
336 let endpoint = config.signal_endpoint("/v1/metrics");
337 let mut builder = opentelemetry_otlp::MetricExporter::builder()
338 .with_http()
339 .with_endpoint(&endpoint)
340 .with_timeout(config.endpoint.timeout)
341 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
342
343 if !config.endpoint.headers.is_empty() {
344 builder = builder.with_headers(config.endpoint.headers.clone());
345 }
346
347 builder.build().map_err(SdkError::MetricExporter)?
348 }
349 Protocol::HttpJson => {
350 let endpoint = config.signal_endpoint("/v1/metrics");
351 let mut builder = opentelemetry_otlp::MetricExporter::builder()
352 .with_http()
353 .with_endpoint(&endpoint)
354 .with_timeout(config.endpoint.timeout)
355 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
356
357 if !config.endpoint.headers.is_empty() {
358 builder = builder.with_headers(config.endpoint.headers.clone());
359 }
360
361 builder.build().map_err(SdkError::MetricExporter)?
362 }
363 };
364
365 let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
366 .with_interval(config.metrics.batch.scheduled_delay)
367 .build();
368
369 Ok(SdkMeterProvider::builder()
370 .with_reader(reader)
371 .with_resource(resource)
372 .build())
373}
374
375fn build_logger_provider(
376 config: &OtelSdkConfig,
377 resource: Resource,
378) -> Result<SdkLoggerProvider, SdkError> {
379 let exporter = match config.endpoint.protocol {
380 Protocol::Grpc => {
381 let endpoint = config.effective_endpoint();
382 let mut builder = opentelemetry_otlp::LogExporter::builder()
383 .with_tonic()
384 .with_endpoint(&endpoint)
385 .with_timeout(config.endpoint.timeout);
386
387 if !config.endpoint.headers.is_empty() {
388 let mut metadata = tonic::metadata::MetadataMap::new();
389 for (key, value) in &config.endpoint.headers {
390 if let (Ok(k), Ok(v)) = (
391 key.parse::<MetadataKey<_>>(),
392 value.parse::<MetadataValue<_>>(),
393 ) {
394 metadata.insert(k, v);
395 }
396 }
397 builder = builder.with_metadata(metadata);
398 }
399
400 builder.build().map_err(SdkError::LogExporter)?
401 }
402 Protocol::HttpBinary => {
403 let endpoint = config.signal_endpoint("/v1/logs");
404 let mut builder = opentelemetry_otlp::LogExporter::builder()
405 .with_http()
406 .with_endpoint(&endpoint)
407 .with_timeout(config.endpoint.timeout)
408 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
409
410 if !config.endpoint.headers.is_empty() {
411 builder = builder.with_headers(config.endpoint.headers.clone());
412 }
413
414 builder.build().map_err(SdkError::LogExporter)?
415 }
416 Protocol::HttpJson => {
417 let endpoint = config.signal_endpoint("/v1/logs");
418 let mut builder = opentelemetry_otlp::LogExporter::builder()
419 .with_http()
420 .with_endpoint(&endpoint)
421 .with_timeout(config.endpoint.timeout)
422 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
423
424 if !config.endpoint.headers.is_empty() {
425 builder = builder.with_headers(config.endpoint.headers.clone());
426 }
427
428 builder.build().map_err(SdkError::LogExporter)?
429 }
430 };
431
432 let batch_config = LogBatchConfigBuilder::default()
433 .with_max_queue_size(config.logs.batch.max_queue_size)
434 .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
435 .with_scheduled_delay(config.logs.batch.scheduled_delay)
436 .build();
437
438 let log_processor = BatchLogProcessor::builder(exporter)
439 .with_batch_config(batch_config)
440 .build();
441
442 Ok(SdkLoggerProvider::builder()
443 .with_log_processor(log_processor)
444 .with_resource(resource)
445 .build())
446}
447
448fn init_subscriber(
449 tracer_provider: &Option<SdkTracerProvider>,
450 logger_provider: &Option<SdkLoggerProvider>,
451) -> Result<(), SdkError> {
452 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
453
454 let fmt_layer = tracing_subscriber::fmt::layer()
455 .with_target(true)
456 .without_time();
457
458 let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
459
460 match (tracer_provider, logger_provider) {
461 (Some(tp), Some(lp)) => {
462 let tracer = tp.tracer("lambda-otel-extension");
463 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
464 let log_layer = OpenTelemetryTracingBridge::new(lp);
465 registry.with(telemetry_layer).with(log_layer).try_init()?;
466 }
467 (Some(tp), None) => {
468 let tracer = tp.tracer("lambda-otel-extension");
469 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
470 registry.with(telemetry_layer).try_init()?;
471 }
472 (None, Some(lp)) => {
473 let log_layer = OpenTelemetryTracingBridge::new(lp);
474 registry.with(log_layer).try_init()?;
475 }
476 (None, None) => {
477 registry.try_init()?;
478 }
479 }
480
481 Ok(())
482}