timely_container/lib.rs
1//! Specifications for containers
2
3#![forbid(missing_docs)]
4
5pub mod columnation;
6
7/// A container transferring data through dataflow edges
8///
9/// A container stores a number of elements and thus is able to describe it length (`len()`) and
10/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
11///
12/// A container must implement default. The default implementation is not required to allocate
13/// memory for variable-length components.
14///
15/// We require the container to be cloneable to enable efficient copies when providing references
16/// of containers to operators. Care must be taken that the type's `clone_from` implementation
17/// is efficient (which is not necessarily the case when deriving `Clone`.)
18/// TODO: Don't require `Container: Clone`
19pub trait Container: Default + Clone + 'static {
20 /// The type of elements this container holds.
21 type Item;
22
23 /// The number of elements in this container
24 ///
25 /// The length of a container must be consistent between sending and receiving it.
26 /// When exchanging a container and partitioning it into pieces, the sum of the length
27 /// of all pieces must be equal to the length of the original container.
28 fn len(&self) -> usize;
29
30 /// Determine if the container contains any elements, corresponding to `len() == 0`.
31 fn is_empty(&self) -> bool {
32 self.len() == 0
33 }
34
35 /// The capacity of the underlying container
36 fn capacity(&self) -> usize;
37
38 /// Remove all contents from `self` while retaining allocated memory.
39 /// After calling `clear`, `is_empty` must return `true` and `len` 0.
40 fn clear(&mut self);
41}
42
43impl<T: Clone + 'static> Container for Vec<T> {
44 type Item = T;
45
46 fn len(&self) -> usize {
47 Vec::len(self)
48 }
49
50 fn is_empty(&self) -> bool {
51 Vec::is_empty(self)
52 }
53
54 fn capacity(&self) -> usize {
55 Vec::capacity(self)
56 }
57
58 fn clear(&mut self) { Vec::clear(self) }
59}
60
61mod rc {
62 use std::rc::Rc;
63
64 use crate::Container;
65
66 impl<T: Container> Container for Rc<T> {
67 type Item = T::Item;
68
69 fn len(&self) -> usize {
70 std::ops::Deref::deref(self).len()
71 }
72
73 fn is_empty(&self) -> bool {
74 std::ops::Deref::deref(self).is_empty()
75 }
76
77 fn capacity(&self) -> usize {
78 std::ops::Deref::deref(self).capacity()
79 }
80
81 fn clear(&mut self) {
82 // Try to reuse the allocation if possible
83 if let Some(inner) = Rc::get_mut(self) {
84 inner.clear();
85 } else {
86 *self = Self::default();
87 }
88 }
89 }
90}
91
92mod arc {
93 use std::sync::Arc;
94
95 use crate::Container;
96
97 impl<T: Container> Container for Arc<T> {
98 type Item = T::Item;
99
100 fn len(&self) -> usize {
101 std::ops::Deref::deref(self).len()
102 }
103
104 fn is_empty(&self) -> bool {
105 std::ops::Deref::deref(self).is_empty()
106 }
107
108 fn capacity(&self) -> usize {
109 std::ops::Deref::deref(self).capacity()
110 }
111
112 fn clear(&mut self) {
113 // Try to reuse the allocation if possible
114 if let Some(inner) = Arc::get_mut(self) {
115 inner.clear();
116 } else {
117 *self = Self::default();
118 }
119 }
120 }
121}
122
123/// A container that can partition itself into pieces.
124pub trait PushPartitioned: Container {
125 /// Partition and push this container.
126 ///
127 /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to
128 /// append an element to. Call `flush` with an index and a buffer to send the data downstream.
129 fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], index: I, flush: F)
130 where
131 I: FnMut(&Self::Item) -> usize,
132 F: FnMut(usize, &mut Self);
133}
134
135impl<T: Clone + 'static> PushPartitioned for Vec<T> {
136 fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
137 where
138 I: FnMut(&Self::Item) -> usize,
139 F: FnMut(usize, &mut Self),
140 {
141 fn ensure_capacity<E>(this: &mut Vec<E>) {
142 let capacity = this.capacity();
143 let desired_capacity = buffer::default_capacity::<E>();
144 if capacity < desired_capacity {
145 this.reserve(desired_capacity - capacity);
146 }
147 }
148
149 for datum in self.drain(..) {
150 let index = index(&datum);
151 ensure_capacity(&mut buffers[index]);
152 buffers[index].push(datum);
153 if buffers[index].len() == buffers[index].capacity() {
154 flush(index, &mut buffers[index]);
155 }
156 }
157 }
158}
159
160pub mod buffer {
161 //! Functionality related to calculating default buffer sizes
162
163 /// The upper limit for buffers to allocate, size in bytes. [default_capacity] converts
164 /// this to size in elements.
165 pub const BUFFER_SIZE_BYTES: usize = 1 << 13;
166
167 /// The maximum buffer capacity in elements. Returns a number between [BUFFER_SIZE_BYTES]
168 /// and 1, inclusively.
169 pub const fn default_capacity<T>() -> usize {
170 let size = ::std::mem::size_of::<T>();
171 if size == 0 {
172 BUFFER_SIZE_BYTES
173 } else if size <= BUFFER_SIZE_BYTES {
174 BUFFER_SIZE_BYTES / size
175 } else {
176 1
177 }
178 }
179}