fluxus_api/operators/
filter.rs1use async_trait::async_trait;
2use fluxus_core::{Operator, Record, StreamResult};
3use std::marker::PhantomData;
4
5pub struct FilterOperator<T, F> {
6 f: F,
7 _phantom: PhantomData<T>,
8}
9
10impl<T, F> FilterOperator<T, F>
11where
12 F: Fn(&T) -> bool,
13{
14 pub fn new(f: F) -> Self {
15 Self {
16 f,
17 _phantom: PhantomData,
18 }
19 }
20}
21
22#[async_trait]
23impl<T, F> Operator<T, T> for FilterOperator<T, F>
24where
25 T: Clone + Send + Sync + 'static,
26 F: Fn(&T) -> bool + Send + Sync,
27{
28 async fn init(&mut self) -> StreamResult<()> {
29 Ok(())
30 }
31
32 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<T>>> {
33 if (self.f)(&record.data) {
34 Ok(vec![record])
35 } else {
36 Ok(vec![])
37 }
38 }
39
40 async fn close(&mut self) -> StreamResult<()> {
41 Ok(())
42 }
43}