lambda_otel_lite/
handler.rs

1//! Lambda function handler wrapper with OpenTelemetry tracing.
2//!
3//! This module provides a wrapper function that automatically creates OpenTelemetry spans
4//! for Lambda function invocations. It offers an alternative to the Tower middleware layer
5//! when more direct control over span creation is needed.
6//!
7//! # When to Use the Handler Wrapper
8//!
9//! The handler wrapper approach is recommended when:
10//! - You have a simple Lambda function without complex middleware needs
11//! - You want minimal setup and configuration
12//! - You need direct control over span creation and attributes
13//! - You don't need Tower's middleware composition features
14//!
15//! For more complex applications, consider using the Tower layer approach instead.
16//!
17//! # Features
18//!
19//! - Automatic span creation with configurable names and attributes
20//! - Built-in support for common AWS event types (API Gateway v1/v2)
21//! - Automatic context propagation from HTTP headers
22//! - Response status code tracking
23//! - Custom attribute extraction
24//!
25//! # Architecture
26//!
27//! The handler wrapper operates by:
28//! 1. Creating a span for each invocation
29//! 2. Extracting attributes from the event
30//! 3. Running the handler function within the span
31//! 4. Capturing response attributes (e.g., status code)
32//! 5. Signaling completion for span export
33//!
34//! # Performance Considerations
35//!
36//! The wrapper is designed to minimize overhead:
37//! - Lazy attribute extraction
//! - Attribute extraction through the `SpanAttributesExtractor` trait bound (statically dispatched)
39//! - Minimal allocations for span attributes
40//! - No blocking operations in the critical path
41//!
42//! # Comparison with Tower Layer
43//!
44//! This wrapper provides an alternative to the `OtelTracingLayer`:
45//! - More direct control over span creation
46//! - Simpler integration (no middleware stack)
47//! - Easier to customize span attributes
48//! - Better suited for simple Lambda functions
49//!
50//! Use this wrapper when:
51//! - You have a simple Lambda function
52//! - You don't need other Tower middleware
53//! - You want direct control over spans
54//!
55//! Use the Tower layer when:
56//! - You're building a complex service
57//! - You need other Tower middleware
58//! - You want standardized instrumentation
59//!
60//! # Examples
//!
//! ```rust,no_run
//! use std::result::Result;
//! use lambda_runtime::{Error, LambdaEvent};
//! use serde_json::Value;
//! use lambda_otel_lite::{init_telemetry, create_traced_handler, TelemetryConfig};
//!
//! async fn my_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
//!     let prefix = event.payload.get("prefix").and_then(|p| p.as_str()).unwrap_or("default");
//!     Ok::<Value, Error>(serde_json::json!({ "prefix": prefix }))
//! }
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), Error> {
//!     let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
//!     let handler = create_traced_handler(
//!         "my-handler",
//!         completion_handler,
//!         my_handler
//!     );
//!     // ... use handler with Runtime ...
//! #   Ok(())
//! # }
//! ```
85use crate::extractors::{set_common_attributes, set_response_attributes, SpanAttributesExtractor};
86use crate::TelemetryCompletionHandler;
87use futures_util::future::BoxFuture;
88use lambda_runtime::{Error, LambdaEvent};
89use serde::{de::DeserializeOwned, Serialize};
90use std::future::Future;
91use std::sync::atomic::{AtomicBool, Ordering};
92use tracing::field::Empty;
93use tracing::Instrument;
94use tracing_opentelemetry::OpenTelemetrySpanExt;
95
/// Process-wide flag that starts out `true` and is flipped to `false` by the first
/// call to `traced_handler`.
///
/// `traced_handler` swaps the flag and forwards the previous value to
/// `set_common_attributes` as the cold-start indicator for that invocation.
static IS_COLD_START: AtomicBool = AtomicBool::new(true);
97
/// Type representing a traced Lambda handler function.
///
/// A boxed, `Send + Sync` closure that takes a `LambdaEvent<T>` and returns a
/// `'static` boxed `Future` that resolves to `Result<R, Error>`. This is the type
/// returned by `create_traced_handler`.
pub type TracedHandler<T, R> =
    Box<dyn Fn(LambdaEvent<T>) -> BoxFuture<'static, Result<R, Error>> + Send + Sync>;
102
103/// Internal implementation that wraps a Lambda handler function with OpenTelemetry tracing.
104///
105/// This is an implementation detail. Users should use `create_traced_handler` instead.
106pub(crate) async fn traced_handler<T, R, F, Fut>(
107    name: &'static str,
108    event: LambdaEvent<T>,
109    completion_handler: TelemetryCompletionHandler,
110    handler_fn: F,
111) -> Result<R, Error>
112where
113    T: SpanAttributesExtractor + DeserializeOwned + Serialize + Send + 'static,
114    R: Serialize + Send + 'static,
115    F: FnOnce(LambdaEvent<T>) -> Fut,
116    Fut: Future<Output = Result<R, Error>> + Send,
117{
118    let result = {
119        // Create the base span
120        let span = tracing::info_span!(
121            parent: None,
122            "handler",
123            otel.name=Empty,
124            otel.kind=Empty,
125            otel.status_code=Empty,
126            otel.status_message=Empty,
127            requestId=%event.context.request_id,
128        );
129
130        // Set the span name and default kind
131        span.record("otel.name", name.to_string());
132        span.record("otel.kind", "SERVER");
133
134        // Set common Lambda attributes with cold start tracking
135        let is_cold = IS_COLD_START.swap(false, Ordering::Relaxed);
136        set_common_attributes(&span, &event.context, is_cold);
137
138        // Extract attributes directly using the trait
139        let attrs = event.payload.extract_span_attributes();
140
141        // Apply extracted attributes
142        if let Some(span_name) = attrs.span_name {
143            span.record("otel.name", span_name);
144        }
145
146        if let Some(kind) = &attrs.kind {
147            span.record("otel.kind", kind.to_string());
148        }
149
150        // Set custom attributes
151        for (key, value) in &attrs.attributes {
152            span.set_attribute(key.to_string(), value.to_string());
153        }
154
155        // Add span links
156        for link in attrs.links {
157            span.add_link_with_attributes(link.span_context, link.attributes);
158        }
159
160        // Propagate context from headers
161        if let Some(carrier) = attrs.carrier {
162            let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| {
163                propagator.extract(&carrier)
164            });
165            span.set_parent(parent_context);
166        }
167
168        // Set trigger type
169        span.set_attribute("faas.trigger", attrs.trigger.to_string());
170
171        // Run the handler with the span
172        let result = handler_fn(event).instrument(span.clone()).await;
173
174        // Set response attributes if successful
175        if let Ok(response) = &result {
176            if let Ok(value) = serde_json::to_value(response) {
177                set_response_attributes(&span, &value);
178            }
179        } else if let Err(error) = &result {
180            // Set error status according to OpenTelemetry spec
181            span.set_status(opentelemetry::trace::Status::error(error.to_string()));
182        }
183
184        result
185    };
186
187    // Signal completion
188    completion_handler.complete();
189    result
190}
191
192/// Creates a traced handler function that can be used directly with `service_fn`.
193///
194/// This is a convenience wrapper around `traced_handler` that returns a function suitable
195/// for direct use with the Lambda runtime. It provides a more ergonomic interface by
196/// allowing handler creation to be separated from usage.
197///
198/// # Type Parameters
199///
200/// * `T` - The event payload type that must be deserializable and serializable
201/// * `R` - The response type that must be serializable
202/// * `F` - The handler function type, must be `Clone` to allow reuse across invocations
203/// * `Fut` - The future returned by the handler function
204///
205/// # Handler Requirements
206///
207/// The handler function must implement `Clone`. This is automatically satisfied by:
208/// - Regular functions (e.g., `fn(LambdaEvent<T>) -> Future<...>`)
209/// - Closures that capture only `Clone` types
210///
211/// For example:
212/// ```ignore
213/// // Regular function - automatically implements Clone
214/// async fn my_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
215///     Ok(serde_json::json!({}))
216/// }
217///
218/// // Closure capturing Clone types - implements Clone
219/// let prefix = "my-prefix".to_string();
220/// let handler = |event: LambdaEvent<Value>| async move {
221///     let prefix = prefix.clone();
222///     Ok::<Value, Error>(serde_json::json!({ "prefix": prefix }))
223/// };
224/// ```
225///
226/// # Arguments
227///
228/// * `name` - Name of the handler/span
229/// * `completion_handler` - Handler for managing span export
230/// * `handler_fn` - The actual Lambda handler function to wrap
231///
232/// # Returns
233///
234/// Returns a boxed function that can be used directly with `service_fn`
235///
236/// # Examples
237///
238/// ```rust
239/// use lambda_runtime::{Error, LambdaEvent};
240/// use serde_json::Value;
241/// use lambda_otel_lite::{init_telemetry, create_traced_handler, TelemetryConfig};
242///
243/// async fn my_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
244///     let prefix = event.payload.get("prefix").and_then(|p| p.as_str()).unwrap_or("default");
245///     Ok::<Value, Error>(serde_json::json!({ "prefix": prefix }))
246/// }
247///
248/// # #[tokio::main]
249/// # async fn main() -> Result<(), Error> {
250///     let (_, completion_handler) = init_telemetry(TelemetryConfig::default()).await?;
251///     let handler = create_traced_handler(
252///         "my-handler",
253///         completion_handler,
254///         my_handler
255///     );
256///     // ... use handler with Runtime ...
257/// #   Ok(())
258/// # }
259/// ```
260pub fn create_traced_handler<T, R, F, Fut>(
261    name: &'static str,
262    completion_handler: TelemetryCompletionHandler,
263    handler_fn: F,
264) -> TracedHandler<T, R>
265where
266    T: SpanAttributesExtractor + DeserializeOwned + Serialize + Send + 'static,
267    R: Serialize + Send + 'static,
268    F: Fn(LambdaEvent<T>) -> Fut + Send + Sync + Clone + 'static,
269    Fut: Future<Output = Result<R, Error>> + Send + 'static,
270{
271    Box::new(move |event: LambdaEvent<T>| {
272        let completion_handler = completion_handler.clone();
273        let handler_fn = handler_fn.clone();
274        Box::pin(traced_handler(name, event, completion_handler, handler_fn))
275    })
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281    use crate::mode::ProcessorMode;
282    use futures_util::future::BoxFuture;
283    use lambda_runtime::Context;
284    use opentelemetry::trace::Status;
285    use opentelemetry::trace::TracerProvider as _;
286    use opentelemetry_sdk::{
287        trace::{SdkTracerProvider, SpanData, SpanExporter},
288        Resource,
289    };
290    use serde_json::Value;
291    use serial_test::serial;
292    use std::sync::{
293        atomic::{AtomicUsize, Ordering},
294        Arc, Mutex,
295    };
296    use std::time::Duration;
297    use tracing_subscriber::prelude::*;
298
299    // Test infrastructure
300    #[derive(Debug, Default, Clone)]
301    struct TestExporter {
302        export_count: Arc<AtomicUsize>,
303        spans: Arc<Mutex<Vec<SpanData>>>,
304    }
305
306    impl TestExporter {
307        fn new() -> Self {
308            Self {
309                export_count: Arc::new(AtomicUsize::new(0)),
310                spans: Arc::new(Mutex::new(Vec::new())),
311            }
312        }
313
314        fn get_spans(&self) -> Vec<SpanData> {
315            self.spans.lock().unwrap().clone()
316        }
317
318        fn find_attribute(span: &SpanData, key: &str) -> Option<String> {
319            span.attributes
320                .iter()
321                .find(|kv| kv.key.as_str() == key)
322                .map(|kv| kv.value.to_string())
323        }
324    }
325
326    impl SpanExporter for TestExporter {
327        fn export(
328            &mut self,
329            spans: Vec<SpanData>,
330        ) -> BoxFuture<'static, opentelemetry_sdk::error::OTelSdkResult> {
331            self.export_count.fetch_add(spans.len(), Ordering::SeqCst);
332            self.spans.lock().unwrap().extend(spans);
333            Box::pin(futures_util::future::ready(Ok(())))
334        }
335    }
336
337    fn setup_test_provider() -> (
338        Arc<SdkTracerProvider>,
339        Arc<TestExporter>,
340        tracing::dispatcher::DefaultGuard,
341    ) {
342        let exporter = Arc::new(TestExporter::new());
343        let provider = SdkTracerProvider::builder()
344            .with_simple_exporter(exporter.as_ref().clone())
345            .with_resource(Resource::builder().build())
346            .build();
347        let subscriber = tracing_subscriber::registry::Registry::default()
348            .with(tracing_opentelemetry::OpenTelemetryLayer::new(
349                provider.tracer("test"),
350            ))
351            .set_default();
352        (Arc::new(provider), exporter, subscriber)
353    }
354
    /// Suspends the calling test for `duration` using Tokio's timer.
    ///
    /// NOTE(review): presumably this gives the exporter time to receive spans before
    /// the assertions run — confirm whether it is still required in `Sync` mode.
    async fn wait_for_spans(duration: Duration) {
        tokio::time::sleep(duration).await;
    }
358
359    // Basic functionality tests
360    #[tokio::test]
361    #[serial]
362    async fn test_successful_response() -> Result<(), Error> {
363        let (provider, exporter, _guard) = setup_test_provider();
364        let completion_handler =
365            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
366
367        async fn handler(_: LambdaEvent<Value>) -> Result<Value, Error> {
368            Ok(serde_json::json!({ "statusCode": 200, "body": "Success" }))
369        }
370
371        let traced_handler = create_traced_handler("test-handler", completion_handler, handler);
372        let event = LambdaEvent::new(serde_json::json!({}), Context::default());
373
374        let result = traced_handler(event).await?;
375
376        wait_for_spans(Duration::from_millis(100)).await;
377
378        let spans = exporter.get_spans();
379        assert!(!spans.is_empty(), "No spans were exported");
380
381        let span = &spans[0];
382        assert_eq!(span.name, "test-handler", "Unexpected span name");
383        assert_eq!(
384            TestExporter::find_attribute(span, "http.status_code"),
385            Some("200".to_string())
386        );
387        assert_eq!(result["statusCode"], 200);
388
389        Ok(())
390    }
391
392    #[tokio::test]
393    #[serial]
394    async fn test_error_response() -> Result<(), Error> {
395        let (provider, exporter, _guard) = setup_test_provider();
396        let completion_handler =
397            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
398
399        async fn handler(_: LambdaEvent<Value>) -> Result<Value, Error> {
400            Ok(serde_json::json!({
401                "statusCode": 500,
402                "body": "Internal Server Error"
403            }))
404        }
405
406        let traced_handler = create_traced_handler("test-handler", completion_handler, handler);
407        let event = LambdaEvent::new(serde_json::json!({}), Context::default());
408
409        let result = traced_handler(event).await?;
410
411        wait_for_spans(Duration::from_millis(100)).await;
412
413        let spans = exporter.get_spans();
414        assert!(!spans.is_empty(), "No spans were exported");
415
416        let span = &spans[0];
417        assert_eq!(span.name, "test-handler", "Unexpected span name");
418        assert_eq!(
419            TestExporter::find_attribute(span, "http.status_code"),
420            Some("500".to_string())
421        );
422        assert!(matches!(span.status, Status::Error { .. }));
423        assert_eq!(result["statusCode"], 500);
424
425        Ok(())
426    }
427
428    #[tokio::test]
429    #[serial]
430    async fn test_handler_reuse() -> Result<(), Error> {
431        let (provider, exporter, _guard) = setup_test_provider();
432        let completion_handler =
433            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
434
435        async fn handler(_: LambdaEvent<Value>) -> Result<Value, Error> {
436            Ok(serde_json::json!({ "status": "ok" }))
437        }
438
439        let traced_handler = create_traced_handler("test-handler", completion_handler, handler);
440
441        // Call the handler multiple times to verify it can be reused
442        for _ in 0..3 {
443            let event = LambdaEvent::new(serde_json::json!({}), Context::default());
444            let _ = traced_handler(event).await?;
445        }
446
447        wait_for_spans(Duration::from_millis(100)).await;
448
449        let spans = exporter.get_spans();
450        assert_eq!(spans.len(), 3, "Expected exactly 3 spans");
451
452        // Verify all spans have the correct name
453        for span in spans {
454            assert_eq!(span.name, "test-handler", "Unexpected span name");
455        }
456
457        Ok(())
458    }
459
460    #[tokio::test]
461    #[serial]
462    async fn test_handler_with_closure() -> Result<(), Error> {
463        let (provider, exporter, _guard) = setup_test_provider();
464        let completion_handler =
465            TelemetryCompletionHandler::new(provider, None, ProcessorMode::Sync);
466
467        let prefix = "test-prefix".to_string();
468        let handler = move |_event: LambdaEvent<Value>| {
469            let prefix = prefix.clone();
470            async move {
471                Ok(serde_json::json!({
472                    "statusCode": 200,
473                    "prefix": prefix
474                }))
475            }
476        };
477
478        let traced_handler = create_traced_handler("test-handler", completion_handler, handler);
479        let event = LambdaEvent::new(serde_json::json!({}), Context::default());
480
481        let result = traced_handler(event).await?;
482
483        wait_for_spans(Duration::from_millis(100)).await;
484
485        let spans = exporter.get_spans();
486        assert!(!spans.is_empty(), "No spans were exported");
487
488        assert_eq!(result["prefix"], "test-prefix");
489        assert_eq!(spans[0].name, "test-handler", "Unexpected span name");
490
491        Ok(())
492    }
493}