Skip to main content

sharded_vec_writer/
lib.rs

1use std::error::Error;
2use std::fmt::Display;
3use std::marker::PhantomData;
4
/// Builds a `Vec<T>`, with each variable-sized chunk of the Vec being initialised separately, most
/// likely from a separate thread.
///
/// The writer hands out consecutive, non-overlapping [`Shard`]s and only extends the length of the
/// `Vec` once a fully initialised shard has been handed back to it.
pub struct VecWriter<'vec, T> {
    /// The `Vec` being filled. Held exclusively for as long as the writer exists.
    storage: &'vec mut Vec<T>,
    /// Exclusive end offset of the last shard handed out so far. Starts at the length the `Vec` had
    /// when the writer was created and advances each time a shard is taken.
    taken: usize,
}
11
/// A mutable borrow of part of a `Vec`. Can be used to initialise that part of the `Vec` before
/// returning it. Dropping a shard without returning it to the writer will drop any values that were
/// written into it.
pub struct Shard<'vec, T> {
    /// Pointer to the start of `storage` on the builder.
    storage: *mut T,

    /// The start offset within the original builder that we're responsible for.
    start_offset: usize,

    /// The exclusive end-offset up to which we're responsible for.
    end_offset: usize,

    /// The exclusive offset up to which we have initialised. Starts at `start_offset` and never
    /// advances past `end_offset`.
    initialised_up_to: usize,

    /// Makes the shard behave, for borrow-checking purposes, like a mutable borrow into the `Vec`
    /// that lasts for `'vec`.
    _phantom: PhantomData<&'vec mut T>,
}

impl<T> Drop for Shard<'_, T> {
    fn drop(&mut self) {
        // We've been dropped without being returned to the writer, clean up any values that were
        // written so that they don't leak.
        let initialised = self.initialised_up_to - self.start_offset;

        // SAFETY: `start_offset..initialised_up_to` lies within the allocation that `storage`
        // points to, every slot in that range was initialised by a write, and no other shard
        // covers those slots. Dropping the whole range as a slice drops the values in place (no
        // move to the stack) and, should one destructor panic, still drops the remaining values
        // while unwinding instead of leaking them.
        unsafe {
            let first = self.storage.add(self.start_offset);
            std::ptr::drop_in_place(std::ptr::slice_from_raw_parts_mut(first, initialised));
        }
    }
}

// SAFETY: A shard is the only handle to the slots in its range, so sending it to another thread
// only moves responsibility for the `T` values stored there, which is fine when `T: Send`.
unsafe impl<T: Send> Send for Shard<'_, T> {}
// SAFETY: Sharing a shard between threads can at most share access to the `T` values it covers,
// which is fine when `T: Sync`.
unsafe impl<T: Sync> Sync for Shard<'_, T> {}
43
44impl<'vec, T> VecWriter<'vec, T> {
45    /// Creates a new writer that will write into the supplied `Vec`.
46    pub fn new(storage: &'vec mut Vec<T>) -> Self {
47        let taken = storage.len();
48        Self { storage, taken }
49    }
50
51    /// Takes the next `n` elements of the vector or panics if there is insufficient capacity.
52    pub fn take_shard(&mut self, n: usize) -> Shard<'vec, T> {
53        self.try_take_shard(n).unwrap_or_else(|_| {
54            panic!(
55                "Tried to take {n} when only {} available",
56                self.storage.capacity() - self.taken
57            );
58        })
59    }
60
61    /// Takes the next `n` elements of the vector or returns an error if there is insufficient
62    /// capacity.
63    pub fn try_take_shard(&mut self, n: usize) -> Result<Shard<'vec, T>, InsufficientCapacity> {
64        let end_offset = self.taken.saturating_add(n);
65        if end_offset > self.storage.capacity() {
66            return Err(InsufficientCapacity);
67        }
68        let shard = Shard {
69            storage: self.storage.as_mut_ptr(),
70            start_offset: self.taken,
71            initialised_up_to: self.taken,
72            end_offset,
73            _phantom: Default::default(),
74        };
75        self.taken = end_offset;
76        Ok(shard)
77    }
78
79    /// Takes shards with sizes supplied by `sizes`. Panics if there is insufficient capacity.
80    pub fn take_shards(&mut self, sizes: impl Iterator<Item = usize>) -> Vec<Shard<'vec, T>> {
81        sizes.map(|n| self.take_shard(n)).collect()
82    }
83
84    pub fn try_take_shards(
85        &mut self,
86        sizes: impl Iterator<Item = usize>,
87    ) -> Result<Vec<Shard<'vec, T>>, InsufficientCapacity> {
88        sizes.map(|n| self.try_take_shard(n)).collect()
89    }
90
91    /// Returns a shard to the vector, increasing the initialised length of the vector by the size
92    /// of the shard. The shard must have been fully initialised before being returned. Shards must
93    /// be returned in order. Panics on failure.
94    #[track_caller]
95    pub fn return_shard(&mut self, shard: Shard<T>) {
96        self.try_return_shard(shard).unwrap()
97    }
98
99    /// As for `return_shard`, but returns an error on failure rather than panicking.
100    pub fn try_return_shard(&mut self, shard: Shard<T>) -> Result<(), InitError> {
101        if self.storage.as_mut_ptr() != shard.storage {
102            return Err(InitError::WrongVec);
103        }
104        if shard.initialised_up_to != shard.end_offset {
105            return Err(InitError::UninitElements);
106        }
107        if self.storage.len() != shard.start_offset {
108            return Err(InitError::OutOfOrder);
109        }
110        // Safety: All values between the previous length and the new length were set by writes in
111        // `try_push`.
112        unsafe { self.storage.set_len(shard.initialised_up_to) };
113
114        // The values written into the shard are now owned by the vec, so forget the shard without
115        // dropping it, otherwise it'll double-free the values in the shard.
116        core::mem::forget(shard);
117        Ok(())
118    }
119
120    /// Returns the supplied shards. Panics if any shards have not been fully initialised or if the
121    /// shards are out-of-order.
122    pub fn return_shards(&mut self, shards: Vec<Shard<T>>) {
123        shards
124            .into_iter()
125            .for_each(|shard| self.return_shard(shard));
126    }
127
128    /// Returns the supplied shards.
129    pub fn try_return_shards(&mut self, shards: Vec<Shard<T>>) -> Result<(), InitError> {
130        shards
131            .into_iter()
132            .try_for_each(|shard| self.try_return_shard(shard))
133    }
134}
135
136impl<'store, T> Shard<'store, T> {
137    /// Appends a value to the shard. Panics if the shard has already been fully used. Returns a
138    /// mutable reference to the value that was pushed.
139    #[track_caller]
140    #[inline]
141    pub fn push(&mut self, value: T) -> &'store mut T {
142        self.try_push(value).unwrap()
143    }
144
145    /// Appends a value to the shard or returns an error if it has already been fully used.
146    #[inline]
147    pub fn try_push(&mut self, value: T) -> Result<&'store mut T, InsufficientCapacity> {
148        if self.initialised_up_to == self.end_offset {
149            return Err(InsufficientCapacity);
150        }
151        let ptr;
152        // Safety: The memory we're writing to was allocated by the Vec that we're writing. It's
153        // currently uninitialised (not that that matters for safety). It doesn't alias, since all
154        // shards are created non-overlapping.
155        unsafe {
156            ptr = self.storage.add(self.initialised_up_to);
157            ptr.write(value);
158        }
159        self.initialised_up_to += 1;
160        // Safety: The memory to which we're taking a reference is now initialised with a valid T.
161        // Alignment requirement will have been upheld by the underlying Vec. The returned reference
162        // won't alias with references returned by any other calls to push, since we always advance.
163        Ok(unsafe { &mut *ptr })
164    }
165
166    /// Returns the size of this shard (initialised and uninitialised).
167    #[inline]
168    pub fn len(&self) -> usize {
169        self.end_offset - self.start_offset
170    }
171
172    pub fn is_empty(&self) -> bool {
173        self.len() == 0
174    }
175
176    /// Returns the offset in the output vector at which the next push will write.
177    #[inline]
178    pub fn output_offset(&self) -> usize {
179        self.initialised_up_to
180    }
181}
182
/// Error reported when an operation needs more capacity than is available.
#[derive(Debug, PartialEq, Eq)]
pub struct InsufficientCapacity;

impl Display for InsufficientCapacity {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Insufficient capacity")
    }
}

impl Error for InsufficientCapacity {}
192
/// The ways in which handing a shard back to a writer can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum InitError {
    /// One or more elements weren't initialised.
    UninitElements,

    /// A shard was returned to a writer other than the one that created it.
    WrongVec,

    /// Shards were returned out-of-order or a shard was missing.
    OutOfOrder,
}

impl Display for InitError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::UninitElements => "Elements not initialised",
            Self::WrongVec => "Shard returned to wrong vec",
            Self::OutOfOrder => "Shards returned out-of-order",
        };
        f.write_str(message)
    }
}

impl Error for InitError {}