//! Write side of `ev_slotmap`: a lock-free, eventually consistent, concurrent
//! slot map.
use super::Operation;
use crate::inner::{Inner, InnerKey};
use crate::read::{user_friendly, ReadHandle};
use evmap::ShallowCopy;
use one_way_slot_map::{SlotMapKey as Key, SlotMapKeyData};
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::sync::atomic;
use std::sync::{Arc, MutexGuard};
use std::{fmt, mem, thread};

/// A handle that may be used to modify the concurrent map.
///
/// When the `WriteHandle` is dropped, the map is immediately (but safely) taken away from all
/// readers, causing all future lookups to return `None`.
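///
/// # Examples
///
/// A minimal sketch of intended use. The constructor (`ev_slotmap::new`) and
/// the reader-side `get` shown here are assumptions modeled on `evmap`'s API,
/// so the example is not compiled as a doctest:
///
/// ```ignore
/// // hypothetical constructor returning (ReadHandle, WriteHandle)
/// let (r, mut w) = ev_slotmap::new();
///
/// // every write refreshes the maps, so readers see it immediately
/// let key = w.insert((), "hello");
/// assert!(r.get(&key).is_some());
///
/// // dropping the WriteHandle takes the map away from readers
/// drop(w);
/// assert!(r.get(&key).is_none());
/// ```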
pub struct WriteHandle<K, P, V>
where
    K: Key<P>,
    V: ShallowCopy,
{
    epochs: crate::Epochs,
    w_handle: Option<Box<Inner<ManuallyDrop<V>>>>,
    last_op: Option<Operation<V>>,
    r_handle: ReadHandle<K, P, V>,
    last_epochs: Vec<usize>,

    phantom_p: PhantomData<P>,
}

impl<K, P, V> fmt::Debug for WriteHandle<K, P, V>
where
    K: Key<P> + fmt::Debug,
    V: fmt::Debug + ShallowCopy,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("last_op", &self.last_op)
            .field("r_handle", &self.r_handle)
            .finish()
    }
}

pub(crate) fn new<K, P, V>(
    w_handle: Inner<ManuallyDrop<V>>,
    epochs: crate::Epochs,
    r_handle: ReadHandle<K, P, V>,
) -> WriteHandle<K, P, V>
where
    K: Key<P>,
    V: ShallowCopy,
{
    WriteHandle {
        epochs,
        w_handle: Some(Box::new(w_handle)),
        last_op: Default::default(),
        r_handle,
        last_epochs: Vec::new(),

        phantom_p: Default::default(),
    }
}

impl<K, P, V> Drop for WriteHandle<K, P, V>
where
    K: Key<P>,
    V: ShallowCopy,
{
    fn drop(&mut self) {
        use std::ptr;

        // first, ensure both maps are up to date by applying any pending
        // operation (otherwise safely dropping the shallow-copied values is a
        // pain)
        while self.last_op.is_some() {
            self.refresh();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = self
            .r_handle
            .inner
            .swap(ptr::null_mut(), atomic::Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        atomic::fence(atomic::Ordering::SeqCst);

        let w_handle = &mut self.w_handle.as_mut().unwrap().data;

        // all readers have now observed the NULL, so we own both handles.
        // all records are duplicated between w_handle and r_handle.
        // since the two maps are exactly equal, we need to make sure that we *don't* call the
        // destructors of any of the values that are in our map, as they'll all be called when the
        // last read handle goes out of scope. to do so, we first clear w_handle, which won't drop
        // any elements since its values are kept as ManuallyDrop:
        w_handle.clear();

        // then we transmute r_handle to remove the ManuallyDrop, and then drop it, which will free
        // all the records. this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        drop(unsafe { Box::from_raw(r_handle as *mut Inner<V>) });
    }
}

impl<K, P, V> WriteHandle<K, P, V>
where
    K: Key<P>,
    V: ShallowCopy,
{
    fn wait(
        &mut self,
        epochs: &mut MutexGuard<'_, slab::Slab<Arc<atomic::AtomicUsize>>>,
    ) {
        let mut iter = 0;
        let mut start_i = 0;
        let high_bit = 1usize << (mem::size_of::<usize>() * 8 - 1);
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(start_i) {
                // note that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] & high_bit != 0 {
                    // reader was not active right after last swap
                    // and therefore *must* only see new pointer
                    continue;
                }

                let now = epoch.load(atomic::Ordering::Acquire);
                if (now != self.last_epochs[ri])
                    | (now & high_bit != 0)
                    | (now == 0)
                {
                    // reader must have seen last swap
                } else {
                    // reader may not have seen swap
                    // continue from this reader's epoch
                    start_i = ii;

                    // how eagerly should we retry?
                    // spin for the first 20 attempts, then start yielding to
                    // the scheduler between attempts
                    if iter != 20 {
                        iter += 1;
                    } else {
                        thread::yield_now();
                    }

                    continue 'retry;
                }
            }
            break;
        }
    }

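    /// First application of an operation: apply `op` to the map that is about
    /// to be published, shallow-copying any value it writes so the owned value
    /// can still be applied to the other map on the next refresh. Returns the
    /// key created by an `Add`, if any.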
    #[allow(clippy::borrowed_box)]
    fn run_operation_first(
        target: &mut Box<Inner<ManuallyDrop<V>>>,
        op: &Operation<V>,
    ) -> Option<InnerKey> {
        let mut result = None;

        match op {
            Operation::NoOp => (),
            Operation::Add(value) => {
                result = Some(
                    target.data.insert((), unsafe { value.shallow_copy() }),
                );
            }
            Operation::Replace(key, value) => {
                let old_value = target
                    .data
                    .get_mut_unbounded(key)
                    .expect("Tried to replace empty key");

                *old_value = unsafe { value.shallow_copy() };
            }
            Operation::Remove(key) => {
                let _ = target.data.remove_unbounded(key);
            }
            Operation::Clear => {
                target.data.clear();
            }
        }

        result
    }

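    /// Second application of an operation: apply `op` to the map that missed
    /// it during the previous refresh, this time moving the owned value in, so
    /// that the `Inner<V>` view drops any displaced values exactly once.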
    fn run_operation_second(target: &mut Inner<V>, op: Operation<V>) {
        match op {
            Operation::NoOp => (),
            Operation::Add(value) => {
                let _ = target.data.insert((), value);
            }
            Operation::Replace(key, value) => {
                let old_value = target
                    .data
                    .get_mut_unbounded(&key)
                    .expect("Tried to replace empty key");

                *old_value = value;
            }
            Operation::Remove(key) => {
                let _ = target.data.remove_unbounded(&key);
            }
            Operation::Clear => {
                target.data.clear();
            }
        }
    }

    /// Wait for all readers to leave the write map, bring it up to date by
    /// applying the pending operation and then `op`, and swap it in as the new
    /// read map. Returns the key created by an `Add` operation, if any.
    fn refresh_with_operation(&mut self, op: Operation<V>) -> Option<InnerKey> {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        let result = {
            // all the readers have left!
            // we can safely bring the w_handle up to date.
            let w_handle = self.w_handle.as_mut().unwrap();

            if let Some(last_op) = self.last_op.take() {
                Self::run_operation_second(
                    unsafe { w_handle.do_drop() },
                    last_op,
                );
            }

            if let Operation::NoOp = &op {
                None
            } else {
                let result = Self::run_operation_first(w_handle, &op);

                self.last_op = Some(op);

                w_handle.mark_ready();

                // w_handle (the old r_handle) is now fully up to date!
                result
            }
        };

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. last_op holds the change that is in w_handle but not
        // yet in r_handle.
        //
        // it's now time for us to swap the maps so that readers see up-to-date results from
        // w_handle.

        // prepare w_handle
        let w_handle = self.w_handle.take().unwrap();
        let w_handle = Box::into_raw(w_handle);

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(w_handle, atomic::Ordering::Release);
        let r_handle = unsafe { Box::from_raw(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        atomic::fence(atomic::Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(atomic::Ordering::Acquire);
        }

        // NOTE: at this point, there are likely still readers using the w_handle we got
        self.w_handle = Some(r_handle);

        result
    }

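    /// Publish any pending operation without applying a new one.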
    pub(crate) fn refresh(&mut self) {
        let _ = self.refresh_with_operation(Operation::NoOp);
    }

    /// Insert the given value into the slot map and return the associated key.
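    ///
    /// A sketch (the `()` pointer value assumes `P = ()`, and the read goes
    /// through the `Deref` impl below with an assumed reader-side `get`):
    ///
    /// ```ignore
    /// let key = w.insert((), 42);
    /// // the write is published immediately
    /// assert!(w.get(&key).is_some());
    /// ```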
    pub fn insert(&mut self, p: P, v: V) -> K {
        self.refresh_with_operation(Operation::Add(v))
            .expect("No key returned on insert")
            .to_outer_key(p)
    }

    /// Replace the value stored at the given key with the given value.
    ///
    /// # Panics
    ///
    /// Panics if there is no value stored for the key.
    pub fn update(&mut self, k: K, v: V) {
        let _ = self.refresh_with_operation(Operation::Replace(*k.borrow(), v));
    }

    /// Clear the slot map.
    pub fn clear(&mut self) {
        let _ = self.refresh_with_operation(Operation::Clear);
    }

    /// Remove the value for the given key from the map.
    pub fn remove(&mut self, k: &K) {
        let _ = self.refresh_with_operation(Operation::Remove(*k.borrow()));
    }

    /// Get an iterator over all the values in the slot map directly, without
    /// using an intermediate object. This is safe only because we take
    /// exclusive access to the writer, ensuring no writes can occur while we
    /// are reading.
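    ///
    /// A sketch, assuming `V = u32`:
    ///
    /// ```ignore
    /// // values() refreshes first, so pending writes are included
    /// let sum: u32 = w.values().sum();
    /// ```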
    pub fn values(&mut self) -> impl Iterator<Item = &V> {
        self.refresh();
        self.w_handle
            .as_ref()
            .expect("There should always be something to read")
            .data
            .values()
            .map(user_friendly)
    }

    /// Get an iterator over all the keys and values in the slot map, as long
    /// as you have a way to recover the pointer value from each stored value.
    /// This is done directly, without an intermediate object, and is safe only
    /// because we take exclusive access to the writer, ensuring no writes can
    /// occur while we are reading.
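    ///
    /// A sketch; with `P = ()` there is nothing to recover from the value:
    ///
    /// ```ignore
    /// for (key, value) in w.iter(|_value| ()) {
    ///     println!("{:?}: {:?}", key, value);
    /// }
    /// ```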
    pub fn iter<F>(
        &mut self,
        mut pointer_finder: F,
    ) -> impl Iterator<Item = (K, &V)>
    where
        F: FnMut(&V) -> P,
    {
        self.iter_raw().map(move |(key_data, v)| {
            (K::from((pointer_finder(v), key_data)), v)
        })
    }

    /// Get an iterator over all the raw key data and values in the map
    /// directly, without using an intermediate object. This is safe only
    /// because we take exclusive access to the writer, ensuring no writes can
    /// occur while we are reading.
    pub fn iter_raw(&mut self) -> impl Iterator<Item = (SlotMapKeyData, &V)> {
        self.refresh();
        self.w_handle
            .as_ref()
            .expect("There should always be something to read")
            .data
            .iter_raw()
            .map(|(key_data, v)| (key_data, user_friendly(v)))
    }
}

// allow using write handle for reads
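//
// e.g. (reader-side `get` assumed for illustration): after
// `let key = w.insert(p, v);`, a call to `w.get(&key)` resolves through this
// Deref to `ReadHandle::get`, so the writer can read without cloning a
// separate ReadHandle.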
use std::ops::Deref;
impl<K, P, V> Deref for WriteHandle<K, P, V>
where
    K: Key<P>,
    V: ShallowCopy,
{
    type Target = ReadHandle<K, P, V>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}