lambda_otel_lite/
extension.rs

1//! Lambda Extension for OpenTelemetry span processing.
2//!
3//! This module provides an internal Lambda extension that manages the lifecycle of OpenTelemetry spans.
4//! The extension integrates with AWS Lambda's Extensions API to efficiently manage telemetry data
5//! collection and export.
6//!
7//! # Architecture
8//!
//! The extension operates as a background task within the Lambda function process. It talks to
//! the Lambda Extensions API for registration and lifecycle events, and to the function handler
//! through an asynchronous channel:
11//!
12//! 1. **Extension Registration**: On startup, the extension task registers with the Lambda Extensions API
13//!    and subscribes to the appropriate events based on the processing mode.
14//!
15//! 2. **Handler Communication**: The extension uses a channel-based communication pattern to
16//!    coordinate with the function handler for span export timing.
17//!
18//! 3. **Processing Modes**:
19//!    - `Async`: Registers for INVOKE events and exports spans after handler completion
20//!      - Spans are queued during handler execution
21//!      - Export occurs after response is sent to user
22//!      - Best for high-volume telemetry
//!    - `Finalize`: Registers with no events
//!      - Performs no per-invocation flush; only the SIGTERM handler (installed in every
//!        mode) flushes spans
//!      - Lets application code control span export
//!      - Compatible with BatchSpanProcessor
27//!
//! 4. **Graceful Shutdown**: On SIGTERM the extension force-flushes the tracer provider before
//!    exiting, so that spans still buffered get a chance to be exported before the Lambda
//!    environment is terminated.
30//!
31//! # Error Handling
32//!
//! The extension handles errors as follows:
//! - Logs span flush (export) errors without failing the function
//! - Flushes spans and exits the process on SIGTERM
//! - Returns an error from the event processor if the handler's completion channel is closed
37
38use crate::logger::Logger;
39use crate::ProcessorMode;
40use lambda_extension::{service_fn, Error, Extension, NextEvent};
41use opentelemetry_sdk::trace::SdkTracerProvider;
42use std::sync::Arc;
43use tokio::{
44    signal::unix::{signal, SignalKind},
45    sync::{
46        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
47        Mutex,
48    },
49};
50
/// Logger used by everything in this module, constructed with the name `"extension"`.
static LOGGER: Logger = Logger::const_new("extension");
52
53/// Extension that flushes OpenTelemetry spans after each Lambda invocation.
54///
55/// This extension is responsible for:
56/// - Receiving completion signals from the handler
57/// - Flushing spans at the appropriate time
58/// - Managing the lifecycle of the tracer provider
59///
60/// # Thread Safety
61///
62/// The extension is designed to be thread-safe:
63/// - Uses `Arc` for shared ownership of the tracer provider
64/// - Implements proper synchronization through `Mutex` for the channel receiver
65/// - Safely handles concurrent access from multiple tasks
66///
67/// # Performance Characteristics
68///
69/// The extension is optimized for Lambda environments:
70/// - Minimizes memory usage through efficient buffering
71/// - Uses non-blocking channel communication
72/// - Implements backpressure handling to prevent memory exhaustion
73///
74/// # Error Handling
75///
76/// The extension handles various error scenarios:
77/// - Channel closed: Logs error and continues processing
78/// - Export failures: Logs errors without failing the function
79/// - Shutdown signals: Ensures final flush of spans
80///
81/// The extension operates asynchronously to minimize impact on handler latency.
82/// It uses a channel-based communication pattern to coordinate with the handler.
83pub struct OtelInternalExtension {
84    /// Channel receiver to know when the handler is done
85    request_done_receiver: Mutex<UnboundedReceiver<()>>,
86    /// Reference to the tracer provider for flushing spans
87    tracer_provider: Arc<SdkTracerProvider>,
88}
89
90impl OtelInternalExtension {
91    /// Creates a new OtelInternalExtension.
92    ///
93    /// # Arguments
94    ///
95    /// * `request_done_receiver` - Channel receiver for completion signals
96    /// * `tracer_provider` - TracerProvider for span management
97    pub fn new(
98        request_done_receiver: UnboundedReceiver<()>,
99        tracer_provider: Arc<SdkTracerProvider>,
100    ) -> Self {
101        Self {
102            request_done_receiver: Mutex::new(request_done_receiver),
103            tracer_provider,
104        }
105    }
106
107    /// Handles extension events and flushes telemetry after each invocation.
108    ///
109    /// This method implements the core event handling logic for the extension.
110    /// It coordinates with the Lambda function handler to ensure spans are
111    /// exported at the appropriate time.
112    ///
113    /// # Operation Flow
114    ///
115    /// 1. **Event Reception**:
116    ///    - Receives Lambda extension events
117    ///    - Filters for INVOKE events
118    ///    - Ignores other event types
119    ///
120    /// 2. **Handler Coordination**:
121    ///    - Waits for handler completion signal
122    ///    - Uses async channel communication
123    ///    - Handles channel closure gracefully
124    ///
125    /// 3. **Span Export**:
126    ///    - Forces flush of all pending spans
127    ///    - Handles export errors without failing
128    ///    - Logs any export failures
129    ///
130    /// # Error Handling
131    ///
132    /// The method handles several failure scenarios:
133    ///
134    /// - **Channel Errors**:
135    ///    - Channel closure: Returns error with descriptive message
136    ///    - Send/receive failures: Properly propagated
137    ///
138    /// - **Export Errors**:
139    ///    - Individual span export failures are logged
140    ///    - Continues processing despite errors
141    ///    - Maintains extension stability
142    ///
143    /// # Performance
144    ///
145    /// The method is optimized for Lambda environments:
146    /// - Uses async/await for efficient execution
147    /// - Minimizes blocking operations
148    /// - Implements proper error recovery
149    ///
150    /// # Arguments
151    ///
152    /// * `event` - The Lambda extension event to handle
153    ///
154    /// # Returns
155    ///
156    /// Returns `Ok(())` if the event was processed successfully, or an `Error`
157    /// if something went wrong during processing. Note that export errors are
158    /// logged but do not cause the method to return an error.
159    pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
160        if let NextEvent::Invoke(_e) = event.next {
161            // Wait for runtime to finish processing event
162            self.request_done_receiver
163                .lock()
164                .await
165                .recv()
166                .await
167                .ok_or_else(|| Error::from("channel closed"))?;
168            // Force flush all spans and handle any errors
169            if let Err(err) = self.tracer_provider.force_flush() {
170                LOGGER.error(format!(
171                    "OtelInternalExtension.invoke.Error: Error flushing tracer provider: {err:?}"
172                ));
173            }
174        }
175
176        Ok(())
177    }
178}
179
180/// Register an internal extension for handling OpenTelemetry span processing.
181///
182/// # Warning
183///
184/// This is an internal API used by [`init_telemetry`](crate::init_telemetry) and should not be called directly.
185/// Use [`init_telemetry`](crate::init_telemetry) instead to set up telemetry for your Lambda function.
186///
187/// # Internal Details
188///
189/// The extension registration process:
190/// 1. Creates communication channels for handler-extension coordination
191/// 2. Registers with Lambda Extensions API based on processor mode
192/// 3. Sets up signal handlers for graceful shutdown
193/// 4. Manages span export timing based on processor mode
194///
195/// # Arguments
196///
197/// * `tracer_provider` - The TracerProvider to use for span management
198/// * `processor_mode` - The mode determining how spans are processed
199///
200/// # Returns
201///
202/// Returns a channel sender for signaling completion, or an Error if registration fails.
203pub(crate) async fn register_extension(
204    tracer_provider: Arc<SdkTracerProvider>,
205    processor_mode: ProcessorMode,
206) -> Result<UnboundedSender<()>, Error> {
207    LOGGER.debug("OtelInternalExtension.register_extension: starting registration");
208    let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();
209
210    let extension = Arc::new(OtelInternalExtension::new(
211        request_done_receiver,
212        tracer_provider.clone(),
213    ));
214
215    // Register and start the extension
216    let mut ext = Extension::new();
217
218    // Only register for INVOKE events in async mode
219    if matches!(processor_mode, ProcessorMode::Async) {
220        ext = ext.with_events(&["INVOKE"]);
221    } else {
222        ext = ext.with_events(&[]);
223    }
224
225    let registered_extension = ext
226        .with_events_processor(service_fn(move |event| {
227            let extension = extension.clone();
228            async move { extension.invoke(event).await }
229        }))
230        .with_extension_name("otel-internal")
231        .register()
232        .await?;
233
234    // Run the extension in the background
235    tokio::spawn(async move {
236        if let Err(err) = registered_extension.run().await {
237            LOGGER.error(format!(
238                "OtelInternalExtension.run.Error: Error running extension: {err:?}"
239            ));
240        }
241    });
242
243    // Set up signal handler for graceful shutdown
244    tokio::spawn(async move {
245        let mut sigterm = signal(SignalKind::terminate()).unwrap();
246
247        if sigterm.recv().await.is_some() {
248            LOGGER.debug("OtelInternalExtension.SIGTERM: SIGTERM received, flushing spans");
249            // Direct synchronous flush
250            if let Err(err) = tracer_provider.force_flush() {
251                LOGGER.error(format!(
252                    "OtelInternalExtension.SIGTERM.Error: Error during shutdown: {err:?}"
253                ));
254            }
255            LOGGER.debug("OtelInternalExtension.SIGTERM: Shutdown complete");
256            std::process::exit(0);
257        }
258    });
259
260    Ok(request_done_sender)
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use lambda_extension::{InvokeEvent, LambdaEvent, ShutdownEvent};
    use opentelemetry_sdk::{
        trace::{SdkTracerProvider, SpanData, SpanExporter},
        Resource,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    };

    /// Span exporter that counts and stores every span it receives.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        export_count: Arc<AtomicUsize>,
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        fn new() -> Self {
            Self {
                export_count: Arc::new(AtomicUsize::new(0)),
                spans: Arc::new(Mutex::new(Vec::new())),
            }
        }

        // Helper for tests that inspect exported spans; not every test uses it.
        #[allow(dead_code)]
        fn get_spans(&self) -> Vec<SpanData> {
            self.spans.lock().unwrap().clone()
        }
    }

    impl SpanExporter for TestExporter {
        fn export(
            &self,
            spans: Vec<SpanData>,
        ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
        {
            self.export_count.fetch_add(spans.len(), Ordering::SeqCst);
            self.spans.lock().unwrap().extend(spans);
            futures_util::future::ready(Ok(()))
        }
    }

    fn setup_test_provider() -> (Arc<SdkTracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .with_resource(Resource::builder_empty().build())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Builds an INVOKE event with fixed test values.
    fn invoke_event() -> LambdaEvent {
        LambdaEvent {
            next: NextEvent::Invoke(InvokeEvent {
                deadline_ms: 1000,
                request_id: "test-id".to_string(),
                invoked_function_arn: "test-arn".to_string(),
                tracing: Default::default(),
            }),
        }
    }

    #[tokio::test]
    async fn test_extension_invoke_handling() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Spawn task to handle the event
        let handle = tokio::spawn(async move { extension.invoke(event).await });

        // Send completion signal
        sender.send(()).unwrap();

        // Wait for handler to complete
        let result = handle.await.unwrap();
        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_channel_closed() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Drop sender to close channel
        drop(sender);

        // Invoke should return error when channel is closed
        let result = extension.invoke(event).await;
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_ignores_non_invoke_events() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);

        // Queue a completion signal; a non-INVOKE event must neither wait for nor consume it.
        sender.send(()).unwrap();
        let event = LambdaEvent {
            next: NextEvent::Shutdown(ShutdownEvent {
                shutdown_reason: "spindown".to_string(),
                deadline_ms: 1000,
            }),
        };

        extension.invoke(event).await?;
        assert!(extension
            .request_done_receiver
            .lock()
            .await
            .try_recv()
            .is_ok());

        Ok(())
    }
}