// lambda_otel_lite/extension.rs
//! Lambda Extension for OpenTelemetry span processing.
//!
//! This module provides an internal Lambda extension that manages the lifecycle of OpenTelemetry spans.
//! The extension integrates with AWS Lambda's Extensions API to efficiently manage telemetry data
//! collection and export.
//!
//! # Architecture
//!
//! The extension operates as a background task within the Lambda function process and communicates
//! with both the Lambda Runtime API and the function handler through asynchronous channels:
//!
//! 1. **Extension Registration**: On startup, the extension task registers with the Lambda Extensions API
//!    and subscribes to the appropriate events based on the processing mode.
//!
//! 2. **Handler Communication**: The extension uses a channel-based communication pattern to
//!    coordinate with the function handler for span export timing.
//!
//! 3. **Processing Modes**:
//!    - `Async`: Registers for INVOKE events and exports spans after handler completion
//!      - Spans are queued during handler execution
//!      - Export occurs after response is sent to user
//!      - Best for high-volume telemetry
//!    - `Finalize`: Registers with no events
//!      - Only installs SIGTERM handler
//!      - Lets application code control span export
//!      - Compatible with BatchSpanProcessor
//!
//! 4. **Graceful Shutdown**: The extension implements proper shutdown handling to ensure
//!    no telemetry data is lost when the Lambda environment is terminated.
//!
//! # Error Handling
//!
//! The extension implements robust error handling:
//! - Logs all export errors without failing the function
//! - Implements graceful shutdown on SIGTERM
//! - Handles channel communication failures
//!
//! # Example
//!
//! ```no_run
//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
//! use lambda_extension::Error;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Error> {
//!     // The extension is automatically registered when using init_telemetry
//!     let completion_handler = init_telemetry(TelemetryConfig::default()).await?;
//!     Ok(())
//! }
//! ```

use crate::ProcessorMode;
use lambda_extension::{service_fn, Error, Extension, NextEvent};
use opentelemetry::{otel_debug, otel_error};
use opentelemetry_sdk::trace::TracerProvider;
use std::sync::Arc;
use tokio::{
    signal::unix::{signal, SignalKind},
    sync::{
        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
        Mutex,
    },
};

/// Extension that flushes OpenTelemetry spans after each Lambda invocation.
///
/// The extension owns the receiving half of the handler-completion channel and a
/// shared handle to the tracer provider. For every INVOKE event it waits for one
/// completion signal from the handler and then calls `force_flush` on the provider.
///
/// # Thread Safety
///
/// - The tracer provider is shared through an `Arc`.
/// - The channel receiver is wrapped in a `tokio::sync::Mutex`, which lets `invoke`
///   take `&self` while still obtaining the mutable access `recv()` requires.
///
/// # Channel Semantics
///
/// The completion channel is unbounded (`UnboundedReceiver`), so no backpressure is
/// applied to the sending side; each message carries no payload (`()`) and only
/// marks that one invocation has finished.
///
/// # Error Handling
///
/// - Channel closed: `invoke` returns an error with the message `"channel closed"`.
/// - Export failures: each failed flush result is logged via `otel_error!`;
///   `invoke` still returns `Ok(())`.
pub struct OtelInternalExtension {
    /// Receiver for handler-completion signals; one `()` is expected per invocation.
    request_done_receiver: Mutex<UnboundedReceiver<()>>,
    /// Shared tracer provider whose pending spans are flushed via `force_flush`.
    tracer_provider: Arc<TracerProvider>,
}

impl OtelInternalExtension {
    /// Creates a new OtelInternalExtension.
    ///
    /// # Arguments
    ///
    /// * `request_done_receiver` - Channel receiver for completion signals
    /// * `tracer_provider` - TracerProvider for span management
    pub fn new(
        request_done_receiver: UnboundedReceiver<()>,
        tracer_provider: Arc<TracerProvider>,
    ) -> Self {
        Self {
            request_done_receiver: Mutex::new(request_done_receiver),
            tracer_provider,
        }
    }

    /// Handles extension events and flushes telemetry after each invocation.
    ///
    /// This method implements the core event handling logic for the extension.
    /// It coordinates with the Lambda function handler to ensure spans are
    /// exported at the appropriate time.
    ///
    /// # Operation Flow
    ///
    /// 1. **Event Reception**:
    ///    - Receives Lambda extension events
    ///    - Filters for INVOKE events
    ///    - Ignores other event types
    ///
    /// 2. **Handler Coordination**:
    ///    - Waits for handler completion signal
    ///    - Uses async channel communication
    ///    - Handles channel closure gracefully
    ///
    /// 3. **Span Export**:
    ///    - Forces flush of all pending spans
    ///    - Handles export errors without failing
    ///    - Logs any export failures
    ///
    /// # Error Handling
    ///
    /// The method handles several failure scenarios:
    ///
    /// - **Channel Errors**:
    ///    - Channel closure: Returns error with descriptive message
    ///    - Send/receive failures: Properly propagated
    ///
    /// - **Export Errors**:
    ///    - Individual span export failures are logged
    ///    - Continues processing despite errors
    ///    - Maintains extension stability
    ///
    /// # Performance
    ///
    /// The method is optimized for Lambda environments:
    /// - Uses async/await for efficient execution
    /// - Minimizes blocking operations
    /// - Implements proper error recovery
    ///
    /// # Arguments
    ///
    /// * `event` - The Lambda extension event to handle
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if the event was processed successfully, or an `Error`
    /// if something went wrong during processing. Note that export errors are
    /// logged but do not cause the method to return an error.
    pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
        if let NextEvent::Invoke(_e) = event.next {
            // Wait for runtime to finish processing event
            self.request_done_receiver
                .lock()
                .await
                .recv()
                .await
                .ok_or_else(|| Error::from("channel closed"))?;
            // Force flush all spans and handle any errors
            for result in self.tracer_provider.force_flush() {
                if let Err(err) = result {
                    otel_error!(
                        name: "OtelInternalExtension.invoke.Error",
                        reason = format!("{:?}", err)
                    );
                }
            }
        }

        Ok(())
    }
}

/// Register an internal extension for handling OpenTelemetry span processing.
///
/// **Note**: This function is called automatically by [`init_telemetry`](crate::init_telemetry)
/// and should not be called directly in your code. Use [`init_telemetry`](crate::init_telemetry)
/// instead to set up telemetry for your Lambda function.
///
/// # Initialization Sequence
///
/// 1. **Channel Setup**:
///    - Creates an unbounded channel for handler-extension communication
///    - The sender is returned to the caller
///    - The receiver is owned by the extension
///
/// 2. **Extension Registration**:
///    - Builds a Lambda extension named `otel-internal`
///    - Subscribes to `INVOKE` events in [`ProcessorMode::Async`], and to no events
///      in any other mode
///    - Registers with the Lambda Extensions API
///    - Runs the registered extension in a background task; a failure of that task
///      is logged via `otel_error!`
///
/// 3. **Signal Handler Setup**:
///    - Installs a SIGTERM listener before this function returns
///    - On SIGTERM, flushes the tracer provider, logs any flush errors and exits the
///      process with status 0
///    - If the listener cannot be installed, the failure is logged and the function
///      still succeeds; only the flush-on-SIGTERM behaviour is lost
///
/// # Arguments
///
/// * `tracer_provider` - The TracerProvider to use for span management
/// * `processor_mode` - The mode determining which events the extension subscribes to
///
/// # Returns
///
/// Returns the channel sender the handler should use to signal that request
/// processing has completed.
///
/// # Errors
///
/// Returns an error if registering with the Lambda Extensions API fails. Failures
/// that occur later (extension run loop, span export, SIGTERM listener setup) are
/// logged and not returned.
///
/// # Example
///
/// Instead of calling this function directly, use [`init_telemetry`](crate::init_telemetry):
///
/// ```no_run
/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
/// use lambda_extension::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let completion_handler = init_telemetry(TelemetryConfig::default()).await?;
///     Ok(())
/// }
/// ```
pub async fn register_extension(
    tracer_provider: Arc<TracerProvider>,
    processor_mode: ProcessorMode,
) -> Result<UnboundedSender<()>, Error> {
    otel_debug!(
        name: "OtelInternalExtension.register_extension",
        message = "starting registration"
    );
    let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();

    // The extension gets its own handle to the provider; the original handle is
    // kept for the SIGTERM task below.
    let extension = Arc::new(OtelInternalExtension::new(
        request_done_receiver,
        tracer_provider.clone(),
    ));

    // Register and start the extension
    let mut ext = Extension::new();

    // Only register for INVOKE events in async mode
    if matches!(processor_mode, ProcessorMode::Async) {
        ext = ext.with_events(&["INVOKE"]);
    } else {
        ext = ext.with_events(&[]);
    }

    let registered_extension = ext
        .with_events_processor(service_fn(move |event| {
            let extension = extension.clone();
            async move { extension.invoke(event).await }
        }))
        .with_extension_name("otel-internal")
        .register()
        .await?;

    // Run the extension in the background
    tokio::spawn(async move {
        if let Err(err) = registered_extension.run().await {
            otel_error!(
                name: "OtelInternalExtension.run.Error",
                reason = format!("{:?}", err)
            );
        }
    });

    // Set up signal handler for graceful shutdown. The listener is created here,
    // not inside the spawned task, so it is installed before this function returns,
    // and a setup failure is logged instead of panicking in a detached task.
    match signal(SignalKind::terminate()) {
        Ok(mut sigterm) => {
            tokio::spawn(async move {
                if sigterm.recv().await.is_some() {
                    otel_debug!(
                        name: "OtelInternalExtension.SIGTERM",
                        message = "SIGTERM received, flushing spans"
                    );
                    // Direct synchronous flush
                    for result in tracer_provider.force_flush() {
                        if let Err(err) = result {
                            otel_error!(
                                name: "OtelInternalExtension.SIGTERM.Error",
                                reason = format!("{:?}", err)
                            );
                        }
                    }
                    otel_debug!(
                        name: "OtelInternalExtension.SIGTERM",
                        message = "Shutdown complete"
                    );
                    std::process::exit(0);
                }
            });
        }
        Err(err) => {
            // Best effort: telemetry keeps working without the shutdown flush.
            otel_error!(
                name: "OtelInternalExtension.SIGTERM.RegistrationError",
                reason = format!("{:?}", err)
            );
        }
    }

    Ok(request_done_sender)
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::BoxFuture;
    use lambda_extension::{InvokeEvent, LambdaEvent};
    use opentelemetry::trace::TraceResult;
    use opentelemetry_sdk::{
        export::trace::{SpanData, SpanExporter},
        trace::TracerProvider,
        Resource,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    };

    /// In-memory exporter that records every span it is handed.
    ///
    /// Clones share the same counter and span buffer through `Arc`s.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        /// Running total of exported spans.
        export_count: Arc<AtomicUsize>,
        /// Every span received so far, in export order.
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        /// Creates an exporter with a zeroed counter and an empty span buffer.
        fn new() -> Self {
            Self::default()
        }

        /// Returns a snapshot of the spans captured so far.
        #[allow(dead_code)]
        fn get_spans(&self) -> Vec<SpanData> {
            let captured = self.spans.lock().unwrap();
            captured.clone()
        }
    }

    impl SpanExporter for TestExporter {
        /// Counts and stores the batch, then reports success immediately.
        fn export(&mut self, spans: Vec<SpanData>) -> BoxFuture<'static, TraceResult<()>> {
            let batch_len = spans.len();
            self.export_count.fetch_add(batch_len, Ordering::SeqCst);
            self.spans.lock().unwrap().extend(spans);

            let outcome: TraceResult<()> = Ok(());
            Box::pin(futures_util::future::ready(outcome))
        }
    }

    /// Builds a tracer provider wired to a `TestExporter` and returns both.
    fn setup_test_provider() -> (Arc<TracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let builder = TracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .with_resource(Resource::empty());

        (Arc::new(builder.build()), Arc::new(exporter))
    }

    /// Builds the INVOKE event used by the tests in this module.
    fn sample_invoke_event() -> LambdaEvent {
        LambdaEvent {
            next: NextEvent::Invoke(InvokeEvent {
                deadline_ms: 1000,
                request_id: "test-id".to_string(),
                invoked_function_arn: "test-arn".to_string(),
                tracing: Default::default(),
            }),
        }
    }

    /// An INVOKE event completes successfully once the handler signals completion.
    #[tokio::test]
    async fn test_extension_invoke_handling() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();
        let extension = OtelInternalExtension::new(receiver, provider);
        let event = sample_invoke_event();

        // Handle the event on a separate task so the signal can be sent from here.
        let pending = tokio::spawn(async move { extension.invoke(event).await });

        // Send completion signal
        sender.send(()).unwrap();

        // Wait for the extension to finish processing the event
        let outcome = pending.await.unwrap();
        assert!(outcome.is_ok());

        Ok(())
    }

    /// An INVOKE event fails when the completion channel has already been closed.
    #[tokio::test]
    async fn test_extension_channel_closed() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();
        let extension = OtelInternalExtension::new(receiver, provider);
        let event = sample_invoke_event();

        // Dropping the only sender closes the channel.
        drop(sender);

        let outcome = extension.invoke(event).await;
        assert!(outcome.is_err());

        Ok(())
    }
}