aws_lambda_log_proxy/processor/
simple.rs1use super::Timestamp;
2use crate::{Processor, SinkHandle};
3
4pub struct SimpleProcessor<T> {
9 pub transformer: T,
11 pub sink: SinkHandle,
13
14 need_flush: bool,
15}
16
17impl<T> SimpleProcessor<T> {
18 pub fn new(transformer: T, sink: SinkHandle) -> Self {
19 Self {
20 transformer,
21 sink,
22 need_flush: false,
23 }
24 }
25}
26
27impl<T: FnMut(String) -> Option<String> + Send + 'static> Processor for SimpleProcessor<T> {
28 async fn process(&mut self, line: String, timestamp: Timestamp) {
29 if let Some(transformed) = (self.transformer)(line) {
30 self.sink.write_line(transformed, timestamp).await;
31 self.need_flush = true;
32 }
33 }
34
35 async fn truncate(&mut self) {
36 if self.need_flush {
37 self.sink.flush().await;
38 self.need_flush = false;
39 }
40 }
41}
42
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{SimpleProcessorBuilder, Sink};
    use chrono::DateTime;

    /// Fixed timestamp (zero seconds, zero nanoseconds) used by every test.
    fn mock_timestamp() -> Timestamp {
        DateTime::from_timestamp(0, 0).unwrap()
    }

    /// With the builder's default transformer, a processed line sets the
    /// pending-flush flag and `truncate` clears it again.
    #[tokio::test]
    async fn test_processor_process_default() {
        // The mock writer is scripted to expect exactly `hello\n`.
        let writer = tokio_test::io::Builder::new().write(b"hello\n").build();
        let handle = Sink::new(writer).spawn();
        let mut proc = SimpleProcessorBuilder::new().sink(handle).build();

        proc.process(String::from("hello"), mock_timestamp()).await;
        assert!(proc.need_flush);

        proc.truncate().await;
        assert!(!proc.need_flush);
    }

    /// A transformer returning `None` for "world" keeps that line away from
    /// the sink: the mock writer is only scripted for `hello\n`.
    #[tokio::test]
    async fn test_processor_process_with_transformer() {
        let writer = tokio_test::io::Builder::new().write(b"hello\n").build();
        let handle = Sink::new(writer).spawn();
        let mut proc = SimpleProcessorBuilder::new()
            .transformer(|line| if line == "world" { None } else { Some(line) })
            .sink(handle)
            .build();

        proc.process(String::from("hello"), mock_timestamp()).await;
        proc.process(String::from("world"), mock_timestamp()).await;
        proc.truncate().await;
    }
}