scc 1.1.1

High performance containers and utilities for concurrent and asynchronous programming
Documentation
//! [`Bag`] is a lock-free concurrent unordered instance container.

use super::ebr::Barrier;
use super::{LinkedList, Stack};

use std::mem::{needs_drop, MaybeUninit};
use std::ptr::drop_in_place;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

/// The length of the fixed-size array in a [`Bag`].
const STORAGE_LEN: u32 = usize::BITS / 2;

/// [`Bag`] is a lock-free concurrent unordered instance container.
///
/// [`Bag`] is a linearizable concurrent instance container where `usize::BITS / 2` instances are
/// stored in a fixed-size array, and the rest are managed by its backup container which makes a
/// [`Bag`] especially efficient if the expected number of instances does not exceed
/// `usize::BITS / 2`.
#[derive(Debug)]
pub struct Bag<T: 'static> {
    /// Primary storage.
    primary_storage: Storage<T>,

    /// Fallback storage.
    stack: Stack<Storage<T>>,
}

impl<T: 'static> Bag<T> {
    /// Pushes an instance of `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use scc::Bag;
    ///
    /// let bag: Bag<usize> = Bag::default();
    ///
    /// bag.push(11);
    /// ```
    #[inline]
    pub fn push(&self, val: T) {
        if let Some(val) = self.primary_storage.push(val, true) {
            let barrier = Barrier::new();
            if let Some(storage) = self.stack.peek_with(|e| &**e, &barrier) {
                if let Some(val) = storage.push(val, false) {
                    self.stack.push(Storage::with_val(val));
                }
                return;
            }
            self.stack.push(Storage::with_val(val));
        }
    }

    /// Pops an instance in the [`Bag`] if not empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use scc::Bag;
    ///
    /// let bag: Bag<usize> = Bag::default();
    ///
    /// bag.push(37);
    ///
    /// assert_eq!(bag.pop(), Some(37));
    /// assert!(bag.pop().is_none());
    /// ```
    #[inline]
    pub fn pop(&self) -> Option<T> {
        let barrier = Barrier::new();
        let mut current = self.stack.peek_with(|e| e, &barrier);
        while let Some(e) = current {
            let (val_opt, empty) = e.pop();
            if empty {
                e.delete_self(Relaxed);
            }
            if let Some(val) = val_opt {
                return Some(val);
            }
            current = e.next_ptr(Acquire, &barrier).as_ref();
        }
        self.primary_storage.pop().0
    }

    /// Returns `true` if the [`Bag`] is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use scc::Bag;
    ///
    /// let bag: Bag<usize> = Bag::default();
    /// assert!(bag.is_empty());
    ///
    /// bag.push(7);
    /// assert!(!bag.is_empty());
    ///
    /// assert_eq!(bag.pop(), Some(7));
    /// assert!(bag.is_empty());
    /// ```
    #[inline]
    pub fn is_empty(&self) -> bool {
        if self.primary_storage.is_empty() {
            self.stack.is_empty()
        } else {
            false
        }
    }
}

impl<T: 'static> Default for Bag<T> {
    #[inline]
    fn default() -> Self {
        Self {
            primary_storage: Storage::new(),
            stack: Stack::default(),
        }
    }
}

#[derive(Debug)]
struct Storage<T: 'static> {
    /// Storage.
    storage: [MaybeUninit<T>; STORAGE_LEN as usize],

    /// Storage metadata.
    ///
    /// The layout of the metadata is,
    /// - Upper `usize::BITS / 2` bits = instantiation bitmap.
    /// - Lower `usize::BITS / 2` bits = owned state bitmap.
    ///
    /// The metadata represents four possible states of a storage slot.
    /// - !instantiated && !owned: initial state.
    /// - !instantiated && owned: owned for instantiating.
    /// - instantiated && !owned: valid and reachable.
    /// - instantiated && owned: owned for moving out the instance.
    metadata: AtomicUsize,
}

impl<T: 'static> Storage<T> {
    /// Creates a new [`Storage`].
    fn new() -> Storage<T> {
        Storage {
            storage: unsafe { MaybeUninit::uninit().assume_init() },
            metadata: AtomicUsize::new(0),
        }
    }

    /// Creates a new [`Storage`] with one inserted.
    fn with_val(val: T) -> Storage<T> {
        let mut storage = Storage::<T> {
            storage: unsafe { MaybeUninit::uninit().assume_init() },
            metadata: AtomicUsize::new(1_usize << STORAGE_LEN),
        };
        unsafe {
            storage.storage[0].as_mut_ptr().write(val);
        }
        storage
    }

    /// Pushes a new value.
    fn push(&self, val: T, allow_empty: bool) -> Option<T> {
        let mut metadata = self.metadata.load(Relaxed);
        'after_read_metadata: loop {
            // Looking for a free slot.
            let mut instance_bitmap = Self::instance_bitmap(metadata);
            if !allow_empty && instance_bitmap == 0 {
                return Some(val);
            }
            let owned_bitmap = Self::owned_bitmap(metadata);
            let mut index = instance_bitmap.trailing_ones();
            while index != STORAGE_LEN {
                debug_assert!(index < STORAGE_LEN);
                if (owned_bitmap & (1_u32 << index)) == 0 {
                    // Mark the slot `owned`.
                    let new = metadata | (1_usize << index);
                    match self
                        .metadata
                        .compare_exchange(metadata, new, Acquire, Relaxed)
                    {
                        Ok(_) => {
                            // Now the free slot is owned by the thread.
                            unsafe {
                                (self.storage[index as usize].as_ptr() as *mut T).write(val);
                            }
                            let result = self.metadata.fetch_update(Release, Relaxed, |m| {
                                debug_assert_ne!(m & (1_usize << index), 0);
                                debug_assert_eq!(m & (1_usize << (index + STORAGE_LEN)), 0);
                                if !allow_empty && Self::instance_bitmap(m) == 0 {
                                    // Disallowed to push a value into an empty array.
                                    None
                                } else {
                                    let new = (m & (!(1_usize << index)))
                                        | (1_usize << (index + STORAGE_LEN));
                                    Some(new)
                                }
                            });
                            if result.is_ok() {
                                return None;
                            }

                            // The array was empty, thus rolling back the change.
                            let val = unsafe { self.storage[index as usize].as_ptr().read() };
                            self.metadata.fetch_and(!(1_usize << index), Relaxed);
                            return Some(val);
                        }
                        Err(prev) => {
                            // Metadata has changed.
                            metadata = prev;
                            continue 'after_read_metadata;
                        }
                    }
                }

                // Looking for another free slot.
                instance_bitmap |= 1_u32 << index;
                index = instance_bitmap.trailing_ones();
            }

            // No free slots or all the entries are owned.
            return Some(val);
        }
    }

    /// Pops a value.
    fn pop(&self) -> (Option<T>, bool) {
        let mut metadata = self.metadata.load(Relaxed);
        'after_read_metadata: loop {
            // Looking for an instantiated, yet unowned entry.
            let instance_bitmap = Self::instance_bitmap(metadata);
            let owned_bitmap = Self::owned_bitmap(metadata);
            let mut index = instance_bitmap.trailing_zeros();
            while index != 32 {
                debug_assert!(index < STORAGE_LEN);
                if (owned_bitmap & (1_u32 << index)) == 0 {
                    // Mark the slot `owned`.
                    let new = metadata | (1_usize << index);
                    match self
                        .metadata
                        .compare_exchange(metadata, new, Acquire, Relaxed)
                    {
                        Ok(_) => {
                            // Now the desired slot is owned by the thread.
                            let inst = unsafe { self.storage[index as usize].as_ptr().read() };
                            let mut empty = false;
                            let result = self.metadata.fetch_update(Relaxed, Relaxed, |m| {
                                debug_assert_ne!(m & (1_usize << index), 0);
                                debug_assert_ne!(m & (1_usize << (index + STORAGE_LEN)), 0);
                                let new = m
                                    & (!((1_usize << index) | (1_usize << (index + STORAGE_LEN))));
                                empty = Self::instance_bitmap(new) == 0;
                                Some(new)
                            });
                            debug_assert!(result.is_ok());
                            return (Some(inst), empty);
                        }
                        Err(prev) => {
                            // Metadata has changed.
                            metadata = prev;
                            continue 'after_read_metadata;
                        }
                    }
                }

                // Looking for another valid slot.
                index = (instance_bitmap & (u32::MAX.wrapping_shl(index).wrapping_shl(1)))
                    .trailing_zeros();
            }

            // All the entries are vacant or owned.
            return (None, instance_bitmap == 0);
        }
    }

    /// Returns `true` if empty.
    fn is_empty(&self) -> bool {
        let metadata = self.metadata.load(Acquire);
        Self::instance_bitmap(metadata) == 0
    }

    #[allow(clippy::cast_possible_truncation)]
    fn instance_bitmap(metadata: usize) -> u32 {
        metadata.wrapping_shr(STORAGE_LEN) as u32
    }

    #[allow(clippy::cast_possible_truncation)]
    fn owned_bitmap(metadata: usize) -> u32 {
        (metadata % (1_usize << STORAGE_LEN)) as u32
    }
}

impl<T: 'static> Drop for Storage<T> {
    #[inline]
    fn drop(&mut self) {
        if needs_drop::<T>() {
            let mut instance_bitmap = Self::instance_bitmap(self.metadata.load(Acquire));
            loop {
                let index = instance_bitmap.trailing_zeros();
                if index == 32 {
                    break;
                }
                instance_bitmap &= !(1_u32 << index);
                unsafe { drop_in_place(self.storage[index as usize].as_mut_ptr()) };
            }
        }
    }
}