lambda_otel_lite/telemetry.rs
1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//! let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//! Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//! let resource = Resource::builder()
47//! .with_attributes(vec![
48//! KeyValue::new("service.version", "1.0.0"),
49//! KeyValue::new("deployment.environment", "production"),
50//! ])
51//! .build();
52//!
53//! let config = TelemetryConfig::builder()
54//! .resource(resource)
55//! .build();
56//!
57//! let (_, completion_handler) = init_telemetry(config).await?;
58//! Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//! let config = TelemetryConfig::builder()
72//! .with_span_processor(SimpleSpanProcessor::new(
73//! OtlpStdoutSpanExporter::default()
74//! ))
75//! .enable_fmt_layer(true)
76//! .build();
77//!
78//! let (_, completion_handler) = init_telemetry(config).await?;
79//! Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
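//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
//! - `OTEL_PROPAGATORS`: Comma-separated list of propagators to install
//!   (`tracecontext`, `xray`, `xray-lambda`, `none`)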
93
94use crate::{
95 constants, extension::register_extension, logger::Logger, mode::ProcessorMode,
96 processor::LambdaSpanProcessor, propagation::LambdaXrayPropagator,
97 resource::get_lambda_resource,
98};
99use bon::Builder;
100use lambda_runtime::Error;
101use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
102use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
103use opentelemetry_aws::trace::XrayPropagator;
104use opentelemetry_sdk::{
105 propagation::TraceContextPropagator,
106 trace::{IdGenerator, SdkTracerProvider, SpanProcessor, TracerProviderBuilder},
107 Resource,
108};
109use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
110use std::{borrow::Cow, env, sync::Arc};
111use tokio::sync::mpsc::UnboundedSender;
112use tracing_subscriber::layer::SubscriberExt;
113
// Module-specific logger, constructed with the name "telemetry"; used below for
// the warnings emitted while configuring and completing telemetry.
static LOGGER: Logger = Logger::const_new("telemetry");
116
/// Manages the lifecycle of span export based on the processing mode.
///
/// This handler must be used to signal when spans should be exported. Its behavior
/// varies by processing mode:
/// - Sync: Forces immediate export
/// - Async: Signals the extension to export
/// - Finalize: Defers to span processor
///
/// # Thread Safety
///
/// This type is `Clone` and can be safely shared between threads.
#[derive(Clone)]
pub struct TelemetryCompletionHandler {
    // Tracer provider; force-flushed by `complete()` when the mode is `Sync`.
    provider: Arc<SdkTracerProvider>,
    // Channel on which `complete()` sends `()` when the mode is `Async`.
    // `None` means there is nothing to signal (e.g. no extension was registered).
    sender: Option<UnboundedSender<()>>,
    // Processing mode that selects what `complete()` does.
    mode: ProcessorMode,
    // Tracer created once in `new()` from `provider` and returned by `get_tracer()`.
    tracer: opentelemetry_sdk::trace::Tracer,
}
135
136impl TelemetryCompletionHandler {
137 pub fn new(
138 provider: Arc<SdkTracerProvider>,
139 sender: Option<UnboundedSender<()>>,
140 mode: ProcessorMode,
141 ) -> Self {
142 // Create instrumentation scope with attributes
143 let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
144 .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
145 .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
146 .with_attributes(vec![
147 KeyValue::new("library.language", "rust"),
148 KeyValue::new("library.type", "instrumentation"),
149 KeyValue::new("library.runtime", "aws_lambda"),
150 ])
151 .build();
152
153 // Create tracer with instrumentation scope
154 let tracer = provider.tracer_with_scope(scope);
155
156 Self {
157 provider,
158 sender,
159 mode,
160 tracer,
161 }
162 }
163
164 /// Get the tracer instance for creating spans.
165 ///
166 /// Returns the cached tracer instance configured with this package's instrumentation scope.
167 /// The tracer is configured with the provider's settings and will automatically use
168 /// the correct span processor based on the processing mode.
169 pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
170 &self.tracer
171 }
172
173 /// Complete telemetry processing for the current invocation
174 ///
175 /// In Sync mode, this will force flush the provider and log any errors that occur.
176 /// In Async mode, this will send a completion signal to the extension.
177 /// In Finalize mode, this will do nothing (handled by drop).
178 pub fn complete(&self) {
179 match self.mode {
180 ProcessorMode::Sync => {
181 if let Err(e) = self.provider.force_flush() {
182 LOGGER.warn(format!("Error flushing telemetry: {:?}", e));
183 }
184 }
185 ProcessorMode::Async => {
186 if let Some(sender) = &self.sender {
187 if let Err(e) = sender.send(()) {
188 LOGGER.warn(format!(
189 "Failed to send completion signal to extension: {:?}",
190 e
191 ));
192 }
193 }
194 }
195 ProcessorMode::Finalize => {
196 // Do nothing, handled by drop
197 }
198 }
199 }
200}
201
/// Configuration for OpenTelemetry initialization.
///
/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
/// for standard Lambda configuration, or the builder pattern for customization.
///
/// # Fields
///
/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
/// * `set_global_provider` - Set as global tracer provider (default: true)
/// * `resource` - Custom resource attributes (default: auto-detected from Lambda)
/// * `env_var_name` - Environment variable name for log level configuration
/// * `processor_mode` - Span processing mode (sync/async/finalize)
///
/// Span processors, propagators and a custom ID generator are not plain fields;
/// they are added through the builder methods `with_span_processor`,
/// `with_propagator` / `with_named_propagator` and `with_id_generator`.
///
/// # Examples
///
/// Basic usage with default configuration:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::default();
/// ```
///
/// Custom configuration with resource attributes:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
/// use opentelemetry::KeyValue;
/// use opentelemetry_sdk::Resource;
///
/// let config = TelemetryConfig::builder()
///     .resource(Resource::builder()
///         .with_attributes(vec![KeyValue::new("version", "1.0.0")])
///         .build())
///     .build();
/// ```
///
/// Custom configuration with logging options:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::builder()
///     .enable_fmt_layer(true) // Enable console output for debugging
///     .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string()) // Custom env var for log level
///     .build();
/// ```
#[derive(Builder, Debug)]
pub struct TelemetryConfig {
    // Custom fields for internal state.
    //
    // Tracer provider builder that accumulates the span processors and the ID
    // generator added through the builder methods; `init_telemetry` finishes it
    // with the resource and builds the provider.
    #[builder(field)]
    provider_builder: TracerProviderBuilder,

    // Set to `true` by `with_span_processor`. When still `false`,
    // `init_telemetry` installs a default `LambdaSpanProcessor` that exports
    // through `OtlpStdoutSpanExporter`.
    #[builder(field)]
    has_processor: bool,

    // Propagators added through `with_propagator` / `with_named_propagator`;
    // `init_telemetry` combines them into one composite propagator.
    #[builder(field)]
    propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,

    /// Enable console output for debugging.
    ///
    /// When enabled, spans and events will be printed to the console in addition
    /// to being exported through the configured span processors. This is useful
    /// for debugging but adds overhead and should be disabled in production.
    ///
    /// This can also be controlled via the `LAMBDA_TRACING_ENABLE_FMT_LAYER` environment variable,
    /// which takes precedence over this setting when present:
    /// - Setting the env var to "true" will enable console output even if this field is false
    /// - Setting the env var to "false" will disable console output even if this field is true
    /// - Invalid values will log a warning and fall back to this code setting
    ///
    /// The env var value is compared case-insensitively.
    /// This environment variable override allows toggling logging for debugging without code changes.
    ///
    /// Default: `false`
    #[builder(default = false)]
    pub enable_fmt_layer: bool,

    /// Set this provider as the global OpenTelemetry provider.
    ///
    /// When enabled, the provider will be registered as the global provider
    /// for the OpenTelemetry API. This allows using the global tracer API
    /// without explicitly passing around the provider.
    ///
    /// Default: `true`
    #[builder(default = true)]
    pub set_global_provider: bool,

    /// Custom resource for all spans.
    ///
    /// If not provided, the resource is automatically detected from the Lambda
    /// environment. If provided, `init_telemetry` passes this resource to the
    /// tracer provider instead of running the Lambda detection, so include every
    /// attribute you need.
    ///
    /// Default: `None` (auto-detected from Lambda environment)
    pub resource: Option<Resource>,

    /// Environment variable name to use for log level configuration.
    ///
    /// This field specifies which environment variable should be used to configure
    /// the tracing subscriber's log level filter. If not specified, `RUST_LOG` is
    /// used when it is set, and `AWS_LAMBDA_LOG_LEVEL` otherwise.
    ///
    /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
    pub env_var_name: Option<String>,

    /// Span processing mode (sync/async/finalize)
    ///
    /// Controls how spans are exported from the processor. This can be overridden by the
    /// LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE environment variable, which takes precedence.
    ///
    /// Default: `None` (uses environment variable or defaults to `ProcessorMode::Sync`)
    pub processor_mode: Option<ProcessorMode>,
}
316
317impl Default for TelemetryConfig {
318 fn default() -> Self {
319 Self::builder().build()
320 }
321}
322
323/// Builder methods for adding span processors and other configuration
324impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
325 /// Add a span processor to the tracer provider.
326 ///
327 /// This method allows adding custom span processors for trace data processing.
328 /// Multiple processors can be added by calling this method multiple times.
329 ///
330 /// # Arguments
331 ///
332 /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
333 ///
334 /// # Examples
335 ///
336 /// ```no_run
337 /// use lambda_otel_lite::TelemetryConfig;
338 /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
339 /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
340 ///
341 /// // Only use builder when adding custom processors
342 /// let config = TelemetryConfig::builder()
343 /// .with_span_processor(SimpleSpanProcessor::new(
344 /// OtlpStdoutSpanExporter::default()
345 /// ))
346 /// .build();
347 /// ```
348 pub fn with_span_processor<T>(mut self, processor: T) -> Self
349 where
350 T: SpanProcessor + 'static,
351 {
352 self.provider_builder = self.provider_builder.with_span_processor(processor);
353 self.has_processor = true;
354 self
355 }
356
357 /// Add a propagator to the list of propagators.
358 ///
359 /// Multiple propagators can be added and will be combined into a composite propagator.
360 /// The default propagator is TraceContextPropagator.
361 ///
362 /// # Arguments
363 ///
364 /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use lambda_otel_lite::TelemetryConfig;
370 /// use opentelemetry_sdk::propagation::BaggagePropagator;
371 ///
372 /// let config = TelemetryConfig::builder()
373 /// .with_propagator(BaggagePropagator::new())
374 /// .build();
375 /// ```
376 pub fn with_propagator<T>(mut self, propagator: T) -> Self
377 where
378 T: TextMapPropagator + Send + Sync + 'static,
379 {
380 self.propagators.push(Box::new(propagator));
381 self
382 }
383
384 pub fn with_named_propagator(self, name: &str) -> Self {
385 match name {
386 "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
387 "xray" => self.with_propagator(XrayPropagator::new()),
388 "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
389 "none" => self.with_propagator(NoopPropagator::new()),
390 _ => {
391 LOGGER.warn(format!(
392 "Unknown propagator: {}, using default propagators",
393 name
394 ));
395 self
396 }
397 }
398 }
399
400 /// Add a custom ID generator to the tracer provider.
401 ///
402 /// This method allows setting a custom ID generator for trace and span IDs.
403 /// This is particularly useful when integrating with AWS X-Ray, which requires
404 /// a specific ID format.
405 ///
406 /// # Arguments
407 ///
408 /// * `id_generator` - An ID generator implementing the [`IdGenerator`] trait
409 ///
410 /// # Examples
411 ///
412 /// ```no_run
413 /// use lambda_otel_lite::TelemetryConfig;
414 /// use opentelemetry_aws::trace::XrayIdGenerator;
415 ///
416 /// // Configure with X-Ray compatible ID generator
417 /// let config = TelemetryConfig::builder()
418 /// .with_id_generator(XrayIdGenerator::default())
419 /// .build();
420 /// ```
421 pub fn with_id_generator<T>(mut self, id_generator: T) -> Self
422 where
423 T: IdGenerator + 'static,
424 {
425 self.provider_builder = self.provider_builder.with_id_generator(id_generator);
426 self
427 }
428}
429
430/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
431///
432/// # Arguments
433///
434/// * `config` - Configuration for telemetry initialization
435///
436/// # Returns
437///
438/// Returns a tuple containing:
439/// - A tracer instance for manual instrumentation
440/// - A completion handler for managing span export timing
441///
442/// # Errors
443///
444/// Returns error if:
445/// - Extension registration fails (async/finalize modes)
446/// - Tracer provider initialization fails
447/// - Environment variable parsing fails
448///
449/// # Examples
450///
451/// Basic usage with default configuration:
452///
453/// ```no_run
454/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
455///
456/// # async fn example() -> Result<(), lambda_runtime::Error> {
457/// // Initialize with default configuration
458/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
459/// # Ok(())
460/// # }
461/// ```
462///
463/// Custom configuration:
464///
465/// ```no_run
466/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
467/// use opentelemetry::KeyValue;
468/// use opentelemetry_sdk::Resource;
469///
470/// # async fn example() -> Result<(), lambda_runtime::Error> {
471/// // Create custom resource
472/// let resource = Resource::builder()
473/// .with_attributes(vec![
474/// KeyValue::new("service.name", "payment-api"),
475/// KeyValue::new("service.version", "1.2.3"),
476/// ])
477/// .build();
478///
479/// // Initialize with custom configuration
480/// let (_, telemetry) = init_telemetry(
481/// TelemetryConfig::builder()
482/// .resource(resource)
483/// .build()
484/// ).await?;
485/// # Ok(())
486/// # }
487/// ```
488///
489/// Advanced usage with BatchSpanProcessor (required for async exporters):
490///
491/// ```no_run
492/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
493/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
494/// use opentelemetry_sdk::trace::BatchSpanProcessor;
495/// use lambda_runtime::Error;
496///
497/// # async fn example() -> Result<(), Error> {
498/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
499/// .with_http()
500/// .with_http_client(reqwest::Client::new())
501/// .with_protocol(Protocol::HttpBinary)
502/// .build()?;
503///
504/// let (provider, completion) = init_telemetry(
505/// TelemetryConfig::builder()
506/// .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
507/// .build()
508/// ).await?;
509/// # Ok(())
510/// # }
511/// ```
512///
513/// Using LambdaSpanProcessor with blocking http client:
514///
515/// ```no_run
516/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
517/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
518/// use lambda_runtime::Error;
519///
520/// # async fn example() -> Result<(), Error> {
521/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
522/// .with_http()
523/// .with_http_client(reqwest::blocking::Client::new())
524/// .with_protocol(Protocol::HttpBinary)
525/// .build()?;
526///
527/// let (provider, completion) = init_telemetry(
528/// TelemetryConfig::builder()
529/// .with_span_processor(
530/// LambdaSpanProcessor::builder()
531/// .exporter(lambda_exporter)
532/// .max_queue_size(2048)
533/// .build()
534/// )
535/// .build()
536/// ).await?;
537/// # Ok(())
538/// # }
539/// ```
540///
541pub async fn init_telemetry(
542 mut config: TelemetryConfig,
543) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
544 // Get mode from config or environment with environment taking precedence
545 let mode = ProcessorMode::resolve(config.processor_mode);
546
547 if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
548 let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
549
550 for propagator in propagators {
551 match propagator {
552 "tracecontext" => config
553 .propagators
554 .push(Box::new(TraceContextPropagator::new())),
555 "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
556 "xray-lambda" => config
557 .propagators
558 .push(Box::new(LambdaXrayPropagator::new())),
559 "none" => config.propagators.push(Box::new(NoopPropagator::new())),
560 _ => LOGGER.warn(format!(
561 "Unknown propagator: {}, using default propagators",
562 propagator
563 )),
564 }
565 }
566 } else {
567 // if no propagators are set, use the default propagators
568 if config.propagators.is_empty() {
569 // IMPORTANT:
570 // LambdaXrayPropagator is added *before* TraceContextPropagator
571 // because in OpenTelemetry Rust, the *last* propagator that extracts
572 // a valid context wins during extraction.
573 // This ensures that if both an AWS X-Ray header (or _X_AMZN_TRACE_ID)
574 // and a W3C traceparent header are present, the W3C traceparent takes precedence.
575 config
576 .propagators
577 .push(Box::new(LambdaXrayPropagator::new()));
578 config
579 .propagators
580 .push(Box::new(TraceContextPropagator::new()));
581 }
582 }
583
584 let composite_propagator = TextMapCompositePropagator::new(config.propagators);
585 global::set_text_map_propagator(composite_propagator);
586
587 // Add default span processor if none was added
588 if !config.has_processor {
589 let processor = LambdaSpanProcessor::builder()
590 .exporter(OtlpStdoutSpanExporter::default())
591 .build();
592 config.provider_builder = config.provider_builder.with_span_processor(processor);
593 }
594
595 // Apply defaults and build the provider
596 let resource = config.resource.unwrap_or_else(get_lambda_resource);
597
598 let provider = Arc::new(config.provider_builder.with_resource(resource).build());
599
600 // Register the extension if in async or finalize mode
601 let sender = match mode {
602 ProcessorMode::Async | ProcessorMode::Finalize => {
603 Some(register_extension(provider.clone(), mode.clone()).await?)
604 }
605 _ => None,
606 };
607
608 if config.set_global_provider {
609 // Set the provider as global
610 set_tracer_provider(provider.as_ref().clone());
611 }
612
613 // Initialize tracing subscriber with smart env var selection
614 let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
615 if env::var("RUST_LOG").is_ok() {
616 "RUST_LOG"
617 } else {
618 "AWS_LAMBDA_LOG_LEVEL"
619 }
620 });
621
622 let env_filter = tracing_subscriber::EnvFilter::builder()
623 .with_env_var(env_var_name)
624 .from_env_lossy();
625
626 let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
627 let tracer = completion_handler.get_tracer().clone();
628
629 let subscriber = tracing_subscriber::registry::Registry::default()
630 .with(tracing_opentelemetry::OpenTelemetryLayer::new(
631 tracer.clone(),
632 ))
633 .with(env_filter);
634
635 // Determine if fmt layer should be enabled - environment variable takes precedence when set
636 let enable_fmt = if let Ok(env_value) = env::var(constants::env_vars::ENABLE_FMT_LAYER) {
637 match env_value.to_lowercase().as_str() {
638 "true" => true,
639 "false" => false,
640 other => {
641 LOGGER.warn(format!(
642 "Invalid value '{}' for {}, expected 'true' or 'false'. Using code configuration.",
643 other,
644 constants::env_vars::ENABLE_FMT_LAYER
645 ));
646 config.enable_fmt_layer
647 }
648 }
649 } else {
650 // If env var not set, use the configured value
651 config.enable_fmt_layer
652 };
653
654 // Enable fmt layer based on the determined value
655 if enable_fmt {
656 // Determine if the lambda logging configuration is set to output json logs
657 let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
658 .unwrap_or_default()
659 .to_uppercase()
660 == "JSON";
661
662 if is_json {
663 tracing::subscriber::set_global_default(
664 subscriber.with(
665 tracing_subscriber::fmt::layer()
666 .with_target(false)
667 .without_time()
668 .json(),
669 ),
670 )?;
671 } else {
672 tracing::subscriber::set_global_default(
673 subscriber.with(
674 tracing_subscriber::fmt::layer()
675 .with_target(false)
676 .without_time()
677 .with_ansi(false),
678 ),
679 )?;
680 }
681 } else {
682 tracing::subscriber::set_global_default(subscriber)?;
683 }
684
685 Ok((tracer, completion_handler))
686}
687
688#[cfg(test)]
689mod tests {
690 use super::*;
691 use opentelemetry::trace::{Span, Tracer};
692 use opentelemetry_aws::trace::XrayIdGenerator;
693 use opentelemetry_sdk::trace::SimpleSpanProcessor;
694 use sealed_test::prelude::*;
695 use std::sync::Arc;
696 use tokio::sync::mpsc;
697
698 // Helper to clean up environment variables between tests
699 fn cleanup_env() {
700 env::remove_var(constants::env_vars::ENABLE_FMT_LAYER);
701 env::remove_var(constants::env_vars::PROPAGATORS);
702 env::remove_var("_X_AMZN_TRACE_ID");
703 }
704
#[test]
#[sealed_test]
fn test_telemetry_config_defaults() {
    cleanup_env();

    let defaults = TelemetryConfig::builder().build();

    // The bare builder adds no propagators; defaults are applied in init_telemetry
    assert!(defaults.propagators.is_empty());
    // No span processor has been registered yet
    assert!(!defaults.has_processor);
    // Console output is off unless requested
    assert!(!defaults.enable_fmt_layer);
    // The provider is registered globally unless opted out
    assert!(defaults.set_global_provider);
}
716
#[test]
#[sealed_test]
fn test_telemetry_config_with_propagators() {
    cleanup_env();

    // A single named propagator (tracecontext), combined with a span processor
    let tracecontext_only = TelemetryConfig::builder()
        .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
        .with_named_propagator("tracecontext")
        .build();
    assert_eq!(tracecontext_only.propagators.len(), 1);

    // A single named propagator (xray)
    let xray_only = TelemetryConfig::builder()
        .with_named_propagator("xray")
        .build();
    assert_eq!(xray_only.propagators.len(), 1);

    // Two named propagators accumulate
    let both = TelemetryConfig::builder()
        .with_named_propagator("tracecontext")
        .with_named_propagator("xray")
        .build();
    assert_eq!(both.propagators.len(), 2);

    // Nothing named: the list stays empty (defaults are applied in init_telemetry)
    let unset = TelemetryConfig::builder().build();
    assert_eq!(unset.propagators.len(), 0);

    // "none" still registers one (no-op) propagator
    let disabled = TelemetryConfig::builder()
        .with_named_propagator("none")
        .build();
    assert_eq!(disabled.propagators.len(), 1);
}
752
753 #[tokio::test]
754 #[sealed_test]
755 async fn test_telemetry_config_env_propagators_tracecontext() {
756 cleanup_env();
757
758 // Test with OTEL_PROPAGATORS=tracecontext
759 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
760 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
761 // In real usage we'd check the behavior rather than implementation details
762 // So we'll just check that we can create and use a handler
763 assert!(handler.sender.is_none());
764
765 cleanup_env();
766 }
767
768 #[tokio::test]
769 #[sealed_test]
770 async fn test_telemetry_config_env_propagators_xray() {
771 cleanup_env();
772
773 // Test with OTEL_PROPAGATORS=xray
774 env::set_var(constants::env_vars::PROPAGATORS, "xray");
775 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
776 assert!(handler.sender.is_none());
777
778 cleanup_env();
779 }
780
781 #[tokio::test]
782 #[sealed_test]
783 async fn test_telemetry_config_env_propagators_combined() {
784 cleanup_env();
785
786 // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
787 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
788 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
789 assert!(handler.sender.is_none());
790
791 cleanup_env();
792 }
793
794 #[tokio::test]
795 #[sealed_test]
796 async fn test_telemetry_config_env_propagators_none() {
797 cleanup_env();
798
799 // Test with OTEL_PROPAGATORS=none
800 env::set_var(constants::env_vars::PROPAGATORS, "none");
801 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
802 assert!(handler.sender.is_none());
803
804 cleanup_env();
805 }
806
807 #[tokio::test]
808 #[sealed_test]
809 async fn test_init_telemetry_defaults() {
810 let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
811 assert!(completion_handler.sender.is_none()); // Default mode is Sync
812 }
813
814 #[tokio::test]
815 #[sealed_test]
816 async fn test_init_telemetry_custom() {
817 let resource = Resource::builder().build();
818 let config = TelemetryConfig::builder()
819 .resource(resource)
820 .with_named_propagator("tracecontext")
821 .enable_fmt_layer(true)
822 .set_global_provider(false)
823 .build();
824
825 let (_, completion_handler) = init_telemetry(config).await.unwrap();
826 assert!(completion_handler.sender.is_none());
827 }
828
829 #[tokio::test]
830 #[sealed_test]
831 async fn test_telemetry_config_env_fmt_layer_true_override() {
832 cleanup_env();
833
834 // Test: Env var "true" overrides code setting "false"
835 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "true");
836 let config = TelemetryConfig::default(); // code setting is false by default
837 assert!(!config.enable_fmt_layer); // Config should not be affected by env var
838
839 // Initialize telemetry - env var should override config
840 let result = init_telemetry(config).await;
841 assert!(result.is_ok());
842
843 // Clean up
844 cleanup_env();
845 }
846
847 #[tokio::test]
848 #[sealed_test]
849 async fn test_telemetry_config_env_fmt_layer_false_override() {
850 cleanup_env();
851
852 // Test: Env var "false" overrides code setting "true"
853 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "false");
854 let config = TelemetryConfig::builder()
855 .enable_fmt_layer(true) // code setting is true
856 .build();
857 assert!(config.enable_fmt_layer);
858
859 // Initialize telemetry - env var should override config
860 let result = init_telemetry(config).await;
861 assert!(result.is_ok());
862
863 // Clean up
864 cleanup_env();
865 }
866
867 #[tokio::test]
868 #[sealed_test]
869 async fn test_telemetry_config_env_fmt_layer_invalid() {
870 cleanup_env();
871
872 // Test: Invalid env var falls back to code setting
873 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "invalid");
874 let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
875
876 // Initialize telemetry - should log a warning but use code setting
877 let result = init_telemetry(config).await;
878 assert!(result.is_ok());
879
880 // Clean up
881 cleanup_env();
882 }
883
884 #[tokio::test]
885 #[sealed_test]
886 async fn test_telemetry_config_env_fmt_layer_not_set() {
887 cleanup_env();
888
889 // Test: No env var uses code setting
890 let config = TelemetryConfig::default();
891 assert!(!config.enable_fmt_layer);
892
893 let result = init_telemetry(config).await;
894 assert!(result.is_ok());
895
896 // Clean up
897 cleanup_env();
898 }
899
#[test]
fn test_completion_handler_sync_mode() {
    let stdout_processor = SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default());
    let tracer_provider = SdkTracerProvider::builder()
        .with_span_processor(stdout_processor)
        .build();

    let sync_handler =
        TelemetryCompletionHandler::new(Arc::new(tracer_provider), None, ProcessorMode::Sync);

    // Sync mode routes complete() to force_flush. The provider exposes no way to
    // observe the flush, so this is a smoke test: the call must simply not panic.
    sync_handler.complete();
}
915
916 #[tokio::test]
917 async fn test_completion_handler_async_mode() {
918 let provider = Arc::new(
919 SdkTracerProvider::builder()
920 .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
921 .build(),
922 );
923
924 let (tx, mut rx) = mpsc::unbounded_channel();
925
926 let completion_handler =
927 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
928
929 // In async mode, complete() should send a message through the channel
930 completion_handler.complete();
931
932 // Verify that we received the completion signal
933 assert!(rx.try_recv().is_ok());
934 // Verify channel is now empty
935 assert!(rx.try_recv().is_err());
936 }
937
#[test]
fn test_completion_handler_finalize_mode() {
    let stdout_processor = SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default());
    let tracer_provider = SdkTracerProvider::builder()
        .with_span_processor(stdout_processor)
        .build();

    // The receiver is kept alive (but unused) for the duration of the test
    let (signal_tx, _signal_rx) = mpsc::unbounded_channel();
    let finalize_handler = TelemetryCompletionHandler::new(
        Arc::new(tracer_provider),
        Some(signal_tx),
        ProcessorMode::Finalize,
    );

    // Finalize mode: complete() is a no-op; smoke test that it does not panic
    finalize_handler.complete();
}
955
/// Cloning a handler must carry over its processing mode and its sender.
#[test]
fn test_completion_handler_clone() {
    let span_processor = SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default());
    let tracer_provider = Arc::new(
        SdkTracerProvider::builder()
            .with_span_processor(span_processor)
            .build(),
    );

    // Keep the receiving half bound so the channel is not closed during the test.
    let (signal_tx, _signal_rx) = mpsc::unbounded_channel();

    let original_handler =
        TelemetryCompletionHandler::new(tracer_provider, Some(signal_tx), ProcessorMode::Async);
    let duplicate = original_handler.clone();

    // The duplicate must still be in async mode...
    let mode_is_async = matches!(duplicate.mode, ProcessorMode::Async);
    assert!(mode_is_async);

    // ...and must still hold a sender.
    let has_sender = duplicate.sender.is_some();
    assert!(has_sender);
}
976
/// Smoke test for the sync-mode completion path: `complete()` must return
/// without panicking.
///
/// NOTE(review): despite the name, no export/flush failure is induced here;
/// the test only exercises the ordinary sync path.
#[test]
fn test_completion_handler_sync_mode_error_handling() {
    let stdout_exporter = OtlpStdoutSpanExporter::default();
    let tracer_provider = SdkTracerProvider::builder()
        .with_span_processor(SimpleSpanProcessor::new(stdout_exporter))
        .build();

    let sync_handler =
        TelemetryCompletionHandler::new(Arc::new(tracer_provider), None, ProcessorMode::Sync);

    // Reaching the end of the test means complete() did not panic.
    sync_handler.complete();
}
991
992 #[tokio::test]
993 async fn test_completion_handler_async_mode_error_handling() {
994 let provider = Arc::new(
995 SdkTracerProvider::builder()
996 .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
997 .build(),
998 );
999
1000 // Use UnboundedSender instead of Sender
1001 let (tx, _rx) = mpsc::unbounded_channel();
1002 // Fill the channel by dropping the receiver
1003 drop(_rx);
1004
1005 let completion_handler =
1006 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
1007
1008 // Test that complete() doesn't panic when receiver is dropped
1009 completion_handler.complete();
1010 }
1011
/// Verifies that an ID generator supplied through the config builder is the
/// one used by the resulting provider, by checking that generated trace IDs
/// have the X-Ray layout (leading 32-bit epoch-seconds timestamp followed by
/// a random part).
#[test]
#[sealed_test]
fn test_telemetry_config_with_id_generator() {
    cleanup_env();

    // Build a provider from a config that carries the X-Ray ID generator.
    // The generator is not inspectable once inside the provider, so the
    // test looks at the trace IDs it produces instead.
    let xray_config = TelemetryConfig::builder()
        .with_id_generator(XrayIdGenerator::default())
        .build();
    let provider = Arc::new(xray_config.provider_builder.build());

    // Obtain a tracer for a versioned instrumentation scope and start a span.
    let scope = opentelemetry::InstrumentationScope::builder("test")
        .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
        .build();
    let tracer = provider.tracer_with_scope(scope);
    let span = tracer.start_with_context("test span", &opentelemetry::Context::current());

    // Render the trace ID as 32 hex digits and split it into the leading
    // 8-digit timestamp and the trailing 24-digit random part.
    let trace_id_hex = format!("{:032x}", span.span_context().trace_id());
    let (epoch_hex, random_hex) = trace_id_hex.split_at(8);

    let embedded_secs = u32::from_str_radix(epoch_hex, 16).unwrap();
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs() as u32;

    // The embedded timestamp must not be in the future and must be less than
    // one day (86400 seconds) old.
    assert!(embedded_secs <= now_secs);
    assert!(embedded_secs > now_secs - 86400);

    // The random part must not be all zeros.
    assert_ne!(random_hex, "000000000000000000000000");
}
1063}
1064
// Stateless propagator that does nothing: a zero-sized unit struct with a
// plain constructor.
#[derive(Debug)]
struct NoopPropagator;

impl NoopPropagator {
    // Creates the propagator; there is no state to initialise.
    fn new() -> Self {
        Self
    }
}
1074
1075impl TextMapPropagator for NoopPropagator {
1076 fn inject_context(
1077 &self,
1078 _cx: &opentelemetry::Context,
1079 _injector: &mut dyn opentelemetry::propagation::Injector,
1080 ) {
1081 }
1082
1083 fn extract_with_context(
1084 &self,
1085 cx: &opentelemetry::Context,
1086 _extractor: &dyn opentelemetry::propagation::Extractor,
1087 ) -> opentelemetry::Context {
1088 cx.clone()
1089 }
1090
1091 fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
1092 opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
1093 }
1094}