concurrent_slotmap/
hyaline.rs

1//! This module implements the [Hyaline-1 memory reclamation technique].
2//!
3//! [Hyaline-1 memory reclamation technique]: https://arxiv.org/pdf/1905.07903
4
5use crate::{slot::Vec, Slot, SHARD_COUNT};
6use alloc::alloc::{alloc, dealloc, handle_alloc_error, realloc, Layout};
7use core::{
8    cell::{Cell, UnsafeCell},
9    fmt,
10    marker::PhantomData,
11    mem::{self, ManuallyDrop},
12    panic::RefUnwindSafe,
13    ptr, slice,
14    sync::atomic::{
15        self, AtomicPtr, AtomicUsize,
16        Ordering::{Acquire, Relaxed, Release, SeqCst},
17    },
18};
19use std::{num::NonZeroUsize, thread};
20use thread_local::ThreadLocal;
21
/// Sentinel stored in `RetirementList::head` marking a thread as not currently pinned.
/// All bits set is distinguishable from both null (pinned, empty list) and real node pointers.
// SAFETY: `usize` and `*mut Node` have the same layout.
// TODO: Replace with `ptr::without_provenance_mut` once we bump the MSRV.
#[allow(clippy::useless_transmute)]
const INACTIVE: *mut Node = unsafe { mem::transmute(usize::MAX) };
26
27const MIN_RETIRED_LEN: usize = 64;
28
/// A reference-counted owner of a `Collector`.
pub struct CollectorHandle {
    // Owning pointer to the shared, heap-allocated collector; freed by the drop
    // of the last handle.
    ptr: *mut Collector,
}

// SAFETY: `Collector` is `Send + Sync` and its lifetime is enforced with reference counting.
unsafe impl Send for CollectorHandle {}

// SAFETY: `Collector` is `Send + Sync` and its lifetime is enforced with reference counting.
unsafe impl Sync for CollectorHandle {}
38
39impl Default for CollectorHandle {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl CollectorHandle {
46    #[must_use]
47    pub fn new() -> Self {
48        if SHARD_COUNT.load(Relaxed) == 0 {
49            let num_cpus = thread::available_parallelism()
50                .map(NonZeroUsize::get)
51                .unwrap_or(1);
52            SHARD_COUNT.store(num_cpus.next_power_of_two(), Relaxed);
53        }
54
55        let ptr = Box::into_raw(Box::new(Collector {
56            retirement_lists: ThreadLocal::new(),
57            handle_count: AtomicUsize::new(1),
58        }));
59
60        // SAFETY: We made sure that initialize the `handle_count` to `1` such that the handle's
61        // drop implementation cannot drop the `Collector` while another handle still exists.
62        unsafe { CollectorHandle { ptr } }
63    }
64
65    /// # Safety
66    ///
67    /// The returned `Guard` must not outlive any collection that uses this collector. It would
68    /// result in the `Guard`'s drop implementation attempting to free slots from the already freed
69    /// collection, resulting in a Use-After-Free. You must ensure that the `Guard` has its
70    /// lifetime bound to the collections it protects.
71    #[inline]
72    #[must_use]
73    pub unsafe fn pin(&self) -> Guard<'_> {
74        let mut is_fresh_entry = false;
75
76        let retirement_list = self.collector().retirement_lists.get_or(|| {
77            is_fresh_entry = true;
78
79            crate::set_shard_index();
80
81            RetirementList {
82                head: AtomicPtr::new(INACTIVE),
83                // SAFETY: The collector cannot be dropped until all handles have been dropped, at
84                // which point it is impossible to access this handle.
85                collector: ManuallyDrop::new(unsafe { ptr::read(self) }),
86                guard_count: Cell::new(0),
87                batch: UnsafeCell::new(LocalBatch::new()),
88            }
89        });
90
91        if is_fresh_entry {
92            atomic::fence(SeqCst);
93        }
94
95        retirement_list.pin()
96    }
97
98    #[inline]
99    fn collector(&self) -> &Collector {
100        // SAFETY: The pointer is valid.
101        unsafe { &*self.ptr }
102    }
103}
104
105impl Clone for CollectorHandle {
106    #[inline]
107    fn clone(&self) -> Self {
108        #[allow(clippy::cast_sign_loss)]
109        if self.collector().handle_count.fetch_add(1, Relaxed) > isize::MAX as usize {
110            std::process::abort();
111        }
112
113        // SAFETY: We incremented the `handle_count` above such that the handle's drop
114        // implementation cannot drop the `Collector` while another handle still exists.
115        unsafe { CollectorHandle { ptr: self.ptr } }
116    }
117}
118
119impl fmt::Debug for CollectorHandle {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        f.debug_struct("CollectorHandle").finish_non_exhaustive()
122    }
123}
124
125impl PartialEq for CollectorHandle {
126    #[inline]
127    fn eq(&self, other: &Self) -> bool {
128        self.ptr == other.ptr
129    }
130}
131
132impl Eq for CollectorHandle {}
133
impl Drop for CollectorHandle {
    #[inline]
    fn drop(&mut self) {
        // Only the thread that takes the count from 1 to 0 frees the collector.
        if self.collector().handle_count.fetch_sub(1, Release) == 1 {
            atomic::fence(Acquire);

            // SAFETY: The handle count has gone to zero, which means that no other threads can
            // register a new handle. The `Acquire` fence above ensures that the drop is
            // synchronized with the above decrement, such that no access to the `Collector` can be
            // ordered after the drop.
            let _ = unsafe { Box::from_raw(self.ptr) };
        }
    }
}
148
/// The shared state behind one or more `CollectorHandle`s.
struct Collector {
    /// Per-thread retirement lists.
    retirement_lists: ThreadLocal<RetirementList>,
    /// The number of `CollectorHandle`s that exist.
    handle_count: AtomicUsize,
}
155
/// The retirement list tracking retired batches.
///
/// One instance exists per participating thread; the 128-byte alignment
/// presumably keeps instances on separate cache lines to avoid false sharing.
#[repr(align(128))]
pub(crate) struct RetirementList {
    /// The list of retired batches. `INACTIVE` marks an unpinned thread.
    head: AtomicPtr<Node>,
    /// The parent collector.
    collector: ManuallyDrop<CollectorHandle>,
    /// The number of `Guard`s that exist.
    guard_count: Cell<usize>,
    /// The current batch being prepared.
    batch: UnsafeCell<LocalBatch>,
}

// SAFETY: The cells are never accessed concurrently.
unsafe impl Sync for RetirementList {}

// While `RetirementList` does contain interior mutability, this is never exposed to the user and
// no invariant can be broken.
impl RefUnwindSafe for RetirementList {}
175
impl RetirementList {
    /// Pins the current thread and returns a guard; nested pins only increment
    /// the guard count.
    #[inline]
    pub(crate) fn pin(&self) -> Guard<'_> {
        let guard_count = self.guard_count.get();
        // Overflowing the guard count would be a bug, so panic via `checked_add`.
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            // SAFETY: The guard count was zero, which means that we couldn't have entered already.
            unsafe { self.enter() };
        }

        // SAFETY:
        // * We incremented the `guard_count` above such that the guard's drop implementation cannot
        //   unpin the participant while another guard still exists.
        // * We made sure to pin the participant if it wasn't already.
        unsafe { Guard::new(self) }
    }

    /// Marks this thread as active by replacing the `INACTIVE` sentinel with an
    /// empty list, then fences so the activation is ordered before subsequent
    /// accesses to shared slots.
    #[inline]
    unsafe fn enter(&self) {
        self.head.store(ptr::null_mut(), Relaxed);
        atomic::fence(SeqCst);
    }

    /// Queues one slot for reclamation in the local batch, retiring the batch
    /// once it reaches `MIN_RETIRED_LEN` entries.
    #[inline]
    unsafe fn defer_reclaim(
        &self,
        index: u32,
        slots: *const u8,
        reclaim: unsafe fn(u32, *const u8),
    ) {
        // SAFETY: The caller must ensure that this method isn't called concurrently.
        let batch = unsafe { &mut *self.batch.get() };

        // SAFETY: The caller must ensure that `index` is valid and that it is not reachable
        // anymore, that `slots` is a valid pointer to the allocation of `Slot`s, and that `reclaim`
        // is safe to call with the `index` and `slots`.
        unsafe { batch.push(index, slots, reclaim) };

        if batch.len() == MIN_RETIRED_LEN {
            // SAFETY: The caller must ensure that this method isn't called concurrently.
            unsafe { self.retire() };
        }
    }

    /// Retires the local batch: hands one node of the batch to every active
    /// thread's retirement list and transfers the corresponding reference
    /// count to the batch.
    #[inline(never)]
    unsafe fn retire(&self) {
        // SAFETY: The caller must ensure that this method isn't called concurrently.
        let batch = unsafe { &mut *self.batch.get() };

        if batch.is_empty() {
            return;
        }

        // Take the batch out, leaving a fresh empty one for later deferrals.
        let mut batch = mem::take(batch);
        let retired_len = batch.len();
        let mut len = 0;

        // SAFETY: `batch.len()` is the number of retired slots in the batch.
        unsafe { batch.set_retired_len(retired_len) };

        atomic::fence(SeqCst);

        // First pass: assign one node per (seemingly) active thread. If there
        // are more active threads than retired slots, extra no-op padding
        // nodes are pushed so every thread still gets a node.
        for retirement_list in &self.collector.collector().retirement_lists {
            if retirement_list.head.load(Relaxed) == INACTIVE {
                continue;
            }

            if len >= retired_len {
                // SAFETY: This node is never getting retired since it is outside of `retired_len`.
                unsafe { batch.push(0, ptr::null(), |_, _| {}) };
            }

            // SAFETY: We made sure that the index is in bounds above.
            let node = unsafe { batch.as_mut_slice().get_unchecked_mut(len) };

            node.link.retirement_list = &retirement_list.head;

            len += 1;
        }

        let nodes = batch.as_mut_ptr();
        let batch = batch.into_raw();

        atomic::fence(Acquire);

        // Second pass: push each assigned node onto its thread's list. `len`
        // ends up as the number of successful insertions; the range bound was
        // captured before the loop, so decrementing `len` does not shorten it.
        #[allow(clippy::mut_range_bound)]
        'outer: for node_index in 0..len {
            // SAFETY: The pointer is valid and the index is in bounds.
            let node = unsafe { nodes.add(node_index) };

            // SAFETY: The pointer is valid.
            unsafe { (*node).batch = batch };

            // SAFETY:
            // * The `node` pointer is valid.
            // * We pushed the `retirement_list` union variant into the batch above.
            // * The `retirement_list` pointer must have stayed valid because `ThreadLocal` entries
            //   are never deinitialized.
            let list = unsafe { &*(*node).link.retirement_list };

            let mut head = list.load(Relaxed);

            loop {
                if head == INACTIVE {
                    // The thread unpinned in the meantime: this node was not
                    // inserted, so it must not count toward the references.
                    atomic::fence(Acquire);
                    len -= 1;
                    continue 'outer;
                }

                // SAFETY: The pointer is valid.
                unsafe { (*node).link.next = head };

                match list.compare_exchange_weak(head, node, Release, Relaxed) {
                    Ok(_) => break,
                    Err(new_head) => head = new_head,
                }
            }
        }

        // Add the number of successful insertions to the reference count.
        // Wrapping arithmetic is used because departing threads may have
        // already decremented the count below zero before this addition.
        // SAFETY: The pointer is valid.
        if unsafe { (*batch).ref_count.fetch_add(len, Release) }.wrapping_add(len) == 0 {
            // SAFETY: We had the last reference.
            unsafe { self.reclaim(batch) };
        }
    }

    /// Unpins the current thread and processes every batch node that retiring
    /// threads pushed onto this thread's list while it was pinned.
    #[inline]
    unsafe fn leave(&self) {
        let head = self.head.swap(INACTIVE, Release);

        if !head.is_null() {
            // SAFETY: The caller must ensure that this method isn't called concurrently and that
            // there are no more references to any retired slots.
            unsafe { self.traverse(head) };
        }
    }

    /// Walks the handed-over list, dropping one reference per visited batch and
    /// reclaiming each batch whose count reaches zero.
    #[cold]
    unsafe fn traverse(&self, mut head: *mut Node) {
        atomic::fence(Acquire);

        while !head.is_null() {
            // SAFETY: We always push valid pointers into the list.
            let batch = unsafe { (*head).batch };

            // SAFETY: We always push valid pointers into the list, and when doing so, we make sure
            // that the `next` union variant is initialized.
            let next = unsafe { (*head).link.next };

            // SAFETY: The caller must ensure that there are no more references to any retired
            // slots.
            let ref_count = unsafe { (*batch).ref_count.fetch_sub(1, Release) }.wrapping_sub(1);

            if ref_count == 0 {
                // SAFETY: We had the last reference.
                unsafe { self.reclaim(batch) };
            }

            head = next;
        }
    }

    /// Frees every retired slot in the batch; the batch allocation itself is
    /// freed when the reconstructed `LocalBatch` is dropped at the end of this
    /// function.
    unsafe fn reclaim(&self, batch: *mut Batch) {
        atomic::fence(Acquire);

        // SAFETY: The caller must ensure that we own the batch.
        let mut batch = unsafe { LocalBatch::from_raw(batch) };

        for node in batch.retired_as_mut_slice() {
            // SAFETY:
            // * We own the batch, which means that no more references to the slots can exist.
            // * We always push indices of existing vacant slots into the list.
            // * `node.slots` is the same pointer that was used when pushing the node.
            unsafe { (node.reclaim)(node.index, node.slots) };
        }
    }
}
354
355impl Drop for Collector {
356    fn drop(&mut self) {
357        atomic::fence(Acquire);
358
359        for retirement_list in &mut self.retirement_lists {
360            let batch = retirement_list.batch.get_mut();
361
362            if batch.is_empty() {
363                continue;
364            }
365
366            for node in batch.retired_as_mut_slice() {
367                // SAFETY:
368                // * We have mutable access to the batch, which means that no more references to the
369                //   slots can exist.
370                // * We always push indices of existing vacant slots into the list.
371                // * `node.slots` is the same pointer that was used when pushing the node.
372                unsafe { (node.reclaim)(node.index, node.slots) };
373            }
374        }
375    }
376}
377
/// A batch of retired slots.
///
/// Allocated as a single header-plus-trailing-array allocation (see
/// `layout_for_capacity`); `nodes` is a zero-sized field marking where the
/// inline node storage begins.
#[repr(C)]
struct Batch {
    /// The number of threads that can still access the slots in this batch.
    ref_count: AtomicUsize,
    /// The capacity of `nodes`.
    capacity: usize,
    /// The number of `nodes`.
    len: usize,
    /// The number of `nodes` that should be retired.
    retired_len: usize,
    /// An inline allocation of `capacity` nodes with `len` being initialized.
    nodes: [Node; 0],
}

/// A node in the retirement list.
struct Node {
    /// Holds the destination list before insertion and the next pointer after
    /// insertion; see `NodeLink`.
    link: NodeLink,
    /// The batch that this node is a part of.
    batch: *mut Batch,
    /// The index of the retired slot.
    index: u32,
    /// A pointer to the allocation of `slot::Slot<V>`s.
    slots: *const u8,
    /// The reclamation function for the retired slot.
    reclaim: unsafe fn(u32, *const u8),
}

/// The link of a `Node`, whose meaning changes over the node's lifetime: the
/// target retirement list while the batch is being prepared, then the next node
/// once it has been pushed onto that list.
union NodeLink {
    /// The retirement list.
    retirement_list: *const AtomicPtr<Node>,
    /// The next node in the retirement list.
    next: *mut Node,
}
412
/// An owned handle to a heap-allocated `Batch`.
struct LocalBatch {
    ptr: *mut Batch,
}

// SAFETY: We own the batch and access to it is synchronized using mutable references.
unsafe impl Send for LocalBatch {}

// SAFETY: We own the batch and access to it is synchronized using mutable references.
unsafe impl Sync for LocalBatch {}
422
423impl Default for LocalBatch {
424    fn default() -> Self {
425        LocalBatch::new()
426    }
427}
428
impl LocalBatch {
    /// The capacity a freshly allocated batch starts with.
    const MIN_CAP: usize = 4;

    /// Allocates a new, empty batch with `MIN_CAP` capacity.
    fn new() -> Self {
        let layout = layout_for_capacity(Self::MIN_CAP);

        // SAFETY: The layout is not zero-sized.
        let ptr = unsafe { alloc(layout) }.cast::<Batch>();

        if ptr.is_null() {
            handle_alloc_error(layout);
        }

        // SAFETY: We successfully allocated the batch.
        // The header fields are written individually because the allocation is
        // uninitialized; `nodes` is trailing storage and starts out empty.
        unsafe {
            *ptr::addr_of_mut!((*ptr).ref_count) = AtomicUsize::new(0);
            *ptr::addr_of_mut!((*ptr).capacity) = Self::MIN_CAP;
            *ptr::addr_of_mut!((*ptr).len) = 0;
            *ptr::addr_of_mut!((*ptr).retired_len) = 0;
        }

        LocalBatch { ptr }
    }

    /// Reconstructs a `LocalBatch` from a pointer previously produced by
    /// `LocalBatch::into_raw`, taking back ownership of the allocation.
    #[inline]
    unsafe fn from_raw(ptr: *mut Batch) -> Self {
        LocalBatch { ptr }
    }

    /// Releases ownership of the allocation without deallocating it.
    #[inline]
    fn into_raw(self) -> *mut Batch {
        ManuallyDrop::new(self).ptr
    }

    /// The number of nodes the allocation can hold.
    #[inline]
    fn capacity(&self) -> usize {
        // SAFETY: The pointer is valid.
        unsafe { (*self.ptr).capacity }
    }

    /// The number of initialized nodes.
    #[inline]
    fn len(&self) -> usize {
        // SAFETY: The pointer is valid.
        unsafe { (*self.ptr).len }
    }

    /// The number of nodes holding retired slots.
    #[inline]
    fn retired_len(&self) -> usize {
        // SAFETY: The pointer is valid.
        unsafe { (*self.ptr).retired_len }
    }

    /// Whether the batch contains no nodes.
    #[inline]
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// A pointer to the start of the inline `nodes` storage.
    #[inline]
    fn as_mut_ptr(&mut self) -> *mut Node {
        // SAFETY: The pointer is valid.
        unsafe { ptr::addr_of_mut!((*self.ptr).nodes) }.cast()
    }

    /// All initialized nodes as a mutable slice.
    fn as_mut_slice(&mut self) -> &mut [Node] {
        // SAFETY: The pointer is valid and the caller of `LocalBatch::set_len` must ensure that the
        // length is the correct number of nodes that are initialized.
        unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len()) }
    }

    /// The nodes holding retired slots as a mutable slice.
    #[inline]
    fn retired_as_mut_slice(&mut self) -> &mut [Node] {
        // SAFETY: The pointer is valid and the caller of `LocalBatch::set_retired_len` must ensure
        // that the length is the correct number of nodes that should be retired.
        unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.retired_len()) }
    }

    /// Appends a node, growing the allocation if it is full.
    #[inline]
    unsafe fn push(&mut self, index: u32, slots: *const u8, reclaim: unsafe fn(u32, *const u8)) {
        let len = self.len();

        if len == self.capacity() {
            self.grow_one();
        }

        let node = Node {
            // The `link` is only filled in later, during retirement.
            link: NodeLink {
                retirement_list: ptr::null(),
            },
            batch: ptr::null_mut(),
            index,
            slots,
            reclaim,
        };

        // SAFETY: We grew the allocation above if necessary, so `len` is in bounds.
        unsafe { self.as_mut_ptr().add(len).write(node) };

        // SAFETY: We wrote the new element above.
        unsafe { self.set_len(len + 1) };
    }

    /// Doubles the capacity of the allocation.
    #[inline(never)]
    fn grow_one(&mut self) {
        let capacity = self.capacity();
        let new_capacity = capacity * 2;
        let layout = layout_for_capacity(capacity);
        let new_layout = layout_for_capacity(new_capacity);

        // SAFETY:
        // * `self.ptr` was allocated via the global allocator.
        // * `layout` is the current layout.
        // * `new_layout.size()`, when rounded up to the nearest multiple of `new_layout.align()`,
        //   cannot overflow `isize` since we used `Layout` for the layout calculation.
        let new_ptr = unsafe { realloc(self.ptr.cast(), layout, new_layout.size()) };

        if new_ptr.is_null() {
            handle_alloc_error(new_layout);
        }

        self.ptr = new_ptr.cast();

        // SAFETY: The pointer is valid.
        unsafe { (*self.ptr).capacity = new_capacity };
    }

    /// Sets the number of initialized nodes.
    ///
    /// # Safety
    ///
    /// The first `len` nodes must be initialized.
    #[inline]
    unsafe fn set_len(&mut self, len: usize) {
        // SAFETY: The pointer is valid.
        unsafe { (*self.ptr).len = len };
    }

    /// Sets the number of nodes that should be retired.
    ///
    /// # Safety
    ///
    /// `len` must not exceed the number of initialized nodes.
    #[inline]
    unsafe fn set_retired_len(&mut self, len: usize) {
        // SAFETY: The pointer is valid.
        unsafe { (*self.ptr).retired_len = len };
    }
}
566
567impl Drop for LocalBatch {
568    fn drop(&mut self) {
569        let layout = layout_for_capacity(self.capacity());
570
571        // SAFETY:
572        // * `self.ptr` was allocated using the global allocator.
573        // * `layout` is the layout of the allocation.
574        unsafe { dealloc(self.ptr.cast(), layout) };
575    }
576}
577
578fn layout_for_capacity(capacity: usize) -> Layout {
579    Layout::new::<Batch>()
580        .extend(Layout::array::<Node>(capacity).unwrap())
581        .unwrap()
582        .0
583}
584
/// An RAII guard that keeps the current thread pinned.
pub struct Guard<'a> {
    retirement_list: &'a RetirementList,
    // `PhantomData<*const ()>` makes `Guard` `!Send + !Sync`, keeping all
    // accesses to the thread-local retirement list on a single thread.
    marker: PhantomData<*const ()>,
}
589
impl<'a> Guard<'a> {
    /// # Safety
    ///
    /// The caller must have incremented the retirement list's `guard_count` and
    /// ensured that the thread is pinned (entered).
    #[inline]
    unsafe fn new(retirement_list: &'a RetirementList) -> Self {
        Guard {
            retirement_list,
            marker: PhantomData,
        }
    }

    /// Returns the handle of the collector this guard belongs to.
    #[inline]
    #[must_use]
    pub fn collector(&self) -> &CollectorHandle {
        &self.retirement_list.collector
    }

    /// Defers reclamation of the slot at `index` within `slots`.
    #[inline]
    pub(crate) unsafe fn defer_reclaim<V>(&self, index: u32, slots: &Vec<V>) {
        let slots = slots.as_ptr().cast();
        let reclaim = transmute_reclaim_fp(crate::reclaim::<V>);

        // SAFETY:
        // * `Guard` is `!Send + !Sync`, so this cannot be called concurrently.
        // * The caller must ensure that `index` is valid and that it is not reachable anymore.
        // * `slots` is a valid pointer to the allocation of `Slot<V>`s.
        // * The caller must ensure that `reclaim` is safe to call with the `index` and `slots`.
        unsafe { self.retirement_list.defer_reclaim(index, slots, reclaim) };
    }

    /// Defers reclamation of the invalidated slot at `index` within `slots`.
    #[inline]
    pub(crate) unsafe fn defer_reclaim_invalidated<V>(&self, index: u32, slots: &Vec<V>) {
        let slots = slots.as_ptr().cast();
        let reclaim = transmute_reclaim_fp(crate::reclaim_invalidated::<V>);

        // SAFETY:
        // * `Guard` is `!Send + !Sync`, so this cannot be called concurrently.
        // * The caller must ensure that `index` is valid and that it is not reachable anymore.
        // * `slots` is a valid pointer to the allocation of `Slot<V>`s.
        // * The caller must ensure that `reclaim` is safe to call with the `index` and `slots`.
        unsafe { self.retirement_list.defer_reclaim(index, slots, reclaim) }
    }

    /// Retires the local batch immediately instead of waiting for it to fill up.
    #[inline]
    pub fn flush(&self) {
        // SAFETY: `Guard` is `!Send + !Sync`, so this cannot be called concurrently.
        unsafe { self.retirement_list.retire() };
    }
}
637
/// Erases the `Slot<V>` pointee type from a reclamation function pointer so
/// that nodes of differently-typed collections share one `Node` representation.
fn transmute_reclaim_fp<V>(fp: unsafe fn(u32, *const Slot<V>)) -> unsafe fn(u32, *const u8) {
    // SAFETY: Pointers have the same ABI for all sized pointees.
    unsafe { mem::transmute::<unsafe fn(u32, *const Slot<V>), unsafe fn(u32, *const u8)>(fp) }
}
642
643impl fmt::Debug for Guard<'_> {
644    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
645        f.debug_struct("Guard").finish_non_exhaustive()
646    }
647}
648
649impl Clone for Guard<'_> {
650    #[inline]
651    fn clone(&self) -> Self {
652        let guard_count = self.retirement_list.guard_count.get();
653        self.retirement_list
654            .guard_count
655            .set(guard_count.checked_add(1).unwrap());
656
657        // SAFETY:
658        // * We incremented the `guard_count` above, such that the guard's drop implementation
659        //   cannot unpin the participant while another guard still exists.
660        // * The participant is already pinned as this guard's existence is a proof of that.
661        unsafe { Guard::new(self.retirement_list) }
662    }
663}
664
impl Drop for Guard<'_> {
    #[inline]
    fn drop(&mut self) {
        let guard_count = self.retirement_list.guard_count.get();
        self.retirement_list.guard_count.set(guard_count - 1);

        // Only the outermost guard unpins the thread.
        if guard_count == 1 {
            // SAFETY:
            // * `Guard` is `!Send + !Sync`, so this cannot be called concurrently.
            // * We are dropping the last guard, so there cannot be any more references to any
            //   retired slots.
            unsafe { self.retirement_list.leave() };
        }
    }
}