timely/dataflow/operators/
filter.rs1use crate::Data;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Stream, Scope};
6use crate::dataflow::operators::generic::operator::Operator;
7
8pub trait Filter<D: Data> {
10 fn filter<P: FnMut(&D)->bool+'static>(&self, predicate: P) -> Self;
23}
24
25impl<G: Scope, D: Data> Filter<D> for Stream<G, D> {
26 fn filter<P: FnMut(&D)->bool+'static>(&self, mut predicate: P) -> Stream<G, D> {
27 let mut vector = Vec::new();
28 self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
29 input.for_each(|time, data| {
30 data.swap(&mut vector);
31 vector.retain(|x| predicate(x));
32 if !vector.is_empty() {
33 output.session(&time).give_vec(&mut vector);
34 }
35 });
36 })
37 }
38}