use super::{empty, BLOCK_NEXT_INVOCATION, LOCAL_REQUEST_ID};
use crate::sqs;
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::body::Bytes;
use hyper::Error;
use hyper::Request;
use hyper::Response;
use regex::Regex;
use std::sync::OnceLock;
use tracing::{debug, error, info};
/// Cached regex for pulling the invocation ID out of the request path.
/// Compiled on first use inside `handler` and reused by every later call.
static RECEIPT_REGEX: OnceLock<Regex> = OnceLock::new();
/// Handles a `.../runtime/invocation/{id}/response` request carrying the
/// output of a lambda invocation.
///
/// Steps performed:
/// 1. The `{id}` segment is extracted from the URL path with [`RECEIPT_REGEX`].
/// 2. The whole request body is collected and decoded as UTF-8.
/// 3. The decoded payload is logged at `info` level.
/// 4. If `{id}` equals `LOCAL_REQUEST_ID`, the `BLOCK_NEXT_INVOCATION` flag is
///    set to `true`; otherwise the payload and `{id}` are passed on to
///    `sqs::send_output`.
///
/// Always answers with `200 OK` and the body produced by `empty()`.
///
/// # Panics
///
/// - if the URL path does not match `/runtime/invocation/(.+)/response`,
/// - if the request body cannot be read,
/// - if the request body is not valid UTF-8 (the panic message includes a hex dump),
/// - if the response cannot be built.
pub(crate) async fn handler(req: Request<hyper::body::Incoming>) -> Response<BoxBody<Bytes, Error>> {
    let url_pattern = RECEIPT_REGEX.get_or_init(|| {
        Regex::new(r"/runtime/invocation/(.+)/response").expect("Invalid response URL regex. It's a bug.")
    });

    // The borrow of `req` is confined to this block so that the request can be
    // consumed for its body right after.
    // The ID is called `receipt_handle` because it is forwarded as such to the SQS
    // sender - presumably it is the receipt handle of the source message (verify
    // against the code that serves the next invocation).
    let receipt_handle = {
        let uri = req.uri();

        let Some(captures) = url_pattern.captures(uri.path()) else {
            panic!("URL parsing regex failed on: {:?}. It' a bug", uri);
        };

        let Some(id_segment) = captures.get(1) else {
            panic!(
                "Request URL does not conform to /runtime/invocation/AwsRequestId/response: {:?}",
                uri
            );
        };

        id_segment.as_str().to_owned()
    };

    // Buffer the entire body - the payload is needed as a single string
    let response = req
        .into_body()
        .collect()
        .await
        .unwrap_or_else(|e| panic!("Failed to read lambda response: {:?}", e))
        .to_bytes();

    // Only UTF-8 payloads are accepted; the raw bytes are dumped as hex otherwise
    let sqs_payload = String::from_utf8(response.to_vec()).unwrap_or_else(|e| {
        panic!(
            "Non-UTF-8 response from Lambda. {:?}\n{}",
            e,
            hex::encode(response.as_ref())
        )
    });

    info!("Lambda response: {sqs_payload}");

    if receipt_handle != LOCAL_REQUEST_ID {
        // Any other ID: hand the payload over to the SQS sender
        sqs::send_output(sqs_payload, receipt_handle).await;
    } else {
        // The local request ID: nothing is sent, only the blocking flag is raised
        // NOTE(review): the log text says "deadlock", but if this is a std `RwLock`
        // an `Err` from `write()` means the lock is poisoned - confirm.
        match BLOCK_NEXT_INVOCATION.write() {
            Ok(mut block_flag) => {
                debug!("Blocking the next invocation");
                *block_flag = true;
            }
            Err(_) => {
                error!("Write deadlock on BLOCK_NEXT_INVOCATION. It's a bug");
            }
        }
    }

    // The caller only needs an acknowledgement - no body is returned
    let ack = Response::builder()
        .status(hyper::StatusCode::OK)
        .body(empty());

    ack.expect("Failed to create a response")
}