opentelemetry_configuration/
guard.rs1use crate::config::{OtelSdkConfig, Protocol};
8use crate::error::SdkError;
9use crate::fallback::ExportFallback;
10use opentelemetry::KeyValue;
11use opentelemetry::trace::TracerProvider as _;
12use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
13use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig};
14use opentelemetry_sdk::Resource;
15use opentelemetry_sdk::logs::{
16 BatchConfigBuilder as LogBatchConfigBuilder, BatchLogProcessor, SdkLoggerProvider,
17};
18use opentelemetry_sdk::metrics::SdkMeterProvider;
19use opentelemetry_sdk::trace::{
20 BatchConfigBuilder as TraceBatchConfigBuilder, BatchSpanProcessor, SdkTracerProvider,
21};
22use tonic::metadata::{MetadataKey, MetadataValue};
23use tracing_subscriber::EnvFilter;
24use tracing_subscriber::layer::SubscriberExt;
25use tracing_subscriber::util::SubscriberInitExt;
26
27pub struct OtelGuard {
50 tracer_provider: Option<SdkTracerProvider>,
51 meter_provider: Option<SdkMeterProvider>,
52 logger_provider: Option<SdkLoggerProvider>,
53 #[allow(dead_code)]
54 fallback: ExportFallback,
55}
56
57impl OtelGuard {
58 pub(crate) fn from_config(
62 config: OtelSdkConfig,
63 fallback: ExportFallback,
64 custom_resource: Option<Resource>,
65 ) -> Result<Self, SdkError> {
66 let resource = custom_resource.unwrap_or_else(|| build_resource(&config));
67
68 let tracer_provider = if config.traces.enabled {
69 Some(build_tracer_provider(&config, resource.clone())?)
70 } else {
71 None
72 };
73
74 let meter_provider = if config.metrics.enabled {
75 Some(build_meter_provider(&config, resource.clone())?)
76 } else {
77 None
78 };
79
80 let logger_provider = if config.logs.enabled {
81 Some(build_logger_provider(&config, resource)?)
82 } else {
83 None
84 };
85
86 if let Some(ref provider) = tracer_provider {
88 opentelemetry::global::set_tracer_provider(provider.clone());
89 }
90 if let Some(ref provider) = meter_provider {
91 opentelemetry::global::set_meter_provider(provider.clone());
92 }
93
94 if config.init_tracing_subscriber {
96 init_subscriber(&tracer_provider, &logger_provider)?;
97 }
98
99 Ok(Self {
100 tracer_provider,
101 meter_provider,
102 logger_provider,
103 fallback,
104 })
105 }
106
107 pub fn tracer_provider(&self) -> Option<&SdkTracerProvider> {
109 self.tracer_provider.as_ref()
110 }
111
112 pub fn meter_provider(&self) -> Option<&SdkMeterProvider> {
114 self.meter_provider.as_ref()
115 }
116
117 pub fn logger_provider(&self) -> Option<&SdkLoggerProvider> {
119 self.logger_provider.as_ref()
120 }
121
122 pub fn flush(&self) {
131 if let Some(provider) = &self.tracer_provider
132 && let Err(e) = provider.force_flush()
133 {
134 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush tracer provider");
135 }
136
137 if let Some(provider) = &self.meter_provider
138 && let Err(e) = provider.force_flush()
139 {
140 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush meter provider");
141 }
142
143 if let Some(provider) = &self.logger_provider
144 && let Err(e) = provider.force_flush()
145 {
146 tracing::warn!(target: "otel_lifecycle", error = %e, "Failed to flush logger provider");
147 }
148 }
149
150 pub fn shutdown(mut self) -> Result<(), SdkError> {
159 if let Some(provider) = self.tracer_provider.take() {
160 provider.force_flush().map_err(SdkError::Flush)?;
161 provider.shutdown().map_err(SdkError::Shutdown)?;
162 }
163
164 if let Some(provider) = self.logger_provider.take() {
165 provider.force_flush().map_err(SdkError::Flush)?;
166 provider.shutdown().map_err(SdkError::Shutdown)?;
167 }
168
169 if let Some(provider) = self.meter_provider.take() {
170 provider.force_flush().map_err(SdkError::Flush)?;
171 provider.shutdown().map_err(SdkError::Shutdown)?;
172 }
173
174 Ok(())
175 }
176}
177
178impl Drop for OtelGuard {
179 fn drop(&mut self) {
180 if let Some(provider) = self.tracer_provider.take() {
181 let _ = provider.force_flush();
182 if let Err(e) = provider.shutdown() {
183 eprintln!("Error shutting down tracer provider: {e}");
184 }
185 }
186
187 if let Some(provider) = self.logger_provider.take() {
188 let _ = provider.force_flush();
189 if let Err(e) = provider.shutdown() {
190 eprintln!("Error shutting down logger provider: {e}");
191 }
192 }
193
194 if let Some(provider) = self.meter_provider.take() {
195 let _ = provider.force_flush();
196 if let Err(e) = provider.shutdown() {
197 eprintln!("Error shutting down meter provider: {e}");
198 }
199 }
200 }
201}
202
203fn build_resource(config: &OtelSdkConfig) -> Resource {
204 let mut attributes: Vec<KeyValue> = config
205 .resource
206 .attributes
207 .iter()
208 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
209 .collect();
210
211 if let Some(name) = &config.resource.service_name {
212 attributes.push(KeyValue::new("service.name", name.clone()));
213 }
214
215 if let Some(version) = &config.resource.service_version {
216 attributes.push(KeyValue::new("service.version", version.clone()));
217 }
218
219 if let Some(env) = &config.resource.deployment_environment {
220 attributes.push(KeyValue::new("deployment.environment.name", env.clone()));
221 }
222
223 Resource::builder().with_attributes(attributes).build()
224}
225
226fn build_tracer_provider(
227 config: &OtelSdkConfig,
228 resource: Resource,
229) -> Result<SdkTracerProvider, SdkError> {
230 let exporter = match config.endpoint.protocol {
231 Protocol::Grpc => {
232 let endpoint = config.effective_endpoint();
233 let mut builder = opentelemetry_otlp::SpanExporter::builder()
234 .with_tonic()
235 .with_endpoint(&endpoint)
236 .with_timeout(config.endpoint.timeout);
237
238 if !config.endpoint.headers.is_empty() {
239 let mut metadata = tonic::metadata::MetadataMap::new();
240 for (key, value) in &config.endpoint.headers {
241 if let (Ok(k), Ok(v)) = (
242 key.parse::<MetadataKey<_>>(),
243 value.parse::<MetadataValue<_>>(),
244 ) {
245 metadata.insert(k, v);
246 }
247 }
248 builder = builder.with_metadata(metadata);
249 }
250
251 builder.build().map_err(SdkError::TraceExporter)?
252 }
253 Protocol::HttpBinary => {
254 let endpoint = config.signal_endpoint("/v1/traces");
255 let mut builder = opentelemetry_otlp::SpanExporter::builder()
256 .with_http()
257 .with_endpoint(&endpoint)
258 .with_timeout(config.endpoint.timeout)
259 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
260
261 if !config.endpoint.headers.is_empty() {
262 builder = builder.with_headers(config.endpoint.headers.clone());
263 }
264
265 builder.build().map_err(SdkError::TraceExporter)?
266 }
267 Protocol::HttpJson => {
268 let endpoint = config.signal_endpoint("/v1/traces");
269 let mut builder = opentelemetry_otlp::SpanExporter::builder()
270 .with_http()
271 .with_endpoint(&endpoint)
272 .with_timeout(config.endpoint.timeout)
273 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
274
275 if !config.endpoint.headers.is_empty() {
276 builder = builder.with_headers(config.endpoint.headers.clone());
277 }
278
279 builder.build().map_err(SdkError::TraceExporter)?
280 }
281 };
282
283 let batch_config = TraceBatchConfigBuilder::default()
284 .with_max_queue_size(config.traces.batch.max_queue_size)
285 .with_max_export_batch_size(config.traces.batch.max_export_batch_size)
286 .with_scheduled_delay(config.traces.batch.scheduled_delay)
287 .build();
288
289 let span_processor = BatchSpanProcessor::builder(exporter)
290 .with_batch_config(batch_config)
291 .build();
292
293 Ok(SdkTracerProvider::builder()
294 .with_span_processor(span_processor)
295 .with_resource(resource)
296 .build())
297}
298
299fn build_meter_provider(
300 config: &OtelSdkConfig,
301 resource: Resource,
302) -> Result<SdkMeterProvider, SdkError> {
303 let exporter = match config.endpoint.protocol {
304 Protocol::Grpc => {
305 let endpoint = config.effective_endpoint();
306 let mut builder = opentelemetry_otlp::MetricExporter::builder()
307 .with_tonic()
308 .with_endpoint(&endpoint)
309 .with_timeout(config.endpoint.timeout);
310
311 if !config.endpoint.headers.is_empty() {
312 let mut metadata = tonic::metadata::MetadataMap::new();
313 for (key, value) in &config.endpoint.headers {
314 if let (Ok(k), Ok(v)) = (
315 key.parse::<MetadataKey<_>>(),
316 value.parse::<MetadataValue<_>>(),
317 ) {
318 metadata.insert(k, v);
319 }
320 }
321 builder = builder.with_metadata(metadata);
322 }
323
324 builder.build().map_err(SdkError::MetricExporter)?
325 }
326 Protocol::HttpBinary => {
327 let endpoint = config.signal_endpoint("/v1/metrics");
328 let mut builder = opentelemetry_otlp::MetricExporter::builder()
329 .with_http()
330 .with_endpoint(&endpoint)
331 .with_timeout(config.endpoint.timeout)
332 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
333
334 if !config.endpoint.headers.is_empty() {
335 builder = builder.with_headers(config.endpoint.headers.clone());
336 }
337
338 builder.build().map_err(SdkError::MetricExporter)?
339 }
340 Protocol::HttpJson => {
341 let endpoint = config.signal_endpoint("/v1/metrics");
342 let mut builder = opentelemetry_otlp::MetricExporter::builder()
343 .with_http()
344 .with_endpoint(&endpoint)
345 .with_timeout(config.endpoint.timeout)
346 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
347
348 if !config.endpoint.headers.is_empty() {
349 builder = builder.with_headers(config.endpoint.headers.clone());
350 }
351
352 builder.build().map_err(SdkError::MetricExporter)?
353 }
354 };
355
356 let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
357 .with_interval(config.metrics.batch.scheduled_delay)
358 .build();
359
360 Ok(SdkMeterProvider::builder()
361 .with_reader(reader)
362 .with_resource(resource)
363 .build())
364}
365
366fn build_logger_provider(
367 config: &OtelSdkConfig,
368 resource: Resource,
369) -> Result<SdkLoggerProvider, SdkError> {
370 let exporter = match config.endpoint.protocol {
371 Protocol::Grpc => {
372 let endpoint = config.effective_endpoint();
373 let mut builder = opentelemetry_otlp::LogExporter::builder()
374 .with_tonic()
375 .with_endpoint(&endpoint)
376 .with_timeout(config.endpoint.timeout);
377
378 if !config.endpoint.headers.is_empty() {
379 let mut metadata = tonic::metadata::MetadataMap::new();
380 for (key, value) in &config.endpoint.headers {
381 if let (Ok(k), Ok(v)) = (
382 key.parse::<MetadataKey<_>>(),
383 value.parse::<MetadataValue<_>>(),
384 ) {
385 metadata.insert(k, v);
386 }
387 }
388 builder = builder.with_metadata(metadata);
389 }
390
391 builder.build().map_err(SdkError::LogExporter)?
392 }
393 Protocol::HttpBinary => {
394 let endpoint = config.signal_endpoint("/v1/logs");
395 let mut builder = opentelemetry_otlp::LogExporter::builder()
396 .with_http()
397 .with_endpoint(&endpoint)
398 .with_timeout(config.endpoint.timeout)
399 .with_protocol(opentelemetry_otlp::Protocol::HttpBinary);
400
401 if !config.endpoint.headers.is_empty() {
402 builder = builder.with_headers(config.endpoint.headers.clone());
403 }
404
405 builder.build().map_err(SdkError::LogExporter)?
406 }
407 Protocol::HttpJson => {
408 let endpoint = config.signal_endpoint("/v1/logs");
409 let mut builder = opentelemetry_otlp::LogExporter::builder()
410 .with_http()
411 .with_endpoint(&endpoint)
412 .with_timeout(config.endpoint.timeout)
413 .with_protocol(opentelemetry_otlp::Protocol::HttpJson);
414
415 if !config.endpoint.headers.is_empty() {
416 builder = builder.with_headers(config.endpoint.headers.clone());
417 }
418
419 builder.build().map_err(SdkError::LogExporter)?
420 }
421 };
422
423 let batch_config = LogBatchConfigBuilder::default()
424 .with_max_queue_size(config.logs.batch.max_queue_size)
425 .with_max_export_batch_size(config.logs.batch.max_export_batch_size)
426 .with_scheduled_delay(config.logs.batch.scheduled_delay)
427 .build();
428
429 let log_processor = BatchLogProcessor::builder(exporter)
430 .with_batch_config(batch_config)
431 .build();
432
433 Ok(SdkLoggerProvider::builder()
434 .with_log_processor(log_processor)
435 .with_resource(resource)
436 .build())
437}
438
439fn init_subscriber(
440 tracer_provider: &Option<SdkTracerProvider>,
441 logger_provider: &Option<SdkLoggerProvider>,
442) -> Result<(), SdkError> {
443 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
444
445 let fmt_layer = tracing_subscriber::fmt::layer()
446 .with_target(true)
447 .without_time();
448
449 let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
450
451 match (tracer_provider, logger_provider) {
452 (Some(tp), Some(lp)) => {
453 let tracer = tp.tracer("lambda-otel-extension");
454 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
455 let log_layer = OpenTelemetryTracingBridge::new(lp);
456 registry.with(telemetry_layer).with(log_layer).try_init()?;
457 }
458 (Some(tp), None) => {
459 let tracer = tp.tracer("lambda-otel-extension");
460 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
461 registry.with(telemetry_layer).try_init()?;
462 }
463 (None, Some(lp)) => {
464 let log_layer = OpenTelemetryTracingBridge::new(lp);
465 registry.with(log_layer).try_init()?;
466 }
467 (None, None) => {
468 registry.try_init()?;
469 }
470 }
471
472 Ok(())
473}