lamedh_runtime/
lib.rs

1#![deny(clippy::all, clippy::cargo)]
2#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
3
4//! The official Rust runtime for AWS Lambda.
5//!
6//! There are two mechanisms available for defining a Lambda function:
7//! 1. The `lambda` attribute macro, which generates the boilerplate to
8//!    launch and run a Lambda function.
9//!
10//!    The [`#[lambda]`] attribute _must_ be placed on an asynchronous main function.
11//!    However, as asynchronous main functions are not valid Rust,
12//!    this means that the main function must also be decorated using a
13//!    [`#[tokio::main]`] attribute macro. This is available from
14//!    the [Tokio] crate.
15//!
16//! 2. A type that conforms to the [`Handler`] trait. This type can then be passed
17//!    to the `lamedh_runtime::run` function, which launches and runs the Lambda runtime.
18//!
19//! An asynchronous function annotated with the `#[lambda]` attribute must
20//! accept an argument of type `A` which implements [`serde::Deserialize`], a [`lambda::Context`] and
21//! return a `Result<B, E>`, where `B` implements [`serde::Serialize`]. `E` is
22//! any type that implements `Into<Box<dyn std::error::Error + Send + Sync + 'static>>`.
23//!
24//! ```no_run
25//! use lamedh_runtime::{lambda, Context, Error};
26//! use serde_json::Value;
27//!
28//! #[lambda]
29//! #[tokio::main]
30//! async fn main(event: Value, _: Context) -> Result<Value, Error> {
31//!     Ok(event)
32//! }
33//! ```
34//!
35//! [`Handler`]: trait.Handler.html
36//! [`lambda::Context`]: struct.Context.html
37//! [`#[lambda]`]: attr.lambda.html
38//! [`#[tokio::main]`]: https://docs.rs/tokio/0.2.21/tokio/attr.main.html
39//! [Tokio]: https://docs.rs/tokio/
40pub use crate::types::Context;
41use client::Client;
42use futures_core::stream::Stream;
43use futures_util::stream::StreamExt;
44pub use lamedh_attributes::lambda;
45use serde::{Deserialize, Serialize};
46use std::{
47    convert::{TryFrom, TryInto},
48    env, fmt,
49    future::Future,
50};
51use tracing::trace;
52
53mod client;
54mod requests;
55#[cfg(test)]
56mod simulated;
57/// Types available to a Lambda function.
58mod types;
59
60use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
61use types::Diagnostic;
62
/// Log stream name used by [`Config::from_env`] when
/// `AWS_LAMBDA_LOG_STREAM_NAME` cannot be read from the environment.
static DEFAULT_LOG_STREAM: &str = "$LATEST";
/// Log group name used by [`Config::from_env`] when
/// `AWS_LAMBDA_LOG_GROUP_NAME` cannot be read from the environment.
static DEFAULT_LOG_GROUP: &str = "/aws/lambda/Functions";
65
/// Boxed, thread-safe error type returned by the runtime and usable as the
/// error type of Lambda handlers.
///
/// The trait object carries the default `'static` bound, so this is the same
/// type as `Box<dyn std::error::Error + Send + Sync + 'static>`.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
68
/// Configuration derived from environment variables.
///
/// Every field is a plain `String` or integer, so the type supports full
/// equality (`Eq`) in addition to `PartialEq`. The `Default` value has empty
/// strings and `memory == 0`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Config {
    /// The host and port of the [runtime API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
    pub endpoint: String,
    /// The name of the function.
    pub function_name: String,
    /// The amount of memory available to the function in MB.
    pub memory: i32,
    /// The version of the function being executed.
    pub version: String,
    /// The name of the Amazon CloudWatch Logs stream for the function.
    pub log_stream: String,
    /// The name of the Amazon CloudWatch Logs group for the function.
    pub log_group: String,
}
85
86impl Config {
87    /// Attempts to read configuration from environment variables.
88    pub fn from_env() -> Result<Self, Error> {
89        let conf = Config {
90            endpoint: env::var("AWS_LAMBDA_RUNTIME_API")?,
91            function_name: env::var("AWS_LAMBDA_FUNCTION_NAME")?,
92            memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")?.parse::<i32>()?,
93            version: env::var("AWS_LAMBDA_FUNCTION_VERSION")?,
94            log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_else(|_e| DEFAULT_LOG_STREAM.to_owned()),
95            log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_else(|_e| DEFAULT_LOG_GROUP.to_owned()),
96        };
97        Ok(conf)
98    }
99}
100
101/// A trait describing an asynchronous function `A` to `B`.
102pub trait Handler<A, B> {
103    /// Errors returned by this handler.
104    type Error;
105    /// Response of this handler.
106    type Fut: Future<Output = Result<B, Self::Error>>;
107    /// Handle the incoming event.
108    fn call(&mut self, event: A, context: Context) -> Self::Fut;
109}
110
111/// Returns a new [`HandlerFn`] with the given closure.
112///
113/// [`HandlerFn`]: struct.HandlerFn.html
114pub fn handler_fn<F>(f: F) -> HandlerFn<F> {
115    HandlerFn { f }
116}
117
/// Adapter that turns a closure into a [`Handler`].
///
/// [`Handler`]: trait.Handler.html
#[derive(Debug, Clone)]
pub struct HandlerFn<F> {
    // The wrapped callable; invoked once per event.
    f: F,
}
125
126impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
127where
128    F: Fn(A, Context) -> Fut,
129    Fut: Future<Output = Result<B, Error>> + Send,
130    Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
131{
132    type Error = Error;
133    type Fut = Fut;
134    fn call(&mut self, req: A, ctx: Context) -> Self::Fut {
135        (self.f)(req, ctx)
136    }
137}
138
139/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
140/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
141///
142/// # Example
143/// ```no_run
144/// use lamedh_runtime::{handler_fn, Context, Error};
145/// use serde_json::Value;
146///
147/// #[tokio::main]
148/// async fn main() -> Result<(), Error> {
149///     let func = handler_fn(func);
150///     lamedh_runtime::run(func).await?;
151///     Ok(())
152/// }
153///
154/// async fn func(event: Value, _: Context) -> Result<Value, Error> {
155///     Ok(event)
156/// }
157/// ```
158pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
159where
160    F: Handler<A, B>,
161    <F as Handler<A, B>>::Error: fmt::Debug,
162    A: for<'de> Deserialize<'de>,
163    B: Serialize,
164{
165    trace!("Loading config from env");
166    let mut handler = handler;
167    let config = Config::from_env()?;
168    let uri = config.endpoint.try_into().expect("Unable to convert to URL");
169    let client = Client::with(uri, hyper::Client::new());
170    let incoming = incoming(&client);
171    run_inner(&client, incoming, &mut handler).await?;
172
173    Ok(())
174}
175
176/// Runs the lambda function almost entirely in-memory. This is meant for testing.
177pub async fn run_simulated<A, B, F>(handler: F, url: &str) -> Result<(), Error>
178where
179    F: Handler<A, B>,
180    <F as Handler<A, B>>::Error: fmt::Debug,
181    A: for<'de> Deserialize<'de>,
182    B: Serialize,
183{
184    let mut handler = handler;
185    let uri = url.try_into().expect("Unable to convert to URL");
186    let client = Client::with(uri, hyper::Client::new());
187    let incoming = incoming(&client).take(1);
188    run_inner(&client, incoming, &mut handler).await?;
189
190    Ok(())
191}
192
193fn incoming(client: &Client) -> impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + '_ {
194    async_stream::stream! {
195        loop {
196            let req = NextEventRequest.into_req().expect("Unable to construct request");
197            let res = client.call(req).await;
198            yield res;
199        }
200    }
201}
202
203async fn run_inner<A, B, F>(
204    client: &Client,
205    incoming: impl Stream<Item = Result<http::Response<hyper::Body>, Error>>,
206    handler: &mut F,
207) -> Result<(), Error>
208where
209    F: Handler<A, B>,
210    <F as Handler<A, B>>::Error: fmt::Debug,
211    A: for<'de> Deserialize<'de>,
212    B: Serialize,
213{
214    tokio::pin!(incoming);
215
216    while let Some(event) = incoming.next().await {
217        let event = event?;
218        let (parts, body) = event.into_parts();
219
220        let mut ctx: Context = Context::try_from(parts.headers)?;
221        ctx.env_config = Config::from_env()?;
222        let body = hyper::body::to_bytes(body).await?;
223        let body = serde_json::from_slice(&body)?;
224
225        let request_id = &ctx.request_id.clone();
226        let f = handler.call(body, ctx);
227
228        let req = match f.await {
229            Ok(res) => EventCompletionRequest { request_id, body: res }.into_req()?,
230            Err(e) => EventErrorRequest {
231                request_id,
232                diagnostic: Diagnostic {
233                    error_message: format!("{:?}", e),
234                    error_type: type_name_of_val(e).to_owned(),
235                },
236            }
237            .into_req()?,
238        };
239        client.call(req).await?;
240    }
241
242    Ok(())
243}
244
/// Returns the name of the static type `T` of the given value.
///
/// The value is taken by value and dropped; only its type is inspected.
fn type_name_of_val<T>(_value: T) -> &'static str {
    use std::any::type_name;

    type_name::<T>()
}