use crate::{
context::Context,
env::{ConfigProvider, EnvConfigProvider, FunctionSettings},
error::RuntimeError,
handler::Handler,
};
use failure::Fail;
use lambda_runtime_client::{error::ErrorResponse, RuntimeClient};
use lambda_runtime_errors::LambdaErrorExt;
use log::*;
use std::{fmt::Display, marker::PhantomData};
use tokio::runtime::Runtime as TokioRuntime;
// Textually includes the `metadata.rs` file located in the build's `OUT_DIR`.
// NOTE(review): `runtime_release()`, called from `start_with_config`, is not
// defined in this file and presumably comes from this include — confirm
// against the build script.
include!(concat!(env!("OUT_DIR"), "/metadata.rs"));
/// Retry limit handed to `Runtime::new` by `start_with_runtime_client`.
/// `Runtime::get_next_event` gives up (and panics) once its retry counter
/// exceeds this value.
const MAX_RETRIES: i8 = 3;
/// Starts the Lambda runtime using the default [`EnvConfigProvider`].
///
/// This is a convenience wrapper that builds the default configuration
/// provider and delegates to [`start_with_config`].
///
/// # Arguments
///
/// * `f` - The handler that is run for every incoming event.
/// * `runtime` - Optional Tokio runtime, forwarded unchanged to
///   [`start_with_config`].
///
/// # Panics
///
/// Panics under the same conditions as [`start_with_config`]: when the
/// configuration cannot be read or the runtime client cannot be created.
pub fn start<EventError>(f: impl Handler<EventError>, runtime: Option<TokioRuntime>)
where
    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
{
    let env_config = EnvConfigProvider::default();
    start_with_config(f, &env_config, runtime)
}
/// Convenience macro that starts the runtime by expanding to a call to
/// `$crate::start`.
///
/// Two shapes are accepted, each with the handler given either as a plain
/// identifier or as an arbitrary expression:
///
/// * `lambda!(handler)` - passes `None` as the runtime argument.
/// * `lambda!(handler, runtime)` - wraps the second argument in `Some(..)`
///   and passes it as the runtime argument.
#[macro_export]
macro_rules! lambda {
// Handler named by identifier, no runtime supplied.
($handler:ident) => {
$crate::start($handler, None)
};
// Handler named by identifier, caller-supplied runtime.
($handler:ident, $runtime:expr) => {
$crate::start($handler, Some($runtime))
};
// Handler given as an expression, no runtime supplied.
($handler:expr) => {
$crate::start($handler, None)
};
// Handler given as an expression, caller-supplied runtime.
($handler:expr, $runtime:expr) => {
$crate::start($handler, Some($runtime))
};
}
/// Starts the Lambda runtime using the supplied configuration provider.
///
/// The function reads the Runtime API endpoint and the function settings from
/// `config`, creates a [`RuntimeClient`] for that endpoint and then hands
/// control to [`start_with_runtime_client`].
///
/// # Arguments
///
/// * `f` - The handler that is run for every incoming event.
/// * `config` - Source of the Runtime API endpoint and the function settings.
/// * `runtime` - Optional Tokio runtime, passed through to
///   [`RuntimeClient::new`].
///
/// # Panics
///
/// Panics when:
///
/// * the Runtime API endpoint cannot be read from `config`,
/// * the function settings cannot be read from `config`, or
/// * the [`RuntimeClient`] cannot be created.
pub fn start_with_config<Config, EventError>(
    f: impl Handler<EventError>,
    config: &Config,
    runtime: Option<TokioRuntime>,
) where
    Config: ConfigProvider,
    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
{
    let endpoint: String = match config.get_runtime_api_endpoint() {
        Ok(value) => value,
        Err(e) => panic!("Could not find runtime API env var: {}", e),
    };

    // This failure is about the function settings, not the endpoint, so it
    // gets its own message instead of repeating the endpoint one.
    let function_config: FunctionSettings = match config.get_function_settings() {
        Ok(env_settings) => env_settings,
        Err(e) => panic!("Could not load function settings: {}", e),
    };

    let info = Some(runtime_release().to_owned());
    match RuntimeClient::new(&endpoint, info, runtime) {
        Ok(client) => start_with_runtime_client(f, function_config, client),
        Err(e) => panic!("Could not create runtime client SDK: {}", e),
    }
}
/// Builds a [`Runtime`] around an already constructed client and starts it.
///
/// The runtime is created with [`MAX_RETRIES`] as its retry limit.
///
/// # Arguments
///
/// * `f` - The handler that is run for every incoming event.
/// * `func_settings` - Function settings stored in the runtime.
/// * `client` - The Runtime API client the runtime talks through.
pub(crate) fn start_with_runtime_client<EventError>(
    f: impl Handler<EventError>,
    func_settings: FunctionSettings,
    client: RuntimeClient,
) where
    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
{
    Runtime::<_, EventError>::new(f, func_settings, MAX_RETRIES, client).start();
}
/// State of a running Lambda runtime: the Runtime API client, the user
/// handler, and the settings needed to build a `Context` for each event.
pub(super) struct Runtime<Function, EventError> {
/// Client used to fetch events and to post responses and errors.
runtime_client: RuntimeClient,
/// The user handler; invoked through `run` for every event.
handler: Function,
/// Retry limit compared against the retry counter in `get_next_event`.
max_retries: i8,
/// Function settings cloned into every `Context` that is created.
settings: FunctionSettings,
/// Marker that ties the `EventError` type parameter to the struct without
/// storing a value of that type.
_phantom: PhantomData<EventError>,
}
impl<Function, EventError> Runtime<Function, EventError>
where
    Function: Handler<EventError>,
    EventError: Fail + LambdaErrorExt + Display + Send + Sync,
{
    /// Creates a runtime from its parts without starting the event loop.
    ///
    /// # Arguments
    ///
    /// * `f` - The handler that is run for every incoming event.
    /// * `config` - Function settings used when building each `Context`.
    /// * `retries` - Retry limit applied while fetching the next event.
    /// * `client` - The Runtime API client to communicate through.
    pub(super) fn new(f: Function, config: FunctionSettings, retries: i8, client: RuntimeClient) -> Self {
        debug!(
            "Creating new runtime with {} max retries for endpoint {}",
            retries,
            client.get_endpoint()
        );

        Self {
            handler: f,
            settings: config,
            max_retries: retries,
            runtime_client: client,
            _phantom: PhantomData::<EventError>,
        }
    }
}
impl<Function, EventError> Runtime<Function, EventError>
where
Function: Handler<EventError>,
EventError: Fail + LambdaErrorExt + Display + Send + Sync,
{
fn start(&mut self) {
debug!("Beginning main event loop");
loop {
let (event, ctx) = self.get_next_event(0, None);
let request_id = ctx.aws_request_id.clone();
info!("Received new event with AWS request id: {}", request_id);
let function_outcome = self.invoke(event, ctx);
match function_outcome {
Ok(response) => {
debug!(
"Function executed succesfully for {}, pushing response to Runtime API",
request_id
);
match self.runtime_client.event_response(&request_id, &response) {
Ok(_) => info!("Response for {} accepted by Runtime API", request_id),
Err(e) => {
error!("Could not send response for {} to Runtime API: {}", request_id, e);
if !e.is_recoverable() {
error!(
"Error for {} is not recoverable, sending fail_init signal and panicking.",
request_id
);
self.runtime_client.fail_init(&ErrorResponse::from(e));
panic!("Could not send response");
}
}
}
}
Err(e) => {
debug!("Handler returned an error for {}: {}", request_id, e);
debug!("Attempting to send error response to Runtime API for {}", request_id);
match self.runtime_client.event_error(&request_id, &ErrorResponse::from(e)) {
Ok(_) => info!("Error response for {} accepted by Runtime API", request_id),
Err(e) => {
error!("Unable to send error response for {} to Runtime API: {}", request_id, e);
if !e.is_recoverable() {
error!(
"Error for {} is not recoverable, sending fail_init signal and panicking",
request_id
);
self.runtime_client.fail_init(&ErrorResponse::from(e));
panic!("Could not send error response");
}
}
}
}
}
}
}
pub(super) fn invoke(&mut self, e: Vec<u8>, ctx: Context) -> Result<Vec<u8>, EventError> {
(self.handler).run(e, ctx)
}
pub(super) fn get_next_event(&self, retries: i8, e: Option<RuntimeError>) -> (Vec<u8>, Context) {
if let Some(err) = e {
if retries > self.max_retries {
error!("Unrecoverable error while fetching next event: {}", err);
match err.request_id.clone() {
Some(req_id) => {
self.runtime_client
.event_error(&req_id, &ErrorResponse::from(err))
.expect("Could not send event error response");
}
None => {
self.runtime_client.fail_init(&ErrorResponse::from(err));
}
}
panic!("Could not retrieve next event");
}
}
match self.runtime_client.next_event() {
Ok((ev_data, invocation_ctx)) => {
let mut handler_ctx = Context::new(self.settings.clone());
handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn;
handler_ctx.aws_request_id = invocation_ctx.aws_request_id;
handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id;
handler_ctx.client_context = invocation_ctx.client_context;
handler_ctx.identity = invocation_ctx.identity;
handler_ctx.deadline = invocation_ctx.deadline;
(ev_data, handler_ctx)
}
Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))),
}
}
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::{context, env};
    use lambda_runtime_client::RuntimeClient;
    use lambda_runtime_errors::HandlerError;

    /// Checks that `Runtime::invoke` runs the handler and returns its bytes.
    #[test]
    fn runtime_invokes_handler() {
        let config: &dyn env::ConfigProvider = &env::tests::MockConfigProvider { error: false };

        // Build the client from the mock provider's endpoint.
        let endpoint = config
            .get_runtime_api_endpoint()
            .expect("Could not get runtime endpoint");
        let client = RuntimeClient::new(&endpoint, None, None).expect("Could not initialize client");

        let settings = config
            .get_function_settings()
            .expect("Could not load environment config");
        let handler = |_e: Vec<u8>, _c: context::Context| -> Result<Vec<u8>, HandlerError> { Ok(b"hello".to_vec()) };
        let retries: i8 = 3;
        let mut runtime = Runtime::new(handler, settings, retries, client);

        let output = runtime.invoke(b"test".to_vec(), context::tests::test_context(10));
        assert_eq!(
            output.is_err(),
            false,
            "Handler threw an unexpected error: {}",
            output.err().unwrap()
        );

        let output_string = String::from_utf8(output.ok().unwrap()).unwrap();
        assert_eq!(output_string, "hello", "Unexpected output message: {}", output_string);
    }
}