1#![deny(clippy::all, clippy::cargo)]
2#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
3
4pub use crate::types::Context;
43use client::Client;
44use futures_core::stream::Stream;
45use futures_util::stream::StreamExt;
46pub use netlify_lambda_attributes::lambda;
47use serde::{Deserialize, Serialize};
48use std::{
49 convert::{TryFrom, TryInto},
50 env, fmt,
51 future::Future,
52};
53use tracing::trace;
54
55mod client;
56mod requests;
57#[cfg(test)]
58mod simulated;
59mod types;
61
62use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
63use types::Diagnostic;
64
// Log group name `Config::from_env` falls back to when `AWS_LAMBDA_LOG_GROUP_NAME`
// cannot be read from the environment.
const DEFAULT_LOG_GROUP: &str = "/aws/lambda/Functions";
// Log stream name `Config::from_env` falls back to when `AWS_LAMBDA_LOG_STREAM_NAME`
// cannot be read from the environment.
const DEFAULT_LOG_STREAM: &str = "$LATEST";
67
/// Crate-wide boxed error: any `std::error::Error` that is `Send`, `Sync` and `'static`.
///
/// `'static` is the default object lifetime bound for `Box<dyn Trait>`, so it is not
/// spelled out.
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
70
/// Configuration of the running function, as read from the process environment.
///
/// [`Config::from_env`] fills every field from an `AWS_LAMBDA_*` environment variable;
/// the variable backing each field is named on the field itself.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Config {
    /// Address of the runtime API, taken from `AWS_LAMBDA_RUNTIME_API`.
    pub endpoint: String,
    /// Name of the function, taken from `AWS_LAMBDA_FUNCTION_NAME`.
    pub function_name: String,
    /// Memory size configured for the function, parsed from
    /// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE` (AWS documents this value as megabytes).
    pub memory: i32,
    /// Version of the function, taken from `AWS_LAMBDA_FUNCTION_VERSION`.
    pub version: String,
    /// Log stream name, taken from `AWS_LAMBDA_LOG_STREAM_NAME` with a built-in fallback.
    pub log_stream: String,
    /// Log group name, taken from `AWS_LAMBDA_LOG_GROUP_NAME` with a built-in fallback.
    pub log_group: String,
}
87
88impl Config {
89 pub fn from_env() -> Result<Self, Error> {
91 let conf = Config {
92 endpoint: env::var("AWS_LAMBDA_RUNTIME_API")?,
93 function_name: env::var("AWS_LAMBDA_FUNCTION_NAME")?,
94 memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")?.parse::<i32>()?,
95 version: env::var("AWS_LAMBDA_FUNCTION_VERSION")?,
96 log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_else(|_e| DEFAULT_LOG_STREAM.to_owned()),
97 log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_else(|_e| DEFAULT_LOG_GROUP.to_owned()),
98 };
99 Ok(conf)
100 }
101}
102
103pub trait Handler<A, B> {
105 type Error;
107 type Fut: Future<Output = Result<B, Self::Error>>;
109 fn call(&mut self, event: A, context: Context) -> Self::Fut;
111}
112
113pub fn handler_fn<F>(f: F) -> HandlerFn<F> {
117 HandlerFn { f }
118}
119
/// Adapter that stores a function or closure so it can be used as a [`Handler`].
///
/// Values of this type are created with [`handler_fn`].
#[derive(Debug, Clone)]
pub struct HandlerFn<F> {
    // The wrapped callable; invoked once per call to `Handler::call`.
    f: F,
}
127
128impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
129where
130 F: Fn(A, Context) -> Fut,
131 Fut: Future<Output = Result<B, Error>> + Send,
132 Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
133{
134 type Error = Error;
135 type Fut = Fut;
136 fn call(&mut self, req: A, ctx: Context) -> Self::Fut {
137 (self.f)(req, ctx)
138 }
139}
140
141pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
163where
164 F: Handler<A, B>,
165 <F as Handler<A, B>>::Error: fmt::Debug,
166 A: for<'de> Deserialize<'de>,
167 B: Serialize,
168{
169 trace!("Loading config from env");
170 let mut handler = handler;
171 let config = Config::from_env()?;
172 let uri = config.endpoint.try_into().expect("Unable to convert to URL");
173 let client = Client::with(uri, hyper::Client::new());
174 let incoming = incoming(&client);
175 run_inner(&client, incoming, &mut handler).await?;
176
177 Ok(())
178}
179
180pub async fn run_simulated<A, B, F>(handler: F, url: &str) -> Result<(), Error>
182where
183 F: Handler<A, B>,
184 <F as Handler<A, B>>::Error: fmt::Debug,
185 A: for<'de> Deserialize<'de>,
186 B: Serialize,
187{
188 let mut handler = handler;
189 let uri = url.try_into().expect("Unable to convert to URL");
190 let client = Client::with(uri, hyper::Client::new());
191 let incoming = incoming(&client).take(1);
192 run_inner(&client, incoming, &mut handler).await?;
193
194 Ok(())
195}
196
197fn incoming(client: &Client) -> impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + '_ {
198 async_stream::stream! {
199 loop {
200 let req = NextEventRequest.into_req().expect("Unable to construct request");
201 let res = client.call(req).await;
202 yield res;
203 }
204 }
205}
206
207async fn run_inner<A, B, F>(
208 client: &Client,
209 incoming: impl Stream<Item = Result<http::Response<hyper::Body>, Error>>,
210 handler: &mut F,
211) -> Result<(), Error>
212where
213 F: Handler<A, B>,
214 <F as Handler<A, B>>::Error: fmt::Debug,
215 A: for<'de> Deserialize<'de>,
216 B: Serialize,
217{
218 tokio::pin!(incoming);
219
220 while let Some(event) = incoming.next().await {
221 let event = event?;
222 let (parts, body) = event.into_parts();
223
224 let mut ctx: Context = Context::try_from(parts.headers)?;
225 ctx.env_config = Config::from_env()?;
226 let body = hyper::body::to_bytes(body).await?;
227 let body = serde_json::from_slice(&body)?;
228
229 let request_id = &ctx.request_id.clone();
230 let f = handler.call(body, ctx);
231
232 let req = match f.await {
233 Ok(res) => EventCompletionRequest { request_id, body: res }.into_req()?,
234 Err(e) => EventErrorRequest {
235 request_id,
236 diagnostic: Diagnostic {
237 error_message: format!("{:?}", e),
238 error_type: type_name_of_val(e).to_owned(),
239 },
240 }
241 .into_req()?,
242 };
243 client.call(req).await?;
244 }
245
246 Ok(())
247}
248
/// Returns the compiler-provided name of `T`, the static type of the value passed in.
///
/// The argument is consumed and otherwise ignored; only its type matters.
fn type_name_of_val<T>(_value: T) -> &'static str {
    use std::any::type_name;

    type_name::<T>()
}