fluxus_api/operators/
filter.rs

1use async_trait::async_trait;
2use fluxus_transformers::Operator;
3use fluxus_utils::models::{Record, StreamResult};
4use std::marker::PhantomData;
5
/// Stream operator that keeps only the records whose payload satisfies a
/// caller-supplied predicate.
///
/// `T` is the payload type the predicate inspects and `F` is the predicate
/// itself. No value of type `T` is stored; `PhantomData<T>` only ties the
/// type parameter to the struct.
pub struct FilterOperator<T, F> {
    /// Predicate evaluated against each payload by shared reference.
    f: F,
    /// Zero-sized marker so that the otherwise unused `T` is accepted.
    _phantom: PhantomData<T>,
}

impl<T, F> FilterOperator<T, F>
where
    F: Fn(&T) -> bool,
{
    /// Creates a filter around the predicate `f`.
    ///
    /// The predicate is stored as-is; it is not invoked here.
    pub fn new(f: F) -> Self {
        Self {
            f,
            _phantom: PhantomData,
        }
    }
}
22
23#[async_trait]
24impl<T, F> Operator<T, T> for FilterOperator<T, F>
25where
26    T: Clone + Send + Sync + 'static,
27    F: Fn(&T) -> bool + Send + Sync,
28{
29    async fn init(&mut self) -> StreamResult<()> {
30        Ok(())
31    }
32
33    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<T>>> {
34        if (self.f)(&record.data) {
35            Ok(vec![record])
36        } else {
37            Ok(vec![])
38        }
39    }
40
41    async fn close(&mut self) -> StreamResult<()> {
42        Ok(())
43    }
44}