opentelemetry_lambda_tower/extractors/
lambda_http.rs

1//! Extractor for `lambda_http` crate integration.
2//!
3//! Provides trace context extraction from [`lambda_http::request::LambdaRequest`],
4//! enabling seamless integration with the `lambda_http` crate's tower middleware pattern.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use lambda_http::{Body, Error, Request, Response};
10//! use opentelemetry_lambda_tower::{LambdaHttpExtractor, OtelTracingLayer};
11//! use tower::ServiceBuilder;
12//!
13//! async fn handler(event: Request) -> Result<Response<Body>, Error> {
14//!     Ok(Response::builder().status(200).body("Hello".into())?)
15//! }
16//!
17//! #[tokio::main]
18//! async fn main() -> Result<(), Error> {
19//!     let tracing_layer = OtelTracingLayer::new(LambdaHttpExtractor::new());
20//!
21//!     let service = ServiceBuilder::new()
22//!         .layer(tracing_layer)
23//!         .layer_fn(lambda_http::Adapter::from)
24//!         .service_fn(handler);
25//!
26//!     lambda_runtime::run(service).await
27//! }
28//! ```
29
30use crate::extractor::TraceContextExtractor;
31use http::HeaderMap;
32use lambda_http::request::LambdaRequest;
33use lambda_runtime::Context as LambdaContext;
34use opentelemetry::Context;
35use opentelemetry::propagation::Extractor;
36use opentelemetry::trace::TraceContextExt;
37use opentelemetry_semantic_conventions::attribute::{
38    CLIENT_ADDRESS, HTTP_REQUEST_METHOD, HTTP_ROUTE, NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS,
39    URL_PATH, URL_QUERY, URL_SCHEME, USER_AGENT_ORIGINAL,
40};
41use tracing::Span;
42
43/// Extractor for `lambda_http::request::LambdaRequest`.
44///
45/// Handles all HTTP event types supported by `lambda_http`:
46/// - API Gateway REST API (v1)
47/// - API Gateway HTTP API (v2)
48/// - Application Load Balancer (ALB)
49/// - API Gateway WebSocket
50///
51/// Uses the globally configured OpenTelemetry propagator for trace context extraction,
52/// supporting W3C TraceContext, B3, Jaeger, X-Ray, or any composite propagator.
53/// Falls back to the `_X_AMZN_TRACE_ID` environment variable if no trace headers are found.
54#[derive(Clone, Debug, Default)]
55pub struct LambdaHttpExtractor;
56
57impl LambdaHttpExtractor {
58    /// Creates a new extractor.
59    ///
60    /// Uses the globally configured OpenTelemetry propagator for trace context extraction.
61    /// Configure the propagator via `opentelemetry::global::set_text_map_propagator()`.
62    pub fn new() -> Self {
63        Self
64    }
65
66    fn extract_from_headers(&self, headers: &HeaderMap) -> Context {
67        let extractor = HeaderMapExtractor(headers);
68        let ctx = opentelemetry::global::get_text_map_propagator(|propagator| {
69            propagator.extract(&extractor)
70        });
71
72        if ctx.span().span_context().is_valid() {
73            return ctx;
74        }
75
76        if let Ok(xray_header) = std::env::var("_X_AMZN_TRACE_ID") {
77            let env_extractor = XRayEnvExtractor::new(&xray_header);
78            let xray_ctx = opentelemetry::global::get_text_map_propagator(|propagator| {
79                propagator.extract(&env_extractor)
80            });
81            if xray_ctx.span().span_context().is_valid() {
82                return xray_ctx;
83            }
84        }
85
86        Context::current()
87    }
88}
89
90impl TraceContextExtractor<LambdaRequest> for LambdaHttpExtractor {
91    fn extract_context(&self, event: &LambdaRequest) -> Context {
92        match event {
93            LambdaRequest::ApiGatewayV1(req) => self.extract_from_headers(&req.headers),
94            LambdaRequest::ApiGatewayV2(req) => self.extract_from_headers(&req.headers),
95            LambdaRequest::Alb(req) => self.extract_from_headers(&req.headers),
96            LambdaRequest::WebSocket(req) => self.extract_from_headers(&req.headers),
97            _ => Context::current(),
98        }
99    }
100
101    fn trigger_type(&self) -> &'static str {
102        "http"
103    }
104
105    fn span_name(&self, event: &LambdaRequest, lambda_ctx: &LambdaContext) -> String {
106        match event {
107            LambdaRequest::ApiGatewayV1(req) => {
108                let method = req.http_method.as_str();
109                let route = req
110                    .resource
111                    .as_deref()
112                    .or(req.path.as_deref())
113                    .unwrap_or(&lambda_ctx.env_config.function_name);
114                format!("{} {}", method, route)
115            }
116            LambdaRequest::ApiGatewayV2(req) => {
117                let method = req.request_context.http.method.as_str();
118                let route = req
119                    .route_key
120                    .as_deref()
121                    .and_then(|rk| rk.split_once(' ').map(|(_, route)| route))
122                    .or(req.raw_path.as_deref())
123                    .unwrap_or(&lambda_ctx.env_config.function_name);
124                format!("{} {}", method, route)
125            }
126            LambdaRequest::Alb(req) => {
127                let method = req.http_method.as_str();
128                let path = req.path.as_deref().unwrap_or("/");
129                format!("{} {}", method, path)
130            }
131            LambdaRequest::WebSocket(req) => {
132                let route = req
133                    .request_context
134                    .route_key
135                    .as_deref()
136                    .unwrap_or("$default");
137                format!("WebSocket {}", route)
138            }
139            _ => lambda_ctx.env_config.function_name.clone(),
140        }
141    }
142
143    fn record_attributes(&self, event: &LambdaRequest, span: &Span) {
144        match event {
145            LambdaRequest::ApiGatewayV1(req) => {
146                record_apigw_v1_attributes(req, span);
147            }
148            LambdaRequest::ApiGatewayV2(req) => {
149                record_apigw_v2_attributes(req, span);
150            }
151            LambdaRequest::Alb(req) => {
152                record_alb_attributes(req, span);
153            }
154            LambdaRequest::WebSocket(_req) => {
155                span.record(URL_SCHEME, "wss");
156            }
157            _ => {}
158        }
159    }
160}
161
162fn record_apigw_v1_attributes(req: &aws_lambda_events::apigw::ApiGatewayProxyRequest, span: &Span) {
163    span.record(HTTP_REQUEST_METHOD, req.http_method.as_str());
164
165    if let Some(ref path) = req.path {
166        span.record(URL_PATH, path.as_str());
167    }
168
169    if let Some(ref resource) = req.resource {
170        span.record(HTTP_ROUTE, resource.as_str());
171    }
172
173    span.record(URL_SCHEME, "https");
174
175    if let Some(ua) = req.headers.get("user-agent")
176        && let Ok(ua_str) = ua.to_str()
177    {
178        span.record(USER_AGENT_ORIGINAL, ua_str);
179    }
180
181    if let Some(ref ip) = req.request_context.identity.source_ip {
182        span.record(CLIENT_ADDRESS, ip.as_str());
183    }
184
185    if let Some(host) = req.headers.get("host")
186        && let Ok(host_str) = host.to_str()
187    {
188        span.record(SERVER_ADDRESS, host_str);
189    }
190
191    if let Some(ref protocol) = req.request_context.protocol {
192        let version = extract_http_version(protocol);
193        span.record(NETWORK_PROTOCOL_VERSION, version);
194    }
195}
196
197fn record_apigw_v2_attributes(
198    req: &aws_lambda_events::apigw::ApiGatewayV2httpRequest,
199    span: &Span,
200) {
201    span.record(
202        HTTP_REQUEST_METHOD,
203        req.request_context.http.method.as_str(),
204    );
205
206    if let Some(ref path) = req.raw_path {
207        span.record(URL_PATH, path.as_str());
208    }
209
210    if let Some(ref route_key) = req.route_key {
211        if let Some((_, route)) = route_key.split_once(' ') {
212            span.record(HTTP_ROUTE, route);
213        } else {
214            span.record(HTTP_ROUTE, route_key.as_str());
215        }
216    }
217
218    span.record(URL_SCHEME, "https");
219
220    if let Some(ref qs) = req.raw_query_string
221        && !qs.is_empty()
222    {
223        span.record(URL_QUERY, qs.as_str());
224    }
225
226    if let Some(ua) = req.headers.get("user-agent")
227        && let Ok(ua_str) = ua.to_str()
228    {
229        span.record(USER_AGENT_ORIGINAL, ua_str);
230    }
231
232    if let Some(ref ip) = req.request_context.http.source_ip {
233        span.record(CLIENT_ADDRESS, ip.as_str());
234    }
235
236    if let Some(host) = req.headers.get("host")
237        && let Ok(host_str) = host.to_str()
238    {
239        span.record(SERVER_ADDRESS, host_str);
240    }
241
242    if let Some(ref protocol) = req.request_context.http.protocol {
243        let version = extract_http_version(protocol);
244        span.record(NETWORK_PROTOCOL_VERSION, version);
245    }
246}
247
248fn record_alb_attributes(req: &aws_lambda_events::alb::AlbTargetGroupRequest, span: &Span) {
249    span.record(HTTP_REQUEST_METHOD, req.http_method.as_str());
250
251    if let Some(ref path) = req.path {
252        span.record(URL_PATH, path.as_str());
253    }
254
255    span.record(URL_SCHEME, "https");
256
257    if let Some(ua) = req.headers.get("user-agent")
258        && let Ok(ua_str) = ua.to_str()
259    {
260        span.record(USER_AGENT_ORIGINAL, ua_str);
261    }
262
263    if let Some(host) = req.headers.get("host")
264        && let Ok(host_str) = host.to_str()
265    {
266        span.record(SERVER_ADDRESS, host_str);
267    }
268}
269
270fn extract_http_version(protocol: &str) -> &str {
271    protocol
272        .strip_prefix("HTTP/")
273        .map(|v| v.trim_end_matches(".0"))
274        .unwrap_or(protocol)
275}
276
277struct HeaderMapExtractor<'a>(&'a HeaderMap);
278
279impl Extractor for HeaderMapExtractor<'_> {
280    fn get(&self, key: &str) -> Option<&str> {
281        self.0.get(key).and_then(|v| v.to_str().ok())
282    }
283
284    fn keys(&self) -> Vec<&str> {
285        self.0.keys().map(|k| k.as_str()).collect()
286    }
287}
288
289struct XRayEnvExtractor {
290    traceparent: Option<String>,
291}
292
293impl XRayEnvExtractor {
294    fn new(xray: &str) -> Self {
295        Self {
296            traceparent: convert_xray_to_traceparent(xray),
297        }
298    }
299}
300
301impl Extractor for XRayEnvExtractor {
302    fn get(&self, key: &str) -> Option<&str> {
303        if key.eq_ignore_ascii_case("traceparent") {
304            self.traceparent.as_deref()
305        } else {
306            None
307        }
308    }
309
310    fn keys(&self) -> Vec<&str> {
311        if self.traceparent.is_some() {
312            vec!["traceparent"]
313        } else {
314            vec![]
315        }
316    }
317}
318
319fn convert_xray_to_traceparent(xray: &str) -> Option<String> {
320    let mut trace_id = None;
321    let mut parent_id = None;
322    let mut sampled = false;
323
324    for part in xray.split(';') {
325        if let Some(root) = part.strip_prefix("Root=") {
326            trace_id = parse_xray_trace_id(root);
327        } else if let Some(parent) = part.strip_prefix("Parent=") {
328            parent_id = Some(parent.to_string());
329        } else if part == "Sampled=1" {
330            sampled = true;
331        }
332    }
333
334    let trace = trace_id?;
335    let parent = parent_id?;
336
337    if parent.len() != 16 {
338        return None;
339    }
340
341    let flags = if sampled { "01" } else { "00" };
342    Some(format!("00-{}-{}-{}", trace, parent, flags))
343}
344
345fn parse_xray_trace_id(root: &str) -> Option<String> {
346    let parts: Vec<&str> = root.split('-').collect();
347    if parts.len() == 3 && parts[0] == "1" {
348        let trace_id = format!("{}{}", parts[1], parts[2]);
349        if trace_id.len() == 32 {
350            return Some(trace_id);
351        }
352    }
353    None
354}