// vec_belt/lib.rs

1#![allow(internal_features)]
2#![cfg_attr(docsrs, feature(rustdoc_internals))]
3#![doc = include_str!("../README.md")]
4#![cfg_attr(doc, deny(missing_docs))]
5#![no_std]
6extern crate alloc;
7
8use alloc::{
9    alloc::{Layout, LayoutError, alloc, dealloc, handle_alloc_error},
10    boxed::Box,
11    vec::Vec,
12};
13use core::{
14    cell::UnsafeCell,
15    fmt,
16    marker::PhantomData,
17    mem::{ManuallyDrop, MaybeUninit},
18    ops::{Deref, DerefMut},
19    panic::{RefUnwindSafe, UnwindSafe},
20    ptr::{NonNull, addr_eq, null_mut, slice_from_raw_parts, slice_from_raw_parts_mut},
21    sync::atomic::{AtomicUsize, Ordering::*},
22};
23
24use crossbeam_utils::{Backoff, CachePadded};
25use likely_stable::{LikelyResult, likely, unlikely};
26
/// Most significant bit of the atomic length, used as a resource-acquisition (lock) flag while an
/// appender is reserving space.
const FLAG: usize = 1 << (usize::BITS - 1);
/// Remaining bits used as the length index, large enough to fit [`isize::MAX`].
const MASK: usize = !FLAG;
31
/// Dynamically-sized fragment that backs up the memory for [`VecBelt<T>`].
///
/// Fragments form a singly-linked list from head to tail; `repr(C)` fixes the field order so the
/// header fields can be initialized through raw pointers before `data` is touched.
#[repr(C)]
struct Fragment<T> {
    /// Pointer to the next fragment. If this is [`null`](null_mut), then [`len`](Self::len) is `0`
    /// and therefore invalid (this fragment is the current tail and its count is derived from the
    /// total length instead).
    next: *mut Self,
    /// How many initialized elements are contained within this fragment.
    len: usize,
    /// The actual backing memory for elements. This is stored as a direct slice and not a pointer
    /// to improve cache locality.
    data: [MaybeUninit<T>],
}
44
impl<T> Fragment<T> {
    /// Returns the [`Layout`] for fragments of the given capacity.
    ///
    /// Errors only if the total size would overflow [`isize::MAX`].
    #[inline]
    fn layout(capacity: usize) -> Result<Layout, LayoutError> {
        // Extend the layout field-by-field, mirroring the `repr(C)` declaration order:
        // `next`, then `len`, then the `data` tail...
        let (layout, ..) = Layout::new::<*mut Self>().extend(Layout::new::<usize>())?;
        let (layout, ..) = layout.extend(Layout::array::<MaybeUninit<T>>(capacity)?)?;

        // ...and pad it to its alignment, as a requirement for `repr(C)`.
        Ok(layout.pad_to_align())
    }

    /// Allocates a new fragment of the given capacity, returning a non-null pointer to it and its
    /// data. Initializes [`next`](Self::next) to [`null`](null_mut) and [`len`](Self::len) to `0`.
    #[inline]
    fn new(capacity: usize) -> Result<(NonNull<Self>, *mut T), LayoutError> {
        let layout = Self::layout(capacity)?;

        // Currently, allocation failures are treated as fatal. This might change in the future.
        // The cast through `slice_from_raw_parts_mut` attaches `capacity` as the fat-pointer
        // metadata for the `[MaybeUninit<T>]` tail of this DST.
        let ptr = slice_from_raw_parts_mut(unsafe { alloc(layout) } as *mut (), capacity) as *mut Self;
        if ptr.is_null() {
            handle_alloc_error(layout)
        }

        // Initialize fields accordingly. `data` is deliberately left uninitialized.
        unsafe {
            (&raw mut (*ptr).next).write(slice_from_raw_parts_mut(null_mut::<()>(), 0) as *mut Self);
            (&raw mut (*ptr).len).write(0)
        }

        Ok(unsafe { (NonNull::new_unchecked(ptr), &raw mut (*ptr).data as *mut T) })
    }
}
78
/// A high-performant, concurrent, lock-free data structure suitable for multi-threaded
/// bulk-appending (takes a `&self`) and single-threaded consuming (takes a `&mut self`).
///
/// See the [module-level](crate) documentation for more details.
pub struct VecBelt<T> {
    /// The total length of all fragments before the tail. This is used to offset the current total
    /// length to reside within the tail fragment. Only read/written while the [`FLAG`] lock in
    /// [`len`](Self::len) is held, or through `&mut self`.
    preceding_len: UnsafeCell<usize>,
    /// The synchronization primitive, contains the current total length and an [acquisition
    /// bit](FLAG). Cache-padded to avoid false sharing with the other fields.
    len: CachePadded<AtomicUsize>,
    /// The first fragment that contains elements starting from index 0.
    head: NonNull<Fragment<T>>,
    /// The tail fragment that threads are currently modifying.
    tail: UnsafeCell<NonNull<Fragment<T>>>,
}
95
96impl<T> fmt::Debug for VecBelt<T> {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        f.debug_struct(core::any::type_name::<T>())
99            .field("len", &self.len())
100            .finish_non_exhaustive()
101    }
102}
103
/// # Safety
///
/// [`VecBelt<T>`] provides synchronization primitives that won't lead to data races.
unsafe impl<T: Send> Send for VecBelt<T> {}
/// # Safety
///
/// `T: Sync` isn't necessary here, because there is guaranteed not to be references to the same
/// elements across threads.
unsafe impl<T: Send> Sync for VecBelt<T> {}

// NOTE(review): these impls are unconditional (no `T: UnwindSafe` bound) — presumably intentional
// because the belt never exposes partially-written state after a panic; confirm.
impl<T> UnwindSafe for VecBelt<T> {}
impl<T> RefUnwindSafe for VecBelt<T> {}
116
impl<T> VecBelt<T> {
    /// Creates a new [`VecBelt<T>`], preallocating a fragment of the given initial size.
    ///
    /// # Panics
    /// Panics if the fragment layout cannot be computed (capacity overflow); diverges via
    /// [`handle_alloc_error`] if the allocation itself fails.
    #[inline]
    pub fn new(initial_size: usize) -> Self {
        let (head, ..) = Fragment::new(initial_size).expect("couldn't allocate a fragment");
        Self {
            preceding_len: UnsafeCell::new(0),
            len: CachePadded::new(AtomicUsize::new(0)),
            head,
            tail: UnsafeCell::new(head),
        }
    }

    /// Reads the length of the vector atomically. Prefer [`len_mut`](Self::len_mut) if possible.
    #[inline]
    pub fn len(&self) -> usize {
        // `MASK` strips the acquisition flag that a concurrent `append` may have set momentarily.
        self.len.load(Relaxed) & MASK
    }

    /// Tests whether the vector is empty atomically. Prefer [`is_empty_mut`](Self::is_empty_mut) if
    /// possible.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len.load(Relaxed) & MASK == 0
    }

    /// Reads the length of the vector non-atomically via `&mut self`.
    #[inline]
    pub fn len_mut(&mut self) -> usize {
        // `&mut self` guarantees no appender currently holds the flag, but mask anyway.
        *self.len.get_mut() & MASK
    }

    /// Tests whether the vector is empty non-atomically via `&mut self`.
    #[inline]
    pub fn is_empty_mut(&mut self) -> bool {
        *self.len.get_mut() & MASK == 0
    }

    /// Extends the vector by `additional` elements, returning an uninitialized slice to it and the
    /// starting element's absolute index in the vector.
    ///
    /// # Safety
    /// `additional` must be less or equal to [`isize::MAX`].
    unsafe fn append_raw_erased(&self, additional: usize) -> (*mut T, usize) {
        let backoff = Backoff::new();
        let preceding_len = self.preceding_len.get();
        let tail = self.tail.get();

        return loop {
            unsafe {
                // Queries the current length of the vector, ignoring the flag.
                let len = self.len.load(Relaxed) & MASK;
                // No wrap: `len < FLAG` and `additional <= isize::MAX`, so the sum fits in `usize`.
                let new_len = len.unchecked_add(additional);

                if unlikely(new_len & FLAG == FLAG) {
                    panic!("too many elements")
                }

                // If the current length is exactly `len` without the flag, then store it as `new_len` with the
                // flag, momentarily locking the vector...
                if self.len.compare_exchange(len, new_len | FLAG, Acquire, Relaxed).is_err() {
                    // ...otherwise, we need to wait for the other worker to finish unlocking.
                    backoff.snooze();
                    continue
                }

                // `self.len` is the absolute total length, while we want an index relative to the current tail
                // fragment's starting index. This is done by reading `preceding` and subtracting `len` with it.
                // Both reads are safe: the flag acts as a lock over `preceding_len` and `tail`.
                let preceding = *preceding_len;
                let data = &raw mut (*(*tail).as_ptr()).data;

                if likely(data.len() >= new_len - preceding) {
                    // Immediately release the lock, because the fragment fits and the data slice we requested is
                    // guaranteed not to be aliased by this atomic store.
                    self.len.store(new_len, Release);
                    break ((data as *mut T).add(len - preceding), len)
                } else {
                    // Try to allocate a new fragment and linking to it, releasing the lock as soon as possible.
                    break new_fragment(len, additional, preceding, preceding_len, &self.len, tail)
                }
            }
        };

        // Slow path: the tail fragment is full, so allocate a larger one and link it in. Kept out
        // of line (`#[cold]`) to keep the fast path small.
        #[cold]
        unsafe fn new_fragment<T>(
            len: usize,
            additional: usize,
            preceding: usize,
            this_preceding_len: *mut usize,
            this_len: &AtomicUsize,
            this_tail: *mut NonNull<Fragment<T>>,
        ) -> (*mut T, usize) {
            // First, allocate a new fragment that fits. On failure, restore the old length
            // (releasing the lock) before panicking so other threads aren't deadlocked.
            let (new_tail, new_data) = Fragment::<T>::new(len + additional).unwrap_or_else_likely(|_| {
                this_len.store(len, Release);
                panic!("couldn't allocate a fragment")
            });

            unsafe {
                // ...then, write `preceding_len` to the current length and `tail` to the new pointer.
                this_preceding_len.replace(len);
                let tail = this_tail.replace(new_tail).as_ptr();

                // Immediately release the lock so other threads can continue working.
                this_len.store(len + additional, Release);

                // These fields aren't written within the lock because we know subsequent operations will not ever
                // access this fragment in particular, therefore eliminating mutable aliasing.
                (&raw mut (*tail).next).write(new_tail.as_ptr());
                (&raw mut (*tail).len).write(len - preceding)
            }

            (new_data, len)
        }
    }

    /// Extends the vector by `additional` elements, invoking a closure with an uninitialized slice
    /// to it and the starting element's absolute index in the vector.
    ///
    /// # Safety
    /// - `additional` must be less or equal to [`isize::MAX`].
    /// - `acceptor` must fully initialize and not read the given pointer for `additional`
    ///   consecutive items. Notably, this means [`panic!(..)`](panic)-ing mid-way will lead to
    ///   **undefined behavior** due to uninitialized elements present.
    #[inline]
    pub unsafe fn append_raw(&self, additional: usize, acceptor: impl FnOnce(*mut T)) -> usize {
        let (data, index) = unsafe { self.append_raw_erased(additional) };

        acceptor(data);
        index
    }

    /// Appends a slice to this vector, returning the absolute index of the first element.
    ///
    /// # Panics
    /// Panics if the transfer reports more than [`isize::MAX`] elements.
    #[inline]
    pub fn append(&self, transfer: impl Transfer<T>) -> usize {
        let len = transfer.len();
        // Rejecting lengths with the top bit set upholds `append_raw`'s `<= isize::MAX` contract.
        if unlikely(len & FLAG == FLAG) {
            panic!("too many elements")
        }

        unsafe { self.append_raw(len, |ptr| transfer.transfer(len, ptr)) }
    }

    /// Flattens the vector (if it isn't flattened already), invokes a closure with an owned slice,
    /// and clears the vector.
    #[inline]
    pub fn clear<'a, R>(&'a mut self, consumer: impl FnOnce(ConsumeSlice<'a, T>) -> R) -> R {
        // First, get and replace both `preceding` and `current` to `0`, marking the vector empty.
        // `&mut self` means no append is in flight, so the flag bit cannot be set in `current`.
        let preceding = core::mem::replace(self.preceding_len.get_mut(), 0);
        let current = core::mem::replace(self.len.get_mut(), 0);

        // If the head and tail points to the same fragment, then the vector is already flattened.
        let head = self.head.as_ptr();
        let slice = if likely(addr_eq(head, self.tail.get_mut().as_ptr())) {
            slice_from_raw_parts_mut(unsafe { &raw mut (*head).data as *mut T }, current)
        } else {
            unsafe { merge(&mut self.head, self.tail.get_mut(), preceding, current) }
        };

        // Ownership of the `current` initialized elements moves into the `ConsumeSlice`, which
        // drops whatever the consumer leaves behind.
        return consumer(ConsumeSlice {
            slice,
            _marker: PhantomData,
        });

        // Slow path: coalesce the fragment chain into one fragment holding all elements.
        #[cold]
        unsafe fn merge<T>(
            head: &mut NonNull<Fragment<T>>,
            tail: &mut NonNull<Fragment<T>>,
            preceding: usize,
            current: usize,
        ) -> *mut [T] {
            // First, allocate a fragment that fits the current total length...
            let (new_head, mut new_data) =
                Fragment::<T>::new(current).unwrap_or_else_likely(|_| panic!("couldn't allocate a fragment"));

            // ...then, replace the head and tail pointer with this new fragment...
            let start_data = new_data;
            let mut node = core::mem::replace(head, new_head).as_ptr();
            *tail = new_head;

            // ...and finally, collect and deallocate all fragments.
            loop {
                unsafe {
                    let next = (*node).next;
                    let data = &raw const (*node).data;

                    // `next.is_null()` implies `append()` hasn't written to `node->len` yet, so we have to rely on the
                    // current total length subtracted the preceding fragments' total length.
                    let len = if next.is_null() { current - preceding } else { (*node).len };

                    // Copy the data to the new fragment without dropping anything.
                    new_data.copy_from_nonoverlapping(data as *const T, len);
                    new_data = new_data.add(len);

                    // Deallocate the fragment. `unwrap_unchecked()` is safe here, because if a fragment has been
                    // allocated by such a layout, then the same layout may be created again with no issue.
                    dealloc(node as *mut u8, Fragment::<T>::layout(data.len()).unwrap_unchecked());
                    node = match next {
                        // If this is the last fragment, return the fully initialized memory slice ready to be used.
                        ptr if ptr.is_null() => break slice_from_raw_parts_mut(start_data, current),
                        ptr => ptr,
                    };
                }
            }
        }
    }
}
324
impl<T> Drop for VecBelt<T> {
    /// Drops every initialized element and deallocates every fragment in the chain.
    fn drop(&mut self) {
        let preceeding = *self.preceding_len.get_mut();
        let current = *self.len.get_mut();
        let mut node = self.head.as_ptr();

        loop {
            let next = unsafe { (*node).next };
            let data = unsafe { &raw mut (*node).data };

            // `next.is_null()` implies `append()` hasn't written to `node->len` yet, so we have to rely on the
            // current total length subtracted the preceding fragments' total length.
            let taken = if next.is_null() { current - preceeding } else { unsafe { (*node).len } };

            unsafe {
                // Drop `taken` elements starting from the first index in the fragment, as the rest of the data are
                // uninitialized.
                slice_from_raw_parts_mut(data as *mut T, taken).drop_in_place();

                // Deallocate the fragment. `unwrap_unchecked()` is safe here, because if a fragment has been
                // allocated by such a layout, then the same layout may be created again with no issue.
                // (`data.len()` is the capacity carried in the fat-pointer metadata.)
                dealloc(node as *mut u8, Fragment::<T>::layout(data.len()).unwrap_unchecked());
            }

            node = match next {
                ptr if ptr.is_null() => break,
                ptr => ptr,
            };
        }
    }
}
356
/// Types that may be passed to [`append`](VecBelt::append).
///
/// This trait is implemented on collections that coerce into a slice (`[T]` or `&[T] where T:
/// Copy`), which notably don't include most iterator adapters. This is because
/// [`append`](VecBelt::append) requires all transfer operations to not diverge (e.g.
/// [`panic!(..)`](panic)), otherwise the fragment slice may end up with uninitialized memory which
/// will lead to **undefined behavior** when used or even dropped.
///
/// # Safety
/// - [`len`](Transfer::len) must return **precisely** how many elements this slice will copy. In
///   particular, the returned value will be passed to [`transfer`](Transfer::transfer).
/// - [`transfer`](Transfer::transfer) must write exactly `len` initialized elements to `dst`, and
///   must **not diverge**.
#[allow(clippy::len_without_is_empty)]
pub unsafe trait Transfer<T> {
    /// Returns the length of the slice.
    fn len(&self) -> usize;

    /// Transfer contents of this slice into the destination pointer.
    ///
    /// # Safety
    /// - `len` must be the same as what [`len`](Transfer::len) returns.
    /// - `dst` must be a pointer valid for writing `len` elements, and must not overlap this slice.
    unsafe fn transfer(self, len: usize, dst: *mut T);
}
382
383unsafe impl<T, const LEN: usize> Transfer<T> for [T; LEN] {
384    #[inline]
385    fn len(&self) -> usize {
386        LEN
387    }
388
389    #[inline]
390    unsafe fn transfer(self, len: usize, dst: *mut T) {
391        unsafe { dst.copy_from_nonoverlapping(self.as_ptr(), len) }
392        core::mem::forget(self)
393    }
394}
395
396unsafe impl<T, const LEN: usize> Transfer<T> for core::array::IntoIter<T, LEN> {
397    #[inline]
398    fn len(&self) -> usize {
399        LEN
400    }
401
402    #[inline]
403    unsafe fn transfer(self, len: usize, dst: *mut T) {
404        unsafe { dst.copy_from_nonoverlapping(self.as_slice().as_ptr(), len) }
405        self.for_each(core::mem::forget)
406    }
407}
408
409unsafe impl<T> Transfer<T> for Box<[T]> {
410    #[inline]
411    fn len(&self) -> usize {
412        <[T]>::len(self)
413    }
414
415    #[inline]
416    unsafe fn transfer(self, len: usize, dst: *mut T) {
417        let ptr = Box::into_raw(self);
418
419        unsafe {
420            dst.copy_from_nonoverlapping(ptr as *const T, len);
421            _ = Box::from_raw(ptr as *mut MaybeUninit<T>)
422        }
423    }
424}
425
426unsafe impl<T> Transfer<T> for Vec<T> {
427    #[inline]
428    fn len(&self) -> usize {
429        <Vec<T>>::len(self)
430    }
431
432    #[inline]
433    unsafe fn transfer(mut self, len: usize, dst: *mut T) {
434        unsafe {
435            dst.copy_from_nonoverlapping(self.as_ptr(), len);
436            self.set_len(0)
437        }
438    }
439}
440
unsafe impl<T> Transfer<T> for alloc::vec::IntoIter<T> {
    #[inline]
    fn len(&self) -> usize {
        // `ExactSizeIterator::len` is the number of *remaining* elements, which matches exactly
        // what `transfer` copies from `as_slice()`.
        <Self as ExactSizeIterator>::len(self)
    }

    #[inline]
    unsafe fn transfer(self, len: usize, dst: *mut T) {
        unsafe {
            dst.copy_from_nonoverlapping(self.as_slice().as_ptr(), len);
            // Forget each remaining element (ownership moved into `dst`); dropping the exhausted
            // iterator afterwards frees the backing buffer without touching the elements.
            self.for_each(core::mem::forget)
        }
    }
}
455
unsafe impl<T> Transfer<T> for alloc::vec::Drain<'_, T> {
    #[inline]
    fn len(&self) -> usize {
        // Number of elements still to be drained — exactly what `transfer` copies.
        <Self as ExactSizeIterator>::len(self)
    }

    #[inline]
    unsafe fn transfer(self, len: usize, dst: *mut T) {
        unsafe {
            dst.copy_from_nonoverlapping(self.as_slice().as_ptr(), len);
            // Consume-and-forget the drained elements so `Drain`'s drop doesn't drop them again;
            // its drop still repairs the source vector afterwards.
            self.for_each(core::mem::forget)
        }
    }
}
470
471unsafe impl<T: Copy> Transfer<T> for &[T] {
472    #[inline]
473    fn len(&self) -> usize {
474        <[T]>::len(self)
475    }
476
477    #[inline]
478    unsafe fn transfer(self, len: usize, dst: *mut T) {
479        unsafe { dst.copy_from_nonoverlapping(self.as_ptr(), len) }
480    }
481}
482
483unsafe impl<T: Copy> Transfer<T> for core::slice::Iter<'_, T> {
484    #[inline]
485    fn len(&self) -> usize {
486        <Self as ExactSizeIterator>::len(self)
487    }
488
489    #[inline]
490    unsafe fn transfer(self, len: usize, dst: *mut T) {
491        unsafe { dst.copy_from_nonoverlapping(self.as_slice().as_ptr(), len) }
492    }
493}
494
495/// Create a manually-implemented [`Transfer`] with a length and a transferer closure.
496///
497/// # Safety
498/// See the safety documents of [`Transfer`].
499#[inline]
500pub unsafe fn transfer<T>(len: usize, transfer: impl FnOnce(*mut T)) -> impl Transfer<T> {
501    struct Driver<T, F: FnOnce(*mut T)>(usize, F, PhantomData<fn(*mut T)>);
502    unsafe impl<T, F: FnOnce(*mut T)> Transfer<T> for Driver<T, F> {
503        #[inline]
504        fn len(&self) -> usize {
505            self.0
506        }
507
508        #[inline]
509        unsafe fn transfer(self, _: usize, dst: *mut T) {
510            self.1(dst)
511        }
512    }
513
514    Driver(len, transfer, PhantomData)
515}
516
/// An owned slice created by [`VecBelt::clear`]. Unconsumed elements will be dropped.
pub struct ConsumeSlice<'a, T> {
    /// Raw slice over the flattened, initialized elements handed out by [`VecBelt::clear`].
    slice: *mut [T],
    _marker: PhantomData<&'a mut [T]>,
}
523
524impl<T> Drop for ConsumeSlice<'_, T> {
525    #[inline]
526    fn drop(&mut self) {
527        unsafe { self.slice.drop_in_place() }
528    }
529}
530
531impl<T> Deref for ConsumeSlice<'_, T> {
532    type Target = [T];
533
534    #[inline]
535    fn deref(&self) -> &Self::Target {
536        unsafe { &*self.slice }
537    }
538}
539
540impl<T> DerefMut for ConsumeSlice<'_, T> {
541    #[inline]
542    fn deref_mut(&mut self) -> &mut Self::Target {
543        unsafe { &mut *self.slice }
544    }
545}
546
547impl<'a, T> IntoIterator for ConsumeSlice<'a, T> {
548    type Item = T;
549    type IntoIter = ConsumeIter<'a, T>;
550
551    #[inline]
552    fn into_iter(self) -> Self::IntoIter {
553        let this = ManuallyDrop::new(self);
554
555        let len = this.slice.len();
556        let begin = this.slice as *mut T;
557        let end = unsafe { begin.add(len) };
558
559        ConsumeIter {
560            begin: unsafe { NonNull::new_unchecked(begin) },
561            end: unsafe { NonNull::new_unchecked(end) },
562            _marker: PhantomData,
563        }
564    }
565}
566
/// Owned iterator created by [`ConsumeSlice`] that may be used to read owned elements from the
/// slice.
pub struct ConsumeIter<'a, T> {
    /// Pointer to the next element yielded from the front.
    begin: NonNull<T>,
    /// One-past-the-end pointer; `begin == end` means the iterator is exhausted.
    end: NonNull<T>,
    _marker: PhantomData<&'a mut [T]>,
}
574
impl<T> ConsumeIter<'_, T> {
    // NOTE(review): `offset_from` requires `size_of::<T>() != 0`, so these methods look unsound
    // for zero-sized `T` — confirm whether `VecBelt` is ever used with ZSTs.

    /// Returns the remaining immutable slice iterated by this iterator.
    #[inline]
    pub const fn as_slice(&self) -> &[T] {
        unsafe { &*slice_from_raw_parts(self.begin.as_ptr(), self.end.offset_from(self.begin) as usize) }
    }

    /// Returns the remaining mutable slice iterated by this iterator.
    #[inline]
    pub const fn as_slice_mut(&mut self) -> &mut [T] {
        unsafe { &mut *slice_from_raw_parts_mut(self.begin.as_ptr(), self.end.offset_from(self.begin) as usize) }
    }

    /// Returns the remaining length of the slice iterated by this iterator.
    #[inline]
    pub const fn slice_len(&self) -> usize {
        // `begin <= end` always holds, so the pointer distance is non-negative.
        unsafe { self.end.offset_from(self.begin) as usize }
    }
}
594
595impl<T> Iterator for ConsumeIter<'_, T> {
596    type Item = T;
597
598    #[inline]
599    fn next(&mut self) -> Option<Self::Item> {
600        if self.slice_len() == 0 {
601            return None;
602        }
603
604        unsafe {
605            let item = self.begin.read();
606            self.begin = self.begin.add(1);
607
608            Some(item)
609        }
610    }
611
612    #[inline]
613    fn size_hint(&self) -> (usize, Option<usize>) {
614        let len = self.slice_len();
615        (len, Some(len))
616    }
617}
618
619impl<T> DoubleEndedIterator for ConsumeIter<'_, T> {
620    #[inline]
621    fn next_back(&mut self) -> Option<Self::Item> {
622        if self.slice_len() == 0 {
623            return None;
624        }
625
626        unsafe {
627            self.end = self.end.sub(1);
628            Some(self.end.read())
629        }
630    }
631}
632
impl<T> ExactSizeIterator for ConsumeIter<'_, T> {
    /// Exact remaining count, derived from the pointer distance.
    #[inline]
    fn len(&self) -> usize {
        self.slice_len()
    }
}
639
impl<T> Drop for ConsumeIter<'_, T> {
    /// Drops every element that was not yielded; `begin..end` covers exactly the remaining
    /// initialized elements.
    #[inline]
    fn drop(&mut self) {
        unsafe { slice_from_raw_parts_mut(self.begin.as_ptr(), self.end.offset_from(self.begin) as usize).drop_in_place() }
    }
}
646
impl<'a, T> IntoIterator for &'a ConsumeSlice<'a, T> {
    type Item = &'a T;
    type IntoIter = core::slice::Iter<'a, T>;

    /// Iterates the elements by shared reference (via `Deref` to `[T]`).
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl<'a, T> IntoIterator for &'a mut ConsumeSlice<'a, T> {
    type Item = &'a mut T;
    type IntoIter = core::slice::IterMut<'a, T>;

    /// Iterates the elements by mutable reference (via `DerefMut` to `[T]`).
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        self.iter_mut()
    }
}