declarative_dataflow/plan/
antijoin.rs

//! Antijoin expression plan.
2
3use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8use differential_dataflow::operators::{Join, Threshold};
9
10use crate::binding::{AsBinding, Binding};
11use crate::plan::{Dependencies, ImplContext, Implementable};
12use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Var, VariableMap};
13
14/// A plan stage anti-joining both its sources on the specified
15/// variables. Throws if the sources are not union-compatible, i.e. bind
16/// all of the same variables in the same order.
17#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
18pub struct Antijoin<P1: Implementable, P2: Implementable> {
19    /// TODO
20    pub variables: Vec<Var>,
21    /// Plan for the left input.
22    pub left_plan: Box<P1>,
23    /// Plan for the right input.
24    pub right_plan: Box<P2>,
25}
26
27impl<P1: Implementable, P2: Implementable> Implementable for Antijoin<P1, P2> {
28    fn dependencies(&self) -> Dependencies {
29        Dependencies::merge(
30            self.left_plan.dependencies(),
31            self.right_plan.dependencies(),
32        )
33    }
34
35    fn into_bindings(&self) -> Vec<Binding> {
36        unimplemented!();
37        // let mut left_bindings = self.left_plan.into_bindings();
38        // let mut right_bindings = self.right_plan.into_bindings();
39
40        // let mut bindings = Vec::with_capacity(left_bindings.len() + right_bindings.len());
41        // bindings.append(&mut left_bindings);
42        // bindings.append(&mut right_bindings);
43
44        // bindings
45    }
46
47    fn implement<'b, T, I, S>(
48        &self,
49        nested: &mut Iterative<'b, S, u64>,
50        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
51        context: &mut I,
52    ) -> (Implemented<'b, S>, ShutdownHandle)
53    where
54        T: Timestamp + Lattice,
55        I: ImplContext<T>,
56        S: Scope<Timestamp = T>,
57    {
58        let mut shutdown_handle = ShutdownHandle::empty();
59        let left = {
60            let (left, shutdown) = self
61                .left_plan
62                .implement(nested, local_arrangements, context);
63            shutdown_handle.merge_with(shutdown);
64            left
65        };
66        let right = {
67            let (right, shutdown) = self
68                .right_plan
69                .implement(nested, local_arrangements, context);
70            shutdown_handle.merge_with(shutdown);
71            right
72        };
73
74        let variables = self
75            .variables
76            .iter()
77            .cloned()
78            .chain(
79                left.variables()
80                    .drain(..)
81                    .filter(|x| !self.variables.contains(x)),
82            )
83            .collect();
84
85        let right_projected = {
86            let (projected, shutdown) = right.projected(nested, context, &self.variables);
87            shutdown_handle.merge_with(shutdown);
88            projected
89        };
90
91        let left_arranged = {
92            let (arranged, shutdown) = left.tuples_by_variables(nested, context, &self.variables);
93            shutdown_handle.merge_with(shutdown);
94            arranged
95        };
96
97        let tuples = left_arranged
98            .distinct()
99            .antijoin(&right_projected.distinct())
100            .map(|(key, tuple)| key.iter().cloned().chain(tuple.iter().cloned()).collect());
101
102        let relation = CollectionRelation { variables, tuples };
103
104        (Implemented::Collection(relation), shutdown_handle)
105    }
106}