Skip to main content

differential_dataflow/operators/arrange/
arrangement.rs

1//! Arranges a collection into a re-usable trace structure.
2//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates and a stream of
//! batches of newly arranged updates.
6//!
7//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
8//! and can be applied directly to arranged data instead of the collection. Internally, the operators
9//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
10//! resources to index the collection---communication, computation, and memory---are spent only once,
11//! and only one copy of the index needs to be maintained as the collection changes.
12//!
13//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
14//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
15//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
16//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
17//! see ill-defined data at times for which the trace is not complete. (All current implementations
18//! commit only completed data to the trace).
19
20use timely::dataflow::operators::{Enter, vec::Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::container::{ContainerBuilder, PushInto};
28use timely::dataflow::operators::Capability;
29
30use crate::{Data, VecCollection, AsCollection};
31use crate::difference::Semigroup;
32use crate::lattice::Lattice;
33use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38
39use super::TraceAgent;
40
41/// An arranged collection of `(K,V)` values.
42///
43/// An `Arranged` allows multiple differential operators to share the resources (communication,
44/// computation, memory) required to produce and maintain an indexed representation of a collection.
45pub struct Arranged<'scope, Tr: TraceReader> {
46    /// A stream containing arranged updates.
47    ///
48    /// This stream contains the same batches of updates the trace itself accepts, so there should
49    /// be no additional overhead to receiving these records. The batches can be navigated just as
50    /// the batches in the trace, by key and by value.
51    pub stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>,
52    /// A shared trace, updated by the `Arrange` operator and readable by others.
53    pub trace: Tr,
54}
55
56impl<'scope, Tr: TraceReader+Clone> Clone for Arranged<'scope, Tr> {
57    fn clone(&self) -> Self {
58        Arranged {
59            stream: self.stream.clone(),
60            trace: self.trace.clone(),
61        }
62    }
63}
64
65use ::timely::progress::timestamp::Refines;
66use timely::Container;
67
68impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> {
69    /// Brings an arranged collection into a nested scope.
70    ///
71    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
72    /// have all been extended with an additional coordinate with the default value. The resulting collection does
73    /// not vary with the new timestamp coordinate.
74    pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter<Tr, TInner>>
75    where
76        TInner: Refines<Tr::Time>+Lattice,
77    {
78        Arranged {
79            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
80            trace: TraceEnter::make_from(self.trace),
81        }
82    }
83
84    /// Brings an arranged collection into a nested region.
85    ///
86    /// This method only applies to *regions*, which are subscopes with the same timestamp
87    /// as their containing scope. In this case, the trace type does not need to change.
88    pub fn enter_region<'inner>(self, child: Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> {
89        Arranged {
90            stream: self.stream.enter(child),
91            trace: self.trace,
92        }
93    }
94
95    /// Brings an arranged collection into a nested scope.
96    ///
97    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
98    /// have all been extended with an additional coordinate with the default value. The resulting collection does
99    /// not vary with the new timestamp coordinate.
100    pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt<Tr, TInner, F, P>>
101    where
102        TInner: Refines<Tr::Time>+Lattice+'static,
103        F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
104        P: FnMut(&TInner)->Tr::Time+Clone+'static,
105    {
106        let logic1 = logic.clone();
107        let logic2 = logic.clone();
108        Arranged {
109            trace: TraceEnterAt::make_from(self.trace, logic1, prior),
110            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
111        }
112    }
113
114    /// Extracts a collection of any container from the stream of batches.
115    ///
116    /// This method is like `self.stream.flat_map`, except that it produces containers
117    /// directly, rather than form a container of containers as `flat_map` would.
118    pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<'scope, Tr::Time, I::Item>
119    where
120        I: IntoIterator<Item: Container>,
121        L: FnMut(Tr::Batch) -> I+'static,
122    {
123        self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
124            input.for_each(|time, data| {
125                let mut session = output.session(&time);
126                for wrapper in data.drain(..) {
127                    for mut container in logic(wrapper) {
128                        session.give_container(&mut container);
129                    }
130                }
131            });
132        })
133        .as_collection()
134    }
135
136    /// Flattens the stream into a `VecCollection`.
137    ///
138    /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
139    /// and this method should only be used when the data need to be transformed or exchanged, rather than
140    /// supplied as arguments to an operator using the same key-value structure.
141    pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff>
142        where
143            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
144    {
145        self.flat_map_ref(move |key, val| Some(logic(key,val)))
146    }
147
148    /// Flattens the stream into a `VecCollection`.
149    ///
150    /// The underlying `Stream<T, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
151    /// and this method should only be used when the data need to be transformed or exchanged, rather than
152    /// supplied as arguments to an operator using the same key-value structure.
153    ///
154    /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support
155    /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic
156    /// on the reference types.
157    pub fn as_vecs<K, V>(self) -> VecCollection<'scope, Tr::Time, (K, V), Tr::Diff>
158    where
159        K: crate::ExchangeData,
160        V: crate::ExchangeData,
161        Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>,
162    {
163        self.flat_map_ref(move |key, val| [(key.clone(), val.clone())])
164    }
165
166    /// Extracts elements from an arrangement as a `VecCollection`.
167    ///
168    /// The supplied logic may produce an iterator over output values, allowing either
169    /// filtering or flat mapping as part of the extraction.
170    pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
171        where
172            I: IntoIterator<Item: Data>,
173            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
174    {
175        Self::flat_map_batches(self.stream, logic)
176    }
177
178    /// Extracts elements from a stream of batches as a `VecCollection`.
179    ///
180    /// The supplied logic may produce an iterator over output values, allowing either
181    /// filtering or flat mapping as part of the extraction.
182    ///
183    /// This method exists for streams of batches without the corresponding arrangement.
184    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
185    pub fn flat_map_batches<I, L>(stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
186    where
187        I: IntoIterator<Item: Data>,
188        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
189    {
190        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
191            input.for_each(|time, data| {
192                let mut session = output.session(&time);
193                for wrapper in data.iter() {
194                    let batch = &wrapper;
195                    let mut cursor = batch.cursor();
196                    while let Some(key) = cursor.get_key(batch) {
197                        while let Some(val) = cursor.get_val(batch) {
198                            for datum in logic(key, val) {
199                                cursor.map_times(batch, |time, diff| {
200                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
201                                });
202                            }
203                            cursor.step_val(batch);
204                        }
205                        cursor.step_key(batch);
206                    }
207                }
208            });
209        })
210        .as_collection()
211    }
212}
213
214
215use crate::difference::Multiply;
216// Direct join implementations.
217impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> {
218    /// A convenience method to join and produce `VecCollection` output.
219    ///
220    /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
221    pub fn join_core<Tr2,I,L>(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,<Tr1::Diff as Multiply<Tr2::Diff>>::Output>
222    where
223        Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>,Time=Tr1::Time>+Clone+'static,
224        Tr1::Diff: Multiply<Tr2::Diff, Output: Semigroup+'static>,
225        I: IntoIterator<Item: Data>,
226        L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static
227    {
228        let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| {
229            let r = (r1.clone()).multiply(r2);
230            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
231        };
232
233        use crate::operators::join::join_traces;
234        join_traces::<_, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
235            self,
236            other,
237            move |k, v1, v2, t, d1, d2, c| {
238                for datum in result(k, v1, v2, t, d1, d2) {
239                    c.give(datum);
240                }
241            }
242        )
243            .as_collection()
244    }
245}
246
247// Direct reduce implementations.
248use crate::difference::Abelian;
249impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> {
250    /// A direct implementation of `ReduceCore::reduce_abelian`.
251    pub fn reduce_abelian<L, Bu, Tr2, P>(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
252    where
253        Tr2: for<'a> Trace<
254            Key<'a>= Tr1::Key<'a>,
255            ValOwn: Data,
256            Time=Tr1::Time,
257            Diff: Abelian,
258        >+'static,
259        Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
260        L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
261        P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
262    {
263        self.reduce_core::<_,Bu,Tr2,_>(name, move |key, input, output, change| {
264            if !input.is_empty() {
265                logic(key, input, change);
266            }
267            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
268            crate::consolidation::consolidate(change);
269        }, push)
270    }
271
272    /// A direct implementation of `ReduceCore::reduce_core`.
273    pub fn reduce_core<L, Bu, Tr2, P>(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
274    where
275        Tr2: for<'a> Trace<
276            Key<'a>=Tr1::Key<'a>,
277            ValOwn: Data,
278            Time=Tr1::Time,
279        >+'static,
280        Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
281        L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
282        P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
283    {
284        use crate::operators::reduce::reduce_trace;
285        reduce_trace::<_,Bu,_,_,_>(self, name, logic, push)
286    }
287}
288
289
290impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> {
291    /// Brings an arranged collection out of a nested region.
292    ///
293    /// This method only applies to *regions*, which are subscopes with the same timestamp
294    /// as their containing scope. In this case, the trace type does not need to change.
295    pub fn leave_region<'outer>(self, outer: Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> {
296        use timely::dataflow::operators::Leave;
297        Arranged {
298            stream: self.stream.leave(outer),
299            trace: self.trace,
300        }
301    }
302}
303
304/// A type that can be arranged as if a collection of updates.
305pub trait Arrange<'scope, T: Timestamp+Lattice, C> : Sized {
306    /// Arranges updates into a shared trace.
307    ///
308    /// The batcher's output container must equal the stream container `C`; the default
309    /// chunker only consolidates same-type containers. For chunker setups that convert
310    /// between container types (e.g. columnar layouts), call [`arrange_core`] directly.
311    fn arrange<Ba, Bu, Tr>(self) -> Arranged<'scope, TraceAgent<Tr>>
312    where
313        Ba: Batcher<Output=C, Time=T> + 'static,
314        Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
315        Tr: Trace<Time=T> + 'static,
316    {
317        self.arrange_named::<Ba, Bu, Tr>("Arrange")
318    }
319
320    /// Arranges updates into a shared trace, with a supplied name.
321    ///
322    /// See [`Arrange::arrange`] for constraints on the batcher's output container.
323    fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
324    where
325        Ba: Batcher<Output=C, Time=T> + 'static,
326        Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
327        Tr: Trace<Time=T> + 'static,
328    ;
329}
330
331/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
332///
333/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
334/// It uses the supplied parallelization contract to distribute the data, which does not need to
335/// be consistently by key (though this is the most common).
336pub fn arrange_core<'scope, P, C, Chu, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, C>, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
337where
338    C: Container + Clone + 'static,
339    P: ParallelizationContract<Tr::Time, C>,
340    Chu: ContainerBuilder<Container=Ba::Output> + for<'a> PushInto<&'a mut C> + 'static,
341    Ba: Batcher<Time=Tr::Time> + 'static,
342    Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output = Tr::Batch>,
343    Tr: Trace+'static,
344{
345    // The `Arrange` operator is tasked with reacting to an advancing input
346    // frontier by producing the sequence of batches whose lower and upper
347    // bounds are those frontiers, containing updates at times greater or
348    // equal to lower and not greater or equal to upper.
349    //
350    // The operator uses its batch type's `Batcher`, which accepts update
351    // triples and responds to requests to "seal" batches (presented as new
352    // upper frontiers).
353    //
354    // Each sealed batch is presented to the trace, and if at all possible
355    // transmitted along the outgoing channel. Empty batches may not have
356    // a corresponding capability, as they are only retained for actual data
357    // held by the batcher, which may prevents the operator from sending an
358    // empty batch.
359
360    let mut reader: Option<TraceAgent<Tr>> = None;
361
362    // fabricate a data-parallel operator using the `unary_notify` pattern.
363    let reader_ref = &mut reader;
364    let scope = stream.scope();
365
366    let stream = stream.unary_frontier(pact, name, move |_capability, info| {
367
368        // Acquire a logger for arrange events.
369        let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
370
371        // Where we will deposit received updates, and from which we extract batches.
372        let mut batcher = Ba::new(logger.clone(), info.global_id);
373
374        // Capabilities for the lower envelope of updates in `batcher`.
375        let mut capabilities = Antichain::<Capability<Tr::Time>>::new();
376
377        let activator = Some(scope.activator_for(std::rc::Rc::clone(&info.address)));
378        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
379        // If there is default exertion logic set, install it.
380        if let Some(exert_logic) = scope.worker().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
381            empty_trace.set_exert_logic(exert_logic);
382        }
383
384        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
385
386        *reader_ref = Some(reader_local);
387
388        // Initialize to the minimal input frontier.
389        let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum());
390
391        let mut chunker = Chu::default();
392
393        move |(input, frontier), output| {
394
395            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
396            // We don't have to keep all capabilities, but we need to be able to form output messages
397            // when we realize that time intervals are complete.
398
399            input.for_each(|cap, data| {
400                capabilities.insert(cap.retain(0));
401                chunker.push_into(data);
402                while let Some(chunk) = chunker.extract() {
403                    batcher.push_into(std::mem::take(chunk));
404                }
405            });
406
407            // The frontier may have advanced by multiple elements, which is an issue because
408            // timely dataflow currently only allows one capability per message. This means we
409            // must pretend to process the frontier advances one element at a time, batching
410            // and sending smaller bites than we might have otherwise done.
411
412            // Assert that the frontier never regresses.
413            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));
414
415            // Test to see if strict progress has occurred, which happens whenever the new
416            // frontier isn't equal to the previous. It is only in this case that we have any
417            // data processing to do.
418            if prev_frontier.borrow() != frontier.frontier() {
419                // Flush any data the chunker is still accumulating into the batcher before we
420                // seal. The batcher only sees chunks the chunker has emitted; without this drain
421                // a partial final chunk would never reach the batcher.
422                while let Some(chunk) = chunker.finish() {
423                    batcher.push_into(std::mem::take(chunk));
424                }
425
426                // There are two cases to handle with some care:
427                //
428                // 1. If any held capabilities are not in advance of the new input frontier,
429                //    we must carve out updates now in advance of the new input frontier and
430                //    transmit them as batches, which requires appropriate *single* capabilities;
431                //    Until timely dataflow supports multiple capabilities on messages, at least.
432                //
433                // 2. If there are no held capabilities in advance of the new input frontier,
434                //    then there are no updates not in advance of the new input frontier and
435                //    we can simply create an empty input batch with the new upper frontier
436                //    and feed this to the trace agent (but not along the timely output).
437
438                // If there is at least one capability not in advance of the input frontier ...
439                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {
440
441                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.
442
443                    // For each capability not in advance of the input frontier ...
444                    for (index, capability) in capabilities.elements().iter().enumerate() {
445
446                        if !frontier.less_equal(capability.time()) {
447
448                            // Assemble the upper bound on times we can commit with this capabilities.
449                            // We must respect the input frontier, and *subsequent* capabilities, as
450                            // we are pretending to retire the capability changes one by one.
451                            upper.clear();
452                            for time in frontier.frontier().iter() {
453                                upper.insert(time.clone());
454                            }
455                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
456                                upper.insert(other_capability.time().clone());
457                            }
458
459                            // Extract updates not in advance of `upper`.
460                            let (mut chain, description) = batcher.seal(upper.clone());
461                            let batch = Bu::seal(&mut chain, description);
462
463                            writer.insert(batch.clone(), Some(capability.time().clone()));
464
465                            // send the batch to downstream consumers, empty or not.
466                            output.session(&capabilities.elements()[index]).give(batch);
467                        }
468                    }
469
470                    // Having extracted and sent batches between each capability and the input frontier,
471                    // we should downgrade all capabilities to match the batcher's lower update frontier.
472                    // This may involve discarding capabilities, which is fine as any new updates arrive
473                    // in messages with new capabilities.
474
475                    let mut new_capabilities = Antichain::new();
476                    for time in batcher.frontier().iter() {
477                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
478                            new_capabilities.insert(capability.delayed(time));
479                        }
480                        else {
481                            panic!("failed to find capability");
482                        }
483                    }
484
485                    capabilities = new_capabilities;
486                }
487                else {
488                    // Announce progress updates, even without data. We seal the batcher to
489                    // advance its lower bound and frontier, but discard the readied updates
490                    // rather than building a batch we would immediately drop.
491                    let _ = batcher.seal(frontier.frontier().to_owned());
492                    writer.seal(frontier.frontier().to_owned());
493                }
494
495                prev_frontier.clear();
496                prev_frontier.extend(frontier.frontier().iter().cloned());
497            }
498
499            writer.exert();
500        }
501    });
502
503    Arranged { stream, trace: reader.unwrap() }
504}