Skip to main content

sdd/
bag.rs

1//! [`Bag`] is a lock-free concurrent unordered instance container.
2
3use std::cell::UnsafeCell;
4use std::iter::FusedIterator;
5use std::mem::{MaybeUninit, needs_drop};
6use std::panic::UnwindSafe;
7use std::ptr::{self, drop_in_place};
8use std::sync::atomic::AtomicUsize;
9use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
10
11use super::{Guard, LinkedEntry, LinkedList, Stack};
12
13/// [`Bag`] is a lock-free concurrent unordered instance container.
14///
15/// [`Bag`] is a linearizable concurrent instance container where `ARRAY_LEN` instances are stored
16/// in a fixed-size array, and the rest are managed by its backup container; this makes a [`Bag`]
17/// especially efficient if the expected number of instances does not exceed `ARRAY_LEN`.
18///
19/// The maximum value of `ARRAY_LEN` is limited to `usize::BITS / 2` which is the default value.
20#[derive(Debug)]
21pub struct Bag<T, const ARRAY_LEN: usize = DEFAULT_ARRAY_LEN> {
22    /// Primary storage.
23    primary_storage: Storage<T, ARRAY_LEN>,
24    /// Fallback storage.
25    stack: Stack<Storage<T, ARRAY_LEN>>,
26}
27
28/// A mutable iterator over the entries of a [`Bag`].
29#[derive(Debug)]
30pub struct IterMut<'b, T, const ARRAY_LEN: usize = DEFAULT_ARRAY_LEN> {
31    bag: &'b mut Bag<T, ARRAY_LEN>,
32    current_index: u32,
33    current_stack_entry: Option<&'b mut LinkedEntry<Storage<T, ARRAY_LEN>>>,
34}
35
36/// An iterator that moves out of a [`Bag`].
37#[derive(Debug)]
38pub struct IntoIter<T, const ARRAY_LEN: usize = DEFAULT_ARRAY_LEN> {
39    bag: Bag<T, ARRAY_LEN>,
40}
41
42/// The default length of the fixed-size array in a [`Bag`].
43const DEFAULT_ARRAY_LEN: usize = usize::BITS as usize / 2;
44
45#[derive(Debug)]
46struct Storage<T, const ARRAY_LEN: usize> {
47    /// Storage.
48    storage: UnsafeCell<[MaybeUninit<T>; ARRAY_LEN]>,
49    /// Storage metadata.
50    ///
51    /// The layout of the metadata is,
52    /// - Upper `usize::BITS / 2` bits: initialization bitmap.
53    /// - Lower `usize::BITS / 2` bits: owned state bitmap.
54    ///
55    /// The metadata represents four possible states of a storage slot.
56    /// - `!instantiated && !owned`: initial state.
57    /// - `!instantiated && owned`: owned for instantiating.
58    /// - `instantiated && !owned`: valid and reachable.
59    /// - `instantiated && owned`: owned for moving out the instance.
60    metadata: AtomicUsize,
61}
62
63impl<T, const ARRAY_LEN: usize> Bag<T, ARRAY_LEN> {
64    /// `ARRAY_LEN` cannot be larger than `usize::BITS / 2`.
65    const CHECK_ARRAY_LEN: () = assert!(ARRAY_LEN <= (usize::BITS as usize) / 2);
66
67    /// Creates an empty [`Bag`].
68    ///
69    /// # Examples
70    ///
71    /// ```
72    /// use sdd::Bag;
73    ///
74    /// let bag: Bag<usize, 16> = Bag::new();
75    /// ```
76    #[cfg(not(feature = "loom"))]
77    #[inline]
78    #[must_use]
79    pub const fn new() -> Self {
80        let () = Self::CHECK_ARRAY_LEN;
81        Self {
82            primary_storage: Storage::new(),
83            stack: Stack::new(),
84        }
85    }
86
87    /// Creates an empty [`Bag`].
88    #[cfg(feature = "loom")]
89    #[inline]
90    #[must_use]
91    pub fn new() -> Self {
92        let () = Self::CHECK_ARRAY_LEN;
93        Self {
94            primary_storage: Storage::new(),
95            stack: Stack::new(),
96        }
97    }
98
99    /// Pushes an instance of `T`.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use sdd::Bag;
105    ///
106    /// let bag: Bag<usize> = Bag::default();
107    ///
108    /// bag.push(11);
109    /// ```
110    #[inline]
111    pub fn push(&self, val: T) {
112        if let Some(val) = self.primary_storage.push(val, true) {
113            self.stack.peek_with(|e| {
114                if let Some(storage) = e {
115                    if let Some(val) = storage.push(val, false) {
116                        unsafe {
117                            self.stack.push_unchecked(Storage::with_val(val));
118                        }
119                    }
120                } else {
121                    unsafe {
122                        self.stack.push_unchecked(Storage::with_val(val));
123                    }
124                }
125            });
126        }
127    }
128
129    /// Tries to push an instance of `T` only if the primary storage is not full.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error containing the supplied value if the primary storage is full.
134    ///
135    /// # Examples
136    ///
137    /// ```
138    /// use sdd::Bag;
139    ///
140    /// let bag: Bag<usize, 1> = Bag::new();
141    ///
142    /// assert!(bag.try_push(11).is_ok());
143    /// assert_eq!(bag.try_push(17), Err(17));
144    /// ```
145    #[inline]
146    pub fn try_push(&self, val: T) -> Result<(), T> {
147        if let Some(returned) = self.primary_storage.push(val, true) {
148            Err(returned)
149        } else {
150            Ok(())
151        }
152    }
153
154    /// Pops an instance in the [`Bag`] if not empty.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use sdd::Bag;
160    ///
161    /// let bag: Bag<usize> = Bag::default();
162    ///
163    /// bag.push(37);
164    ///
165    /// assert_eq!(bag.pop(), Some(37));
166    /// assert!(bag.pop().is_none());
167    /// ```
168    #[inline]
169    pub fn pop(&self) -> Option<T> {
170        let result = self.stack.peek_with(|e| {
171            e.and_then(|storage| {
172                let (val, empty) = storage.pop();
173                if empty {
174                    // Once marked deleted, new entries will be inserted in a new `Storage`
175                    // that may not be reachable from this one.
176                    storage.delete_self(Relaxed);
177                }
178                val
179            })
180        });
181        if let Some(val) = result {
182            return Some(val);
183        }
184        self.primary_storage.pop().0
185    }
186
187    /// Pops all the entries at once and folds them into an accumulator.
188    ///
189    /// # Examples
190    ///
191    /// ```
192    /// use sdd::Bag;
193    ///
194    /// let bag: Bag<usize> = Bag::default();
195    ///
196    /// bag.push(7);
197    /// bag.push(17);
198    /// bag.push(37);
199    ///
200    /// assert_eq!(bag.pop_all(0, |a, v| a + v), 61);
201    ///
202    /// bag.push(47);
203    /// assert_eq!(bag.pop(), Some(47));
204    /// assert!(bag.pop().is_none());
205    /// assert!(bag.is_empty());
206    /// ```
207    #[inline]
208    pub fn pop_all<B, F: FnMut(B, T) -> B>(&self, init: B, mut fold: F) -> B {
209        let mut acc = init;
210        let popped = self.stack.pop_all();
211        while let Some(storage) = popped.pop() {
212            acc = storage.pop_all(acc, &mut fold);
213        }
214        self.primary_storage.pop_all(acc, &mut fold)
215    }
216
217    /// Returns the number of entries in the [`Bag`].
218    ///
219    /// This method iterates over all the entry arrays in the [`Bag`] to count the number of
220    /// entries; therefore, its time complexity is `O(N)`.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use sdd::Bag;
226    ///
227    /// let bag: Bag<usize> = Bag::default();
228    /// assert_eq!(bag.len(), 0);
229    ///
230    /// bag.push(7);
231    /// assert_eq!(bag.len(), 1);
232    ///
233    /// for v in 0..64 {
234    ///    bag.push(v);
235    /// }
236    /// bag.pop();
237    /// assert_eq!(bag.len(), 64);
238    /// ```
239    #[inline]
240    pub fn len(&self) -> usize {
241        self.stack
242            .iter(&Guard::new())
243            .fold(self.primary_storage.len(), |acc, storage| {
244                acc + storage.len()
245            })
246    }
247
248    /// Returns `true` if the [`Bag`] is empty.
249    ///
250    /// # Examples
251    ///
252    /// ```
253    /// use sdd::Bag;
254    ///
255    /// let bag: Bag<usize> = Bag::default();
256    /// assert!(bag.is_empty());
257    ///
258    /// bag.push(7);
259    /// assert!(!bag.is_empty());
260    ///
261    /// assert_eq!(bag.pop(), Some(7));
262    /// assert!(bag.is_empty());
263    /// ```
264    #[inline]
265    pub fn is_empty(&self) -> bool {
266        if self.primary_storage.len() == 0 {
267            self.stack.is_empty()
268        } else {
269            false
270        }
271    }
272
273    /// Returns an iterator over the contained instances for modification.
274    ///
275    /// # Examples
276    ///
277    /// ```
278    /// use sdd::Bag;
279    ///
280    /// let mut bag: Bag<usize> = Bag::default();
281    ///
282    /// bag.push(3);
283    /// bag.push(3);
284    ///
285    /// assert_eq!(bag.iter_mut().count(), 2);
286    /// bag.iter_mut().for_each(|e| { *e += 1; });
287    ///
288    /// assert_eq!(bag.pop(), Some(4));
289    /// assert_eq!(bag.pop(), Some(4));
290    /// assert!(bag.pop().is_none());
291    /// ```
292    #[inline]
293    pub const fn iter_mut(&mut self) -> IterMut<'_, T, ARRAY_LEN> {
294        IterMut {
295            bag: self,
296            current_index: 0,
297            current_stack_entry: None,
298        }
299    }
300}
301
302impl<T> Default for Bag<T, DEFAULT_ARRAY_LEN> {
303    #[inline]
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309impl<T, const ARRAY_LEN: usize> Drop for Bag<T, ARRAY_LEN> {
310    #[inline]
311    fn drop(&mut self) {
312        if needs_drop::<T>() {
313            // It needs to drop all the stored instances in-place.
314            while let Some(v) = self.pop() {
315                drop(v);
316            }
317        }
318    }
319}
320
321impl<T, const ARRAY_LEN: usize> FromIterator<T> for Bag<T, ARRAY_LEN> {
322    #[inline]
323    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
324        let into_iter = iter.into_iter();
325        let bag = Self::new();
326        into_iter.for_each(|v| {
327            bag.push(v);
328        });
329        bag
330    }
331}
332
333impl<T, const ARRAY_LEN: usize> IntoIterator for Bag<T, ARRAY_LEN> {
334    type Item = T;
335    type IntoIter = IntoIter<T, ARRAY_LEN>;
336
337    #[inline]
338    fn into_iter(self) -> Self::IntoIter {
339        IntoIter { bag: self }
340    }
341}
342
343impl<'b, T, const ARRAY_LEN: usize> IntoIterator for &'b mut Bag<T, ARRAY_LEN> {
344    type IntoIter = IterMut<'b, T, ARRAY_LEN>;
345    type Item = &'b mut T;
346
347    #[inline]
348    fn into_iter(self) -> Self::IntoIter {
349        self.iter_mut()
350    }
351}
352
353impl<T, const ARRAY_LEN: usize> FusedIterator for IterMut<'_, T, ARRAY_LEN> {}
354
355impl<'b, T, const ARRAY_LEN: usize> Iterator for IterMut<'b, T, ARRAY_LEN> {
356    type Item = &'b mut T;
357
358    #[inline]
359    fn next(&mut self) -> Option<Self::Item> {
360        while self.current_index != u32::MAX {
361            let current_storage = if let Some(linked) = self.current_stack_entry.as_mut() {
362                &mut **linked
363            } else {
364                &mut self.bag.primary_storage
365            };
366
367            let instance_bitmap =
368                Storage::<T, ARRAY_LEN>::instance_bitmap(current_storage.metadata.load(Acquire));
369            let first_occupied =
370                (instance_bitmap.wrapping_shr(self.current_index)).trailing_zeros();
371            let next_occupied = self.current_index + first_occupied;
372            self.current_index = next_occupied + 1;
373            if (next_occupied as usize) < ARRAY_LEN {
374                return Some(unsafe {
375                    &mut *(*current_storage.storage.get())[next_occupied as usize].as_mut_ptr()
376                });
377            }
378            self.current_index = u32::MAX;
379
380            if let Some(linked) = self.current_stack_entry.as_mut() {
381                let guard = Guard::new();
382                if let Some(next) = linked.next_ptr(Acquire, &guard).as_ref() {
383                    let entry_mut = ptr::from_ref(next).cast_mut();
384                    self.current_stack_entry = unsafe { entry_mut.as_mut() };
385                    self.current_index = 0;
386                }
387            } else {
388                self.bag.stack.peek_with(|e| {
389                    if let Some(e) = e {
390                        let entry_mut = ptr::from_ref(e).cast_mut();
391                        self.current_stack_entry = unsafe { entry_mut.as_mut() };
392                        self.current_index = 0;
393                    }
394                });
395            }
396        }
397        None
398    }
399}
400
401impl<T, const ARRAY_LEN: usize> UnwindSafe for IterMut<'_, T, ARRAY_LEN> where T: UnwindSafe {}
402
403impl<T, const ARRAY_LEN: usize> FusedIterator for IntoIter<T, ARRAY_LEN> {}
404
405impl<T, const ARRAY_LEN: usize> Iterator for IntoIter<T, ARRAY_LEN> {
406    type Item = T;
407
408    #[inline]
409    fn next(&mut self) -> Option<Self::Item> {
410        self.bag.pop()
411    }
412}
413
414impl<T, const ARRAY_LEN: usize> UnwindSafe for IntoIter<T, ARRAY_LEN> where T: UnwindSafe {}
415
416impl<T, const ARRAY_LEN: usize> Storage<T, ARRAY_LEN> {
417    /// Creates a new [`Storage`].
418    const fn new() -> Self {
419        #[allow(clippy::uninit_assumed_init)]
420        Storage {
421            storage: unsafe { MaybeUninit::uninit().assume_init() },
422            metadata: AtomicUsize::new(0),
423        }
424    }
425
426    /// Creates a new [`Storage`] with one inserted.
427    fn with_val(val: T) -> Self {
428        #[allow(clippy::uninit_assumed_init)]
429        let storage = Self {
430            storage: UnsafeCell::new(unsafe { MaybeUninit::uninit().assume_init() }),
431            metadata: AtomicUsize::new(1_usize << ARRAY_LEN),
432        };
433        unsafe {
434            (*storage.storage.get())[0].as_mut_ptr().write(val);
435        }
436        storage
437    }
438
439    /// Returns the number of entries.
440    fn len(&self) -> usize {
441        let metadata = self.metadata.load(Relaxed);
442        let instance_bitmap = Self::instance_bitmap(metadata);
443        let owned_bitmap = Self::owned_bitmap(metadata);
444        let valid_entries_bitmap = instance_bitmap & (!owned_bitmap);
445        valid_entries_bitmap.count_ones() as usize
446    }
447
448    /// Pushes a new value.
449    fn push(&self, val: T, allow_empty: bool) -> Option<T> {
450        let mut metadata = self.metadata.load(Relaxed);
451        'after_read_metadata: loop {
452            // Look for a free slot.
453            let mut instance_bitmap = Self::instance_bitmap(metadata);
454            let owned_bitmap = Self::owned_bitmap(metadata);
455
456            // Regard entries being removed as removed ones.
457            if !allow_empty && (instance_bitmap & !owned_bitmap) == 0 {
458                return Some(val);
459            }
460            let mut index = instance_bitmap.trailing_ones() as usize;
461            while index < ARRAY_LEN {
462                if (owned_bitmap & (1_u32 << index)) == 0 {
463                    // Mark the slot `owned`.
464                    let new = metadata | (1_usize << index);
465                    match self
466                        .metadata
467                        .compare_exchange_weak(metadata, new, Acquire, Relaxed)
468                    {
469                        Ok(_) => {
470                            // Now the free slot is owned by the thread.
471                            unsafe {
472                                (*self.storage.get())[index].as_mut_ptr().write(val);
473                            }
474                            let result = self.metadata.fetch_update(Release, Relaxed, |m| {
475                                debug_assert_ne!(m & (1_usize << index), 0);
476                                debug_assert_eq!(m & (1_usize << (index + ARRAY_LEN)), 0);
477                                if !allow_empty
478                                    && (Self::instance_bitmap(m) & !Self::owned_bitmap(m)) == 0
479                                {
480                                    // Disallow pushing a value into an empty, or a soon-to-be-emptied array.
481                                    None
482                                } else {
483                                    let new = (m & (!(1_usize << index)))
484                                        | (1_usize << (index + ARRAY_LEN));
485                                    Some(new)
486                                }
487                            });
488                            if result.is_ok() {
489                                return None;
490                            }
491
492                            // The array was empty, thus rolling back the change.
493                            let val = unsafe { (*self.storage.get())[index].as_ptr().read() };
494                            self.metadata.fetch_and(!(1_usize << index), Release);
495                            return Some(val);
496                        }
497                        Err(prev) => {
498                            // Metadata has changed.
499                            metadata = prev;
500                            continue 'after_read_metadata;
501                        }
502                    }
503                }
504
505                // Look for another free slot.
506                instance_bitmap |= 1_u32 << index;
507                index = instance_bitmap.trailing_ones() as usize;
508            }
509
510            // No free slots or all the entries are owned.
511            return Some(val);
512        }
513    }
514
515    /// Pops a value.
516    fn pop(&self) -> (Option<T>, bool) {
517        let mut metadata = self.metadata.load(Relaxed);
518        'after_read_metadata: loop {
519            // Look for an instantiated, yet to be owned entry.
520            let mut instance_bitmap_inverted = !Self::instance_bitmap(metadata);
521            let owned_bitmap = Self::owned_bitmap(metadata);
522            let mut index = instance_bitmap_inverted.trailing_ones() as usize;
523            while index < ARRAY_LEN {
524                if (owned_bitmap & (1_u32 << index)) == 0 {
525                    // Mark the slot `owned`.
526                    let new = metadata | (1_usize << index);
527                    match self
528                        .metadata
529                        .compare_exchange_weak(metadata, new, Acquire, Relaxed)
530                    {
531                        Ok(_) => {
532                            // Now the desired slot is owned by the thread.
533                            let inst = unsafe { (*self.storage.get())[index].as_ptr().read() };
534                            let mut empty = false;
535                            let result = self.metadata.fetch_update(Release, Relaxed, |m| {
536                                debug_assert_ne!(m & (1_usize << index), 0);
537                                debug_assert_ne!(m & (1_usize << (index + ARRAY_LEN)), 0);
538                                let new =
539                                    m & (!((1_usize << index) | (1_usize << (index + ARRAY_LEN))));
540                                empty = Self::instance_bitmap(new) == 0;
541                                Some(new)
542                            });
543                            debug_assert!(result.is_ok());
544                            return (Some(inst), empty);
545                        }
546                        Err(prev) => {
547                            // Metadata has changed.
548                            metadata = prev;
549                            continue 'after_read_metadata;
550                        }
551                    }
552                }
553
554                // Look for another valid slot.
555                instance_bitmap_inverted |= 1_u32 << index;
556                index = instance_bitmap_inverted.trailing_ones() as usize;
557            }
558
559            return (None, false);
560        }
561    }
562
563    /// Pops all the values, and folds them.
564    #[allow(clippy::cast_possible_truncation)]
565    fn pop_all<B, F: FnMut(B, T) -> B>(&self, init: B, fold: &mut F) -> B {
566        struct ExitGuard<F: FnOnce()>(Option<F>);
567        impl<F: FnOnce()> Drop for ExitGuard<F> {
568            #[inline]
569            fn drop(&mut self) {
570                let Some(f) = self.0.take() else {
571                    return;
572                };
573                f();
574            }
575        }
576
577        let mut acc = init;
578        let mut metadata = self.metadata.load(Relaxed);
579        loop {
580            // Look for instantiated, and reachable entries.
581            let instance_bitmap = Self::instance_bitmap(metadata) as usize;
582            let owned_bitmap = Self::owned_bitmap(metadata) as usize;
583            let instances_to_pop = instance_bitmap & (!owned_bitmap);
584            debug_assert_eq!(instances_to_pop & owned_bitmap, 0);
585
586            if instances_to_pop == 0 {
587                return acc;
588            }
589
590            let marked_for_removal = metadata | instances_to_pop;
591            match self.metadata.compare_exchange_weak(
592                metadata,
593                marked_for_removal,
594                Acquire,
595                Relaxed,
596            ) {
597                Ok(_) => {
598                    metadata = marked_for_removal;
599                    let _guard = ExitGuard(Some(|| {
600                        loop {
601                            let new_metadata =
602                                metadata & (!((instances_to_pop << ARRAY_LEN) | instances_to_pop));
603                            if let Err(actual) = self.metadata.compare_exchange_weak(
604                                metadata,
605                                new_metadata,
606                                Release,
607                                Relaxed,
608                            ) {
609                                metadata = actual;
610                                continue;
611                            }
612                            break;
613                        }
614                    }));
615
616                    // Now all the valid slots are locked for removal.
617                    let mut index = instances_to_pop.trailing_zeros() as usize;
618                    while index < ARRAY_LEN {
619                        acc = fold(acc, unsafe { (*self.storage.get())[index].as_ptr().read() });
620                        index = (instances_to_pop & (!((1_usize << (index + 1) as u32) - 1)))
621                            .trailing_zeros() as usize;
622                    }
623                    return acc;
624                }
625                Err(actual) => metadata = actual,
626            }
627        }
628    }
629
630    #[allow(clippy::cast_possible_truncation)]
631    const fn instance_bitmap(metadata: usize) -> u32 {
632        metadata.wrapping_shr(ARRAY_LEN as u32) as u32
633    }
634
635    #[allow(clippy::cast_possible_truncation)]
636    const fn owned_bitmap(metadata: usize) -> u32 {
637        (metadata % (1_usize << ARRAY_LEN)) as u32
638    }
639}
640
641impl<T, const ARRAY_LEN: usize> Drop for Storage<T, ARRAY_LEN> {
642    #[inline]
643    fn drop(&mut self) {
644        if needs_drop::<T>() {
645            let mut instance_bitmap = Self::instance_bitmap(self.metadata.load(Acquire));
646            loop {
647                let index = instance_bitmap.trailing_zeros();
648                if index == 32 {
649                    break;
650                }
651                instance_bitmap &= !(1_u32 << index);
652                unsafe { drop_in_place((*self.storage.get())[index as usize].as_mut_ptr()) };
653            }
654        }
655    }
656}
657
658unsafe impl<T: Send, const ARRAY_LEN: usize> Send for Storage<T, ARRAY_LEN> {}
659unsafe impl<T: Send + Sync, const ARRAY_LEN: usize> Sync for Storage<T, ARRAY_LEN> {}