opentelemetry_lambda_tower/extractors/
sqs.rs1use crate::extractor::TraceContextExtractor;
7use aws_lambda_events::sqs::{SqsEvent, SqsMessage};
8use lambda_runtime::Context as LambdaContext;
9use opentelemetry::Context;
10use opentelemetry::trace::{Link, SpanContext, SpanId, TraceFlags, TraceId, TraceState};
11use opentelemetry_semantic_conventions::attribute::{
12 MESSAGING_BATCH_MESSAGE_COUNT, MESSAGING_DESTINATION_NAME, MESSAGING_MESSAGE_ID,
13 MESSAGING_OPERATION_TYPE, MESSAGING_SYSTEM,
14};
15use tracing::Span;
16
/// Trace-context extractor for Lambda invocations driven by SQS events.
///
/// The type is a stateless unit struct; its behaviour lives in its
/// `TraceContextExtractor<SqsEvent>` implementation.
#[derive(Clone, Debug, Default)]
pub struct SqsEventExtractor;

impl SqsEventExtractor {
    /// Creates a new extractor. Equivalent to `SqsEventExtractor::default()`.
    pub fn new() -> Self {
        Self
    }

    /// Returns the queue name encoded in `arn`, i.e. the text after the last `:`.
    ///
    /// A string without any `:` is returned unchanged. `None` is returned when
    /// the final segment is empty (empty input, or an ARN ending in `:`), so
    /// that callers can fall back to another name instead of using `""`.
    fn queue_name_from_arn(arn: &str) -> Option<&str> {
        // `rsplit` always yields at least one (possibly empty) item, so the
        // emptiness check is the only way this can produce `None`.
        arn.rsplit(':').next().filter(|name| !name.is_empty())
    }
}
52
53impl TraceContextExtractor<SqsEvent> for SqsEventExtractor {
54 fn extract_context(&self, _event: &SqsEvent) -> Context {
55 Context::current()
60 }
61
62 fn extract_links(&self, event: &SqsEvent) -> Vec<Link> {
63 event
64 .records
65 .iter()
66 .filter_map(extract_link_from_message)
67 .collect()
68 }
69
70 fn trigger_type(&self) -> &'static str {
71 "pubsub"
72 }
73
74 fn span_name(&self, event: &SqsEvent, lambda_ctx: &LambdaContext) -> String {
75 let queue_name = event
77 .records
78 .first()
79 .and_then(|r| r.event_source_arn.as_deref())
80 .and_then(Self::queue_name_from_arn)
81 .unwrap_or(&lambda_ctx.env_config.function_name);
82
83 format!("{} process", queue_name)
84 }
85
86 fn record_attributes(&self, event: &SqsEvent, span: &Span) {
87 span.record(MESSAGING_SYSTEM, "aws_sqs");
88 span.record(MESSAGING_OPERATION_TYPE, "process");
89
90 if let Some(record) = event.records.first()
91 && let Some(ref arn) = record.event_source_arn
92 && let Some(queue_name) = Self::queue_name_from_arn(arn)
93 {
94 span.record(MESSAGING_DESTINATION_NAME, queue_name);
95 }
96
97 span.record(MESSAGING_BATCH_MESSAGE_COUNT, event.records.len() as i64);
98
99 if event.records.len() == 1
100 && let Some(ref msg_id) = event.records[0].message_id
101 {
102 span.record(MESSAGING_MESSAGE_ID, msg_id.as_str());
103 }
104 }
105}
106
107fn extract_link_from_message(message: &SqsMessage) -> Option<Link> {
109 let trace_header = message.attributes.get("AWSTraceHeader")?;
111
112 let span_context = parse_xray_trace_header(trace_header)?;
113
114 Some(Link::new(span_context, vec![], 0))
115}
116
117pub fn parse_xray_trace_header(header: &str) -> Option<SpanContext> {
131 let mut trace_id_str = None;
132 let mut parent_id_str = None;
133 let mut sampled = false;
134
135 for part in header.split(';') {
136 let part = part.trim();
137 if let Some(root) = part.strip_prefix("Root=") {
138 trace_id_str = convert_xray_trace_id(root);
139 } else if let Some(parent) = part.strip_prefix("Parent=") {
140 parent_id_str = Some(parent.to_string());
141 } else if part == "Sampled=1" {
142 sampled = true;
143 }
144 }
145
146 let trace_id_hex = trace_id_str?;
147 let parent_id_hex = parent_id_str?;
148
149 let trace_id_bytes = hex_to_bytes::<16>(&trace_id_hex)?;
151 let trace_id = TraceId::from_bytes(trace_id_bytes);
152
153 let span_id_bytes = hex_to_bytes::<8>(&parent_id_hex)?;
155 let span_id = SpanId::from_bytes(span_id_bytes);
156
157 let flags = if sampled {
158 TraceFlags::SAMPLED
159 } else {
160 TraceFlags::default()
161 };
162
163 Some(SpanContext::new(
164 trace_id,
165 span_id,
166 flags,
167 true, TraceState::default(),
169 ))
170}
171
/// Converts an X-Ray trace ID of the form `1-<8 chars>-<24 chars>` into the
/// 32-character string obtained by concatenating the last two segments.
///
/// Returns `None` unless the ID has exactly three `-`-separated segments, the
/// first is `"1"`, the second is 8 bytes long and the third is 24 bytes long.
/// Checking each segment separately rejects misaligned input (e.g. 7 + 25
/// characters) that merely adds up to 32. This function does not verify that
/// the characters are hexadecimal digits.
fn convert_xray_trace_id(xray_id: &str) -> Option<String> {
    const EPOCH_LEN: usize = 8;
    const UNIQUE_LEN: usize = 24;

    let mut segments = xray_id.split('-');
    let version = segments.next()?;
    let epoch = segments.next()?;
    let unique = segments.next()?;

    let well_formed = segments.next().is_none()
        && version == "1"
        && epoch.len() == EPOCH_LEN
        && unique.len() == UNIQUE_LEN;
    if !well_formed {
        return None;
    }

    let mut combined = String::with_capacity(EPOCH_LEN + UNIQUE_LEN);
    combined.push_str(epoch);
    combined.push_str(unique);
    Some(combined)
}
186
187fn hex_to_bytes<const N: usize>(hex: &str) -> Option<[u8; N]> {
189 if hex.len() != N * 2 {
190 return None;
191 }
192
193 let mut bytes = [0u8; N];
194 for (i, chunk) in hex.as_bytes().chunks(2).enumerate() {
195 let high = hex_char_to_nibble(chunk[0])?;
196 let low = hex_char_to_nibble(chunk[1])?;
197 bytes[i] = (high << 4) | low;
198 }
199 Some(bytes)
200}
201
/// Returns the value (0-15) of an ASCII hexadecimal digit.
///
/// Accepts `0-9`, `a-f` and `A-F`; any other byte yields `None`.
fn hex_char_to_nibble(c: u8) -> Option<u8> {
    // `to_digit(16)` only ever produces 0..=15, so narrowing to `u8` is lossless.
    char::from(c).to_digit(16).map(|digit| digit as u8)
}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Well-formed header with the sampled flag set.
    const SAMPLED_HEADER: &str =
        "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1";
    /// Same IDs as `SAMPLED_HEADER`, but explicitly unsampled.
    const UNSAMPLED_HEADER: &str =
        "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=0";
    /// A second, unsampled header with different IDs.
    const OTHER_UNSAMPLED_HEADER: &str =
        "Root=1-67890abc-def0123456789abcdef01234;Parent=1234567890abcdef;Sampled=0";

    /// Builds an attribute map holding only the given trace header.
    fn trace_attributes(header: &str) -> HashMap<String, String> {
        let mut attributes = HashMap::new();
        attributes.insert("AWSTraceHeader".to_string(), header.to_string());
        attributes
    }

    // NOTE(review): the event types are built via `default()` plus field
    // assignment, as the original tests did — presumably because they cannot
    // be built with a struct literal from this crate; confirm before changing.

    /// Builds a message whose only populated field is `attributes`.
    fn message_with_attributes(attributes: HashMap<String, String>) -> SqsMessage {
        let mut message = SqsMessage::default();
        message.attributes = attributes;
        message
    }

    /// Wraps `records` in an otherwise default event.
    fn event_with_records(records: Vec<SqsMessage>) -> SqsEvent {
        let mut event = SqsEvent::default();
        event.records = records;
        event
    }

    /// Single-record event with a sampled trace header and a queue ARN.
    fn create_test_sqs_event() -> SqsEvent {
        let mut message = message_with_attributes(trace_attributes(SAMPLED_HEADER));
        message.message_id = Some("msg-123".to_string());
        message.receipt_handle = Some("receipt-123".to_string());
        message.body = Some(r#"{"test": "data"}"#.to_string());
        message.message_attributes = HashMap::new();
        message.event_source = Some("aws:sqs".to_string());
        message.event_source_arn = Some("arn:aws:sqs:us-east-1:123456789:my-queue".to_string());
        message.aws_region = Some("us-east-1".to_string());

        event_with_records(vec![message])
    }

    fn create_test_lambda_context() -> LambdaContext {
        LambdaContext::default()
    }

    #[test]
    fn test_trigger_type() {
        assert_eq!(SqsEventExtractor::new().trigger_type(), "pubsub");
    }

    #[test]
    fn test_span_name_includes_queue() {
        let event = create_test_sqs_event();
        let lambda_ctx = create_test_lambda_context();

        let span_name = SqsEventExtractor::new().span_name(&event, &lambda_ctx);

        assert_eq!(span_name, "my-queue process");
    }

    #[test]
    fn test_queue_name_from_arn() {
        let standard =
            SqsEventExtractor::queue_name_from_arn("arn:aws:sqs:us-east-1:123456789:my-queue");
        assert_eq!(standard, Some("my-queue"));

        let fifo = SqsEventExtractor::queue_name_from_arn(
            "arn:aws:sqs:eu-west-1:987654321:another-queue.fifo",
        );
        assert_eq!(fifo, Some("another-queue.fifo"));
    }

    #[test]
    fn test_extract_links_single_message() {
        let event = create_test_sqs_event();

        let links = SqsEventExtractor::new().extract_links(&event);

        assert_eq!(links.len(), 1);
        let span_context = &links[0].span_context;
        assert!(span_context.is_valid());
        assert_eq!(
            span_context.trace_id().to_string(),
            "5759e988bd862e3fe1be46a994272793"
        );
        assert_eq!(span_context.span_id().to_string(), "53995c3f42cd8ad8");
        assert!(span_context.is_sampled());
    }

    #[test]
    fn test_extract_links_multiple_messages() {
        let sampled = message_with_attributes(trace_attributes(SAMPLED_HEADER));
        let unsampled = message_with_attributes(trace_attributes(OTHER_UNSAMPLED_HEADER));
        let event = event_with_records(vec![sampled, unsampled]);

        let links = SqsEventExtractor::new().extract_links(&event);

        assert_eq!(links.len(), 2);
        assert!(links[0].span_context.is_sampled());
        assert!(!links[1].span_context.is_sampled());
    }

    #[test]
    fn test_extract_links_no_trace_header() {
        let event = event_with_records(vec![message_with_attributes(HashMap::new())]);

        let links = SqsEventExtractor::new().extract_links(&event);

        assert!(links.is_empty());
    }

    #[test]
    fn test_parse_xray_trace_header() {
        let span_context = parse_xray_trace_header(SAMPLED_HEADER).unwrap();

        assert!(span_context.is_valid());
        assert_eq!(
            span_context.trace_id().to_string(),
            "5759e988bd862e3fe1be46a994272793"
        );
        assert_eq!(span_context.span_id().to_string(), "53995c3f42cd8ad8");
        assert!(span_context.is_sampled());
        assert!(span_context.is_remote());
    }

    #[test]
    fn test_parse_xray_trace_header_unsampled() {
        let span_context = parse_xray_trace_header(UNSAMPLED_HEADER).unwrap();

        assert!(!span_context.is_sampled());
    }

    #[test]
    fn test_parse_xray_trace_header_invalid() {
        assert!(parse_xray_trace_header("invalid").is_none());
        assert!(parse_xray_trace_header("Root=invalid;Parent=abc").is_none());
        assert!(parse_xray_trace_header("Root=1-abc-def").is_none());
    }

    #[test]
    fn test_convert_xray_trace_id() {
        let converted = convert_xray_trace_id("1-5759e988-bd862e3fe1be46a994272793");

        assert_eq!(
            converted,
            Some("5759e988bd862e3fe1be46a994272793".to_string())
        );
    }

    #[test]
    fn test_hex_to_bytes() {
        let decoded: [u8; 4] = hex_to_bytes("deadbeef").unwrap();

        assert_eq!(decoded, [0xde, 0xad, 0xbe, 0xef]);
    }

    #[test]
    fn test_hex_to_bytes_invalid() {
        // One character short.
        assert!(hex_to_bytes::<4>("deadbee").is_none());
        // One character too long.
        assert!(hex_to_bytes::<4>("deadbeefx").is_none());
        // Correct length, but `g` is not a hex digit.
        assert!(hex_to_bytes::<4>("deadbeeg").is_none());
    }
}