// fluxus_api/operators/filter.rs
use async_trait::async_trait;
use fluxus_transformers::Operator;
use fluxus_utils::models::{Record, StreamResult};
use std::marker::PhantomData;

/// Stream operator that forwards only the records whose payload satisfies a
/// caller-supplied predicate.
///
/// `T` is the record payload type and `F` is the predicate type. The
/// `Fn(&T) -> bool` requirement on `F` is declared on the `impl` blocks rather
/// than on the struct itself.
pub struct FilterOperator<T, F> {
    /// Predicate evaluated against each record's payload.
    f: F,
    /// Zero-sized marker that ties the operator to `T`, which no other field mentions.
    _phantom: PhantomData<T>,
}
11impl<T, F> FilterOperator<T, F>
12where
13 F: Fn(&T) -> bool,
14{
15 pub fn new(f: F) -> Self {
16 Self {
17 f,
18 _phantom: PhantomData,
19 }
20 }
21}
23#[async_trait]
24impl<T, F> Operator<T, T> for FilterOperator<T, F>
25where
26 T: Clone + Send + Sync + 'static,
27 F: Fn(&T) -> bool + Send + Sync,
28{
29 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<T>>> {
30 if (self.f)(&record.data) {
31 Ok(vec![record])
32 } else {
33 Ok(vec![])
34 }
35 }
36}