Skip to main content

palimpsest_dataflow/
containers.rs

1//! A columnar container based on the columnation library.
2
3use std::iter::FromIterator;
4
5pub use columnation::*;
6use timely::container::PushInto;
7
8/// An append-only vector that store records as columns.
9///
10/// This container maintains elements that might conventionally own
11/// memory allocations, but instead the pointers to those allocations
12/// reference larger regions of memory shared with multiple instances
13/// of the type. Elements can be retrieved as references, and care is
14/// taken when this type is dropped to ensure that the correct memory
15/// is returned (rather than the incorrect memory, from running the
16/// elements `Drop` implementations).
17pub struct TimelyStack<T: Columnation> {
18    local: Vec<T>,
19    inner: T::InnerRegion,
20}
21
22impl<T: Columnation> TimelyStack<T> {
23    /// Construct a [TimelyStack], reserving space for `capacity` elements
24    ///
25    /// Note that the associated region is not initialized to a specific capacity
26    /// because we can't generally know how much space would be required.
27    pub fn with_capacity(capacity: usize) -> Self {
28        Self {
29            local: Vec::with_capacity(capacity),
30            inner: T::InnerRegion::default(),
31        }
32    }
33
34    /// Ensures `Self` can absorb `items` without further allocations.
35    ///
36    /// The argument `items` may be cloned and iterated multiple times.
37    /// Please be careful if it contains side effects.
38    #[inline(always)]
39    pub fn reserve_items<'a, I>(&mut self, items: I)
40    where
41        I: Iterator<Item = &'a T> + Clone,
42        T: 'a,
43    {
44        self.local.reserve(items.clone().count());
45        self.inner.reserve_items(items);
46    }
47
48    /// Ensures `Self` can absorb `items` without further allocations.
49    ///
50    /// The argument `items` may be cloned and iterated multiple times.
51    /// Please be careful if it contains side effects.
52    #[inline(always)]
53    pub fn reserve_regions<'a, I>(&mut self, regions: I)
54    where
55        Self: 'a,
56        I: Iterator<Item = &'a Self> + Clone,
57    {
58        self.local
59            .reserve(regions.clone().map(|cs| cs.local.len()).sum());
60        self.inner.reserve_regions(regions.map(|cs| &cs.inner));
61    }
62
63    /// Copies an element in to the region.
64    ///
65    /// The element can be read by indexing
66    pub fn copy(&mut self, item: &T) {
67        // TODO: Some types `T` should just be cloned.
68        // E.g. types that are `Copy` or vecs of ZSTs.
69        unsafe {
70            self.local.push(self.inner.copy(item));
71        }
72    }
73    /// Empties the collection.
74    pub fn clear(&mut self) {
75        unsafe {
76            // Unsafety justified in that setting the length to zero exposes
77            // no invalid data.
78            self.local.set_len(0);
79            self.inner.clear();
80        }
81    }
82    /// Retain elements that pass a predicate, from a specified offset.
83    ///
84    /// This method may or may not reclaim memory in the inner region.
85    pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
86        let mut write_position = index;
87        for position in index..self.local.len() {
88            if predicate(&self[position]) {
89                // TODO: compact the inner region and update pointers.
90                self.local.swap(position, write_position);
91                write_position += 1;
92            }
93        }
94        unsafe {
95            // Unsafety justified in that `write_position` is no greater than
96            // `self.local.len()` and so this exposes no invalid data.
97            self.local.set_len(write_position);
98        }
99    }
100
101    /// Unsafe access to `local` data. The slices stor data that is backed by a region
102    /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice.
103    ///
104    /// # Safety
105    /// Elements within `local` can be reordered, but not mutated, removed and/or dropped.
106    pub unsafe fn local(&mut self) -> &mut [T] {
107        &mut self.local[..]
108    }
109
110    /// Estimate the memory capacity in bytes.
111    #[inline]
112    pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
113        let size_of = std::mem::size_of::<T>();
114        callback(self.local.len() * size_of, self.local.capacity() * size_of);
115        self.inner.heap_size(callback);
116    }
117
118    /// Estimate the consumed memory capacity in bytes, summing both used and total capacity.
119    #[inline]
120    pub fn summed_heap_size(&self) -> (usize, usize) {
121        let (mut length, mut capacity) = (0, 0);
122        self.heap_size(|len, cap| {
123            length += len;
124            capacity += cap
125        });
126        (length, capacity)
127    }
128
129    /// The length in items.
130    #[inline]
131    pub fn len(&self) -> usize {
132        self.local.len()
133    }
134
135    /// Returns `true` if the stack is empty.
136    pub fn is_empty(&self) -> bool {
137        self.local.is_empty()
138    }
139
140    /// The capacity of the local vector.
141    #[inline]
142    pub fn capacity(&self) -> usize {
143        self.local.capacity()
144    }
145
146    /// Reserve space for `additional` elements.
147    #[inline]
148    pub fn reserve(&mut self, additional: usize) {
149        self.local.reserve(additional)
150    }
151}
152
153impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
154    /// Copies a destructured tuple `(A, B)` into this column stack.
155    ///
156    /// This serves situations where a tuple should be constructed from its constituents but
157    /// not all elements are available as owned data.
158    ///
159    /// The element can be read by indexing
160    pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
161        unsafe {
162            self.local.push(self.inner.copy_destructured(t1, t2));
163        }
164    }
165}
166
167impl<A: Columnation, B: Columnation, C: Columnation> TimelyStack<(A, B, C)> {
168    /// Copies a destructured tuple `(A, B, C)` into this column stack.
169    ///
170    /// This serves situations where a tuple should be constructed from its constituents but
171    /// not all elements are available as owned data.
172    ///
173    /// The element can be read by indexing
174    pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
175        unsafe {
176            self.local.push(self.inner.copy_destructured(r0, r1, r2));
177        }
178    }
179}
180
181impl<T: Columnation> std::ops::Deref for TimelyStack<T> {
182    type Target = [T];
183    #[inline(always)]
184    fn deref(&self) -> &Self::Target {
185        &self.local[..]
186    }
187}
188
189impl<T: Columnation> Drop for TimelyStack<T> {
190    fn drop(&mut self) {
191        self.clear();
192    }
193}
194
195impl<T: Columnation> Default for TimelyStack<T> {
196    fn default() -> Self {
197        Self {
198            local: Vec::new(),
199            inner: T::InnerRegion::default(),
200        }
201    }
202}
203
204impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
205    fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
206        let iter = iter.into_iter();
207        let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
208        for element in iter {
209            c.copy(element);
210        }
211
212        c
213    }
214}
215
216impl<T: Columnation + PartialEq> PartialEq for TimelyStack<T> {
217    fn eq(&self, other: &Self) -> bool {
218        PartialEq::eq(&self[..], &other[..])
219    }
220}
221
222impl<T: Columnation + Eq> Eq for TimelyStack<T> {}
223
224impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        self[..].fmt(f)
227    }
228}
229
230impl<T: Columnation> Clone for TimelyStack<T> {
231    fn clone(&self) -> Self {
232        let mut new: Self = Default::default();
233        for item in &self[..] {
234            new.copy(item);
235        }
236        new
237    }
238
239    fn clone_from(&mut self, source: &Self) {
240        self.clear();
241        for item in &source[..] {
242            self.copy(item);
243        }
244    }
245}
246
247impl<T: Columnation> PushInto<T> for TimelyStack<T> {
248    #[inline]
249    fn push_into(&mut self, item: T) {
250        self.copy(&item);
251    }
252}
253
254impl<T: Columnation> PushInto<&T> for TimelyStack<T> {
255    #[inline]
256    fn push_into(&mut self, item: &T) {
257        self.copy(item);
258    }
259}
260
261impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
262    #[inline]
263    fn push_into(&mut self, item: &&T) {
264        self.copy(*item);
265    }
266}
267
268mod container {
269    use columnation::Columnation;
270
271    use crate::containers::TimelyStack;
272
273    impl<T: Columnation> timely::container::Accountable for TimelyStack<T> {
274        #[inline]
275        fn record_count(&self) -> i64 {
276            i64::try_from(self.local.len()).unwrap()
277        }
278        #[inline]
279        fn is_empty(&self) -> bool {
280            self.local.is_empty()
281        }
282    }
283    impl<T: Columnation> timely::container::DrainContainer for TimelyStack<T> {
284        type Item<'a>
285            = &'a T
286        where
287            Self: 'a;
288        type DrainIter<'a>
289            = std::slice::Iter<'a, T>
290        where
291            Self: 'a;
292        #[inline]
293        fn drain(&mut self) -> Self::DrainIter<'_> {
294            (*self).iter()
295        }
296    }
297
298    impl<T: Columnation> timely::container::SizableContainer for TimelyStack<T> {
299        fn at_capacity(&self) -> bool {
300            self.len() == self.capacity()
301        }
302        fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
303            if self.capacity() == 0 {
304                *self = stash.take().unwrap_or_default();
305                self.clear();
306            }
307            let preferred = timely::container::buffer::default_capacity::<T>();
308            if self.capacity() < preferred {
309                self.reserve(preferred - self.capacity());
310            }
311        }
312    }
313}