aws_lambda_log_proxy/
lib.rs1#[cfg(feature = "emf")]
2mod emf;
3mod processor;
4
5use aws_lambda_runtime_proxy::{LambdaRuntimeApiClient, MockLambdaRuntimeApiServer};
6use chrono::Utc;
7use tokio::{
8 io::{stdin, AsyncBufReadExt, AsyncRead, BufReader},
9 sync::{mpsc, oneshot},
10};
11use tracing::{debug, trace};
12
13#[cfg(feature = "emf")]
14pub use emf::*;
15pub use processor::*;
16
/// Configuration for the log proxy.
///
/// `P` is the log processor type. Any `P` can be stored here, but
/// [`LogProxy::start`] requires it to implement `Processor`.
///
/// `Debug` and `Clone` are derived so the configuration can be logged and
/// duplicated; both are only available when `P` implements the same trait.
#[derive(Debug, Clone)]
pub struct LogProxy<P> {
    /// The processor that is handed every non-empty line read by the proxy.
    pub processor: P,
    /// Capacity of the channel that buffers lines between the reader task and
    /// the processor task. The `Default` implementation uses 256.
    pub buffer_size: usize,
    /// Port the mock Lambda Runtime API server binds to. The `Default`
    /// implementation uses 3000.
    pub port: u16,
}
49
50impl<P: Default> Default for LogProxy<P> {
51 fn default() -> Self {
52 Self {
53 processor: Default::default(),
54 buffer_size: 256,
55 port: 3000,
56 }
57 }
58}
59
60impl LogProxy<()> {
61 pub fn new() -> Self {
64 Self::default()
65 }
66}
67
68impl<P> LogProxy<P> {
69 pub fn processor<T>(self, processor: T) -> LogProxy<T> {
71 LogProxy {
72 processor,
73 buffer_size: self.buffer_size,
74 port: self.port,
75 }
76 }
77
78 pub fn simple<T>(
88 self,
89 builder: impl FnOnce(SimpleProcessorBuilder<fn(String) -> Option<String>, ()>) -> SimpleProcessor<T>,
90 ) -> LogProxy<SimpleProcessor<T>> {
91 self.processor(builder(SimpleProcessorBuilder::new()))
92 }
93
94 pub fn buffer_size(mut self, buffer_size: usize) -> Self {
101 self.buffer_size = buffer_size;
102 self
103 }
104
105 pub fn port(mut self, port: u16) -> Self {
108 self.port = port;
109 self
110 }
111
112 pub async fn start(self)
115 where
116 P: Processor,
117 {
118 debug!(port = %self.port, buffer_size = %self.buffer_size, "Starting log proxy");
119
120 let checker_tx = spawn_reader(stdin(), self.processor, self.buffer_size);
121
122 MockLambdaRuntimeApiServer::bind(self.port)
123 .await
124 .unwrap()
125 .serve(move |req| {
126 let checker_tx = checker_tx.clone();
127 async move {
128 let is_invocation_next = req.uri().path() == "/2018-06-01/runtime/invocation/next";
129
130 if is_invocation_next {
131 let (ack_tx, ack_rx) = oneshot::channel();
137 checker_tx.send(ack_tx).await.unwrap();
138 debug!("Waiting for the processor to finish processing logs");
140 ack_rx.await.unwrap();
141 debug!("Processor finished processing logs");
142 }
143
144 LambdaRuntimeApiClient::new()
146 .await
147 .unwrap()
148 .forward(req)
149 .await
150 }
151 })
152 .await
153 }
154}
155
156fn spawn_reader<F: AsyncRead + Send + 'static, P: Processor + 'static>(
157 file: F,
158 mut processor: P,
159 buffer_size: usize,
160) -> mpsc::Sender<oneshot::Sender<()>>
161where
162 BufReader<F>: Unpin,
163{
164 let (checker_tx, mut checker_rx) = mpsc::channel::<oneshot::Sender<()>>(1);
165 let (buffer_tx, mut buffer_rx) = mpsc::channel(buffer_size);
166
167 tokio::spawn(async move {
170 let mut lines = BufReader::new(file).lines();
171 while let Ok(Some(line)) = lines.next_line().await {
172 trace!(line = %line, "Read line");
173 if !line.is_empty() {
176 buffer_tx.send((line, Utc::now())).await.unwrap();
178 }
179 }
180 debug!("Reader thread finished");
181 });
182
183 tokio::spawn(async move {
185 loop {
186 tokio::select! {
187 biased;
189
190 res = buffer_rx.recv() => {
191 let (line, timestamp) = res.unwrap();
192 processor.process(line, timestamp).await;
193 }
194 checker = checker_rx.recv() => {
196 processor.truncate().await;
200
201 checker.unwrap().send(()).unwrap();
202 }
203 }
204 }
205 });
206
207 checker_tx
208}
209
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    /// Compile-time check that the given expression has the unit type `()`.
    macro_rules! assert_unit {
        ($value:expr) => {
            let _: () = $value;
        };
    }

    /// `LogProxy::new` yields the unit processor, buffer size 256 and port 3000.
    #[test]
    fn test_log_proxy_default() {
        let default_proxy = LogProxy::new();
        assert_unit!(default_proxy.processor);
        assert_eq!(default_proxy.buffer_size, 256);
        assert_eq!(default_proxy.port, 3000);
    }

    /// Installing a simple processor keeps the default buffer size and port.
    #[tokio::test]
    #[serial]
    async fn test_log_proxy_simple() {
        let sink = Sink::stdout().spawn();
        let simple_proxy = LogProxy::new().simple(|builder| builder.sink(sink).build());
        assert_eq!(simple_proxy.buffer_size, 256);
        assert_eq!(simple_proxy.port, 3000);
    }

    /// The `buffer_size` builder method overrides the default.
    #[test]
    fn test_log_proxy_buffer_size() {
        let resized = LogProxy::new().buffer_size(512);
        assert_eq!(resized.buffer_size, 512);
    }

    /// The `port` builder method overrides the default.
    #[test]
    fn test_log_proxy_port() {
        let rebound = LogProxy::new().port(3001);
        assert_eq!(rebound.port, 3001);
    }

    /// Never executed: exists only so the compiler checks that `start` can be
    /// called with the unit processor and with a simple processor.
    async fn _ensure_start_can_be_called() {
        let unit_proxy: LogProxy<()> = LogProxy::new();
        unit_proxy.start().await;

        let sink = Sink::stdout().spawn();
        let simple_proxy = LogProxy::new().simple(|builder| builder.sink(sink.clone()).build());
        simple_proxy.start().await;
    }
}