use crate::{
constants, extension::register_extension, logger::Logger, mode::ProcessorMode,
processor::LambdaSpanProcessor, propagation::LambdaXrayPropagator,
resource::get_lambda_resource,
};
use bon::Builder;
use lambda_runtime::Error;
use opentelemetry::propagation::{TextMapCompositePropagator, TextMapPropagator};
use opentelemetry::{global, global::set_tracer_provider, trace::TracerProvider as _, KeyValue};
use opentelemetry_aws::trace::XrayPropagator;
use opentelemetry_sdk::{
propagation::TraceContextPropagator,
trace::{IdGenerator, SdkTracerProvider, ShouldSample, SpanProcessor, TracerProviderBuilder},
Resource,
};
use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
use std::{borrow::Cow, env, sync::Arc};
use tokio::sync::mpsc::UnboundedSender;
use tracing_subscriber::layer::SubscriberExt;
// File-wide logger instance, constructed with the name "telemetry"; used for
// the warnings emitted by the completion handler and by `init_telemetry`.
static LOGGER: Logger = Logger::const_new("telemetry");
/// Handle returned by `init_telemetry` that callers use to signal that a unit
/// of work is complete (see `complete`) and to obtain the tracer created for
/// this crate (see `get_tracer`).
///
/// The handle is `Clone`; clones share the same provider via `Arc`.
#[derive(Clone)]
pub struct TelemetryCompletionHandler {
// Shared tracer provider; flushed directly by `complete` in `ProcessorMode::Sync`.
provider: Arc<SdkTracerProvider>,
// Channel used by `complete` in `ProcessorMode::Async` to send a `()` signal;
// `None` when no extension was registered.
sender: Option<UnboundedSender<()>>,
// Processing mode that selects what `complete` does.
mode: ProcessorMode,
// Tracer created from `provider` in `new`, handed out by `get_tracer`.
tracer: opentelemetry_sdk::trace::Tracer,
}
impl TelemetryCompletionHandler {
    /// Creates a completion handler around an already-built tracer provider.
    ///
    /// A tracer is created from `provider` using an instrumentation scope named
    /// after this package (`CARGO_PKG_NAME` / `CARGO_PKG_VERSION`) and carrying
    /// the `library.*` attributes below.
    ///
    /// * `provider` - shared tracer provider; flushed by [`Self::complete`] in sync mode.
    /// * `sender` - optional channel used to signal completion in async mode.
    /// * `mode` - processing mode that decides what [`Self::complete`] does.
    pub fn new(
        provider: Arc<SdkTracerProvider>,
        sender: Option<UnboundedSender<()>>,
        mode: ProcessorMode,
    ) -> Self {
        let scope_attributes = vec![
            KeyValue::new("library.language", "rust"),
            KeyValue::new("library.type", "instrumentation"),
            KeyValue::new("library.runtime", "aws_lambda"),
        ];
        let package_version = Cow::Borrowed(env!("CARGO_PKG_VERSION"));
        let schema_url = Cow::Borrowed("https://opentelemetry.io/schemas/1.30.0");
        let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
            .with_version(package_version)
            .with_schema_url(schema_url)
            .with_attributes(scope_attributes)
            .build();

        // Field initialisers run in written order, so the tracer is created
        // from a borrow of `provider` before `provider` is moved into the struct.
        Self {
            tracer: provider.tracer_with_scope(scope),
            provider,
            sender,
            mode,
        }
    }

    /// Returns a reference to the tracer created in [`Self::new`].
    pub fn get_tracer(&self) -> &opentelemetry_sdk::trace::Tracer {
        &self.tracer
    }

    /// Signals that the current unit of work has finished.
    ///
    /// * `Sync`: force-flushes the provider; a flush error is logged as a warning.
    /// * `Async`: sends `()` on the sender if one is present; a send error is
    ///   logged as a warning. Without a sender nothing happens.
    /// * `Finalize`: does nothing here.
    ///
    /// This method never panics on those errors and never returns them.
    pub fn complete(&self) {
        match self.mode {
            ProcessorMode::Finalize => {}
            ProcessorMode::Sync => {
                if let Err(e) = self.provider.force_flush() {
                    LOGGER.warn(format!("Error flushing telemetry: {e:?}"));
                }
            }
            ProcessorMode::Async => {
                // `None` covers both "no sender configured" and "send succeeded".
                let send_failure = self.sender.as_ref().and_then(|tx| tx.send(()).err());
                if let Some(e) = send_failure {
                    LOGGER.warn(format!(
                        "Failed to send completion signal to extension: {e:?}"
                    ));
                }
            }
        }
    }
}
/// Configuration consumed by `init_telemetry`.
///
/// Instances are created through the derived builder (`TelemetryConfig::builder()`)
/// or through `Default`, which builds with every setting left at its default.
#[derive(Builder, Debug)]
pub struct TelemetryConfig {
// Tracer provider builder accumulated by the `with_span_processor`,
// `with_id_generator` and `with_sampler` builder methods.
#[builder(field)]
provider_builder: TracerProviderBuilder,
// Set to `true` by `with_span_processor`; when still `false`, `init_telemetry`
// installs its own `LambdaSpanProcessor`.
#[builder(field)]
has_processor: bool,
// Propagators added through `with_propagator` / `with_named_propagator`.
#[builder(field)]
propagators: Vec<Box<dyn TextMapPropagator + Send + Sync>>,
/// Whether `init_telemetry` adds a `tracing_subscriber` fmt layer.
/// Defaults to `false`; the env var named by
/// `constants::env_vars::ENABLE_FMT_LAYER` overrides it when set to
/// `true`/`false`.
#[builder(default = false)]
pub enable_fmt_layer: bool,
/// Whether `init_telemetry` registers the provider as the global tracer
/// provider. Defaults to `true`.
#[builder(default = true)]
pub set_global_provider: bool,
/// Resource attached to the provider; when `None`, `init_telemetry` uses
/// `get_lambda_resource()`.
pub resource: Option<Resource>,
/// Name of the environment variable read by the `EnvFilter`; when `None`,
/// `init_telemetry` uses `RUST_LOG` if it is set, otherwise
/// `AWS_LAMBDA_LOG_LEVEL`.
pub env_var_name: Option<String>,
/// Requested processor mode; passed to `ProcessorMode::resolve` by
/// `init_telemetry`.
pub processor_mode: Option<ProcessorMode>,
}
impl Default for TelemetryConfig {
    /// Returns the configuration produced by an untouched builder, i.e. every
    /// field takes the default declared on [`TelemetryConfig`].
    fn default() -> Self {
        let untouched_builder = Self::builder();
        untouched_builder.build()
    }
}
impl<S: telemetry_config_builder::State> TelemetryConfigBuilder<S> {
    /// Adds a span processor to the underlying tracer provider builder.
    ///
    /// Also records that a processor was supplied, so `init_telemetry` does not
    /// install its own `LambdaSpanProcessor`.
    pub fn with_span_processor<T: SpanProcessor + 'static>(mut self, processor: T) -> Self {
        let updated = self.provider_builder.with_span_processor(processor);
        self.provider_builder = updated;
        self.has_processor = true;
        self
    }

    /// Appends a propagator to the list handed to the composite propagator.
    pub fn with_propagator<T: TextMapPropagator + Send + Sync + 'static>(
        mut self,
        propagator: T,
    ) -> Self {
        let boxed: Box<dyn TextMapPropagator + Send + Sync> = Box::new(propagator);
        self.propagators.push(boxed);
        self
    }

    /// Appends a propagator selected by name.
    ///
    /// Recognised names are `"tracecontext"`, `"xray"`, `"xray-lambda"` and
    /// `"none"`. Any other name logs a warning and leaves the builder unchanged.
    pub fn with_named_propagator(mut self, name: &str) -> Self {
        let selected: Option<Box<dyn TextMapPropagator + Send + Sync>> = match name {
            "tracecontext" => Some(Box::new(TraceContextPropagator::new())),
            "xray" => Some(Box::new(XrayPropagator::new())),
            "xray-lambda" => Some(Box::new(LambdaXrayPropagator::new())),
            "none" => Some(Box::new(NoopPropagator::new())),
            _ => None,
        };

        match selected {
            Some(propagator) => self.propagators.push(propagator),
            None => LOGGER.warn(format!(
                "Unknown propagator: {name}, using default propagators"
            )),
        }
        self
    }

    /// Sets the ID generator on the underlying tracer provider builder.
    pub fn with_id_generator<T: IdGenerator + 'static>(mut self, id_generator: T) -> Self {
        let updated = self.provider_builder.with_id_generator(id_generator);
        self.provider_builder = updated;
        self
    }

    /// Sets the sampler on the underlying tracer provider builder.
    pub fn with_sampler<T: ShouldSample + 'static>(mut self, sampler: T) -> Self {
        let updated = self.provider_builder.with_sampler(sampler);
        self.provider_builder = updated;
        self
    }
}
pub async fn init_telemetry(
mut config: TelemetryConfig,
) -> Result<(opentelemetry_sdk::trace::Tracer, TelemetryCompletionHandler), Error> {
let mode = ProcessorMode::resolve(config.processor_mode);
if let Ok(env_propagators) = env::var(constants::env_vars::PROPAGATORS) {
let propagators: Vec<&str> = env_propagators.split(',').map(|s| s.trim()).collect();
for propagator in propagators {
match propagator {
"tracecontext" => config
.propagators
.push(Box::new(TraceContextPropagator::new())),
"xray" => config.propagators.push(Box::new(XrayPropagator::new())),
"xray-lambda" => config
.propagators
.push(Box::new(LambdaXrayPropagator::new())),
"none" => config.propagators.push(Box::new(NoopPropagator::new())),
_ => LOGGER.warn(format!(
"Unknown propagator: {propagator}, using default propagators"
)),
}
}
} else {
if config.propagators.is_empty() {
config
.propagators
.push(Box::new(LambdaXrayPropagator::new()));
config
.propagators
.push(Box::new(TraceContextPropagator::new()));
}
}
let composite_propagator = TextMapCompositePropagator::new(config.propagators);
global::set_text_map_propagator(composite_propagator);
if !config.has_processor {
let processor = LambdaSpanProcessor::builder()
.exporter(OtlpStdoutSpanExporter::default())
.build();
config.provider_builder = config.provider_builder.with_span_processor(processor);
}
let resource = config.resource.unwrap_or_else(get_lambda_resource);
let provider = Arc::new(config.provider_builder.with_resource(resource).build());
let sender = match mode {
ProcessorMode::Async | ProcessorMode::Finalize => {
Some(register_extension(provider.clone(), mode.clone()).await?)
}
_ => None,
};
if config.set_global_provider {
set_tracer_provider(provider.as_ref().clone());
}
let env_var_name = config.env_var_name.as_deref().unwrap_or_else(|| {
if env::var("RUST_LOG").is_ok() {
"RUST_LOG"
} else {
"AWS_LAMBDA_LOG_LEVEL"
}
});
let env_filter = tracing_subscriber::EnvFilter::builder()
.with_env_var(env_var_name)
.from_env_lossy();
let completion_handler = TelemetryCompletionHandler::new(provider.clone(), sender, mode);
let tracer = completion_handler.get_tracer().clone();
let subscriber = tracing_subscriber::registry::Registry::default()
.with(tracing_opentelemetry::OpenTelemetryLayer::new(
tracer.clone(),
))
.with(env_filter);
let enable_fmt = if let Ok(env_value) = env::var(constants::env_vars::ENABLE_FMT_LAYER) {
match env_value.to_lowercase().as_str() {
"true" => true,
"false" => false,
other => {
LOGGER.warn(format!(
"Invalid value '{}' for {}, expected 'true' or 'false'. Using code configuration.",
other,
constants::env_vars::ENABLE_FMT_LAYER
));
config.enable_fmt_layer
}
}
} else {
config.enable_fmt_layer
};
if enable_fmt {
let is_json = env::var("AWS_LAMBDA_LOG_FORMAT")
.unwrap_or_default()
.to_uppercase()
== "JSON";
if is_json {
tracing::subscriber::set_global_default(
subscriber.with(
tracing_subscriber::fmt::layer()
.with_target(false)
.without_time()
.json(),
),
)?;
} else {
tracing::subscriber::set_global_default(
subscriber.with(
tracing_subscriber::fmt::layer()
.with_target(false)
.without_time()
.with_ansi(false),
),
)?;
}
} else {
tracing::subscriber::set_global_default(subscriber)?;
}
Ok((tracer, completion_handler))
}
// Unit tests for the telemetry configuration builder, `init_telemetry` and
// `TelemetryCompletionHandler`.
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::trace::{Span, Tracer};
use opentelemetry_aws::trace::XrayIdGenerator;
use opentelemetry_sdk::trace::{Sampler, SimpleSpanProcessor};
use sealed_test::prelude::*;
use std::sync::Arc;
use tokio::sync::mpsc;
// Removes the environment variables these tests set or depend on, so a test
// starts from a known state.
// NOTE(review): "OTEL_TRACES_SAMPLER", set by
// `test_telemetry_config_env_sampler`, is not removed here.
fn cleanup_env() {
env::remove_var(constants::env_vars::ENABLE_FMT_LAYER);
env::remove_var(constants::env_vars::PROPAGATORS);
env::remove_var(constants::env_vars::PROCESSOR_MODE);
env::remove_var("_X_AMZN_TRACE_ID");
env::remove_var("AWS_LAMBDA_RUNTIME_API");
}
// A builder with nothing set yields: global provider enabled, no custom
// processor, fmt layer disabled, no propagators.
#[test]
#[sealed_test]
fn test_telemetry_config_defaults() {
cleanup_env();
let config = TelemetryConfig::builder().build();
assert!(config.set_global_provider); assert!(!config.has_processor);
assert!(!config.enable_fmt_layer);
assert!(config.propagators.is_empty()); }
// Each `with_named_propagator` call with a known name adds exactly one
// propagator; "none" also counts as one entry (the no-op propagator).
#[test]
#[sealed_test]
fn test_telemetry_config_with_propagators() {
cleanup_env();
let config = TelemetryConfig::builder()
.with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
.with_named_propagator("tracecontext")
.build();
assert_eq!(config.propagators.len(), 1);
let config = TelemetryConfig::builder()
.with_named_propagator("xray")
.build();
assert_eq!(config.propagators.len(), 1);
let config = TelemetryConfig::builder()
.with_named_propagator("tracecontext")
.with_named_propagator("xray")
.build();
assert_eq!(config.propagators.len(), 2);
let config = TelemetryConfig::builder().build();
assert_eq!(config.propagators.len(), 0);
let config = TelemetryConfig::builder()
.with_named_propagator("none")
.build();
assert_eq!(config.propagators.len(), 1);
}
// The next four tests set the propagators env var to different values and
// check that `init_telemetry` succeeds. They also expect no extension sender,
// i.e. the resolved mode is neither Async nor Finalize.
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_propagators_tracecontext() {
cleanup_env();
env::set_var(constants::env_vars::PROPAGATORS, "tracecontext");
let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
assert!(handler.sender.is_none());
cleanup_env();
}
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_propagators_xray() {
cleanup_env();
env::set_var(constants::env_vars::PROPAGATORS, "xray");
let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
assert!(handler.sender.is_none());
cleanup_env();
}
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_propagators_combined() {
cleanup_env();
env::set_var(constants::env_vars::PROPAGATORS, "tracecontext,xray-lambda");
let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
assert!(handler.sender.is_none());
cleanup_env();
}
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_propagators_none() {
cleanup_env();
env::set_var(constants::env_vars::PROPAGATORS, "none");
let (_, handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
assert!(handler.sender.is_none());
cleanup_env();
}
// Default configuration initialises successfully without an extension sender.
// NOTE(review): unlike its neighbours this test does not call `cleanup_env()`.
#[tokio::test]
#[sealed_test]
async fn test_init_telemetry_defaults() {
let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await.unwrap();
assert!(completion_handler.sender.is_none()); }
// Custom resource, a named propagator, the fmt layer enabled and the global
// provider disabled still initialise successfully.
#[tokio::test]
#[sealed_test]
async fn test_init_telemetry_custom() {
let resource = Resource::builder().build();
let config = TelemetryConfig::builder()
.resource(resource)
.with_named_propagator("tracecontext")
.enable_fmt_layer(true)
.set_global_provider(false)
.build();
let (_, completion_handler) = init_telemetry(config).await.unwrap();
assert!(completion_handler.sender.is_none());
}
// Env var "true" with the code setting left at false: initialisation succeeds.
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_fmt_layer_true_override() {
cleanup_env();
env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "true");
let config = TelemetryConfig::default(); assert!(!config.enable_fmt_layer);
let result = init_telemetry(config).await;
assert!(result.is_ok());
cleanup_env();
}
// Env var "false" with the code setting at true: initialisation succeeds.
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_fmt_layer_false_override() {
cleanup_env();
env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "false");
let config = TelemetryConfig::builder()
.enable_fmt_layer(true) .build();
assert!(config.enable_fmt_layer);
let result = init_telemetry(config).await;
assert!(result.is_ok());
cleanup_env();
}
// An unparsable env value must not make initialisation fail.
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_fmt_layer_invalid() {
cleanup_env();
env::set_var(constants::env_vars::ENABLE_FMT_LAYER, "invalid");
let config = TelemetryConfig::builder().enable_fmt_layer(true).build();
let result = init_telemetry(config).await;
assert!(result.is_ok());
cleanup_env();
}
// With the env var absent the code configuration (default: false) is used.
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_fmt_layer_not_set() {
cleanup_env();
let config = TelemetryConfig::default();
assert!(!config.enable_fmt_layer);
let result = init_telemetry(config).await;
assert!(result.is_ok());
cleanup_env();
}
// Sync mode: `complete` flushes the provider and must not panic.
#[test]
fn test_completion_handler_sync_mode() {
let provider = Arc::new(
SdkTracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
.build(),
);
let handler = TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
handler.complete();
}
// Async mode: `complete` sends exactly one signal on the channel.
#[tokio::test]
async fn test_completion_handler_async_mode() {
let provider = Arc::new(
SdkTracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
.build(),
);
let (tx, mut rx) = mpsc::unbounded_channel();
let completion_handler =
TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
completion_handler.complete();
assert!(rx.try_recv().is_ok());
assert!(rx.try_recv().is_err());
}
// Finalize mode: `complete` is a no-op and must not panic.
#[test]
fn test_completion_handler_finalize_mode() {
let provider = Arc::new(
SdkTracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
.build(),
);
let (tx, _rx) = mpsc::unbounded_channel();
let completion_handler =
TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Finalize);
completion_handler.complete();
}
// Cloning keeps the mode and the sender.
#[test]
fn test_completion_handler_clone() {
let provider = Arc::new(
SdkTracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
.build(),
);
let (tx, _rx) = mpsc::unbounded_channel();
let completion_handler =
TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
let cloned = completion_handler.clone();
assert!(matches!(cloned.mode, ProcessorMode::Async));
assert!(cloned.sender.is_some());
}
// NOTE(review): same setup and call as `test_completion_handler_sync_mode`;
// no flush error is actually provoked here.
#[test]
fn test_completion_handler_sync_mode_error_handling() {
let provider = Arc::new(
SdkTracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
.build(),
);
let completion_handler =
TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
completion_handler.complete();
}
// Async mode with the receiver already dropped: the send fails, and
// `complete` must only log the failure rather than panic.
#[tokio::test]
async fn test_completion_handler_async_mode_error_handling() {
let provider = Arc::new(
SdkTracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(OtlpStdoutSpanExporter::default()))
.build(),
);
let (tx, _rx) = mpsc::unbounded_channel();
drop(_rx);
let completion_handler =
TelemetryCompletionHandler::new(provider, Some(tx), ProcessorMode::Async);
completion_handler.complete();
}
// Builds a provider with `XrayIdGenerator` and inspects a generated trace ID:
// the first 8 hex digits are parsed as a u32 timestamp that must lie within
// the last 86400 seconds, and the remaining 24 hex digits must not be all
// zero. Presumably this mirrors the X-Ray trace ID layout (epoch seconds
// followed by a random part) — confirm against the generator's documentation.
#[test]
#[sealed_test]
fn test_telemetry_config_with_id_generator() {
cleanup_env();
let config = TelemetryConfig::builder()
.with_id_generator(XrayIdGenerator::default())
.build();
let provider = Arc::new(config.provider_builder.build());
let scope = opentelemetry::InstrumentationScope::builder("test")
.with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION")))
.build();
let tracer = provider.tracer_with_scope(scope);
let span = tracer.start_with_context("test span", &opentelemetry::Context::current());
let trace_id = span.span_context().trace_id();
let trace_id_hex = format!("{trace_id:032x}");
let timestamp_part = &trace_id_hex[0..8];
let timestamp = u32::from_str_radix(timestamp_part, 16).unwrap();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as u32;
assert!(timestamp <= now);
assert!(timestamp > now - 86400);
let random_part = &trace_id_hex[8..];
assert_ne!(random_part, "000000000000000000000000");
}
// Smoke test: providers can be built with several sampler kinds. There are no
// assertions beyond "does not panic".
#[test]
#[sealed_test]
fn test_telemetry_config_with_sampler() {
cleanup_env();
let _provider = TelemetryConfig::builder()
.with_sampler(Sampler::AlwaysOn)
.build()
.provider_builder
.build();
let _provider = TelemetryConfig::builder()
.with_sampler(Sampler::TraceIdRatioBased(0.1))
.build()
.provider_builder
.build();
let config = TelemetryConfig::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)))
.build();
let _provider = config.provider_builder.build();
}
// Smoke test covering AlwaysOn, AlwaysOff, TraceIdRatioBased and ParentBased.
#[test]
#[sealed_test]
fn test_telemetry_config_with_standard_samplers() {
cleanup_env();
let _provider = TelemetryConfig::builder()
.with_sampler(Sampler::AlwaysOn)
.build()
.provider_builder
.build();
let _provider = TelemetryConfig::builder()
.with_sampler(Sampler::AlwaysOff)
.build()
.provider_builder
.build();
let _provider = TelemetryConfig::builder()
.with_sampler(Sampler::TraceIdRatioBased(0.5))
.build()
.provider_builder
.build();
let config = TelemetryConfig::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)))
.build();
let _provider = config.provider_builder.build();
}
// Initialisation succeeds with "OTEL_TRACES_SAMPLER" set in the environment.
// The global provider is left untouched (`set_global_provider(false)`).
#[tokio::test]
#[sealed_test]
async fn test_telemetry_config_env_sampler() {
cleanup_env();
env::set_var("OTEL_TRACES_SAMPLER", "always_on");
let config = TelemetryConfig::builder()
.set_global_provider(false) .build();
let (_, _) = init_telemetry(config).await.unwrap();
cleanup_env();
}
}
/// Propagator that neither injects nor extracts any context.
///
/// It is the propagator selected by the name `"none"`.
#[derive(Debug)]
struct NoopPropagator;
impl NoopPropagator {
fn new() -> Self {
NoopPropagator
}
}
impl TextMapPropagator for NoopPropagator {
    /// Intentionally writes nothing into the carrier.
    fn inject_context(
        &self,
        _context: &opentelemetry::Context,
        _carrier: &mut dyn opentelemetry::propagation::Injector,
    ) {
    }

    /// Ignores the carrier and returns a clone of the supplied context.
    fn extract_with_context(
        &self,
        cx: &opentelemetry::Context,
        _carrier: &dyn opentelemetry::propagation::Extractor,
    ) -> opentelemetry::Context {
        opentelemetry::Context::clone(cx)
    }

    /// Reports an empty field list.
    fn fields(&self) -> opentelemetry::propagation::text_map_propagator::FieldIter<'_> {
        opentelemetry::propagation::text_map_propagator::FieldIter::new(&[])
    }
}