bombs 0.2.1

Efficient single-producer multi-consumer channel types.
Documentation
mod multi; pub use multi::*;

use std::{
    mem::MaybeUninit,
    sync::Weak,
};

use crate::types::{
    sync::{
        atomic::{ AtomicBool, Ordering },
        Arc,
    },
    cell::UnsafeCell,
};

/// Alternative name for [`Fuse<T>`].
///
/// `Fuze<T>` and [`Fuse<T>`] are the same type and can be used interchangeably.
pub type Fuze<T> = Fuse<T>;

/// Alternative name for [`Flame`].
///
/// `Fire` and [`Flame`] are the same type and can be used interchangeably.
pub type Fire = Flame;

struct OneTime<T> {
    has_value: AtomicBool,
    cell: UnsafeCell<MaybeUninit<T>>,
}

impl<T> OneTime<T> {
    pub fn new() -> Self {
        Self {
            has_value: AtomicBool::new(false),
            cell: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }
}

impl<T> Drop for OneTime<T> {
    fn drop(&mut self) {
        // `Arc` synchronises with an Acquire fence on drop,
        // which ensures that all writes made by a previous reference
        // are already visible side effects at this point. It is
        // sufficient to read with Relaxed ordering here.
        if self.has_value.load(Ordering::Relaxed) {
            // SAFETY: Guaranteed to have a value, it is safe to drop.

            unsafe {
                // Replace with uninitialised memory, which is safe to not drop.
                #[cfg(not(loom))]
                let value = self.cell.get().replace(MaybeUninit::uninit());
                #[cfg(loom)]
                let value = self.cell.get_mut().with(|p| p.replace(MaybeUninit::uninit()));
                // Acquire value and drop.
                value.assume_init();
            }
        }
    }
}

/// A one-time use bomb.
///
/// The [`Fuse<T>`] for this `Bomb` can be lit only once,
/// and the bomb will remain in an exploded state for the
/// remainder of the existence of this instance.
///
/// `Bomb`s can be cloned without requiring `T: Clone`; every clone
/// shares the same underlying slot. A `Bomb<T>` can be sent to and
/// shared between threads when `T: Send + Sync`.
pub struct Bomb<T> {
    /// Slot shared with the [`Fuse<T>`] and with every clone of this `Bomb`.
    data: Arc<OneTime<T>>,

    /// Empty pointer to count strong references (bombs).
    /// The [`Fuse`] holds a `Weak` to it, which lets the resulting
    /// [`Flame`] check if all bombs have been dropped.
    #[cfg(not(loom))] // Weak not implemented in loom.
    _counter: Arc<()>,
}

// SAFETY: A `Bomb` hands out `&T` (see `Bomb::exploded`) and its clones
// may live on other threads, so `T` must be `Sync`. Whichever handle
// releases the shared slot last also drops the value (gains ownership
// of it), possibly on a different thread to the one that produced it,
// so `T` must be `Send` as well.
unsafe impl<T: Send + Sync> Send for Bomb<T> { }
// SAFETY: A `&Bomb<T>` can be cloned into an owned `Bomb<T>`, so sharing
// a `Bomb` with another thread allows everything that sending one does.
// The same `T: Send + Sync` bound is therefore required for `Sync`.
unsafe impl<T: Send + Sync> Sync for Bomb<T> { }

// Derive requires `T: Clone` which is not necessary.
// Implement manually instead.
impl<T> Clone for Bomb<T> {
    fn clone(&self) -> Self {
        Self { data: self.data.clone(), #[cfg(not(loom))] _counter: self._counter.clone() }
    }
}

/// A one-time use fuse.
///
/// This `Fuse` can only be lit once, and will explode all
/// [`Bomb<T>`] instances associated with it. The `Fuse`
/// instance is consumed on use, and lighting it yields a [`Flame`].
///
/// `Fuse`s cannot be cloned, but may be moved between threads
/// when `T: Send + Sync`.
pub struct Fuse<T> {
    /// Slot shared with every associated [`Bomb<T>`]; written when the fuse is lit.
    data: Arc<OneTime<T>>,

    /// Weak pointer to bombs, to check if all bombs
    /// associated with this fuse have been dropped.
    /// Empty type since we only care about strong count.
    /// Handed over to the [`Flame`] when the fuse is lit.
    #[cfg(not(loom))] // Weak not implemented in loom.
    _counter: Weak<()>
}

// SAFETY: The value written through a `Fuse` is exposed as `&T` by the
// associated `Bomb`s, which may live on other threads, so `T` must be
// `Sync` for the `Fuse` to be transferred to a different thread.
// The last handle to the shared slot drops the value (gains ownership
// of it), possibly on another thread, so `T` must also be `Send`.
unsafe impl<T: Send + Sync> Send for Fuse<T> { }
// SAFETY: Lighting a `Fuse` is consuming and requires ownership, which
// ensures that there is only one instance modifying the internal state.
// `&Fuse<T>` cannot access the internal state, which is why no bound
// is placed on `T` here.
// NOTE(review): this relies on `Fuse` exposing no `&self` method that
// reaches `data`. That holds for the methods in this file; confirm it
// for other modules, and revisit the bound before adding such a method.
unsafe impl<T> Sync for Fuse<T> { }

/// Result of a bomb explosion.
///
/// `Flame` is a handle returned by [`Fuse<T>::light()`], which
/// allows to check whether all bombs have been extinguished (dropped).
/// It carries no reference to the value itself, only to the bomb counter.
///
/// [`Fuse<T>::light()`]: ./struct.Fuse.html#method.light
pub struct Flame {
    /// Weak pointer to the bomb counter, taken over from the lit [`Fuse`].
    /// Its strong count is the number of bombs that are still alive.
    #[cfg(not(loom))] // Weak not implemented in loom.
    counter: Weak<()>,
}

impl<T> Bomb<T> {
    /// Creates a new single producer [`Fuse<T>`], and a multi-consumer `Bomb<T>`.
    ///
    /// Instances of `Bomb<T>` may be safely cloned and sent between threads.
    pub fn new() -> (Fuse<T>, Bomb<T>) {
        #[cfg(not(loom))] // Weak not implemented in loom.
        let _counter = Arc::new(());

        let fuse = Fuse::new(#[cfg(not(loom))] &_counter);

        let bomb = Self {
            data: fuse.data.clone(),
            #[cfg(not(loom))] // Weak not implemented in loom.
            _counter,
        };

        (fuse, bomb)
    }

    /// Returns `Some` if the `Bomb` has exploded.
    ///
    /// Once the `Bomb` has exploded, it will remain
    /// in an exploded state for the remainder of the
    /// existence of this `Bomb` instance.
    /// It is expected that the `Bomb` is dropped after explosion,
    /// to notify the original [`Fuse`] (now [`Flame`]) that this `Bomb`
    /// has completed processing the data.
    pub fn exploded(&self) -> Option<&T> {
        // Check if the `Bomb` has exploded yet.
        // Return early if it has not.
        // This avoids the need to grab a raw pointer to the data cell.
        if !self.data.has_value.load(Ordering::Acquire) {
            None
        }
        else {
            // The `Bomb` has exploded, we have data.

            // SAFETY: Due to the implementation of `Fuse`,
            // if `exploded` is true, then there are no other
            // mutable references for the pointer to the cell,
            // hence it is safe to read across multiple threads.
            #[cfg(not(loom))]
            unsafe { Some(self.data.cell.get().as_ref().unwrap().assume_init_ref()) }
            #[cfg(loom)]
            unsafe { Some(self.data.cell.get().with(|p| p.as_ref().unwrap().assume_init_ref())) }
        }
    }
}

impl<T> Fuse<T> {
    /// Creates an unlit fuse owning a fresh, empty slot.
    ///
    /// Outside of loom, `_counter` is the bomb counter; only a weak
    /// handle to it is kept so that the fuse itself is not counted.
    fn new(#[cfg(not(loom))] _counter: &Arc<()>) -> Self {
        let data = Arc::new(OneTime::new());

        Self {
            data,
            #[cfg(not(loom))] // Weak not implemented in loom.
            _counter: Arc::downgrade(_counter),
        }
    }

    /// Ignites the fuse.
    ///
    /// Every [`Bomb`] associated with this `Fuse` explodes and
    /// receives `value`.
    ///
    /// Alias to [`light`](#method.light)
    pub fn ignite(self, value: T) -> Flame {
        Self::light(self, value)
    }

    /// Lights the fuse.
    ///
    /// Every [`Bomb`] associated with this `Fuse` explodes and
    /// receives `value`. The returned [`Flame`] can be used to find out
    /// when all bombs have been dropped.
    pub fn light(self, value: T) -> Flame {
        let payload = MaybeUninit::new(value);

        // SAFETY: `has_value` is still false, so no `Bomb` will try to
        // obtain a reference into the cell, and this `Fuse` is the only
        // writer. Nothing else can be observing the cell during the write.
        #[cfg(not(loom))]
        unsafe { self.data.cell.get().write(payload); }
        #[cfg(loom)]
        unsafe { self.data.cell.get_mut().with(|slot| slot.write(payload)); }

        // Publish the value: from here on `Bomb`s may safely read the cell.
        // Release pairs with the Acquire load performed by the `Bomb`s.
        self.data.has_value.store(true, Ordering::Release);

        // Letting this `Fuse` go is fine: the slot is kept alive by the
        // `Arc`s held in the `Bomb`s. Only the weak counter handle moves
        // on into the `Flame`.
        Flame {
            #[cfg(not(loom))] // Weak not implemented in loom.
            counter: self._counter,
        }
    }
}

impl Flame {
    /// Reports whether every exploded [`Bomb`] has been extinguished (dropped).
    ///
    /// Returns `true` once no strong reference to the bomb counter remains.
    ///
    /// For [`MultiBomb`]s, this will return `true` as soon as each [`MultiBomb`]
    /// has exploded with the value associated with this `Flame`,
    /// as the inner [`Bomb`] is dropped immediately (and replaced with a new one).
    ///
    /// # Panics
    ///
    /// Always panics when built with `cfg(loom)`, where no counter is tracked.
    pub fn extinguished(&self) -> bool {
        #[cfg(not(loom))] // Weak not implemented in loom.
        {
            let live_bombs = self.counter.strong_count();
            live_bombs == 0
        }
        #[cfg(loom)]
        unimplemented!("std::sync::Weak not implemented in loom.");
    }
}