use std::error::Error as StdError;
use std::future::Future;
use std::result::Result as StdResult;
use serde::{Deserialize, Serialize};
use crate::error::Error;
use crate::lambda_api::{LambdaApiClient, PublishErrorRequest};
use crate::model::Context;
/// Result returned by the runtime's event-listening functions: `Ok(())` on
/// success, or the crate's [`Error`] on failure.
pub type RuntimeResult = StdResult<(), Error>;
/// Starts listening for Lambda events using a default-configured
/// [`LambdaApiClient`].
///
/// Prints a start-up line to stdout, builds the client with
/// `LambdaApiClient::default()` and then delegates to [`listen_events_with`].
///
/// * `handler` - called with the deserialized event (`A`) and its
///   [`Context`]; it returns a future resolving to either a serializable
///   payload (`B`) or an error (`E`).
///
/// # Errors
///
/// Returns whatever error [`listen_events_with`] returns.
pub async fn listen_events<F, Fut, A, B, E>(handler: F) -> RuntimeResult
where
    F: Fn(A, Context) -> Fut + Sync + Send,
    Fut: Future<Output = StdResult<B, E>> + Send,
    A: for<'de> Deserialize<'de> + Send,
    B: Serialize,
    E: StdError,
{
    println!("Preparing to listen to events...");
    listen_events_with(LambdaApiClient::default(), handler).await
}
/// Runs the event loop against the supplied [`LambdaApiClient`].
///
/// Each iteration processes exactly one invocation through
/// `try_invoke_lambda_handler`. The loop only ends when an iteration fails
/// (the error is propagated to the caller) or, when the crate is compiled
/// with `cfg(test)`, after the first successful iteration.
///
/// * `lambda_api` - client used to fetch events and publish results.
/// * `handler` - called with the deserialized event (`A`) and its
///   [`Context`]; it returns a future resolving to either a serializable
///   payload (`B`) or an error (`E`).
///
/// # Errors
///
/// Returns the first error produced while processing an invocation.
#[inline]
pub async fn listen_events_with<F, Fut, A, B, E>(
    lambda_api: LambdaApiClient,
    handler: F,
) -> RuntimeResult
where
    F: Fn(A, Context) -> Fut + Sync + Send,
    Fut: Future<Output = StdResult<B, E>> + Send,
    A: for<'de> Deserialize<'de> + Send,
    B: Serialize,
    E: StdError,
{
    loop {
        try_invoke_lambda_handler(&lambda_api, &handler).await?;

        // Test builds process a single invocation so that tests can terminate.
        if cfg!(test) {
            break Ok(());
        }
    }
}
/// Processes a single invocation: fetch, deserialize, handle, publish.
///
/// 1. Fetches the next message (raw bytes plus [`Context`]) from `lambda_api`.
/// 2. Deserializes the bytes as JSON into the handler's input type `A`.
/// 3. Awaits `handler` with the deserialized event and the context.
/// 4. Publishes the handler's payload as the response, or, when the handler
///    fails, publishes a [`PublishErrorRequest`] carrying the error's type
///    name and its `Display` text.
///
/// # Errors
///
/// Propagates failures from fetching the message, deserializing it, or
/// publishing the outcome. A fetch or deserialization failure returns before
/// anything is published for that request.
#[inline]
async fn try_invoke_lambda_handler<F, Fut, A, B, E>(
    lambda_api: &LambdaApiClient,
    handler: &F,
) -> RuntimeResult
where
    F: Fn(A, Context) -> Fut + Sync + Send,
    Fut: Future<Output = StdResult<B, E>> + Send,
    A: for<'de> Deserialize<'de> + Send,
    B: Serialize,
    E: StdError,
{
    let (raw_event, context) = lambda_api.fetch_next_message().await?;
    let event = serde_json::from_slice(&raw_event)?;

    // The context is moved into the handler, so keep a copy of the request id
    // for publishing the outcome afterwards.
    let request_id = context.request_id.clone();

    let error = match handler(event, context).await {
        Ok(payload) => {
            lambda_api.publish_response(request_id, payload).await?;
            return Ok(());
        }
        Err(error) => error,
    };

    let report = PublishErrorRequest {
        error_type: type_name_of_val(&error).to_string(),
        error_message: error.to_string(),
    };
    lambda_api.publish_error(request_id, report).await?;
    Ok(())
}
/// Returns the compiler-provided name of the static type `T` of the
/// referenced value.
///
/// Only the type is inspected; the value itself is never read. `T` may be
/// unsized (for example `str` or a slice), so any reference is accepted.
/// The name comes from [`std::any::type_name`].
fn type_name_of_val<T: ?Sized>(_: &T) -> &'static str {
    std::any::type_name::<T>()
}
/// Integration tests that drive `listen_events_with` against a local mock of
/// the Lambda runtime HTTP endpoints.
#[cfg(test)]
mod integration_tests {
    use aws_lambda_events::event::alb::AlbTargetGroupRequest;
    use httpmock::{MockRef, MockServer};
    use rusoto_core::Region;
    use rusoto_dynamodb::DynamoDbClient;

    use crate::Error;
    use crate::lambda_api::LambdaApiClient;
    use crate::listen_events_with;
    use crate::model::Config;

    /// Handler dependency used by the tests; it owns a DynamoDB client that
    /// none of its methods use.
    struct DynamoDbRepository {
        #[allow(dead_code)]
        dynamo_db: DynamoDbClient,
    }

    impl DynamoDbRepository {
        /// Builds a repository whose client targets the default region.
        fn create() -> Self {
            let dynamo_db = DynamoDbClient::new(Region::default());
            Self { dynamo_db }
        }

        /// Always resolves to `Ok(42)`.
        async fn a_method_that_will_succeed(&self) -> Result<i32, Error> {
            Ok(42)
        }

        /// Always resolves to an error built from "Not implemented".
        async fn a_method_that_will_fail(&self) -> Result<(), Error> {
            Err("Not implemented".into())
        }
    }

    /// A handler that succeeds must lead to a call on the response endpoint.
    #[tokio::test]
    async fn should_handle_successful_requests() {
        let server = MockServer::start();
        let (next, success, _error) = mock_lambda_runtime_endpoints(&server);
        let lambda_api = create_lambda_api_for_testing(server.port());
        let repository = DynamoDbRepository::create();

        listen_events_with(lambda_api, |_req: AlbTargetGroupRequest, _ctx| {
            repository.a_method_that_will_succeed()
        })
        .await
        .unwrap_or_else(|cause| panic!("Unexpected: {}", cause));

        next.assert();
        success.assert();
    }

    /// A handler that fails must lead to a call on the error endpoint.
    #[tokio::test]
    async fn should_handle_failure_requests() {
        let server = MockServer::start();
        let (next, _success, error) = mock_lambda_runtime_endpoints(&server);
        let lambda_api = create_lambda_api_for_testing(server.port());
        let repository = DynamoDbRepository::create();

        listen_events_with(lambda_api, |_req: AlbTargetGroupRequest, _ctx| {
            repository.a_method_that_will_fail()
        })
        .await
        .unwrap_or_else(|cause| panic!("Unexpected: {}", cause));

        next.assert();
        error.assert();
    }

    /// Creates a Lambda API client pointed at `127.0.0.1` on the given port.
    fn create_lambda_api_for_testing(port: u16) -> LambdaApiClient {
        let config = Config {
            endpoint: format!("127.0.0.1:{}", port),
            ..Default::default()
        };
        LambdaApiClient::create(config)
    }

    /// Registers the "next invocation", "response" and "error" endpoints on
    /// `server` and returns their mock handles in that order.
    fn mock_lambda_runtime_endpoints(server: &MockServer) -> (MockRef, MockRef, MockRef) {
        // Event body served by the "next invocation" endpoint.
        const SAMPLE_ALB_REQUEST: &str = include_str!("../tests/sample_alb_request.json");

        let next = server.mock(|when, then| {
            when.path("/2018-06-01/runtime/invocation/next");
            then.status(200)
                .header("lambda-runtime-aws-request-id", "0000-0001")
                .header("lambda-runtime-deadline-ms", "1000")
                .header("lambda-runtime-invoked-function-arn", "arn::something")
                .header("lambda-runtime-trace-id", "0001-0001")
                .body(SAMPLE_ALB_REQUEST);
        });

        let success = server.mock(|when, then| {
            when.method("POST")
                .path("/2018-06-01/runtime/invocation/0000-0001/response")
                .body("42");
            then.status(200);
        });

        let error = server.mock(|when, then| {
            when.method("POST")
                .path("/2018-06-01/runtime/invocation/0000-0001/error")
                .body(r#"{"errorType":"mu_runtime::error::Error","errorMessage":"Not implemented"}"#);
            then.status(200);
        });

        (next, success, error)
    }
}