aruna_file/transformers/
filter.rs

1use crate::helpers::footer_parser::Range;
2use crate::transformer::{Transformer, TransformerType};
3use anyhow::Result;
4use bytes::Buf;
5
6pub struct Filter {
7    counter: usize,
8    filter: Range,
9    captured_buf_len: usize,
10    advanced_by: usize,
11}
12
13impl Filter {
14    #[tracing::instrument(level = "trace", skip(filter))]
15    #[allow(dead_code)]
16    pub fn new(filter: Range) -> Self {
17        Filter {
18            counter: 0,
19            filter,
20            captured_buf_len: 0,
21            advanced_by: 0,
22        }
23    }
24}
25
26#[async_trait::async_trait]
27impl Transformer for Filter {
28    #[tracing::instrument(level = "trace", skip(self, buf))]
29    async fn process_bytes(&mut self, buf: &mut bytes::BytesMut, _: bool, _: bool) -> Result<bool> {
30        self.captured_buf_len = buf.len();
31        self.advanced_by = 0;
32
33        // If bytes are present in the buffer
34        if !buf.is_empty() {
35            // If counter + incoming bytes are larger than lower limit
36            //   -> Advance buffer to lower limit
37            if ((self.counter + self.captured_buf_len) as u64) > self.filter.from {
38                if !(self.counter > self.filter.from as usize) {
39                    self.advanced_by = self.filter.from as usize - self.counter;
40                    buf.advance(self.advanced_by);
41                }
42            } else {
43                // If counter + incoming bytes are smaller than lower limit
44                //   -> discard buffer
45                buf.clear();
46            }
47
48            if self.counter as u64 > self.filter.to {
49                // If counter is larger than upper limit
50                //   -> discard buffer
51                buf.clear();
52            } else if self.counter as u64 + self.captured_buf_len as u64 > self.filter.to {
53                // If counter + incoming bytes is larger than upper limit
54                //   -> truncate buffer to upper limit
55                buf.truncate(self.filter.to as usize - self.advanced_by - self.counter);
56            }
57        }
58
59        self.counter += self.captured_buf_len;
60        Ok(true)
61    }
62
63    #[tracing::instrument(level = "trace", skip(self))]
64    #[inline]
65    fn get_type(&self) -> TransformerType {
66        TransformerType::Filter
67    }
68}