// differential_dataflow/operators/arrange/arrangement.rs
//! Arranges a collection into a re-usable trace structure.
//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates as well as a stream of
//! batches of newly arranged updates.
//!
//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
//! and can be applied directly to arranged data instead of the collection. Internally, the operators
//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
//! resources to index the collection---communication, computation, and memory---are spent only once,
//! and only one copy of the index needs to be maintained as the collection changes.
//!
//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
//! see ill-defined data at times for which the trace is not complete. (All current implementations
//! commit only completed data to the trace).
use timely::dataflow::operators::{Enter, vec::Map};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
use timely::dataflow::{Scope, Stream};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::Antichain;

use crate::{Data, VecCollection, AsCollection};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
use crate::trace::implementations::merge_batcher::container::MergerChunk;

use trace::wrappers::enter::{TraceEnter, BatchEnter};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;

use super::TraceAgent;
40
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
pub struct Arranged<G, Tr>
where
    G: Scope<Timestamp: Lattice+Ord>,
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<G, Vec<Tr::Batch>>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
61
62impl<G, Tr> Clone for Arranged<G, Tr>
63where
64    G: Scope<Timestamp=Tr::Time>,
65    Tr: TraceReader + Clone,
66{
67    fn clone(&self) -> Self {
68        Arranged {
69            stream: self.stream.clone(),
70            trace: self.trace.clone(),
71        }
72    }
73}
74
use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;
use timely::container::PushInto;

impl<G, Tr> Arranged<G, Tr>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: TraceReader + Clone,
{
    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    pub fn enter<'a, TInner>(self, child: &Child<'a, G, TInner>)
        -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
        where
            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
    {
        Arranged {
            // Wrap both the batches and the trace so their times present as `TInner`.
            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
            trace: TraceEnter::make_from(self.trace),
        }
    }

    /// Brings an arranged collection into a nested region.
    ///
    /// This method only applies to *regions*, which are subscopes with the same timestamp
    /// as their containing scope. In this case, the trace type does not need to change.
    pub fn enter_region<'a>(self, child: &Child<'a, G, G::Timestamp>)
        -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
        Arranged {
            stream: self.stream.enter(child),
            trace: self.trace,
        }
    }

    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    ///
    /// `logic` maps each (key, value, time) to a refined timestamp, and `prior` maps a
    /// refined timestamp back to an outer timestamp.
    pub fn enter_at<'a, TInner, F, P>(self, child: &Child<'a, G, TInner>, logic: F, prior: P)
        -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
        where
            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
            F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
            P: FnMut(&TInner)->Tr::Time+Clone+'static,
        {
        // The same `logic` must be installed in both the trace wrapper and each batch wrapper.
        let logic1 = logic.clone();
        let logic2 = logic.clone();
        Arranged {
            trace: TraceEnterAt::make_from(self.trace, logic1, prior),
            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
        }
    }

    /// Flattens the stream into a `Collection`.
    ///
    /// The underlying `Stream<G, Vec<Tr::Batch>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<G, D, Tr::Diff>
        where
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
    {
        self.flat_map_ref(move |key, val| Some(logic(key,val)))
    }

    /// Flattens the stream into a `Collection` of owned key-value pairs.
    ///
    /// The underlying `Stream<G, Vec<Tr::Batch>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_vecs(self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
    where
        Tr::KeyOwn: crate::ExchangeData,
        Tr::ValOwn: crate::ExchangeData,
    {
        self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
    }

    /// Extracts elements from an arrangement as a collection.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<G, I::Item, Tr::Diff>
        where
            I: IntoIterator<Item: Data>,
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        Self::flat_map_batches(self.stream, logic)
    }

    /// Extracts elements from a stream of batches as a collection.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    ///
    /// This method exists for streams of batches without the corresponding arrangement.
    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
    pub fn flat_map_batches<I, L>(stream: Stream<G, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<G, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.iter() {
                    let batch = &wrapper;
                    // Walk the batch with its cursor: for each key, for each value,
                    // expand via `logic` and emit one update per (time, diff) pair.
                    let mut cursor = batch.cursor();
                    while let Some(key) = cursor.get_key(batch) {
                        while let Some(val) = cursor.get_val(batch) {
                            for datum in logic(key, val) {
                                cursor.map_times(batch, |time, diff| {
                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
                                });
                            }
                            cursor.step_val(batch);
                        }
                        cursor.step_key(batch);
                    }
                }
            });
        })
        .as_collection()
    }
}
205
206
207use crate::difference::Multiply;
208// Direct join implementations.
209impl<G, T1> Arranged<G, T1>
210where
211    G: Scope<Timestamp=T1::Time>,
212    T1: TraceReader + Clone + 'static,
213{
214    /// A convenience method to join and produce `VecCollection` output.
215    ///
216    /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
217    pub fn join_core<T2,I,L>(self, other: Arranged<G,T2>, mut result: L) -> VecCollection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
218    where
219        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
220        T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
221        I: IntoIterator<Item: Data>,
222        L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
223    {
224        let mut result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
225            let t = t.clone();
226            let r = (r1.clone()).multiply(r2);
227            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
228        };
229
230        use crate::operators::join::join_traces;
231        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
232            self,
233            other,
234            move |k, v1, v2, t, d1, d2, c| {
235                for datum in result(k, v1, v2, t, d1, d2) {
236                    c.give(datum);
237                }
238            }
239        )
240            .as_collection()
241    }
242}
243
// Direct reduce implementations.
use crate::difference::Abelian;
impl<G, T1> Arranged<G, T1>
where
    G: Scope<Timestamp = T1::Time>,
    T1: TraceReader + Clone + 'static,
{
    /// A direct implementation of `ReduceCore::reduce_abelian`.
    ///
    /// `logic` produces the desired output for a key; because the output difference type
    /// is `Abelian`, the implementation can form the net change as the desired output
    /// plus the negation of the prior output, consolidated.
    pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<
            Key<'a>= T1::Key<'a>,
            KeyOwn=T1::KeyOwn,
            ValOwn: Data,
            Time=T1::Time,
            Diff: Abelian,
        >+'static,
        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
    {
        self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
            // Have `logic` deposit the newly desired output for `key` into `change`.
            if !input.is_empty() {
                logic(key, input, change);
            }
            // Append the negation of the prior output, then consolidate so that
            // `change` holds exactly the updates turning `output` into the new output.
            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
            crate::consolidation::consolidate(change);
        })
    }

    /// A direct implementation of `ReduceCore::reduce_core`.
    ///
    /// `logic` observes a key, its input values, and the current output, and records the
    /// desired *changes* to the output (rather than the new output itself).
    pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<
            Key<'a>=T1::Key<'a>,
            KeyOwn=T1::KeyOwn,
            ValOwn: Data,
            Time=T1::Time,
        >+'static,
        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
    {
        use crate::operators::reduce::reduce_trace;
        reduce_trace::<_,_,Bu,_,_>(self, name, logic)
    }
}
291
292
293impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
294where
295    G: Scope<Timestamp=Tr::Time>,
296    Tr: TraceReader + Clone,
297{
298    /// Brings an arranged collection out of a nested region.
299    ///
300    /// This method only applies to *regions*, which are subscopes with the same timestamp
301    /// as their containing scope. In this case, the trace type does not need to change.
302    pub fn leave_region(self) -> Arranged<G, Tr> {
303        use timely::dataflow::operators::Leave;
304        Arranged {
305            stream: self.stream.leave(),
306            trace: self.trace,
307        }
308    }
309}
310
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<G, C> : Sized
where
    G: Scope<Timestamp: Lattice>,
{
    /// Arranges updates into a shared trace.
    ///
    /// Equivalent to `arrange_named` with the name "Arrange".
    fn arrange<Ba, Bu, Tr>(self) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    {
        self.arrange_named::<Ba, Bu, Tr>("Arrange")
    }

    /// Arranges updates into a shared trace, with a supplied name.
    ///
    /// `Ba` batches incoming updates and `Bu` builds the batches stored in the trace `Tr`.
    fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    ;
}
334
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: Stream<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as they are only retained for actual data
    // held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // The shared trace handle; populated from inside the operator constructor below.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        // Hand the reader half back to the enclosing function for the `Arranged` result.
        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |(input, frontier), output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != frontier.frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities in advance of the new input frontier,
                //    then there are no updates not in advance of the new input frontier and
                //    we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !frontier.less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Commit the batch to the trace before sending it downstream.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            // Unreachable if the invariant "capabilities cover the batcher's
                            // frontier" holds; a violation indicates a logic bug above.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }

                // Record the input frontier for the regression check on the next invocation.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }

            // Allow the trace to perform (bounded) merge work.
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}