1use crate::extractors::{set_common_attributes, set_response_attributes, SpanAttributesExtractor};
86use crate::TelemetryCompletionHandler;
87use futures_util::future::BoxFuture;
88use lambda_runtime::{Error, LambdaEvent};
89use serde::{de::DeserializeOwned, Serialize};
90use std::future::Future;
91use std::sync::atomic::{AtomicBool, Ordering};
92use tracing::field::Empty;
93use tracing::Instrument;
94use tracing_opentelemetry::OpenTelemetrySpanExt;
95
/// Process-wide flag that starts out `true`.
///
/// `traced_handler` atomically swaps it to `false` on every call and forwards the
/// previous value to `set_common_attributes`, so only the first invocation in this
/// process observes `true`.
static IS_COLD_START: AtomicBool = AtomicBool::new(true);
97
/// Boxed `Send + Sync` callable that takes a `LambdaEvent<T>` and returns a boxed
/// `'static` future resolving to `Result<R, Error>`.
///
/// This is the type returned by `create_traced_handler`.
pub type TracedHandler<T, R> =
    Box<dyn Fn(LambdaEvent<T>) -> BoxFuture<'static, Result<R, Error>> + Send + Sync>;
102
103pub(crate) async fn traced_handler<T, R, F, Fut>(
107 name: &'static str,
108 event: LambdaEvent<T>,
109 completion_handler: TelemetryCompletionHandler,
110 handler_fn: F,
111) -> Result<R, Error>
112where
113 T: SpanAttributesExtractor + DeserializeOwned + Serialize + Send + 'static,
114 R: Serialize + Send + 'static,
115 F: FnOnce(LambdaEvent<T>) -> Fut,
116 Fut: Future<Output = Result<R, Error>> + Send,
117{
118 let result = {
119 let span = tracing::info_span!(
121 parent: None,
122 "handler",
123 otel.name=Empty,
124 otel.kind=Empty,
125 otel.status_code=Empty,
126 otel.status_message=Empty,
127 requestId=%event.context.request_id,
128 );
129
130 span.record("otel.name", name.to_string());
132 span.record("otel.kind", "SERVER");
133
134 let is_cold = IS_COLD_START.swap(false, Ordering::Relaxed);
136 set_common_attributes(&span, &event.context, is_cold);
137
138 let attrs = event.payload.extract_span_attributes();
140
141 if let Some(span_name) = attrs.span_name {
143 span.record("otel.name", span_name);
144 }
145
146 if let Some(kind) = &attrs.kind {
147 span.record("otel.kind", kind.to_string());
148 }
149
150 for (key, value) in &attrs.attributes {
152 span.set_attribute(key.to_string(), value.to_string());
153 }
154
155 for link in attrs.links {
157 span.add_link_with_attributes(link.span_context, link.attributes);
158 }
159
160 if let Some(carrier) = attrs.carrier {
162 let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| {
163 propagator.extract(&carrier)
164 });
165 span.set_parent(parent_context);
166 }
167
168 span.set_attribute("faas.trigger", attrs.trigger.to_string());
170
171 let result = handler_fn(event).instrument(span.clone()).await;
173
174 if let Ok(response) = &result {
176 if let Ok(value) = serde_json::to_value(response) {
177 set_response_attributes(&span, &value);
178 }
179 } else if let Err(error) = &result {
180 span.set_status(opentelemetry::trace::Status::error(error.to_string()));
182 }
183
184 result
185 };
186
187 completion_handler.complete();
189 result
190}
191
192pub fn create_traced_handler<T, R, F, Fut>(
261 name: &'static str,
262 completion_handler: TelemetryCompletionHandler,
263 handler_fn: F,
264) -> TracedHandler<T, R>
265where
266 T: SpanAttributesExtractor + DeserializeOwned + Serialize + Send + 'static,
267 R: Serialize + Send + 'static,
268 F: Fn(LambdaEvent<T>) -> Fut + Send + Sync + Clone + 'static,
269 Fut: Future<Output = Result<R, Error>> + Send + 'static,
270{
271 Box::new(move |event: LambdaEvent<T>| {
272 let completion_handler = completion_handler.clone();
273 let handler_fn = handler_fn.clone();
274 Box::pin(traced_handler(name, event, completion_handler, handler_fn))
275 })
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mode::ProcessorMode;
    use futures_util::future::BoxFuture;
    use lambda_runtime::Context;
    use opentelemetry::trace::Status;
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry_sdk::{
        trace::{SdkTracerProvider, SpanData, SpanExporter},
        Resource,
    };
    use serde_json::Value;
    use serial_test::serial;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Mutex,
    };
    use std::time::Duration;
    use tracing_subscriber::prelude::*;

    /// Span exporter that keeps everything it receives in memory.
    ///
    /// Clones share the same counter and span buffer through `Arc`.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        /// Running total of spans handed to `export`.
        export_count: Arc<AtomicUsize>,
        /// Every span handed to `export`, in arrival order.
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        /// Creates an exporter with a zero counter and an empty span buffer.
        fn new() -> Self {
            Self::default()
        }

        /// Returns a copy of all spans recorded so far.
        fn get_spans(&self) -> Vec<SpanData> {
            let recorded = self.spans.lock().unwrap();
            recorded.to_vec()
        }

        /// Looks up the first attribute on `span` whose key equals `key` and
        /// returns its value rendered as a string.
        fn find_attribute(span: &SpanData, key: &str) -> Option<String> {
            span.attributes
                .iter()
                .find_map(|kv| (kv.key.as_str() == key).then(|| kv.value.to_string()))
        }
    }

    impl SpanExporter for TestExporter {
        /// Counts and stores the batch, then reports success immediately.
        fn export(
            &mut self,
            batch: Vec<SpanData>,
        ) -> BoxFuture<'static, opentelemetry_sdk::error::OTelSdkResult> {
            let received = batch.len();
            self.export_count.fetch_add(received, Ordering::SeqCst);
            self.spans.lock().unwrap().extend(batch);
            Box::pin(futures_util::future::ready(Ok(())))
        }
    }

    /// Builds a tracer provider backed by a `TestExporter` and installs a
    /// subscriber with an OpenTelemetry layer as the default.
    ///
    /// Returns the provider, a handle to the exporter, and the guard returned by
    /// `set_default`, which the caller must keep alive for the duration of the test.
    fn setup_test_provider() -> (
        Arc<SdkTracerProvider>,
        Arc<TestExporter>,
        tracing::dispatcher::DefaultGuard,
    ) {
        let exporter = Arc::new(TestExporter::new());
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter((*exporter).clone())
            .with_resource(Resource::builder().build())
            .build();
        let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(provider.tracer("test"));
        let guard = tracing_subscriber::registry::Registry::default()
            .with(otel_layer)
            .set_default();
        (Arc::new(provider), exporter, guard)
    }

    /// Sleeps for `duration`; tests call this before reading the exported spans.
    async fn wait_for_spans(duration: Duration) {
        tokio::time::sleep(duration).await;
    }

    /// Completion handler in `ProcessorMode::Sync` for the given provider.
    fn sync_completion_handler(provider: Arc<SdkTracerProvider>) -> TelemetryCompletionHandler {
        TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync)
    }

    /// Lambda event with an empty JSON object payload and a default context.
    fn empty_event() -> LambdaEvent<Value> {
        LambdaEvent::new(serde_json::json!({}), Context::default())
    }

    /// A handler returning `statusCode` 200 yields a span named after the handler
    /// with `http.status_code` set to `200`.
    #[tokio::test]
    #[serial]
    async fn test_successful_response() -> Result<(), Error> {
        let (provider, exporter, _guard) = setup_test_provider();

        async fn handler(_: LambdaEvent<Value>) -> Result<Value, Error> {
            Ok(serde_json::json!({ "statusCode": 200, "body": "Success" }))
        }

        let wrapped =
            create_traced_handler("test-handler", sync_completion_handler(provider), handler);
        let response = wrapped(empty_event()).await?;

        wait_for_spans(Duration::from_millis(100)).await;

        let exported = exporter.get_spans();
        assert!(!exported.is_empty(), "No spans were exported");

        let first = &exported[0];
        assert_eq!(first.name, "test-handler", "Unexpected span name");
        let status_code = TestExporter::find_attribute(first, "http.status_code");
        assert_eq!(status_code, Some("200".to_string()));
        assert_eq!(response["statusCode"], 200);

        Ok(())
    }

    /// A handler returning `Ok` with `statusCode` 500 yields a span with
    /// `http.status_code` set to `500` and an error status.
    #[tokio::test]
    #[serial]
    async fn test_error_response() -> Result<(), Error> {
        let (provider, exporter, _guard) = setup_test_provider();

        async fn handler(_: LambdaEvent<Value>) -> Result<Value, Error> {
            Ok(serde_json::json!({
                "statusCode": 500,
                "body": "Internal Server Error"
            }))
        }

        let wrapped =
            create_traced_handler("test-handler", sync_completion_handler(provider), handler);
        let response = wrapped(empty_event()).await?;

        wait_for_spans(Duration::from_millis(100)).await;

        let exported = exporter.get_spans();
        assert!(!exported.is_empty(), "No spans were exported");

        let first = &exported[0];
        assert_eq!(first.name, "test-handler", "Unexpected span name");
        let status_code = TestExporter::find_attribute(first, "http.status_code");
        assert_eq!(status_code, Some("500".to_string()));
        assert!(matches!(first.status, Status::Error { .. }));
        assert_eq!(response["statusCode"], 500);

        Ok(())
    }

    /// Invoking the same traced handler three times exports exactly three spans,
    /// each carrying the handler name.
    #[tokio::test]
    #[serial]
    async fn test_handler_reuse() -> Result<(), Error> {
        let (provider, exporter, _guard) = setup_test_provider();

        async fn handler(_: LambdaEvent<Value>) -> Result<Value, Error> {
            Ok(serde_json::json!({ "status": "ok" }))
        }

        let wrapped =
            create_traced_handler("test-handler", sync_completion_handler(provider), handler);

        for _ in 0..3 {
            let _ = wrapped(empty_event()).await?;
        }

        wait_for_spans(Duration::from_millis(100)).await;

        let exported = exporter.get_spans();
        assert_eq!(exported.len(), 3, "Expected exactly 3 spans");

        for span in &exported {
            assert_eq!(span.name, "test-handler", "Unexpected span name");
        }

        Ok(())
    }

    /// A capturing closure can be used as the handler; the captured value shows up
    /// in the response and a span is exported under the handler name.
    #[tokio::test]
    #[serial]
    async fn test_handler_with_closure() -> Result<(), Error> {
        let (provider, exporter, _guard) = setup_test_provider();

        let prefix = "test-prefix".to_string();
        let handler = move |_event: LambdaEvent<Value>| {
            // Clone per call so the closure stays `Fn` and the future owns its data.
            let prefix = prefix.clone();
            async move {
                Ok(serde_json::json!({
                    "statusCode": 200,
                    "prefix": prefix
                }))
            }
        };

        let wrapped =
            create_traced_handler("test-handler", sync_completion_handler(provider), handler);
        let response = wrapped(empty_event()).await?;

        wait_for_spans(Duration::from_millis(100)).await;

        let exported = exporter.get_spans();
        assert!(!exported.is_empty(), "No spans were exported");

        assert_eq!(response["prefix"], "test-prefix");
        assert_eq!(exported[0].name, "test-handler", "Unexpected span name");

        Ok(())
    }
}