lambda_runtime_core/
runtime.rs

1use crate::{
2    context::Context,
3    env::{ConfigProvider, EnvConfigProvider, FunctionSettings},
4    error::RuntimeError,
5    handler::Handler,
6};
7use failure::Fail;
8use lambda_runtime_client::{error::ErrorResponse, RuntimeClient};
9use lambda_runtime_errors::LambdaErrorExt;
10use log::*;
11use std::{fmt::Display, marker::PhantomData};
12use tokio::runtime::Runtime as TokioRuntime;
13
// Pull in the metadata file generated during the build process (written to `OUT_DIR`).
// NOTE(review): `runtime_release()`, called by `start_with_config`, is not defined in this
// file and presumably comes from the generated `metadata.rs` — confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/metadata.rs"));

/// Retry limit handed to `Runtime::new` by `start_with_runtime_client`. `get_next_event`
/// stops polling and panics once its retry counter exceeds this value.
const MAX_RETRIES: i8 = 3;
18
19/// Creates a new runtime and begins polling for events using Lambda's Runtime APIs.
20///
21/// # Arguments
22///
23/// * `f` A function pointer that conforms to the `Handler` type.
24///
25/// # Panics
26/// The function panics if the Lambda environment variables are not set.
27pub fn start<EventError>(f: impl Handler<EventError>, runtime: Option<TokioRuntime>)
28where
29    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
30{
31    start_with_config(f, &EnvConfigProvider::default(), runtime)
32}
33
/// Starts an event listener which parses incoming events into the event type requested by
/// `handler` and invokes `handler` on each incoming event.
///
/// A Tokio `runtime` may be given as an optional second argument; it is wrapped in `Some`
/// and handed to `start`. When it is left out, `None` is passed instead.
#[macro_export]
macro_rules! lambda {
    // handler only. A bare identifier is an expression too, so `lambda!(my_handler)`
    // lands here as well.
    ($handler:expr) => {
        $crate::start($handler, None)
    };
    // handler plus an explicit runtime
    ($handler:expr, $runtime:expr) => {
        $crate::start($handler, Some($runtime))
    };
}
52
53/// Internal implementation of the start method that receives a config provider. This method
54/// is used for unit tests with a mock provider. The provider data is used to construct the
55/// `HttpRuntimeClient` which is then passed to the `start_with_runtime_client()` function.
56///
57/// # Arguments
58///
59/// * `f` A function pointer that conforms to the `Handler` type.
60/// * `config` An implementation of the `ConfigProvider` trait with static lifetime.
61///
62/// # Panics
63/// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()`
64/// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment
65/// and spin up a new one for the next invocation.
66pub fn start_with_config<Config, EventError>(
67    f: impl Handler<EventError>,
68    config: &Config,
69    runtime: Option<TokioRuntime>,
70) where
71    Config: ConfigProvider,
72    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
73{
74    // if we cannot find the endpoint we panic, nothing else we can do.
75    let endpoint: String;
76    match config.get_runtime_api_endpoint() {
77        Ok(value) => endpoint = value,
78        Err(e) => {
79            panic!("Could not find runtime API env var: {}", e);
80        }
81    }
82
83    // if we can't get the settings from the environment variable
84    // we also panic.
85    let function_config: FunctionSettings;
86    let settings = config.get_function_settings();
87    match settings {
88        Ok(env_settings) => function_config = env_settings,
89        Err(e) => {
90            panic!("Could not find runtime API env var: {}", e);
91        }
92    }
93
94    let info = Option::from(runtime_release().to_owned());
95
96    match RuntimeClient::new(&endpoint, info, runtime) {
97        Ok(client) => {
98            start_with_runtime_client(f, function_config, client);
99        }
100        Err(e) => {
101            panic!("Could not create runtime client SDK: {}", e);
102        }
103    }
104}
105
106/// Starts the rust runtime with the given Runtime API client.
107///
108/// # Arguments
109///
110/// * `f` A function pointer that conforms to the `Handler` type.
111/// * `client` An implementation of the `lambda_runtime_client::RuntimeClient`
112///            trait with a lifetime that matches that of the environment,
113///            in this case expressed as `'env`.
114///
115/// # Panics
116/// The function panics if we cannot instantiate a new `RustRuntime` object.
117pub(crate) fn start_with_runtime_client<EventError>(
118    f: impl Handler<EventError>,
119    func_settings: FunctionSettings,
120    client: RuntimeClient,
121) where
122    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
123{
124    let mut lambda_runtime: Runtime<_, EventError> = Runtime::new(f, func_settings, MAX_RETRIES, client);
125
126    // start the infinite loop
127    lambda_runtime.start();
128}
129
130/// Internal representation of the runtime object that polls for events and communicates
131/// with the Runtime APIs
132pub(super) struct Runtime<Function, EventError> {
133    runtime_client: RuntimeClient,
134    handler: Function,
135    max_retries: i8,
136    settings: FunctionSettings,
137    _phantom: PhantomData<EventError>,
138}
139
140// generic methods implementation
141impl<Function, EventError> Runtime<Function, EventError>
142where
143    Function: Handler<EventError>,
144    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
145{
146    /// Creates a new instance of the `Runtime` object populated with the environment
147    /// settings.
148    ///
149    /// # Arguments
150    ///
151    /// * `f` A function pointer that conforms to the `Handler` type.
152    /// * `retries` The maximum number of times we should retry calling the Runtime APIs
153    ///             for recoverable errors while polling for new events.
154    ///
155    /// # Return
156    /// A `Result` for the `Runtime` object or a `errors::RuntimeSerror`. The runtime
157    /// fails the init if this function returns an error. If we cannot find the
158    /// `AWS_LAMBDA_RUNTIME_API` variable in the environment the function panics.
159    pub(super) fn new(f: Function, config: FunctionSettings, retries: i8, client: RuntimeClient) -> Self {
160        debug!(
161            "Creating new runtime with {} max retries for endpoint {}",
162            retries,
163            client.get_endpoint()
164        );
165
166        Runtime {
167            runtime_client: client,
168            settings: config,
169            handler: f,
170            max_retries: retries,
171            _phantom: PhantomData,
172        }
173    }
174}
175
176// implementation of methods that require the Event and Output types
177// to be compatible with `serde`'s Deserialize/Serialize.
178impl<Function, EventError> Runtime<Function, EventError>
179where
180    Function: Handler<EventError>,
181    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
182{
183    /// Starts the main event loop and begin polling or new events. If one of the
184    /// Runtime APIs returns an unrecoverable error this method calls the init failed
185    /// API and then panics.
186    fn start(&mut self) {
187        debug!("Beginning main event loop");
188        loop {
189            let (event, ctx) = self.get_next_event(0, None);
190            let request_id = ctx.aws_request_id.clone();
191            info!("Received new event with AWS request id: {}", request_id);
192            let function_outcome = self.invoke(event, ctx);
193            match function_outcome {
194                Ok(response) => {
195                    debug!(
196                        "Function executed succesfully for {}, pushing response to Runtime API",
197                        request_id
198                    );
199                    match self.runtime_client.event_response(&request_id, &response) {
200                        Ok(_) => info!("Response for {} accepted by Runtime API", request_id),
201                        // unrecoverable error while trying to communicate with the endpoint.
202                        // we let the Lambda Runtime API know that we have died
203                        Err(e) => {
204                            error!("Could not send response for {} to Runtime API: {}", request_id, e);
205                            if !e.is_recoverable() {
206                                error!(
207                                    "Error for {} is not recoverable, sending fail_init signal and panicking.",
208                                    request_id
209                                );
210                                self.runtime_client.fail_init(&ErrorResponse::from(e));
211                                panic!("Could not send response");
212                            }
213                        }
214                    }
215                }
216                Err(e) => {
217                    error!("Handler returned an error for {}: {}", request_id, e);
218                    debug!("Attempting to send error response to Runtime API for {}", request_id);
219                    match self.runtime_client.event_error(&request_id, &ErrorResponse::from(e)) {
220                        Ok(_) => info!("Error response for {} accepted by Runtime API", request_id),
221                        Err(e) => {
222                            error!("Unable to send error response for {} to Runtime API: {}", request_id, e);
223                            if !e.is_recoverable() {
224                                error!(
225                                    "Error for {} is not recoverable, sending fail_init signal and panicking",
226                                    request_id
227                                );
228                                self.runtime_client.fail_init(&ErrorResponse::from(e));
229                                panic!("Could not send error response");
230                            }
231                        }
232                    }
233                }
234            }
235        }
236    }
237
238    /// Invoke the handler function. This method is split out of the main loop to
239    /// make it testable.
240    pub(super) fn invoke(&mut self, e: Vec<u8>, ctx: Context) -> Result<Vec<u8>, EventError> {
241        (self.handler).run(e, ctx)
242    }
243
244    /// Attempts to get the next event from the Runtime APIs and keeps retrying
245    /// unless the error throws is not recoverable.
246    ///
247    /// # Return
248    /// The next `Event` object to be processed.
249    pub(super) fn get_next_event(&self, retries: i8, e: Option<RuntimeError>) -> (Vec<u8>, Context) {
250        if let Some(err) = e {
251            if retries > self.max_retries {
252                error!("Unrecoverable error while fetching next event: {}", err);
253                match err.request_id.clone() {
254                    Some(req_id) => {
255                        self.runtime_client
256                            .event_error(&req_id, &ErrorResponse::from(err))
257                            .expect("Could not send event error response");
258                    }
259                    None => {
260                        self.runtime_client.fail_init(&ErrorResponse::from(err));
261                    }
262                }
263
264                // these errors are not recoverable. Either we can't communicate with the runtie APIs
265                // or we cannot parse the event. panic to restart the environment.
266                panic!("Could not retrieve next event");
267            }
268        }
269
270        match self.runtime_client.next_event() {
271            Ok((ev_data, invocation_ctx)) => {
272                let mut handler_ctx = Context::new(self.settings.clone());
273                handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn;
274                handler_ctx.aws_request_id = invocation_ctx.aws_request_id;
275                handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id;
276                handler_ctx.client_context = invocation_ctx.client_context;
277                handler_ctx.identity = invocation_ctx.identity;
278                handler_ctx.deadline = invocation_ctx.deadline;
279
280                (ev_data, handler_ctx)
281            }
282            Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))),
283        }
284    }
285}
286
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::{context, env};
    use lambda_runtime_client::RuntimeClient;
    use lambda_runtime_errors::HandlerError;

    /// Builds a `Runtime` from the mock configuration and checks that `invoke` hands back
    /// the bytes produced by the handler.
    #[test]
    fn runtime_invokes_handler() {
        let config: &dyn env::ConfigProvider = &env::tests::MockConfigProvider { error: false };
        let endpoint = config
            .get_runtime_api_endpoint()
            .expect("Could not get runtime endpoint");
        let client = RuntimeClient::new(&endpoint, None, None).expect("Could not initialize client");
        let handler = |_e: Vec<u8>, _c: context::Context| -> Result<Vec<u8>, HandlerError> { Ok(b"hello".to_vec()) };
        let settings = config
            .get_function_settings()
            .expect("Could not load environment config");
        let retries: i8 = 3;
        let mut runtime = Runtime::new(handler, settings, retries, client);

        let output_bytes = match runtime.invoke(b"test".to_vec(), context::tests::test_context(10)) {
            Ok(bytes) => bytes,
            Err(err) => panic!("Handler threw an unexpected error: {}", err),
        };
        let output_string = String::from_utf8(output_bytes).unwrap();
        assert_eq!(output_string, "hello", "Unexpected output message: {}", output_string);
    }
}
326}