opentelemetry-lambda-extension 0.1.7

AWS Lambda extension for collecting and exporting OpenTelemetry signals
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
//! Tower services for Lambda extension lifecycle and telemetry processing.
//!
//! This module provides Tower `Service` implementations that integrate with the
//! `lambda_extension` crate for proper lifecycle management. Using the official
//! Lambda extension library ensures correct handling of SHUTDOWN events and
//! telemetry delivery timing.
//!
//! The services use a shared `RwLock` to coordinate shutdown with telemetry
//! processing. The `TelemetryService` holds a read lock while processing events,
//! and the `EventsService` acquires a write lock on SHUTDOWN before performing
//! the final flush. This ensures all in-flight telemetry is processed before
//! shutdown completes.

use crate::aggregator::SignalAggregator;
use crate::config::Config;
use crate::conversion::{MetricsConverter, TelemetryProcessor};
use crate::exporter::OtlpExporter;
use crate::flush::FlushManager;
use crate::receiver::Signal;
use lambda_extension::{Error, LambdaEvent, LambdaTelemetry, LambdaTelemetryRecord, NextEvent};
use opentelemetry_proto::tonic::resource::v1::Resource;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::{Mutex, RwLock, oneshot};
use tower::Service;

/// Shared state for extension services.
///
/// This holds the components that need to be shared between the events
/// processor and telemetry processor services.
pub struct ExtensionState {
    pub(crate) aggregator: Arc<SignalAggregator>,
    pub(crate) exporter: Arc<OtlpExporter>,
    pub(crate) flush_manager: Arc<Mutex<FlushManager>>,
    pub(crate) telemetry_processor: Arc<Mutex<TelemetryProcessor>>,
    pub(crate) metrics_converter: MetricsConverter,
    #[allow(dead_code)]
    pub(crate) config: Config,
    /// Lock to coordinate shutdown with telemetry processing.
    ///
    /// `TelemetryService` acquires a read lock while processing events.
    /// `EventsService` acquires a write lock on SHUTDOWN before final flush.
    /// This ensures all in-flight telemetry is processed before shutdown.
    processing_lock: RwLock<()>,
    /// Channel to signal that shutdown processing is complete.
    ///
    /// The sender is stored in a Mutex so it can be taken when shutdown occurs.
    /// The receiver should be used with `tokio::select!` to exit the event loop.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}

impl ExtensionState {
    /// Creates new extension state with the given configuration and resource.
    ///
    /// Returns the state and a receiver that will be signalled when shutdown is complete.
    /// Use the receiver with `tokio::select!` to exit the event loop gracefully.
    pub fn new(
        config: Config,
        resource: Resource,
    ) -> Result<(Self, oneshot::Receiver<()>), crate::exporter::ExportError> {
        let exporter = OtlpExporter::new(config.exporter.clone())?;
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let state = Self {
            aggregator: Arc::new(SignalAggregator::new(config.flush.clone())),
            exporter: Arc::new(exporter),
            flush_manager: Arc::new(Mutex::new(FlushManager::new(config.flush.clone()))),
            telemetry_processor: Arc::new(Mutex::new(TelemetryProcessor::new(resource.clone()))),
            metrics_converter: MetricsConverter::new(resource),
            config,
            processing_lock: RwLock::new(()),
            shutdown_tx: Mutex::new(Some(shutdown_tx)),
        };

        Ok((state, shutdown_rx))
    }

    /// Signals that shutdown processing is complete.
    ///
    /// This should be called after `final_flush()` to allow the event loop to exit.
    pub async fn signal_shutdown_complete(&self) {
        if let Some(tx) = self.shutdown_tx.lock().await.take() {
            let _ = tx.send(());
            tracing::debug!("Shutdown complete signal sent");
        }
    }

    /// Performs a flush of all pending signals to the exporter.
    pub async fn flush_all(&self) {
        let batches = self.aggregator.get_all_batches().await;
        let mut flush_manager = self.flush_manager.lock().await;

        for batch in batches {
            let result = self.exporter.export(batch).await;
            match result {
                crate::exporter::ExportResult::Success => {
                    flush_manager.record_flush();
                }
                crate::exporter::ExportResult::Fallback
                | crate::exporter::ExportResult::Skipped => {
                    flush_manager.record_flush_timeout();
                }
            }
        }
    }

    /// Waits for any in-progress telemetry processing to complete.
    ///
    /// This acquires a write lock on the processing lock, which blocks until
    /// all read locks (held by `TelemetryService` during processing) are released.
    /// The timeout prevents indefinite blocking if something goes wrong.
    pub async fn wait_for_processing_complete(&self, timeout: Duration) {
        let result = tokio::time::timeout(timeout, self.processing_lock.write()).await;
        if result.is_err() {
            tracing::warn!(
                timeout_ms = timeout.as_millis(),
                "Timed out waiting for telemetry processing to complete"
            );
        }
        // Lock is immediately dropped, we just needed to wait for it
    }

    /// Performs a final flush draining all signals.
    pub async fn final_flush(&self) {
        tracing::info!("Performing final flush");

        let batches = self.aggregator.drain_all().await;
        let count = batches.len();

        for batch in batches {
            let result = self.exporter.export(batch).await;
            tracing::debug!(?result, "Final flush batch");
        }

        let dropped = self.aggregator.dropped_count().await;
        if dropped > 0 {
            tracing::warn!(
                dropped = dropped,
                "Signals were dropped due to queue limits"
            );
        }

        tracing::info!(batches = count, dropped = dropped, "Final flush complete");
    }
}

/// Tower service for processing Lambda extension lifecycle events.
///
/// This service handles INVOKE and SHUTDOWN events from the Extensions API.
/// On SHUTDOWN, it performs a final flush of all buffered telemetry.
pub struct EventsService {
    state: Arc<ExtensionState>,
}

impl EventsService {
    /// Creates a new events service with the given shared state.
    pub fn new(state: Arc<ExtensionState>) -> Self {
        Self { state }
    }
}

impl Service<LambdaEvent> for EventsService {
    type Response = ();
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, event: LambdaEvent) -> Self::Future {
        let state = Arc::clone(&self.state);

        Box::pin(async move {
            match event.next {
                NextEvent::Invoke(invoke) => {
                    tracing::debug!(request_id = %invoke.request_id, "Received INVOKE event");

                    // Record invocation for adaptive flush pattern detection
                    {
                        let mut flush_manager = state.flush_manager.lock().await;
                        flush_manager.record_invocation();
                    }

                    // Check if we should flush based on pending count
                    let pending = state.aggregator.pending_count().await;
                    let should_flush = {
                        let flush_manager = state.flush_manager.lock().await;
                        flush_manager
                            .should_flush(Some(invoke.deadline_ms as i64), pending, false)
                            .is_some()
                    };

                    if should_flush {
                        tracing::debug!(pending, "Flushing during invocation");
                        state.flush_all().await;
                    }
                }
                NextEvent::Shutdown(shutdown) => {
                    tracing::info!(reason = ?shutdown.shutdown_reason, "Received SHUTDOWN event");

                    // Wait for any in-flight telemetry processing to complete
                    // This ensures we don't flush before the last batch of telemetry
                    // (e.g., platform.report) has been processed and added to the aggregator
                    state
                        .wait_for_processing_complete(Duration::from_millis(500))
                        .await;

                    // Emit shutdown metric
                    let shutdown_reason = format!("{:?}", shutdown.shutdown_reason);
                    let shutdown_metric = state
                        .metrics_converter
                        .create_shutdown_metric(&shutdown_reason);
                    state.aggregator.add(Signal::Metrics(shutdown_metric)).await;

                    // Final flush of all signals
                    state.final_flush().await;

                    // Signal shutdown complete to exit the event loop gracefully
                    state.signal_shutdown_complete().await;
                }
            }

            Ok(())
        })
    }
}

/// Tower service for processing Lambda Telemetry API events.
///
/// This service receives platform telemetry events and converts them to
/// OTLP metrics and traces, adding them to the aggregator for export.
#[derive(Clone)]
pub struct TelemetryService {
    state: Arc<ExtensionState>,
}

impl TelemetryService {
    /// Creates a new telemetry service with the given shared state.
    pub fn new(state: Arc<ExtensionState>) -> Self {
        Self { state }
    }
}

impl Service<Vec<LambdaTelemetry>> for TelemetryService {
    type Response = ();
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, events: Vec<LambdaTelemetry>) -> Self::Future {
        let state = Arc::clone(&self.state);

        Box::pin(async move {
            // Acquire read lock to prevent shutdown from flushing while we're processing
            let _guard = state.processing_lock.read().await;

            tracing::debug!(count = events.len(), "Processing telemetry events");

            // Check if any event is a RuntimeDone (signals end of invocation)
            let has_runtime_done = events
                .iter()
                .any(|e| matches!(e.record, LambdaTelemetryRecord::PlatformRuntimeDone { .. }));

            // Convert lambda_extension telemetry events to our internal format
            let internal_events = convert_telemetry_events(events);

            // Process through our TelemetryProcessor
            let (metrics, _traces) = {
                let mut processor = state.telemetry_processor.lock().await;
                processor.process_events(internal_events)
            };

            // Add metrics to aggregator
            for metric in metrics {
                state
                    .aggregator
                    .add(Signal::Metrics(
                        opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest {
                            resource_metrics: metric.resource_metrics,
                        },
                    ))
                    .await;
            }

            // If we received RuntimeDone, this is the post-invoke phase.
            // Check if we should flush based on the strategy (e.g., FlushStrategy::End).
            if has_runtime_done {
                let pending = state.aggregator.pending_count().await;
                let should_flush = {
                    let flush_manager = state.flush_manager.lock().await;
                    flush_manager
                        .should_flush_on_invocation_end(pending)
                        .is_some()
                };

                if should_flush {
                    tracing::debug!(pending, "Flushing at end of invocation (post-invoke phase)");
                    state.flush_all().await;
                }
            }

            Ok(())
        })
    }
}

/// Converts lambda_extension telemetry events to our internal format.
fn convert_telemetry_events(events: Vec<LambdaTelemetry>) -> Vec<crate::telemetry::TelemetryEvent> {
    use crate::telemetry::{
        ReportMetrics, ReportRecord, RuntimeDoneRecord, RuntimeMetrics, SpanRecord, StartRecord,
        TelemetryEvent, TracingRecord,
    };

    events
        .into_iter()
        .filter_map(|event| {
            let time = event.time.to_rfc3339();

            match event.record {
                LambdaTelemetryRecord::PlatformStart {
                    request_id,
                    version,
                    tracing,
                } => Some(TelemetryEvent::Start {
                    time,
                    record: StartRecord {
                        request_id,
                        version,
                        tracing: tracing.map(|t| TracingRecord {
                            span_id: None,
                            trace_type: Some(format!("{:?}", t.r#type)),
                            value: Some(t.value),
                        }),
                    },
                }),

                LambdaTelemetryRecord::PlatformRuntimeDone {
                    request_id,
                    status,
                    error_type: _,
                    metrics,
                    spans,
                    tracing,
                } => Some(TelemetryEvent::RuntimeDone {
                    time,
                    record: RuntimeDoneRecord {
                        request_id,
                        status: format!("{:?}", status),
                        metrics: metrics.map(|m| RuntimeMetrics {
                            duration_ms: m.duration_ms,
                            produced_bytes: m.produced_bytes,
                        }),
                        spans: spans
                            .into_iter()
                            .map(|s| SpanRecord {
                                name: s.name,
                                start: s.start.timestamp_millis() as f64,
                                duration_ms: s.duration_ms,
                            })
                            .collect(),
                        tracing: tracing.map(|t| TracingRecord {
                            span_id: None,
                            trace_type: Some(format!("{:?}", t.r#type)),
                            value: Some(t.value),
                        }),
                    },
                }),

                LambdaTelemetryRecord::PlatformReport {
                    request_id,
                    status,
                    error_type: _,
                    metrics,
                    spans: _,
                    tracing,
                } => Some(TelemetryEvent::Report {
                    time,
                    record: ReportRecord {
                        request_id,
                        status: format!("{:?}", status),
                        metrics: ReportMetrics {
                            duration_ms: metrics.duration_ms,
                            billed_duration_ms: metrics.billed_duration_ms,
                            memory_size_mb: metrics.memory_size_mb,
                            max_memory_used_mb: metrics.max_memory_used_mb,
                            init_duration_ms: metrics.init_duration_ms,
                            restore_duration_ms: metrics.restore_duration_ms,
                        },
                        tracing: tracing.map(|t| TracingRecord {
                            span_id: None,
                            trace_type: Some(format!("{:?}", t.r#type)),
                            value: Some(t.value),
                        }),
                    },
                }),

                // Log other events but don't convert them
                _ => {
                    tracing::trace!(?event, "Ignoring non-platform telemetry event");
                    None
                }
            }
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use lambda_extension::LambdaTelemetry;

    #[test]
    fn test_extension_state_creation() {
        let config = Config::default();
        let sdk_resource = crate::resource::detect_resource();
        let proto_resource = crate::resource::to_proto_resource(&sdk_resource);

        // This will fail if exporter can't be created, but that's fine for unit tests
        let result = ExtensionState::new(config, proto_resource);
        assert!(result.is_ok());
        let (_state, _shutdown_rx) = result.unwrap();
    }

    #[test]
    fn test_simulator_telemetry_format_deserialization() {
        // This is the exact format our simulator sends
        let json = r#"[{"time":"2025-11-30T22:29:09.581655Z","type":"platform.start","record":{"requestId":"38432cb4-cb8b-4162-982d-923d3c3f6d10","tracing":{"type":"X-Amzn-Trace-Id","value":"Root=1-692cc535-0338d3516cb745b7b41f878e"},"version":"$LATEST"}}]"#;

        let result: Result<Vec<LambdaTelemetry>, _> = serde_json::from_str(json);
        match &result {
            Ok(events) => println!("Success: {:?}", events),
            Err(e) => println!("Error: {}", e),
        }
        assert!(result.is_ok(), "Failed to deserialize: {:?}", result.err());
    }

    #[test]
    fn test_full_simulator_batch_deserialization() {
        // Full batch similar to what the test produces
        let json = r#"[{"time":"2025-11-30T22:35:51.565094Z","type":"platform.start","record":{"requestId":"0c90003a-8970-474c-b696-fca5336ef4f5","tracing":{"type":"X-Amzn-Trace-Id","value":"Root=1-692cc6c7-f2ce8d3383524609b99c07a9"},"version":"$LATEST"}},{"time":"2025-11-30T22:35:51.565857Z","type":"platform.initRuntimeDone","record":{"initializationType":"on-demand","phase":"init","status":"success"}},{"time":"2025-11-30T22:35:51.565857Z","type":"platform.initReport","record":{"initializationType":"on-demand","phase":"init","status":"success","metrics":{"durationMs":565.4}}},{"time":"2025-11-30T22:35:51.578834Z","type":"platform.runtimeDone","record":{"requestId":"0c90003a-8970-474c-b696-fca5336ef4f5","status":"success","metrics":{"durationMs":13.74},"spans":[],"tracing":{"type":"X-Amzn-Trace-Id","value":"Root=1-692cc6c7-f2ce8d3383524609b99c07a9"}}},{"time":"2025-11-30T22:35:51.578909Z","type":"platform.report","record":{"requestId":"0c90003a-8970-474c-b696-fca5336ef4f5","status":"success","metrics":{"durationMs":13.74,"billedDurationMs":100,"memorySizeMB":128,"maxMemoryUsedMB":64},"tracing":{"type":"X-Amzn-Trace-Id","value":"Root=1-692cc6c7-f2ce8d3383524609b99c07a9"}}}]"#;

        let result: Result<Vec<LambdaTelemetry>, _> = serde_json::from_str(json);
        match &result {
            Ok(events) => {
                println!("Success: {} events parsed", events.len());
                for (i, event) in events.iter().enumerate() {
                    println!("  Event {}: {:?}", i, event);
                }
            }
            Err(e) => println!("Error: {}", e),
        }
        assert!(result.is_ok(), "Failed to deserialize: {:?}", result.err());
    }
}