lambda_extension/
logs.rs

1use chrono::{DateTime, Utc};
2use http::{Request, Response};
3use http_body_util::BodyExt;
4use hyper::body::Incoming;
5use lambda_runtime_api_client::body::Body;
6use serde::{Deserialize, Serialize};
7use std::{fmt, sync::Arc};
8use tokio::sync::Mutex;
9use tower::Service;
10use tracing::{error, trace};
11
12use crate::{Error, ExtensionError};
13
14/// Payload received from the Lambda Logs API
15/// See: <https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-msg>
16#[derive(Clone, Debug, Deserialize, PartialEq)]
17pub struct LambdaLog {
18    /// Time when the log was generated
19    pub time: DateTime<Utc>,
20    /// Log record entry
21    #[serde(flatten)]
22    pub record: LambdaLogRecord,
23}
24
25/// Record in a LambdaLog entry
26#[derive(Clone, Debug, Deserialize, PartialEq)]
27#[serde(tag = "type", content = "record", rename_all = "lowercase")]
28pub enum LambdaLogRecord {
29    /// Function log records
30    Function(String),
31
32    /// Extension log records
33    Extension(String),
34
35    /// Platform start record
36    #[serde(rename = "platform.start", rename_all = "camelCase")]
37    PlatformStart {
38        /// Request identifier
39        request_id: String,
40    },
41    /// Platform stop record
42    #[serde(rename = "platform.end", rename_all = "camelCase")]
43    PlatformEnd {
44        /// Request identifier
45        request_id: String,
46    },
47    /// Platform report record
48    #[serde(rename = "platform.report", rename_all = "camelCase")]
49    PlatformReport {
50        /// Request identifier
51        request_id: String,
52        /// Request metrics
53        metrics: LogPlatformReportMetrics,
54    },
55    /// Runtime or execution environment error record
56    #[serde(rename = "platform.fault")]
57    PlatformFault(String),
58    /// Extension-specific record
59    #[serde(rename = "platform.extension", rename_all = "camelCase")]
60    PlatformExtension {
61        /// Name of the extension
62        name: String,
63        /// State of the extension
64        state: String,
65        /// Events sent to the extension
66        events: Vec<String>,
67    },
68    /// Log processor-specific record
69    #[serde(rename = "platform.logsSubscription", rename_all = "camelCase")]
70    PlatformLogsSubscription {
71        /// Name of the extension
72        name: String,
73        /// State of the extensions
74        state: String,
75        /// Types of records sent to the extension
76        types: Vec<String>,
77    },
78    /// Record generated when the log processor is falling behind
79    #[serde(rename = "platform.logsDropped", rename_all = "camelCase")]
80    PlatformLogsDropped {
81        /// Reason for dropping the logs
82        reason: String,
83        /// Number of records dropped
84        dropped_records: u64,
85        /// Total size of the dropped records
86        dropped_bytes: u64,
87    },
88    /// Record marking the completion of an invocation
89    #[serde(rename = "platform.runtimeDone", rename_all = "camelCase")]
90    PlatformRuntimeDone {
91        /// Request identifier
92        request_id: String,
93        /// Status of the invocation
94        status: String,
95    },
96}
97
98/// Platform report metrics
99#[derive(Clone, Debug, Deserialize, PartialEq)]
100#[serde(rename_all = "camelCase")]
101pub struct LogPlatformReportMetrics {
102    /// Duration in milliseconds
103    pub duration_ms: f64,
104    /// Billed duration in milliseconds
105    pub billed_duration_ms: u64,
106    /// Memory allocated in megabytes
107    #[serde(rename = "memorySizeMB")]
108    pub memory_size_mb: u64,
109    /// Maximum memory used for the invoke in megabytes
110    #[serde(rename = "maxMemoryUsedMB")]
111    pub max_memory_used_mb: u64,
112    /// Init duration in case of a cold start
113    #[serde(default = "Option::default")]
114    pub init_duration_ms: Option<f64>,
115}
116
117/// Log buffering configuration.
118/// Allows Lambda to buffer logs before delivering them to a subscriber.
119#[derive(Debug, Serialize, Copy, Clone)]
120#[serde(rename_all = "camelCase")]
121pub struct LogBuffering {
122    /// The maximum time (in milliseconds) to buffer a batch.
123    /// Default: 1,000. Minimum: 25. Maximum: 30,000
124    pub timeout_ms: usize,
125    /// The maximum size (in bytes) of the logs to buffer in memory.
126    /// Default: 262,144. Minimum: 262,144. Maximum: 1,048,576
127    pub max_bytes: usize,
128    /// The maximum number of events to buffer in memory.
129    /// Default: 10,000. Minimum: 1,000. Maximum: 10,000
130    pub max_items: usize,
131}
132
/// Smallest accepted `timeout_ms`, in milliseconds.
const LOG_BUFFERING_MIN_TIMEOUT_MS: usize = 25;
/// Largest accepted `timeout_ms`, in milliseconds.
const LOG_BUFFERING_MAX_TIMEOUT_MS: usize = 30_000;
/// Smallest accepted `max_bytes`, in bytes.
const LOG_BUFFERING_MIN_BYTES: usize = 262_144;
/// Largest accepted `max_bytes`, in bytes.
const LOG_BUFFERING_MAX_BYTES: usize = 1_048_576;
/// Smallest accepted `max_items`.
const LOG_BUFFERING_MIN_ITEMS: usize = 1_000;
/// Largest accepted `max_items`.
const LOG_BUFFERING_MAX_ITEMS: usize = 10_000;
139
140impl LogBuffering {
141    fn validate(&self) -> Result<(), Error> {
142        if self.timeout_ms < LOG_BUFFERING_MIN_TIMEOUT_MS || self.timeout_ms > LOG_BUFFERING_MAX_TIMEOUT_MS {
143            let error = format!(
144                "LogBuffering validation error: Invalid timeout_ms: {}. Allowed values: Minumun: {}. Maximum: {}",
145                self.timeout_ms, LOG_BUFFERING_MIN_TIMEOUT_MS, LOG_BUFFERING_MAX_TIMEOUT_MS
146            );
147            return Err(ExtensionError::boxed(error));
148        }
149        if self.max_bytes < LOG_BUFFERING_MIN_BYTES || self.max_bytes > LOG_BUFFERING_MAX_BYTES {
150            let error = format!(
151                "LogBuffering validation error: Invalid max_bytes: {}. Allowed values: Minumun: {}. Maximum: {}",
152                self.max_bytes, LOG_BUFFERING_MIN_BYTES, LOG_BUFFERING_MAX_BYTES
153            );
154            return Err(ExtensionError::boxed(error));
155        }
156        if self.max_items < LOG_BUFFERING_MIN_ITEMS || self.max_items > LOG_BUFFERING_MAX_ITEMS {
157            let error = format!(
158                "LogBuffering validation error: Invalid max_items: {}. Allowed values: Minumun: {}. Maximum: {}",
159                self.max_items, LOG_BUFFERING_MIN_ITEMS, LOG_BUFFERING_MAX_ITEMS
160            );
161            return Err(ExtensionError::boxed(error));
162        }
163        Ok(())
164    }
165}
166
167impl Default for LogBuffering {
168    fn default() -> Self {
169        LogBuffering {
170            timeout_ms: 1_000,
171            max_bytes: 262_144,
172            max_items: 10_000,
173        }
174    }
175}
176
177/// Validate the `LogBuffering` configuration (if present)
178///
179/// # Errors
180///
181/// This function will return an error if `LogBuffering` is present and configured incorrectly
182pub(crate) fn validate_buffering_configuration(log_buffering: Option<LogBuffering>) -> Result<(), Error> {
183    match log_buffering {
184        Some(log_buffering) => log_buffering.validate(),
185        None => Ok(()),
186    }
187}
188
189/// Wrapper function that sends logs to the subscriber Service
190///
191/// This takes an `hyper::Request` and transforms it into `Vec<LambdaLog>` for the
192/// underlying `Service` to process.
193pub(crate) async fn log_wrapper<S>(service: Arc<Mutex<S>>, req: Request<Incoming>) -> Result<Response<Body>, Error>
194where
195    S: Service<Vec<LambdaLog>, Response = ()>,
196    S::Error: Into<Error> + fmt::Debug,
197    S::Future: Send,
198{
199    trace!("Received logs request");
200    // Parse the request body as a Vec<LambdaLog>
201    let body = match req.into_body().collect().await {
202        Ok(body) => body,
203        Err(e) => {
204            error!("Error reading logs request body: {}", e);
205            return Ok(hyper::Response::builder()
206                .status(hyper::StatusCode::BAD_REQUEST)
207                .body(Body::empty())
208                .unwrap());
209        }
210    };
211    let logs: Vec<LambdaLog> = match serde_json::from_slice(&body.to_bytes()) {
212        Ok(logs) => logs,
213        Err(e) => {
214            error!("Error parsing logs: {}", e);
215            return Ok(hyper::Response::builder()
216                .status(hyper::StatusCode::BAD_REQUEST)
217                .body(Body::empty())
218                .unwrap());
219        }
220    };
221
222    {
223        let mut service = service.lock().await;
224        match service.call(logs).await {
225            Ok(_) => (),
226            Err(err) => println!("{err:?}"),
227        }
228    }
229
230    Ok(hyper::Response::new(Body::empty()))
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeDelta, TimeZone};

    /// Checks both the timestamp (with millisecond precision) and the record
    /// of a complete entry.
    #[test]
    fn deserialize_full() {
        let data = r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#;
        let expected = LambdaLog {
            time: Utc
                .with_ymd_and_hms(2020, 8, 20, 12, 31, 32)
                .unwrap()
                .checked_add_signed(TimeDelta::try_milliseconds(123).unwrap())
                .unwrap(),
            record: LambdaLogRecord::Function("hello world".to_string()),
        };

        let actual = serde_json::from_str::<LambdaLog>(data).unwrap();

        assert_eq!(expected, actual);
    }

    /// Generates one test per `name: (json, expected_record)` pair; each test
    /// deserializes the JSON and compares only the `record` part.
    macro_rules! deserialize_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let (input, expected) = $value;
                    let actual = serde_json::from_str::<LambdaLog>(input).expect("unable to deserialize");

                    // `assert_eq!` prints both records when they differ.
                    assert_eq!(actual.record, expected);
                }
            )*
        }
    }

    deserialize_tests! {
        // function
        function: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#,
            LambdaLogRecord::Function("hello world".to_string()),
        ),

        // extension
        extension: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#,
            LambdaLogRecord::Extension("hello world".to_string()),
        ),

        // platform.start
        platform_start: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.start","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#,
            LambdaLogRecord::PlatformStart {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
            },
        ),
        // platform.end
        platform_end: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.end","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#,
            LambdaLogRecord::PlatformEnd {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
            },
        ),
        // platform.report
        platform_report: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.report","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56","metrics": {"durationMs": 1.23,"billedDurationMs": 123,"memorySizeMB": 123,"maxMemoryUsedMB": 123,"initDurationMs": 1.23}}}"#,
            LambdaLogRecord::PlatformReport {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                metrics: LogPlatformReportMetrics {
                    duration_ms: 1.23,
                    billed_duration_ms: 123,
                    memory_size_mb: 123,
                    max_memory_used_mb: 123,
                    init_duration_ms: Some(1.23),
                },
            },
        ),
        // platform.report without `initDurationMs`
        platform_report_without_init_duration: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.report","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56","metrics": {"durationMs": 1.23,"billedDurationMs": 123,"memorySizeMB": 123,"maxMemoryUsedMB": 123}}}"#,
            LambdaLogRecord::PlatformReport {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                metrics: LogPlatformReportMetrics {
                    duration_ms: 1.23,
                    billed_duration_ms: 123,
                    memory_size_mb: 123,
                    max_memory_used_mb: 123,
                    init_duration_ms: None,
                },
            },
        ),
        // platform.fault
        platform_fault: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.fault","record": "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request"}"#,
            LambdaLogRecord::PlatformFault(
                "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request"
                    .to_string(),
            ),
        ),
        // platform.extension
        platform_extension: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.extension","record": {"name": "Foo.bar","state": "Ready","events": ["INVOKE", "SHUTDOWN"]}}"#,
            LambdaLogRecord::PlatformExtension {
                name: "Foo.bar".to_string(),
                state: "Ready".to_string(),
                events: vec!["INVOKE".to_string(), "SHUTDOWN".to_string()],
            },
        ),
        // platform.logsSubscription
        platform_logssubscription: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsSubscription","record": {"name": "test","state": "active","types": ["test"]}}"#,
            LambdaLogRecord::PlatformLogsSubscription {
                name: "test".to_string(),
                state: "active".to_string(),
                types: vec!["test".to_string()],
            },
        ),
        // platform.logsDropped
        platform_logsdropped: (
            r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsDropped","record": {"reason": "Consumer seems to have fallen behind as it has not acknowledged receipt of logs.","droppedRecords": 123,"droppedBytes": 12345}}"#,
            LambdaLogRecord::PlatformLogsDropped {
                reason: "Consumer seems to have fallen behind as it has not acknowledged receipt of logs."
                    .to_string(),
                dropped_records: 123,
                dropped_bytes: 12345,
            },
        ),
        // platform.runtimeDone
        platform_runtimedone: (
            r#"{"time": "2021-02-04T20:00:05.123Z","type": "platform.runtimeDone","record": {"requestId":"6f7f0961f83442118a7af6fe80b88d56","status": "success"}}"#,
            LambdaLogRecord::PlatformRuntimeDone {
                request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(),
                status: "success".to_string(),
            },
        ),
    }

    /// Generates one test per `name: (config, expected_error)` pair. A `None`
    /// expectation means validation must succeed; a `Some` expectation means
    /// it must fail with exactly that error message.
    macro_rules! log_buffering_configuration_tests {
        ($($name:ident: $value:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let (input, expected) = $value;
                    let result = validate_buffering_configuration(input);

                    match expected {
                        // `unwrap_err` already fails the test when validation succeeded.
                        Some(expected) => assert_eq!(result.unwrap_err().to_string(), expected.to_string()),
                        None => assert!(result.is_ok(), "unexpected validation error: {:?}", result),
                    }
                }
            )*
        }
    }

    log_buffering_configuration_tests! {
        log_buffer_configuration_none_success: (
            None,
            None::<ExtensionError>
        ),
        log_buffer_configuration_default_success: (
            Some(LogBuffering::default()),
            None::<ExtensionError>
        ),
        log_buffer_configuration_min_success: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS }),
            None::<ExtensionError>
        ),
        log_buffer_configuration_max_success: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
            None::<ExtensionError>
        ),
        min_timeout_ms_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS-1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 24. Allowed values: Minumun: 25. Maximum: 30000"))
        ),
        max_timeout_ms_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS+1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 30001. Allowed values: Minumun: 25. Maximum: 30000"))
        ),
        min_max_bytes_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES-1, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 262143. Allowed values: Minumun: 262144. Maximum: 1048576"))
        ),
        max_max_bytes_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES+1, max_items: LOG_BUFFERING_MAX_ITEMS }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 1048577. Allowed values: Minumun: 262144. Maximum: 1048576"))
        ),
        min_max_items_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS-1 }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 999. Allowed values: Minumun: 1000. Maximum: 10000"))
        ),
        max_max_items_error: (
            Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS+1 }),
            Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 10001. Allowed values: Minumun: 1000. Maximum: 10000"))
        ),
    }
}