lambda_otel_lite/extension.rs
1//! Lambda Extension for OpenTelemetry span processing.
2//!
3//! This module provides an internal Lambda extension that manages the lifecycle of OpenTelemetry spans.
4//! The extension integrates with AWS Lambda's Extensions API to efficiently manage telemetry data
5//! collection and export.
6//!
7//! # Architecture
8//!
9//! The extension operates as a background task within the Lambda function process and communicates
10//! with both the Lambda Runtime API and the function handler through asynchronous channels:
11//!
12//! 1. **Extension Registration**: On startup, the extension task registers with the Lambda Extensions API
13//! and subscribes to the appropriate events based on the processing mode.
14//!
15//! 2. **Handler Communication**: The extension uses a channel-based communication pattern to
16//! coordinate with the function handler for span export timing.
17//!
//! 3. **Processing Modes**:
//!    - `Async`: Registers for INVOKE events and exports spans after handler completion
//!      - Spans are queued during handler execution
//!      - Export occurs after response is sent to user
//!      - Best for high-volume telemetry
//!    - `Finalize`: Registers with no events
//!      - Only installs SIGTERM handler
//!      - Lets application code control span export
//!      - Compatible with BatchSpanProcessor
27//!
28//! 4. **Graceful Shutdown**: The extension implements proper shutdown handling to ensure
29//! no telemetry data is lost when the Lambda environment is terminated.
30//!
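//! # Error Handling
//!
//! The extension handles failures as follows:
//! - Flush/export errors are logged and never fail the function invocation
//! - SIGTERM triggers a final flush before the process exits
//! - A closed handler-completion channel is reported as an error from the events processor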
36//! - Handles channel communication failures
37
38use crate::logger::Logger;
39use crate::ProcessorMode;
40use lambda_extension::{service_fn, Error, Extension, NextEvent};
41use opentelemetry_sdk::trace::SdkTracerProvider;
42use std::sync::Arc;
43use tokio::{
44 signal::unix::{signal, SignalKind},
45 sync::{
46 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
47 Mutex,
48 },
49};
50
/// Logger for this module, constructed with the name `"extension"`
/// (presumably used to tag/prefix its output — confirm in `crate::logger::Logger`).
static LOGGER: Logger = Logger::const_new("extension");
52
53/// Extension that flushes OpenTelemetry spans after each Lambda invocation.
54///
55/// This extension is responsible for:
56/// - Receiving completion signals from the handler
57/// - Flushing spans at the appropriate time
58/// - Managing the lifecycle of the tracer provider
59///
60/// # Thread Safety
61///
62/// The extension is designed to be thread-safe:
63/// - Uses `Arc` for shared ownership of the tracer provider
64/// - Implements proper synchronization through `Mutex` for the channel receiver
65/// - Safely handles concurrent access from multiple tasks
66///
67/// # Performance Characteristics
68///
69/// The extension is optimized for Lambda environments:
70/// - Minimizes memory usage through efficient buffering
71/// - Uses non-blocking channel communication
72/// - Implements backpressure handling to prevent memory exhaustion
73///
74/// # Error Handling
75///
76/// The extension handles various error scenarios:
77/// - Channel closed: Logs error and continues processing
78/// - Export failures: Logs errors without failing the function
79/// - Shutdown signals: Ensures final flush of spans
80///
81/// The extension operates asynchronously to minimize impact on handler latency.
82/// It uses a channel-based communication pattern to coordinate with the handler.
83pub struct OtelInternalExtension {
84 /// Channel receiver to know when the handler is done
85 request_done_receiver: Mutex<UnboundedReceiver<()>>,
86 /// Reference to the tracer provider for flushing spans
87 tracer_provider: Arc<SdkTracerProvider>,
88}
89
90impl OtelInternalExtension {
91 /// Creates a new OtelInternalExtension.
92 ///
93 /// # Arguments
94 ///
95 /// * `request_done_receiver` - Channel receiver for completion signals
96 /// * `tracer_provider` - TracerProvider for span management
97 pub fn new(
98 request_done_receiver: UnboundedReceiver<()>,
99 tracer_provider: Arc<SdkTracerProvider>,
100 ) -> Self {
101 Self {
102 request_done_receiver: Mutex::new(request_done_receiver),
103 tracer_provider,
104 }
105 }
106
107 /// Handles extension events and flushes telemetry after each invocation.
108 ///
109 /// This method implements the core event handling logic for the extension.
110 /// It coordinates with the Lambda function handler to ensure spans are
111 /// exported at the appropriate time.
112 ///
113 /// # Operation Flow
114 ///
115 /// 1. **Event Reception**:
116 /// - Receives Lambda extension events
117 /// - Filters for INVOKE events
118 /// - Ignores other event types
119 ///
120 /// 2. **Handler Coordination**:
121 /// - Waits for handler completion signal
122 /// - Uses async channel communication
123 /// - Handles channel closure gracefully
124 ///
125 /// 3. **Span Export**:
126 /// - Forces flush of all pending spans
127 /// - Handles export errors without failing
128 /// - Logs any export failures
129 ///
130 /// # Error Handling
131 ///
132 /// The method handles several failure scenarios:
133 ///
134 /// - **Channel Errors**:
135 /// - Channel closure: Returns error with descriptive message
136 /// - Send/receive failures: Properly propagated
137 ///
138 /// - **Export Errors**:
139 /// - Individual span export failures are logged
140 /// - Continues processing despite errors
141 /// - Maintains extension stability
142 ///
143 /// # Performance
144 ///
145 /// The method is optimized for Lambda environments:
146 /// - Uses async/await for efficient execution
147 /// - Minimizes blocking operations
148 /// - Implements proper error recovery
149 ///
150 /// # Arguments
151 ///
152 /// * `event` - The Lambda extension event to handle
153 ///
154 /// # Returns
155 ///
156 /// Returns `Ok(())` if the event was processed successfully, or an `Error`
157 /// if something went wrong during processing. Note that export errors are
158 /// logged but do not cause the method to return an error.
159 pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
160 if let NextEvent::Invoke(_e) = event.next {
161 // Wait for runtime to finish processing event
162 self.request_done_receiver
163 .lock()
164 .await
165 .recv()
166 .await
167 .ok_or_else(|| Error::from("channel closed"))?;
168 // Force flush all spans and handle any errors
169 if let Err(err) = self.tracer_provider.force_flush() {
170 LOGGER.error(format!(
171 "OtelInternalExtension.invoke.Error: Error flushing tracer provider: {:?}",
172 err
173 ));
174 }
175 }
176
177 Ok(())
178 }
179}
180
181/// Register an internal extension for handling OpenTelemetry span processing.
182///
183/// # Warning
184///
185/// This is an internal API used by [`init_telemetry`](crate::init_telemetry) and should not be called directly.
186/// Use [`init_telemetry`](crate::init_telemetry) instead to set up telemetry for your Lambda function.
187///
188/// # Internal Details
189///
190/// The extension registration process:
191/// 1. Creates communication channels for handler-extension coordination
192/// 2. Registers with Lambda Extensions API based on processor mode
193/// 3. Sets up signal handlers for graceful shutdown
194/// 4. Manages span export timing based on processor mode
195///
196/// # Arguments
197///
198/// * `tracer_provider` - The TracerProvider to use for span management
199/// * `processor_mode` - The mode determining how spans are processed
200///
201/// # Returns
202///
203/// Returns a channel sender for signaling completion, or an Error if registration fails.
204pub(crate) async fn register_extension(
205 tracer_provider: Arc<SdkTracerProvider>,
206 processor_mode: ProcessorMode,
207) -> Result<UnboundedSender<()>, Error> {
208 LOGGER.debug("OtelInternalExtension.register_extension: starting registration");
209 let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();
210
211 let extension = Arc::new(OtelInternalExtension::new(
212 request_done_receiver,
213 tracer_provider.clone(),
214 ));
215
216 // Register and start the extension
217 let mut ext = Extension::new();
218
219 // Only register for INVOKE events in async mode
220 if matches!(processor_mode, ProcessorMode::Async) {
221 ext = ext.with_events(&["INVOKE"]);
222 } else {
223 ext = ext.with_events(&[]);
224 }
225
226 let registered_extension = ext
227 .with_events_processor(service_fn(move |event| {
228 let extension = extension.clone();
229 async move { extension.invoke(event).await }
230 }))
231 .with_extension_name("otel-internal")
232 .register()
233 .await?;
234
235 // Run the extension in the background
236 tokio::spawn(async move {
237 if let Err(err) = registered_extension.run().await {
238 LOGGER.error(format!(
239 "OtelInternalExtension.run.Error: Error running extension: {:?}",
240 err
241 ));
242 }
243 });
244
245 // Set up signal handler for graceful shutdown
246 tokio::spawn(async move {
247 let mut sigterm = signal(SignalKind::terminate()).unwrap();
248
249 if sigterm.recv().await.is_some() {
250 LOGGER.debug("OtelInternalExtension.SIGTERM: SIGTERM received, flushing spans");
251 // Direct synchronous flush
252 if let Err(err) = tracer_provider.force_flush() {
253 LOGGER.error(format!(
254 "OtelInternalExtension.SIGTERM.Error: Error during shutdown: {:?}",
255 err
256 ));
257 }
258 LOGGER.debug("OtelInternalExtension.SIGTERM: Shutdown complete");
259 std::process::exit(0);
260 }
261 });
262
263 Ok(request_done_sender)
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::BoxFuture;
    use lambda_extension::{InvokeEvent, LambdaEvent, ShutdownEvent};
    use opentelemetry_sdk::{
        trace::{SdkTracerProvider, SpanData, SpanExporter},
        Resource,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    };

    /// Span exporter that records every span it receives, for test inspection.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        /// Running total of exported spans (spans, not export calls).
        export_count: Arc<AtomicUsize>,
        /// Every span exported so far, in export order.
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        fn new() -> Self {
            Self::default()
        }

        // Helper for inspecting exported spans; not read by any test yet.
        #[allow(dead_code)]
        fn get_spans(&self) -> Vec<SpanData> {
            self.spans.lock().unwrap().clone()
        }
    }

    impl SpanExporter for TestExporter {
        fn export(
            &mut self,
            spans: Vec<SpanData>,
        ) -> BoxFuture<'static, opentelemetry_sdk::error::OTelSdkResult> {
            self.export_count.fetch_add(spans.len(), Ordering::SeqCst);
            self.spans.lock().unwrap().extend(spans);
            Box::pin(futures_util::future::ready(Ok(())))
        }
    }

    /// Builds a tracer provider that exports synchronously into a fresh `TestExporter`.
    fn setup_test_provider() -> (Arc<SdkTracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .with_resource(Resource::builder_empty().build())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Creates an extension wired to a fresh completion channel and returns its sender.
    fn setup_extension() -> (OtelInternalExtension, UnboundedSender<()>) {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();
        (OtelInternalExtension::new(receiver, provider), sender)
    }

    /// Builds an INVOKE event with placeholder values.
    fn invoke_event() -> LambdaEvent {
        LambdaEvent {
            next: NextEvent::Invoke(InvokeEvent {
                deadline_ms: 1000,
                request_id: "test-id".to_string(),
                invoked_function_arn: "test-arn".to_string(),
                tracing: Default::default(),
            }),
        }
    }

    #[tokio::test]
    async fn test_extension_invoke_handling() -> Result<(), Error> {
        let (extension, sender) = setup_extension();

        // Handle the event on its own task so the completion signal can be sent concurrently.
        let handle = tokio::spawn(async move { extension.invoke(invoke_event()).await });

        // Send completion signal
        sender.send(()).unwrap();

        // Wait for handler to complete
        let result = handle.await.unwrap();
        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_channel_closed() -> Result<(), Error> {
        let (extension, sender) = setup_extension();

        // Drop sender to close channel
        drop(sender);

        // Invoke should return error when channel is closed
        let result = extension.invoke(invoke_event()).await;
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_ignores_non_invoke_events() -> Result<(), Error> {
        let (extension, sender) = setup_extension();

        // Close the channel: a non-INVOKE event must not wait on (or fail because of) it.
        drop(sender);

        let event = LambdaEvent {
            next: NextEvent::Shutdown(ShutdownEvent {
                shutdown_reason: "spindown".to_string(),
                deadline_ms: 1000,
            }),
        };
        assert!(extension.invoke(event).await.is_ok());

        Ok(())
    }
}