declarative_dataflow/plan/
pull_v2.rs

1//! Pull expression plan, but without nesting.
2
3use std::collections::HashMap;
4
5use timely::dataflow::scopes::child::Iterative;
6use timely::dataflow::{Scope, Stream};
7use timely::order::Product;
8use timely::progress::Timestamp;
9
10use differential_dataflow::lattice::Lattice;
11
12use crate::binding::AsBinding;
13use crate::plan::{Dependencies, ImplContext, Implementable, Plan};
14use crate::{Aid, Value, Var};
15use crate::{Relation, ShutdownHandle, VariableMap};
16
/// A sequence of attributes that uniquely identify a nesting level in
/// a Pull query.
///
/// Used as the key of the per-path stream maps returned by the
/// `implement` methods in this module.
pub type PathId = Vec<Aid>;
20
/// A plan stage for extracting all matching [e a v] tuples for a
/// given set of attributes and an input relation specifying entities.
///
/// The input relation is produced by `plan`; the entities to pull
/// attributes for are the values it binds to `pull_variable`.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct PullLevel<P: Implementable> {
    /// Plan for the input relation.
    pub plan: Box<P>,
    /// Eid variable. The input relation must bind this variable; the
    /// value at its offset in each input tuple is used as the entity
    /// id to join attribute values against.
    pub pull_variable: Var,
    /// Attributes to pull for the input entities. Must not be empty.
    pub pull_attributes: Vec<Aid>,
    /// Attribute names to distinguish plans of the same
    /// length. Useful to feed into a nested hash-map directly.
    ///
    /// Each resulting `PathId` is a copy of this vector with the
    /// pulled attribute appended. Must not be empty.
    pub path_attributes: Vec<Aid>,
    /// @TODO
    ///
    /// NOTE(review): not read by `dependencies` or `implement` in this
    /// file; presumably flags whether the pulled attributes are
    /// cardinality-many -- confirm against callers.
    pub cardinality_many: bool,
}
37
38impl<P: Implementable> PullLevel<P> {
39    /// See Implementable::dependencies, as PullLevel v2 can't
40    /// implement Implementable directly.
41    fn dependencies(&self) -> Dependencies {
42        let mut dependencies = self.plan.dependencies();
43
44        for attribute in &self.pull_attributes {
45            let attribute_dependencies = Dependencies::attribute(&attribute);
46            dependencies = Dependencies::merge(dependencies, attribute_dependencies);
47        }
48
49        dependencies
50    }
51
52    /// See Implementable::implement, as PullLevel v2 can't implement
53    /// Implementable directly.
54    fn implement<'b, T, I, S>(
55        &self,
56        nested: &mut Iterative<'b, S, u64>,
57        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
58        context: &mut I,
59    ) -> (
60        HashMap<PathId, Stream<S, (Vec<Value>, S::Timestamp, isize)>>,
61        ShutdownHandle,
62    )
63    where
64        T: Timestamp + Lattice,
65        I: ImplContext<T>,
66        S: Scope<Timestamp = T>,
67    {
68        use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
69        use differential_dataflow::operators::JoinCore;
70        use differential_dataflow::trace::implementations::ord::OrdValSpine;
71        use differential_dataflow::trace::TraceReader;
72
73        assert_eq!(self.pull_attributes.is_empty(), false);
74
75        let (input, mut shutdown_handle) = self.plan.implement(nested, local_arrangements, context);
76
77        // Arrange input entities by eid.
78        let e_offset = input
79            .binds(self.pull_variable)
80            .expect("input relation doesn't bind pull_variable");
81
82        let paths = {
83            let (tuples, shutdown) = input.tuples(nested, context);
84            shutdown_handle.merge_with(shutdown);
85            tuples
86        };
87
88        let e_path: Arranged<
89            Iterative<S, u64>,
90            TraceAgent<OrdValSpine<Value, Vec<Value>, Product<T, u64>, isize>>,
91        > = paths.map(move |t| (t[e_offset].clone(), t)).arrange();
92
93        let mut shutdown_handle = shutdown_handle;
94        let path_streams = self
95            .pull_attributes
96            .iter()
97            .map(|a| {
98                let e_v = match context.forward_propose(a) {
99                    None => panic!("attribute {:?} does not exist", a),
100                    Some(propose_trace) => {
101                        let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
102                        let (arranged, shutdown_propose) =
103                            propose_trace.import_core(&nested.parent, a);
104
105                        let e_v = arranged.enter_at(nested, move |_, _, time| {
106                            let mut forwarded = time.clone();
107                            forwarded.advance_by(&frontier);
108                            Product::new(forwarded, 0)
109                        });
110
111                        shutdown_handle.add_button(shutdown_propose);
112
113                        e_v
114                    }
115                };
116
117                let path_id: Vec<Aid> = {
118                    assert_eq!(self.path_attributes.is_empty(), false);
119
120                    let mut path_attributes = self.path_attributes.clone();
121                    path_attributes.push(a.clone());
122                    path_attributes
123                };
124
125                let path_stream = e_path
126                    .join_core(&e_v, move |_e, path: &Vec<Value>, v: &Value| {
127                        let mut result = path.clone();
128                        result.push(v.clone());
129
130                        Some(result)
131                    })
132                    .leave()
133                    .inner;
134
135                (path_id, path_stream)
136            })
137            .collect::<HashMap<_, _>>();
138
139        (path_streams, shutdown_handle)
140    }
141}
142
/// A plan stage for extracting all tuples for a given set of
/// attributes.
///
/// Unlike `PullLevel`, this stage has no input relation.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct PullAll {
    /// Attributes to pull for the input entities. Must not be empty.
    pub pull_attributes: Vec<Aid>,
}
150
151impl PullAll {
152    /// See Implementable::dependencies, as PullAll v2 can't implement
153    /// Implementable directly.
154    fn dependencies(&self) -> Dependencies {
155        let mut dependencies = Dependencies::none();
156
157        for attribute in &self.pull_attributes {
158            let attribute_dependencies = Dependencies::attribute(&attribute);
159            dependencies = Dependencies::merge(dependencies, attribute_dependencies);
160        }
161
162        dependencies
163    }
164
165    /// See Implementable::implement, as PullAll v2 can't implement
166    /// Implementable directly.
167    fn implement<'b, T, I, S>(
168        &self,
169        nested: &mut Iterative<'b, S, u64>,
170        _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
171        context: &mut I,
172    ) -> (
173        HashMap<PathId, Stream<S, (Vec<Value>, S::Timestamp, isize)>>,
174        ShutdownHandle,
175    )
176    where
177        T: Timestamp + Lattice,
178        I: ImplContext<T>,
179        S: Scope<Timestamp = T>,
180    {
181        use differential_dataflow::trace::TraceReader;
182
183        assert!(!self.pull_attributes.is_empty());
184
185        let mut shutdown_handle = ShutdownHandle::empty();
186
187        let path_streams = self
188            .pull_attributes
189            .iter()
190            .map(|a| {
191                let e_v = match context.forward_propose(a) {
192                    None => panic!("attribute {:?} does not exist", a),
193                    Some(propose_trace) => {
194                        let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
195                        let (arranged, shutdown_propose) =
196                            propose_trace.import_core(&nested.parent, a);
197
198                        let e_v = arranged.enter_at(nested, move |_, _, time| {
199                            let mut forwarded = time.clone();
200                            forwarded.advance_by(&frontier);
201                            Product::new(forwarded, 0)
202                        });
203
204                        shutdown_handle.add_button(shutdown_propose);
205
206                        e_v
207                    }
208                };
209
210                let path_stream = e_v
211                    .as_collection(|e, v| vec![e.clone(), v.clone()])
212                    .leave()
213                    .inner;
214
215                (vec![a.to_string()], path_stream)
216            })
217            .collect::<HashMap<_, _>>();
218
219        (path_streams, shutdown_handle)
220    }
221}
222
/// The two kinds of pull plan stages defined in this module.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Pull {
    /// Pull all tuples of a set of attributes, without an input
    /// relation restricting the entities.
    All(PullAll),
    /// Pull a set of attributes for the entities bound by an input
    /// plan.
    Level(PullLevel<Plan>),
}
231
232impl Pull {
233    /// See Implementable::dependencies, as Pull v2 can't implement
234    /// Implementable directly.
235    pub fn dependencies(&self) -> Dependencies {
236        match self {
237            Pull::All(ref pull) => pull.dependencies(),
238            Pull::Level(ref pull) => pull.dependencies(),
239        }
240    }
241
242    /// See Implementable::implement, as Pull v2 can't implement
243    /// Implementable directly.
244    pub fn implement<'b, T, I, S>(
245        &self,
246        nested: &mut Iterative<'b, S, u64>,
247        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
248        context: &mut I,
249    ) -> (
250        HashMap<PathId, Stream<S, (Vec<Value>, S::Timestamp, isize)>>,
251        ShutdownHandle,
252    )
253    where
254        T: Timestamp + Lattice,
255        I: ImplContext<T>,
256        S: Scope<Timestamp = T>,
257    {
258        match self {
259            Pull::All(ref pull) => pull.implement(nested, local_arrangements, context),
260            Pull::Level(ref pull) => pull.implement(nested, local_arrangements, context),
261        }
262    }
263}