lambda_runtime_types/
lib.rs

//! This crate provides types and traits to simplify
//! the creation of lambda functions in Rust. It
//! provides Event and Return types and specific
//! Runners for various lambda types.
//!
//! # Basic Lambda with no shared data
//!
//! Creating a normal lambda is very easy. First create a type which implements [`Runner`] and
//! then use it either in the [`exec`] or [`exec_tokio`] function:
//!
//! ```no_run
//! struct Runner;
//!
//! #[async_trait::async_trait]
//! impl<'a> lambda_runtime_types::Runner<'a, (), (), ()> for Runner {
//!     async fn run(shared: &'a (), event: lambda_runtime_types::LambdaEvent<'a, ()>) -> anyhow::Result<()> {
//!         // Run code on every invocation
//!         Ok(())
//!     }
//!
//!     async fn setup(_region: &'a str) -> anyhow::Result<()> {
//!         // Setup logging to make sure that errors are printed
//!         Ok(())
//!     }
//! }
//!
//! pub fn main() -> anyhow::Result<()> {
//!     lambda_runtime_types::exec_tokio::<_, _, Runner, _>()
//! }
//! ```
//!
//! # Available lambda types
//!
//! There are various modules which predefine Event and Return types and Runner traits
//! specialised for different lambda usages. Check out the modules for examples of their
//! usage.
//!
//! * [`rotate`]
//!
//! # Custom Event and Return types
//!
//! If the predefined types are not enough, custom types can be used as long as types for
//! events implement [`serde::Deserialize`] and [`std::fmt::Debug`], and return types
//! implement [`serde::Serialize`].
//!
//! ```no_run
//! #[derive(serde::Deserialize, Debug)]
//! struct Event {
//!     #[serde(flatten)]
//!     attributes: std::collections::HashMap<String, serde_json::Value>,
//! }
//!
//! #[derive(serde::Serialize, Debug)]
//! struct Return {
//!     data: std::borrow::Cow<'static, str>,
//! }
//!
//! struct Runner;
//!
//! #[async_trait::async_trait]
//! impl<'a> lambda_runtime_types::Runner<'a, (), Event, Return> for Runner {
//!     async fn run(shared: &'a (), event: lambda_runtime_types::LambdaEvent<'a, Event>) -> anyhow::Result<Return> {
//!         println!("{:?}", event);
//!         Ok(Return {
//!             data: event
//!                 .event
//!                 .attributes
//!                 .get("test")
//!                 .and_then(|a| a.as_str())
//!                 .map(ToOwned::to_owned)
//!                 .map(Into::into)
//!                 .unwrap_or_else(|| "none".into()),
//!         })
//!     }
//!
//!     async fn setup(_region: &'a str) -> anyhow::Result<()> {
//!         // Setup logging to make sure that errors are printed
//!         Ok(())
//!     }
//! }
//!
//! pub fn main() -> anyhow::Result<()> {
//!     lambda_runtime_types::exec_tokio::<_, _, Runner, _>()
//! }
//! ```
//!
//! # Shared Data
//!
//! With AWS Lambda, it's possible to share data between invocations, as long as both
//! invocations use the same runtime environment. To use this functionality, define a
//! shared data type which persists data by using interior mutability:
//!
//! ```no_run
//! #[derive(Default)]
//! struct Shared {
//!     invocations: tokio::sync::Mutex<u64>,
//! }
//!
//! struct Runner;
//!
//! #[async_trait::async_trait]
//! impl<'a> lambda_runtime_types::Runner<'a, Shared, (), ()> for Runner {
//!     async fn run(shared: &'a Shared, event: lambda_runtime_types::LambdaEvent<'a, ()>) -> anyhow::Result<()> {
//!         let mut invocations = shared.invocations.lock().await;
//!         *invocations += 1;
//!         Ok(())
//!     }
//!
//!     async fn setup(_region: &'a str) -> anyhow::Result<Shared> {
//!         // Setup logging to make sure that errors are printed
//!         Ok(Shared::default())
//!     }
//! }
//!
//! pub fn main() -> anyhow::Result<()> {
//!     lambda_runtime_types::exec_tokio::<_, _, Runner, _>()
//! }
//! ```
//!
//! It's important to know that lambda execution environments never run multiple invocations
//! simultaneously. It's therefore possible to keep the mutex locked for the whole invocation,
//! as it will never block other invocations. It is even recommended to do so, to
//! make sure that no unnecessary lock operations slow down lambda execution.
//!
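//! The "lock once, hold the guard until the end" pattern can be sketched with the standard
//! library alone. This is only an illustration: it uses `std::sync::Mutex` instead of
//! `tokio::sync::Mutex` to stay dependency-free, and `handle_invocation` is a hypothetical
//! stand-in for `Runner::run`, not part of this crate.
//!
//! ```rust
//! use std::sync::Mutex;
//!
//! /// Hypothetical shared state, mirroring the `Shared` struct above.
//! #[derive(Default)]
//! struct Shared {
//!     invocations: Mutex<u64>,
//! }
//!
//! /// Hypothetical stand-in for `Runner::run`.
//! fn handle_invocation(shared: &Shared) -> u64 {
//!     // Lock once at the start; the guard is released when it goes out of scope.
//!     let mut invocations = shared.invocations.lock().expect("mutex poisoned");
//!     *invocations += 1;
//!     // The rest of the invocation can keep using `invocations` without re-locking.
//!     *invocations
//! }
//!
//! fn main() {
//!     let shared = Shared::default();
//!     // Two invocations in the same environment see the same counter.
//!     assert_eq!(handle_invocation(&shared), 1);
//!     assert_eq!(handle_invocation(&shared), 2);
//! }
//! ```
//!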
//! # Timeout handling
//!
//! This crate implements timeout handling. Normally, when a lambda runs into a timeout,
//! no error is created, so nothing is propagated to `on_error` destinations.
//!
//! To fix that, a timeout handler is set up, which will "fail" 100 milliseconds before the lambda
//! would run into a timeout, creating an error which is then propagated. There is, however, no
//! guarantee that this handler will fail in time. It only works when there are multiple
//! tokio threads or when the main lambda code is currently awaiting, giving tokio the chance
//! to switch tasks (or run them in parallel) and fail the execution.
//!
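//! As a rough illustration of how such an early deadline can be derived, the sketch below
//! uses only the standard library. The helper `time_until_early_failure` and its parameters
//! are hypothetical and not part of this crate. It assumes that the deadline is given in
//! milliseconds since the Unix epoch and uses saturating subtraction, so a deadline that is
//! closer than the safety margin yields a zero wait instead of a panic.
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Hypothetical helper: time left until the early failure should fire.
//! /// `deadline_ms` is the deadline in milliseconds since the Unix epoch,
//! /// `now` is the current time since the Unix epoch and `margin` is the
//! /// safety margin (100 milliseconds in the description above).
//! fn time_until_early_failure(deadline_ms: u64, now: Duration, margin: Duration) -> Duration {
//!     Duration::from_millis(deadline_ms)
//!         // Saturating subtraction returns zero instead of panicking when the
//!         // deadline is closer than the margin or already in the past.
//!         .saturating_sub(now)
//!         .saturating_sub(margin)
//! }
//!
//! fn main() {
//!     let margin = Duration::from_millis(100);
//!     // Three seconds left: fail after 2.9 seconds.
//!     let left = time_until_early_failure(10_000, Duration::from_millis(7_000), margin);
//!     assert_eq!(left, Duration::from_millis(2_900));
//!     // Only 50 milliseconds left: fail immediately.
//!     let left = time_until_early_failure(10_000, Duration::from_millis(9_950), margin);
//!     assert_eq!(left, Duration::ZERO);
//! }
//! ```
//!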
//! # Memory exhaustion
//!
//! Another thing to consider when running lambdas is memory exhaustion. Unfortunately it is not
//! possible in Rust to check the current memory usage. Therefore it is also not possible to
//! fail before running out of memory (OOM). When running lambdas, it may be necessary to set up
//! checks to verify that a lambda completed successfully and did not run out of memory, as these
//! errors also do not get propagated to `on_error` destinations.
//!

#![deny(clippy::all, clippy::nursery)]
#![deny(nonstandard_style, rust_2018_idioms, unused_crate_dependencies)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

#[cfg(feature = "_rotate")]
#[cfg_attr(
    docsrs,
    doc(cfg(any(feature = "rotate_rusoto", feature = "rotate_aws_sdk")))
)]
pub mod rotate;

#[cfg(test)]
use native_tls as _;
#[cfg(test)]
use postgres_native_tls as _;
#[cfg(test)]
use simple_logger as _;
#[cfg(test)]
use tokio_postgres as _;

pub use lambda_runtime::{Config, Context};

/// Type which contains all the information relevant for
/// the current invocation
#[non_exhaustive]
#[derive(Debug)]
pub struct LambdaEvent<'a, Event> {
    /// The expected Event which is being sent
    /// to the lambda by AWS.
    pub event: Event,
    /// Region the lambda is running in
    pub region: &'a str,
    /// Lambda Invocation Context
    pub ctx: Context,
}

/// Defines a type which is executed every time a lambda
/// is invoked.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///             invocations. Note that lambda will
///             create multiple environments for
///             simultaneous invocations and environments
///             are only kept alive for a certain time.
///             It is thus not guaranteed that data
///             can be reused, but with this type
///             it's possible.
/// * `Event`:  The expected Event which is being sent
///             to the lambda by AWS.
/// * `Return`: Type which is the result of the lambda
///             invocation being returned to AWS
#[async_trait::async_trait]
pub trait Runner<'a, Shared, Event, Return>
where
    Shared: Send + Sync + 'a,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug,
    Return: serde::Serialize,
{
    /// Invoked only once before the lambda runtime starts. Does not get called on each
    /// lambda invocation. Can be used to set up logging and other global services,
    /// but should be short as it delays lambda startup
    async fn setup(region: &'a str) -> anyhow::Result<Shared>;

    /// Invoked for every lambda invocation. Data in `shared` is persisted between
    /// invocations as long as they are running in the same `execution environment`
    ///
    /// More Info: <https://docs.aws.amazon.com/lambda/latest/dg/runtimes-context.html>
    async fn run(shared: &'a Shared, event: LambdaEvent<'a, Event>) -> anyhow::Result<Return>;
}

/// Lambda entrypoint. This function sets up a
/// multi-threaded tokio runtime and executes [`exec`]. If you
/// already have your own runtime, use the [`exec`]
/// function.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///             invocations. Note that lambda will
///             create multiple environments for
///             simultaneous invocations and environments
///             are only kept alive for a certain time.
///             It is thus not guaranteed that data
///             can be reused, but with this type
///             it's possible.
/// * `Event`:  The expected Event which is being sent
///             to the lambda by AWS.
/// * `Run`:    Runner which is executed for each lambda
///             invocation.
/// * `Return`: Type which is the result of the lambda
///             invocation being returned to AWS
pub fn exec_tokio<Shared, Event, Run, Return>() -> anyhow::Result<()>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: for<'a> Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize,
{
    use anyhow::Context;
    use tokio::runtime::Builder;

    Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("Unable to build tokio runtime")?
        .block_on(exec::<Shared, Event, Run, Return>())
}

/// Lambda entrypoint. This function requires a
/// running tokio runtime. Alternatively use [`exec_tokio`]
/// which creates one.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///             invocations. Note that lambda will
///             create multiple environments for
///             simultaneous invocations and environments
///             are only kept alive for a certain time.
///             It is thus not guaranteed that data
///             can be reused, but with this type
///             it's possible.
/// * `Event`:  The expected Event which is being sent
///             to the lambda by AWS.
/// * `Run`:    Runner which is executed for each lambda
///             invocation.
/// * `Return`: Type which is the result of the lambda
///             invocation being returned to AWS
pub async fn exec<Shared, Event, Run, Return>() -> anyhow::Result<()>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: for<'a> Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize,
{
    use anyhow::{anyhow, Context};
    use lambda_runtime::{service_fn, LambdaEvent};
    use std::env;

    log::info!("Starting lambda runtime");
    let region = env::var("AWS_REGION").context("Missing AWS_REGION env variable")?;
    let region_ref = &region;
    let shared = Run::setup(region_ref).await?;
    let shared_ref = &shared;
    lambda_runtime::run(service_fn(move |data: LambdaEvent<Event>| {
        log::info!("Received lambda invocation with event: {:?}", data.payload);
        let deadline: u64 = data.context.deadline;
        run::<_, Event, Run, Return>(shared_ref, data, Some(deadline), region_ref)
    }))
    .await
    .map_err(|e| anyhow!(e))
}

#[allow(clippy::unit_arg)]
async fn run<'a, Shared, Event, Run, Return>(
    shared: &'a Shared,
    event: lambda_runtime::LambdaEvent<Event>,
    deadline_in_ms: Option<u64>,
    region: &'a str,
) -> anyhow::Result<Return>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize,
{
    use anyhow::anyhow;
    use futures::FutureExt;

    let mut runner = Run::run(
        shared,
        LambdaEvent {
            event: event.payload,
            region,
            ctx: event.context,
        },
    )
    .fuse();
    // With a deadline, race the runner against the timeout handler so that a
    // timeout results in an error. Without one, run the runner to completion.
    let res = if let Some(deadline_in_ms) = deadline_in_ms {
        let mut timeout = Box::pin(timeout_handler(deadline_in_ms).fuse());
        futures::select! {
            res = runner => res,
            _ = timeout => Err(anyhow!("Lambda failed by running into a timeout")),
        }
    } else {
        runner.await
    };
    log::info!("Completed lambda invocation");
    match res {
        Ok(res) => Ok(res),
        Err(err) => {
            log::error!("{:?}", err);
            Err(err)
        }
    }
}

async fn timeout_handler(deadline_in_ms: u64) {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use tokio::time::Instant;

    let now = SystemTime::now();
    let now_instant = Instant::now();

    // The deadline is given in milliseconds since the Unix epoch.
    let now_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
    let deadline_since_epoch = Duration::from_millis(deadline_in_ms);
    // Fail 100 milliseconds before the actual deadline. Saturating subtraction
    // avoids a panic when less than 100 milliseconds are left.
    let time_left = deadline_since_epoch
        .saturating_sub(now_since_epoch)
        .saturating_sub(Duration::from_millis(100));

    let deadline = now_instant + time_left;
    log::info!("Setting deadline to: {:?}", deadline);
    tokio::time::sleep_until(deadline).await;
}

/// TestData which can be used to test lambda invocations
/// locally in combination with [`exec_test`].
#[derive(serde::Deserialize, Clone, Debug)]
#[cfg(feature = "test")]
#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
pub struct TestData<Event> {
    region: String,
    invocations: Vec<Event>,
}

/// Lambda entrypoint. This function can be used to
/// test one or multiple lambda invocations locally.
///
/// Types:
/// * `Shared`: Type which is shared between lambda
///             invocations. Note that lambda will
///             create multiple environments for
///             simultaneous invocations and environments
///             are only kept alive for a certain time.
///             It is thus not guaranteed that data
///             can be reused, but with this type
///             it's possible.
/// * `Event`:  The expected Event which is being sent
///             to the lambda by AWS.
/// * `Run`:    Runner which is executed for each lambda
///             invocation.
/// * `Return`: Type which is the result of the lambda
///             invocation being returned to AWS
#[cfg(feature = "test")]
#[cfg_attr(docsrs, doc(cfg(feature = "test")))]
pub fn exec_test<Shared, Event, Run, Return>(test_data: &str) -> anyhow::Result<()>
where
    Shared: Send + Sync,
    Event: for<'de> serde::Deserialize<'de> + std::fmt::Debug + Send,
    Run: for<'a> Runner<'a, Shared, Event, Return>,
    Return: serde::Serialize + std::fmt::Debug,
{
    use anyhow::Context;
    use tokio::runtime::Builder;

    Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("Unable to build tokio runtime")?
        .block_on(async {
            log::info!("Starting lambda test runtime");
            let test_data: TestData<Event> =
                serde_json::from_str(test_data).context("Unable to deserialize test_data")?;
            let region_ref = &test_data.region;
            let shared = Run::setup(region_ref).await?;
            let shared_ref = &shared;

            for (i, data) in test_data.invocations.into_iter().enumerate() {
                log::info!("Starting lambda invocation: {}", i);
                let res = run::<_, Event, Run, Return>(
                    shared_ref,
                    lambda_runtime::LambdaEvent {
                        payload: data,
                        context: crate::Context::default(),
                    },
                    None,
                    region_ref,
                )
                .await?;
                log::info!("{:?}", res);
            }
            Ok(())
        })
}