declarative_dataflow/plan/
antijoin.rs1use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8use differential_dataflow::operators::{Join, Threshold};
9
10use crate::binding::{AsBinding, Binding};
11use crate::plan::{Dependencies, ImplContext, Implementable};
12use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Var, VariableMap};
13
14#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
18pub struct Antijoin<P1: Implementable, P2: Implementable> {
19 pub variables: Vec<Var>,
21 pub left_plan: Box<P1>,
23 pub right_plan: Box<P2>,
25}
26
27impl<P1: Implementable, P2: Implementable> Implementable for Antijoin<P1, P2> {
28 fn dependencies(&self) -> Dependencies {
29 Dependencies::merge(
30 self.left_plan.dependencies(),
31 self.right_plan.dependencies(),
32 )
33 }
34
35 fn into_bindings(&self) -> Vec<Binding> {
36 unimplemented!();
37 }
46
47 fn implement<'b, T, I, S>(
48 &self,
49 nested: &mut Iterative<'b, S, u64>,
50 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
51 context: &mut I,
52 ) -> (Implemented<'b, S>, ShutdownHandle)
53 where
54 T: Timestamp + Lattice,
55 I: ImplContext<T>,
56 S: Scope<Timestamp = T>,
57 {
58 let mut shutdown_handle = ShutdownHandle::empty();
59 let left = {
60 let (left, shutdown) = self
61 .left_plan
62 .implement(nested, local_arrangements, context);
63 shutdown_handle.merge_with(shutdown);
64 left
65 };
66 let right = {
67 let (right, shutdown) = self
68 .right_plan
69 .implement(nested, local_arrangements, context);
70 shutdown_handle.merge_with(shutdown);
71 right
72 };
73
74 let variables = self
75 .variables
76 .iter()
77 .cloned()
78 .chain(
79 left.variables()
80 .drain(..)
81 .filter(|x| !self.variables.contains(x)),
82 )
83 .collect();
84
85 let right_projected = {
86 let (projected, shutdown) = right.projected(nested, context, &self.variables);
87 shutdown_handle.merge_with(shutdown);
88 projected
89 };
90
91 let left_arranged = {
92 let (arranged, shutdown) = left.tuples_by_variables(nested, context, &self.variables);
93 shutdown_handle.merge_with(shutdown);
94 arranged
95 };
96
97 let tuples = left_arranged
98 .distinct()
99 .antijoin(&right_projected.distinct())
100 .map(|(key, tuple)| key.iter().cloned().chain(tuple.iter().cloned()).collect());
101
102 let relation = CollectionRelation { variables, tuples };
103
104 (Implemented::Collection(relation), shutdown_handle)
105 }
106}