Skip to main content

lambda_otel_lite/
extension.rs

1//! Lambda Extension for OpenTelemetry span processing.
2//!
3//! This module provides an internal Lambda extension that manages the lifecycle of OpenTelemetry spans.
4//! The extension integrates with AWS Lambda's Extensions API to efficiently manage telemetry data
5//! collection and export.
6//!
7//! # Architecture
8//!
9//! The extension operates as a background task within the Lambda function process and communicates
10//! with both the Lambda Runtime API and the function handler through asynchronous channels:
11//!
12//! 1. **Extension Registration**: On startup, the extension task registers with the Lambda Extensions API
13//!    and subscribes to the appropriate events based on the processing mode.
14//!
15//! 2. **Handler Communication**: The extension uses a channel-based communication pattern to
16//!    coordinate with the function handler for span export timing.
17//!
18//! 3. **Processing Modes**:
19//!    - `Async`: Registers for INVOKE events and exports spans after handler completion
20//!      - Spans are queued during handler execution
21//!      - Export occurs after response is sent to user
22//!      - Best for high-volume telemetry
23//!    - `Finalize`: Registers with no events
24//!      - Only installs SIGTERM handler
25//!      - Lets application code control span export
26//!      - Compatible with BatchSpanProcessor
27//!
28//! 4. **Graceful Shutdown**: The extension implements proper shutdown handling to ensure
29//!    no telemetry data is lost when the Lambda environment is terminated.
30//!
31//! # Error Handling
32//!
33//! The extension implements robust error handling:
34//! - Logs all export errors without failing the function
35//! - Implements graceful shutdown on SIGTERM
36//! - Handles channel communication failures
37
38use crate::logger::Logger;
39use crate::ProcessorMode;
40use lambda_extension::{service_fn, Error, Extension, NextEvent};
41use opentelemetry_sdk::trace::SdkTracerProvider;
42use std::sync::Arc;
43use tokio::{
44    signal::unix::{signal, SignalKind},
45    sync::{
46        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
47        Mutex,
48    },
49};
50
/// Logger shared by all code in this module, constructed with the label `"extension"`
/// (presumably used to tag its output — confirm in `crate::logger`).
static LOGGER: Logger = Logger::const_new("extension");
52
53/// Extension that flushes OpenTelemetry spans after each Lambda invocation.
54///
55/// This extension is responsible for:
56/// - Receiving completion signals from the handler
57/// - Flushing spans at the appropriate time
58/// - Managing the lifecycle of the tracer provider
59///
60/// # Thread Safety
61///
62/// The extension is designed to be thread-safe:
63/// - Uses `Arc` for shared ownership of the tracer provider
64/// - Implements proper synchronization through `Mutex` for the channel receiver
65/// - Safely handles concurrent access from multiple tasks
66///
67/// # Performance Characteristics
68///
69/// The extension is optimized for Lambda environments:
70/// - Minimizes memory usage through efficient buffering
71/// - Uses non-blocking channel communication
72/// - Implements backpressure handling to prevent memory exhaustion
73///
74/// # Error Handling
75///
76/// The extension handles various error scenarios:
77/// - Channel closed: Logs error and continues processing
78/// - Export failures: Logs errors without failing the function
79/// - Shutdown signals: Ensures final flush of spans
80///
81/// The extension operates asynchronously to minimize impact on handler latency.
82/// It uses a channel-based communication pattern to coordinate with the handler.
83pub struct OtelInternalExtension {
84    /// Channel receiver to know when the handler is done
85    request_done_receiver: Mutex<UnboundedReceiver<()>>,
86    /// Reference to the tracer provider for flushing spans
87    tracer_provider: Arc<SdkTracerProvider>,
88}
89
90impl OtelInternalExtension {
91    /// Creates a new OtelInternalExtension.
92    ///
93    /// # Arguments
94    ///
95    /// * `request_done_receiver` - Channel receiver for completion signals
96    /// * `tracer_provider` - TracerProvider for span management
97    pub fn new(
98        request_done_receiver: UnboundedReceiver<()>,
99        tracer_provider: Arc<SdkTracerProvider>,
100    ) -> Self {
101        Self {
102            request_done_receiver: Mutex::new(request_done_receiver),
103            tracer_provider,
104        }
105    }
106
107    /// Handles extension events and flushes telemetry after each invocation.
108    ///
109    /// This method implements the core event handling logic for the extension.
110    /// It coordinates with the Lambda function handler to ensure spans are
111    /// exported at the appropriate time.
112    ///
113    /// # Operation Flow
114    ///
115    /// 1. **Event Reception**:
116    ///    - Receives Lambda extension events
117    ///    - Filters for INVOKE events
118    ///    - Ignores other event types
119    ///
120    /// 2. **Handler Coordination**:
121    ///    - Waits for handler completion signal
122    ///    - Uses async channel communication
123    ///    - Handles channel closure gracefully
124    ///
125    /// 3. **Span Export**:
126    ///    - Forces flush of all pending spans
127    ///    - Handles export errors without failing
128    ///    - Logs any export failures
129    ///
130    /// # Error Handling
131    ///
132    /// The method handles several failure scenarios:
133    ///
134    /// - **Channel Errors**:
135    ///    - Channel closure: Returns error with descriptive message
136    ///    - Send/receive failures: Properly propagated
137    ///
138    /// - **Export Errors**:
139    ///    - Individual span export failures are logged
140    ///    - Continues processing despite errors
141    ///    - Maintains extension stability
142    ///
143    /// # Performance
144    ///
145    /// The method is optimized for Lambda environments:
146    /// - Uses async/await for efficient execution
147    /// - Minimizes blocking operations
148    /// - Implements proper error recovery
149    ///
150    /// # Arguments
151    ///
152    /// * `event` - The Lambda extension event to handle
153    ///
154    /// # Returns
155    ///
156    /// Returns `Ok(())` if the event was processed successfully, or an `Error`
157    /// if something went wrong during processing. Note that export errors are
158    /// logged but do not cause the method to return an error.
159    pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
160        if let NextEvent::Invoke(_e) = event.next {
161            // Wait for runtime to finish processing event
162            self.request_done_receiver
163                .lock()
164                .await
165                .recv()
166                .await
167                .ok_or_else(|| Error::from("channel closed"))?;
168            // Force flush all spans and handle any errors
169            if let Err(err) = self.tracer_provider.force_flush() {
170                LOGGER.error(format!(
171                    "OtelInternalExtension.invoke.Error: Error flushing tracer provider: {err:?}"
172                ));
173            }
174        }
175
176        Ok(())
177    }
178}
179
180/// Register an internal extension for handling OpenTelemetry span processing.
181///
182/// # Warning
183///
184/// This is an internal API used by [`init_telemetry`](crate::init_telemetry) and should not be called directly.
185/// Use [`init_telemetry`](crate::init_telemetry) instead to set up telemetry for your Lambda function.
186///
187/// # Internal Details
188///
189/// The extension registration process:
190/// 1. Creates communication channels for handler-extension coordination
191/// 2. Registers with Lambda Extensions API based on processor mode
192/// 3. Sets up signal handlers for graceful shutdown
193/// 4. Manages span export timing based on processor mode
194///
195/// # Arguments
196///
197/// * `tracer_provider` - The TracerProvider to use for span management
198/// * `processor_mode` - The mode determining how spans are processed
199///
200/// # Returns
201///
202/// Returns a channel sender for signaling completion, or an Error if registration fails.
203pub(crate) async fn register_extension(
204    tracer_provider: Arc<SdkTracerProvider>,
205    processor_mode: ProcessorMode,
206) -> Result<UnboundedSender<()>, Error> {
207    LOGGER.debug("OtelInternalExtension.register_extension: starting registration");
208    let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();
209
210    let extension = Arc::new(OtelInternalExtension::new(
211        request_done_receiver,
212        tracer_provider.clone(),
213    ));
214
215    // Register and start the extension
216    let mut ext = Extension::new();
217
218    // Only register for INVOKE events in async mode
219    if matches!(processor_mode, ProcessorMode::Async) {
220        ext = ext.with_events(&["INVOKE"]);
221    } else {
222        ext = ext.with_events(&[]);
223    }
224
225    let registered_extension = ext
226        .with_events_processor(service_fn(move |event| {
227            let extension = extension.clone();
228            async move { extension.invoke(event).await }
229        }))
230        .with_extension_name("otel-internal")
231        .register()
232        .await?;
233
234    // Run the extension in the background
235    tokio::spawn(async move {
236        if let Err(err) = registered_extension.run().await {
237            LOGGER.error(format!(
238                "OtelInternalExtension.run.Error: Error running extension: {err:?}"
239            ));
240        }
241    });
242
243    // Set up signal handler for graceful shutdown
244    tokio::spawn(async move {
245        let mut sigterm = signal(SignalKind::terminate()).unwrap();
246
247        if sigterm.recv().await.is_some() {
248            LOGGER.debug("OtelInternalExtension.SIGTERM: SIGTERM received, flushing spans");
249            // Direct synchronous flush
250            if let Err(err) = tracer_provider.force_flush() {
251                LOGGER.error(format!(
252                    "OtelInternalExtension.SIGTERM.Error: Error during shutdown: {err:?}"
253                ));
254            }
255            LOGGER.debug("OtelInternalExtension.SIGTERM: Shutdown complete");
256            std::process::exit(0);
257        }
258    });
259
260    Ok(request_done_sender)
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use lambda_extension::{InvokeEvent, LambdaEvent, ShutdownEvent};
    use opentelemetry::trace::{Tracer, TracerProvider as _};
    use opentelemetry::Context;
    use opentelemetry_sdk::{
        error::{OTelSdkError, OTelSdkResult},
        trace::{SdkTracerProvider, Span, SpanData, SpanExporter, SpanProcessor},
        Resource,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    };
    use std::time::Duration;

    /// Test exporter that records every exported span and counts them.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        /// Total number of spans handed to `export` so far.
        export_count: Arc<AtomicUsize>,
        /// Every span handed to `export`, in arrival order.
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        /// Creates an exporter with no recorded spans.
        fn new() -> Self {
            Self::default()
        }

        /// Returns a copy of all spans exported so far.
        fn get_spans(&self) -> Vec<SpanData> {
            self.spans.lock().unwrap().clone()
        }
    }

    impl SpanExporter for TestExporter {
        fn export(
            &self,
            spans: Vec<SpanData>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            // Record synchronously, then hand back an already-completed future.
            self.export_count.fetch_add(spans.len(), Ordering::SeqCst);
            self.spans.lock().unwrap().extend(spans);
            futures_util::future::ready(Ok(()))
        }
    }

    /// Builds a provider that exports each span as soon as it ends.
    fn setup_test_provider() -> (Arc<SdkTracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .with_resource(Resource::builder_empty().build())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Builds a provider backed by a batch exporter, so ended spans stay pending
    /// until they are flushed.
    fn setup_batch_test_provider() -> (Arc<SdkTracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = SdkTracerProvider::builder()
            .with_batch_exporter(exporter.clone())
            .with_resource(Resource::builder_empty().build())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Builds the INVOKE event shared by the tests below.
    fn invoke_event() -> LambdaEvent {
        LambdaEvent {
            next: NextEvent::Invoke(InvokeEvent {
                deadline_ms: 1000,
                request_id: "test-id".to_string(),
                invoked_function_arn: "test-arn".to_string(),
                tracing: Default::default(),
            }),
        }
    }

    /// Span processor whose `force_flush` always fails.
    #[derive(Debug)]
    struct FailingSpanProcessor;

    impl SpanProcessor for FailingSpanProcessor {
        fn on_start(&self, _span: &mut Span, _cx: &Context) {}

        fn on_end(&self, _span: SpanData) {}

        fn force_flush(&self) -> OTelSdkResult {
            Err(OTelSdkError::InternalFailure("force flush failed".into()))
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_extension_invoke_handling() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Spawn task to handle the event
        let handle = tokio::spawn(async move { extension.invoke(event).await });

        // Send completion signal
        sender.send(()).unwrap();

        // Wait for handler to complete
        let result = handle.await.unwrap();
        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_channel_closed() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Drop sender to close channel
        drop(sender);

        // Invoke should return error when channel is closed
        let result = extension.invoke(event).await;
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_ignores_shutdown_events() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        // Keep the sender alive so the channel stays open for the whole test.
        let (_sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);

        let event = LambdaEvent {
            next: NextEvent::Shutdown(ShutdownEvent {
                shutdown_reason: "SPINDOWN".to_string(),
                deadline_ms: 1000,
            }),
        };

        let result = extension.invoke(event).await;
        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_force_flush_exports_pending_spans() -> Result<(), Error> {
        let (provider, exporter) = setup_batch_test_provider();
        let tracer = provider.tracer("extension-test");
        let (sender, receiver) = unbounded_channel();

        // Start and immediately end a span so it is queued in the batch processor.
        {
            let span = tracer.start("pending-span");
            drop(span);
        }

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        let handle = tokio::spawn(async move { extension.invoke(event).await });
        sender.send(()).unwrap();

        let result = handle.await.unwrap();
        assert!(result.is_ok());
        assert_eq!(exporter.export_count.load(Ordering::SeqCst), 1);
        assert_eq!(exporter.get_spans().len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_invoke_returns_ok_when_force_flush_fails() -> Result<(), Error> {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(FailingSpanProcessor)
                .with_resource(Resource::builder_empty().build())
                .build(),
        );
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        let handle = tokio::spawn(async move { extension.invoke(event).await });
        sender.send(()).unwrap();

        // A flush failure is only logged; invoke still reports success.
        let result = handle.await.unwrap();
        assert!(result.is_ok());

        Ok(())
    }
}