1#![deny(clippy::all, clippy::cargo)]
2#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
3
4pub use crate::types::Context;
41use client::Client;
42use futures_core::stream::Stream;
43use futures_util::stream::StreamExt;
44pub use lamedh_attributes::lambda;
45use serde::{Deserialize, Serialize};
46use std::{
47 convert::{TryFrom, TryInto},
48 env, fmt,
49 future::Future,
50};
51use tracing::trace;
52
53mod client;
54mod requests;
55#[cfg(test)]
56mod simulated;
57mod types;
59
60use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
61use types::Diagnostic;
62
/// Log group name used by `Config::from_env` when `AWS_LAMBDA_LOG_GROUP_NAME`
/// cannot be read from the environment.
static DEFAULT_LOG_GROUP: &str = "/aws/lambda/Functions";
/// Log stream name used by `Config::from_env` when `AWS_LAMBDA_LOG_STREAM_NAME`
/// cannot be read from the environment.
static DEFAULT_LOG_STREAM: &str = "$LATEST";

/// Boxed, thread-safe (`Send + Sync`) error type returned by the fallible
/// functions in this module.
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
68
/// Runtime configuration populated from environment variables.
///
/// See `Config::from_env` for how each field is sourced. All fields are plain
/// `String`/`i32` values, so the type is totally comparable (`Eq`).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Config {
    /// Value of `AWS_LAMBDA_RUNTIME_API`; converted into the client's base URI by `run`.
    pub endpoint: String,
    /// Value of `AWS_LAMBDA_FUNCTION_NAME`.
    pub function_name: String,
    /// Value of `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, parsed as an `i32`.
    pub memory: i32,
    /// Value of `AWS_LAMBDA_FUNCTION_VERSION`.
    pub version: String,
    /// Value of `AWS_LAMBDA_LOG_STREAM_NAME`, or a built-in default when unavailable.
    pub log_stream: String,
    /// Value of `AWS_LAMBDA_LOG_GROUP_NAME`, or a built-in default when unavailable.
    pub log_group: String,
}
85
86impl Config {
87 pub fn from_env() -> Result<Self, Error> {
89 let conf = Config {
90 endpoint: env::var("AWS_LAMBDA_RUNTIME_API")?,
91 function_name: env::var("AWS_LAMBDA_FUNCTION_NAME")?,
92 memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")?.parse::<i32>()?,
93 version: env::var("AWS_LAMBDA_FUNCTION_VERSION")?,
94 log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_else(|_e| DEFAULT_LOG_STREAM.to_owned()),
95 log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_else(|_e| DEFAULT_LOG_GROUP.to_owned()),
96 };
97 Ok(conf)
98 }
99}
100
/// An asynchronous function from an event of type `A` to a response of type `B`.
pub trait Handler<A, B> {
    /// Error produced when handling an event fails.
    type Error;
    /// Future returned by [`Handler::call`]; resolves to `B` on success or
    /// `Self::Error` on failure.
    type Fut: Future<Output = Result<B, Self::Error>>;
    /// Handles `event` together with its invocation `context` and returns the
    /// future that will produce the result.
    fn call(&mut self, event: A, context: Context) -> Self::Fut;
}
110
/// Wraps `f` in a [`HandlerFn`].
///
/// The returned value implements [`Handler`] when `f` is a
/// `Fn(A, Context) -> Fut` whose future resolves to a `Result<B, _>`.
pub fn handler_fn<F>(f: F) -> HandlerFn<F> {
    HandlerFn { f }
}
117
/// A [`Handler`] backed by a plain function or closure; created with [`handler_fn`].
#[derive(Clone, Debug)]
pub struct HandlerFn<F> {
    // The wrapped callable that `Handler::call` forwards to.
    f: F,
}
125
126impl<F, A, B, Error, Fut> Handler<A, B> for HandlerFn<F>
127where
128 F: Fn(A, Context) -> Fut,
129 Fut: Future<Output = Result<B, Error>> + Send,
130 Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + fmt::Display,
131{
132 type Error = Error;
133 type Fut = Fut;
134 fn call(&mut self, req: A, ctx: Context) -> Self::Fut {
135 (self.f)(req, ctx)
136 }
137}
138
139pub async fn run<A, B, F>(handler: F) -> Result<(), Error>
159where
160 F: Handler<A, B>,
161 <F as Handler<A, B>>::Error: fmt::Debug,
162 A: for<'de> Deserialize<'de>,
163 B: Serialize,
164{
165 trace!("Loading config from env");
166 let mut handler = handler;
167 let config = Config::from_env()?;
168 let uri = config.endpoint.try_into().expect("Unable to convert to URL");
169 let client = Client::with(uri, hyper::Client::new());
170 let incoming = incoming(&client);
171 run_inner(&client, incoming, &mut handler).await?;
172
173 Ok(())
174}
175
176pub async fn run_simulated<A, B, F>(handler: F, url: &str) -> Result<(), Error>
178where
179 F: Handler<A, B>,
180 <F as Handler<A, B>>::Error: fmt::Debug,
181 A: for<'de> Deserialize<'de>,
182 B: Serialize,
183{
184 let mut handler = handler;
185 let uri = url.try_into().expect("Unable to convert to URL");
186 let client = Client::with(uri, hyper::Client::new());
187 let incoming = incoming(&client).take(1);
188 run_inner(&client, incoming, &mut handler).await?;
189
190 Ok(())
191}
192
193fn incoming(client: &Client) -> impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + '_ {
194 async_stream::stream! {
195 loop {
196 let req = NextEventRequest.into_req().expect("Unable to construct request");
197 let res = client.call(req).await;
198 yield res;
199 }
200 }
201}
202
203async fn run_inner<A, B, F>(
204 client: &Client,
205 incoming: impl Stream<Item = Result<http::Response<hyper::Body>, Error>>,
206 handler: &mut F,
207) -> Result<(), Error>
208where
209 F: Handler<A, B>,
210 <F as Handler<A, B>>::Error: fmt::Debug,
211 A: for<'de> Deserialize<'de>,
212 B: Serialize,
213{
214 tokio::pin!(incoming);
215
216 while let Some(event) = incoming.next().await {
217 let event = event?;
218 let (parts, body) = event.into_parts();
219
220 let mut ctx: Context = Context::try_from(parts.headers)?;
221 ctx.env_config = Config::from_env()?;
222 let body = hyper::body::to_bytes(body).await?;
223 let body = serde_json::from_slice(&body)?;
224
225 let request_id = &ctx.request_id.clone();
226 let f = handler.call(body, ctx);
227
228 let req = match f.await {
229 Ok(res) => EventCompletionRequest { request_id, body: res }.into_req()?,
230 Err(e) => EventErrorRequest {
231 request_id,
232 diagnostic: Diagnostic {
233 error_message: format!("{:?}", e),
234 error_type: type_name_of_val(e).to_owned(),
235 },
236 }
237 .into_req()?,
238 };
239 client.call(req).await?;
240 }
241
242 Ok(())
243}
244
/// Returns the compiler-provided name of the static type `T` of the given
/// value. The value itself is consumed and otherwise unused.
fn type_name_of_val<T>(_value: T) -> &'static str {
    use std::any::type_name;

    type_name::<T>()
}