opentelemetry_lambda_tower/
service.rs

//! Tower Service implementation for OpenTelemetry tracing.
2
3use crate::cold_start::check_cold_start;
4use crate::extractor::TraceContextExtractor;
5use crate::future::OtelTracingFuture;
6use lambda_runtime::LambdaEvent;
7use opentelemetry_sdk::logs::SdkLoggerProvider;
8use opentelemetry_sdk::trace::SdkTracerProvider;
9use opentelemetry_semantic_conventions::attribute::{
10    CLIENT_ADDRESS, CLOUD_ACCOUNT_ID, CLOUD_PROVIDER, CLOUD_REGION, FAAS_MAX_MEMORY, FAAS_NAME,
11    FAAS_VERSION, HTTP_REQUEST_METHOD, HTTP_ROUTE, MESSAGING_BATCH_MESSAGE_COUNT,
12    MESSAGING_DESTINATION_NAME, MESSAGING_MESSAGE_ID, MESSAGING_OPERATION_TYPE, MESSAGING_SYSTEM,
13    NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, URL_PATH, URL_QUERY, URL_SCHEME, USER_AGENT_ORIGINAL,
14};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use std::time::Duration;
18use tower::Service;
19use tracing::Span;
20use tracing_opentelemetry::OpenTelemetrySpanExt;
21
22/// Tower service that instruments Lambda handlers with OpenTelemetry tracing.
23///
24/// This service wraps an inner service and:
25/// 1. Extracts trace context from the Lambda event
26/// 2. Creates a span with appropriate semantic attributes
27/// 3. Invokes the inner service within the span context
28/// 4. Flushes the tracer provider after completion
29///
30/// # Type Parameters
31///
32/// * `S` - The inner service type
33/// * `E` - The trace context extractor type
34#[derive(Clone)]
35pub struct OtelTracingService<S, E> {
36    inner: S,
37    extractor: E,
38    tracer_provider: Option<Arc<SdkTracerProvider>>,
39    logger_provider: Option<Arc<SdkLoggerProvider>>,
40    flush_on_end: bool,
41    flush_timeout: Duration,
42}
43
44impl<S, E> OtelTracingService<S, E> {
45    /// Creates a new tracing service wrapping the given service.
46    pub(crate) fn new(
47        inner: S,
48        extractor: E,
49        tracer_provider: Option<Arc<SdkTracerProvider>>,
50        logger_provider: Option<Arc<SdkLoggerProvider>>,
51        flush_on_end: bool,
52        flush_timeout: Duration,
53    ) -> Self {
54        Self {
55            inner,
56            extractor,
57            tracer_provider,
58            logger_provider,
59            flush_on_end,
60            flush_timeout,
61        }
62    }
63}
64
65impl<S, E, T> Service<LambdaEvent<T>> for OtelTracingService<S, E>
66where
67    S: Service<LambdaEvent<T>>,
68    S::Error: std::fmt::Display,
69    E: TraceContextExtractor<T>,
70    T: Send + 'static,
71{
72    type Response = S::Response;
73    type Error = S::Error;
74    type Future = OtelTracingFuture<S::Future, S::Response, S::Error>;
75
76    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
77        self.inner.poll_ready(cx)
78    }
79
80    fn call(&mut self, event: LambdaEvent<T>) -> Self::Future {
81        let (payload, lambda_ctx) = event.into_parts();
82
83        // Extract trace context from the event
84        let parent_context = self.extractor.extract_context(&payload);
85
86        // Extract any span links (for SQS/SNS batch processing)
87        let links = self.extractor.extract_links(&payload);
88
89        // Check for cold start
90        let is_cold_start = check_cold_start();
91
92        // Generate span name from extractor
93        let span_name = self.extractor.span_name(&payload, &lambda_ctx);
94
95        // Create the span with tracing crate
96        // All semantic convention fields must be declared here for record() to work
97        let span = tracing::info_span!(
98            "lambda.invoke",
99            otel.name = %span_name,
100            otel.kind = "server",
101            faas.trigger = %self.extractor.trigger_type(),
102            faas.invocation_id = %lambda_ctx.request_id,
103            faas.coldstart = is_cold_start,
104            // HTTP semantic conventions
105            { HTTP_REQUEST_METHOD } = tracing::field::Empty,
106            { URL_PATH } = tracing::field::Empty,
107            { URL_QUERY } = tracing::field::Empty,
108            { URL_SCHEME } = tracing::field::Empty,
109            { HTTP_ROUTE } = tracing::field::Empty,
110            { USER_AGENT_ORIGINAL } = tracing::field::Empty,
111            { CLIENT_ADDRESS } = tracing::field::Empty,
112            { SERVER_ADDRESS } = tracing::field::Empty,
113            { NETWORK_PROTOCOL_VERSION } = tracing::field::Empty,
114            // Messaging semantic conventions (SQS/SNS)
115            { MESSAGING_SYSTEM } = tracing::field::Empty,
116            { MESSAGING_OPERATION_TYPE } = tracing::field::Empty,
117            { MESSAGING_DESTINATION_NAME } = tracing::field::Empty,
118            { MESSAGING_MESSAGE_ID } = tracing::field::Empty,
119            { MESSAGING_BATCH_MESSAGE_COUNT } = tracing::field::Empty,
120            // Lambda context attributes
121            { CLOUD_PROVIDER } = tracing::field::Empty,
122            { CLOUD_REGION } = tracing::field::Empty,
123            { CLOUD_ACCOUNT_ID } = tracing::field::Empty,
124            { FAAS_NAME } = tracing::field::Empty,
125            { FAAS_VERSION } = tracing::field::Empty,
126            { FAAS_MAX_MEMORY } = tracing::field::Empty,
127            aws.lambda.invoked_arn = tracing::field::Empty,
128        );
129
130        // Set the parent context from extraction
131        let _ = span.set_parent(parent_context);
132
133        // Add span links if any
134        for link in links {
135            span.add_link(link.span_context.clone());
136        }
137
138        // Record event-specific attributes
139        self.extractor.record_attributes(&payload, &span);
140
141        // Record Lambda context attributes
142        record_lambda_context_attributes(&span, &lambda_ctx);
143
144        // Reconstruct the event for the inner service
145        let event = LambdaEvent::new(payload, lambda_ctx);
146
147        // Call inner service with the span as parent context.
148        // We pass the inner future directly without .instrument() so that we
149        // have the only reference to the span. This ensures the span can be
150        // fully closed before we flush.
151        let future = {
152            let _guard = span.enter();
153            self.inner.call(event)
154        };
155
156        OtelTracingFuture::new(
157            future,
158            span,
159            self.tracer_provider.clone(),
160            self.logger_provider.clone(),
161            self.flush_on_end,
162            self.flush_timeout,
163        )
164    }
165}
166
167/// Records standard Lambda context attributes on the span.
168fn record_lambda_context_attributes(span: &Span, ctx: &lambda_runtime::Context) {
169    span.record(CLOUD_PROVIDER, "aws");
170    span.record(FAAS_NAME, ctx.env_config.function_name.as_str());
171    span.record(FAAS_VERSION, ctx.env_config.version.as_str());
172
173    let memory_bytes = ctx.env_config.memory as i64 * 1024 * 1024;
174    span.record(FAAS_MAX_MEMORY, memory_bytes);
175
176    if let Ok(region) = std::env::var("AWS_REGION") {
177        span.record(CLOUD_REGION, region.as_str());
178    }
179
180    span.record("aws.lambda.invoked_arn", ctx.invoked_function_arn.as_str());
181
182    if let Some(account_id) = ctx.invoked_function_arn.split(':').nth(4) {
183        span.record(CLOUD_ACCOUNT_ID, account_id);
184    }
185}