Skip to main content

timely_container/
lib.rs

1//! Specifications for containers
2
3#![forbid(missing_docs)]
4
5use std::collections::VecDeque;
6
/// A type containing a number of records accounted for by progress tracking.
///
/// The object stores a number of records and is thus able to report its count
/// ([`record_count()`](Accountable::record_count)) and whether it is empty
/// ([`is_empty()`](Accountable::is_empty)). It is empty exactly when the record
/// count is zero.
pub trait Accountable {
    /// The number of records.
    ///
    /// This number is used in progress tracking to confirm the receipt of some number
    /// of outstanding records, and it is highly load bearing. The main restriction is
    /// imposed on the [`LengthPreservingContainerBuilder`] trait, whose implementors
    /// must preserve the number of records.
    fn record_count(&self) -> i64;

    /// Determine if this contains any records, corresponding to `record_count() == 0`.
    ///
    /// Implementations may override this with a cheaper check, but it is a correctness
    /// error for the result to be anything other than `self.record_count() == 0`.
    #[inline]
    fn is_empty(&self) -> bool {
        self.record_count() == 0
    }
}
25
/// A container that can drain itself.
///
/// Draining the container presents items in an implementation-specific order.
/// The container is in an undefined state after calling [`drain`](DrainContainer::drain).
/// Dropping the iterator also leaves the container in an undefined state.
pub trait DrainContainer {
    /// The type of elements when draining the container.
    type Item<'a> where Self: 'a;
    /// Iterator type when draining the container.
    type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
    /// Returns an iterator that drains the contents of this container.
    ///
    /// Drain leaves the container in an undefined state, so callers should not rely on
    /// its contents afterwards.
    fn drain(&mut self) -> Self::DrainIter<'_>;
}
40
/// A container that knows when it is full and can restore its preferred capacity.
pub trait SizableContainer {
    /// Returns `true` when the container is "full" and ought to be shipped.
    fn at_capacity(&self) -> bool;

    /// Brings `self` back to its desired capacity, if it has one.
    ///
    /// `stash` may hold a spare container that could already have the intended
    /// capacity. It is not guaranteed to be empty, nor to have the right capacity,
    /// so implementations must guard against both cases.
    ///
    /// Treat the contents of `stash` as undefined and clear them before reusing it.
    fn ensure_capacity(&mut self, stash: &mut Option<Self>)
    where
        Self: Sized;
}
55
/// A container able to absorb items of type `T`.
pub trait PushInto<T> {
    /// Absorbs `item` into `self`.
    fn push_into(&mut self, item: T);
}
61
/// A type that can build containers from items.
///
/// An implementation needs to absorb elements, and later reveal equivalent information
/// chunked into individual containers, but is free to change the data representation to
/// better fit the properties of the container.
///
/// Types implementing this trait should provide appropriate [`PushInto`] implementations such
/// that users can push the expected item types.
///
/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
/// needs to produce all outputs, even partial ones. Callers should call these functions
/// repeatedly to drain pending or finished data.
///
/// The caller should consume the containers returned by [`Self::extract`] and
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
/// any remaining elements.
///
/// Implementations are allowed to reuse the contents of the mutable references left by the
/// caller, but they should ensure that they clear the contents before doing so.
///
/// For example, a consolidating builder can aggregate differences in-place, but it has
/// to ensure that it preserves the intended information.
///
/// The trait does not prescribe any specific ordering guarantees; each implementation can
/// decide whether `extract` and `finish` reflect the push order or not.
pub trait ContainerBuilder: Default {
    /// The container type we're building.
    // Note: no bounds (such as `Clone` or `'static`) are imposed on the container type here.
    type Container;
    /// Extract assembled containers, potentially leaving unfinished data behind. Can
    /// be called repeatedly, for example while the caller can send data.
    ///
    /// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
    #[must_use]
    fn extract(&mut self) -> Option<&mut Self::Container>;
    /// Extract assembled containers and any unfinished data. Should
    /// be called repeatedly until it returns `None`.
    #[must_use]
    fn finish(&mut self) -> Option<&mut Self::Container>;
    /// Indicates a good moment to release resources.
    ///
    /// By default, does nothing. Callers first need to drain the contents using
    /// [`Self::finish`] before calling this function. The implementation should not change
    /// the contents of the builder.
    #[inline]
    fn relax(&mut self) {}
}
111
112/// A wrapper trait indicating that the container building will preserve the number of records.
113///
114/// Specifically, the sum of record counts of all extracted and finished containers must equal the
115/// number of accounted records that are pushed into the container builder.
116/// If you have any questions about this trait you are best off not implementing it.
117pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
118
119pub use noop::NoopBuilder;
120/// A container builder that accepts containers and produces them immediately.
121mod noop {
122
123    /// A container builder that only accepts containers, and produces them as output.
124    ///
125    /// This exists for operators that have containers ready to go, and haven't the need for a builder.
126    pub struct NoopBuilder<C>{ phantom: std::marker::PhantomData<C> }
127
128    impl<C> Default for NoopBuilder<C> { fn default() -> Self { Self { phantom: std::marker::PhantomData } } }
129    impl<C> super::ContainerBuilder for NoopBuilder<C> {
130        type Container = C;
131        #[inline] fn extract(&mut self) -> Option<&mut C> { None }
132        #[inline] fn finish(&mut self) -> Option<&mut C> { None }
133    }
134
135}
136
/// A default container builder that chunks data by length and preferred capacity.
///
/// One spare empty allocation is kept around between [`Self::push_into`] and
/// [`Self::extract`], but not across [`Self::finish`], to keep the memory footprint low.
///
/// Containers are produced in FIFO order.
#[derive(Debug, Default)]
pub struct CapacityContainerBuilder<C> {
    /// The container currently being filled.
    current: C,
    /// The container most recently handed out, kept so its allocation can be reused.
    empty: Option<C>,
    /// Containers that are complete and waiting to be extracted.
    pending: VecDeque<C>,
}
152
153impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
154    #[inline]
155    fn push_into(&mut self, item: T) {
156        // Ensure capacity
157        self.current.ensure_capacity(&mut self.empty);
158
159        // Push item
160        self.current.push_into(item);
161
162        // Maybe flush
163        if self.current.at_capacity() {
164            self.pending.push_back(std::mem::take(&mut self.current));
165        }
166    }
167}
168
169impl<C: Accountable + Default> ContainerBuilder for CapacityContainerBuilder<C> {
170    type Container = C;
171
172    #[inline]
173    fn extract(&mut self) -> Option<&mut C> {
174        if let Some(container) = self.pending.pop_front() {
175            self.empty = Some(container);
176            self.empty.as_mut()
177        } else {
178            None
179        }
180    }
181
182    #[inline]
183    fn finish(&mut self) -> Option<&mut C> {
184        if !self.current.is_empty() {
185            self.pending.push_back(std::mem::take(&mut self.current));
186        }
187        self.empty = self.pending.pop_front();
188        self.empty.as_mut()
189    }
190}
191
192impl<C: Accountable + SizableContainer + Default> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
193
194impl<T> Accountable for Vec<T> {
195    #[inline] fn record_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
196    #[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
197}
198
199impl<T> DrainContainer for Vec<T> {
200    type Item<'a> = T where T: 'a;
201    type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
202    #[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
203        self.drain(..)
204    }
205}
206
207impl<T> SizableContainer for Vec<T> {
208    #[inline] fn at_capacity(&self) -> bool {
209        self.len() == self.capacity()
210    }
211    #[inline] fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
212        if self.capacity() == 0 {
213            *self = stash.take().unwrap_or_default();
214            self.clear();
215        }
216        let preferred = buffer::default_capacity::<T>();
217        if self.capacity() < preferred {
218            self.reserve(preferred - self.capacity());
219        }
220    }
221}
222
223impl<T> PushInto<T> for Vec<T> {
224    #[inline]
225    fn push_into(&mut self, item: T) {
226        self.push(item)
227    }
228}
229
230
231impl<T: Clone> PushInto<&T> for Vec<T> {
232    #[inline]
233    fn push_into(&mut self, item: &T) {
234        self.push(item.clone())
235    }
236}
237
238impl<T: Clone> PushInto<&&T> for Vec<T> {
239    #[inline]
240    fn push_into(&mut self, item: &&T) {
241        self.push_into(*item)
242    }
243}
244
245mod rc {
246    impl<T: crate::Accountable> crate::Accountable for std::rc::Rc<T> {
247        #[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() }
248        #[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
249    }
250    impl<T> crate::DrainContainer for std::rc::Rc<T>
251    where
252        for<'a> &'a T: IntoIterator
253    {
254        type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a;
255        type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a;
256        #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() }
257    }
258}
259
260mod arc {
261    impl<T: crate::Accountable> crate::Accountable for std::sync::Arc<T> {
262        #[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() }
263        #[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
264    }
265    impl<T> crate::DrainContainer for std::sync::Arc<T>
266    where
267        for<'a> &'a T: IntoIterator
268    {
269        type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a;
270        type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a;
271        #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() }
272    }
273}
274
pub mod buffer {
    //! Helpers for choosing default buffer sizes.

    /// Upper bound, in bytes, on the size of buffers to allocate.
    ///
    /// [`default_capacity`] turns this byte budget into a number of elements.
    pub const BUFFER_SIZE_BYTES: usize = 1 << 13;

    /// The preferred buffer capacity, in elements, for element type `T`.
    ///
    /// The result always lies in `1..=BUFFER_SIZE_BYTES`:
    /// * zero-sized types get [`BUFFER_SIZE_BYTES`] slots,
    /// * types no larger than [`BUFFER_SIZE_BYTES`] get as many elements as fit in it,
    /// * larger types get a single slot.
    pub const fn default_capacity<T>() -> usize {
        match std::mem::size_of::<T>() {
            0 => BUFFER_SIZE_BYTES,
            size if size <= BUFFER_SIZE_BYTES => BUFFER_SIZE_BYTES / size,
            _ => 1,
        }
    }
}