1use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::order::Product;
6use timely::progress::Timestamp;
7
8use differential_dataflow::lattice::Lattice;
9use differential_dataflow::operators::arrange::{Arrange, Arranged};
10use differential_dataflow::operators::JoinCore;
11use differential_dataflow::trace::TraceReader;
12
13use crate::binding::{AsBinding, Binding};
14use crate::plan::{next_id, Dependencies, ImplContext, Implementable};
15use crate::{Aid, Eid, Value, Var};
16use crate::{
17 AttributeBinding, CollectionRelation, Implemented, Relation, ShutdownHandle, TraceValHandle,
18 VariableMap,
19};
20
21#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
25pub struct Join<P1: Implementable, P2: Implementable> {
26 pub variables: Vec<Var>,
28 pub left_plan: Box<P1>,
30 pub right_plan: Box<P2>,
32}
33
34fn attribute_attribute<'b, T, I, S>(
35 nested: &mut Iterative<'b, S, u64>,
36 context: &mut I,
37 target: Var,
38 left: AttributeBinding,
39 right: AttributeBinding,
40) -> (Implemented<'b, S>, ShutdownHandle)
41where
42 T: Timestamp + Lattice,
43 I: ImplContext<T>,
44 S: Scope<Timestamp = T>,
45{
46 let mut variables = Vec::with_capacity(3);
47 variables.push(target);
48
49 let (left_arranged, shutdown_left) = {
50 let (mut index, shutdown_button) = if target == left.variables.0 {
51 variables.push(left.variables.1);
52 context
53 .forward_propose(&left.source_attribute)
54 .expect("forward propose trace does not exist")
55 .import_core(&nested.parent, &left.source_attribute)
56 } else if target == left.variables.1 {
57 variables.push(left.variables.0);
58 context
59 .reverse_propose(&left.source_attribute)
60 .expect("reverse propose trace does not exist")
61 .import_core(&nested.parent, &left.source_attribute)
62 } else {
63 panic!("Unbound target variable in Attribute<->Attribute join.");
64 };
65
66 let frontier = index.trace.advance_frontier().to_vec();
67 let forwarded = index.enter_at(nested, move |_, _, time| {
68 let mut forwarded = time.clone();
69 forwarded.advance_by(&frontier);
70 Product::new(forwarded, 0)
71 });
72
73 (forwarded, shutdown_button)
74 };
75
76 let (right_arranged, shutdown_right) = {
77 let (mut index, shutdown_button) = if target == right.variables.0 {
78 variables.push(right.variables.1);
79 context
80 .forward_propose(&right.source_attribute)
81 .expect("forward propose trace does not exist")
82 .import_core(&nested.parent, &right.source_attribute)
83 } else if target == right.variables.1 {
84 variables.push(right.variables.0);
85 context
86 .reverse_propose(&right.source_attribute)
87 .expect("reverse propose trace does not exist")
88 .import_core(&nested.parent, &right.source_attribute)
89 } else {
90 panic!("Unbound target variable in Attribute<->Attribute join.");
91 };
92
93 let frontier = index.trace.advance_frontier().to_vec();
94 let forwarded = index.enter_at(nested, move |_, _, time| {
95 let mut forwarded = time.clone();
96 forwarded.advance_by(&frontier);
97 Product::new(forwarded, 0)
98 });
99
100 (forwarded, shutdown_button)
101 };
102
103 let tuples = left_arranged.join_core(&right_arranged, move |key: &Value, v1, v2| {
104 let mut out = Vec::with_capacity(3);
105 out.push(key.clone());
106 out.push(v1.clone());
107 out.push(v2.clone());
108
109 Some(out)
110 });
111
112 let mut shutdown_handle = ShutdownHandle::from_button(shutdown_left);
113 shutdown_handle.add_button(shutdown_right);
114
115 let relation = CollectionRelation { variables, tuples };
116
117 (Implemented::Collection(relation), shutdown_handle)
118}
119
120fn collection_collection<'b, T, S, I>(
121 nested: &mut Iterative<'b, S, u64>,
122 context: &mut I,
123 target_variables: &[Var],
124 left: CollectionRelation<'b, S>,
125 right: CollectionRelation<'b, S>,
126) -> (Implemented<'b, S>, ShutdownHandle)
127where
128 T: Timestamp + Lattice,
129 I: ImplContext<T>,
130 S: Scope<Timestamp = T>,
131{
132 let mut shutdown_handle = ShutdownHandle::empty();
133
134 let variables = target_variables
135 .iter()
136 .cloned()
137 .chain(
138 left.variables()
139 .drain(..)
140 .filter(|x| !target_variables.contains(x)),
141 )
142 .chain(
143 right
144 .variables()
145 .drain(..)
146 .filter(|x| !target_variables.contains(x)),
147 )
148 .collect();
149
150 let left_arranged: Arranged<
151 Iterative<'b, S, u64>,
152 TraceValHandle<Vec<Value>, Vec<Value>, Product<S::Timestamp, u64>, isize>,
153 > = {
154 let (arranged, shutdown) = left.tuples_by_variables(nested, context, &target_variables);
155 shutdown_handle.merge_with(shutdown);
156 arranged.arrange()
157 };
158
159 let right_arranged: Arranged<
160 Iterative<'b, S, u64>,
161 TraceValHandle<Vec<Value>, Vec<Value>, Product<S::Timestamp, u64>, isize>,
162 > = {
163 let (arranged, shutdown) = right.tuples_by_variables(nested, context, &target_variables);
164 shutdown_handle.merge_with(shutdown);
165 arranged.arrange()
166 };
167
168 let tuples = left_arranged.join_core(&right_arranged, |key: &Vec<Value>, v1, v2| {
169 Some(
170 key.iter()
171 .cloned()
172 .chain(v1.iter().cloned())
173 .chain(v2.iter().cloned())
174 .collect(),
175 )
176 });
177
178 let relation = CollectionRelation { variables, tuples };
179
180 (Implemented::Collection(relation), shutdown_handle)
181}
182
183fn collection_attribute<'b, T, S, I>(
184 nested: &mut Iterative<'b, S, u64>,
185 context: &mut I,
186 target_variables: &[Var],
187 left: CollectionRelation<'b, S>,
188 right: AttributeBinding,
189) -> (Implemented<'b, S>, ShutdownHandle)
190where
191 T: Timestamp + Lattice,
192 I: ImplContext<T>,
193 S: Scope<Timestamp = T>,
194{
195 let (tuples, shutdown_propose) = match context.forward_propose(&right.source_attribute) {
198 None => panic!("attribute {:?} does not exist", &right.source_attribute),
199 Some(propose_trace) => {
200 let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
201 let (propose, shutdown_propose) =
202 propose_trace.import_core(&nested.parent, &right.source_attribute);
203
204 let tuples = propose
205 .enter_at(nested, move |_, _, time| {
206 let mut forwarded = time.clone();
207 forwarded.advance_by(&frontier);
208 Product::new(forwarded, 0)
209 })
210 .as_collection(|e, v| vec![e.clone(), v.clone()]);
211
212 (tuples, shutdown_propose)
213 }
214 };
215
216 let right_collected = CollectionRelation {
217 variables: vec![right.variables.0, right.variables.1],
218 tuples,
219 };
220
221 let (implemented, mut shutdown_handle) =
222 collection_collection(nested, context, target_variables, left, right_collected);
223
224 shutdown_handle.add_button(shutdown_propose);
225
226 (implemented, shutdown_handle)
227}
228
229impl<P1: Implementable, P2: Implementable> Implementable for Join<P1, P2> {
266 fn dependencies(&self) -> Dependencies {
267 Dependencies::merge(
268 self.left_plan.dependencies(),
269 self.right_plan.dependencies(),
270 )
271 }
272
273 fn into_bindings(&self) -> Vec<Binding> {
274 let mut left_bindings = self.left_plan.into_bindings();
275 let mut right_bindings = self.right_plan.into_bindings();
276
277 let mut bindings = Vec::with_capacity(left_bindings.len() + right_bindings.len());
278 bindings.append(&mut left_bindings);
279 bindings.append(&mut right_bindings);
280
281 bindings
282 }
283
284 fn datafy(&self) -> Vec<(Eid, Aid, Value)> {
285 let eid = next_id();
286
287 let mut left_data = self.left_plan.datafy();
288 let mut right_data = self.right_plan.datafy();
289
290 let mut left_eids: Vec<(Eid, Aid, Value)> = left_data
291 .iter()
292 .map(|(e, _, _)| (eid, "df.join/binding".to_string(), Value::Eid(*e)))
293 .collect();
294
295 let mut right_eids: Vec<(Eid, Aid, Value)> = right_data
296 .iter()
297 .map(|(e, _, _)| (eid, "df.join/binding".to_string(), Value::Eid(*e)))
298 .collect();
299
300 let mut data = Vec::with_capacity(
301 left_data.len() + right_data.len() + left_eids.len() + right_eids.len(),
302 );
303 data.append(&mut left_data);
304 data.append(&mut right_data);
305 data.append(&mut left_eids);
306 data.append(&mut right_eids);
307
308 data
309 }
310
311 fn implement<'b, T, I, S>(
312 &self,
313 nested: &mut Iterative<'b, S, u64>,
314 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
315 context: &mut I,
316 ) -> (Implemented<'b, S>, ShutdownHandle)
317 where
318 T: Timestamp + Lattice,
319 I: ImplContext<T>,
320 S: Scope<Timestamp = T>,
321 {
322 assert!(!self.variables.is_empty());
323
324 let (left, shutdown_left) = self
325 .left_plan
326 .implement(nested, local_arrangements, context);
327 let (right, shutdown_right) =
328 self.right_plan
329 .implement(nested, local_arrangements, context);
330
331 let (implemented, mut shutdown_handle) = match left {
332 Implemented::Attribute(left) => {
333 match right {
334 Implemented::Attribute(right) => {
335 if self.variables.len() == 1 {
336 attribute_attribute(nested, context, self.variables[0], left, right)
337 } else if self.variables.len() == 2 {
338 unimplemented!();
339 } else {
341 panic!(
342 "Attribute<->Attribute joins can't target more than two variables."
343 );
344 }
345 }
346 Implemented::Collection(right) => {
347 collection_attribute(nested, context, &self.variables, right, left)
348 }
349 }
350 }
351 Implemented::Collection(left) => match right {
352 Implemented::Attribute(right) => {
353 collection_attribute(nested, context, &self.variables, left, right)
354 }
355 Implemented::Collection(right) => {
356 collection_collection(nested, context, &self.variables, left, right)
357 }
358 },
359 };
360
361 shutdown_handle.merge_with(shutdown_left);
362 shutdown_handle.merge_with(shutdown_right);
363
364 (implemented, shutdown_handle)
365 }
366}