use crate::types::ToStreamErrorTrailer;
use crate::{types::Diagnostic, Error, FunctionResponse, IntoFunctionResponse};
use bytes::Bytes;
use http::header::CONTENT_TYPE;
use http::{Method, Request, Response, Uri};
use lambda_runtime_api_client::{body::Body, build_request};
use serde::Serialize;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::str::FromStr;
use tokio_stream::{Stream, StreamExt};
pub(crate) trait IntoRequest {
fn into_req(self) -> Result<Request<Body>, Error>;
}
pub(crate) trait IntoResponse {
fn into_rsp(self) -> Result<Response<Body>, Error>;
}
#[derive(Debug, Eq, PartialEq)]
pub(crate) struct NextEventRequest;
impl IntoRequest for NextEventRequest {
fn into_req(self) -> Result<Request<Body>, Error> {
let req = build_request()
.method(Method::GET)
.uri(Uri::from_static("/2018-06-01/runtime/invocation/next"))
.body(Default::default())?;
Ok(req)
}
}
#[derive(Debug, Eq, PartialEq)]
pub struct NextEventResponse<'a> {
pub request_id: &'a str,
pub deadline: u64,
pub arn: &'a str,
pub trace_id: &'a str,
pub body: Vec<u8>,
}
impl<'a> IntoResponse for NextEventResponse<'a> {
fn into_rsp(self) -> Result<Response<Body>, Error> {
let rsp = Response::builder()
.header("lambda-runtime-aws-request-id", self.request_id)
.header("lambda-runtime-deadline-ms", self.deadline)
.header("lambda-runtime-invoked-function-arn", self.arn)
.header("lambda-runtime-trace-id", self.trace_id)
.body(Body::from(self.body))?;
Ok(rsp)
}
}
#[test]
fn test_next_event_request() {
let req = NextEventRequest;
let req = req.into_req().unwrap();
assert_eq!(req.method(), Method::GET);
assert_eq!(req.uri(), &Uri::from_static("/2018-06-01/runtime/invocation/next"));
assert!(match req.headers().get("User-Agent") {
Some(header) => header.to_str().unwrap().starts_with("aws-lambda-rust/"),
None => false,
});
}
pub(crate) struct EventCompletionRequest<'a, R, B, S, D, E>
where
R: IntoFunctionResponse<B, S>,
B: Serialize,
S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
D: Into<Bytes> + Send,
E: Into<Error> + Send + Debug,
{
pub(crate) request_id: &'a str,
pub(crate) body: R,
pub(crate) _unused_b: PhantomData<B>,
pub(crate) _unused_s: PhantomData<S>,
}
impl<'a, R, B, D, E, S> EventCompletionRequest<'a, R, B, S, D, E>
where
R: IntoFunctionResponse<B, S>,
B: Serialize,
S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
D: Into<Bytes> + Send,
E: Into<Error> + Send + Debug,
{
pub(crate) fn new(request_id: &'a str, body: R) -> EventCompletionRequest<'a, R, B, S, D, E> {
EventCompletionRequest {
request_id,
body,
_unused_b: PhantomData::<B>,
_unused_s: PhantomData::<S>,
}
}
}
impl<'a, R, B, S, D, E> IntoRequest for EventCompletionRequest<'a, R, B, S, D, E>
where
R: IntoFunctionResponse<B, S>,
B: Serialize,
S: Stream<Item = Result<D, E>> + Unpin + Send + 'static,
D: Into<Bytes> + Send,
E: Into<Error> + Send + Debug,
{
fn into_req(self) -> Result<Request<Body>, Error> {
match self.body.into_response() {
FunctionResponse::BufferedResponse(body) => {
let uri = format!("/2018-06-01/runtime/invocation/{}/response", self.request_id);
let uri = Uri::from_str(&uri)?;
let body = serde_json::to_vec(&body)?;
let body = Body::from(body);
let req = build_request().method(Method::POST).uri(uri).body(body)?;
Ok(req)
}
FunctionResponse::StreamingResponse(mut response) => {
let uri = format!("/2018-06-01/runtime/invocation/{}/response", self.request_id);
let uri = Uri::from_str(&uri)?;
let mut builder = build_request().method(Method::POST).uri(uri);
let req_headers = builder.headers_mut().unwrap();
req_headers.insert("Transfer-Encoding", "chunked".parse()?);
req_headers.insert("Lambda-Runtime-Function-Response-Mode", "streaming".parse()?);
req_headers.append("Trailer", "Lambda-Runtime-Function-Error-Type".parse()?);
req_headers.append("Trailer", "Lambda-Runtime-Function-Error-Body".parse()?);
req_headers.insert(
"Content-Type",
"application/vnd.awslambda.http-integration-response".parse()?,
);
let preloud_headers = &mut response.metadata_prelude.headers;
preloud_headers
.entry(CONTENT_TYPE)
.or_insert("application/octet-stream".parse()?);
let metadata_prelude = serde_json::to_string(&response.metadata_prelude)?;
tracing::trace!(?metadata_prelude);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
tx.send_data(metadata_prelude.into()).await.unwrap();
tx.send_data("\u{0}".repeat(8).into()).await.unwrap();
while let Some(chunk) = response.stream.next().await {
let chunk = match chunk {
Ok(chunk) => chunk.into(),
Err(err) => err.into().to_tailer().into(),
};
tx.send_data(chunk).await.unwrap();
}
});
let req = builder.body(rx)?;
Ok(req)
}
}
}
}
#[test]
fn test_event_completion_request() {
let req = EventCompletionRequest::new("id", "hello, world!");
let req = req.into_req().unwrap();
let expected = Uri::from_static("/2018-06-01/runtime/invocation/id/response");
assert_eq!(req.method(), Method::POST);
assert_eq!(req.uri(), &expected);
assert!(match req.headers().get("User-Agent") {
Some(header) => header.to_str().unwrap().starts_with("aws-lambda-rust/"),
None => false,
});
}
pub(crate) struct EventErrorRequest<'a> {
pub(crate) request_id: &'a str,
pub(crate) diagnostic: Diagnostic<'a>,
}
impl<'a> EventErrorRequest<'a> {
pub(crate) fn new(request_id: &'a str, error_type: &'a str, error_message: &'a str) -> EventErrorRequest<'a> {
EventErrorRequest {
request_id,
diagnostic: Diagnostic {
error_type,
error_message,
},
}
}
}
impl<'a> IntoRequest for EventErrorRequest<'a> {
fn into_req(self) -> Result<Request<Body>, Error> {
let uri = format!("/2018-06-01/runtime/invocation/{}/error", self.request_id);
let uri = Uri::from_str(&uri)?;
let body = serde_json::to_vec(&self.diagnostic)?;
let body = Body::from(body);
let req = build_request()
.method(Method::POST)
.uri(uri)
.header("lambda-runtime-function-error-type", "unhandled")
.body(body)?;
Ok(req)
}
}
#[test]
fn test_event_error_request() {
let req = EventErrorRequest {
request_id: "id",
diagnostic: Diagnostic {
error_type: "InvalidEventDataError",
error_message: "Error parsing event data",
},
};
let req = req.into_req().unwrap();
let expected = Uri::from_static("/2018-06-01/runtime/invocation/id/error");
assert_eq!(req.method(), Method::POST);
assert_eq!(req.uri(), &expected);
assert!(match req.headers().get("User-Agent") {
Some(header) => header.to_str().unwrap().starts_with("aws-lambda-rust/"),
None => false,
});
}
struct InitErrorRequest;
impl IntoRequest for InitErrorRequest {
fn into_req(self) -> Result<Request<Body>, Error> {
let uri = "/2018-06-01/runtime/init/error".to_string();
let uri = Uri::from_str(&uri)?;
let req = build_request()
.method(Method::POST)
.uri(uri)
.header("lambda-runtime-function-error-type", "unhandled")
.body(Body::empty())?;
Ok(req)
}
}
#[test]
fn test_init_error_request() {
let req = InitErrorRequest;
let req = req.into_req().unwrap();
let expected = Uri::from_static("/2018-06-01/runtime/init/error");
assert_eq!(req.method(), Method::POST);
assert_eq!(req.uri(), &expected);
assert!(match req.headers().get("User-Agent") {
Some(header) => header.to_str().unwrap().starts_with("aws-lambda-rust/"),
None => false,
});
}