lambda_runtime_types/lib.rs
//! This crate provides types and traits to simplify
//! the creation of lambda functions in Rust. It
//! provides Event and Return types and specific
//! Runners for various lambda types.
//!
//! # Basic Lambda with no shared data
//!
//! Creating a normal lambda is very easy. First create a type which implements [`Runner`] and
//! then use it in either the [`exec`] or the [`exec_tokio`] function:
//!
//! ```no_run
//! struct Runner;
//!
//! #[async_trait::async_trait]
//! impl<'a> lambda_runtime_types::Runner<'a, (), (), ()> for Runner {
//!     async fn run(shared: &'a (), event: lambda_runtime_types::LambdaEvent<'a, ()>) -> anyhow::Result<()> {
//!         // Run code on every invocation
//!         Ok(())
//!     }
//!
//!     async fn setup(_region: &'a str) -> anyhow::Result<()> {
//!         // Setup logging to make sure that errors are printed
//!         Ok(())
//!     }
//! }
//!
//! pub fn main() -> anyhow::Result<()> {
//!     lambda_runtime_types::exec_tokio::<_, _, Runner, _>()
//! }
//! ```
//!
//! # Available lambda types
//!
//! There are various modules which predefine Event and Return types and Runner traits
//! specialised for different lambda usages. Check out the modules for examples of their
//! usage.
//!
//! * [`rotate`]
//!
//! # Custom Event and Return types
//!
//! If the predefined types are not enough, custom types can be used as long as types for
//! events implement [`serde::Deserialize`] and [`std::fmt::Debug`], and return types
//! implement [`serde::Serialize`].
//!
//! ```no_run
//! #[derive(serde::Deserialize, Debug)]
//! struct Event {
//!     #[serde(flatten)]
//!     attributes: std::collections::HashMap<String, serde_json::Value>,
//! }
//!
//! #[derive(serde::Serialize, Debug)]
//! struct Return {
//!     data: std::borrow::Cow<'static, str>,
//! }
//!
//! struct Runner;
//!
//! #[async_trait::async_trait]
//! impl<'a> lambda_runtime_types::Runner<'a, (), Event, Return> for Runner {
//!     async fn run(shared: &'a (), event: lambda_runtime_types::LambdaEvent<'a, Event>) -> anyhow::Result<Return> {
//!         println!("{:?}", event);
//!         Ok(Return {
//!             data: event
//!                 .event
//!                 .attributes
//!                 .get("test")
//!                 .and_then(|a| a.as_str())
//!                 .map(ToOwned::to_owned)
//!                 .map(Into::into)
//!                 .unwrap_or_else(|| "none".into()),
//!         })
//!     }
//!
//!     async fn setup(_region: &'a str) -> anyhow::Result<()> {
//!         // Setup logging to make sure that errors are printed
//!         Ok(())
//!     }
//! }
//!
//! pub fn main() -> anyhow::Result<()> {
//!     lambda_runtime_types::exec_tokio::<_, _, Runner, _>()
//! }
//! ```
//!
//! # Shared Data
//!
//! With AWS Lambda, it is possible to share data between invocations, as long as both
//! invocations use the same execution environment. To use this functionality, define
//! a shared data type which persists data by using interior mutability:
//!
//! ```no_run
//! #[derive(Default)]
//! struct Shared {
//!     invocations: tokio::sync::Mutex<u64>,
//! }
//!
//! struct Runner;
//!
//! #[async_trait::async_trait]
//! impl<'a> lambda_runtime_types::Runner<'a, Shared, (), ()> for Runner {
//!     async fn run(shared: &'a Shared, event: lambda_runtime_types::LambdaEvent<'a, ()>) -> anyhow::Result<()> {
//!         let mut invocations = shared.invocations.lock().await;
//!         *invocations += 1;
//!         Ok(())
//!     }
//!
//!     async fn setup(_region: &'a str) -> anyhow::Result<Shared> {
//!         // Setup logging to make sure that errors are printed
//!         Ok(Shared::default())
//!     }
//! }
//!
//! pub fn main() -> anyhow::Result<()> {
//!     lambda_runtime_types::exec_tokio::<_, _, Runner, _>()
//! }
//! ```
//!
//! It is important to know that lambda execution environments never run multiple invocations
//! simultaneously. It is therefore possible to keep the mutex locked for the whole invocation,
//! as it will never block other invocations. Doing so is even recommended, to make sure that
//! no unnecessary locking and unlocking slows down the lambda execution.
//!
//! # Timeout handling
//!
//! This crate implements timeout handling logic. Normally, if a lambda runs into a timeout,
//! it does not create an error, so nothing gets propagated to `on_error` destinations.
//!
//! To fix that, a timeout handler is set up, which will "fail" 100 milliseconds before the lambda
//! would run into a timeout, creating an error which is then propagated. There is, however, no
//! guarantee that this handler will fail in time. It only works when there are multiple
//! tokio threads or when the main lambda code is currently awaiting, giving tokio the chance
//! to switch tasks (or run them in parallel) and fail the execution.
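//!
//! The deadline arithmetic described above can be sketched as follows. This is a minimal,
//! std-only illustration and not the crate's internal code: the helper name
//! `time_until_failure` and the `MARGIN` constant are hypothetical, and the saturating
//! subtraction is an assumption made here so that a deadline which has already passed
//! yields a zero duration.
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Safety margin subtracted from the deadline (100 ms, as described above).
//! const MARGIN: Duration = Duration::from_millis(100);
//!
//! /// Hypothetical helper: how long to wait before failing the invocation.
//! ///
//! /// `deadline_ms` is the invocation deadline in milliseconds since the Unix epoch,
//! /// `now` is the current time expressed as a duration since the Unix epoch.
//! fn time_until_failure(deadline_ms: u64, now: Duration) -> Duration {
//!     Duration::from_millis(deadline_ms)
//!         .saturating_sub(now)
//!         .saturating_sub(MARGIN)
//! }
//!
//! fn main() {
//!     let now = Duration::from_millis(1_000_000);
//!     // Deadline is 3 s away: the handler would fail after 2.9 s.
//!     assert_eq!(time_until_failure(1_003_000, now), Duration::from_millis(2_900));
//!     // Deadline is closer than the margin: the handler would fail immediately.
//!     assert_eq!(time_until_failure(1_000_050, now), Duration::ZERO);
//! }
//! ```
//!
//! In this sketch a result of zero means the handler would fail as soon as tokio polls it,
//! which is still subject to the scheduling caveat mentioned above.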
//!
//! # Memory exhaustion
//!
//! Another thing to consider when running lambdas is memory exhaustion. Unfortunately, the
//! Rust standard library offers no way to check the current memory usage. Therefore it is also
//! not possible to fail before running out of memory (OOM). When running lambdas, it may be
//! necessary to set up checks to verify that a lambda completed successfully and did not run
//! into OOM, as these errors also do not get propagated to `on_error` destinations.
//!

#![deny(clippy::all, clippy::nursery)]
#![deny(nonstandard_style, rust_2018_idioms, unused_crate_dependencies)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

#[cfg(feature = "_rotate")]
#[cfg_attr(
    docsrs,
    doc(cfg(any(feature = "rotate_rusoto", feature = "rotate_aws_sdk")))
)]
pub mod rotate;

#[cfg(test)]
use native_tls as _;
#[cfg(test)]
use postgres_native_tls as _;
#[cfg(test)]
use simple_logger as _;
#[cfg(test)]
use tokio_postgres as _;

pub use lambda_runtime::{Config, Context};

/// Type which contains all the information relevant for
/// the current invocation
#[non_exhaustive]
#[derive(Debug)]
pub struct LambdaEvent<'a, Event> {
    /// The expected Event which is being sent
    /// to the lambda by AWS.
    pub event: Event,
    /// Region the lambda is running in
    pub region: &'a str,
    /// Lambda Invocation Context
    pub ctx: Context,
}

/// Defines a type which is executed every time a lambda
/// is invoked.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///   invocations. Note that lambda will
///   create multiple environments for
///   simultaneous invocations and environments
///   are only kept alive for a certain time.
///   It is thus not guaranteed that data
///   can be reused, but with this type
///   it is possible.
/// * `Event`: The expected Event which is being sent
///   to the lambda by AWS.
/// * `Return`: Type which is the result of the lambda
///   invocation being returned to AWS
#[async_trait::async_trait]
pub trait Runner<'a, Shared, Event, Return>
where
    Shared: Send + Sync + 'a,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug,
    Return: serde::Serialize,
{
    /// Invoked only once before the lambda runtime starts. Does not get called on each
    /// lambda invocation. Can be used to set up logging and other global services,
    /// but should be short as it delays lambda startup
    async fn setup(region: &'a str) -> anyhow::Result<Shared>;

    /// Invoked for every lambda invocation. Data in `shared` is persisted between
    /// invocations as long as they are running in the same `execution environment`
    ///
    /// More Info: <https://docs.aws.amazon.com/lambda/latest/dg/runtimes-context.html>
    async fn run(shared: &'a Shared, event: LambdaEvent<'a, Event>) -> anyhow::Result<Return>;
}

/// Lambda entrypoint. This function sets up a
/// multi-threaded tokio runtime and executes [`exec`]. If you
/// already have your own runtime, use the [`exec`]
/// function.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///   invocations. Note that lambda will
///   create multiple environments for
///   simultaneous invocations and environments
///   are only kept alive for a certain time.
///   It is thus not guaranteed that data
///   can be reused, but with this type
///   it is possible.
/// * `Event`: The expected Event which is being sent
///   to the lambda by AWS.
/// * `Run`: Runner which is executed for each lambda
///   invocation.
/// * `Return`: Type which is the result of the lambda
///   invocation being returned to AWS
pub fn exec_tokio<Shared, Event, Run, Return>() -> anyhow::Result<()>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: for<'a> Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize,
{
    use anyhow::Context;
    use tokio::runtime::Builder;

    Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("Unable to build tokio runtime")?
        .block_on(exec::<Shared, Event, Run, Return>())
}

/// Lambda entrypoint. This function requires a
/// running tokio runtime. Alternatively use [`exec_tokio`]
/// which creates one.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///   invocations. Note that lambda will
///   create multiple environments for
///   simultaneous invocations and environments
///   are only kept alive for a certain time.
///   It is thus not guaranteed that data
///   can be reused, but with this type
///   it is possible.
/// * `Event`: The expected Event which is being sent
///   to the lambda by AWS.
/// * `Run`: Runner which is executed for each lambda
///   invocation.
/// * `Return`: Type which is the result of the lambda
///   invocation being returned to AWS
pub async fn exec<Shared, Event, Run, Return>() -> anyhow::Result<()>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: for<'a> Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize,
{
    use anyhow::{anyhow, Context};
    use lambda_runtime::{service_fn, LambdaEvent};
    use std::env;

    log::info!("Starting lambda runtime");
    let region = env::var("AWS_REGION").context("Missing AWS_REGION env variable")?;
    let region_ref = &region;
    let shared = Run::setup(region_ref).await?;
    let shared_ref = &shared;
    lambda_runtime::run(service_fn(move |data: LambdaEvent<Event>| {
        log::info!("Received lambda invocation with event: {:?}", data.payload);
        let deadline: u64 = data.context.deadline;
        run::<_, Event, Run, Return>(shared_ref, data, Some(deadline), region_ref)
    }))
    .await
    .map_err(|e| anyhow!(e))
}

#[allow(clippy::unit_arg)]
async fn run<'a, Shared, Event, Run, Return>(
    shared: &'a Shared,
    event: lambda_runtime::LambdaEvent<Event>,
    deadline_in_ms: Option<u64>,
    region: &'a str,
) -> anyhow::Result<Return>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize,
{
    use anyhow::anyhow;
    use futures::FutureExt;

    let mut runner = Run::run(
        shared,
        LambdaEvent {
            event: event.payload,
            region,
            ctx: event.context,
        },
    )
    .fuse();
    let res = if let Some(deadline_in_ms) = deadline_in_ms {
        let mut timeout = Box::pin(timeout_handler(deadline_in_ms).fuse());
        futures::select! {
            res = runner => res,
            _ = timeout => Err(anyhow!("Lambda failed by running into a timeout")),
        }
    } else {
        runner.await
    };
    log::info!("Completed lambda invocation");
    match res {
        Ok(res) => Ok(res),
        Err(err) => {
            log::error!("{:?}", err);
            Err(err)
        }
    }
}

async fn timeout_handler(deadline_in_ms: u64) {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use tokio::time::Instant;

    let epoch = UNIX_EPOCH;
    let now = SystemTime::now();
    let now_instant = Instant::now();

    // Current time and invocation deadline, both measured from the Unix epoch.
    let duration_from_now = now.duration_since(epoch).expect("Time went backwards");
    let duration_from_epoch = Duration::from_millis(deadline_in_ms);
    // Saturating subtraction avoids a panic if the deadline is less than
    // 100 milliseconds away or has already passed.
    let duration_deadline = duration_from_epoch
        .saturating_sub(duration_from_now)
        .saturating_sub(Duration::from_millis(100));

    let deadline = now_instant + duration_deadline;
    log::info!("Setting deadline to: {:?}", deadline);
    tokio::time::sleep_until(deadline).await;
}

/// TestData which can be used to test lambda invocations
/// locally in combination with [`exec_test`].
///
/// It is deserialized from JSON containing a `region` string and an
/// `invocations` array which holds one event per invocation.
#[derive(serde::Deserialize, Clone, Debug)]
#[cfg(feature = "test")]
#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
pub struct TestData<Event> {
    region: String,
    invocations: Vec<Event>,
}

/// Lambda entrypoint. This function can be used to
/// test one or multiple lambda invocations locally.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///   invocations. Note that lambda will
///   create multiple environments for
///   simultaneous invocations and environments
///   are only kept alive for a certain time.
///   It is thus not guaranteed that data
///   can be reused, but with this type
///   it is possible.
/// * `Event`: The expected Event which is being sent
///   to the lambda by AWS.
/// * `Run`: Runner which is executed for each lambda
///   invocation.
/// * `Return`: Type which is the result of the lambda
///   invocation being returned to AWS
#[cfg(feature = "test")]
#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
pub fn exec_test<Shared, Event, Run, Return>(test_data: &str) -> anyhow::Result<()>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: for<'a> Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize + std::fmt::Debug,
{
    use anyhow::Context;
    use tokio::runtime::Builder;

    Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("Unable to build tokio runtime")?
        .block_on(async {
            log::info!("Starting lambda test runtime");
            let test_data: TestData<Event> =
                serde_json::from_str(test_data).context("Unable to deserialize test_data")?;
            let region_ref = &test_data.region;
            let shared = Run::setup(region_ref).await?;
            let shared_ref = &shared;

            for (i, data) in test_data.invocations.into_iter().enumerate() {
                log::info!("Starting lambda invocation: {}", i);
                // No deadline is passed here, so the timeout handler is not used in tests.
                let res = run::<_, Event, Run, Return>(
                    shared_ref,
                    lambda_runtime::LambdaEvent {
                        payload: data,
                        context: crate::Context::default(),
                    },
                    None,
                    region_ref,
                )
                .await?;
                log::info!("{:?}", res);
            }
            Ok(())
        })
}
425}