netlify_lambda/
lib.rs

1#![deny(clippy::all, clippy::cargo)]
2#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
3
4//! The official Rust runtime for AWS Lambda.
5//!
6//! There are two mechanisms available for defining a Lambda function:
7//! 1. The `lambda` attribute macro, which generates the boilerplate to
8//!    launch and run a Lambda function.
9//!
10//!    The [`#[lambda]`] attribute _must_ be placed on an asynchronous main function.
11//!    However, as asynchronous main functions are not legal valid Rust
12//!    this means that the main function must also be decorated using a
13//!    [`#[tokio::main]`] attribute macro. This is available from
14//!    the [Tokio] crate.
15//!
16//! 2. A type that conforms to the [`Handler`] trait. This type can then be passed
17//!    to the the `netlify_lambda::run` function, which launches and runs the Lambda runtime.
18//!
19//! An asynchronous function annotated with the `#[lambda]` attribute must
20//! accept an argument of type `A` which implements [`serde::Deserialize`], a [`lambda::Context`] and
21//! return a `Result<B, E>`, where `B` implements [`serde::Serializable`]. `E` is
22//! any type that implements `Into<Box<dyn std::error::Error + Send + Sync + 'static>>`.
23//!
24//! ```no_run
25//! use netlify_lambda::{lambda, Context};
26//! use serde_json::Value;
27//!
28//! type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
29//!
30//! #[lambda]
31//! #[tokio::main]
32//! async fn main(event: Value, _: Context) -> Result<Value, Error> {
33//!     Ok(event)
34//! }
35//! ```
36//!
37//! [`Handler`]: trait.Handler.html
38//! [`lambda::Context`]: struct.Context.html
39//! [`lambda`]: attr.lambda.html
40//! [`#[tokio::main]`]: https://docs.rs/tokio/0.2.21/tokio/attr.main.html
41//! [Tokio]: https://docs.rs/tokio/
42pub use crate::types::Context;
43use client::Client;
44use futures_core::stream::Stream;
45use futures_util::stream::StreamExt;
46pub use netlify_lambda_attributes::lambda;
47use serde::{Deserialize, Serialize};
48use std::{
49    convert::{TryFrom, TryInto},
50    env, fmt,
51    future::Future,
52};
53use tracing::trace;
54
55mod client;
56mod requests;
57#[cfg(test)]
58mod simulated;
59/// Types available to a Lambda function.
60mod types;
61
62use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
63use types::Diagnostic;
64
65static DEFAULT_LOG_GROUP: &str = "/aws/lambda/Functions";
66static DEFAULT_LOG_STREAM: &str = "$LATEST";
67
68/// Error type that lambdas may result in
69pub(crate) type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
70
71/// Configuration derived from environment variables.
72#[derive(Debug, Default, Clone, PartialEq)]
73pub struct Config {
74    /// The host and port of the [runtime API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
75    pub endpoint: String,
76    /// The name of the function.
77    pub function_name: String,
78    /// The amount of memory available to the function in MB.
79    pub memory: i32,
80    /// The version of the function being executed.
81    pub version: String,
82    /// The name of the Amazon CloudWatch Logs stream for the function.
83    pub log_stream: String,
84    /// The name of the Amazon CloudWatch Logs group for the function.
85    pub log_group: String,
86}
87
88impl Config {
89    /// Attempts to read configuration from environment variables.
90    pub fn from_env() -> Result<Self, Error> {
91        let conf = Config {
92            endpoint: env::var("AWS_LAMBDA_RUNTIME_API")?,
93            function_name: env::var("AWS_LAMBDA_FUNCTION_NAME")?,
94            memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")?.parse::<i32>()?,
95            version: env::var("AWS_LAMBDA_FUNCTION_VERSION")?,
96            log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_else(|_e| DEFAULT_LOG_STREAM.to_owned()),
97            log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_else(|_e| DEFAULT_LOG_GROUP.to_owned()),
98        };
99        Ok(conf)
100    }
101}
102
103/// A trait describing an asynchronous function `A` to `B`.
104pub trait Handler<A, B> {
105    /// Errors returned by this handler.
106    type Error;
107    /// Response of this handler.
108    type Fut: Future<Output = Result<B, Self::Error>>;
109    /// Handle the incoming event.
110    fn call(&mut self, event: A, context: Context) -> Self::Fut;
111}
112
113/// Returns a new [`HandlerFn`] with the given closure.
114///
115/// [`HandlerFn`]: struct.HandlerFn.html
116pub fn handler_fn<F>(f: F) -> HandlerFn<F> {
117    HandlerFn { f }
118}
119
120/// A [`Handler`] implemented by a closure.
121///
122/// [`Handler`]: trait.Handler.html
123#[derive(Clone, Debug)]
124pub struct HandlerFn<F> {
125    f: F,
126}
127
128impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
129where
130    F: Fn(A, Context) -> Fut,
131    Fut: Future<Output = Result<B, Error>> + Send,
132    Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
133{
134    type Error = Error;
135    type Fut = Fut;
136    fn call(&mut self, req: A, ctx: Context) -> Self::Fut {
137        (self.f)(req, ctx)
138    }
139}
140
141/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
142/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
143///
144/// # Example
145/// ```no_run
146/// use netlify_lambda::{handler_fn, Context};
147/// use serde_json::Value;
148///
149/// type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
150///
151/// #[tokio::main]
152/// async fn main() -> Result<(), Error> {
153///     let func = handler_fn(func);
154///     netlify_lambda::run(func).await?;
155///     Ok(())
156/// }
157///
158/// async fn func(event: Value, _: Context) -> Result<Value, Error> {
159///     Ok(event)
160/// }
161/// ```
162pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
163where
164    F: Handler<A, B>,
165    <F as Handler<A, B>>::Error: fmt::Debug,
166    A: for<'de> Deserialize<'de>,
167    B: Serialize,
168{
169    trace!("Loading config from env");
170    let mut handler = handler;
171    let config = Config::from_env()?;
172    let uri = config.endpoint.try_into().expect("Unable to convert to URL");
173    let client = Client::with(uri, hyper::Client::new());
174    let incoming = incoming(&client);
175    run_inner(&client, incoming, &mut handler).await?;
176
177    Ok(())
178}
179
180/// Runs the lambda function almost entirely in-memory. This is meant for testing.
181pub async fn run_simulated<A, B, F>(handler: F, url: &str) -> Result<(), Error>
182where
183    F: Handler<A, B>,
184    <F as Handler<A, B>>::Error: fmt::Debug,
185    A: for<'de> Deserialize<'de>,
186    B: Serialize,
187{
188    let mut handler = handler;
189    let uri = url.try_into().expect("Unable to convert to URL");
190    let client = Client::with(uri, hyper::Client::new());
191    let incoming = incoming(&client).take(1);
192    run_inner(&client, incoming, &mut handler).await?;
193
194    Ok(())
195}
196
197fn incoming(client: &Client) -> impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + '_ {
198    async_stream::stream! {
199        loop {
200            let req = NextEventRequest.into_req().expect("Unable to construct request");
201            let res = client.call(req).await;
202            yield res;
203        }
204    }
205}
206
207async fn run_inner<A, B, F>(
208    client: &Client,
209    incoming: impl Stream<Item = Result<http::Response<hyper::Body>, Error>>,
210    handler: &mut F,
211) -> Result<(), Error>
212where
213    F: Handler<A, B>,
214    <F as Handler<A, B>>::Error: fmt::Debug,
215    A: for<'de> Deserialize<'de>,
216    B: Serialize,
217{
218    tokio::pin!(incoming);
219
220    while let Some(event) = incoming.next().await {
221        let event = event?;
222        let (parts, body) = event.into_parts();
223
224        let mut ctx: Context = Context::try_from(parts.headers)?;
225        ctx.env_config = Config::from_env()?;
226        let body = hyper::body::to_bytes(body).await?;
227        let body = serde_json::from_slice(&body)?;
228
229        let request_id = &ctx.request_id.clone();
230        let f = handler.call(body, ctx);
231
232        let req = match f.await {
233            Ok(res) => EventCompletionRequest { request_id, body: res }.into_req()?,
234            Err(e) => EventErrorRequest {
235                request_id,
236                diagnostic: Diagnostic {
237                    error_message: format!("{:?}", e),
238                    error_type: type_name_of_val(e).to_owned(),
239                },
240            }
241            .into_req()?,
242        };
243        client.call(req).await?;
244    }
245
246    Ok(())
247}
248
249fn type_name_of_val<T>(_: T) -> &'static str {
250    std::any::type_name::<T>()
251}