lambda_otel_lite/telemetry.rs
1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//! let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//! Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//! let resource = Resource::builder()
47//! .with_attributes(vec![
48//! KeyValue::new("service.version", "1.0.0"),
49//! KeyValue::new("deployment.environment", "production"),
50//! ])
51//! .build();
52//!
53//! let config = TelemetryConfig::builder()
54//! .resource(resource)
55//! .build();
56//!
57//! let (_, completion_handler) = init_telemetry(config).await?;
58//! Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//! let config = TelemetryConfig::builder()
72//! .with_span_processor(SimpleSpanProcessor::new(
73//! OtlpStdoutSpanExporter::default()
74//! ))
75//! .enable_fmt_layer(true)
76//! .build();
77//!
78//! let (_, completion_handler) = init_telemetry(config).await?;
79//! Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
92//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
93
94use crate::{
95 constants, extension::register_extension, logger::Logger, mode::ProcessorMode,
96 processor::LambdaSpanProcessor, propagation::LambdaXrayPropagator,
97 resource::get_lambda_resource,
98};
99use bon::Builder;
100use lambda_runtime::Error;
101use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
102use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
103use opentelemetry_aws::trace::XrayPropagator;
104use opentelemetry_sdk::{
105 propagation::TraceContextPropagator,
106 trace::{IdGenerator, SdkTracerProvider, ShouldSample, SpanProcessor, TracerProviderBuilder},
107 Resource,
108};
109use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
110use std::{borrow::Cow, env, sync::Arc};
111use tokio::sync::mpsc::UnboundedSender;
112use tracing_subscriber::layer::SubscriberExt;
113
114// Add module-specific logger
115static LOGGER: Logger = Logger::const_new("telemetry");
116
117/// Manages the lifecycle of span export based on the processing mode.
118///
119/// This handler must be used to signal when spans should be exported. Its behavior
120/// varies by processing mode:
121/// - Sync: Forces immediate export
122/// - Async: Signals the extension to export
123/// - Finalize: Defers to span processor
124///
125/// # Thread Safety
126///
127/// This type is `Clone` and can be safely shared between threads.
#[derive(Clone)]
pub struct TelemetryCompletionHandler {
    // Tracer provider that `complete()` force-flushes in sync mode.
    provider: Arc<SdkTracerProvider>,
    // Channel on which `complete()` sends a unit signal in async mode. When this is
    // `None`, async-mode `complete()` sends nothing.
    sender: Option<UnboundedSender<()>>,
    // Processing mode that selects what `complete()` does.
    mode: ProcessorMode,
    // Tracer created from `provider` in `new()` and handed out by `get_tracer()`.
    tracer: opentelemetry_sdk::trace::Tracer,
}
135
136impl TelemetryCompletionHandler {
137 pub fn new(
138 provider: Arc<SdkTracerProvider>,
139 sender: Option<UnboundedSender<()>>,
140 mode: ProcessorMode,
141 ) -> Self {
142 // Create instrumentation scope with attributes
143 let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
144 .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
145 .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
146 .with_attributes(vec![
147 KeyValue::new("library.language", "rust"),
148 KeyValue::new("library.type", "instrumentation"),
149 KeyValue::new("library.runtime", "aws_lambda"),
150 ])
151 .build();
152
153 // Create tracer with instrumentation scope
154 let tracer = provider.tracer_with_scope(scope);
155
156 Self {
157 provider,
158 sender,
159 mode,
160 tracer,
161 }
162 }
163
164 /// Get the tracer instance for creating spans.
165 ///
166 /// Returns the cached tracer instance configured with this package's instrumentation scope.
167 /// The tracer is configured with the provider's settings and will automatically use
168 /// the correct span processor based on the processing mode.
169 pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
170 &self.tracer
171 }
172
173 /// Complete telemetry processing for the current invocation
174 ///
175 /// In Sync mode, this will force flush the provider and log any errors that occur.
176 /// In Async mode, this will send a completion signal to the extension.
177 /// In Finalize mode, this will do nothing (handled by drop).
178 pub fn complete(&self) {
179 match self.mode {
180 ProcessorMode::Sync => {
181 if let Err(e) = self.provider.force_flush() {
182 LOGGER.warn(format!("Error flushing telemetry: {e:?}"));
183 }
184 }
185 ProcessorMode::Async => {
186 if let Some(sender) = &self.sender {
187 if let Err(e) = sender.send(()) {
188 LOGGER.warn(format!(
189 "Failed to send completion signal to extension: {e:?}"
190 ));
191 }
192 }
193 }
194 ProcessorMode::Finalize => {
195 // Do nothing, handled by drop
196 }
197 }
198 }
199}
200
201/// Configuration for OpenTelemetry initialization.
202///
203/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
204/// for standard Lambda configuration, or the builder pattern for customization.
205///
206/// # Fields
207///
208/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
209/// * `set_global_provider` - Set as global tracer provider (default: true)
210/// * `resource` - Custom resource attributes (default: auto-detected from Lambda)
211/// * `env_var_name` - Environment variable name for log level configuration
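/// * `processor_mode` - Span processing mode (sync/async/finalize)
///
/// Custom ID generators, samplers, span processors, and propagators are not fields; they are
/// set through the builder's `with_id_generator`, `with_sampler`, `with_span_processor`, and
/// `with_propagator` methods.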
214///
215/// # Examples
216///
217/// Basic usage with default configuration:
218///
219/// ```no_run
220/// use lambda_otel_lite::telemetry::TelemetryConfig;
221///
222/// let config = TelemetryConfig::default();
223/// ```
224///
225/// Custom configuration with resource attributes:
226///
227/// ```no_run
228/// use lambda_otel_lite::telemetry::TelemetryConfig;
229/// use opentelemetry::KeyValue;
230/// use opentelemetry_sdk::Resource;
231///
232/// let config = TelemetryConfig::builder()
233/// .resource(Resource::builder()
234/// .with_attributes(vec![KeyValue::new("version", "1.0.0")])
235/// .build())
236/// .build();
237/// ```
238///
239/// Custom configuration with logging options:
240///
241/// ```no_run
242/// use lambda_otel_lite::telemetry::TelemetryConfig;
243///
244/// let config = TelemetryConfig::builder()
245/// .enable_fmt_layer(true) // Enable console output for debugging
246/// .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string()) // Custom env var for log level
247/// .build();
248/// ```
#[derive(Builder, Debug)]
pub struct TelemetryConfig {
    // Custom fields for internal state
    //
    // Tracer provider builder configured by the `with_span_processor`, `with_id_generator`
    // and `with_sampler` builder methods; `init_telemetry` attaches the resource to it and
    // builds the provider from it.
    #[builder(field)]
    provider_builder: TracerProviderBuilder,

    // Set to `true` by `with_span_processor`. When still `false`, `init_telemetry` adds a
    // default `LambdaSpanProcessor` that exports through `OtlpStdoutSpanExporter`.
    #[builder(field)]
    has_processor: bool,

    // Propagators collected by `with_propagator`. `init_telemetry` combines them into one
    // `TextMapCompositePropagator` and installs it as the global text map propagator.
    #[builder(field)]
    propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,

    /// Enable console output for debugging.
    ///
    /// When enabled, spans and events will be printed to the console in addition
    /// to being exported through the configured span processors. This is useful
    /// for debugging but adds overhead and should be disabled in production.
    ///
    /// This can also be controlled via the `LAMBDA_TRACING_ENABLE_FMT_LAYER` environment variable,
    /// which takes precedence over this setting when present:
    /// - Setting the env var to "true" will enable console output even if this field is false
    /// - Setting the env var to "false" will disable console output even if this field is true
    /// - Invalid values will log a warning and fall back to this code setting
    ///
    /// This environment variable override allows toggling logging for debugging without code changes.
    ///
    /// Default: `false`
    #[builder(default = false)]
    pub enable_fmt_layer: bool,

    /// Set this provider as the global OpenTelemetry provider.
    ///
    /// When enabled, the provider will be registered as the global provider
    /// for the OpenTelemetry API. This allows using the global tracer API
    /// without explicitly passing around the provider.
    ///
    /// Default: `true`
    #[builder(default = true)]
    pub set_global_provider: bool,

    /// Custom resource attributes for all spans.
    ///
    /// If not provided, resource attributes will be automatically detected
    /// from the Lambda environment. If provided, this resource is used as-is
    /// in place of the auto-detected one; the two are not merged, so include
    /// every attribute you need.
    ///
    /// Default: `None` (auto-detected from Lambda environment)
    pub resource: Option<Resource>,

    /// Environment variable name to use for log level configuration.
    ///
    /// This field specifies which environment variable should be used to configure
    /// the tracing subscriber's log level filter. If not specified, the system will
    /// first check for `RUST_LOG` and then fall back to `AWS_LAMBDA_LOG_LEVEL`.
    ///
    /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
    pub env_var_name: Option<String>,

    /// Span processing mode (sync/async/finalize)
    ///
    /// Controls how spans are exported from the processor. This can be overridden by the
    /// LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE environment variable, which takes precedence.
    ///
    /// Default: `None` (uses environment variable or defaults to `ProcessorMode::Sync`)
    pub processor_mode: Option<ProcessorMode>,
}
315
316impl Default for TelemetryConfig {
317 fn default() -> Self {
318 Self::builder().build()
319 }
320}
321
322/// Builder methods for adding span processors and other configuration
323impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
324 /// Add a span processor to the tracer provider.
325 ///
326 /// This method allows adding custom span processors for trace data processing.
327 /// Multiple processors can be added by calling this method multiple times.
328 ///
329 /// # Arguments
330 ///
331 /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
332 ///
333 /// # Examples
334 ///
335 /// ```no_run
336 /// use lambda_otel_lite::TelemetryConfig;
337 /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
338 /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
339 ///
340 /// // Only use builder when adding custom processors
341 /// let config = TelemetryConfig::builder()
342 /// .with_span_processor(SimpleSpanProcessor::new(
343 /// OtlpStdoutSpanExporter::default()
344 /// ))
345 /// .build();
346 /// ```
347 pub fn with_span_processor<T>(mut self, processor: T) -> Self
348 where
349 T: SpanProcessor + 'static,
350 {
351 self.provider_builder = self.provider_builder.with_span_processor(processor);
352 self.has_processor = true;
353 self
354 }
355
356 /// Add a propagator to the list of propagators.
357 ///
358 /// Multiple propagators can be added and will be combined into a composite propagator.
359 /// The default propagator is TraceContextPropagator.
360 ///
361 /// # Arguments
362 ///
363 /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
364 ///
365 /// # Examples
366 ///
367 /// ```no_run
368 /// use lambda_otel_lite::TelemetryConfig;
369 /// use opentelemetry_sdk::propagation::BaggagePropagator;
370 ///
371 /// let config = TelemetryConfig::builder()
372 /// .with_propagator(BaggagePropagator::new())
373 /// .build();
374 /// ```
375 pub fn with_propagator<T>(mut self, propagator: T) -> Self
376 where
377 T: TextMapPropagator + Send + Sync + 'static,
378 {
379 self.propagators.push(Box::new(propagator));
380 self
381 }
382
383 pub fn with_named_propagator(self, name: &str) -> Self {
384 match name {
385 "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
386 "xray" => self.with_propagator(XrayPropagator::new()),
387 "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
388 "none" => self.with_propagator(NoopPropagator::new()),
389 _ => {
390 LOGGER.warn(format!(
391 "Unknown propagator: {name}, using default propagators"
392 ));
393 self
394 }
395 }
396 }
397
398 /// Add a custom ID generator to the tracer provider.
399 ///
400 /// This method allows setting a custom ID generator for trace and span IDs.
401 /// This is particularly useful when integrating with AWS X-Ray, which requires
402 /// a specific ID format.
403 ///
404 /// # Arguments
405 ///
406 /// * `id_generator` - An ID generator implementing the [`IdGenerator`] trait
407 ///
408 /// # Examples
409 ///
410 /// ```no_run
411 /// use lambda_otel_lite::TelemetryConfig;
412 /// use opentelemetry_aws::trace::XrayIdGenerator;
413 ///
414 /// // Configure with X-Ray compatible ID generator
415 /// let config = TelemetryConfig::builder()
416 /// .with_id_generator(XrayIdGenerator::default())
417 /// .build();
418 /// ```
419 pub fn with_id_generator<T>(mut self, id_generator: T) -> Self
420 where
421 T: IdGenerator + 'static,
422 {
423 self.provider_builder = self.provider_builder.with_id_generator(id_generator);
424 self
425 }
426
427 /// Add a custom sampler to the tracer provider.
428 ///
429 /// This method allows setting a custom sampler for trace sampling decisions.
430 /// Common samplers include AlwaysOn, AlwaysOff, TraceIdRatioBased, and ParentBased.
431 ///
432 /// # Arguments
433 ///
434 /// * `sampler` - A sampler implementing the [`ShouldSample`] trait
435 ///
436 /// # Examples
437 ///
438 /// ```no_run
439 /// use lambda_otel_lite::TelemetryConfig;
440 /// use opentelemetry_sdk::trace::Sampler;
441 ///
442 /// // Sample all traces
443 /// let config = TelemetryConfig::builder()
444 /// .with_sampler(Sampler::AlwaysOn)
445 /// .build();
446 ///
447 /// // Sample 10% of traces
448 /// let config = TelemetryConfig::builder()
449 /// .with_sampler(Sampler::TraceIdRatioBased(0.1))
450 /// .build();
451 /// ```
452 pub fn with_sampler<T>(mut self, sampler: T) -> Self
453 where
454 T: ShouldSample + 'static,
455 {
456 self.provider_builder = self.provider_builder.with_sampler(sampler);
457 self
458 }
459}
460
461/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
462///
463/// # Arguments
464///
465/// * `config` - Configuration for telemetry initialization
466///
467/// # Returns
468///
469/// Returns a tuple containing:
470/// - A tracer instance for manual instrumentation
471/// - A completion handler for managing span export timing
472///
473/// # Errors
474///
475/// Returns error if:
476/// - Extension registration fails (async/finalize modes)
477/// - Tracer provider initialization fails
478/// - Environment variable parsing fails
479///
480/// # Examples
481///
482/// Basic usage with default configuration:
483///
484/// ```no_run
485/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
486///
487/// # async fn example() -> Result<(), lambda_runtime::Error> {
488/// // Initialize with default configuration
489/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
490/// # Ok(())
491/// # }
492/// ```
493///
494/// Custom configuration:
495///
496/// ```no_run
497/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
498/// use opentelemetry::KeyValue;
499/// use opentelemetry_sdk::Resource;
500///
501/// # async fn example() -> Result<(), lambda_runtime::Error> {
502/// // Create custom resource
503/// let resource = Resource::builder()
504/// .with_attributes(vec![
505/// KeyValue::new("service.name", "payment-api"),
506/// KeyValue::new("service.version", "1.2.3"),
507/// ])
508/// .build();
509///
510/// // Initialize with custom configuration
511/// let (_, telemetry) = init_telemetry(
512/// TelemetryConfig::builder()
513/// .resource(resource)
514/// .build()
515/// ).await?;
516/// # Ok(())
517/// # }
518/// ```
519///
520/// Advanced usage with BatchSpanProcessor (required for async exporters):
521///
522/// ```no_run
523/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
524/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
525/// use opentelemetry_sdk::trace::BatchSpanProcessor;
526/// use lambda_runtime::Error;
527///
528/// # async fn example() -> Result<(), Error> {
529/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
530/// .with_http()
531/// .with_http_client(reqwest::Client::new())
532/// .with_protocol(Protocol::HttpBinary)
533/// .build()?;
534///
/// let (tracer, completion) = init_telemetry(
536/// TelemetryConfig::builder()
537/// .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
538/// .build()
539/// ).await?;
540/// # Ok(())
541/// # }
542/// ```
543///
544/// Using LambdaSpanProcessor with blocking http client:
545///
546/// ```no_run
547/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
548/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
549/// use lambda_runtime::Error;
550///
551/// # async fn example() -> Result<(), Error> {
552/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
553/// .with_http()
554/// .with_http_client(reqwest::blocking::Client::new())
555/// .with_protocol(Protocol::HttpBinary)
556/// .build()?;
557///
/// let (tracer, completion) = init_telemetry(
559/// TelemetryConfig::builder()
560/// .with_span_processor(
561/// LambdaSpanProcessor::builder()
562/// .exporter(lambda_exporter)
563/// .max_queue_size(2048)
564/// .build()
565/// )
566/// .build()
567/// ).await?;
568/// # Ok(())
569/// # }
570/// ```
571///
572pub async fn init_telemetry(
573 mut config: TelemetryConfig,
574) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
575 // Get mode from config or environment with environment taking precedence
576 let mode = ProcessorMode::resolve(config.processor_mode);
577
578 if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
579 let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
580
581 for propagator in propagators {
582 match propagator {
583 "tracecontext" => config
584 .propagators
585 .push(Box::new(TraceContextPropagator::new())),
586 "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
587 "xray-lambda" => config
588 .propagators
589 .push(Box::new(LambdaXrayPropagator::new())),
590 "none" => config.propagators.push(Box::new(NoopPropagator::new())),
591 _ => LOGGER.warn(format!(
592 "Unknown propagator: {propagator}, using default propagators"
593 )),
594 }
595 }
596 } else {
597 // if no propagators are set, use the default propagators
598 if config.propagators.is_empty() {
599 // IMPORTANT:
600 // LambdaXrayPropagator is added *before* TraceContextPropagator
601 // because in OpenTelemetry Rust, the *last* propagator that extracts
602 // a valid context wins during extraction.
603 // This ensures that if both an AWS X-Ray header (or _X_AMZN_TRACE_ID)
604 // and a W3C traceparent header are present, the W3C traceparent takes precedence.
605 config
606 .propagators
607 .push(Box::new(LambdaXrayPropagator::new()));
608 config
609 .propagators
610 .push(Box::new(TraceContextPropagator::new()));
611 }
612 }
613
614 let composite_propagator = TextMapCompositePropagator::new(config.propagators);
615 global::set_text_map_propagator(composite_propagator);
616
617 // Add default span processor if none was added
618 if !config.has_processor {
619 let processor = LambdaSpanProcessor::builder()
620 .exporter(OtlpStdoutSpanExporter::default())
621 .build();
622 config.provider_builder = config.provider_builder.with_span_processor(processor);
623 }
624
625 // Apply defaults and build the provider
626 let resource = config.resource.unwrap_or_else(get_lambda_resource);
627
628 let provider = Arc::new(config.provider_builder.with_resource(resource).build());
629
630 // Register the extension if in async or finalize mode
631 let sender = match mode {
632 ProcessorMode::Async | ProcessorMode::Finalize => {
633 Some(register_extension(provider.clone(), mode.clone()).await?)
634 }
635 _ => None,
636 };
637
638 if config.set_global_provider {
639 // Set the provider as global
640 set_tracer_provider(provider.as_ref().clone());
641 }
642
643 // Initialize tracing subscriber with smart env var selection
644 let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
645 if env::var("RUST_LOG").is_ok() {
646 "RUST_LOG"
647 } else {
648 "AWS_LAMBDA_LOG_LEVEL"
649 }
650 });
651
652 let env_filter = tracing_subscriber::EnvFilter::builder()
653 .with_env_var(env_var_name)
654 .from_env_lossy();
655
656 let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
657 let tracer = completion_handler.get_tracer().clone();
658
659 let subscriber = tracing_subscriber::registry::Registry::default()
660 .with(tracing_opentelemetry::OpenTelemetryLayer::new(
661 tracer.clone(),
662 ))
663 .with(env_filter);
664
665 // Determine if fmt layer should be enabled - environment variable takes precedence when set
666 let enable_fmt = if let Ok(env_value) = env::var(constants::env_vars::ENABLE_FMT_LAYER) {
667 match env_value.to_lowercase().as_str() {
668 "true" => true,
669 "false" => false,
670 other => {
671 LOGGER.warn(format!(
672 "Invalid value '{}' for {}, expected 'true' or 'false'. Using code configuration.",
673 other,
674 constants::env_vars::ENABLE_FMT_LAYER
675 ));
676 config.enable_fmt_layer
677 }
678 }
679 } else {
680 // If env var not set, use the configured value
681 config.enable_fmt_layer
682 };
683
684 // Enable fmt layer based on the determined value
685 if enable_fmt {
686 // Determine if the lambda logging configuration is set to output json logs
687 let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
688 .unwrap_or_default()
689 .to_uppercase()
690 == "JSON";
691
692 if is_json {
693 tracing::subscriber::set_global_default(
694 subscriber.with(
695 tracing_subscriber::fmt::layer()
696 .with_target(false)
697 .without_time()
698 .json(),
699 ),
700 )?;
701 } else {
702 tracing::subscriber::set_global_default(
703 subscriber.with(
704 tracing_subscriber::fmt::layer()
705 .with_target(false)
706 .without_time()
707 .with_ansi(false),
708 ),
709 )?;
710 }
711 } else {
712 tracing::subscriber::set_global_default(subscriber)?;
713 }
714
715 Ok((tracer, completion_handler))
716}
717
718#[cfg(test)]
719mod tests {
720 use super::*;
721 use opentelemetry::trace::{Span, Tracer};
722 use opentelemetry_aws::trace::XrayIdGenerator;
723 use opentelemetry_sdk::trace::{Sampler, SimpleSpanProcessor};
724 use sealed_test::prelude::*;
725 use std::sync::Arc;
726 use tokio::sync::mpsc;
727
728 // Helper to clean up environment variables between tests
729 fn cleanup_env() {
730 env::remove_var(constants::env_vars::ENABLE_FMT_LAYER);
731 env::remove_var(constants::env_vars::PROPAGATORS);
732 env::remove_var("_X_AMZN_TRACE_ID");
733 }
734
735 #[test]
736 #[sealed_test]
737 fn test_telemetry_config_defaults() {
738 cleanup_env();
739
740 let config = TelemetryConfig::builder().build();
741 assert!(config.set_global_provider); // Should be true by default
742 assert!(!config.has_processor);
743 assert!(!config.enable_fmt_layer);
744 assert!(config.propagators.is_empty()); // No propagators by default in builder
745 }
746
747 #[test]
748 #[sealed_test]
749 fn test_telemetry_config_with_propagators() {
750 cleanup_env();
751
752 // Test with explicit tracecontext propagator
753 let config = TelemetryConfig::builder()
754 .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
755 .with_named_propagator("tracecontext")
756 .build();
757 assert_eq!(config.propagators.len(), 1);
758
759 // Test with explicit xray propagator
760 let config = TelemetryConfig::builder()
761 .with_named_propagator("xray")
762 .build();
763 assert_eq!(config.propagators.len(), 1);
764
765 // Test with both propagators
766 let config = TelemetryConfig::builder()
767 .with_named_propagator("tracecontext")
768 .with_named_propagator("xray")
769 .build();
770 assert_eq!(config.propagators.len(), 2);
771
772 // Test with default propagators (empty - will be set in init_telemetry)
773 let config = TelemetryConfig::builder().build();
774 assert_eq!(config.propagators.len(), 0);
775
776 // Test with none
777 let config = TelemetryConfig::builder()
778 .with_named_propagator("none")
779 .build();
780 assert_eq!(config.propagators.len(), 1);
781 }
782
783 #[tokio::test]
784 #[sealed_test]
785 async fn test_telemetry_config_env_propagators_tracecontext() {
786 cleanup_env();
787
788 // Test with OTEL_PROPAGATORS=tracecontext
789 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
790 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
791 // In real usage we'd check the behavior rather than implementation details
792 // So we'll just check that we can create and use a handler
793 assert!(handler.sender.is_none());
794
795 cleanup_env();
796 }
797
798 #[tokio::test]
799 #[sealed_test]
800 async fn test_telemetry_config_env_propagators_xray() {
801 cleanup_env();
802
803 // Test with OTEL_PROPAGATORS=xray
804 env::set_var(constants::env_vars::PROPAGATORS, "xray");
805 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
806 assert!(handler.sender.is_none());
807
808 cleanup_env();
809 }
810
811 #[tokio::test]
812 #[sealed_test]
813 async fn test_telemetry_config_env_propagators_combined() {
814 cleanup_env();
815
816 // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
817 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
818 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
819 assert!(handler.sender.is_none());
820
821 cleanup_env();
822 }
823
824 #[tokio::test]
825 #[sealed_test]
826 async fn test_telemetry_config_env_propagators_none() {
827 cleanup_env();
828
829 // Test with OTEL_PROPAGATORS=none
830 env::set_var(constants::env_vars::PROPAGATORS, "none");
831 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
832 assert!(handler.sender.is_none());
833
834 cleanup_env();
835 }
836
837 #[tokio::test]
838 #[sealed_test]
839 async fn test_init_telemetry_defaults() {
840 let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
841 assert!(completion_handler.sender.is_none()); // Default mode is Sync
842 }
843
844 #[tokio::test]
845 #[sealed_test]
846 async fn test_init_telemetry_custom() {
847 let resource = Resource::builder().build();
848 let config = TelemetryConfig::builder()
849 .resource(resource)
850 .with_named_propagator("tracecontext")
851 .enable_fmt_layer(true)
852 .set_global_provider(false)
853 .build();
854
855 let (_, completion_handler) = init_telemetry(config).await.unwrap();
856 assert!(completion_handler.sender.is_none());
857 }
858
859 #[tokio::test]
860 #[sealed_test]
861 async fn test_telemetry_config_env_fmt_layer_true_override() {
862 cleanup_env();
863
864 // Test: Env var "true" overrides code setting "false"
865 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "true");
866 let config = TelemetryConfig::default(); // code setting is false by default
867 assert!(!config.enable_fmt_layer); // Config should not be affected by env var
868
869 // Initialize telemetry - env var should override config
870 let result = init_telemetry(config).await;
871 assert!(result.is_ok());
872
873 // Clean up
874 cleanup_env();
875 }
876
877 #[tokio::test]
878 #[sealed_test]
879 async fn test_telemetry_config_env_fmt_layer_false_override() {
880 cleanup_env();
881
882 // Test: Env var "false" overrides code setting "true"
883 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "false");
884 let config = TelemetryConfig::builder()
885 .enable_fmt_layer(true) // code setting is true
886 .build();
887 assert!(config.enable_fmt_layer);
888
889 // Initialize telemetry - env var should override config
890 let result = init_telemetry(config).await;
891 assert!(result.is_ok());
892
893 // Clean up
894 cleanup_env();
895 }
896
897 #[tokio::test]
898 #[sealed_test]
899 async fn test_telemetry_config_env_fmt_layer_invalid() {
900 cleanup_env();
901
902 // Test: Invalid env var falls back to code setting
903 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "invalid");
904 let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
905
906 // Initialize telemetry - should log a warning but use code setting
907 let result = init_telemetry(config).await;
908 assert!(result.is_ok());
909
910 // Clean up
911 cleanup_env();
912 }
913
914 #[tokio::test]
915 #[sealed_test]
916 async fn test_telemetry_config_env_fmt_layer_not_set() {
917 cleanup_env();
918
919 // Test: No env var uses code setting
920 let config = TelemetryConfig::default();
921 assert!(!config.enable_fmt_layer);
922
923 let result = init_telemetry(config).await;
924 assert!(result.is_ok());
925
926 // Clean up
927 cleanup_env();
928 }
929
930 #[test]
931 fn test_completion_handler_sync_mode() {
932 let provider = Arc::new(
933 SdkTracerProvider::builder()
934 .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
935 .build(),
936 );
937
938 let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
939
940 // In sync mode, complete() should call force_flush
941 handler.complete();
942 // Note: We can't easily verify the flush was called since TracerProvider
943 // doesn't expose this information, but we can verify it doesn't panic
944 }
945
946 #[tokio::test]
947 async fn test_completion_handler_async_mode() {
948 let provider = Arc::new(
949 SdkTracerProvider::builder()
950 .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
951 .build(),
952 );
953
954 let (tx, mut rx) = mpsc::unbounded_channel();
955
956 let completion_handler =
957 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
958
959 // In async mode, complete() should send a message through the channel
960 completion_handler.complete();
961
962 // Verify that we received the completion signal
963 assert!(rx.try_recv().is_ok());
964 // Verify channel is now empty
965 assert!(rx.try_recv().is_err());
966 }
967
/// In `Finalize` mode, `complete()` is expected to be a no-op.
///
/// The receiving half of the channel is kept so the test can check that no
/// completion signal was emitted, rather than only checking for a panic.
#[test]
fn test_completion_handler_finalize_mode() {
    let provider = Arc::new(
        SdkTracerProvider::builder()
            .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
            .build(),
    );

    let (tx, mut rx) = mpsc::unbounded_channel();

    let completion_handler =
        TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);

    // Must return without panicking ...
    completion_handler.complete();

    // ... and must not have signalled through the channel. The handler still
    // owns the sender, so an error here means "nothing queued".
    assert!(rx.try_recv().is_err());
}
985
/// Cloning a completion handler must yield a handler with the same mode and
/// a sender of its own, while leaving the original intact.
#[test]
fn test_completion_handler_clone() {
    let provider = Arc::new(
        SdkTracerProvider::builder()
            .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
            .build(),
    );

    let (tx, _rx) = mpsc::unbounded_channel();

    let completion_handler =
        TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);

    let cloned = completion_handler.clone();

    // The clone carries the mode and a sender ...
    assert!(matches!(cloned.mode, ProcessorMode::Async));
    assert!(cloned.sender.is_some());

    // ... and the original is unchanged, so both handlers agree.
    assert!(matches!(completion_handler.mode, ProcessorMode::Async));
    assert!(completion_handler.sender.is_some());
}
1006
/// Smoke test: `complete()` on a `Sync`-mode handler built without a channel
/// returns without panicking.
#[test]
fn test_completion_handler_sync_mode_error_handling() {
    let exporter = OtlpStdoutSpanExporter::default();
    let processor = SimpleSpanProcessor::new(exporter);
    let provider = Arc::new(
        SdkTracerProvider::builder()
            .with_span_processor(processor)
            .build(),
    );

    let sync_handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);

    // The only expectation is that this call returns normally.
    sync_handler.complete();
}
1021
1022 #[tokio::test]
1023 async fn test_completion_handler_async_mode_error_handling() {
1024 let provider = Arc::new(
1025 SdkTracerProvider::builder()
1026 .with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
1027 .build(),
1028 );
1029
1030 // Use UnboundedSender instead of Sender
1031 let (tx, _rx) = mpsc::unbounded_channel();
1032 // Fill the channel by dropping the receiver
1033 drop(_rx);
1034
1035 let completion_handler =
1036 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
1037
1038 // Test that complete() doesn't panic when receiver is dropped
1039 completion_handler.complete();
1040 }
1041
/// Checks that an ID generator supplied through the builder is the one the
/// resulting provider uses, by inspecting the shape of a generated trace ID.
///
/// The generator type is not directly inspectable once it is inside the
/// provider, so the test relies on the X-Ray trace ID layout: the first
/// 8 hex characters encode a Unix timestamp in seconds, followed by 24 hex
/// characters of randomness.
#[test]
#[sealed_test]
fn test_telemetry_config_with_id_generator() {
    cleanup_env();

    // Configure the X-Ray ID generator through the builder.
    let config = TelemetryConfig::builder()
        .with_id_generator(XrayIdGenerator::default())
        .build();

    let provider = Arc::new(config.provider_builder.build());

    // Obtain a tracer for a throwaway instrumentation scope.
    let scope = opentelemetry::InstrumentationScope::builder("test")
        .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
        .build();
    let tracer = provider.tracer_with_scope(scope);

    // Start a span and grab the trace ID the generator produced for it.
    let span = tracer.start_with_context("test span", &opentelemetry::Context::current());
    let trace_id = span.span_context().trace_id();

    // Render as 32 zero-padded hex characters so the parts can be sliced out.
    let trace_id_hex = format!("{trace_id:032x}");

    // Leading 8 hex characters: the embedded timestamp. Parsed as u64 so it
    // can be compared against the clock without a truncating cast.
    let timestamp = u64::from_str_radix(&trace_id_hex[..8], 16).unwrap();

    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // The timestamp must not be in the future and must be within the last day.
    assert!(timestamp <= now);
    assert!(timestamp > now.saturating_sub(86_400));

    // Remaining 24 hex characters: the random part, which must not be all zeros.
    let random_part = &trace_id_hex[8..];
    assert_ne!(random_part, "000000000000000000000000");
}
1093
/// Builds a provider for each of several samplers passed to `with_sampler`
/// and checks that none of them panics.
///
/// Every sampler gets its own config and its own provider, so each variant
/// is exercised rather than only the last one configured.
#[test]
#[sealed_test]
fn test_telemetry_config_with_sampler() {
    cleanup_env();

    let samplers = [
        Sampler::AlwaysOn,
        Sampler::TraceIdRatioBased(0.1),
        Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
    ];

    for sampler in samplers {
        let config = TelemetryConfig::builder().with_sampler(sampler).build();

        // Building the provider is the operation that must not panic.
        let _provider = config.provider_builder.build();
    }
}
1117
/// Builds a provider for each of the standard SDK samplers passed to
/// `with_sampler` and checks that none of them panics.
///
/// Every sampler gets its own config and its own provider, so each variant
/// is exercised rather than only the last one configured.
#[test]
#[sealed_test]
fn test_telemetry_config_with_standard_samplers() {
    cleanup_env();

    let samplers = [
        Sampler::AlwaysOn,
        Sampler::AlwaysOff,
        Sampler::TraceIdRatioBased(0.5),
        Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
    ];

    for sampler in samplers {
        let config = TelemetryConfig::builder().with_sampler(sampler).build();

        // Building the provider is the operation that must not panic.
        let _provider = config.provider_builder.build();
    }
}
1143
1144 #[tokio::test]
1145 #[sealed_test]
1146 async fn test_telemetry_config_env_sampler() {
1147 cleanup_env();
1148
1149 // Test that SDK environment variable sampler configuration works
1150 // The SDK automatically handles OTEL_TRACES_SAMPLER and OTEL_TRACES_SAMPLER_ARG
1151 env::set_var("OTEL_TRACES_SAMPLER", "always_on");
1152 let config = TelemetryConfig::builder()
1153 .set_global_provider(false) // Don't set global to avoid conflicts
1154 .build();
1155 let (_, _) = init_telemetry(config).await.unwrap();
1156
1157 // Clean up
1158 cleanup_env();
1159 }
1160}
1161
/// A simple no-op propagator.
///
/// This is a field-less unit struct: it carries no state of its own.
#[derive(Debug)]
struct NoopPropagator;

impl NoopPropagator {
    /// Creates the (stateless) propagator value.
    fn new() -> Self {
        Self
    }
}
1171
1172impl TextMapPropagator for NoopPropagator {
1173 fn inject_context(
1174 &self,
1175 _cx: &opentelemetry::Context,
1176 _injector: &mut dyn opentelemetry::propagation::Injector,
1177 ) {
1178 }
1179
1180 fn extract_with_context(
1181 &self,
1182 cx: &opentelemetry::Context,
1183 _extractor: &dyn opentelemetry::propagation::Extractor,
1184 ) -> opentelemetry::Context {
1185 cx.clone()
1186 }
1187
1188 fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
1189 opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
1190 }
1191}