aws_lambda_log_proxy/processor/
simple.rs

1use super::Timestamp;
2use crate::{Processor, SinkHandle};
3
4/// Process log lines with [`Self::transformer`]
5/// and write them to [`Self::sink`].
6///
7/// You can use [`SimpleProcessorBuilder`](crate::SimpleProcessorBuilder) to create this.
8pub struct SimpleProcessor<T> {
9  /// See [`SimpleProcessorBuilder::transformer`](crate::SimpleProcessorBuilder::transformer).
10  pub transformer: T,
11  /// See [`SimpleProcessorBuilder::sink`](crate::SimpleProcessorBuilder::sink).
12  pub sink: SinkHandle,
13
14  need_flush: bool,
15}
16
17impl<T> SimpleProcessor<T> {
18  pub fn new(transformer: T, sink: SinkHandle) -> Self {
19    Self {
20      transformer,
21      sink,
22      need_flush: false,
23    }
24  }
25}
26
27impl<T: FnMut(String) -> Option<String> + Send + 'static> Processor for SimpleProcessor<T> {
28  async fn process(&mut self, line: String, timestamp: Timestamp) {
29    if let Some(transformed) = (self.transformer)(line) {
30      self.sink.write_line(transformed, timestamp).await;
31      self.need_flush = true;
32    }
33  }
34
35  async fn truncate(&mut self) {
36    if self.need_flush {
37      self.sink.flush().await;
38      self.need_flush = false;
39    }
40  }
41}
42
#[cfg(test)]
mod tests {
  use super::*;
  use crate::{SimpleProcessorBuilder, Sink};
  use chrono::DateTime;

  /// A processor built without a custom transformer writes the line to the
  /// sink (the mock expects exactly `hello\n`) and tracks the pending flush.
  #[tokio::test]
  async fn test_processor_process_default() {
    let sink = Sink::new(tokio_test::io::Builder::new().write(b"hello\n").build()).spawn();
    let mut processor = SimpleProcessorBuilder::new().sink(sink).build();
    processor
      .process("hello".to_string(), mock_timestamp())
      .await;
    assert!(processor.need_flush);
    processor.truncate().await;
    assert!(!processor.need_flush);
  }

  /// Only lines accepted by the transformer are written; a rejected line
  /// must not clear a flush that is already pending.
  #[tokio::test]
  async fn test_processor_process_with_transformer() {
    let sink = Sink::new(tokio_test::io::Builder::new().write(b"hello\n").build()).spawn();
    let mut processor = SimpleProcessorBuilder::new()
      .transformer(|line| (line != "world").then_some(line))
      .sink(sink)
      .build();
    processor
      .process("hello".to_string(), mock_timestamp())
      .await;
    assert!(processor.need_flush);
    // "world" is filtered out: nothing is written, the pending flush stays.
    processor
      .process("world".to_string(), mock_timestamp())
      .await;
    assert!(processor.need_flush);
    processor.truncate().await;
    assert!(!processor.need_flush);
  }

  /// When every line is rejected by the transformer, no flush is requested.
  #[tokio::test]
  async fn test_processor_process_filtered_line_needs_no_flush() {
    // The mock is built without any expected write.
    let sink = Sink::new(tokio_test::io::Builder::new().build()).spawn();
    let mut processor = SimpleProcessorBuilder::new()
      .transformer(|line| (line != "world").then_some(line))
      .sink(sink)
      .build();
    processor
      .process("world".to_string(), mock_timestamp())
      .await;
    assert!(!processor.need_flush);
    processor.truncate().await;
    assert!(!processor.need_flush);
  }

  /// Fixed timestamp (Unix epoch) used by all tests.
  fn mock_timestamp() -> Timestamp {
    DateTime::from_timestamp(0, 0).unwrap()
  }
}