use chrono::{DateTime, Utc};
use http::{Request, Response};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use lambda_runtime_api_client::body::Body;
use serde::{Deserialize, Serialize};
use std::{boxed::Box, fmt, sync::Arc};
use tokio::sync::Mutex;
use tower::Service;
use tracing::{error, trace};
/// A single telemetry entry: a timestamp together with the typed record it carries.
///
/// The record is flattened, so its `type` and `record` keys appear next to
/// `time` in the same JSON object.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct LambdaTelemetry {
    /// Timestamp of the entry, in UTC.
    pub time: DateTime<Utc>,
    /// Typed payload of the entry; flattened into the enclosing object.
    #[serde(flatten)]
    pub record: LambdaTelemetryRecord,
}
/// Typed payload of a telemetry entry.
///
/// Serialized in adjacently tagged form: the variant tag is stored under the
/// `type` key and the variant content under the `record` key. Variants without
/// an explicit `rename` use their lowercased name as the tag; struct-like
/// variants serialize their fields in camelCase.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "record", rename_all = "lowercase")]
pub enum LambdaTelemetryRecord {
    /// `function` record; the content is a plain string.
    Function(String),

    /// `extension` record; the content is a plain string.
    Extension(String),

    /// `platform.initStart` record.
    #[serde(rename = "platform.initStart", rename_all = "camelCase")]
    PlatformInitStart {
        /// Serialized as `initializationType`.
        initialization_type: InitType,
        /// Serialized as `phase`.
        phase: InitPhase,
        /// Serialized as `runtimeVersion`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        runtime_version: Option<String>,
        /// Serialized as `runtimeVersionArn`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        runtime_version_arn: Option<String>,
    },

    /// `platform.initRuntimeDone` record.
    #[serde(rename = "platform.initRuntimeDone", rename_all = "camelCase")]
    PlatformInitRuntimeDone {
        /// Serialized as `initializationType`.
        initialization_type: InitType,
        /// Serialized as `phase`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        phase: Option<InitPhase>,
        /// Serialized as `status`.
        status: Status,
        /// Serialized as `errorType`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        error_type: Option<String>,
        /// Spans attached to the record; an absent key deserializes to an empty list.
        #[serde(default)]
        spans: Vec<Span>,
    },

    /// `platform.initReport` record.
    #[serde(rename = "platform.initReport", rename_all = "camelCase")]
    PlatformInitReport {
        /// Serialized as `initializationType`.
        initialization_type: InitType,
        /// Serialized as `phase`.
        phase: InitPhase,
        /// Serialized as `metrics`.
        metrics: InitReportMetrics,
        /// Spans attached to the record; an absent key deserializes to an empty list.
        #[serde(default)]
        spans: Vec<Span>,
    },

    /// `platform.start` record.
    #[serde(rename = "platform.start", rename_all = "camelCase")]
    PlatformStart {
        /// Serialized as `requestId`.
        request_id: String,
        /// Serialized as `version`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        version: Option<String>,
        /// Serialized as `tracing`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        tracing: Option<TraceContext>,
    },

    /// `platform.runtimeDone` record.
    #[serde(rename = "platform.runtimeDone", rename_all = "camelCase")]
    PlatformRuntimeDone {
        /// Serialized as `requestId`.
        request_id: String,
        /// Serialized as `status`.
        status: Status,
        /// Serialized as `errorType`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        error_type: Option<String>,
        /// Serialized as `metrics`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        metrics: Option<RuntimeDoneMetrics>,
        /// Spans attached to the record; an absent key deserializes to an empty list.
        #[serde(default)]
        spans: Vec<Span>,
        /// Serialized as `tracing`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        tracing: Option<TraceContext>,
    },

    /// `platform.report` record.
    #[serde(rename = "platform.report", rename_all = "camelCase")]
    PlatformReport {
        /// Serialized as `requestId`.
        request_id: String,
        /// Serialized as `status`.
        status: Status,
        /// Serialized as `errorType`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        error_type: Option<String>,
        /// Serialized as `metrics`.
        metrics: ReportMetrics,
        /// Spans attached to the record; an absent key deserializes to an empty list.
        #[serde(default)]
        spans: Vec<Span>,
        /// Serialized as `tracing`; left out of the output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        tracing: Option<TraceContext>,
    },

    /// `platform.extension` record.
    #[serde(rename = "platform.extension", rename_all = "camelCase")]
    PlatformExtension {
        /// Serialized as `name`.
        name: String,
        /// Serialized as `state`; kept as a free-form string.
        state: String,
        /// Serialized as `events`; kept as free-form strings.
        events: Vec<String>,
    },

    /// `platform.telemetrySubscription` record.
    #[serde(rename = "platform.telemetrySubscription", rename_all = "camelCase")]
    PlatformTelemetrySubscription {
        /// Serialized as `name`.
        name: String,
        /// Serialized as `state`; kept as a free-form string.
        state: String,
        /// Serialized as `types`; kept as free-form strings.
        types: Vec<String>,
    },

    /// `platform.logsDropped` record.
    #[serde(rename = "platform.logsDropped", rename_all = "camelCase")]
    PlatformLogsDropped {
        /// Serialized as `reason`.
        reason: String,
        /// Serialized as `droppedRecords`.
        dropped_records: u64,
        /// Serialized as `droppedBytes`.
        dropped_bytes: u64,
    },
}
/// Initialization type carried by the `platform.init*` records.
///
/// Serialized in kebab-case: `on-demand`, `provisioned-concurrency`, `snap-start`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum InitType {
    /// Serialized as `on-demand`.
    OnDemand,
    /// Serialized as `provisioned-concurrency`.
    ProvisionedConcurrency,
    /// Serialized as `snap-start`.
    SnapStart,
}
/// Phase value carried by the `platform.init*` records.
///
/// Serialized in kebab-case: `init`, `invoke`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum InitPhase {
    /// Serialized as `init`.
    Init,
    /// Serialized as `invoke`.
    Invoke,
}
/// Status value carried by the "done" and "report" records.
///
/// Serialized in kebab-case: `success`, `error`, `failure`, `timeout`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum Status {
    /// Serialized as `success`.
    Success,
    /// Serialized as `error`.
    Error,
    /// Serialized as `failure`.
    Failure,
    /// Serialized as `timeout`.
    Timeout,
}
/// A named span with a start instant and a duration.
///
/// Fields are serialized in camelCase, in declaration order
/// (`durationMs`, `name`, `start`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Span {
    /// Serialized as `durationMs` (milliseconds, going by the field name).
    pub duration_ms: f64,
    /// Name of the span.
    pub name: String,
    /// Start instant of the span, in UTC.
    pub start: DateTime<Utc>,
}
/// Tracing information attached to a record.
///
/// Fields are serialized in camelCase (`spanId`, `type`, `value`).
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct TraceContext {
    /// Span identifier, serialized as `spanId`.
    ///
    /// Left out of the serialized output when `None` rather than being written
    /// as `null`, matching how the optional fields of the record variants are
    /// handled. A missing `spanId` key still deserializes to `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<String>,
    /// Kind of tracing header carried in `value`; serialized as `type`.
    pub r#type: TracingType,
    /// Raw tracing value.
    pub value: String,
}
/// Kind of tracing value held by a [`TraceContext`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TracingType {
    /// Serialized as the literal string `X-Amzn-Trace-Id`.
    #[serde(rename = "X-Amzn-Trace-Id")]
    AmznTraceId,
}
/// Metrics carried by a `platform.initReport` record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitReportMetrics {
    /// Serialized as `durationMs` (milliseconds, going by the field name).
    pub duration_ms: f64,
}
/// Metrics carried by a `platform.report` record.
///
/// Fields are serialized in camelCase unless renamed explicitly.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportMetrics {
    /// Serialized as `durationMs`.
    pub duration_ms: f64,
    /// Serialized as `billedDurationMs`.
    pub billed_duration_ms: u64,
    /// Serialized as `memorySizeMB`; the explicit rename keeps the upper-case
    /// `MB` that the camelCase rule would otherwise turn into `Mb`.
    #[serde(rename = "memorySizeMB")]
    pub memory_size_mb: u64,
    /// Serialized as `maxMemoryUsedMB`; renamed explicitly for the same reason.
    #[serde(rename = "maxMemoryUsedMB")]
    pub max_memory_used_mb: u64,
    /// Serialized as `initDurationMs`; `None` when the key is absent and left
    /// out of the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub init_duration_ms: Option<f64>,
    /// Serialized as `restoreDurationMs`; `None` when the key is absent and
    /// left out of the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub restore_duration_ms: Option<f64>,
}
/// Metrics carried by a `platform.runtimeDone` record.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeDoneMetrics {
    /// Serialized as `durationMs` (milliseconds, going by the field name).
    pub duration_ms: f64,
    /// Serialized as `producedBytes`.
    ///
    /// Left out of the serialized output when `None` rather than being written
    /// as `null`, matching how the optional fields of [`ReportMetrics`] are
    /// handled. A missing `producedBytes` key still deserializes to `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub produced_bytes: Option<u64>,
}
pub(crate) async fn telemetry_wrapper<S>(
service: Arc<Mutex<S>>,
req: Request<Incoming>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>>
where
S: Service<Vec<LambdaTelemetry>, Response = ()>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
S::Future: Send,
{
trace!("Received telemetry request");
let body = match req.into_body().collect().await {
Ok(body) => body,
Err(e) => {
error!("Error reading telemetry request body: {}", e);
return Ok(hyper::Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap());
}
};
let telemetry: Vec<LambdaTelemetry> = match serde_json::from_slice(&body.to_bytes()) {
Ok(telemetry) => telemetry,
Err(e) => {
error!("Error parsing telemetry: {}", e);
return Ok(hyper::Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap());
}
};
{
let mut service = service.lock().await;
match service.call(telemetry).await {
Ok(_) => (),
Err(err) => println!("{err:?}"),
}
}
Ok(hyper::Response::new(Body::empty()))
}
/// Checks that each known telemetry JSON payload deserializes into the expected record.
#[cfg(test)]
mod deserialization_tests {
    use super::*;
    use chrono::{Duration, TimeZone};

    // Generates one `#[test]` per `name: (json_input, expected_record)` entry.
    // Only the `record` part of the parsed entry is compared; `assert_eq!` is used
    // so that a failure prints both the actual and the expected record.
    macro_rules! deserialize_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let (input, expected) = $value;
                    let actual = serde_json::from_str::<LambdaTelemetry>(input).expect("unable to deserialize");
                    assert_eq!(actual.record, expected);
                }
            )*
        }
    }

    deserialize_tests! {
        function: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#,
            LambdaTelemetryRecord::Function("hello world".to_string()),
        ),
        extension: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#,
            LambdaTelemetryRecord::Extension("hello world".to_string()),
        ),
        platform_start: (
            r#"{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
            LambdaTelemetryRecord::PlatformStart {
                request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
                version: Some("$LATEST".to_string()),
                tracing: Some(TraceContext {
                    span_id: Some("24cd7d670fa455f0".to_string()),
                    r#type: TracingType::AmznTraceId,
                    value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
                }),
            },
        ),
        platform_init_start: (
            r#"{"time":"2022-10-19T13:52:15.636Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}}"#,
            LambdaTelemetryRecord::PlatformInitStart {
                initialization_type: InitType::OnDemand,
                phase: InitPhase::Init,
                runtime_version: None,
                runtime_version_arn: None,
            },
        ),
        platform_runtime_done: (
            r#"{"time":"2022-10-21T14:05:05.764Z","type":"platform.runtimeDone","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"},"spans":[{"name":"responseLatency","start":"2022-10-21T14:05:03.165Z","durationMs":2598.0},{"name":"responseDuration","start":"2022-10-21T14:05:05.763Z","durationMs":0.0}],"metrics":{"durationMs":2599.0,"producedBytes":8}}}"#,
            LambdaTelemetryRecord::PlatformRuntimeDone {
                request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
                status: Status::Success,
                error_type: None,
                metrics: Some(RuntimeDoneMetrics {
                    duration_ms: 2599.0,
                    produced_bytes: Some(8),
                }),
                spans: vec![
                    Span {
                        name: "responseLatency".to_string(),
                        start: Utc
                            .with_ymd_and_hms(2022, 10, 21, 14, 5, 3)
                            .unwrap()
                            .checked_add_signed(Duration::milliseconds(165))
                            .unwrap(),
                        duration_ms: 2598.0
                    },
                    Span {
                        name: "responseDuration".to_string(),
                        start: Utc
                            .with_ymd_and_hms(2022, 10, 21, 14, 5, 5)
                            .unwrap()
                            .checked_add_signed(Duration::milliseconds(763))
                            .unwrap(),
                        duration_ms: 0.0
                    },
                ],
                tracing: Some(TraceContext {
                    span_id: Some("24cd7d670fa455f0".to_string()),
                    r#type: TracingType::AmznTraceId,
                    value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
                }),
            },
        ),
        platform_report: (
            r#"{"time":"2022-10-21T14:05:05.766Z","type":"platform.report","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","metrics":{"durationMs":2599.4,"billedDurationMs":2600,"memorySizeMB":128,"maxMemoryUsedMB":94,"initDurationMs":549.04},"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"},"status":"success"}}"#,
            LambdaTelemetryRecord::PlatformReport {
                request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
                status: Status::Success,
                error_type: None,
                metrics: ReportMetrics {
                    duration_ms: 2599.4,
                    billed_duration_ms: 2600,
                    memory_size_mb: 128,
                    max_memory_used_mb: 94,
                    init_duration_ms: Some(549.04),
                    restore_duration_ms: None,
                },
                spans: Vec::new(),
                tracing: Some(TraceContext {
                    span_id: Some("24cd7d670fa455f0".to_string()),
                    r#type: TracingType::AmznTraceId,
                    value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
                }),
            },
        ),
        platform_telemetry_subscription: (
            r#"{"time":"2022-10-19T13:52:15.667Z","type":"platform.telemetrySubscription","record":{"name":"my-extension","state":"Subscribed","types":["platform","function"]}}"#,
            LambdaTelemetryRecord::PlatformTelemetrySubscription {
                name: "my-extension".to_string(),
                state: "Subscribed".to_string(),
                types: vec!["platform".to_string(), "function".to_string()],
            },
        ),
        platform_init_runtime_done: (
            r#"{"time":"2022-10-19T13:52:16.136Z","type":"platform.initRuntimeDone","record":{"initializationType":"on-demand","status":"success"}}"#,
            LambdaTelemetryRecord::PlatformInitRuntimeDone {
                initialization_type: InitType::OnDemand,
                status: Status::Success,
                phase: None,
                error_type: None,
                spans: Vec::new(),
            },
        ),
        platform_extension: (
            r#"{"time":"2022-10-19T13:52:16.136Z","type":"platform.extension","record":{"name":"my-extension","state":"Ready","events":["SHUTDOWN","INVOKE"]}}"#,
            LambdaTelemetryRecord::PlatformExtension {
                name: "my-extension".to_string(),
                state: "Ready".to_string(),
                events: vec!["SHUTDOWN".to_string(), "INVOKE".to_string()],
            },
        ),
        platform_init_report: (
            r#"{"time":"2022-10-19T13:52:16.136Z","type":"platform.initReport","record":{"initializationType":"on-demand","metrics":{"durationMs":500.0},"phase":"init"}}"#,
            LambdaTelemetryRecord::PlatformInitReport {
                initialization_type: InitType::OnDemand,
                phase: InitPhase::Init,
                metrics: InitReportMetrics { duration_ms: 500.0 },
                spans: Vec::new(),
            }
        ),
    }
}
/// Checks that each telemetry entry serializes to the exact expected JSON string.
#[cfg(test)]
mod serialization_tests {
    use chrono::{Duration, TimeZone};

    use super::*;

    // Generates one `#[test]` per `name: (telemetry_entry, expected_json)` entry.
    // `assert_eq!` prints the actual and expected JSON on failure, and the custom
    // message adds the input value, so no separate `println!` calls are needed.
    macro_rules! serialize_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let (input, expected) = $value;
                    let actual = serde_json::to_string(&input).expect("unable to serialize");
                    assert_eq!(actual, expected, "unexpected JSON for input: {:?}", input);
                }
            )*
        }
    }

    serialize_tests! {
        function: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::Function("hello world".to_string()),
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"function","record":"hello world"}"#,
        ),
        extension: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::Extension("hello world".to_string()),
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"extension","record":"hello world"}"#,
        ),
        platform_start: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformStart {
                    request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
                    version: Some("$LATEST".to_string()),
                    tracing: Some(TraceContext {
                        span_id: Some("24cd7d670fa455f0".to_string()),
                        r#type: TracingType::AmznTraceId,
                        value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
                    }),
                }
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
        ),
        platform_init_start: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformInitStart {
                    initialization_type: InitType::OnDemand,
                    phase: InitPhase::Init,
                    runtime_version: None,
                    runtime_version_arn: None,
                },
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}}"#,
        ),
        platform_runtime_done: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformRuntimeDone {
                    request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
                    status: Status::Success,
                    error_type: None,
                    metrics: Some(RuntimeDoneMetrics {
                        duration_ms: 2599.0,
                        produced_bytes: Some(8),
                    }),
                    spans: vec![
                        Span {
                            name: "responseLatency".to_string(),
                            start: Utc
                                .with_ymd_and_hms(2022, 10, 21, 14, 5, 3)
                                .unwrap()
                                .checked_add_signed(Duration::milliseconds(165))
                                .unwrap(),
                            duration_ms: 2598.0
                        },
                        Span {
                            name: "responseDuration".to_string(),
                            start: Utc
                                .with_ymd_and_hms(2022, 10, 21, 14, 5, 5)
                                .unwrap()
                                .checked_add_signed(Duration::milliseconds(763))
                                .unwrap(),
                            duration_ms: 0.0
                        },
                    ],
                    tracing: Some(TraceContext {
                        span_id: Some("24cd7d670fa455f0".to_string()),
                        r#type: TracingType::AmznTraceId,
                        value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
                    }),
                },
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.runtimeDone","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.0,"producedBytes":8},"spans":[{"durationMs":2598.0,"name":"responseLatency","start":"2022-10-21T14:05:03.165Z"},{"durationMs":0.0,"name":"responseDuration","start":"2022-10-21T14:05:05.763Z"}],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
        ),
        platform_report: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformReport {
                    request_id: "459921b5-681c-4a96-beb0-81e0aa586026".to_string(),
                    status: Status::Success,
                    error_type: None,
                    metrics: ReportMetrics {
                        duration_ms: 2599.4,
                        billed_duration_ms: 2600,
                        memory_size_mb: 128,
                        max_memory_used_mb: 94,
                        init_duration_ms: Some(549.04),
                        restore_duration_ms: None,
                    },
                    spans: Vec::new(),
                    tracing: Some(TraceContext {
                        span_id: Some("24cd7d670fa455f0".to_string()),
                        r#type: TracingType::AmznTraceId,
                        value: "Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1".to_string(),
                    }),
                },
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.report","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","status":"success","metrics":{"durationMs":2599.4,"billedDurationMs":2600,"memorySizeMB":128,"maxMemoryUsedMB":94,"initDurationMs":549.04},"spans":[],"tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}"#,
        ),
        platform_telemetry_subscription: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformTelemetrySubscription {
                    name: "my-extension".to_string(),
                    state: "Subscribed".to_string(),
                    types: vec!["platform".to_string(), "function".to_string()],
                },
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.telemetrySubscription","record":{"name":"my-extension","state":"Subscribed","types":["platform","function"]}}"#,
        ),
        platform_init_runtime_done: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformInitRuntimeDone {
                    initialization_type: InitType::OnDemand,
                    status: Status::Success,
                    phase: None,
                    error_type: None,
                    spans: Vec::new(),
                },
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initRuntimeDone","record":{"initializationType":"on-demand","status":"success","spans":[]}}"#,
        ),
        platform_extension: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformExtension {
                    name: "my-extension".to_string(),
                    state: "Ready".to_string(),
                    events: vec!["SHUTDOWN".to_string(), "INVOKE".to_string()],
                },
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.extension","record":{"name":"my-extension","state":"Ready","events":["SHUTDOWN","INVOKE"]}}"#,
        ),
        platform_init_report: (
            LambdaTelemetry {
                time: Utc.with_ymd_and_hms(2023, 11, 28, 12, 0, 9).unwrap(),
                record: LambdaTelemetryRecord::PlatformInitReport {
                    initialization_type: InitType::OnDemand,
                    phase: InitPhase::Init,
                    metrics: InitReportMetrics { duration_ms: 500.0 },
                    spans: Vec::new(),
                },
            },
            r#"{"time":"2023-11-28T12:00:09Z","type":"platform.initReport","record":{"initializationType":"on-demand","phase":"init","metrics":{"durationMs":500.0},"spans":[]}}"#,
        ),
    }
}