Skip to main content

lambda_runtime/
lib.rs

1#![deny(clippy::all, clippy::cargo)]
2#![allow(clippy::multiple_crate_versions)]
3#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
6//! The mechanism available for defining a Lambda function is as follows:
7//!
8//! Create a type that conforms to the [`tower::Service`] trait. This type can
9//! then be passed to the the `lambda_runtime::run` function, which launches
10//! and runs the Lambda runtime.
11use serde::{Deserialize, Serialize};
12use std::{
13    env,
14    fmt::{self, Debug},
15    future::Future,
16    sync::Arc,
17};
18use tokio_stream::Stream;
19use tower::util::ServiceFn;
20pub use tower::{self, service_fn, Service};
21
22#[macro_use]
23mod macros;
24
25/// Diagnostic utilities to convert Rust types into Lambda Error types.
26pub mod diagnostic;
27pub use diagnostic::Diagnostic;
28
29mod deserializer;
30/// Tower middleware to be applied to runtime invocations.
31pub mod layers;
32mod requests;
33mod runtime;
34/// Utilities for Lambda Streaming functions.
35pub mod streaming;
36
37/// Utilities to initialize and use `tracing` and `tracing-subscriber` in Lambda Functions.
38#[cfg(feature = "tracing")]
39#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
40pub use lambda_runtime_api_client::tracing;
41
42/// Types available to a Lambda function.
43mod types;
44
45use requests::EventErrorRequest;
46pub use runtime::{LambdaInvocation, Runtime};
47pub use types::{Context, FunctionResponse, IntoFunctionResponse, LambdaEvent, MetadataPrelude, StreamResponse};
48
49/// Error type that lambdas may result in
50pub type Error = lambda_runtime_api_client::BoxError;
51
52/// Configuration derived from environment variables.
53#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
54pub struct Config {
55    /// The name of the function.
56    pub function_name: String,
57    /// The amount of memory available to the function in MB.
58    pub memory: i32,
59    /// The version of the function being executed.
60    pub version: String,
61    /// The name of the Amazon CloudWatch Logs stream for the function.
62    pub log_stream: String,
63    /// The name of the Amazon CloudWatch Logs group for the function.
64    pub log_group: String,
65}
66
67type RefConfig = Arc<Config>;
68
69impl Config {
70    /// Attempts to read configuration from environment variables.
71    pub fn from_env() -> Self {
72        Config {
73            function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").expect("Missing AWS_LAMBDA_FUNCTION_NAME env var"),
74            memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
75                .expect("Missing AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var")
76                .parse::<i32>()
77                .expect("AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var is not <i32>"),
78            version: env::var("AWS_LAMBDA_FUNCTION_VERSION").expect("Missing AWS_LAMBDA_FUNCTION_VERSION env var"),
79            log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_default(),
80            log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_default(),
81        }
82    }
83}
84
85/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments.
86#[deprecated(since = "0.5.0", note = "Use `service_fn` and `LambdaEvent` instead")]
87pub fn handler_fn<A, F, Fut>(f: F) -> ServiceFn<impl Fn(LambdaEvent<A>) -> Fut>
88where
89    F: Fn(A, Context) -> Fut,
90{
91    service_fn(move |req: LambdaEvent<A>| f(req.payload, req.context))
92}
93
94/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
95/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
96///
97/// If you need more control over the runtime and add custom middleware, use the
98/// [Runtime] type directly.
99///
100/// # Managed concurrency
101/// If `AWS_LAMBDA_MAX_CONCURRENCY` is set, a warning is logged.
102/// If your handler can satisfy `Clone + Send + 'static`,
103/// prefer [`run_concurrent`] (requires the `concurrency-tokio` feature),
104/// which honors managed concurrency and falls back to sequential behavior when
105/// unset.
106///
107/// # Example
108/// ```no_run
109/// use lambda_runtime::{Error, service_fn, LambdaEvent};
110/// use serde_json::Value;
111///
112/// #[tokio::main]
113/// async fn main() -> Result<(), Error> {
114///     let func = service_fn(func);
115///     lambda_runtime::run(func).await?;
116///     Ok(())
117/// }
118///
119/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
120///     Ok(event.payload)
121/// }
122/// ```
123///
124/// # Panics
125///
126/// This function panics if required Lambda environment variables are missing
127/// (`AWS_LAMBDA_FUNCTION_NAME`, `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`,
128/// `AWS_LAMBDA_FUNCTION_VERSION`, `AWS_LAMBDA_RUNTIME_API`).
129pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
130where
131    F: Service<LambdaEvent<A>, Response = R>,
132    F::Future: Future<Output = Result<R, F::Error>>,
133    F::Error: Into<Diagnostic> + fmt::Debug,
134    A: for<'de> Deserialize<'de>,
135    R: IntoFunctionResponse<B, S>,
136    B: Serialize,
137    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
138    D: Into<bytes::Bytes> + Send,
139    E: Into<Error> + Send + Debug,
140{
141    let runtime = Runtime::new(handler).layer(layers::TracingLayer::new());
142    runtime.run().await
143}
144
145/// Starts the Lambda Rust runtime in a mode that is compatible with
146/// Lambda Managed Instances (concurrent invocations).
147///
148/// Requires the `concurrency-tokio` feature.
149///
150/// When `AWS_LAMBDA_MAX_CONCURRENCY` is set to a value greater than 1, this
151/// will spawn `AWS_LAMBDA_MAX_CONCURRENCY` tokio worker tasks, each running its own
152/// `/next` polling loop. When the environment variable is unset or `<= 1`, it
153/// falls back to the same sequential behavior as [`run`], so the same handler
154/// can run on both classic Lambda and Lambda Managed Instances.
155///
156/// If you need more control over the runtime and add custom middleware, use the
157/// [Runtime] type directly.
158///
159/// # Example
160/// ```no_run
161/// use lambda_runtime::{Error, service_fn, LambdaEvent};
162/// use serde_json::Value;
163///
164/// #[tokio::main]
165/// async fn main() -> Result<(), Error> {
166///     let func = service_fn(func);
167///     lambda_runtime::run_concurrent(func).await?;
168///     Ok(())
169/// }
170///
171/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
172///     Ok(event.payload)
173/// }
174/// ```
175///
176/// # Panics
177///
178/// This function panics if:
179/// - Called outside of a Tokio runtime
180/// - Required Lambda environment variables are missing (`AWS_LAMBDA_FUNCTION_NAME`,
181///   `AWS_LAMBDA_FUNCTION_MEMORY_SIZE`, `AWS_LAMBDA_FUNCTION_VERSION`,
182///   `AWS_LAMBDA_RUNTIME_API`)
183#[cfg(feature = "concurrency-tokio")]
184#[cfg_attr(docsrs, doc(cfg(feature = "concurrency-tokio")))]
185pub async fn run_concurrent<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
186where
187    F: Service<LambdaEvent<A>, Response = R> + Clone + Send + 'static,
188    F::Future: Future<Output = Result<R, F::Error>> + Send + 'static,
189    F::Error: Into<Diagnostic> + fmt::Debug,
190    A: for<'de> Deserialize<'de> + Send + 'static,
191    R: IntoFunctionResponse<B, S> + Send + 'static,
192    B: Serialize + Send + 'static,
193    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
194    D: Into<bytes::Bytes> + Send + 'static,
195    E: Into<Error> + Send + Debug + 'static,
196{
197    let runtime = Runtime::new(handler).layer(layers::TracingLayer::new());
198    runtime.run_concurrent().await
199}
200
201/// Spawns a task that will be execute a provided async closure when the process
202/// receives unix graceful shutdown signals. If the closure takes longer than 500ms
203/// to execute, an unhandled `SIGKILL` signal might be received.
204///
205/// You can use this future to execute cleanup or flush related logic prior to runtime shutdown.
206///
207/// This function's returned future must be resolved prior to `lambda_runtime::run()`.
208///
209/// Note that this implicitly also registers and drives a no-op internal extension that subscribes to no events.
210/// This extension will be named `_lambda-rust-runtime-no-op-graceful-shutdown-helper`. This extension name
211/// can not be reused by other registered extensions. This is necessary in order to receive graceful shutdown signals.
212///
213/// This extension is cheap to run because it receives no events, but is not zero cost. If you have another extension
214/// registered already, you might prefer to manually construct your own graceful shutdown handling without the dummy extension.
215///
216/// For more information on general AWS Lambda graceful shutdown handling, see:
217/// <https://github.com/aws-samples/graceful-shutdown-with-aws-lambda>
218///
219/// # Panics
220///
221/// This function panics if:
222/// - this function is called after `lambda_runtime::run()`
223/// - this function is called outside of a context that has access to the tokio i/o
224/// - the no-op extension cannot be registered
225/// - either signal listener panics [tokio::signal::unix](https://docs.rs/tokio/latest/tokio/signal/unix/fn.signal.html#errors)
226///
227/// # Example
228/// ```no_run
229/// use lambda_runtime::{Error, service_fn, LambdaEvent};
230/// use serde_json::Value;
231///
232/// #[tokio::main]
233/// async fn main() -> Result<(), Error> {
234///     let func = service_fn(func);
235///
236///     let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout());
237///     lambda_runtime::tracing::init_default_subscriber_with_writer(writer);
238///
239///     let shutdown_hook = || async move {
240///         std::mem::drop(log_guard);
241///     };
242///     lambda_runtime::spawn_graceful_shutdown_handler(shutdown_hook).await;
243///
244///     lambda_runtime::run(func).await?;
245///     Ok(())
246/// }
247///
248/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
249///     Ok(event.payload)
250/// }
251/// ```
252#[cfg(all(unix, feature = "graceful-shutdown"))]
253#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "graceful-shutdown"))))]
254pub async fn spawn_graceful_shutdown_handler<Fut>(shutdown_hook: impl FnOnce() -> Fut + Send + 'static)
255where
256    Fut: Future<Output = ()> + Send + 'static,
257{
258    // You need an extension registered with the Lambda orchestrator in order for your process
259    // to receive a SIGTERM for graceful shutdown.
260    //
261    // We accomplish this here by registering a no-op internal extension, which does not subscribe to any events.
262    //
263    // This extension is cheap to run since after it connects to the lambda orchestration, the connection
264    // will just wait forever for data to come, which never comes, so it won't cause wakes.
265    let extension = lambda_extension::Extension::new()
266        // Don't subscribe to any event types
267        .with_events(&[])
268        // Internal extension names MUST be unique within a given Lambda function.
269        .with_extension_name("_lambda-rust-runtime-no-op-graceful-shutdown-helper")
270        // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init
271        // phase and begins the Invoke phase.
272        .register()
273        .await
274        .expect("could not register no-op extension for graceful shutdown");
275
276    tokio::task::spawn(async move {
277        let graceful_shutdown_future = async move {
278            let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()).unwrap();
279            let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
280            tokio::select! {
281                _sigint = sigint.recv() => {
282                    eprintln!("[runtime] SIGINT received");
283                    eprintln!("[runtime] Graceful shutdown in progress ...");
284                    shutdown_hook().await;
285                    eprintln!("[runtime] Graceful shutdown completed");
286                    std::process::exit(0);
287                },
288                _sigterm = sigterm.recv()=> {
289                    eprintln!("[runtime] SIGTERM received");
290                    eprintln!("[runtime] Graceful shutdown in progress ...");
291                    shutdown_hook().await;
292                    eprintln!("[runtime] Graceful shutdown completed");
293                    std::process::exit(0);
294                },
295            }
296        };
297
298        let _: (_, ()) = tokio::join!(
299            // we always poll the graceful shutdown future first,
300            // which results in a smaller future due to lack of bookkeeping of which was last polled
301            biased;
302            graceful_shutdown_future, async {
303            // we suppress extension errors because we don't actually mind if it crashes,
304            // all we need to do is kick off the run so that lambda exits the init phase
305            let _ = extension.run().await;
306        });
307    });
308}