// differential_dataflow/operators/arrange/arrangement.rs
1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access both to an indexed form of the accepted updates and to a stream
//! of batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, vec::Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, VecCollection, AsCollection};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33
34use trace::wrappers::enter::{TraceEnter, BatchEnter,};
35use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
36use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
37
38use super::TraceAgent;
39
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
///
/// The `stream` and `trace` fields expose the same batches of updates; readers may either listen
/// to the stream or interrogate the shared trace directly.
pub struct Arranged<'scope, Tr>
where
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
59
60impl<'scope, Tr> Clone for Arranged<'scope, Tr>
61where
62 Tr: TraceReader + Clone,
63{
64 fn clone(&self) -> Self {
65 Arranged {
66 stream: self.stream.clone(),
67 trace: self.trace.clone(),
68 }
69 }
70}
71
72use ::timely::progress::timestamp::Refines;
73use timely::Container;
74
impl<'scope, Tr> Arranged<'scope, Tr>
where
    Tr: TraceReader + Clone,
{
    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter<Tr, TInner>>
    where
        TInner: Refines<Tr::Time>+Lattice,
    {
        Arranged {
            // Wrap each batch so its timestamps present as `TInner`; the backing data are shared, not copied.
            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
            trace: TraceEnter::make_from(self.trace),
        }
    }

    /// Brings an arranged collection into a nested region.
    ///
    /// This method only applies to *regions*, which are subscopes with the same timestamp
    /// as their containing scope. In this case, the trace type does not need to change.
    pub fn enter_region<'inner>(self, child: Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> {
        Arranged {
            // Since the timestamp type is unchanged, neither batches nor trace need wrapping.
            stream: self.stream.enter(child),
            trace: self.trace,
        }
    }

    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    ///
    /// `logic` maps each (key, value, time) to its nested timestamp, and `prior` recovers
    /// an outer timestamp from a nested one.
    pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt<Tr, TInner, F, P>>
    where
        TInner: Refines<Tr::Time>+Lattice+'static,
        F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
        P: FnMut(&TInner)->Tr::Time+Clone+'static,
    {
        // The timestamp logic is needed both by the trace wrapper and by each batch wrapper.
        let logic1 = logic.clone();
        let logic2 = logic.clone();
        Arranged {
            trace: TraceEnterAt::make_from(self.trace, logic1, prior),
            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
        }
    }

    /// Extracts a collection of any container from the stream of batches.
    ///
    /// This method is like `self.stream.flat_map`, except that it produces containers
    /// directly, rather than form a container of containers as `flat_map` would.
    pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<'scope, Tr::Time, I::Item>
    where
        I: IntoIterator<Item: Container>,
        L: FnMut(Tr::Batch) -> I+'static,
    {
        self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                // Each received batch may expand into several output containers,
                // each of which is forwarded whole rather than nested.
                for wrapper in data.drain(..) {
                    for mut container in logic(wrapper) {
                        session.give_container(&mut container);
                    }
                }
            });
        })
        .as_collection()
    }

    /// Flattens the stream into a `VecCollection`.
    ///
    /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff>
    where
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
    {
        // A map-style `logic` is the single-output special case of `flat_map_ref`.
        self.flat_map_ref(move |key, val| Some(logic(key,val)))
    }

    /// Flattens the stream into a `VecCollection`.
    ///
    /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    ///
    /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support
    /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic
    /// on the reference types.
    pub fn as_vecs<K, V>(self) -> VecCollection<'scope, Tr::Time, (K, V), Tr::Diff>
    where
        K: crate::ExchangeData,
        V: crate::ExchangeData,
        Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>,
    {
        // Clones each key/value reference into owned pairs.
        self.flat_map_ref(move |key, val| [(key.clone(), val.clone())])
    }

    /// Extracts elements from an arrangement as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        // Only the batch stream is needed; the trace handle is dropped here.
        Self::flat_map_batches(self.stream, logic)
    }

    /// Extracts elements from a stream of batches as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    ///
    /// This method exists for streams of batches without the corresponding arrangement.
    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
    pub fn flat_map_batches<I, L>(stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.iter() {
                    let batch = &wrapper;
                    // Walk the batch with its cursor: keys, then values per key,
                    // then (time, diff) updates per value.
                    let mut cursor = batch.cursor();
                    while let Some(key) = cursor.get_key(batch) {
                        while let Some(val) = cursor.get_val(batch) {
                            for datum in logic(key, val) {
                                cursor.map_times(batch, |time, diff| {
                                    // One output triple per (datum, time, diff); `datum` is cloned
                                    // because a single value may carry multiple timed updates.
                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
                                });
                            }
                            cursor.step_val(batch);
                        }
                        cursor.step_key(batch);
                    }
                }
            });
        })
        .as_collection()
    }
}
223
224
225use crate::difference::Multiply;
226// Direct join implementations.
impl<'scope, Tr1> Arranged<'scope, Tr1>
where
    Tr1: TraceReader + Clone + 'static,
{
    /// A convenience method to join and produce `VecCollection` output.
    ///
    /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
    ///
    /// Matches `self` against `other` on key, applying `result` to each (key, val1, val2)
    /// triple; the output difference is the product of the two input differences.
    pub fn join_core<Tr2,I,L>(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,<Tr1::Diff as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>,Time=Tr1::Time>+Clone+'static,
        Tr1::Diff: Multiply<Tr2::Diff, Output: Semigroup+'static>,
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static
    {
        // Wrap the user closure so each produced datum is paired with the update's
        // time and the multiplied difference.
        let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: &Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| {
            let t = t.clone();
            let r = (r1.clone()).multiply(r2);
            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
        };

        use crate::operators::join::join_traces;
        // The consolidating container builder accumulates and consolidates matching
        // updates before emitting them downstream.
        join_traces::<_, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
            self,
            other,
            move |k, v1, v2, t, d1, d2, c| {
                for datum in result(k, v1, v2, t, d1, d2) {
                    c.give(datum);
                }
            }
        )
        .as_collection()
    }
}
260
261// Direct reduce implementations.
262use crate::difference::Abelian;
impl<'scope, Tr1> Arranged<'scope, Tr1>
where
    Tr1: TraceReader + Clone + 'static,
{
    /// A direct implementation of `ReduceCore::reduce_abelian`.
    ///
    /// `logic` produces the desired output values for a key from its input values. Because the
    /// output difference type is abelian, this method can negate the current output and append
    /// it to the desired output, so that the consolidated result is the *change* to apply.
    pub fn reduce_abelian<L, Bu, Tr2, P>(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
    where
        Tr2: for<'a> Trace<
            Key<'a>= Tr1::Key<'a>,
            ValOwn: Data,
            Time=Tr1::Time,
            Diff: Abelian,
        >+'static,
        Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
        L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
        P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
    {
        self.reduce_core::<_,Bu,Tr2,_>(name, move |key, input, output, change| {
            // Only invoke user logic when the key has input; with no input the desired
            // output is empty and only the retractions below are needed.
            if !input.is_empty() {
                logic(key, input, change);
            }
            // Append the negation of the current output, then consolidate so that
            // matching additions and retractions cancel, leaving the net change.
            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
            crate::consolidation::consolidate(change);
        }, push)
    }

    /// A direct implementation of `ReduceCore::reduce_core`.
    ///
    /// Unlike `reduce_abelian`, the supplied `logic` is responsible for producing the
    /// changes to the output directly, given both the input and the current output.
    pub fn reduce_core<L, Bu, Tr2, P>(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
    where
        Tr2: for<'a> Trace<
            Key<'a>=Tr1::Key<'a>,
            ValOwn: Data,
            Time=Tr1::Time,
        >+'static,
        Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
        L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
        P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
    {
        use crate::operators::reduce::reduce_trace;
        reduce_trace::<_,Bu,_,_,_>(self, name, logic, push)
    }
}
305
306
307impl<'scope, Tr> Arranged<'scope, Tr>
308where
309 Tr: TraceReader + Clone,
310{
311 /// Brings an arranged collection out of a nested region.
312 ///
313 /// This method only applies to *regions*, which are subscopes with the same timestamp
314 /// as their containing scope. In this case, the trace type does not need to change.
315 pub fn leave_region<'outer>(self, outer: Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> {
316 use timely::dataflow::operators::Leave;
317 Arranged {
318 stream: self.stream.leave(outer),
319 trace: self.trace,
320 }
321 }
322}
323
324/// A type that can be arranged as if a collection of updates.
/// A type that can be arranged as if a collection of updates.
///
/// Type parameters: `T` is the timestamp of the updates and `C` the container type
/// in which they arrive.
pub trait Arrange<'scope, T, C> : Sized
where
    T: Timestamp + Lattice,
{
    /// Arranges updates into a shared trace.
    ///
    /// `Ba` batches incoming updates, `Bu` builds trace batches from the batcher's
    /// output, and `Tr` is the trace type that stores the arranged result.
    fn arrange<Ba, Bu, Tr>(self) -> Arranged<'scope, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=T> + 'static,
        Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=T> + 'static,
    {
        // Default implementation defers to the named variant with a generic name.
        self.arrange_named::<Ba, Bu, Tr>("Arrange")
    }

    /// Arranges updates into a shared trace, with a supplied name.
    fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=T> + 'static,
        Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=T> + 'static,
    ;
}
347
348/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
349///
350/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
351/// It uses the supplied parallelization contract to distribute the data, which does not need to
352/// be consistently by key (though this is the most common).
353pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
354where
355 P: ParallelizationContract<Tr::Time, Ba::Input>,
356 Ba: Batcher<Time=Tr::Time,Input: Container> + 'static,
357 Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output = Tr::Batch>,
358 Tr: Trace+'static,
359{
360 // The `Arrange` operator is tasked with reacting to an advancing input
361 // frontier by producing the sequence of batches whose lower and upper
362 // bounds are those frontiers, containing updates at times greater or
363 // equal to lower and not greater or equal to upper.
364 //
365 // The operator uses its batch type's `Batcher`, which accepts update
366 // triples and responds to requests to "seal" batches (presented as new
367 // upper frontiers).
368 //
369 // Each sealed batch is presented to the trace, and if at all possible
370 // transmitted along the outgoing channel. Empty batches may not have
371 // a corresponding capability, as they are only retained for actual data
372 // held by the batcher, which may prevents the operator from sending an
373 // empty batch.
374
375 let mut reader: Option<TraceAgent<Tr>> = None;
376
377 // fabricate a data-parallel operator using the `unary_notify` pattern.
378 let reader_ref = &mut reader;
379 let scope = stream.scope();
380
381 let stream = stream.unary_frontier(pact, name, move |_capability, info| {
382
383 // Acquire a logger for arrange events.
384 let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
385
386 // Where we will deposit received updates, and from which we extract batches.
387 let mut batcher = Ba::new(logger.clone(), info.global_id);
388
389 // Capabilities for the lower envelope of updates in `batcher`.
390 let mut capabilities = Antichain::<Capability<Tr::Time>>::new();
391
392 let activator = Some(scope.activator_for(info.address.clone()));
393 let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
394 // If there is default exertion logic set, install it.
395 if let Some(exert_logic) = scope.worker().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
396 empty_trace.set_exert_logic(exert_logic);
397 }
398
399 let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
400
401 *reader_ref = Some(reader_local);
402
403 // Initialize to the minimal input frontier.
404 let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum());
405
406 move |(input, frontier), output| {
407
408 // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
409 // We don't have to keep all capabilities, but we need to be able to form output messages
410 // when we realize that time intervals are complete.
411
412 input.for_each(|cap, data| {
413 capabilities.insert(cap.retain(0));
414 batcher.push_container(data);
415 });
416
417 // The frontier may have advanced by multiple elements, which is an issue because
418 // timely dataflow currently only allows one capability per message. This means we
419 // must pretend to process the frontier advances one element at a time, batching
420 // and sending smaller bites than we might have otherwise done.
421
422 // Assert that the frontier never regresses.
423 assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));
424
425 // Test to see if strict progress has occurred, which happens whenever the new
426 // frontier isn't equal to the previous. It is only in this case that we have any
427 // data processing to do.
428 if prev_frontier.borrow() != frontier.frontier() {
429 // There are two cases to handle with some care:
430 //
431 // 1. If any held capabilities are not in advance of the new input frontier,
432 // we must carve out updates now in advance of the new input frontier and
433 // transmit them as batches, which requires appropriate *single* capabilities;
434 // Until timely dataflow supports multiple capabilities on messages, at least.
435 //
436 // 2. If there are no held capabilities in advance of the new input frontier,
437 // then there are no updates not in advance of the new input frontier and
438 // we can simply create an empty input batch with the new upper frontier
439 // and feed this to the trace agent (but not along the timely output).
440
441 // If there is at least one capability not in advance of the input frontier ...
442 if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {
443
444 let mut upper = Antichain::new(); // re-used allocation for sealing batches.
445
446 // For each capability not in advance of the input frontier ...
447 for (index, capability) in capabilities.elements().iter().enumerate() {
448
449 if !frontier.less_equal(capability.time()) {
450
451 // Assemble the upper bound on times we can commit with this capabilities.
452 // We must respect the input frontier, and *subsequent* capabilities, as
453 // we are pretending to retire the capability changes one by one.
454 upper.clear();
455 for time in frontier.frontier().iter() {
456 upper.insert(time.clone());
457 }
458 for other_capability in &capabilities.elements()[(index + 1) .. ] {
459 upper.insert(other_capability.time().clone());
460 }
461
462 // Extract updates not in advance of `upper`.
463 let batch = batcher.seal::<Bu>(upper.clone());
464
465 writer.insert(batch.clone(), Some(capability.time().clone()));
466
467 // send the batch to downstream consumers, empty or not.
468 output.session(&capabilities.elements()[index]).give(batch);
469 }
470 }
471
472 // Having extracted and sent batches between each capability and the input frontier,
473 // we should downgrade all capabilities to match the batcher's lower update frontier.
474 // This may involve discarding capabilities, which is fine as any new updates arrive
475 // in messages with new capabilities.
476
477 let mut new_capabilities = Antichain::new();
478 for time in batcher.frontier().iter() {
479 if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
480 new_capabilities.insert(capability.delayed(time));
481 }
482 else {
483 panic!("failed to find capability");
484 }
485 }
486
487 capabilities = new_capabilities;
488 }
489 else {
490 // Announce progress updates, even without data.
491 let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
492 writer.seal(frontier.frontier().to_owned());
493 }
494
495 prev_frontier.clear();
496 prev_frontier.extend(frontier.frontier().iter().cloned());
497 }
498
499 writer.exert();
500 }
501 });
502
503 Arranged { stream, trace: reader.unwrap() }
504}