refcapsule/
lib.rs

1//! This module is somewhat similar to scoped threads, but allows passing references to other
2//! threads over channels, or similar mechanisms.
3//!
4//! It captures zero or more references inside of "capsules" with a specific lifetime, then runs a function in a context
5//! where control won't be returned to the caller until all capsules for that lifetime have been
6//! dropped.
7//!
8//! Since it blocks waiting for the "capsules" to be dropped, the capsules can be safely passed to
9//! other threads (for example with a channel), wher they can be derferenced as references again.
10//!
11//! # Examples
12//!
13//! ```
14//! use std::thread;
15//! use std::time::Duration;
16//! use std::sync::mpsc::channel;
17//! use refcapsule::{Capsule, with_encapsulated};
18//!
19//! let (sender, receiver) = channel::<Capsule<u32>>();
20//!
21//! // receiver of references
22//!
23//! thread::spawn(move || {
24//!     {
25//!         let r = receiver.recv().unwrap();
26//!         thread::sleep(Duration::from_millis(100));
27//!         assert_eq!(*r, 4);
28//!     }
29//!     {
30//!         let r = receiver.recv().unwrap();
31//!         thread::sleep(Duration::from_millis(100));
32//!         assert_eq!(*r, 12);
33//!     }
34//! });
35//!
36//! let x: u32 = 4;
37//! let s1 = sender.clone();
38//! with_encapsulated(&x, move |x| s1.send(x).unwrap());
39//!
40//! with_encapsulated(&12, move |cap| sender.send(cap).unwrap());
41//! ```
42//!
43//!
44//! ## Things that shouldn't compile
45//!
46//! Mutating the original variable, while it is encapsulated:
47//!
48//! ```compile_fail,E0499
49//! use refcapsule::with_encapsulated;
50//!
51//! let mut x = 43;
52//! with_encapsulated(&mut x, |y| {
53//!     x = 4;
54//! });
55//! ```
56//!
57//! Encapsulating a reference with a lifetime shorter than the encapsulation scope:
58//!
59//! ```compile_fail,E0597
60//! use refcapsule::{with_encapsulated, encapsulate::gen};
61//!
62//! with_encapsulated(gen(|s| {
63//!     let x = 43;
64//!     s.encapsulate(&x);
65//! }), |x| {});
66//! ```
67//!
68//! Mutating the original variable when it is encapsulated using a generator function:
69//!
70//! ```compile_fail,E0499
71//! use refcapsule::{with_encapsulated, encapsulate::gen};
72//!
73//! let mut x = 43;
74//! with_encapsulated(gen(|s| {
75//!     s.encapsulate_mut(&mut x);
76//! }), |y| {
77//!     x = 4;
78//! });
79//! ```
80//!
81//! Save a scope for a longer duration:
82//!
83//! ```compile_fail
84//! use refcapsule::{with_encapsulated, encapsulate::gen};
85//! with_encapsulated(gen(|s| s), |s| ());
86//! ```
87use std::fmt::{self, Debug};
88use std::marker::PhantomData;
89use std::ops::{Deref, DerefMut};
90use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
91use std::ptr::NonNull;
92use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
93use std::sync::Arc;
94use std::thread::{self, Thread};
95
96pub mod encapsulate;
97
98use encapsulate::*;
99
100struct State {
101    source: Thread,
102    count: AtomicUsize,
103    poisoned: AtomicBool,
104}
105
106impl State {
107    /// Increment the number of references held.
108    ///
109    /// This should be called when creating a new Capsule.
110    #[inline]
111    fn increment(&self) {
112        // We check for "overflow" with half of usize, to make sure there's no
113        // chance it overflows to 0, which could result in unsoundness
114        if self.count.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
115            self.decrement();
116            panic!("Exceeded maximum number of reference capsules for a single scope");
117        }
118    }
119
120    /// Decrement the number of references held.
121    ///
122    /// This is called when a capsule is dropped.
123    #[inline]
124    fn decrement(&self) {
125        if self.count.fetch_sub(1, Ordering::Release) == 1 {
126            self.source.unpark();
127        }
128    }
129}
130
131// NOTE:
132// We need to store the state in an arc to prevent a race condition
133// if the original thread returns before all threads have finished decrementing.
134// For example, in the case of a spurious wakeup between the last decrement of the counter
135// and the call to unpark.
136#[derive(Clone)]
137struct Notifier(Arc<State>);
138
139impl Drop for Notifier {
140    fn drop(&mut self) {
141        // Decrement the count of live capsules, and if it is the last one,
142        // notify the source thread it can proceed.
143        self.0.decrement();
144    }
145}
146
147impl Notifier {
148    /// Mark that a MutCapsule was dropped while panicking.
149    fn poison(&self) {
150        self.0.poisoned.store(true, Ordering::Relaxed);
151    }
152}
153
154/// An encapsulated `&T` that can be passed between threads.
155///
156/// Since the thread that created it will block until this is dropped, it doesn't have
157/// a lifetime. However, note that if it isn't dropped (for example if it is passed to
158/// [`std::mem::forget`]) then it can create a deadlock, as the original thread is waiting
159/// for something that will never happen.
160pub struct Capsule<T: ?Sized + Sync> {
161    _notifier: Notifier,
162    data: NonNull<T>,
163}
164
165// Safety:
166// This is safe because the thread that created the capsule will wait for
167// it to be dropped before ending the original lifetime
168unsafe impl<T: ?Sized + Sync> Send for Capsule<T> {}
169
170impl<T: ?Sized + Sync> Deref for Capsule<T> {
171    type Target = T;
172
173    fn deref(&self) -> &T {
174        // Safety:
175        // The thread that created the pointer will block until this capsule is dropped
176        // before dropping the reference this is derived from.
177        unsafe { self.data.as_ref() }
178    }
179}
180
181impl<T: ?Sized + Sync> Debug for Capsule<T>
182where
183    for<'a> &'a T: Debug,
184{
185    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
186        fmt.debug_tuple("Capsule").field(&self.deref()).finish()
187    }
188}
189
190/// If this is dropped while panicking, then the original thread will also panic.
191pub struct CapsuleMut<T: ?Sized + Sync> {
192    notifier: Notifier,
193    data: NonNull<T>,
194}
195
196impl<T: ?Sized + Sync> Deref for CapsuleMut<T> {
197    type Target = T;
198
199    fn deref(&self) -> &T {
200        // Safety:
201        // The thread that created the pointer will block until this capsule is dropped
202        // before dropping the reference this is derived from.
203        unsafe { self.data.as_ref() }
204    }
205}
206
207impl<T: ?Sized + Sync> Drop for CapsuleMut<T> {
208    fn drop(&mut self) {
209        // If a thread panicks while holding a mutable capsule,
210        // the referenced data may be in an inconsistent state, so
211        // propagate the panic.
212        if thread::panicking() {
213            self.notifier.poison();
214        }
215    }
216}
217
218impl<T: ?Sized + Sync> DerefMut for CapsuleMut<T> {
219    fn deref_mut(&mut self) -> &mut T {
220        // Safety:
221        // The thread that created the pointer will block until this capsule is dropped
222        // before dropping the reference this is derived from.
223        unsafe { self.data.as_mut() }
224    }
225}
226
227/// Type representing a scope for encapsulated references
228///
229/// Most user code shouldn't have to directly use this. It is primarily for implementing
230/// [`Encapsulate`]. It can convert a `&T` or `&mut T` into a `Capsule<T>` or `CapsuleMut<T>`
231/// respectively. A typical implementation of [`Encapsulate`] will project embedded reference
232/// types from the source type to capsule types in the destination types, using the methods on
233/// `Scope`.
234///
235pub struct Scope<'env> {
236    state: Arc<State>,
237    _env: PhantomData<&'env mut &'env ()>,
238}
239
240impl<'env> Scope<'env> {
241    /// Encapsulate a `&T` so it can be passed to another thread.
242    #[inline]
243    pub fn encapsulate<T: ?Sized + Sync>(&self, data: &'env T) -> Capsule<T> {
244        self.state.increment();
245        Capsule {
246            _notifier: Notifier(self.state.clone()),
247            data: data.into(),
248        }
249    }
250
251    /// Encapsulate a `&mut T` so it can be passed to another thread.
252    #[inline]
253    pub fn encapsulate_mut<T: ?Sized + Sync>(&self, data: &'env mut T) -> CapsuleMut<T> {
254        self.state.increment();
255        CapsuleMut {
256            notifier: Notifier(self.state.clone()),
257            data: data.into(),
258        }
259    }
260
261    fn new() -> Self {
262        Scope {
263            state: Arc::new(State {
264                source: thread::current(),
265                count: AtomicUsize::new(0),
266                poisoned: AtomicBool::new(false),
267            }),
268            _env: PhantomData,
269        }
270    }
271
272    /// Synchronously wait for all capsules created by this scope to be dropped.
273    fn wait(&self) {
274        while self.state.count.load(Ordering::Acquire) > 0 {
275            thread::park();
276        }
277    }
278}
279
280/// Create a scope for sending references to another thread.
281///
282/// Encapsulate one or more references from `data` inside of `Capsule` or `CapsuleMut` which can
283/// be safely passed to another thread (for example, through a channel). The `f` function is then
284/// called with the capsule(s), and then this function will wait until all created capsules have
285/// been dropped, even if they are eventually dropped by another thread.
286///
287/// Note that this function will block unless the capsules are dropped by `f` on the same thread.
288///
289/// `data` is any type that implements the [`Encapsulate`] trait. The most significant
290/// implementations of this trait includes:
291///
292///  - `&T` which is convertend into a `Capsule<T>`
293///  - `&mut T` which is converted into a `CapsuleMut<T>`
294///  - A tuple of up to 8 elements, each of which implements `Encapsulate`, which is converted into
295///    a tuple where each item is converted into the respective `Encapsulated` type.
296///  - `[T; N]` where T implements `Encapsulate`.
297///  - [`encapsulate::Gen`] which will allow you to directly use the [`Scope`] to convert
298///  references to capsules inside a function or (more likely) closure.
299pub fn with_encapsulated<'env, E, F, T>(data: E, f: F) -> T
300where
301    E: Encapsulate<'env>,
302    F: FnOnce(E::Encapsulated) -> T,
303{
304    let scope = Scope::<'env>::new();
305    let res = catch_unwind(AssertUnwindSafe(|| f(data.encapsulate(&scope))));
306    scope.wait();
307    match res {
308        Err(e) => resume_unwind(e),
309        Ok(_) if scope.state.poisoned.load(Ordering::Relaxed) => {
310            panic!("a thread panicked while holding a mutable capsule")
311        }
312        Ok(result) => result,
313    }
314}