declarative_dataflow/plan/
pull.rs1use timely::dataflow::operators::{Concat, Concatenate};
4use timely::dataflow::scopes::child::Iterative;
5use timely::dataflow::Scope;
6use timely::order::Product;
7use timely::progress::Timestamp;
8
9use differential_dataflow::lattice::Lattice;
10use differential_dataflow::AsCollection;
11
12use crate::binding::AsBinding;
13use crate::plan::{Dependencies, ImplContext, Implementable};
14use crate::{Aid, Value, Var};
15use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, VariableMap};
16
17#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
20pub struct PullLevel<P: Implementable> {
21 pub variables: Vec<Var>,
23 pub plan: Box<P>,
25 pub pull_variable: Var,
27 pub pull_attributes: Vec<Aid>,
29 pub path_attributes: Vec<Aid>,
32 pub cardinality_many: bool,
34}
35
36#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
43pub struct Pull<P: Implementable> {
44 pub variables: Vec<Var>,
46 pub paths: Vec<P>,
48}
49
50fn interleave(values: &[Value], constants: &[Aid]) -> Vec<Value> {
51 if values.is_empty() || constants.is_empty() {
52 values.to_owned()
53 } else {
54 let size: usize = values.len() + constants.len();
55 let mut result: Vec<Value> = Vec::with_capacity(size + 2);
57
58 let mut next_value = 0;
59 let mut next_const = 0;
60
61 for i in 0..size {
62 if i % 2 == 0 {
63 result.push(values[next_value].clone());
65 next_value += 1;
66 } else {
67 let a = constants[next_const].clone();
69 result.push(Value::Aid(a));
70 next_const += 1;
71 }
72 }
73
74 result
75 }
76}
77
78impl<P: Implementable> Implementable for PullLevel<P> {
79 fn dependencies(&self) -> Dependencies {
80 let mut dependencies = self.plan.dependencies();
81
82 for attribute in &self.pull_attributes {
83 let attribute_dependencies = Dependencies::attribute(&attribute);
84 dependencies = Dependencies::merge(dependencies, attribute_dependencies);
85 }
86
87 dependencies
88 }
89
90 fn implement<'b, T, I, S>(
91 &self,
92 nested: &mut Iterative<'b, S, u64>,
93 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
94 context: &mut I,
95 ) -> (Implemented<'b, S>, ShutdownHandle)
96 where
97 T: Timestamp + Lattice,
98 I: ImplContext<T>,
99 S: Scope<Timestamp = T>,
100 {
101 use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
102 use differential_dataflow::operators::JoinCore;
103 use differential_dataflow::trace::implementations::ord::OrdValSpine;
104 use differential_dataflow::trace::TraceReader;
105
106 let (input, mut shutdown_handle) = self.plan.implement(nested, local_arrangements, context);
107
108 if self.pull_attributes.is_empty() {
109 if self.path_attributes.is_empty() {
110 (input, shutdown_handle)
112 } else {
113 let path_attributes = self.path_attributes.clone();
114 let tuples = {
115 let (tuples, shutdown) = input.tuples(nested, context);
116 shutdown_handle.merge_with(shutdown);
117
118 tuples.map(move |tuple| interleave(&tuple, &path_attributes))
119 };
120
121 (
122 Implemented::Collection(CollectionRelation {
123 variables: self.variables.to_vec(),
124 tuples,
125 }),
126 shutdown_handle,
127 )
128 }
129 } else {
130 let e_offset = input
132 .binds(self.pull_variable)
133 .expect("input relation doesn't bind pull_variable");
134
135 let paths = {
136 let (tuples, shutdown) = input.tuples(nested, context);
137 shutdown_handle.merge_with(shutdown);
138 tuples
139 };
140
141 let e_path: Arranged<
142 Iterative<S, u64>,
143 TraceAgent<OrdValSpine<Value, Vec<Value>, Product<T, u64>, isize>>,
144 > = paths.map(move |t| (t[e_offset].clone(), t)).arrange();
145
146 let mut shutdown_handle = shutdown_handle;
147 let streams = self.pull_attributes.iter().map(|a| {
148 let e_v = match context.forward_propose(a) {
149 None => panic!("attribute {:?} does not exist", a),
150 Some(propose_trace) => {
151 let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
152 let (arranged, shutdown_propose) =
153 propose_trace.import_core(&nested.parent, a);
154
155 let e_v = arranged.enter_at(nested, move |_, _, time| {
156 let mut forwarded = time.clone();
157 forwarded.advance_by(&frontier);
158 Product::new(forwarded, 0)
159 });
160
161 shutdown_handle.add_button(shutdown_propose);
162
163 e_v
164 }
165 };
166
167 let attribute = Value::Aid(a.clone());
168 let path_attributes: Vec<Aid> = self.path_attributes.clone();
169
170 if path_attributes.is_empty() || self.cardinality_many {
171 e_path
172 .join_core(&e_v, move |_e, path: &Vec<Value>, v: &Value| {
173 let mut result = interleave(path, &path_attributes);
177 result.push(attribute.clone());
178 result.push(v.clone());
179
180 Some(result)
181 })
182 .inner
183 } else {
184 e_path
185 .join_core(&e_v, move |_e, path: &Vec<Value>, v: &Value| {
186 let mut result = interleave(path, &path_attributes);
190
191 result.pop().expect("malformed path");
195
196 result.push(attribute.clone());
197 result.push(v.clone());
198
199 Some(result)
200 })
201 .inner
202 }
203 });
204
205 let tuples = if self.path_attributes.is_empty() || self.cardinality_many {
206 nested.concatenate(streams)
207 } else {
208 let db_ids = {
209 let path_attributes = self.path_attributes.clone();
210 paths
211 .map(move |path| {
212 let mut result = interleave(&path, &path_attributes);
213 let eid = result.pop().expect("malformed path");
214
215 result.push(Value::Aid("db__id".to_string()));
216 result.push(eid);
217
218 result
219 })
220 .inner
221 };
222
223 nested.concatenate(streams).concat(&db_ids)
224 };
225
226 let relation = CollectionRelation {
227 variables: vec![], tuples: tuples.as_collection(),
229 };
230
231 (Implemented::Collection(relation), shutdown_handle)
232 }
233 }
234}
235
236impl<P: Implementable> Implementable for Pull<P> {
237 fn dependencies(&self) -> Dependencies {
238 let mut dependencies = Dependencies::none();
239 for path in self.paths.iter() {
240 dependencies = Dependencies::merge(dependencies, path.dependencies());
241 }
242
243 dependencies
244 }
245
246 fn implement<'b, T, I, S>(
247 &self,
248 nested: &mut Iterative<'b, S, u64>,
249 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
250 context: &mut I,
251 ) -> (Implemented<'b, S>, ShutdownHandle)
252 where
253 T: Timestamp + Lattice,
254 I: ImplContext<T>,
255 S: Scope<Timestamp = T>,
256 {
257 let mut scope = nested.clone();
258 let mut shutdown_handle = ShutdownHandle::empty();
259
260 let streams = self.paths.iter().map(|path| {
261 let relation = {
262 let (relation, shutdown) = path.implement(&mut scope, local_arrangements, context);
263 shutdown_handle.merge_with(shutdown);
264 relation
265 };
266
267 let tuples = {
268 let (tuples, shutdown) = relation.tuples(&mut scope, context);
269 shutdown_handle.merge_with(shutdown);
270 tuples
271 };
272
273 tuples.inner
274 });
275
276 let tuples = nested.concatenate(streams).as_collection();
277
278 let relation = CollectionRelation {
279 variables: self.variables.to_vec(),
280 tuples,
281 };
282
283 (Implemented::Collection(relation), shutdown_handle)
284 }
285}
286
287#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
290pub struct PullAll {
291 pub variables: Vec<Var>,
293 pub pull_attributes: Vec<Aid>,
295}
296
297impl Implementable for PullAll {
298 fn dependencies(&self) -> Dependencies {
299 let mut dependencies = Dependencies::none();
300
301 for attribute in &self.pull_attributes {
302 let attribute_dependencies = Dependencies::attribute(&attribute);
303 dependencies = Dependencies::merge(dependencies, attribute_dependencies);
304 }
305
306 dependencies
307 }
308
309 fn implement<'b, T, I, S>(
310 &self,
311 nested: &mut Iterative<'b, S, u64>,
312 _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
313 context: &mut I,
314 ) -> (Implemented<'b, S>, ShutdownHandle)
315 where
316 T: Timestamp + Lattice,
317 I: ImplContext<T>,
318 S: Scope<Timestamp = T>,
319 {
320 use differential_dataflow::trace::TraceReader;
321
322 assert!(!self.pull_attributes.is_empty());
323
324 let mut shutdown_handle = ShutdownHandle::empty();
325
326 let streams = self.pull_attributes.iter().map(|a| {
327 let e_v = match context.forward_propose(a) {
328 None => panic!("attribute {:?} does not exist", a),
329 Some(propose_trace) => {
330 let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
331 let (arranged, shutdown_propose) = propose_trace.import_core(&nested.parent, a);
332
333 let e_v = arranged.enter_at(nested, move |_, _, time| {
334 let mut forwarded = time.clone();
335 forwarded.advance_by(&frontier);
336 Product::new(forwarded, 0)
337 });
338
339 shutdown_handle.add_button(shutdown_propose);
340
341 e_v
342 }
343 };
344
345 let attribute = Value::Aid(a.clone());
346
347 e_v.as_collection(move |e, v| vec![e.clone(), attribute.clone(), v.clone()])
348 .inner
349 });
350
351 let tuples = nested.concatenate(streams).as_collection();
352
353 let relation = CollectionRelation {
354 variables: vec![], tuples,
356 };
357
358 (Implemented::Collection(relation), shutdown_handle)
359 }
360}