lambda_otel_lite/telemetry.rs
1//! Core functionality for OpenTelemetry initialization and configuration in Lambda functions.
2//!
3//! This module provides the initialization and configuration components for OpenTelemetry in Lambda:
4//! - `init_telemetry`: Main entry point for telemetry setup
5//! - `TelemetryConfig`: Configuration builder with environment-based defaults
6//! - `TelemetryCompletionHandler`: Controls span export timing based on processing mode
7//!
8//! # Architecture
9//!
10//! The initialization flow:
11//! 1. Configuration is built from environment and/or builder options
12//! 2. Span processor is created based on processing mode
13//! 3. Resource attributes are detected from Lambda environment
14//! 4. Tracer provider is initialized with the configuration
15//! 5. Completion handler is returned for managing span export
16//!
17//! # Environment Configuration
18//!
19//! Core environment variables:
20//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: "sync" (default), "async", or "finalize"
21//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Maximum spans in buffer (default: 2048)
22//! - `OTEL_SERVICE_NAME`: Override auto-detected service name
23//!
24//! # Basic Usage
25//!
26//! ```no_run
27//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
28//! use lambda_runtime::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Error> {
32//! let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
33//! Ok(())
34//! }
35//! ```
36//!
37//! Custom configuration with custom resource attributes:
38//! ```no_run
39//! use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
40//! use opentelemetry::KeyValue;
41//! use opentelemetry_sdk::Resource;
42//! use lambda_runtime::Error;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), Error> {
46//! let resource = Resource::builder()
47//! .with_attributes(vec![
48//! KeyValue::new("service.version", "1.0.0"),
49//! KeyValue::new("deployment.environment", "production"),
50//! ])
51//! .build();
52//!
53//! let config = TelemetryConfig::builder()
54//! .resource(resource)
55//! .build();
56//!
57//! let (_, completion_handler) = init_telemetry(config).await?;
58//! Ok(())
59//! }
60//! ```
61//!
62//! Custom configuration with custom span processor:
63//! ```no_run
64//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
65//! use opentelemetry_sdk::trace::SimpleSpanProcessor;
66//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
67//! use lambda_runtime::Error;
68//!
69//! #[tokio::main]
70//! async fn main() -> Result<(), Error> {
71//! let config = TelemetryConfig::builder()
72//! .with_span_processor(SimpleSpanProcessor::new(
73//! Box::new(OtlpStdoutSpanExporter::default())
74//! ))
75//! .enable_fmt_layer(true)
76//! .build();
77//!
78//! let (_, completion_handler) = init_telemetry(config).await?;
79//! Ok(())
80//! }
81//! ```
82//!
83//! # Environment Variables
84//!
85//! The following environment variables affect the configuration:
86//! - `OTEL_SERVICE_NAME`: Service name for spans
87//! - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes
88//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Span buffer size (default: 2048)
89//! - `OTLP_STDOUT_SPAN_EXPORTER_COMPRESSION_LEVEL`: Export compression (default: 6)
90//! - `LAMBDA_TRACING_ENABLE_FMT_LAYER`: Enable formatting layer (default: false)
91//! - `LAMBDA_EXTENSION_SPAN_PROCESSOR_MODE`: Processing mode (sync/async/finalize)
92//! - `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`: Log level configuration
93
94use crate::constants::defaults;
95use crate::constants::env_vars;
96use crate::{
97 constants, extension::register_extension, mode::ProcessorMode, processor::LambdaSpanProcessor,
98 propagation::LambdaXrayPropagator, resource::get_lambda_resource,
99};
100use bon::Builder;
101use lambda_runtime::Error;
102use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
103use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
104use opentelemetry_aws::trace::XrayPropagator;
105use opentelemetry_sdk::{
106 propagation::TraceContextPropagator,
107 trace::{SdkTracerProvider, SpanProcessor, TracerProviderBuilder},
108 Resource,
109};
110use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
111use std::{borrow::Cow, env, sync::Arc};
112use tokio::sync::mpsc::UnboundedSender;
113use tracing_subscriber::layer::SubscriberExt;
114
115/// Manages the lifecycle of span export based on the processing mode.
116///
117/// This handler must be used to signal when spans should be exported. Its behavior
118/// varies by processing mode:
119/// - Sync: Forces immediate export
120/// - Async: Signals the extension to export
121/// - Finalize: Defers to span processor
122///
123/// # Thread Safety
124///
125/// This type is `Clone` and can be safely shared between threads.
126#[derive(Clone)]
127pub struct TelemetryCompletionHandler {
128 provider: Arc<SdkTracerProvider>,
129 sender: Option<UnboundedSender<()>>,
130 mode: ProcessorMode,
131 tracer: opentelemetry_sdk::trace::Tracer,
132}
133
134impl TelemetryCompletionHandler {
135 pub fn new(
136 provider: Arc<SdkTracerProvider>,
137 sender: Option<UnboundedSender<()>>,
138 mode: ProcessorMode,
139 ) -> Self {
140 // Create instrumentation scope with attributes
141 let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
142 .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
143 .with_schema_url(Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0"))
144 .with_attributes(vec![
145 KeyValue::new("library.language", "rust"),
146 KeyValue::new("library.type", "instrumentation"),
147 KeyValue::new("library.runtime", "aws_lambda"),
148 ])
149 .build();
150
151 // Create tracer with instrumentation scope
152 let tracer = provider.tracer_with_scope(scope);
153
154 Self {
155 provider,
156 sender,
157 mode,
158 tracer,
159 }
160 }
161
162 /// Get the tracer instance for creating spans.
163 ///
164 /// Returns the cached tracer instance configured with this package's instrumentation scope.
165 /// The tracer is configured with the provider's settings and will automatically use
166 /// the correct span processor based on the processing mode.
167 pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
168 &self.tracer
169 }
170
171 /// Complete telemetry processing for the current invocation
172 ///
173 /// In Sync mode, this will force flush the provider and log any errors that occur.
174 /// In Async mode, this will send a completion signal to the extension.
175 /// In Finalize mode, this will do nothing (handled by drop).
176 pub fn complete(&self) {
177 match self.mode {
178 ProcessorMode::Sync => {
179 if let Err(e) = self.provider.force_flush() {
180 tracing::warn!(error = ?e, "Error flushing telemetry");
181 }
182 }
183 ProcessorMode::Async => {
184 if let Some(sender) = &self.sender {
185 if let Err(e) = sender.send(()) {
186 tracing::warn!(error = ?e, "Failed to send completion signal to extension");
187 }
188 }
189 }
190 ProcessorMode::Finalize => {
191 // Do nothing, handled by drop
192 }
193 }
194 }
195}
196
197/// Configuration for OpenTelemetry initialization.
198///
199/// Provides configuration options for telemetry setup. Use `TelemetryConfig::default()`
200/// for standard Lambda configuration, or the builder pattern for customization.
201///
202/// # Fields
203///
204/// * `enable_fmt_layer` - Enable console output for debugging (default: false)
205/// * `set_global_provider` - Set as global tracer provider (default: true)
206/// * `resource` - Custom resource attributes (default: auto-detected from Lambda)
207/// * `env_var_name` - Environment variable name for log level configuration
208///
209/// # Examples
210///
211/// Basic usage with default configuration:
212///
213/// ```no_run
214/// use lambda_otel_lite::telemetry::TelemetryConfig;
215///
216/// let config = TelemetryConfig::default();
217/// ```
218///
219/// Custom configuration with resource attributes:
220///
221/// ```no_run
222/// use lambda_otel_lite::telemetry::TelemetryConfig;
223/// use opentelemetry::KeyValue;
224/// use opentelemetry_sdk::Resource;
225///
226/// let config = TelemetryConfig::builder()
227/// .resource(Resource::builder()
228/// .with_attributes(vec![KeyValue::new("version", "1.0.0")])
229/// .build())
230/// .build();
231/// ```
232///
233/// Custom configuration with logging options:
234///
235/// ```no_run
236/// use lambda_otel_lite::telemetry::TelemetryConfig;
237///
238/// let config = TelemetryConfig::builder()
239/// .enable_fmt_layer(true) // Enable console output for debugging
240/// .env_var_name("MY_CUSTOM_LOG_LEVEL".to_string()) // Custom env var for log level
241/// .build();
242/// ```
243#[derive(Builder, Debug)]
244pub struct TelemetryConfig {
245 // Custom fields for internal state
246 #[builder(field)]
247 provider_builder: TracerProviderBuilder,
248
249 #[builder(field)]
250 has_processor: bool,
251
252 #[builder(field)]
253 propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,
254
255 /// Enable console output for debugging.
256 ///
257 /// When enabled, spans and events will be printed to the console in addition
258 /// to being exported through the configured span processors. This is useful
259 /// for debugging but adds overhead and should be disabled in production.
260 ///
261 /// This can also be controlled via the `LAMBDA_OTEL_ENABLE_FMT_LAYER` environment variable.
262 /// Setting this to "true" will enable console output even if this field is false in the code.
263 /// This allows toggling logging for debugging without code changes.
264 ///
265 /// Default: `false`
266 #[builder(default = false)]
267 pub enable_fmt_layer: bool,
268
269 /// Set this provider as the global OpenTelemetry provider.
270 ///
271 /// When enabled, the provider will be registered as the global provider
272 /// for the OpenTelemetry API. This allows using the global tracer API
273 /// without explicitly passing around the provider.
274 ///
275 /// Default: `true`
276 #[builder(default = true)]
277 pub set_global_provider: bool,
278
279 /// Custom resource attributes for all spans.
280 ///
281 /// If not provided, resource attributes will be automatically detected
282 /// from the Lambda environment. Custom resources will override any
283 /// automatically detected attributes with the same keys.
284 ///
285 /// Default: `None` (auto-detected from Lambda environment)
286 pub resource: Option<Resource>,
287
288 /// Environment variable name to use for log level configuration.
289 ///
290 /// This field specifies which environment variable should be used to configure
291 /// the tracing subscriber's log level filter. If not specified, the system will
292 /// first check for `RUST_LOG` and then fall back to `AWS_LAMBDA_LOG_LEVEL`.
293 ///
294 /// Default: `None` (uses `RUST_LOG` or `AWS_LAMBDA_LOG_LEVEL`)
295 pub env_var_name: Option<String>,
296}
297
298impl Default for TelemetryConfig {
299 fn default() -> Self {
300 Self::builder().build()
301 }
302}
303
304/// Builder methods for adding span processors and other configuration
305impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
306 /// Add a span processor to the tracer provider.
307 ///
308 /// This method allows adding custom span processors for trace data processing.
309 /// Multiple processors can be added by calling this method multiple times.
310 ///
311 /// # Arguments
312 ///
313 /// * `processor` - A span processor implementing the [`SpanProcessor`] trait
314 ///
315 /// # Examples
316 ///
317 /// ```no_run
318 /// use lambda_otel_lite::TelemetryConfig;
319 /// use opentelemetry_sdk::trace::SimpleSpanProcessor;
320 /// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
321 ///
322 /// // Only use builder when adding custom processors
323 /// let config = TelemetryConfig::builder()
324 /// .with_span_processor(SimpleSpanProcessor::new(
325 /// Box::new(OtlpStdoutSpanExporter::default())
326 /// ))
327 /// .build();
328 /// ```
329 pub fn with_span_processor<T>(mut self, processor: T) -> Self
330 where
331 T: SpanProcessor + 'static,
332 {
333 self.provider_builder = self.provider_builder.with_span_processor(processor);
334 self.has_processor = true;
335 self
336 }
337
338 /// Add a propagator to the list of propagators.
339 ///
340 /// Multiple propagators can be added and will be combined into a composite propagator.
341 /// The default propagator is TraceContextPropagator.
342 ///
343 /// # Arguments
344 ///
345 /// * `propagator` - A propagator implementing the [`TextMapPropagator`] trait
346 ///
347 /// # Examples
348 ///
349 /// ```no_run
350 /// use lambda_otel_lite::TelemetryConfig;
351 /// use opentelemetry_sdk::propagation::BaggagePropagator;
352 ///
353 /// let config = TelemetryConfig::builder()
354 /// .with_propagator(BaggagePropagator::new())
355 /// .build();
356 /// ```
357 pub fn with_propagator<T>(mut self, propagator: T) -> Self
358 where
359 T: TextMapPropagator + Send + Sync + 'static,
360 {
361 self.propagators.push(Box::new(propagator));
362 self
363 }
364
365 pub fn with_named_propagator(self, name: &str) -> Self {
366 match name {
367 "tracecontext" => self.with_propagator(TraceContextPropagator::new()),
368 "xray" => self.with_propagator(XrayPropagator::new()),
369 "xray-lambda" => self.with_propagator(LambdaXrayPropagator::new()),
370 "none" => self.with_propagator(NoopPropagator::new()),
371 _ => {
372 tracing::warn!("Unknown propagator: {}, using default propagators", name);
373 self
374 }
375 }
376 }
377}
378
379/// Initialize OpenTelemetry for AWS Lambda with the provided configuration.
380///
381/// # Arguments
382///
383/// * `config` - Configuration for telemetry initialization
384///
385/// # Returns
386///
387/// Returns a tuple containing:
388/// - A tracer instance for manual instrumentation
389/// - A completion handler for managing span export timing
390///
391/// # Errors
392///
393/// Returns error if:
394/// - Extension registration fails (async/finalize modes)
395/// - Tracer provider initialization fails
396/// - Environment variable parsing fails
397///
398/// # Examples
399///
400/// Basic usage with default configuration:
401///
402/// ```no_run
403/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
404///
405/// # async fn example() -> Result<(), lambda_runtime::Error> {
406/// // Initialize with default configuration
407/// let (_, telemetry) = init_telemetry(TelemetryConfig::default()).await?;
408/// # Ok(())
409/// # }
410/// ```
411///
412/// Custom configuration:
413///
414/// ```no_run
415/// use lambda_otel_lite::telemetry::{init_telemetry, TelemetryConfig};
416/// use opentelemetry::KeyValue;
417/// use opentelemetry_sdk::Resource;
418///
419/// # async fn example() -> Result<(), lambda_runtime::Error> {
420/// // Create custom resource
421/// let resource = Resource::builder()
422/// .with_attributes(vec![
423/// KeyValue::new("service.name", "payment-api"),
424/// KeyValue::new("service.version", "1.2.3"),
425/// ])
426/// .build();
427///
428/// // Initialize with custom configuration
429/// let (_, telemetry) = init_telemetry(
430/// TelemetryConfig::builder()
431/// .resource(resource)
432/// .build()
433/// ).await?;
434/// # Ok(())
435/// # }
436/// ```
437///
438/// Advanced usage with BatchSpanProcessor (required for async exporters):
439///
440/// ```no_run
441/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
442/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
443/// use opentelemetry_sdk::trace::BatchSpanProcessor;
444/// use lambda_runtime::Error;
445///
446/// # async fn example() -> Result<(), Error> {
447/// let batch_exporter = opentelemetry_otlp::SpanExporter::builder()
448/// .with_http()
449/// .with_http_client(reqwest::Client::new())
450/// .with_protocol(Protocol::HttpBinary)
451/// .build()?;
452///
453/// let (provider, completion) = init_telemetry(
454/// TelemetryConfig::builder()
455/// .with_span_processor(BatchSpanProcessor::builder(batch_exporter).build())
456/// .build()
457/// ).await?;
458/// # Ok(())
459/// # }
460/// ```
461///
462/// Using LambdaSpanProcessor with blocking http client:
463///
464/// ```no_run
465/// use lambda_otel_lite::{init_telemetry, TelemetryConfig, LambdaSpanProcessor};
466/// use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, Protocol};
467/// use lambda_runtime::Error;
468///
469/// # async fn example() -> Result<(), Error> {
470/// let lambda_exporter = opentelemetry_otlp::SpanExporter::builder()
471/// .with_http()
472/// .with_http_client(reqwest::blocking::Client::new())
473/// .with_protocol(Protocol::HttpBinary)
474/// .build()?;
475///
476/// let (provider, completion) = init_telemetry(
477/// TelemetryConfig::builder()
478/// .with_span_processor(
479/// LambdaSpanProcessor::builder()
480/// .exporter(lambda_exporter)
481/// .max_batch_size(512)
482/// .max_queue_size(2048)
483/// .build()
484/// )
485/// .build()
486/// ).await?;
487/// # Ok(())
488/// # }
489/// ```
490///
491pub async fn init_telemetry(
492 mut config: TelemetryConfig,
493) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
494 let mode = ProcessorMode::from_env();
495
496 if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
497 let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
498
499 for propagator in propagators {
500 match propagator {
501 "tracecontext" => config
502 .propagators
503 .push(Box::new(TraceContextPropagator::new())),
504 "xray" => config.propagators.push(Box::new(XrayPropagator::new())),
505 "xray-lambda" => config
506 .propagators
507 .push(Box::new(LambdaXrayPropagator::new())),
508 "none" => config.propagators.push(Box::new(NoopPropagator::new())),
509 _ => tracing::warn!(
510 "Unknown propagator: {}, using default propagators",
511 propagator
512 ),
513 }
514 }
515 } else {
516 // if no propagators are set, use the default propagators
517 if config.propagators.is_empty() {
518 config
519 .propagators
520 .push(Box::new(TraceContextPropagator::new()));
521 config
522 .propagators
523 .push(Box::new(LambdaXrayPropagator::new()));
524 }
525 }
526
527 let composite_propagator = TextMapCompositePropagator::new(config.propagators);
528 global::set_text_map_propagator(composite_propagator);
529
530 // Add default span processor if none was added
531 if !config.has_processor {
532 let processor = LambdaSpanProcessor::builder()
533 .exporter(OtlpStdoutSpanExporter::default())
534 .build();
535 config.provider_builder = config.provider_builder.with_span_processor(processor);
536 }
537
538 // Apply defaults and build the provider
539 let resource = config.resource.unwrap_or_else(get_lambda_resource);
540 let provider = Arc::new(config.provider_builder.with_resource(resource).build());
541
542 // Register the extension if in async or finalize mode
543 let sender = match mode {
544 ProcessorMode::Async | ProcessorMode::Finalize => {
545 Some(register_extension(provider.clone(), mode.clone()).await?)
546 }
547 _ => None,
548 };
549
550 if config.set_global_provider {
551 // Set the provider as global
552 set_tracer_provider(provider.as_ref().clone());
553 }
554
555 // Initialize tracing subscriber with smart env var selection
556 let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
557 if env::var("RUST_LOG").is_ok() {
558 "RUST_LOG"
559 } else {
560 "AWS_LAMBDA_LOG_LEVEL"
561 }
562 });
563
564 let env_filter = tracing_subscriber::EnvFilter::builder()
565 .with_env_var(env_var_name)
566 .from_env_lossy();
567
568 let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
569 let tracer = completion_handler.get_tracer().clone();
570
571 let subscriber = tracing_subscriber::registry::Registry::default()
572 .with(tracing_opentelemetry::OpenTelemetryLayer::new(
573 tracer.clone(),
574 ))
575 .with(env_filter);
576
577 // Check if fmt layer should be enabled via environment variable
578 let enable_fmt = env::var(env_vars::ENABLE_FMT_LAYER)
579 .map(|v| v.to_lowercase() == "true")
580 .unwrap_or(defaults::ENABLE_FMT_LAYER);
581
582 // Enable fmt layer if either configured in code or enabled via environment
583 // This allows toggling logging via LAMBDA_TRACING_ENABLE_FMT_LAYER=true without code changes
584 // Log level is still controlled by AWS_LAMBDA_LOG_LEVEL or RUST_LOG as usual
585 if config.enable_fmt_layer || enable_fmt {
586 // Determine if the lambda logging configuration is set to output json logs
587 let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
588 .unwrap_or_default()
589 .to_uppercase()
590 == "JSON";
591
592 if is_json {
593 tracing::subscriber::set_global_default(
594 subscriber.with(
595 tracing_subscriber::fmt::layer()
596 .with_target(false)
597 .without_time()
598 .json(),
599 ),
600 )?;
601 } else {
602 tracing::subscriber::set_global_default(
603 subscriber.with(
604 tracing_subscriber::fmt::layer()
605 .with_target(false)
606 .without_time()
607 .with_ansi(false),
608 ),
609 )?;
610 }
611 } else {
612 tracing::subscriber::set_global_default(subscriber)?;
613 }
614
615 Ok((tracer, completion_handler))
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621 use opentelemetry_sdk::trace::SimpleSpanProcessor;
622 use sealed_test::prelude::*;
623 use tokio::sync::mpsc;
624
625 // Helper to clean up environment variables between tests
626 fn cleanup_env() {
627 env::remove_var("LAMBDA_TRACING_ENABLE_FMT_LAYER");
628 env::remove_var(constants::env_vars::PROPAGATORS);
629 env::remove_var("_X_AMZN_TRACE_ID");
630 }
631
632 #[test]
633 #[sealed_test]
634 fn test_telemetry_config_defaults() {
635 cleanup_env();
636
637 let config = TelemetryConfig::builder().build();
638 assert!(config.set_global_provider); // Should be true by default
639 assert!(!config.has_processor);
640 assert!(!config.enable_fmt_layer);
641 assert!(config.propagators.is_empty()); // No propagators by default in builder
642 }
643
644 #[test]
645 #[sealed_test]
646 fn test_telemetry_config_with_propagators() {
647 cleanup_env();
648
649 // Test with explicit tracecontext propagator
650 let config = TelemetryConfig::builder()
651 .with_span_processor(SimpleSpanProcessor::new(Box::new(
652 OtlpStdoutSpanExporter::default(),
653 )))
654 .with_named_propagator("tracecontext")
655 .build();
656 assert_eq!(config.propagators.len(), 1);
657
658 // Test with explicit xray propagator
659 let config = TelemetryConfig::builder()
660 .with_named_propagator("xray")
661 .build();
662 assert_eq!(config.propagators.len(), 1);
663
664 // Test with both propagators
665 let config = TelemetryConfig::builder()
666 .with_named_propagator("tracecontext")
667 .with_named_propagator("xray")
668 .build();
669 assert_eq!(config.propagators.len(), 2);
670
671 // Test with default propagators (empty - will be set in init_telemetry)
672 let config = TelemetryConfig::builder().build();
673 assert_eq!(config.propagators.len(), 0);
674
675 // Test with none
676 let config = TelemetryConfig::builder()
677 .with_named_propagator("none")
678 .build();
679 assert_eq!(config.propagators.len(), 1);
680 }
681
682 #[tokio::test]
683 #[sealed_test]
684 async fn test_telemetry_config_env_propagators_tracecontext() {
685 cleanup_env();
686
687 // Test with OTEL_PROPAGATORS=tracecontext
688 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
689 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
690 // In real usage we'd check the behavior rather than implementation details
691 // So we'll just check that we can create and use a handler
692 assert!(handler.sender.is_none());
693
694 cleanup_env();
695 }
696
697 #[tokio::test]
698 #[sealed_test]
699 async fn test_telemetry_config_env_propagators_xray() {
700 cleanup_env();
701
702 // Test with OTEL_PROPAGATORS=xray
703 env::set_var(constants::env_vars::PROPAGATORS, "xray");
704 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
705 assert!(handler.sender.is_none());
706
707 cleanup_env();
708 }
709
710 #[tokio::test]
711 #[sealed_test]
712 async fn test_telemetry_config_env_propagators_combined() {
713 cleanup_env();
714
715 // Test with OTEL_PROPAGATORS=tracecontext,xray-lambda
716 env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
717 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
718 assert!(handler.sender.is_none());
719
720 cleanup_env();
721 }
722
723 #[tokio::test]
724 #[sealed_test]
725 async fn test_telemetry_config_env_propagators_none() {
726 cleanup_env();
727
728 // Test with OTEL_PROPAGATORS=none
729 env::set_var(constants::env_vars::PROPAGATORS, "none");
730 let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
731 assert!(handler.sender.is_none());
732
733 cleanup_env();
734 }
735
736 #[tokio::test]
737 #[sealed_test]
738 async fn test_init_telemetry_defaults() {
739 let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
740 assert!(completion_handler.sender.is_none()); // Default mode is Sync
741 }
742
743 #[tokio::test]
744 #[sealed_test]
745 async fn test_init_telemetry_custom() {
746 let resource = Resource::builder().build();
747 let config = TelemetryConfig::builder()
748 .resource(resource)
749 .with_named_propagator("tracecontext")
750 .enable_fmt_layer(true)
751 .set_global_provider(false)
752 .build();
753
754 let (_, completion_handler) = init_telemetry(config).await.unwrap();
755 assert!(completion_handler.sender.is_none());
756 }
757
758 #[tokio::test]
759 #[sealed_test]
760 async fn test_telemetry_config_env_fmt_layer() {
761 cleanup_env();
762
763 // Test that environment variable is respected
764 env::set_var("LAMBDA_TRACING_ENABLE_FMT_LAYER", "true");
765 let config = TelemetryConfig::default();
766 assert!(!config.enable_fmt_layer); // Config should not be affected by env var
767 let enable_fmt = env::var(env_vars::ENABLE_FMT_LAYER)
768 .map(|v| v.to_lowercase() == "true")
769 .unwrap_or(defaults::ENABLE_FMT_LAYER);
770 assert!(enable_fmt); // But the env var should be true
771
772 // Test that code-level setting works independently
773 let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
774 assert!(config.enable_fmt_layer);
775
776 // Test default behavior
777 env::remove_var("LAMBDA_TRACING_ENABLE_FMT_LAYER");
778 let config = TelemetryConfig::default();
779 assert!(!config.enable_fmt_layer);
780 let enable_fmt = env::var(env_vars::ENABLE_FMT_LAYER)
781 .map(|v| v.to_lowercase() == "true")
782 .unwrap_or(defaults::ENABLE_FMT_LAYER);
783 assert!(!enable_fmt);
784
785 // Clean up
786 cleanup_env();
787 }
788
789 #[test]
790 fn test_completion_handler_sync_mode() {
791 let provider = Arc::new(
792 SdkTracerProvider::builder()
793 .with_span_processor(SimpleSpanProcessor::new(Box::new(
794 OtlpStdoutSpanExporter::default(),
795 )))
796 .build(),
797 );
798
799 let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
800
801 // In sync mode, complete() should call force_flush
802 handler.complete();
803 // Note: We can't easily verify the flush was called since TracerProvider
804 // doesn't expose this information, but we can verify it doesn't panic
805 }
806
807 #[tokio::test]
808 async fn test_completion_handler_async_mode() {
809 let provider = Arc::new(
810 SdkTracerProvider::builder()
811 .with_span_processor(SimpleSpanProcessor::new(Box::new(
812 OtlpStdoutSpanExporter::default(),
813 )))
814 .build(),
815 );
816
817 let (tx, mut rx) = mpsc::unbounded_channel();
818
819 let completion_handler =
820 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
821
822 // In async mode, complete() should send a message through the channel
823 completion_handler.complete();
824
825 // Verify that we received the completion signal
826 assert!(rx.try_recv().is_ok());
827 // Verify channel is now empty
828 assert!(rx.try_recv().is_err());
829 }
830
831 #[test]
832 fn test_completion_handler_finalize_mode() {
833 let provider = Arc::new(
834 SdkTracerProvider::builder()
835 .with_span_processor(SimpleSpanProcessor::new(Box::new(
836 OtlpStdoutSpanExporter::default(),
837 )))
838 .build(),
839 );
840
841 let (tx, _rx) = mpsc::unbounded_channel();
842
843 let completion_handler =
844 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);
845
846 // In finalize mode, complete() should do nothing
847 completion_handler.complete();
848 // Verify it doesn't panic or cause issues
849 }
850
851 #[test]
852 fn test_completion_handler_clone() {
853 let provider = Arc::new(
854 SdkTracerProvider::builder()
855 .with_span_processor(SimpleSpanProcessor::new(Box::new(
856 OtlpStdoutSpanExporter::default(),
857 )))
858 .build(),
859 );
860
861 let (tx, _rx) = mpsc::unbounded_channel();
862
863 let completion_handler =
864 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
865
866 // Test that Clone is implemented correctly
867 let cloned = completion_handler.clone();
868
869 // Verify both handlers have the same mode
870 assert!(matches!(cloned.mode, ProcessorMode::Async));
871 assert!(cloned.sender.is_some());
872 }
873
874 #[test]
875 fn test_completion_handler_sync_mode_error_handling() {
876 let provider = Arc::new(
877 SdkTracerProvider::builder()
878 .with_span_processor(SimpleSpanProcessor::new(Box::new(
879 OtlpStdoutSpanExporter::default(),
880 )))
881 .build(),
882 );
883
884 let completion_handler =
885 TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
886
887 // Test that complete() doesn't panic
888 completion_handler.complete();
889 }
890
891 #[tokio::test]
892 async fn test_completion_handler_async_mode_error_handling() {
893 let provider = Arc::new(
894 SdkTracerProvider::builder()
895 .with_span_processor(SimpleSpanProcessor::new(Box::new(
896 OtlpStdoutSpanExporter::default(),
897 )))
898 .build(),
899 );
900
901 // Use UnboundedSender instead of Sender
902 let (tx, _rx) = mpsc::unbounded_channel();
903 // Fill the channel by dropping the receiver
904 drop(_rx);
905
906 let completion_handler =
907 TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
908
909 // Test that complete() doesn't panic when receiver is dropped
910 completion_handler.complete();
911 }
912}
913
914// A simple no-op propagator
915#[derive(Debug)]
916struct NoopPropagator;
917
918impl NoopPropagator {
919 fn new() -> Self {
920 NoopPropagator
921 }
922}
923
924impl TextMapPropagator for NoopPropagator {
925 fn inject_context(
926 &self,
927 _cx: &opentelemetry::Context,
928 _injector: &mut dyn opentelemetry::propagation::Injector,
929 ) {
930 }
931
932 fn extract_with_context(
933 &self,
934 cx: &opentelemetry::Context,
935 _extractor: &dyn opentelemetry::propagation::Extractor,
936 ) -> opentelemetry::Context {
937 cx.clone()
938 }
939
940 fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
941 opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
942 }
943}