tokio 0.2.19

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
use crate::loom::{
    cell::UnsafeCell,
    sync::atomic::{AtomicPtr, AtomicUsize},
    thread,
};

use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,

    /// Array containing values pushed into the block. Values are stored in a
    /// continuous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}

pub(crate) enum Read<T> {
    Value(T),
    Closed,
}

struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Block<T> {
        Block {
            // The absolute index in the channel of the first slot in the block.
            start_index,

            // Pointer to the next block in the linked list.
            next: AtomicPtr::new(ptr::null_mut()),

            ready_slots: AtomicUsize::new(0),

            observed_tail_position: UnsafeCell::new(0),

            // Value storage
            values: unsafe { Values::uninitialized() },
        }
    }

    /// Returns `true` if the block matches the given index
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `start_index` must represent a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
    }

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        let ready_bits = self.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(value.assume_init()))
    }

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, MaybeUninit::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signal to the receiver that the sender half of the list is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.start_index = 0;
        self.next = AtomicPtr::new(ptr::null_mut());
        self.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targetting a greater
        // tail_position is guaranteed to not access this block.
        self.observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    ///
    /// # Implementation
    ///
    /// The implementation walks each slot checking the `ready` flag. It might
    /// be that it would make more sense to coalesce ready flags as bits in a
    /// single atomic cell. However, this could have negative impact on cache
    /// behavior as there would be many more mutations to a single slot.
    pub(crate) fn is_final(&self) -> bool {
        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.ready_slots.load(Acquire) {
            None
        } else {
            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
        }
    }

    /// Loads the next block
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.next.load(ordering));

        debug_assert!(unsafe {
            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
                .unwrap_or(true)
        });

        ret
    }

    /// Pushes `block` as the next block in the link.
    ///
    /// Returns Ok if successful, otherwise, a pointer to the next block in
    /// the list is returned.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using AcqRel ordering.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        ordering: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .next
            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with AcqRel memory ordering. If the compare-and-swap is successful, the
    /// newly allocated block is released to other threads walking the block
    /// linked list. If the compare-and-swap fails, the current thread acquires
    /// the next block in the linked list, allowing the current thread to access
    /// the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(self.next.compare_and_swap(
            ptr::null_mut(),
            new_block.as_ptr(),
            AcqRel,
        ));

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeed.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };

            curr = match actual {
                Ok(_) => {
                    return next;
                }
                Err(curr) => curr,
            };

            // When running outside of loom, this calls `spin_loop_hint`.
            thread::yield_now();
        }
    }
}

/// Returns `true` if the specificed slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask = 1 << slot;
    mask == mask & bits
}

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}

impl<T> Values<T> {
    unsafe fn uninitialized() -> Values<T> {
        let mut vals = MaybeUninit::uninit();

        // When fuzzing, `UnsafeCell` needs to be initialized.
        if_loom! {
            let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                p.add(i)
                    .write(UnsafeCell::new(MaybeUninit::uninit()));
            }
        }

        Values(vals.assume_init())
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}