differential_dataflow/trace/implementations/
mod.rs

1//! Implementations of `Trace` and associated traits.
2//!
3//! The `Trace` trait provides access to an ordered collection of `(key, val, time, diff)` tuples, but
4//! there is substantial flexibility in implementations of this trait. Depending on characteristics of
5//! the data, we may wish to represent the data in different ways. This module contains several of these
6//! implementations, and combiners for merging the results of different traces.
7//!
8//! As examples of implementations,
9//!
10//! *  The `trie` module is meant to represent general update tuples, with no particular assumptions made
11//!    about their contents. It organizes the data first by key, then by val, and then leaves the rest
12//!    in an unordered pile.
13//!
14//! *  The `keys` module is meant for collections whose value type is `()`, which is to say there is no
15//!    (key, val) structure on the records; all of them are just viewed as "keys".
16//!
17//! *  The `time` module is meant for collections with a single time value. This can remove repetition
18//!    from the representation, at the cost of requiring more instances and run-time merging.
19//!
20//! *  The `base` module is meant for collections with a single time value equivalent to the least time.
21//!    These collections must always accumulate to non-negative collections, and as such we can indicate
22//!    the frequency of an element by its multiplicity. This removes both the time and weight from the
23//!    representation, but is only appropriate for a subset (often substantial) of the data.
24//!
25//! Each of these representations is best suited for different data, but they can be combined to get the
26//! benefits of each, as appropriate. There are several `Cursor` combiners, `CursorList` and `CursorPair`,
27//! for homogeneous and inhomogeneous cursors, respectively.
28//!
//! # Musings
30//!
31//! What is less clear is how to transfer updates between the representations at merge time in a tasteful
32//! way. Perhaps we could put an ordering on the representations, each pair with a dominant representation,
33//! and part of merging the latter filters updates into the former. Although back and forth might be
34//! appealing, more thinking is required to negotiate all of these policies.
35//!
36//! One option would be to require the layer builder to handle these smarts. Merging is currently done by
37//! the layer as part of custom code, but we could make it simply be "iterate through cursor, push results
38//! into 'ordered builder'". Then the builder would be bright enough to emit a "batch" for the composite
39//! trace, rather than just a batch of the type merged.
40
41pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49// Opinionated takes on default spines.
50pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::borrow::{ToOwned};
58use std::convert::TryInto;
59
60use columnation::Columnation;
61use serde::{Deserialize, Serialize};
62use timely::Container;
63use timely::container::PushInto;
64use timely::progress::Timestamp;
65
66use crate::containers::TimelyStack;
67use crate::lattice::Lattice;
68use crate::difference::Semigroup;
69
/// A type that names constituent update types.
///
/// Implementors bundle the key, value, time, and difference types of an update
/// so that all four can be referred to through a single type parameter.
pub trait Update {
    /// Key by which data are grouped.
    type Key: Ord + Clone + 'static;
    /// Values associated with the key.
    type Val: Ord + Clone + 'static;
    /// Time at which updates occur.
    type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
    /// Way in which updates occur.
    type Diff: Ord + Semigroup + 'static;
}
81
82impl<K,V,T,R> Update for ((K, V), T, R)
83where
84    K: Ord+Clone+'static,
85    V: Ord+Clone+'static,
86    T: Ord+Clone+Lattice+timely::progress::Timestamp,
87    R: Ord+Semigroup+'static,
88{
89    type Key = K;
90    type Val = V;
91    type Time = T;
92    type Diff = R;
93}
94
/// A type with opinions on how updates should be laid out.
///
/// A layout names the update it represents (`Target`) and selects one
/// container type for each column of that update, plus a container for offsets.
pub trait Layout {
    /// The represented update.
    type Target: Update + ?Sized;
    /// Container for update keys.
    // NB: The `PushInto` constraint is only required by `rhh.rs` to push default values.
    type KeyContainer: BatchContainer + PushInto<<Self::Target as Update>::Key>;
    /// Container for update vals.
    type ValContainer: BatchContainer;
    /// Container for times.
    ///
    /// Its owned item type must be exactly the target's `Time`, and owned times can be pushed.
    type TimeContainer: BatchContainer<Owned = <Self::Target as Update>::Time> + PushInto<<Self::Target as Update>::Time>;
    /// Container for diffs.
    ///
    /// Its owned item type must be exactly the target's `Diff`, and owned diffs can be pushed.
    type DiffContainer: BatchContainer<Owned = <Self::Target as Update>::Diff> + PushInto<<Self::Target as Update>::Diff>;
    /// Container for offsets.
    ///
    /// Reading from it must yield plain `usize` values, whatever the borrow lifetime.
    type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
}
111
/// A layout that uses vectors
///
/// Keys, values, times, and diffs are each stored in a `Vec`; offsets are stored
/// in an `OffsetList`.
pub struct Vector<U: Update> {
    // Zero-sized marker tying the layout to its update type `U`; no `U` is stored.
    phantom: std::marker::PhantomData<U>,
}

impl<U: Update> Layout for Vector<U>
where
    U::Diff: Ord,
{
    type Target = U;
    type KeyContainer = Vec<U::Key>;
    type ValContainer = Vec<U::Val>;
    type TimeContainer = Vec<U::Time>;
    type DiffContainer = Vec<U::Diff>;
    type OffsetContainer = OffsetList;
}
128
/// A layout based on timely stacks
///
/// Keys, values, times, and diffs are each stored in a `TimelyStack`, which is why
/// every component type must implement `Columnation`; offsets are stored in an
/// `OffsetList`.
pub struct TStack<U: Update> {
    // Zero-sized marker tying the layout to its update type `U`; no `U` is stored.
    phantom: std::marker::PhantomData<U>,
}

impl<U: Update> Layout for TStack<U>
where
    U::Key: Columnation,
    U::Val: Columnation,
    U::Time: Columnation,
    U::Diff: Columnation + Ord,
{
    type Target = U;
    type KeyContainer = TimelyStack<U::Key>;
    type ValContainer = TimelyStack<U::Val>;
    type TimeContainer = TimelyStack<U::Time>;
    type DiffContainer = TimelyStack<U::Diff>;
    type OffsetContainer = OffsetList;
}
148
/// A type with a preferred container.
///
/// Examples include sized types that implement `Ord + Clone`, which prefer a `Vec`
/// of themselves, and slices `[T]` of such types, which prefer a `SliceContainer<T>`.
pub trait PreferredContainer : ToOwned {
    /// The preferred container for the type.
    ///
    /// It must accept the owned form of the type via `PushInto`.
    type Container: BatchContainer + PushInto<Self::Owned>;
}
156
157impl<T: Ord + Clone + 'static> PreferredContainer for T {
158    type Container = Vec<T>;
159}
160
161impl<T: Ord + Clone + 'static> PreferredContainer for [T] {
162    type Container = SliceContainer<T>;
163}
164
/// An update and layout description based on preferred containers.
///
/// `K` and `V` may be unsized (for example slices); the type is only a marker and
/// holds no data.
pub struct Preferred<K: ?Sized, V: ?Sized, T, D> {
    // `K` and `V` are wrapped in `Box` so that possibly-unsized types can appear
    // as (sized) tuple members inside the marker.
    phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D)>,
}
169
170impl<K,V,T,R> Update for Preferred<K, V, T, R>
171where
172    K: ToOwned + ?Sized,
173    K::Owned: Ord+Clone+'static,
174    V: ToOwned + ?Sized,
175    V::Owned: Ord+Clone+'static,
176    T: Ord+Clone+Lattice+timely::progress::Timestamp,
177    R: Ord+Clone+Semigroup+'static,
178{
179    type Key = K::Owned;
180    type Val = V::Owned;
181    type Time = T;
182    type Diff = R;
183}
184
185impl<K, V, T, D> Layout for Preferred<K, V, T, D>
186where
187    K: Ord+ToOwned+PreferredContainer + ?Sized,
188    K::Owned: Ord+Clone+'static,
189    V: Ord+ToOwned+PreferredContainer + ?Sized,
190    V::Owned: Ord+Clone+'static,
191    T: Ord+Clone+Lattice+timely::progress::Timestamp,
192    D: Ord+Clone+Semigroup+'static,
193{
194    type Target = Preferred<K, V, T, D>;
195    type KeyContainer = K::Container;
196    type ValContainer = V::Container;
197    type TimeContainer = Vec<T>;
198    type DiffContainer = Vec<D>;
199    type OffsetContainer = OffsetList;
200}
201
/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
///
/// Logically the list is `zero_prefix` zeros, followed by the contents of `smol`,
/// followed by the contents of `chonk`.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
pub struct OffsetList {
    /// Length of a prefix of zero elements.
    ///
    /// Zeros pushed before any other offset are only counted here, not stored.
    pub zero_prefix: usize,
    /// Offsets that fit within a `u32`.
    pub smol: Vec<u32>,
    /// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
    pub chonk: Vec<u64>,
}
212
213impl std::fmt::Debug for OffsetList {
214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215        f.debug_list().entries(self.into_iter()).finish()
216    }
217}
218
219impl OffsetList {
220    /// Allocate a new list with a specified capacity.
221    pub fn with_capacity(cap: usize) -> Self {
222        Self {
223            zero_prefix: 0,
224            smol: Vec::with_capacity(cap),
225            chonk: Vec::new(),
226        }
227    }
228    /// Inserts the offset, as a `u32` if that is still on the table.
229    pub fn push(&mut self, offset: usize) {
230        if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
231            self.zero_prefix += 1;
232        }
233        else if self.chonk.is_empty() {
234            if let Ok(smol) = offset.try_into() {
235                self.smol.push(smol);
236            }
237            else {
238                self.chonk.push(offset.try_into().unwrap())
239            }
240        }
241        else {
242            self.chonk.push(offset.try_into().unwrap())
243        }
244    }
245    /// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
246    pub fn index(&self, index: usize) -> usize {
247        if index < self.zero_prefix {
248            0
249        }
250        else if index - self.zero_prefix < self.smol.len() {
251            self.smol[index - self.zero_prefix].try_into().unwrap()
252        }
253        else {
254            self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
255        }
256    }
257    /// The number of offsets in the list.
258    pub fn len(&self) -> usize {
259        self.zero_prefix + self.smol.len() + self.chonk.len()
260    }
261}
262
263impl<'a> IntoIterator for &'a OffsetList {
264    type Item = usize;
265    type IntoIter = OffsetListIter<'a>;
266
267    fn into_iter(self) -> Self::IntoIter {
268        OffsetListIter {list: self, index: 0 }
269    }
270}
271
/// An iterator for [`OffsetList`].
///
/// Yields each offset of the borrowed list as a `usize`, in order.
pub struct OffsetListIter<'a> {
    // The list being iterated.
    list: &'a OffsetList,
    // Position of the next offset to yield.
    index: usize,
}
277
278impl<'a> Iterator for OffsetListIter<'a> {
279    type Item = usize;
280
281    fn next(&mut self) -> Option<Self::Item> {
282        if self.index < self.list.len() {
283            let res = Some(self.list.index(self.index));
284            self.index += 1;
285            res
286        } else {
287            None
288        }
289    }
290}
291
292impl PushInto<usize> for OffsetList {
293    fn push_into(&mut self, item: usize) {
294        self.push(item);
295    }
296}
297
298impl BatchContainer for OffsetList {
299    type Owned = usize;
300    type ReadItem<'a> = usize;
301
302    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
303
304    fn with_capacity(size: usize) -> Self {
305        Self::with_capacity(size)
306    }
307
308    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
309        Self::with_capacity(cont1.len() + cont2.len())
310    }
311
312    fn index(&self, index: usize) -> Self::ReadItem<'_> {
313        self.index(index)
314    }
315
316    fn len(&self) -> usize {
317        self.len()
318    }
319}
320
/// Behavior to split an update into principal components.
///
/// `K` and `V` are the key and value containers of the layout being built; they
/// determine the borrowed item types that keys and values are compared against.
pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
    /// Key portion
    type Key<'a>: Ord;
    /// Value portion
    type Val<'a>: Ord;
    /// Time
    type Time;
    /// Diff
    type Diff;

    /// Split an item into separate parts.
    fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);

    /// Test that the key equals a key in the layout's key container.
    fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;

    /// Test that the value equals a value in the layout's value container.
    fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;

    /// Count the number of distinct keys, (key, val) pairs, and total updates.
    ///
    /// NOTE(review): the implementations in this module count changes between
    /// consecutive records, so `chain` is presumably expected to be sorted by
    /// (key, val) across its links — confirm against callers.
    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
}
344
345impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
346where
347    K: Ord + Clone + 'static,
348    KBC: BatchContainer,
349    for<'a> KBC::ReadItem<'a>: PartialEq<&'a K>,
350    V: Ord + Clone + 'static,
351    VBC: BatchContainer,
352    for<'a> VBC::ReadItem<'a>: PartialEq<&'a V>,
353    T: Timestamp + Lattice + Clone + 'static,
354    R: Ord + Semigroup + 'static,
355{
356    type Key<'a> = K;
357    type Val<'a> = V;
358    type Time = T;
359    type Diff = R;
360
361    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
362        (key, val, time, diff)
363    }
364
365    fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
366        KBC::reborrow(other) == this
367    }
368
369    fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
370        VBC::reborrow(other) == this
371    }
372
373    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
374        let mut keys = 0;
375        let mut vals = 0;
376        let mut upds = 0;
377        let mut prev_keyval = None;
378        for link in chain.iter() {
379            for ((key, val), _, _) in link.iter() {
380                if let Some((p_key, p_val)) = prev_keyval {
381                    if p_key != key {
382                        keys += 1;
383                        vals += 1;
384                    } else if p_val != val {
385                        vals += 1;
386                    }
387                } else {
388                    keys += 1;
389                    vals += 1;
390                }
391                upds += 1;
392                prev_keyval = Some((key, val));
393            }
394        }
395        (keys, vals, upds)
396    }
397}
398
399impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
400where
401    K: BatchContainer,
402    for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>,
403    K::Owned: Ord + Columnation + Clone + 'static,
404    V: BatchContainer,
405    for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>,
406    V::Owned: Ord + Columnation + Clone + 'static,
407    T: Timestamp + Lattice + Columnation + Clone + 'static,
408    R: Ord + Clone + Semigroup + Columnation + 'static,
409{
410    type Key<'a> = &'a K::Owned;
411    type Val<'a> = &'a V::Owned;
412    type Time = T;
413    type Diff = R;
414
415    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
416        (key, val, time.clone(), diff.clone())
417    }
418
419    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
420        K::reborrow(other) == *this
421    }
422
423    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
424        V::reborrow(other) == *this
425    }
426
427    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
428        let mut keys = 0;
429        let mut vals = 0;
430        let mut upds = 0;
431        let mut prev_keyval = None;
432        for link in chain.iter() {
433            for ((key, val), _, _) in link.iter() {
434                if let Some((p_key, p_val)) = prev_keyval {
435                    if p_key != key {
436                        keys += 1;
437                        vals += 1;
438                    } else if p_val != val {
439                        vals += 1;
440                    }
441                } else {
442                    keys += 1;
443                    vals += 1;
444                }
445                upds += 1;
446                prev_keyval = Some((key, val));
447            }
448        }
449        (keys, vals, upds)
450    }
451}
452
453pub use self::containers::{BatchContainer, SliceContainer};
454
455/// Containers for data that resemble `Vec<T>`, with leaner implementations.
456pub mod containers {
457
458    use columnation::Columnation;
459    use timely::container::PushInto;
460
461    use crate::containers::TimelyStack;
462    use crate::IntoOwned;
463
    /// A general-purpose container resembling `Vec<T>`.
    ///
    /// Elements are read back as `ReadItem<'_>`, which may be a reference or a small
    /// copyable value, and any such read item can be pushed back into a container.
    pub trait BatchContainer: for<'a> PushInto<Self::ReadItem<'a>> + 'static {
        /// An owned instance of `Self::ReadItem<'_>`.
        type Owned;

        /// The type that can be read back out of the container.
        type ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::Owned>;

        /// Push an item into this container
        ///
        /// Available for any item type `D` the container implements `PushInto<D>` for.
        fn push<D>(&mut self, item: D) where Self: PushInto<D> {
            self.push_into(item);
        }
        /// Creates a new container with sufficient capacity.
        fn with_capacity(size: usize) -> Self;
        /// Creates a new container with sufficient capacity to hold the contents of
        /// both `cont1` and `cont2`.
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;

        /// Converts a read item into one with a narrower lifetime.
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;

        /// Reference to the element at this position.
        fn index(&self, index: usize) -> Self::ReadItem<'_>;

        /// Reference to the element at this position, if it exists.
        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
            if index < self.len() {
                Some(self.index(index))
            }
            else { None }
        }

        /// Number of contained elements
        fn len(&self) -> usize;
        /// Returns the last item if the container is non-empty.
        fn last(&self) -> Option<Self::ReadItem<'_>> {
            if self.len() > 0 {
                Some(self.index(self.len()-1))
            }
            else {
                None
            }
        }
        /// Indicates if the length is zero.
        fn is_empty(&self) -> bool { self.len() == 0 }

        /// Reports the number of elements satisfying the predicate.
        ///
        /// Only positions in `start .. end` are examined, and the count is taken
        /// from `start`.
        ///
        /// This method *relies strongly* on the assumption that the predicate
        /// stays false once it becomes false, a joint property of the predicate
        /// and the layout of `Self`. This allows `advance` to use exponential search to
        /// count the number of elements in time logarithmic in the result.
        fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {

            // Results below this bound are found by a linear scan instead of a search.
            let small_limit = 8;

            // Exponential search if the answer isn't within `small_limit`.
            if end > start + small_limit && function(self.index(start + small_limit)) {

                // start with no advance
                // (the element at `start + small_limit` matched, so at least
                // `small_limit + 1` elements satisfy the predicate).
                let mut index = small_limit + 1;
                if start + index < end && function(self.index(start + index)) {

                    // advance in exponentially growing steps.
                    let mut step = 1;
                    while start + index + step < end && function(self.index(start + index + step)) {
                        index += step;
                        step <<= 1;
                    }

                    // advance in exponentially shrinking steps.
                    step >>= 1;
                    while step > 0 {
                        if start + index + step < end && function(self.index(start + index + step)) {
                            index += step;
                        }
                        step >>= 1;
                    }

                    // `index` is the offset of the last matching element; convert it to a count.
                    index += 1;
                }

                index
            }
            else {
                // Linear scan over at most `small_limit` elements.
                let limit = std::cmp::min(end, start + small_limit);
                (start .. limit).filter(|x| function(self.index(*x))).count()
            }
        }
    }
553
554    // All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
555    // struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
556    impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
557        type Owned = T;
558        type ReadItem<'a> = &'a T;
559
560        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
561
562        fn with_capacity(size: usize) -> Self {
563            Vec::with_capacity(size)
564        }
565        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
566            Vec::with_capacity(cont1.len() + cont2.len())
567        }
568        fn index(&self, index: usize) -> Self::ReadItem<'_> {
569            &self[index]
570        }
571        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
572            <[T]>::get(&self, index)
573        }
574        fn len(&self) -> usize {
575            self[..].len()
576        }
577    }
578
    // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
    // be presented with the actual contained type, rather than a type that borrows into it.
    // NOTE(review): the comment above looks stale — the impl below bounds `T: Clone`
    // (there is no `ToOwned` bound) and calls `reserve_regions`, not `reserve_items`.
    // Confirm and update.
    impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
        type Owned = T;
        type ReadItem<'a> = &'a T;

        // Shortens the lifetime of a borrowed element.
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

        // Delegates to a `with_capacity` constructor on `TimelyStack`
        // (presumably inherent — defined outside this file).
        fn with_capacity(size: usize) -> Self {
            Self::with_capacity(size)
        }
        // Starts from an empty stack and asks it to reserve space based on both inputs.
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            let mut new = Self::default();
            new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
            new
        }
        // Borrows the element at `index`.
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            &self[index]
        }
        // Length of the stack viewed as a slice.
        fn len(&self) -> usize {
            self[..].len()
        }
    }
602
    /// A container that accepts slices `[B]`.
    ///
    /// All slices are stored back to back in `inner`; `offsets` records where each
    /// one begins and ends.
    pub struct SliceContainer<B> {
        /// Offsets that bound each contained slice.
        ///
        /// The length will be one greater than the number of contained slices,
        /// starting with zero and ending with `self.inner.len()`.
        offsets: Vec<usize>,
        /// The elements of all contained slices, concatenated in insertion order.
        inner: Vec<B>,
    }
613
614    impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
615        fn push_into(&mut self, item: &[B]) {
616            for x in item.iter() {
617                self.inner.push_into(x);
618            }
619            self.offsets.push(self.inner.len());
620        }
621    }
622
623    impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
624        fn push_into(&mut self, item: &Vec<B>) {
625            self.push_into(&item[..]);
626        }
627    }
628
629    impl<B> PushInto<Vec<B>> for SliceContainer<B> {
630        fn push_into(&mut self, item: Vec<B>) {
631            for x in item.into_iter() {
632                self.inner.push(x);
633            }
634            self.offsets.push(self.inner.len());
635        }
636    }
637
638    impl<B> BatchContainer for SliceContainer<B>
639    where
640        B: Ord + Clone + Sized + 'static,
641    {
642        type Owned = Vec<B>;
643        type ReadItem<'a> = &'a [B];
644
645        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
646
647        fn with_capacity(size: usize) -> Self {
648            let mut offsets = Vec::with_capacity(size + 1);
649            offsets.push(0);
650            Self {
651                offsets,
652                inner: Vec::with_capacity(size),
653            }
654        }
655        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
656            let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
657            offsets.push(0);
658            Self {
659                offsets,
660                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
661            }
662        }
663        fn index(&self, index: usize) -> Self::ReadItem<'_> {
664            let lower = self.offsets[index];
665            let upper = self.offsets[index+1];
666            &self.inner[lower .. upper]
667        }
668        fn len(&self) -> usize {
669            self.offsets.len() - 1
670        }
671    }
672
673    /// Default implementation introduces a first offset.
674    impl<B> Default for SliceContainer<B> {
675        fn default() -> Self {
676            Self {
677                offsets: vec![0],
678                inner: Default::default(),
679            }
680        }
681    }
682}