Skip to main content

lambda_otel_lite/
telemetry.rs

1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//!     let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//!     let resource = Resource::builder()
47//!         .with_attributes(vec![
48//!             KeyValue::new("service.version", "1.0.0"),
49//!             KeyValue::new("deployment.environment", "production"),
50//!         ])
51//!         .build();
52//!
53//!     let config = TelemetryConfig::builder()
54//!         .resource(resource)
55//!         .build();
56//!
57//!     let (_, completion_handler) = init_telemetry(config).await?;
58//!     Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//!     let config = TelemetryConfig::builder()
72//!         .with_span_processor(SimpleSpanProcessor::new(
73//!             OtlpStdoutSpanExporter::default()
74//!         ))
75//!         .enable_fmt_layer(true)
76//!         .build();
77//!
78//!     let (_, completion_handler) = init_telemetry(config).await?;
79//!     Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
92//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
93
94use crate::{
95    constants, extension::register_extension, logger::Logger, mode::ProcessorMode,
96    processor::LambdaSpanProcessor, propagation::LambdaXrayPropagator,
97    resource::get_lambda_resource,
98};
99use bon::Builder;
100use lambda_runtime::Error;
101use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
102use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
103use opentelemetry_aws::trace::XrayPropagator;
104use opentelemetry_sdk::{
105    propagation::TraceContextPropagator,
106    trace::{IdGenerator, SdkTracerProvider, ShouldSample, SpanProcessor, TracerProviderBuilder},
107    Resource,
108};
109use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
110use std::{borrow::Cow, env, sync::Arc};
111use tokio::sync::mpsc::UnboundedSender;
112use tracing_subscriber::layer::SubscriberExt;
113
// Module-level logger used for every warning emitted from this file
// (flush failures, unknown propagator names, invalid env values).
// Constructed with the "telemetry" label via the const constructor so it can live in a `static`.
static LOGGER: Logger = Logger::const_new("telemetry");
116
/// Manages the lifecycle of span export based on the processing mode.
///
/// This handler must be used to signal when spans should be exported. Its behavior
/// varies by processing mode:
/// - Sync: Forces immediate export
/// - Async: Signals the extension to export
/// - Finalize: Defers to span processor
///
/// # Thread Safety
///
/// This type is `Clone` and can be safely shared between threads.
#[derive(Clone)]
pub struct TelemetryCompletionHandler {
    // Shared tracer provider; `complete` calls `force_flush` on it in Sync mode.
    provider: Arc<SdkTracerProvider>,
    // Channel used by `complete` in Async mode to send a unit completion signal.
    // `init_telemetry` only populates it when the extension was registered
    // (Async or Finalize mode); it is `None` otherwise.
    sender: Option<UnboundedSender<()>>,
    // Processing mode that selects what `complete` does.
    mode: ProcessorMode,
    // Tracer created once in `new` from `provider`, scoped to this crate's
    // name and version; handed out by `get_tracer`.
    tracer: opentelemetry_sdk::trace::Tracer,
}
135
136impl TelemetryCompletionHandler {
137    pub fn new(
138        provider: Arc<SdkTracerProvider>,
139        sender: Option<UnboundedSender<()>>,
140        mode: ProcessorMode,
141    ) -> Self {
142        // Create instrumentation scope with attributes
143        let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
144            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
145            .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
146            .with_attributes(vec![
147                KeyValue::new("library.language", "rust"),
148                KeyValue::new("library.type", "instrumentation"),
149                KeyValue::new("library.runtime", "aws_lambda"),
150            ])
151            .build();
152
153        // Create tracer with instrumentation scope
154        let tracer = provider.tracer_with_scope(scope);
155
156        Self {
157            provider,
158            sender,
159            mode,
160            tracer,
161        }
162    }
163
164    /// Get the tracer instance for creating spans.
165    ///
166    /// Returns the cached tracer instance configured with this package's instrumentation scope.
167    /// The tracer is configured with the provider's settings and will automatically use
168    /// the correct span processor based on the processing mode.
169    pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
170        &self.tracer
171    }
172
173    /// Complete telemetry processing for the current invocation
174    ///
175    /// In Sync mode, this will force flush the provider and log any errors that occur.
176    /// In Async mode, this will send a completion signal to the extension.
177    /// In Finalize mode, this will do nothing (handled by drop).
178    pub fn complete(&self) {
179        match self.mode {
180            ProcessorMode::Sync => {
181                if let Err(e) = self.provider.force_flush() {
182                    LOGGER.warn(format!("Error flushing telemetry: {e:?}"));
183                }
184            }
185            ProcessorMode::Async => {
186                if let Some(sender) = &self.sender {
187                    if let Err(e) = sender.send(()) {
188                        LOGGER.warn(format!(
189                            "Failed to send completion signal to extension: {e:?}"
190                        ));
191                    }
192                }
193            }
194            ProcessorMode::Finalize => {
195                // Do nothing, handled by drop
196            }
197        }
198    }
199}
200
/// Configuration for OpenTelemetry initialization.
///
/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
/// for standard Lambda configuration, or the builder pattern for customization.
///
/// # Fields
///
/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
/// * `set_global_provider` - Set as global tracer provider (default: true)
/// * `resource` - Custom resource (default: auto-detected from Lambda)
/// * `env_var_name` - Environment variable name for log level configuration
/// * `processor_mode` - Span processing mode (sync/async/finalize)
///
/// Span processors, propagators, the ID generator and the sampler are not fields;
/// they are set through the builder methods `with_span_processor`, `with_propagator` /
/// `with_named_propagator`, `with_id_generator` and `with_sampler`.
///
/// # Examples
///
/// Basic usage with default configuration:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::default();
/// ```
///
/// Custom configuration with resource attributes:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
/// use opentelemetry::KeyValue;
/// use opentelemetry_sdk::Resource;
///
/// let config = TelemetryConfig::builder()
///     .resource(Resource::builder()
///         .with_attributes(vec![KeyValue::new("version", "1.0.0")])
///         .build())
///     .build();
/// ```
///
/// Custom configuration with logging options:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::builder()
///     .enable_fmt_layer(true)  // Enable console output for debugging
///     .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string())  // Custom env var for log level
///     .build();
/// ```
#[derive(Builder, Debug)]
pub struct TelemetryConfig {
    // Internal builder state. These fields are not set directly by callers;
    // the custom builder methods (`with_span_processor`, `with_propagator`,
    // `with_id_generator`, `with_sampler`) mutate them.

    // Accumulates processors, ID generator and sampler until `init_telemetry`
    // attaches the resource and builds the provider.
    #[builder(field)]
    provider_builder: TracerProviderBuilder,

    // Set to true by `with_span_processor`; when still false, `init_telemetry`
    // installs its default span processor.
    #[builder(field)]
    has_processor: bool,

    // Propagators added through the builder, in insertion order. Empty by default;
    // `init_telemetry` combines them into one composite propagator.
    #[builder(field)]
    propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,

    /// Enable console output for debugging.
    ///
    /// When enabled, spans and events will be printed to the console in addition
    /// to being exported through the configured span processors. This is useful
    /// for debugging but adds overhead and should be disabled in production.
    ///
    /// This can also be controlled via the `LAMBDA_TRACING_ENABLE_FMT_LAYER` environment variable,
    /// which takes precedence over this setting when present:
    /// - Setting the env var to "true" will enable console output even if this field is false
    /// - Setting the env var to "false" will disable console output even if this field is true
    /// - Invalid values will log a warning and fall back to this code setting
    ///
    /// The environment value is compared case-insensitively.
    ///
    /// This environment variable override allows toggling logging for debugging without code changes.
    ///
    /// Default: `false`
    #[builder(default = false)]
    pub enable_fmt_layer: bool,

    /// Set this provider as the global OpenTelemetry provider.
    ///
    /// When enabled, the provider will be registered as the global provider
    /// for the OpenTelemetry API. This allows using the global tracer API
    /// without explicitly passing around the provider.
    ///
    /// Default: `true`
    #[builder(default = true)]
    pub set_global_provider: bool,

    /// Custom resource for all spans.
    ///
    /// If not provided, the resource is automatically detected from the Lambda
    /// environment. If provided, `init_telemetry` uses it as-is in place of the
    /// detected resource: the two are not merged, so include every attribute
    /// you need in the custom resource.
    ///
    /// Default: `None` (auto-detected from Lambda environment)
    pub resource: Option<Resource>,

    /// Environment variable name to use for log level configuration.
    ///
    /// This field specifies which environment variable should be used to configure
    /// the tracing subscriber's log level filter. If not specified, `RUST_LOG` is
    /// used when it is set, and `AWS_LAMBDA_LOG_LEVEL` otherwise.
    ///
    /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
    pub env_var_name: Option<String>,

    /// Span processing mode (sync/async/finalize)
    ///
    /// Controls how spans are exported from the processor. This can be overridden by the
    /// LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE environment variable, which takes precedence.
    ///
    /// Default: `None` (uses environment variable or defaults to `ProcessorMode::Sync`)
    pub processor_mode: Option<ProcessorMode>,
}
315
316impl Default for TelemetryConfig {
317    fn default() -> Self {
318        Self::builder().build()
319    }
320}
321
322/// Builder methods for adding span processors and other configuration
323impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
324    /// Add a span processor to the tracer provider.
325    ///
326    /// This method allows adding custom span processors for trace data processing.
327    /// Multiple processors can be added by calling this method multiple times.
328    ///
329    /// # Arguments
330    ///
331    /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
332    ///
333    /// # Examples
334    ///
335    /// ```no_run
336    /// use lambda_otel_lite::TelemetryConfig;
337    /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
338    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
339    ///
340    /// // Only use builder when adding custom processors
341    /// let config = TelemetryConfig::builder()
342    ///     .with_span_processor(SimpleSpanProcessor::new(
343    ///         OtlpStdoutSpanExporter::default()
344    ///     ))
345    ///     .build();
346    /// ```
347    pub fn with_span_processor<T>(mut self, processor: T) -> Self
348    where
349        T: SpanProcessor + 'static,
350    {
351        self.provider_builder = self.provider_builder.with_span_processor(processor);
352        self.has_processor = true;
353        self
354    }
355
356    /// Add a propagator to the list of propagators.
357    ///
358    /// Multiple propagators can be added and will be combined into a composite propagator.
359    /// The default propagator is TraceContextPropagator.
360    ///
361    /// # Arguments
362    ///
363    /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
364    ///
365    /// # Examples
366    ///
367    /// ```no_run
368    /// use lambda_otel_lite::TelemetryConfig;
369    /// use opentelemetry_sdk::propagation::BaggagePropagator;
370    ///
371    /// let config = TelemetryConfig::builder()
372    ///     .with_propagator(BaggagePropagator::new())
373    ///     .build();
374    /// ```
375    pub fn with_propagator<T>(mut self, propagator: T) -> Self
376    where
377        T: TextMapPropagator + Send + Sync + 'static,
378    {
379        self.propagators.push(Box::new(propagator));
380        self
381    }
382
383    pub fn with_named_propagator(self, name: &str) -> Self {
384        match name {
385            "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
386            "xray" => self.with_propagator(XrayPropagator::new()),
387            "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
388            "none" => self.with_propagator(NoopPropagator::new()),
389            _ => {
390                LOGGER.warn(format!(
391                    "Unknown propagator: {name}, using default propagators"
392                ));
393                self
394            }
395        }
396    }
397
398    /// Add a custom ID generator to the tracer provider.
399    ///
400    /// This method allows setting a custom ID generator for trace and span IDs.
401    /// This is particularly useful when integrating with AWS X-Ray, which requires
402    /// a specific ID format.
403    ///
404    /// # Arguments
405    ///
406    /// * `id_generator` - An ID generator implementing the [`IdGenerator`] trait
407    ///
408    /// # Examples
409    ///
410    /// ```no_run
411    /// use lambda_otel_lite::TelemetryConfig;
412    /// use opentelemetry_aws::trace::XrayIdGenerator;
413    ///
414    /// // Configure with X-Ray compatible ID generator
415    /// let config = TelemetryConfig::builder()
416    ///     .with_id_generator(XrayIdGenerator::default())
417    ///     .build();
418    /// ```
419    pub fn with_id_generator<T>(mut self, id_generator: T) -> Self
420    where
421        T: IdGenerator + 'static,
422    {
423        self.provider_builder = self.provider_builder.with_id_generator(id_generator);
424        self
425    }
426
427    /// Add a custom sampler to the tracer provider.
428    ///
429    /// This method allows setting a custom sampler for trace sampling decisions.
430    /// Common samplers include AlwaysOn, AlwaysOff, TraceIdRatioBased, and ParentBased.
431    ///
432    /// # Arguments
433    ///
434    /// * `sampler` - A sampler implementing the [`ShouldSample`] trait
435    ///
436    /// # Examples
437    ///
438    /// ```no_run
439    /// use lambda_otel_lite::TelemetryConfig;
440    /// use opentelemetry_sdk::trace::Sampler;
441    ///
442    /// // Sample all traces
443    /// let config = TelemetryConfig::builder()
444    ///     .with_sampler(Sampler::AlwaysOn)
445    ///     .build();
446    ///
447    /// // Sample 10% of traces
448    /// let config = TelemetryConfig::builder()
449    ///     .with_sampler(Sampler::TraceIdRatioBased(0.1))
450    ///     .build();
451    /// ```
452    pub fn with_sampler<T>(mut self, sampler: T) -> Self
453    where
454        T: ShouldSample + 'static,
455    {
456        self.provider_builder = self.provider_builder.with_sampler(sampler);
457        self
458    }
459}
460
461/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
462///
463/// # Arguments
464///
465/// * `config` - Configuration for telemetry initialization
466///
467/// # Returns
468///
469/// Returns a tuple containing:
470/// - A tracer instance for manual instrumentation
471/// - A completion handler for managing span export timing
472///
473/// # Errors
474///
475/// Returns error if:
476/// - Extension registration fails (async/finalize modes)
477/// - Tracer provider initialization fails
478/// - Environment variable parsing fails
479///
480/// # Examples
481///
482/// Basic usage with default configuration:
483///
484/// ```no_run
485/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
486///
487/// # async fn example() -> Result<(), lambda_runtime::Error> {
488/// // Initialize with default configuration
489/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
490/// # Ok(())
491/// # }
492/// ```
493///
494/// Custom configuration:
495///
496/// ```no_run
497/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
498/// use opentelemetry::KeyValue;
499/// use opentelemetry_sdk::Resource;
500///
501/// # async fn example() -> Result<(), lambda_runtime::Error> {
502/// // Create custom resource
503/// let resource = Resource::builder()
504///     .with_attributes(vec![
505///         KeyValue::new("service.name", "payment-api"),
506///         KeyValue::new("service.version", "1.2.3"),
507///     ])
508///     .build();
509///
510/// // Initialize with custom configuration
511/// let (_, telemetry) = init_telemetry(
512///     TelemetryConfig::builder()
513///         .resource(resource)
514///         .build()
515/// ).await?;
516/// # Ok(())
517/// # }
518/// ```
519///
520/// Advanced usage with BatchSpanProcessor (required for async exporters):
521///
522/// ```no_run
523/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
524/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
525/// use opentelemetry_sdk::trace::BatchSpanProcessor;
526/// use lambda_runtime::Error;
527///
528/// # async fn example() -> Result<(), Error> {
529/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
530///     .with_http()
531///     .with_http_client(reqwest::Client::new())
532///     .with_protocol(Protocol::HttpBinary)
533///     .build()?;
534///
535/// let (provider, completion) = init_telemetry(
536///     TelemetryConfig::builder()
537///         .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
538///         .build()
539/// ).await?;
540/// # Ok(())
541/// # }
542/// ```
543///
544/// Using LambdaSpanProcessor with blocking http client:
545///
546/// ```no_run
547/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
548/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
549/// use lambda_runtime::Error;
550///
551/// # async fn example() -> Result<(), Error> {
552/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
553///     .with_http()
554///     .with_http_client(reqwest::blocking::Client::new())
555///     .with_protocol(Protocol::HttpBinary)
556///     .build()?;
557///
558/// let (provider, completion) = init_telemetry(
559///     TelemetryConfig::builder()
560///         .with_span_processor(
561///             LambdaSpanProcessor::builder()
562///                 .exporter(lambda_exporter)
563///                 .max_queue_size(2048)
564///                 .build()
565///         )
566///         .build()
567/// ).await?;
568/// # Ok(())
569/// # }
570/// ```
571///
572pub async fn init_telemetry(
573    mut config: TelemetryConfig,
574) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
575    // Get mode from config or environment with environment taking precedence
576    let mode = ProcessorMode::resolve(config.processor_mode);
577
578    if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
579        let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
580
581        for propagator in propagators {
582            match propagator {
583                "tracecontext" => config
584                    .propagators
585                    .push(Box::new(TraceContextPropagator::new())),
586                "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
587                "xray-lambda" => config
588                    .propagators
589                    .push(Box::new(LambdaXrayPropagator::new())),
590                "none" => config.propagators.push(Box::new(NoopPropagator::new())),
591                _ => LOGGER.warn(format!(
592                    "Unknown propagator: {propagator}, using default propagators"
593                )),
594            }
595        }
596    } else {
597        // if no propagators are set, use the default propagators
598        if config.propagators.is_empty() {
599            // IMPORTANT:
600            // LambdaXrayPropagator is added *before* TraceContextPropagator
601            // because in OpenTelemetry Rust, the *last* propagator that extracts
602            // a valid context wins during extraction.
603            // This ensures that if both an AWS X-Ray header (or _X_AMZN_TRACE_ID)
604            // and a W3C traceparent header are present, the W3C traceparent takes precedence.
605            config
606                .propagators
607                .push(Box::new(LambdaXrayPropagator::new()));
608            config
609                .propagators
610                .push(Box::new(TraceContextPropagator::new()));
611        }
612    }
613
614    let composite_propagator = TextMapCompositePropagator::new(config.propagators);
615    global::set_text_map_propagator(composite_propagator);
616
617    // Add default span processor if none was added
618    if !config.has_processor {
619        let processor = LambdaSpanProcessor::builder()
620            .exporter(OtlpStdoutSpanExporter::default())
621            .build();
622        config.provider_builder = config.provider_builder.with_span_processor(processor);
623    }
624
625    // Apply defaults and build the provider
626    let resource = config.resource.unwrap_or_else(get_lambda_resource);
627
628    let provider = Arc::new(config.provider_builder.with_resource(resource).build());
629
630    // Register the extension if in async or finalize mode
631    let sender = match mode {
632        ProcessorMode::Async | ProcessorMode::Finalize => {
633            Some(register_extension(provider.clone(), mode.clone()).await?)
634        }
635        _ => None,
636    };
637
638    if config.set_global_provider {
639        // Set the provider as global
640        set_tracer_provider(provider.as_ref().clone());
641    }
642
643    // Initialize tracing subscriber with smart env var selection
644    let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
645        if env::var("RUST_LOG").is_ok() {
646            "RUST_LOG"
647        } else {
648            "AWS_LAMBDA_LOG_LEVEL"
649        }
650    });
651
652    let env_filter = tracing_subscriber::EnvFilter::builder()
653        .with_env_var(env_var_name)
654        .from_env_lossy();
655
656    let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
657    let tracer = completion_handler.get_tracer().clone();
658
659    let subscriber = tracing_subscriber::registry::Registry::default()
660        .with(tracing_opentelemetry::OpenTelemetryLayer::new(
661            tracer.clone(),
662        ))
663        .with(env_filter);
664
665    // Determine if fmt layer should be enabled - environment variable takes precedence when set
666    let enable_fmt = if let Ok(env_value) = env::var(constants::env_vars::ENABLE_FMT_LAYER) {
667        match env_value.to_lowercase().as_str() {
668            "true" => true,
669            "false" => false,
670            other => {
671                LOGGER.warn(format!(
672                    "Invalid value '{}' for {}, expected 'true' or 'false'. Using code configuration.",
673                    other,
674                    constants::env_vars::ENABLE_FMT_LAYER
675                ));
676                config.enable_fmt_layer
677            }
678        }
679    } else {
680        // If env var not set, use the configured value
681        config.enable_fmt_layer
682    };
683
684    // Enable fmt layer based on the determined value
685    if enable_fmt {
686        // Determine if the lambda logging configuration is set to output json logs
687        let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
688            .unwrap_or_default()
689            .to_uppercase()
690            == "JSON";
691
692        if is_json {
693            tracing::subscriber::set_global_default(
694                subscriber.with(
695                    tracing_subscriber::fmt::layer()
696                        .with_target(false)
697                        .without_time()
698                        .json(),
699                ),
700            )?;
701        } else {
702            tracing::subscriber::set_global_default(
703                subscriber.with(
704                    tracing_subscriber::fmt::layer()
705                        .with_target(false)
706                        .without_time()
707                        .with_ansi(false),
708                ),
709            )?;
710        }
711    } else {
712        tracing::subscriber::set_global_default(subscriber)?;
713    }
714
715    Ok((tracer, completion_handler))
716}
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721    use opentelemetry::trace::{Span, Tracer};
722    use opentelemetry_aws::trace::XrayIdGenerator;
723    use opentelemetry_sdk::trace::{Sampler, SimpleSpanProcessor};
724    use sealed_test::prelude::*;
725    use std::sync::Arc;
726    use tokio::sync::mpsc;
727
728    // Helper to clean up environment variables between tests
729    fn cleanup_env() {
730        env::remove_var(constants::env_vars::ENABLE_FMT_LAYER);
731        env::remove_var(constants::env_vars::PROPAGATORS);
732        env::remove_var(constants::env_vars::PROCESSOR_MODE);
733        env::remove_var("_X_AMZN_TRACE_ID");
734        env::remove_var("AWS_LAMBDA_RUNTIME_API");
735    }
736
737    #[test]
738    #[sealed_test]
739    fn test_telemetry_config_defaults() {
740        cleanup_env();
741
742        let config = TelemetryConfig::builder().build();
743        assert!(config.set_global_provider); // Should be true by default
744        assert!(!config.has_processor);
745        assert!(!config.enable_fmt_layer);
746        assert!(config.propagators.is_empty()); // No propagators by default in builder
747    }
748
749    #[test]
750    #[sealed_test]
751    fn test_telemetry_config_with_propagators() {
752        cleanup_env();
753
754        // Test with explicit tracecontext propagator
755        let config = TelemetryConfig::builder()
756            .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
757            .with_named_propagator("tracecontext")
758            .build();
759        assert_eq!(config.propagators.len(), 1);
760
761        // Test with explicit xray propagator
762        let config = TelemetryConfig::builder()
763            .with_named_propagator("xray")
764            .build();
765        assert_eq!(config.propagators.len(), 1);
766
767        // Test with both propagators
768        let config = TelemetryConfig::builder()
769            .with_named_propagator("tracecontext")
770            .with_named_propagator("xray")
771            .build();
772        assert_eq!(config.propagators.len(), 2);
773
774        // Test with default propagators (empty - will be set in init_telemetry)
775        let config = TelemetryConfig::builder().build();
776        assert_eq!(config.propagators.len(), 0);
777
778        // Test with none
779        let config = TelemetryConfig::builder()
780            .with_named_propagator("none")
781            .build();
782        assert_eq!(config.propagators.len(), 1);
783    }
784
785    #[tokio::test]
786    #[sealed_test]
787    async fn test_telemetry_config_env_propagators_tracecontext() {
788        cleanup_env();
789
790        // Test with OTEL_PROPAGATORS=tracecontext
791        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
792        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
793        // In real usage we'd check the behavior rather than implementation details
794        // So we'll just check that we can create and use a handler
795        assert!(handler.sender.is_none());
796
797        cleanup_env();
798    }
799
800    #[tokio::test]
801    #[sealed_test]
802    async fn test_telemetry_config_env_propagators_xray() {
803        cleanup_env();
804
805        // Test with OTEL_PROPAGATORS=xray
806        env::set_var(constants::env_vars::PROPAGATORS, "xray");
807        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
808        assert!(handler.sender.is_none());
809
810        cleanup_env();
811    }
812
813    #[tokio::test]
814    #[sealed_test]
815    async fn test_telemetry_config_env_propagators_combined() {
816        cleanup_env();
817
818        // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
819        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
820        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
821        assert!(handler.sender.is_none());
822
823        cleanup_env();
824    }
825
826    #[tokio::test]
827    #[sealed_test]
828    async fn test_telemetry_config_env_propagators_none() {
829        cleanup_env();
830
831        // Test with OTEL_PROPAGATORS=none
832        env::set_var(constants::env_vars::PROPAGATORS, "none");
833        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
834        assert!(handler.sender.is_none());
835
836        cleanup_env();
837    }
838
839    #[tokio::test]
840    #[sealed_test]
841    async fn test_init_telemetry_defaults() {
842        let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
843        assert!(completion_handler.sender.is_none()); // Default mode is Sync
844    }
845
846    #[tokio::test]
847    #[sealed_test]
848    async fn test_init_telemetry_custom() {
849        let resource = Resource::builder().build();
850        let config = TelemetryConfig::builder()
851            .resource(resource)
852            .with_named_propagator("tracecontext")
853            .enable_fmt_layer(true)
854            .set_global_provider(false)
855            .build();
856
857        let (_, completion_handler) = init_telemetry(config).await.unwrap();
858        assert!(completion_handler.sender.is_none());
859    }
860
861    #[tokio::test]
862    #[sealed_test]
863    async fn test_telemetry_config_env_fmt_layer_true_override() {
864        cleanup_env();
865
866        // Test: Env var "true" overrides code setting "false"
867        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "true");
868        let config = TelemetryConfig::default(); // code setting is false by default
869        assert!(!config.enable_fmt_layer); // Config should not be affected by env var
870
871        // Initialize telemetry - env var should override config
872        let result = init_telemetry(config).await;
873        assert!(result.is_ok());
874
875        // Clean up
876        cleanup_env();
877    }
878
879    #[tokio::test]
880    #[sealed_test]
881    async fn test_telemetry_config_env_fmt_layer_false_override() {
882        cleanup_env();
883
884        // Test: Env var "false" overrides code setting "true"
885        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "false");
886        let config = TelemetryConfig::builder()
887            .enable_fmt_layer(true) // code setting is true
888            .build();
889        assert!(config.enable_fmt_layer);
890
891        // Initialize telemetry - env var should override config
892        let result = init_telemetry(config).await;
893        assert!(result.is_ok());
894
895        // Clean up
896        cleanup_env();
897    }
898
899    #[tokio::test]
900    #[sealed_test]
901    async fn test_telemetry_config_env_fmt_layer_invalid() {
902        cleanup_env();
903
904        // Test: Invalid env var falls back to code setting
905        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "invalid");
906        let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
907
908        // Initialize telemetry - should log a warning but use code setting
909        let result = init_telemetry(config).await;
910        assert!(result.is_ok());
911
912        // Clean up
913        cleanup_env();
914    }
915
916    #[tokio::test]
917    #[sealed_test]
918    async fn test_telemetry_config_env_fmt_layer_not_set() {
919        cleanup_env();
920
921        // Test: No env var uses code setting
922        let config = TelemetryConfig::default();
923        assert!(!config.enable_fmt_layer);
924
925        let result = init_telemetry(config).await;
926        assert!(result.is_ok());
927
928        // Clean up
929        cleanup_env();
930    }
931
932    #[test]
933    fn test_completion_handler_sync_mode() {
934        let provider = Arc::new(
935            SdkTracerProvider::builder()
936                .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
937                .build(),
938        );
939
940        let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
941
942        // In sync mode, complete() should call force_flush
943        handler.complete();
944        // Note: We can't easily verify the flush was called since TracerProvider
945        // doesn't expose this information, but we can verify it doesn't panic
946    }
947
948    #[tokio::test]
949    async fn test_completion_handler_async_mode() {
950        let provider = Arc::new(
951            SdkTracerProvider::builder()
952                .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
953                .build(),
954        );
955
956        let (tx, mut rx) = mpsc::unbounded_channel();
957
958        let completion_handler =
959            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
960
961        // In async mode, complete() should send a message through the channel
962        completion_handler.complete();
963
964        // Verify that we received the completion signal
965        assert!(rx.try_recv().is_ok());
966        // Verify channel is now empty
967        assert!(rx.try_recv().is_err());
968    }
969
970    #[test]
971    fn test_completion_handler_finalize_mode() {
972        let provider = Arc::new(
973            SdkTracerProvider::builder()
974                .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
975                .build(),
976        );
977
978        let (tx, _rx) = mpsc::unbounded_channel();
979
980        let completion_handler =
981            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);
982
983        // In finalize mode, complete() should do nothing
984        completion_handler.complete();
985        // Verify it doesn't panic or cause issues
986    }
987
988    #[test]
989    fn test_completion_handler_clone() {
990        let provider = Arc::new(
991            SdkTracerProvider::builder()
992                .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
993                .build(),
994        );
995
996        let (tx, _rx) = mpsc::unbounded_channel();
997
998        let completion_handler =
999            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
1000
1001        // Test that Clone is implemented correctly
1002        let cloned = completion_handler.clone();
1003
1004        // Verify both handlers have the same mode
1005        assert!(matches!(cloned.mode, ProcessorMode::Async));
1006        assert!(cloned.sender.is_some());
1007    }
1008
1009    #[test]
1010    fn test_completion_handler_sync_mode_error_handling() {
1011        let provider = Arc::new(
1012            SdkTracerProvider::builder()
1013                .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
1014                .build(),
1015        );
1016
1017        let completion_handler =
1018            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
1019
1020        // Test that complete() doesn't panic
1021        completion_handler.complete();
1022    }
1023
1024    #[tokio::test]
1025    async fn test_completion_handler_async_mode_error_handling() {
1026        let provider = Arc::new(
1027            SdkTracerProvider::builder()
1028                .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
1029                .build(),
1030        );
1031
1032        // Use UnboundedSender instead of Sender
1033        let (tx, _rx) = mpsc::unbounded_channel();
1034        // Fill the channel by dropping the receiver
1035        drop(_rx);
1036
1037        let completion_handler =
1038            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
1039
1040        // Test that complete() doesn't panic when receiver is dropped
1041        completion_handler.complete();
1042    }
1043
1044    #[test]
1045    #[sealed_test]
1046    fn test_telemetry_config_with_id_generator() {
1047        cleanup_env();
1048
1049        // Create a config with X-Ray ID generator
1050        let config = TelemetryConfig::builder()
1051            .with_id_generator(XrayIdGenerator::default())
1052            .build();
1053
1054        // We can't directly check the ID generator type since it's boxed inside the provider,
1055        // but we can verify it's applied by checking the generated trace IDs format
1056        let provider = Arc::new(config.provider_builder.build());
1057
1058        // Create a scope with attributes
1059        let scope = opentelemetry::InstrumentationScope::builder("test")
1060            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
1061            .build();
1062
1063        // Get a tracer using the correct API
1064        let tracer = provider.tracer_with_scope(scope);
1065
1066        // Start a span using the tracer
1067        let span = tracer.start_with_context("test span", &opentelemetry::Context::current());
1068        let trace_id = span.span_context().trace_id();
1069
1070        // Verify X-Ray trace ID format:
1071        // 1. Convert to hex string for easier checking
1072        let trace_id_hex = format!("{trace_id:032x}");
1073
1074        // 2. The first 8 characters of X-Ray trace IDs represent a timestamp in seconds
1075        // This is the key characteristic of X-Ray trace IDs that we can verify
1076        let timestamp_part = &trace_id_hex[0..8];
1077
1078        // 3. Parse the hex timestamp to ensure it's a valid timestamp (recent past)
1079        let timestamp = u32::from_str_radix(timestamp_part, 16).unwrap();
1080
1081        // 4. Check that timestamp is reasonable (within the last day)
1082        let now = std::time::SystemTime::now()
1083            .duration_since(std::time::UNIX_EPOCH)
1084            .unwrap()
1085            .as_secs() as u32;
1086
1087        // The timestamp should be within the last day
1088        assert!(timestamp <= now);
1089        assert!(timestamp > now - 86400); // Within the last day
1090
1091        // Verify remaining 24 characters are not all zeros (random part)
1092        let random_part = &trace_id_hex[8..];
1093        assert_ne!(random_part, "000000000000000000000000");
1094    }
1095
1096    #[test]
1097    #[sealed_test]
1098    fn test_telemetry_config_with_sampler() {
1099        cleanup_env();
1100
1101        // Test with AlwaysOn sampler
1102        let _provider = TelemetryConfig::builder()
1103            .with_sampler(Sampler::AlwaysOn)
1104            .build()
1105            .provider_builder
1106            .build();
1107
1108        // Test with TraceIdRatioBased sampler
1109        let _provider = TelemetryConfig::builder()
1110            .with_sampler(Sampler::TraceIdRatioBased(0.1))
1111            .build()
1112            .provider_builder
1113            .build();
1114
1115        // Test with ParentBased sampler
1116        let config = TelemetryConfig::builder()
1117            .with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)))
1118            .build();
1119
1120        // Verify configurations don't panic
1121        let _provider = config.provider_builder.build();
1122    }
1123
1124    #[test]
1125    #[sealed_test]
1126    fn test_telemetry_config_with_standard_samplers() {
1127        cleanup_env();
1128
1129        // Test with standard SDK samplers using with_sampler
1130        let _provider = TelemetryConfig::builder()
1131            .with_sampler(Sampler::AlwaysOn)
1132            .build()
1133            .provider_builder
1134            .build();
1135
1136        let _provider = TelemetryConfig::builder()
1137            .with_sampler(Sampler::AlwaysOff)
1138            .build()
1139            .provider_builder
1140            .build();
1141
1142        let _provider = TelemetryConfig::builder()
1143            .with_sampler(Sampler::TraceIdRatioBased(0.5))
1144            .build()
1145            .provider_builder
1146            .build();
1147
1148        let config = TelemetryConfig::builder()
1149            .with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)))
1150            .build();
1151
1152        // Verify configurations don't panic
1153        let _provider = config.provider_builder.build();
1154    }
1155
1156    #[tokio::test]
1157    #[sealed_test]
1158    async fn test_telemetry_config_env_sampler() {
1159        cleanup_env();
1160
1161        // Test that SDK environment variable sampler configuration works
1162        // The SDK automatically handles OTEL_TRACES_SAMPLER and OTEL_TRACES_SAMPLER_ARG
1163        env::set_var("OTEL_TRACES_SAMPLER", "always_on");
1164        let config = TelemetryConfig::builder()
1165            .set_global_provider(false) // Don't set global to avoid conflicts
1166            .build();
1167        let (_, _) = init_telemetry(config).await.unwrap();
1168
1169        // Clean up
1170        cleanup_env();
1171    }
1172}
1173
/// A propagator that does nothing: it carries no state, and its `TextMapPropagator`
/// implementation neither injects nor extracts anything.
#[derive(Debug)]
struct NoopPropagator;

impl NoopPropagator {
    /// Creates the (stateless) no-op propagator.
    fn new() -> Self {
        Self
    }
}
1183
1184impl TextMapPropagator for NoopPropagator {
1185    fn inject_context(
1186        &self,
1187        _cx: &opentelemetry::Context,
1188        _injector: &mut dyn opentelemetry::propagation::Injector,
1189    ) {
1190    }
1191
1192    fn extract_with_context(
1193        &self,
1194        cx: &opentelemetry::Context,
1195        _extractor: &dyn opentelemetry::propagation::Extractor,
1196    ) -> opentelemetry::Context {
1197        cx.clone()
1198    }
1199
1200    fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
1201        opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
1202    }
1203}