fluxus_api/operators/
filter.rs1use async_trait::async_trait;
2use fluxus_transformers::Operator;
3use fluxus_utils::models::{Record, StreamResult};
4use std::marker::PhantomData;
5
6pub struct FilterOperator<T, F> {
7 f: F,
8 _phantom: PhantomData<T>,
9}
10
11impl<T, F> FilterOperator<T, F>
12where
13 F: Fn(&T) -> bool,
14{
15 pub fn new(f: F) -> Self {
16 Self {
17 f,
18 _phantom: PhantomData,
19 }
20 }
21}
22
23#[async_trait]
24impl<T, F> Operator<T, T> for FilterOperator<T, F>
25where
26 T: Clone + Send + Sync + 'static,
27 F: Fn(&T) -> bool + Send + Sync,
28{
29 async fn init(&mut self) -> StreamResult<()> {
30 Ok(())
31 }
32
33 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<T>>> {
34 if (self.f)(&record.data) {
35 Ok(vec![record])
36 } else {
37 Ok(vec![])
38 }
39 }
40
41 async fn close(&mut self) -> StreamResult<()> {
42 Ok(())
43 }
44}