Skip to main content

palimpsest_dataflow/trace/implementations/
mod.rs

1//! Implementations of `Trace` and associated traits.
2//!
3//! The `Trace` trait provides access to an ordered collection of `(key, val, time, diff)` tuples, but
4//! there is substantial flexibility in implementations of this trait. Depending on characteristics of
5//! the data, we may wish to represent the data in different ways. This module contains several of these
6//! implementations, and combiners for merging the results of different traces.
7//!
8//! As examples of implementations,
9//!
10//! *  The `trie` module is meant to represent general update tuples, with no particular assumptions made
11//!    about their contents. It organizes the data first by key, then by val, and then leaves the rest
12//!    in an unordered pile.
13//!
14//! *  The `keys` module is meant for collections whose value type is `()`, which is to say there is no
15//!    (key, val) structure on the records; all of them are just viewed as "keys".
16//!
17//! *  The `time` module is meant for collections with a single time value. This can remove repetition
18//!    from the representation, at the cost of requiring more instances and run-time merging.
19//!
20//! *  The `base` module is meant for collections with a single time value equivalent to the least time.
21//!    These collections must always accumulate to non-negative collections, and as such we can indicate
22//!    the frequency of an element by its multiplicity. This removes both the time and weight from the
23//!    representation, but is only appropriate for a subset (often substantial) of the data.
24//!
25//! Each of these representations is best suited for different data, but they can be combined to get the
26//! benefits of each, as appropriate. There are several `Cursor` combiners, `CursorList` and `CursorPair`,
27//! for homogeneous and inhomogeneous cursors, respectively.
28//!
29//! #Musings
30//!
31//! What is less clear is how to transfer updates between the representations at merge time in a tasteful
32//! way. Perhaps we could put an ordering on the representations, each pair with a dominant representation,
33//! and part of merging the latter filters updates into the former. Although back and forth might be
34//! appealing, more thinking is required to negotiate all of these policies.
35//!
36//! One option would be to require the layer builder to handle these smarts. Merging is currently done by
37//! the layer as part of custom code, but we could make it simply be "iterate through cursor, push results
38//! into 'ordered builder'". Then the builder would be bright enough to emit a "batch" for the composite
39//! trace, rather than just a batch of the type merged.
40
41pub mod spine_fueled;
42
43pub mod chainless_batcher;
44pub mod chunker;
45pub mod huffman_container;
46pub mod merge_batcher;
47pub mod ord_neu;
48pub mod rhh;
49
50// Opinionated takes on default spines.
51pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
52pub use self::ord_neu::OrdKeySpine as KeySpine;
53pub use self::ord_neu::OrdValBatcher as ValBatcher;
54pub use self::ord_neu::OrdValSpine as ValSpine;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
57
58use std::convert::TryInto;
59
60use columnation::Columnation;
61use serde::{Deserialize, Serialize};
62use timely::container::{DrainContainer, PushInto};
63use timely::progress::Timestamp;
64
65use crate::containers::TimelyStack;
66use crate::difference::Semigroup;
67use crate::lattice::Lattice;
68
69/// A type that names constituent update types.
70pub trait Update {
71    /// Key by which data are grouped.
72    type Key: Ord + Clone + 'static;
73    /// Values associated with the key.
74    type Val: Ord + Clone + 'static;
75    /// Time at which updates occur.
76    type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
77    /// Way in which updates occur.
78    type Diff: Ord + Semigroup + 'static;
79}
80
81impl<K, V, T, R> Update for ((K, V), T, R)
82where
83    K: Ord + Clone + 'static,
84    V: Ord + Clone + 'static,
85    T: Ord + Clone + Lattice + timely::progress::Timestamp,
86    R: Ord + Semigroup + 'static,
87{
88    type Key = K;
89    type Val = V;
90    type Time = T;
91    type Diff = R;
92}
93
94/// A type with opinions on how updates should be laid out.
95pub trait Layout {
96    /// Container for update keys.
97    type KeyContainer: BatchContainer;
98    /// Container for update vals.
99    type ValContainer: BatchContainer;
100    /// Container for times.
101    type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
102    /// Container for diffs.
103    type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
104    /// Container for offsets.
105    type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
106}
107
108/// A type bearing a layout.
109pub trait WithLayout {
110    /// The layout.
111    type Layout: Layout;
112}
113
114/// Automatically implemented trait for types with layouts.
115pub trait LayoutExt:
116    WithLayout<
117    Layout: Layout<
118        KeyContainer = Self::KeyContainer,
119        ValContainer = Self::ValContainer,
120        TimeContainer = Self::TimeContainer,
121        DiffContainer = Self::DiffContainer,
122    >,
123>
124{
125    /// Alias for an owned key of a layout.
126    type KeyOwn;
127    /// Alias for an borrowed key of a layout.
128    type Key<'a>: Copy + Ord;
129    /// Alias for an owned val of a layout.
130    type ValOwn: Clone + Ord;
131    /// Alias for an borrowed val of a layout.
132    type Val<'a>: Copy + Ord;
133    /// Alias for an owned time of a layout.
134    type Time: Lattice + timely::progress::Timestamp;
135    /// Alias for an borrowed time of a layout.
136    type TimeGat<'a>: Copy + Ord;
137    /// Alias for an owned diff of a layout.
138    type Diff: Semigroup + 'static;
139    /// Alias for an borrowed diff of a layout.
140    type DiffGat<'a>: Copy + Ord;
141
142    /// Container for update keys.
143    type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>, Owned = Self::KeyOwn>;
144    /// Container for update vals.
145    type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
146    /// Container for times.
147    type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
148    /// Container for diffs.
149    type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
150
151    /// Construct an owned key from a reference.
152    fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn;
153    /// Construct an owned val from a reference.
154    fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
155    /// Construct an owned time from a reference.
156    fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
157    /// Construct an owned diff from a reference.
158    fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
159
160    /// Clones a reference time onto an owned time.
161    fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
162}
163
164impl<L: WithLayout> LayoutExt for L {
165    type KeyOwn = <<L::Layout as Layout>::KeyContainer as BatchContainer>::Owned;
166    type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
167    type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
168    type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
169    type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
170    type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
171    type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
172    type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
173
174    type KeyContainer = <L::Layout as Layout>::KeyContainer;
175    type ValContainer = <L::Layout as Layout>::ValContainer;
176    type TimeContainer = <L::Layout as Layout>::TimeContainer;
177    type DiffContainer = <L::Layout as Layout>::DiffContainer;
178
179    #[inline(always)]
180    fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn {
181        <Self::Layout as Layout>::KeyContainer::into_owned(key)
182    }
183    #[inline(always)]
184    fn owned_val(val: Self::Val<'_>) -> Self::ValOwn {
185        <Self::Layout as Layout>::ValContainer::into_owned(val)
186    }
187    #[inline(always)]
188    fn owned_time(time: Self::TimeGat<'_>) -> Self::Time {
189        <Self::Layout as Layout>::TimeContainer::into_owned(time)
190    }
191    #[inline(always)]
192    fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff {
193        <Self::Layout as Layout>::DiffContainer::into_owned(diff)
194    }
195    #[inline(always)]
196    fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) {
197        <Self::Layout as Layout>::TimeContainer::clone_onto(time, onto)
198    }
199}
200
201// An easy way to provide an explicit layout: as a 5-tuple.
202// Valuable when one wants to perform layout surgery.
203impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
204where
205    KC: BatchContainer,
206    VC: BatchContainer,
207    TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
208    DC: BatchContainer<Owned: Semigroup>,
209    OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
210{
211    type KeyContainer = KC;
212    type ValContainer = VC;
213    type TimeContainer = TC;
214    type DiffContainer = DC;
215    type OffsetContainer = OC;
216}
217
218/// Aliases for types provided by the containers within a `Layout`.
219///
220/// For clarity, prefer `use layout;` and then `layout::Key<L>` to retain the layout context.
221pub mod layout {
222    use crate::trace::implementations::{BatchContainer, Layout};
223
224    /// Alias for an owned key of a layout.
225    pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
226    /// Alias for an borrowed key of a layout.
227    pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
228    /// Alias for an owned val of a layout.
229    pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
230    /// Alias for an borrowed val of a layout.
231    pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
232    /// Alias for an owned time of a layout.
233    pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
234    /// Alias for an borrowed time of a layout.
235    pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
236    /// Alias for an owned diff of a layout.
237    pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
238    /// Alias for an borrowed diff of a layout.
239    pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
240}
241
242/// A layout that uses vectors
243pub struct Vector<U: Update> {
244    phantom: std::marker::PhantomData<U>,
245}
246
247impl<U: Update<Diff: Ord>> Layout for Vector<U> {
248    type KeyContainer = Vec<U::Key>;
249    type ValContainer = Vec<U::Val>;
250    type TimeContainer = Vec<U::Time>;
251    type DiffContainer = Vec<U::Diff>;
252    type OffsetContainer = OffsetList;
253}
254
255/// A layout based on timely stacks
256pub struct TStack<U: Update> {
257    phantom: std::marker::PhantomData<U>,
258}
259
260impl<U> Layout for TStack<U>
261where
262    U: Update<Key: Columnation, Val: Columnation, Time: Columnation, Diff: Columnation + Ord>,
263{
264    type KeyContainer = TimelyStack<U::Key>;
265    type ValContainer = TimelyStack<U::Val>;
266    type TimeContainer = TimelyStack<U::Time>;
267    type DiffContainer = TimelyStack<U::Diff>;
268    type OffsetContainer = OffsetList;
269}
270
271/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
272#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
273pub struct OffsetList {
274    /// Length of a prefix of zero elements.
275    pub zero_prefix: usize,
276    /// Offsets that fit within a `u32`.
277    pub smol: Vec<u32>,
278    /// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
279    pub chonk: Vec<u64>,
280}
281
282impl std::fmt::Debug for OffsetList {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        f.debug_list().entries(self.into_iter()).finish()
285    }
286}
287
288impl OffsetList {
289    /// Allocate a new list with a specified capacity.
290    pub fn with_capacity(cap: usize) -> Self {
291        Self {
292            zero_prefix: 0,
293            smol: Vec::with_capacity(cap),
294            chonk: Vec::new(),
295        }
296    }
297    /// Inserts the offset, as a `u32` if that is still on the table.
298    pub fn push(&mut self, offset: usize) {
299        if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
300            self.zero_prefix += 1;
301        } else if self.chonk.is_empty() {
302            if let Ok(smol) = offset.try_into() {
303                self.smol.push(smol);
304            } else {
305                self.chonk.push(offset.try_into().unwrap())
306            }
307        } else {
308            self.chonk.push(offset.try_into().unwrap())
309        }
310    }
311    /// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
312    pub fn index(&self, index: usize) -> usize {
313        if index < self.zero_prefix {
314            0
315        } else if index - self.zero_prefix < self.smol.len() {
316            self.smol[index - self.zero_prefix].try_into().unwrap()
317        } else {
318            self.chonk[index - self.zero_prefix - self.smol.len()]
319                .try_into()
320                .unwrap()
321        }
322    }
323    /// The number of offsets in the list.
324    pub fn len(&self) -> usize {
325        self.zero_prefix + self.smol.len() + self.chonk.len()
326    }
327}
328
329impl<'a> IntoIterator for &'a OffsetList {
330    type Item = usize;
331    type IntoIter = OffsetListIter<'a>;
332
333    fn into_iter(self) -> Self::IntoIter {
334        OffsetListIter {
335            list: self,
336            index: 0,
337        }
338    }
339}
340
341/// An iterator for [`OffsetList`].
342pub struct OffsetListIter<'a> {
343    list: &'a OffsetList,
344    index: usize,
345}
346
347impl<'a> Iterator for OffsetListIter<'a> {
348    type Item = usize;
349
350    fn next(&mut self) -> Option<Self::Item> {
351        if self.index < self.list.len() {
352            let res = Some(self.list.index(self.index));
353            self.index += 1;
354            res
355        } else {
356            None
357        }
358    }
359}
360
361impl PushInto<usize> for OffsetList {
362    fn push_into(&mut self, item: usize) {
363        self.push(item);
364    }
365}
366
367impl BatchContainer for OffsetList {
368    type Owned = usize;
369    type ReadItem<'a> = usize;
370
371    #[inline(always)]
372    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
373        item
374    }
375    #[inline(always)]
376    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
377        item
378    }
379
380    fn push_ref(&mut self, item: Self::ReadItem<'_>) {
381        self.push_into(item)
382    }
383    fn push_own(&mut self, item: &Self::Owned) {
384        self.push_into(*item)
385    }
386
387    fn clear(&mut self) {
388        self.zero_prefix = 0;
389        self.smol.clear();
390        self.chonk.clear();
391    }
392
393    fn with_capacity(size: usize) -> Self {
394        Self::with_capacity(size)
395    }
396
397    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
398        Self::with_capacity(cont1.len() + cont2.len())
399    }
400
401    fn index(&self, index: usize) -> Self::ReadItem<'_> {
402        self.index(index)
403    }
404
405    fn len(&self) -> usize {
406        self.len()
407    }
408}
409
410/// Behavior to split an update into principal components.
411pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: DrainContainer + Sized {
412    /// Key portion
413    type Key<'a>: Ord;
414    /// Value portion
415    type Val<'a>: Ord;
416    /// Time
417    type Time;
418    /// Diff
419    type Diff;
420
421    /// Split an item into separate parts.
422    fn into_parts<'a>(
423        item: Self::Item<'a>,
424    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
425
426    /// Test that the key equals a key in the layout's key container.
427    fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
428
429    /// Test that the value equals a key in the layout's value container.
430    fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
431
432    /// Count the number of distinct keys, (key, val) pairs, and total updates.
433    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
434}
435
436impl<K, KBC, V, VBC, T, R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
437where
438    K: Ord + Clone + 'static,
439    KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
440    V: Ord + Clone + 'static,
441    VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
442    T: Timestamp + Lattice + Clone + 'static,
443    R: Ord + Semigroup + 'static,
444{
445    type Key<'a> = K;
446    type Val<'a> = V;
447    type Time = T;
448    type Diff = R;
449
450    fn into_parts<'a>(
451        ((key, val), time, diff): Self::Item<'a>,
452    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
453        (key, val, time, diff)
454    }
455
456    fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
457        KBC::reborrow(other) == this
458    }
459
460    fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
461        VBC::reborrow(other) == this
462    }
463
464    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
465        let mut keys = 0;
466        let mut vals = 0;
467        let mut upds = 0;
468        let mut prev_keyval = None;
469        for link in chain.iter() {
470            for ((key, val), _, _) in link.iter() {
471                if let Some((p_key, p_val)) = prev_keyval {
472                    if p_key != key {
473                        keys += 1;
474                        vals += 1;
475                    } else if p_val != val {
476                        vals += 1;
477                    }
478                } else {
479                    keys += 1;
480                    vals += 1;
481                }
482                upds += 1;
483                prev_keyval = Some((key, val));
484            }
485        }
486        (keys, vals, upds)
487    }
488}
489
490impl<K, V, T, R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
491where
492    K: for<'a> BatchContainer<
493        ReadItem<'a>: PartialEq<&'a K::Owned>,
494        Owned: Ord + Columnation + Clone + 'static,
495    >,
496    V: for<'a> BatchContainer<
497        ReadItem<'a>: PartialEq<&'a V::Owned>,
498        Owned: Ord + Columnation + Clone + 'static,
499    >,
500    T: Timestamp + Lattice + Columnation + Clone + 'static,
501    R: Ord + Clone + Semigroup + Columnation + 'static,
502{
503    type Key<'a> = &'a K::Owned;
504    type Val<'a> = &'a V::Owned;
505    type Time = T;
506    type Diff = R;
507
508    fn into_parts<'a>(
509        ((key, val), time, diff): Self::Item<'a>,
510    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
511        (key, val, time.clone(), diff.clone())
512    }
513
514    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
515        K::reborrow(other) == *this
516    }
517
518    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
519        V::reborrow(other) == *this
520    }
521
522    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
523        let mut keys = 0;
524        let mut vals = 0;
525        let mut upds = 0;
526        let mut prev_keyval = None;
527        for link in chain.iter() {
528            for ((key, val), _, _) in link.iter() {
529                if let Some((p_key, p_val)) = prev_keyval {
530                    if p_key != key {
531                        keys += 1;
532                        vals += 1;
533                    } else if p_val != val {
534                        vals += 1;
535                    }
536                } else {
537                    keys += 1;
538                    vals += 1;
539                }
540                upds += 1;
541                prev_keyval = Some((key, val));
542            }
543        }
544        (keys, vals, upds)
545    }
546}
547
548pub use self::containers::{BatchContainer, SliceContainer};
549
550/// Containers for data that resemble `Vec<T>`, with leaner implementations.
551pub mod containers {
552
553    use columnation::Columnation;
554    use timely::container::PushInto;
555
556    use crate::containers::TimelyStack;
557
558    /// A general-purpose container resembling `Vec<T>`.
559    pub trait BatchContainer: 'static {
560        /// An owned instance of `Self::ReadItem<'_>`.
561        type Owned: Clone + Ord;
562
563        /// The type that can be read back out of the container.
564        type ReadItem<'a>: Copy + Ord;
565
566        /// Conversion from an instance of this type to the owned type.
567        #[must_use]
568        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;
569        /// Clones `self` onto an existing instance of the owned type.
570        #[inline(always)]
571        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
572            *other = Self::into_owned(item);
573        }
574
575        /// Push an item into this container
576        fn push_ref(&mut self, item: Self::ReadItem<'_>);
577        /// Push an item into this container
578        fn push_own(&mut self, item: &Self::Owned);
579
580        /// Clears the container. May not release resources.
581        fn clear(&mut self);
582
583        /// Creates a new container with sufficient capacity.
584        fn with_capacity(size: usize) -> Self;
585        /// Creates a new container with sufficient capacity.
586        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
587
588        /// Converts a read item into one with a narrower lifetime.
589        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
590
591        /// Reference to the element at this position.
592        fn index(&self, index: usize) -> Self::ReadItem<'_>;
593
594        /// Reference to the element at this position, if it exists.
595        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
596            if index < self.len() {
597                Some(self.index(index))
598            } else {
599                None
600            }
601        }
602
603        /// Number of contained elements
604        fn len(&self) -> usize;
605        /// Returns the last item if the container is non-empty.
606        fn last(&self) -> Option<Self::ReadItem<'_>> {
607            if self.len() > 0 {
608                Some(self.index(self.len() - 1))
609            } else {
610                None
611            }
612        }
613        /// Indicates if the length is zero.
614        fn is_empty(&self) -> bool {
615            self.len() == 0
616        }
617
618        /// Reports the number of elements satisfying the predicate.
619        ///
620        /// This methods *relies strongly* on the assumption that the predicate
621        /// stays false once it becomes false, a joint property of the predicate
622        /// and the layout of `Self. This allows `advance` to use exponential search to
623        /// count the number of elements in time logarithmic in the result.
624        fn advance<F: for<'a> Fn(Self::ReadItem<'a>) -> bool>(
625            &self,
626            start: usize,
627            end: usize,
628            function: F,
629        ) -> usize {
630            let small_limit = 8;
631
632            // Exponential search if the answer isn't within `small_limit`.
633            if end > start + small_limit && function(self.index(start + small_limit)) {
634                // start with no advance
635                let mut index = small_limit + 1;
636                if start + index < end && function(self.index(start + index)) {
637                    // advance in exponentially growing steps.
638                    let mut step = 1;
639                    while start + index + step < end && function(self.index(start + index + step)) {
640                        index += step;
641                        step <<= 1;
642                    }
643
644                    // advance in exponentially shrinking steps.
645                    step >>= 1;
646                    while step > 0 {
647                        if start + index + step < end && function(self.index(start + index + step))
648                        {
649                            index += step;
650                        }
651                        step >>= 1;
652                    }
653
654                    index += 1;
655                }
656
657                index
658            } else {
659                let limit = std::cmp::min(end, start + small_limit);
660                (start..limit).filter(|x| function(self.index(*x))).count()
661            }
662        }
663    }
664
665    // All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
666    // struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
667    impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
668        type Owned = T;
669        type ReadItem<'a> = &'a T;
670
671        #[inline(always)]
672        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
673            item.clone()
674        }
675        #[inline(always)]
676        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
677            other.clone_from(item);
678        }
679
680        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
681            item
682        }
683
684        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
685            self.push_into(item)
686        }
687        fn push_own(&mut self, item: &Self::Owned) {
688            self.push_into(item.clone())
689        }
690
691        fn clear(&mut self) {
692            self.clear()
693        }
694
695        fn with_capacity(size: usize) -> Self {
696            Vec::with_capacity(size)
697        }
698        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
699            Vec::with_capacity(cont1.len() + cont2.len())
700        }
701        fn index(&self, index: usize) -> Self::ReadItem<'_> {
702            &self[index]
703        }
704        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
705            <[T]>::get(&self, index)
706        }
707        fn len(&self) -> usize {
708            self[..].len()
709        }
710    }
711
712    // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
713    // be presented with the actual contained type, rather than a type that borrows into it.
714    impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
715        type Owned = T;
716        type ReadItem<'a> = &'a T;
717
718        #[inline(always)]
719        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
720            item.clone()
721        }
722        #[inline(always)]
723        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
724            other.clone_from(item);
725        }
726
727        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
728            item
729        }
730
731        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
732            self.push_into(item)
733        }
734        fn push_own(&mut self, item: &Self::Owned) {
735            self.push_into(item)
736        }
737
738        fn clear(&mut self) {
739            self.clear()
740        }
741
742        fn with_capacity(size: usize) -> Self {
743            Self::with_capacity(size)
744        }
745        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
746            let mut new = Self::default();
747            new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
748            new
749        }
750        fn index(&self, index: usize) -> Self::ReadItem<'_> {
751            &self[index]
752        }
753        fn len(&self) -> usize {
754            self[..].len()
755        }
756    }
757
758    /// A container that accepts slices `[B::Item]`.
759    pub struct SliceContainer<B> {
760        /// Offsets that bound each contained slice.
761        ///
762        /// The length will be one greater than the number of contained slices,
763        /// starting with zero and ending with `self.inner.len()`.
764        offsets: Vec<usize>,
765        /// An inner container for sequences of `B` that dereferences to a slice.
766        inner: Vec<B>,
767    }
768
769    impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
770        fn push_into(&mut self, item: &[B]) {
771            for x in item.iter() {
772                self.inner.push_into(x);
773            }
774            self.offsets.push(self.inner.len());
775        }
776    }
777
778    impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
779        fn push_into(&mut self, item: &Vec<B>) {
780            self.push_into(&item[..]);
781        }
782    }
783
784    impl<B> BatchContainer for SliceContainer<B>
785    where
786        B: Ord + Clone + Sized + 'static,
787    {
788        type Owned = Vec<B>;
789        type ReadItem<'a> = &'a [B];
790
791        #[inline(always)]
792        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
793            item.to_vec()
794        }
795        #[inline(always)]
796        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
797            other.clone_from_slice(item);
798        }
799
800        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
801            item
802        }
803
804        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
805            self.push_into(item)
806        }
807        fn push_own(&mut self, item: &Self::Owned) {
808            self.push_into(item)
809        }
810
811        fn clear(&mut self) {
812            self.offsets.clear();
813            self.offsets.push(0);
814            self.inner.clear();
815        }
816
817        fn with_capacity(size: usize) -> Self {
818            let mut offsets = Vec::with_capacity(size + 1);
819            offsets.push(0);
820            Self {
821                offsets,
822                inner: Vec::with_capacity(size),
823            }
824        }
825        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
826            let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
827            offsets.push(0);
828            Self {
829                offsets,
830                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
831            }
832        }
833        fn index(&self, index: usize) -> Self::ReadItem<'_> {
834            let lower = self.offsets[index];
835            let upper = self.offsets[index + 1];
836            &self.inner[lower..upper]
837        }
838        fn len(&self) -> usize {
839            self.offsets.len() - 1
840        }
841    }
842
843    /// Default implementation introduces a first offset.
844    impl<B> Default for SliceContainer<B> {
845        fn default() -> Self {
846            Self {
847                offsets: vec![0],
848                inner: Default::default(),
849            }
850        }
851    }
852}