declarative_dataflow/plan/
join.rs

1//! Equijoin expression plan.
2
3use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::order::Product;
6use timely::progress::Timestamp;
7
8use differential_dataflow::lattice::Lattice;
9use differential_dataflow::operators::arrange::{Arrange, Arranged};
10use differential_dataflow::operators::JoinCore;
11use differential_dataflow::trace::TraceReader;
12
13use crate::binding::{AsBinding, Binding};
14use crate::plan::{next_id, Dependencies, ImplContext, Implementable};
15use crate::{Aid, Eid, Value, Var};
16use crate::{
17    AttributeBinding, CollectionRelation, Implemented, Relation, ShutdownHandle, TraceValHandle,
18    VariableMap,
19};
20
21/// A plan stage joining two source relations on the specified
22/// variables. Throws if any of the join variables isn't bound by both
23/// sources.
24#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
25pub struct Join<P1: Implementable, P2: Implementable> {
26    /// TODO
27    pub variables: Vec<Var>,
28    /// Plan for the left input.
29    pub left_plan: Box<P1>,
30    /// Plan for the right input.
31    pub right_plan: Box<P2>,
32}
33
34fn attribute_attribute<'b, T, I, S>(
35    nested: &mut Iterative<'b, S, u64>,
36    context: &mut I,
37    target: Var,
38    left: AttributeBinding,
39    right: AttributeBinding,
40) -> (Implemented<'b, S>, ShutdownHandle)
41where
42    T: Timestamp + Lattice,
43    I: ImplContext<T>,
44    S: Scope<Timestamp = T>,
45{
46    let mut variables = Vec::with_capacity(3);
47    variables.push(target);
48
49    let (left_arranged, shutdown_left) = {
50        let (mut index, shutdown_button) = if target == left.variables.0 {
51            variables.push(left.variables.1);
52            context
53                .forward_propose(&left.source_attribute)
54                .expect("forward propose trace does not exist")
55                .import_core(&nested.parent, &left.source_attribute)
56        } else if target == left.variables.1 {
57            variables.push(left.variables.0);
58            context
59                .reverse_propose(&left.source_attribute)
60                .expect("reverse propose trace does not exist")
61                .import_core(&nested.parent, &left.source_attribute)
62        } else {
63            panic!("Unbound target variable in Attribute<->Attribute join.");
64        };
65
66        let frontier = index.trace.advance_frontier().to_vec();
67        let forwarded = index.enter_at(nested, move |_, _, time| {
68            let mut forwarded = time.clone();
69            forwarded.advance_by(&frontier);
70            Product::new(forwarded, 0)
71        });
72
73        (forwarded, shutdown_button)
74    };
75
76    let (right_arranged, shutdown_right) = {
77        let (mut index, shutdown_button) = if target == right.variables.0 {
78            variables.push(right.variables.1);
79            context
80                .forward_propose(&right.source_attribute)
81                .expect("forward propose trace does not exist")
82                .import_core(&nested.parent, &right.source_attribute)
83        } else if target == right.variables.1 {
84            variables.push(right.variables.0);
85            context
86                .reverse_propose(&right.source_attribute)
87                .expect("reverse propose trace does not exist")
88                .import_core(&nested.parent, &right.source_attribute)
89        } else {
90            panic!("Unbound target variable in Attribute<->Attribute join.");
91        };
92
93        let frontier = index.trace.advance_frontier().to_vec();
94        let forwarded = index.enter_at(nested, move |_, _, time| {
95            let mut forwarded = time.clone();
96            forwarded.advance_by(&frontier);
97            Product::new(forwarded, 0)
98        });
99
100        (forwarded, shutdown_button)
101    };
102
103    let tuples = left_arranged.join_core(&right_arranged, move |key: &Value, v1, v2| {
104        let mut out = Vec::with_capacity(3);
105        out.push(key.clone());
106        out.push(v1.clone());
107        out.push(v2.clone());
108
109        Some(out)
110    });
111
112    let mut shutdown_handle = ShutdownHandle::from_button(shutdown_left);
113    shutdown_handle.add_button(shutdown_right);
114
115    let relation = CollectionRelation { variables, tuples };
116
117    (Implemented::Collection(relation), shutdown_handle)
118}
119
120fn collection_collection<'b, T, S, I>(
121    nested: &mut Iterative<'b, S, u64>,
122    context: &mut I,
123    target_variables: &[Var],
124    left: CollectionRelation<'b, S>,
125    right: CollectionRelation<'b, S>,
126) -> (Implemented<'b, S>, ShutdownHandle)
127where
128    T: Timestamp + Lattice,
129    I: ImplContext<T>,
130    S: Scope<Timestamp = T>,
131{
132    let mut shutdown_handle = ShutdownHandle::empty();
133
134    let variables = target_variables
135        .iter()
136        .cloned()
137        .chain(
138            left.variables()
139                .drain(..)
140                .filter(|x| !target_variables.contains(x)),
141        )
142        .chain(
143            right
144                .variables()
145                .drain(..)
146                .filter(|x| !target_variables.contains(x)),
147        )
148        .collect();
149
150    let left_arranged: Arranged<
151        Iterative<'b, S, u64>,
152        TraceValHandle<Vec<Value>, Vec<Value>, Product<S::Timestamp, u64>, isize>,
153    > = {
154        let (arranged, shutdown) = left.tuples_by_variables(nested, context, &target_variables);
155        shutdown_handle.merge_with(shutdown);
156        arranged.arrange()
157    };
158
159    let right_arranged: Arranged<
160        Iterative<'b, S, u64>,
161        TraceValHandle<Vec<Value>, Vec<Value>, Product<S::Timestamp, u64>, isize>,
162    > = {
163        let (arranged, shutdown) = right.tuples_by_variables(nested, context, &target_variables);
164        shutdown_handle.merge_with(shutdown);
165        arranged.arrange()
166    };
167
168    let tuples = left_arranged.join_core(&right_arranged, |key: &Vec<Value>, v1, v2| {
169        Some(
170            key.iter()
171                .cloned()
172                .chain(v1.iter().cloned())
173                .chain(v2.iter().cloned())
174                .collect(),
175        )
176    });
177
178    let relation = CollectionRelation { variables, tuples };
179
180    (Implemented::Collection(relation), shutdown_handle)
181}
182
183fn collection_attribute<'b, T, S, I>(
184    nested: &mut Iterative<'b, S, u64>,
185    context: &mut I,
186    target_variables: &[Var],
187    left: CollectionRelation<'b, S>,
188    right: AttributeBinding,
189) -> (Implemented<'b, S>, ShutdownHandle)
190where
191    T: Timestamp + Lattice,
192    I: ImplContext<T>,
193    S: Scope<Timestamp = T>,
194{
195    // @TODO specialized implementation
196
197    let (tuples, shutdown_propose) = match context.forward_propose(&right.source_attribute) {
198        None => panic!("attribute {:?} does not exist", &right.source_attribute),
199        Some(propose_trace) => {
200            let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
201            let (propose, shutdown_propose) =
202                propose_trace.import_core(&nested.parent, &right.source_attribute);
203
204            let tuples = propose
205                .enter_at(nested, move |_, _, time| {
206                    let mut forwarded = time.clone();
207                    forwarded.advance_by(&frontier);
208                    Product::new(forwarded, 0)
209                })
210                .as_collection(|e, v| vec![e.clone(), v.clone()]);
211
212            (tuples, shutdown_propose)
213        }
214    };
215
216    let right_collected = CollectionRelation {
217        variables: vec![right.variables.0, right.variables.1],
218        tuples,
219    };
220
221    let (implemented, mut shutdown_handle) =
222        collection_collection(nested, context, target_variables, left, right_collected);
223
224    shutdown_handle.add_button(shutdown_propose);
225
226    (implemented, shutdown_handle)
227}
228
229//             Some(var) => {
230//                 assert!(*var == self.variables.1);
231
232//                 let (index, shutdown_button) = context
233//                     .forward_validate(&self.source_attribute)
234//                     .unwrap()
235//                     .import_core(&scope.parent, &self.source_attribute);
236
237//                 let frontier = index.trace.advance_frontier().to_vec();
238//                 let forwarded = index.enter_at(scope, move |_, _, time| {
239//                     let mut forwarded = time.clone();
240//                     forwarded.advance_by(&frontier);
241//                     Product::new(forwarded, 0)
242//                 });
243
244//                 (forwarded, ShutdownHandle::from_button(shutdown_button))
245//             }
246
247//             Some(var) => {
248//                 assert!(*var == self.variables.0);
249
250//                 let (index, shutdown_button) = context
251//                     .reverse_validate(&self.source_attribute)
252//                     .unwrap()
253//                     .import_core(&scope.parent, &self.source_attribute);
254
255//                 let frontier = index.trace.advance_frontier().to_vec();
256//                 let forwarded = index.enter_at(scope, move |_, _, time| {
257//                     let mut forwarded = time.clone();
258//                     forwarded.advance_by(&frontier);
259//                     Product::new(forwarded, 0)
260//                 });
261
262//                 (forwarded, ShutdownHandle::from_button(shutdown_button))
263//             }
264
265impl<P1: Implementable, P2: Implementable> Implementable for Join<P1, P2> {
266    fn dependencies(&self) -> Dependencies {
267        Dependencies::merge(
268            self.left_plan.dependencies(),
269            self.right_plan.dependencies(),
270        )
271    }
272
273    fn into_bindings(&self) -> Vec<Binding> {
274        let mut left_bindings = self.left_plan.into_bindings();
275        let mut right_bindings = self.right_plan.into_bindings();
276
277        let mut bindings = Vec::with_capacity(left_bindings.len() + right_bindings.len());
278        bindings.append(&mut left_bindings);
279        bindings.append(&mut right_bindings);
280
281        bindings
282    }
283
284    fn datafy(&self) -> Vec<(Eid, Aid, Value)> {
285        let eid = next_id();
286
287        let mut left_data = self.left_plan.datafy();
288        let mut right_data = self.right_plan.datafy();
289
290        let mut left_eids: Vec<(Eid, Aid, Value)> = left_data
291            .iter()
292            .map(|(e, _, _)| (eid, "df.join/binding".to_string(), Value::Eid(*e)))
293            .collect();
294
295        let mut right_eids: Vec<(Eid, Aid, Value)> = right_data
296            .iter()
297            .map(|(e, _, _)| (eid, "df.join/binding".to_string(), Value::Eid(*e)))
298            .collect();
299
300        let mut data = Vec::with_capacity(
301            left_data.len() + right_data.len() + left_eids.len() + right_eids.len(),
302        );
303        data.append(&mut left_data);
304        data.append(&mut right_data);
305        data.append(&mut left_eids);
306        data.append(&mut right_eids);
307
308        data
309    }
310
311    fn implement<'b, T, I, S>(
312        &self,
313        nested: &mut Iterative<'b, S, u64>,
314        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
315        context: &mut I,
316    ) -> (Implemented<'b, S>, ShutdownHandle)
317    where
318        T: Timestamp + Lattice,
319        I: ImplContext<T>,
320        S: Scope<Timestamp = T>,
321    {
322        assert!(!self.variables.is_empty());
323
324        let (left, shutdown_left) = self
325            .left_plan
326            .implement(nested, local_arrangements, context);
327        let (right, shutdown_right) =
328            self.right_plan
329                .implement(nested, local_arrangements, context);
330
331        let (implemented, mut shutdown_handle) = match left {
332            Implemented::Attribute(left) => {
333                match right {
334                    Implemented::Attribute(right) => {
335                        if self.variables.len() == 1 {
336                            attribute_attribute(nested, context, self.variables[0], left, right)
337                        } else if self.variables.len() == 2 {
338                            unimplemented!();
339                        // intersect_attributes(nested, context, self.variables, left, right)
340                        } else {
341                            panic!(
342                                "Attribute<->Attribute joins can't target more than two variables."
343                            );
344                        }
345                    }
346                    Implemented::Collection(right) => {
347                        collection_attribute(nested, context, &self.variables, right, left)
348                    }
349                }
350            }
351            Implemented::Collection(left) => match right {
352                Implemented::Attribute(right) => {
353                    collection_attribute(nested, context, &self.variables, left, right)
354                }
355                Implemented::Collection(right) => {
356                    collection_collection(nested, context, &self.variables, left, right)
357                }
358            },
359        };
360
361        shutdown_handle.merge_with(shutdown_left);
362        shutdown_handle.merge_with(shutdown_right);
363
364        (implemented, shutdown_handle)
365    }
366}