lambda_otel_lite/
telemetry.rs

1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//!     let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//!     let resource = Resource::builder()
47//!         .with_attributes(vec![
48//!             KeyValue::new("service.version", "1.0.0"),
49//!             KeyValue::new("deployment.environment", "production"),
50//!         ])
51//!         .build();
52//!
53//!     let config = TelemetryConfig::builder()
54//!         .resource(resource)
55//!         .build();
56//!
57//!     let (_, completion_handler) = init_telemetry(config).await?;
58//!     Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//!     let config = TelemetryConfig::builder()
72//!         .with_span_processor(SimpleSpanProcessor::new(
73//!             Box::new(OtlpStdoutSpanExporter::default())
74//!         ))
75//!         .enable_fmt_layer(true)
76//!         .build();
77//!
78//!     let (_, completion_handler) = init_telemetry(config).await?;
79//!     Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
92//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
93
94use crate::{
95    constants, extension::register_extension, mode::ProcessorMode, processor::LambdaSpanProcessor,
96    propagation::LambdaXrayPropagator, resource::get_lambda_resource,
97};
98use bon::Builder;
99use lambda_runtime::Error;
100use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
101use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
102use opentelemetry_aws::trace::XrayPropagator;
103use opentelemetry_sdk::{
104    propagation::TraceContextPropagator,
105    trace::{SdkTracerProvider, SpanProcessor, TracerProviderBuilder},
106    Resource,
107};
108use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
109use std::{borrow::Cow, env, sync::Arc};
110use tokio::sync::mpsc::UnboundedSender;
111use tracing_subscriber::layer::SubscriberExt;
112
113/// Manages the lifecycle of span export based on the processing mode.
114///
115/// This handler must be used to signal when spans should be exported. Its behavior
116/// varies by processing mode:
117/// - Sync: Forces immediate export
118/// - Async: Signals the extension to export
119/// - Finalize: Defers to span processor
120///
121/// # Thread Safety
122///
123/// This type is `Clone` and can be safely shared between threads.
124#[derive(Clone)]
125pub struct TelemetryCompletionHandler {
126    provider: Arc<SdkTracerProvider>,
127    sender: Option<UnboundedSender<()>>,
128    mode: ProcessorMode,
129    tracer: opentelemetry_sdk::trace::Tracer,
130}
131
132impl TelemetryCompletionHandler {
133    pub fn new(
134        provider: Arc<SdkTracerProvider>,
135        sender: Option<UnboundedSender<()>>,
136        mode: ProcessorMode,
137    ) -> Self {
138        // Create instrumentation scope with attributes
139        let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
140            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
141            .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
142            .with_attributes(vec![
143                KeyValue::new("library.language", "rust"),
144                KeyValue::new("library.type", "instrumentation"),
145                KeyValue::new("library.runtime", "aws_lambda"),
146            ])
147            .build();
148
149        // Create tracer with instrumentation scope
150        let tracer = provider.tracer_with_scope(scope);
151
152        Self {
153            provider,
154            sender,
155            mode,
156            tracer,
157        }
158    }
159
160    /// Get the tracer instance for creating spans.
161    ///
162    /// Returns the cached tracer instance configured with this package's instrumentation scope.
163    /// The tracer is configured with the provider's settings and will automatically use
164    /// the correct span processor based on the processing mode.
165    pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
166        &self.tracer
167    }
168
169    /// Complete telemetry processing for the current invocation
170    ///
171    /// In Sync mode, this will force flush the provider and log any errors that occur.
172    /// In Async mode, this will send a completion signal to the extension.
173    /// In Finalize mode, this will do nothing (handled by drop).
174    pub fn complete(&self) {
175        match self.mode {
176            ProcessorMode::Sync => {
177                if let Err(e) = self.provider.force_flush() {
178                    tracing::warn!(error = ?e, "Error flushing telemetry");
179                }
180            }
181            ProcessorMode::Async => {
182                if let Some(sender) = &self.sender {
183                    if let Err(e) = sender.send(()) {
184                        tracing::warn!(error = ?e, "Failed to send completion signal to extension");
185                    }
186                }
187            }
188            ProcessorMode::Finalize => {
189                // Do nothing, handled by drop
190            }
191        }
192    }
193}
194
195/// Configuration for OpenTelemetry initialization.
196///
197/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
198/// for standard Lambda configuration, or the builder pattern for customization.
199///
200/// # Fields
201///
202/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
203/// * `set_global_provider` - Set as global tracer provider (default: true)
204/// * `resource` - Custom resource attributes (default: auto-detected from Lambda)
205/// * `env_var_name` - Environment variable name for log level configuration
206///
207/// # Examples
208///
209/// Basic usage with default configuration:
210///
211/// ```no_run
212/// use lambda_otel_lite::telemetry::TelemetryConfig;
213///
214/// let config = TelemetryConfig::default();
215/// ```
216///
217/// Custom configuration with resource attributes:
218///
219/// ```no_run
220/// use lambda_otel_lite::telemetry::TelemetryConfig;
221/// use opentelemetry::KeyValue;
222/// use opentelemetry_sdk::Resource;
223///
224/// let config = TelemetryConfig::builder()
225///     .resource(Resource::builder()
226///         .with_attributes(vec![KeyValue::new("version", "1.0.0")])
227///         .build())
228///     .build();
229/// ```
230///
231/// Custom configuration with logging options:
232///
233/// ```no_run
234/// use lambda_otel_lite::telemetry::TelemetryConfig;
235///
236/// let config = TelemetryConfig::builder()
237///     .enable_fmt_layer(true)  // Enable console output for debugging
238///     .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string())  // Custom env var for log level
239///     .build();
240/// ```
241#[derive(Builder, Debug)]
242pub struct TelemetryConfig {
243    // Custom fields for internal state
244    #[builder(field)]
245    provider_builder: TracerProviderBuilder,
246
247    #[builder(field)]
248    has_processor: bool,
249
250    #[builder(field)]
251    propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,
252
253    /// Enable console output for debugging.
254    ///
255    /// When enabled, spans and events will be printed to the console in addition
256    /// to being exported through the configured span processors. This is useful
257    /// for debugging but adds overhead and should be disabled in production.
258    ///
259    /// Default: `false`
260    #[builder(default = false)]
261    pub enable_fmt_layer: bool,
262
263    /// Set this provider as the global OpenTelemetry provider.
264    ///
265    /// When enabled, the provider will be registered as the global provider
266    /// for the OpenTelemetry API. This allows using the global tracer API
267    /// without explicitly passing around the provider.
268    ///
269    /// Default: `true`
270    #[builder(default = true)]
271    pub set_global_provider: bool,
272
273    /// Custom resource attributes for all spans.
274    ///
275    /// If not provided, resource attributes will be automatically detected
276    /// from the Lambda environment. Custom resources will override any
277    /// automatically detected attributes with the same keys.
278    ///
279    /// Default: `None` (auto-detected from Lambda environment)
280    pub resource: Option<Resource>,
281
282    /// Environment variable name to use for log level configuration.
283    ///
284    /// This field specifies which environment variable should be used to configure
285    /// the tracing subscriber's log level filter. If not specified, the system will
286    /// first check for `RUST_LOG` and then fall back to `AWS_LAMBDA_LOG_LEVEL`.
287    ///
288    /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
289    pub env_var_name: Option<String>,
290}
291
292impl Default for TelemetryConfig {
293    fn default() -> Self {
294        let enable_fmt_layer = env::var("LAMBDA_TRACING_ENABLE_FMT_LAYER")
295            .map(|val| val.to_lowercase() == "true" || val == "1")
296            .unwrap_or(false);
297
298        Self::builder().enable_fmt_layer(enable_fmt_layer).build()
299    }
300}
301
302/// Builder methods for adding span processors and other configuration
303impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
304    /// Add a span processor to the tracer provider.
305    ///
306    /// This method allows adding custom span processors for trace data processing.
307    /// Multiple processors can be added by calling this method multiple times.
308    ///
309    /// # Arguments
310    ///
311    /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
312    ///
313    /// # Examples
314    ///
315    /// ```no_run
316    /// use lambda_otel_lite::TelemetryConfig;
317    /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
318    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
319    ///
320    /// // Only use builder when adding custom processors
321    /// let config = TelemetryConfig::builder()
322    ///     .with_span_processor(SimpleSpanProcessor::new(
323    ///         Box::new(OtlpStdoutSpanExporter::default())
324    ///     ))
325    ///     .build();
326    /// ```
327    pub fn with_span_processor<T>(mut self, processor: T) -> Self
328    where
329        T: SpanProcessor + 'static,
330    {
331        self.provider_builder = self.provider_builder.with_span_processor(processor);
332        self.has_processor = true;
333        self
334    }
335
336    /// Add a propagator to the list of propagators.
337    ///
338    /// Multiple propagators can be added and will be combined into a composite propagator.
339    /// The default propagator is TraceContextPropagator.
340    ///
341    /// # Arguments
342    ///
343    /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
344    ///
345    /// # Examples
346    ///
347    /// ```no_run
348    /// use lambda_otel_lite::TelemetryConfig;
349    /// use opentelemetry_sdk::propagation::BaggagePropagator;
350    ///
351    /// let config = TelemetryConfig::builder()
352    ///     .with_propagator(BaggagePropagator::new())
353    ///     .build();
354    /// ```
355    pub fn with_propagator<T>(mut self, propagator: T) -> Self
356    where
357        T: TextMapPropagator + Send + Sync + 'static,
358    {
359        self.propagators.push(Box::new(propagator));
360        self
361    }
362
363    pub fn with_named_propagator(self, name: &str) -> Self {
364        match name {
365            "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
366            "xray" => self.with_propagator(XrayPropagator::new()),
367            "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
368            "none" => self.with_propagator(NoopPropagator::new()),
369            _ => {
370                tracing::warn!("Unknown propagator: {}, using default propagators", name);
371                self
372            }
373        }
374    }
375}
376
377/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
378///
379/// # Arguments
380///
381/// * `config` - Configuration for telemetry initialization
382///
383/// # Returns
384///
385/// Returns a tuple containing:
386/// - A tracer instance for manual instrumentation
387/// - A completion handler for managing span export timing
388///
389/// # Errors
390///
391/// Returns error if:
392/// - Extension registration fails (async/finalize modes)
393/// - Tracer provider initialization fails
394/// - Environment variable parsing fails
395///
396/// # Examples
397///
398/// Basic usage with default configuration:
399///
400/// ```no_run
401/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
402///
403/// # async fn example() -> Result<(), lambda_runtime::Error> {
404/// // Initialize with default configuration
405/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
406/// # Ok(())
407/// # }
408/// ```
409///
410/// Custom configuration:
411///
412/// ```no_run
413/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
414/// use opentelemetry::KeyValue;
415/// use opentelemetry_sdk::Resource;
416///
417/// # async fn example() -> Result<(), lambda_runtime::Error> {
418/// // Create custom resource
419/// let resource = Resource::builder()
420///     .with_attributes(vec![
421///         KeyValue::new("service.name", "payment-api"),
422///         KeyValue::new("service.version", "1.2.3"),
423///     ])
424///     .build();
425///
426/// // Initialize with custom configuration
427/// let (_, telemetry) = init_telemetry(
428///     TelemetryConfig::builder()
429///         .resource(resource)
430///         .build()
431/// ).await?;
432/// # Ok(())
433/// # }
434/// ```
435///
436/// Advanced usage with BatchSpanProcessor (required for async exporters):
437///
438/// ```no_run
439/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
440/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
441/// use opentelemetry_sdk::trace::BatchSpanProcessor;
442/// use lambda_runtime::Error;
443///
444/// # async fn example() -> Result<(), Error> {
445/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
446///     .with_http()
447///     .with_http_client(reqwest::Client::new())
448///     .with_protocol(Protocol::HttpBinary)
449///     .build()?;
450///
451/// let (provider, completion) = init_telemetry(
452///     TelemetryConfig::builder()
453///         .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
454///         .build()
455/// ).await?;
456/// # Ok(())
457/// # }
458/// ```
459///
460/// Using LambdaSpanProcessor with blocking http client:
461///
462/// ```no_run
463/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
464/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
465/// use lambda_runtime::Error;
466///
467/// # async fn example() -> Result<(), Error> {
468/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
469///     .with_http()
470///     .with_http_client(reqwest::blocking::Client::new())
471///     .with_protocol(Protocol::HttpBinary)
472///     .build()?;
473///
474/// let (provider, completion) = init_telemetry(
475///     TelemetryConfig::builder()
476///         .with_span_processor(
477///             LambdaSpanProcessor::builder()
478///                 .exporter(lambda_exporter)
479///                 .max_batch_size(512)
480///                 .max_queue_size(2048)
481///                 .build()
482///         )
483///         .build()
484/// ).await?;
485/// # Ok(())
486/// # }
487/// ```
488///
489pub async fn init_telemetry(
490    mut config: TelemetryConfig,
491) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
492    let mode = ProcessorMode::from_env();
493
494    if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
495        let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
496
497        for propagator in propagators {
498            match propagator {
499                "tracecontext" => config
500                    .propagators
501                    .push(Box::new(TraceContextPropagator::new())),
502                "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
503                "xray-lambda" => config
504                    .propagators
505                    .push(Box::new(LambdaXrayPropagator::new())),
506                "none" => config.propagators.push(Box::new(NoopPropagator::new())),
507                _ => tracing::warn!(
508                    "Unknown propagator: {}, using default propagators",
509                    propagator
510                ),
511            }
512        }
513    } else {
514        // if no propagators are set, use the default propagators
515        if config.propagators.is_empty() {
516            config
517                .propagators
518                .push(Box::new(TraceContextPropagator::new()));
519            config
520                .propagators
521                .push(Box::new(LambdaXrayPropagator::new()));
522        }
523    }
524
525    let composite_propagator = TextMapCompositePropagator::new(config.propagators);
526    global::set_text_map_propagator(composite_propagator);
527
528    // Add default span processor if none was added
529    if !config.has_processor {
530        let processor = LambdaSpanProcessor::builder()
531            .exporter(OtlpStdoutSpanExporter::default())
532            .build();
533        config.provider_builder = config.provider_builder.with_span_processor(processor);
534    }
535
536    // Apply defaults and build the provider
537    let resource = config.resource.unwrap_or_else(get_lambda_resource);
538    let provider = Arc::new(config.provider_builder.with_resource(resource).build());
539
540    // Register the extension if in async or finalize mode
541    let sender = match mode {
542        ProcessorMode::Async | ProcessorMode::Finalize => {
543            Some(register_extension(provider.clone(), mode.clone()).await?)
544        }
545        _ => None,
546    };
547
548    if config.set_global_provider {
549        // Set the provider as global
550        set_tracer_provider(provider.as_ref().clone());
551    }
552
553    // Initialize tracing subscriber with smart env var selection
554    let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
555        if env::var("RUST_LOG").is_ok() {
556            "RUST_LOG"
557        } else {
558            "AWS_LAMBDA_LOG_LEVEL"
559        }
560    });
561
562    let env_filter = tracing_subscriber::EnvFilter::builder()
563        .with_env_var(env_var_name)
564        .from_env_lossy();
565
566    let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
567    let tracer = completion_handler.get_tracer().clone();
568
569    let subscriber = tracing_subscriber::registry::Registry::default()
570        .with(tracing_opentelemetry::OpenTelemetryLayer::new(
571            tracer.clone(),
572        ))
573        .with(env_filter);
574
575    // Always initialize the subscriber, with or without fmt layer
576    if config.enable_fmt_layer {
577        // Determine if the lambda logging configuration is set to output json logs
578        let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
579            .unwrap_or_default()
580            .to_uppercase()
581            == "JSON";
582
583        if is_json {
584            tracing::subscriber::set_global_default(
585                subscriber.with(
586                    tracing_subscriber::fmt::layer()
587                        .with_target(false)
588                        .without_time()
589                        .json(),
590                ),
591            )?;
592        } else {
593            tracing::subscriber::set_global_default(
594                subscriber.with(
595                    tracing_subscriber::fmt::layer()
596                        .with_target(false)
597                        .without_time()
598                        .with_ansi(false),
599                ),
600            )?;
601        }
602    } else {
603        tracing::subscriber::set_global_default(subscriber)?;
604    }
605
606    Ok((tracer, completion_handler))
607}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612    use opentelemetry_sdk::trace::SimpleSpanProcessor;
613    use sealed_test::prelude::*;
614    use tokio::sync::mpsc;
615
616    // Helper to clean up environment variables between tests
617    fn cleanup_env() {
618        env::remove_var("LAMBDA_TRACING_ENABLE_FMT_LAYER");
619        env::remove_var(constants::env_vars::PROPAGATORS);
620        env::remove_var("_X_AMZN_TRACE_ID");
621    }
622
623    #[test]
624    #[sealed_test]
625    fn test_telemetry_config_defaults() {
626        cleanup_env();
627
628        let config = TelemetryConfig::builder().build();
629        assert!(config.set_global_provider); // Should be true by default
630        assert!(!config.has_processor);
631        assert!(!config.enable_fmt_layer);
632        assert!(config.propagators.is_empty()); // No propagators by default in builder
633    }
634
635    #[test]
636    #[sealed_test]
637    fn test_telemetry_config_with_propagators() {
638        cleanup_env();
639
640        // Test with explicit tracecontext propagator
641        let config = TelemetryConfig::builder()
642            .with_span_processor(SimpleSpanProcessor::new(Box::new(
643                OtlpStdoutSpanExporter::default(),
644            )))
645            .with_named_propagator("tracecontext")
646            .build();
647        assert_eq!(config.propagators.len(), 1);
648
649        // Test with explicit xray propagator
650        let config = TelemetryConfig::builder()
651            .with_named_propagator("xray")
652            .build();
653        assert_eq!(config.propagators.len(), 1);
654
655        // Test with both propagators
656        let config = TelemetryConfig::builder()
657            .with_named_propagator("tracecontext")
658            .with_named_propagator("xray")
659            .build();
660        assert_eq!(config.propagators.len(), 2);
661
662        // Test with default propagators (empty - will be set in init_telemetry)
663        let config = TelemetryConfig::builder().build();
664        assert_eq!(config.propagators.len(), 0);
665
666        // Test with none
667        let config = TelemetryConfig::builder()
668            .with_named_propagator("none")
669            .build();
670        assert_eq!(config.propagators.len(), 1);
671    }
672
673    #[tokio::test]
674    #[sealed_test]
675    async fn test_telemetry_config_env_propagators_tracecontext() {
676        cleanup_env();
677
678        // Test with OTEL_PROPAGATORS=tracecontext
679        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
680        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
681        // In real usage we'd check the behavior rather than implementation details
682        // So we'll just check that we can create and use a handler
683        assert!(handler.sender.is_none());
684
685        cleanup_env();
686    }
687
688    #[tokio::test]
689    #[sealed_test]
690    async fn test_telemetry_config_env_propagators_xray() {
691        cleanup_env();
692
693        // Test with OTEL_PROPAGATORS=xray
694        env::set_var(constants::env_vars::PROPAGATORS, "xray");
695        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
696        assert!(handler.sender.is_none());
697
698        cleanup_env();
699    }
700
701    #[tokio::test]
702    #[sealed_test]
703    async fn test_telemetry_config_env_propagators_combined() {
704        cleanup_env();
705
706        // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
707        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
708        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
709        assert!(handler.sender.is_none());
710
711        cleanup_env();
712    }
713
714    #[tokio::test]
715    #[sealed_test]
716    async fn test_telemetry_config_env_propagators_none() {
717        cleanup_env();
718
719        // Test with OTEL_PROPAGATORS=none
720        env::set_var(constants::env_vars::PROPAGATORS, "none");
721        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
722        assert!(handler.sender.is_none());
723
724        cleanup_env();
725    }
726
727    #[tokio::test]
728    #[sealed_test]
729    async fn test_init_telemetry_defaults() {
730        let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
731        assert!(completion_handler.sender.is_none()); // Default mode is Sync
732    }
733
734    #[tokio::test]
735    #[sealed_test]
736    async fn test_init_telemetry_custom() {
737        let resource = Resource::builder().build();
738        let config = TelemetryConfig::builder()
739            .resource(resource)
740            .with_named_propagator("tracecontext")
741            .enable_fmt_layer(true)
742            .set_global_provider(false)
743            .build();
744
745        let (_, completion_handler) = init_telemetry(config).await.unwrap();
746        assert!(completion_handler.sender.is_none());
747    }
748
749    #[test]
750    fn test_telemetry_config_env_fmt_layer() {
751        cleanup_env();
752
753        // Test with environment variable set to true
754        env::set_var("LAMBDA_TRACING_ENABLE_FMT_LAYER", "true");
755        let config = TelemetryConfig::default();
756        assert!(config.enable_fmt_layer);
757
758        // Test with environment variable set to 1
759        env::set_var("LAMBDA_TRACING_ENABLE_FMT_LAYER", "1");
760        let config = TelemetryConfig::default();
761        assert!(config.enable_fmt_layer);
762
763        // Test with environment variable set to false
764        env::set_var("LAMBDA_TRACING_ENABLE_FMT_LAYER", "false");
765        let config = TelemetryConfig::default();
766        assert!(!config.enable_fmt_layer);
767
768        // Clean up
769        cleanup_env();
770    }
771
772    #[test]
773    fn test_completion_handler_sync_mode() {
774        let provider = Arc::new(
775            SdkTracerProvider::builder()
776                .with_span_processor(SimpleSpanProcessor::new(Box::new(
777                    OtlpStdoutSpanExporter::default(),
778                )))
779                .build(),
780        );
781
782        let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
783
784        // In sync mode, complete() should call force_flush
785        handler.complete();
786        // Note: We can't easily verify the flush was called since TracerProvider
787        // doesn't expose this information, but we can verify it doesn't panic
788    }
789
790    #[tokio::test]
791    async fn test_completion_handler_async_mode() {
792        let provider = Arc::new(
793            SdkTracerProvider::builder()
794                .with_span_processor(SimpleSpanProcessor::new(Box::new(
795                    OtlpStdoutSpanExporter::default(),
796                )))
797                .build(),
798        );
799
800        let (tx, mut rx) = mpsc::unbounded_channel();
801
802        let completion_handler =
803            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
804
805        // In async mode, complete() should send a message through the channel
806        completion_handler.complete();
807
808        // Verify that we received the completion signal
809        assert!(rx.try_recv().is_ok());
810        // Verify channel is now empty
811        assert!(rx.try_recv().is_err());
812    }
813
814    #[test]
815    fn test_completion_handler_finalize_mode() {
816        let provider = Arc::new(
817            SdkTracerProvider::builder()
818                .with_span_processor(SimpleSpanProcessor::new(Box::new(
819                    OtlpStdoutSpanExporter::default(),
820                )))
821                .build(),
822        );
823
824        let (tx, _rx) = mpsc::unbounded_channel();
825
826        let completion_handler =
827            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);
828
829        // In finalize mode, complete() should do nothing
830        completion_handler.complete();
831        // Verify it doesn't panic or cause issues
832    }
833
834    #[test]
835    fn test_completion_handler_clone() {
836        let provider = Arc::new(
837            SdkTracerProvider::builder()
838                .with_span_processor(SimpleSpanProcessor::new(Box::new(
839                    OtlpStdoutSpanExporter::default(),
840                )))
841                .build(),
842        );
843
844        let (tx, _rx) = mpsc::unbounded_channel();
845
846        let completion_handler =
847            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
848
849        // Test that Clone is implemented correctly
850        let cloned = completion_handler.clone();
851
852        // Verify both handlers have the same mode
853        assert!(matches!(cloned.mode, ProcessorMode::Async));
854        assert!(cloned.sender.is_some());
855    }
856
857    #[test]
858    fn test_completion_handler_sync_mode_error_handling() {
859        let provider = Arc::new(
860            SdkTracerProvider::builder()
861                .with_span_processor(SimpleSpanProcessor::new(Box::new(
862                    OtlpStdoutSpanExporter::default(),
863                )))
864                .build(),
865        );
866
867        let completion_handler =
868            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
869
870        // Test that complete() doesn't panic
871        completion_handler.complete();
872    }
873
874    #[tokio::test]
875    async fn test_completion_handler_async_mode_error_handling() {
876        let provider = Arc::new(
877            SdkTracerProvider::builder()
878                .with_span_processor(SimpleSpanProcessor::new(Box::new(
879                    OtlpStdoutSpanExporter::default(),
880                )))
881                .build(),
882        );
883
884        // Use UnboundedSender instead of Sender
885        let (tx, _rx) = mpsc::unbounded_channel();
886        // Fill the channel by dropping the receiver
887        drop(_rx);
888
889        let completion_handler =
890            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
891
892        // Test that complete() doesn't panic when receiver is dropped
893        completion_handler.complete();
894    }
895}
896
897// A simple no-op propagator
898#[derive(Debug)]
899struct NoopPropagator;
900
901impl NoopPropagator {
902    fn new() -> Self {
903        NoopPropagator
904    }
905}
906
907impl TextMapPropagator for NoopPropagator {
908    fn inject_context(
909        &self,
910        _cx: &opentelemetry::Context,
911        _injector: &mut dyn opentelemetry::propagation::Injector,
912    ) {
913    }
914
915    fn extract_with_context(
916        &self,
917        cx: &opentelemetry::Context,
918        _extractor: &dyn opentelemetry::propagation::Extractor,
919    ) -> opentelemetry::Context {
920        cx.clone()
921    }
922
923    fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
924        opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
925    }
926}