Skip to main content

palimpsest_dataflow/trace/implementations/
merge_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2//!
3//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4//! The chunker receives input batches and consolidates them, producing sorted output
5//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8//! splitting them apart based on time.
9//!
10//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::container::{ContainerBuilder, PushInto};
16use timely::progress::frontier::AntichainRef;
17use timely::progress::{frontier::Antichain, Timestamp};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
22/// Creates batches from containers of unordered tuples.
23///
24/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
25/// and must produce outputs of type `M::Chunk`.
26pub struct MergeBatcher<Input, C, M: Merger> {
27    /// Transforms input streams to chunks of sorted, consolidated data.
28    chunker: C,
29    /// A sequence of power-of-two length lists of sorted, consolidated containers.
30    ///
31    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
32    chains: Vec<Vec<M::Chunk>>,
33    /// Stash of empty chunks, recycled through the merging process.
34    stash: Vec<M::Chunk>,
35    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
36    merger: M,
37    /// Current lower frontier, we sealed up to here.
38    lower: Antichain<M::Time>,
39    /// The lower-bound frontier of the data, after the last call to seal.
40    frontier: Antichain<M::Time>,
41    /// Logger for size accounting.
42    logger: Option<Logger>,
43    /// Timely operator ID.
44    operator_id: usize,
45    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
46    _marker: PhantomData<Input>,
47}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51    C: ContainerBuilder<Container = M::Chunk> + for<'a> PushInto<&'a mut Input>,
52    M: Merger<Time: Timestamp>,
53{
54    type Input = Input;
55    type Time = M::Time;
56    type Output = M::Chunk;
57
58    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59        Self {
60            logger,
61            operator_id,
62            chunker: C::default(),
63            merger: M::default(),
64            chains: Vec::new(),
65            stash: Vec::new(),
66            frontier: Antichain::new(),
67            lower: Antichain::from_elem(M::Time::minimum()),
68            _marker: PhantomData,
69        }
70    }
71
72    /// Push a container of data into this merge batcher. Updates the internal chain structure if
73    /// needed.
74    fn push_container(&mut self, container: &mut Input) {
75        self.chunker.push_into(container);
76        while let Some(chunk) = self.chunker.extract() {
77            let chunk = std::mem::take(chunk);
78            self.insert_chain(vec![chunk]);
79        }
80    }
81
82    // Sealing a batch means finding those updates with times not greater or equal to any time
83    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
84    // which we call `lower`, by assumption that after sealing a batcher we receive no more
85    // updates with times not greater or equal to `upper`.
86    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
87        &mut self,
88        upper: Antichain<M::Time>,
89    ) -> B::Output {
90        // Finish
91        while let Some(chunk) = self.chunker.finish() {
92            let chunk = std::mem::take(chunk);
93            self.insert_chain(vec![chunk]);
94        }
95
96        // Merge all remaining chains into a single chain.
97        while self.chains.len() > 1 {
98            let list1 = self.chain_pop().unwrap();
99            let list2 = self.chain_pop().unwrap();
100            let merged = self.merge_by(list1, list2);
101            self.chain_push(merged);
102        }
103        let merged = self.chain_pop().unwrap_or_default();
104
105        // Extract readied data.
106        let mut kept = Vec::new();
107        let mut readied = Vec::new();
108        self.frontier.clear();
109
110        self.merger.extract(
111            merged,
112            upper.borrow(),
113            &mut self.frontier,
114            &mut readied,
115            &mut kept,
116            &mut self.stash,
117        );
118
119        if !kept.is_empty() {
120            self.chain_push(kept);
121        }
122
123        self.stash.clear();
124
125        let description = Description::new(
126            self.lower.clone(),
127            upper.clone(),
128            Antichain::from_elem(M::Time::minimum()),
129        );
130        let seal = B::seal(&mut readied, description);
131        self.lower = upper;
132        seal
133    }
134
135    /// The frontier of elements remaining after the most recent call to `self.seal`.
136    #[inline]
137    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
138        self.frontier.borrow()
139    }
140}
141
142impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
143    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
144    /// by decreasing length.
145    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
146        if !chain.is_empty() {
147            self.chain_push(chain);
148            while self.chains.len() > 1
149                && (self.chains[self.chains.len() - 1].len()
150                    >= self.chains[self.chains.len() - 2].len() / 2)
151            {
152                let list1 = self.chain_pop().unwrap();
153                let list2 = self.chain_pop().unwrap();
154                let merged = self.merge_by(list1, list2);
155                self.chain_push(merged);
156            }
157        }
158    }
159
160    // merges two sorted input lists into one sorted output list.
161    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
162        // TODO: `list1` and `list2` get dropped; would be better to reuse?
163        let mut output = Vec::with_capacity(list1.len() + list2.len());
164        self.merger
165            .merge(list1, list2, &mut output, &mut self.stash);
166
167        output
168    }
169
170    /// Pop a chain and account size changes.
171    #[inline]
172    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
173        let chain = self.chains.pop();
174        self.account(chain.iter().flatten().map(M::account), -1);
175        chain
176    }
177
178    /// Push a chain and account size changes.
179    #[inline]
180    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
181        self.account(chain.iter().map(M::account), 1);
182        self.chains.push(chain);
183    }
184
185    /// Account size changes. Only performs work if a logger exists.
186    ///
187    /// Calculate the size based on the iterator passed along, with each attribute
188    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
189    #[inline]
190    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
191        if let Some(logger) = &self.logger {
192            let (mut records, mut size, mut capacity, mut allocations) =
193                (0isize, 0isize, 0isize, 0isize);
194            for (records_, size_, capacity_, allocations_) in items {
195                records = records.saturating_add_unsigned(records_);
196                size = size.saturating_add_unsigned(size_);
197                capacity = capacity.saturating_add_unsigned(capacity_);
198                allocations = allocations.saturating_add_unsigned(allocations_);
199            }
200            logger.log(BatcherEvent {
201                operator: self.operator_id,
202                records_diff: records * diff,
203                size_diff: size * diff,
204                capacity_diff: capacity * diff,
205                allocations_diff: allocations * diff,
206            })
207        }
208    }
209}
210
211impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
212    fn drop(&mut self) {
213        // Cleanup chain to retract accounting information.
214        while self.chain_pop().is_some() {}
215    }
216}
217
218/// A trait to describe interesting moments in a merge batcher.
219pub trait Merger: Default {
220    /// The internal representation of chunks of data.
221    type Chunk: Default;
222    /// The type of time in frontiers to extract updates.
223    type Time;
224    /// Merge chains into an output chain.
225    fn merge(
226        &mut self,
227        list1: Vec<Self::Chunk>,
228        list2: Vec<Self::Chunk>,
229        output: &mut Vec<Self::Chunk>,
230        stash: &mut Vec<Self::Chunk>,
231    );
232    /// Extract ready updates based on the `upper` frontier.
233    fn extract(
234        &mut self,
235        merged: Vec<Self::Chunk>,
236        upper: AntichainRef<Self::Time>,
237        frontier: &mut Antichain<Self::Time>,
238        readied: &mut Vec<Self::Chunk>,
239        kept: &mut Vec<Self::Chunk>,
240        stash: &mut Vec<Self::Chunk>,
241    );
242
243    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
244    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
245}
246
247pub use container::{ColMerger, VecMerger};
248
249pub mod container {
250
251    //! A general purpose `Merger` implementation for arbitrary containers.
252    //!
253    //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`.
254    //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as
255    //! well as the ability to return the container when iteration is complete.
256    //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container
257    //! items should be interpreted with respect to times, and with respect to differences.
258    //! These two traits exist instead of a stack of constraints on the structure of the associated items
259    //! of the containers, allowing them to perform their functions without destructuring their guts.
260    //!
    //! Standard implementations exist in the `vec` and `columnation` modules.
262
263    use crate::trace::implementations::merge_batcher::Merger;
264    use std::cmp::Ordering;
265    use std::marker::PhantomData;
266    use timely::container::DrainContainer;
267    use timely::container::{PushInto, SizableContainer};
268    use timely::progress::frontier::{Antichain, AntichainRef};
269    use timely::{Accountable, Data, PartialOrder};
270
271    /// An abstraction for a container that can be iterated over, and conclude by returning itself.
272    pub trait ContainerQueue<C: DrainContainer> {
273        /// Returns either the next item in the container, or the container itself.
274        fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
275        /// Indicates whether `next_or_alloc` will return `Ok`, and whether `peek` will return `Some`.
276        fn is_empty(&self) -> bool;
277        /// Compare the heads of two queues, where empty queues come last.
278        fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
279        /// Create a new queue from an existing container.
280        fn from(container: C) -> Self;
281    }
282
283    /// Behavior to dissect items of chunks in the merge batcher
284    pub trait MergerChunk: Accountable + DrainContainer + SizableContainer + Default {
285        /// An owned time type.
286        ///
287        /// This type is provided so that users can maintain antichains of something, in order to track
288        /// the forward movement of time and extract intervals from chains of updates.
289        type TimeOwned;
290        /// The owned diff type.
291        ///
292        /// This type is provided so that users can provide an owned instance to the `push_and_add` method,
293        /// to act as a scratch space when the type is substantial and could otherwise require allocations.
294        type DiffOwned: Default;
295
296        /// Relates a borrowed time to antichains of owned times.
297        ///
298        /// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`.
299        fn time_kept(
300            time1: &Self::Item<'_>,
301            upper: &AntichainRef<Self::TimeOwned>,
302            frontier: &mut Antichain<Self::TimeOwned>,
303        ) -> bool;
304
305        /// Push an entry that adds together two diffs.
306        ///
307        /// This is only called when two items are deemed mergeable by the container queue.
308        /// If the two diffs added together is zero do not push anything.
309        fn push_and_add<'a>(
310            &mut self,
311            item1: Self::Item<'a>,
312            item2: Self::Item<'a>,
313            stash: &mut Self::DiffOwned,
314        );
315
316        /// Account the allocations behind the chunk.
317        // TODO: Find a more universal home for this: `Container`?
318        fn account(&self) -> (usize, usize, usize, usize) {
319            let (size, capacity, allocations) = (0, 0, 0);
320            (
321                usize::try_from(self.record_count()).unwrap(),
322                size,
323                capacity,
324                allocations,
325            )
326        }
327
328        /// Clear the chunk, to be reused.
329        fn clear(&mut self);
330    }
331
    /// A merger for arbitrary containers.
    ///
    /// `MC` is a `Container` that implements [`MergerChunk`].
    /// `CQ` is a [`ContainerQueue`] supporting `MC`.
    ///
    /// The merger holds no data of its own; both type parameters are only recorded through
    /// a marker field.
    pub struct ContainerMerger<MC, CQ> {
        /// Ties the otherwise unused type parameters `MC` and `CQ` to the struct.
        _marker: PhantomData<(MC, CQ)>,
    }
339
340    impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
341        fn default() -> Self {
342            Self {
343                _marker: PhantomData,
344            }
345        }
346    }
347
348    impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
349        /// Helper to get pre-sized vector from the stash.
350        #[inline]
351        fn empty(&self, stash: &mut Vec<MC>) -> MC {
352            stash.pop().unwrap_or_else(|| {
353                let mut container = MC::default();
354                container.ensure_capacity(&mut None);
355                container
356            })
357        }
358        /// Helper to return a chunk to the stash.
359        #[inline]
360        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
361            // TODO: Should we only retain correctly sized containers?
362            chunk.clear();
363            stash.push(chunk);
364        }
365    }
366
367    impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
368    where
369        for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data>
370            + Clone
371            + PushInto<<MC as DrainContainer>::Item<'a>>
372            + 'static,
373        CQ: ContainerQueue<MC>,
374    {
375        type Time = MC::TimeOwned;
376        type Chunk = MC;
377
378        // TODO: Consider integrating with `ConsolidateLayout`.
379        fn merge(
380            &mut self,
381            list1: Vec<Self::Chunk>,
382            list2: Vec<Self::Chunk>,
383            output: &mut Vec<Self::Chunk>,
384            stash: &mut Vec<Self::Chunk>,
385        ) {
386            let mut list1 = list1.into_iter();
387            let mut list2 = list2.into_iter();
388
389            let mut head1 = CQ::from(list1.next().unwrap_or_default());
390            let mut head2 = CQ::from(list2.next().unwrap_or_default());
391
392            let mut result = self.empty(stash);
393
394            let mut diff_owned = Default::default();
395
396            // while we have valid data in each input, merge.
397            while !head1.is_empty() && !head2.is_empty() {
398                while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
399                    let cmp = head1.cmp_heads(&head2);
400                    // TODO: The following less/greater branches could plausibly be a good moment for
401                    // `copy_range`, on account of runs of records that might benefit more from a
402                    // `memcpy`.
403                    match cmp {
404                        Ordering::Less => {
405                            result.push_into(head1.next_or_alloc().ok().unwrap());
406                        }
407                        Ordering::Greater => {
408                            result.push_into(head2.next_or_alloc().ok().unwrap());
409                        }
410                        Ordering::Equal => {
411                            let item1 = head1.next_or_alloc().ok().unwrap();
412                            let item2 = head2.next_or_alloc().ok().unwrap();
413                            result.push_and_add(item1, item2, &mut diff_owned);
414                        }
415                    }
416                }
417
418                if result.at_capacity() {
419                    output.push_into(result);
420                    result = self.empty(stash);
421                }
422
423                if head1.is_empty() {
424                    self.recycle(head1.next_or_alloc().err().unwrap(), stash);
425                    head1 = CQ::from(list1.next().unwrap_or_default());
426                }
427                if head2.is_empty() {
428                    self.recycle(head2.next_or_alloc().err().unwrap(), stash);
429                    head2 = CQ::from(list2.next().unwrap_or_default());
430                }
431            }
432
433            // TODO: recycle `head1` rather than discarding.
434            while let Ok(next) = head1.next_or_alloc() {
435                result.push_into(next);
436                if result.at_capacity() {
437                    output.push_into(result);
438                    result = self.empty(stash);
439                }
440            }
441            if !result.is_empty() {
442                output.push_into(result);
443                result = self.empty(stash);
444            }
445            output.extend(list1);
446
447            // TODO: recycle `head2` rather than discarding.
448            while let Ok(next) = head2.next_or_alloc() {
449                result.push_into(next);
450                if result.at_capacity() {
451                    output.push(result);
452                    result = self.empty(stash);
453                }
454            }
455            if !result.is_empty() {
456                output.push_into(result);
457                // result = self.empty(stash);
458            }
459            output.extend(list2);
460        }
461
462        fn extract(
463            &mut self,
464            merged: Vec<Self::Chunk>,
465            upper: AntichainRef<Self::Time>,
466            frontier: &mut Antichain<Self::Time>,
467            readied: &mut Vec<Self::Chunk>,
468            kept: &mut Vec<Self::Chunk>,
469            stash: &mut Vec<Self::Chunk>,
470        ) {
471            let mut keep = self.empty(stash);
472            let mut ready = self.empty(stash);
473
474            for mut buffer in merged {
475                for item in buffer.drain() {
476                    if MC::time_kept(&item, &upper, frontier) {
477                        if keep.at_capacity() && !keep.is_empty() {
478                            kept.push(keep);
479                            keep = self.empty(stash);
480                        }
481                        keep.push_into(item);
482                    } else {
483                        if ready.at_capacity() && !ready.is_empty() {
484                            readied.push(ready);
485                            ready = self.empty(stash);
486                        }
487                        ready.push_into(item);
488                    }
489                }
490                // Recycling buffer.
491                self.recycle(buffer, stash);
492            }
493            // Finish the kept data.
494            if !keep.is_empty() {
495                kept.push(keep);
496            }
497            if !ready.is_empty() {
498                readied.push(ready);
499            }
500        }
501
502        /// Account the allocations behind the chunk.
503        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
504            chunk.account()
505        }
506    }
507
508    pub use vec::VecMerger;
509    /// Implementations of `ContainerQueue` and `MergerChunk` for `Vec` containers.
510    pub mod vec {
511
512        use super::{ContainerQueue, MergerChunk};
513        use crate::difference::Semigroup;
514        use std::collections::VecDeque;
515        use timely::progress::{frontier::AntichainRef, Antichain};
516
517        /// A `Merger` implementation backed by vector containers.
518        pub type VecMerger<D, T, R> =
519            super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
520
521        impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
522            fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
523                if self.is_empty() {
524                    Err(Vec::from(std::mem::take(self)))
525                } else {
526                    Ok(self.pop_front().unwrap())
527                }
528            }
529            fn is_empty(&self) -> bool {
530                self.is_empty()
531            }
532            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
533                let (data1, time1, _) = self.front().unwrap();
534                let (data2, time2, _) = other.front().unwrap();
535                (data1, time1).cmp(&(data2, time2))
536            }
537            fn from(list: Vec<(D, T, R)>) -> Self {
538                <Self as From<_>>::from(list)
539            }
540        }
541
542        impl<
543                D: Ord + 'static,
544                T: Ord + timely::PartialOrder + Clone + 'static,
545                R: Semigroup + 'static,
546            > MergerChunk for Vec<(D, T, R)>
547        {
548            type TimeOwned = T;
549            type DiffOwned = ();
550
551            fn time_kept(
552                (_, time, _): &Self::Item<'_>,
553                upper: &AntichainRef<Self::TimeOwned>,
554                frontier: &mut Antichain<Self::TimeOwned>,
555            ) -> bool {
556                if upper.less_equal(time) {
557                    frontier.insert_with(&time, |time| time.clone());
558                    true
559                } else {
560                    false
561                }
562            }
563            fn push_and_add<'a>(
564                &mut self,
565                item1: Self::Item<'a>,
566                item2: Self::Item<'a>,
567                _stash: &mut Self::DiffOwned,
568            ) {
569                let (data, time, mut diff1) = item1;
570                let (_data, _time, diff2) = item2;
571                diff1.plus_equals(&diff2);
572                if !diff1.is_zero() {
573                    self.push((data, time, diff1));
574                }
575            }
576            fn account(&self) -> (usize, usize, usize, usize) {
577                let (size, capacity, allocations) = (0, 0, 0);
578                (self.len(), size, capacity, allocations)
579            }
580            #[inline]
581            fn clear(&mut self) {
582                Vec::clear(self)
583            }
584        }
585    }
586
587    pub use columnation::ColMerger;
588    /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation).
589    pub mod columnation {
590
591        use columnation::Columnation;
592        use timely::progress::{frontier::AntichainRef, Antichain};
593
594        use crate::containers::TimelyStack;
595        use crate::difference::Semigroup;
596
597        use super::{ContainerQueue, MergerChunk};
598
        /// A `Merger` implementation backed by `TimelyStack` containers (columnation).
        ///
        /// Chunks are `TimelyStack<(D, T, R)>`, and they are read through a
        /// [`TimelyStackQueue`] over the same tuple type.
        pub type ColMerger<D, T, R> =
            super::ContainerMerger<TimelyStack<(D, T, R)>, TimelyStackQueue<(D, T, R)>>;
602
603        /// TODO
604        pub struct TimelyStackQueue<T: Columnation> {
605            list: TimelyStack<T>,
606            head: usize,
607        }
608
609        impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation>
610            ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)>
611        {
612            fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
613                if self.is_empty() {
614                    Err(std::mem::take(&mut self.list))
615                } else {
616                    Ok(self.pop())
617                }
618            }
619            fn is_empty(&self) -> bool {
620                self.head == self.list[..].len()
621            }
622            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
623                let (data1, time1, _) = self.peek();
624                let (data2, time2, _) = other.peek();
625                (data1, time1).cmp(&(data2, time2))
626            }
627            fn from(list: TimelyStack<(D, T, R)>) -> Self {
628                TimelyStackQueue { list, head: 0 }
629            }
630        }
631
632        impl<T: Columnation> TimelyStackQueue<T> {
633            fn pop(&mut self) -> &T {
634                self.head += 1;
635                &self.list[self.head - 1]
636            }
637
638            fn peek(&self) -> &T {
639                &self.list[self.head]
640            }
641        }
642
643        impl<
644                D: Ord + Columnation + 'static,
645                T: Ord + timely::PartialOrder + Clone + Columnation + 'static,
646                R: Default + Semigroup + Columnation + 'static,
647            > MergerChunk for TimelyStack<(D, T, R)>
648        {
649            type TimeOwned = T;
650            type DiffOwned = R;
651
652            fn time_kept(
653                (_, time, _): &Self::Item<'_>,
654                upper: &AntichainRef<Self::TimeOwned>,
655                frontier: &mut Antichain<Self::TimeOwned>,
656            ) -> bool {
657                if upper.less_equal(time) {
658                    frontier.insert_with(&time, |time| time.clone());
659                    true
660                } else {
661                    false
662                }
663            }
664            fn push_and_add<'a>(
665                &mut self,
666                item1: Self::Item<'a>,
667                item2: Self::Item<'a>,
668                stash: &mut Self::DiffOwned,
669            ) {
670                let (data, time, diff1) = item1;
671                let (_data, _time, diff2) = item2;
672                stash.clone_from(diff1);
673                stash.plus_equals(&diff2);
674                if !stash.is_zero() {
675                    self.copy_destructured(data, time, stash);
676                }
677            }
678            fn account(&self) -> (usize, usize, usize, usize) {
679                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
680                let cb = |siz, cap| {
681                    size += siz;
682                    capacity += cap;
683                    allocations += 1;
684                };
685                self.heap_size(cb);
686                (self.len(), size, capacity, allocations)
687            }
688            #[inline]
689            fn clear(&mut self) {
690                TimelyStack::clear(self)
691            }
692        }
693    }
694}