differential_dataflow/operators/arrange/
arrangement.rs

1//! Arranges a collection into a re-usable trace structure.
2//!
3//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
4//! structure, provides access to both an indexed form of accepted updates as well as a stream of
5//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, Map};
21use timely::order::{PartialOrder, TotalOrder};
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::{Antichain, frontier::AntichainRef};
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeySpine, ValSpine};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38use trace::wrappers::filter::{TraceFilter, BatchFilter};
39
40use trace::cursor::MyTrait;
41
42use super::TraceAgent;
43
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
pub struct Arranged<G: Scope, Tr>
where
    G::Timestamp: Lattice+Ord,
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<G, Tr::Batch>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    ///
    /// Readers should interrogate the trace only at times their input frontiers show to be
    /// complete; earlier reads are memory-safe but may observe incomplete accumulations.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
64
65impl<G: Scope, Tr> Clone for Arranged<G, Tr>
66where
67    G::Timestamp: Lattice+Ord,
68    Tr: TraceReader<Time=G::Timestamp> + Clone,
69{
70    fn clone(&self) -> Self {
71        Arranged {
72            stream: self.stream.clone(),
73            trace: self.trace.clone(),
74        }
75    }
76}
77
78use ::timely::dataflow::scopes::Child;
79use ::timely::progress::timestamp::Refines;
80
81impl<G: Scope, Tr> Arranged<G, Tr>
82where
83    G::Timestamp: Lattice+Ord,
84    Tr: TraceReader<Time=G::Timestamp> + Clone,
85{
86    /// Brings an arranged collection into a nested scope.
87    ///
88    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
89    /// have all been extended with an additional coordinate with the default value. The resulting collection does
90    /// not vary with the new timestamp coordinate.
91    pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92        -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93        where
94            Tr::Diff: 'static,
95            G::Timestamp: Clone+'static,
96            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
97    {
98        Arranged {
99            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
100            trace: TraceEnter::make_from(self.trace.clone()),
101        }
102    }
103
104    /// Brings an arranged collection into a nested region.
105    ///
106    /// This method only applies to *regions*, which are subscopes with the same timestamp
107    /// as their containing scope. In this case, the trace type does not need to change.
108    pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
109        -> Arranged<Child<'a, G, G::Timestamp>, Tr>
110        where
111            Tr::Diff: 'static,
112            G::Timestamp: Clone+'static,
113    {
114        Arranged {
115            stream: self.stream.enter(child),
116            trace: self.trace.clone(),
117        }
118    }
119
120    /// Brings an arranged collection into a nested scope.
121    ///
122    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
123    /// have all been extended with an additional coordinate with the default value. The resulting collection does
124    /// not vary with the new timestamp coordinate.
125    pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
126        -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
127        where
128            Tr::Diff: 'static,
129            G::Timestamp: Clone+'static,
130            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
131            F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp)->TInner+Clone+'static,
132            P: FnMut(&TInner)->Tr::Time+Clone+'static,
133        {
134        let logic1 = logic.clone();
135        let logic2 = logic.clone();
136        Arranged {
137            trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
138            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
139        }
140    }
141
142    /// Filters an arranged collection.
143    ///
144    /// This method produces a new arrangement backed by the same shared
145    /// arrangement as `self`, paired with user-specified logic that can
146    /// filter by key and value. The resulting collection is restricted
147    /// to the keys and values that return true under the user predicate.
148    ///
149    /// # Examples
150    ///
151    /// ```
152    /// use differential_dataflow::input::Input;
153    /// use differential_dataflow::operators::arrange::ArrangeByKey;
154    ///
155    /// ::timely::example(|scope| {
156    ///
157    ///     let arranged =
158    ///     scope.new_collection_from(0 .. 10).1
159    ///          .map(|x| (x, x+1))
160    ///          .arrange_by_key();
161    ///
162    ///     arranged
163    ///         .filter(|k,v| k == v)
164    ///         .as_collection(|k,v| (*k,*v))
165    ///         .assert_empty();
166    /// });
167    /// ```
168    pub fn filter<F>(&self, logic: F)
169        -> Arranged<G, TraceFilter<Tr, F>>
170        where
171            Tr::Diff: 'static,
172            G::Timestamp: Clone+'static,
173            F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
174    {
175        let logic1 = logic.clone();
176        let logic2 = logic.clone();
177        Arranged {
178            trace: TraceFilter::make_from(self.trace.clone(), logic1),
179            stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
180        }
181    }
182    /// Flattens the stream into a `Collection`.
183    ///
184    /// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
185    /// and this method should only be used when the data need to be transformed or exchanged, rather than
186    /// supplied as arguments to an operator using the same key-value structure.
187    pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
188        where
189            Tr::Diff: Semigroup,
190            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
191    {
192        self.flat_map_ref(move |key, val| Some(logic(key,val)))
193    }
194
195    /// Extracts elements from an arrangement as a collection.
196    ///
197    /// The supplied logic may produce an iterator over output values, allowing either
198    /// filtering or flat mapping as part of the extraction.
199    pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
200        where
201            Tr::Diff: Semigroup,
202            I: IntoIterator,
203            I::Item: Data,
204            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
205    {
206        Self::flat_map_batches(&self.stream, logic)
207    }
208
209    /// Extracts elements from a stream of batches as a collection.
210    ///
211    /// The supplied logic may produce an iterator over output values, allowing either
212    /// filtering or flat mapping as part of the extraction.
213    ///
214    /// This method exists for streams of batches without the corresponding arrangement.
215    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
216    pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
217    where
218        Tr::Diff: Semigroup,
219        I: IntoIterator,
220        I::Item: Data,
221        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
222    {
223        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
224            input.for_each(|time, data| {
225                let mut session = output.session(&time);
226                for wrapper in data.iter() {
227                    let batch = &wrapper;
228                    let mut cursor = batch.cursor();
229                    while let Some(key) = cursor.get_key(batch) {
230                        while let Some(val) = cursor.get_val(batch) {
231                            for datum in logic(key, val) {
232                                cursor.map_times(batch, |time, diff| {
233                                    session.give((datum.clone(), time.clone(), diff.clone()));
234                                });
235                            }
236                            cursor.step_val(batch);
237                        }
238                        cursor.step_key(batch);
239                    }
240                }
241            });
242        })
243        .as_collection()
244    }
245
246    /// Report values associated with keys at certain times.
247    ///
248    /// This method consumes a stream of (key, time) queries and reports the corresponding stream of
249    /// (key, value, time, diff) accumulations in the `self` trace.
250    pub fn lookup(&self, queries: &Stream<G, (Tr::KeyOwned, G::Timestamp)>) -> Stream<G, (Tr::KeyOwned, Tr::ValOwned, G::Timestamp, Tr::Diff)>
251    where
252        G::Timestamp: Data+Lattice+Ord+TotalOrder,
253        Tr::KeyOwned: ExchangeData+Hashable,
254        Tr::ValOwned: ExchangeData,
255        Tr::Diff: ExchangeData+Semigroup,
256        Tr: 'static,
257    {
258        // while the arrangement is already correctly distributed, the query stream may not be.
259        let exchange = Exchange::new(move |update: &(Tr::KeyOwned,G::Timestamp)| update.0.hashed().into());
260        queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| {
261
262            let mut trace = Some(self.trace.clone());
263            // release `set_physical_compaction` capability.
264            trace.as_mut().unwrap().set_physical_compaction(Antichain::new().borrow());
265
266            let mut stash = Vec::new();
267            let mut capability: Option<Capability<G::Timestamp>> = None;
268
269            let mut active = Vec::new();
270            let mut retain = Vec::new();
271
272            let mut working: Vec<(G::Timestamp, Tr::ValOwned, Tr::Diff)> = Vec::new();
273            let mut working2: Vec<(Tr::ValOwned, Tr::Diff)> = Vec::new();
274
275            move |input1, input2, output| {
276
277                input1.for_each(|time, data| {
278                    // if the minimum capability "improves" retain it.
279                    if capability.is_none() || time.time().less_than(capability.as_ref().unwrap().time()) {
280                        capability = Some(time.retain());
281                    }
282                    stash.extend(data.iter().cloned());
283                });
284
285                // drain input2; we will consult `trace` directly.
286                input2.for_each(|_time, _data| { });
287
288                assert_eq!(capability.is_none(), stash.is_empty());
289
290                let mut drained = false;
291                if let Some(capability) = capability.as_mut() {
292                    if !input2.frontier().less_equal(capability.time()) {
293                        for datum in stash.drain(..) {
294                            if !input2.frontier().less_equal(&datum.1) {
295                                active.push(datum);
296                            }
297                            else {
298                                retain.push(datum);
299                            }
300                        }
301                        drained = !active.is_empty();
302
303                        ::std::mem::swap(&mut stash, &mut retain);    // retain now the stashed queries.
304
305                        // sort temp1 by key and then by time.
306                        active.sort_unstable_by(|x,y| x.0.cmp(&y.0));
307
308                        let (mut cursor, storage) = trace.as_mut().unwrap().cursor();
309                        let mut session = output.session(&capability);
310
311                        // // V0: Potentially quadratic under load.
312                        // for (key, time) in active.drain(..) {
313                        //     cursor.seek_key(&storage, &key);
314                        //     if cursor.get_key(&storage) == Some(&key) {
315                        //         while let Some(val) = cursor.get_val(&storage) {
316                        //             let mut count = R::zero();
317                        //             cursor.map_times(&storage, |t, d| if t.less_equal(&time) {
318                        //                 count = count + d;
319                        //             });
320                        //             if !count.is_zero() {
321                        //                 session.give((key.clone(), val.clone(), time.clone(), count));
322                        //             }
323                        //             cursor.step_val(&storage);
324                        //         }
325                        //     }
326                        // }
327
328                        // V1: Stable under load
329                        let mut active_finger = 0;
330                        while active_finger < active.len() {
331
332                            let key = &active[active_finger].0;
333                            let mut same_key = active_finger;
334                            while active.get(same_key).map(|x| &x.0) == Some(key) {
335                                same_key += 1;
336                            }
337
338                            cursor.seek_key_owned(&storage, key);
339                            if cursor.get_key(&storage).map(|k| k.equals(key)).unwrap_or(false) {
340
341                                let mut active = &active[active_finger .. same_key];
342
343                                while let Some(val) = cursor.get_val(&storage) {
344                                    cursor.map_times(&storage, |t,d| working.push((t.clone(), val.into_owned(), d.clone())));
345                                    cursor.step_val(&storage);
346                                }
347
348                                working.sort_by(|x,y| x.0.cmp(&y.0));
349                                for (time, val, diff) in working.drain(..) {
350                                    if !active.is_empty() && active[0].1.less_than(&time) {
351                                        crate::consolidation::consolidate(&mut working2);
352                                        while !active.is_empty() && active[0].1.less_than(&time) {
353                                            for (val, count) in working2.iter() {
354                                                session.give((key.clone(), val.clone(), active[0].1.clone(), count.clone()));
355                                            }
356                                            active = &active[1..];
357                                        }
358                                    }
359                                    working2.push((val, diff));
360                                }
361                                if !active.is_empty() {
362                                    crate::consolidation::consolidate(&mut working2);
363                                    while !active.is_empty() {
364                                        for (val, count) in working2.iter() {
365                                            session.give((key.clone(), val.clone(), active[0].1.clone(), count.clone()));
366                                        }
367                                        active = &active[1..];
368                                    }
369                                }
370                            }
371                            active_finger = same_key;
372                        }
373                        active.clear();
374                    }
375                }
376
377                if drained {
378                    if stash.is_empty() { capability = None; }
379                    if let Some(capability) = capability.as_mut() {
380                        let mut min_time = stash[0].1.clone();
381                        for datum in stash[1..].iter() {
382                            if datum.1.less_than(&min_time) {
383                                min_time = datum.1.clone();
384                            }
385                        }
386                        capability.downgrade(&min_time);
387                    }
388                }
389
390                // Determine new frontier on queries that may be issued.
391                // TODO: This code looks very suspect; explain better or fix.
392                let frontier = IntoIterator::into_iter([
393                    capability.as_ref().map(|c| c.time().clone()),
394                    input1.frontier().frontier().get(0).cloned(),
395                ]).flatten().min();
396
397                if let Some(frontier) = frontier {
398                    trace.as_mut().map(|t| t.set_logical_compaction(AntichainRef::new(&[frontier])));
399                }
400                else {
401                    trace = None;
402                }
403            }
404        })
405    }
406}
407
408
409use crate::difference::Multiply;
410// Direct join implementations.
411impl<G: Scope, Tr> Arranged<G, Tr>
412where
413    G::Timestamp: Lattice+Ord,
414    Tr: TraceReader<Time=G::Timestamp> + Clone + 'static,
415    Tr::Diff: Semigroup,
416{
417    /// A direct implementation of the `JoinCore::join_core` method.
418    pub fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<Tr::Diff as Multiply<Tr2::Diff>>::Output>
419    where
420        Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>,Time=G::Timestamp>+Clone+'static,
421        Tr2::Diff: Semigroup,
422        Tr::Diff: Multiply<Tr2::Diff>,
423        <Tr::Diff as Multiply<Tr2::Diff>>::Output: Semigroup,
424        I: IntoIterator,
425        I::Item: Data,
426        L: FnMut(Tr::Key<'_>,Tr::Val<'_>,Tr2::Val<'_>)->I+'static
427    {
428        let result = move |k: Tr::Key<'_>, v1: Tr::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &Tr::Diff, r2: &Tr2::Diff| {
429            let t = t.clone();
430            let r = (r1.clone()).multiply(r2);
431            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
432        };
433        self.join_core_internal_unsafe(other, result)
434    }
435    /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
436    pub fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
437    where
438        Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>, Time=G::Timestamp>+Clone+'static,
439        Tr2::Diff: Semigroup,
440        D: Data,
441        ROut: Semigroup,
442        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
443        L: FnMut(Tr::Key<'_>, Tr::Val<'_>,Tr2::Val<'_>,&G::Timestamp,&Tr::Diff,&Tr2::Diff)->I+'static,
444    {
445        use crate::operators::join::join_traces;
446        join_traces(self, other, result)
447    }
448}
449
450// Direct reduce implementations.
451use crate::difference::Abelian;
452impl<G: Scope, T1> Arranged<G, T1>
453where
454    G::Timestamp: Lattice+Ord,
455    T1: TraceReader<Time=G::Timestamp>+Clone+'static,
456    T1::Diff: Semigroup,
457{
458    /// A direct implementation of `ReduceCore::reduce_abelian`.
459    pub fn reduce_abelian<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
460    where
461        T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=G::Timestamp>+'static,
462        T2::ValOwned: Data,
463        T2::Diff: Abelian,
464        T2::Batch: Batch,
465        T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
466        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
467    {
468        self.reduce_core::<_,T2>(name, move |key, input, output, change| {
469            if !input.is_empty() {
470                logic(key, input, change);
471            }
472            change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
473            crate::consolidation::consolidate(change);
474        })
475    }
476
477    /// A direct implementation of `ReduceCore::reduce_core`.
478    pub fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
479    where
480        T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+'static,
481        T2::ValOwned: Data,
482        T2::Diff: Semigroup,
483        T2::Batch: Batch,
484        T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
485        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
486    {
487        use crate::operators::reduce::reduce_trace;
488        reduce_trace(self, name, logic)
489    }
490}
491
492
493impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
494where
495    G::Timestamp: Lattice+Ord,
496    Tr: TraceReader<Time=G::Timestamp> + Clone,
497{
498    /// Brings an arranged collection out of a nested region.
499    ///
500    /// This method only applies to *regions*, which are subscopes with the same timestamp
501    /// as their containing scope. In this case, the trace type does not need to change.
502    pub fn leave_region(&self) -> Arranged<G, Tr> {
503        use timely::dataflow::operators::Leave;
504        Arranged {
505            stream: self.stream.leave(),
506            trace: self.trace.clone(),
507        }
508    }
509}
510
511/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`.
512///
513/// This trait is primarily implemented by `Collection<G,(K,V),R>`.
514///
515/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained.
516/// This allows e.g. for `Vec<u8>` inputs to present as `&[u8]` when read, but that relationship is not
517/// constrained by this trait.
518pub trait Arrange<G, K, V, R>
519where
520    G: Scope,
521    G::Timestamp: Lattice,
522{
523    /// Arranges a stream of `(Key, Val)` updates by `Key`.
524    ///
525    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
526    fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
527    where
528        Tr: Trace<Time=G::Timestamp> + 'static,
529        K: ExchangeData + Hashable,
530        V: ExchangeData,
531        R: ExchangeData,
532        Tr::Batch: Batch,
533        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
534        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
535    {
536        self.arrange_named("Arrange")
537    }
538
539    /// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument.
540    ///
541    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
542    fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
543    where
544        Tr: Trace<Time=G::Timestamp> + 'static,
545        K: ExchangeData + Hashable,
546        V: ExchangeData,
547        R: ExchangeData,
548        Tr::Batch: Batch,
549        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
550        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
551    {
552        let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
553        self.arrange_core(exchange, name)
554    }
555
556    /// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract.
557    ///
558    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
559    /// It uses the supplied parallelization contract to distribute the data, which does not need to
560    /// be consistently by key (though this is the most common).
561    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
562    where
563        P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
564        K: Clone,
565        V: Clone,
566        R: Clone,
567        Tr: Trace<Time=G::Timestamp>+'static,
568        Tr::Batch: Batch,
569        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
570        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
571    ;
572}
573
574impl<G, K, V, R> Arrange<G, K, V, R> for Collection<G, (K, V), R>
575where
576    G: Scope,
577    G::Timestamp: Lattice,
578    K: Clone + 'static,
579    V: Clone + 'static,
580    R: Semigroup,
581{
582    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
583    where
584        P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
585        Tr: Trace<Time=G::Timestamp>+'static,
586        Tr::Batch: Batch,
587        Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
588        Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
589    {
590        // The `Arrange` operator is tasked with reacting to an advancing input
591        // frontier by producing the sequence of batches whose lower and upper
592        // bounds are those frontiers, containing updates at times greater or
593        // equal to lower and not greater or equal to upper.
594        //
595        // The operator uses its batch type's `Batcher`, which accepts update
596        // triples and responds to requests to "seal" batches (presented as new
597        // upper frontiers).
598        //
599        // Each sealed batch is presented to the trace, and if at all possible
600        // transmitted along the outgoing channel. Empty batches may not have
601        // a corresponding capability, as they are only retained for actual data
602        // held by the batcher, which may prevents the operator from sending an
603        // empty batch.
604
605        let mut reader: Option<TraceAgent<Tr>> = None;
606
607        // fabricate a data-parallel operator using the `unary_notify` pattern.
608        let stream = {
609
610            let reader = &mut reader;
611
612            self.inner.unary_frontier(pact, name, move |_capability, info| {
613
614                // Acquire a logger for arrange events.
615                let logger = {
616                    let scope = self.scope();
617                    let register = scope.log_register();
618                    register.get::<crate::logging::DifferentialEvent>("differential/arrange")
619                };
620
621                // Where we will deposit received updates, and from which we extract batches.
622                let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);
623
624                // Capabilities for the lower envelope of updates in `batcher`.
625                let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
626
627                let activator = Some(self.scope().activator_for(&info.address[..]));
628                let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
629                // If there is default exertion logic set, install it.
630                if let Some(exert_logic) = self.inner.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
631                    empty_trace.set_exert_logic(exert_logic);
632                }
633
634                let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
635
636                *reader = Some(reader_local);
637
638                // Initialize to the minimal input frontier.
639                let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
640
641                move |input, output| {
642
643                    // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
644                    // We don't have to keep all capabilities, but we need to be able to form output messages
645                    // when we realize that time intervals are complete.
646
647                    input.for_each(|cap, data| {
648                        capabilities.insert(cap.retain());
649                        batcher.push_batch(data);
650                    });
651
652                    // The frontier may have advanced by multiple elements, which is an issue because
653                    // timely dataflow currently only allows one capability per message. This means we
654                    // must pretend to process the frontier advances one element at a time, batching
655                    // and sending smaller bites than we might have otherwise done.
656
657                    // Assert that the frontier never regresses.
658                    assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
659
660                    // Test to see if strict progress has occurred, which happens whenever the new
661                    // frontier isn't equal to the previous. It is only in this case that we have any
662                    // data processing to do.
663                    if prev_frontier.borrow() != input.frontier().frontier() {
664                        // There are two cases to handle with some care:
665                        //
666                        // 1. If any held capabilities are not in advance of the new input frontier,
667                        //    we must carve out updates now in advance of the new input frontier and
                        //    transmit them as batches, which requires appropriate *single* capabilities;
669                        //    Until timely dataflow supports multiple capabilities on messages, at least.
670                        //
671                        // 2. If there are no held capabilities in advance of the new input frontier,
672                        //    then there are no updates not in advance of the new input frontier and
673                        //    we can simply create an empty input batch with the new upper frontier
674                        //    and feed this to the trace agent (but not along the timely output).
675
676                        // If there is at least one capability not in advance of the input frontier ...
677                        if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
678
679                            let mut upper = Antichain::new();   // re-used allocation for sealing batches.
680
681                            // For each capability not in advance of the input frontier ...
682                            for (index, capability) in capabilities.elements().iter().enumerate() {
683
684                                if !input.frontier().less_equal(capability.time()) {
685
                                    // Assemble the upper bound on times we can commit with this capability.
687                                    // We must respect the input frontier, and *subsequent* capabilities, as
688                                    // we are pretending to retire the capability changes one by one.
689                                    upper.clear();
690                                    for time in input.frontier().frontier().iter() {
691                                        upper.insert(time.clone());
692                                    }
693                                    for other_capability in &capabilities.elements()[(index + 1) .. ] {
694                                        upper.insert(other_capability.time().clone());
695                                    }
696
697                                    // Extract updates not in advance of `upper`.
698                                    let batch = batcher.seal::<Tr::Builder>(upper.clone());
699
700                                    writer.insert(batch.clone(), Some(capability.time().clone()));
701
702                                    // send the batch to downstream consumers, empty or not.
703                                    output.session(&capabilities.elements()[index]).give(batch);
704                                }
705                            }
706
707                            // Having extracted and sent batches between each capability and the input frontier,
708                            // we should downgrade all capabilities to match the batcher's lower update frontier.
709                            // This may involve discarding capabilities, which is fine as any new updates arrive
710                            // in messages with new capabilities.
711
712                            let mut new_capabilities = Antichain::new();
713                            for time in batcher.frontier().iter() {
714                                if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
715                                    new_capabilities.insert(capability.delayed(time));
716                                }
717                                else {
718                                    panic!("failed to find capability");
719                                }
720                            }
721
722                            capabilities = new_capabilities;
723                        }
724                        else {
725                            // Announce progress updates, even without data.
726                            let _batch = batcher.seal::<Tr::Builder>(input.frontier().frontier().to_owned());
727                            writer.seal(input.frontier().frontier().to_owned());
728                        }
729
730                        prev_frontier.clear();
731                        prev_frontier.extend(input.frontier().frontier().iter().cloned());
732                    }
733
734                    writer.exert();
735                }
736            })
737        };
738
739        Arranged { stream, trace: reader.unwrap() }
740    }
741}
742
743impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, K, (), R> for Collection<G, K, R>
744where
745    G::Timestamp: Lattice+Ord,
746{
747    fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
748    where
749        P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
750        Tr: Trace<Time=G::Timestamp>+'static,
751        Tr::Batch: Batch,
752        Tr::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
753        Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
754    {
755        self.map(|k| (k, ()))
756            .arrange_core(pact, name)
757    }
758}
759
/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
///
/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
/// pair `(u64, K)` of hash value and key.
pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Semigroup>
where G::Timestamp: Lattice+Ord {
    /// Arranges a collection of `(Key, Val)` records by `Key`.
    ///
    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
    /// This trace is current for all times completed by the output stream, which can be used to
    /// safely identify the stable times and values in the trace.
    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;

    /// As `arrange_by_key` but with the ability to name the arrangement.
    ///
    /// The `name` is attached to the constructed arrangement operator, which can help
    /// distinguish multiple arrangements (e.g. in logs or diagnostics).
    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
}
777
778impl<G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
779where
780    G::Timestamp: Lattice+Ord
781{
782    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
783        self.arrange_by_key_named("ArrangeByKey")
784    }
785
786    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
787        self.arrange_named(name)
788    }
789}
790
/// Arranges something as `(Key, ())` pairs according to a type `T` of trace.
///
/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
/// pair `(u64, K)` of hash value and key.
pub trait ArrangeBySelf<G: Scope, K: Data+Hashable, R: Semigroup>
where
    G::Timestamp: Lattice+Ord
{
    /// Arranges a collection of `Key` records by `Key`.
    ///
    /// Records carry no values; each key is paired with the unit value `()` so the
    /// keyed-arrangement machinery applies.
    ///
    /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
    /// This trace is current for all times complete in the output stream, which can be used to safely
    /// identify the stable times and values in the trace.
    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;

    /// As `arrange_by_self` but with the ability to name the arrangement.
    ///
    /// The `name` is attached to the constructed arrangement operator, which can help
    /// distinguish multiple arrangements (e.g. in logs or diagnostics).
    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
}
810
811
812impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
813where
814    G::Timestamp: Lattice+Ord
815{
816    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
817        self.arrange_by_self_named("ArrangeBySelf")
818    }
819
820    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
821        self.map(|k| (k, ()))
822            .arrange_named(name)
823    }
824}