mod processor;
pub use processor::*;
use aws_lambda_runtime_proxy::Proxy;
use std::process::Stdio;
use tokio::{
io::{self, AsyncBufReadExt, AsyncRead, BufReader, Lines},
sync::{mpsc, oneshot, Mutex},
};
/// Configuration for the log proxy launched by [`LogProxy::start`].
///
/// Every field defaults to "off" (`None` / `false`) through `Default`.
#[derive(Default)]
pub struct LogProxy {
/// Processor fed with the lines the handler process writes to stdout.
/// The handler's stdout is only piped when this is `Some`.
pub stdout: Option<Processor>,
/// Processor fed with the lines the handler process writes to stderr.
/// The handler's stderr is only piped when this is `Some`.
pub stderr: Option<Processor>,
/// When `true`, the `_LAMBDA_TELEMETRY_LOG_FD` environment variable is
/// removed from the handler command before it is spawned.
pub disable_lambda_telemetry_log_fd: bool,
}
impl LogProxy {
    /// Installs the processor for the handler's stdout.
    ///
    /// `builder` is handed a default [`ProcessorBuilder`] and returns the
    /// finished [`Processor`]; any previously configured one is replaced.
    pub fn stdout(self, builder: impl FnOnce(ProcessorBuilder) -> Processor) -> Self {
        Self {
            stdout: Some(builder(ProcessorBuilder::default())),
            ..self
        }
    }

    /// Installs the processor for the handler's stderr.
    ///
    /// `builder` is handed a default [`ProcessorBuilder`] and returns the
    /// finished [`Processor`]; any previously configured one is replaced.
    pub fn stderr(self, builder: impl FnOnce(ProcessorBuilder) -> Processor) -> Self {
        Self {
            stderr: Some(builder(ProcessorBuilder::default())),
            ..self
        }
    }

    /// Chooses whether `_LAMBDA_TELEMETRY_LOG_FD` is removed from the
    /// handler's environment when it is spawned.
    pub fn disable_lambda_telemetry_log_fd(self, disable: bool) -> Self {
        Self {
            disable_lambda_telemetry_log_fd: disable,
            ..self
        }
    }

    /// Spawns the handler behind the runtime proxy and serves its requests.
    ///
    /// A stream is piped only when a processor was configured for it, and each
    /// piped stream gets its own reader task. Before a request for the "next
    /// invocation" path is forwarded, both reader tasks are asked to
    /// acknowledge, and the request waits for those acknowledgements. Every
    /// request is then forwarded through the proxy client.
    pub async fn start(self) {
        // Path of the runtime request that triggers the reader check.
        const NEXT_INVOCATION_PATH: &str = "/2018-06-01/runtime/invocation/next";

        let Self {
            stdout: stdout_processor,
            stderr: stderr_processor,
            disable_lambda_telemetry_log_fd: strip_telemetry_fd,
        } = self;

        let mut handler_command = Proxy::default_command();
        if stdout_processor.is_some() {
            handler_command.stdout(Stdio::piped());
        }
        if stderr_processor.is_some() {
            handler_command.stderr(Stdio::piped());
        }
        if strip_telemetry_fd {
            handler_command.env_remove("_LAMBDA_TELEMETRY_LOG_FD");
        }

        let mut proxy = Proxy::default().command(handler_command).spawn().await;

        // One reader task per piped stream; the returned sender is how the
        // request path asks that task for an acknowledgement.
        let stdout_pipe = proxy.handler.stdout.take();
        let stdout_checker_tx =
            stdout_pipe.map(|pipe| spawn_reader(pipe, stdout_processor.unwrap()));
        let stderr_pipe = proxy.handler.stderr.take();
        let stderr_checker_tx =
            stderr_pipe.map(|pipe| spawn_reader(pipe, stderr_processor.unwrap()));

        let client = Mutex::new(proxy.client);

        proxy
            .server
            .serve(|request| async {
                if request.uri().path() == NEXT_INVOCATION_PATH {
                    // Ask both readers first, then wait, so they can work
                    // through their buffers at the same time.
                    let stdout_ack_rx = send_checker(&stdout_checker_tx).await;
                    let stderr_ack_rx = send_checker(&stderr_checker_tx).await;
                    wait_for_ack(stdout_ack_rx).await;
                    wait_for_ack(stderr_ack_rx).await;
                }
                client.lock().await.send_request(request).await
            })
            .await
    }
}
fn spawn_reader<T: AsyncRead + Send + 'static>(
file: T,
mut processor: Processor,
) -> mpsc::Sender<oneshot::Sender<()>>
where
BufReader<T>: Unpin,
{
let (checker_tx, mut checker_rx) = mpsc::channel::<oneshot::Sender<()>>(1);
tokio::spawn(async move {
let reader = io::BufReader::new(file);
let mut lines = reader.lines();
loop {
tokio::select! {
line = lines.next_line() => {
processor.process(line.unwrap().unwrap()).await;
while has_newline_in_buffer(&mut lines) {
processor.process(lines.next_line().await.unwrap().unwrap()).await;
}
processor.flush().await;
}
ack_tx = checker_rx.recv() => {
let mut need_flush = false;
while has_newline_in_buffer(&mut lines) {
processor.process(lines.next_line().await.unwrap().unwrap()).await;
need_flush = true;
}
if need_flush {
processor.flush().await;
}
ack_tx.unwrap().send(()).unwrap();
}
}
}
});
checker_tx
}
/// Reports whether the `BufReader` underneath `lines` currently holds a
/// newline byte (`\n`) in its internal buffer.
///
/// Only data already buffered is inspected; nothing is read from the
/// underlying stream.
fn has_newline_in_buffer<T: AsyncRead + Send + 'static>(lines: &mut Lines<BufReader<T>>) -> bool
where
    BufReader<T>: Unpin,
{
    let buffered = lines.get_ref().buffer();
    buffered.iter().any(|&byte| byte == b'\n')
}
/// Asks a reader task for an acknowledgement.
///
/// Returns `None` when there is no reader (`checker_tx` is `None`). Otherwise
/// a fresh oneshot channel is created, its sender is handed to the reader, and
/// the receiving half is returned for the caller to await.
///
/// # Panics
///
/// Panics if the reader's end of the checker channel is closed.
async fn send_checker(
    checker_tx: &Option<mpsc::Sender<oneshot::Sender<()>>>,
) -> Option<oneshot::Receiver<()>> {
    let reader = checker_tx.as_ref()?;
    let (ack_tx, ack_rx) = oneshot::channel();
    reader.send(ack_tx).await.unwrap();
    Some(ack_rx)
}
/// Waits for a pending acknowledgement, if there is one.
///
/// `None` returns immediately.
///
/// # Panics
///
/// Panics if the acknowledging side dropped its sender without sending.
async fn wait_for_ack(ack_rx: Option<oneshot::Receiver<()>>) {
    match ack_rx {
        Some(pending) => pending.await.unwrap(),
        None => (),
    }
}