arcon/dataflow/stream/
filter.rs1use crate::{
2 data::ArconType,
3 dataflow::{
4 builder::OperatorBuilder,
5 stream::{OperatorExt, Stream},
6 },
7 index::EmptyState,
8 stream::operator::function,
9 util::ArconFnBounds,
10};
11use std::sync::Arc;
12
13pub trait FilterExt<T: ArconType> {
15 fn filter<F: Fn(&T) -> bool + ArconFnBounds>(self, f: F) -> Self;
27}
28
29impl<T: ArconType> FilterExt<T> for Stream<T> {
30 #[must_use]
31 fn filter<F: Fn(&T) -> bool + ArconFnBounds>(self, f: F) -> Self {
32 self.operator(OperatorBuilder {
33 operator: Arc::new(move || function::Filter::new(f.clone())),
34 state: Arc::new(|_| EmptyState),
35 conf: Default::default(),
36 })
37 }
38}