1use chrono::{DateTime, Utc};
2use http::{Request, Response};
3use http_body_util::BodyExt;
4use hyper::body::Incoming;
5use lambda_runtime_api_client::body::Body;
6use serde::{Deserialize, Serialize};
7use std::{fmt, sync::Arc};
8use tokio::sync::Mutex;
9use tower::Service;
10use tracing::{error, trace};
11
12use crate::{Error, ExtensionError};
13
14#[derive(Clone, Debug, Deserialize, PartialEq)]
17pub struct LambdaLog {
18 pub time: DateTime<Utc>,
20 #[serde(flatten)]
22 pub record: LambdaLogRecord,
23}
24
25#[derive(Clone, Debug, Deserialize, PartialEq)]
27#[serde(tag = "type", content = "record", rename_all = "lowercase")]
28pub enum LambdaLogRecord {
29 Function(String),
31
32 Extension(String),
34
35 #[serde(rename = "platform.start", rename_all = "camelCase")]
37 PlatformStart {
38 request_id: String,
40 },
41 #[serde(rename = "platform.end", rename_all = "camelCase")]
43 PlatformEnd {
44 request_id: String,
46 },
47 #[serde(rename = "platform.report", rename_all = "camelCase")]
49 PlatformReport {
50 request_id: String,
52 metrics: LogPlatformReportMetrics,
54 },
55 #[serde(rename = "platform.fault")]
57 PlatformFault(String),
58 #[serde(rename = "platform.extension", rename_all = "camelCase")]
60 PlatformExtension {
61 name: String,
63 state: String,
65 events: Vec<String>,
67 },
68 #[serde(rename = "platform.logsSubscription", rename_all = "camelCase")]
70 PlatformLogsSubscription {
71 name: String,
73 state: String,
75 types: Vec<String>,
77 },
78 #[serde(rename = "platform.logsDropped", rename_all = "camelCase")]
80 PlatformLogsDropped {
81 reason: String,
83 dropped_records: u64,
85 dropped_bytes: u64,
87 },
88 #[serde(rename = "platform.runtimeDone", rename_all = "camelCase")]
90 PlatformRuntimeDone {
91 request_id: String,
93 status: String,
95 },
96}
97
98#[derive(Clone, Debug, Deserialize, PartialEq)]
100#[serde(rename_all = "camelCase")]
101pub struct LogPlatformReportMetrics {
102 pub duration_ms: f64,
104 pub billed_duration_ms: u64,
106 #[serde(rename = "memorySizeMB")]
108 pub memory_size_mb: u64,
109 #[serde(rename = "maxMemoryUsedMB")]
111 pub max_memory_used_mb: u64,
112 #[serde(default = "Option::default")]
114 pub init_duration_ms: Option<f64>,
115}
116
117#[derive(Debug, Serialize, Copy, Clone)]
120#[serde(rename_all = "camelCase")]
121pub struct LogBuffering {
122 pub timeout_ms: usize,
125 pub max_bytes: usize,
128 pub max_items: usize,
131}
132
// Inclusive bounds enforced by `LogBuffering::validate`. They are plain
// compile-time values, so they are declared `const` rather than `static`:
// nothing needs them to have a fixed memory address.

/// Smallest accepted `LogBuffering::timeout_ms`.
const LOG_BUFFERING_MIN_TIMEOUT_MS: usize = 25;
/// Largest accepted `LogBuffering::timeout_ms`.
const LOG_BUFFERING_MAX_TIMEOUT_MS: usize = 30_000;
/// Smallest accepted `LogBuffering::max_bytes`.
const LOG_BUFFERING_MIN_BYTES: usize = 262_144;
/// Largest accepted `LogBuffering::max_bytes`.
const LOG_BUFFERING_MAX_BYTES: usize = 1_048_576;
/// Smallest accepted `LogBuffering::max_items`.
const LOG_BUFFERING_MIN_ITEMS: usize = 1_000;
/// Largest accepted `LogBuffering::max_items`.
const LOG_BUFFERING_MAX_ITEMS: usize = 10_000;
139
140impl LogBuffering {
141 fn validate(&self) -> Result<(), Error> {
142 if self.timeout_ms < LOG_BUFFERING_MIN_TIMEOUT_MS || self.timeout_ms > LOG_BUFFERING_MAX_TIMEOUT_MS {
143 let error = format!(
144 "LogBuffering validation error: Invalid timeout_ms: {}. Allowed values: Minumun: {}. Maximum: {}",
145 self.timeout_ms, LOG_BUFFERING_MIN_TIMEOUT_MS, LOG_BUFFERING_MAX_TIMEOUT_MS
146 );
147 return Err(ExtensionError::boxed(error));
148 }
149 if self.max_bytes < LOG_BUFFERING_MIN_BYTES || self.max_bytes > LOG_BUFFERING_MAX_BYTES {
150 let error = format!(
151 "LogBuffering validation error: Invalid max_bytes: {}. Allowed values: Minumun: {}. Maximum: {}",
152 self.max_bytes, LOG_BUFFERING_MIN_BYTES, LOG_BUFFERING_MAX_BYTES
153 );
154 return Err(ExtensionError::boxed(error));
155 }
156 if self.max_items < LOG_BUFFERING_MIN_ITEMS || self.max_items > LOG_BUFFERING_MAX_ITEMS {
157 let error = format!(
158 "LogBuffering validation error: Invalid max_items: {}. Allowed values: Minumun: {}. Maximum: {}",
159 self.max_items, LOG_BUFFERING_MIN_ITEMS, LOG_BUFFERING_MAX_ITEMS
160 );
161 return Err(ExtensionError::boxed(error));
162 }
163 Ok(())
164 }
165}
166
167impl Default for LogBuffering {
168 fn default() -> Self {
169 LogBuffering {
170 timeout_ms: 1_000,
171 max_bytes: 262_144,
172 max_items: 10_000,
173 }
174 }
175}
176
177pub(crate) fn validate_buffering_configuration(log_buffering: Option<LogBuffering>) -> Result<(), Error> {
183 match log_buffering {
184 Some(log_buffering) => log_buffering.validate(),
185 None => Ok(()),
186 }
187}
188
189pub(crate) async fn log_wrapper<S>(service: Arc<Mutex<S>>, req: Request<Incoming>) -> Result<Response<Body>, Error>
194where
195 S: Service<Vec<LambdaLog>, Response = ()>,
196 S::Error: Into<Error> + fmt::Debug,
197 S::Future: Send,
198{
199 trace!("Received logs request");
200 let body = match req.into_body().collect().await {
202 Ok(body) => body,
203 Err(e) => {
204 error!("Error reading logs request body: {}", e);
205 return Ok(hyper::Response::builder()
206 .status(hyper::StatusCode::BAD_REQUEST)
207 .body(Body::empty())
208 .unwrap());
209 }
210 };
211 let logs: Vec<LambdaLog> = match serde_json::from_slice(&body.to_bytes()) {
212 Ok(logs) => logs,
213 Err(e) => {
214 error!("Error parsing logs: {}", e);
215 return Ok(hyper::Response::builder()
216 .status(hyper::StatusCode::BAD_REQUEST)
217 .body(Body::empty())
218 .unwrap());
219 }
220 };
221
222 {
223 let mut service = service.lock().await;
224 match service.call(logs).await {
225 Ok(_) => (),
226 Err(err) => println!("{err:?}"),
227 }
228 }
229
230 Ok(hyper::Response::new(Body::empty()))
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeDelta, TimeZone};

    /// Checks both the timestamp and the record of a fully decoded entry.
    #[test]
    fn deserialize_full() {
        let data = r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#;

        // 2020-08-20T12:31:32Z plus the 123 ms fractional part.
        let time = Utc
            .with_ymd_and_hms(2020, 8, 20, 12, 31, 32)
            .unwrap()
            .checked_add_signed(TimeDelta::try_milliseconds(123).unwrap())
            .unwrap();
        let expected = LambdaLog {
            time,
            record: LambdaLogRecord::Function("hello world".to_string()),
        };

        let actual = serde_json::from_str::<LambdaLog>(data).unwrap();

        assert_eq!(expected, actual);
    }

    // Generates one `#[test]` per `name: (json, expected_record)` entry and
    // compares only the decoded record, not the timestamp.
    macro_rules! deserialize_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let (input, expected) = $value;
                    let actual = serde_json::from_str::<LambdaLog>(input).expect("unable to deserialize");

                    // `assert_eq!` prints both records when they differ.
                    assert_eq!(actual.record, expected);
                }
            )*
        }
    }

    deserialize_tests! {
        function: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#,
            LambdaLogRecord::Function("hello world".to_string()),
        ),

        extension: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#,
            LambdaLogRecord::Extension("hello world".to_string()),
        ),

        platform_start: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.start","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#,
            LambdaLogRecord::PlatformStart {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
            },
        ),
        platform_end: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.end","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#,
            LambdaLogRecord::PlatformEnd {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
            },
        ),
        platform_report: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.report","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56","metrics": {"durationMs": 1.23,"billedDurationMs": 123,"memorySizeMB": 123,"maxMemoryUsedMB": 123,"initDurationMs": 1.23}}}"#,
            LambdaLogRecord::PlatformReport {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                metrics: LogPlatformReportMetrics {
                    duration_ms: 1.23,
                    billed_duration_ms: 123,
                    memory_size_mb: 123,
                    max_memory_used_mb: 123,
                    init_duration_ms: Some(1.23),
                },
            },
        ),
        platform_fault: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.fault","record": "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request"}"#,
            LambdaLogRecord::PlatformFault(
                "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request"
                    .to_string(),
            ),
        ),
        platform_extension: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.extension","record": {"name": "Foo.bar","state": "Ready","events": ["INVOKE", "SHUTDOWN"]}}"#,
            LambdaLogRecord::PlatformExtension {
                name: "Foo.bar".to_string(),
                state: "Ready".to_string(),
                events: vec!["INVOKE".to_string(), "SHUTDOWN".to_string()],
            },
        ),
        platform_logssubscription: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsSubscription","record": {"name": "test","state": "active","types": ["test"]}}"#,
            LambdaLogRecord::PlatformLogsSubscription {
                name: "test".to_string(),
                state: "active".to_string(),
                types: vec!["test".to_string()],
            },
        ),
        platform_logsdropped: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsDropped","record": {"reason": "Consumer seems to have fallen behind as it has not acknowledged receipt of logs.","droppedRecords": 123,"droppedBytes": 12345}}"#,
            LambdaLogRecord::PlatformLogsDropped {
                reason: "Consumer seems to have fallen behind as it has not acknowledged receipt of logs."
                    .to_string(),
                dropped_records: 123,
                dropped_bytes: 12345,
            },
        ),
        platform_runtimedone: (
            r#"{"time": "2021-02-04T20:00:05.123Z","type": "platform.runtimeDone","record": {"requestId":"6f7f0961f83442118a7af6fe80b88d56","status": "success"}}"#,
            LambdaLogRecord::PlatformRuntimeDone {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                status: "success".to_string(),
            },
        ),
    }

    // Generates one `#[test]` per `name: (config, expected_error)` entry.
    // `expected_error == None` means the configuration must be accepted;
    // otherwise the error message must match exactly.
    macro_rules! log_buffering_configuration_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let (input, expected) = $value;
                    let result = validate_buffering_configuration(input);

                    match expected {
                        Some(expected) => {
                            let err = result.expect_err("validation should reject this configuration");
                            assert_eq!(err.to_string(), expected.to_string());
                        }
                        None => {
                            // Surface the message instead of a bare assertion failure.
                            if let Err(err) = result {
                                panic!("unexpected validation error: {}", err);
                            }
                        }
                    }
                }
            )*
        }
    }

    log_buffering_configuration_tests! {
        log_buffer_configuration_none_success: (
            None,
            None::<ExtensionError>
        ),
        log_buffer_configuration_default_success: (
            Some(LogBuffering::default()),
            None::<ExtensionError>
        ),
        log_buffer_configuration_min_success: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS }),
            None::<ExtensionError>
        ),
        log_buffer_configuration_max_success: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
            None::<ExtensionError>
        ),
        min_timeout_ms_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS-1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 24. Allowed values: Minumun: 25. Maximum: 30000"))
        ),
        max_timeout_ms_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS+1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 30001. Allowed values: Minumun: 25. Maximum: 30000"))
        ),
        min_max_bytes_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES-1, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 262143. Allowed values: Minumun: 262144. Maximum: 1048576"))
        ),
        max_max_bytes_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES+1, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 1048577. Allowed values: Minumun: 262144. Maximum: 1048576"))
        ),
        min_max_items_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS-1 }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 999. Allowed values: Minumun: 1000. Maximum: 10000"))
        ),
        max_max_items_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS+1 }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 10001. Allowed values: Minumun: 1000. Maximum: 10000"))
        ),
    }
}