aws_lambda_log_proxy/
lib.rs

#[cfg(feature = "emf")]
mod emf;
mod processor;

use aws_lambda_runtime_proxy::{LambdaRuntimeApiClient, MockLambdaRuntimeApiServer};
use chrono::Utc;
use tokio::{
  io::{stdin, AsyncBufReadExt, AsyncRead, BufReader},
  sync::{mpsc, oneshot},
};
use tracing::{debug, trace};

#[cfg(feature = "emf")]
pub use emf::*;
pub use processor::*;

/// # Examples
/// Simple creation:
/// ```
/// use aws_lambda_log_proxy::{LogProxy, Sink};
///
/// # async fn t1() {
/// LogProxy::new().simple(|p| p.sink(Sink::stdout().spawn()).build()).start().await;
/// # }
/// ```
/// Custom creation:
/// ```
/// use aws_lambda_log_proxy::{LogProxy, Processor, Timestamp};
///
/// pub struct MyProcessor;
///
/// impl Processor for MyProcessor {
///   async fn process(&mut self, _line: String, _timestamp: Timestamp) { }
///   async fn truncate(&mut self) { }
/// }
///
/// # async fn t1() {
/// LogProxy::new().processor(MyProcessor).buffer_size(1024).port(1234).start().await;
/// # }
/// ```
pub struct LogProxy<P> {
  /// The processor for stdin.
  pub processor: P,
  /// See [`Self::buffer_size`].
  pub buffer_size: usize,
  /// See [`Self::port`].
  pub port: u16,
}

impl<P: Default> Default for LogProxy<P> {
  fn default() -> Self {
    Self {
      processor: Default::default(),
      buffer_size: 256,
      port: 3000,
    }
  }
}

impl LogProxy<()> {
  /// Create a new instance with the default properties
  /// and a mock processor which will discard all logs.
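  /// # Examples
  /// Defaults after creation:
  /// ```
  /// use aws_lambda_log_proxy::LogProxy;
  ///
  /// let proxy = LogProxy::new();
  /// assert_eq!(proxy.buffer_size, 256);
  /// assert_eq!(proxy.port, 3000);
  /// ```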
  pub fn new() -> Self {
    Self::default()
  }
}

impl<P> LogProxy<P> {
  /// Set [`Self::processor`] to a custom processor.
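  /// # Examples
  /// A sketch reusing the no-op `MyProcessor` from the struct-level example:
  /// ```
  /// use aws_lambda_log_proxy::{LogProxy, Processor, Timestamp};
  ///
  /// pub struct MyProcessor;
  ///
  /// impl Processor for MyProcessor {
  ///   async fn process(&mut self, _line: String, _timestamp: Timestamp) { }
  ///   async fn truncate(&mut self) { }
  /// }
  ///
  /// let proxy = LogProxy::new().processor(MyProcessor);
  /// // other fields keep their defaults
  /// assert_eq!(proxy.buffer_size, 256);
  /// ```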
  pub fn processor<T>(self, processor: T) -> LogProxy<T> {
    LogProxy {
      processor,
      buffer_size: self.buffer_size,
      port: self.port,
    }
  }

  /// Set [`Self::processor`] to a [`SimpleProcessor`] via [`SimpleProcessorBuilder`].
  /// # Examples
  /// ```
  /// use aws_lambda_log_proxy::{LogProxy, Sink};
  ///
  /// # async fn t1() {
  /// LogProxy::new().simple(|p| p.sink(Sink::stdout().spawn()).build());
  /// # }
  /// ```
  pub fn simple<T>(
    self,
    builder: impl FnOnce(SimpleProcessorBuilder<fn(String) -> Option<String>, ()>) -> SimpleProcessor<T>,
  ) -> LogProxy<SimpleProcessor<T>> {
    self.processor(builder(SimpleProcessorBuilder::new()))
  }

  /// Set how many lines can be buffered if the processing is slow.
  /// If the handler process writes many lines and then returns the response immediately,
  /// the suppression of `invocation/next` might not work
  /// and some logs may only be processed in the next invocation.
  /// Increasing this value should help to prevent logs from being lost.
  /// The default value is `256`.
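  /// # Examples
  /// Overriding the default buffer size:
  /// ```
  /// use aws_lambda_log_proxy::LogProxy;
  ///
  /// let proxy = LogProxy::new().buffer_size(1024);
  /// assert_eq!(proxy.buffer_size, 1024);
  /// ```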
  pub fn buffer_size(mut self, buffer_size: usize) -> Self {
    self.buffer_size = buffer_size;
    self
  }

  /// Set the port for the log proxy.
  /// The default value is `3000`.
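  /// # Examples
  /// Overriding the default port:
  /// ```
  /// use aws_lambda_log_proxy::LogProxy;
  ///
  /// let proxy = LogProxy::new().port(1234);
  /// assert_eq!(proxy.port, 1234);
  /// ```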
  pub fn port(mut self, port: u16) -> Self {
    self.port = port;
    self
  }

  /// Start the log proxy.
  /// This will block the current thread.
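  /// # Examples
  /// A compile-only sketch, mirroring the struct-level example:
  /// ```
  /// use aws_lambda_log_proxy::{LogProxy, Sink};
  ///
  /// # async fn t1() {
  /// LogProxy::new()
  ///   .simple(|p| p.sink(Sink::stdout().spawn()).build())
  ///   .start()
  ///   .await;
  /// # }
  /// ```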
  pub async fn start(self)
  where
    P: Processor,
  {
    debug!(port = %self.port, buffer_size = %self.buffer_size, "Starting log proxy");

    let checker_tx = spawn_reader(stdin(), self.processor, self.buffer_size);

    MockLambdaRuntimeApiServer::bind(self.port)
      .await
      .unwrap()
      .serve(move |req| {
        let checker_tx = checker_tx.clone();
        async move {
          let is_invocation_next = req.uri().path() == "/2018-06-01/runtime/invocation/next";

          if is_invocation_next {
            // in lambda, sending `invocation/next` will freeze the current execution environment
            // and unprocessed logs might be lost,
            // so before proceeding, wait for the processor to finish processing the logs

            // send a checker to the processor task
            let (ack_tx, ack_rx) = oneshot::channel();
            checker_tx.send(ack_tx).await.unwrap();
            // wait for the processor to acknowledge the checker
            debug!("Waiting for the processor to finish processing logs");
            ack_rx.await.unwrap();
            debug!("Processor finished processing logs");
          }

          // forward the request to the real lambda runtime API, consuming the request
          LambdaRuntimeApiClient::new()
            .await
            .unwrap()
            .forward(req)
            .await
        }
      })
      .await
  }
}

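/// Spawn the reader task and the processor task for `file`, and return the checker channel sender.
/// Before forwarding `invocation/next`, the server loop sends a `oneshot::Sender` through this
/// channel; the processor task then calls `Processor::truncate` and acknowledges, so buffered
/// logs are handled before the execution environment is frozen.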
fn spawn_reader<F: AsyncRead + Send + 'static, P: Processor + 'static>(
  file: F,
  mut processor: P,
  buffer_size: usize,
) -> mpsc::Sender<oneshot::Sender<()>>
where
  BufReader<F>: Unpin,
{
  let (checker_tx, mut checker_rx) = mpsc::channel::<oneshot::Sender<()>>(1);
  let (buffer_tx, mut buffer_rx) = mpsc::channel(buffer_size);

  // the reader task: read lines from the file and push them into the buffer.
  // we use a separate task for reading so each line gets an accurate timestamp.
  tokio::spawn(async move {
    let mut lines = BufReader::new(file).lines();
    while let Ok(Some(line)) = lines.next_line().await {
      trace!(line = %line, "Read line");
      // `next_line` already removes '\n' and '\r', so we only need to check if the line is empty.
      // only push into the buffer if the line is not empty
      if !line.is_empty() {
        // put the line in the queue and record the timestamp
        buffer_tx.send((line, Utc::now())).await.unwrap();
      }
    }
    debug!("Reader thread finished");
  });

  // the processor task
  tokio::spawn(async move {
    loop {
      tokio::select! {
        // enable `biased` to make sure we always try to recv from the buffer before accepting a checker from the server task
        biased;

        res = buffer_rx.recv() => {
          let (line, timestamp) = res.unwrap();
          processor.process(line, timestamp).await;
        }
        // the server task asks whether the processor has finished processing the logs.
        checker = checker_rx.recv() => {
          // since the select is `biased`, this branch only runs when the buffer has no pending message,
          // so just stop suppressing the server task

          processor.truncate().await;

          checker.unwrap().send(()).unwrap();
        }
      }
    }
  });

  checker_tx
}

#[cfg(test)]
mod tests {
  use serial_test::serial;

  use super::*;

  macro_rules! assert_unit {
    ($unit:expr) => {
      let _: () = $unit;
    };
  }

  #[test]
  fn test_log_proxy_default() {
    let proxy = LogProxy::new();
    assert_unit!(proxy.processor);
    assert_eq!(proxy.buffer_size, 256);
    assert_eq!(proxy.port, 3000);
  }

  #[tokio::test]
  #[serial]
  async fn test_log_proxy_simple() {
    let sink = Sink::stdout().spawn();
    let proxy = LogProxy::new().simple(|p| p.sink(sink).build());
    assert_eq!(proxy.buffer_size, 256);
    assert_eq!(proxy.port, 3000);
  }

  #[test]
  fn test_log_proxy_buffer_size() {
    let proxy = LogProxy::new().buffer_size(512);
    assert_eq!(proxy.buffer_size, 512);
  }

  #[test]
  fn test_log_proxy_port() {
    let proxy = LogProxy::new().port(3001);
    assert_eq!(proxy.port, 3001);
  }
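
  // a minimal sketch of a combined check: the builder methods each return the proxy, so they should compose as chained calls
  #[test]
  fn test_log_proxy_chained_builders() {
    let proxy = LogProxy::new().buffer_size(512).port(3001);
    assert_eq!(proxy.buffer_size, 512);
    assert_eq!(proxy.port, 3001);
  }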

  // this checks at compile time that `start` can be called with different processors,
  // so don't run this as a test
  async fn _ensure_start_can_be_called() {
    // mock processor
    let proxy: LogProxy<()> = LogProxy::new();
    proxy.start().await;
    let sink = Sink::stdout().spawn();
    let proxy = LogProxy::new().simple(|p| p.sink(sink.clone()).build());
    proxy.start().await;
  }
}
261}