use crate::{
data::ArconType,
dataflow::{
builder::OperatorBuilder,
stream::{OperatorExt, Stream},
},
index::EmptyState,
stream::operator::function,
util::ArconFnBounds,
};
use std::sync::Arc;
/// Extension trait that adds a `filter` combinator to a stream of `T` records.
pub trait FilterExt<T: ArconType> {
    /// Applies the predicate `f` to the stream and returns the resulting stream.
    ///
    /// `f` receives each record by shared reference and returns a `bool`.
    /// NOTE(review): presumably records for which `f` returns `true` are kept
    /// and the rest are dropped; confirm against the `Filter` operator.
    ///
    /// The method takes `self` by value, so the returned stream is the only
    /// handle left to continue the pipeline. `#[must_use]` is declared here,
    /// on the trait method, because that is where the attribute takes effect
    /// for calls dispatched to any implementation.
    #[must_use]
    fn filter<F: Fn(&T) -> bool + ArconFnBounds>(self, f: F) -> Self;
}
impl<T: ArconType> FilterExt<T> for Stream<T> {
#[must_use]
fn filter<F: Fn(&T) -> bool + ArconFnBounds>(self, f: F) -> Self {
self.operator(OperatorBuilder {
operator: Arc::new(move || function::Filter::new(f.clone())),
state: Arc::new(|_| EmptyState),
conf: Default::default(),
})
}
}