use crate::CONFIG;
use async_once::AsyncOnce;
use aws_sdk_sqs::{types::Message, Client as SqsClient};
use flate2::read::GzEncoder;
use flate2::Compression;
use lambda_runtime::Context as Ctx;
use lazy_static::lazy_static;
use runtime_emulator_types::RequestPayload;
use std::io::prelude::*;
use tokio::time::{sleep, Duration};
use tracing::{info, warn};
lazy_static! {
    /// Shared SQS client, initialised asynchronously through `AsyncOnce` from
    /// the AWS configuration loaded from the environment.
    pub(crate) static ref SQS_CLIENT: AsyncOnce<SqsClient> = AsyncOnce::new(async {
        let sdk_config = aws_config::load_from_env().await;
        SqsClient::new(&sdk_config)
    });
}
/// A request pulled from the SQS request queue, as returned by `get_input`.
#[derive(Debug)]
pub(crate) struct SqsMessage {
/// The `event` part of the deserialized message body, re-serialized to a JSON string.
pub payload: String,
/// Receipt handle of the SQS message; `send_output` passes it to `delete_message`.
pub receipt_handle: String,
/// The `ctx` value carried in the deserialized message body.
pub ctx: Ctx,
}
/// Polls the request queue until a message arrives and returns it.
///
/// The first poll uses `wait_time_seconds = 0`. As soon as a poll comes back
/// without a message, the "connected" notice is logged once and every later
/// poll uses a 20-second wait. Receive errors are logged as warnings and
/// retried after a 5-second sleep.
///
/// # Panics
///
/// Panics if the received message lacks a body or a receipt handle, if the
/// body cannot be deserialized into `RequestPayload`, or if the contained
/// event cannot be serialized back to JSON.
pub(crate) async fn get_input() -> SqsMessage {
    let config = CONFIG.get().await;
    let client = SQS_CLIENT.get().await;

    // Start with an immediate poll; switch to a 20s wait once the queue is
    // known to be empty.
    let mut wait_time = 0;

    loop {
        let resp = match client
            .receive_message()
            .max_number_of_messages(1)
            .set_queue_url(Some(config.remote_config().request_queue_url.clone()))
            .set_wait_time_seconds(Some(wait_time))
            .send()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                warn!("Failed to get messages: {}", e);
                sleep(Duration::from_millis(5000)).await;
                continue;
            }
        };

        // A missing list and an empty list both mean "nothing to process".
        // Handling them together guarantees the switch to the longer wait
        // happens in either case, so the loop never spins on zero-wait polls.
        let msg = match resp.messages.and_then(|mut msgs| msgs.pop()) {
            Some(msg) => msg,
            None => {
                if wait_time == 0 {
                    info!("Lambda connected. Waiting for an incoming event from AWS.");
                    wait_time = 20;
                }
                continue;
            }
        };

        let (payload, receipt_handle) = match msg {
            Message {
                body: Some(body),
                receipt_handle: Some(receipt_handle),
                ..
            } => (body, receipt_handle),
            _ => panic!("Invalid SQS message. Missing body or receipt: {:?}", msg),
        };

        // The body wraps both the invocation context and the event itself;
        // the event is handed on as a JSON string.
        let payload: RequestPayload =
            serde_json::from_str(&payload).expect("Failed to deserialize msg body");
        let ctx = payload.ctx;
        let payload =
            serde_json::to_string(&payload.event).expect("event contents cannot be serialized");

        return SqsMessage {
            payload,
            receipt_handle,
            ctx,
        };
    }
}
/// Looks up the default request and response queue URLs.
///
/// Lists up to 100 queues whose name starts with `proxy_lambda_re` and returns
/// `(request_queue, response_queue)`: the URLs ending in `/proxy_lambda_req`
/// and `/proxy_lambda_resp` respectively, or `None` for whichever is absent.
///
/// # Panics
///
/// Panics if the `list_queues` call fails.
pub(crate) async fn get_default_queues() -> (Option<String>, Option<String>) {
    let sqs = SQS_CLIENT.get().await;

    let listing = sqs
        .list_queues()
        .set_queue_name_prefix(Some("proxy_lambda_re".to_string()))
        .set_max_results(Some(100))
        .send()
        .await
        .unwrap_or_else(|e| panic!("Failed to get list of SQS queues: {}", e));

    let mut request_queue = None;
    let mut response_queue = None;

    // `queue_urls` is optional; flattening treats a missing list as empty.
    for queue_url in listing.queue_urls.into_iter().flatten() {
        if queue_url.ends_with("/proxy_lambda_req") {
            request_queue = Some(queue_url);
        } else if queue_url.ends_with("/proxy_lambda_resp") {
            response_queue = Some(queue_url);
        }
    }

    (request_queue, response_queue)
}
/// Sends `response` to the response queue and deletes the originating request.
///
/// The response is first passed through `compress_output`. If the result is
/// still 262,144 bytes or longer it is dropped with a log entry instead of
/// being sent. In both cases the request identified by `receipt_handle` is
/// then deleted from the request queue.
///
/// If no response queue is configured, the function logs that and returns
/// immediately, without deleting the request message.
///
/// # Panics
///
/// Panics if sending the response or deleting the request message fails.
pub(crate) async fn send_output(response: String, receipt_handle: String) {
    // Size limit applied to the outgoing message body, in bytes.
    const MAX_SQS_MESSAGE_SIZE: usize = 262144;

    let config = CONFIG.get().await;
    let client = SQS_CLIENT.get().await;

    let response_queue_url = match &config.remote_config().response_queue_url {
        Some(v) => v.clone(),
        None => {
            info!("Response dropped: no response queue configured");
            return;
        }
    };

    let response = compress_output(response);

    // Remember whether the response actually goes out so the final log
    // entry reports what really happened.
    let response_sent = response.len() < MAX_SQS_MESSAGE_SIZE;
    if response_sent {
        if let Err(e) = client
            .send_message()
            .set_message_body(Some(response))
            .set_queue_url(Some(response_queue_url))
            .send()
            .await
        {
            panic!("Failed to send SQS response: {}", e);
        }
    } else {
        info!(
            " Response dropped: message size {}B, max allowed by SQS is 262,144 bytes",
            response.len()
        );
    }

    // The request is removed from the queue whether or not the response was
    // sent.
    if let Err(e) = client
        .delete_message()
        .set_queue_url(Some(config.remote_config().request_queue_url.to_string()))
        .set_receipt_handle(Some(receipt_handle))
        .send()
        .await
    {
        panic!("Failed to delete SQS request message: {}", e);
    }

    if response_sent {
        info!("Response sent and request deleted from the queue");
    } else {
        info!("Request deleted from the queue");
    }
}
/// Shrinks an oversized response so it has a chance of fitting into SQS.
///
/// Responses shorter than 262,144 bytes are returned unchanged. Anything
/// larger is gzipped with the fast compression level and the compressed
/// bytes are base58-encoded into the returned string.
///
/// # Panics
///
/// Panics if gzip compression fails.
fn compress_output(response: String) -> String {
    let original_len = response.len();
    if original_len < 262144 {
        return response;
    }

    info!(
        "Message size: {}B, max allowed: 262144B. Compressing...",
        original_len
    );

    // The reader-style encoder yields compressed bytes as it is read from.
    let mut gzipped: Vec<u8> = Vec::new();
    let compressed_len = GzEncoder::new(response.as_bytes(), Compression::fast())
        .read_to_end(&mut gzipped)
        .unwrap_or_else(|e| panic!("Failed to gzip the payload: {}", e));

    let encoded = bs58::encode(&gzipped).into_string();
    info!("Compressed: {}, encoded: {}", compressed_len, encoded.len());

    encoded
}