declarative_dataflow/plan/
pull_v2.rs1use std::collections::HashMap;
4
5use timely::dataflow::scopes::child::Iterative;
6use timely::dataflow::{Scope, Stream};
7use timely::order::Product;
8use timely::progress::Timestamp;
9
10use differential_dataflow::lattice::Lattice;
11
12use crate::binding::AsBinding;
13use crate::plan::{Dependencies, ImplContext, Implementable, Plan};
14use crate::{Aid, Value, Var};
15use crate::{Relation, ShutdownHandle, VariableMap};
16
17pub type PathId = Vec<Aid>;
20
21#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
24pub struct PullLevel<P: Implementable> {
25 pub plan: Box<P>,
27 pub pull_variable: Var,
29 pub pull_attributes: Vec<Aid>,
31 pub path_attributes: Vec<Aid>,
34 pub cardinality_many: bool,
36}
37
38impl<P: Implementable> PullLevel<P> {
39 fn dependencies(&self) -> Dependencies {
42 let mut dependencies = self.plan.dependencies();
43
44 for attribute in &self.pull_attributes {
45 let attribute_dependencies = Dependencies::attribute(&attribute);
46 dependencies = Dependencies::merge(dependencies, attribute_dependencies);
47 }
48
49 dependencies
50 }
51
52 fn implement<'b, T, I, S>(
55 &self,
56 nested: &mut Iterative<'b, S, u64>,
57 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
58 context: &mut I,
59 ) -> (
60 HashMap<PathId, Stream<S, (Vec<Value>, S::Timestamp, isize)>>,
61 ShutdownHandle,
62 )
63 where
64 T: Timestamp + Lattice,
65 I: ImplContext<T>,
66 S: Scope<Timestamp = T>,
67 {
68 use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
69 use differential_dataflow::operators::JoinCore;
70 use differential_dataflow::trace::implementations::ord::OrdValSpine;
71 use differential_dataflow::trace::TraceReader;
72
73 assert_eq!(self.pull_attributes.is_empty(), false);
74
75 let (input, mut shutdown_handle) = self.plan.implement(nested, local_arrangements, context);
76
77 let e_offset = input
79 .binds(self.pull_variable)
80 .expect("input relation doesn't bind pull_variable");
81
82 let paths = {
83 let (tuples, shutdown) = input.tuples(nested, context);
84 shutdown_handle.merge_with(shutdown);
85 tuples
86 };
87
88 let e_path: Arranged<
89 Iterative<S, u64>,
90 TraceAgent<OrdValSpine<Value, Vec<Value>, Product<T, u64>, isize>>,
91 > = paths.map(move |t| (t[e_offset].clone(), t)).arrange();
92
93 let mut shutdown_handle = shutdown_handle;
94 let path_streams = self
95 .pull_attributes
96 .iter()
97 .map(|a| {
98 let e_v = match context.forward_propose(a) {
99 None => panic!("attribute {:?} does not exist", a),
100 Some(propose_trace) => {
101 let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
102 let (arranged, shutdown_propose) =
103 propose_trace.import_core(&nested.parent, a);
104
105 let e_v = arranged.enter_at(nested, move |_, _, time| {
106 let mut forwarded = time.clone();
107 forwarded.advance_by(&frontier);
108 Product::new(forwarded, 0)
109 });
110
111 shutdown_handle.add_button(shutdown_propose);
112
113 e_v
114 }
115 };
116
117 let path_id: Vec<Aid> = {
118 assert_eq!(self.path_attributes.is_empty(), false);
119
120 let mut path_attributes = self.path_attributes.clone();
121 path_attributes.push(a.clone());
122 path_attributes
123 };
124
125 let path_stream = e_path
126 .join_core(&e_v, move |_e, path: &Vec<Value>, v: &Value| {
127 let mut result = path.clone();
128 result.push(v.clone());
129
130 Some(result)
131 })
132 .leave()
133 .inner;
134
135 (path_id, path_stream)
136 })
137 .collect::<HashMap<_, _>>();
138
139 (path_streams, shutdown_handle)
140 }
141}
142
143#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
146pub struct PullAll {
147 pub pull_attributes: Vec<Aid>,
149}
150
151impl PullAll {
152 fn dependencies(&self) -> Dependencies {
155 let mut dependencies = Dependencies::none();
156
157 for attribute in &self.pull_attributes {
158 let attribute_dependencies = Dependencies::attribute(&attribute);
159 dependencies = Dependencies::merge(dependencies, attribute_dependencies);
160 }
161
162 dependencies
163 }
164
165 fn implement<'b, T, I, S>(
168 &self,
169 nested: &mut Iterative<'b, S, u64>,
170 _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
171 context: &mut I,
172 ) -> (
173 HashMap<PathId, Stream<S, (Vec<Value>, S::Timestamp, isize)>>,
174 ShutdownHandle,
175 )
176 where
177 T: Timestamp + Lattice,
178 I: ImplContext<T>,
179 S: Scope<Timestamp = T>,
180 {
181 use differential_dataflow::trace::TraceReader;
182
183 assert!(!self.pull_attributes.is_empty());
184
185 let mut shutdown_handle = ShutdownHandle::empty();
186
187 let path_streams = self
188 .pull_attributes
189 .iter()
190 .map(|a| {
191 let e_v = match context.forward_propose(a) {
192 None => panic!("attribute {:?} does not exist", a),
193 Some(propose_trace) => {
194 let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
195 let (arranged, shutdown_propose) =
196 propose_trace.import_core(&nested.parent, a);
197
198 let e_v = arranged.enter_at(nested, move |_, _, time| {
199 let mut forwarded = time.clone();
200 forwarded.advance_by(&frontier);
201 Product::new(forwarded, 0)
202 });
203
204 shutdown_handle.add_button(shutdown_propose);
205
206 e_v
207 }
208 };
209
210 let path_stream = e_v
211 .as_collection(|e, v| vec![e.clone(), v.clone()])
212 .leave()
213 .inner;
214
215 (vec![a.to_string()], path_stream)
216 })
217 .collect::<HashMap<_, _>>();
218
219 (path_streams, shutdown_handle)
220 }
221}
222
223#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
225pub enum Pull {
226 All(PullAll),
228 Level(PullLevel<Plan>),
230}
231
232impl Pull {
233 pub fn dependencies(&self) -> Dependencies {
236 match self {
237 Pull::All(ref pull) => pull.dependencies(),
238 Pull::Level(ref pull) => pull.dependencies(),
239 }
240 }
241
242 pub fn implement<'b, T, I, S>(
245 &self,
246 nested: &mut Iterative<'b, S, u64>,
247 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
248 context: &mut I,
249 ) -> (
250 HashMap<PathId, Stream<S, (Vec<Value>, S::Timestamp, isize)>>,
251 ShutdownHandle,
252 )
253 where
254 T: Timestamp + Lattice,
255 I: ImplContext<T>,
256 S: Scope<Timestamp = T>,
257 {
258 match self {
259 Pull::All(ref pull) => pull.implement(nested, local_arrangements, context),
260 Pull::Level(ref pull) => pull.implement(nested, local_arrangements, context),
261 }
262 }
263}