#![deny(clippy::all, clippy::cargo)]
#![allow(clippy::multiple_crate_versions)]
#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use serde::{Deserialize, Serialize};
use std::{
env,
fmt::{self, Debug},
future::Future,
sync::Arc,
};
use tokio_stream::Stream;
use tower::util::ServiceFn;
pub use tower::{self, service_fn, Service};
#[macro_use]
mod macros;
pub mod diagnostic;
pub use diagnostic::Diagnostic;
mod deserializer;
pub mod layers;
mod requests;
mod runtime;
pub mod streaming;
#[cfg(feature = "tracing")]
#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
pub use lambda_runtime_api_client::tracing;
mod types;
use requests::EventErrorRequest;
pub use runtime::{LambdaInvocation, Runtime};
pub use types::{Context, FunctionResponse, IntoFunctionResponse, LambdaEvent, MetadataPrelude, StreamResponse};
/// Error type used by this crate's entry points, e.g. the `Result` returned by [`run`].
///
/// This is an alias for `lambda_runtime_api_client::BoxError`.
pub type Error = lambda_runtime_api_client::BoxError;
/// Configuration values describing a Lambda function.
///
/// [`Config::from_env`] builds a value from the `AWS_LAMBDA_*` environment
/// variables. All fields are public, so a value can also be constructed
/// directly (or via `Default`, which leaves every field empty / zero).
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Config {
/// Name of the function; `from_env` reads it from `AWS_LAMBDA_FUNCTION_NAME`.
pub function_name: String,
/// Memory size of the function; `from_env` parses it from
/// `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`.
// NOTE(review): the AWS documentation describes this variable as megabytes — confirm.
pub memory: i32,
/// Version of the function; `from_env` reads it from `AWS_LAMBDA_FUNCTION_VERSION`.
pub version: String,
/// Log stream name; `from_env` reads it from `AWS_LAMBDA_LOG_STREAM_NAME`
/// and falls back to an empty string when the variable cannot be read.
pub log_stream: String,
/// Log group name; `from_env` reads it from `AWS_LAMBDA_LOG_GROUP_NAME`
/// and falls back to an empty string when the variable cannot be read.
pub log_group: String,
}
/// Reference-counted (`Arc`) handle to a [`Config`].
// NOTE(review): not referenced in this part of the file; presumably used by sibling modules.
type RefConfig = Arc<Config>;
impl Config {
pub fn from_env() -> Self {
Config {
function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").expect("Missing AWS_LAMBDA_FUNCTION_NAME env var"),
memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
.expect("Missing AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var")
.parse::<i32>()
.expect("AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var is not <i32>"),
version: env::var("AWS_LAMBDA_FUNCTION_VERSION").expect("Missing AWS_LAMBDA_FUNCTION_VERSION env var"),
log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_default(),
log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_default(),
}
}
}
/// Wraps a two-argument function `f(payload, context)` in a `tower`
/// [`ServiceFn`] that accepts a [`LambdaEvent`].
///
/// The returned service moves the event's `payload` and `context` fields out
/// of the event and passes them to `f`, returning whatever future `f` produces.
#[deprecated(since = "0.5.0", note = "Use `service_fn` and `LambdaEvent` instead")]
pub fn handler_fn<A, F, Fut>(f: F) -> ServiceFn<impl Fn(LambdaEvent<A>) -> Fut>
where
    F: Fn(A, Context) -> Fut,
{
    // Adapter closure: take the event apart and hand the pieces to `f`.
    let adapter = move |event: LambdaEvent<A>| {
        let payload = event.payload;
        let context = event.context;
        f(payload, context)
    };
    service_fn(adapter)
}
/// Runs `handler` inside a [`Runtime`] with the tracing layer applied.
///
/// The handler is any [`Service`] that takes a [`LambdaEvent`] whose payload
/// type `A` is deserializable, and whose response `R` implements
/// [`IntoFunctionResponse`] (either a serializable body `B` or a stream `S`
/// yielding `Result<D, E>` items convertible into bytes).
///
/// # Errors
///
/// Returns whatever error `Runtime::run` resolves to.
pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
where
    F: Service<LambdaEvent<A>, Response = R>,
    F::Future: Future<Output = Result<R, F::Error>>,
    F::Error: Into<Diagnostic> + fmt::Debug,
    A: for<'de> Deserialize<'de>,
    R: IntoFunctionResponse<B, S>,
    B: Serialize,
    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
    D: Into<bytes::Bytes> + Send,
    E: Into<Error> + Send + Debug,
{
    // Build the runtime, add the tracing layer, then drive it to completion.
    Runtime::new(handler)
        .layer(layers::TracingLayer::new())
        .run()
        .await
}
/// Runs `handler` inside a [`Runtime`] with the tracing layer applied, using
/// the runtime's concurrent mode (`Runtime::run_concurrent`).
///
/// Compared with [`run`], the handler must additionally be `Clone + Send +
/// 'static`, and its future, payload, response and body types must be
/// `Send + 'static`.
///
/// Only available with the `concurrency-tokio` feature.
///
/// # Errors
///
/// Returns whatever error `Runtime::run_concurrent` resolves to.
#[cfg(feature = "concurrency-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
pub async fn run_concurrent<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
where
    F: Service<LambdaEvent<A>, Response = R> + Clone + Send + 'static,
    F::Future: Future<Output = Result<R, F::Error>> + Send + 'static,
    F::Error: Into<Diagnostic> + fmt::Debug,
    A: for<'de> Deserialize<'de> + Send + 'static,
    R: IntoFunctionResponse<B, S> + Send + 'static,
    B: Serialize + Send + 'static,
    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
    D: Into<bytes::Bytes> + Send + 'static,
    E: Into<Error> + Send + Debug + 'static,
{
    // Build the runtime, add the tracing layer, then drive it in concurrent mode.
    Runtime::new(handler)
        .layer(layers::TracingLayer::new())
        .run_concurrent()
        .await
}
/// Registers a no-op extension and spawns a background task that runs
/// `shutdown_hook` when the process receives `SIGINT` or `SIGTERM`.
///
/// Steps performed:
///
/// 1. A `lambda_extension::Extension` subscribed to no events is registered
///    under the name `_lambda-rust-runtime-no-op-graceful-shutdown-helper`.
/// 2. A Tokio task is spawned that concurrently
///    - waits for `SIGINT` or `SIGTERM`, logs progress to stderr, awaits
///      `shutdown_hook()`, and then exits the process with status `0`;
///    - runs the extension, discarding its result.
///
/// The function returns once the extension is registered and the task is
/// spawned; it does not wait for a signal.
///
/// Only available on Unix with the `graceful-shutdown` feature.
///
/// # Panics
///
/// Panics if the extension cannot be registered. The spawned task panics if
/// either signal listener cannot be created.
// NOTE(review): the no-op extension is presumably what makes Lambda deliver
// SIGTERM to the process before shutdown — confirm against the AWS docs.
#[cfg(all(unix, feature = "graceful-shutdown"))]
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "graceful-shutdown"))))]
pub async fn spawn_graceful_shutdown_handler<Fut>(shutdown_hook: impl FnOnce() -> Fut + Send + 'static)
where
    Fut: Future<Output = ()> + Send + 'static,
{
    let extension = lambda_extension::Extension::new()
        .with_events(&[])
        .with_extension_name("_lambda-rust-runtime-no-op-graceful-shutdown-helper")
        .register()
        .await
        .expect("could not register no-op extension for graceful shutdown");

    let background = async move {
        // Waits for a termination signal, runs the hook, then exits the process.
        let shutdown_on_signal = async move {
            let mut interrupt =
                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()).unwrap();
            let mut terminate =
                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();

            // Only the first log line depends on which signal arrived.
            let received = tokio::select! {
                _ = interrupt.recv() => "[runtime] SIGINT received",
                _ = terminate.recv() => "[runtime] SIGTERM received",
            };

            eprintln!("{received}");
            eprintln!("[runtime] Graceful shutdown in progress ...");
            shutdown_hook().await;
            eprintln!("[runtime] Graceful shutdown completed");
            std::process::exit(0);
        };

        // Extension errors are deliberately ignored; only starting it matters here.
        let drive_extension = async {
            let _ = extension.run().await;
        };

        // `biased` polls the signal future first on every wake-up.
        let _: (_, ()) = tokio::join!(biased; shutdown_on_signal, drive_extension);
    };

    tokio::task::spawn(background);
}