lambda_otel_lite/
telemetry.rs

1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//!     let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//!     let resource = Resource::builder()
47//!         .with_attributes(vec![
48//!             KeyValue::new("service.version", "1.0.0"),
49//!             KeyValue::new("deployment.environment", "production"),
50//!         ])
51//!         .build();
52//!
53//!     let config = TelemetryConfig::builder()
54//!         .resource(resource)
55//!         .build();
56//!
57//!     let (_, completion_handler) = init_telemetry(config).await?;
58//!     Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//!     let config = TelemetryConfig::builder()
72//!         .with_span_processor(SimpleSpanProcessor::new(
73//!             Box::new(OtlpStdoutSpanExporter::default())
74//!         ))
75//!         .enable_fmt_layer(true)
76//!         .build();
77//!
78//!     let (_, completion_handler) = init_telemetry(config).await?;
79//!     Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
92//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
93
94use crate::{
95    constants, extension::register_extension, mode::ProcessorMode, processor::LambdaSpanProcessor,
96    propagation::LambdaXrayPropagator, resource::get_lambda_resource,
97};
98use bon::Builder;
99use lambda_runtime::Error;
100use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
101use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
102use opentelemetry_aws::trace::XrayPropagator;
103use opentelemetry_sdk::{
104    propagation::TraceContextPropagator,
105    trace::{IdGenerator, SdkTracerProvider, SpanProcessor, TracerProviderBuilder},
106    Resource,
107};
108use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
109use std::{borrow::Cow, env, sync::Arc};
110use tokio::sync::mpsc::UnboundedSender;
111use tracing_subscriber::layer::SubscriberExt;
112
/// Manages the lifecycle of span export based on the processing mode.
///
/// This handler must be used to signal when spans should be exported. Its behavior
/// varies by processing mode:
/// - Sync: Forces immediate export
/// - Async: Signals the extension to export
/// - Finalize: Defers to span processor
///
/// # Thread Safety
///
/// This type is `Clone` and can be safely shared between threads.
#[derive(Clone)]
pub struct TelemetryCompletionHandler {
    /// Shared tracer provider; `complete()` calls `force_flush()` on it in sync mode.
    provider: Arc<SdkTracerProvider>,
    /// Channel on which `complete()` sends a unit message in async mode.
    /// `init_telemetry` leaves this as `None` when no extension is registered (sync mode).
    sender: Option<UnboundedSender<()>>,
    /// Processing mode that selects which branch `complete()` takes.
    mode: ProcessorMode,
    /// Tracer created once in `new()` with this crate's instrumentation scope and
    /// handed out by `get_tracer()`.
    tracer: opentelemetry_sdk::trace::Tracer,
}
131
132impl TelemetryCompletionHandler {
133    pub fn new(
134        provider: Arc<SdkTracerProvider>,
135        sender: Option<UnboundedSender<()>>,
136        mode: ProcessorMode,
137    ) -> Self {
138        // Create instrumentation scope with attributes
139        let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
140            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
141            .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
142            .with_attributes(vec![
143                KeyValue::new("library.language", "rust"),
144                KeyValue::new("library.type", "instrumentation"),
145                KeyValue::new("library.runtime", "aws_lambda"),
146            ])
147            .build();
148
149        // Create tracer with instrumentation scope
150        let tracer = provider.tracer_with_scope(scope);
151
152        Self {
153            provider,
154            sender,
155            mode,
156            tracer,
157        }
158    }
159
160    /// Get the tracer instance for creating spans.
161    ///
162    /// Returns the cached tracer instance configured with this package's instrumentation scope.
163    /// The tracer is configured with the provider's settings and will automatically use
164    /// the correct span processor based on the processing mode.
165    pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
166        &self.tracer
167    }
168
169    /// Complete telemetry processing for the current invocation
170    ///
171    /// In Sync mode, this will force flush the provider and log any errors that occur.
172    /// In Async mode, this will send a completion signal to the extension.
173    /// In Finalize mode, this will do nothing (handled by drop).
174    pub fn complete(&self) {
175        match self.mode {
176            ProcessorMode::Sync => {
177                if let Err(e) = self.provider.force_flush() {
178                    tracing::warn!(error = ?e, "Error flushing telemetry");
179                }
180            }
181            ProcessorMode::Async => {
182                if let Some(sender) = &self.sender {
183                    if let Err(e) = sender.send(()) {
184                        tracing::warn!(error = ?e, "Failed to send completion signal to extension");
185                    }
186                }
187            }
188            ProcessorMode::Finalize => {
189                // Do nothing, handled by drop
190            }
191        }
192    }
193}
194
/// Configuration for OpenTelemetry initialization.
///
/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
/// for standard Lambda configuration, or the builder pattern for customization.
///
/// # Fields
///
/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
/// * `set_global_provider` - Set as global tracer provider (default: true)
/// * `resource` - Custom resource (default: auto-detected from Lambda)
/// * `env_var_name` - Environment variable name for log level configuration
///
/// Span processors, propagators and the ID generator are not public fields; they are
/// supplied through the builder methods `with_span_processor`, `with_propagator` /
/// `with_named_propagator` and `with_id_generator`.
///
/// # Examples
///
/// Basic usage with default configuration:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::default();
/// ```
///
/// Custom configuration with resource attributes:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
/// use opentelemetry::KeyValue;
/// use opentelemetry_sdk::Resource;
///
/// let config = TelemetryConfig::builder()
///     .resource(Resource::builder()
///         .with_attributes(vec![KeyValue::new("version", "1.0.0")])
///         .build())
///     .build();
/// ```
///
/// Custom configuration with logging options:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::builder()
///     .enable_fmt_layer(true)  // Enable console output for debugging
///     .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string())  // Custom env var for log level
///     .build();
/// ```
#[derive(Builder, Debug)]
pub struct TelemetryConfig {
    // Custom fields for internal state
    // Tracer provider builder that `with_span_processor` and `with_id_generator`
    // keep extending; `init_telemetry` finishes and builds it.
    #[builder(field)]
    provider_builder: TracerProviderBuilder,

    // Set to true by `with_span_processor`; when still false, `init_telemetry`
    // installs its default span processor.
    #[builder(field)]
    has_processor: bool,

    // Propagators collected by `with_propagator` / `with_named_propagator`;
    // `init_telemetry` combines them into one composite propagator.
    #[builder(field)]
    propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,

    /// Enable console output for debugging.
    ///
    /// When enabled, spans and events will be printed to the console in addition
    /// to being exported through the configured span processors. This is useful
    /// for debugging but adds overhead and should be disabled in production.
    ///
    /// This can also be controlled via the `LAMBDA_TRACING_ENABLE_FMT_LAYER` environment variable,
    /// which takes precedence over this setting when present:
    /// - Setting the env var to "true" will enable console output even if this field is false
    /// - Setting the env var to "false" will disable console output even if this field is true
    /// - Invalid values will log a warning and fall back to this code setting
    ///
    /// The comparison is case-insensitive. This environment variable override allows
    /// toggling logging for debugging without code changes.
    ///
    /// Default: `false`
    #[builder(default = false)]
    pub enable_fmt_layer: bool,

    /// Set this provider as the global OpenTelemetry provider.
    ///
    /// When enabled, the provider will be registered as the global provider
    /// for the OpenTelemetry API. This allows using the global tracer API
    /// without explicitly passing around the provider.
    ///
    /// Default: `true`
    #[builder(default = true)]
    pub set_global_provider: bool,

    /// Custom resource for all spans.
    ///
    /// If not provided, the resource is automatically detected from the Lambda
    /// environment. When provided, `init_telemetry` uses this resource as given
    /// instead of running the Lambda detection; the two are not merged, so include
    /// every attribute you need.
    ///
    /// Default: `None` (auto-detected from Lambda environment)
    pub resource: Option<Resource>,

    /// Environment variable name to use for log level configuration.
    ///
    /// This field specifies which environment variable should be used to configure
    /// the tracing subscriber's log level filter. If not specified, `RUST_LOG` is
    /// used when it is set, and `AWS_LAMBDA_LOG_LEVEL` otherwise.
    ///
    /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
    pub env_var_name: Option<String>,
}
300
301impl Default for TelemetryConfig {
302    fn default() -> Self {
303        Self::builder().build()
304    }
305}
306
307/// Builder methods for adding span processors and other configuration
308impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
309    /// Add a span processor to the tracer provider.
310    ///
311    /// This method allows adding custom span processors for trace data processing.
312    /// Multiple processors can be added by calling this method multiple times.
313    ///
314    /// # Arguments
315    ///
316    /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
317    ///
318    /// # Examples
319    ///
320    /// ```no_run
321    /// use lambda_otel_lite::TelemetryConfig;
322    /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
323    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
324    ///
325    /// // Only use builder when adding custom processors
326    /// let config = TelemetryConfig::builder()
327    ///     .with_span_processor(SimpleSpanProcessor::new(
328    ///         Box::new(OtlpStdoutSpanExporter::default())
329    ///     ))
330    ///     .build();
331    /// ```
332    pub fn with_span_processor<T>(mut self, processor: T) -> Self
333    where
334        T: SpanProcessor + 'static,
335    {
336        self.provider_builder = self.provider_builder.with_span_processor(processor);
337        self.has_processor = true;
338        self
339    }
340
341    /// Add a propagator to the list of propagators.
342    ///
343    /// Multiple propagators can be added and will be combined into a composite propagator.
344    /// The default propagator is TraceContextPropagator.
345    ///
346    /// # Arguments
347    ///
348    /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
349    ///
350    /// # Examples
351    ///
352    /// ```no_run
353    /// use lambda_otel_lite::TelemetryConfig;
354    /// use opentelemetry_sdk::propagation::BaggagePropagator;
355    ///
356    /// let config = TelemetryConfig::builder()
357    ///     .with_propagator(BaggagePropagator::new())
358    ///     .build();
359    /// ```
360    pub fn with_propagator<T>(mut self, propagator: T) -> Self
361    where
362        T: TextMapPropagator + Send + Sync + 'static,
363    {
364        self.propagators.push(Box::new(propagator));
365        self
366    }
367
368    pub fn with_named_propagator(self, name: &str) -> Self {
369        match name {
370            "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
371            "xray" => self.with_propagator(XrayPropagator::new()),
372            "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
373            "none" => self.with_propagator(NoopPropagator::new()),
374            _ => {
375                tracing::warn!("Unknown propagator: {}, using default propagators", name);
376                self
377            }
378        }
379    }
380
381    /// Add a custom ID generator to the tracer provider.
382    ///
383    /// This method allows setting a custom ID generator for trace and span IDs.
384    /// This is particularly useful when integrating with AWS X-Ray, which requires
385    /// a specific ID format.
386    ///
387    /// # Arguments
388    ///
389    /// * `id_generator` - An ID generator implementing the [`IdGenerator`] trait
390    ///
391    /// # Examples
392    ///
393    /// ```no_run
394    /// use lambda_otel_lite::TelemetryConfig;
395    /// use opentelemetry_aws::trace::XrayIdGenerator;
396    ///
397    /// // Configure with X-Ray compatible ID generator
398    /// let config = TelemetryConfig::builder()
399    ///     .with_id_generator(XrayIdGenerator::default())
400    ///     .build();
401    /// ```
402    pub fn with_id_generator<T>(mut self, id_generator: T) -> Self
403    where
404        T: IdGenerator + 'static,
405    {
406        self.provider_builder = self.provider_builder.with_id_generator(id_generator);
407        self
408    }
409}
410
411/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
412///
413/// # Arguments
414///
415/// * `config` - Configuration for telemetry initialization
416///
417/// # Returns
418///
419/// Returns a tuple containing:
420/// - A tracer instance for manual instrumentation
421/// - A completion handler for managing span export timing
422///
423/// # Errors
424///
425/// Returns error if:
426/// - Extension registration fails (async/finalize modes)
427/// - Tracer provider initialization fails
428/// - Environment variable parsing fails
429///
430/// # Examples
431///
432/// Basic usage with default configuration:
433///
434/// ```no_run
435/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
436///
437/// # async fn example() -> Result<(), lambda_runtime::Error> {
438/// // Initialize with default configuration
439/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
440/// # Ok(())
441/// # }
442/// ```
443///
444/// Custom configuration:
445///
446/// ```no_run
447/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
448/// use opentelemetry::KeyValue;
449/// use opentelemetry_sdk::Resource;
450///
451/// # async fn example() -> Result<(), lambda_runtime::Error> {
452/// // Create custom resource
453/// let resource = Resource::builder()
454///     .with_attributes(vec![
455///         KeyValue::new("service.name", "payment-api"),
456///         KeyValue::new("service.version", "1.2.3"),
457///     ])
458///     .build();
459///
460/// // Initialize with custom configuration
461/// let (_, telemetry) = init_telemetry(
462///     TelemetryConfig::builder()
463///         .resource(resource)
464///         .build()
465/// ).await?;
466/// # Ok(())
467/// # }
468/// ```
469///
470/// Advanced usage with BatchSpanProcessor (required for async exporters):
471///
472/// ```no_run
473/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
474/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
475/// use opentelemetry_sdk::trace::BatchSpanProcessor;
476/// use lambda_runtime::Error;
477///
478/// # async fn example() -> Result<(), Error> {
479/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
480///     .with_http()
481///     .with_http_client(reqwest::Client::new())
482///     .with_protocol(Protocol::HttpBinary)
483///     .build()?;
484///
485/// let (provider, completion) = init_telemetry(
486///     TelemetryConfig::builder()
487///         .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
488///         .build()
489/// ).await?;
490/// # Ok(())
491/// # }
492/// ```
493///
494/// Using LambdaSpanProcessor with blocking http client:
495///
496/// ```no_run
497/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
498/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
499/// use lambda_runtime::Error;
500///
501/// # async fn example() -> Result<(), Error> {
502/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
503///     .with_http()
504///     .with_http_client(reqwest::blocking::Client::new())
505///     .with_protocol(Protocol::HttpBinary)
506///     .build()?;
507///
508/// let (provider, completion) = init_telemetry(
509///     TelemetryConfig::builder()
510///         .with_span_processor(
511///             LambdaSpanProcessor::builder()
512///                 .exporter(lambda_exporter)
513///                 .max_batch_size(512)
514///                 .max_queue_size(2048)
515///                 .build()
516///         )
517///         .build()
518/// ).await?;
519/// # Ok(())
520/// # }
521/// ```
522///
523pub async fn init_telemetry(
524    mut config: TelemetryConfig,
525) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
526    let mode = ProcessorMode::from_env();
527
528    if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
529        let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
530
531        for propagator in propagators {
532            match propagator {
533                "tracecontext" => config
534                    .propagators
535                    .push(Box::new(TraceContextPropagator::new())),
536                "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
537                "xray-lambda" => config
538                    .propagators
539                    .push(Box::new(LambdaXrayPropagator::new())),
540                "none" => config.propagators.push(Box::new(NoopPropagator::new())),
541                _ => tracing::warn!(
542                    "Unknown propagator: {}, using default propagators",
543                    propagator
544                ),
545            }
546        }
547    } else {
548        // if no propagators are set, use the default propagators
549        if config.propagators.is_empty() {
550            config
551                .propagators
552                .push(Box::new(TraceContextPropagator::new()));
553            config
554                .propagators
555                .push(Box::new(LambdaXrayPropagator::new()));
556        }
557    }
558
559    let composite_propagator = TextMapCompositePropagator::new(config.propagators);
560    global::set_text_map_propagator(composite_propagator);
561
562    // Add default span processor if none was added
563    if !config.has_processor {
564        let processor = LambdaSpanProcessor::builder()
565            .exporter(OtlpStdoutSpanExporter::default())
566            .build();
567        config.provider_builder = config.provider_builder.with_span_processor(processor);
568    }
569
570    // Apply defaults and build the provider
571    let resource = config.resource.unwrap_or_else(get_lambda_resource);
572
573    let provider = Arc::new(config.provider_builder.with_resource(resource).build());
574
575    // Register the extension if in async or finalize mode
576    let sender = match mode {
577        ProcessorMode::Async | ProcessorMode::Finalize => {
578            Some(register_extension(provider.clone(), mode.clone()).await?)
579        }
580        _ => None,
581    };
582
583    if config.set_global_provider {
584        // Set the provider as global
585        set_tracer_provider(provider.as_ref().clone());
586    }
587
588    // Initialize tracing subscriber with smart env var selection
589    let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
590        if env::var("RUST_LOG").is_ok() {
591            "RUST_LOG"
592        } else {
593            "AWS_LAMBDA_LOG_LEVEL"
594        }
595    });
596
597    let env_filter = tracing_subscriber::EnvFilter::builder()
598        .with_env_var(env_var_name)
599        .from_env_lossy();
600
601    let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
602    let tracer = completion_handler.get_tracer().clone();
603
604    let subscriber = tracing_subscriber::registry::Registry::default()
605        .with(tracing_opentelemetry::OpenTelemetryLayer::new(
606            tracer.clone(),
607        ))
608        .with(env_filter);
609
610    // Determine if fmt layer should be enabled - environment variable takes precedence when set
611    let enable_fmt = if let Ok(env_value) = env::var(constants::env_vars::ENABLE_FMT_LAYER) {
612        match env_value.to_lowercase().as_str() {
613            "true" => true,
614            "false" => false,
615            other => {
616                tracing::warn!(
617                    "Invalid value '{}' for {}, expected 'true' or 'false'. Using code configuration.",
618                    other,
619                    constants::env_vars::ENABLE_FMT_LAYER
620                );
621                config.enable_fmt_layer
622            }
623        }
624    } else {
625        // If env var not set, use the configured value
626        config.enable_fmt_layer
627    };
628
629    // Enable fmt layer based on the determined value
630    if enable_fmt {
631        // Determine if the lambda logging configuration is set to output json logs
632        let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
633            .unwrap_or_default()
634            .to_uppercase()
635            == "JSON";
636
637        if is_json {
638            tracing::subscriber::set_global_default(
639                subscriber.with(
640                    tracing_subscriber::fmt::layer()
641                        .with_target(false)
642                        .without_time()
643                        .json(),
644                ),
645            )?;
646        } else {
647            tracing::subscriber::set_global_default(
648                subscriber.with(
649                    tracing_subscriber::fmt::layer()
650                        .with_target(false)
651                        .without_time()
652                        .with_ansi(false),
653                ),
654            )?;
655        }
656    } else {
657        tracing::subscriber::set_global_default(subscriber)?;
658    }
659
660    Ok((tracer, completion_handler))
661}
662
663#[cfg(test)]
664mod tests {
665    use super::*;
666    use opentelemetry::trace::{Span, Tracer};
667    use opentelemetry_aws::trace::XrayIdGenerator;
668    use opentelemetry_sdk::trace::SimpleSpanProcessor;
669    use sealed_test::prelude::*;
670    use std::sync::Arc;
671    use tokio::sync::mpsc;
672
673    // Helper to clean up environment variables between tests
674    fn cleanup_env() {
675        env::remove_var(constants::env_vars::ENABLE_FMT_LAYER);
676        env::remove_var(constants::env_vars::PROPAGATORS);
677        env::remove_var("_X_AMZN_TRACE_ID");
678    }
679
680    #[test]
681    #[sealed_test]
682    fn test_telemetry_config_defaults() {
683        cleanup_env();
684
685        let config = TelemetryConfig::builder().build();
686        assert!(config.set_global_provider); // Should be true by default
687        assert!(!config.has_processor);
688        assert!(!config.enable_fmt_layer);
689        assert!(config.propagators.is_empty()); // No propagators by default in builder
690    }
691
692    #[test]
693    #[sealed_test]
694    fn test_telemetry_config_with_propagators() {
695        cleanup_env();
696
697        // Test with explicit tracecontext propagator
698        let config = TelemetryConfig::builder()
699            .with_span_processor(SimpleSpanProcessor::new(Box::new(
700                OtlpStdoutSpanExporter::default(),
701            )))
702            .with_named_propagator("tracecontext")
703            .build();
704        assert_eq!(config.propagators.len(), 1);
705
706        // Test with explicit xray propagator
707        let config = TelemetryConfig::builder()
708            .with_named_propagator("xray")
709            .build();
710        assert_eq!(config.propagators.len(), 1);
711
712        // Test with both propagators
713        let config = TelemetryConfig::builder()
714            .with_named_propagator("tracecontext")
715            .with_named_propagator("xray")
716            .build();
717        assert_eq!(config.propagators.len(), 2);
718
719        // Test with default propagators (empty - will be set in init_telemetry)
720        let config = TelemetryConfig::builder().build();
721        assert_eq!(config.propagators.len(), 0);
722
723        // Test with none
724        let config = TelemetryConfig::builder()
725            .with_named_propagator("none")
726            .build();
727        assert_eq!(config.propagators.len(), 1);
728    }
729
730    #[tokio::test]
731    #[sealed_test]
732    async fn test_telemetry_config_env_propagators_tracecontext() {
733        cleanup_env();
734
735        // Test with OTEL_PROPAGATORS=tracecontext
736        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
737        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
738        // In real usage we'd check the behavior rather than implementation details
739        // So we'll just check that we can create and use a handler
740        assert!(handler.sender.is_none());
741
742        cleanup_env();
743    }
744
745    #[tokio::test]
746    #[sealed_test]
747    async fn test_telemetry_config_env_propagators_xray() {
748        cleanup_env();
749
750        // Test with OTEL_PROPAGATORS=xray
751        env::set_var(constants::env_vars::PROPAGATORS, "xray");
752        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
753        assert!(handler.sender.is_none());
754
755        cleanup_env();
756    }
757
758    #[tokio::test]
759    #[sealed_test]
760    async fn test_telemetry_config_env_propagators_combined() {
761        cleanup_env();
762
763        // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
764        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
765        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
766        assert!(handler.sender.is_none());
767
768        cleanup_env();
769    }
770
771    #[tokio::test]
772    #[sealed_test]
773    async fn test_telemetry_config_env_propagators_none() {
774        cleanup_env();
775
776        // Test with OTEL_PROPAGATORS=none
777        env::set_var(constants::env_vars::PROPAGATORS, "none");
778        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
779        assert!(handler.sender.is_none());
780
781        cleanup_env();
782    }
783
784    #[tokio::test]
785    #[sealed_test]
786    async fn test_init_telemetry_defaults() {
787        let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
788        assert!(completion_handler.sender.is_none()); // Default mode is Sync
789    }
790
791    #[tokio::test]
792    #[sealed_test]
793    async fn test_init_telemetry_custom() {
794        let resource = Resource::builder().build();
795        let config = TelemetryConfig::builder()
796            .resource(resource)
797            .with_named_propagator("tracecontext")
798            .enable_fmt_layer(true)
799            .set_global_provider(false)
800            .build();
801
802        let (_, completion_handler) = init_telemetry(config).await.unwrap();
803        assert!(completion_handler.sender.is_none());
804    }
805
806    #[tokio::test]
807    #[sealed_test]
808    async fn test_telemetry_config_env_fmt_layer_true_override() {
809        cleanup_env();
810
811        // Test: Env var "true" overrides code setting "false"
812        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "true");
813        let config = TelemetryConfig::default(); // code setting is false by default
814        assert!(!config.enable_fmt_layer); // Config should not be affected by env var
815
816        // Initialize telemetry - env var should override config
817        let result = init_telemetry(config).await;
818        assert!(result.is_ok());
819
820        // Clean up
821        cleanup_env();
822    }
823
824    #[tokio::test]
825    #[sealed_test]
826    async fn test_telemetry_config_env_fmt_layer_false_override() {
827        cleanup_env();
828
829        // Test: Env var "false" overrides code setting "true"
830        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "false");
831        let config = TelemetryConfig::builder()
832            .enable_fmt_layer(true) // code setting is true
833            .build();
834        assert!(config.enable_fmt_layer);
835
836        // Initialize telemetry - env var should override config
837        let result = init_telemetry(config).await;
838        assert!(result.is_ok());
839
840        // Clean up
841        cleanup_env();
842    }
843
844    #[tokio::test]
845    #[sealed_test]
846    async fn test_telemetry_config_env_fmt_layer_invalid() {
847        cleanup_env();
848
849        // Test: Invalid env var falls back to code setting
850        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "invalid");
851        let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
852
853        // Initialize telemetry - should log a warning but use code setting
854        let result = init_telemetry(config).await;
855        assert!(result.is_ok());
856
857        // Clean up
858        cleanup_env();
859    }
860
861    #[tokio::test]
862    #[sealed_test]
863    async fn test_telemetry_config_env_fmt_layer_not_set() {
864        cleanup_env();
865
866        // Test: No env var uses code setting
867        let config = TelemetryConfig::default();
868        assert!(!config.enable_fmt_layer);
869
870        let result = init_telemetry(config).await;
871        assert!(result.is_ok());
872
873        // Clean up
874        cleanup_env();
875    }
876
877    #[test]
878    fn test_completion_handler_sync_mode() {
879        let provider = Arc::new(
880            SdkTracerProvider::builder()
881                .with_span_processor(SimpleSpanProcessor::new(Box::new(
882                    OtlpStdoutSpanExporter::default(),
883                )))
884                .build(),
885        );
886
887        let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
888
889        // In sync mode, complete() should call force_flush
890        handler.complete();
891        // Note: We can't easily verify the flush was called since TracerProvider
892        // doesn't expose this information, but we can verify it doesn't panic
893    }
894
895    #[tokio::test]
896    async fn test_completion_handler_async_mode() {
897        let provider = Arc::new(
898            SdkTracerProvider::builder()
899                .with_span_processor(SimpleSpanProcessor::new(Box::new(
900                    OtlpStdoutSpanExporter::default(),
901                )))
902                .build(),
903        );
904
905        let (tx, mut rx) = mpsc::unbounded_channel();
906
907        let completion_handler =
908            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
909
910        // In async mode, complete() should send a message through the channel
911        completion_handler.complete();
912
913        // Verify that we received the completion signal
914        assert!(rx.try_recv().is_ok());
915        // Verify channel is now empty
916        assert!(rx.try_recv().is_err());
917    }
918
919    #[test]
920    fn test_completion_handler_finalize_mode() {
921        let provider = Arc::new(
922            SdkTracerProvider::builder()
923                .with_span_processor(SimpleSpanProcessor::new(Box::new(
924                    OtlpStdoutSpanExporter::default(),
925                )))
926                .build(),
927        );
928
929        let (tx, _rx) = mpsc::unbounded_channel();
930
931        let completion_handler =
932            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);
933
934        // In finalize mode, complete() should do nothing
935        completion_handler.complete();
936        // Verify it doesn't panic or cause issues
937    }
938
939    #[test]
940    fn test_completion_handler_clone() {
941        let provider = Arc::new(
942            SdkTracerProvider::builder()
943                .with_span_processor(SimpleSpanProcessor::new(Box::new(
944                    OtlpStdoutSpanExporter::default(),
945                )))
946                .build(),
947        );
948
949        let (tx, _rx) = mpsc::unbounded_channel();
950
951        let completion_handler =
952            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
953
954        // Test that Clone is implemented correctly
955        let cloned = completion_handler.clone();
956
957        // Verify both handlers have the same mode
958        assert!(matches!(cloned.mode, ProcessorMode::Async));
959        assert!(cloned.sender.is_some());
960    }
961
962    #[test]
963    fn test_completion_handler_sync_mode_error_handling() {
964        let provider = Arc::new(
965            SdkTracerProvider::builder()
966                .with_span_processor(SimpleSpanProcessor::new(Box::new(
967                    OtlpStdoutSpanExporter::default(),
968                )))
969                .build(),
970        );
971
972        let completion_handler =
973            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
974
975        // Test that complete() doesn't panic
976        completion_handler.complete();
977    }
978
979    #[tokio::test]
980    async fn test_completion_handler_async_mode_error_handling() {
981        let provider = Arc::new(
982            SdkTracerProvider::builder()
983                .with_span_processor(SimpleSpanProcessor::new(Box::new(
984                    OtlpStdoutSpanExporter::default(),
985                )))
986                .build(),
987        );
988
989        // Use UnboundedSender instead of Sender
990        let (tx, _rx) = mpsc::unbounded_channel();
991        // Fill the channel by dropping the receiver
992        drop(_rx);
993
994        let completion_handler =
995            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
996
997        // Test that complete() doesn't panic when receiver is dropped
998        completion_handler.complete();
999    }
1000
1001    #[test]
1002    #[sealed_test]
1003    fn test_telemetry_config_with_id_generator() {
1004        cleanup_env();
1005
1006        // Create a config with X-Ray ID generator
1007        let config = TelemetryConfig::builder()
1008            .with_id_generator(XrayIdGenerator::default())
1009            .build();
1010
1011        // We can't directly check the ID generator type since it's boxed inside the provider,
1012        // but we can verify it's applied by checking the generated trace IDs format
1013        let provider = Arc::new(config.provider_builder.build());
1014
1015        // Create a scope with attributes
1016        let scope = opentelemetry::InstrumentationScope::builder("test")
1017            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
1018            .build();
1019
1020        // Get a tracer using the correct API
1021        let tracer = provider.tracer_with_scope(scope);
1022
1023        // Start a span using the tracer
1024        let span = tracer.start_with_context("test span", &opentelemetry::Context::current());
1025        let trace_id = span.span_context().trace_id();
1026
1027        // Verify X-Ray trace ID format:
1028        // 1. Convert to hex string for easier checking
1029        let trace_id_hex = format!("{:032x}", trace_id);
1030
1031        // 2. The first 8 characters of X-Ray trace IDs represent a timestamp in seconds
1032        // This is the key characteristic of X-Ray trace IDs that we can verify
1033        let timestamp_part = &trace_id_hex[0..8];
1034
1035        // 3. Parse the hex timestamp to ensure it's a valid timestamp (recent past)
1036        let timestamp = u32::from_str_radix(timestamp_part, 16).unwrap();
1037
1038        // 4. Check that timestamp is reasonable (within the last day)
1039        let now = std::time::SystemTime::now()
1040            .duration_since(std::time::UNIX_EPOCH)
1041            .unwrap()
1042            .as_secs() as u32;
1043
1044        // The timestamp should be within the last day
1045        assert!(timestamp <= now);
1046        assert!(timestamp > now - 86400); // Within the last day
1047
1048        // Verify remaining 24 characters are not all zeros (random part)
1049        let random_part = &trace_id_hex[8..];
1050        assert_ne!(random_part, "000000000000000000000000");
1051    }
1052}
1053
/// A simple no-op propagator.
///
/// Zero-sized marker type, so it is cheap to copy and can be built through
/// `Default` as well as through `NoopPropagator::new`.
#[derive(Debug, Clone, Copy, Default)]
struct NoopPropagator;
1057
1058impl NoopPropagator {
1059    fn new() -> Self {
1060        NoopPropagator
1061    }
1062}
1063
1064impl TextMapPropagator for NoopPropagator {
1065    fn inject_context(
1066        &self,
1067        _cx: &opentelemetry::Context,
1068        _injector: &mut dyn opentelemetry::propagation::Injector,
1069    ) {
1070    }
1071
1072    fn extract_with_context(
1073        &self,
1074        cx: &opentelemetry::Context,
1075        _extractor: &dyn opentelemetry::propagation::Extractor,
1076    ) -> opentelemetry::Context {
1077        cx.clone()
1078    }
1079
1080    fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
1081        opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
1082    }
1083}