// differential_dataflow/operators/arrange/arrangement.rs
//! Arranges a collection into a re-usable trace structure.
//!
//! The `arrange` operator applies to a differential dataflow `Collection` and returns an `Arranged`
//! structure, which provides access to both an indexed form of accepted updates as well as a stream
//! of batches of newly arranged updates.
//!
//! Several operators (`join`, `reduce`, and `count`, among others) are implemented against `Arranged`,
//! and can be applied directly to arranged data instead of the collection. Internally, the operators
//! will borrow the shared state, and listen on the timely stream for shared batches of data. The
//! resources to index the collection---communication, computation, and memory---are spent only once,
//! and only one copy of the index needs to be maintained as the collection changes.
//!
//! The arranged collection is stored in a trace, whose append-only operation means that it is safe to
//! share between the single `arrange` writer and multiple readers. Each reader is expected to interrogate
//! the trace only at times for which it knows the trace is complete, as indicated by the frontiers on its
//! incoming channels. Failing to do this is "safe" in the Rust sense of memory safety, but the reader may
//! see ill-defined data at times for which the trace is not complete. (All current implementations
//! commit only completed data to the trace.)
20use timely::dataflow::operators::{Enter, vec::Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, VecCollection, AsCollection};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::merge_batcher::container::InternalMerge;
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38
39use super::TraceAgent;
40
/// An arranged collection of `(K,V)` values.
///
/// An `Arranged` allows multiple differential operators to share the resources (communication,
/// computation, memory) required to produce and maintain an indexed representation of a collection.
pub struct Arranged<G, Tr>
where
    G: Scope<Timestamp: Lattice+Ord>,
    Tr: TraceReader+Clone,
{
    /// A stream containing arranged updates.
    ///
    /// This stream contains the same batches of updates the trace itself accepts, so there should
    /// be no additional overhead to receiving these records. The batches can be navigated just as
    /// the batches in the trace, by key and by value.
    pub stream: Stream<G, Vec<Tr::Batch>>,
    /// A shared trace, updated by the `Arrange` operator and readable by others.
    pub trace: Tr,
    // TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
    // returns when invoked, so as to not duplicate work with multiple calls to `as_collection`.
}
61
// NOTE(review): implemented by hand, presumably because `#[derive(Clone)]` would add a
// `G: Clone` bound that callers may not satisfy — confirm against the scope types in use.
impl<G, Tr> Clone for Arranged<G, Tr>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: TraceReader + Clone,
{
    fn clone(&self) -> Self {
        Arranged {
            // Cloning the stream adds another reader of the same batches; cloning the
            // trace handle shares the same underlying arrangement.
            stream: self.stream.clone(),
            trace: self.trace.clone(),
        }
    }
}
74
75use ::timely::dataflow::scopes::Child;
76use ::timely::progress::timestamp::Refines;
77use timely::Container;
78use timely::container::PushInto;
79
impl<G, Tr> Arranged<G, Tr>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: TraceReader + Clone,
{
    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    pub fn enter<'a, TInner>(self, child: &Child<'a, G, TInner>)
        -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
        where
            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
    {
        Arranged {
            // Wrap each batch so that its times present as the refined timestamp `TInner`,
            // without copying the batch data.
            stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
            trace: TraceEnter::make_from(self.trace),
        }
    }

    /// Brings an arranged collection into a nested region.
    ///
    /// This method only applies to *regions*, which are subscopes with the same timestamp
    /// as their containing scope. In this case, the trace type does not need to change.
    pub fn enter_region<'a>(self, child: &Child<'a, G, G::Timestamp>)
        -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
        Arranged {
            // Timestamps are unchanged in a region, so the trace handle passes through as-is.
            stream: self.stream.enter(child),
            trace: self.trace,
        }
    }

    /// Brings an arranged collection into a nested scope.
    ///
    /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps
    /// have all been extended with an additional coordinate with the default value. The resulting collection does
    /// not vary with the new timestamp coordinate.
    ///
    /// `logic` maps each update to its timestamp in the nested scope, and `prior` maps a nested
    /// timestamp back to an outer timestamp (used by the wrapped trace).
    pub fn enter_at<'a, TInner, F, P>(self, child: &Child<'a, G, TInner>, logic: F, prior: P)
        -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
        where
            TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
            F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
            P: FnMut(&TInner)->Tr::Time+Clone+'static,
        {
        // The same `logic` is needed by both the trace wrapper and each batch wrapper.
        let logic1 = logic.clone();
        let logic2 = logic.clone();
        Arranged {
            trace: TraceEnterAt::make_from(self.trace, logic1, prior),
            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
        }
    }

    /// Extracts a collection of any container from the stream of batches.
    ///
    /// This method is like `self.stream.flat_map`, except that it produces containers
    /// directly, rather than form a container of containers as `flat_map` would.
    pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<G, I::Item>
    where
        I: IntoIterator<Item: Container>,
        L: FnMut(Tr::Batch) -> I+'static,
    {
        self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                // Each message carries a `Vec<Tr::Batch>`; drain so `logic` receives
                // the batches by value.
                for wrapper in data.drain(..) {
                    for mut container in logic(wrapper) {
                        // Hand each produced container to the output whole, avoiding
                        // a container-of-containers.
                        session.give_container(&mut container);
                    }
                }
            });
        })
        .as_collection()
    }

    /// Flattens the stream into a `VecCollection`.
    ///
    /// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<G, D, Tr::Diff>
        where
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
    {
        // Specialization of `flat_map_ref` producing exactly one output per (key, val).
        self.flat_map_ref(move |key, val| Some(logic(key,val)))
    }

    /// Flattens the stream into a `VecCollection`.
    ///
    /// The underlying `Stream<G, Vec<BatchWrapper<T::Batch>>>` is a much more efficient way to access the data,
    /// and this method should only be used when the data need to be transformed or exchanged, rather than
    /// supplied as arguments to an operator using the same key-value structure.
    pub fn as_vecs(self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
    where
        Tr::KeyOwn: crate::ExchangeData,
        Tr::ValOwn: crate::ExchangeData,
    {
        // Converts borrowed key/val references into owned pairs.
        self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
    }

    /// Extracts elements from an arrangement as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<G, I::Item, Tr::Diff>
        where
            I: IntoIterator<Item: Data>,
            L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        Self::flat_map_batches(self.stream, logic)
    }

    /// Extracts elements from a stream of batches as a `VecCollection`.
    ///
    /// The supplied logic may produce an iterator over output values, allowing either
    /// filtering or flat mapping as part of the extraction.
    ///
    /// This method exists for streams of batches without the corresponding arrangement.
    /// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
    pub fn flat_map_batches<I, L>(stream: Stream<G, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<G, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.iter() {
                    let batch = &wrapper;
                    let mut cursor = batch.cursor();
                    // Standard cursor walk: each key, each value for the key, then each
                    // (time, diff) pair for the value.
                    while let Some(key) = cursor.get_key(batch) {
                        while let Some(val) = cursor.get_val(batch) {
                            for datum in logic(key, val) {
                                cursor.map_times(batch, |time, diff| {
                                    // `datum` is cloned once per (time, diff) pair it appears with.
                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
                                });
                            }
                            cursor.step_val(batch);
                        }
                        cursor.step_key(batch);
                    }
                }
            });
        })
        .as_collection()
    }
}
227
228
229use crate::difference::Multiply;
230// Direct join implementations.
231impl<G, T1> Arranged<G, T1>
232where
233    G: Scope<Timestamp=T1::Time>,
234    T1: TraceReader + Clone + 'static,
235{
236    /// A convenience method to join and produce `VecCollection` output.
237    ///
238    /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
239    pub fn join_core<T2,I,L>(self, other: Arranged<G,T2>, mut result: L) -> VecCollection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
240    where
241        T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
242        T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
243        I: IntoIterator<Item: Data>,
244        L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
245    {
246        let mut result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
247            let t = t.clone();
248            let r = (r1.clone()).multiply(r2);
249            result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
250        };
251
252        use crate::operators::join::join_traces;
253        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
254            self,
255            other,
256            move |k, v1, v2, t, d1, d2, c| {
257                for datum in result(k, v1, v2, t, d1, d2) {
258                    c.give(datum);
259                }
260            }
261        )
262            .as_collection()
263    }
264}
265
266// Direct reduce implementations.
267use crate::difference::Abelian;
impl<G, T1> Arranged<G, T1>
where
    G: Scope<Timestamp = T1::Time>,
    T1: TraceReader + Clone + 'static,
{
    /// A direct implementation of `ReduceCore::reduce_abelian`.
    ///
    /// Wraps `reduce_core`, deriving the per-key output *changes* automatically: the user
    /// `logic` writes the desired output for a key, and this method negates the current
    /// output and consolidates, so only the net difference is committed to the trace.
    pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<
            Key<'a>= T1::Key<'a>,
            KeyOwn=T1::KeyOwn,
            ValOwn: Data,
            Time=T1::Time,
            Diff: Abelian,
        >+'static,
        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
    {
        self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
            // Only invoke user logic when the key has input updates; with empty input
            // the desired output is empty, and we need only retract what exists.
            if !input.is_empty() {
                logic(key, input, change);
            }
            // Subtract the current output by appending its negation ...
            change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
            // ... and consolidate so cancelling updates vanish, leaving the net change.
            crate::consolidation::consolidate(change);
        })
    }

    /// A direct implementation of `ReduceCore::reduce_core`.
    ///
    /// `logic` receives the key, its input updates, the current output, and a buffer into
    /// which it writes output *changes* directly; no negation is performed on its behalf.
    pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<
            Key<'a>=T1::Key<'a>,
            KeyOwn=T1::KeyOwn,
            ValOwn: Data,
            Time=T1::Time,
        >+'static,
        Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
    {
        use crate::operators::reduce::reduce_trace;
        reduce_trace::<_,_,Bu,_,_>(self, name, logic)
    }
}
313
314
315impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
316where
317    G: Scope<Timestamp=Tr::Time>,
318    Tr: TraceReader + Clone,
319{
320    /// Brings an arranged collection out of a nested region.
321    ///
322    /// This method only applies to *regions*, which are subscopes with the same timestamp
323    /// as their containing scope. In this case, the trace type does not need to change.
324    pub fn leave_region(self) -> Arranged<G, Tr> {
325        use timely::dataflow::operators::Leave;
326        Arranged {
327            stream: self.stream.leave(),
328            trace: self.trace,
329        }
330    }
331}
332
/// A type that can be arranged as if a collection of updates.
pub trait Arrange<G, C> : Sized
where
    G: Scope<Timestamp: Lattice>,
{
    /// Arranges updates into a shared trace.
    ///
    /// Equivalent to `arrange_named` with the default name `"Arrange"`.
    fn arrange<Ba, Bu, Tr>(self) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    {
        self.arrange_named::<Ba, Bu, Tr>("Arrange")
    }

    /// Arranges updates into a shared trace, with a supplied name.
    ///
    /// `Ba` batches incoming updates, `Bu` builds sealed batches, and `Tr` is the trace
    /// type into which batches are committed.
    fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
        Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time=G::Timestamp> + 'static,
    ;
}
356
/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
///
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
///
/// # Panics
///
/// Panics if the input frontier ever regresses, or if a capability cannot be found for an
/// element of the batcher's lower update frontier (both indicate broken invariants).
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: Stream<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
{
    // The `Arrange` operator is tasked with reacting to an advancing input
    // frontier by producing the sequence of batches whose lower and upper
    // bounds are those frontiers, containing updates at times greater or
    // equal to lower and not greater or equal to upper.
    //
    // The operator uses its batch type's `Batcher`, which accepts update
    // triples and responds to requests to "seal" batches (presented as new
    // upper frontiers).
    //
    // Each sealed batch is presented to the trace, and if at all possible
    // transmitted along the outgoing channel. Empty batches may not have
    // a corresponding capability, as they are only retained for actual data
    // held by the batcher, which may prevent the operator from sending an
    // empty batch.

    // Populated from inside the operator closure; becomes the returned trace handle.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        // Acquire a logger for arrange events.
        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Where we will deposit received updates, and from which we extract batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities for the lower envelope of updates in `batcher`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        // The trace can schedule the operator (e.g. for compaction work) via this activator.
        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // If there is default exertion logic set, install it.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        // Smuggle the reader handle out of the operator constructor to the caller.
        *reader_ref = Some(reader_local);

        // Initialize to the minimal input frontier.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |(input, frontier), output| {

            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
            // We don't have to keep all capabilities, but we need to be able to form output messages
            // when we realize that time intervals are complete.

            input.for_each(|cap, data| {
                // NOTE(review): `retain(0)` presumably retains the capability for output port 0 — confirm.
                capabilities.insert(cap.retain(0));
                batcher.push_container(data);
            });

            // The frontier may have advanced by multiple elements, which is an issue because
            // timely dataflow currently only allows one capability per message. This means we
            // must pretend to process the frontier advances one element at a time, batching
            // and sending smaller bites than we might have otherwise done.

            // Assert that the frontier never regresses.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

            // Test to see if strict progress has occurred, which happens whenever the new
            // frontier isn't equal to the previous. It is only in this case that we have any
            // data processing to do.
            if prev_frontier.borrow() != frontier.frontier() {
                // There are two cases to handle with some care:
                //
                // 1. If any held capabilities are not in advance of the new input frontier,
                //    we must carve out updates now in advance of the new input frontier and
                //    transmit them as batches, which requires appropriate *single* capabilities;
                //    Until timely dataflow supports multiple capabilities on messages, at least.
                //
                // 2. If there are no held capabilities in advance of the new input frontier,
                //    then there are no updates not in advance of the new input frontier and
                //    we can simply create an empty input batch with the new upper frontier
                //    and feed this to the trace agent (but not along the timely output).

                // If there is at least one capability not in advance of the input frontier ...
                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                    // For each capability not in advance of the input frontier ...
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !frontier.less_equal(capability.time()) {

                            // Assemble the upper bound on times we can commit with this capabilities.
                            // We must respect the input frontier, and *subsequent* capabilities, as
                            // we are pretending to retire the capability changes one by one.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            // Commit the batch to the shared trace, hinting its lower bound.
                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // send the batch to downstream consumers, empty or not.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Having extracted and sent batches between each capability and the input frontier,
                    // we should downgrade all capabilities to match the batcher's lower update frontier.
                    // This may involve discarding capabilities, which is fine as any new updates arrive
                    // in messages with new capabilities.

                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            // Invariant violation: every pending update must be covered by a capability.
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // Announce progress updates, even without data.
                    // The sealed batch is empty and is not transmitted; sealing keeps the
                    // batcher's frontier advancing, and `seal` informs the trace.
                    let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }

                // Record the frontier we just processed, for the regression assertion above.
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }

            // Give the trace an opportunity to perform maintenance work (e.g. merging).
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}