differential_dataflow/operators/arrange/
arrangement.rs

1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates and a stream of
//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38use trace::wrappers::filter::{TraceFilter, BatchFilter};
39
40use super::TraceAgent;
41
42/// An arranged collection of `(K,V)` values.
43///
44/// An `Arranged` allows multiple differential operators to share the resources (communication,
45/// computation, memory) required to produce and maintain an indexed representation of a collection.
46pub struct Arranged<G, Tr>
47where
48    G: Scope<Timestamp: Lattice+Ord>,
49    Tr: TraceReader+Clone,
50{
51    /// A stream containing arranged updates.
52    ///
53    /// This stream contains the same batches of updates the trace itself accepts, so there should
54    /// be no additional overhead to receiving these records. The batches can be navigated just as
55    /// the batches in the trace, by key and by value.
56    pub stream: Stream<G, Tr::Batch>,
57    /// A shared trace, updated by the `Arrange` operator and readable by others.
58    pub trace: Tr,
59    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
60    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
61}
62
63impl<G, Tr> Clone for Arranged<G, Tr>
64where
65    G: Scope<Timestamp=Tr::Time>,
66    Tr: TraceReader + Clone,
67{
68    fn clone(&self) -> Self {
69        Arranged {
70            stream: self.stream.clone(),
71            trace: self.trace.clone(),
72        }
73    }
74}
75
76use ::timely::dataflow::scopes::Child;
77use ::timely::progress::timestamp::Refines;
78use timely::Container;
79use timely::container::PushInto;
80
81impl<G, Tr> Arranged<G, Tr>
82where
83    G: Scope<Timestamp=Tr::Time>,
84    Tr: TraceReader + Clone,
85{
86    /// Brings an arranged collection into a nested scope.
87    ///
88    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
89    /// have all been extended with an additional coordinate with the default value. The resulting collection does
90    /// not vary with the new timestamp coordinate.
91    pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92        -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93        where
94            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
95    {
96        Arranged {
97            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
98            trace: TraceEnter::make_from(self.trace.clone()),
99        }
100    }
101
102    /// Brings an arranged collection into a nested region.
103    ///
104    /// This method only applies to *regions*, which are subscopes with the same timestamp
105    /// as their containing scope. In this case, the trace type does not need to change.
106    pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
107        -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
108        Arranged {
109            stream: self.stream.enter(child),
110            trace: self.trace.clone(),
111        }
112    }
113
114    /// Brings an arranged collection into a nested scope.
115    ///
116    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
117    /// have all been extended with an additional coordinate with the default value. The resulting collection does
118    /// not vary with the new timestamp coordinate.
119    pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
120        -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
121        where
122            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
123            F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
124            P: FnMut(&TInner)->Tr::Time+Clone+'static,
125        {
126        let logic1 = logic.clone();
127        let logic2 = logic.clone();
128        Arranged {
129            trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
130            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
131        }
132    }
133
134    /// Filters an arranged collection.
135    ///
136    /// This method produces a new arrangement backed by the same shared
137    /// arrangement as `self`, paired with user-specified logic that can
138    /// filter by key and value. The resulting collection is restricted
139    /// to the keys and values that return true under the user predicate.
140    ///
141    /// # Examples
142    ///
143    /// ```
144    /// use differential_dataflow::input::Input;
145    /// use differential_dataflow::operators::arrange::ArrangeByKey;
146    ///
147    /// ::timely::example(|scope| {
148    ///
149    ///     let arranged =
150    ///     scope.new_collection_from(0 .. 10).1
151    ///          .map(|x| (x, x+1))
152    ///          .arrange_by_key();
153    ///
154    ///     arranged
155    ///         .filter(|k,v| k == v)
156    ///         .as_collection(|k,v| (*k,*v))
157    ///         .assert_empty();
158    /// });
159    /// ```
160    pub fn filter<F>(&self, logic: F)
161        -> Arranged<G, TraceFilter<Tr, F>>
162        where
163            F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
164    {
165        let logic1 = logic.clone();
166        let logic2 = logic.clone();
167        Arranged {
168            trace: TraceFilter::make_from(self.trace.clone(), logic1),
169            stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
170        }
171    }
172    /// Flattens the stream into a `Collection`.
173    ///
174    /// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
175    /// and this method should only be used when the data need to be transformed or exchanged, rather than
176    /// supplied as arguments to an operator using the same key-value structure.
177    pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
178        where
179            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
180    {
181        self.flat_map_ref(move |key, val| Some(logic(key,val)))
182    }
183
184    /// Extracts elements from an arrangement as a collection.
185    ///
186    /// The supplied logic may produce an iterator over output values, allowing either
187    /// filtering or flat mapping as part of the extraction.
188    pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
189        where
190            I: IntoIterator<Item: Data>,
191            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
192    {
193        Self::flat_map_batches(&self.stream, logic)
194    }
195
196    /// Extracts elements from a stream of batches as a collection.
197    ///
198    /// The supplied logic may produce an iterator over output values, allowing either
199    /// filtering or flat mapping as part of the extraction.
200    ///
201    /// This method exists for streams of batches without the corresponding arrangement.
202    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
203    pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
204    where
205        I: IntoIterator<Item: Data>,
206        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
207    {
208        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
209            input.for_each(|time, data| {
210                let mut session = output.session(&time);
211                for wrapper in data.iter() {
212                    let batch = &wrapper;
213                    let mut cursor = batch.cursor();
214                    while let Some(key) = cursor.get_key(batch) {
215                        while let Some(val) = cursor.get_val(batch) {
216                            for datum in logic(key, val) {
217                                cursor.map_times(batch, |time, diff| {
218                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
219                                });
220                            }
221                            cursor.step_val(batch);
222                        }
223                        cursor.step_key(batch);
224                    }
225                }
226            });
227        })
228        .as_collection()
229    }
230}
231
232
233use crate::difference::Multiply;
234// Direct join implementations.
235impl<G, T1> Arranged<G, T1>
236where
237    G: Scope<Timestamp=T1::Time>,
238    T1: TraceReader + Clone + 'static,
239{
240    /// A direct implementation of the `JoinCore::join_core` method.
241    pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
242    where
243        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
244        T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
245        I: IntoIterator<Item: Data>,
246        L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
247    {
248        let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
249            let t = t.clone();
250            let r = (r1.clone()).multiply(r2);
251            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
252        };
253        self.join_core_internal_unsafe(other, result)
254    }
255    /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
256    pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,D,ROut>
257    where
258        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
259        D: Data,
260        ROut: Semigroup+'static,
261        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
262        L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
263    {
264        use crate::operators::join::join_traces;
265        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
266            self,
267            other,
268            move |k, v1, v2, t, d1, d2, c| {
269                for datum in result(k, v1, v2, t, d1, d2) {
270                    c.give(datum);
271                }
272            }
273        )
274            .as_collection()
275    }
276}
277
278// Direct reduce implementations.
279use crate::difference::Abelian;
280impl<G, T1> Arranged<G, T1>
281where
282    G: Scope<Timestamp = T1::Time>,
283    T1: TraceReader + Clone + 'static,
284{
285    /// A direct implementation of `ReduceCore::reduce_abelian`.
286    pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
287    where
288        T1: TraceReader<KeyOwn: Ord>,
289        T2: for<'a> Trace<
290            Key<'a>= T1::Key<'a>,
291            KeyOwn=T1::KeyOwn,
292            ValOwn: Data,
293            Time=T1::Time,
294            Diff: Abelian,
295        >+'static,
296        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
297        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
298    {
299        self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
300            if !input.is_empty() {
301                logic(key, input, change);
302            }
303            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
304            crate::consolidation::consolidate(change);
305        })
306    }
307
308    /// A direct implementation of `ReduceCore::reduce_core`.
309    pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
310    where
311        T1: TraceReader<KeyOwn: Ord>,
312        T2: for<'a> Trace<
313            Key<'a>=T1::Key<'a>,
314            KeyOwn=T1::KeyOwn,
315            ValOwn: Data,
316            Time=T1::Time,
317        >+'static,
318        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
319        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
320    {
321        use crate::operators::reduce::reduce_trace;
322        reduce_trace::<_,_,Bu,_,_>(self, name, logic)
323    }
324}
325
326
327impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
328where
329    G: Scope<Timestamp=Tr::Time>,
330    Tr: TraceReader + Clone,
331{
332    /// Brings an arranged collection out of a nested region.
333    ///
334    /// This method only applies to *regions*, which are subscopes with the same timestamp
335    /// as their containing scope. In this case, the trace type does not need to change.
336    pub fn leave_region(&self) -> Arranged<G, Tr> {
337        use timely::dataflow::operators::Leave;
338        Arranged {
339            stream: self.stream.leave(),
340            trace: self.trace.clone(),
341        }
342    }
343}
344
345/// A type that can be arranged as if a collection of updates.
346pub trait Arrange<G, C>
347where
348    G: Scope<Timestamp: Lattice>,
349{
350    /// Arranges updates into a shared trace.
351    fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
352    where
353        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
354        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
355        Tr: Trace<Time=G::Timestamp> + 'static,
356    {
357        self.arrange_named::<Ba, Bu, Tr>("Arrange")
358    }
359
360    /// Arranges updates into a shared trace, with a supplied name.
361    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
362    where
363        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
364        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
365        Tr: Trace<Time=G::Timestamp> + 'static,
366    ;
367}
368
369impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
370where
371    G: Scope<Timestamp: Lattice>,
372    K: ExchangeData + Hashable,
373    V: ExchangeData,
374    R: ExchangeData + Semigroup,
375{
376    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
377    where
378        Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
379        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
380        Tr: Trace<Time=G::Timestamp> + 'static,
381    {
382        let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
383        arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
384    }
385}
386
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
///
/// * `stream`: the stream of input containers, of the batcher's input type `Ba::Input`.
/// * `pact`: the parallelization contract used for the operator's input.
/// * `name`: the name given to the constructed operator.
///
/// The type parameters select the batcher `Ba` that accumulates updates, the builder `Bu`
/// used when sealing batches, and the trace `Tr` that the batches are inserted into.
///
/// Returns an `Arranged` pairing the operator's output stream of batches with a reader
/// handle onto the trace.
///
/// # Panics
///
/// The operator logic panics if the input frontier regresses, or if no held capability can be
/// found for an element of the batcher's frontier after sealing. This function itself panics
/// if the trace reader was not set while the operator was being constructed.
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp,Input: Container + Clone + 'static> + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as they are only retained for actual data
    // held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Set by the operator constructor closure below, and unwrapped once the operator is built.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_frontier` pattern.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        // The trace is given an activator for this operator's address.
        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        // Split the trace into a reader handle (returned to the caller) and a writer (kept by the operator).
        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |input, output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != input.frontier().frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out the updates that are no longer in advance of the new
                //    input frontier and transmit them as batches, which requires appropriate
                //    *single* capabilities; until timely dataflow supports multiple
                //    capabilities on messages, at least.
                //
                // 2. If all held capabilities are in advance of the new input frontier (or
                //    none are held), then there are no updates not in advance of the new
                //    input frontier and we can simply seal an empty batch with the new upper
                //    frontier and report it to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !input.frontier().less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Hand one copy of the batch to the trace, along with this capability's time.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        // Any held capability whose time is less or equal to `time` can be delayed to it.
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    // The sealed batch is not used here; only the new upper frontier is reported to the writer.
                    let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
                    writer.seal(input.frontier().frontier().to_owned());
                }

                // Remember the frontier we have just processed.
                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }

            // Invoked on every scheduling of the operator, whether or not the frontier advanced.
            writer.exert();
        }
    });

    // `reader` is only ever set inside the constructor closure above; this `unwrap` relies on
    // `unary_frontier` having invoked that closure before returning.
    Arranged { stream, trace: reader.unwrap() }
}
545
546impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
547where
548    G: Scope<Timestamp: Lattice+Ord>,
549{
550    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
551    where
552        Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
553        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
554        Tr: Trace<Time=G::Timestamp> + 'static,
555    {
556        let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
557        arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
558    }
559}
560
561/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
562///
563/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
564/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
565/// pair `(u64, K)` of hash value and key.
566pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Ord+Semigroup+'static>
567where
568    G: Scope<Timestamp: Lattice+Ord>,
569{
570    /// Arranges a collection of `(Key, Val)` records by `Key`.
571    ///
572    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
573    /// This trace is current for all times completed by the output stream, which can be used to
574    /// safely identify the stable times and values in the trace.
575    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
576
577    /// As `arrange_by_key` but with the ability to name the arrangement.
578    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
579}
580
581impl<G, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
582where
583    G: Scope<Timestamp: Lattice+Ord>,
584{
585    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
586        self.arrange_by_key_named("ArrangeByKey")
587    }
588
589    fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
590        self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
591    }
592}
593
594/// Arranges something as `(Key, ())` pairs according to a type `T` of trace.
595///
596/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
597/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
598/// pair `(u64, K)` of hash value and key.
599pub trait ArrangeBySelf<G, K: Data+Hashable, R: Ord+Semigroup+'static>
600where
601    G: Scope<Timestamp: Lattice+Ord>,
602{
603    /// Arranges a collection of `Key` records by `Key`.
604    ///
605    /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
606    /// This trace is current for all times complete in the output stream, which can be used to safely
607    /// identify the stable times and values in the trace.
608    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
609
610    /// As `arrange_by_self` but with the ability to name the arrangement.
611    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
612}
613
614
615impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
616where
617    G: Scope<Timestamp: Lattice+Ord>,
618{
619    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
620        self.arrange_by_self_named("ArrangeBySelf")
621    }
622
623    fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
624        self.map(|k| (k, ()))
625            .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
626    }
627}