opentelemetry_lambda_tower/extractors/
sqs.rs

1//! SQS event extractor for message queue triggers.
2//!
3//! Extracts trace context from SQS message system attributes using the
4//! `AWSTraceHeader` attribute in X-Ray format.
5
6use crate::extractor::TraceContextExtractor;
7use aws_lambda_events::sqs::{SqsEvent, SqsMessage};
8use lambda_runtime::Context as LambdaContext;
9use opentelemetry::Context;
10use opentelemetry::trace::{Link, SpanContext, SpanId, TraceFlags, TraceId, TraceState};
11use opentelemetry_semantic_conventions::attribute::{
12    MESSAGING_BATCH_MESSAGE_COUNT, MESSAGING_DESTINATION_NAME, MESSAGING_MESSAGE_ID,
13    MESSAGING_OPERATION_TYPE, MESSAGING_SYSTEM,
14};
15use tracing::Span;
16
/// Trace-context extractor for Lambda invocations triggered by SQS.
///
/// SQS delivers trace context in each message's `AWSTraceHeader` system
/// attribute, encoded in X-Ray format. This extractor:
///
/// 1. Does NOT set a parent context (it returns the current context)
/// 2. Creates one span link per message that carries a parseable trace header
///
/// This follows the OpenTelemetry semantic conventions for messaging systems:
/// because queue delivery is asynchronous, span links describe the
/// relationship better than a parent-child edge does.
///
/// # Example
///
/// ```ignore
/// use opentelemetry_lambda_tower::{OtelTracingLayer, SqsEventExtractor};
///
/// let layer = OtelTracingLayer::new(SqsEventExtractor::new());
/// ```
#[derive(Clone, Debug, Default)]
pub struct SqsEventExtractor;

impl SqsEventExtractor {
    /// Creates a new SQS event extractor.
    pub fn new() -> Self {
        Self
    }

    /// Extracts the queue name from an event source ARN.
    ///
    /// ARN format: `arn:aws:sqs:{region}:{account}:{queue-name}`
    ///
    /// The queue name is the text after the last `:`. Returns `None` when the
    /// input contains no `:` at all or when that final segment is empty, so
    /// callers can fall back to another name instead of using a bogus one.
    fn queue_name_from_arn(arn: &str) -> Option<&str> {
        arn.rsplit_once(':')
            .map(|(_, queue_name)| queue_name)
            .filter(|queue_name| !queue_name.is_empty())
    }
}
52
53impl TraceContextExtractor<SqsEvent> for SqsEventExtractor {
54    fn extract_context(&self, _event: &SqsEvent) -> Context {
55        // For SQS, we don't set a parent context because:
56        // 1. Messages may come from multiple different traces
57        // 2. The async nature means parent-child doesn't make semantic sense
58        // Instead, we use span links (see extract_links)
59        Context::current()
60    }
61
62    fn extract_links(&self, event: &SqsEvent) -> Vec<Link> {
63        event
64            .records
65            .iter()
66            .filter_map(extract_link_from_message)
67            .collect()
68    }
69
70    fn trigger_type(&self) -> &'static str {
71        "pubsub"
72    }
73
74    fn span_name(&self, event: &SqsEvent, lambda_ctx: &LambdaContext) -> String {
75        // Use "{queue_name} process" format per OTel messaging conventions
76        let queue_name = event
77            .records
78            .first()
79            .and_then(|r| r.event_source_arn.as_deref())
80            .and_then(Self::queue_name_from_arn)
81            .unwrap_or(&lambda_ctx.env_config.function_name);
82
83        format!("{} process", queue_name)
84    }
85
86    fn record_attributes(&self, event: &SqsEvent, span: &Span) {
87        span.record(MESSAGING_SYSTEM, "aws_sqs");
88        span.record(MESSAGING_OPERATION_TYPE, "process");
89
90        if let Some(record) = event.records.first()
91            && let Some(ref arn) = record.event_source_arn
92            && let Some(queue_name) = Self::queue_name_from_arn(arn)
93        {
94            span.record(MESSAGING_DESTINATION_NAME, queue_name);
95        }
96
97        span.record(MESSAGING_BATCH_MESSAGE_COUNT, event.records.len() as i64);
98
99        if event.records.len() == 1
100            && let Some(ref msg_id) = event.records[0].message_id
101        {
102            span.record(MESSAGING_MESSAGE_ID, msg_id.as_str());
103        }
104    }
105}
106
107/// Extracts a span link from an SQS message's AWSTraceHeader.
108fn extract_link_from_message(message: &SqsMessage) -> Option<Link> {
109    // AWSTraceHeader is in the system attributes, NOT message_attributes
110    let trace_header = message.attributes.get("AWSTraceHeader")?;
111
112    let span_context = parse_xray_trace_header(trace_header)?;
113
114    Some(Link::new(span_context, vec![], 0))
115}
116
117/// Parses an X-Ray trace header into a SpanContext.
118///
119/// X-Ray format: `Root=1-{epoch}-{random};Parent={span-id};Sampled={0|1}`
120///
121/// # Example
122///
123/// ```
124/// use opentelemetry_lambda_tower::extractors::sqs::parse_xray_trace_header;
125///
126/// let header = "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1";
127/// let ctx = parse_xray_trace_header(header);
128/// assert!(ctx.is_some());
129/// ```
130pub fn parse_xray_trace_header(header: &str) -> Option<SpanContext> {
131    let mut trace_id_str = None;
132    let mut parent_id_str = None;
133    let mut sampled = false;
134
135    for part in header.split(';') {
136        let part = part.trim();
137        if let Some(root) = part.strip_prefix("Root=") {
138            trace_id_str = convert_xray_trace_id(root);
139        } else if let Some(parent) = part.strip_prefix("Parent=") {
140            parent_id_str = Some(parent.to_string());
141        } else if part == "Sampled=1" {
142            sampled = true;
143        }
144    }
145
146    let trace_id_hex = trace_id_str?;
147    let parent_id_hex = parent_id_str?;
148
149    // Parse trace ID (32 hex chars = 16 bytes)
150    let trace_id_bytes = hex_to_bytes::<16>(&trace_id_hex)?;
151    let trace_id = TraceId::from_bytes(trace_id_bytes);
152
153    // Parse parent/span ID (16 hex chars = 8 bytes)
154    let span_id_bytes = hex_to_bytes::<8>(&parent_id_hex)?;
155    let span_id = SpanId::from_bytes(span_id_bytes);
156
157    let flags = if sampled {
158        TraceFlags::SAMPLED
159    } else {
160        TraceFlags::default()
161    };
162
163    Some(SpanContext::new(
164        trace_id,
165        span_id,
166        flags,
167        true, // is_remote
168        TraceState::default(),
169    ))
170}
171
/// Converts an X-Ray trace ID into a 32-character string.
///
/// X-Ray format: `1-{epoch_hex}-{random_hex}` where `epoch_hex` is 8
/// characters and `random_hex` is 24 characters (8 + 24 = 32).
/// Returns: `{epoch_hex}{random_hex}`
///
/// Returns `None` unless the input has exactly three `-`-separated segments,
/// the version segment is `1`, and the two remaining segments have lengths
/// 8 and 24 respectively. Checking each segment (rather than only the
/// combined length) rejects IDs whose separator is in the wrong place.
/// Whether the characters are valid hex digits is not checked here.
fn convert_xray_trace_id(xray_id: &str) -> Option<String> {
    const EPOCH_HEX_LEN: usize = 8;
    const RANDOM_HEX_LEN: usize = 24;

    let mut segments = xray_id.split('-');
    let version = segments.next()?;
    let epoch = segments.next()?;
    let random = segments.next()?;

    let well_formed = segments.next().is_none()
        && version == "1"
        && epoch.len() == EPOCH_HEX_LEN
        && random.len() == RANDOM_HEX_LEN;

    well_formed.then(|| format!("{epoch}{random}"))
}
186
187/// Converts a hex string to a fixed-size byte array.
188fn hex_to_bytes<const N: usize>(hex: &str) -> Option<[u8; N]> {
189    if hex.len() != N * 2 {
190        return None;
191    }
192
193    let mut bytes = [0u8; N];
194    for (i, chunk) in hex.as_bytes().chunks(2).enumerate() {
195        let high = hex_char_to_nibble(chunk[0])?;
196        let low = hex_char_to_nibble(chunk[1])?;
197        bytes[i] = (high << 4) | low;
198    }
199    Some(bytes)
200}
201
/// Maps one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its 4-bit value.
///
/// Any other byte yields `None`.
fn hex_char_to_nibble(c: u8) -> Option<u8> {
    // `char::from(u8)` maps the byte to the code point of the same value, so
    // bytes >= 0x80 become non-ASCII characters and are rejected by
    // `to_digit`. A base-16 digit is always below 16, so the cast is lossless.
    char::from(c).to_digit(16).map(|digit| digit as u8)
}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Sampled X-Ray header shared by the fixtures and the parser tests.
    const SAMPLED_HEADER: &str =
        "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1";

    /// Same trace as `SAMPLED_HEADER`, with the sampling decision switched off.
    const UNSAMPLED_HEADER: &str =
        "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=0";

    /// Builds a system-attribute map holding only an `AWSTraceHeader` entry.
    fn trace_attributes(header: &str) -> HashMap<String, String> {
        HashMap::from([("AWSTraceHeader".to_string(), header.to_string())])
    }

    /// Builds an otherwise-default message whose system attributes carry `header`.
    fn message_with_header(header: &str) -> SqsMessage {
        let mut message = SqsMessage::default();
        message.attributes = trace_attributes(header);
        message
    }

    /// Wraps `records` in an otherwise-default event.
    fn event_with(records: Vec<SqsMessage>) -> SqsEvent {
        let mut event = SqsEvent::default();
        event.records = records;
        event
    }

    /// Builds a single-message event with a sampled trace header and a queue ARN.
    fn create_test_sqs_event() -> SqsEvent {
        let mut message = message_with_header(SAMPLED_HEADER);
        message.message_id = Some("msg-123".to_string());
        message.receipt_handle = Some("receipt-123".to_string());
        message.body = Some(r#"{"test": "data"}"#.to_string());
        message.message_attributes = HashMap::new();
        message.event_source = Some("aws:sqs".to_string());
        message.event_source_arn = Some("arn:aws:sqs:us-east-1:123456789:my-queue".to_string());
        message.aws_region = Some("us-east-1".to_string());

        event_with(vec![message])
    }

    /// Builds a default Lambda context for tests that only need a placeholder.
    fn create_test_lambda_context() -> LambdaContext {
        LambdaContext::default()
    }

    #[test]
    fn test_trigger_type() {
        assert_eq!(SqsEventExtractor::new().trigger_type(), "pubsub");
    }

    #[test]
    fn test_span_name_includes_queue() {
        let event = create_test_sqs_event();
        let lambda_ctx = create_test_lambda_context();

        let span_name = SqsEventExtractor::new().span_name(&event, &lambda_ctx);

        assert_eq!(span_name, "my-queue process");
    }

    #[test]
    fn test_queue_name_from_arn() {
        let standard =
            SqsEventExtractor::queue_name_from_arn("arn:aws:sqs:us-east-1:123456789:my-queue");
        assert_eq!(standard, Some("my-queue"));

        let fifo = SqsEventExtractor::queue_name_from_arn(
            "arn:aws:sqs:eu-west-1:987654321:another-queue.fifo",
        );
        assert_eq!(fifo, Some("another-queue.fifo"));
    }

    #[test]
    fn test_extract_links_single_message() {
        let links = SqsEventExtractor::new().extract_links(&create_test_sqs_event());

        assert_eq!(links.len(), 1);

        let span_context = &links[0].span_context;
        assert!(span_context.is_valid());
        assert_eq!(
            span_context.trace_id().to_string(),
            "5759e988bd862e3fe1be46a994272793"
        );
        assert_eq!(span_context.span_id().to_string(), "53995c3f42cd8ad8");
        assert!(span_context.is_sampled());
    }

    #[test]
    fn test_extract_links_multiple_messages() {
        let event = event_with(vec![
            message_with_header(SAMPLED_HEADER),
            message_with_header(
                "Root=1-67890abc-def0123456789abcdef01234;Parent=1234567890abcdef;Sampled=0",
            ),
        ]);

        let links = SqsEventExtractor::new().extract_links(&event);

        assert_eq!(links.len(), 2);
        // Links keep record order: the first is sampled, the second is not.
        assert!(links[0].span_context.is_sampled());
        assert!(!links[1].span_context.is_sampled());
    }

    #[test]
    fn test_extract_links_no_trace_header() {
        let mut message = SqsMessage::default();
        message.attributes = HashMap::new();

        let links = SqsEventExtractor::new().extract_links(&event_with(vec![message]));

        assert!(links.is_empty());
    }

    #[test]
    fn test_parse_xray_trace_header() {
        let span_context = parse_xray_trace_header(SAMPLED_HEADER).unwrap();

        assert!(span_context.is_valid());
        assert_eq!(
            span_context.trace_id().to_string(),
            "5759e988bd862e3fe1be46a994272793"
        );
        assert_eq!(span_context.span_id().to_string(), "53995c3f42cd8ad8");
        assert!(span_context.is_sampled());
        assert!(span_context.is_remote());
    }

    #[test]
    fn test_parse_xray_trace_header_unsampled() {
        let span_context = parse_xray_trace_header(UNSAMPLED_HEADER).unwrap();

        assert!(!span_context.is_sampled());
    }

    #[test]
    fn test_parse_xray_trace_header_invalid() {
        for header in ["invalid", "Root=invalid;Parent=abc", "Root=1-abc-def"] {
            assert!(parse_xray_trace_header(header).is_none());
        }
    }

    #[test]
    fn test_convert_xray_trace_id() {
        let converted = convert_xray_trace_id("1-5759e988-bd862e3fe1be46a994272793");

        assert_eq!(
            converted,
            Some("5759e988bd862e3fe1be46a994272793".to_string())
        );
    }

    #[test]
    fn test_hex_to_bytes() {
        assert_eq!(
            hex_to_bytes::<4>("deadbeef"),
            Some([0xde, 0xad, 0xbe, 0xef])
        );
    }

    #[test]
    fn test_hex_to_bytes_invalid() {
        assert!(hex_to_bytes::<4>("deadbee").is_none()); // too short
        assert!(hex_to_bytes::<4>("deadbeefx").is_none()); // too long
        assert!(hex_to_bytes::<4>("deadbeeg").is_none()); // invalid char
    }
}