differential_dataflow/trace/implementations/
mod.rs

1//! Implementations of `Trace` and associated traits.
2//!
3//! The `Trace` trait provides access to an ordered collection of `(key, val, time, diff)` tuples, but
4//! there is substantial flexibility in implementations of this trait. Depending on characteristics of
5//! the data, we may wish to represent the data in different ways. This module contains several of these
6//! implementations, and combiners for merging the results of different traces.
7//!
8//! As examples of implementations,
9//!
10//! *  The `trie` module is meant to represent general update tuples, with no particular assumptions made
11//!    about their contents. It organizes the data first by key, then by val, and then leaves the rest
12//!    in an unordered pile.
13//!
14//! *  The `keys` module is meant for collections whose value type is `()`, which is to say there is no
15//!    (key, val) structure on the records; all of them are just viewed as "keys".
16//!
17//! *  The `time` module is meant for collections with a single time value. This can remove repetition
18//!    from the representation, at the cost of requiring more instances and run-time merging.
19//!
20//! *  The `base` module is meant for collections with a single time value equivalent to the least time.
21//!    These collections must always accumulate to non-negative collections, and as such we can indicate
22//!    the frequency of an element by its multiplicity. This removes both the time and weight from the
23//!    representation, but is only appropriate for a subset (often substantial) of the data.
24//!
25//! Each of these representations is best suited for different data, but they can be combined to get the
26//! benefits of each, as appropriate. There are several `Cursor` combiners, `CursorList` and `CursorPair`,
//! for homogeneous and heterogeneous cursors, respectively.
28//!
//! # Musings
30//!
31//! What is less clear is how to transfer updates between the representations at merge time in a tasteful
32//! way. Perhaps we could put an ordering on the representations, each pair with a dominant representation,
33//! and part of merging the latter filters updates into the former. Although back and forth might be
34//! appealing, more thinking is required to negotiate all of these policies.
35//!
36//! One option would be to require the layer builder to handle these smarts. Merging is currently done by
37//! the layer as part of custom code, but we could make it simply be "iterate through cursor, push results
38//! into 'ordered builder'". Then the builder would be bright enough to emit a "batch" for the composite
39//! trace, rather than just a batch of the type merged.
40
41pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod merge_batcher_col;
45
46pub use self::merge_batcher::MergeBatcher as Batcher;
47
48pub mod ord_neu;
49pub mod rhh;
50pub mod huffman_container;
51
52// Opinionated takes on default spines.
53pub use self::ord_neu::OrdValSpine as ValSpine;
54pub use self::ord_neu::OrdKeySpine as KeySpine;
55
56use std::borrow::{ToOwned};
57
58use timely::container::columnation::{Columnation, TimelyStack};
59use crate::lattice::Lattice;
60use crate::difference::Semigroup;
61
62/// A type that names constituent update types.
63pub trait Update {
64    /// Key by which data are grouped.
65    type Key: Ord + Clone + 'static;
66    /// Values associated with the key.
67    type Val: Ord + Clone + 'static;
68    /// Time at which updates occur.
69    type Time: Ord+Lattice+timely::progress::Timestamp+Clone;
70    /// Way in which updates occur.
71    type Diff: Semigroup+Clone;
72}
73
74impl<K,V,T,R> Update for ((K, V), T, R)
75where
76    K: Ord+Clone+'static,
77    V: Ord+Clone+'static,
78    T: Ord+Lattice+timely::progress::Timestamp+Clone,
79    R: Semigroup+Clone,
80{
81    type Key = K;
82    type Val = V;
83    type Time = T;
84    type Diff = R;
85}
86
87/// A type with opinions on how updates should be laid out.
88pub trait Layout {
89    /// The represented update.
90    type Target: Update + ?Sized;
91    /// Container for update keys.
92    type KeyContainer:
93        BatchContainer<PushItem=<Self::Target as Update>::Key>;
94    /// Container for update vals.
95    type ValContainer:
96        BatchContainer<PushItem=<Self::Target as Update>::Val>;
97    /// Container for update vals.
98    type UpdContainer:
99        for<'a> BatchContainer<PushItem=(<Self::Target as Update>::Time, <Self::Target as Update>::Diff), ReadItem<'a> = &'a (<Self::Target as Update>::Time, <Self::Target as Update>::Diff)>;
100}
101
102/// A layout that uses vectors
103pub struct Vector<U: Update> {
104    phantom: std::marker::PhantomData<U>,
105}
106
107impl<U: Update> Layout for Vector<U>
108where
109    U::Key: 'static,
110    U::Val: 'static,
111{
112    type Target = U;
113    type KeyContainer = Vec<U::Key>;
114    type ValContainer = Vec<U::Val>;
115    type UpdContainer = Vec<(U::Time, U::Diff)>;
116}
117
118/// A layout based on timely stacks
119pub struct TStack<U: Update> {
120    phantom: std::marker::PhantomData<U>,
121}
122
123impl<U: Update> Layout for TStack<U>
124where
125    U::Key: Columnation + 'static,
126    U::Val: Columnation + 'static,
127    U::Time: Columnation,
128    U::Diff: Columnation,
129{
130    type Target = U;
131    type KeyContainer = TimelyStack<U::Key>;
132    type ValContainer = TimelyStack<U::Val>;
133    type UpdContainer = TimelyStack<(U::Time, U::Diff)>;
134}
135
136/// A type with a preferred container.
137///
138/// Examples include types that implement `Clone` who prefer 
139pub trait PreferredContainer : ToOwned {
140    /// The preferred container for the type.
141    type Container: BatchContainer<PushItem=Self::Owned>;
142}
143
144impl<T: Ord + Clone + 'static> PreferredContainer for T {
145    type Container = Vec<T>;
146}
147
148impl<T: Ord + Clone + 'static> PreferredContainer for [T] {
149    type Container = SliceContainer2<T>;
150}
151
/// An update and layout description based on preferred containers.
///
/// `K` and `V` may be unsized (for example slices). They appear behind `Box`
/// inside the `PhantomData` so that the marker tuple stays `Sized`; no data is
/// ever stored.
pub struct Preferred<K: ?Sized, V: ?Sized, T, D> {
    phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D)>,
}
156
157impl<K,V,T,R> Update for Preferred<K, V, T, R>
158where
159    K: ToOwned + ?Sized,
160    K::Owned: Ord+Clone+'static,
161    V: ToOwned + ?Sized + 'static,
162    V::Owned: Ord+Clone,
163    T: Ord+Lattice+timely::progress::Timestamp+Clone,
164    R: Semigroup+Clone,
165{
166    type Key = K::Owned;
167    type Val = V::Owned;
168    type Time = T;
169    type Diff = R;
170}
171
172impl<K, V, T, D> Layout for Preferred<K, V, T, D>
173where
174    K: Ord+ToOwned+PreferredContainer + ?Sized,
175    K::Owned: Ord+Clone+'static,
176    // for<'a> K::Container: BatchContainer<ReadItem<'a> = &'a K>,
177    V: Ord+ToOwned+PreferredContainer + ?Sized + 'static,
178    V::Owned: Ord+Clone,
179    T: Ord+Lattice+timely::progress::Timestamp+Clone,
180    D: Semigroup+Clone,
181{
182    type Target = Preferred<K, V, T, D>;
183    type KeyContainer = K::Container;
184    type ValContainer = V::Container;
185    type UpdContainer = Vec<(T, D)>;
186}
187
188use std::convert::TryInto;
189use abomonation_derive::Abomonation;
190
191/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
192#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)]
193pub struct OffsetList {
194    /// Offsets that fit within a `u32`.
195    pub smol: Vec<u32>,
196    /// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
197    pub chonk: Vec<u64>,
198}
199
200impl OffsetList {
201    /// Inserts the offset, as a `u32` if that is still on the table.
202    pub fn push(&mut self, offset: usize) {
203        if self.chonk.is_empty() {
204            if let Ok(smol) = offset.try_into() {
205                self.smol.push(smol);
206            } 
207            else {
208                self.chonk.push(offset.try_into().unwrap())
209            }
210        }
211        else {
212            self.chonk.push(offset.try_into().unwrap())
213        }
214    }
215    /// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
216    pub fn index(&self, index: usize) -> usize {
217        if index < self.smol.len() {
218            self.smol[index].try_into().unwrap()
219        }
220        else {
221            self.chonk[index - self.smol.len()].try_into().unwrap()
222        }
223    }
224    /// Set the offset at location index.
225    ///
226    /// Complicated if `offset` does not fit into `self.smol`.
227    pub fn set(&mut self, index: usize, offset: usize) {
228        if index < self.smol.len() {
229            if let Ok(off) = offset.try_into() {
230                self.smol[index] = off;
231            }
232            else {
233                // Move all `smol` elements from `index` onward to the front of `chonk`.
234                self.chonk.splice(0..0, self.smol.drain(index ..).map(|x| x.try_into().unwrap()));
235                self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
236            }
237        }
238        else {
239            self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
240        }
241    }
242    /// The last element in the list of offsets, if non-empty.
243    pub fn last(&self) -> Option<usize> {
244        if self.chonk.is_empty() {
245            self.smol.last().map(|x| (*x).try_into().unwrap())
246        }
247        else {
248            self.chonk.last().map(|x| (*x).try_into().unwrap())
249        }
250    }
251    /// THe number of offsets in the list.
252    pub fn len(&self) -> usize {
253        self.smol.len() + self.chonk.len()
254    }
255    /// Allocate a new list with a specified capacity.
256    pub fn with_capacity(cap: usize) -> Self {
257        Self {
258            smol: Vec::with_capacity(cap),
259            chonk: Vec::new(),
260        }
261    }
262    /// Trim all elements at index `length` and greater.
263    pub fn truncate(&mut self, length: usize) {
264        if length > self.smol.len() {
265            self.chonk.truncate(length - self.smol.len());
266        }
267        else {
268            assert!(self.chonk.is_empty());
269            self.smol.truncate(length);
270        }
271    }
272}
273
274pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2};
275
276/// Containers for data that resemble `Vec<T>`, with leaner implementations.
277pub mod containers {
278
279    use timely::container::columnation::{Columnation, TimelyStack};
280
281    use std::borrow::{Borrow, ToOwned};
282    use crate::trace::MyTrait;
283
284    /// A general-purpose container resembling `Vec<T>`.
285    pub trait BatchContainer: Default + 'static {
286        /// The type of contained item.
287        ///
288        /// The container only supplies references to the item, so it needn't be sized.
289        type PushItem;
290        /// The type that can be read back out of the container.
291        type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd<Self::ReadItem<'b>>;
292        /// Inserts an owned item.
293        fn push(&mut self, item: Self::PushItem);
294        /// Inserts an owned item.
295        fn copy_push(&mut self, item: &Self::PushItem);
296        /// Inserts a borrowed item.
297        fn copy(&mut self, item: Self::ReadItem<'_>);
298        /// Extends from a slice of items.
299        fn copy_slice(&mut self, slice: &[Self::PushItem]);
300        /// Extends from a range of items in another`Self`.
301        fn copy_range(&mut self, other: &Self, start: usize, end: usize);
302        /// Creates a new container with sufficient capacity.
303        fn with_capacity(size: usize) -> Self;
304        /// Reserves additional capacity.
305        fn reserve(&mut self, additional: usize);
306        /// Creates a new container with sufficient capacity.
307        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
308
309        /// Reference to the element at this position.
310        fn index(&self, index: usize) -> Self::ReadItem<'_>;
311        /// Number of contained elements
312        fn len(&self) -> usize;
313        /// Returns the last item if the container is non-empty.
314        fn last(&self) -> Option<Self::ReadItem<'_>> {
315            if self.len() > 0 {
316                Some(self.index(self.len()-1))
317            }
318            else {
319                None
320            }
321        }
322
323        /// Reports the number of elements satisfing the predicate.
324        ///
325        /// This methods *relies strongly* on the assumption that the predicate
326        /// stays false once it becomes false, a joint property of the predicate
327        /// and the layout of `Self. This allows `advance` to use exponential search to
328        /// count the number of elements in time logarithmic in the result.
329        fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
330
331            let small_limit = 8;
332
333            // Exponential seach if the answer isn't within `small_limit`.
334            if end > start + small_limit && function(self.index(start + small_limit)) {
335
336                // start with no advance
337                let mut index = small_limit + 1;
338                if start + index < end && function(self.index(start + index)) {
339
340                    // advance in exponentially growing steps.
341                    let mut step = 1;
342                    while start + index + step < end && function(self.index(start + index + step)) {
343                        index += step;
344                        step <<= 1;
345                    }
346
347                    // advance in exponentially shrinking steps.
348                    step >>= 1;
349                    while step > 0 {
350                        if start + index + step < end && function(self.index(start + index + step)) {
351                            index += step;
352                        }
353                        step >>= 1;
354                    }
355
356                    index += 1;
357                }
358
359                index
360            }
361            else {
362                let limit = std::cmp::min(end, start + small_limit);
363                (start .. limit).filter(|x| function(self.index(*x))).count()
364            }
365        }
366    }
367
368    // All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
369    // struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
370    impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
371        type PushItem = T;
372        type ReadItem<'a> = &'a Self::PushItem;
373
374        fn push(&mut self, item: T) {
375            self.push(item);
376        }
377        fn copy_push(&mut self, item: &T) {
378            self.copy(item);
379        }
380        fn copy(&mut self, item: &T) {
381            self.push(item.clone());
382        }
383        fn copy_slice(&mut self, slice: &[T]) {
384            self.extend_from_slice(slice);
385        }
386        fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
387            self.extend_from_slice(&other[start .. end]);
388        }
389        fn with_capacity(size: usize) -> Self {
390            Vec::with_capacity(size)
391        }
392        fn reserve(&mut self, additional: usize) {
393            self.reserve(additional);
394        }
395        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
396            Vec::with_capacity(cont1.len() + cont2.len())
397        }
398        fn index(&self, index: usize) -> Self::ReadItem<'_> {
399            &self[index]
400        }
401        fn len(&self) -> usize {
402            self[..].len()
403        }
404    }
405
406    // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
407    // be presented with the actual contained type, rather than a type that borrows into it.
408    impl<T: Ord + Columnation + ToOwned<Owned = T> + 'static> BatchContainer for TimelyStack<T> {
409        type PushItem = T;
410        type ReadItem<'a> = &'a Self::PushItem;
411
412        fn push(&mut self, item: Self::PushItem) {
413            self.copy(item.borrow());
414        }
415        fn copy_push(&mut self, item: &Self::PushItem) {
416            self.copy(item);
417        }
418        fn copy(&mut self, item: &T) {
419            self.copy(item);
420        }
421        fn copy_slice(&mut self, slice: &[Self::PushItem]) {
422            self.reserve_items(slice.iter());
423            for item in slice.iter() {
424                self.copy(item);
425            }
426        }
427        fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
428            let slice = &other[start .. end];
429            self.reserve_items(slice.iter());
430            for item in slice.iter() {
431                self.copy(item);
432            }
433        }
434        fn with_capacity(size: usize) -> Self {
435            Self::with_capacity(size)
436        }
437        fn reserve(&mut self, _additional: usize) {
438        }
439        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
440            let mut new = Self::default();
441            new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
442            new
443        }
444        fn index(&self, index: usize) -> Self::ReadItem<'_> {
445            &self[index]
446        }
447        fn len(&self) -> usize {
448            self[..].len()
449        }
450    }
451
452    /// A container that accepts slices `[B::Item]`.
453    pub struct SliceContainer<B> {
454        /// Offsets that bound each contained slice.
455        ///
456        /// The length will be one greater than the number of contained slices,
457        /// starting with zero and ending with `self.inner.len()`.
458        offsets: Vec<usize>,
459        /// An inner container for sequences of `B` that dereferences to a slice.
460        inner: Vec<B>,
461    }
462
463    impl<B> BatchContainer for SliceContainer<B>
464    where
465        B: Ord + Clone + Sized + 'static,
466    {
467        type PushItem = Vec<B>;
468        type ReadItem<'a> = &'a [B];
469        fn push(&mut self, item: Vec<B>) {
470            for x in item.into_iter() {
471                self.inner.push(x);
472            }
473            self.offsets.push(self.inner.len());
474        }
475        fn copy_push(&mut self, item: &Vec<B>) {
476            self.copy(&item[..]);
477        }
478        fn copy(&mut self, item: Self::ReadItem<'_>) {
479            for x in item.iter() {
480                self.inner.copy(x);
481            }
482            self.offsets.push(self.inner.len());
483        }
484        fn copy_slice(&mut self, slice: &[Vec<B>]) {
485            for item in slice {
486                self.copy(item);
487            }
488        }
489        fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
490            for index in start .. end {
491                self.copy(other.index(index));
492            }
493        }
494        fn with_capacity(size: usize) -> Self {
495            let mut offsets = Vec::with_capacity(size + 1);
496            offsets.push(0);
497            Self {
498                offsets,
499                inner: Vec::with_capacity(size),
500            }
501        }
502        fn reserve(&mut self, _additional: usize) {
503        }
504        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
505            let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
506            offsets.push(0);
507            Self {
508                offsets,
509                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
510            }
511        }
512        fn index(&self, index: usize) -> Self::ReadItem<'_> {
513            let lower = self.offsets[index];
514            let upper = self.offsets[index+1];
515            &self.inner[lower .. upper]
516        }
517        fn len(&self) -> usize {
518            self.offsets.len() - 1
519        }
520    }
521
522    /// Default implementation introduces a first offset.
523    impl<B> Default for SliceContainer<B> {
524        fn default() -> Self {
525            Self {
526                offsets: vec![0],
527                inner: Default::default(),
528            }
529        }
530    }
531
532    /// A container that accepts slices `[B::Item]`.
533    pub struct SliceContainer2<B> {
534        text: String,
535        /// Offsets that bound each contained slice.
536        ///
537        /// The length will be one greater than the number of contained slices,
538        /// starting with zero and ending with `self.inner.len()`.
539        offsets: Vec<usize>,
540        /// An inner container for sequences of `B` that dereferences to a slice.
541        inner: Vec<B>,
542    }
543
544    /// Welcome to GATs!
545    pub struct Greetings<'a, B> {
546        /// Text that decorates the data.
547        pub text: Option<&'a str>,
548        /// The data itself.
549        pub slice: &'a [B],
550    }
551
552    impl<'a, B> Copy for Greetings<'a, B> { }
553    impl<'a, B> Clone for Greetings<'a, B> { 
554        fn clone(&self) -> Self { *self }
555    }
556
557    use std::cmp::Ordering;
558    impl<'a, 'b, B: Ord> PartialEq<Greetings<'a, B>> for Greetings<'b, B> {
559        fn eq(&self, other: &Greetings<'a, B>) -> bool {
560            self.slice.eq(other.slice)
561        }
562    }
563    impl<'a, B: Ord> Eq for Greetings<'a, B> { }
564    impl<'a, 'b, B: Ord> PartialOrd<Greetings<'a, B>> for Greetings<'b, B> {
565        fn partial_cmp(&self, other: &Greetings<'a, B>) -> Option<Ordering> {
566            self.slice.partial_cmp(other.slice)
567        }
568    }
569    impl<'a, B: Ord> Ord for Greetings<'a, B> {
570        fn cmp(&self, other: &Self) -> Ordering {
571            self.partial_cmp(other).unwrap()
572        }
573    }
574
575    impl<'a, B: Ord + Clone> MyTrait<'a> for Greetings<'a, B> {
576        type Owned = Vec<B>;
577        fn into_owned(self) -> Self::Owned { self.slice.to_vec() }
578        fn clone_onto(&self, other: &mut Self::Owned) { 
579            self.slice.clone_into(other);
580        }
581        fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { 
582            self.slice.cmp(&other[..])
583        }
584        fn borrow_as(other: &'a Self::Owned) -> Self {
585            Self {
586                text: None,
587                slice: &other[..],
588            }
589        }
590    }
591    
592    
593
594    impl<B> BatchContainer for SliceContainer2<B>
595    where
596        B: Ord + Clone + Sized + 'static,
597    {
598        type PushItem = Vec<B>;
599        type ReadItem<'a> = Greetings<'a, B>;
600        fn push(&mut self, item: Vec<B>) {
601            for x in item.into_iter() {
602                self.inner.push(x);
603            }
604            self.offsets.push(self.inner.len());
605        }
606        fn copy_push(&mut self, item: &Vec<B>) {
607            self.copy(<_ as MyTrait>::borrow_as(item));
608        }
609        fn copy(&mut self, item: Self::ReadItem<'_>) {
610            for x in item.slice.iter() {
611                self.inner.copy(x);
612            }
613            self.offsets.push(self.inner.len());
614        }
615        fn copy_slice(&mut self, slice: &[Vec<B>]) {
616            for item in slice {
617                self.copy_push(item);
618            }
619        }
620        fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
621            for index in start .. end {
622                self.copy(other.index(index));
623            }
624        }
625        fn with_capacity(size: usize) -> Self {
626            let mut offsets = Vec::with_capacity(size + 1);
627            offsets.push(0);
628            Self {
629                text: format!("Hello!"),
630                offsets,
631                inner: Vec::with_capacity(size),
632            }
633        }
634        fn reserve(&mut self, _additional: usize) {
635        }
636        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
637            let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
638            offsets.push(0);
639            Self {
640                text: format!("Hello!"),
641                offsets,
642                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
643            }
644        }
645        fn index(&self, index: usize) -> Self::ReadItem<'_> {
646            let lower = self.offsets[index];
647            let upper = self.offsets[index+1];
648            Greetings {
649                text: Some(&self.text),
650                slice: &self.inner[lower .. upper],
651            }
652        }
653        fn len(&self) -> usize {
654            self.offsets.len() - 1
655        }
656    }
657
658    /// Default implementation introduces a first offset.
659    impl<B> Default for SliceContainer2<B> {
660        fn default() -> Self {
661            Self {
662                text: format!("Hello!"),
663                offsets: vec![0],
664                inner: Default::default(),
665            }
666        }
667    }
668}