lambda_otel_lite/
telemetry.rs

1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//!     let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//!     let resource = Resource::builder()
47//!         .with_attributes(vec![
48//!             KeyValue::new("service.version", "1.0.0"),
49//!             KeyValue::new("deployment.environment", "production"),
50//!         ])
51//!         .build();
52//!
53//!     let config = TelemetryConfig::builder()
54//!         .resource(resource)
55//!         .build();
56//!
57//!     let (_, completion_handler) = init_telemetry(config).await?;
58//!     Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//!     let config = TelemetryConfig::builder()
72//!         .with_span_processor(SimpleSpanProcessor::new(
73//!             Box::new(OtlpStdoutSpanExporter::default())
74//!         ))
75//!         .enable_fmt_layer(true)
76//!         .build();
77//!
78//!     let (_, completion_handler) = init_telemetry(config).await?;
79//!     Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
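//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
//! - `OTEL_PROPAGATORS`: Comma-separated context propagators (`tracecontext`, `xray`,
//!   `xray-lambda`, `none`); when unset, `xray-lambda` followed by `tracecontext` is used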
93
94use crate::{
95    constants, extension::register_extension, logger::Logger, mode::ProcessorMode,
96    processor::LambdaSpanProcessor, propagation::LambdaXrayPropagator,
97    resource::get_lambda_resource,
98};
99use bon::Builder;
100use lambda_runtime::Error;
101use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
102use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
103use opentelemetry_aws::trace::XrayPropagator;
104use opentelemetry_sdk::{
105    propagation::TraceContextPropagator,
106    trace::{IdGenerator, SdkTracerProvider, SpanProcessor, TracerProviderBuilder},
107    Resource,
108};
109use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
110use std::{borrow::Cow, env, sync::Arc};
111use tokio::sync::mpsc::UnboundedSender;
112use tracing_subscriber::layer::SubscriberExt;
113
// Module-scoped logger named "telemetry"; used below for the warnings about unknown
// propagators, invalid fmt-layer settings, and flush/completion-signal failures.
// NOTE(review): presumably the name tags each emitted message — confirm in `crate::logger`.
static LOGGER: Logger = Logger::const_new("telemetry");
116
117/// Manages the lifecycle of span export based on the processing mode.
118///
119/// This handler must be used to signal when spans should be exported. Its behavior
120/// varies by processing mode:
121/// - Sync: Forces immediate export
122/// - Async: Signals the extension to export
123/// - Finalize: Defers to span processor
124///
125/// # Thread Safety
126///
127/// This type is `Clone` and can be safely shared between threads.
128#[derive(Clone)]
129pub struct TelemetryCompletionHandler {
130    provider: Arc<SdkTracerProvider>,
131    sender: Option<UnboundedSender<()>>,
132    mode: ProcessorMode,
133    tracer: opentelemetry_sdk::trace::Tracer,
134}
135
136impl TelemetryCompletionHandler {
137    pub fn new(
138        provider: Arc<SdkTracerProvider>,
139        sender: Option<UnboundedSender<()>>,
140        mode: ProcessorMode,
141    ) -> Self {
142        // Create instrumentation scope with attributes
143        let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
144            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
145            .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
146            .with_attributes(vec![
147                KeyValue::new("library.language", "rust"),
148                KeyValue::new("library.type", "instrumentation"),
149                KeyValue::new("library.runtime", "aws_lambda"),
150            ])
151            .build();
152
153        // Create tracer with instrumentation scope
154        let tracer = provider.tracer_with_scope(scope);
155
156        Self {
157            provider,
158            sender,
159            mode,
160            tracer,
161        }
162    }
163
164    /// Get the tracer instance for creating spans.
165    ///
166    /// Returns the cached tracer instance configured with this package's instrumentation scope.
167    /// The tracer is configured with the provider's settings and will automatically use
168    /// the correct span processor based on the processing mode.
169    pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
170        &self.tracer
171    }
172
173    /// Complete telemetry processing for the current invocation
174    ///
175    /// In Sync mode, this will force flush the provider and log any errors that occur.
176    /// In Async mode, this will send a completion signal to the extension.
177    /// In Finalize mode, this will do nothing (handled by drop).
178    pub fn complete(&self) {
179        match self.mode {
180            ProcessorMode::Sync => {
181                if let Err(e) = self.provider.force_flush() {
182                    LOGGER.warn(format!("Error flushing telemetry: {:?}", e));
183                }
184            }
185            ProcessorMode::Async => {
186                if let Some(sender) = &self.sender {
187                    if let Err(e) = sender.send(()) {
188                        LOGGER.warn(format!(
189                            "Failed to send completion signal to extension: {:?}",
190                            e
191                        ));
192                    }
193                }
194            }
195            ProcessorMode::Finalize => {
196                // Do nothing, handled by drop
197            }
198        }
199    }
200}
201
/// Configuration for OpenTelemetry initialization.
///
/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
/// for standard Lambda configuration, or the builder pattern for customization.
///
/// # Fields
///
/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
/// * `set_global_provider` - Set as global tracer provider (default: true)
/// * `resource` - Custom resource (default: auto-detected from Lambda)
/// * `env_var_name` - Environment variable name for log level configuration
/// * `processor_mode` - Span processing mode (sync/async/finalize)
///
/// # Builder-only options
///
/// These are not plain fields. They are configured through dedicated builder methods:
///
/// * `with_span_processor` - Add a span processor. If none is added, `init_telemetry`
///   installs a default `LambdaSpanProcessor` exporting through `OtlpStdoutSpanExporter`.
/// * `with_propagator` / `with_named_propagator` - Add context propagators
/// * `with_id_generator` - Custom ID generator for trace and span IDs
///
/// # Examples
///
/// Basic usage with default configuration:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::default();
/// ```
///
/// Custom configuration with resource attributes:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
/// use opentelemetry::KeyValue;
/// use opentelemetry_sdk::Resource;
///
/// let config = TelemetryConfig::builder()
///     .resource(Resource::builder()
///         .with_attributes(vec![KeyValue::new("version", "1.0.0")])
///         .build())
///     .build();
/// ```
///
/// Custom configuration with logging options:
///
/// ```no_run
/// use lambda_otel_lite::telemetry::TelemetryConfig;
///
/// let config = TelemetryConfig::builder()
///     .enable_fmt_layer(true)  // Enable console output for debugging
///     .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string())  // Custom env var for log level
///     .build();
/// ```
#[derive(Builder, Debug)]
pub struct TelemetryConfig {
    // Internal state filled in by the hand-written `with_*` builder methods
    // (`#[builder(field)]` members), not by generated setters.

    // Accumulates span processors and the optional ID generator for the tracer provider.
    #[builder(field)]
    provider_builder: TracerProviderBuilder,

    // Set by `with_span_processor`; when false, `init_telemetry` adds its default processor.
    #[builder(field)]
    has_processor: bool,

    // Propagators added via `with_propagator` / `with_named_propagator`. `init_telemetry`
    // combines the final list into a single `TextMapCompositePropagator`.
    #[builder(field)]
    propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,

    /// Enable console output for debugging.
    ///
    /// When enabled, spans and events will be printed to the console in addition
    /// to being exported through the configured span processors. This is useful
    /// for debugging but adds overhead and should be disabled in production.
    ///
    /// This can also be controlled via the `LAMBDA_TRACING_ENABLE_FMT_LAYER` environment variable,
    /// which takes precedence over this setting when present:
    /// - Setting the env var to "true" will enable console output even if this field is false
    /// - Setting the env var to "false" will disable console output even if this field is true
    /// - Invalid values will log a warning and fall back to this code setting
    ///
    /// This environment variable override allows toggling logging for debugging without code changes.
    ///
    /// Default: `false`
    #[builder(default = false)]
    pub enable_fmt_layer: bool,

    /// Set this provider as the global OpenTelemetry provider.
    ///
    /// When enabled, the provider will be registered as the global provider
    /// for the OpenTelemetry API. This allows using the global tracer API
    /// without explicitly passing around the provider.
    ///
    /// Default: `true`
    #[builder(default = true)]
    pub set_global_provider: bool,

    /// Custom resource for all spans.
    ///
    /// If not provided, resource attributes are automatically detected from the
    /// Lambda environment. If provided, this resource is used as-is and Lambda
    /// auto-detection is skipped entirely: `init_telemetry` only runs detection when
    /// this is `None`. Include any Lambda attributes you still need in the resource
    /// you pass.
    ///
    /// Default: `None` (auto-detected from Lambda environment)
    pub resource: Option<Resource>,

    /// Environment variable name to use for log level configuration.
    ///
    /// This field specifies which environment variable should be used to configure
    /// the tracing subscriber's log level filter. If not specified, the system will
    /// first check for `RUST_LOG` and then fall back to `AWS_LAMBDA_LOG_LEVEL`.
    ///
    /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
    pub env_var_name: Option<String>,

    /// Span processing mode (sync/async/finalize)
    ///
    /// Controls how spans are exported from the processor. This can be overridden by the
    /// LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE environment variable, which takes precedence.
    ///
    /// Default: `None` (uses environment variable or defaults to `ProcessorMode::Sync`)
    pub processor_mode: Option<ProcessorMode>,
}
316
317impl Default for TelemetryConfig {
318    fn default() -> Self {
319        Self::builder().build()
320    }
321}
322
323/// Builder methods for adding span processors and other configuration
324impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
325    /// Add a span processor to the tracer provider.
326    ///
327    /// This method allows adding custom span processors for trace data processing.
328    /// Multiple processors can be added by calling this method multiple times.
329    ///
330    /// # Arguments
331    ///
332    /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
333    ///
334    /// # Examples
335    ///
336    /// ```no_run
337    /// use lambda_otel_lite::TelemetryConfig;
338    /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
339    /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
340    ///
341    /// // Only use builder when adding custom processors
342    /// let config = TelemetryConfig::builder()
343    ///     .with_span_processor(SimpleSpanProcessor::new(
344    ///         Box::new(OtlpStdoutSpanExporter::default())
345    ///     ))
346    ///     .build();
347    /// ```
348    pub fn with_span_processor<T>(mut self, processor: T) -> Self
349    where
350        T: SpanProcessor + 'static,
351    {
352        self.provider_builder = self.provider_builder.with_span_processor(processor);
353        self.has_processor = true;
354        self
355    }
356
357    /// Add a propagator to the list of propagators.
358    ///
359    /// Multiple propagators can be added and will be combined into a composite propagator.
360    /// The default propagator is TraceContextPropagator.
361    ///
362    /// # Arguments
363    ///
364    /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// use lambda_otel_lite::TelemetryConfig;
370    /// use opentelemetry_sdk::propagation::BaggagePropagator;
371    ///
372    /// let config = TelemetryConfig::builder()
373    ///     .with_propagator(BaggagePropagator::new())
374    ///     .build();
375    /// ```
376    pub fn with_propagator<T>(mut self, propagator: T) -> Self
377    where
378        T: TextMapPropagator + Send + Sync + 'static,
379    {
380        self.propagators.push(Box::new(propagator));
381        self
382    }
383
384    pub fn with_named_propagator(self, name: &str) -> Self {
385        match name {
386            "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
387            "xray" => self.with_propagator(XrayPropagator::new()),
388            "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
389            "none" => self.with_propagator(NoopPropagator::new()),
390            _ => {
391                LOGGER.warn(format!(
392                    "Unknown propagator: {}, using default propagators",
393                    name
394                ));
395                self
396            }
397        }
398    }
399
400    /// Add a custom ID generator to the tracer provider.
401    ///
402    /// This method allows setting a custom ID generator for trace and span IDs.
403    /// This is particularly useful when integrating with AWS X-Ray, which requires
404    /// a specific ID format.
405    ///
406    /// # Arguments
407    ///
408    /// * `id_generator` - An ID generator implementing the [`IdGenerator`] trait
409    ///
410    /// # Examples
411    ///
412    /// ```no_run
413    /// use lambda_otel_lite::TelemetryConfig;
414    /// use opentelemetry_aws::trace::XrayIdGenerator;
415    ///
416    /// // Configure with X-Ray compatible ID generator
417    /// let config = TelemetryConfig::builder()
418    ///     .with_id_generator(XrayIdGenerator::default())
419    ///     .build();
420    /// ```
421    pub fn with_id_generator<T>(mut self, id_generator: T) -> Self
422    where
423        T: IdGenerator + 'static,
424    {
425        self.provider_builder = self.provider_builder.with_id_generator(id_generator);
426        self
427    }
428}
429
430/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
431///
432/// # Arguments
433///
434/// * `config` - Configuration for telemetry initialization
435///
436/// # Returns
437///
438/// Returns a tuple containing:
439/// - A tracer instance for manual instrumentation
440/// - A completion handler for managing span export timing
441///
442/// # Errors
443///
444/// Returns error if:
445/// - Extension registration fails (async/finalize modes)
446/// - Tracer provider initialization fails
447/// - Environment variable parsing fails
448///
449/// # Examples
450///
451/// Basic usage with default configuration:
452///
453/// ```no_run
454/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
455///
456/// # async fn example() -> Result<(), lambda_runtime::Error> {
457/// // Initialize with default configuration
458/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
459/// # Ok(())
460/// # }
461/// ```
462///
463/// Custom configuration:
464///
465/// ```no_run
466/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
467/// use opentelemetry::KeyValue;
468/// use opentelemetry_sdk::Resource;
469///
470/// # async fn example() -> Result<(), lambda_runtime::Error> {
471/// // Create custom resource
472/// let resource = Resource::builder()
473///     .with_attributes(vec![
474///         KeyValue::new("service.name", "payment-api"),
475///         KeyValue::new("service.version", "1.2.3"),
476///     ])
477///     .build();
478///
479/// // Initialize with custom configuration
480/// let (_, telemetry) = init_telemetry(
481///     TelemetryConfig::builder()
482///         .resource(resource)
483///         .build()
484/// ).await?;
485/// # Ok(())
486/// # }
487/// ```
488///
489/// Advanced usage with BatchSpanProcessor (required for async exporters):
490///
491/// ```no_run
492/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
493/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
494/// use opentelemetry_sdk::trace::BatchSpanProcessor;
495/// use lambda_runtime::Error;
496///
497/// # async fn example() -> Result<(), Error> {
498/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
499///     .with_http()
500///     .with_http_client(reqwest::Client::new())
501///     .with_protocol(Protocol::HttpBinary)
502///     .build()?;
503///
504/// let (provider, completion) = init_telemetry(
505///     TelemetryConfig::builder()
506///         .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
507///         .build()
508/// ).await?;
509/// # Ok(())
510/// # }
511/// ```
512///
513/// Using LambdaSpanProcessor with blocking http client:
514///
515/// ```no_run
516/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
517/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
518/// use lambda_runtime::Error;
519///
520/// # async fn example() -> Result<(), Error> {
521/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
522///     .with_http()
523///     .with_http_client(reqwest::blocking::Client::new())
524///     .with_protocol(Protocol::HttpBinary)
525///     .build()?;
526///
527/// let (provider, completion) = init_telemetry(
528///     TelemetryConfig::builder()
529///         .with_span_processor(
530///             LambdaSpanProcessor::builder()
531///                 .exporter(lambda_exporter)
532///                 .max_batch_size(512)
533///                 .max_queue_size(2048)
534///                 .build()
535///         )
536///         .build()
537/// ).await?;
538/// # Ok(())
539/// # }
540/// ```
541///
542pub async fn init_telemetry(
543    mut config: TelemetryConfig,
544) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
545    // Get mode from config or environment with environment taking precedence
546    let mode = ProcessorMode::resolve(config.processor_mode);
547
548    if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
549        let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
550
551        for propagator in propagators {
552            match propagator {
553                "tracecontext" => config
554                    .propagators
555                    .push(Box::new(TraceContextPropagator::new())),
556                "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
557                "xray-lambda" => config
558                    .propagators
559                    .push(Box::new(LambdaXrayPropagator::new())),
560                "none" => config.propagators.push(Box::new(NoopPropagator::new())),
561                _ => LOGGER.warn(format!(
562                    "Unknown propagator: {}, using default propagators",
563                    propagator
564                )),
565            }
566        }
567    } else {
568        // if no propagators are set, use the default propagators
569        if config.propagators.is_empty() {
570            // IMPORTANT:
571            // LambdaXrayPropagator is added *before* TraceContextPropagator
572            // because in OpenTelemetry Rust, the *last* propagator that extracts
573            // a valid context wins during extraction.
574            // This ensures that if both an AWS X-Ray header (or _X_AMZN_TRACE_ID)
575            // and a W3C traceparent header are present, the W3C traceparent takes precedence.
576            config
577                .propagators
578                .push(Box::new(LambdaXrayPropagator::new()));
579            config
580                .propagators
581                .push(Box::new(TraceContextPropagator::new()));
582        }
583    }
584
585    let composite_propagator = TextMapCompositePropagator::new(config.propagators);
586    global::set_text_map_propagator(composite_propagator);
587
588    // Add default span processor if none was added
589    if !config.has_processor {
590        let processor = LambdaSpanProcessor::builder()
591            .exporter(OtlpStdoutSpanExporter::default())
592            .build();
593        config.provider_builder = config.provider_builder.with_span_processor(processor);
594    }
595
596    // Apply defaults and build the provider
597    let resource = config.resource.unwrap_or_else(get_lambda_resource);
598
599    let provider = Arc::new(config.provider_builder.with_resource(resource).build());
600
601    // Register the extension if in async or finalize mode
602    let sender = match mode {
603        ProcessorMode::Async | ProcessorMode::Finalize => {
604            Some(register_extension(provider.clone(), mode.clone()).await?)
605        }
606        _ => None,
607    };
608
609    if config.set_global_provider {
610        // Set the provider as global
611        set_tracer_provider(provider.as_ref().clone());
612    }
613
614    // Initialize tracing subscriber with smart env var selection
615    let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
616        if env::var("RUST_LOG").is_ok() {
617            "RUST_LOG"
618        } else {
619            "AWS_LAMBDA_LOG_LEVEL"
620        }
621    });
622
623    let env_filter = tracing_subscriber::EnvFilter::builder()
624        .with_env_var(env_var_name)
625        .from_env_lossy();
626
627    let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
628    let tracer = completion_handler.get_tracer().clone();
629
630    let subscriber = tracing_subscriber::registry::Registry::default()
631        .with(tracing_opentelemetry::OpenTelemetryLayer::new(
632            tracer.clone(),
633        ))
634        .with(env_filter);
635
636    // Determine if fmt layer should be enabled - environment variable takes precedence when set
637    let enable_fmt = if let Ok(env_value) = env::var(constants::env_vars::ENABLE_FMT_LAYER) {
638        match env_value.to_lowercase().as_str() {
639            "true" => true,
640            "false" => false,
641            other => {
642                LOGGER.warn(format!(
643                    "Invalid value '{}' for {}, expected 'true' or 'false'. Using code configuration.",
644                    other,
645                    constants::env_vars::ENABLE_FMT_LAYER
646                ));
647                config.enable_fmt_layer
648            }
649        }
650    } else {
651        // If env var not set, use the configured value
652        config.enable_fmt_layer
653    };
654
655    // Enable fmt layer based on the determined value
656    if enable_fmt {
657        // Determine if the lambda logging configuration is set to output json logs
658        let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
659            .unwrap_or_default()
660            .to_uppercase()
661            == "JSON";
662
663        if is_json {
664            tracing::subscriber::set_global_default(
665                subscriber.with(
666                    tracing_subscriber::fmt::layer()
667                        .with_target(false)
668                        .without_time()
669                        .json(),
670                ),
671            )?;
672        } else {
673            tracing::subscriber::set_global_default(
674                subscriber.with(
675                    tracing_subscriber::fmt::layer()
676                        .with_target(false)
677                        .without_time()
678                        .with_ansi(false),
679                ),
680            )?;
681        }
682    } else {
683        tracing::subscriber::set_global_default(subscriber)?;
684    }
685
686    Ok((tracer, completion_handler))
687}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692    use opentelemetry::trace::{Span, Tracer};
693    use opentelemetry_aws::trace::XrayIdGenerator;
694    use opentelemetry_sdk::trace::SimpleSpanProcessor;
695    use sealed_test::prelude::*;
696    use std::sync::Arc;
697    use tokio::sync::mpsc;
698
699    // Helper to clean up environment variables between tests
700    fn cleanup_env() {
701        env::remove_var(constants::env_vars::ENABLE_FMT_LAYER);
702        env::remove_var(constants::env_vars::PROPAGATORS);
703        env::remove_var("_X_AMZN_TRACE_ID");
704    }
705
706    #[test]
707    #[sealed_test]
708    fn test_telemetry_config_defaults() {
709        cleanup_env();
710
711        let config = TelemetryConfig::builder().build();
712        assert!(config.set_global_provider); // Should be true by default
713        assert!(!config.has_processor);
714        assert!(!config.enable_fmt_layer);
715        assert!(config.propagators.is_empty()); // No propagators by default in builder
716    }
717
718    #[test]
719    #[sealed_test]
720    fn test_telemetry_config_with_propagators() {
721        cleanup_env();
722
723        // Test with explicit tracecontext propagator
724        let config = TelemetryConfig::builder()
725            .with_span_processor(SimpleSpanProcessor::new(Box::new(
726                OtlpStdoutSpanExporter::default(),
727            )))
728            .with_named_propagator("tracecontext")
729            .build();
730        assert_eq!(config.propagators.len(), 1);
731
732        // Test with explicit xray propagator
733        let config = TelemetryConfig::builder()
734            .with_named_propagator("xray")
735            .build();
736        assert_eq!(config.propagators.len(), 1);
737
738        // Test with both propagators
739        let config = TelemetryConfig::builder()
740            .with_named_propagator("tracecontext")
741            .with_named_propagator("xray")
742            .build();
743        assert_eq!(config.propagators.len(), 2);
744
745        // Test with default propagators (empty - will be set in init_telemetry)
746        let config = TelemetryConfig::builder().build();
747        assert_eq!(config.propagators.len(), 0);
748
749        // Test with none
750        let config = TelemetryConfig::builder()
751            .with_named_propagator("none")
752            .build();
753        assert_eq!(config.propagators.len(), 1);
754    }
755
756    #[tokio::test]
757    #[sealed_test]
758    async fn test_telemetry_config_env_propagators_tracecontext() {
759        cleanup_env();
760
761        // Test with OTEL_PROPAGATORS=tracecontext
762        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
763        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
764        // In real usage we'd check the behavior rather than implementation details
765        // So we'll just check that we can create and use a handler
766        assert!(handler.sender.is_none());
767
768        cleanup_env();
769    }
770
771    #[tokio::test]
772    #[sealed_test]
773    async fn test_telemetry_config_env_propagators_xray() {
774        cleanup_env();
775
776        // Test with OTEL_PROPAGATORS=xray
777        env::set_var(constants::env_vars::PROPAGATORS, "xray");
778        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
779        assert!(handler.sender.is_none());
780
781        cleanup_env();
782    }
783
784    #[tokio::test]
785    #[sealed_test]
786    async fn test_telemetry_config_env_propagators_combined() {
787        cleanup_env();
788
789        // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
790        env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
791        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
792        assert!(handler.sender.is_none());
793
794        cleanup_env();
795    }
796
797    #[tokio::test]
798    #[sealed_test]
799    async fn test_telemetry_config_env_propagators_none() {
800        cleanup_env();
801
802        // Test with OTEL_PROPAGATORS=none
803        env::set_var(constants::env_vars::PROPAGATORS, "none");
804        let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
805        assert!(handler.sender.is_none());
806
807        cleanup_env();
808    }
809
810    #[tokio::test]
811    #[sealed_test]
812    async fn test_init_telemetry_defaults() {
813        let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
814        assert!(completion_handler.sender.is_none()); // Default mode is Sync
815    }
816
817    #[tokio::test]
818    #[sealed_test]
819    async fn test_init_telemetry_custom() {
820        let resource = Resource::builder().build();
821        let config = TelemetryConfig::builder()
822            .resource(resource)
823            .with_named_propagator("tracecontext")
824            .enable_fmt_layer(true)
825            .set_global_provider(false)
826            .build();
827
828        let (_, completion_handler) = init_telemetry(config).await.unwrap();
829        assert!(completion_handler.sender.is_none());
830    }
831
832    #[tokio::test]
833    #[sealed_test]
834    async fn test_telemetry_config_env_fmt_layer_true_override() {
835        cleanup_env();
836
837        // Test: Env var "true" overrides code setting "false"
838        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "true");
839        let config = TelemetryConfig::default(); // code setting is false by default
840        assert!(!config.enable_fmt_layer); // Config should not be affected by env var
841
842        // Initialize telemetry - env var should override config
843        let result = init_telemetry(config).await;
844        assert!(result.is_ok());
845
846        // Clean up
847        cleanup_env();
848    }
849
850    #[tokio::test]
851    #[sealed_test]
852    async fn test_telemetry_config_env_fmt_layer_false_override() {
853        cleanup_env();
854
855        // Test: Env var "false" overrides code setting "true"
856        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "false");
857        let config = TelemetryConfig::builder()
858            .enable_fmt_layer(true) // code setting is true
859            .build();
860        assert!(config.enable_fmt_layer);
861
862        // Initialize telemetry - env var should override config
863        let result = init_telemetry(config).await;
864        assert!(result.is_ok());
865
866        // Clean up
867        cleanup_env();
868    }
869
870    #[tokio::test]
871    #[sealed_test]
872    async fn test_telemetry_config_env_fmt_layer_invalid() {
873        cleanup_env();
874
875        // Test: Invalid env var falls back to code setting
876        env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "invalid");
877        let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
878
879        // Initialize telemetry - should log a warning but use code setting
880        let result = init_telemetry(config).await;
881        assert!(result.is_ok());
882
883        // Clean up
884        cleanup_env();
885    }
886
887    #[tokio::test]
888    #[sealed_test]
889    async fn test_telemetry_config_env_fmt_layer_not_set() {
890        cleanup_env();
891
892        // Test: No env var uses code setting
893        let config = TelemetryConfig::default();
894        assert!(!config.enable_fmt_layer);
895
896        let result = init_telemetry(config).await;
897        assert!(result.is_ok());
898
899        // Clean up
900        cleanup_env();
901    }
902
903    #[test]
904    fn test_completion_handler_sync_mode() {
905        let provider = Arc::new(
906            SdkTracerProvider::builder()
907                .with_span_processor(SimpleSpanProcessor::new(Box::new(
908                    OtlpStdoutSpanExporter::default(),
909                )))
910                .build(),
911        );
912
913        let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
914
915        // In sync mode, complete() should call force_flush
916        handler.complete();
917        // Note: We can't easily verify the flush was called since TracerProvider
918        // doesn't expose this information, but we can verify it doesn't panic
919    }
920
921    #[tokio::test]
922    async fn test_completion_handler_async_mode() {
923        let provider = Arc::new(
924            SdkTracerProvider::builder()
925                .with_span_processor(SimpleSpanProcessor::new(Box::new(
926                    OtlpStdoutSpanExporter::default(),
927                )))
928                .build(),
929        );
930
931        let (tx, mut rx) = mpsc::unbounded_channel();
932
933        let completion_handler =
934            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
935
936        // In async mode, complete() should send a message through the channel
937        completion_handler.complete();
938
939        // Verify that we received the completion signal
940        assert!(rx.try_recv().is_ok());
941        // Verify channel is now empty
942        assert!(rx.try_recv().is_err());
943    }
944
945    #[test]
946    fn test_completion_handler_finalize_mode() {
947        let provider = Arc::new(
948            SdkTracerProvider::builder()
949                .with_span_processor(SimpleSpanProcessor::new(Box::new(
950                    OtlpStdoutSpanExporter::default(),
951                )))
952                .build(),
953        );
954
955        let (tx, _rx) = mpsc::unbounded_channel();
956
957        let completion_handler =
958            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);
959
960        // In finalize mode, complete() should do nothing
961        completion_handler.complete();
962        // Verify it doesn't panic or cause issues
963    }
964
965    #[test]
966    fn test_completion_handler_clone() {
967        let provider = Arc::new(
968            SdkTracerProvider::builder()
969                .with_span_processor(SimpleSpanProcessor::new(Box::new(
970                    OtlpStdoutSpanExporter::default(),
971                )))
972                .build(),
973        );
974
975        let (tx, _rx) = mpsc::unbounded_channel();
976
977        let completion_handler =
978            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
979
980        // Test that Clone is implemented correctly
981        let cloned = completion_handler.clone();
982
983        // Verify both handlers have the same mode
984        assert!(matches!(cloned.mode, ProcessorMode::Async));
985        assert!(cloned.sender.is_some());
986    }
987
988    #[test]
989    fn test_completion_handler_sync_mode_error_handling() {
990        let provider = Arc::new(
991            SdkTracerProvider::builder()
992                .with_span_processor(SimpleSpanProcessor::new(Box::new(
993                    OtlpStdoutSpanExporter::default(),
994                )))
995                .build(),
996        );
997
998        let completion_handler =
999            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
1000
1001        // Test that complete() doesn't panic
1002        completion_handler.complete();
1003    }
1004
1005    #[tokio::test]
1006    async fn test_completion_handler_async_mode_error_handling() {
1007        let provider = Arc::new(
1008            SdkTracerProvider::builder()
1009                .with_span_processor(SimpleSpanProcessor::new(Box::new(
1010                    OtlpStdoutSpanExporter::default(),
1011                )))
1012                .build(),
1013        );
1014
1015        // Use UnboundedSender instead of Sender
1016        let (tx, _rx) = mpsc::unbounded_channel();
1017        // Fill the channel by dropping the receiver
1018        drop(_rx);
1019
1020        let completion_handler =
1021            TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
1022
1023        // Test that complete() doesn't panic when receiver is dropped
1024        completion_handler.complete();
1025    }
1026
1027    #[test]
1028    #[sealed_test]
1029    fn test_telemetry_config_with_id_generator() {
1030        cleanup_env();
1031
1032        // Create a config with X-Ray ID generator
1033        let config = TelemetryConfig::builder()
1034            .with_id_generator(XrayIdGenerator::default())
1035            .build();
1036
1037        // We can't directly check the ID generator type since it's boxed inside the provider,
1038        // but we can verify it's applied by checking the generated trace IDs format
1039        let provider = Arc::new(config.provider_builder.build());
1040
1041        // Create a scope with attributes
1042        let scope = opentelemetry::InstrumentationScope::builder("test")
1043            .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
1044            .build();
1045
1046        // Get a tracer using the correct API
1047        let tracer = provider.tracer_with_scope(scope);
1048
1049        // Start a span using the tracer
1050        let span = tracer.start_with_context("test span", &opentelemetry::Context::current());
1051        let trace_id = span.span_context().trace_id();
1052
1053        // Verify X-Ray trace ID format:
1054        // 1. Convert to hex string for easier checking
1055        let trace_id_hex = format!("{:032x}", trace_id);
1056
1057        // 2. The first 8 characters of X-Ray trace IDs represent a timestamp in seconds
1058        // This is the key characteristic of X-Ray trace IDs that we can verify
1059        let timestamp_part = &trace_id_hex[0..8];
1060
1061        // 3. Parse the hex timestamp to ensure it's a valid timestamp (recent past)
1062        let timestamp = u32::from_str_radix(timestamp_part, 16).unwrap();
1063
1064        // 4. Check that timestamp is reasonable (within the last day)
1065        let now = std::time::SystemTime::now()
1066            .duration_since(std::time::UNIX_EPOCH)
1067            .unwrap()
1068            .as_secs() as u32;
1069
1070        // The timestamp should be within the last day
1071        assert!(timestamp <= now);
1072        assert!(timestamp > now - 86400); // Within the last day
1073
1074        // Verify remaining 24 characters are not all zeros (random part)
1075        let random_part = &trace_id_hex[8..];
1076        assert_ne!(random_part, "000000000000000000000000");
1077    }
1078}
1079
/// A stateless propagator that performs no context propagation at all.
// NOTE(review): this item appears to follow the closing brace of the test module; clippy's
// `items_after_test_module` lint may flag that — consider moving it above the tests.
#[derive(Debug)]
struct NoopPropagator;

impl NoopPropagator {
    /// Returns the (unit, stateless) no-op propagator.
    fn new() -> Self {
        Self
    }
}
1087    }
1088}
1089
1090impl TextMapPropagator for NoopPropagator {
1091    fn inject_context(
1092        &self,
1093        _cx: &opentelemetry::Context,
1094        _injector: &mut dyn opentelemetry::propagation::Injector,
1095    ) {
1096    }
1097
1098    fn extract_with_context(
1099        &self,
1100        cx: &opentelemetry::Context,
1101        _extractor: &dyn opentelemetry::propagation::Extractor,
1102    ) -> opentelemetry::Context {
1103        cx.clone()
1104    }
1105
1106    fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
1107        opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
1108    }
1109}