Skip to main content

sdd/
bag.rs

1//! [`Bag`] is a lock-free concurrent unordered instance container.
2
3use std::cell::UnsafeCell;
4use std::iter::FusedIterator;
5use std::mem::{MaybeUninit, needs_drop};
6use std::panic::UnwindSafe;
7use std::ptr::{self, drop_in_place};
8use std::sync::atomic::AtomicUsize;
9use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
10
11use super::{Guard, LinkedEntry, LinkedList, Stack};
12
13/// [`Bag`] is a lock-free concurrent unordered instance container.
14///
15/// [`Bag`] is a linearizable concurrent instance container where `ARRAY_LEN` instances are stored
16/// in a fixed-size array, and the rest are managed by its backup container; this makes a [`Bag`]
17/// especially efficient if the expected number of instances does not exceed `ARRAY_LEN`.
18///
19/// The maximum value of `ARRAY_LEN` is limited to `usize::BITS / 2` which is the default value.
20#[derive(Debug)]
21pub struct Bag<T, const ARRAY_LEN: usize = DEFAULT_ARRAY_LEN> {
22    /// Primary storage.
23    primary_storage: Storage<T, ARRAY_LEN>,
24    /// Fallback storage.
25    stack: Stack<Storage<T, ARRAY_LEN>>,
26}
27
28/// A mutable iterator over the entries of a [`Bag`].
29#[derive(Debug)]
30pub struct IterMut<'b, T, const ARRAY_LEN: usize = DEFAULT_ARRAY_LEN> {
31    bag: &'b mut Bag<T, ARRAY_LEN>,
32    current_index: u32,
33    current_stack_entry: Option<&'b mut LinkedEntry<Storage<T, ARRAY_LEN>>>,
34}
35
36/// An iterator that moves out of a [`Bag`].
37#[derive(Debug)]
38pub struct IntoIter<T, const ARRAY_LEN: usize = DEFAULT_ARRAY_LEN> {
39    bag: Bag<T, ARRAY_LEN>,
40}
41
42/// The default length of the fixed-size array in a [`Bag`].
43const DEFAULT_ARRAY_LEN: usize = usize::BITS as usize / 2;
44
45#[derive(Debug)]
46struct Storage<T, const ARRAY_LEN: usize> {
47    /// Storage.
48    storage: UnsafeCell<[MaybeUninit<T>; ARRAY_LEN]>,
49    /// Storage metadata.
50    ///
51    /// The layout of the metadata is,
52    /// - Upper `usize::BITS / 2` bits: initialization bitmap.
53    /// - Lower `usize::BITS / 2` bits: owned state bitmap.
54    ///
55    /// The metadata represents four possible states of a storage slot.
56    /// - `!instantiated && !owned`: initial state.
57    /// - `!instantiated && owned`: owned for instantiating.
58    /// - `instantiated && !owned`: valid and reachable.
59    /// - `instantiated && owned`: owned for moving out the instance.
60    metadata: AtomicUsize,
61}
62
63impl<T, const ARRAY_LEN: usize> Bag<T, ARRAY_LEN> {
64    /// `ARRAY_LEN` cannot be larger than `usize::BITS / 2`.
65    const CHECK_ARRAY_LEN: () = assert!(ARRAY_LEN <= (usize::BITS as usize) / 2);
66
67    /// Creates an empty [`Bag`].
68    ///
69    /// # Examples
70    ///
71    /// ```
72    /// use sdd::Bag;
73    ///
74    /// let bag: Bag<usize, 16> = Bag::new();
75    /// ```
76    #[cfg(not(feature = "loom"))]
77    #[inline]
78    #[must_use]
79    pub const fn new() -> Self {
80        let () = Self::CHECK_ARRAY_LEN;
81        Self {
82            primary_storage: Storage::new(),
83            stack: Stack::new(),
84        }
85    }
86
87    /// Creates an empty [`Bag`].
88    #[cfg(feature = "loom")]
89    #[inline]
90    #[must_use]
91    pub fn new() -> Self {
92        let () = Self::CHECK_ARRAY_LEN;
93        Self {
94            primary_storage: Storage::new(),
95            stack: Stack::new(),
96        }
97    }
98
99    /// Pushes an instance of `T`.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use sdd::Bag;
105    ///
106    /// let bag: Bag<usize> = Bag::default();
107    ///
108    /// bag.push(11);
109    /// ```
110    #[inline]
111    pub fn push(&self, val: T) {
112        if let Some(val) = self.primary_storage.push(val, true) {
113            self.stack.peek_with(|e| {
114                if let Some(storage) = e {
115                    if let Some(val) = storage.push(val, false) {
116                        unsafe {
117                            self.stack.push_unchecked(Storage::with_val(val));
118                        }
119                    }
120                } else {
121                    unsafe {
122                        self.stack.push_unchecked(Storage::with_val(val));
123                    }
124                }
125            });
126        }
127    }
128
129    /// Tries to push an instance of `T` only if the primary storage is not full.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error containing the supplied value if the primary storage is full.
134    ///
135    /// # Examples
136    ///
137    /// ```
138    /// use sdd::Bag;
139    ///
140    /// let bag: Bag<usize, 1> = Bag::new();
141    ///
142    /// assert!(bag.try_push(11).is_ok());
143    /// assert_eq!(bag.try_push(17), Err(17));
144    /// ```
145    #[inline]
146    pub fn try_push(&self, val: T) -> Result<(), T> {
147        if let Some(returned) = self.primary_storage.push(val, true) {
148            Err(returned)
149        } else {
150            Ok(())
151        }
152    }
153
154    /// Pops an instance in the [`Bag`] if not empty.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use sdd::Bag;
160    ///
161    /// let bag: Bag<usize> = Bag::default();
162    ///
163    /// bag.push(37);
164    ///
165    /// assert_eq!(bag.pop(), Some(37));
166    /// assert!(bag.pop().is_none());
167    /// ```
168    #[inline]
169    pub fn pop(&self) -> Option<T> {
170        let result = self.stack.peek_with(|e| {
171            e.and_then(|storage| {
172                let (val, empty) = storage.pop();
173                if empty {
174                    // Once marked deleted, new entries will be inserted in a new `Storage`
175                    // that may not be reachable from this one.
176                    storage.delete_self(Relaxed);
177                }
178                val
179            })
180        });
181        if let Some(val) = result {
182            return Some(val);
183        }
184        self.primary_storage.pop().0
185    }
186
187    /// Pops all the entries at once and folds them into an accumulator.
188    ///
189    /// # Examples
190    ///
191    /// ```
192    /// use sdd::Bag;
193    ///
194    /// let bag: Bag<usize> = Bag::default();
195    ///
196    /// bag.push(7);
197    /// bag.push(17);
198    /// bag.push(37);
199    ///
200    /// assert_eq!(bag.pop_all(0, |a, v| a + v), 61);
201    ///
202    /// bag.push(47);
203    /// assert_eq!(bag.pop(), Some(47));
204    /// assert!(bag.pop().is_none());
205    /// assert!(bag.is_empty());
206    /// ```
207    #[inline]
208    pub fn pop_all<B, F: FnMut(B, T) -> B>(&self, init: B, mut fold: F) -> B {
209        let mut acc = init;
210        let popped = self.stack.pop_all();
211        while let Some(storage) = popped.pop() {
212            acc = storage.pop_all(acc, &mut fold);
213        }
214        self.primary_storage.pop_all(acc, &mut fold)
215    }
216
217    /// Returns the number of entries in the [`Bag`].
218    ///
219    /// This method iterates over all the entry arrays in the [`Bag`] to count the number of
220    /// entries; therefore, its time complexity is `O(N)`.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use sdd::Bag;
226    ///
227    /// let bag: Bag<usize> = Bag::default();
228    /// assert_eq!(bag.len(), 0);
229    ///
230    /// bag.push(7);
231    /// assert_eq!(bag.len(), 1);
232    ///
233    /// for v in 0..64 {
234    ///    bag.push(v);
235    /// }
236    /// bag.pop();
237    /// assert_eq!(bag.len(), 64);
238    /// ```
239    #[inline]
240    pub fn len(&self) -> usize {
241        self.stack
242            .iter(&Guard::new())
243            .fold(self.primary_storage.len(), |acc, storage| {
244                acc + storage.len()
245            })
246    }
247
248    /// Returns `true` if the [`Bag`] is empty.
249    ///
250    /// # Examples
251    ///
252    /// ```
253    /// use sdd::Bag;
254    ///
255    /// let bag: Bag<usize> = Bag::default();
256    /// assert!(bag.is_empty());
257    ///
258    /// bag.push(7);
259    /// assert!(!bag.is_empty());
260    ///
261    /// assert_eq!(bag.pop(), Some(7));
262    /// assert!(bag.is_empty());
263    /// ```
264    #[inline]
265    pub fn is_empty(&self) -> bool {
266        if self.primary_storage.len() == 0 {
267            self.stack.is_empty()
268        } else {
269            false
270        }
271    }
272
273    /// Returns an iterator over the contained instances for modification.
274    ///
275    /// # Examples
276    ///
277    /// ```
278    /// use sdd::Bag;
279    ///
280    /// let mut bag: Bag<usize> = Bag::default();
281    ///
282    /// bag.push(3);
283    /// bag.push(3);
284    ///
285    /// assert_eq!(bag.iter_mut().count(), 2);
286    /// bag.iter_mut().for_each(|e| { *e += 1; });
287    ///
288    /// assert_eq!(bag.pop(), Some(4));
289    /// assert_eq!(bag.pop(), Some(4));
290    /// assert!(bag.pop().is_none());
291    /// ```
292    #[inline]
293    pub const fn iter_mut(&mut self) -> IterMut<'_, T, ARRAY_LEN> {
294        IterMut {
295            bag: self,
296            current_index: 0,
297            current_stack_entry: None,
298        }
299    }
300}
301
302impl<T> Default for Bag<T, DEFAULT_ARRAY_LEN> {
303    #[inline]
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309impl<T, const ARRAY_LEN: usize> Drop for Bag<T, ARRAY_LEN> {
310    #[inline]
311    fn drop(&mut self) {
312        if needs_drop::<T>() {
313            // It needs to drop all the stored instances in-place.
314            while let Some(v) = self.pop() {
315                drop(v);
316            }
317        }
318        for _ in 0..4 {
319            Guard::new().accelerate();
320        }
321    }
322}
323
324impl<T, const ARRAY_LEN: usize> FromIterator<T> for Bag<T, ARRAY_LEN> {
325    #[inline]
326    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
327        let into_iter = iter.into_iter();
328        let bag = Self::new();
329        into_iter.for_each(|v| {
330            bag.push(v);
331        });
332        bag
333    }
334}
335
336impl<T, const ARRAY_LEN: usize> IntoIterator for Bag<T, ARRAY_LEN> {
337    type Item = T;
338    type IntoIter = IntoIter<T, ARRAY_LEN>;
339
340    #[inline]
341    fn into_iter(self) -> Self::IntoIter {
342        IntoIter { bag: self }
343    }
344}
345
346impl<'b, T, const ARRAY_LEN: usize> IntoIterator for &'b mut Bag<T, ARRAY_LEN> {
347    type IntoIter = IterMut<'b, T, ARRAY_LEN>;
348    type Item = &'b mut T;
349
350    #[inline]
351    fn into_iter(self) -> Self::IntoIter {
352        self.iter_mut()
353    }
354}
355
356impl<T, const ARRAY_LEN: usize> FusedIterator for IterMut<'_, T, ARRAY_LEN> {}
357
358impl<'b, T, const ARRAY_LEN: usize> Iterator for IterMut<'b, T, ARRAY_LEN> {
359    type Item = &'b mut T;
360
361    #[inline]
362    fn next(&mut self) -> Option<Self::Item> {
363        while self.current_index != u32::MAX {
364            let current_storage = if let Some(linked) = self.current_stack_entry.as_mut() {
365                &mut **linked
366            } else {
367                &mut self.bag.primary_storage
368            };
369
370            let instance_bitmap =
371                Storage::<T, ARRAY_LEN>::instance_bitmap(current_storage.metadata.load(Acquire));
372            let first_occupied =
373                (instance_bitmap.wrapping_shr(self.current_index)).trailing_zeros();
374            let next_occupied = self.current_index + first_occupied;
375            self.current_index = next_occupied + 1;
376            if (next_occupied as usize) < ARRAY_LEN {
377                return Some(unsafe {
378                    &mut *(*current_storage.storage.get())[next_occupied as usize].as_mut_ptr()
379                });
380            }
381            self.current_index = u32::MAX;
382
383            if let Some(linked) = self.current_stack_entry.as_mut() {
384                let guard = Guard::new();
385                if let Some(next) = linked.next_ptr(Acquire, &guard).as_ref() {
386                    let entry_mut = ptr::from_ref(next).cast_mut();
387                    self.current_stack_entry = unsafe { entry_mut.as_mut() };
388                    self.current_index = 0;
389                }
390            } else {
391                self.bag.stack.peek_with(|e| {
392                    if let Some(e) = e {
393                        let entry_mut = ptr::from_ref(e).cast_mut();
394                        self.current_stack_entry = unsafe { entry_mut.as_mut() };
395                        self.current_index = 0;
396                    }
397                });
398            }
399        }
400        None
401    }
402}
403
404impl<T, const ARRAY_LEN: usize> UnwindSafe for IterMut<'_, T, ARRAY_LEN> where T: UnwindSafe {}
405
406impl<T, const ARRAY_LEN: usize> FusedIterator for IntoIter<T, ARRAY_LEN> {}
407
408impl<T, const ARRAY_LEN: usize> Iterator for IntoIter<T, ARRAY_LEN> {
409    type Item = T;
410
411    #[inline]
412    fn next(&mut self) -> Option<Self::Item> {
413        self.bag.pop()
414    }
415}
416
417impl<T, const ARRAY_LEN: usize> UnwindSafe for IntoIter<T, ARRAY_LEN> where T: UnwindSafe {}
418
419impl<T, const ARRAY_LEN: usize> Storage<T, ARRAY_LEN> {
420    /// Creates a new [`Storage`].
421    const fn new() -> Self {
422        #[allow(clippy::uninit_assumed_init)]
423        Storage {
424            storage: unsafe { MaybeUninit::uninit().assume_init() },
425            metadata: AtomicUsize::new(0),
426        }
427    }
428
429    /// Creates a new [`Storage`] with one inserted.
430    fn with_val(val: T) -> Self {
431        #[allow(clippy::uninit_assumed_init)]
432        let storage = Self {
433            storage: UnsafeCell::new(unsafe { MaybeUninit::uninit().assume_init() }),
434            metadata: AtomicUsize::new(1_usize << ARRAY_LEN),
435        };
436        unsafe {
437            (*storage.storage.get())[0].as_mut_ptr().write(val);
438        }
439        storage
440    }
441
442    /// Returns the number of entries.
443    fn len(&self) -> usize {
444        let metadata = self.metadata.load(Relaxed);
445        let instance_bitmap = Self::instance_bitmap(metadata);
446        let owned_bitmap = Self::owned_bitmap(metadata);
447        let valid_entries_bitmap = instance_bitmap & (!owned_bitmap);
448        valid_entries_bitmap.count_ones() as usize
449    }
450
451    /// Pushes a new value.
452    fn push(&self, val: T, allow_empty: bool) -> Option<T> {
453        let mut metadata = self.metadata.load(Relaxed);
454        'after_read_metadata: loop {
455            // Look for a free slot.
456            let mut instance_bitmap = Self::instance_bitmap(metadata);
457            let owned_bitmap = Self::owned_bitmap(metadata);
458
459            // Regard entries being removed as removed ones.
460            if !allow_empty && (instance_bitmap & !owned_bitmap) == 0 {
461                return Some(val);
462            }
463            let mut index = instance_bitmap.trailing_ones() as usize;
464            while index < ARRAY_LEN {
465                if (owned_bitmap & (1_u32 << index)) == 0 {
466                    // Mark the slot `owned`.
467                    let new = metadata | (1_usize << index);
468                    match self
469                        .metadata
470                        .compare_exchange_weak(metadata, new, Acquire, Relaxed)
471                    {
472                        Ok(_) => {
473                            // Now the free slot is owned by the thread.
474                            unsafe {
475                                (*self.storage.get())[index].as_mut_ptr().write(val);
476                            }
477                            let result = self.metadata.fetch_update(Release, Relaxed, |m| {
478                                debug_assert_ne!(m & (1_usize << index), 0);
479                                debug_assert_eq!(m & (1_usize << (index + ARRAY_LEN)), 0);
480                                if !allow_empty
481                                    && (Self::instance_bitmap(m) & !Self::owned_bitmap(m)) == 0
482                                {
483                                    // Disallow pushing a value into an empty, or a soon-to-be-emptied array.
484                                    None
485                                } else {
486                                    let new = (m & (!(1_usize << index)))
487                                        | (1_usize << (index + ARRAY_LEN));
488                                    Some(new)
489                                }
490                            });
491                            if result.is_ok() {
492                                return None;
493                            }
494
495                            // The array was empty, thus rolling back the change.
496                            let val = unsafe { (*self.storage.get())[index].as_ptr().read() };
497                            self.metadata.fetch_and(!(1_usize << index), Release);
498                            return Some(val);
499                        }
500                        Err(prev) => {
501                            // Metadata has changed.
502                            metadata = prev;
503                            continue 'after_read_metadata;
504                        }
505                    }
506                }
507
508                // Look for another free slot.
509                instance_bitmap |= 1_u32 << index;
510                index = instance_bitmap.trailing_ones() as usize;
511            }
512
513            // No free slots or all the entries are owned.
514            return Some(val);
515        }
516    }
517
518    /// Pops a value.
519    fn pop(&self) -> (Option<T>, bool) {
520        let mut metadata = self.metadata.load(Relaxed);
521        'after_read_metadata: loop {
522            // Look for an instantiated, yet to be owned entry.
523            let mut instance_bitmap_inverted = !Self::instance_bitmap(metadata);
524            let owned_bitmap = Self::owned_bitmap(metadata);
525            let mut index = instance_bitmap_inverted.trailing_ones() as usize;
526            while index < ARRAY_LEN {
527                if (owned_bitmap & (1_u32 << index)) == 0 {
528                    // Mark the slot `owned`.
529                    let new = metadata | (1_usize << index);
530                    match self
531                        .metadata
532                        .compare_exchange_weak(metadata, new, Acquire, Relaxed)
533                    {
534                        Ok(_) => {
535                            // Now the desired slot is owned by the thread.
536                            let inst = unsafe { (*self.storage.get())[index].as_ptr().read() };
537                            let mut empty = false;
538                            let result = self.metadata.fetch_update(Release, Relaxed, |m| {
539                                debug_assert_ne!(m & (1_usize << index), 0);
540                                debug_assert_ne!(m & (1_usize << (index + ARRAY_LEN)), 0);
541                                let new =
542                                    m & (!((1_usize << index) | (1_usize << (index + ARRAY_LEN))));
543                                empty = Self::instance_bitmap(new) == 0;
544                                Some(new)
545                            });
546                            debug_assert!(result.is_ok());
547                            return (Some(inst), empty);
548                        }
549                        Err(prev) => {
550                            // Metadata has changed.
551                            metadata = prev;
552                            continue 'after_read_metadata;
553                        }
554                    }
555                }
556
557                // Look for another valid slot.
558                instance_bitmap_inverted |= 1_u32 << index;
559                index = instance_bitmap_inverted.trailing_ones() as usize;
560            }
561
562            return (None, false);
563        }
564    }
565
566    /// Pops all the values, and folds them.
567    #[allow(clippy::cast_possible_truncation)]
568    fn pop_all<B, F: FnMut(B, T) -> B>(&self, init: B, fold: &mut F) -> B {
569        struct ExitGuard<F: FnOnce()>(Option<F>);
570        impl<F: FnOnce()> Drop for ExitGuard<F> {
571            #[inline]
572            fn drop(&mut self) {
573                let Some(f) = self.0.take() else {
574                    return;
575                };
576                f();
577            }
578        }
579
580        let mut acc = init;
581        let mut metadata = self.metadata.load(Relaxed);
582        loop {
583            // Look for instantiated, and reachable entries.
584            let instance_bitmap = Self::instance_bitmap(metadata) as usize;
585            let owned_bitmap = Self::owned_bitmap(metadata) as usize;
586            let instances_to_pop = instance_bitmap & (!owned_bitmap);
587            debug_assert_eq!(instances_to_pop & owned_bitmap, 0);
588
589            if instances_to_pop == 0 {
590                return acc;
591            }
592
593            let marked_for_removal = metadata | instances_to_pop;
594            match self.metadata.compare_exchange_weak(
595                metadata,
596                marked_for_removal,
597                Acquire,
598                Relaxed,
599            ) {
600                Ok(_) => {
601                    metadata = marked_for_removal;
602                    let _guard = ExitGuard(Some(|| {
603                        loop {
604                            let new_metadata =
605                                metadata & (!((instances_to_pop << ARRAY_LEN) | instances_to_pop));
606                            if let Err(actual) = self.metadata.compare_exchange_weak(
607                                metadata,
608                                new_metadata,
609                                Release,
610                                Relaxed,
611                            ) {
612                                metadata = actual;
613                                continue;
614                            }
615                            break;
616                        }
617                    }));
618
619                    // Now all the valid slots are locked for removal.
620                    let mut index = instances_to_pop.trailing_zeros() as usize;
621                    while index < ARRAY_LEN {
622                        acc = fold(acc, unsafe { (*self.storage.get())[index].as_ptr().read() });
623                        index = (instances_to_pop & (!((1_usize << (index + 1) as u32) - 1)))
624                            .trailing_zeros() as usize;
625                    }
626                    return acc;
627                }
628                Err(actual) => metadata = actual,
629            }
630        }
631    }
632
633    #[allow(clippy::cast_possible_truncation)]
634    const fn instance_bitmap(metadata: usize) -> u32 {
635        metadata.wrapping_shr(ARRAY_LEN as u32) as u32
636    }
637
638    #[allow(clippy::cast_possible_truncation)]
639    const fn owned_bitmap(metadata: usize) -> u32 {
640        (metadata % (1_usize << ARRAY_LEN)) as u32
641    }
642}
643
644impl<T, const ARRAY_LEN: usize> Drop for Storage<T, ARRAY_LEN> {
645    #[inline]
646    fn drop(&mut self) {
647        if needs_drop::<T>() {
648            let mut instance_bitmap = Self::instance_bitmap(self.metadata.load(Acquire));
649            loop {
650                let index = instance_bitmap.trailing_zeros();
651                if index == 32 {
652                    break;
653                }
654                instance_bitmap &= !(1_u32 << index);
655                unsafe { drop_in_place((*self.storage.get())[index as usize].as_mut_ptr()) };
656            }
657        }
658    }
659}
660
661unsafe impl<T: Send, const ARRAY_LEN: usize> Send for Storage<T, ARRAY_LEN> {}
662unsafe impl<T: Send + Sync, const ARRAY_LEN: usize> Sync for Storage<T, ARRAY_LEN> {}