lambda_otel_lite/
telemetry.rs

1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//!     let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//!     let resource = Resource::builder()
47//!         .with_attributes(vec![
48//!             KeyValue::new("service.version", "1.0.0"),
49//!             KeyValue::new("deployment.environment", "production"),
50//!         ])
51//!         .build();
52//!
53//!     let config = TelemetryConfig::builder()
54//!         .resource(resource)
55//!         .build();
56//!
57//!     let (_, completion_handler) = init_telemetry(config).await?;
58//!     Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//!     let config = TelemetryConfig::builder()
72//!         .with_span_processor(SimpleSpanProcessor::new(
73//!             Box::new(OtlpStdoutSpanExporter::default())
74//!         ))
75//!         .enable_fmt_layer(true)
76//!         .build();
77//!
78//!     let (_, completion_handler) = init_telemetry(config).await?;
79//!     Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
92//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
93
94use crate::constants::defaults;
95use crate::constants::env_vars;
96use crate::{
97    constants, extension::register_extension, mode::ProcessorMode, processor::LambdaSpanProcessor,
98    propagation::LambdaXrayPropagator, resource::get_lambda_resource,
99};
100use bon::Builder;
101use lambda_runtime::Error;
102use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
103use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
104use opentelemetry_aws::trace::XrayPropagator;
105use opentelemetry_sdk::{
106    propagation::TraceContextPropagator,
107    trace::{SdkTracerProvider, SpanProcessor, TracerProviderBuilder},
108    Resource,
109};
110use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
111use std::{borrow::Cow, env, sync::Arc};
112use tokio::sync::mpsc::UnboundedSender;
113use tracing_subscriber::layer::SubscriberExt;
114
/// Manages the lifecycle of span export based on the processing mode.
///
/// This handler must be used to signal when spans should be exported. Its behavior
/// varies by processing mode:
/// - Sync: Forces immediate export
/// - Async: Signals the extension to export
/// - Finalize: Defers to span processor
///
/// # Thread Safety
///
/// This type is `Clone` and can be safely shared between threads.
#[derive(Clone)]
pub struct TelemetryCompletionHandler {
    /// Shared tracer provider; `complete()` calls `force_flush` on it in Sync mode.
    provider: Arc<SdkTracerProvider>,
    /// Channel on which `complete()` sends `()` in Async mode. When `None`,
    /// `complete()` sends nothing.
    sender: Option<UnboundedSender<()>>,
    /// Processing mode that selects what `complete()` does.
    mode: ProcessorMode,
    /// Tracer created from `provider` in `new()` and handed out by `get_tracer()`.
    tracer: opentelemetry_sdk::trace::Tracer,
}
133
134impl TelemetryCompletionHandler {
135    pub fn new(
136        provider: Arc<SdkTracerProvider>,
137        sender: Option<UnboundedSender<()>>,
138        mode: ProcessorMode,
139    ) -> Self {
140        // Create instrumentation scope with attributes
141        let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
142            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
143            .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
144            .with_attributes(vec![
145                KeyValue::new("library.language", "rust"),
146                KeyValue::new("library.type", "instrumentation"),
147                KeyValue::new("library.runtime", "aws_lambda"),
148            ])
149            .build();
150
151        // Create tracer with instrumentation scope
152        let tracer = provider.tracer_with_scope(scope);
153
154        Self {
155            provider,
156            sender,
157            mode,
158            tracer,
159        }
160    }
161
162    /// Get the tracer instance for creating spans.
163    ///
164    /// Returns the cached tracer instance configured with this package's instrumentation scope.
165    /// The tracer is configured with the provider's settings and will automatically use
166    /// the correct span processor based on the processing mode.
167    pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
168        &self.tracer
169    }
170
171    /// Complete telemetry processing for the current invocation
172    ///
173    /// In Sync mode, this will force flush the provider and log any errors that occur.
174    /// In Async mode, this will send a completion signal to the extension.
175    /// In Finalize mode, this will do nothing (handled by drop).
176    pub fn complete(&self) {
177        match self.mode {
178            ProcessorMode::Sync => {
179                if let Err(e) = self.provider.force_flush() {
180                    tracing::warn!(error = ?e, "Error flushing telemetry");
181                }
182            }
183            ProcessorMode::Async => {
184                if let Some(sender) = &self.sender {
185                    if let Err(e) = sender.send(()) {
186                        tracing::warn!(error = ?e, "Failed to send completion signal to extension");
187                    }
188                }
189            }
190            ProcessorMode::Finalize => {
191                // Do nothing, handled by drop
192            }
193        }
194    }
195}
196
/// Configuration for OpenTelemetry initialization.
///
/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
/// for standard Lambda configuration, or the builder pattern for customization.
///
/// # Fields
///
/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
/// * `set_global_provider` - Set as global tracer provider (default: true)
/// * `resource` - Custom resource (default: auto-detected from Lambda)
/// * `env_var_name` - Environment variable name for log level configuration
///
/// # Examples
///
/// Basic usage with default configuration:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::default();
/// ```
///
/// Custom configuration with resource attributes:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
/// use opentelemetry::KeyValue;
/// use opentelemetry_sdk::Resource;
///
/// let config = TelemetryConfig::builder()
///     .resource(Resource::builder()
///         .with_attributes(vec![KeyValue::new("version", "1.0.0")])
///         .build())
///     .build();
/// ```
///
/// Custom configuration with logging options:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::builder()
///     .enable_fmt_layer(true)  // Enable console output for debugging
///     .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string())  // Custom env var for log level
///     .build();
/// ```
#[derive(Builder, Debug)]
pub struct TelemetryConfig {
    // Internal builder state. These fields are populated through the custom
    // builder methods (`with_span_processor`, `with_propagator`,
    // `with_named_propagator`) rather than through generated setters.

    // Tracer provider builder that accumulates the span processors added so far.
    #[builder(field)]
    provider_builder: TracerProviderBuilder,

    // Set once a span processor has been added; `init_telemetry` installs its
    // default processor only while this is still false.
    #[builder(field)]
    has_processor: bool,

    // Propagators added through the builder; `init_telemetry` combines them
    // into a single composite propagator.
    #[builder(field)]
    propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,

    /// Enable console output for debugging.
    ///
    /// When enabled, spans and events will be printed to the console in addition
    /// to being exported through the configured span processors. This is useful
    /// for debugging but adds overhead and should be disabled in production.
    ///
    /// This can also be controlled via the `LAMBDA_TRACING_ENABLE_FMT_LAYER` environment
    /// variable. Setting it to "true" will enable console output even if this field is
    /// false in the code. This allows toggling logging for debugging without code changes.
    ///
    /// Default: `false`
    #[builder(default = false)]
    pub enable_fmt_layer: bool,

    /// Set this provider as the global OpenTelemetry provider.
    ///
    /// When enabled, the provider will be registered as the global provider
    /// for the OpenTelemetry API. This allows using the global tracer API
    /// without explicitly passing around the provider.
    ///
    /// Default: `true`
    #[builder(default = true)]
    pub set_global_provider: bool,

    /// Custom resource for all spans.
    ///
    /// If not provided, the resource will be automatically detected from the
    /// Lambda environment. If provided, this resource is used as given in place
    /// of the auto-detected one; the two are not merged.
    ///
    /// Default: `None` (auto-detected from Lambda environment)
    pub resource: Option<Resource>,

    /// Environment variable name to use for log level configuration.
    ///
    /// This field specifies which environment variable should be used to configure
    /// the tracing subscriber's log level filter. If not specified, `RUST_LOG` is
    /// used when it is set, and `AWS_LAMBDA_LOG_LEVEL` otherwise.
    ///
    /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
    pub env_var_name: Option<String>,
}
297
298impl Default for TelemetryConfig {
299    fn default() -> Self {
300        Self::builder().build()
301    }
302}
303
304/// Builder methods for adding span processors and other configuration
305impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
306    /// Add a span processor to the tracer provider.
307    ///
308    /// This method allows adding custom span processors for trace data processing.
309    /// Multiple processors can be added by calling this method multiple times.
310    ///
311    /// # Arguments
312    ///
313    /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
314    ///
315    /// # Examples
316    ///
317    /// ```no_run
318    /// use lambda_otel_lite::TelemetryConfig;
319    /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
320    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
321    ///
322    /// // Only use builder when adding custom processors
323    /// let config = TelemetryConfig::builder()
324    ///     .with_span_processor(SimpleSpanProcessor::new(
325    ///         Box::new(OtlpStdoutSpanExporter::default())
326    ///     ))
327    ///     .build();
328    /// ```
329    pub fn with_span_processor<T>(mut self, processor: T) -> Self
330    where
331        T: SpanProcessor + 'static,
332    {
333        self.provider_builder = self.provider_builder.with_span_processor(processor);
334        self.has_processor = true;
335        self
336    }
337
338    /// Add a propagator to the list of propagators.
339    ///
340    /// Multiple propagators can be added and will be combined into a composite propagator.
341    /// The default propagator is TraceContextPropagator.
342    ///
343    /// # Arguments
344    ///
345    /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
346    ///
347    /// # Examples
348    ///
349    /// ```no_run
350    /// use lambda_otel_lite::TelemetryConfig;
351    /// use opentelemetry_sdk::propagation::BaggagePropagator;
352    ///
353    /// let config = TelemetryConfig::builder()
354    ///     .with_propagator(BaggagePropagator::new())
355    ///     .build();
356    /// ```
357    pub fn with_propagator<T>(mut self, propagator: T) -> Self
358    where
359        T: TextMapPropagator + Send + Sync + 'static,
360    {
361        self.propagators.push(Box::new(propagator));
362        self
363    }
364
365    pub fn with_named_propagator(self, name: &str) -> Self {
366        match name {
367            "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
368            "xray" => self.with_propagator(XrayPropagator::new()),
369            "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
370            "none" => self.with_propagator(NoopPropagator::new()),
371            _ => {
372                tracing::warn!("Unknown propagator: {}, using default propagators", name);
373                self
374            }
375        }
376    }
377}
378
379/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
380///
381/// # Arguments
382///
383/// * `config` - Configuration for telemetry initialization
384///
385/// # Returns
386///
387/// Returns a tuple containing:
388/// - A tracer instance for manual instrumentation
389/// - A completion handler for managing span export timing
390///
391/// # Errors
392///
393/// Returns error if:
394/// - Extension registration fails (async/finalize modes)
395/// - Tracer provider initialization fails
396/// - Environment variable parsing fails
397///
398/// # Examples
399///
400/// Basic usage with default configuration:
401///
402/// ```no_run
403/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
404///
405/// # async fn example() -> Result<(), lambda_runtime::Error> {
406/// // Initialize with default configuration
407/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
408/// # Ok(())
409/// # }
410/// ```
411///
412/// Custom configuration:
413///
414/// ```no_run
415/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
416/// use opentelemetry::KeyValue;
417/// use opentelemetry_sdk::Resource;
418///
419/// # async fn example() -> Result<(), lambda_runtime::Error> {
420/// // Create custom resource
421/// let resource = Resource::builder()
422///     .with_attributes(vec![
423///         KeyValue::new("service.name", "payment-api"),
424///         KeyValue::new("service.version", "1.2.3"),
425///     ])
426///     .build();
427///
428/// // Initialize with custom configuration
429/// let (_, telemetry) = init_telemetry(
430///     TelemetryConfig::builder()
431///         .resource(resource)
432///         .build()
433/// ).await?;
434/// # Ok(())
435/// # }
436/// ```
437///
438/// Advanced usage with BatchSpanProcessor (required for async exporters):
439///
440/// ```no_run
441/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
442/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
443/// use opentelemetry_sdk::trace::BatchSpanProcessor;
444/// use lambda_runtime::Error;
445///
446/// # async fn example() -> Result<(), Error> {
447/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
448///     .with_http()
449///     .with_http_client(reqwest::Client::new())
450///     .with_protocol(Protocol::HttpBinary)
451///     .build()?;
452///
453/// let (provider, completion) = init_telemetry(
454///     TelemetryConfig::builder()
455///         .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
456///         .build()
457/// ).await?;
458/// # Ok(())
459/// # }
460/// ```
461///
462/// Using LambdaSpanProcessor with blocking http client:
463///
464/// ```no_run
465/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
466/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
467/// use lambda_runtime::Error;
468///
469/// # async fn example() -> Result<(), Error> {
470/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
471///     .with_http()
472///     .with_http_client(reqwest::blocking::Client::new())
473///     .with_protocol(Protocol::HttpBinary)
474///     .build()?;
475///
476/// let (provider, completion) = init_telemetry(
477///     TelemetryConfig::builder()
478///         .with_span_processor(
479///             LambdaSpanProcessor::builder()
480///                 .exporter(lambda_exporter)
481///                 .max_batch_size(512)
482///                 .max_queue_size(2048)
483///                 .build()
484///         )
485///         .build()
486/// ).await?;
487/// # Ok(())
488/// # }
489/// ```
490///
491pub async fn init_telemetry(
492    mut config: TelemetryConfig,
493) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
494    let mode = ProcessorMode::from_env();
495
496    if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
497        let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
498
499        for propagator in propagators {
500            match propagator {
501                "tracecontext" => config
502                    .propagators
503                    .push(Box::new(TraceContextPropagator::new())),
504                "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
505                "xray-lambda" => config
506                    .propagators
507                    .push(Box::new(LambdaXrayPropagator::new())),
508                "none" => config.propagators.push(Box::new(NoopPropagator::new())),
509                _ => tracing::warn!(
510                    "Unknown propagator: {}, using default propagators",
511                    propagator
512                ),
513            }
514        }
515    } else {
516        // if no propagators are set, use the default propagators
517        if config.propagators.is_empty() {
518            config
519                .propagators
520                .push(Box::new(TraceContextPropagator::new()));
521            config
522                .propagators
523                .push(Box::new(LambdaXrayPropagator::new()));
524        }
525    }
526
527    let composite_propagator = TextMapCompositePropagator::new(config.propagators);
528    global::set_text_map_propagator(composite_propagator);
529
530    // Add default span processor if none was added
531    if !config.has_processor {
532        let processor = LambdaSpanProcessor::builder()
533            .exporter(OtlpStdoutSpanExporter::default())
534            .build();
535        config.provider_builder = config.provider_builder.with_span_processor(processor);
536    }
537
538    // Apply defaults and build the provider
539    let resource = config.resource.unwrap_or_else(get_lambda_resource);
540    let provider = Arc::new(config.provider_builder.with_resource(resource).build());
541
542    // Register the extension if in async or finalize mode
543    let sender = match mode {
544        ProcessorMode::Async | ProcessorMode::Finalize => {
545            Some(register_extension(provider.clone(), mode.clone()).await?)
546        }
547        _ => None,
548    };
549
550    if config.set_global_provider {
551        // Set the provider as global
552        set_tracer_provider(provider.as_ref().clone());
553    }
554
555    // Initialize tracing subscriber with smart env var selection
556    let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
557        if env::var("RUST_LOG").is_ok() {
558            "RUST_LOG"
559        } else {
560            "AWS_LAMBDA_LOG_LEVEL"
561        }
562    });
563
564    let env_filter = tracing_subscriber::EnvFilter::builder()
565        .with_env_var(env_var_name)
566        .from_env_lossy();
567
568    let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
569    let tracer = completion_handler.get_tracer().clone();
570
571    let subscriber = tracing_subscriber::registry::Registry::default()
572        .with(tracing_opentelemetry::OpenTelemetryLayer::new(
573            tracer.clone(),
574        ))
575        .with(env_filter);
576
577    // Check if fmt layer should be enabled via environment variable
578    let enable_fmt = env::var(env_vars::ENABLE_FMT_LAYER)
579        .map(|v| v.to_lowercase() == "true")
580        .unwrap_or(defaults::ENABLE_FMT_LAYER);
581
582    // Enable fmt layer if either configured in code or enabled via environment
583    // This allows toggling logging via LAMBDA_TRACING_ENABLE_FMT_LAYER=true without code changes
584    // Log level is still controlled by AWS_LAMBDA_LOG_LEVEL or RUST_LOG as usual
585    if config.enable_fmt_layer || enable_fmt {
586        // Determine if the lambda logging configuration is set to output json logs
587        let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
588            .unwrap_or_default()
589            .to_uppercase()
590            == "JSON";
591
592        if is_json {
593            tracing::subscriber::set_global_default(
594                subscriber.with(
595                    tracing_subscriber::fmt::layer()
596                        .with_target(false)
597                        .without_time()
598                        .json(),
599                ),
600            )?;
601        } else {
602            tracing::subscriber::set_global_default(
603                subscriber.with(
604                    tracing_subscriber::fmt::layer()
605                        .with_target(false)
606                        .without_time()
607                        .with_ansi(false),
608                ),
609            )?;
610        }
611    } else {
612        tracing::subscriber::set_global_default(subscriber)?;
613    }
614
615    Ok((tracer, completion_handler))
616}
617
// Unit tests for configuration building, telemetry initialization and the
// completion handler. Tests that read or modify environment variables or
// install global state are additionally marked with `#[sealed_test]`.
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_sdk::trace::SimpleSpanProcessor;
    use sealed_test::prelude::*;
    use tokio::sync::mpsc;

    // Helper to clean up environment variables between tests
    fn cleanup_env() {
        env::remove_var("LAMBDA_TRACING_ENABLE_FMT_LAYER");
        env::remove_var(constants::env_vars::PROPAGATORS);
        env::remove_var("_X_AMZN_TRACE_ID");
    }

    // An untouched builder yields the documented defaults.
    #[test]
    #[sealed_test]
    fn test_telemetry_config_defaults() {
        cleanup_env();

        let config = TelemetryConfig::builder().build();
        assert!(config.set_global_provider); // Should be true by default
        assert!(!config.has_processor);
        assert!(!config.enable_fmt_layer);
        assert!(config.propagators.is_empty()); // No propagators by default in builder
    }

    // Each recognized propagator name adds exactly one propagator to the config.
    #[test]
    #[sealed_test]
    fn test_telemetry_config_with_propagators() {
        cleanup_env();

        // Test with explicit tracecontext propagator
        let config = TelemetryConfig::builder()
            .with_span_processor(SimpleSpanProcessor::new(Box::new(
                OtlpStdoutSpanExporter::default(),
            )))
            .with_named_propagator("tracecontext")
            .build();
        assert_eq!(config.propagators.len(), 1);

        // Test with explicit xray propagator
        let config = TelemetryConfig::builder()
            .with_named_propagator("xray")
            .build();
        assert_eq!(config.propagators.len(), 1);

        // Test with both propagators
        let config = TelemetryConfig::builder()
            .with_named_propagator("tracecontext")
            .with_named_propagator("xray")
            .build();
        assert_eq!(config.propagators.len(), 2);

        // Test with default propagators (empty - will be set in init_telemetry)
        let config = TelemetryConfig::builder().build();
        assert_eq!(config.propagators.len(), 0);

        // Test with none
        let config = TelemetryConfig::builder()
            .with_named_propagator("none")
            .build();
        assert_eq!(config.propagators.len(), 1);
    }

    // Initialization succeeds when the environment selects the tracecontext propagator.
    #[tokio::test]
    #[sealed_test]
    async fn test_telemetry_config_env_propagators_tracecontext() {
        cleanup_env();

        // Test with OTEL_PROPAGATORS=tracecontext
        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
        // In real usage we'd check the behavior rather than implementation details
        // So we'll just check that we can create and use a handler
        assert!(handler.sender.is_none());

        cleanup_env();
    }

    // Initialization succeeds when the environment selects the xray propagator.
    #[tokio::test]
    #[sealed_test]
    async fn test_telemetry_config_env_propagators_xray() {
        cleanup_env();

        // Test with OTEL_PROPAGATORS=xray
        env::set_var(constants::env_vars::PROPAGATORS, "xray");
        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
        assert!(handler.sender.is_none());

        cleanup_env();
    }

    // Initialization succeeds with a comma-separated list of propagators.
    #[tokio::test]
    #[sealed_test]
    async fn test_telemetry_config_env_propagators_combined() {
        cleanup_env();

        // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
        assert!(handler.sender.is_none());

        cleanup_env();
    }

    // Initialization succeeds when the environment selects the no-op propagator.
    #[tokio::test]
    #[sealed_test]
    async fn test_telemetry_config_env_propagators_none() {
        cleanup_env();

        // Test with OTEL_PROPAGATORS=none
        env::set_var(constants::env_vars::PROPAGATORS, "none");
        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
        assert!(handler.sender.is_none());

        cleanup_env();
    }

    // Default configuration initializes without registering an extension sender.
    #[tokio::test]
    #[sealed_test]
    async fn test_init_telemetry_defaults() {
        let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
        assert!(completion_handler.sender.is_none()); // Default mode is Sync
    }

    // A fully customized configuration initializes successfully.
    #[tokio::test]
    #[sealed_test]
    async fn test_init_telemetry_custom() {
        let resource = Resource::builder().build();
        let config = TelemetryConfig::builder()
            .resource(resource)
            .with_named_propagator("tracecontext")
            .enable_fmt_layer(true)
            .set_global_provider(false)
            .build();

        let (_, completion_handler) = init_telemetry(config).await.unwrap();
        assert!(completion_handler.sender.is_none());
    }

    // The fmt-layer environment variable is evaluated independently of the
    // `enable_fmt_layer` config field.
    #[tokio::test]
    #[sealed_test]
    async fn test_telemetry_config_env_fmt_layer() {
        cleanup_env();

        // Test that environment variable is respected
        env::set_var("LAMBDA_TRACING_ENABLE_FMT_LAYER", "true");
        let config = TelemetryConfig::default();
        assert!(!config.enable_fmt_layer); // Config should not be affected by env var
        let enable_fmt = env::var(env_vars::ENABLE_FMT_LAYER)
            .map(|v| v.to_lowercase() == "true")
            .unwrap_or(defaults::ENABLE_FMT_LAYER);
        assert!(enable_fmt); // But the env var should be true

        // Test that code-level setting works independently
        let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
        assert!(config.enable_fmt_layer);

        // Test default behavior
        env::remove_var("LAMBDA_TRACING_ENABLE_FMT_LAYER");
        let config = TelemetryConfig::default();
        assert!(!config.enable_fmt_layer);
        let enable_fmt = env::var(env_vars::ENABLE_FMT_LAYER)
            .map(|v| v.to_lowercase() == "true")
            .unwrap_or(defaults::ENABLE_FMT_LAYER);
        assert!(!enable_fmt);

        // Clean up
        cleanup_env();
    }

    // Sync mode: complete() runs without panicking.
    #[test]
    fn test_completion_handler_sync_mode() {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(SimpleSpanProcessor::new(Box::new(
                    OtlpStdoutSpanExporter::default(),
                )))
                .build(),
        );

        let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);

        // In sync mode, complete() should call force_flush
        handler.complete();
        // Note: We can't easily verify the flush was called since TracerProvider
        // doesn't expose this information, but we can verify it doesn't panic
    }

    // Async mode: complete() sends exactly one signal on the channel.
    #[tokio::test]
    async fn test_completion_handler_async_mode() {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(SimpleSpanProcessor::new(Box::new(
                    OtlpStdoutSpanExporter::default(),
                )))
                .build(),
        );

        let (tx, mut rx) = mpsc::unbounded_channel();

        let completion_handler =
            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);

        // In async mode, complete() should send a message through the channel
        completion_handler.complete();

        // Verify that we received the completion signal
        assert!(rx.try_recv().is_ok());
        // Verify channel is now empty
        assert!(rx.try_recv().is_err());
    }

    // Finalize mode: complete() is a no-op even when a sender is present.
    #[test]
    fn test_completion_handler_finalize_mode() {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(SimpleSpanProcessor::new(Box::new(
                    OtlpStdoutSpanExporter::default(),
                )))
                .build(),
        );

        let (tx, _rx) = mpsc::unbounded_channel();

        let completion_handler =
            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);

        // In finalize mode, complete() should do nothing
        completion_handler.complete();
        // Verify it doesn't panic or cause issues
    }

    // Cloning the handler preserves its mode and sender.
    #[test]
    fn test_completion_handler_clone() {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(SimpleSpanProcessor::new(Box::new(
                    OtlpStdoutSpanExporter::default(),
                )))
                .build(),
        );

        let (tx, _rx) = mpsc::unbounded_channel();

        let completion_handler =
            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);

        // Test that Clone is implemented correctly
        let cloned = completion_handler.clone();

        // Verify the clone kept the mode and the sender
        assert!(matches!(cloned.mode, ProcessorMode::Async));
        assert!(cloned.sender.is_some());
    }

    // Sync mode without a sender: complete() must not panic.
    #[test]
    fn test_completion_handler_sync_mode_error_handling() {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(SimpleSpanProcessor::new(Box::new(
                    OtlpStdoutSpanExporter::default(),
                )))
                .build(),
        );

        let completion_handler =
            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);

        // Test that complete() doesn't panic
        completion_handler.complete();
    }

    // Async mode with a closed channel: the failed send must not panic.
    #[tokio::test]
    async fn test_completion_handler_async_mode_error_handling() {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(SimpleSpanProcessor::new(Box::new(
                    OtlpStdoutSpanExporter::default(),
                )))
                .build(),
        );

        // The handler takes the sending half of an unbounded channel
        let (tx, _rx) = mpsc::unbounded_channel();
        // Close the channel by dropping the receiver so that sending fails
        drop(_rx);

        let completion_handler =
            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);

        // Test that complete() doesn't panic when receiver is dropped
        completion_handler.complete();
    }
}
913
// A simple no-op propagator: it injects nothing, returns the given context
// unchanged on extraction, and declares no fields. It is the propagator
// selected by the name "none".
#[derive(Debug)]
struct NoopPropagator;
917
918impl NoopPropagator {
919    fn new() -> Self {
920        NoopPropagator
921    }
922}
923
924impl TextMapPropagator for NoopPropagator {
925    fn inject_context(
926        &self,
927        _cx: &opentelemetry::Context,
928        _injector: &mut dyn opentelemetry::propagation::Injector,
929    ) {
930    }
931
932    fn extract_with_context(
933        &self,
934        cx: &opentelemetry::Context,
935        _extractor: &dyn opentelemetry::propagation::Extractor,
936    ) -> opentelemetry::Context {
937        cx.clone()
938    }
939
940    fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
941        opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
942    }
943}