use crate::Error;
use http::{uri::Scheme, Request, Response, Uri};
use hyper::{client::HttpConnector, Body};
#[derive(Debug)]
pub(crate) struct Client<C = HttpConnector> {
base: Uri,
client: hyper::Client<C>,
}
impl<C> Client<C>
where
C: hyper::client::connect::Connect + Sync + Send + Clone + 'static,
{
pub fn with(base: Uri, client: hyper::Client<C>) -> Self {
Self { base, client }
}
fn set_origin<B>(&self, req: Request<B>) -> Result<Request<B>, Error> {
let (mut parts, body) = req.into_parts();
let (scheme, authority) = {
let scheme = self.base.scheme().unwrap_or(&Scheme::HTTP);
let authority = self.base.authority().expect("Authority not found");
(scheme, authority)
};
let path = parts.uri.path_and_query().expect("PathAndQuery not found");
let uri = Uri::builder()
.scheme(scheme.clone())
.authority(authority.clone())
.path_and_query(path.clone())
.build()
.expect("Unable to build URI");
parts.uri = uri;
Ok(Request::from_parts(parts, body))
}
pub(crate) async fn call(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
let req = self.set_origin(req)?;
let (parts, body) = req.into_parts();
let body = Body::from(body);
let req = Request::from_parts(parts, body);
let response = self.client.request(req).await?;
Ok(response)
}
}
#[cfg(test)]
mod endpoint_tests {
use crate::{
requests::{
EventCompletionRequest, EventErrorRequest, IntoRequest, IntoResponse, NextEventRequest, NextEventResponse,
},
simulated::Connector,
types::Diagnostic,
Error,
};
use http::{
uri::{PathAndQuery, Scheme},
HeaderValue, Method, Request, Response, StatusCode, Uri,
};
use hyper::{server::conn::Http, service::service_fn, Body};
use serde_json::json;
use std::convert::TryFrom;
use tokio::{
io::{AsyncRead, AsyncWrite},
select, sync,
sync::oneshot,
};
use tracing::{error, info, instrument};
#[instrument]
async fn hello(req: Request<Body>) -> Result<Response<Body>, Error> {
Ok(Response::new(Body::from("hello")))
}
async fn handle_incoming(req: Request<Body>) -> Result<Response<Body>, Error> {
let path: Vec<&str> = req
.uri()
.path_and_query()
.unwrap()
.as_str()
.split("/")
.collect::<Vec<&str>>();
match &path[1..] {
["2018-06-01", "runtime", "invocation", "next"] => next_event(&req).await,
["2018-06-01", "runtime", "invocation", id, "response"] => complete_event(&req, id).await,
["2018-06-01", "runtime", "invocation", id, "error"] => event_err(&req, id).await,
["2018-06-01", "runtime", "init", "error"] => unimplemented!(),
_ => unimplemented!(),
}
}
#[instrument(skip(io, rx))]
async fn handle<I>(io: I, rx: oneshot::Receiver<()>) -> Result<(), hyper::Error>
where
I: AsyncRead + AsyncWrite + Unpin + 'static,
{
let conn = Http::new().serve_connection(io, service_fn(handle_incoming));
select! {
_ = rx => {
info!("Received cancelation signal");
return Ok(())
}
res = conn => {
match res {
Ok(()) => return Ok(()),
Err(e) => {
error!(message = "Got error serving connection", e = %e);
return Err(e);
}
}
}
}
}
async fn next_event(req: &Request<Body>) -> Result<Response<Body>, Error> {
let path = "/2018-06-01/runtime/invocation/next";
assert_eq!(req.method(), Method::GET);
assert_eq!(req.uri().path_and_query().unwrap(), &PathAndQuery::from_static(path));
let body = json!({"message": "hello"});
let rsp = NextEventResponse {
request_id: "8476a536-e9f4-11e8-9739-2dfe598c3fcd",
deadline: 1_542_409_706_888,
arn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime",
trace_id: "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419",
body: serde_json::to_vec(&body)?,
};
rsp.into_rsp().map_err(|e| e.into())
}
async fn complete_event(req: &Request<Body>, id: &str) -> Result<Response<Body>, Error> {
assert_eq!(Method::POST, req.method());
let rsp = Response::builder()
.status(StatusCode::ACCEPTED)
.body(Body::empty())
.expect("Unable to construct response");
let expected = format!("/2018-06-01/runtime/invocation/{}/response", id);
assert_eq!(expected, req.uri().path());
Ok(rsp)
}
async fn event_err(req: &Request<Body>, id: &str) -> Result<Response<Body>, Error> {
let expected = format!("/2018-06-01/runtime/invocation/{}/error", id);
assert_eq!(expected, req.uri().path());
assert_eq!(req.method(), Method::POST);
let header = "lambda-runtime-function-error-type";
let expected = "unhandled";
assert_eq!(req.headers()[header], HeaderValue::try_from(expected)?);
let rsp = Response::builder().status(StatusCode::ACCEPTED).body(Body::empty())?;
Ok(rsp)
}
fn set_origin<B>(base: Uri, req: Request<B>) -> Result<Request<B>, Error> {
let (mut parts, body) = req.into_parts();
let (scheme, authority) = {
let scheme = base.scheme().unwrap_or(&Scheme::HTTP);
let authority = base.authority().expect("Authority not found");
(scheme, authority)
};
let path = parts.uri.path_and_query().expect("PathAndQuery not found");
let uri = Uri::builder()
.scheme(scheme.clone())
.authority(authority.clone())
.path_and_query(path.clone())
.build()
.expect("Unable to build URI");
parts.uri = uri;
Ok(Request::from_parts(parts, body))
}
#[tokio::test]
async fn test_next_event() -> Result<(), Error> {
let (client, server) = crate::simulated::chan();
let base = Uri::from_static("http://localhost:9001");
let (tx, rx) = sync::oneshot::channel();
let server = tokio::spawn(async {
handle(server, rx).await.expect("Unable to handle request");
});
let conn = Connector { inner: client };
let client = hyper::Client::builder().build(conn);
let req = NextEventRequest.into_req()?;
let req = set_origin(base, req)?;
let rsp = client.request(req).await.expect("Unable to send request");
assert_eq!(rsp.status(), StatusCode::OK);
let header = "lambda-runtime-deadline-ms";
assert_eq!(rsp.headers()[header], &HeaderValue::try_from("1542409706888")?);
tx.send(()).expect("Receiver has been dropped");
match server.await {
Ok(_) => Ok(()),
Err(e) if e.is_panic() => return Err::<(), Error>(e.into()),
Err(_) => unreachable!("This branch shouldn't be reachable"),
}
}
#[tokio::test]
async fn ok_response() -> Result<(), Error> {
let (client, server) = crate::simulated::chan();
let (tx, rx) = sync::oneshot::channel();
let base = Uri::from_static("http://localhost:9001");
let server = tokio::spawn(async {
handle(server, rx).await.expect("Unable to handle request");
});
let conn = Connector { inner: client };
let client = hyper::Client::builder().build(conn);
let req = EventCompletionRequest {
request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9",
body: "done",
};
let req = req.into_req()?;
let req = set_origin(base, req)?;
let rsp = client.request(req).await?;
assert_eq!(rsp.status(), StatusCode::ACCEPTED);
tx.send(()).expect("Receiver has been dropped");
match server.await {
Ok(_) => Ok(()),
Err(e) if e.is_panic() => return Err::<(), Error>(e.into()),
Err(_) => unreachable!("This branch shouldn't be reachable"),
}
}
#[tokio::test]
async fn error_response() -> Result<(), Error> {
let (client, server) = crate::simulated::chan();
let (tx, rx) = sync::oneshot::channel();
let base = Uri::from_static("http://localhost:9001");
let server = tokio::spawn(async {
handle(server, rx).await.expect("Unable to handle request");
});
let conn = Connector { inner: client };
let client = hyper::Client::builder().build(conn);
let req = EventErrorRequest {
request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9",
diagnostic: Diagnostic {
error_type: "InvalidEventDataError".to_string(),
error_message: "Error parsing event data".to_string(),
},
};
let req = req.into_req()?;
let req = set_origin(base, req)?;
let rsp = client.request(req).await?;
assert_eq!(rsp.status(), StatusCode::ACCEPTED);
tx.send(()).expect("Receiver has been dropped");
match server.await {
Ok(_) => Ok(()),
Err(e) if e.is_panic() => return Err::<(), Error>(e.into()),
Err(_) => unreachable!("This branch shouldn't be reachable"),
}
}
}