lambda_otel_lite/extension.rs
1//! Lambda Extension for OpenTelemetry span processing.
2//!
3//! This module provides an internal Lambda extension that manages the lifecycle of OpenTelemetry spans.
4//! The extension integrates with AWS Lambda's Extensions API to efficiently manage telemetry data
5//! collection and export.
6//!
7//! # Architecture
8//!
9//! The extension operates as a background task within the Lambda function process and communicates
10//! with both the Lambda Runtime API and the function handler through asynchronous channels:
11//!
12//! 1. **Extension Registration**: On startup, the extension task registers with the Lambda Extensions API
13//! and subscribes to the appropriate events based on the processing mode.
14//!
15//! 2. **Handler Communication**: The extension uses a channel-based communication pattern to
16//! coordinate with the function handler for span export timing.
17//!
18//! 3. **Processing Modes**:
19//! - `Async`: Registers for INVOKE events and exports spans after handler completion
20//! - Spans are queued during handler execution
21//! - Export occurs after response is sent to user
22//! - Best for high-volume telemetry
23//! - `Finalize`: Registers with no events
24//! - Only installs SIGTERM handler
25//! - Lets application code control span export
26//! - Compatible with BatchSpanProcessor
27//!
28//! 4. **Graceful Shutdown**: The extension implements proper shutdown handling to ensure
29//! no telemetry data is lost when the Lambda environment is terminated.
30//!
31//! # Error Handling
32//!
33//! The extension implements robust error handling:
34//! - Logs all export errors without failing the function
35//! - Implements graceful shutdown on SIGTERM
36//! - Handles channel communication failures
37
38use crate::logger::Logger;
39use crate::ProcessorMode;
40use lambda_extension::{service_fn, Error, Extension, NextEvent};
41use opentelemetry_sdk::trace::SdkTracerProvider;
42use std::sync::Arc;
43use tokio::{
44 signal::unix::{signal, SignalKind},
45 sync::{
46 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
47 Mutex,
48 },
49};
50
51static LOGGER: Logger = Logger::const_new("extension");
52
/// Extension that flushes OpenTelemetry spans after each Lambda invocation.
///
/// The extension pairs a completion-signal receiver with a shared tracer provider:
/// - It waits for the handler to signal that an invocation has finished
/// - It then force-flushes the tracer provider so pending spans are exported
///
/// # Thread Safety
///
/// - The tracer provider is held behind an `Arc`, so it can be shared with other owners
/// - The channel receiver is wrapped in a `tokio::sync::Mutex`, so receiving through a
///   shared `&self` reference is serialized
///
/// # Channel Semantics
///
/// The completion channel is unbounded: sending a completion signal never waits,
/// and no backpressure is applied to the sender.
///
/// # Error Handling
///
/// - Channel closed: [`invoke`](Self::invoke) returns an error
/// - Flush failures: logged, and [`invoke`](Self::invoke) still returns `Ok(())`
pub struct OtelInternalExtension {
    /// Receives one `()` message each time the handler reports completion
    request_done_receiver: Mutex<UnboundedReceiver<()>>,
    /// Tracer provider that is force-flushed after each invocation
    tracer_provider: Arc<SdkTracerProvider>,
}
89
90impl OtelInternalExtension {
91 /// Creates a new OtelInternalExtension.
92 ///
93 /// # Arguments
94 ///
95 /// * `request_done_receiver` - Channel receiver for completion signals
96 /// * `tracer_provider` - TracerProvider for span management
97 pub fn new(
98 request_done_receiver: UnboundedReceiver<()>,
99 tracer_provider: Arc<SdkTracerProvider>,
100 ) -> Self {
101 Self {
102 request_done_receiver: Mutex::new(request_done_receiver),
103 tracer_provider,
104 }
105 }
106
107 /// Handles extension events and flushes telemetry after each invocation.
108 ///
109 /// This method implements the core event handling logic for the extension.
110 /// It coordinates with the Lambda function handler to ensure spans are
111 /// exported at the appropriate time.
112 ///
113 /// # Operation Flow
114 ///
115 /// 1. **Event Reception**:
116 /// - Receives Lambda extension events
117 /// - Filters for INVOKE events
118 /// - Ignores other event types
119 ///
120 /// 2. **Handler Coordination**:
121 /// - Waits for handler completion signal
122 /// - Uses async channel communication
123 /// - Handles channel closure gracefully
124 ///
125 /// 3. **Span Export**:
126 /// - Forces flush of all pending spans
127 /// - Handles export errors without failing
128 /// - Logs any export failures
129 ///
130 /// # Error Handling
131 ///
132 /// The method handles several failure scenarios:
133 ///
134 /// - **Channel Errors**:
135 /// - Channel closure: Returns error with descriptive message
136 /// - Send/receive failures: Properly propagated
137 ///
138 /// - **Export Errors**:
139 /// - Individual span export failures are logged
140 /// - Continues processing despite errors
141 /// - Maintains extension stability
142 ///
143 /// # Performance
144 ///
145 /// The method is optimized for Lambda environments:
146 /// - Uses async/await for efficient execution
147 /// - Minimizes blocking operations
148 /// - Implements proper error recovery
149 ///
150 /// # Arguments
151 ///
152 /// * `event` - The Lambda extension event to handle
153 ///
154 /// # Returns
155 ///
156 /// Returns `Ok(())` if the event was processed successfully, or an `Error`
157 /// if something went wrong during processing. Note that export errors are
158 /// logged but do not cause the method to return an error.
159 pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
160 if let NextEvent::Invoke(_e) = event.next {
161 // Wait for runtime to finish processing event
162 self.request_done_receiver
163 .lock()
164 .await
165 .recv()
166 .await
167 .ok_or_else(|| Error::from("channel closed"))?;
168 // Force flush all spans and handle any errors
169 if let Err(err) = self.tracer_provider.force_flush() {
170 LOGGER.error(format!(
171 "OtelInternalExtension.invoke.Error: Error flushing tracer provider: {err:?}"
172 ));
173 }
174 }
175
176 Ok(())
177 }
178}
179
180/// Register an internal extension for handling OpenTelemetry span processing.
181///
182/// # Warning
183///
184/// This is an internal API used by [`init_telemetry`](crate::init_telemetry) and should not be called directly.
185/// Use [`init_telemetry`](crate::init_telemetry) instead to set up telemetry for your Lambda function.
186///
187/// # Internal Details
188///
189/// The extension registration process:
190/// 1. Creates communication channels for handler-extension coordination
191/// 2. Registers with Lambda Extensions API based on processor mode
192/// 3. Sets up signal handlers for graceful shutdown
193/// 4. Manages span export timing based on processor mode
194///
195/// # Arguments
196///
197/// * `tracer_provider` - The TracerProvider to use for span management
198/// * `processor_mode` - The mode determining how spans are processed
199///
200/// # Returns
201///
202/// Returns a channel sender for signaling completion, or an Error if registration fails.
203pub(crate) async fn register_extension(
204 tracer_provider: Arc<SdkTracerProvider>,
205 processor_mode: ProcessorMode,
206) -> Result<UnboundedSender<()>, Error> {
207 LOGGER.debug("OtelInternalExtension.register_extension: starting registration");
208 let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();
209
210 let extension = Arc::new(OtelInternalExtension::new(
211 request_done_receiver,
212 tracer_provider.clone(),
213 ));
214
215 // Register and start the extension
216 let mut ext = Extension::new();
217
218 // Only register for INVOKE events in async mode
219 if matches!(processor_mode, ProcessorMode::Async) {
220 ext = ext.with_events(&["INVOKE"]);
221 } else {
222 ext = ext.with_events(&[]);
223 }
224
225 let registered_extension = ext
226 .with_events_processor(service_fn(move |event| {
227 let extension = extension.clone();
228 async move { extension.invoke(event).await }
229 }))
230 .with_extension_name("otel-internal")
231 .register()
232 .await?;
233
234 // Run the extension in the background
235 tokio::spawn(async move {
236 if let Err(err) = registered_extension.run().await {
237 LOGGER.error(format!(
238 "OtelInternalExtension.run.Error: Error running extension: {err:?}"
239 ));
240 }
241 });
242
243 // Set up signal handler for graceful shutdown
244 tokio::spawn(async move {
245 let mut sigterm = signal(SignalKind::terminate()).unwrap();
246
247 if sigterm.recv().await.is_some() {
248 LOGGER.debug("OtelInternalExtension.SIGTERM: SIGTERM received, flushing spans");
249 // Direct synchronous flush
250 if let Err(err) = tracer_provider.force_flush() {
251 LOGGER.error(format!(
252 "OtelInternalExtension.SIGTERM.Error: Error during shutdown: {err:?}"
253 ));
254 }
255 LOGGER.debug("OtelInternalExtension.SIGTERM: Shutdown complete");
256 std::process::exit(0);
257 }
258 });
259
260 Ok(request_done_sender)
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use lambda_extension::{InvokeEvent, LambdaEvent, ShutdownEvent};
    use opentelemetry::trace::{Tracer, TracerProvider as _};
    use opentelemetry::Context;
    use opentelemetry_sdk::{
        error::{OTelSdkError, OTelSdkResult},
        trace::{SdkTracerProvider, Span, SpanData, SpanExporter, SpanProcessor},
        Resource,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    };
    use std::time::Duration;

    /// In-memory exporter that records every span it is asked to export.
    ///
    /// Clones share the same counters, so a clone handed to a provider can be
    /// inspected through the copy kept by the test.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        /// Total number of spans passed to `export`
        export_count: Arc<AtomicUsize>,
        /// Every span passed to `export`, in arrival order
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        /// Creates an exporter with an empty span list and a zero counter.
        fn new() -> Self {
            Self {
                export_count: Arc::new(AtomicUsize::new(0)),
                spans: Arc::new(Mutex::new(Vec::new())),
            }
        }

        /// Returns a snapshot of all spans exported so far.
        fn get_spans(&self) -> Vec<SpanData> {
            self.spans.lock().unwrap().clone()
        }
    }

    impl SpanExporter for TestExporter {
        fn export(
            &self,
            spans: Vec<SpanData>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            self.export_count.fetch_add(spans.len(), Ordering::SeqCst);
            self.spans.lock().unwrap().extend(spans);
            futures_util::future::ready(Ok(()))
        }
    }

    /// Builds a provider that exports each span through a simple exporter.
    fn setup_test_provider() -> (Arc<SdkTracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .with_resource(Resource::builder_empty().build())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Builds a provider that exports spans through a batch exporter.
    fn setup_batch_test_provider() -> (Arc<SdkTracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = SdkTracerProvider::builder()
            .with_batch_exporter(exporter.clone())
            .with_resource(Resource::builder_empty().build())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Builds the INVOKE event shared by the tests below.
    fn invoke_event() -> LambdaEvent {
        LambdaEvent {
            next: NextEvent::Invoke(InvokeEvent {
                deadline_ms: 1000,
                request_id: "test-id".to_string(),
                invoked_function_arn: "test-arn".to_string(),
                tracing: Default::default(),
            }),
        }
    }

    /// Span processor whose `force_flush` always fails.
    #[derive(Debug)]
    struct FailingSpanProcessor;

    impl SpanProcessor for FailingSpanProcessor {
        fn on_start(&self, _span: &mut Span, _cx: &Context) {}

        fn on_end(&self, _span: SpanData) {}

        fn force_flush(&self) -> OTelSdkResult {
            Err(OTelSdkError::InternalFailure("force flush failed".into()))
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }
    }

    /// An INVOKE event completes successfully once the handler signals completion.
    #[tokio::test]
    async fn test_extension_invoke_handling() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Spawn task to handle the event
        let handle = tokio::spawn(async move { extension.invoke(event).await });

        // Send completion signal
        sender.send(()).unwrap();

        // Wait for handler to complete
        let result = handle.await.unwrap();
        assert!(result.is_ok());

        Ok(())
    }

    /// An INVOKE event fails when the completion channel has no senders left.
    #[tokio::test]
    async fn test_extension_channel_closed() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        // Drop sender to close channel
        drop(sender);

        // Invoke should return error when channel is closed
        let result = extension.invoke(event).await;
        assert!(result.is_err());

        Ok(())
    }

    /// Non-INVOKE events return `Ok(())` without waiting on the channel.
    #[tokio::test]
    async fn test_extension_ignores_shutdown_events() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (_sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);

        let event = LambdaEvent {
            next: NextEvent::Shutdown(ShutdownEvent {
                shutdown_reason: "SPINDOWN".to_string(),
                deadline_ms: 1000,
            }),
        };

        let result = extension.invoke(event).await;
        assert!(result.is_ok());

        Ok(())
    }

    /// A span queued in the batch processor is exported by the flush in `invoke`.
    #[tokio::test]
    async fn test_extension_force_flush_exports_pending_spans() -> Result<(), Error> {
        let (provider, exporter) = setup_batch_test_provider();
        let tracer = provider.tracer("extension-test");
        let (sender, receiver) = unbounded_channel();

        {
            // Ending the span hands it to the processor.
            let span = tracer.start("pending-span");
            drop(span);
        }

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        let handle = tokio::spawn(async move { extension.invoke(event).await });
        sender.send(()).unwrap();

        let result = handle.await.unwrap();
        assert!(result.is_ok());
        assert_eq!(exporter.export_count.load(Ordering::SeqCst), 1);
        assert_eq!(exporter.get_spans().len(), 1);

        Ok(())
    }

    /// A failing flush is logged only; `invoke` still reports success.
    #[tokio::test]
    async fn test_extension_invoke_returns_ok_when_force_flush_fails() -> Result<(), Error> {
        let provider = Arc::new(
            SdkTracerProvider::builder()
                .with_span_processor(FailingSpanProcessor)
                .with_resource(Resource::builder_empty().build())
                .build(),
        );
        let (sender, receiver) = unbounded_channel();

        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_event();

        let handle = tokio::spawn(async move { extension.invoke(event).await });
        sender.send(()).unwrap();

        let result = handle.await.unwrap();
        assert!(result.is_ok());

        Ok(())
    }
}