Skip to main content

palimpsest_dataflow/operators/arrange/
arrangement.rs

//! Arranges a collection into a re-usable trace structure.
//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates and a stream of
//! batches of newly arranged updates.
//!
//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
//! and can be applied directly to arranged data instead of the collection. Internally, the operators
//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
//! resources to index the collection---communication, computation, and memory---are spent only once,
//! and only one copy of the index needs to be maintained as the collection changes.
//!
//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
//! see ill-defined data at times for which the trace is not complete. (All current implementations
//! commit only completed data to the trace.)
19
20use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
21use timely::dataflow::operators::generic::Operator;
22use timely::dataflow::operators::Capability;
23use timely::dataflow::operators::{Enter, Map};
24use timely::dataflow::{Scope, Stream, StreamCore};
25use timely::order::PartialOrder;
26use timely::progress::Antichain;
27use timely::progress::Timestamp;
28
29use crate::difference::Semigroup;
30use crate::lattice::Lattice;
31use crate::trace::implementations::merge_batcher::container::MergerChunk;
32use crate::trace::implementations::{
33    KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine,
34};
35use crate::trace::{self, BatchReader, Batcher, Builder, Cursor, Trace, TraceReader};
36use crate::{AsCollection, Data, ExchangeData, Hashable, VecCollection};
37
38use trace::wrappers::enter::{BatchEnter, TraceEnter};
39use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
40use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
41
42use super::TraceAgent;
43
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
///
/// It pairs two public views of the same data: `stream`, a timely stream whose records are
/// batches of type `Tr::Batch`, and `trace`, a cloneable handle of type `Tr`.
pub struct Arranged<G, Tr>
where
    G: Scope<Timestamp: Lattice + Ord>,
    Tr: TraceReader + Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<G, Tr::Batch>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
64
65impl<G, Tr> Clone for Arranged<G, Tr>
66where
67    G: Scope<Timestamp = Tr::Time>,
68    Tr: TraceReader + Clone,
69{
70    fn clone(&self) -> Self {
71        Arranged {
72            stream: self.stream.clone(),
73            trace: self.trace.clone(),
74        }
75    }
76}
77
78use ::timely::dataflow::scopes::Child;
79use ::timely::progress::timestamp::Refines;
80use timely::container::PushInto;
81use timely::Container;
82
83impl<G, Tr> Arranged<G, Tr>
84where
85    G: Scope<Timestamp = Tr::Time>,
86    Tr: TraceReader + Clone,
87{
88    /// Brings an arranged collection into a nested scope.
89    ///
90    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
91    /// have all been extended with an additional coordinate with the default value. The resulting collection does
92    /// not vary with the new timestamp coordinate.
93    pub fn enter<'a, TInner>(
94        &self,
95        child: &Child<'a, G, TInner>,
96    ) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
97    where
98        TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone,
99    {
100        Arranged {
101            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
102            trace: TraceEnter::make_from(self.trace.clone()),
103        }
104    }
105
106    /// Brings an arranged collection into a nested region.
107    ///
108    /// This method only applies to *regions*, which are subscopes with the same timestamp
109    /// as their containing scope. In this case, the trace type does not need to change.
110    pub fn enter_region<'a>(
111        &self,
112        child: &Child<'a, G, G::Timestamp>,
113    ) -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
114        Arranged {
115            stream: self.stream.enter(child),
116            trace: self.trace.clone(),
117        }
118    }
119
120    /// Brings an arranged collection into a nested scope.
121    ///
122    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
123    /// have all been extended with an additional coordinate with the default value. The resulting collection does
124    /// not vary with the new timestamp coordinate.
125    pub fn enter_at<'a, TInner, F, P>(
126        &self,
127        child: &Child<'a, G, TInner>,
128        logic: F,
129        prior: P,
130    ) -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
131    where
132        TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone + 'static,
133        F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>) -> TInner + Clone + 'static,
134        P: FnMut(&TInner) -> Tr::Time + Clone + 'static,
135    {
136        let logic1 = logic.clone();
137        let logic2 = logic.clone();
138        Arranged {
139            trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
140            stream: self
141                .stream
142                .enter(child)
143                .map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
144        }
145    }
146
147    /// Flattens the stream into a `Collection`.
148    ///
149    /// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
150    /// and this method should only be used when the data need to be transformed or exchanged, rather than
151    /// supplied as arguments to an operator using the same key-value structure.
152    pub fn as_collection<D: Data, L>(&self, mut logic: L) -> VecCollection<G, D, Tr::Diff>
153    where
154        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
155    {
156        self.flat_map_ref(move |key, val| Some(logic(key, val)))
157    }
158
159    /// Extracts elements from an arrangement as a collection.
160    ///
161    /// The supplied logic may produce an iterator over output values, allowing either
162    /// filtering or flat mapping as part of the extraction.
163    pub fn flat_map_ref<I, L>(&self, logic: L) -> VecCollection<G, I::Item, Tr::Diff>
164    where
165        I: IntoIterator<Item: Data>,
166        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,
167    {
168        Self::flat_map_batches(&self.stream, logic)
169    }
170
171    /// Extracts elements from a stream of batches as a collection.
172    ///
173    /// The supplied logic may produce an iterator over output values, allowing either
174    /// filtering or flat mapping as part of the extraction.
175    ///
176    /// This method exists for streams of batches without the corresponding arrangement.
177    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
178    pub fn flat_map_batches<I, L>(
179        stream: &Stream<G, Tr::Batch>,
180        mut logic: L,
181    ) -> VecCollection<G, I::Item, Tr::Diff>
182    where
183        I: IntoIterator<Item: Data>,
184        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,
185    {
186        stream
187            .unary(Pipeline, "AsCollection", move |_, _| {
188                move |input, output| {
189                    input.for_each(|time, data| {
190                        let mut session = output.session(&time);
191                        for wrapper in data.iter() {
192                            let batch = &wrapper;
193                            let mut cursor = batch.cursor();
194                            while let Some(key) = cursor.get_key(batch) {
195                                while let Some(val) = cursor.get_val(batch) {
196                                    for datum in logic(key, val) {
197                                        cursor.map_times(batch, |time, diff| {
198                                            session.give((
199                                                datum.clone(),
200                                                Tr::owned_time(time),
201                                                Tr::owned_diff(diff),
202                                            ));
203                                        });
204                                    }
205                                    cursor.step_val(batch);
206                                }
207                                cursor.step_key(batch);
208                            }
209                        }
210                    });
211                }
212            })
213            .as_collection()
214    }
215}
216
217use crate::difference::Multiply;
218// Direct join implementations.
219impl<G, T1> Arranged<G, T1>
220where
221    G: Scope<Timestamp = T1::Time>,
222    T1: TraceReader + Clone + 'static,
223{
224    /// A direct implementation of the `JoinCore::join_core` method.
225    pub fn join_core<T2, I, L>(
226        &self,
227        other: &Arranged<G, T2>,
228        mut result: L,
229    ) -> VecCollection<G, I::Item, <T1::Diff as Multiply<T2::Diff>>::Output>
230    where
231        T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
232        T1::Diff: Multiply<T2::Diff, Output: Semigroup + 'static>,
233        I: IntoIterator<Item: Data>,
234        L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>) -> I + 'static,
235    {
236        let result = move |k: T1::Key<'_>,
237                           v1: T1::Val<'_>,
238                           v2: T2::Val<'_>,
239                           t: &G::Timestamp,
240                           r1: &T1::Diff,
241                           r2: &T2::Diff| {
242            let t = t.clone();
243            let r = (r1.clone()).multiply(r2);
244            result(k, v1, v2)
245                .into_iter()
246                .map(move |d| (d, t.clone(), r.clone()))
247        };
248        self.join_core_internal_unsafe(other, result)
249    }
250    /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
251    pub fn join_core_internal_unsafe<T2, I, L, D, ROut>(
252        &self,
253        other: &Arranged<G, T2>,
254        mut result: L,
255    ) -> VecCollection<G, D, ROut>
256    where
257        T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
258        D: Data,
259        ROut: Semigroup + 'static,
260        I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
261        L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>, &G::Timestamp, &T1::Diff, &T2::Diff) -> I
262            + 'static,
263    {
264        use crate::operators::join::join_traces;
265        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
266            self,
267            other,
268            move |k, v1, v2, t, d1, d2, c| {
269                for datum in result(k, v1, v2, t, d1, d2) {
270                    c.give(datum);
271                }
272            },
273        )
274        .as_collection()
275    }
276}
277
278// Direct reduce implementations.
279use crate::difference::Abelian;
280impl<G, T1> Arranged<G, T1>
281where
282    G: Scope<Timestamp = T1::Time>,
283    T1: TraceReader + Clone + 'static,
284{
285    /// A direct implementation of `ReduceCore::reduce_abelian`.
286    pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
287    where
288        T1: TraceReader<KeyOwn: Ord>,
289        T2: for<'a> Trace<
290                Key<'a> = T1::Key<'a>,
291                KeyOwn = T1::KeyOwn,
292                ValOwn: Data,
293                Time = T1::Time,
294                Diff: Abelian,
295            > + 'static,
296        Bu: Builder<
297            Time = G::Timestamp,
298            Output = T2::Batch,
299            Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
300        >,
301        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
302            + 'static,
303    {
304        self.reduce_core::<_, Bu, T2>(name, move |key, input, output, change| {
305            if !input.is_empty() {
306                logic(key, input, change);
307            }
308            change.extend(output.drain(..).map(|(x, mut d)| {
309                d.negate();
310                (x, d)
311            }));
312            crate::consolidation::consolidate(change);
313        })
314    }
315
316    /// A direct implementation of `ReduceCore::reduce_core`.
317    pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
318    where
319        T1: TraceReader<KeyOwn: Ord>,
320        T2: for<'a> Trace<Key<'a> = T1::Key<'a>, KeyOwn = T1::KeyOwn, ValOwn: Data, Time = T1::Time>
321            + 'static,
322        Bu: Builder<
323            Time = G::Timestamp,
324            Output = T2::Batch,
325            Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
326        >,
327        L: FnMut(
328                T1::Key<'_>,
329                &[(T1::Val<'_>, T1::Diff)],
330                &mut Vec<(T2::ValOwn, T2::Diff)>,
331                &mut Vec<(T2::ValOwn, T2::Diff)>,
332            ) + 'static,
333    {
334        use crate::operators::reduce::reduce_trace;
335        reduce_trace::<_, _, Bu, _, _>(self, name, logic)
336    }
337}
338
339impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
340where
341    G: Scope<Timestamp = Tr::Time>,
342    Tr: TraceReader + Clone,
343{
344    /// Brings an arranged collection out of a nested region.
345    ///
346    /// This method only applies to *regions*, which are subscopes with the same timestamp
347    /// as their containing scope. In this case, the trace type does not need to change.
348    pub fn leave_region(&self) -> Arranged<G, Tr> {
349        use timely::dataflow::operators::Leave;
350        Arranged {
351            stream: self.stream.leave(),
352            trace: self.trace.clone(),
353        }
354    }
355}
356
357/// A type that can be arranged as if a collection of updates.
358pub trait Arrange<G, C>
359where
360    G: Scope<Timestamp: Lattice>,
361{
362    /// Arranges updates into a shared trace.
363    fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
364    where
365        Ba: Batcher<Input = C, Time = G::Timestamp> + 'static,
366        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
367        Tr: Trace<Time = G::Timestamp> + 'static,
368    {
369        self.arrange_named::<Ba, Bu, Tr>("Arrange")
370    }
371
372    /// Arranges updates into a shared trace, with a supplied name.
373    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
374    where
375        Ba: Batcher<Input = C, Time = G::Timestamp> + 'static,
376        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
377        Tr: Trace<Time = G::Timestamp> + 'static;
378}
379
380impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for VecCollection<G, (K, V), R>
381where
382    G: Scope<Timestamp: Lattice>,
383    K: ExchangeData + Hashable,
384    V: ExchangeData,
385    R: ExchangeData + Semigroup,
386{
387    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
388    where
389        Ba: Batcher<Input = Vec<((K, V), G::Timestamp, R)>, Time = G::Timestamp> + 'static,
390        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
391        Tr: Trace<Time = G::Timestamp> + 'static,
392    {
393        let exchange =
394            Exchange::new(move |update: &((K, V), G::Timestamp, R)| (update.0).0.hashed().into());
395        arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
396    }
397}
398
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which need not be
/// consistently by key (though this is the most common choice).
///
/// The returned `Arranged` carries the operator's output stream of sealed batches together with
/// the reader half of the `TraceAgent` created inside the operator.
///
/// # Panics
///
/// The operator logic panics if the input frontier is observed to regress, or if no held
/// capability can be found for a time in the batcher's frontier. The function itself unwraps the
/// reader handle, which is populated by the operator constructor closure.
pub fn arrange_core<G, P, Ba, Bu, Tr>(
    stream: &StreamCore<G, Ba::Input>,
    pact: P,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time = G::Timestamp, Input: Container> + 'static,
    Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time = G::Timestamp> + 'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as they are only retained for actual data
    // held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Filled in by the operator constructor below, and unwrapped once the operator is built.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_frontier` pattern.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {
        // Acquire a logger for arrange events.
        let logger = scope
            .logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange")
            .map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope
            .config()
            .get::<trace::ExertionLogic>("differential/default_exert_logic")
            .cloned()
        {
            empty_trace.set_exert_logic(exert_logic);
        }

        // Split the trace into a reader handle (handed back to the caller) and a writer
        // (kept by this operator to insert and seal batches).
        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |(input, frontier), output| {
            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(
                &prev_frontier.borrow(),
                &frontier.frontier()
            ));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != frontier.frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities in advance of the new input frontier,
                //    then there are no updates not in advance of the new input frontier and
                //    we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Hand the batch to the trace first, tagged with this capability's time.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        // Any held capability at or before `time` can be delayed to `time`.
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                } else {
                    // Announce progress updates, even without data.
                    // The sealed batch is discarded; only the trace's upper frontier is advanced.
                    let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }

                // Remember the frontier we just processed for the next invocation.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }

            // Called on every invocation, whether or not the frontier advanced.
            writer.exert();
        }
    });

    Arranged {
        stream,
        trace: reader.unwrap(),
    }
}
574
575impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup>
576    Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for VecCollection<G, K, R>
577where
578    G: Scope<Timestamp: Lattice + Ord>,
579{
580    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
581    where
582        Ba: Batcher<Input = Vec<((K, ()), G::Timestamp, R)>, Time = G::Timestamp> + 'static,
583        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
584        Tr: Trace<Time = G::Timestamp> + 'static,
585    {
586        let exchange =
587            Exchange::new(move |update: &((K, ()), G::Timestamp, R)| (update.0).0.hashed().into());
588        arrange_core::<_, _, Ba, Bu, _>(&self.map(|k| (k, ())).inner, exchange, name)
589    }
590}
591
592/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace.
593///
594/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
595/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
596/// pair `(u64, K)` of hash value and key.
597pub trait ArrangeByKey<G: Scope, K: Data + Hashable, V: Data, R: Ord + Semigroup + 'static>
598where
599    G: Scope<Timestamp: Lattice + Ord>,
600{
601    /// Arranges a collection of `(Key, Val)` records by `Key`.
602    ///
603    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
604    /// This trace is current for all times completed by the output stream, which can be used to
605    /// safely identify the stable times and values in the trace.
606    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
607
608    /// As `arrange_by_key` but with the ability to name the arrangement.
609    fn arrange_by_key_named(
610        &self,
611        name: &str,
612    ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
613}
614
615impl<G, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup>
616    ArrangeByKey<G, K, V, R> for VecCollection<G, (K, V), R>
617where
618    G: Scope<Timestamp: Lattice + Ord>,
619{
620    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
621        self.arrange_by_key_named("ArrangeByKey")
622    }
623
624    fn arrange_by_key_named(
625        &self,
626        name: &str,
627    ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
628        self.arrange_named::<ValBatcher<_, _, _, _>, ValBuilder<_, _, _, _>, _>(name)
629    }
630}
631
632/// Arranges something as `(Key, ())` pairs according to a type `T` of trace.
633///
634/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed
635/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the
636/// pair `(u64, K)` of hash value and key.
637pub trait ArrangeBySelf<G, K: Data + Hashable, R: Ord + Semigroup + 'static>
638where
639    G: Scope<Timestamp: Lattice + Ord>,
640{
641    /// Arranges a collection of `Key` records by `Key`.
642    ///
643    /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
644    /// This trace is current for all times complete in the output stream, which can be used to safely
645    /// identify the stable times and values in the trace.
646    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
647
648    /// As `arrange_by_self` but with the ability to name the arrangement.
649    fn arrange_by_self_named(
650        &self,
651        name: &str,
652    ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
653}
654
655impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ArrangeBySelf<G, K, R>
656    for VecCollection<G, K, R>
657where
658    G: Scope<Timestamp: Lattice + Ord>,
659{
660    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
661        self.arrange_by_self_named("ArrangeBySelf")
662    }
663
664    fn arrange_by_self_named(
665        &self,
666        name: &str,
667    ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
668        self.map(|k| (k, ()))
669            .arrange_named::<KeyBatcher<_, _, _>, KeyBuilder<_, _, _>, _>(name)
670    }
671}