lambda_otel_lite/telemetry.rs
1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//! let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//! Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//! let resource = Resource::builder()
47//! .with_attributes(vec![
48//! KeyValue::new("service.version", "1.0.0"),
49//! KeyValue::new("deployment.environment", "production"),
50//! ])
51//! .build();
52//!
53//! let config = TelemetryConfig::builder()
54//! .resource(resource)
55//! .build();
56//!
57//! let (_, completion_handler) = init_telemetry(config).await?;
58//! Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//! let config = TelemetryConfig::builder()
72//! .with_span_processor(SimpleSpanProcessor::new(
73//! Box::new(OtlpStdoutSpanExporter::default())
74//! ))
75//! .enable_fmt_layer(true)
76//! .build();
77//!
78//! let (_, completion_handler) = init_telemetry(config).await?;
79//! Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
92//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
93
94use crate::{
95 constants, extension::register_extension, mode::ProcessorMode, processor::LambdaSpanProcessor,
96 propagation::LambdaXrayPropagator, resource::get_lambda_resource,
97};
98use bon::Builder;
99use lambda_runtime::Error;
100use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
101use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
102use opentelemetry_aws::trace::XrayPropagator;
103use opentelemetry_sdk::{
104 propagation::TraceContextPropagator,
105 trace::{IdGenerator, SdkTracerProvider, SpanProcessor, TracerProviderBuilder},
106 Resource,
107};
108use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
109use std::{borrow::Cow, env, sync::Arc};
110use tokio::sync::mpsc::UnboundedSender;
111use tracing_subscriber::layer::SubscriberExt;
112
113/// Manages the lifecycle of span export based on the processing mode.
114///
115/// This handler must be used to signal when spans should be exported. Its behavior
116/// varies by processing mode:
117/// - Sync: Forces immediate export
118/// - Async: Signals the extension to export
119/// - Finalize: Defers to span processor
120///
121/// # Thread Safety
122///
123/// This type is `Clone` and can be safely shared between threads.
124#[derive(Clone)]
125pub struct TelemetryCompletionHandler {
126 provider: Arc<SdkTracerProvider>,
127 sender: Option<UnboundedSender<()>>,
128 mode: ProcessorMode,
129 tracer: opentelemetry_sdk::trace::Tracer,
130}
131
132impl TelemetryCompletionHandler {
133 pub fn new(
134 provider: Arc<SdkTracerProvider>,
135 sender: Option<UnboundedSender<()>>,
136 mode: ProcessorMode,
137 ) -> Self {
138 // Create instrumentation scope with attributes
139 let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
140 .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
141 .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
142 .with_attributes(vec![
143 KeyValue::new("library.language", "rust"),
144 KeyValue::new("library.type", "instrumentation"),
145 KeyValue::new("library.runtime", "aws_lambda"),
146 ])
147 .build();
148
149 // Create tracer with instrumentation scope
150 let tracer = provider.tracer_with_scope(scope);
151
152 Self {
153 provider,
154 sender,
155 mode,
156 tracer,
157 }
158 }
159
160 /// Get the tracer instance for creating spans.
161 ///
162 /// Returns the cached tracer instance configured with this package's instrumentation scope.
163 /// The tracer is configured with the provider's settings and will automatically use
164 /// the correct span processor based on the processing mode.
165 pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
166 &self.tracer
167 }
168
169 /// Complete telemetry processing for the current invocation
170 ///
171 /// In Sync mode, this will force flush the provider and log any errors that occur.
172 /// In Async mode, this will send a completion signal to the extension.
173 /// In Finalize mode, this will do nothing (handled by drop).
174 pub fn complete(&self) {
175 match self.mode {
176 ProcessorMode::Sync => {
177 if let Err(e) = self.provider.force_flush() {
178 tracing::warn!(error = ?e, "Error flushing telemetry");
179 }
180 }
181 ProcessorMode::Async => {
182 if let Some(sender) = &self.sender {
183 if let Err(e) = sender.send(()) {
184 tracing::warn!(error = ?e, "Failed to send completion signal to extension");
185 }
186 }
187 }
188 ProcessorMode::Finalize => {
189 // Do nothing, handled by drop
190 }
191 }
192 }
193}
194
195/// Configuration for OpenTelemetry initialization.
196///
197/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
198/// for standard Lambda configuration, or the builder pattern for customization.
199///
200/// # Fields
201///
202/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
203/// * `set_global_provider` - Set as global tracer provider (default: true)
204/// * `resource` - Custom resource attributes (default: auto-detected from Lambda)
205/// * `env_var_name` - Environment variable name for log level configuration
206/// * `id_generator` - Custom ID generator for trace and span IDs
207///
208/// # Examples
209///
210/// Basic usage with default configuration:
211///
212/// ```no_run
213/// use lambda_otel_lite::telemetry::TelemetryConfig;
214///
215/// let config = TelemetryConfig::default();
216/// ```
217///
218/// Custom configuration with resource attributes:
219///
220/// ```no_run
221/// use lambda_otel_lite::telemetry::TelemetryConfig;
222/// use opentelemetry::KeyValue;
223/// use opentelemetry_sdk::Resource;
224///
225/// let config = TelemetryConfig::builder()
226/// .resource(Resource::builder()
227/// .with_attributes(vec![KeyValue::new("version", "1.0.0")])
228/// .build())
229/// .build();
230/// ```
231///
232/// Custom configuration with logging options:
233///
234/// ```no_run
235/// use lambda_otel_lite::telemetry::TelemetryConfig;
236///
237/// let config = TelemetryConfig::builder()
238/// .enable_fmt_layer(true) // Enable console output for debugging
239/// .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string()) // Custom env var for log level
240/// .build();
241/// ```
242#[derive(Builder, Debug)]
243pub struct TelemetryConfig {
244 // Custom fields for internal state
245 #[builder(field)]
246 provider_builder: TracerProviderBuilder,
247
248 #[builder(field)]
249 has_processor: bool,
250
251 #[builder(field)]
252 propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,
253
254 /// Enable console output for debugging.
255 ///
256 /// When enabled, spans and events will be printed to the console in addition
257 /// to being exported through the configured span processors. This is useful
258 /// for debugging but adds overhead and should be disabled in production.
259 ///
260 /// This can also be controlled via the `LAMBDA_TRACING_ENABLE_FMT_LAYER` environment variable,
261 /// which takes precedence over this setting when present:
262 /// - Setting the env var to "true" will enable console output even if this field is false
263 /// - Setting the env var to "false" will disable console output even if this field is true
264 /// - Invalid values will log a warning and fall back to this code setting
265 ///
266 /// This environment variable override allows toggling logging for debugging without code changes.
267 ///
268 /// Default: `false`
269 #[builder(default = false)]
270 pub enable_fmt_layer: bool,
271
272 /// Set this provider as the global OpenTelemetry provider.
273 ///
274 /// When enabled, the provider will be registered as the global provider
275 /// for the OpenTelemetry API. This allows using the global tracer API
276 /// without explicitly passing around the provider.
277 ///
278 /// Default: `true`
279 #[builder(default = true)]
280 pub set_global_provider: bool,
281
282 /// Custom resource attributes for all spans.
283 ///
284 /// If not provided, resource attributes will be automatically detected
285 /// from the Lambda environment. Custom resources will override any
286 /// automatically detected attributes with the same keys.
287 ///
288 /// Default: `None` (auto-detected from Lambda environment)
289 pub resource: Option<Resource>,
290
291 /// Environment variable name to use for log level configuration.
292 ///
293 /// This field specifies which environment variable should be used to configure
294 /// the tracing subscriber's log level filter. If not specified, the system will
295 /// first check for `RUST_LOG` and then fall back to `AWS_LAMBDA_LOG_LEVEL`.
296 ///
297 /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
298 pub env_var_name: Option<String>,
299}
300
301impl Default for TelemetryConfig {
302 fn default() -> Self {
303 Self::builder().build()
304 }
305}
306
307/// Builder methods for adding span processors and other configuration
308impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
309 /// Add a span processor to the tracer provider.
310 ///
311 /// This method allows adding custom span processors for trace data processing.
312 /// Multiple processors can be added by calling this method multiple times.
313 ///
314 /// # Arguments
315 ///
316 /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
317 ///
318 /// # Examples
319 ///
320 /// ```no_run
321 /// use lambda_otel_lite::TelemetryConfig;
322 /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
323 /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
324 ///
325 /// // Only use builder when adding custom processors
326 /// let config = TelemetryConfig::builder()
327 /// .with_span_processor(SimpleSpanProcessor::new(
328 /// Box::new(OtlpStdoutSpanExporter::default())
329 /// ))
330 /// .build();
331 /// ```
332 pub fn with_span_processor<T>(mut self, processor: T) -> Self
333 where
334 T: SpanProcessor + 'static,
335 {
336 self.provider_builder = self.provider_builder.with_span_processor(processor);
337 self.has_processor = true;
338 self
339 }
340
341 /// Add a propagator to the list of propagators.
342 ///
343 /// Multiple propagators can be added and will be combined into a composite propagator.
344 /// The default propagator is TraceContextPropagator.
345 ///
346 /// # Arguments
347 ///
348 /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
349 ///
350 /// # Examples
351 ///
352 /// ```no_run
353 /// use lambda_otel_lite::TelemetryConfig;
354 /// use opentelemetry_sdk::propagation::BaggagePropagator;
355 ///
356 /// let config = TelemetryConfig::builder()
357 /// .with_propagator(BaggagePropagator::new())
358 /// .build();
359 /// ```
360 pub fn with_propagator<T>(mut self, propagator: T) -> Self
361 where
362 T: TextMapPropagator + Send + Sync + 'static,
363 {
364 self.propagators.push(Box::new(propagator));
365 self
366 }
367
368 pub fn with_named_propagator(self, name: &str) -> Self {
369 match name {
370 "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
371 "xray" => self.with_propagator(XrayPropagator::new()),
372 "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
373 "none" => self.with_propagator(NoopPropagator::new()),
374 _ => {
375 tracing::warn!("Unknown propagator: {}, using default propagators", name);
376 self
377 }
378 }
379 }
380
381 /// Add a custom ID generator to the tracer provider.
382 ///
383 /// This method allows setting a custom ID generator for trace and span IDs.
384 /// This is particularly useful when integrating with AWS X-Ray, which requires
385 /// a specific ID format.
386 ///
387 /// # Arguments
388 ///
389 /// * `id_generator` - An ID generator implementing the [`IdGenerator`] trait
390 ///
391 /// # Examples
392 ///
393 /// ```no_run
394 /// use lambda_otel_lite::TelemetryConfig;
395 /// use opentelemetry_aws::trace::XrayIdGenerator;
396 ///
397 /// // Configure with X-Ray compatible ID generator
398 /// let config = TelemetryConfig::builder()
399 /// .with_id_generator(XrayIdGenerator::default())
400 /// .build();
401 /// ```
402 pub fn with_id_generator<T>(mut self, id_generator: T) -> Self
403 where
404 T: IdGenerator + 'static,
405 {
406 self.provider_builder = self.provider_builder.with_id_generator(id_generator);
407 self
408 }
409}
410
411/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
412///
413/// # Arguments
414///
415/// * `config` - Configuration for telemetry initialization
416///
417/// # Returns
418///
419/// Returns a tuple containing:
420/// - A tracer instance for manual instrumentation
421/// - A completion handler for managing span export timing
422///
423/// # Errors
424///
425/// Returns error if:
426/// - Extension registration fails (async/finalize modes)
427/// - Tracer provider initialization fails
428/// - Environment variable parsing fails
429///
430/// # Examples
431///
432/// Basic usage with default configuration:
433///
434/// ```no_run
435/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
436///
437/// # async fn example() -> Result<(), lambda_runtime::Error> {
438/// // Initialize with default configuration
439/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
440/// # Ok(())
441/// # }
442/// ```
443///
444/// Custom configuration:
445///
446/// ```no_run
447/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
448/// use opentelemetry::KeyValue;
449/// use opentelemetry_sdk::Resource;
450///
451/// # async fn example() -> Result<(), lambda_runtime::Error> {
452/// // Create custom resource
453/// let resource = Resource::builder()
454/// .with_attributes(vec![
455/// KeyValue::new("service.name", "payment-api"),
456/// KeyValue::new("service.version", "1.2.3"),
457/// ])
458/// .build();
459///
460/// // Initialize with custom configuration
461/// let (_, telemetry) = init_telemetry(
462/// TelemetryConfig::builder()
463/// .resource(resource)
464/// .build()
465/// ).await?;
466/// # Ok(())
467/// # }
468/// ```
469///
470/// Advanced usage with BatchSpanProcessor (required for async exporters):
471///
472/// ```no_run
473/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
474/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
475/// use opentelemetry_sdk::trace::BatchSpanProcessor;
476/// use lambda_runtime::Error;
477///
478/// # async fn example() -> Result<(), Error> {
479/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
480/// .with_http()
481/// .with_http_client(reqwest::Client::new())
482/// .with_protocol(Protocol::HttpBinary)
483/// .build()?;
484///
485/// let (provider, completion) = init_telemetry(
486/// TelemetryConfig::builder()
487/// .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
488/// .build()
489/// ).await?;
490/// # Ok(())
491/// # }
492/// ```
493///
494/// Using LambdaSpanProcessor with blocking http client:
495///
496/// ```no_run
497/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
498/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
499/// use lambda_runtime::Error;
500///
501/// # async fn example() -> Result<(), Error> {
502/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
503/// .with_http()
504/// .with_http_client(reqwest::blocking::Client::new())
505/// .with_protocol(Protocol::HttpBinary)
506/// .build()?;
507///
508/// let (provider, completion) = init_telemetry(
509/// TelemetryConfig::builder()
510/// .with_span_processor(
511/// LambdaSpanProcessor::builder()
512/// .exporter(lambda_exporter)
513/// .max_batch_size(512)
514/// .max_queue_size(2048)
515/// .build()
516/// )
517/// .build()
518/// ).await?;
519/// # Ok(())
520/// # }
521/// ```
522///
523pub async fn init_telemetry(
524 mut config: TelemetryConfig,
525) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
526 let mode = ProcessorMode::from_env();
527
528 if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
529 let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
530
531 for propagator in propagators {
532 match propagator {
533 "tracecontext" => config
534 .propagators
535 .push(Box::new(TraceContextPropagator::new())),
536 "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
537 "xray-lambda" => config
538 .propagators
539 .push(Box::new(LambdaXrayPropagator::new())),
540 "none" => config.propagators.push(Box::new(NoopPropagator::new())),
541 _ => tracing::warn!(
542 "Unknown propagator: {}, using default propagators",
543 propagator
544 ),
545 }
546 }
547 } else {
548 // if no propagators are set, use the default propagators
549 if config.propagators.is_empty() {
550 config
551 .propagators
552 .push(Box::new(TraceContextPropagator::new()));
553 config
554 .propagators
555 .push(Box::new(LambdaXrayPropagator::new()));
556 }
557 }
558
559 let composite_propagator = TextMapCompositePropagator::new(config.propagators);
560 global::set_text_map_propagator(composite_propagator);
561
562 // Add default span processor if none was added
563 if !config.has_processor {
564 let processor = LambdaSpanProcessor::builder()
565 .exporter(OtlpStdoutSpanExporter::default())
566 .build();
567 config.provider_builder = config.provider_builder.with_span_processor(processor);
568 }
569
570 // Apply defaults and build the provider
571 let resource = config.resource.unwrap_or_else(get_lambda_resource);
572
573 let provider = Arc::new(config.provider_builder.with_resource(resource).build());
574
575 // Register the extension if in async or finalize mode
576 let sender = match mode {
577 ProcessorMode::Async | ProcessorMode::Finalize => {
578 Some(register_extension(provider.clone(), mode.clone()).await?)
579 }
580 _ => None,
581 };
582
583 if config.set_global_provider {
584 // Set the provider as global
585 set_tracer_provider(provider.as_ref().clone());
586 }
587
588 // Initialize tracing subscriber with smart env var selection
589 let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
590 if env::var("RUST_LOG").is_ok() {
591 "RUST_LOG"
592 } else {
593 "AWS_LAMBDA_LOG_LEVEL"
594 }
595 });
596
597 let env_filter = tracing_subscriber::EnvFilter::builder()
598 .with_env_var(env_var_name)
599 .from_env_lossy();
600
601 let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
602 let tracer = completion_handler.get_tracer().clone();
603
604 let subscriber = tracing_subscriber::registry::Registry::default()
605 .with(tracing_opentelemetry::OpenTelemetryLayer::new(
606 tracer.clone(),
607 ))
608 .with(env_filter);
609
610 // Determine if fmt layer should be enabled - environment variable takes precedence when set
611 let enable_fmt = if let Ok(env_value) = env::var(constants::env_vars::ENABLE_FMT_LAYER) {
612 match env_value.to_lowercase().as_str() {
613 "true" => true,
614 "false" => false,
615 other => {
616 tracing::warn!(
617 "Invalid value '{}' for {}, expected 'true' or 'false'. Using code configuration.",
618 other,
619 constants::env_vars::ENABLE_FMT_LAYER
620 );
621 config.enable_fmt_layer
622 }
623 }
624 } else {
625 // If env var not set, use the configured value
626 config.enable_fmt_layer
627 };
628
629 // Enable fmt layer based on the determined value
630 if enable_fmt {
631 // Determine if the lambda logging configuration is set to output json logs
632 let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
633 .unwrap_or_default()
634 .to_uppercase()
635 == "JSON";
636
637 if is_json {
638 tracing::subscriber::set_global_default(
639 subscriber.with(
640 tracing_subscriber::fmt::layer()
641 .with_target(false)
642 .without_time()
643 .json(),
644 ),
645 )?;
646 } else {
647 tracing::subscriber::set_global_default(
648 subscriber.with(
649 tracing_subscriber::fmt::layer()
650 .with_target(false)
651 .without_time()
652 .with_ansi(false),
653 ),
654 )?;
655 }
656 } else {
657 tracing::subscriber::set_global_default(subscriber)?;
658 }
659
660 Ok((tracer, completion_handler))
661}
662
663#[cfg(test)]
664mod tests {
665 use super::*;
666 use opentelemetry::trace::{Span, Tracer};
667 use opentelemetry_aws::trace::XrayIdGenerator;
668 use opentelemetry_sdk::trace::SimpleSpanProcessor;
669 use sealed_test::prelude::*;
670 use std::sync::Arc;
671 use tokio::sync::mpsc;
672
673 // Helper to clean up environment variables between tests
674 fn cleanup_env() {
675 env::remove_var(constants::env_vars::ENABLE_FMT_LAYER);
676 env::remove_var(constants::env_vars::PROPAGATORS);
677 env::remove_var("_X_AMZN_TRACE_ID");
678 }
679
680 #[test]
681 #[sealed_test]
682 fn test_telemetry_config_defaults() {
683 cleanup_env();
684
685 let config = TelemetryConfig::builder().build();
686 assert!(config.set_global_provider); // Should be true by default
687 assert!(!config.has_processor);
688 assert!(!config.enable_fmt_layer);
689 assert!(config.propagators.is_empty()); // No propagators by default in builder
690 }
691
692 #[test]
693 #[sealed_test]
694 fn test_telemetry_config_with_propagators() {
695 cleanup_env();
696
697 // Test with explicit tracecontext propagator
698 let config = TelemetryConfig::builder()
699 .with_span_processor(SimpleSpanProcessor::new(Box::new(
700 OtlpStdoutSpanExporter::default(),
701 )))
702 .with_named_propagator("tracecontext")
703 .build();
704 assert_eq!(config.propagators.len(), 1);
705
706 // Test with explicit xray propagator
707 let config = TelemetryConfig::builder()
708 .with_named_propagator("xray")
709 .build();
710 assert_eq!(config.propagators.len(), 1);
711
712 // Test with both propagators
713 let config = TelemetryConfig::builder()
714 .with_named_propagator("tracecontext")
715 .with_named_propagator("xray")
716 .build();
717 assert_eq!(config.propagators.len(), 2);
718
719 // Test with default propagators (empty - will be set in init_telemetry)
720 let config = TelemetryConfig::builder().build();
721 assert_eq!(config.propagators.len(), 0);
722
723 // Test with none
724 let config = TelemetryConfig::builder()
725 .with_named_propagator("none")
726 .build();
727 assert_eq!(config.propagators.len(), 1);
728 }
729
730 #[tokio::test]
731 #[sealed_test]
732 async fn test_telemetry_config_env_propagators_tracecontext() {
733 cleanup_env();
734
735 // Test with OTEL_PROPAGATORS=tracecontext
736 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
737 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
738 // In real usage we'd check the behavior rather than implementation details
739 // So we'll just check that we can create and use a handler
740 assert!(handler.sender.is_none());
741
742 cleanup_env();
743 }
744
745 #[tokio::test]
746 #[sealed_test]
747 async fn test_telemetry_config_env_propagators_xray() {
748 cleanup_env();
749
750 // Test with OTEL_PROPAGATORS=xray
751 env::set_var(constants::env_vars::PROPAGATORS, "xray");
752 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
753 assert!(handler.sender.is_none());
754
755 cleanup_env();
756 }
757
758 #[tokio::test]
759 #[sealed_test]
760 async fn test_telemetry_config_env_propagators_combined() {
761 cleanup_env();
762
763 // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
764 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
765 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
766 assert!(handler.sender.is_none());
767
768 cleanup_env();
769 }
770
771 #[tokio::test]
772 #[sealed_test]
773 async fn test_telemetry_config_env_propagators_none() {
774 cleanup_env();
775
776 // Test with OTEL_PROPAGATORS=none
777 env::set_var(constants::env_vars::PROPAGATORS, "none");
778 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
779 assert!(handler.sender.is_none());
780
781 cleanup_env();
782 }
783
784 #[tokio::test]
785 #[sealed_test]
786 async fn test_init_telemetry_defaults() {
787 let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
788 assert!(completion_handler.sender.is_none()); // Default mode is Sync
789 }
790
791 #[tokio::test]
792 #[sealed_test]
793 async fn test_init_telemetry_custom() {
794 let resource = Resource::builder().build();
795 let config = TelemetryConfig::builder()
796 .resource(resource)
797 .with_named_propagator("tracecontext")
798 .enable_fmt_layer(true)
799 .set_global_provider(false)
800 .build();
801
802 let (_, completion_handler) = init_telemetry(config).await.unwrap();
803 assert!(completion_handler.sender.is_none());
804 }
805
806 #[tokio::test]
807 #[sealed_test]
808 async fn test_telemetry_config_env_fmt_layer_true_override() {
809 cleanup_env();
810
811 // Test: Env var "true" overrides code setting "false"
812 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "true");
813 let config = TelemetryConfig::default(); // code setting is false by default
814 assert!(!config.enable_fmt_layer); // Config should not be affected by env var
815
816 // Initialize telemetry - env var should override config
817 let result = init_telemetry(config).await;
818 assert!(result.is_ok());
819
820 // Clean up
821 cleanup_env();
822 }
823
824 #[tokio::test]
825 #[sealed_test]
826 async fn test_telemetry_config_env_fmt_layer_false_override() {
827 cleanup_env();
828
829 // Test: Env var "false" overrides code setting "true"
830 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "false");
831 let config = TelemetryConfig::builder()
832 .enable_fmt_layer(true) // code setting is true
833 .build();
834 assert!(config.enable_fmt_layer);
835
836 // Initialize telemetry - env var should override config
837 let result = init_telemetry(config).await;
838 assert!(result.is_ok());
839
840 // Clean up
841 cleanup_env();
842 }
843
844 #[tokio::test]
845 #[sealed_test]
846 async fn test_telemetry_config_env_fmt_layer_invalid() {
847 cleanup_env();
848
849 // Test: Invalid env var falls back to code setting
850 env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "invalid");
851 let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
852
853 // Initialize telemetry - should log a warning but use code setting
854 let result = init_telemetry(config).await;
855 assert!(result.is_ok());
856
857 // Clean up
858 cleanup_env();
859 }
860
861 #[tokio::test]
862 #[sealed_test]
863 async fn test_telemetry_config_env_fmt_layer_not_set() {
864 cleanup_env();
865
866 // Test: No env var uses code setting
867 let config = TelemetryConfig::default();
868 assert!(!config.enable_fmt_layer);
869
870 let result = init_telemetry(config).await;
871 assert!(result.is_ok());
872
873 // Clean up
874 cleanup_env();
875 }
876
877 #[test]
878 fn test_completion_handler_sync_mode() {
879 let provider = Arc::new(
880 SdkTracerProvider::builder()
881 .with_span_processor(SimpleSpanProcessor::new(Box::new(
882 OtlpStdoutSpanExporter::default(),
883 )))
884 .build(),
885 );
886
887 let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
888
889 // In sync mode, complete() should call force_flush
890 handler.complete();
891 // Note: We can't easily verify the flush was called since TracerProvider
892 // doesn't expose this information, but we can verify it doesn't panic
893 }
894
895 #[tokio::test]
896 async fn test_completion_handler_async_mode() {
897 let provider = Arc::new(
898 SdkTracerProvider::builder()
899 .with_span_processor(SimpleSpanProcessor::new(Box::new(
900 OtlpStdoutSpanExporter::default(),
901 )))
902 .build(),
903 );
904
905 let (tx, mut rx) = mpsc::unbounded_channel();
906
907 let completion_handler =
908 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
909
910 // In async mode, complete() should send a message through the channel
911 completion_handler.complete();
912
913 // Verify that we received the completion signal
914 assert!(rx.try_recv().is_ok());
915 // Verify channel is now empty
916 assert!(rx.try_recv().is_err());
917 }
918
919 #[test]
920 fn test_completion_handler_finalize_mode() {
921 let provider = Arc::new(
922 SdkTracerProvider::builder()
923 .with_span_processor(SimpleSpanProcessor::new(Box::new(
924 OtlpStdoutSpanExporter::default(),
925 )))
926 .build(),
927 );
928
929 let (tx, _rx) = mpsc::unbounded_channel();
930
931 let completion_handler =
932 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);
933
934 // In finalize mode, complete() should do nothing
935 completion_handler.complete();
936 // Verify it doesn't panic or cause issues
937 }
938
939 #[test]
940 fn test_completion_handler_clone() {
941 let provider = Arc::new(
942 SdkTracerProvider::builder()
943 .with_span_processor(SimpleSpanProcessor::new(Box::new(
944 OtlpStdoutSpanExporter::default(),
945 )))
946 .build(),
947 );
948
949 let (tx, _rx) = mpsc::unbounded_channel();
950
951 let completion_handler =
952 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
953
954 // Test that Clone is implemented correctly
955 let cloned = completion_handler.clone();
956
957 // Verify both handlers have the same mode
958 assert!(matches!(cloned.mode, ProcessorMode::Async));
959 assert!(cloned.sender.is_some());
960 }
961
962 #[test]
963 fn test_completion_handler_sync_mode_error_handling() {
964 let provider = Arc::new(
965 SdkTracerProvider::builder()
966 .with_span_processor(SimpleSpanProcessor::new(Box::new(
967 OtlpStdoutSpanExporter::default(),
968 )))
969 .build(),
970 );
971
972 let completion_handler =
973 TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
974
975 // Test that complete() doesn't panic
976 completion_handler.complete();
977 }
978
979 #[tokio::test]
980 async fn test_completion_handler_async_mode_error_handling() {
981 let provider = Arc::new(
982 SdkTracerProvider::builder()
983 .with_span_processor(SimpleSpanProcessor::new(Box::new(
984 OtlpStdoutSpanExporter::default(),
985 )))
986 .build(),
987 );
988
989 // Use UnboundedSender instead of Sender
990 let (tx, _rx) = mpsc::unbounded_channel();
991 // Fill the channel by dropping the receiver
992 drop(_rx);
993
994 let completion_handler =
995 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
996
997 // Test that complete() doesn't panic when receiver is dropped
998 completion_handler.complete();
999 }
1000
1001 #[test]
1002 #[sealed_test]
1003 fn test_telemetry_config_with_id_generator() {
1004 cleanup_env();
1005
1006 // Create a config with X-Ray ID generator
1007 let config = TelemetryConfig::builder()
1008 .with_id_generator(XrayIdGenerator::default())
1009 .build();
1010
1011 // We can't directly check the ID generator type since it's boxed inside the provider,
1012 // but we can verify it's applied by checking the generated trace IDs format
1013 let provider = Arc::new(config.provider_builder.build());
1014
1015 // Create a scope with attributes
1016 let scope = opentelemetry::InstrumentationScope::builder("test")
1017 .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
1018 .build();
1019
1020 // Get a tracer using the correct API
1021 let tracer = provider.tracer_with_scope(scope);
1022
1023 // Start a span using the tracer
1024 let span = tracer.start_with_context("test span", &opentelemetry::Context::current());
1025 let trace_id = span.span_context().trace_id();
1026
1027 // Verify X-Ray trace ID format:
1028 // 1. Convert to hex string for easier checking
1029 let trace_id_hex = format!("{:032x}", trace_id);
1030
1031 // 2. The first 8 characters of X-Ray trace IDs represent a timestamp in seconds
1032 // This is the key characteristic of X-Ray trace IDs that we can verify
1033 let timestamp_part = &trace_id_hex[0..8];
1034
1035 // 3. Parse the hex timestamp to ensure it's a valid timestamp (recent past)
1036 let timestamp = u32::from_str_radix(timestamp_part, 16).unwrap();
1037
1038 // 4. Check that timestamp is reasonable (within the last day)
1039 let now = std::time::SystemTime::now()
1040 .duration_since(std::time::UNIX_EPOCH)
1041 .unwrap()
1042 .as_secs() as u32;
1043
1044 // The timestamp should be within the last day
1045 assert!(timestamp <= now);
1046 assert!(timestamp > now - 86400); // Within the last day
1047
1048 // Verify remaining 24 characters are not all zeros (random part)
1049 let random_part = &trace_id_hex[8..];
1050 assert_ne!(random_part, "000000000000000000000000");
1051 }
1052}
1053
1054// A simple no-op propagator
1055#[derive(Debug)]
1056struct NoopPropagator;
1057
1058impl NoopPropagator {
1059 fn new() -> Self {
1060 NoopPropagator
1061 }
1062}
1063
1064impl TextMapPropagator for NoopPropagator {
1065 fn inject_context(
1066 &self,
1067 _cx: &opentelemetry::Context,
1068 _injector: &mut dyn opentelemetry::propagation::Injector,
1069 ) {
1070 }
1071
1072 fn extract_with_context(
1073 &self,
1074 cx: &opentelemetry::Context,
1075 _extractor: &dyn opentelemetry::propagation::Extractor,
1076 ) -> opentelemetry::Context {
1077 cx.clone()
1078 }
1079
1080 fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
1081 opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
1082 }
1083}