differential_dataflow/operators/arrange/arrangement.rs
1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates and a stream of
//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, vec::Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::container::{ContainerBuilder, PushInto};
28use timely::dataflow::operators::Capability;
29
30use crate::{Data, VecCollection, AsCollection};
31use crate::difference::Semigroup;
32use crate::lattice::Lattice;
33use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38
39use super::TraceAgent;
40
41/// An arranged collection of `(K,V)` values.
42///
43/// An `Arranged` allows multiple differential operators to share the resources (communication,
44/// computation, memory) required to produce and maintain an indexed representation of a collection.
45pub struct Arranged<'scope, Tr: TraceReader> {
46 /// A stream containing arranged updates.
47 ///
48 /// This stream contains the same batches of updates the trace itself accepts, so there should
49 /// be no additional overhead to receiving these records. The batches can be navigated just as
50 /// the batches in the trace, by key and by value.
51 pub stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>,
52 /// A shared trace, updated by the `Arrange` operator and readable by others.
53 pub trace: Tr,
54}
55
56impl<'scope, Tr: TraceReader+Clone> Clone for Arranged<'scope, Tr> {
57 fn clone(&self) -> Self {
58 Arranged {
59 stream: self.stream.clone(),
60 trace: self.trace.clone(),
61 }
62 }
63}
64
65use ::timely::progress::timestamp::Refines;
66use timely::Container;
67
68impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> {
69 /// Brings an arranged collection into a nested scope.
70 ///
71 /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
72 /// have all been extended with an additional coordinate with the default value. The resulting collection does
73 /// not vary with the new timestamp coordinate.
74 pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter<Tr, TInner>>
75 where
76 TInner: Refines<Tr::Time>+Lattice,
77 {
78 Arranged {
79 stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
80 trace: TraceEnter::make_from(self.trace),
81 }
82 }
83
84 /// Brings an arranged collection into a nested region.
85 ///
86 /// This method only applies to *regions*, which are subscopes with the same timestamp
87 /// as their containing scope. In this case, the trace type does not need to change.
88 pub fn enter_region<'inner>(self, child: Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> {
89 Arranged {
90 stream: self.stream.enter(child),
91 trace: self.trace,
92 }
93 }
94
95 /// Brings an arranged collection into a nested scope.
96 ///
97 /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
98 /// have all been extended with an additional coordinate with the default value. The resulting collection does
99 /// not vary with the new timestamp coordinate.
100 pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt<Tr, TInner, F, P>>
101 where
102 TInner: Refines<Tr::Time>+Lattice+'static,
103 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
104 P: FnMut(&TInner)->Tr::Time+Clone+'static,
105 {
106 let logic1 = logic.clone();
107 let logic2 = logic.clone();
108 Arranged {
109 trace: TraceEnterAt::make_from(self.trace, logic1, prior),
110 stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
111 }
112 }
113
114 /// Extracts a collection of any container from the stream of batches.
115 ///
116 /// This method is like `self.stream.flat_map`, except that it produces containers
117 /// directly, rather than form a container of containers as `flat_map` would.
118 pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<'scope, Tr::Time, I::Item>
119 where
120 I: IntoIterator<Item: Container>,
121 L: FnMut(Tr::Batch) -> I+'static,
122 {
123 self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
124 input.for_each(|time, data| {
125 let mut session = output.session(&time);
126 for wrapper in data.drain(..) {
127 for mut container in logic(wrapper) {
128 session.give_container(&mut container);
129 }
130 }
131 });
132 })
133 .as_collection()
134 }
135
136 /// Flattens the stream into a `VecCollection`.
137 ///
138 /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
139 /// and this method should only be used when the data need to be transformed or exchanged, rather than
140 /// supplied as arguments to an operator using the same key-value structure.
141 pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff>
142 where
143 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
144 {
145 self.flat_map_ref(move |key, val| Some(logic(key,val)))
146 }
147
148 /// Flattens the stream into a `VecCollection`.
149 ///
150 /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
151 /// and this method should only be used when the data need to be transformed or exchanged, rather than
152 /// supplied as arguments to an operator using the same key-value structure.
153 ///
154 /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support
155 /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic
156 /// on the reference types.
157 pub fn as_vecs<K, V>(self) -> VecCollection<'scope, Tr::Time, (K, V), Tr::Diff>
158 where
159 K: crate::ExchangeData,
160 V: crate::ExchangeData,
161 Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>,
162 {
163 self.flat_map_ref(move |key, val| [(key.clone(), val.clone())])
164 }
165
166 /// Extracts elements from an arrangement as a `VecCollection`.
167 ///
168 /// The supplied logic may produce an iterator over output values, allowing either
169 /// filtering or flat mapping as part of the extraction.
170 pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
171 where
172 I: IntoIterator<Item: Data>,
173 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
174 {
175 Self::flat_map_batches(self.stream, logic)
176 }
177
178 /// Extracts elements from a stream of batches as a `VecCollection`.
179 ///
180 /// The supplied logic may produce an iterator over output values, allowing either
181 /// filtering or flat mapping as part of the extraction.
182 ///
183 /// This method exists for streams of batches without the corresponding arrangement.
184 /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
185 pub fn flat_map_batches<I, L>(stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
186 where
187 I: IntoIterator<Item: Data>,
188 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
189 {
190 stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
191 input.for_each(|time, data| {
192 let mut session = output.session(&time);
193 for wrapper in data.iter() {
194 let batch = &wrapper;
195 let mut cursor = batch.cursor();
196 while let Some(key) = cursor.get_key(batch) {
197 while let Some(val) = cursor.get_val(batch) {
198 for datum in logic(key, val) {
199 cursor.map_times(batch, |time, diff| {
200 session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
201 });
202 }
203 cursor.step_val(batch);
204 }
205 cursor.step_key(batch);
206 }
207 }
208 });
209 })
210 .as_collection()
211 }
212}
213
214
215use crate::difference::Multiply;
216// Direct join implementations.
217impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> {
218 /// A convenience method to join and produce `VecCollection` output.
219 ///
220 /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
221 pub fn join_core<Tr2,I,L>(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,<Tr1::Diff as Multiply<Tr2::Diff>>::Output>
222 where
223 Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>,Time=Tr1::Time>+Clone+'static,
224 Tr1::Diff: Multiply<Tr2::Diff, Output: Semigroup+'static>,
225 I: IntoIterator<Item: Data>,
226 L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static
227 {
228 let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| {
229 let r = (r1.clone()).multiply(r2);
230 result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
231 };
232
233 use crate::operators::join::join_traces;
234 join_traces::<_, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
235 self,
236 other,
237 move |k, v1, v2, t, d1, d2, c| {
238 for datum in result(k, v1, v2, t, d1, d2) {
239 c.give(datum);
240 }
241 }
242 )
243 .as_collection()
244 }
245}
246
247// Direct reduce implementations.
248use crate::difference::Abelian;
249impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> {
250 /// A direct implementation of `ReduceCore::reduce_abelian`.
251 pub fn reduce_abelian<L, Bu, Tr2, P>(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
252 where
253 Tr2: for<'a> Trace<
254 Key<'a>= Tr1::Key<'a>,
255 ValOwn: Data,
256 Time=Tr1::Time,
257 Diff: Abelian,
258 >+'static,
259 Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
260 L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
261 P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
262 {
263 self.reduce_core::<_,Bu,Tr2,_>(name, move |key, input, output, change| {
264 if !input.is_empty() {
265 logic(key, input, change);
266 }
267 change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
268 crate::consolidation::consolidate(change);
269 }, push)
270 }
271
272 /// A direct implementation of `ReduceCore::reduce_core`.
273 pub fn reduce_core<L, Bu, Tr2, P>(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
274 where
275 Tr2: for<'a> Trace<
276 Key<'a>=Tr1::Key<'a>,
277 ValOwn: Data,
278 Time=Tr1::Time,
279 >+'static,
280 Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
281 L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
282 P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
283 {
284 use crate::operators::reduce::reduce_trace;
285 reduce_trace::<_,Bu,_,_,_>(self, name, logic, push)
286 }
287}
288
289
290impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> {
291 /// Brings an arranged collection out of a nested region.
292 ///
293 /// This method only applies to *regions*, which are subscopes with the same timestamp
294 /// as their containing scope. In this case, the trace type does not need to change.
295 pub fn leave_region<'outer>(self, outer: Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> {
296 use timely::dataflow::operators::Leave;
297 Arranged {
298 stream: self.stream.leave(outer),
299 trace: self.trace,
300 }
301 }
302}
303
304/// A type that can be arranged as if a collection of updates.
305pub trait Arrange<'scope, T: Timestamp+Lattice, C> : Sized {
306 /// Arranges updates into a shared trace.
307 ///
308 /// The batcher's output container must equal the stream container `C`; the default
309 /// chunker only consolidates same-type containers. For chunker setups that convert
310 /// between container types (e.g. columnar layouts), call [`arrange_core`] directly.
311 fn arrange<Ba, Bu, Tr>(self) -> Arranged<'scope, TraceAgent<Tr>>
312 where
313 Ba: Batcher<Output=C, Time=T> + 'static,
314 Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
315 Tr: Trace<Time=T> + 'static,
316 {
317 self.arrange_named::<Ba, Bu, Tr>("Arrange")
318 }
319
320 /// Arranges updates into a shared trace, with a supplied name.
321 ///
322 /// See [`Arrange::arrange`] for constraints on the batcher's output container.
323 fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
324 where
325 Ba: Batcher<Output=C, Time=T> + 'static,
326 Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
327 Tr: Trace<Time=T> + 'static,
328 ;
329}
330
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
///
/// Type parameters:
/// * `C`: the container type carried by the input `stream`.
/// * `P`: the parallelization contract that routes `C` containers between workers.
/// * `Chu`: a chunker that accepts `&mut C` and emits `Ba::Output` chunks for the batcher.
/// * `Ba`: the batcher that buffers updates until they are sealed at a frontier.
/// * `Bu`: the builder that turns each sealed chain into a `Tr::Batch`.
/// * `Tr`: the trace that stores the arranged batches.
///
/// Returns an [`Arranged`]. Its `stream` carries each sealed batch, sent at a held capability,
/// and its `trace` is the reader half of the `TraceAgent` that wraps the maintained trace.
///
/// # Panics
///
/// Panics if the input frontier ever regresses. Also panics if, after sealing, the batcher
/// reports a lower-bound time that no held capability can be delayed to.
pub fn arrange_core<'scope, P, C, Chu, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, C>, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
    C: Container + Clone + 'static,
    P: ParallelizationContract<Tr::Time, C>,
    Chu: ContainerBuilder<Container=Ba::Output> + for<'a> PushInto<&'a mut C> + 'static,
    Ba: Batcher<Time=Tr::Time> + 'static,
    Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, because capabilities are only retained for
    // actual data held by the batcher. This may prevent the operator from
    // sending an empty batch.

    let mut reader: Option<TraceAgent<Tr>> = None;

    // Fabricate a data-parallel operator using the `unary_frontier` pattern.
    // The constructor closure stores the trace's reader handle through `reader_ref`,
    // and `reader.unwrap()` at the end of this function relies on it having done so.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<Tr::Time>>::new();

        // An activator for this operator, handed to the trace. Presumably the trace uses it
        // to reschedule the operator (e.g. to continue exertion work); confirm in `Trace::new`.
        let activator = Some(scope.activator_for(std::rc::Rc::clone(&info.address)));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.worker().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        // Split the trace into a shareable reader and the writer this operator keeps.
        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum());

        // Re-chunks incoming `C` containers into the batcher's `Ba::Output` containers.
        let mut chunker = Chu::default();

        move |(input, frontier), output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                // Inserting into the antichain retains only the minimal capabilities.
                capabilities.insert(cap.retain(0));
                // Feed the chunker, and forward any chunks it has already completed to the batcher.
                chunker.push_into(data);
                while let Some(chunk) = chunker.extract() {
                    batcher.push_into(std::mem::take(chunk));
                }
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != frontier.frontier() {
                // Flush any data the chunker is still accumulating into the batcher before we
                // seal. The batcher only sees chunks the chunker has emitted; without this drain
                // a partial final chunk would never reach the batcher.
                while let Some(chunk) = chunker.finish() {
                    batcher.push_into(std::mem::take(chunk));
                }

                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If every held capability is in advance of the new input frontier (or none
                //    are held), then there are no updates not in advance of the new input frontier.
                //    We only need to advance the batcher and seal the trace at the new upper
                //    frontier, without sending anything along the timely output.

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !frontier.less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capabilities.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`, and build them into a batch.
                            let (mut chain, description) = batcher.seal(upper.clone());
                            let batch = Bu::seal(&mut chain, description);

                            // The trace receives a clone; the original goes downstream.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data. We seal the batcher to
                    // advance its lower bound and frontier, but discard the readied updates
                    // rather than building a batch we would immediately drop. (All held data
                    // is in advance of the new frontier, so nothing is lost.)
                    let _ = batcher.seal(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }

                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }

            // Called on every activation, whether or not the frontier moved. Presumably this lets
            // the trace perform deferred maintenance work; see the exertion logic installed above.
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}