// fluxus_api/operators/filter.rs

use async_trait::async_trait;
use fluxus_core::{Operator, Record, StreamResult};
use std::fmt;
use std::marker::PhantomData;

/// Stream operator that keeps only the records whose payload satisfies a
/// caller-supplied predicate.
///
/// `T` is the payload type the predicate inspects and `F` is the predicate
/// itself. `T` is never stored, so a [`PhantomData`] marker ties the type
/// parameter to the struct.
pub struct FilterOperator<T, F> {
    /// Predicate evaluated against each payload; `true` means "keep".
    f: F,
    /// Zero-sized marker so the otherwise unused parameter `T` is accepted.
    _phantom: PhantomData<T>,
}

impl<T, F> FilterOperator<T, F>
where
    F: Fn(&T) -> bool,
{
    /// Creates a filter operator from the predicate `f`.
    ///
    /// The predicate receives a shared reference to a payload and returns
    /// `true` when the corresponding record should be kept.
    pub fn new(f: F) -> Self {
        Self {
            f,
            _phantom: PhantomData,
        }
    }
}

/// Opaque `Debug` output: the predicate is usually a closure, which does not
/// implement `Debug`, so no field is printed and no extra bound is placed on
/// `T` or `F`.
impl<T, F> fmt::Debug for FilterOperator<T, F> {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter
            .debug_struct("FilterOperator")
            .finish_non_exhaustive()
    }
}

22#[async_trait]
23impl<T, F> Operator<T, T> for FilterOperator<T, F>
24where
25    T: Clone + Send + Sync + 'static,
26    F: Fn(&T) -> bool + Send + Sync,
27{
28    async fn init(&mut self) -> StreamResult<()> {
29        Ok(())
30    }
31
32    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<T>>> {
33        if (self.f)(&record.data) {
34            Ok(vec![record])
35        } else {
36            Ok(vec![])
37        }
38    }
39
40    async fn close(&mut self) -> StreamResult<()> {
41        Ok(())
42    }
43}