declarative_dataflow/plan/pull.rs

1//! Pull expression plan, but without nesting.
2
3use timely::dataflow::operators::{Concat, Concatenate};
4use timely::dataflow::scopes::child::Iterative;
5use timely::dataflow::Scope;
6use timely::order::Product;
7use timely::progress::Timestamp;
8
9use differential_dataflow::lattice::Lattice;
10use differential_dataflow::AsCollection;
11
12use crate::binding::AsBinding;
13use crate::plan::{Dependencies, ImplContext, Implementable};
14use crate::{Aid, Value, Var};
15use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, VariableMap};
16
/// A plan stage for extracting all matching `[e a v]` tuples for a
/// given set of attributes and an input relation specifying entities.
///
/// Pulled values are emitted as the input path (with `path_attributes`
/// interleaved between its values) followed by the attribute and its value.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct PullLevel<P: Implementable> {
    /// Variables reported by the output relation when this level only
    /// interleaves `path_attributes` (no `pull_attributes`). When attributes
    /// are pulled, the output relation currently reports no variables (@TODO).
    pub variables: Vec<Var>,
    /// Plan for the input relation.
    pub plan: Box<P>,
    /// Variable holding the entity ids to pull attributes for. The input
    /// relation must bind it; input paths are joined on its value.
    pub pull_variable: Var,
    /// Attributes to pull for the input entities.
    pub pull_attributes: Vec<Aid>,
    /// Attribute names to distinguish plans of the same
    /// length. Useful to feed into a nested hash-map directly.
    /// They are interleaved between the values of each input tuple.
    pub path_attributes: Vec<Aid>,
    /// Whether the path leading to this level has cardinality many.
    /// When `false` and `path_attributes` is non-empty, the trailing child
    /// id is dropped from each pulled tuple and emitted once per path
    /// under the `db__id` attribute instead.
    pub cardinality_many: bool,
}
35
/// A plan stage for pull queries split into individual paths. So
/// `[:parent/name {:parent/child [:child/name]}]` would be
/// represented as:
///
/// ```text
/// (?parent)                      <- [:parent/name] | no constraints
/// (?parent :parent/child ?child) <- [:child/name]  | [?parent :parent/child ?child]
/// ```
///
/// The tuples produced by all paths are concatenated into one relation.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Pull<P: Implementable> {
    /// Variables reported by the resulting relation.
    pub variables: Vec<Var>,
    /// Individual paths to pull.
    pub paths: Vec<P>,
}
49
50fn interleave(values: &[Value], constants: &[Aid]) -> Vec<Value> {
51    if values.is_empty() || constants.is_empty() {
52        values.to_owned()
53    } else {
54        let size: usize = values.len() + constants.len();
55        // + 2, because we know there'll be a and v coming...
56        let mut result: Vec<Value> = Vec::with_capacity(size + 2);
57
58        let mut next_value = 0;
59        let mut next_const = 0;
60
61        for i in 0..size {
62            if i % 2 == 0 {
63                // on even indices we take from the result tuple
64                result.push(values[next_value].clone());
65                next_value += 1;
66            } else {
67                // on odd indices we interleave an attribute
68                let a = constants[next_const].clone();
69                result.push(Value::Aid(a));
70                next_const += 1;
71            }
72        }
73
74        result
75    }
76}
77
78impl<P: Implementable> Implementable for PullLevel<P> {
79    fn dependencies(&self) -> Dependencies {
80        let mut dependencies = self.plan.dependencies();
81
82        for attribute in &self.pull_attributes {
83            let attribute_dependencies = Dependencies::attribute(&attribute);
84            dependencies = Dependencies::merge(dependencies, attribute_dependencies);
85        }
86
87        dependencies
88    }
89
90    fn implement<'b, T, I, S>(
91        &self,
92        nested: &mut Iterative<'b, S, u64>,
93        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
94        context: &mut I,
95    ) -> (Implemented<'b, S>, ShutdownHandle)
96    where
97        T: Timestamp + Lattice,
98        I: ImplContext<T>,
99        S: Scope<Timestamp = T>,
100    {
101        use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
102        use differential_dataflow::operators::JoinCore;
103        use differential_dataflow::trace::implementations::ord::OrdValSpine;
104        use differential_dataflow::trace::TraceReader;
105
106        let (input, mut shutdown_handle) = self.plan.implement(nested, local_arrangements, context);
107
108        if self.pull_attributes.is_empty() {
109            if self.path_attributes.is_empty() {
110                // nothing to pull
111                (input, shutdown_handle)
112            } else {
113                let path_attributes = self.path_attributes.clone();
114                let tuples = {
115                    let (tuples, shutdown) = input.tuples(nested, context);
116                    shutdown_handle.merge_with(shutdown);
117
118                    tuples.map(move |tuple| interleave(&tuple, &path_attributes))
119                };
120
121                (
122                    Implemented::Collection(CollectionRelation {
123                        variables: self.variables.to_vec(),
124                        tuples,
125                    }),
126                    shutdown_handle,
127                )
128            }
129        } else {
130            // Arrange input entities by eid.
131            let e_offset = input
132                .binds(self.pull_variable)
133                .expect("input relation doesn't bind pull_variable");
134
135            let paths = {
136                let (tuples, shutdown) = input.tuples(nested, context);
137                shutdown_handle.merge_with(shutdown);
138                tuples
139            };
140
141            let e_path: Arranged<
142                Iterative<S, u64>,
143                TraceAgent<OrdValSpine<Value, Vec<Value>, Product<T, u64>, isize>>,
144            > = paths.map(move |t| (t[e_offset].clone(), t)).arrange();
145
146            let mut shutdown_handle = shutdown_handle;
147            let streams = self.pull_attributes.iter().map(|a| {
148                let e_v = match context.forward_propose(a) {
149                    None => panic!("attribute {:?} does not exist", a),
150                    Some(propose_trace) => {
151                        let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
152                        let (arranged, shutdown_propose) =
153                            propose_trace.import_core(&nested.parent, a);
154
155                        let e_v = arranged.enter_at(nested, move |_, _, time| {
156                            let mut forwarded = time.clone();
157                            forwarded.advance_by(&frontier);
158                            Product::new(forwarded, 0)
159                        });
160
161                        shutdown_handle.add_button(shutdown_propose);
162
163                        e_v
164                    }
165                };
166
167                let attribute = Value::Aid(a.clone());
168                let path_attributes: Vec<Aid> = self.path_attributes.clone();
169
170                if path_attributes.is_empty() || self.cardinality_many {
171                    e_path
172                        .join_core(&e_v, move |_e, path: &Vec<Value>, v: &Value| {
173                            // Each result tuple must hold the interleaved
174                            // path, the attribute, and the value,
175                            // i.e. [?p "parent/child" ?c ?a ?v]
176                            let mut result = interleave(path, &path_attributes);
177                            result.push(attribute.clone());
178                            result.push(v.clone());
179
180                            Some(result)
181                        })
182                        .inner
183                } else {
184                    e_path
185                        .join_core(&e_v, move |_e, path: &Vec<Value>, v: &Value| {
186                            // Each result tuple must hold the interleaved
187                            // path, the attribute, and the value,
188                            // i.e. [?p "parent/child" ?c ?a ?v]
189                            let mut result = interleave(path, &path_attributes);
190
191                            // Cardinality single means we don't need
192                            // to distinguish child ids (there can
193                            // only be one).
194                            result.pop().expect("malformed path");
195
196                            result.push(attribute.clone());
197                            result.push(v.clone());
198
199                            Some(result)
200                        })
201                        .inner
202                }
203            });
204
205            let tuples = if self.path_attributes.is_empty() || self.cardinality_many {
206                nested.concatenate(streams)
207            } else {
208                let db_ids = {
209                    let path_attributes = self.path_attributes.clone();
210                    paths
211                        .map(move |path| {
212                            let mut result = interleave(&path, &path_attributes);
213                            let eid = result.pop().expect("malformed path");
214
215                            result.push(Value::Aid("db__id".to_string()));
216                            result.push(eid);
217
218                            result
219                        })
220                        .inner
221                };
222
223                nested.concatenate(streams).concat(&db_ids)
224            };
225
226            let relation = CollectionRelation {
227                variables: vec![], // @TODO
228                tuples: tuples.as_collection(),
229            };
230
231            (Implemented::Collection(relation), shutdown_handle)
232        }
233    }
234}
235
236impl<P: Implementable> Implementable for Pull<P> {
237    fn dependencies(&self) -> Dependencies {
238        let mut dependencies = Dependencies::none();
239        for path in self.paths.iter() {
240            dependencies = Dependencies::merge(dependencies, path.dependencies());
241        }
242
243        dependencies
244    }
245
246    fn implement<'b, T, I, S>(
247        &self,
248        nested: &mut Iterative<'b, S, u64>,
249        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
250        context: &mut I,
251    ) -> (Implemented<'b, S>, ShutdownHandle)
252    where
253        T: Timestamp + Lattice,
254        I: ImplContext<T>,
255        S: Scope<Timestamp = T>,
256    {
257        let mut scope = nested.clone();
258        let mut shutdown_handle = ShutdownHandle::empty();
259
260        let streams = self.paths.iter().map(|path| {
261            let relation = {
262                let (relation, shutdown) = path.implement(&mut scope, local_arrangements, context);
263                shutdown_handle.merge_with(shutdown);
264                relation
265            };
266
267            let tuples = {
268                let (tuples, shutdown) = relation.tuples(&mut scope, context);
269                shutdown_handle.merge_with(shutdown);
270                tuples
271            };
272
273            tuples.inner
274        });
275
276        let tuples = nested.concatenate(streams).as_collection();
277
278        let relation = CollectionRelation {
279            variables: self.variables.to_vec(),
280            tuples,
281        };
282
283        (Implemented::Collection(relation), shutdown_handle)
284    }
285}
286
/// A plan stage for extracting all tuples for a given set of
/// attributes, emitted as `[e a v]`.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct PullAll {
    /// Currently unused when implementing this stage: the resulting
    /// relation reports no variables (@TODO).
    pub variables: Vec<Var>,
    /// Attributes to pull for all entities. Must be non-empty.
    pub pull_attributes: Vec<Aid>,
}
296
297impl Implementable for PullAll {
298    fn dependencies(&self) -> Dependencies {
299        let mut dependencies = Dependencies::none();
300
301        for attribute in &self.pull_attributes {
302            let attribute_dependencies = Dependencies::attribute(&attribute);
303            dependencies = Dependencies::merge(dependencies, attribute_dependencies);
304        }
305
306        dependencies
307    }
308
309    fn implement<'b, T, I, S>(
310        &self,
311        nested: &mut Iterative<'b, S, u64>,
312        _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
313        context: &mut I,
314    ) -> (Implemented<'b, S>, ShutdownHandle)
315    where
316        T: Timestamp + Lattice,
317        I: ImplContext<T>,
318        S: Scope<Timestamp = T>,
319    {
320        use differential_dataflow::trace::TraceReader;
321
322        assert!(!self.pull_attributes.is_empty());
323
324        let mut shutdown_handle = ShutdownHandle::empty();
325
326        let streams = self.pull_attributes.iter().map(|a| {
327            let e_v = match context.forward_propose(a) {
328                None => panic!("attribute {:?} does not exist", a),
329                Some(propose_trace) => {
330                    let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
331                    let (arranged, shutdown_propose) = propose_trace.import_core(&nested.parent, a);
332
333                    let e_v = arranged.enter_at(nested, move |_, _, time| {
334                        let mut forwarded = time.clone();
335                        forwarded.advance_by(&frontier);
336                        Product::new(forwarded, 0)
337                    });
338
339                    shutdown_handle.add_button(shutdown_propose);
340
341                    e_v
342                }
343            };
344
345            let attribute = Value::Aid(a.clone());
346
347            e_v.as_collection(move |e, v| vec![e.clone(), attribute.clone(), v.clone()])
348                .inner
349        });
350
351        let tuples = nested.concatenate(streams).as_collection();
352
353        let relation = CollectionRelation {
354            variables: vec![], // @TODO
355            tuples,
356        };
357
358        (Implemented::Collection(relation), shutdown_handle)
359    }
360}