differential_dataflow/operators/arrange/arrangement.rs
1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates and a stream of
//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, vec::Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, VecCollection, AsCollection};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::merge_batcher::container::MergerChunk;
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38
39use super::TraceAgent;
40
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
pub struct Arranged<G, Tr>
where
    G: Scope<Timestamp: Lattice+Ord>,
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<G, Vec<Tr::Batch>>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    ///
    /// Readers should interrogate the trace only at times their input frontiers indicate are
    /// complete; see the module-level documentation for the sharing contract.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
61
62impl<G, Tr> Clone for Arranged<G, Tr>
63where
64 G: Scope<Timestamp=Tr::Time>,
65 Tr: TraceReader + Clone,
66{
67 fn clone(&self) -> Self {
68 Arranged {
69 stream: self.stream.clone(),
70 trace: self.trace.clone(),
71 }
72 }
73}
74
75use ::timely::dataflow::scopes::Child;
76use ::timely::progress::timestamp::Refines;
77use timely::Container;
78use timely::container::PushInto;
79
80impl<G, Tr> Arranged<G, Tr>
81where
82 G: Scope<Timestamp=Tr::Time>,
83 Tr: TraceReader + Clone,
84{
85 /// Brings an arranged collection into a nested scope.
86 ///
87 /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
88 /// have all been extended with an additional coordinate with the default value. The resulting collection does
89 /// not vary with the new timestamp coordinate.
90 pub fn enter<'a, TInner>(self, child: &Child<'a, G, TInner>)
91 -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
92 where
93 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
94 {
95 Arranged {
96 stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
97 trace: TraceEnter::make_from(self.trace),
98 }
99 }
100
101 /// Brings an arranged collection into a nested region.
102 ///
103 /// This method only applies to *regions*, which are subscopes with the same timestamp
104 /// as their containing scope. In this case, the trace type does not need to change.
105 pub fn enter_region<'a>(self, child: &Child<'a, G, G::Timestamp>)
106 -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
107 Arranged {
108 stream: self.stream.enter(child),
109 trace: self.trace,
110 }
111 }
112
113 /// Brings an arranged collection into a nested scope.
114 ///
115 /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
116 /// have all been extended with an additional coordinate with the default value. The resulting collection does
117 /// not vary with the new timestamp coordinate.
118 pub fn enter_at<'a, TInner, F, P>(self, child: &Child<'a, G, TInner>, logic: F, prior: P)
119 -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
120 where
121 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
122 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
123 P: FnMut(&TInner)->Tr::Time+Clone+'static,
124 {
125 let logic1 = logic.clone();
126 let logic2 = logic.clone();
127 Arranged {
128 trace: TraceEnterAt::make_from(self.trace, logic1, prior),
129 stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
130 }
131 }
132
133 /// Flattens the stream into a `Collection`.
134 ///
135 /// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
136 /// and this method should only be used when the data need to be transformed or exchanged, rather than
137 /// supplied as arguments to an operator using the same key-value structure.
138 pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<G, D, Tr::Diff>
139 where
140 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
141 {
142 self.flat_map_ref(move |key, val| Some(logic(key,val)))
143 }
144
145 /// Flattens the stream into a `Collection`.
146 ///
147 /// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
148 /// and this method should only be used when the data need to be transformed or exchanged, rather than
149 /// supplied as arguments to an operator using the same key-value structure.
150 pub fn as_vecs(self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
151 where
152 Tr::KeyOwn: crate::ExchangeData,
153 Tr::ValOwn: crate::ExchangeData,
154 {
155 self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
156 }
157
158 /// Extracts elements from an arrangement as a collection.
159 ///
160 /// The supplied logic may produce an iterator over output values, allowing either
161 /// filtering or flat mapping as part of the extraction.
162 pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<G, I::Item, Tr::Diff>
163 where
164 I: IntoIterator<Item: Data>,
165 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
166 {
167 Self::flat_map_batches(self.stream, logic)
168 }
169
170 /// Extracts elements from a stream of batches as a collection.
171 ///
172 /// The supplied logic may produce an iterator over output values, allowing either
173 /// filtering or flat mapping as part of the extraction.
174 ///
175 /// This method exists for streams of batches without the corresponding arrangement.
176 /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
177 pub fn flat_map_batches<I, L>(stream: Stream<G, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<G, I::Item, Tr::Diff>
178 where
179 I: IntoIterator<Item: Data>,
180 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
181 {
182 stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
183 input.for_each(|time, data| {
184 let mut session = output.session(&time);
185 for wrapper in data.iter() {
186 let batch = &wrapper;
187 let mut cursor = batch.cursor();
188 while let Some(key) = cursor.get_key(batch) {
189 while let Some(val) = cursor.get_val(batch) {
190 for datum in logic(key, val) {
191 cursor.map_times(batch, |time, diff| {
192 session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
193 });
194 }
195 cursor.step_val(batch);
196 }
197 cursor.step_key(batch);
198 }
199 }
200 });
201 })
202 .as_collection()
203 }
204}
205
206
207use crate::difference::Multiply;
208// Direct join implementations.
209impl<G, T1> Arranged<G, T1>
210where
211 G: Scope<Timestamp=T1::Time>,
212 T1: TraceReader + Clone + 'static,
213{
214 /// A convenience method to join and produce `VecCollection` output.
215 ///
216 /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
217 pub fn join_core<T2,I,L>(self, other: Arranged<G,T2>, mut result: L) -> VecCollection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
218 where
219 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
220 T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
221 I: IntoIterator<Item: Data>,
222 L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
223 {
224 let mut result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
225 let t = t.clone();
226 let r = (r1.clone()).multiply(r2);
227 result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
228 };
229
230 use crate::operators::join::join_traces;
231 join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
232 self,
233 other,
234 move |k, v1, v2, t, d1, d2, c| {
235 for datum in result(k, v1, v2, t, d1, d2) {
236 c.give(datum);
237 }
238 }
239 )
240 .as_collection()
241 }
242}
243
244// Direct reduce implementations.
245use crate::difference::Abelian;
246impl<G, T1> Arranged<G, T1>
247where
248 G: Scope<Timestamp = T1::Time>,
249 T1: TraceReader + Clone + 'static,
250{
251 /// A direct implementation of `ReduceCore::reduce_abelian`.
252 pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
253 where
254 T1: TraceReader<KeyOwn: Ord>,
255 T2: for<'a> Trace<
256 Key<'a>= T1::Key<'a>,
257 KeyOwn=T1::KeyOwn,
258 ValOwn: Data,
259 Time=T1::Time,
260 Diff: Abelian,
261 >+'static,
262 Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
263 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
264 {
265 self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
266 if !input.is_empty() {
267 logic(key, input, change);
268 }
269 change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
270 crate::consolidation::consolidate(change);
271 })
272 }
273
274 /// A direct implementation of `ReduceCore::reduce_core`.
275 pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
276 where
277 T1: TraceReader<KeyOwn: Ord>,
278 T2: for<'a> Trace<
279 Key<'a>=T1::Key<'a>,
280 KeyOwn=T1::KeyOwn,
281 ValOwn: Data,
282 Time=T1::Time,
283 >+'static,
284 Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
285 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
286 {
287 use crate::operators::reduce::reduce_trace;
288 reduce_trace::<_,_,Bu,_,_>(self, name, logic)
289 }
290}
291
292
293impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
294where
295 G: Scope<Timestamp=Tr::Time>,
296 Tr: TraceReader + Clone,
297{
298 /// Brings an arranged collection out of a nested region.
299 ///
300 /// This method only applies to *regions*, which are subscopes with the same timestamp
301 /// as their containing scope. In this case, the trace type does not need to change.
302 pub fn leave_region(self) -> Arranged<G, Tr> {
303 use timely::dataflow::operators::Leave;
304 Arranged {
305 stream: self.stream.leave(),
306 trace: self.trace,
307 }
308 }
309}
310
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<G, C> : Sized
where
    G: Scope<Timestamp: Lattice>,
{
    /// Arranges updates into a shared trace.
    ///
    /// Equivalent to `arrange_named` with the default name `"Arrange"`.
    fn arrange<Ba, Bu, Tr>(self) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    {
        self.arrange_named::<Ba, Bu, Tr>("Arrange")
    }

    /// Arranges updates into a shared trace, with a supplied name.
    ///
    /// The name is used as the timely operator name, which can help distinguish
    /// multiple arrangements in diagnostics.
    fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    ;
}
334
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: Stream<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as capabilities are only retained for actual
    // data held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Receives the trace handle out of the operator-construction closure below;
    // extracted at the end of this function to build the returned `Arranged`.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        // The trace may use the activator to reschedule this operator (e.g. for merge work).
        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        // NOTE(review): relies on this construction closure running eagerly when the
        // operator is built, so that `reader` is `Some` for the `unwrap` below.
        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |(input, frontier), output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                // Retain the capability (presumably for output port 0 — see timely's docs)
                // so we can later emit batches at or after this time.
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != frontier.frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities in advance of the new input frontier,
                //    then there are no updates not in advance of the new input frontier and
                //    we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !frontier.less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capabilities.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Present the batch to the trace before transmitting it downstream.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            // Every batcher time should be covered by some held capability;
                            // reaching here indicates a broken invariant.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    // The sealed batch is discarded: there is no capability to send it with.
                    let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }

                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }

            // Give the trace a chance to apply outstanding merge effort
            // (per the exertion logic installed above, if any).
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}