1use crate::{
2 context::Context,
3 env::{ConfigProvider, EnvConfigProvider, FunctionSettings},
4 error::RuntimeError,
5 handler::Handler,
6};
7use failure::Fail;
8use lambda_runtime_client::{error::ErrorResponse, RuntimeClient};
9use lambda_runtime_errors::LambdaErrorExt;
10use log::*;
11use std::{fmt::Display, marker::PhantomData};
12use tokio::runtime::Runtime as TokioRuntime;
13
14include!(concat!(env!("OUT_DIR"), "/metadata.rs"));
16
17const MAX_RETRIES: i8 = 3;
18
19pub fn start<EventError>(f: impl Handler<EventError>, runtime: Option<TokioRuntime>)
28where
29 EventError: Fail + LambdaErrorExt + Display + Send + Sync,
30{
31 start_with_config(f, &EnvConfigProvider::default(), runtime)
32}
33
34#[macro_export]
35macro_rules! lambda {
39 ($handler:ident) => {
40 $crate::start($handler, None)
41 };
42 ($handler:ident, $runtime:expr) => {
43 $crate::start($handler, Some($runtime))
44 };
45 ($handler:expr) => {
46 $crate::start($handler, None)
47 };
48 ($handler:expr, $runtime:expr) => {
49 $crate::start($handler, Some($runtime))
50 };
51}
52
53pub fn start_with_config<Config, EventError>(
67 f: impl Handler<EventError>,
68 config: &Config,
69 runtime: Option<TokioRuntime>,
70) where
71 Config: ConfigProvider,
72 EventError: Fail + LambdaErrorExt + Display + Send + Sync,
73{
74 let endpoint: String;
76 match config.get_runtime_api_endpoint() {
77 Ok(value) => endpoint = value,
78 Err(e) => {
79 panic!("Could not find runtime API env var: {}", e);
80 }
81 }
82
83 let function_config: FunctionSettings;
86 let settings = config.get_function_settings();
87 match settings {
88 Ok(env_settings) => function_config = env_settings,
89 Err(e) => {
90 panic!("Could not find runtime API env var: {}", e);
91 }
92 }
93
94 let info = Option::from(runtime_release().to_owned());
95
96 match RuntimeClient::new(&endpoint, info, runtime) {
97 Ok(client) => {
98 start_with_runtime_client(f, function_config, client);
99 }
100 Err(e) => {
101 panic!("Could not create runtime client SDK: {}", e);
102 }
103 }
104}
105
106pub(crate) fn start_with_runtime_client<EventError>(
118 f: impl Handler<EventError>,
119 func_settings: FunctionSettings,
120 client: RuntimeClient,
121) where
122 EventError: Fail + LambdaErrorExt + Display + Send + Sync,
123{
124 let mut lambda_runtime: Runtime<_, EventError> = Runtime::new(f, func_settings, MAX_RETRIES, client);
125
126 lambda_runtime.start();
128}
129
130pub(super) struct Runtime<Function, EventError> {
133 runtime_client: RuntimeClient,
134 handler: Function,
135 max_retries: i8,
136 settings: FunctionSettings,
137 _phantom: PhantomData<EventError>,
138}
139
140impl<Function, EventError> Runtime<Function, EventError>
142where
143 Function: Handler<EventError>,
144 EventError: Fail + LambdaErrorExt + Display + Send + Sync,
145{
146 pub(super) fn new(f: Function, config: FunctionSettings, retries: i8, client: RuntimeClient) -> Self {
160 debug!(
161 "Creating new runtime with {} max retries for endpoint {}",
162 retries,
163 client.get_endpoint()
164 );
165
166 Runtime {
167 runtime_client: client,
168 settings: config,
169 handler: f,
170 max_retries: retries,
171 _phantom: PhantomData,
172 }
173 }
174}
175
176impl<Function, EventError> Runtime<Function, EventError>
179where
180 Function: Handler<EventError>,
181 EventError: Fail + LambdaErrorExt + Display + Send + Sync,
182{
183 fn start(&mut self) {
187 debug!("Beginning main event loop");
188 loop {
189 let (event, ctx) = self.get_next_event(0, None);
190 let request_id = ctx.aws_request_id.clone();
191 info!("Received new event with AWS request id: {}", request_id);
192 let function_outcome = self.invoke(event, ctx);
193 match function_outcome {
194 Ok(response) => {
195 debug!(
196 "Function executed succesfully for {}, pushing response to Runtime API",
197 request_id
198 );
199 match self.runtime_client.event_response(&request_id, &response) {
200 Ok(_) => info!("Response for {} accepted by Runtime API", request_id),
201 Err(e) => {
204 error!("Could not send response for {} to Runtime API: {}", request_id, e);
205 if !e.is_recoverable() {
206 error!(
207 "Error for {} is not recoverable, sending fail_init signal and panicking.",
208 request_id
209 );
210 self.runtime_client.fail_init(&ErrorResponse::from(e));
211 panic!("Could not send response");
212 }
213 }
214 }
215 }
216 Err(e) => {
217 error!("Handler returned an error for {}: {}", request_id, e);
218 debug!("Attempting to send error response to Runtime API for {}", request_id);
219 match self.runtime_client.event_error(&request_id, &ErrorResponse::from(e)) {
220 Ok(_) => info!("Error response for {} accepted by Runtime API", request_id),
221 Err(e) => {
222 error!("Unable to send error response for {} to Runtime API: {}", request_id, e);
223 if !e.is_recoverable() {
224 error!(
225 "Error for {} is not recoverable, sending fail_init signal and panicking",
226 request_id
227 );
228 self.runtime_client.fail_init(&ErrorResponse::from(e));
229 panic!("Could not send error response");
230 }
231 }
232 }
233 }
234 }
235 }
236 }
237
238 pub(super) fn invoke(&mut self, e: Vec<u8>, ctx: Context) -> Result<Vec<u8>, EventError> {
241 (self.handler).run(e, ctx)
242 }
243
244 pub(super) fn get_next_event(&self, retries: i8, e: Option<RuntimeError>) -> (Vec<u8>, Context) {
250 if let Some(err) = e {
251 if retries > self.max_retries {
252 error!("Unrecoverable error while fetching next event: {}", err);
253 match err.request_id.clone() {
254 Some(req_id) => {
255 self.runtime_client
256 .event_error(&req_id, &ErrorResponse::from(err))
257 .expect("Could not send event error response");
258 }
259 None => {
260 self.runtime_client.fail_init(&ErrorResponse::from(err));
261 }
262 }
263
264 panic!("Could not retrieve next event");
267 }
268 }
269
270 match self.runtime_client.next_event() {
271 Ok((ev_data, invocation_ctx)) => {
272 let mut handler_ctx = Context::new(self.settings.clone());
273 handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn;
274 handler_ctx.aws_request_id = invocation_ctx.aws_request_id;
275 handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id;
276 handler_ctx.client_context = invocation_ctx.client_context;
277 handler_ctx.identity = invocation_ctx.identity;
278 handler_ctx.deadline = invocation_ctx.deadline;
279
280 (ev_data, handler_ctx)
281 }
282 Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))),
283 }
284 }
285}
286
287#[cfg(test)]
288pub(crate) mod tests {
289 use super::*;
290 use crate::{context, env};
291 use lambda_runtime_client::RuntimeClient;
292 use lambda_runtime_errors::HandlerError;
293
294 #[test]
295 fn runtime_invokes_handler() {
296 let config: &dyn env::ConfigProvider = &env::tests::MockConfigProvider { error: false };
297 let client = RuntimeClient::new(
298 &config
299 .get_runtime_api_endpoint()
300 .expect("Could not get runtime endpoint"),
301 None,
302 None,
303 )
304 .expect("Could not initialize client");
305 let handler = |_e: Vec<u8>, _c: context::Context| -> Result<Vec<u8>, HandlerError> { Ok(b"hello".to_vec()) };
306 let retries: i8 = 3;
307 let mut runtime = Runtime::new(
308 handler,
309 config
310 .get_function_settings()
311 .expect("Could not load environment config"),
312 retries,
313 client,
314 );
315 let output = runtime.invoke(b"test".to_vec(), context::tests::test_context(10));
316 assert_eq!(
317 output.is_err(),
318 false,
319 "Handler threw an unexpected error: {}",
320 output.err().unwrap()
321 );
322 let output_bytes = output.ok().unwrap();
323 let output_string = String::from_utf8(output_bytes).unwrap();
324 assert_eq!(output_string, "hello", "Unexpected output message: {}", output_string);
325 }
326}