lambda_runtime/lib.rs
1#![deny(clippy::all, clippy::cargo)]
2#![allow(clippy::multiple_crate_versions)]
3#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! The mechanism available for defining a Lambda function is as follows:
7//!
8//! Create a type that conforms to the [`tower::Service`] trait. This type can
//! then be passed to the `lambda_runtime::run` function, which launches
10//! and runs the Lambda runtime.
11use serde::{Deserialize, Serialize};
12use std::{
13 env,
14 fmt::{self, Debug},
15 future::Future,
16 sync::Arc,
17};
18use tokio_stream::Stream;
19use tower::util::ServiceFn;
20pub use tower::{self, service_fn, Service};
21
22#[macro_use]
23mod macros;
24
25/// Diagnostic utilities to convert Rust types into Lambda Error types.
26pub mod diagnostic;
27pub use diagnostic::Diagnostic;
28
29mod deserializer;
30/// Tower middleware to be applied to runtime invocations.
31pub mod layers;
32mod requests;
33mod runtime;
34/// Utilities for Lambda Streaming functions.
35pub mod streaming;
36
37/// Utilities to initialize and use `tracing` and `tracing-subscriber` in Lambda Functions.
38#[cfg(feature = "tracing")]
39#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
40pub use lambda_runtime_api_client::tracing;
41
42/// Types available to a Lambda function.
43mod types;
44
45use requests::EventErrorRequest;
46pub use runtime::{LambdaInvocation, Runtime};
47pub use types::{Context, FunctionResponse, IntoFunctionResponse, LambdaEvent, MetadataPrelude, StreamResponse};
48
/// Error type that Lambda functions may return.
50pub type Error = lambda_runtime_api_client::BoxError;
51
52/// Configuration derived from environment variables.
53#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
54pub struct Config {
55 /// The name of the function.
56 pub function_name: String,
57 /// The amount of memory available to the function in MB.
58 pub memory: i32,
59 /// The version of the function being executed.
60 pub version: String,
61 /// The name of the Amazon CloudWatch Logs stream for the function.
62 pub log_stream: String,
63 /// The name of the Amazon CloudWatch Logs group for the function.
64 pub log_group: String,
65}
66
67type RefConfig = Arc<Config>;
68
69impl Config {
70 /// Attempts to read configuration from environment variables.
71 pub fn from_env() -> Self {
72 Config {
73 function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").expect("Missing AWS_LAMBDA_FUNCTION_NAME env var"),
74 memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
75 .expect("Missing AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var")
76 .parse::<i32>()
77 .expect("AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var is not <i32>"),
78 version: env::var("AWS_LAMBDA_FUNCTION_VERSION").expect("Missing AWS_LAMBDA_FUNCTION_VERSION env var"),
79 log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_default(),
80 log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_default(),
81 }
82 }
83}
84
85/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments.
86#[deprecated(since = "0.5.0", note = "Use `service_fn` and `LambdaEvent` instead")]
87pub fn handler_fn<A, F, Fut>(f: F) -> ServiceFn<impl Fn(LambdaEvent<A>) -> Fut>
88where
89 F: Fn(A, Context) -> Fut,
90{
91 service_fn(move |req: LambdaEvent<A>| f(req.payload, req.context))
92}
93
94/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
95/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
96///
97/// If you need more control over the runtime and add custom middleware, use the
98/// [Runtime] type directly.
99///
100/// # Managed concurrency
101/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged.
102/// If your handler can satisfy `Clone + Send + 'static`,
103/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature),
104/// which honors managed concurrency and falls back to sequential behavior when
105/// unset.
106///
107/// # Example
108/// ```no_run
109/// use lambda_runtime::{Error, service_fn, LambdaEvent};
110/// use serde_json::Value;
111///
112/// #[tokio::main]
113/// async fn main() -> Result<(), Error> {
114/// let func = service_fn(func);
115/// lambda_runtime::run(func).await?;
116/// Ok(())
117/// }
118///
119/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
120/// Ok(event.payload)
121/// }
122/// ```
123///
124/// # Panics
125///
126/// This function panics if required Lambda environment variables are missing
127/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`,
128/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`).
129pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
130where
131 F: Service<LambdaEvent<A>, Response = R>,
132 F::Future: Future<Output = Result<R, F::Error>>,
133 F::Error: Into<Diagnostic> + fmt::Debug,
134 A: for<'de> Deserialize<'de>,
135 R: IntoFunctionResponse<B, S>,
136 B: Serialize,
137 S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
138 D: Into<bytes::Bytes> + Send,
139 E: Into<Error> + Send + Debug,
140{
141 let runtime = Runtime::new(handler).layer(layers::TracingLayer::new());
142 runtime.run().await
143}
144
/// Runs the Lambda Rust runtime in a mode that is compatible with
/// Lambda Managed Instances (concurrent invocations).
///
/// Requires the `concurrency-tokio` feature.
///
/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
/// spawns `AWS_LAMBDA_MAX_CONCURRENCY` tokio worker tasks, each running its own
/// `/next` polling loop. When the environment variable is unset or `<= 1`, it
/// falls back to the same sequential behavior as [`run`], so the same handler
/// can run on both classic Lambda and Lambda Managed Instances.
///
/// The handler is wrapped in a [Runtime] with a `layers::TracingLayer` applied.
/// If you need more control over the runtime, or want to add custom middleware,
/// use the [Runtime] type directly.
///
/// # Example
/// ```no_run
/// use lambda_runtime::{Error, service_fn, LambdaEvent};
/// use serde_json::Value;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let func = service_fn(func);
///     lambda_runtime::run_concurrent(func).await?;
///     Ok(())
/// }
///
/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
///     Ok(event.payload)
/// }
/// ```
///
/// # Panics
///
/// This function panics if:
/// - Called outside of a Tokio runtime
/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`,
///   `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`,
///   `AWS_LAMBDA_RUNTIME_API`)
#[cfg(feature = "concurrency-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
pub async fn run_concurrent<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
where
    F: Service<LambdaEvent<A>, Response = R> + Clone + Send + 'static,
    F::Future: Future<Output = Result<R, F::Error>> + Send + 'static,
    F::Error: Into<Diagnostic> + fmt::Debug,
    A: for<'de> Deserialize<'de> + Send + 'static,
    R: IntoFunctionResponse<B, S> + Send + 'static,
    B: Serialize + Send + 'static,
    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
    D: Into<bytes::Bytes> + Send + 'static,
    E: Into<Error> + Send + Debug + 'static,
{
    // Build the runtime, add the tracing layer, then drive it in concurrent mode.
    Runtime::new(handler)
        .layer(layers::TracingLayer::new())
        .run_concurrent()
        .await
}
200
/// Spawns a task that executes the provided async closure when the process
/// receives a unix graceful shutdown signal (`SIGINT` or `SIGTERM`), and then
/// exits the process with status `0`. If the closure takes longer than 500ms
/// to execute, an unhandled `SIGKILL` signal might be received.
///
/// You can use this hook to run cleanup or flush logic prior to runtime shutdown.
///
/// This function's returned future must be resolved prior to `lambda_runtime::run()`.
///
/// Note that this implicitly also registers and drives a no-op internal extension that subscribes to no events.
/// This extension will be named `_lambda-rust-runtime-no-op-graceful-shutdown-helper`. This extension name
/// can not be reused by other registered extensions. This is necessary in order to receive graceful shutdown signals.
///
/// This extension is cheap to run because it receives no events, but is not zero cost. If you have another extension
/// registered already, you might prefer to manually construct your own graceful shutdown handling without the dummy extension.
///
/// For more information on general AWS Lambda graceful shutdown handling, see:
/// <https://github.com/aws-samples/graceful-shutdown-with-aws-lambda>
///
/// # Panics
///
/// This function panics if:
/// - this function is called after `lambda_runtime::run()`
/// - this function is called outside of a context that has access to the tokio i/o
/// - the no-op extension cannot be registered
/// - either signal listener cannot be created, see
///   [tokio::signal::unix](https://docs.rs/tokio/latest/tokio/signal/unix/fn.signal.html#errors)
///
/// # Example
/// ```no_run
/// use lambda_runtime::{Error, service_fn, LambdaEvent};
/// use serde_json::Value;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let func = service_fn(func);
///
///     let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout());
///     lambda_runtime::tracing::init_default_subscriber_with_writer(writer);
///
///     let shutdown_hook = || async move {
///         std::mem::drop(log_guard);
///     };
///     lambda_runtime::spawn_graceful_shutdown_handler(shutdown_hook).await;
///
///     lambda_runtime::run(func).await?;
///     Ok(())
/// }
///
/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
///     Ok(event.payload)
/// }
/// ```
#[cfg(all(unix, feature = "graceful-shutdown"))]
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "graceful-shutdown"))))]
pub async fn spawn_graceful_shutdown_handler<Fut>(shutdown_hook: impl FnOnce() -> Fut + Send + 'static)
where
    Fut: Future<Output = ()> + Send + 'static,
{
    use tokio::signal::unix;

    // The process only receives a SIGTERM for graceful shutdown when an extension is
    // registered with the Lambda orchestrator, so a no-op internal extension that
    // subscribes to no events is registered here.
    //
    // It is cheap to run: once connected to the Lambda orchestration, the connection
    // just waits forever for data that never comes, so it won't cause wakes.
    let extension = lambda_extension::Extension::new()
        // Don't subscribe to any event types
        .with_events(&[])
        // Internal extension names MUST be unique within a given Lambda function.
        .with_extension_name("_lambda-rust-runtime-no-op-graceful-shutdown-helper")
        // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init
        // phase and begins the Invoke phase.
        .register()
        .await
        .expect("could not register no-op extension for graceful shutdown");

    tokio::task::spawn(async move {
        let wait_for_shutdown = async move {
            let mut sigint = unix::signal(unix::SignalKind::interrupt()).unwrap();
            let mut sigterm = unix::signal(unix::SignalKind::terminate()).unwrap();

            // Whichever signal arrives first only decides the first log line;
            // the shutdown sequence that follows is the same for both.
            let received = tokio::select! {
                _ = sigint.recv() => "[runtime] SIGINT received",
                _ = sigterm.recv() => "[runtime] SIGTERM received",
            };

            eprintln!("{}", received);
            eprintln!("[runtime] Graceful shutdown in progress ...");
            shutdown_hook().await;
            eprintln!("[runtime] Graceful shutdown completed");
            std::process::exit(0);
        };

        let drive_extension = async {
            // Extension errors are suppressed because it doesn't matter if it crashes;
            // all that is needed is to kick off the run so that lambda exits the init phase.
            let _ = extension.run().await;
        };

        // The graceful shutdown future is always polled first, which results in a
        // smaller future due to lack of bookkeeping of which was last polled.
        let _: (_, ()) = tokio::join!(
            biased;
            wait_for_shutdown,
            drive_extension
        );
    });
}
308}