differential_dataflow/operators/arrange/arrangement.rs
1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access both to an indexed form of accepted updates and to a stream of
//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, Map};
21use timely::order::{PartialOrder, TotalOrder};
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::{Antichain, frontier::AntichainRef};
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeySpine, ValSpine};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38use trace::wrappers::filter::{TraceFilter, BatchFilter};
39
40use trace::cursor::MyTrait;
41
42use super::TraceAgent;
43
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
///
/// It pairs two views of the same arrangement: `stream`, the batches as they are minted, and
/// `trace`, a handle to the accumulated indexed state.
pub struct Arranged<G: Scope, Tr>
where
    G::Timestamp: Lattice+Ord,
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<G, Tr::Batch>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
64
65impl<G: Scope, Tr> Clone for Arranged<G, Tr>
66where
67 G::Timestamp: Lattice+Ord,
68 Tr: TraceReader<Time=G::Timestamp> + Clone,
69{
70 fn clone(&self) -> Self {
71 Arranged {
72 stream: self.stream.clone(),
73 trace: self.trace.clone(),
74 }
75 }
76}
77
78use ::timely::dataflow::scopes::Child;
79use ::timely::progress::timestamp::Refines;
80
81impl<G: Scope, Tr> Arranged<G, Tr>
82where
83 G::Timestamp: Lattice+Ord,
84 Tr: TraceReader<Time=G::Timestamp> + Clone,
85{
86 /// Brings an arranged collection into a nested scope.
87 ///
88 /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
89 /// have all been extended with an additional coordinate with the default value. The resulting collection does
90 /// not vary with the new timestamp coordinate.
91 pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92 -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93 where
94 Tr::Diff: 'static,
95 G::Timestamp: Clone+'static,
96 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
97 {
98 Arranged {
99 stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
100 trace: TraceEnter::make_from(self.trace.clone()),
101 }
102 }
103
104 /// Brings an arranged collection into a nested region.
105 ///
106 /// This method only applies to *regions*, which are subscopes with the same timestamp
107 /// as their containing scope. In this case, the trace type does not need to change.
108 pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
109 -> Arranged<Child<'a, G, G::Timestamp>, Tr>
110 where
111 Tr::Diff: 'static,
112 G::Timestamp: Clone+'static,
113 {
114 Arranged {
115 stream: self.stream.enter(child),
116 trace: self.trace.clone(),
117 }
118 }
119
120 /// Brings an arranged collection into a nested scope.
121 ///
122 /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
123 /// have all been extended with an additional coordinate with the default value. The resulting collection does
124 /// not vary with the new timestamp coordinate.
125 pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
126 -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
127 where
128 Tr::Diff: 'static,
129 G::Timestamp: Clone+'static,
130 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
131 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp)->TInner+Clone+'static,
132 P: FnMut(&TInner)->Tr::Time+Clone+'static,
133 {
134 let logic1 = logic.clone();
135 let logic2 = logic.clone();
136 Arranged {
137 trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
138 stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
139 }
140 }
141
142 /// Filters an arranged collection.
143 ///
144 /// This method produces a new arrangement backed by the same shared
145 /// arrangement as `self`, paired with user-specified logic that can
146 /// filter by key and value. The resulting collection is restricted
147 /// to the keys and values that return true under the user predicate.
148 ///
149 /// # Examples
150 ///
151 /// ```
152 /// use differential_dataflow::input::Input;
153 /// use differential_dataflow::operators::arrange::ArrangeByKey;
154 ///
155 /// ::timely::example(|scope| {
156 ///
157 /// let arranged =
158 /// scope.new_collection_from(0 .. 10).1
159 /// .map(|x| (x, x+1))
160 /// .arrange_by_key();
161 ///
162 /// arranged
163 /// .filter(|k,v| k == v)
164 /// .as_collection(|k,v| (*k,*v))
165 /// .assert_empty();
166 /// });
167 /// ```
168 pub fn filter<F>(&self, logic: F)
169 -> Arranged<G, TraceFilter<Tr, F>>
170 where
171 Tr::Diff: 'static,
172 G::Timestamp: Clone+'static,
173 F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
174 {
175 let logic1 = logic.clone();
176 let logic2 = logic.clone();
177 Arranged {
178 trace: TraceFilter::make_from(self.trace.clone(), logic1),
179 stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
180 }
181 }
182 /// Flattens the stream into a `Collection`.
183 ///
184 /// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
185 /// and this method should only be used when the data need to be transformed or exchanged, rather than
186 /// supplied as arguments to an operator using the same key-value structure.
187 pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
188 where
189 Tr::Diff: Semigroup,
190 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
191 {
192 self.flat_map_ref(move |key, val| Some(logic(key,val)))
193 }
194
    /// Extracts elements from an arrangement as a collection.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
        where
            Tr::Diff: Semigroup,
            I: IntoIterator,
            I::Item: Data,
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        // Only the batch stream is consulted; the trace handle itself is not needed here.
        Self::flat_map_batches(&self.stream, logic)
    }
208
    /// Extracts elements from a stream of batches as a collection.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    ///
    /// This method exists for streams of batches without the corresponding arrangement.
    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
    pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
        where
            Tr::Diff: Semigroup,
            I: IntoIterator,
            I::Item: Data,
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.iter() {
                    let batch = &wrapper;
                    // Walk the batch with its cursor: keys in the outer loop, values inner.
                    let mut cursor = batch.cursor();
                    while let Some(key) = cursor.get_key(batch) {
                        while let Some(val) = cursor.get_val(batch) {
                            for datum in logic(key, val) {
                                // Emit one update per (time, diff) pair recorded for this (key, val).
                                cursor.map_times(batch, |time, diff| {
                                    session.give((datum.clone(), time.clone(), diff.clone()));
                                });
                            }
                            cursor.step_val(batch);
                        }
                        cursor.step_key(batch);
                    }
                }
            });
        })
        .as_collection()
    }
245
246 /// Report values associated with keys at certain times.
247 ///
248 /// This method consumes a stream of (key, time) queries and reports the corresponding stream of
249 /// (key, value, time, diff) accumulations in the `self` trace.
250 pub fn lookup(&self, queries: &Stream<G, (Tr::KeyOwned, G::Timestamp)>) -> Stream<G, (Tr::KeyOwned, Tr::ValOwned, G::Timestamp, Tr::Diff)>
251 where
252 G::Timestamp: Data+Lattice+Ord+TotalOrder,
253 Tr::KeyOwned: ExchangeData+Hashable,
254 Tr::ValOwned: ExchangeData,
255 Tr::Diff: ExchangeData+Semigroup,
256 Tr: 'static,
257 {
258 // while the arrangement is already correctly distributed, the query stream may not be.
259 let exchange = Exchange::new(move |update: &(Tr::KeyOwned,G::Timestamp)| update.0.hashed().into());
260 queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| {
261
262 let mut trace = Some(self.trace.clone());
263 // release `set_physical_compaction` capability.
264 trace.as_mut().unwrap().set_physical_compaction(Antichain::new().borrow());
265
266 let mut stash = Vec::new();
267 let mut capability: Option<Capability<G::Timestamp>> = None;
268
269 let mut active = Vec::new();
270 let mut retain = Vec::new();
271
272 let mut working: Vec<(G::Timestamp, Tr::ValOwned, Tr::Diff)> = Vec::new();
273 let mut working2: Vec<(Tr::ValOwned, Tr::Diff)> = Vec::new();
274
275 move |input1, input2, output| {
276
277 input1.for_each(|time, data| {
278 // if the minimum capability "improves" retain it.
279 if capability.is_none() || time.time().less_than(capability.as_ref().unwrap().time()) {
280 capability = Some(time.retain());
281 }
282 stash.extend(data.iter().cloned());
283 });
284
285 // drain input2; we will consult `trace` directly.
286 input2.for_each(|_time, _data| { });
287
288 assert_eq!(capability.is_none(), stash.is_empty());
289
290 let mut drained = false;
291 if let Some(capability) = capability.as_mut() {
292 if !input2.frontier().less_equal(capability.time()) {
293 for datum in stash.drain(..) {
294 if !input2.frontier().less_equal(&datum.1) {
295 active.push(datum);
296 }
297 else {
298 retain.push(datum);
299 }
300 }
301 drained = !active.is_empty();
302
303 ::std::mem::swap(&mut stash, &mut retain); // retain now the stashed queries.
304
305 // sort temp1 by key and then by time.
306 active.sort_unstable_by(|x,y| x.0.cmp(&y.0));
307
308 let (mut cursor, storage) = trace.as_mut().unwrap().cursor();
309 let mut session = output.session(&capability);
310
311 // // V0: Potentially quadratic under load.
312 // for (key, time) in active.drain(..) {
313 // cursor.seek_key(&storage, &key);
314 // if cursor.get_key(&storage) == Some(&key) {
315 // while let Some(val) = cursor.get_val(&storage) {
316 // let mut count = R::zero();
317 // cursor.map_times(&storage, |t, d| if t.less_equal(&time) {
318 // count = count + d;
319 // });
320 // if !count.is_zero() {
321 // session.give((key.clone(), val.clone(), time.clone(), count));
322 // }
323 // cursor.step_val(&storage);
324 // }
325 // }
326 // }
327
328 // V1: Stable under load
329 let mut active_finger = 0;
330 while active_finger < active.len() {
331
332 let key = &active[active_finger].0;
333 let mut same_key = active_finger;
334 while active.get(same_key).map(|x| &x.0) == Some(key) {
335 same_key += 1;
336 }
337
338 cursor.seek_key_owned(&storage, key);
339 if cursor.get_key(&storage).map(|k| k.equals(key)).unwrap_or(false) {
340
341 let mut active = &active[active_finger .. same_key];
342
343 while let Some(val) = cursor.get_val(&storage) {
344 cursor.map_times(&storage, |t,d| working.push((t.clone(), val.into_owned(), d.clone())));
345 cursor.step_val(&storage);
346 }
347
348 working.sort_by(|x,y| x.0.cmp(&y.0));
349 for (time, val, diff) in working.drain(..) {
350 if !active.is_empty() && active[0].1.less_than(&time) {
351 crate::consolidation::consolidate(&mut working2);
352 while !active.is_empty() && active[0].1.less_than(&time) {
353 for (val, count) in working2.iter() {
354 session.give((key.clone(), val.clone(), active[0].1.clone(), count.clone()));
355 }
356 active = &active[1..];
357 }
358 }
359 working2.push((val, diff));
360 }
361 if !active.is_empty() {
362 crate::consolidation::consolidate(&mut working2);
363 while !active.is_empty() {
364 for (val, count) in working2.iter() {
365 session.give((key.clone(), val.clone(), active[0].1.clone(), count.clone()));
366 }
367 active = &active[1..];
368 }
369 }
370 }
371 active_finger = same_key;
372 }
373 active.clear();
374 }
375 }
376
377 if drained {
378 if stash.is_empty() { capability = None; }
379 if let Some(capability) = capability.as_mut() {
380 let mut min_time = stash[0].1.clone();
381 for datum in stash[1..].iter() {
382 if datum.1.less_than(&min_time) {
383 min_time = datum.1.clone();
384 }
385 }
386 capability.downgrade(&min_time);
387 }
388 }
389
390 // Determine new frontier on queries that may be issued.
391 // TODO: This code looks very suspect; explain better or fix.
392 let frontier = IntoIterator::into_iter([
393 capability.as_ref().map(|c| c.time().clone()),
394 input1.frontier().frontier().get(0).cloned(),
395 ]).flatten().min();
396
397 if let Some(frontier) = frontier {
398 trace.as_mut().map(|t| t.set_logical_compaction(AntichainRef::new(&[frontier])));
399 }
400 else {
401 trace = None;
402 }
403 }
404 })
405 }
406}
407
408
409use crate::difference::Multiply;
410// Direct join implementations.
411impl<G: Scope, Tr> Arranged<G, Tr>
412where
413 G::Timestamp: Lattice+Ord,
414 Tr: TraceReader<Time=G::Timestamp> + Clone + 'static,
415 Tr::Diff: Semigroup,
416{
417 /// A direct implementation of the `JoinCore::join_core` method.
418 pub fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<Tr::Diff as Multiply<Tr2::Diff>>::Output>
419 where
420 Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>,Time=G::Timestamp>+Clone+'static,
421 Tr2::Diff: Semigroup,
422 Tr::Diff: Multiply<Tr2::Diff>,
423 <Tr::Diff as Multiply<Tr2::Diff>>::Output: Semigroup,
424 I: IntoIterator,
425 I::Item: Data,
426 L: FnMut(Tr::Key<'_>,Tr::Val<'_>,Tr2::Val<'_>)->I+'static
427 {
428 let result = move |k: Tr::Key<'_>, v1: Tr::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &Tr::Diff, r2: &Tr2::Diff| {
429 let t = t.clone();
430 let r = (r1.clone()).multiply(r2);
431 result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
432 };
433 self.join_core_internal_unsafe(other, result)
434 }
    /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
    ///
    /// NOTE(review): the `_unsafe` suffix presumably refers to the unconstrained
    /// `(D, time, diff)` triples produced by `result` — confirm against the
    /// `JoinCore` trait documentation.
    pub fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>, Time=G::Timestamp>+Clone+'static,
        Tr2::Diff: Semigroup,
        D: Data,
        ROut: Semigroup,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>,Tr2::Val<'_>,&G::Timestamp,&Tr::Diff,&Tr2::Diff)->I+'static,
    {
        // All of the join logic lives in `join::join_traces`.
        use crate::operators::join::join_traces;
        join_traces(self, other, result)
    }
448}
449
450// Direct reduce implementations.
451use crate::difference::Abelian;
452impl<G: Scope, T1> Arranged<G, T1>
453where
454 G::Timestamp: Lattice+Ord,
455 T1: TraceReader<Time=G::Timestamp>+Clone+'static,
456 T1::Diff: Semigroup,
457{
    /// A direct implementation of `ReduceCore::reduce_abelian`.
    ///
    /// The `logic` callback receives a key, its accumulated input values, and a list to
    /// populate with desired output values. Because `T2::Diff` is `Abelian`, the prior
    /// output can be negated and subtracted to yield the change to apply.
    pub fn reduce_abelian<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=G::Timestamp>+'static,
        T2::ValOwned: Data,
        T2::Diff: Abelian,
        T2::Batch: Batch,
        T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
    {
        self.reduce_core::<_,T2>(name, move |key, input, output, change| {
            // Only consult user logic when the key has input; with no input the
            // desired output is empty.
            if !input.is_empty() {
                logic(key, input, change);
            }
            // Subtract the existing output, so `change` becomes the difference between
            // the desired output and the current output; consolidation cancels overlap.
            change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
            crate::consolidation::consolidate(change);
        })
    }
476
    /// A direct implementation of `ReduceCore::reduce_core`.
    ///
    /// The `logic` callback is presented with a key, the accumulated input values for that
    /// key, the previously produced output values, and a list to populate with changes to
    /// that output (see `reduce_abelian` for how these four arguments are used).
    pub fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+'static,
        T2::ValOwned: Data,
        T2::Diff: Semigroup,
        T2::Batch: Batch,
        T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
    {
        // All of the heavy lifting lives in `reduce::reduce_trace`.
        use crate::operators::reduce::reduce_trace;
        reduce_trace(self, name, logic)
    }
490}
491
492
493impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
494where
495 G::Timestamp: Lattice+Ord,
496 Tr: TraceReader<Time=G::Timestamp> + Clone,
497{
498 /// Brings an arranged collection out of a nested region.
499 ///
500 /// This method only applies to *regions*, which are subscopes with the same timestamp
501 /// as their containing scope. In this case, the trace type does not need to change.
502 pub fn leave_region(&self) -> Arranged<G, Tr> {
503 use timely::dataflow::operators::Leave;
504 Arranged {
505 stream: self.stream.leave(),
506 trace: self.trace.clone(),
507 }
508 }
509}
510
/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`.
///
/// This trait is primarily implemented by `Collection<G,(K,V),R>`.
///
/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained.
/// This allows e.g. for `Vec<u8>` inputs to present as `&[u8]` when read, but that relationship is not
/// constrained by this trait.
pub trait Arrange<G, K, V, R>
where
    G: Scope,
    G::Timestamp: Lattice,
{
    /// Arranges a stream of `(Key, Val)` updates by `Key`.
    ///
    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
    fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
    where
        Tr: Trace<Time=G::Timestamp> + 'static,
        K: ExchangeData + Hashable,
        V: ExchangeData,
        R: ExchangeData,
        Tr::Batch: Batch,
        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
    {
        // Default operator name; use `arrange_named` to choose your own.
        self.arrange_named("Arrange")
    }

    /// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument.
    ///
    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
    fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Tr: Trace<Time=G::Timestamp> + 'static,
        K: ExchangeData + Hashable,
        V: ExchangeData,
        R: ExchangeData,
        Tr::Batch: Batch,
        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
    {
        // Route each update by the hash of its key, so all updates for a key
        // arrive at the same worker.
        let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
        self.arrange_core(exchange, name)
    }

    /// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract.
    ///
    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
    /// It uses the supplied parallelization contract to distribute the data, which does not need to
    /// be consistently by key (though this is the most common).
    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
        K: Clone,
        V: Clone,
        R: Clone,
        Tr: Trace<Time=G::Timestamp>+'static,
        Tr::Batch: Batch,
        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
    ;
}
573
impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K, V), R>
where
    G: Scope,
    G::Timestamp: Lattice,
    K: Clone + 'static,
    V: Clone + 'static,
    R: Semigroup,
{
    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
        Tr: Trace<Time=G::Timestamp>+'static,
        Tr::Batch: Batch,
        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
    {
        // The `Arrange` operator is tasked with reacting to an advancing input
        // frontier by producing the sequence of batches whose lower and upper
        // bounds are those frontiers, containing updates at times greater or
        // equal to lower and not greater or equal to upper.
        //
        // The operator uses its batch type's `Batcher`, which accepts update
        // triples and responds to requests to "seal" batches (presented as new
        // upper frontiers).
        //
        // Each sealed batch is presented to the trace, and if at all possible
        // transmitted along the outgoing channel. Empty batches may not have
        // a corresponding capability, as they are only retained for actual data
        // held by the batcher, which may prevent the operator from sending an
        // empty batch.

        // Handle to the trace agent; installed by the operator closure and then
        // extracted once the operator has been constructed.
        let mut reader: Option<TraceAgent<Tr>> = None;

        // fabricate a data-parallel operator using the `unary_notify` pattern.
        let stream = {

            let reader = &mut reader;

            self.inner.unary_frontier(pact, name, move |_capability, info| {

                // Acquire a logger for arrange events.
                let logger = {
                    let scope = self.scope();
                    let register = scope.log_register();
                    register.get::<crate::logging::DifferentialEvent>("differential/arrange")
                };

                // Where we will deposit received updates, and from which we extract batches.
                let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);

                // Capabilities for the lower envelope of updates in `batcher`.
                let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

                let activator = Some(self.scope().activator_for(&info.address[..]));
                let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
                // If there is default exertion logic set, install it.
                if let Some(exert_logic) = self.inner.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
                    empty_trace.set_exert_logic(exert_logic);
                }

                let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

                *reader = Some(reader_local);

                // Initialize to the minimal input frontier.
                let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

                move |input, output| {

                    // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
                    // We don't have to keep all capabilities, but we need to be able to form output messages
                    // when we realize that time intervals are complete.

                    input.for_each(|cap, data| {
                        capabilities.insert(cap.retain());
                        batcher.push_batch(data);
                    });

                    // The frontier may have advanced by multiple elements, which is an issue because
                    // timely dataflow currently only allows one capability per message. This means we
                    // must pretend to process the frontier advances one element at a time, batching
                    // and sending smaller bites than we might have otherwise done.

                    // Assert that the frontier never regresses.
                    assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

                    // Test to see if strict progress has occurred, which happens whenever the new
                    // frontier isn't equal to the previous. It is only in this case that we have any
                    // data processing to do.
                    if prev_frontier.borrow() != input.frontier().frontier() {
                        // There are two cases to handle with some care:
                        //
                        // 1. If any held capabilities are not in advance of the new input frontier,
                        //    we must carve out updates now in advance of the new input frontier and
                        //    transmit them as batches, which requires appropriate *single* capabilities;
                        //    Until timely dataflow supports multiple capabilities on messages, at least.
                        //
                        // 2. If there are no held capabilities in advance of the new input frontier,
                        //    then there are no updates not in advance of the new input frontier and
                        //    we can simply create an empty input batch with the new upper frontier
                        //    and feed this to the trace agent (but not along the timely output).

                        // If there is at least one capability not in advance of the input frontier ...
                        if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                            let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                            // For each capability not in advance of the input frontier ...
                            for (index, capability) in capabilities.elements().iter().enumerate() {

                                if !input.frontier().less_equal(capability.time()) {

                                    // Assemble the upper bound on times we can commit with this capability.
                                    // We must respect the input frontier, and *subsequent* capabilities, as
                                    // we are pretending to retire the capability changes one by one.
                                    upper.clear();
                                    for time in input.frontier().frontier().iter() {
                                        upper.insert(time.clone());
                                    }
                                    for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                        upper.insert(other_capability.time().clone());
                                    }

                                    // Extract updates not in advance of `upper`.
                                    let batch = batcher.seal::<Tr::Builder>(upper.clone());

                                    writer.insert(batch.clone(), Some(capability.time().clone()));

                                    // send the batch to downstream consumers, empty or not.
                                    output.session(&capabilities.elements()[index]).give(batch);
                                }
                            }

                            // Having extracted and sent batches between each capability and the input frontier,
                            // we should downgrade all capabilities to match the batcher's lower update frontier.
                            // This may involve discarding capabilities, which is fine as any new updates arrive
                            // in messages with new capabilities.

                            let mut new_capabilities = Antichain::new();
                            for time in batcher.frontier().iter() {
                                if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                                    new_capabilities.insert(capability.delayed(time));
                                }
                                else {
                                    // Every time in the batcher's frontier should be covered by a held capability.
                                    panic!("failed to find capability");
                                }
                            }

                            capabilities = new_capabilities;
                        }
                        else {
                            // Announce progress updates, even without data.
                            let _batch = batcher.seal::<Tr::Builder>(input.frontier().frontier().to_owned());
                            writer.seal(input.frontier().frontier().to_owned());
                        }

                        prev_frontier.clear();
                        prev_frontier.extend(input.frontier().frontier().iter().cloned());
                    }

                    // Offer the trace the opportunity to do maintenance work (e.g. merging batches).
                    writer.exert();
                }
            })
        };

        Arranged { stream, trace: reader.unwrap() }
    }
}
742
743impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, K, (), R> for Collection<G, K, R>
744where
745 G::Timestamp: Lattice+Ord,
746{
747 fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
748 where
749 P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
750 Tr: Trace<Time=G::Timestamp>+'static,
751 Tr::Batch: Batch,
752 Tr::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
753 Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
754 {
755 self.map(|k| (k, ()))
756 .arrange_core(pact, name)
757 }
758}
759
/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
///
/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
/// pair `(u64, K)` of hash value and key.
pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Semigroup>
where G::Timestamp: Lattice+Ord {
    /// Arranges a collection of `(Key, Val)` records by `Key`.
    ///
    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
    /// This trace is current for all times completed by the output stream, which can be used to
    /// safely identify the stable times and values in the trace.
    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;

    /// As `arrange_by_key` but with the ability to name the arrangement.
    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
}
777
impl<G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
where
    G::Timestamp: Lattice+Ord
{
    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
        // Delegate to the named variant with a default operator name.
        self.arrange_by_key_named("ArrangeByKey")
    }

    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
        // `arrange_named` exchanges records by the hash of their key.
        self.arrange_named(name)
    }
}
790
/// Arranges something as `(Key, ())` pairs according to a type `T` of trace.
///
/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
/// pair `(u64, K)` of hash value and key.
pub trait ArrangeBySelf<G: Scope, K: Data+Hashable, R: Semigroup>
where
    G::Timestamp: Lattice+Ord
{
    /// Arranges a collection of `Key` records by `Key`.
    ///
    /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
    /// This trace is current for all times complete in the output stream, which can be used to safely
    /// identify the stable times and values in the trace.
    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;

    /// As `arrange_by_self` but with the ability to name the arrangement.
    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
}
810
811
812impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
813where
814 G::Timestamp: Lattice+Ord
815{
816 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
817 self.arrange_by_self_named("ArrangeBySelf")
818 }
819
820 fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
821 self.map(|k| (k, ()))
822 .arrange_named(name)
823 }
824}