differential_dataflow/
containers.rs1use std::iter::FromIterator;
4
5pub use columnation::*;
6use timely::container::PushInto;
7
8pub struct TimelyStack<T: Columnation> {
18    local: Vec<T>,
19    inner: T::InnerRegion,
20}
21
22impl<T: Columnation> TimelyStack<T> {
23    pub fn with_capacity(capacity: usize) -> Self {
28        Self {
29            local: Vec::with_capacity(capacity),
30            inner: T::InnerRegion::default(),
31        }
32    }
33
34    #[inline(always)]
39    pub fn reserve_items<'a, I>(&mut self, items: I)
40    where
41        I: Iterator<Item= &'a T> + Clone,
42        T: 'a,
43    {
44        self.local.reserve(items.clone().count());
45        self.inner.reserve_items(items);
46    }
47
48    #[inline(always)]
53    pub fn reserve_regions<'a, I>(&mut self, regions: I)
54    where
55        Self: 'a,
56        I: Iterator<Item= &'a Self> + Clone,
57    {
58        self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum());
59        self.inner.reserve_regions(regions.map(|cs| &cs.inner));
60    }
61
62
63
64    pub fn copy(&mut self, item: &T) {
68        unsafe {
71            self.local.push(self.inner.copy(item));
72        }
73    }
74    pub fn clear(&mut self) {
76        unsafe {
77            self.local.set_len(0);
80            self.inner.clear();
81        }
82    }
83    pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
87        let mut write_position = index;
88        for position in index..self.local.len() {
89            if predicate(&self[position]) {
90                self.local.swap(position, write_position);
92                write_position += 1;
93            }
94        }
95        unsafe {
96            self.local.set_len(write_position);
99        }
100    }
101
102    pub unsafe fn local(&mut self) -> &mut [T] {
108        &mut self.local[..]
109    }
110
111    #[inline]
113    pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
114        let size_of = std::mem::size_of::<T>();
115        callback(self.local.len() * size_of, self.local.capacity() * size_of);
116        self.inner.heap_size(callback);
117    }
118
119    #[inline]
121    pub fn summed_heap_size(&self) -> (usize, usize) {
122        let (mut length, mut capacity) = (0, 0);
123        self.heap_size(|len, cap| {
124            length += len;
125            capacity += cap
126        });
127        (length, capacity)
128    }
129
130    #[inline]
132    pub fn len(&self) -> usize {
133        self.local.len()
134    }
135
136    pub fn is_empty(&self) -> bool {
138        self.local.is_empty()
139    }
140
141    #[inline]
143    pub fn capacity(&self) -> usize {
144        self.local.capacity()
145    }
146
147    #[inline]
149    pub fn reserve(&mut self, additional: usize) {
150        self.local.reserve(additional)
151    }
152}
153
154impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
155    pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
162        unsafe {
163            self.local.push(self.inner.copy_destructured(t1, t2));
164        }
165    }
166}
167
168impl<A: Columnation, B: Columnation, C: Columnation> TimelyStack<(A, B, C)> {
169    pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
176        unsafe {
177            self.local.push(self.inner.copy_destructured(r0, r1, r2));
178        }
179    }
180}
181
182impl<T: Columnation> std::ops::Deref for TimelyStack<T> {
183    type Target = [T];
184    #[inline(always)]
185    fn deref(&self) -> &Self::Target {
186        &self.local[..]
187    }
188}
189
190impl<T: Columnation> Drop for TimelyStack<T> {
191    fn drop(&mut self) {
192        self.clear();
193    }
194}
195
196impl<T: Columnation> Default for TimelyStack<T> {
197    fn default() -> Self {
198        Self {
199            local: Vec::new(),
200            inner: T::InnerRegion::default(),
201        }
202    }
203}
204
205impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
206    fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
207        let iter = iter.into_iter();
208        let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
209        for element in iter {
210            c.copy(element);
211        }
212
213        c
214    }
215}
216
217impl<T: Columnation + PartialEq> PartialEq for TimelyStack<T> {
218    fn eq(&self, other: &Self) -> bool {
219        PartialEq::eq(&self[..], &other[..])
220    }
221}
222
223impl<T: Columnation + Eq> Eq for TimelyStack<T> {}
224
225impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        self[..].fmt(f)
228    }
229}
230
231impl<T: Columnation> Clone for TimelyStack<T> {
232    fn clone(&self) -> Self {
233        let mut new: Self = Default::default();
234        for item in &self[..] {
235            new.copy(item);
236        }
237        new
238    }
239
240    fn clone_from(&mut self, source: &Self) {
241        self.clear();
242        for item in &source[..] {
243            self.copy(item);
244        }
245    }
246}
247
248impl<T: Columnation> PushInto<T> for TimelyStack<T> {
249    #[inline]
250    fn push_into(&mut self, item: T) {
251        self.copy(&item);
252    }
253}
254
255impl<T: Columnation> PushInto<&T> for TimelyStack<T> {
256    #[inline]
257    fn push_into(&mut self, item: &T) {
258        self.copy(item);
259    }
260}
261
262
263impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
264    #[inline]
265    fn push_into(&mut self, item: &&T) {
266        self.copy(*item);
267    }
268}
269
270mod container {
271    use columnation::Columnation;
272
273    use crate::containers::TimelyStack;
274
275    impl<T: Columnation> timely::container::Accountable for TimelyStack<T> {
276        #[inline] fn record_count(&self) -> i64 { i64::try_from(self.local.len()).unwrap() }
277        #[inline] fn is_empty(&self) -> bool { self.local.is_empty() }
278    }
279    impl<T: Columnation> timely::container::DrainContainer for TimelyStack<T> {
280        type Item<'a> = &'a T where Self: 'a;
281        type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
282        #[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
283            (*self).iter()
284        }
285    }
286
287    impl<T: Columnation> timely::container::SizableContainer for TimelyStack<T> {
288        fn at_capacity(&self) -> bool {
289            self.len() == self.capacity()
290        }
291        fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
292            if self.capacity() == 0 {
293                *self = stash.take().unwrap_or_default();
294                self.clear();
295            }
296            let preferred = timely::container::buffer::default_capacity::<T>();
297            if self.capacity() < preferred {
298                self.reserve(preferred - self.capacity());
299            }
300        }
301    }
302}