lambda_otel_lite/
extension.rs

1//! Lambda Extension for OpenTelemetry span processing.
2//!
3//! This module provides an internal Lambda extension that manages the lifecycle of OpenTelemetry spans.
4//! The extension integrates with AWS Lambda's Extensions API to efficiently manage telemetry data
5//! collection and export.
6//!
7//! # Architecture
8//!
9//! The extension operates as a background task within the Lambda function process and communicates
10//! with both the Lambda Runtime API and the function handler through asynchronous channels:
11//!
12//! 1. **Extension Registration**: On startup, the extension task registers with the Lambda Extensions API
13//!    and subscribes to the appropriate events based on the processing mode.
14//!
15//! 2. **Handler Communication**: The extension uses a channel-based communication pattern to
16//!    coordinate with the function handler for span export timing.
17//!
18//! 3. **Processing Modes**:
19//!    - `Async`: Registers for INVOKE events and exports spans after handler completion
20//!      - Spans are queued during handler execution
21//!      - Export occurs after response is sent to user
22//!      - Best for high-volume telemetry
23//!    - `Finalize`: Registers with no events
24//!      - Only installs SIGTERM handler
25//!      - Lets application code control span export
26//!      - Compatible with BatchSpanProcessor
27//!
28//! 4. **Graceful Shutdown**: The extension implements proper shutdown handling to ensure
29//!    no telemetry data is lost when the Lambda environment is terminated.
30//!
31//! # Error Handling
32//!
33//! The extension implements robust error handling:
34//! - Logs all export errors without failing the function
35//! - Implements graceful shutdown on SIGTERM
36//! - Handles channel communication failures
37
38use crate::logger::Logger;
39use crate::ProcessorMode;
40use lambda_extension::{service_fn, Error, Extension, NextEvent};
41use opentelemetry_sdk::trace::SdkTracerProvider;
42use std::sync::Arc;
43use tokio::{
44    signal::unix::{signal, SignalKind},
45    sync::{
46        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
47        Mutex,
48    },
49};
50
51static LOGGER: Logger = Logger::const_new("extension");
52
53/// Extension that flushes OpenTelemetry spans after each Lambda invocation.
54///
55/// This extension is responsible for:
56/// - Receiving completion signals from the handler
57/// - Flushing spans at the appropriate time
58/// - Managing the lifecycle of the tracer provider
59///
60/// # Thread Safety
61///
62/// The extension is designed to be thread-safe:
63/// - Uses `Arc` for shared ownership of the tracer provider
64/// - Implements proper synchronization through `Mutex` for the channel receiver
65/// - Safely handles concurrent access from multiple tasks
66///
67/// # Performance Characteristics
68///
69/// The extension is optimized for Lambda environments:
70/// - Minimizes memory usage through efficient buffering
71/// - Uses non-blocking channel communication
72/// - Implements backpressure handling to prevent memory exhaustion
73///
74/// # Error Handling
75///
76/// The extension handles various error scenarios:
77/// - Channel closed: Logs error and continues processing
78/// - Export failures: Logs errors without failing the function
79/// - Shutdown signals: Ensures final flush of spans
80///
81/// The extension operates asynchronously to minimize impact on handler latency.
82/// It uses a channel-based communication pattern to coordinate with the handler.
83pub struct OtelInternalExtension {
84    /// Channel receiver to know when the handler is done
85    request_done_receiver: Mutex<UnboundedReceiver<()>>,
86    /// Reference to the tracer provider for flushing spans
87    tracer_provider: Arc<SdkTracerProvider>,
88}
89
90impl OtelInternalExtension {
91    /// Creates a new OtelInternalExtension.
92    ///
93    /// # Arguments
94    ///
95    /// * `request_done_receiver` - Channel receiver for completion signals
96    /// * `tracer_provider` - TracerProvider for span management
97    pub fn new(
98        request_done_receiver: UnboundedReceiver<()>,
99        tracer_provider: Arc<SdkTracerProvider>,
100    ) -> Self {
101        Self {
102            request_done_receiver: Mutex::new(request_done_receiver),
103            tracer_provider,
104        }
105    }
106
107    /// Handles extension events and flushes telemetry after each invocation.
108    ///
109    /// This method implements the core event handling logic for the extension.
110    /// It coordinates with the Lambda function handler to ensure spans are
111    /// exported at the appropriate time.
112    ///
113    /// # Operation Flow
114    ///
115    /// 1. **Event Reception**:
116    ///    - Receives Lambda extension events
117    ///    - Filters for INVOKE events
118    ///    - Ignores other event types
119    ///
120    /// 2. **Handler Coordination**:
121    ///    - Waits for handler completion signal
122    ///    - Uses async channel communication
123    ///    - Handles channel closure gracefully
124    ///
125    /// 3. **Span Export**:
126    ///    - Forces flush of all pending spans
127    ///    - Handles export errors without failing
128    ///    - Logs any export failures
129    ///
130    /// # Error Handling
131    ///
132    /// The method handles several failure scenarios:
133    ///
134    /// - **Channel Errors**:
135    ///    - Channel closure: Returns error with descriptive message
136    ///    - Send/receive failures: Properly propagated
137    ///
138    /// - **Export Errors**:
139    ///    - Individual span export failures are logged
140    ///    - Continues processing despite errors
141    ///    - Maintains extension stability
142    ///
143    /// # Performance
144    ///
145    /// The method is optimized for Lambda environments:
146    /// - Uses async/await for efficient execution
147    /// - Minimizes blocking operations
148    /// - Implements proper error recovery
149    ///
150    /// # Arguments
151    ///
152    /// * `event` - The Lambda extension event to handle
153    ///
154    /// # Returns
155    ///
156    /// Returns `Ok(())` if the event was processed successfully, or an `Error`
157    /// if something went wrong during processing. Note that export errors are
158    /// logged but do not cause the method to return an error.
159    pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
160        if let NextEvent::Invoke(_e) = event.next {
161            // Wait for runtime to finish processing event
162            self.request_done_receiver
163                .lock()
164                .await
165                .recv()
166                .await
167                .ok_or_else(|| Error::from("channel closed"))?;
168            // Force flush all spans and handle any errors
169            if let Err(err) = self.tracer_provider.force_flush() {
170                LOGGER.error(format!(
171                    "OtelInternalExtension.invoke.Error: Error flushing tracer provider: {:?}",
172                    err
173                ));
174            }
175        }
176
177        Ok(())
178    }
179}
180
181/// Register an internal extension for handling OpenTelemetry span processing.
182///
183/// # Warning
184///
185/// This is an internal API used by [`init_telemetry`](crate::init_telemetry) and should not be called directly.
186/// Use [`init_telemetry`](crate::init_telemetry) instead to set up telemetry for your Lambda function.
187///
188/// # Internal Details
189///
190/// The extension registration process:
191/// 1. Creates communication channels for handler-extension coordination
192/// 2. Registers with Lambda Extensions API based on processor mode
193/// 3. Sets up signal handlers for graceful shutdown
194/// 4. Manages span export timing based on processor mode
195///
196/// # Arguments
197///
198/// * `tracer_provider` - The TracerProvider to use for span management
199/// * `processor_mode` - The mode determining how spans are processed
200///
201/// # Returns
202///
203/// Returns a channel sender for signaling completion, or an Error if registration fails.
204pub(crate) async fn register_extension(
205    tracer_provider: Arc<SdkTracerProvider>,
206    processor_mode: ProcessorMode,
207) -> Result<UnboundedSender<()>, Error> {
208    LOGGER.debug("OtelInternalExtension.register_extension: starting registration");
209    let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();
210
211    let extension = Arc::new(OtelInternalExtension::new(
212        request_done_receiver,
213        tracer_provider.clone(),
214    ));
215
216    // Register and start the extension
217    let mut ext = Extension::new();
218
219    // Only register for INVOKE events in async mode
220    if matches!(processor_mode, ProcessorMode::Async) {
221        ext = ext.with_events(&["INVOKE"]);
222    } else {
223        ext = ext.with_events(&[]);
224    }
225
226    let registered_extension = ext
227        .with_events_processor(service_fn(move |event| {
228            let extension = extension.clone();
229            async move { extension.invoke(event).await }
230        }))
231        .with_extension_name("otel-internal")
232        .register()
233        .await?;
234
235    // Run the extension in the background
236    tokio::spawn(async move {
237        if let Err(err) = registered_extension.run().await {
238            LOGGER.error(format!(
239                "OtelInternalExtension.run.Error: Error running extension: {:?}",
240                err
241            ));
242        }
243    });
244
245    // Set up signal handler for graceful shutdown
246    tokio::spawn(async move {
247        let mut sigterm = signal(SignalKind::terminate()).unwrap();
248
249        if sigterm.recv().await.is_some() {
250            LOGGER.debug("OtelInternalExtension.SIGTERM: SIGTERM received, flushing spans");
251            // Direct synchronous flush
252            if let Err(err) = tracer_provider.force_flush() {
253                LOGGER.error(format!(
254                    "OtelInternalExtension.SIGTERM.Error: Error during shutdown: {:?}",
255                    err
256                ));
257            }
258            LOGGER.debug("OtelInternalExtension.SIGTERM: Shutdown complete");
259            std::process::exit(0);
260        }
261    });
262
263    Ok(request_done_sender)
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::BoxFuture;
    use lambda_extension::{InvokeEvent, LambdaEvent};
    use opentelemetry_sdk::{
        trace::{SdkTracerProvider, SpanData, SpanExporter},
        Resource,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    };

    /// Test exporter that captures every span it is asked to export.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        /// Running total of spans passed to `export`.
        export_count: Arc<AtomicUsize>,
        /// Copies of all exported spans, in export order.
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        /// Creates an exporter with a zero counter and an empty span list.
        fn new() -> Self {
            Self::default()
        }

        /// Returns a snapshot of the spans captured so far.
        // Not called by the current tests; kept as an inspection hook.
        #[allow(dead_code)]
        fn get_spans(&self) -> Vec<SpanData> {
            self.spans.lock().unwrap().clone()
        }
    }

    impl SpanExporter for TestExporter {
        fn export(
            &mut self,
            spans: Vec<SpanData>,
        ) -> BoxFuture<'static, opentelemetry_sdk::error::OTelSdkResult> {
            self.export_count.fetch_add(spans.len(), Ordering::SeqCst);
            self.spans.lock().unwrap().extend(spans);
            Box::pin(futures_util::future::ready(Ok(())))
        }
    }

    /// Builds a tracer provider backed by a [`TestExporter`] and returns both.
    fn setup_test_provider() -> (Arc<SdkTracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .with_resource(Resource::builder_empty().build())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Builds a minimal INVOKE event for driving `OtelInternalExtension::invoke`.
    fn invoke_event() -> LambdaEvent {
        LambdaEvent {
            next: NextEvent::Invoke(InvokeEvent {
                deadline_ms: 1000,
                request_id: "test-id".to_string(),
                invoked_function_arn: "test-arn".to_string(),
                tracing: Default::default(),
            }),
        }
    }

    #[tokio::test]
    async fn test_extension_invoke_handling() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Spawn task to handle the event
        let handle = tokio::spawn(async move { extension.invoke(event).await });

        // Send completion signal
        sender.send(()).unwrap();

        // Wait for handler to complete
        let result = handle.await.unwrap();
        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_channel_closed() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel::<()>();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Drop sender to close channel
        drop(sender);

        // Invoke should return error when channel is closed
        let result = extension.invoke(event).await;
        assert!(result.is_err());

        Ok(())
    }
}