// lambda_otel_lite/extension.rs
//! Lambda Extension for OpenTelemetry span processing.
//!
//! This module provides an internal Lambda extension that manages the lifecycle of OpenTelemetry spans.
//! The extension is responsible for:
//! - Registering with the Lambda Runtime API
//! - Listening for Lambda invocation events
//! - Flushing spans after each invocation
//! - Handling graceful shutdown
//!
//! # Architecture
//!
//! The extension operates in two modes:
//! - `Async`: Registers for INVOKE events and flushes spans after each invocation
//! - `Finalize`: Registers for no events, letting the processor handle span export
//!
//! # Example
//!
//! ```no_run
//! use lambda_otel_lite::{init_telemetry, TelemetryConfig};
//! use lambda_extension::Error;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Error> {
//!     // The extension is automatically registered when using init_telemetry
//!     let completion_handler = init_telemetry(TelemetryConfig::default()).await?;
//!     Ok(())
//! }
//! ```

use crate::ProcessorMode;
use lambda_extension::{service_fn, Error, Extension, NextEvent};
use opentelemetry::{otel_debug, otel_error};
use opentelemetry_sdk::trace::TracerProvider;
use std::sync::Arc;
use tokio::{
    signal::unix::{signal, SignalKind},
    sync::{
        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
        Mutex,
    },
};

/// Extension that flushes OpenTelemetry spans after each Lambda invocation.
///
/// The extension holds the two pieces of state needed for that job:
/// - the receiving half of a channel on which the handler signals that it has
///   finished processing an invocation
/// - a shared handle to the tracer provider whose pending spans are
///   force-flushed once that signal arrives
///
/// The waiting and flushing happen in the extension's event handler rather than
/// in the handler's own code path, and the two sides coordinate only through
/// the completion channel.
pub struct OtelInternalExtension {
    /// Channel receiver to know when the handler is done.
    ///
    /// Wrapped in an async `Mutex` so the receiver can be polled mutably from
    /// methods that only take `&self`.
    request_done_receiver: Mutex<UnboundedReceiver<()>>,
    /// Shared reference to the tracer provider used for flushing spans
    tracer_provider: Arc<TracerProvider>,
}

impl OtelInternalExtension {
    /// Creates a new OtelInternalExtension.
    ///
    /// # Arguments
    ///
    /// * `request_done_receiver` - Channel receiver for completion signals
    /// * `tracer_provider` - TracerProvider for span management
    pub fn new(
        request_done_receiver: UnboundedReceiver<()>,
        tracer_provider: Arc<TracerProvider>,
    ) -> Self {
        Self {
            request_done_receiver: Mutex::new(request_done_receiver),
            tracer_provider,
        }
    }

    /// Handles extension events and flushes telemetry after each invocation.
    ///
    /// This method:
    /// 1. Waits for an INVOKE event
    /// 2. Waits for the handler to signal completion
    /// 3. Flushes all pending spans
    ///
    /// # Arguments
    ///
    /// * `event` - The Lambda extension event to handle
    ///
    /// # Returns
    ///
    /// Returns Ok(()) if successful, or an Error if something went wrong
    pub async fn invoke(&self, event: lambda_extension::LambdaEvent) -> Result<(), Error> {
        if let NextEvent::Invoke(_e) = event.next {
            // Wait for runtime to finish processing event
            self.request_done_receiver
                .lock()
                .await
                .recv()
                .await
                .ok_or_else(|| Error::from("channel closed"))?;
            // Force flush all spans and handle any errors
            for result in self.tracer_provider.force_flush() {
                if let Err(err) = result {
                    otel_error!(
                        name: "OtelInternalExtension.invoke.Error",
                        reason = format!("{:?}", err)
                    );
                }
            }
        }

        Ok(())
    }
}

/// Register an internal extension for handling OpenTelemetry span processing.
///
/// **Note**: This function is called automatically by [`init_telemetry`](crate::init_telemetry)
/// and should not be called directly in your code. Use [`init_telemetry`](crate::init_telemetry)
/// instead to set up telemetry for your Lambda function.
///
/// This function:
/// - Creates an internal extension that listens for Lambda events
/// - Sets up a SIGTERM signal handler for graceful shutdown
/// - Returns a sender for signaling handler completion
///
/// The extension's behavior depends on the processor mode:
/// - In `Async` mode: Registers for INVOKE events and flushes after each invocation
/// - In other modes: Registers for no events, letting the processor handle exports
///
/// # Arguments
///
/// * `tracer_provider` - The TracerProvider to use for span management
/// * `processor_mode` - The mode determining how spans are processed
///
/// # Returns
///
/// Returns a channel sender for signaling completion, or an Error if registration fails
///
/// # Example
///
/// Instead of calling this function directly, use [`init_telemetry`](crate::init_telemetry):
///
/// ```no_run
/// use lambda_otel_lite::{init_telemetry, TelemetryConfig};
/// use lambda_extension::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let completion_handler = init_telemetry(TelemetryConfig::default()).await?;
///     Ok(())
/// }
/// ```
pub async fn register_extension(
    tracer_provider: Arc<TracerProvider>,
    processor_mode: ProcessorMode,
) -> Result<UnboundedSender<()>, Error> {
    otel_debug!(
        name: "OtelInternalExtension.register_extension",
        message = "starting registration"
    );
    let (request_done_sender, request_done_receiver) = unbounded_channel::<()>();

    let extension = Arc::new(OtelInternalExtension::new(
        request_done_receiver,
        tracer_provider.clone(),
    ));

    // Register and start the extension
    let mut ext = Extension::new();

    // Only register for INVOKE events in async mode
    if matches!(processor_mode, ProcessorMode::Async) {
        ext = ext.with_events(&["INVOKE"]);
    } else {
        ext = ext.with_events(&[]);
    }

    let registered_extension = ext
        .with_events_processor(service_fn(move |event| {
            let extension = extension.clone();
            async move { extension.invoke(event).await }
        }))
        .with_extension_name("otel-internal")
        .register()
        .await?;

    // Run the extension in the background
    tokio::spawn(async move {
        if let Err(err) = registered_extension.run().await {
            otel_error!(
                name: "OtelInternalExtension.run.Error",
                reason = format!("{:?}", err)
            );
        }
    });

    // Set up signal handler for graceful shutdown
    tokio::spawn(async move {
        let mut sigterm = signal(SignalKind::terminate()).unwrap();

        if sigterm.recv().await.is_some() {
            otel_debug!(
                name: "OtelInternalExtension.SIGTERM",
                message = "SIGTERM received, flushing spans"
            );
            // Direct synchronous flush
            for result in tracer_provider.force_flush() {
                if let Err(err) = result {
                    otel_error!(
                        name: "OtelInternalExtension.SIGTERM.Error",
                        reason = format!("{:?}", err)
                    );
                }
            }
            otel_debug!(
                name: "OtelInternalExtension.SIGTERM",
                message = "Shutdown complete"
            );
            std::process::exit(0);
        }
    });

    Ok(request_done_sender)
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::BoxFuture;
    use lambda_extension::{InvokeEvent, LambdaEvent};
    use opentelemetry::trace::TraceResult;
    use opentelemetry_sdk::{
        export::trace::{SpanData, SpanExporter},
        trace::TracerProvider,
        Resource,
    };
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    };

    /// In-memory exporter that records every span it is asked to export.
    #[derive(Debug, Default, Clone)]
    struct TestExporter {
        /// Running total of exported spans
        export_count: Arc<AtomicUsize>,
        /// Every span received so far, in export order
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl TestExporter {
        /// Creates an exporter with a zero count and no recorded spans.
        fn new() -> Self {
            Self::default()
        }

        /// Returns a copy of the spans recorded so far.
        // Not called by the current tests, hence the lint allowance.
        #[allow(dead_code)]
        fn get_spans(&self) -> Vec<SpanData> {
            let recorded = self.spans.lock().unwrap();
            recorded.clone()
        }
    }

    impl SpanExporter for TestExporter {
        /// Counts and stores the batch, then reports success immediately.
        fn export(&mut self, spans: Vec<SpanData>) -> BoxFuture<'static, TraceResult<()>> {
            let batch_len = spans.len();
            self.export_count.fetch_add(batch_len, Ordering::SeqCst);
            self.spans.lock().unwrap().extend(spans);
            Box::pin(futures_util::future::ready(Ok(())))
        }
    }

    /// Builds a tracer provider backed by a [`TestExporter`] and returns both.
    fn setup_test_provider() -> (Arc<TracerProvider>, Arc<TestExporter>) {
        let exporter = TestExporter::new();
        let provider = TracerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .with_resource(Resource::empty())
            .build();

        (Arc::new(provider), Arc::new(exporter))
    }

    /// Builds the INVOKE event shared by the tests below.
    fn invoke_lambda_event() -> LambdaEvent {
        LambdaEvent {
            next: NextEvent::Invoke(InvokeEvent {
                deadline_ms: 1000,
                request_id: "test-id".to_string(),
                invoked_function_arn: "test-arn".to_string(),
                tracing: Default::default(),
            }),
        }
    }

    #[tokio::test]
    async fn test_extension_invoke_handling() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();
        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_lambda_event();

        // Handle the event on its own task so the signal can be sent meanwhile
        let task = tokio::spawn(async move { extension.invoke(event).await });

        // Tell the extension the handler is done
        sender.send(()).unwrap();

        // The extension should finish successfully once signalled
        let outcome = task.await.unwrap();
        assert!(outcome.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_extension_channel_closed() -> Result<(), Error> {
        let (provider, _) = setup_test_provider();
        let (sender, receiver) = unbounded_channel();
        let extension = OtelInternalExtension::new(receiver, provider);
        let event = invoke_lambda_event();

        // Closing the channel means no completion signal can ever arrive
        drop(sender);

        // With the channel closed, invoke must report an error
        let outcome = extension.invoke(event).await;
        assert!(outcome.is_err());

        Ok(())
    }
}