use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{boxed::Box, fmt, sync::Arc};
use tokio::sync::Mutex;
use tower::Service;
use tracing::{error, trace};
/// A single entry of a Lambda logs batch.
///
/// Deserialized from a JSON object that carries a `time` timestamp next to the
/// `type`/`record` pair described by [`LambdaLogRecord`], for example
/// `{"time": "2020-08-20T12:31:32.123Z", "type": "function", "record": "hello world"}`.
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct LambdaLog {
/// Value of the entry's `time` key, parsed into a UTC timestamp.
pub time: DateTime<Utc>,
/// Kind-specific payload. Flattened, so its `type` and `record` keys are read
/// from the same JSON object as `time` rather than from a nested one.
#[serde(flatten)]
pub record: LambdaLogRecord,
}
/// Payload of a [`LambdaLog`], selected by the entry's `type` key.
///
/// The enum is adjacently tagged: the JSON `type` string picks the variant and the
/// JSON `record` value holds its content. Variants without an explicit rename use
/// the lowercased variant name as tag (`function`, `extension`); struct-like
/// variants read their fields in camelCase (e.g. `requestId`).
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(tag = "type", content = "record", rename_all = "lowercase")]
pub enum LambdaLogRecord {
/// `type: "function"` — `record` is a plain string
/// (presumably a line logged by the function itself; confirm against the Logs API docs).
Function(String),
/// `type: "extension"` — `record` is a plain string
/// (presumably a line logged by an extension; confirm against the Logs API docs).
Extension(String),
/// `type: "platform.start"` — `record` is an object with a `requestId`.
#[serde(rename = "platform.start", rename_all = "camelCase")]
PlatformStart {
/// Read from the `requestId` key.
request_id: String,
},
/// `type: "platform.end"` — `record` is an object with a `requestId`.
#[serde(rename = "platform.end", rename_all = "camelCase")]
PlatformEnd {
/// Read from the `requestId` key.
request_id: String,
},
/// `type: "platform.report"` — `record` is an object with a `requestId` and a
/// nested `metrics` object.
#[serde(rename = "platform.report", rename_all = "camelCase")]
PlatformReport {
/// Read from the `requestId` key.
request_id: String,
/// Read from the `metrics` key; see [`LogPlatformReportMetrics`].
metrics: LogPlatformReportMetrics,
},
/// `type: "platform.fault"` — `record` is a plain string describing the fault.
#[serde(rename = "platform.fault")]
PlatformFault(String),
/// `type: "platform.extension"` — `record` is an object with `name`, `state`
/// and a list of `events` (e.g. `["INVOKE", "SHUTDOWN"]`).
#[serde(rename = "platform.extension", rename_all = "camelCase")]
PlatformExtension {
/// Read from the `name` key.
name: String,
/// Read from the `state` key; kept as the raw string.
state: String,
/// Read from the `events` key; kept as raw strings.
events: Vec<String>,
},
/// `type: "platform.logsSubscription"` — `record` is an object with `name`,
/// `state` and a list of `types`.
#[serde(rename = "platform.logsSubscription", rename_all = "camelCase")]
PlatformLogsSubscription {
/// Read from the `name` key.
name: String,
/// Read from the `state` key; kept as the raw string.
state: String,
/// Read from the `types` key; kept as raw strings.
types: Vec<String>,
},
/// `type: "platform.logsDropped"` — `record` is an object with a `reason` and
/// the `droppedRecords` / `droppedBytes` counters.
#[serde(rename = "platform.logsDropped", rename_all = "camelCase")]
PlatformLogsDropped {
/// Read from the `reason` key.
reason: String,
/// Read from the `droppedRecords` key.
dropped_records: u64,
/// Read from the `droppedBytes` key.
dropped_bytes: u64,
},
/// `type: "platform.runtimeDone"` — `record` is an object with a `requestId`
/// and a `status` string (e.g. `"success"`).
#[serde(rename = "platform.runtimeDone", rename_all = "camelCase")]
PlatformRuntimeDone {
/// Read from the `requestId` key.
request_id: String,
/// Read from the `status` key; kept as the raw string.
status: String,
},
}
/// Metrics object nested under `metrics` in a `platform.report` record.
///
/// Keys are read in camelCase, except the two memory fields whose JSON names end
/// in an upper-case `MB` and are therefore renamed explicitly.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct LogPlatformReportMetrics {
    /// Read from `durationMs` (presumably milliseconds, going by the name).
    pub duration_ms: f64,
    /// Read from `billedDurationMs` (presumably milliseconds, going by the name).
    pub billed_duration_ms: u64,
    /// Read from `memorySizeMB`.
    #[serde(rename = "memorySizeMB")]
    pub memory_size_mb: u64,
    /// Read from `maxMemoryUsedMB`.
    #[serde(rename = "maxMemoryUsedMB")]
    pub max_memory_used_mb: u64,
    /// Read from `initDurationMs`; `None` when the key is absent from the payload.
    #[serde(default)]
    pub init_duration_ms: Option<f64>,
}
/// Log buffering settings.
///
/// Serialized with camelCase keys (`timeoutMs`, `maxBytes`, `maxItems`).
/// The [`Default`] implementation in this file supplies `1_000`, `262_144` and
/// `10_000` respectively.
// NOTE(review): presumably sent to Lambda when subscribing to logs — the consumer
// of this type is not visible in this part of the file; confirm against the caller.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LogBuffering {
/// Serialized as `timeoutMs` (presumably a duration in milliseconds, going by the name).
pub timeout_ms: usize,
/// Serialized as `maxBytes` (presumably a size limit in bytes, going by the name).
pub max_bytes: usize,
/// Serialized as `maxItems` (presumably a limit on buffered entries, going by the name).
pub max_items: usize,
}
impl Default for LogBuffering {
fn default() -> Self {
LogBuffering {
timeout_ms: 1_000,
max_bytes: 262_144,
max_items: 10_000,
}
}
}
pub(crate) async fn log_wrapper<S>(
service: Arc<Mutex<S>>,
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, Box<dyn std::error::Error + Send + Sync>>
where
S: Service<Vec<LambdaLog>, Response = ()>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + fmt::Debug,
S::Future: Send,
{
trace!("Received logs request");
let body = match hyper::body::to_bytes(req.into_body()).await {
Ok(body) => body,
Err(e) => {
error!("Error reading logs request body: {}", e);
return Ok(hyper::Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(hyper::Body::empty())
.unwrap());
}
};
let logs: Vec<LambdaLog> = match serde_json::from_slice(&body) {
Ok(logs) => logs,
Err(e) => {
error!("Error parsing logs: {}", e);
return Ok(hyper::Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(hyper::Body::empty())
.unwrap());
}
};
{
let mut service = service.lock().await;
match service.call(logs).await {
Ok(_) => (),
Err(err) => println!("{err:?}"),
}
}
Ok(hyper::Response::new(hyper::Body::empty()))
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, TimeZone};

    /// A complete entry must yield both the timestamp (including its
    /// millisecond part) and the record.
    #[test]
    fn deserialize_full() {
        let data = r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#;
        let expected = LambdaLog {
            time: Utc
                .with_ymd_and_hms(2020, 8, 20, 12, 31, 32)
                .unwrap()
                .checked_add_signed(Duration::milliseconds(123))
                .unwrap(),
            record: LambdaLogRecord::Function("hello world".to_string()),
        };

        let actual = serde_json::from_str::<LambdaLog>(data).unwrap();

        assert_eq!(expected, actual);
    }

    /// Generates one `#[test]` per `name: (json_input, expected_record)` pair.
    /// Only the `record` part of the deserialized entry is compared.
    macro_rules! deserialize_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let (input, expected) = $value;
                    let actual = serde_json::from_str::<LambdaLog>(input).expect("unable to deserialize");

                    // `assert_eq!` prints both records when they differ.
                    assert_eq!(actual.record, expected);
                }
            )*
        }
    }

    deserialize_tests! {
        // function
        function: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#,
            LambdaLogRecord::Function("hello world".to_string()),
        ),

        // extension
        extension: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#,
            LambdaLogRecord::Extension("hello world".to_string()),
        ),

        // platform.start
        platform_start: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.start","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#,
            LambdaLogRecord::PlatformStart {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
            },
        ),

        // platform.end
        platform_end: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.end","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#,
            LambdaLogRecord::PlatformEnd {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
            },
        ),

        // platform.report
        platform_report: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.report","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56","metrics": {"durationMs": 1.23,"billedDurationMs": 123,"memorySizeMB": 123,"maxMemoryUsedMB": 123,"initDurationMs": 1.23}}}"#,
            LambdaLogRecord::PlatformReport {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                metrics: LogPlatformReportMetrics {
                    duration_ms: 1.23,
                    billed_duration_ms: 123,
                    memory_size_mb: 123,
                    max_memory_used_mb: 123,
                    init_duration_ms: Some(1.23),
                },
            },
        ),

        // platform.report without the optional initDurationMs key
        platform_report_without_init_duration: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.report","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56","metrics": {"durationMs": 1.23,"billedDurationMs": 123,"memorySizeMB": 123,"maxMemoryUsedMB": 123}}}"#,
            LambdaLogRecord::PlatformReport {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                metrics: LogPlatformReportMetrics {
                    duration_ms: 1.23,
                    billed_duration_ms: 123,
                    memory_size_mb: 123,
                    max_memory_used_mb: 123,
                    init_duration_ms: None,
                },
            },
        ),

        // platform.fault
        platform_fault: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.fault","record": "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request"}"#,
            LambdaLogRecord::PlatformFault(
                "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request"
                    .to_string(),
            ),
        ),

        // platform.extension
        platform_extension: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.extension","record": {"name": "Foo.bar","state": "Ready","events": ["INVOKE", "SHUTDOWN"]}}"#,
            LambdaLogRecord::PlatformExtension {
                name: "Foo.bar".to_string(),
                state: "Ready".to_string(),
                events: vec!["INVOKE".to_string(), "SHUTDOWN".to_string()],
            },
        ),

        // platform.logsSubscription
        platform_logssubscription: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsSubscription","record": {"name": "test","state": "active","types": ["test"]}}"#,
            LambdaLogRecord::PlatformLogsSubscription {
                name: "test".to_string(),
                state: "active".to_string(),
                types: vec!["test".to_string()],
            },
        ),

        // platform.logsDropped
        platform_logsdropped: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsDropped","record": {"reason": "Consumer seems to have fallen behind as it has not acknowledged receipt of logs.","droppedRecords": 123,"droppedBytes": 12345}}"#,
            LambdaLogRecord::PlatformLogsDropped {
                reason: "Consumer seems to have fallen behind as it has not acknowledged receipt of logs."
                    .to_string(),
                dropped_records: 123,
                dropped_bytes: 12345,
            },
        ),

        // platform.runtimeDone
        platform_runtimedone: (
            r#"{"time": "2021-02-04T20:00:05.123Z","type": "platform.runtimeDone","record": {"requestId":"6f7f0961f83442118a7af6fe80b88d56","status": "success"}}"#,
            LambdaLogRecord::PlatformRuntimeDone {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                status: "success".to_string(),
            },
        ),
    }
}