differential_dataflow/trace/implementations/
mod.rs

1//! Implementations of `Trace` and associated traits.
2//!
3//! The `Trace` trait provides access to an ordered collection of `(key, val, time, diff)` tuples, but
4//! there is substantial flexibility in implementations of this trait. Depending on characteristics of
5//! the data, we may wish to represent the data in different ways. This module contains several of these
6//! implementations, and combiners for merging the results of different traces.
7//!
8//! As examples of implementations,
9//!
10//! *  The `trie` module is meant to represent general update tuples, with no particular assumptions made
11//!    about their contents. It organizes the data first by key, then by val, and then leaves the rest
12//!    in an unordered pile.
13//!
14//! *  The `keys` module is meant for collections whose value type is `()`, which is to say there is no
15//!    (key, val) structure on the records; all of them are just viewed as "keys".
16//!
17//! *  The `time` module is meant for collections with a single time value. This can remove repetition
18//!    from the representation, at the cost of requiring more instances and run-time merging.
19//!
20//! *  The `base` module is meant for collections with a single time value equivalent to the least time.
21//!    These collections must always accumulate to non-negative collections, and as such we can indicate
22//!    the frequency of an element by its multiplicity. This removes both the time and weight from the
23//!    representation, but is only appropriate for a subset (often substantial) of the data.
24//!
25//! Each of these representations is best suited for different data, but they can be combined to get the
26//! benefits of each, as appropriate. There are several `Cursor` combiners, `CursorList` and `CursorPair`,
27//! for homogeneous and inhomogeneous cursors, respectively.
28//!
//! # Musings
30//!
31//! What is less clear is how to transfer updates between the representations at merge time in a tasteful
32//! way. Perhaps we could put an ordering on the representations, each pair with a dominant representation,
33//! and part of merging the latter filters updates into the former. Although back and forth might be
34//! appealing, more thinking is required to negotiate all of these policies.
35//!
36//! One option would be to require the layer builder to handle these smarts. Merging is currently done by
37//! the layer as part of custom code, but we could make it simply be "iterate through cursor, push results
38//! into 'ordered builder'". Then the builder would be bright enough to emit a "batch" for the composite
39//! trace, rather than just a batch of the type merged.
40
41pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49// Opinionated takes on default spines.
50pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::convert::TryInto;
58
59use columnation::Columnation;
60use serde::{Deserialize, Serialize};
61use timely::container::{DrainContainer, PushInto};
62use timely::progress::Timestamp;
63
64use crate::containers::TimelyStack;
65use crate::lattice::Lattice;
66use crate::difference::Semigroup;
67
68/// A type that names constituent update types.
69pub trait Update {
70    /// Key by which data are grouped.
71    type Key: Ord + Clone + 'static;
72    /// Values associated with the key.
73    type Val: Ord + Clone + 'static;
74    /// Time at which updates occur.
75    type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
76    /// Way in which updates occur.
77    type Diff: Ord + Semigroup + 'static;
78}
79
80impl<K,V,T,R> Update for ((K, V), T, R)
81where
82    K: Ord+Clone+'static,
83    V: Ord+Clone+'static,
84    T: Ord+Clone+Lattice+timely::progress::Timestamp,
85    R: Ord+Semigroup+'static,
86{
87    type Key = K;
88    type Val = V;
89    type Time = T;
90    type Diff = R;
91}
92
93/// A type with opinions on how updates should be laid out.
94pub trait Layout {
95    /// Container for update keys.
96    type KeyContainer: BatchContainer;
97    /// Container for update vals.
98    type ValContainer: BatchContainer;
99    /// Container for times.
100    type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
101    /// Container for diffs.
102    type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
103    /// Container for offsets.
104    type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
105}
106
107/// A type bearing a layout.
108pub trait WithLayout {
109    /// The layout.
110    type Layout: Layout;
111}
112
113/// Automatically implemented trait for types with layouts.
114pub trait LayoutExt : WithLayout<Layout: Layout<KeyContainer = Self::KeyContainer, ValContainer = Self::ValContainer, TimeContainer = Self::TimeContainer, DiffContainer = Self::DiffContainer>> {
115    /// Alias for an owned key of a layout.
116    type KeyOwn;
117    /// Alias for an borrowed key of a layout.
118    type Key<'a>: Copy + Ord;
119    /// Alias for an owned val of a layout.
120    type ValOwn: Clone + Ord;
121    /// Alias for an borrowed val of a layout.
122    type Val<'a>: Copy + Ord;
123    /// Alias for an owned time of a layout.
124    type Time: Lattice + timely::progress::Timestamp;
125    /// Alias for an borrowed time of a layout.
126    type TimeGat<'a>: Copy + Ord;
127    /// Alias for an owned diff of a layout.
128    type Diff: Semigroup + 'static;
129    /// Alias for an borrowed diff of a layout.
130    type DiffGat<'a>: Copy + Ord;
131
132    /// Container for update keys.
133    type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>, Owned = Self::KeyOwn>;
134    /// Container for update vals.
135    type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
136    /// Container for times.
137    type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
138    /// Container for diffs.
139    type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
140
141    /// Construct an owned key from a reference.
142    fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn;
143    /// Construct an owned val from a reference.
144    fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
145    /// Construct an owned time from a reference.
146    fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
147    /// Construct an owned diff from a reference.
148    fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
149
150    /// Clones a reference time onto an owned time.
151    fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
152}
153
154impl<L: WithLayout> LayoutExt for L {
155    type KeyOwn = <<L::Layout as Layout>::KeyContainer as BatchContainer>::Owned;
156    type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
157    type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
158    type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
159    type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
160    type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
161    type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
162    type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
163
164    type KeyContainer = <L::Layout as Layout>::KeyContainer;
165    type ValContainer = <L::Layout as Layout>::ValContainer;
166    type TimeContainer = <L::Layout as Layout>::TimeContainer;
167    type DiffContainer = <L::Layout as Layout>::DiffContainer;
168
169    #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { <Self::Layout as Layout>::KeyContainer::into_owned(key) }
170    #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { <Self::Layout as Layout>::ValContainer::into_owned(val) }
171    #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { <Self::Layout as Layout>::TimeContainer::into_owned(time) }
172    #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { <Self::Layout as Layout>::DiffContainer::into_owned(diff) }
173    #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { <Self::Layout as Layout>::TimeContainer::clone_onto(time, onto) }
174
175}
176
177// An easy way to provide an explicit layout: as a 5-tuple.
178// Valuable when one wants to perform layout surgery.
179impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
180where
181    KC: BatchContainer,
182    VC: BatchContainer,
183    TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
184    DC: BatchContainer<Owned: Semigroup>,
185    OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
186{
187    type KeyContainer = KC;
188    type ValContainer = VC;
189    type TimeContainer = TC;
190    type DiffContainer = DC;
191    type OffsetContainer = OC;
192}
193
194/// Aliases for types provided by the containers within a `Layout`.
195///
196/// For clarity, prefer `use layout;` and then `layout::Key<L>` to retain the layout context.
197pub mod layout {
198    use crate::trace::implementations::{BatchContainer, Layout};
199
200    /// Alias for an owned key of a layout.
201    pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
202    /// Alias for an borrowed key of a layout.
203    pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
204    /// Alias for an owned val of a layout.
205    pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
206    /// Alias for an borrowed val of a layout.
207    pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
208    /// Alias for an owned time of a layout.
209    pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
210    /// Alias for an borrowed time of a layout.
211    pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
212    /// Alias for an owned diff of a layout.
213    pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
214    /// Alias for an borrowed diff of a layout.
215    pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
216}
217
218/// A layout that uses vectors
219pub struct Vector<U: Update> {
220    phantom: std::marker::PhantomData<U>,
221}
222
223impl<U: Update<Diff: Ord>> Layout for Vector<U> {
224    type KeyContainer = Vec<U::Key>;
225    type ValContainer = Vec<U::Val>;
226    type TimeContainer = Vec<U::Time>;
227    type DiffContainer = Vec<U::Diff>;
228    type OffsetContainer = OffsetList;
229}
230
231/// A layout based on timely stacks
232pub struct TStack<U: Update> {
233    phantom: std::marker::PhantomData<U>,
234}
235
236impl<U> Layout for TStack<U>
237where
238    U: Update<
239        Key: Columnation,
240        Val: Columnation,
241        Time: Columnation,
242        Diff: Columnation + Ord,
243    >,
244{
245    type KeyContainer = TimelyStack<U::Key>;
246    type ValContainer = TimelyStack<U::Val>;
247    type TimeContainer = TimelyStack<U::Time>;
248    type DiffContainer = TimelyStack<U::Diff>;
249    type OffsetContainer = OffsetList;
250}
251
252/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
253#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
254pub struct OffsetList {
255    /// Length of a prefix of zero elements.
256    pub zero_prefix: usize,
257    /// Offsets that fit within a `u32`.
258    pub smol: Vec<u32>,
259    /// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
260    pub chonk: Vec<u64>,
261}
262
263impl std::fmt::Debug for OffsetList {
264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265        f.debug_list().entries(self.into_iter()).finish()
266    }
267}
268
269impl OffsetList {
270    /// Allocate a new list with a specified capacity.
271    pub fn with_capacity(cap: usize) -> Self {
272        Self {
273            zero_prefix: 0,
274            smol: Vec::with_capacity(cap),
275            chonk: Vec::new(),
276        }
277    }
278    /// Inserts the offset, as a `u32` if that is still on the table.
279    pub fn push(&mut self, offset: usize) {
280        if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
281            self.zero_prefix += 1;
282        }
283        else if self.chonk.is_empty() {
284            if let Ok(smol) = offset.try_into() {
285                self.smol.push(smol);
286            }
287            else {
288                self.chonk.push(offset.try_into().unwrap())
289            }
290        }
291        else {
292            self.chonk.push(offset.try_into().unwrap())
293        }
294    }
295    /// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
296    pub fn index(&self, index: usize) -> usize {
297        if index < self.zero_prefix {
298            0
299        }
300        else if index - self.zero_prefix < self.smol.len() {
301            self.smol[index - self.zero_prefix].try_into().unwrap()
302        }
303        else {
304            self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
305        }
306    }
307    /// The number of offsets in the list.
308    pub fn len(&self) -> usize {
309        self.zero_prefix + self.smol.len() + self.chonk.len()
310    }
311}
312
313impl<'a> IntoIterator for &'a OffsetList {
314    type Item = usize;
315    type IntoIter = OffsetListIter<'a>;
316
317    fn into_iter(self) -> Self::IntoIter {
318        OffsetListIter {list: self, index: 0 }
319    }
320}
321
322/// An iterator for [`OffsetList`].
323pub struct OffsetListIter<'a> {
324    list: &'a OffsetList,
325    index: usize,
326}
327
328impl<'a> Iterator for OffsetListIter<'a> {
329    type Item = usize;
330
331    fn next(&mut self) -> Option<Self::Item> {
332        if self.index < self.list.len() {
333            let res = Some(self.list.index(self.index));
334            self.index += 1;
335            res
336        } else {
337            None
338        }
339    }
340}
341
342impl PushInto<usize> for OffsetList {
343    fn push_into(&mut self, item: usize) {
344        self.push(item);
345    }
346}
347
348impl BatchContainer for OffsetList {
349    type Owned = usize;
350    type ReadItem<'a> = usize;
351
352    #[inline(always)]
353    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
354    #[inline(always)]
355    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
356
357    fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
358    fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
359
360    fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
361
362    fn with_capacity(size: usize) -> Self {
363        Self::with_capacity(size)
364    }
365
366    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
367        Self::with_capacity(cont1.len() + cont2.len())
368    }
369
370    fn index(&self, index: usize) -> Self::ReadItem<'_> {
371        self.index(index)
372    }
373
374    fn len(&self) -> usize {
375        self.len()
376    }
377}
378
379/// Behavior to split an update into principal components.
380pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: DrainContainer + Sized {
381    /// Key portion
382    type Key<'a>: Ord;
383    /// Value portion
384    type Val<'a>: Ord;
385    /// Time
386    type Time;
387    /// Diff
388    type Diff;
389
390    /// Split an item into separate parts.
391    fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
392
393    /// Test that the key equals a key in the layout's key container.
394    fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
395
396    /// Test that the value equals a key in the layout's value container.
397    fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
398
399    /// Count the number of distinct keys, (key, val) pairs, and total updates.
400    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
401}
402
403impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
404where
405    K: Ord + Clone + 'static,
406    KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
407    V: Ord + Clone + 'static,
408    VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
409    T: Timestamp + Lattice + Clone + 'static,
410    R: Ord + Semigroup + 'static,
411{
412    type Key<'a> = K;
413    type Val<'a> = V;
414    type Time = T;
415    type Diff = R;
416
417    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
418        (key, val, time, diff)
419    }
420
421    fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
422        KBC::reborrow(other) == this
423    }
424
425    fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
426        VBC::reborrow(other) == this
427    }
428
429    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
430        let mut keys = 0;
431        let mut vals = 0;
432        let mut upds = 0;
433        let mut prev_keyval = None;
434        for link in chain.iter() {
435            for ((key, val), _, _) in link.iter() {
436                if let Some((p_key, p_val)) = prev_keyval {
437                    if p_key != key {
438                        keys += 1;
439                        vals += 1;
440                    } else if p_val != val {
441                        vals += 1;
442                    }
443                } else {
444                    keys += 1;
445                    vals += 1;
446                }
447                upds += 1;
448                prev_keyval = Some((key, val));
449            }
450        }
451        (keys, vals, upds)
452    }
453}
454
455impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
456where
457    K: for<'a> BatchContainer<
458        ReadItem<'a>: PartialEq<&'a K::Owned>,
459        Owned: Ord + Columnation + Clone + 'static,
460    >,
461    V: for<'a> BatchContainer<
462        ReadItem<'a>: PartialEq<&'a V::Owned>,
463        Owned: Ord + Columnation + Clone + 'static,
464    >,
465    T: Timestamp + Lattice + Columnation + Clone + 'static,
466    R: Ord + Clone + Semigroup + Columnation + 'static,
467{
468    type Key<'a> = &'a K::Owned;
469    type Val<'a> = &'a V::Owned;
470    type Time = T;
471    type Diff = R;
472
473    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
474        (key, val, time.clone(), diff.clone())
475    }
476
477    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
478        K::reborrow(other) == *this
479    }
480
481    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
482        V::reborrow(other) == *this
483    }
484
485    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
486        let mut keys = 0;
487        let mut vals = 0;
488        let mut upds = 0;
489        let mut prev_keyval = None;
490        for link in chain.iter() {
491            for ((key, val), _, _) in link.iter() {
492                if let Some((p_key, p_val)) = prev_keyval {
493                    if p_key != key {
494                        keys += 1;
495                        vals += 1;
496                    } else if p_val != val {
497                        vals += 1;
498                    }
499                } else {
500                    keys += 1;
501                    vals += 1;
502                }
503                upds += 1;
504                prev_keyval = Some((key, val));
505            }
506        }
507        (keys, vals, upds)
508    }
509}
510
511pub use self::containers::{BatchContainer, SliceContainer};
512
513/// Containers for data that resemble `Vec<T>`, with leaner implementations.
514pub mod containers {
515
516    use columnation::Columnation;
517    use timely::container::PushInto;
518
519    use crate::containers::TimelyStack;
520
521    /// A general-purpose container resembling `Vec<T>`.
522    pub trait BatchContainer: 'static {
523        /// An owned instance of `Self::ReadItem<'_>`.
524        type Owned: Clone + Ord;
525
526        /// The type that can be read back out of the container.
527        type ReadItem<'a>: Copy + Ord;
528
529
530        /// Conversion from an instance of this type to the owned type.
531        #[must_use]
532        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;
533        /// Clones `self` onto an existing instance of the owned type.
534        #[inline(always)]
535        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
536            *other = Self::into_owned(item);
537        }
538
539        /// Push an item into this container
540        fn push_ref(&mut self, item: Self::ReadItem<'_>);
541        /// Push an item into this container
542        fn push_own(&mut self, item: &Self::Owned);
543
544        /// Clears the container. May not release resources.
545        fn clear(&mut self);
546
547        /// Creates a new container with sufficient capacity.
548        fn with_capacity(size: usize) -> Self;
549        /// Creates a new container with sufficient capacity.
550        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
551
552        /// Converts a read item into one with a narrower lifetime.
553        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
554
555        /// Reference to the element at this position.
556        fn index(&self, index: usize) -> Self::ReadItem<'_>;
557
558        /// Reference to the element at this position, if it exists.
559        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
560            if index < self.len() {
561                Some(self.index(index))
562            }
563            else { None }
564        }
565
566        /// Number of contained elements
567        fn len(&self) -> usize;
568        /// Returns the last item if the container is non-empty.
569        fn last(&self) -> Option<Self::ReadItem<'_>> {
570            if self.len() > 0 {
571                Some(self.index(self.len()-1))
572            }
573            else {
574                None
575            }
576        }
577        /// Indicates if the length is zero.
578        fn is_empty(&self) -> bool { self.len() == 0 }
579
580        /// Reports the number of elements satisfying the predicate.
581        ///
582        /// This methods *relies strongly* on the assumption that the predicate
583        /// stays false once it becomes false, a joint property of the predicate
584        /// and the layout of `Self. This allows `advance` to use exponential search to
585        /// count the number of elements in time logarithmic in the result.
586        fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
587
588            let small_limit = 8;
589
590            // Exponential search if the answer isn't within `small_limit`.
591            if end > start + small_limit && function(self.index(start + small_limit)) {
592
593                // start with no advance
594                let mut index = small_limit + 1;
595                if start + index < end && function(self.index(start + index)) {
596
597                    // advance in exponentially growing steps.
598                    let mut step = 1;
599                    while start + index + step < end && function(self.index(start + index + step)) {
600                        index += step;
601                        step <<= 1;
602                    }
603
604                    // advance in exponentially shrinking steps.
605                    step >>= 1;
606                    while step > 0 {
607                        if start + index + step < end && function(self.index(start + index + step)) {
608                            index += step;
609                        }
610                        step >>= 1;
611                    }
612
613                    index += 1;
614                }
615
616                index
617            }
618            else {
619                let limit = std::cmp::min(end, start + small_limit);
620                (start .. limit).filter(|x| function(self.index(*x))).count()
621            }
622        }
623    }
624
625    // All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
626    // struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
627    impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
628        type Owned = T;
629        type ReadItem<'a> = &'a T;
630
631        #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
632        #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
633
634        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
635
636        fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
637        fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }
638
639        fn clear(&mut self) { self.clear() }
640
641        fn with_capacity(size: usize) -> Self {
642            Vec::with_capacity(size)
643        }
644        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
645            Vec::with_capacity(cont1.len() + cont2.len())
646        }
647        fn index(&self, index: usize) -> Self::ReadItem<'_> {
648            &self[index]
649        }
650        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
651            <[T]>::get(&self, index)
652        }
653        fn len(&self) -> usize {
654            self[..].len()
655        }
656    }
657
658    // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
659    // be presented with the actual contained type, rather than a type that borrows into it.
660    impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
661        type Owned = T;
662        type ReadItem<'a> = &'a T;
663
664        #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
665        #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
666
667        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
668
669        fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
670        fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
671
672        fn clear(&mut self) { self.clear() }
673
674        fn with_capacity(size: usize) -> Self {
675            Self::with_capacity(size)
676        }
677        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
678            let mut new = Self::default();
679            new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
680            new
681        }
682        fn index(&self, index: usize) -> Self::ReadItem<'_> {
683            &self[index]
684        }
685        fn len(&self) -> usize {
686            self[..].len()
687        }
688    }
689
690    /// A container that accepts slices `[B::Item]`.
691    pub struct SliceContainer<B> {
692        /// Offsets that bound each contained slice.
693        ///
694        /// The length will be one greater than the number of contained slices,
695        /// starting with zero and ending with `self.inner.len()`.
696        offsets: Vec<usize>,
697        /// An inner container for sequences of `B` that dereferences to a slice.
698        inner: Vec<B>,
699    }
700
701    impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
702        fn push_into(&mut self, item: &[B]) {
703            for x in item.iter() {
704                self.inner.push_into(x);
705            }
706            self.offsets.push(self.inner.len());
707        }
708    }
709
710    impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
711        fn push_into(&mut self, item: &Vec<B>) {
712            self.push_into(&item[..]);
713        }
714    }
715
716    impl<B> BatchContainer for SliceContainer<B>
717    where
718        B: Ord + Clone + Sized + 'static,
719    {
720        type Owned = Vec<B>;
721        type ReadItem<'a> = &'a [B];
722
723        #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
724        #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
725
726        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
727
728        fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
729        fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
730
731        fn clear(&mut self) {
732            self.offsets.clear();
733            self.offsets.push(0);
734            self.inner.clear();
735        }
736
737        fn with_capacity(size: usize) -> Self {
738            let mut offsets = Vec::with_capacity(size + 1);
739            offsets.push(0);
740            Self {
741                offsets,
742                inner: Vec::with_capacity(size),
743            }
744        }
745        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
746            let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
747            offsets.push(0);
748            Self {
749                offsets,
750                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
751            }
752        }
753        fn index(&self, index: usize) -> Self::ReadItem<'_> {
754            let lower = self.offsets[index];
755            let upper = self.offsets[index+1];
756            &self.inner[lower .. upper]
757        }
758        fn len(&self) -> usize {
759            self.offsets.len() - 1
760        }
761    }
762
763    /// Default implementation introduces a first offset.
764    impl<B> Default for SliceContainer<B> {
765        fn default() -> Self {
766            Self {
767                offsets: vec![0],
768                inner: Default::default(),
769            }
770        }
771    }
772}