lambda_runtime/
lib.rs

1#![deny(clippy::all, clippy::cargo)]
2#![allow(clippy::multiple_crate_versions)]
3#![warn(missing_docs, nonstandard_style, rust_2018_idioms)]
4#![cfg_attr(docsrs, feature(doc_cfg))]
5
//! The mechanism available for defining a Lambda function is as follows:
//!
//! Create a type that conforms to the [`tower::Service`] trait. This type can
//! then be passed to the `lambda_runtime::run` function, which launches
//! and runs the Lambda runtime.
11use serde::{Deserialize, Serialize};
12use std::{
13    env,
14    fmt::{self, Debug},
15    future::Future,
16    sync::Arc,
17};
18use tokio_stream::Stream;
19use tower::util::ServiceFn;
20pub use tower::{self, service_fn, Service};
21
22/// Diagnostic utilities to convert Rust types into Lambda Error types.
23pub mod diagnostic;
24pub use diagnostic::Diagnostic;
25
26mod deserializer;
27/// Tower middleware to be applied to runtime invocations.
28pub mod layers;
29mod requests;
30mod runtime;
31/// Utilities for Lambda Streaming functions.
32pub mod streaming;
33
34/// Utilities to initialize and use `tracing` and `tracing-subscriber` in Lambda Functions.
35#[cfg(feature = "tracing")]
36#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
37pub use lambda_runtime_api_client::tracing;
38
39/// Types available to a Lambda function.
40mod types;
41
42use requests::EventErrorRequest;
43pub use runtime::{LambdaInvocation, Runtime};
44pub use types::{Context, FunctionResponse, IntoFunctionResponse, LambdaEvent, MetadataPrelude, StreamResponse};
45
46/// Error type that lambdas may result in
47pub type Error = lambda_runtime_api_client::BoxError;
48
49/// Configuration derived from environment variables.
50#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
51pub struct Config {
52    /// The name of the function.
53    pub function_name: String,
54    /// The amount of memory available to the function in MB.
55    pub memory: i32,
56    /// The version of the function being executed.
57    pub version: String,
58    /// The name of the Amazon CloudWatch Logs stream for the function.
59    pub log_stream: String,
60    /// The name of the Amazon CloudWatch Logs group for the function.
61    pub log_group: String,
62}
63
64type RefConfig = Arc<Config>;
65
66impl Config {
67    /// Attempts to read configuration from environment variables.
68    pub fn from_env() -> Self {
69        Config {
70            function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").expect("Missing AWS_LAMBDA_FUNCTION_NAME env var"),
71            memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
72                .expect("Missing AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var")
73                .parse::<i32>()
74                .expect("AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var is not <i32>"),
75            version: env::var("AWS_LAMBDA_FUNCTION_VERSION").expect("Missing AWS_LAMBDA_FUNCTION_VERSION env var"),
76            log_stream: env::var("AWS_LAMBDA_LOG_STREAM_NAME").unwrap_or_default(),
77            log_group: env::var("AWS_LAMBDA_LOG_GROUP_NAME").unwrap_or_default(),
78        }
79    }
80}
81
82/// Return a new [`ServiceFn`] with a closure that takes an event and context as separate arguments.
83#[deprecated(since = "0.5.0", note = "Use `service_fn` and `LambdaEvent` instead")]
84pub fn handler_fn<A, F, Fut>(f: F) -> ServiceFn<impl Fn(LambdaEvent<A>) -> Fut>
85where
86    F: Fn(A, Context) -> Fut,
87{
88    service_fn(move |req: LambdaEvent<A>| f(req.payload, req.context))
89}
90
91/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
92/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
93///
94/// If you need more control over the runtime and add custom middleware, use the
95/// [Runtime] type directly.
96///
97/// # Example
98/// ```no_run
99/// use lambda_runtime::{Error, service_fn, LambdaEvent};
100/// use serde_json::Value;
101///
102/// #[tokio::main]
103/// async fn main() -> Result<(), Error> {
104///     let func = service_fn(func);
105///     lambda_runtime::run(func).await?;
106///     Ok(())
107/// }
108///
109/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
110///     Ok(event.payload)
111/// }
112/// ```
113pub async fn run<A, F, R, B, S, D, E>(handler: F) -> Result<(), Error>
114where
115    F: Service<LambdaEvent<A>, Response = R>,
116    F::Future: Future<Output = Result<R, F::Error>>,
117    F::Error: Into<Diagnostic> + fmt::Debug,
118    A: for<'de> Deserialize<'de>,
119    R: IntoFunctionResponse<B, S>,
120    B: Serialize,
121    S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
122    D: Into<bytes::Bytes> + Send,
123    E: Into<Error> + Send + Debug,
124{
125    let runtime = Runtime::new(handler).layer(layers::TracingLayer::new());
126    runtime.run().await
127}
128
/// Spawns a task that will execute a provided async closure when the process
/// receives unix graceful shutdown signals. If the closure takes longer than 500ms
/// to execute, an unhandled `SIGKILL` signal might be received.
///
/// You can use this future to execute cleanup or flush related logic prior to runtime shutdown.
///
/// This function's returned future must be resolved prior to `lambda_runtime::run()`.
///
/// The `SIGINT` and `SIGTERM` listeners are installed before this function's future resolves.
/// When either signal is received, progress messages are written to stderr, `shutdown_hook`
/// is awaited, and the process then exits with status code `0`.
///
/// Note that this implicitly also registers and drives a no-op internal extension that subscribes to no events.
/// This extension will be named `_lambda-rust-runtime-no-op-graceful-shutdown-helper`. This extension name
/// can not be reused by other registered extensions. This is necessary in order to receive graceful shutdown signals.
///
/// This extension is cheap to run because it receives no events, but is not zero cost. If you have another extension
/// registered already, you might prefer to manually construct your own graceful shutdown handling without the dummy extension.
///
/// For more information on general AWS Lambda graceful shutdown handling, see:
/// <https://github.com/aws-samples/graceful-shutdown-with-aws-lambda>
///
/// # Panics
///
/// This function panics if:
/// - this function is called after `lambda_runtime::run()`
/// - this function is called outside of a context that has access to the tokio i/o
/// - the no-op extension cannot be registered
/// - either signal listener cannot be registered, see
///   [tokio::signal::unix](https://docs.rs/tokio/latest/tokio/signal/unix/fn.signal.html#errors)
///
/// # Example
/// ```no_run
/// use lambda_runtime::{Error, service_fn, LambdaEvent};
/// use serde_json::Value;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
///     let func = service_fn(func);
///
///     let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout());
///     lambda_runtime::tracing::init_default_subscriber_with_writer(writer);
///
///     let shutdown_hook = || async move {
///         std::mem::drop(log_guard);
///     };
///     lambda_runtime::spawn_graceful_shutdown_handler(shutdown_hook).await;
///
///     lambda_runtime::run(func).await?;
///     Ok(())
/// }
///
/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
///     Ok(event.payload)
/// }
/// ```
#[cfg(all(unix, feature = "graceful-shutdown"))]
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "graceful-shutdown"))))]
pub async fn spawn_graceful_shutdown_handler<Fut>(shutdown_hook: impl FnOnce() -> Fut + Send + 'static)
where
    Fut: Future<Output = ()> + Send + 'static,
{
    use tokio::signal::unix::{signal, SignalKind};

    // You need an extension registered with the Lambda orchestrator in order for your process
    // to receive a SIGTERM for graceful shutdown.
    //
    // We accomplish this here by registering a no-op internal extension, which does not subscribe to any events.
    //
    // This extension is cheap to run since after it connects to the lambda orchestration, the connection
    // will just wait forever for data to come, which never comes, so it won't cause wakes.
    let extension = lambda_extension::Extension::new()
        // Don't subscribe to any event types
        .with_events(&[])
        // Internal extension names MUST be unique within a given Lambda function.
        .with_extension_name("_lambda-rust-runtime-no-op-graceful-shutdown-helper")
        // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init
        // phase and begins the Invoke phase.
        .register()
        .await
        .expect("could not register no-op extension for graceful shutdown");

    // Install the listeners here rather than inside the spawned task:
    // - a registration failure then panics in the caller, as documented, instead of
    //   silently killing a detached task whose `JoinHandle` nobody observes;
    // - the handlers are already in place when this function returns, so a signal that
    //   arrives before the spawned task is first polled is not lost to the default action.
    let mut sigint = signal(SignalKind::interrupt()).expect("could not register SIGINT listener for graceful shutdown");
    let mut sigterm =
        signal(SignalKind::terminate()).expect("could not register SIGTERM listener for graceful shutdown");

    tokio::task::spawn(async move {
        let graceful_shutdown_future = async move {
            // Whichever signal fires first triggers the (single) shutdown path below.
            let received = tokio::select! {
                _ = sigint.recv() => "SIGINT",
                _ = sigterm.recv() => "SIGTERM",
            };
            eprintln!("[runtime] {} received", received);
            eprintln!("[runtime] Graceful shutdown in progress ...");
            shutdown_hook().await;
            eprintln!("[runtime] Graceful shutdown completed");
            std::process::exit(0);
        };

        let _: (_, ()) = tokio::join!(
            // we always poll the graceful shutdown future first,
            // which results in a smaller future due to lack of bookkeeping of which was last polled
            biased;
            graceful_shutdown_future,
            async {
                // we suppress extension errors because we don't actually mind if it crashes,
                // all we need to do is kick off the run so that lambda exits the init phase
                let _ = extension.run().await;
            }
        );
    });
}