palimpsest_dataflow/
containers.rs1use std::iter::FromIterator;
4
5pub use columnation::*;
6use timely::container::PushInto;
7
8pub struct TimelyStack<T: Columnation> {
18 local: Vec<T>,
19 inner: T::InnerRegion,
20}
21
22impl<T: Columnation> TimelyStack<T> {
23 pub fn with_capacity(capacity: usize) -> Self {
28 Self {
29 local: Vec::with_capacity(capacity),
30 inner: T::InnerRegion::default(),
31 }
32 }
33
34 #[inline(always)]
39 pub fn reserve_items<'a, I>(&mut self, items: I)
40 where
41 I: Iterator<Item = &'a T> + Clone,
42 T: 'a,
43 {
44 self.local.reserve(items.clone().count());
45 self.inner.reserve_items(items);
46 }
47
48 #[inline(always)]
53 pub fn reserve_regions<'a, I>(&mut self, regions: I)
54 where
55 Self: 'a,
56 I: Iterator<Item = &'a Self> + Clone,
57 {
58 self.local
59 .reserve(regions.clone().map(|cs| cs.local.len()).sum());
60 self.inner.reserve_regions(regions.map(|cs| &cs.inner));
61 }
62
63 pub fn copy(&mut self, item: &T) {
67 unsafe {
70 self.local.push(self.inner.copy(item));
71 }
72 }
73 pub fn clear(&mut self) {
75 unsafe {
76 self.local.set_len(0);
79 self.inner.clear();
80 }
81 }
82 pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
86 let mut write_position = index;
87 for position in index..self.local.len() {
88 if predicate(&self[position]) {
89 self.local.swap(position, write_position);
91 write_position += 1;
92 }
93 }
94 unsafe {
95 self.local.set_len(write_position);
98 }
99 }
100
101 pub unsafe fn local(&mut self) -> &mut [T] {
107 &mut self.local[..]
108 }
109
110 #[inline]
112 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
113 let size_of = std::mem::size_of::<T>();
114 callback(self.local.len() * size_of, self.local.capacity() * size_of);
115 self.inner.heap_size(callback);
116 }
117
118 #[inline]
120 pub fn summed_heap_size(&self) -> (usize, usize) {
121 let (mut length, mut capacity) = (0, 0);
122 self.heap_size(|len, cap| {
123 length += len;
124 capacity += cap
125 });
126 (length, capacity)
127 }
128
129 #[inline]
131 pub fn len(&self) -> usize {
132 self.local.len()
133 }
134
135 pub fn is_empty(&self) -> bool {
137 self.local.is_empty()
138 }
139
140 #[inline]
142 pub fn capacity(&self) -> usize {
143 self.local.capacity()
144 }
145
146 #[inline]
148 pub fn reserve(&mut self, additional: usize) {
149 self.local.reserve(additional)
150 }
151}
152
153impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
154 pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
161 unsafe {
162 self.local.push(self.inner.copy_destructured(t1, t2));
163 }
164 }
165}
166
167impl<A: Columnation, B: Columnation, C: Columnation> TimelyStack<(A, B, C)> {
168 pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
175 unsafe {
176 self.local.push(self.inner.copy_destructured(r0, r1, r2));
177 }
178 }
179}
180
181impl<T: Columnation> std::ops::Deref for TimelyStack<T> {
182 type Target = [T];
183 #[inline(always)]
184 fn deref(&self) -> &Self::Target {
185 &self.local[..]
186 }
187}
188
189impl<T: Columnation> Drop for TimelyStack<T> {
190 fn drop(&mut self) {
191 self.clear();
192 }
193}
194
195impl<T: Columnation> Default for TimelyStack<T> {
196 fn default() -> Self {
197 Self {
198 local: Vec::new(),
199 inner: T::InnerRegion::default(),
200 }
201 }
202}
203
204impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
205 fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
206 let iter = iter.into_iter();
207 let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
208 for element in iter {
209 c.copy(element);
210 }
211
212 c
213 }
214}
215
216impl<T: Columnation + PartialEq> PartialEq for TimelyStack<T> {
217 fn eq(&self, other: &Self) -> bool {
218 PartialEq::eq(&self[..], &other[..])
219 }
220}
221
222impl<T: Columnation + Eq> Eq for TimelyStack<T> {}
223
224impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 self[..].fmt(f)
227 }
228}
229
230impl<T: Columnation> Clone for TimelyStack<T> {
231 fn clone(&self) -> Self {
232 let mut new: Self = Default::default();
233 for item in &self[..] {
234 new.copy(item);
235 }
236 new
237 }
238
239 fn clone_from(&mut self, source: &Self) {
240 self.clear();
241 for item in &source[..] {
242 self.copy(item);
243 }
244 }
245}
246
247impl<T: Columnation> PushInto<T> for TimelyStack<T> {
248 #[inline]
249 fn push_into(&mut self, item: T) {
250 self.copy(&item);
251 }
252}
253
254impl<T: Columnation> PushInto<&T> for TimelyStack<T> {
255 #[inline]
256 fn push_into(&mut self, item: &T) {
257 self.copy(item);
258 }
259}
260
261impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
262 #[inline]
263 fn push_into(&mut self, item: &&T) {
264 self.copy(*item);
265 }
266}
267
268mod container {
269 use columnation::Columnation;
270
271 use crate::containers::TimelyStack;
272
273 impl<T: Columnation> timely::container::Accountable for TimelyStack<T> {
274 #[inline]
275 fn record_count(&self) -> i64 {
276 i64::try_from(self.local.len()).unwrap()
277 }
278 #[inline]
279 fn is_empty(&self) -> bool {
280 self.local.is_empty()
281 }
282 }
283 impl<T: Columnation> timely::container::DrainContainer for TimelyStack<T> {
284 type Item<'a>
285 = &'a T
286 where
287 Self: 'a;
288 type DrainIter<'a>
289 = std::slice::Iter<'a, T>
290 where
291 Self: 'a;
292 #[inline]
293 fn drain(&mut self) -> Self::DrainIter<'_> {
294 (*self).iter()
295 }
296 }
297
298 impl<T: Columnation> timely::container::SizableContainer for TimelyStack<T> {
299 fn at_capacity(&self) -> bool {
300 self.len() == self.capacity()
301 }
302 fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
303 if self.capacity() == 0 {
304 *self = stash.take().unwrap_or_default();
305 self.clear();
306 }
307 let preferred = timely::container::buffer::default_capacity::<T>();
308 if self.capacity() < preferred {
309 self.reserve(preferred - self.capacity());
310 }
311 }
312 }
313}