#![deny(clippy::all, clippy::cargo)]
#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
pub use crate::types::Context;
use client::Client;
use futures_core::stream::Stream;
use futures_util::stream::StreamExt;
pub use netlify_lambda_attributes::lambda;
use serde::{Deserialize, Serialize};
use std::{
convert::{TryFrom, TryInto},
env, fmt,
future::Future,
};
use tracing::trace;
mod client;
mod requests;
#[cfg(test)]
mod simulated;
mod types;
use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
use types::Diagnostic;
static DEFAULT_LOG_GROUP: &str = "/aws/lambda/Functions";
static DEFAULT_LOG_STREAM: &str = "$LATEST";
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Config {
pub endpoint: String,
pub function_name: String,
pub memory: i32,
pub version: String,
pub log_stream: String,
pub log_group: String,
}
impl Config {
pub fn from_env() -> Result<Self, Error> {
let conf = Config {
endpoint: env::var("AWS_LAMBDA_RUNTIME_API")?,
function_name: env::var("AWS_LAMBDA_FUNCTION_NAME")?,
memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")?.parse::<i32>()?,
version: env::var("AWS_LAMBDA_FUNCTION_VERSION")?,
log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_else(|_e| DEFAULT_LOG_STREAM.to_owned()),
log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_else(|_e| DEFAULT_LOG_GROUP.to_owned()),
};
Ok(conf)
}
}
pub trait Handler<A, B> {
type Error;
type Fut: Future<Output = Result<B, Self::Error>>;
fn call(&mut self, event: A, context: Context) -> Self::Fut;
}
pub fn handler_fn<F>(f: F) -> HandlerFn<F> {
HandlerFn { f }
}
#[derive(Clone, Debug)]
pub struct HandlerFn<F> {
f: F,
}
impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
where
F: Fn(A, Context) -> Fut,
Fut: Future<Output = Result<B, Error>> + Send,
Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
{
type Error = Error;
type Fut = Fut;
fn call(&mut self, req: A, ctx: Context) -> Self::Fut {
(self.f)(req, ctx)
}
}
pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
where
F: Handler<A, B>,
<F as Handler<A, B>>::Error: fmt::Debug,
A: for<'de> Deserialize<'de>,
B: Serialize,
{
trace!("Loading config from env");
let mut handler = handler;
let config = Config::from_env()?;
let uri = config.endpoint.try_into().expect("Unable to convert to URL");
let client = Client::with(uri, hyper::Client::new());
let incoming = incoming(&client);
run_inner(&client, incoming, &mut handler).await?;
Ok(())
}
pub async fn run_simulated<A, B, F>(handler: F, url: &str) -> Result<(), Error>
where
F: Handler<A, B>,
<F as Handler<A, B>>::Error: fmt::Debug,
A: for<'de> Deserialize<'de>,
B: Serialize,
{
let mut handler = handler;
let uri = url.try_into().expect("Unable to convert to URL");
let client = Client::with(uri, hyper::Client::new());
let incoming = incoming(&client).take(1);
run_inner(&client, incoming, &mut handler).await?;
Ok(())
}
fn incoming(client: &Client) -> impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + '_ {
async_stream::stream! {
loop {
let req = NextEventRequest.into_req().expect("Unable to construct request");
let res = client.call(req).await;
yield res;
}
}
}
async fn run_inner<A, B, F>(
client: &Client,
incoming: impl Stream<Item = Result<http::Response<hyper::Body>, Error>>,
handler: &mut F,
) -> Result<(), Error>
where
F: Handler<A, B>,
<F as Handler<A, B>>::Error: fmt::Debug,
A: for<'de> Deserialize<'de>,
B: Serialize,
{
tokio::pin!(incoming);
while let Some(event) = incoming.next().await {
let event = event?;
let (parts, body) = event.into_parts();
let mut ctx: Context = Context::try_from(parts.headers)?;
ctx.env_config = Config::from_env()?;
let body = hyper::body::to_bytes(body).await?;
let body = serde_json::from_slice(&body)?;
let request_id = &ctx.request_id.clone();
let f = handler.call(body, ctx);
let req = match f.await {
Ok(res) => EventCompletionRequest { request_id, body: res }.into_req()?,
Err(e) => EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_message: format!("{:?}", e),
error_type: type_name_of_val(e).to_owned(),
},
}
.into_req()?,
};
client.call(req).await?;
}
Ok(())
}
fn type_name_of_val<T>(_: T) -> &'static str {
std::any::type_name::<T>()
}