1use axum::{
7 Router, body::Bytes, extract::State, http::StatusCode, response::IntoResponse, routing::post,
8};
9use serde::{Deserialize, Serialize};
10use std::net::SocketAddr;
11use std::sync::Arc;
12use tokio::net::TcpListener;
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
/// Category of telemetry that a [`TelemetrySubscription`] can list in its
/// `types` field.
///
/// Variant names are (de)serialized in lowercase: `"platform"`,
/// `"function"` and `"extension"`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TelemetryType {
    /// Serialized as `"platform"`.
    Platform,
    /// Serialized as `"function"`.
    Function,
    /// Serialized as `"extension"`.
    Extension,
}
28
/// Buffering limits carried inside a [`TelemetrySubscription`].
///
/// Keys are (de)serialized in camelCase (`maxItems`, `maxBytes`,
/// `timeoutMs`). A field that is `None` is left out of the serialized output
/// entirely rather than being written as `null`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BufferingConfig {
    /// `maxItems` limit.
    // NOTE(review): presumably the maximum number of events buffered before
    // a batch is delivered — confirm against the Telemetry API documentation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_items: Option<u32>,
    /// `maxBytes` limit.
    // NOTE(review): presumably the maximum buffered payload size in bytes —
    // confirm against the Telemetry API documentation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_bytes: Option<u32>,
    /// `timeoutMs` limit.
    // NOTE(review): presumably the maximum time in milliseconds a batch is
    // buffered — confirm against the Telemetry API documentation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u32>,
}
43
44impl Default for BufferingConfig {
45 fn default() -> Self {
46 Self {
47 max_items: Some(1000),
48 max_bytes: Some(256 * 1024),
49 timeout_ms: Some(25),
50 }
51 }
52}
53
/// Delivery target carried inside a [`TelemetrySubscription`].
///
/// (De)serialized with the keys `protocol` and `URI`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DestinationConfig {
    /// Delivery protocol name; the constructors in this file always use
    /// `"HTTP"`.
    pub protocol: String,
    /// Address the telemetry is delivered to. The key is explicitly renamed
    /// to upper-case `URI`, overriding the struct-level camelCase rule.
    #[serde(rename = "URI")]
    pub uri: String,
}
64
/// Subscription request body describing which telemetry to receive, how it is
/// buffered and where it is delivered.
///
/// Only `Serialize` is derived, so this type is written out but never parsed.
/// Keys are emitted in camelCase: `schemaVersion`, `types`, `buffering`,
/// `destination`.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TelemetrySubscription {
    /// Schema version string; the constructors in this file use
    /// `"2022-12-13"`.
    pub schema_version: String,
    /// Telemetry categories requested by this subscription.
    pub types: Vec<TelemetryType>,
    /// Buffering limits sent with the subscription.
    pub buffering: BufferingConfig,
    /// Where the telemetry should be delivered.
    pub destination: DestinationConfig,
}
78
79impl TelemetrySubscription {
80 pub fn platform_events(listener_uri: impl Into<String>) -> Self {
82 Self {
83 schema_version: "2022-12-13".to_string(),
84 types: vec![TelemetryType::Platform],
85 buffering: BufferingConfig::default(),
86 destination: DestinationConfig {
87 protocol: "HTTP".to_string(),
88 uri: listener_uri.into(),
89 },
90 }
91 }
92
93 pub fn all_events(listener_uri: impl Into<String>) -> Self {
95 Self {
96 schema_version: "2022-12-13".to_string(),
97 types: vec![
98 TelemetryType::Platform,
99 TelemetryType::Function,
100 TelemetryType::Extension,
101 ],
102 buffering: BufferingConfig::default(),
103 destination: DestinationConfig {
104 protocol: "HTTP".to_string(),
105 uri: listener_uri.into(),
106 },
107 }
108 }
109
110 pub fn with_buffering(mut self, config: BufferingConfig) -> Self {
112 self.buffering = config;
113 self
114 }
115}
116
/// One telemetry event as delivered to the listener.
///
/// The enum is internally tagged: the JSON `type` field selects the variant,
/// and every variant carries the event's `time` and `record` fields. Each
/// variant sets its tag explicitly with `#[serde(rename = ...)]`, so the
/// enum-level `rename_all` never determines a tag value.
///
/// There is no catch-all variant, so deserializing an event whose `type` is
/// not listed below fails.
#[non_exhaustive]
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum TelemetryEvent {
    /// Event with `type` = `"platform.initStart"`.
    #[serde(rename = "platform.initStart")]
    InitStart {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// Event payload.
        record: InitStartRecord,
    },
    /// Event with `type` = `"platform.initRuntimeDone"`.
    #[serde(rename = "platform.initRuntimeDone")]
    InitRuntimeDone {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// Event payload.
        record: InitRuntimeDoneRecord,
    },
    /// Event with `type` = `"platform.start"`.
    #[serde(rename = "platform.start")]
    Start {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// Event payload.
        record: StartRecord,
    },
    /// Event with `type` = `"platform.runtimeDone"`.
    #[serde(rename = "platform.runtimeDone")]
    RuntimeDone {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// Event payload.
        record: RuntimeDoneRecord,
    },
    /// Event with `type` = `"platform.report"`.
    #[serde(rename = "platform.report")]
    Report {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// Event payload.
        record: ReportRecord,
    },
    /// Event with `type` = `"platform.fault"`.
    #[serde(rename = "platform.fault")]
    Fault {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// Event payload.
        record: FaultRecord,
    },
    /// Event with `type` = `"platform.extension"`.
    #[serde(rename = "platform.extension")]
    Extension {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// Event payload.
        record: ExtensionRecord,
    },
    /// Event with `type` = `"function"`; its `record` must be a JSON string.
    #[serde(rename = "function")]
    Function {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// The record text.
        record: String,
    },
    /// Event with `type` = `"extension"`; its `record` must be a JSON string.
    #[serde(rename = "extension")]
    ExtensionLog {
        /// Event timestamp, kept as the unparsed string.
        time: String,
        /// The record text.
        record: String,
    },
}
195
/// Payload of a `platform.initStart` event ([`TelemetryEvent::InitStart`]).
///
/// Keys are read in camelCase; JSON keys not listed here are ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitStartRecord {
    /// `initializationType` (required), e.g. `"on-demand"`.
    pub initialization_type: String,
    /// `phase`, e.g. `"init"`; empty string when the key is absent.
    #[serde(default)]
    pub phase: String,
    /// `runtimeVersion`; `None` when the key is absent.
    #[serde(default)]
    pub runtime_version: Option<String>,
    /// `runtimeVersionArn`; `None` when the key is absent.
    #[serde(default)]
    pub runtime_version_arn: Option<String>,
}
212
/// Payload of a `platform.initRuntimeDone` event
/// ([`TelemetryEvent::InitRuntimeDone`]).
///
/// Keys are read in camelCase; JSON keys not listed here are ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitRuntimeDoneRecord {
    /// `initializationType` (required), e.g. `"on-demand"`.
    pub initialization_type: String,
    /// `status`, e.g. `"success"`; empty string when the key is absent.
    #[serde(default)]
    pub status: String,
    /// `phase`, e.g. `"init"`; empty string when the key is absent.
    #[serde(default)]
    pub phase: String,
}
226
/// Payload of a `platform.start` event ([`TelemetryEvent::Start`]).
///
/// Keys are read in camelCase; JSON keys not listed here are ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StartRecord {
    /// `requestId` (required).
    pub request_id: String,
    /// `version`, e.g. `"$LATEST"`; `None` when the key is absent.
    #[serde(default)]
    pub version: Option<String>,
    /// `tracing` object; `None` when the key is absent.
    #[serde(default)]
    pub tracing: Option<TracingRecord>,
}
240
/// The `tracing` object attached to start, runtime-done and report records.
///
/// Every field is optional and becomes `None` when its key is absent.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TracingRecord {
    /// `spanId`.
    #[serde(default)]
    pub span_id: Option<String>,
    /// Read from the JSON key `type`; named `trace_type` here because `type`
    /// is a reserved word in Rust.
    #[serde(rename = "type", default)]
    pub trace_type: Option<String>,
    /// `value`.
    // NOTE(review): presumably the trace header value — confirm against the
    // Telemetry API schema.
    #[serde(default)]
    pub value: Option<String>,
}
255
/// Payload of a `platform.runtimeDone` event
/// ([`TelemetryEvent::RuntimeDone`]).
///
/// Keys are read in camelCase; JSON keys not listed here are ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeDoneRecord {
    /// `requestId` (required).
    pub request_id: String,
    /// `status` (required), e.g. `"success"`.
    pub status: String,
    /// `metrics` object; `None` when the key is absent.
    #[serde(default)]
    pub metrics: Option<RuntimeMetrics>,
    /// `tracing` object; `None` when the key is absent.
    #[serde(default)]
    pub tracing: Option<TracingRecord>,
    /// `spans` array; empty when the key is absent.
    #[serde(default)]
    pub spans: Vec<SpanRecord>,
}
274
/// The `metrics` object of a [`RuntimeDoneRecord`].
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeMetrics {
    /// `durationMs` (required), a floating-point number.
    pub duration_ms: f64,
    /// `producedBytes`; `None` when the key is absent.
    #[serde(default)]
    pub produced_bytes: Option<u64>,
}
285
/// One element of the `spans` array in a [`RuntimeDoneRecord`].
///
/// All three fields are required.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SpanRecord {
    /// `name`, e.g. `"responseLatency"`.
    pub name: String,
    /// `start`, read as a JSON number.
    // NOTE(review): the unit/epoch of `start` is not visible in this file —
    // confirm against the Telemetry API schema before relying on it.
    pub start: f64,
    /// `durationMs`, a floating-point number.
    pub duration_ms: f64,
}
297
/// Payload of a `platform.report` event ([`TelemetryEvent::Report`]).
///
/// Keys are read in camelCase; JSON keys not listed here are ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportRecord {
    /// `requestId` (required).
    pub request_id: String,
    /// `status` (required), e.g. `"success"`.
    pub status: String,
    /// `metrics` object (required).
    pub metrics: ReportMetrics,
    /// `tracing` object; `None` when the key is absent.
    #[serde(default)]
    pub tracing: Option<TracingRecord>,
}
312
/// The `metrics` object of a [`ReportRecord`].
///
/// The first four fields are required; the last two become `None` when their
/// key is absent.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportMetrics {
    /// `durationMs`, a floating-point number.
    pub duration_ms: f64,
    /// `billedDurationMs`, an unsigned integer.
    pub billed_duration_ms: u64,
    /// Read from `memorySizeMB`; renamed explicitly because the camelCase
    /// rule would otherwise produce `memorySizeMb`.
    #[serde(rename = "memorySizeMB")]
    pub memory_size_mb: u64,
    /// Read from `maxMemoryUsedMB`; renamed explicitly because the camelCase
    /// rule would otherwise produce `maxMemoryUsedMb`.
    #[serde(rename = "maxMemoryUsedMB")]
    pub max_memory_used_mb: u64,
    /// `initDurationMs`.
    #[serde(default)]
    pub init_duration_ms: Option<f64>,
    /// `restoreDurationMs`.
    #[serde(default)]
    pub restore_duration_ms: Option<f64>,
}
334
/// Payload of a `platform.fault` event ([`TelemetryEvent::Fault`]).
///
/// Both fields are optional and become `None` when their key is absent.
// NOTE(review): this type requires `record` to be a JSON object; verify that
// the platform never sends `platform.fault` with a plain-string record,
// because that would fail to deserialize.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FaultRecord {
    /// `requestId`.
    #[serde(default)]
    pub request_id: Option<String>,
    /// `faultMessage`.
    #[serde(default)]
    pub fault_message: Option<String>,
}
346
/// Payload of a `platform.extension` event ([`TelemetryEvent::Extension`]).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExtensionRecord {
    /// `name` (required).
    pub name: String,
    /// `state` (required).
    pub state: String,
    /// `events` array of strings; empty when the key is absent.
    #[serde(default)]
    pub events: Vec<String>,
}
359
/// Errors reported by the telemetry module.
///
/// Marked `#[non_exhaustive]`, so code in other crates must keep a wildcard
/// arm when matching on it.
#[non_exhaustive]
#[derive(Debug)]
pub enum TelemetryError {
    /// A parsing failure; the payload is the human-readable description.
    Parse(String),
}

impl std::fmt::Display for TelemetryError {
    /// Writes `Parse error: <description>` for [`TelemetryError::Parse`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Parse(detail) => write!(f, "Parse error: {detail}"),
        }
    }
}

/// Uses the trait's default methods, so `source()` always returns `None`.
impl std::error::Error for TelemetryError {}
377
/// HTTP listener that accepts telemetry batches and forwards them over a
/// channel.
pub struct TelemetryListener {
    /// TCP port the server binds to and that `listener_uri` advertises.
    port: u16,
    /// Sending half of the channel on which each received batch is forwarded.
    event_tx: mpsc::Sender<Vec<TelemetryEvent>>,
    /// Token whose cancellation triggers graceful shutdown of the server.
    cancel_token: CancellationToken,
}
384
385impl TelemetryListener {
386 pub fn new(
394 port: u16,
395 event_tx: mpsc::Sender<Vec<TelemetryEvent>>,
396 cancel_token: CancellationToken,
397 ) -> Self {
398 Self {
399 port,
400 event_tx,
401 cancel_token,
402 }
403 }
404
405 pub fn listener_uri(&self) -> String {
411 if std::env::var("AWS_LAMBDA_FUNCTION_NAME").is_ok() {
413 format!("http://sandbox.localdomain:{}", self.port)
414 } else {
415 format!("http://127.0.0.1:{}", self.port)
416 }
417 }
418
419 pub async fn run(self) -> Result<(), std::io::Error> {
423 let state = ListenerState {
424 event_tx: self.event_tx,
425 };
426
427 let app = Router::new()
428 .route("/", post(handle_telemetry))
429 .with_state(Arc::new(state));
430
431 let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
432 let listener = TcpListener::bind(addr).await?;
433
434 tracing::info!(port = self.port, "Telemetry API listener started");
435
436 axum::serve(listener, app)
437 .with_graceful_shutdown(self.cancel_token.cancelled_owned())
438 .await
439 }
440}
441
/// State shared (behind an `Arc`) with the `handle_telemetry` request handler.
struct ListenerState {
    /// Channel on which the handler forwards each parsed batch of events.
    event_tx: mpsc::Sender<Vec<TelemetryEvent>>,
}
445
446async fn handle_telemetry(
447 State(state): State<Arc<ListenerState>>,
448 body: Bytes,
449) -> impl IntoResponse {
450 let events: Vec<TelemetryEvent> = match serde_json::from_slice(&body) {
451 Ok(events) => events,
452 Err(e) => {
453 tracing::warn!(error = %e, "Failed to parse telemetry events");
454 return StatusCode::BAD_REQUEST;
455 }
456 };
457
458 tracing::debug!(count = events.len(), "Received telemetry events");
459
460 match state.event_tx.try_send(events) {
461 Ok(()) => StatusCode::OK,
462 Err(mpsc::error::TrySendError::Full(_)) => {
463 tracing::warn!("Telemetry event channel full");
464 StatusCode::SERVICE_UNAVAILABLE
465 }
466 Err(mpsc::error::TrySendError::Closed(_)) => {
467 tracing::error!("Telemetry event channel closed");
468 StatusCode::INTERNAL_SERVER_ERROR
469 }
470 }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes a JSON array of telemetry events, panicking on invalid input.
    fn parse_events(json: &str) -> Vec<TelemetryEvent> {
        serde_json::from_str(json).unwrap()
    }

    /// A platform-only subscription carries the fixed schema version, the
    /// single platform type and the given destination URI.
    #[test]
    fn test_telemetry_subscription_platform() {
        let sub = TelemetrySubscription::platform_events("http://localhost:9999");

        assert_eq!(sub.schema_version, "2022-12-13");
        assert_eq!(sub.types, vec![TelemetryType::Platform]);
        assert_eq!(sub.destination.uri, "http://localhost:9999");
    }

    /// An all-events subscription lists exactly the three telemetry types.
    #[test]
    fn test_telemetry_subscription_all() {
        let sub = TelemetrySubscription::all_events("http://localhost:9999");

        assert_eq!(sub.types.len(), 3);
        for expected in [
            TelemetryType::Platform,
            TelemetryType::Function,
            TelemetryType::Extension,
        ] {
            assert!(sub.types.contains(&expected));
        }
    }

    /// `platform.start` decodes into `TelemetryEvent::Start`.
    #[test]
    fn test_parse_start_event() {
        let events = parse_events(
            r#"[{
                "type": "platform.start",
                "time": "2022-10-12T00:00:00.000Z",
                "record": {
                    "requestId": "test-request-id",
                    "version": "$LATEST"
                }
            }]"#,
        );
        assert_eq!(events.len(), 1);

        let TelemetryEvent::Start { record, .. } = &events[0] else {
            panic!("Expected Start event");
        };
        assert_eq!(record.request_id, "test-request-id");
        assert_eq!(record.version.as_deref(), Some("$LATEST"));
    }

    /// `platform.report` decodes into `TelemetryEvent::Report`, including
    /// the nested metrics object.
    #[test]
    fn test_parse_report_event() {
        let events = parse_events(
            r#"[{
                "type": "platform.report",
                "time": "2022-10-12T00:00:00.000Z",
                "record": {
                    "requestId": "test-request-id",
                    "status": "success",
                    "metrics": {
                        "durationMs": 100.5,
                        "billedDurationMs": 200,
                        "memorySizeMB": 128,
                        "maxMemoryUsedMB": 64
                    }
                }
            }]"#,
        );
        assert_eq!(events.len(), 1);

        let TelemetryEvent::Report { record, .. } = &events[0] else {
            panic!("Expected Report event");
        };
        assert_eq!(record.request_id, "test-request-id");
        assert_eq!(record.status, "success");
        assert_eq!(record.metrics.duration_ms, 100.5);
        assert_eq!(record.metrics.billed_duration_ms, 200);
    }

    /// `platform.runtimeDone` decodes into `TelemetryEvent::RuntimeDone`,
    /// including its spans.
    #[test]
    fn test_parse_runtime_done_event() {
        let events = parse_events(
            r#"[{
                "type": "platform.runtimeDone",
                "time": "2022-10-12T00:00:00.000Z",
                "record": {
                    "requestId": "test-request-id",
                    "status": "success",
                    "metrics": {
                        "durationMs": 50.0
                    },
                    "spans": [
                        {"name": "responseLatency", "start": 0.0, "durationMs": 10.0}
                    ]
                }
            }]"#,
        );
        assert_eq!(events.len(), 1);

        let TelemetryEvent::RuntimeDone { record, .. } = &events[0] else {
            panic!("Expected RuntimeDone event");
        };
        assert_eq!(record.request_id, "test-request-id");
        assert_eq!(record.spans.len(), 1);
        assert_eq!(record.spans[0].name, "responseLatency");
    }

    /// Both init events decode, in order, from a single batch.
    #[test]
    fn test_parse_init_events() {
        let events = parse_events(
            r#"[
                {
                    "type": "platform.initStart",
                    "time": "2022-10-12T00:00:00.000Z",
                    "record": {
                        "initializationType": "on-demand",
                        "phase": "init"
                    }
                },
                {
                    "type": "platform.initRuntimeDone",
                    "time": "2022-10-12T00:00:01.000Z",
                    "record": {
                        "initializationType": "on-demand",
                        "status": "success",
                        "phase": "init"
                    }
                }
            ]"#,
        );
        assert_eq!(events.len(), 2);

        let TelemetryEvent::InitStart { record, .. } = &events[0] else {
            panic!("Expected InitStart event");
        };
        assert_eq!(record.initialization_type, "on-demand");

        let TelemetryEvent::InitRuntimeDone { record, .. } = &events[1] else {
            panic!("Expected InitRuntimeDone event");
        };
        assert_eq!(record.status, "success");
    }

    /// A `function` event carries its record as a plain string.
    #[test]
    fn test_parse_function_log() {
        let events = parse_events(
            r#"[{
                "type": "function",
                "time": "2022-10-12T00:00:00.000Z",
                "record": "Hello from Lambda!"
            }]"#,
        );
        assert_eq!(events.len(), 1);

        let TelemetryEvent::Function { record, .. } = &events[0] else {
            panic!("Expected Function event");
        };
        assert_eq!(record, "Hello from Lambda!");
    }

    /// When `AWS_LAMBDA_FUNCTION_NAME` is not set, the listener advertises
    /// the loopback address.
    #[test]
    fn test_listener_uri() {
        let (tx, _rx) = mpsc::channel(10);
        let listener = TelemetryListener::new(9999, tx, CancellationToken::new());

        assert_eq!(listener.listener_uri(), "http://127.0.0.1:9999");
    }

    /// The `Display` output of a parse error contains the wrapped message.
    #[test]
    fn test_telemetry_error_display() {
        let err = TelemetryError::Parse("parse error".to_string());
        assert!(err.to_string().contains("parse error"));
    }

    /// The default buffering configuration populates all three limits.
    #[test]
    fn test_buffering_config_default() {
        let BufferingConfig {
            max_items,
            max_bytes,
            timeout_ms,
        } = BufferingConfig::default();

        assert_eq!(max_items, Some(1000));
        assert_eq!(max_bytes, Some(256 * 1024));
        assert_eq!(timeout_ms, Some(25));
    }
}