refcapsule 0.1.0-beta1

Safely send references to other threads
Documentation
//! This module is somewhat similar to scoped threads, but allows passing references to other
//! threads over channels, or similar mechanisms.
//!
//! It captures zero or more references inside of "capsules" with a specific lifetime, then runs a function in a context
//! where control won't be returned to the caller until all capsules for that lifetime have been
//! dropped.
//!
//! Since it blocks waiting for the "capsules" to be dropped, the capsules can be safely passed to
//! other threads (for example with a channel), where they can be dereferenced as references again.
//!
//! # Examples
//!
//! ```
//! use std::thread;
//! use std::time::Duration;
//! use std::sync::mpsc::channel;
//! use refcapsule::{Capsule, with_encapsulated};
//!
//! let (sender, receiver) = channel::<Capsule<u32>>();
//!
//! // receiver of references
//!
//! thread::spawn(move || {
//!     {
//!         let r = receiver.recv().unwrap();
//!         thread::sleep(Duration::from_millis(100));
//!         assert_eq!(*r, 4);
//!     }
//!     {
//!         let r = receiver.recv().unwrap();
//!         thread::sleep(Duration::from_millis(100));
//!         assert_eq!(*r, 12);
//!     }
//! });
//!
//! let x: u32 = 4;
//! let s1 = sender.clone();
//! with_encapsulated(&x, move |x| s1.send(x).unwrap());
//!
//! with_encapsulated(&12, move |cap| sender.send(cap).unwrap());
//! ```
//!
//!
//! ## Things that shouldn't compile
//!
//! Mutating the original variable, while it is encapsulated:
//!
//! ```compile_fail,E0499
//! use refcapsule::with_encapsulated;
//!
//! let mut x = 43;
//! with_encapsulated(&mut x, |y| {
//!     x = 4;
//! });
//! ```
//!
//! Encapsulating a reference with a lifetime shorter than the encapsulation scope:
//!
//! ```compile_fail,E0597
//! use refcapsule::{with_encapsulated, encapsulate::gen};
//!
//! with_encapsulated(gen(|s| {
//!     let x = 43;
//!     s.encapsulate(&x);
//! }), |x| {});
//! ```
//!
//! Mutating the original variable when it is encapsulated using a generator function:
//!
//! ```compile_fail,E0499
//! use refcapsule::{with_encapsulated, encapsulate::gen};
//!
//! let mut x = 43;
//! with_encapsulated(gen(|s| {
//!     s.encapsulate_mut(&mut x);
//! }), |y| {
//!     x = 4;
//! });
//! ```
//!
//! Save a scope for a longer duration:
//!
//! ```compile_fail
//! use refcapsule::{with_encapsulated, encapsulate::gen};
//! with_encapsulated(gen(|s| s), |s| ());
//! ```
use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread};

pub mod encapsulate;

use encapsulate::*;

/// Bookkeeping shared between a scope and every capsule created from it.
struct State {
    /// Handle of the thread that opened the scope; it is unparked once the
    /// last live capsule has been released.
    source: Thread,
    /// Number of capsules that are currently alive for the scope.
    count: AtomicUsize,
    /// Set when a mutable capsule was dropped while its thread was panicking.
    poisoned: AtomicBool,
}

impl State {
    /// Register one more live capsule.
    ///
    /// Must be called whenever a new capsule is created.
    ///
    /// # Panics
    ///
    /// Panics (after undoing the increment) when more than `usize::MAX / 2`
    /// capsules are alive at once.
    #[inline]
    fn increment(&self) {
        let previous = self.count.fetch_add(1, Ordering::Relaxed);
        // Guarding at half of the `usize` range leaves plenty of headroom, so the
        // counter can never wrap around to 0, which could result in unsoundness.
        if previous <= usize::MAX / 2 {
            return;
        }
        self.decrement();
        panic!("Exceeded maximum number of reference capsules for a single scope");
    }

    /// Unregister one live capsule.
    ///
    /// Called when a capsule is dropped. The caller that releases the last
    /// capsule wakes up the thread that owns the scope.
    #[inline]
    fn decrement(&self) {
        let previous = self.count.fetch_sub(1, Ordering::Release);
        if previous == 1 {
            self.source.unpark();
        }
    }
}

// NOTE:
// We need to store the state in an arc to prevent a race condition
// if the original thread returns before all threads have finished decrementing.
// For example, in the case of a spurious wakeup between the last decrement of the counter
// and the call to unpark.
/// Guard that represents exactly one unit of the scope's live-capsule count.
///
/// Dropping a `Notifier` decrements the count, so every `Notifier` in existence
/// must be matched by one increment.
struct Notifier(Arc<State>);

impl Clone for Notifier {
    /// Create another guard for the same scope.
    ///
    /// `Clone` is implemented by hand instead of derived: a derived impl would
    /// only copy the `Arc`, and the extra `Drop` would then decrement the count
    /// once too often, letting the owning thread resume while capsules are still
    /// alive (or wrapping the counter around).
    fn clone(&self) -> Self {
        self.0.increment();
        Notifier(Arc::clone(&self.0))
    }
}

impl Drop for Notifier {
    fn drop(&mut self) {
        // Decrement the count of live capsules, and if it is the last one,
        // notify the source thread it can proceed.
        self.0.decrement();
    }
}

impl Notifier {
    /// Mark that a `CapsuleMut` was dropped while panicking.
    ///
    /// A relaxed store is sufficient here: the flag is written before this
    /// guard's own `Drop` performs the releasing decrement.
    fn poison(&self) {
        self.0.poisoned.store(true, Ordering::Relaxed);
    }
}

/// An encapsulated `&T` that can be passed between threads.
///
/// Since the thread that created it will block until this is dropped, it doesn't have
/// a lifetime. However, note that if it isn't dropped (for example if it is passed to
/// [`std::mem::forget`]) then it can create a deadlock, as the original thread is waiting
/// for something that will never happen.
///
/// The capsule dereferences to `T`, giving shared access to the original value.
pub struct Capsule<T: ?Sized + Sync> {
    // Never read: it exists only so that dropping the capsule releases its
    // slot in the scope's live-capsule count.
    _notifier: Notifier,
    // Raw pointer derived from the encapsulated `&T`; the borrow's lifetime is
    // enforced at runtime by the owning thread waiting for this capsule to drop.
    data: NonNull<T>,
}

// Safety:
// This is safe because the thread that created the capsule will wait for
// it to be dropped before ending the original lifetime. The capsule only hands
// out `&T`, and `T: Sync` is what permits a `&T` to be used from another thread.
unsafe impl<T: ?Sized + Sync> Send for Capsule<T> {}

impl<T: ?Sized + Sync> Deref for Capsule<T> {
    type Target = T;

    /// Borrow the encapsulated value for as long as the capsule is borrowed.
    fn deref(&self) -> &Self::Target {
        let raw = self.data.as_ptr();
        // Safety:
        // The pointer was derived from a valid reference, and the thread that
        // created it stays blocked until this capsule has been dropped, so the
        // referenced value is still alive.
        unsafe { &*raw }
    }
}

impl<T: ?Sized + Sync> Debug for Capsule<T>
where
    for<'a> &'a T: Debug,
{
    /// Formats as `Capsule(<value>)`, using the `Debug` output of the referenced value.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner: &T = self.deref();
        let mut tuple = f.debug_tuple("Capsule");
        tuple.field(&inner);
        tuple.finish()
    }
}

/// An encapsulated `&mut T` that can be passed between threads.
///
/// Like [`Capsule`], it has no lifetime because the thread that created it blocks
/// until it is dropped; never dropping it (for example with [`std::mem::forget`])
/// therefore deadlocks that thread.
///
/// If this is dropped while panicking, then the original thread will also panic.
pub struct CapsuleMut<T: ?Sized + Sync> {
    // Releases this capsule's slot in the scope's live-capsule count on drop,
    // and is used to record a panic of the holding thread.
    notifier: Notifier,
    // Raw pointer derived from the encapsulated `&mut T`.
    data: NonNull<T>,
}

// Safety:
// The thread that created the capsule waits for it to be dropped before the
// original borrow ends, so the pointer stays valid wherever the capsule goes.
// The capsule grants exclusive access to the value, exactly like `&mut T`,
// which is `Send` precisely when `T: Send`; hence the extra bound. Without this
// impl the `NonNull` field would make the capsule `!Send`, and it could not be
// handed to another thread at all.
unsafe impl<T: ?Sized + Sync + Send> Send for CapsuleMut<T> {}

impl<T: ?Sized + Sync> Deref for CapsuleMut<T> {
    type Target = T;

    /// Borrow the encapsulated value immutably for as long as the capsule is borrowed.
    fn deref(&self) -> &Self::Target {
        let raw = self.data.as_ptr();
        // Safety:
        // The pointer was derived from a valid reference, and the thread that
        // created it stays blocked until this capsule has been dropped, so the
        // referenced value is still alive.
        unsafe { &*raw }
    }
}

impl<T: ?Sized + Sync> Drop for CapsuleMut<T> {
    /// Record a poison mark when the capsule is released by a panicking thread.
    fn drop(&mut self) {
        // A normal drop needs no extra work.
        if !thread::panicking() {
            return;
        }
        // The thread holding the mutable capsule is unwinding, so the data it
        // refers to may have been left in an inconsistent state. Flag it so
        // that the panic is propagated.
        self.notifier.poison();
    }
}

impl<T: ?Sized + Sync> DerefMut for CapsuleMut<T> {
    /// Borrow the encapsulated value mutably for as long as the capsule is borrowed.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let raw = self.data.as_ptr();
        // Safety:
        // The pointer was derived from a valid mutable reference, and the thread
        // that created it stays blocked until this capsule has been dropped, so
        // the referenced value is still alive.
        unsafe { &mut *raw }
    }
}

/// Type representing a scope for encapsulated references
///
/// Most user code shouldn't have to directly use this. It is primarily for implementing
/// [`Encapsulate`]. It can convert a `&T` or `&mut T` into a `Capsule<T>` or `CapsuleMut<T>`
/// respectively. A typical implementation of [`Encapsulate`] will project embedded reference
/// types from the source type to capsule types in the destination types, using the methods on
/// `Scope`.
///
/// `'env` is the lifetime that every reference handed to the scope must have.
pub struct Scope<'env> {
    // Counter and wake-up handle shared with every capsule created by this scope.
    state: Arc<State>,
    // Zero-sized marker: `&'env mut &'env ()` makes `Scope` invariant over `'env`,
    // so the lifetime can be neither shortened nor lengthened by subtyping.
    _env: PhantomData<&'env mut &'env ()>,
}

impl<'env> Scope<'env> {
    /// Encapsulate a `&T` so it can be passed to another thread.
    #[inline]
    pub fn encapsulate<T: ?Sized + Sync>(&self, data: &'env T) -> Capsule<T> {
        self.state.increment();
        Capsule {
            _notifier: Notifier(self.state.clone()),
            data: data.into(),
        }
    }

    /// Encapsulate a `&mut T` so it can be passed to another thread.
    #[inline]
    pub fn encapsulate_mut<T: ?Sized + Sync>(&self, data: &'env mut T) -> CapsuleMut<T> {
        self.state.increment();
        CapsuleMut {
            notifier: Notifier(self.state.clone()),
            data: data.into(),
        }
    }

    fn new() -> Self {
        Scope {
            state: Arc::new(State {
                source: thread::current(),
                count: AtomicUsize::new(0),
                poisoned: AtomicBool::new(false),
            }),
            _env: PhantomData,
        }
    }

    /// Synchronously wait for all capsules created by this scope to be dropped.
    fn wait(&self) {
        while self.state.count.load(Ordering::Acquire) > 0 {
            thread::park();
        }
    }
}

/// Create a scope for sending references to another thread.
///
/// One or more references from `data` are encapsulated inside `Capsule` or `CapsuleMut`
/// values, which can be safely passed to another thread (for example, through a channel).
/// `f` is called with the capsule(s); afterwards this function waits until every capsule
/// that was created has been dropped, even if the drop happens on another thread.
///
/// Consequently the call returns without blocking only if all capsules are already gone
/// by the time `f` returns. Capsules that are never dropped, or that are kept alive
/// inside the value returned by `f`, make it wait forever.
///
/// `data` is any type that implements the [`Encapsulate`] trait. The most significant
/// implementations of this trait include:
///
///  - `&T` which is converted into a `Capsule<T>`
///  - `&mut T` which is converted into a `CapsuleMut<T>`
///  - A tuple of up to 8 elements, each of which implements `Encapsulate`, which is converted into
///    a tuple where each item is converted into the respective `Encapsulated` type.
///  - `[T; N]` where T implements `Encapsulate`.
///  - [`encapsulate::Gen`] which will allow you to directly use the [`Scope`] to convert
///    references to capsules inside a function or (more likely) closure.
///
/// # Panics
///
/// A panic raised by `f` is re-raised once all capsules have been dropped. If `f` returns
/// normally but a `CapsuleMut` was dropped by a panicking thread, this function panics too.
pub fn with_encapsulated<'env, E, F, T>(data: E, f: F) -> T
where
    E: Encapsulate<'env>,
    F: FnOnce(E::Encapsulated) -> T,
{
    let scope = Scope::<'env>::new();

    // A panic inside `f` must not skip the wait below, so it is caught here and
    // re-raised only after every capsule is gone.
    let outcome = catch_unwind(AssertUnwindSafe(|| {
        let capsules = data.encapsulate(&scope);
        f(capsules)
    }));
    scope.wait();

    let value = match outcome {
        Ok(value) => value,
        Err(payload) => resume_unwind(payload),
    };
    if scope.state.poisoned.load(Ordering::Relaxed) {
        panic!("a thread panicked while holding a mutable capsule")
    }
    value
}