timely_container/
columnation.rs

1//! A columnar container based on the columnation library.
2
3use std::iter::FromIterator;
4
5pub use columnation::*;
6
/// An append-only vector that stores records as columns.
///
/// This container maintains elements that might conventionally own
/// memory allocations, but instead the pointers to those allocations
/// reference larger regions of memory shared with multiple instances
/// of the type. Elements can be retrieved as references, and care is
/// taken when this type is dropped to ensure that the correct memory
/// is returned (rather than the incorrect memory, from running the
/// elements' `Drop` implementations).
pub struct TimelyStack<T: Columnation> {
    /// The element records themselves, readable as a slice through `Deref`.
    local: Vec<T>,
    /// The region that elements are copied into before being pushed onto `local`.
    inner: T::InnerRegion,
}
20
21impl<T: Columnation> TimelyStack<T> {
22    /// Construct a [TimelyStack], reserving space for `capacity` elements
23    ///
24    /// Note that the associated region is not initialized to a specific capacity
25    /// because we can't generally know how much space would be required.
26    pub fn with_capacity(capacity: usize) -> Self {
27        Self {
28            local: Vec::with_capacity(capacity),
29            inner: T::InnerRegion::default(),
30        }
31    }
32
33    /// Ensures `Self` can absorb `items` without further allocations.
34    ///
35    /// The argument `items` may be cloned and iterated multiple times.
36    /// Please be careful if it contains side effects.
37    #[inline(always)]
38    pub fn reserve_items<'a, I>(&'a mut self, items: I)
39        where
40            I: Iterator<Item= &'a T>+Clone,
41    {
42        self.local.reserve(items.clone().count());
43        self.inner.reserve_items(items);
44    }
45
46    /// Ensures `Self` can absorb `items` without further allocations.
47    ///
48    /// The argument `items` may be cloned and iterated multiple times.
49    /// Please be careful if it contains side effects.
50    #[inline(always)]
51    pub fn reserve_regions<'a, I>(&mut self, regions: I)
52        where
53            Self: 'a,
54            I: Iterator<Item= &'a Self>+Clone,
55    {
56        self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum());
57        self.inner.reserve_regions(regions.map(|cs| &cs.inner));
58    }
59
60
61
62    /// Copies an element in to the region.
63    ///
64    /// The element can be read by indexing
65    pub fn copy(&mut self, item: &T) {
66        // TODO: Some types `T` should just be cloned.
67        // E.g. types that are `Copy` or vecs of ZSTs.
68        unsafe {
69            self.local.push(self.inner.copy(item));
70        }
71    }
72    /// Empties the collection.
73    pub fn clear(&mut self) {
74        unsafe {
75            // Unsafety justified in that setting the length to zero exposes
76            // no invalid data.
77            self.local.set_len(0);
78            self.inner.clear();
79        }
80    }
81    /// Retain elements that pass a predicate, from a specified offset.
82    ///
83    /// This method may or may not reclaim memory in the inner region.
84    pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
85        let mut write_position = index;
86        for position in index..self.local.len() {
87            if predicate(&self[position]) {
88                // TODO: compact the inner region and update pointers.
89                self.local.swap(position, write_position);
90                write_position += 1;
91            }
92        }
93        unsafe {
94            // Unsafety justified in that `write_position` is no greater than
95            // `self.local.len()` and so this exposes no invalid data.
96            self.local.set_len(write_position);
97        }
98    }
99
100    /// Unsafe access to `local` data. The slices stores data that is backed by a region
101    /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice.
102    ///
103    /// # Safety
104    /// Elements within `local` can be reordered, but not mutated, removed and/or dropped.
105    pub unsafe fn local(&mut self) -> &mut [T] {
106        &mut self.local[..]
107    }
108
109    /// Estimate the memory capacity in bytes.
110    #[inline]
111    pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
112        let size_of = std::mem::size_of::<T>();
113        callback(self.local.len() * size_of, self.local.capacity() * size_of);
114        self.inner.heap_size(callback);
115    }
116
117    /// Estimate the consumed memory capacity in bytes, summing both used and total capacity.
118    #[inline]
119    pub fn summed_heap_size(&self) -> (usize, usize) {
120        let (mut length, mut capacity) = (0, 0);
121        self.heap_size(|len, cap| {
122            length += len;
123            capacity += cap
124        });
125        (length, capacity)
126    }
127}
128
129impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
130    /// Copies a destructured tuple `(A, B)` into this column stack.
131    ///
132    /// This serves situations where a tuple should be constructed from its constituents but not
133    /// not all elements are available as owned data.
134    ///
135    /// The element can be read by indexing
136    pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
137        unsafe {
138            self.local.push(self.inner.copy_destructured(t1, t2));
139        }
140    }
141}
142
143impl<A: Columnation, B: Columnation, C: Columnation> TimelyStack<(A, B, C)> {
144    /// Copies a destructured tuple `(A, B, C)` into this column stack.
145    ///
146    /// This serves situations where a tuple should be constructed from its constituents but not
147    /// not all elements are available as owned data.
148    ///
149    /// The element can be read by indexing
150    pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
151        unsafe {
152            self.local.push(self.inner.copy_destructured(r0, r1, r2));
153        }
154    }
155}
156
157impl<T: Columnation> std::ops::Deref for TimelyStack<T> {
158    type Target = [T];
159    #[inline(always)]
160    fn deref(&self) -> &Self::Target {
161        &self.local[..]
162    }
163}
164
165impl<T: Columnation> Drop for TimelyStack<T> {
166    fn drop(&mut self) {
167        self.clear();
168    }
169}
170
171impl<T: Columnation> Default for TimelyStack<T> {
172    fn default() -> Self {
173        Self {
174            local: Vec::new(),
175            inner: T::InnerRegion::default(),
176        }
177    }
178}
179
180impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
181    fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
182        let mut iter = iter.into_iter();
183        let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
184        while let Some(element) = iter.next() {
185            c.copy(element);
186        }
187
188        c
189    }
190}
191
192impl<T: Columnation + PartialEq> PartialEq for TimelyStack<T> {
193    fn eq(&self, other: &Self) -> bool {
194        PartialEq::eq(&self[..], &other[..])
195    }
196}
197
// Marker impl: stack equality is slice equality of the elements (see the
// `PartialEq` impl), so it is a full equivalence whenever `T: Eq`.
impl<T: Columnation + Eq> Eq for TimelyStack<T> {}
199
200impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        (&self[..]).fmt(f)
203    }
204}
205
206impl<T: Columnation> Clone for TimelyStack<T> {
207    fn clone(&self) -> Self {
208        let mut new: Self = Default::default();
209        for item in &self[..] {
210            new.copy(item);
211        }
212        new
213    }
214
215    fn clone_from(&mut self, source: &Self) {
216        self.clear();
217        for item in &source[..] {
218            self.copy(item);
219        }
220    }
221}
222
223mod serde {
224    use serde::{Deserialize, Deserializer, Serialize, Serializer};
225
226    use crate::columnation::{Columnation, TimelyStack};
227
228    impl<T: Columnation + Serialize> Serialize for TimelyStack<T> {
229        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
230        where
231            S: Serializer,
232        {
233            use serde::ser::SerializeSeq;
234            let mut seq = serializer.serialize_seq(Some(self.local.len()))?;
235            for element in &self[..] {
236                seq.serialize_element(element)?;
237            }
238            seq.end()
239        }
240    }
241
242    impl<'a, T: Columnation + Deserialize<'a>> Deserialize<'a> for TimelyStack<T> {
243        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
244        where
245            D: Deserializer<'a>,
246        {
247            use serde::de::{SeqAccess, Visitor};
248            use std::fmt;
249            use std::marker::PhantomData;
250            struct TimelyStackVisitor<T> {
251                marker: PhantomData<T>,
252            }
253
254            impl<'de, T: Columnation> Visitor<'de> for TimelyStackVisitor<T>
255            where
256                T: Deserialize<'de>,
257            {
258                type Value = TimelyStack<T>;
259
260                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
261                    formatter.write_str("a sequence")
262                }
263
264                fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
265                where
266                    A: SeqAccess<'de>,
267                {
268                    let local = Vec::with_capacity(
269                        seq.size_hint()
270                            .unwrap_or(crate::buffer::default_capacity::<T>()),
271                    );
272                    let mut stack = TimelyStack {
273                        local,
274                        inner: T::InnerRegion::default(),
275                    };
276
277                    while let Some(value) = seq.next_element()? {
278                        stack.copy(&value);
279                    }
280
281                    Ok(stack)
282                }
283            }
284
285            let visitor = TimelyStackVisitor {
286                marker: PhantomData,
287            };
288            deserializer.deserialize_seq(visitor)
289        }
290    }
291}
292
293mod container {
294    use crate::{Container, PushPartitioned};
295
296    use crate::columnation::{Columnation, TimelyStack};
297
298    impl<T: Columnation + 'static> Container for TimelyStack<T> {
299        type Item = T;
300
301        fn len(&self) -> usize {
302            self.local.len()
303        }
304
305        fn is_empty(&self) -> bool {
306            self.local.is_empty()
307        }
308
309        fn capacity(&self) -> usize {
310           self.local.capacity()
311        }
312
313        fn clear(&mut self) {
314            TimelyStack::clear(self)
315        }
316    }
317
318    impl<T: Columnation + 'static> PushPartitioned for TimelyStack<T> {
319        fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
320        where
321            I: FnMut(&Self::Item) -> usize,
322            F: FnMut(usize, &mut Self),
323        {
324            fn ensure_capacity<E: Columnation>(this: &mut TimelyStack<E>) {
325                let capacity = this.local.capacity();
326                let desired_capacity = crate::buffer::default_capacity::<E>();
327                if capacity < desired_capacity {
328                    this.local.reserve(desired_capacity - capacity);
329                }
330            }
331
332            for datum in &self[..] {
333                let index = index(&datum);
334                ensure_capacity(&mut buffers[index]);
335                buffers[index].copy(datum);
336                if buffers[index].len() == buffers[index].local.capacity() {
337                    flush(index, &mut buffers[index]);
338                }
339            }
340            self.clear();
341        }
342    }
343}