1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
mod processor;
pub use processor::*;
use aws_lambda_runtime_proxy::Proxy;
use std::{process::Stdio, sync::Arc};
use tokio::{
io::{self, AsyncBufReadExt, AsyncRead, BufReader},
sync::Mutex,
};
#[derive(Default)]
pub struct LogProxy {
/// See [`Self::stdout`].
pub stdout: Option<Processor>,
/// See [`Self::stderr`].
pub stderr: Option<Processor>,
/// See [`Self::disable_lambda_telemetry_log_fd`].
pub disable_lambda_telemetry_log_fd: bool,
}
impl LogProxy {
/// Set the processor for `stdout`.
/// By default there is no processor for `stdout`.
/// # Examples
/// ```
/// use aws_lambda_log_proxy::{LogProxy, Sink};
///
/// let sink = Sink::stdout();
/// LogProxy::default().stdout(|p| p.sink(sink));
/// ```
pub fn stdout(mut self, builder: impl FnOnce(ProcessorBuilder) -> Processor) -> Self {
self.stdout = Some(builder(ProcessorBuilder::default()));
self
}
/// Set the processor for `stderr`.
/// By default there is no processor for `stderr`.
/// # Examples
/// ```
/// use aws_lambda_log_proxy::{LogProxy, Sink};
///
/// let sink = Sink::stdout();
/// LogProxy::default().stderr(|p| p.sink(sink));
/// ```
pub fn stderr(mut self, builder: impl FnOnce(ProcessorBuilder) -> Processor) -> Self {
self.stderr = Some(builder(ProcessorBuilder::default()));
self
}
/// Remove the `_LAMBDA_TELEMETRY_LOG_FD` environment variable for the handler process
/// to prevent logs from being written to other file descriptors.
pub fn disable_lambda_telemetry_log_fd(mut self, disable: bool) -> Self {
self.disable_lambda_telemetry_log_fd = disable;
self
}
/// Start the log proxy.
/// This will block the current thread.
pub async fn start(self) {
let mut command = Proxy::default_command();
// only pipe if there is a processor
if self.stdout.is_some() {
command.stdout(Stdio::piped());
}
if self.stderr.is_some() {
command.stderr(Stdio::piped());
}
if self.disable_lambda_telemetry_log_fd {
command.env_remove("_LAMBDA_TELEMETRY_LOG_FD");
}
let mut proxy = Proxy::default().command(command).spawn().await;
// create a mutex to ensure logs are written before the proxy call invocation/next
let mutex = Arc::new(Mutex::new(()));
proxy
.handler
.stdout
.take()
.map(|file| Self::spawn_reader(file, &mutex, self.stdout.unwrap()));
proxy
.handler
.stderr
.take()
.map(|file| Self::spawn_reader(file, &mutex, self.stderr.unwrap()));
let client = Mutex::new(proxy.client);
proxy
.server
.serve(|req| async {
if req.uri().path() == "/2018-06-01/runtime/invocation/next" {
// wait until there is no more lines in the buffer
let _ = mutex.lock().await;
}
client.lock().await.send_request(req).await
})
.await
}
fn spawn_reader<T: AsyncRead + Send + 'static>(
file: T,
mutex: &Arc<Mutex<()>>,
mut processor: Processor,
) where
BufReader<T>: Unpin,
{
let mutex = mutex.clone();
tokio::spawn(async move {
let reader = io::BufReader::new(file);
let mut lines = reader.lines();
loop {
// wait until there is at least one line in the buffer
let line = lines.next_line().await.unwrap().unwrap();
// lock the mutex to suppress the call to invocation/next
let _lock = mutex.lock().await;
// process the first line
processor.process(line).await;
// check if there are more lines in the buffer
while lines.get_ref().buffer().contains(/* '\n' */ &10) {
// next line exists, process it
let line = lines.next_line().await.unwrap().unwrap();
processor.process(line).await;
}
// flush the processor since there is no more lines in the buffer
processor.flush().await;
// now there is no more lines in the buffer, release the mutex
}
});
}
}