concurrent_slotmap/
slot.rs

1use crate::{Header, HeaderShard, SlotId, NIL, OCCUPIED_TAG, SHARD_COUNT};
2use core::{
3    alloc::Layout,
4    cell::UnsafeCell,
5    cmp, fmt,
6    marker::PhantomData,
7    mem::{self, MaybeUninit},
8    panic::{RefUnwindSafe, UnwindSafe},
9    ptr, slice,
10    sync::atomic::{
11        AtomicU32, AtomicUsize,
12        Ordering::{Acquire, Relaxed, Release},
13    },
14};
15use virtual_buffer::{align_up, page_size, Allocation, Error};
16use TryReserveErrorKind::{AllocError, CapacityOverflow};
17
/// A concurrent, grow-only vector of [`Slot`]s backed by a virtual-memory
/// reservation: address space for `max_capacity` slots is reserved up front
/// and pages are committed on demand, so slots never move once handed out.
pub(crate) struct Vec<T> {
    /// The reserved region; the sharded `Header` lives at its start, followed
    /// by the slot array (see `try_new` for the layout).
    allocation: virtual_buffer::Allocation,
    /// Pointer to the first `Slot<T>`: `allocation.ptr()` advanced past the
    /// alignment-padded header.
    slots: *mut u8,
    /// Upper bound on `capacity`, fixed at construction.
    max_capacity: u32,
    /// Number of slots currently backed by committed pages.
    capacity: AtomicU32,
    /// Number of slot indices claimed so far; pushes may transiently overshoot
    /// `max_capacity` (clamped back in `grow_amortized`).
    reserved_len: AtomicUsize,
    /// Ties `Vec<T>` to `Slot<T>` for variance and auto-trait purposes.
    marker: PhantomData<Slot<T>>,
}
26
27impl<T> Vec<T> {
    /// Creates a new `Vec` that can hold at most `max_capacity` slots.
    ///
    /// # Panics
    ///
    /// Panics if the size computation overflows or the initial allocation
    /// fails; see [`Self::try_new`] for the fallible variant.
    #[track_caller]
    pub fn new(max_capacity: u32) -> Self {
        handle_reserve(Self::try_new(max_capacity))
    }
32
    /// Fallible counterpart of [`Self::new`]: reserves address space for the
    /// header plus `max_capacity` slots, committing only the initial pages
    /// that contain the header.
    ///
    /// # Errors
    ///
    /// Returns `CapacityOverflow` if the total byte size overflows `usize` or
    /// exceeds `isize::MAX`, and `AllocError` if the reservation or the
    /// initial commit fails.
    pub fn try_new(max_capacity: u32) -> Result<Self, TryReserveError> {
        // Commits happen at page granularity, so nothing in the allocation may
        // require more alignment than a page.
        assert!(mem::align_of::<HeaderShard>() <= page_size());
        assert!(mem::align_of::<Slot<T>>() <= page_size());

        // Byte size of the slot array, with overflow checked.
        let size = usize::try_from(max_capacity)
            .ok()
            .and_then(|cap| cap.checked_mul(mem::size_of::<Slot<T>>()))
            .ok_or(CapacityOverflow)?;

        // Allocations larger than `isize::MAX` bytes are not valid in Rust.
        #[allow(clippy::cast_possible_wrap)]
        if size > isize::MAX as usize {
            return Err(CapacityOverflow.into());
        }

        // Layout: one `HeaderShard` per shard at the start, then the slot
        // array at the next suitably aligned offset.
        let header_size = SHARD_COUNT.load(Relaxed) * mem::size_of::<HeaderShard>();
        let slots_offset = align_up(header_size, mem::align_of::<Slot<T>>());
        let size = slots_offset.checked_add(size).ok_or(CapacityOverflow)?;
        let size = align_up(size, page_size());
        // Only the pages covering the header are committed up front.
        let initial_size = align_up(slots_offset, page_size());

        let allocation = Allocation::new(size).map_err(AllocError)?;
        allocation
            .commit(allocation.ptr(), initial_size)
            .map_err(AllocError)?;

        let slots = allocation.ptr().wrapping_add(slots_offset);

        // Slot space that happens to share the header's last committed page is
        // usable immediately.
        #[allow(clippy::cast_possible_truncation)]
        let capacity = ((initial_size - slots_offset) / mem::size_of::<Slot<T>>()) as u32;
        let capacity = cmp::min(capacity, max_capacity);

        let mut vec = Vec {
            allocation,
            slots,
            max_capacity,
            capacity: AtomicU32::new(capacity),
            reserved_len: AtomicUsize::new(0),
            marker: PhantomData,
        };

        // Every shard starts out with an empty (`NIL`-terminated) free list.
        for shard in vec.header_mut().shards_mut() {
            *shard.free_list.get_mut() = NIL;
        }

        Ok(vec)
    }
79
80    #[inline]
81    pub(crate) fn as_ptr(&self) -> *const Slot<T> {
82        self.slots.cast()
83    }
84
85    #[inline]
86    fn as_mut_ptr(&mut self) -> *mut Slot<T> {
87        self.slots.cast()
88    }
89
    /// Returns the number of slots currently backed by committed pages.
    ///
    /// This is a `Relaxed` load; code that goes on to dereference slots uses
    /// the `Acquire` load in `get`/`iter` instead.
    #[inline]
    pub fn capacity(&self) -> u32 {
        self.capacity.load(Relaxed)
    }

    /// Like [`Self::capacity`], but uses exclusive access to read the value
    /// non-atomically.
    #[inline]
    pub fn capacity_mut(&mut self) -> u32 {
        *self.capacity.get_mut()
    }
99
    /// Returns a reference to the sharded header stored at the start of the
    /// allocation, just before the slot array.
    #[inline]
    pub fn header(&self) -> &Header {
        // SAFETY: `self.slots` is a valid pointer to our allocation of `Slot<V>`s.
        unsafe { &*header_ptr_from_slots(self.slots) }
    }

    /// Mutable counterpart of [`Self::header`]; sound because `&mut self`
    /// guarantees exclusive access to the allocation.
    #[inline]
    pub fn header_mut(&mut self) -> &mut Header {
        // SAFETY: `self.slots` is a valid pointer to our allocation of `Slot<V>`s.
        unsafe { &mut *header_ptr_from_slots(self.slots).cast_mut() }
    }
111
    /// Pushes a new element, returning its ID and a reference to its slot.
    ///
    /// The value is produced by `f`, which receives the [`SlotId`] the element
    /// will be stored under; the slot's generation becomes
    /// `OCCUPIED_TAG | tag`. Claiming the index with a single `fetch_add`
    /// makes this safe to call from multiple threads concurrently.
    ///
    /// # Panics
    ///
    /// Panics if `max_capacity` is exhausted or committing more pages fails.
    #[track_caller]
    pub fn push_with_tag_with(&self, tag: u32, f: impl FnOnce(SlotId) -> T) -> (SlotId, &Slot<T>) {
        // This cannot overflow because our capacity can never exceed `isize::MAX` bytes, and
        // because `self.reserve_for_push()` resets `self.reserved_len` back to `self.max_capacity`
        // if it was overshot.
        let index = self.reserved_len.fetch_add(1, Relaxed);

        if index >= self.capacity() as usize {
            self.reserve_for_push(index);
        }

        // Our capacity can never exceed `self.max_capacity`, so the index has to fit in a `u32`.
        #[allow(clippy::cast_possible_truncation)]
        let index = index as u32;
        let generation = OCCUPIED_TAG | tag;

        // SAFETY: We made sure that the index is in bounds above.
        let slot = unsafe { self.get_unchecked(index) };

        // SAFETY: The state tag of the generation is `OCCUPIED_TAG`.
        let id = unsafe { SlotId::new_unchecked(index, generation) };

        // SAFETY: We reserved an index by incrementing `self.reserved_len`, which means that no
        // other threads can be attempting to write to this same slot. No other threads can be
        // reading this slot either until we update the generation below.
        unsafe { slot.value.get().cast::<T>().write(f(id)) };

        // Publish the initialized value: the `Release` store pairs with the
        // `Acquire` load in `Slot::generation`.
        slot.generation.store(generation, Release);

        (id, slot)
    }
143
    /// Exclusive-access counterpart of [`Self::push_with_tag_with`]: same
    /// semantics, but exclusive access allows non-atomic reads and writes of
    /// the counters and the slot.
    ///
    /// # Panics
    ///
    /// Panics if `max_capacity` is exhausted or committing more pages fails.
    #[track_caller]
    pub fn push_with_tag_with_mut(&mut self, tag: u32, f: impl FnOnce(SlotId) -> T) -> SlotId {
        let index = *self.reserved_len.get_mut();

        // This cannot overflow because our capacity can never exceed `isize::MAX` bytes, and
        // because `self.reserve_for_push()` resets `self.reserved_len` back to `self.max_capacity`
        // if it was overshot.
        *self.reserved_len.get_mut() += 1;

        if index >= self.capacity() as usize {
            self.reserve_for_push(index);
        }

        // Our capacity can never exceed `self.max_capacity`, so the index has to fit in a `u32`.
        #[allow(clippy::cast_possible_truncation)]
        let index = index as u32;
        let generation = OCCUPIED_TAG | tag;

        // SAFETY: We made sure that the index is in bounds above.
        let slot = unsafe { self.get_unchecked_mut(index) };

        // SAFETY: The state tag of the generation is `OCCUPIED_TAG`.
        let id = unsafe { SlotId::new_unchecked(index, generation) };

        slot.value.get_mut().write(f(id));

        *slot.generation.get_mut() = generation;

        id
    }
174
    /// Slow path of the push methods: grows the committed capacity so that
    /// index `len` becomes valid, panicking on failure.
    ///
    /// Kept out-of-line so the hot push path stays small.
    #[inline(never)]
    #[track_caller]
    fn reserve_for_push(&self, len: usize) {
        handle_reserve(self.grow_amortized(len, 1));
    }
180
181    // TODO: What's there to amortize over? It should be linear growth.
182    fn grow_amortized(&self, len: usize, additional: usize) -> Result<(), TryReserveError> {
183        debug_assert!(additional > 0);
184
185        let required_capacity = len.checked_add(additional).ok_or(CapacityOverflow)?;
186
187        if required_capacity > self.max_capacity as usize {
188            if self.reserved_len.load(Relaxed) > self.max_capacity as usize {
189                self.reserved_len.store(self.max_capacity as usize, Relaxed);
190            }
191
192            return Err(CapacityOverflow.into());
193        }
194
195        let page_capacity = u32::try_from(page_size() / mem::size_of::<Slot<T>>()).unwrap();
196
197        // We checked that `required_capacity` doesn't exceed `self.max_capacity`, so it must fit in
198        // a `u32`.
199        #[allow(clippy::cast_possible_truncation)]
200        let new_capacity = cmp::max(self.capacity() * 2, required_capacity as u32);
201        let new_capacity = cmp::max(new_capacity, page_capacity);
202        let new_capacity = cmp::min(new_capacity, self.max_capacity);
203
204        grow(
205            &self.allocation,
206            &self.capacity,
207            new_capacity,
208            Layout::new::<Slot<T>>(),
209        )
210    }
211
    /// Returns a reference to the slot at `index`, or `None` if `index` is out
    /// of bounds of the committed capacity.
    ///
    /// The slot may be in any state; check [`Slot::generation`] before reading
    /// its value.
    #[inline]
    pub fn get(&self, index: u32) -> Option<&Slot<T>> {
        let capacity = self.capacity.load(Acquire);

        if index >= capacity {
            return None;
        }

        // SAFETY: We checked that the index is in bounds above.
        let ptr = unsafe { self.as_ptr().add(index as usize) };

        // SAFETY: We know that newly-allocated pages are zeroed, so we don't need to intialize the
        // `Slot<T>` as it allows being zeroed. The `Acquire` ordering above synchronizes with the
        // `Release` ordering when setting the capacity, making sure that the reserved capacity is
        // visible here.
        Some(unsafe { &*ptr })
    }
229
    /// Returns a reference to the slot at `index` without a bounds check.
    ///
    /// # Safety
    ///
    /// `index` must be less than the committed capacity.
    #[inline]
    pub unsafe fn get_unchecked(&self, index: u32) -> &Slot<T> {
        // This load exists for its `Acquire` ordering, which synchronizes with
        // the `Release` store of the capacity in `grow`; the value is unused.
        let _capacity = self.capacity.load(Acquire);

        // SAFETY: The caller must ensure that the index is in bounds.
        let ptr = unsafe { self.as_ptr().add(index as usize) };

        // SAFETY: Same as in `get` above.
        unsafe { &*ptr }
    }
240
    /// Returns a mutable reference to the slot at `index`, or `None` if
    /// `index` is out of bounds of the committed capacity.
    #[inline]
    pub fn get_mut(&mut self, index: u32) -> Option<&mut Slot<T>> {
        let capacity = self.capacity_mut();

        if index >= capacity {
            return None;
        }

        // SAFETY: We checked that the index is in bounds above.
        let ptr = unsafe { self.as_mut_ptr().add(index as usize) };

        // SAFETY: We know that newly-allocated pages are zeroed, so we don't need to intialize the
        // `Slot<T>` as it allows being zeroed. The mutable reference ensures synchronization in
        // this case.
        Some(unsafe { &mut *ptr })
    }

    /// Returns a mutable reference to the slot at `index` without a bounds
    /// check.
    ///
    /// # Safety
    ///
    /// `index` must be less than the committed capacity.
    #[inline]
    pub unsafe fn get_unchecked_mut(&mut self, index: u32) -> &mut Slot<T> {
        // SAFETY: The caller must ensure that the index is in bounds.
        let ptr = unsafe { self.as_mut_ptr().add(index as usize) };

        // SAFETY: Same as in `get_mut` above.
        unsafe { &mut *ptr }
    }
266
    /// Returns an iterator over every slot up to the committed capacity,
    /// occupied or not.
    #[inline]
    pub fn iter(&self) -> slice::Iter<'_, Slot<T>> {
        let capacity = self.capacity.load(Acquire) as usize;

        // SAFETY: We know that newly-allocated pages are zeroed, so we don't need to intialize the
        // `Slot<T>` as it allows being zeroed. The `Acquire` ordering above synchronizes with the
        // `Release` ordering when setting the capacity, making sure that the reserved capacity is
        // visible here.
        unsafe { slice::from_raw_parts(self.as_ptr(), capacity) }.iter()
    }

    /// Mutable counterpart of [`Self::iter`].
    #[inline]
    pub fn iter_mut(&mut self) -> slice::IterMut<'_, Slot<T>> {
        let capacity = self.capacity_mut() as usize;

        // SAFETY: We know that newly-allocated pages are zeroed, so we don't need to intialize the
        // `Slot<T>` as it allows being zeroed. The mutable reference ensures synchronization in
        // this case.
        unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), capacity) }.iter_mut()
    }
287}
288
/// Computes a pointer to the [`Header`] that lives immediately before the slot
/// array pointed to by `slots`.
///
/// # Safety
///
/// `slots` must point to the slot array of an allocation laid out as in
/// `Vec::try_new`, i.e. with `SHARD_COUNT` `HeaderShard`s directly preceding
/// the slots.
#[inline]
pub(crate) unsafe fn header_ptr_from_slots(slots: *const u8) -> *const Header {
    let shard_count = SHARD_COUNT.load(Relaxed);
    let header_size = shard_count * mem::size_of::<HeaderShard>();

    // SAFETY: The caller must ensure that `slots` is a valid pointer to the allocation of
    // `Slot<V>`s, and that allocation has the `Header` right before the start of the slots.
    let header = unsafe { slots.sub(header_size) }.cast::<HeaderShard>();

    // `Header` is an unsized wrapper around the shard slice, so build the
    // pointer via a slice pointer carrying the shard count.
    ptr::slice_from_raw_parts(header, shard_count) as *const Header
}
300
/// Commits enough additional pages of `allocation` to back `new_capacity`
/// elements of `element_layout` (following the header), then publishes the new
/// capacity with `Release` ordering.
///
/// Kept non-generic — taking the element layout by value — so a single
/// instantiation serves every `Vec<T>`.
#[inline(never)]
fn grow(
    allocation: &Allocation,
    capacity: &AtomicU32,
    new_capacity: u32,
    element_layout: Layout,
) -> Result<(), TryReserveError> {
    let old_capacity = capacity.load(Relaxed);

    if old_capacity >= new_capacity {
        // Another thread beat us to it.
        return Ok(());
    }

    let page_size = page_size();

    // Recompute the byte range that the grown slot array occupies.
    let header_size = SHARD_COUNT.load(Relaxed) * mem::size_of::<HeaderShard>();
    let slots_offset = align_up(header_size, element_layout.align());
    let old_size = slots_offset + old_capacity as usize * element_layout.size();
    let new_size = usize::try_from(new_capacity)
        .ok()
        .and_then(|cap| cap.checked_mul(element_layout.size()))
        .ok_or(CapacityOverflow)?;
    let new_size = slots_offset.checked_add(new_size).ok_or(CapacityOverflow)?;

    if new_size > allocation.size() {
        return Err(CapacityOverflow.into());
    }

    // Commit only the pages between the old and new (page-aligned) ends.
    let old_size = align_up(old_size, page_size);
    let new_size = align_up(new_size, page_size);
    let ptr = allocation.ptr().wrapping_add(old_size);
    let size = new_size - old_size;

    allocation.commit(ptr, size).map_err(AllocError)?;

    // The `Release` ordering synchronizes with the `Acquire` ordering in `Vec::as_slice`.
    if let Err(capacity) = capacity.compare_exchange(old_capacity, new_capacity, Release, Relaxed) {
        // We lost the race, but the winner must have updated the capacity same as we wanted to.
        assert!(capacity >= new_capacity);
    }

    Ok(())
}
345
346#[inline]
347#[track_caller]
348fn handle_reserve<T>(res: Result<T, TryReserveError>) -> T {
349    match res.map_err(|e| e.kind) {
350        Ok(x) => x,
351        Err(CapacityOverflow) => capacity_overflow(),
352        Err(AllocError(err)) => handle_alloc_error(err),
353    }
354}
355
/// Diverges with the capacity-overflow panic; non-generic and out-of-line so
/// every caller shares one copy.
#[inline(never)]
#[track_caller]
fn capacity_overflow() -> ! {
    panic!("capacity overflow");
}

/// Diverges with a panic reporting the operating-system allocation error.
// Dear Clippy, `Error` is 4 bytes.
#[allow(clippy::needless_pass_by_value)]
#[cold]
#[track_caller]
fn handle_alloc_error(err: Error) -> ! {
    panic!("allocation failed: {err}");
}
369
/// A single element of a [`Vec`]: a generation word, a free-list link and the
/// (possibly uninitialized) value.
///
/// Every field is valid when zeroed, which is what lets freshly committed
/// (zeroed) pages be used as slots without initialization.
pub struct Slot<V> {
    /// State tag plus generation; `OCCUPIED_TAG` set means `value` is
    /// initialized (see the push methods).
    pub(crate) generation: AtomicU32,
    /// Link to the next slot in a shard's free list.
    pub(crate) next_free: AtomicU32,
    /// The stored value; only initialized while the slot is occupied.
    pub(crate) value: UnsafeCell<MaybeUninit<V>>,
}
375
// Forward unwind-safety from the value type; the `UnsafeCell` field would
// otherwise suppress these auto-impls.
impl<V: UnwindSafe> UnwindSafe for Slot<V> {}
impl<V: RefUnwindSafe> RefUnwindSafe for Slot<V> {}

// SAFETY: We only ever hand out references to a slot in the presence of a `hyaline::Guard`.
unsafe impl<V: Sync> Sync for Slot<V> {}
381
impl<V> Slot<V> {
    /// Returns the slot's current generation, state-tag bits included.
    ///
    /// The `Acquire` load pairs with the `Release` store in the push path, so
    /// a generation with `OCCUPIED_TAG` set means the value write is visible.
    #[inline]
    pub fn generation(&self) -> u32 {
        self.generation.load(Acquire)
    }

    /// Returns a raw pointer to the slot's value, which may or may not be
    /// initialized; see [`generation`](Self::generation).
    #[inline]
    pub fn value_ptr(&self) -> *mut V {
        self.value.get().cast()
    }

    /// # Safety
    ///
    /// The value must be initialized. You can use [`generation`] to determine the state of the
    /// slot.
    ///
    /// [`generation`]: Self::generation
    #[inline(always)]
    pub unsafe fn value_unchecked(&self) -> &V {
        // SAFETY: The caller must ensure that access to the cell's inner value is synchronized.
        let value = unsafe { &*self.value.get() };

        // SAFETY: The caller must ensure that the slot has been initialized.
        unsafe { value.assume_init_ref() }
    }

    /// # Safety
    ///
    /// The value must be initialized. You can use [`generation`] to determine the state of the
    /// slot.
    ///
    /// [`generation`]: Self::generation
    #[inline(always)]
    pub unsafe fn value_unchecked_mut(&mut self) -> &mut V {
        // SAFETY: The caller must ensure that the slot has been initialized.
        unsafe { self.value.get_mut().assume_init_mut() }
    }
}
420
impl<V> fmt::Debug for Slot<V> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The value cannot be printed without knowing whether it is
        // initialized, so only the type name is shown.
        f.debug_struct("Slot").finish_non_exhaustive()
    }
}
426
/// The error returned when reserving capacity for a [`Vec`] fails.
#[derive(Debug)]
pub struct TryReserveError {
    // The failure reason; kept private so the set of kinds can evolve.
    kind: TryReserveErrorKind,
}
431
432impl From<TryReserveErrorKind> for TryReserveError {
433    #[inline]
434    fn from(kind: TryReserveErrorKind) -> Self {
435        TryReserveError { kind }
436    }
437}
438
/// The reason a [`TryReserveError`] occurred.
#[derive(Debug)]
enum TryReserveErrorKind {
    /// The computed capacity exceeded the collection's maximum.
    CapacityOverflow,
    /// The operating system reported an allocation failure.
    AllocError(Error),
}
444
445impl fmt::Display for TryReserveError {
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        match self.kind {
448            CapacityOverflow => f.write_str(
449                "memory allocation failed because the computed capacity exceeded the collection's \
450                maximum",
451            ),
452            AllocError(_) => f.write_str(
453                "memory allocation failed because the operating system returned an error",
454            ),
455        }
456    }
457}
458
459impl std::error::Error for TryReserveError {
460    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
461        match &self.kind {
462            TryReserveErrorKind::CapacityOverflow => None,
463            TryReserveErrorKind::AllocError(err) => Some(err),
464        }
465    }
466}