opentelemetry_lambda_extension/
telemetry.rs

1//! Telemetry API client and listener.
2//!
3//! This module provides functionality for subscribing to the Lambda Telemetry API
4//! and receiving platform events via HTTP push.
5
6use axum::{
7    Router, body::Bytes, extract::State, http::StatusCode, response::IntoResponse, routing::post,
8};
9use serde::{Deserialize, Serialize};
10use std::net::SocketAddr;
11use std::sync::Arc;
12use tokio::net::TcpListener;
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
/// Types of telemetry events from the Telemetry API.
///
/// Listed in [`TelemetrySubscription::types`] to select which streams a
/// subscription receives. Each variant is (de)serialized as its lowercase
/// name: `"platform"`, `"function"`, `"extension"`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TelemetryType {
    /// Platform events (start, end, report, fault, extension).
    Platform,
    /// Function logs from stdout/stderr.
    Function,
    /// Extension logs.
    Extension,
}
28
/// Buffering configuration for Telemetry API subscription.
///
/// Keys are (de)serialized in camelCase (`maxItems`, `maxBytes`, `timeoutMs`).
/// A field set to `None` is left out of the serialized request entirely.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BufferingConfig {
    /// Maximum number of events to buffer before sending.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_items: Option<u32>,
    /// Maximum size in bytes to buffer before sending.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_bytes: Option<u32>,
    /// Maximum time in milliseconds to buffer before sending.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u32>,
}
43
44impl Default for BufferingConfig {
45    fn default() -> Self {
46        Self {
47            max_items: Some(1000),
48            max_bytes: Some(256 * 1024),
49            timeout_ms: Some(25),
50        }
51    }
52}
53
/// Destination configuration for Telemetry API subscription.
///
/// Serialized with the keys `protocol` and `URI` (the latter is renamed
/// explicitly and does not follow the camelCase rule).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DestinationConfig {
    /// Protocol to use (HTTP only supported).
    pub protocol: String,
    /// URI to send events to.
    #[serde(rename = "URI")]
    pub uri: String,
}
64
/// Subscription request for the Telemetry API.
///
/// Serialize-only; keys are emitted in camelCase (`schemaVersion`, `types`,
/// `buffering`, `destination`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TelemetrySubscription {
    /// Schema version.
    pub schema_version: String,
    /// Types of telemetry to subscribe to.
    pub types: Vec<TelemetryType>,
    /// Buffering configuration.
    pub buffering: BufferingConfig,
    /// Destination for events.
    pub destination: DestinationConfig,
}
78
79impl TelemetrySubscription {
80    /// Creates a new subscription request for platform events.
81    pub fn platform_events(listener_uri: impl Into<String>) -> Self {
82        Self {
83            schema_version: "2022-12-13".to_string(),
84            types: vec![TelemetryType::Platform],
85            buffering: BufferingConfig::default(),
86            destination: DestinationConfig {
87                protocol: "HTTP".to_string(),
88                uri: listener_uri.into(),
89            },
90        }
91    }
92
93    /// Creates a subscription for all event types.
94    pub fn all_events(listener_uri: impl Into<String>) -> Self {
95        Self {
96            schema_version: "2022-12-13".to_string(),
97            types: vec![
98                TelemetryType::Platform,
99                TelemetryType::Function,
100                TelemetryType::Extension,
101            ],
102            buffering: BufferingConfig::default(),
103            destination: DestinationConfig {
104                protocol: "HTTP".to_string(),
105                uri: listener_uri.into(),
106            },
107        }
108    }
109
110    /// Sets custom buffering configuration.
111    pub fn with_buffering(mut self, config: BufferingConfig) -> Self {
112        self.buffering = config;
113        self
114    }
115}
116
/// Platform telemetry event from Lambda.
///
/// Deserialized as an internally tagged enum: the JSON `type` field selects
/// the variant, and every variant carries the event `time` plus a `record`
/// payload. Each variant sets its tag explicitly via `rename`, so the
/// enum-level `rename_all` has no effect on the tags below.
///
/// There is no fallback variant: a `type` value not listed here fails
/// deserialization of that event.
#[non_exhaustive]
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum TelemetryEvent {
    /// Platform initialization start (`platform.initStart`).
    #[serde(rename = "platform.initStart")]
    InitStart {
        /// Event time in ISO 8601 format.
        time: String,
        /// Event record.
        record: InitStartRecord,
    },
    /// Platform initialization complete, runtime ready
    /// (`platform.initRuntimeDone`).
    #[serde(rename = "platform.initRuntimeDone")]
    InitRuntimeDone {
        /// Event time in ISO 8601 format.
        time: String,
        /// Event record.
        record: InitRuntimeDoneRecord,
    },
    /// Platform invocation start (`platform.start`).
    #[serde(rename = "platform.start")]
    Start {
        /// Event time in ISO 8601 format.
        time: String,
        /// Event record.
        record: StartRecord,
    },
    /// Platform runtime invocation complete (`platform.runtimeDone`).
    #[serde(rename = "platform.runtimeDone")]
    RuntimeDone {
        /// Event time in ISO 8601 format.
        time: String,
        /// Event record.
        record: RuntimeDoneRecord,
    },
    /// Platform invocation report (`platform.report`).
    #[serde(rename = "platform.report")]
    Report {
        /// Event time in ISO 8601 format.
        time: String,
        /// Event record.
        record: ReportRecord,
    },
    /// Platform fault (`platform.fault`).
    #[serde(rename = "platform.fault")]
    Fault {
        /// Event time in ISO 8601 format.
        time: String,
        /// Event record.
        record: FaultRecord,
    },
    /// Extension event (`platform.extension`).
    #[serde(rename = "platform.extension")]
    Extension {
        /// Event time in ISO 8601 format.
        time: String,
        /// Event record.
        record: ExtensionRecord,
    },
    /// Function log line (`function`). The record must be a JSON string.
    #[serde(rename = "function")]
    Function {
        /// Event time in ISO 8601 format.
        time: String,
        /// Log record.
        record: String,
    },
    /// Extension log line (`extension`). The record must be a JSON string.
    #[serde(rename = "extension")]
    ExtensionLog {
        /// Event time in ISO 8601 format.
        time: String,
        /// Log record.
        record: String,
    },
}
195
/// Record for platform.initStart event.
///
/// Keys are read in camelCase. Only `initializationType` is required; the
/// other fields fall back to their defaults when absent.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitStartRecord {
    /// Initialization type (on-demand, provisioned-concurrency).
    pub initialization_type: String,
    /// Phase of initialization. Empty string when the key is missing.
    #[serde(default)]
    pub phase: String,
    /// Runtime version.
    #[serde(default)]
    pub runtime_version: Option<String>,
    /// Runtime version ARN.
    #[serde(default)]
    pub runtime_version_arn: Option<String>,
}
212
/// Record for platform.initRuntimeDone event.
///
/// Keys are read in camelCase. Only `initializationType` is required.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitRuntimeDoneRecord {
    /// Initialization type.
    pub initialization_type: String,
    /// Status of initialization. Empty string when the key is missing.
    #[serde(default)]
    pub status: String,
    /// Phase of initialization. Empty string when the key is missing.
    #[serde(default)]
    pub phase: String,
}
226
/// Record for platform.start event.
///
/// Keys are read in camelCase. Only `requestId` is required.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StartRecord {
    /// Request ID for this invocation.
    pub request_id: String,
    /// Version of the function. `None` when the key is missing.
    #[serde(default)]
    pub version: Option<String>,
    /// Tracing information. `None` when the key is missing.
    #[serde(default)]
    pub tracing: Option<TracingRecord>,
}
240
/// Tracing information in platform events.
///
/// Every field is optional and becomes `None` when its key is missing.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TracingRecord {
    /// Span ID (JSON key `spanId`).
    #[serde(default)]
    pub span_id: Option<String>,
    /// Trace type (JSON key `type`; renamed because `type` is a Rust keyword).
    #[serde(rename = "type", default)]
    pub trace_type: Option<String>,
    /// Trace value (X-Ray header).
    #[serde(default)]
    pub value: Option<String>,
}
255
/// Record for platform.runtimeDone event.
///
/// Keys are read in camelCase. `requestId` and `status` are required.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeDoneRecord {
    /// Request ID for this invocation.
    pub request_id: String,
    /// Status of the invocation.
    pub status: String,
    /// Metrics for this invocation. `None` when the key is missing.
    #[serde(default)]
    pub metrics: Option<RuntimeMetrics>,
    /// Tracing information. `None` when the key is missing.
    #[serde(default)]
    pub tracing: Option<TracingRecord>,
    /// Spans produced during invocation. Empty when the key is missing.
    #[serde(default)]
    pub spans: Vec<SpanRecord>,
}
274
/// Runtime metrics from platform.runtimeDone event.
///
/// Keys are read in camelCase (`durationMs`, `producedBytes`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeMetrics {
    /// Duration in milliseconds. Required.
    pub duration_ms: f64,
    /// Produced bytes. `None` when the key is missing.
    #[serde(default)]
    pub produced_bytes: Option<u64>,
}
285
/// Span record from platform events.
///
/// All three fields are required; keys are read in camelCase (`name`,
/// `start`, `durationMs`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SpanRecord {
    /// Name of the span.
    pub name: String,
    /// Start time in milliseconds.
    pub start: f64,
    /// Duration in milliseconds.
    pub duration_ms: f64,
}
297
/// Record for platform.report event.
///
/// Keys are read in camelCase. `requestId`, `status`, and `metrics` are
/// required; a report without any of them fails to deserialize.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportRecord {
    /// Request ID for this invocation.
    pub request_id: String,
    /// Status of the invocation.
    pub status: String,
    /// Metrics for this invocation.
    pub metrics: ReportMetrics,
    /// Tracing information. `None` when the key is missing.
    #[serde(default)]
    pub tracing: Option<TracingRecord>,
}
312
/// Metrics from platform.report event.
///
/// Keys are read in camelCase, except the two memory fields, which are renamed
/// explicitly to keep the upper-case `MB` suffix.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportMetrics {
    /// Duration in milliseconds.
    pub duration_ms: f64,
    /// Billed duration in milliseconds.
    pub billed_duration_ms: u64,
    /// Memory size in MB (JSON key `memorySizeMB`).
    #[serde(rename = "memorySizeMB")]
    pub memory_size_mb: u64,
    /// Max memory used in MB (JSON key `maxMemoryUsedMB`).
    #[serde(rename = "maxMemoryUsedMB")]
    pub max_memory_used_mb: u64,
    /// Init duration in milliseconds (cold start only).
    #[serde(default)]
    pub init_duration_ms: Option<f64>,
    /// Restore duration in milliseconds (SnapStart only).
    #[serde(default)]
    pub restore_duration_ms: Option<f64>,
}
334
/// Record for platform.fault event.
///
/// Both fields are optional and become `None` when their key is missing.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FaultRecord {
    /// Request ID for this invocation (JSON key `requestId`).
    #[serde(default)]
    pub request_id: Option<String>,
    /// Fault message (JSON key `faultMessage`).
    #[serde(default)]
    pub fault_message: Option<String>,
}
346
/// Record for platform.extension event.
///
/// `name` and `state` are required.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExtensionRecord {
    /// Name of the extension.
    pub name: String,
    /// State of the extension.
    pub state: String,
    /// Events the extension subscribes to. Empty when the key is missing.
    #[serde(default)]
    pub events: Vec<String>,
}
359
/// Errors produced by Telemetry API operations.
#[non_exhaustive]
#[derive(Debug)]
pub enum TelemetryError {
    /// A telemetry event could not be parsed; the payload describes why.
    Parse(String),
}

impl std::fmt::Display for TelemetryError {
    /// Renders the error as `Parse error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Parse(detail) => f.write_fmt(format_args!("Parse error: {}", detail)),
        }
    }
}

impl std::error::Error for TelemetryError {}
377
/// HTTP listener for receiving Telemetry API events.
///
/// Construct it with [`TelemetryListener::new`] and drive it with
/// [`TelemetryListener::run`].
pub struct TelemetryListener {
    // TCP port the server binds to; also embedded in `listener_uri()`.
    port: u16,
    // Sender that receives each parsed batch of events.
    event_tx: mpsc::Sender<Vec<TelemetryEvent>>,
    // Cancelling this token triggers graceful shutdown of the server.
    cancel_token: CancellationToken,
}
384
385impl TelemetryListener {
386    /// Creates a new Telemetry API listener.
387    ///
388    /// # Arguments
389    ///
390    /// * `port` - Port to listen on
391    /// * `event_tx` - Channel to send received events
392    /// * `cancel_token` - Token for graceful shutdown
393    pub fn new(
394        port: u16,
395        event_tx: mpsc::Sender<Vec<TelemetryEvent>>,
396        cancel_token: CancellationToken,
397    ) -> Self {
398        Self {
399            port,
400            event_tx,
401            cancel_token,
402        }
403    }
404
405    /// Returns the listener URI for use in subscription requests.
406    ///
407    /// In a real Lambda environment, this would use `sandbox.localdomain`
408    /// which resolves to the execution environment. For local testing,
409    /// we use `127.0.0.1` to ensure routable connectivity.
410    pub fn listener_uri(&self) -> String {
411        // Check if we're running in Lambda (AWS_LAMBDA_FUNCTION_NAME is set)
412        if std::env::var("AWS_LAMBDA_FUNCTION_NAME").is_ok() {
413            format!("http://sandbox.localdomain:{}", self.port)
414        } else {
415            format!("http://127.0.0.1:{}", self.port)
416        }
417    }
418
419    /// Starts the HTTP listener.
420    ///
421    /// This method blocks until the cancellation token is triggered.
422    pub async fn run(self) -> Result<(), std::io::Error> {
423        let state = ListenerState {
424            event_tx: self.event_tx,
425        };
426
427        let app = Router::new()
428            .route("/", post(handle_telemetry))
429            .with_state(Arc::new(state));
430
431        let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
432        let listener = TcpListener::bind(addr).await?;
433
434        tracing::info!(port = self.port, "Telemetry API listener started");
435
436        axum::serve(listener, app)
437            .with_graceful_shutdown(self.cancel_token.cancelled_owned())
438            .await
439    }
440}
441
/// Shared state handed to the HTTP handler through axum's `State` extractor.
struct ListenerState {
    // Sender the handler uses to forward each parsed batch of events.
    event_tx: mpsc::Sender<Vec<TelemetryEvent>>,
}
445
446async fn handle_telemetry(
447    State(state): State<Arc<ListenerState>>,
448    body: Bytes,
449) -> impl IntoResponse {
450    let events: Vec<TelemetryEvent> = match serde_json::from_slice(&body) {
451        Ok(events) => events,
452        Err(e) => {
453            tracing::warn!(error = %e, "Failed to parse telemetry events");
454            return StatusCode::BAD_REQUEST;
455        }
456    };
457
458    tracing::debug!(count = events.len(), "Received telemetry events");
459
460    match state.event_tx.try_send(events) {
461        Ok(()) => StatusCode::OK,
462        Err(mpsc::error::TrySendError::Full(_)) => {
463            tracing::warn!("Telemetry event channel full");
464            StatusCode::SERVICE_UNAVAILABLE
465        }
466        Err(mpsc::error::TrySendError::Closed(_)) => {
467            tracing::error!("Telemetry event channel closed");
468            StatusCode::INTERNAL_SERVER_ERROR
469        }
470    }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// A platform-only subscription carries the fixed schema version, the
    /// single `Platform` type, and the URI it was given.
    #[test]
    fn test_telemetry_subscription_platform() {
        let subscription = TelemetrySubscription::platform_events("http://localhost:9999");

        assert_eq!(subscription.schema_version, "2022-12-13");
        assert_eq!(subscription.types, vec![TelemetryType::Platform]);
        assert_eq!(subscription.destination.uri, "http://localhost:9999");
    }

    /// An all-events subscription lists exactly the three telemetry types.
    #[test]
    fn test_telemetry_subscription_all() {
        let subscription = TelemetrySubscription::all_events("http://localhost:9999");

        assert_eq!(subscription.types.len(), 3);
        for expected in [
            TelemetryType::Platform,
            TelemetryType::Function,
            TelemetryType::Extension,
        ] {
            assert!(subscription.types.contains(&expected));
        }
    }

    /// `platform.start` maps to `TelemetryEvent::Start` with camelCase keys.
    #[test]
    fn test_parse_start_event() {
        let json = r#"[{
            "type": "platform.start",
            "time": "2022-10-12T00:00:00.000Z",
            "record": {
                "requestId": "test-request-id",
                "version": "$LATEST"
            }
        }]"#;

        let parsed: Vec<TelemetryEvent> = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.len(), 1);

        let TelemetryEvent::Start { record, .. } = &parsed[0] else {
            panic!("Expected Start event");
        };
        assert_eq!(record.request_id, "test-request-id");
        assert_eq!(record.version, Some("$LATEST".to_string()));
    }

    /// `platform.report` maps to `TelemetryEvent::Report`, including the
    /// explicitly renamed memory metrics.
    #[test]
    fn test_parse_report_event() {
        let json = r#"[{
            "type": "platform.report",
            "time": "2022-10-12T00:00:00.000Z",
            "record": {
                "requestId": "test-request-id",
                "status": "success",
                "metrics": {
                    "durationMs": 100.5,
                    "billedDurationMs": 200,
                    "memorySizeMB": 128,
                    "maxMemoryUsedMB": 64
                }
            }
        }]"#;

        let parsed: Vec<TelemetryEvent> = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.len(), 1);

        let TelemetryEvent::Report { record, .. } = &parsed[0] else {
            panic!("Expected Report event");
        };
        assert_eq!(record.request_id, "test-request-id");
        assert_eq!(record.status, "success");
        assert_eq!(record.metrics.duration_ms, 100.5);
        assert_eq!(record.metrics.billed_duration_ms, 200);
    }

    /// `platform.runtimeDone` maps to `TelemetryEvent::RuntimeDone` and keeps
    /// its spans.
    #[test]
    fn test_parse_runtime_done_event() {
        let json = r#"[{
            "type": "platform.runtimeDone",
            "time": "2022-10-12T00:00:00.000Z",
            "record": {
                "requestId": "test-request-id",
                "status": "success",
                "metrics": {
                    "durationMs": 50.0
                },
                "spans": [
                    {"name": "responseLatency", "start": 0.0, "durationMs": 10.0}
                ]
            }
        }]"#;

        let parsed: Vec<TelemetryEvent> = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.len(), 1);

        let TelemetryEvent::RuntimeDone { record, .. } = &parsed[0] else {
            panic!("Expected RuntimeDone event");
        };
        assert_eq!(record.request_id, "test-request-id");
        assert_eq!(record.spans.len(), 1);
        assert_eq!(record.spans[0].name, "responseLatency");
    }

    /// A batch with both init events parses into the matching variants, in
    /// order.
    #[test]
    fn test_parse_init_events() {
        let json = r#"[
            {
                "type": "platform.initStart",
                "time": "2022-10-12T00:00:00.000Z",
                "record": {
                    "initializationType": "on-demand",
                    "phase": "init"
                }
            },
            {
                "type": "platform.initRuntimeDone",
                "time": "2022-10-12T00:00:01.000Z",
                "record": {
                    "initializationType": "on-demand",
                    "status": "success",
                    "phase": "init"
                }
            }
        ]"#;

        let parsed: Vec<TelemetryEvent> = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.len(), 2);

        let TelemetryEvent::InitStart { record, .. } = &parsed[0] else {
            panic!("Expected InitStart event");
        };
        assert_eq!(record.initialization_type, "on-demand");

        let TelemetryEvent::InitRuntimeDone { record, .. } = &parsed[1] else {
            panic!("Expected InitRuntimeDone event");
        };
        assert_eq!(record.status, "success");
    }

    /// A `function` event carries its log line as a plain string record.
    #[test]
    fn test_parse_function_log() {
        let json = r#"[{
            "type": "function",
            "time": "2022-10-12T00:00:00.000Z",
            "record": "Hello from Lambda!"
        }]"#;

        let parsed: Vec<TelemetryEvent> = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.len(), 1);

        let TelemetryEvent::Function { record, .. } = &parsed[0] else {
            panic!("Expected Function event");
        };
        assert_eq!(record, "Hello from Lambda!");
    }

    /// Outside Lambda the listener advertises the loopback address.
    #[test]
    fn test_listener_uri() {
        let (event_tx, _event_rx) = mpsc::channel(10);
        let listener = TelemetryListener::new(9999, event_tx, CancellationToken::new());

        // In non-Lambda environment (no AWS_LAMBDA_FUNCTION_NAME), uses 127.0.0.1
        assert_eq!(listener.listener_uri(), "http://127.0.0.1:9999");
    }

    /// The `Display` output includes the wrapped message.
    #[test]
    fn test_telemetry_error_display() {
        let rendered = TelemetryError::Parse("parse error".to_string()).to_string();
        assert!(rendered.contains("parse error"));
    }

    /// The default buffering limits are 1000 items, 256 KiB, and 25 ms.
    #[test]
    fn test_buffering_config_default() {
        let BufferingConfig {
            max_items,
            max_bytes,
            timeout_ms,
        } = BufferingConfig::default();

        assert_eq!(max_items, Some(1000));
        assert_eq!(max_bytes, Some(256 * 1024));
        assert_eq!(timeout_ms, Some(25));
    }
}