declarative_dataflow/plan/
filter.rs1use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8
9pub use crate::binding::{
10 AsBinding, BinaryPredicate as Predicate, BinaryPredicateBinding, Binding,
11};
12use crate::plan::{Dependencies, ImplContext, Implementable};
13use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Value, Var, VariableMap};
14
15#[inline(always)]
16fn lt(a: &Value, b: &Value) -> bool {
17 a < b
18}
19#[inline(always)]
20fn lte(a: &Value, b: &Value) -> bool {
21 a <= b
22}
23#[inline(always)]
24fn gt(a: &Value, b: &Value) -> bool {
25 a > b
26}
27#[inline(always)]
28fn gte(a: &Value, b: &Value) -> bool {
29 a >= b
30}
31#[inline(always)]
32fn eq(a: &Value, b: &Value) -> bool {
33 a == b
34}
35#[inline(always)]
36fn neq(a: &Value, b: &Value) -> bool {
37 a != b
38}
39
40#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
44pub struct Filter<P: Implementable> {
45 pub variables: Vec<Var>,
47 pub predicate: Predicate,
49 pub plan: Box<P>,
51 pub constants: Vec<Option<Value>>,
53}
54
55impl<P: Implementable> Implementable for Filter<P> {
56 fn dependencies(&self) -> Dependencies {
57 self.plan.dependencies()
58 }
59
60 fn into_bindings(&self) -> Vec<Binding> {
61 unimplemented!();
65 }
72
73 fn implement<'b, T, I, S>(
74 &self,
75 nested: &mut Iterative<'b, S, u64>,
76 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
77 context: &mut I,
78 ) -> (Implemented<'b, S>, ShutdownHandle)
79 where
80 T: Timestamp + Lattice,
81 I: ImplContext<T>,
82 S: Scope<Timestamp = T>,
83 {
84 let (relation, mut shutdown_handle) =
85 self.plan.implement(nested, local_arrangements, context);
86
87 let key_offsets: Vec<usize> = self
88 .variables
89 .iter()
90 .map(|variable| relation.binds(*variable).expect("variable not found"))
91 .collect();
92
93 let binary_predicate = match self.predicate {
94 Predicate::LT => lt,
95 Predicate::LTE => lte,
96 Predicate::GT => gt,
97 Predicate::GTE => gte,
98 Predicate::EQ => eq,
99 Predicate::NEQ => neq,
100 };
101
102 let variables = relation.variables();
103 let projected = {
104 let (projected, shutdown) = relation.projected(nested, context, &variables);
105 shutdown_handle.merge_with(shutdown);
106 projected
107 };
108
109 let filtered = if let Some(constant) = self.constants[0].clone() {
110 CollectionRelation {
111 variables,
112 tuples: projected
113 .filter(move |tuple| binary_predicate(&constant, &tuple[key_offsets[0]])),
114 }
115 } else if let Some(constant) = self.constants[1].clone() {
116 CollectionRelation {
117 variables,
118 tuples: projected
119 .filter(move |tuple| binary_predicate(&tuple[key_offsets[0]], &constant)),
120 }
121 } else {
122 CollectionRelation {
123 variables,
124 tuples: projected.filter(move |tuple| {
125 binary_predicate(&tuple[key_offsets[0]], &tuple[key_offsets[1]])
126 }),
127 }
128 };
129
130 (Implemented::Collection(filtered), shutdown_handle)
131 }
132}