declarative_dataflow/plan/
filter.rs

//! Predicate expression plan: filters the tuples of a source plan by a binary predicate.
2
3use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8
9pub use crate::binding::{
10    AsBinding, BinaryPredicate as Predicate, BinaryPredicateBinding, Binding,
11};
12use crate::plan::{Dependencies, ImplContext, Implementable};
13use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Value, Var, VariableMap};
14
15#[inline(always)]
16fn lt(a: &Value, b: &Value) -> bool {
17    a < b
18}
19#[inline(always)]
20fn lte(a: &Value, b: &Value) -> bool {
21    a <= b
22}
23#[inline(always)]
24fn gt(a: &Value, b: &Value) -> bool {
25    a > b
26}
27#[inline(always)]
28fn gte(a: &Value, b: &Value) -> bool {
29    a >= b
30}
31#[inline(always)]
32fn eq(a: &Value, b: &Value) -> bool {
33    a == b
34}
35#[inline(always)]
36fn neq(a: &Value, b: &Value) -> bool {
37    a != b
38}
39
40/// A plan stage filtering source tuples by the specified
41/// predicate. Frontends are responsible for ensuring that the source
42/// binds the argument variables.
43#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
44pub struct Filter<P: Implementable> {
45    /// TODO
46    pub variables: Vec<Var>,
47    /// Logical predicate to apply.
48    pub predicate: Predicate,
49    /// Plan for the data source.
50    pub plan: Box<P>,
51    /// Constant inputs
52    pub constants: Vec<Option<Value>>,
53}
54
55impl<P: Implementable> Implementable for Filter<P> {
56    fn dependencies(&self) -> Dependencies {
57        self.plan.dependencies()
58    }
59
60    fn into_bindings(&self) -> Vec<Binding> {
61        // let mut bindings = self.plan.into_bindings();
62        // let variables = self.variables.clone();
63
64        unimplemented!();
65        // bindings.push(Binding::BinaryPredicate(BinaryPredicateBinding {
66        //     variables: (variables[0], variables[1]),
67        //     predicate: self.predicate.clone(),
68        // }));
69
70        // bindings
71    }
72
73    fn implement<'b, T, I, S>(
74        &self,
75        nested: &mut Iterative<'b, S, u64>,
76        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
77        context: &mut I,
78    ) -> (Implemented<'b, S>, ShutdownHandle)
79    where
80        T: Timestamp + Lattice,
81        I: ImplContext<T>,
82        S: Scope<Timestamp = T>,
83    {
84        let (relation, mut shutdown_handle) =
85            self.plan.implement(nested, local_arrangements, context);
86
87        let key_offsets: Vec<usize> = self
88            .variables
89            .iter()
90            .map(|variable| relation.binds(*variable).expect("variable not found"))
91            .collect();
92
93        let binary_predicate = match self.predicate {
94            Predicate::LT => lt,
95            Predicate::LTE => lte,
96            Predicate::GT => gt,
97            Predicate::GTE => gte,
98            Predicate::EQ => eq,
99            Predicate::NEQ => neq,
100        };
101
102        let variables = relation.variables();
103        let projected = {
104            let (projected, shutdown) = relation.projected(nested, context, &variables);
105            shutdown_handle.merge_with(shutdown);
106            projected
107        };
108
109        let filtered = if let Some(constant) = self.constants[0].clone() {
110            CollectionRelation {
111                variables,
112                tuples: projected
113                    .filter(move |tuple| binary_predicate(&constant, &tuple[key_offsets[0]])),
114            }
115        } else if let Some(constant) = self.constants[1].clone() {
116            CollectionRelation {
117                variables,
118                tuples: projected
119                    .filter(move |tuple| binary_predicate(&tuple[key_offsets[0]], &constant)),
120            }
121        } else {
122            CollectionRelation {
123                variables,
124                tuples: projected.filter(move |tuple| {
125                    binary_predicate(&tuple[key_offsets[0]], &tuple[key_offsets[1]])
126                }),
127            }
128        };
129
130        (Implemented::Collection(filtered), shutdown_handle)
131    }
132}