#[cfg(feature = "emf")]
mod emf;
mod processor;
use aws_lambda_runtime_proxy::{LambdaRuntimeApiClient, MockLambdaRuntimeApiServer};
use chrono::Utc;
use tokio::{
io::{stdin, AsyncBufReadExt, AsyncRead, BufReader},
sync::{mpsc, oneshot},
};
use tracing::{debug, trace};
#[cfg(feature = "emf")]
pub use emf::*;
pub use processor::*;
/// Configuration for a log proxy: a processor of type `P`, a line-buffer size and a port.
///
/// The fields are public and can also be set through the builder-style methods
/// on this type; `start` consumes the value and uses all three fields.
pub struct LogProxy<P> {
/// Processor that `start` hands to the background reader/processor tasks.
pub processor: P,
/// Capacity of the channel that queues lines read from stdin before they are processed.
/// The `Default` implementation sets this to `256`.
pub buffer_size: usize,
/// Port the mock Lambda Runtime API server binds to in `start`.
/// The `Default` implementation sets this to `3000`.
pub port: u16,
}
impl<P: Default> Default for LogProxy<P> {
fn default() -> Self {
Self {
processor: Default::default(),
buffer_size: 256,
port: 3000,
}
}
}
impl LogProxy<()> {
    /// Creates a proxy with the default buffer size and port and the unit type `()`
    /// in the processor slot.
    ///
    /// Use the `processor` or `simple` builder methods to install a processor.
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl<P> LogProxy<P> {
    /// Swaps in a new processor while keeping the configured buffer size and port.
    ///
    /// The processor type may change, so a `LogProxy<T>` is returned.
    pub fn processor<T>(self, processor: T) -> LogProxy<T> {
        let Self {
            buffer_size, port, ..
        } = self;
        LogProxy {
            processor,
            buffer_size,
            port,
        }
    }

    /// Installs a [`SimpleProcessor`] produced by `builder`.
    ///
    /// `builder` receives a fresh [`SimpleProcessorBuilder`] and must return the
    /// finished processor; buffer size and port are carried over unchanged.
    pub fn simple<T>(
        self,
        builder: impl FnOnce(SimpleProcessorBuilder<fn(String) -> Option<String>, ()>) -> SimpleProcessor<T>,
    ) -> LogProxy<SimpleProcessor<T>> {
        let simple_processor = builder(SimpleProcessorBuilder::new());
        self.processor(simple_processor)
    }

    /// Sets the capacity of the channel that buffers lines read from stdin.
    pub fn buffer_size(self, buffer_size: usize) -> Self {
        Self {
            buffer_size,
            ..self
        }
    }

    /// Sets the port the mock Lambda Runtime API server listens on.
    pub fn port(self, port: u16) -> Self {
        Self { port, ..self }
    }

    /// Starts the proxy: spawns the stdin reader/processor tasks, then serves the
    /// mock Lambda Runtime API on the configured port.
    ///
    /// Every incoming request is forwarded through a new `LambdaRuntimeApiClient`.
    /// Before forwarding a request for the "next invocation" path, the handler asks
    /// the processor task for an acknowledgement and waits for it, so the request is
    /// only forwarded after the processor has answered.
    ///
    /// # Panics
    ///
    /// Panics if the server cannot bind to the port, if the client cannot be created,
    /// or if the processor task is no longer reachable for the acknowledgement.
    pub async fn start(self)
    where
        P: Processor,
    {
        // Path of the runtime API request that fetches the next invocation.
        const INVOCATION_NEXT_PATH: &str = "/2018-06-01/runtime/invocation/next";

        let Self {
            processor,
            buffer_size,
            port,
        } = self;
        debug!(port = %port, buffer_size = %buffer_size, "Starting log proxy");

        let checker_tx = spawn_reader(stdin(), processor, buffer_size);

        MockLambdaRuntimeApiServer::bind(port)
            .await
            .unwrap()
            .serve(move |req| {
                // Each request future needs its own handle to the processor task.
                let checker_tx = checker_tx.clone();
                async move {
                    if req.uri().path() == INVOCATION_NEXT_PATH {
                        let (done_tx, done_rx) = oneshot::channel();
                        checker_tx.send(done_tx).await.unwrap();
                        debug!("Waiting for the processor to finish processing logs");
                        done_rx.await.unwrap();
                        debug!("Processor finished processing logs");
                    }

                    LambdaRuntimeApiClient::new()
                        .await
                        .unwrap()
                        .forward(req)
                        .await
                }
            })
            .await
    }
}
fn spawn_reader<F: AsyncRead + Send + 'static, P: Processor + 'static>(
file: F,
mut processor: P,
buffer_size: usize,
) -> mpsc::Sender<oneshot::Sender<()>>
where
BufReader<F>: Unpin,
{
let (checker_tx, mut checker_rx) = mpsc::channel::<oneshot::Sender<()>>(1);
let (buffer_tx, mut buffer_rx) = mpsc::channel(buffer_size);
tokio::spawn(async move {
let mut lines = BufReader::new(file).lines();
while let Ok(Some(line)) = lines.next_line().await {
trace!(line = %line, "Read line");
if !line.is_empty() {
buffer_tx.send((line, Utc::now())).await.unwrap();
}
}
debug!("Reader thread finished");
});
tokio::spawn(async move {
loop {
tokio::select! {
biased;
res = buffer_rx.recv() => {
let (line, timestamp) = res.unwrap();
processor.process(line, timestamp).await;
}
checker = checker_rx.recv() => {
processor.truncate().await;
checker.unwrap().send(()).unwrap();
}
}
}
});
checker_tx
}
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    // Compile-time check that an expression has the unit type `()`.
    macro_rules! assert_unit {
        ($value:expr) => {
            let _: () = $value;
        };
    }

    // A freshly created proxy has a unit processor and the default buffer size and port.
    #[test]
    fn test_log_proxy_default() {
        let LogProxy {
            processor,
            buffer_size,
            port,
        } = LogProxy::new();
        assert_unit!(processor);
        assert_eq!(buffer_size, 256);
        assert_eq!(port, 3000);
    }

    // Installing a simple processor leaves the buffer size and port untouched.
    #[tokio::test]
    #[serial]
    async fn test_log_proxy_simple() {
        let stdout_sink = Sink::stdout().spawn();
        let configured = LogProxy::new().simple(|builder| builder.sink(stdout_sink).build());
        assert_eq!(configured.buffer_size, 256);
        assert_eq!(configured.port, 3000);
    }

    // The buffer size setter overrides the default.
    #[test]
    fn test_log_proxy_buffer_size() {
        let resized = LogProxy::new().buffer_size(512);
        assert_eq!(resized.buffer_size, 512);
    }

    // The port setter overrides the default.
    #[test]
    fn test_log_proxy_port() {
        let rebound = LogProxy::new().port(3001);
        assert_eq!(rebound.port, 3001);
    }

    // Never executed: exists only so the compiler checks that `start` is callable
    // with both the unit processor and a simple processor.
    async fn _ensure_start_can_be_called() {
        let unit_proxy: LogProxy<()> = LogProxy::new();
        unit_proxy.start().await;

        let stdout_sink = Sink::stdout().spawn();
        let simple_proxy =
            LogProxy::new().simple(|builder| builder.sink(stdout_sink.clone()).build());
        simple_proxy.start().await;
    }
}