opentelemetry_lambda_tower/
service.rs

1//! Tower Service implementation for OpenTelemetry tracing.
2
3use crate::cold_start::check_cold_start;
4use crate::extractor::TraceContextExtractor;
5use crate::future::OtelTracingFuture;
6use lambda_runtime::LambdaEvent;
7use opentelemetry_sdk::logs::SdkLoggerProvider;
8use opentelemetry_sdk::trace::SdkTracerProvider;
9use opentelemetry_semantic_conventions::attribute::{
10    CLOUD_ACCOUNT_ID, CLOUD_PROVIDER, CLOUD_REGION, FAAS_MAX_MEMORY, FAAS_NAME, FAAS_VERSION,
11};
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tower::Service;
16use tracing::Span;
17use tracing_opentelemetry::OpenTelemetrySpanExt;
18
19/// Tower service that instruments Lambda handlers with OpenTelemetry tracing.
20///
21/// This service wraps an inner service and:
22/// 1. Extracts trace context from the Lambda event
23/// 2. Creates a span with appropriate semantic attributes
24/// 3. Invokes the inner service within the span context
25/// 4. Flushes the tracer provider after completion
26///
27/// # Type Parameters
28///
29/// * `S` - The inner service type
30/// * `E` - The trace context extractor type
31#[derive(Clone)]
32pub struct OtelTracingService<S, E> {
33    inner: S,
34    extractor: E,
35    tracer_provider: Option<Arc<SdkTracerProvider>>,
36    logger_provider: Option<Arc<SdkLoggerProvider>>,
37    flush_on_end: bool,
38    flush_timeout: Duration,
39}
40
41impl<S, E> OtelTracingService<S, E> {
42    /// Creates a new tracing service wrapping the given service.
43    pub(crate) fn new(
44        inner: S,
45        extractor: E,
46        tracer_provider: Option<Arc<SdkTracerProvider>>,
47        logger_provider: Option<Arc<SdkLoggerProvider>>,
48        flush_on_end: bool,
49        flush_timeout: Duration,
50    ) -> Self {
51        Self {
52            inner,
53            extractor,
54            tracer_provider,
55            logger_provider,
56            flush_on_end,
57            flush_timeout,
58        }
59    }
60}
61
62impl<S, E, T> Service<LambdaEvent<T>> for OtelTracingService<S, E>
63where
64    S: Service<LambdaEvent<T>>,
65    S::Error: std::fmt::Display,
66    E: TraceContextExtractor<T>,
67    T: Send + 'static,
68{
69    type Response = S::Response;
70    type Error = S::Error;
71    type Future = OtelTracingFuture<S::Future, S::Response, S::Error>;
72
73    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74        self.inner.poll_ready(cx)
75    }
76
77    fn call(&mut self, event: LambdaEvent<T>) -> Self::Future {
78        let (payload, lambda_ctx) = event.into_parts();
79
80        // Extract trace context from the event
81        let parent_context = self.extractor.extract_context(&payload);
82
83        // Extract any span links (for SQS/SNS batch processing)
84        let links = self.extractor.extract_links(&payload);
85
86        // Check for cold start
87        let is_cold_start = check_cold_start();
88
89        // Generate span name from extractor
90        let span_name = self.extractor.span_name(&payload, &lambda_ctx);
91
92        // Create the span with tracing crate
93        let span = tracing::info_span!(
94            "lambda.invoke",
95            otel.name = %span_name,
96            otel.kind = "server",
97            faas.trigger = %self.extractor.trigger_type(),
98            faas.invocation_id = %lambda_ctx.request_id,
99            faas.coldstart = is_cold_start,
100        );
101
102        // Set the parent context from extraction
103        let _ = span.set_parent(parent_context);
104
105        // Add span links if any
106        for link in links {
107            span.add_link(link.span_context.clone());
108        }
109
110        // Record event-specific attributes
111        self.extractor.record_attributes(&payload, &span);
112
113        // Record Lambda context attributes
114        record_lambda_context_attributes(&span, &lambda_ctx);
115
116        // Reconstruct the event for the inner service
117        let event = LambdaEvent::new(payload, lambda_ctx);
118
119        // Call inner service with the span as parent context.
120        // We pass the inner future directly without .instrument() so that we
121        // have the only reference to the span. This ensures the span can be
122        // fully closed before we flush.
123        let future = {
124            let _guard = span.enter();
125            self.inner.call(event)
126        };
127
128        OtelTracingFuture::new(
129            future,
130            span,
131            self.tracer_provider.clone(),
132            self.logger_provider.clone(),
133            self.flush_on_end,
134            self.flush_timeout,
135        )
136    }
137}
138
139/// Records standard Lambda context attributes on the span.
140fn record_lambda_context_attributes(span: &Span, ctx: &lambda_runtime::Context) {
141    span.record(CLOUD_PROVIDER, "aws");
142    span.record(FAAS_NAME, ctx.env_config.function_name.as_str());
143    span.record(FAAS_VERSION, ctx.env_config.version.as_str());
144
145    let memory_bytes = ctx.env_config.memory as i64 * 1024 * 1024;
146    span.record(FAAS_MAX_MEMORY, memory_bytes);
147
148    if let Ok(region) = std::env::var("AWS_REGION") {
149        span.record(CLOUD_REGION, region.as_str());
150    }
151
152    span.record("aws.lambda.invoked_arn", ctx.invoked_function_arn.as_str());
153
154    if let Some(account_id) = ctx.invoked_function_arn.split(':').nth(4) {
155        span.record(CLOUD_ACCOUNT_ID, account_id);
156    }
157}