differential_dataflow/operators/arrange/arrangement.rs
1//! Arranges a collection into a re-usable trace structure.
2//!
3//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates as well as a stream of
5//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, vec::Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, VecCollection, AsCollection};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::merge_batcher::container::InternalMerge;
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38
39use super::TraceAgent;
40
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
pub struct Arranged<G, Tr>
where
    G: Scope<Timestamp: Lattice+Ord>,
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<G, Vec<Tr::Batch>>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    ///
    /// The trace and `stream` describe the same sequence of batches; operators may either
    /// interrogate the trace or listen on the stream, as appropriate.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
61
62impl<G, Tr> Clone for Arranged<G, Tr>
63where
64 G: Scope<Timestamp=Tr::Time>,
65 Tr: TraceReader + Clone,
66{
67 fn clone(&self) -> Self {
68 Arranged {
69 stream: self.stream.clone(),
70 trace: self.trace.clone(),
71 }
72 }
73}
74
75use ::timely::dataflow::scopes::Child;
76use ::timely::progress::timestamp::Refines;
77use timely::Container;
78use timely::container::PushInto;
79
impl<G, Tr> Arranged<G, Tr>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: TraceReader + Clone,
{
    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    pub fn enter<'a, TInner>(self, child: &Child<'a, G, TInner>)
        -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
        where
            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
    {
        Arranged {
            // Wrap each batch so that its times are presented as the refined timestamp `TInner`.
            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
            trace: TraceEnter::make_from(self.trace),
        }
    }

    /// Brings an arranged collection into a nested region.
    ///
    /// This method only applies to *regions*, which are subscopes with the same timestamp
    /// as their containing scope. In this case, the trace type does not need to change.
    pub fn enter_region<'a>(self, child: &Child<'a, G, G::Timestamp>)
        -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
        Arranged {
            stream: self.stream.enter(child),
            trace: self.trace,
        }
    }

    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    ///
    /// `logic` assigns each (key, val, time) its timestamp in the nested scope; `prior` maps a
    /// nested timestamp back to an outer `Tr::Time` (consumed by the `TraceEnterAt` wrapper).
    pub fn enter_at<'a, TInner, F, P>(self, child: &Child<'a, G, TInner>, logic: F, prior: P)
        -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
        where
            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
            F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
            P: FnMut(&TInner)->Tr::Time+Clone+'static,
    {
        // `logic` is needed both by the trace wrapper and by each batch wrapper.
        let logic1 = logic.clone();
        let logic2 = logic.clone();
        Arranged {
            trace: TraceEnterAt::make_from(self.trace, logic1, prior),
            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
        }
    }

    /// Extracts a collection of any container from the stream of batches.
    ///
    /// This method is like `self.stream.flat_map`, except that it produces containers
    /// directly, rather than form a container of containers as `flat_map` would.
    pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<G, I::Item>
    where
        I: IntoIterator<Item: Container>,
        L: FnMut(Tr::Batch) -> I+'static,
    {
        self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                // Each message carries a vector of batches; `logic` turns a batch into
                // zero or more containers, each forwarded at the message's time.
                for wrapper in data.drain(..) {
                    for mut container in logic(wrapper) {
                        session.give_container(&mut container);
                    }
                }
            });
        })
        .as_collection()
    }

    /// Flattens the stream into a `VecCollection`.
    ///
    /// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<G, D, Tr::Diff>
    where
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
    {
        // Each (key, val) pair produces exactly one output datum.
        self.flat_map_ref(move |key, val| Some(logic(key,val)))
    }

    /// Flattens the stream into a `VecCollection` of owned key-value pairs.
    ///
    /// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_vecs(self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
    where
        Tr::KeyOwn: crate::ExchangeData,
        Tr::ValOwn: crate::ExchangeData,
    {
        self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
    }

    /// Extracts elements from an arrangement as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<G, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        Self::flat_map_batches(self.stream, logic)
    }

    /// Extracts elements from a stream of batches as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    ///
    /// This method exists for streams of batches without the corresponding arrangement.
    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
    pub fn flat_map_batches<I, L>(stream: Stream<G, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<G, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.iter() {
                    let batch = &wrapper;
                    // Walk the batch with a cursor: keys, then values per key,
                    // then the (time, diff) history per value.
                    let mut cursor = batch.cursor();
                    while let Some(key) = cursor.get_key(batch) {
                        while let Some(val) = cursor.get_val(batch) {
                            for datum in logic(key, val) {
                                // Each produced datum is emitted once per (time, diff) update.
                                cursor.map_times(batch, |time, diff| {
                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
                                });
                            }
                            cursor.step_val(batch);
                        }
                        cursor.step_key(batch);
                    }
                }
            });
        })
        .as_collection()
    }
}
227
228
229use crate::difference::Multiply;
230// Direct join implementations.
// Direct join implementations.
impl<G, T1> Arranged<G, T1>
where
    G: Scope<Timestamp=T1::Time>,
    T1: TraceReader + Clone + 'static,
{
    /// A convenience method to join and produce `VecCollection` output.
    ///
    /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
    pub fn join_core<T2,I,L>(self, other: Arranged<G,T2>, mut result: L) -> VecCollection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
    where
        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
        T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
        I: IntoIterator<Item: Data>,
        L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
    {
        // Wrap the user logic so that each produced datum is paired with the update's
        // timestamp and the product of the two input differences.
        let mut result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
            let t = t.clone();
            let r = (r1.clone()).multiply(r2);
            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
        };

        use crate::operators::join::join_traces;
        // The consolidating container builder merges updates with identical data and time.
        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
            self,
            other,
            move |k, v1, v2, t, d1, d2, c| {
                for datum in result(k, v1, v2, t, d1, d2) {
                    c.give(datum);
                }
            }
        )
        .as_collection()
    }
}
265
266// Direct reduce implementations.
267use crate::difference::Abelian;
impl<G, T1> Arranged<G, T1>
where
    G: Scope<Timestamp = T1::Time>,
    T1: TraceReader + Clone + 'static,
{
    /// A direct implementation of `ReduceCore::reduce_abelian`.
    ///
    /// `logic` maps a key and its accumulated input values to desired output values.
    /// Because the output difference type is `Abelian`, this method can itself negate
    /// the previously produced output to form the necessary retractions.
    pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<
            Key<'a>= T1::Key<'a>,
            KeyOwn=T1::KeyOwn,
            ValOwn: Data,
            Time=T1::Time,
            Diff: Abelian,
        >+'static,
        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
    {
        self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
            // Only invoke user logic for keys that currently have input values.
            if !input.is_empty() {
                logic(key, input, change);
            }
            // Negate the prior output so that `change` becomes the delta from old to new,
            // then consolidate to cancel matching additions and retractions.
            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
            crate::consolidation::consolidate(change);
        })
    }

    /// A direct implementation of `ReduceCore::reduce_core`.
    ///
    /// The `logic` closure receives the key, the input values, the current output, and a
    /// vector into which it should place output changes (see `reduce_abelian` for an example).
    pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<
            Key<'a>=T1::Key<'a>,
            KeyOwn=T1::KeyOwn,
            ValOwn: Data,
            Time=T1::Time,
        >+'static,
        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
    {
        use crate::operators::reduce::reduce_trace;
        reduce_trace::<_,_,Bu,_,_>(self, name, logic)
    }
}
313
314
315impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
316where
317 G: Scope<Timestamp=Tr::Time>,
318 Tr: TraceReader + Clone,
319{
320 /// Brings an arranged collection out of a nested region.
321 ///
322 /// This method only applies to *regions*, which are subscopes with the same timestamp
323 /// as their containing scope. In this case, the trace type does not need to change.
324 pub fn leave_region(self) -> Arranged<G, Tr> {
325 use timely::dataflow::operators::Leave;
326 Arranged {
327 stream: self.stream.leave(),
328 trace: self.trace,
329 }
330 }
331}
332
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<G, C> : Sized
where
    G: Scope<Timestamp: Lattice>,
{
    /// Arranges updates into a shared trace.
    ///
    /// Equivalent to `arrange_named` with the default name "Arrange".
    fn arrange<Ba, Bu, Tr>(self) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    {
        self.arrange_named::<Ba, Bu, Tr>("Arrange")
    }

    /// Arranges updates into a shared trace, with a supplied name.
    ///
    /// The name identifies the operator in logging and diagnostics.
    fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    ;
}
356
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: Stream<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as they are only retained for actual data
    // held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Receives the trace reader handle out of the operator construction closure below.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        // Publish the reader for the surrounding function to return.
        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |(input, frontier), output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != frontier.frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities in advance of the new input frontier,
                //    then there are no updates not in advance of the new input frontier and
                //    we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !frontier.less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // The same batch goes both into the shared trace ...
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            // Every time in the batcher should be covered by some held capability.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }

                // Record the frontier for the regression assertion on the next invocation.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }

            // Give the trace an opportunity to perform maintenance work (per its exertion logic).
            writer.exert();
        }
    });

    // The operator construction closure ran during `unary_frontier`, so `reader` is populated.
    Arranged { stream, trace: reader.unwrap() }
}