use std::{error::Error, result};
use serde;
use serde_json;
use context::Context;
use env::{ConfigProvider, EnvConfigProvider, FunctionSettings};
use error::{HandlerError, RuntimeError};
use lambda_runtime_client::RuntimeClient;
use tokio::runtime::Runtime as TokioRuntime;
const MAX_RETRIES: i8 = 3;
pub type Handler<E, O> = fn(E, Context) -> Result<O, HandlerError>;
pub fn start<E, O>(f: Handler<E, O>, runtime: Option<TokioRuntime>)
where
for<'invocation> E: serde::Deserialize<'invocation>,
O: serde::Serialize,
{
start_with_config(f, &EnvConfigProvider::new(), runtime)
}
#[macro_export]
macro_rules! lambda {
($handler:ident) => {
$crate::start($handler, None)
};
($handler:ident, $runtime:expr) => {
$crate::start($handler, Some($runtime))
};
}
pub(crate) fn start_with_config<E, O, C>(f: Handler<E, O>, config: &C, runtime: Option<TokioRuntime>)
where
for<'invocation> E: serde::Deserialize<'invocation>,
O: serde::Serialize,
C: ConfigProvider,
{
let endpoint: String;
match config.get_runtime_api_endpoint() {
Ok(value) => endpoint = value,
Err(e) => {
panic!("Could not find runtime API env var: {}", e);
}
}
let function_config: FunctionSettings;
let settings = config.get_function_settings();
match settings {
Ok(env_settings) => function_config = env_settings,
Err(e) => {
panic!("Could not find runtime API env var: {}", e);
}
}
match RuntimeClient::new(endpoint, runtime) {
Ok(client) => {
start_with_runtime_client(f, function_config, client);
}
Err(e) => {
panic!("Could not create runtime client SDK: {}", e);
}
}
}
pub(crate) fn start_with_runtime_client<E, O>(f: Handler<E, O>, func_settings: FunctionSettings, client: RuntimeClient)
where
for<'invocation> E: serde::Deserialize<'invocation>,
O: serde::Serialize,
{
let lambda_runtime: Runtime<E, O>;
match Runtime::new(f, func_settings, MAX_RETRIES, client) {
Ok(r) => lambda_runtime = r,
Err(e) => {
panic!("Error while starting runtime: {}", e);
}
}
lambda_runtime.start();
}
pub(super) struct Runtime<E, O> {
runtime_client: RuntimeClient,
handler: Handler<E, O>,
max_retries: i8,
settings: FunctionSettings,
}
impl<E, O> Runtime<E, O> {
pub(super) fn new(
f: Handler<E, O>,
config: FunctionSettings,
retries: i8,
client: RuntimeClient,
) -> result::Result<Runtime<E, O>, RuntimeError> {
debug!(
"Creating new runtime with {} max retries for endpoint {}",
retries,
client.get_endpoint()
);
Ok(Runtime {
runtime_client: client,
settings: config,
handler: f,
max_retries: retries,
})
}
}
impl<'env, E, O> Runtime<E, O>
where
for<'de> E: serde::Deserialize<'de>,
O: serde::Serialize,
{
fn start(&self) {
debug!("Beginning main event loop");
loop {
let (event, ctx) = self.get_next_event(0, None);
let request_id = ctx.aws_request_id.clone();
info!("Received new event with AWS request id: {}", request_id);
let function_outcome = self.invoke(event, ctx);
match function_outcome {
Ok(response) => {
debug!(
"Function executed succesfully for {}, pushing response to Runtime API",
request_id
);
match serde_json::to_vec(&response) {
Ok(response_bytes) => {
match self.runtime_client.event_response(&request_id, response_bytes) {
Ok(_) => info!("Response for {} accepted by Runtime API", request_id),
Err(e) => {
error!("Could not send response for {} to Runtime API: {}", request_id, e);
if !e.recoverable {
error!(
"Error for {} is not recoverable, sending fail_init signal and panicking.",
request_id
);
self.runtime_client.fail_init(&e);
panic!("Could not send response");
}
}
}
}
Err(e) => {
error!(
"Could not marshal output object to Vec<u8> JSON represnetation for request {}: {}",
request_id, e
);
self.runtime_client
.fail_init(&RuntimeError::unrecoverable(e.description()));
panic!("Failed to marshal handler output, panic");
}
}
}
Err(e) => {
debug!("Handler returned an error for {}: {}", request_id, e);
debug!("Attempting to send error response to Runtime API for {}", request_id);
match self.runtime_client.event_error(&request_id, &e) {
Ok(_) => info!("Error response for {} accepted by Runtime API", request_id),
Err(e) => {
error!("Unable to send error response for {} to Runtime API: {}", request_id, e);
if !e.recoverable {
error!(
"Error for {} is not recoverable, sending fail_init signal and panicking",
request_id
);
self.runtime_client.fail_init(&e);
panic!("Could not send error response");
}
}
}
}
}
}
}
pub(super) fn invoke(&self, e: E, ctx: Context) -> Result<O, HandlerError> {
(self.handler)(e, ctx)
}
pub(super) fn get_next_event(&self, retries: i8, e: Option<RuntimeError>) -> (E, Context) {
if let Some(err) = e {
if retries > self.max_retries {
error!("Unrecoverable error while fetching next event: {}", err);
match err.request_id.clone() {
Some(req_id) => {
self.runtime_client
.event_error(&req_id, &err)
.expect("Could not send event error response");
}
None => {
self.runtime_client.fail_init(&err);
}
}
panic!("Could not retrieve next event");
}
}
match self.runtime_client.next_event() {
Ok((ev_data, invocation_ctx)) => {
let parse_result = serde_json::from_slice(&ev_data);
match parse_result {
Ok(ev) => {
let mut handler_ctx = Context::new(self.settings.clone());
handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn;
handler_ctx.aws_request_id = invocation_ctx.aws_request_id;
handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id;
handler_ctx.client_context = invocation_ctx.client_context;
handler_ctx.identity = invocation_ctx.identity;
handler_ctx.deadline = invocation_ctx.deadline;
(ev, handler_ctx)
}
Err(mut e) => {
error!("Could not parse event to type: {}", e);
let mut runtime_err = RuntimeError::from(e);
runtime_err.request_id = Option::from(invocation_ctx.aws_request_id);
self.get_next_event(retries + 1, Option::from(runtime_err))
}
}
}
Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))),
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use context;
use env;
use lambda_runtime_client::RuntimeClient;
#[test]
fn runtime_invokes_handler() {
let config: &env::ConfigProvider = &env::tests::MockConfigProvider { error: false };
let client = RuntimeClient::new(
config
.get_runtime_api_endpoint()
.expect("Could not get runtime endpoint"),
None,
)
.expect("Could not initialize client");
let handler = |_e: String, _c: context::Context| -> Result<String, HandlerError> { Ok("hello".to_string()) };
let retries: i8 = 3;
let runtime = Runtime::new(
handler,
config
.get_function_settings()
.expect("Could not load environment config"),
retries,
client,
);
assert_eq!(
runtime.is_err(),
false,
"Runtime threw an unexpected error: {}",
runtime.err().unwrap()
);
let output = runtime
.unwrap()
.invoke(String::from("test"), context::tests::test_context(10));
assert_eq!(
output.is_err(),
false,
"Handler threw an unexpected error: {}",
output.err().unwrap()
);
let output_string = output.unwrap();
assert_eq!(output_string, "hello", "Unexpected output message: {}", output_string);
}
}