// Source file: differential_dataflow/operators/arrange/arrangement.rs
//! Arranges a collection into a re-usable trace structure.
//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates as well as a stream of
//! batches of newly arranged updates.
//!
//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
//! and can be applied directly to arranged data instead of the collection. Internally, the operators
//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
//! resources to index the collection---communication, computation, and memory---are spent only once,
//! and only one copy of the index needs to be maintained as the collection changes.
//!
//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
//! see ill-defined data at times for which the trace is not complete. (All current implementations
//! commit only completed data to the trace.)

20use timely::dataflow::operators::{Enter, vec::Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, VecCollection, AsCollection};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33
34use trace::wrappers::enter::{TraceEnter, BatchEnter,};
35use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
36use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
37
38use super::TraceAgent;
39
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
pub struct Arranged<'scope, Tr>
where
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    ///
    /// See the module-level documentation for the caveats around reading the trace
    /// at times that are not yet complete.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
59
60impl<'scope, Tr> Clone for Arranged<'scope, Tr>
61where
62    Tr: TraceReader + Clone,
63{
64    fn clone(&self) -> Self {
65        Arranged {
66            stream: self.stream.clone(),
67            trace: self.trace.clone(),
68        }
69    }
70}
71
72use ::timely::progress::timestamp::Refines;
73use timely::Container;
74
impl<'scope, Tr> Arranged<'scope, Tr>
where
    Tr: TraceReader + Clone,
{
    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter<Tr, TInner>>
    where
        TInner: Refines<Tr::Time>+Lattice,
    {
        Arranged {
            // Wrap each batch so that its times are presented as the refined timestamp `TInner`.
            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
            trace: TraceEnter::make_from(self.trace),
        }
    }

    /// Brings an arranged collection into a nested region.
    ///
    /// This method only applies to *regions*, which are subscopes with the same timestamp
    /// as their containing scope. In this case, the trace type does not need to change.
    pub fn enter_region<'inner>(self, child: Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> {
        Arranged {
            // Only the stream needs to enter the region; the trace is unchanged because
            // the timestamp type is identical inside and outside a region.
            stream: self.stream.enter(child),
            trace: self.trace,
        }
    }

    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    ///
    /// `logic` maps each `(key, val, time)` to its refined timestamp, and `prior` maps a refined
    /// timestamp back to an outer timestamp.
    pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt<Tr, TInner, F, P>>
    where
        TInner: Refines<Tr::Time>+Lattice+'static,
        F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
        P: FnMut(&TInner)->Tr::Time+Clone+'static,
    {
        // Two clones of `logic` are needed: one for the trace wrapper, one for the batch wrapper.
        let logic1 = logic.clone();
        let logic2 = logic.clone();
        Arranged {
            trace: TraceEnterAt::make_from(self.trace, logic1, prior),
            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
        }
    }

    /// Extracts a collection of any container from the stream of batches.
    ///
    /// This method is like `self.stream.flat_map`, except that it produces containers
    /// directly, rather than form a container of containers as `flat_map` would.
    pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<'scope, Tr::Time, I::Item>
    where
        I: IntoIterator<Item: Container>,
        L: FnMut(Tr::Batch) -> I+'static,
    {
        self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                // Drain the batches so each is passed to `logic` by value.
                for wrapper in data.drain(..) {
                    // `logic` may yield several containers per batch; forward each one directly.
                    for mut container in logic(wrapper) {
                        session.give_container(&mut container);
                    }
                }
            });
        })
        .as_collection()
    }

    /// Flattens the stream into a `VecCollection`.
    ///
    /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff>
        where
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
    {
        // Specialization of `flat_map_ref` where each `(key, val)` yields exactly one datum.
        self.flat_map_ref(move |key, val| Some(logic(key,val)))
    }

    /// Flattens the stream into a `VecCollection`.
    ///
    /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    ///
    /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support
    /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic
    /// on the reference types.
    pub fn as_vecs<K, V>(self) -> VecCollection<'scope, Tr::Time, (K, V), Tr::Diff>
    where
        K: crate::ExchangeData,
        V: crate::ExchangeData,
        Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>,
    {
        // With `Key`/`Val` constrained to plain references, cloning yields owned pairs.
        self.flat_map_ref(move |key, val| [(key.clone(), val.clone())])
    }

    /// Extracts elements from an arrangement as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
        where
            I: IntoIterator<Item: Data>,
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        // Delegates to the free-standing form, discarding the trace handle.
        Self::flat_map_batches(self.stream, logic)
    }

    /// Extracts elements from a stream of batches as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    ///
    /// This method exists for streams of batches without the corresponding arrangement.
    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
    pub fn flat_map_batches<I, L>(stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.iter() {
                    let batch = &wrapper;
                    // Walk the batch in its native order: keys, then values per key,
                    // then the `(time, diff)` updates per `(key, val)` pair.
                    let mut cursor = batch.cursor();
                    while let Some(key) = cursor.get_key(batch) {
                        while let Some(val) = cursor.get_val(batch) {
                            for datum in logic(key, val) {
                                // Emit one owned update triple for each time/diff of this pair.
                                cursor.map_times(batch, |time, diff| {
                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
                                });
                            }
                            cursor.step_val(batch);
                        }
                        cursor.step_key(batch);
                    }
                }
            });
        })
        .as_collection()
    }
}
223
224
225use crate::difference::Multiply;
226// Direct join implementations.
227impl<'scope, Tr1> Arranged<'scope, Tr1>
228where
229    Tr1: TraceReader + Clone + 'static,
230{
231    /// A convenience method to join and produce `VecCollection` output.
232    ///
233    /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
234    pub fn join_core<Tr2,I,L>(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,<Tr1::Diff as Multiply<Tr2::Diff>>::Output>
235    where
236        Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>,Time=Tr1::Time>+Clone+'static,
237        Tr1::Diff: Multiply<Tr2::Diff, Output: Semigroup+'static>,
238        I: IntoIterator<Item: Data>,
239        L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static
240    {
241        let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: &Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| {
242            let t = t.clone();
243            let r = (r1.clone()).multiply(r2);
244            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
245        };
246
247        use crate::operators::join::join_traces;
248        join_traces::<_, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
249            self,
250            other,
251            move |k, v1, v2, t, d1, d2, c| {
252                for datum in result(k, v1, v2, t, d1, d2) {
253                    c.give(datum);
254                }
255            }
256        )
257            .as_collection()
258    }
259}
260
261// Direct reduce implementations.
262use crate::difference::Abelian;
impl<'scope, Tr1> Arranged<'scope, Tr1>
where
    Tr1: TraceReader + Clone + 'static,
{
    /// A direct implementation of `ReduceCore::reduce_abelian`.
    ///
    /// The user-supplied `logic` produces the *desired* output for a key from its input;
    /// this wrapper converts that into the change required of the maintained output.
    pub fn reduce_abelian<L, Bu, Tr2, P>(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
    where
        Tr2: for<'a> Trace<
            Key<'a>= Tr1::Key<'a>,
            ValOwn: Data,
            Time=Tr1::Time,
            Diff: Abelian,
        >+'static,
        Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
        L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
        P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
    {
        self.reduce_core::<_,Bu,Tr2,_>(name, move |key, input, output, change| {
            // Only invoke user logic when the key has input; with empty input the
            // desired output is empty, leaving only the retractions appended below.
            if !input.is_empty() {
                logic(key, input, change);
            }
            // Append the negation of the current output, so `change` holds
            // (desired output) - (current output); consolidation cancels any
            // matching updates, leaving exactly the delta to apply.
            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
            crate::consolidation::consolidate(change);
        }, push)
    }

    /// A direct implementation of `ReduceCore::reduce_core`.
    ///
    /// Unlike `reduce_abelian`, the `logic` closure here receives both the current output
    /// and a change vector, and is responsible for proposing the changes itself.
    pub fn reduce_core<L, Bu, Tr2, P>(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
    where
        Tr2: for<'a> Trace<
            Key<'a>=Tr1::Key<'a>,
            ValOwn: Data,
            Time=Tr1::Time,
        >+'static,
        Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
        L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
        P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
    {
        use crate::operators::reduce::reduce_trace;
        reduce_trace::<_,Bu,_,_,_>(self, name, logic, push)
    }
}
305
306
307impl<'scope, Tr> Arranged<'scope, Tr>
308where
309    Tr: TraceReader + Clone,
310{
311    /// Brings an arranged collection out of a nested region.
312    ///
313    /// This method only applies to *regions*, which are subscopes with the same timestamp
314    /// as their containing scope. In this case, the trace type does not need to change.
315    pub fn leave_region<'outer>(self, outer: Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> {
316        use timely::dataflow::operators::Leave;
317        Arranged {
318            stream: self.stream.leave(outer),
319            trace: self.trace,
320        }
321    }
322}
323
324/// A type that can be arranged as if a collection of updates.
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<'scope, T, C> : Sized
where
    T: Timestamp + Lattice,
{
    /// Arranges updates into a shared trace.
    ///
    /// Equivalent to `arrange_named` with the default operator name "Arrange".
    fn arrange<Ba, Bu, Tr>(self) -> Arranged<'scope, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=T> + 'static,
        Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=T> + 'static,
    {
        self.arrange_named::<Ba, Bu, Tr>("Arrange")
    }

    /// Arranges updates into a shared trace, with a supplied name.
    ///
    /// `Ba` accumulates incoming updates, and `Bu` builds the batches installed
    /// in the trace `Tr`. Implementors provide this method; `arrange` defaults to it.
    fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=T> + 'static,
        Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=T> + 'static,
    ;
}
347
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
    P: ParallelizationContract<Tr::Time, Ba::Input>,
    Ba: Batcher<Time=Tr::Time,Input: Container> + 'static,
    Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as capabilities are only retained for actual
    // data held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Shared slot through which the operator closure below hands out the read handle.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<Tr::Time>>::new();

        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.worker().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        // Publish the read handle so the surrounding function can return it.
        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum());

        move |(input, frontier), output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != frontier.frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities in advance of the new input frontier,
                //    then there are no updates not in advance of the new input frontier and
                //    we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !frontier.less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capability.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Install the batch in the trace before transmitting it downstream.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            // Each time in the batcher's frontier should be in advance of some
                            // held capability; anything else is a broken operator invariant.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }

                // Record the frontier for the regression check on the next invocation.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }

            // Give the trace an opportunity to perform maintenance work.
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}