timely_container/
lib.rs

1//! Specifications for containers
2
3#![forbid(missing_docs)]
4
5pub mod columnation;
6
/// A container transferring data through dataflow edges.
///
/// A container stores a number of elements and can therefore report its length (`len()`)
/// and whether it is empty (`is_empty()`). It also supports removing all elements (`clear`).
///
/// A container must implement `Default`. The default value is not required to allocate
/// memory for variable-length components.
///
/// Containers are required to be cloneable so that efficient copies can be made when
/// handing references of containers to operators. Take care that the type's `clone_from`
/// implementation is efficient (which is not necessarily the case when deriving `Clone`).
/// TODO: Don't require `Container: Clone`
pub trait Container: Default + Clone + 'static {
    /// The type of elements this container holds.
    type Item;

    /// The number of elements in this container.
    ///
    /// The length of a container must be consistent between sending and receiving it.
    /// When a container is exchanged and partitioned into pieces, the lengths of all
    /// pieces must add up to the length of the original container.
    fn len(&self) -> usize;

    /// Returns `true` if the container holds no elements, i.e. when `len() == 0`.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// The capacity of the underlying container.
    fn capacity(&self) -> usize;

    /// Remove all contents from `self` while retaining allocated memory.
    ///
    /// After calling `clear`, `is_empty` must return `true` and `len` must return 0.
    fn clear(&mut self);
}
42
43impl<T: Clone + 'static> Container for Vec<T> {
44    type Item = T;
45
46    fn len(&self) -> usize {
47        Vec::len(self)
48    }
49
50    fn is_empty(&self) -> bool {
51        Vec::is_empty(self)
52    }
53
54    fn capacity(&self) -> usize {
55        Vec::capacity(self)
56    }
57
58    fn clear(&mut self) { Vec::clear(self) }
59}
60
61mod rc {
62    use std::rc::Rc;
63
64    use crate::Container;
65
66    impl<T: Container> Container for Rc<T> {
67        type Item = T::Item;
68
69        fn len(&self) -> usize {
70            std::ops::Deref::deref(self).len()
71        }
72
73        fn is_empty(&self) -> bool {
74            std::ops::Deref::deref(self).is_empty()
75        }
76
77        fn capacity(&self) -> usize {
78            std::ops::Deref::deref(self).capacity()
79        }
80
81        fn clear(&mut self) {
82            // Try to reuse the allocation if possible
83            if let Some(inner) = Rc::get_mut(self) {
84                inner.clear();
85            } else {
86                *self = Self::default();
87            }
88        }
89    }
90}
91
92mod arc {
93    use std::sync::Arc;
94
95    use crate::Container;
96
97    impl<T: Container> Container for Arc<T> {
98        type Item = T::Item;
99
100        fn len(&self) -> usize {
101            std::ops::Deref::deref(self).len()
102        }
103
104        fn is_empty(&self) -> bool {
105            std::ops::Deref::deref(self).is_empty()
106        }
107
108        fn capacity(&self) -> usize {
109            std::ops::Deref::deref(self).capacity()
110        }
111
112        fn clear(&mut self) {
113            // Try to reuse the allocation if possible
114            if let Some(inner) = Arc::get_mut(self) {
115                inner.clear();
116            } else {
117                *self = Self::default();
118            }
119        }
120    }
121}
122
123/// A container that can partition itself into pieces.
124pub trait PushPartitioned: Container {
125    /// Partition and push this container.
126    ///
127    /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to
128    /// append an element to. Call `flush` with an index and a buffer to send the data downstream.
129    fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], index: I, flush: F)
130    where
131        I: FnMut(&Self::Item) -> usize,
132        F: FnMut(usize, &mut Self);
133}
134
135impl<T: Clone + 'static> PushPartitioned for Vec<T> {
136    fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
137    where
138        I: FnMut(&Self::Item) -> usize,
139        F: FnMut(usize, &mut Self),
140    {
141        fn ensure_capacity<E>(this: &mut Vec<E>) {
142            let capacity = this.capacity();
143            let desired_capacity = buffer::default_capacity::<E>();
144            if capacity < desired_capacity {
145                this.reserve(desired_capacity - capacity);
146            }
147        }
148
149        for datum in self.drain(..) {
150            let index = index(&datum);
151            ensure_capacity(&mut buffers[index]);
152            buffers[index].push(datum);
153            if buffers[index].len() == buffers[index].capacity() {
154                flush(index, &mut buffers[index]);
155            }
156        }
157    }
158}
159
pub mod buffer {
    //! Helpers for computing default buffer sizes

    /// The upper limit for buffers to allocate, size in bytes. [default_capacity] converts
    /// this to a size in elements.
    pub const BUFFER_SIZE_BYTES: usize = 1 << 13;

    /// The maximum buffer capacity in elements. Returns a number between 1 and
    /// [BUFFER_SIZE_BYTES], inclusive.
    pub const fn default_capacity<T>() -> usize {
        match ::std::mem::size_of::<T>() {
            // Zero-sized elements: dividing by the size is impossible, use the byte limit.
            0 => BUFFER_SIZE_BYTES,
            // As many whole elements as fit into the byte limit.
            size @ 1..=BUFFER_SIZE_BYTES => BUFFER_SIZE_BYTES / size,
            // A single element already exceeds the byte limit.
            _ => 1,
        }
    }
}