breadthread/
lib.rs

// MIT/Apache2 License

//! A runtime that allows thread-unsafe code to be executed on a designated
//! thread.
//!
//! ## Motivation
//!
//! There are quite a few APIs that are thread unsafe. The one I had in mind
//! while designing this crate is the Windows `winuser` windowing API, but
//! there are many others. While small programs may be able to get away with
//! being thread unsafe, larger programs, with runtimes that require `Send`
//! bounds, may not.
//!
//! Ordinarily, these programs will have to resort to convoluted systems to
//! ensure that code runs on a designated "local" thread or thread pool. The
//! goal of this crate is to simplify these systems by providing a runtime
//! that allows thread-unsafe code to be executed on a designated thread.
//!
//! ## Usage
//!
//! First, create a type to use as a `Tag`. This type will be used to
//! uniquely identify the thread that the runtime will run on at compile time.
//! This ensures that values that are native to one thread will not be used
//! on another thread.
//!
//! Any `'static` type can be used as a tag, and it's recommended to use a
//! zero-sized type.
//!
//! ```rust
//! struct MyTag;
//! # let _ = MyTag;
//! ```
//!
//! Then, create a [`BreadThread`] type. This is the runtime that directives
//! are sent along. To spawn a new thread to run directives on, use the `new`
//! method.
//!
//! ```rust
//! use breadthread::BreadThread;
//!
//! # struct MyTag;
//! let bt = BreadThread::<'static, MyTag>::new();
//! # let _ = bt;
//! ```
//!
//! However, if you already have a system that you'd like to take advantage of
//! (a dedicated thread pool like [`rayon`], for instance), you can use the
//! `undriven()` method to create both a `BreadThread` and a [`Driver`]. You can
//! transform a thread or thread-like task into the driving thread by calling
//! `drive()` on the `Driver`.
//!
//! ```rust
//! # use breadthread::BreadThread;
//! # struct MyTag;
//! let (bt, driver) = BreadThread::<'static, MyTag>::undriven();
//! my_runtime::spawn_task(move || driver.drive());
//! # let _ = bt;
//! # mod my_runtime {
//! #     pub fn spawn_task<F: FnOnce()>(_f: F) {}
//! # }
//! ```
//!
//! Note that the `BreadThread` and `Driver` are parameterized by a lifetime, which
//! in this case is `'static`. The lifetime is used as a bound for the directives that
//! we send to the thread. Consider using a shorter lifetime if you want to send a
//! directive that borrows other data.
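//!
//! Conceptually, a driving thread is a loop that receives closures over a channel and
//! runs them one by one. The following is a minimal, `std`-only sketch of that idea. It
//! is not this crate's implementation, and `Job`, `spawn_driver` and `run_on_driver`
//! are hypothetical names used only for illustration.
//!
//! ```rust
//! use std::sync::mpsc;
//! use std::thread;
//!
//! // Hypothetical job type: a boxed closure to run on the designated thread.
//! type Job = Box<dyn FnOnce() + Send + 'static>;
//!
//! // Spawns the designated thread and returns a sender for submitting jobs.
//! fn spawn_driver() -> (mpsc::Sender<Job>, thread::JoinHandle<()>) {
//!     let (tx, rx) = mpsc::channel::<Job>();
//!     let handle = thread::spawn(move || {
//!         // The loop ends once every sender has been dropped.
//!         for job in rx {
//!             job();
//!         }
//!     });
//!     (tx, handle)
//! }
//!
//! // Runs `f` on the designated thread and blocks until its result comes back.
//! fn run_on_driver<T: Send + 'static>(
//!     jobs: &mpsc::Sender<Job>,
//!     f: impl FnOnce() -> T + Send + 'static,
//! ) -> T {
//!     let (result_tx, result_rx) = mpsc::channel();
//!     jobs.send(Box::new(move || {
//!         // Ignore the error if the caller stopped waiting.
//!         let _ = result_tx.send(f());
//!     }))
//!     .expect("driver thread has shut down");
//!     result_rx.recv().expect("driver thread dropped the job")
//! }
//! ```
//!
//! In this sketch only a `Send` result can travel back to the caller, which is the
//! limitation that the tagged `Object` wrapper described below is meant to lift.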
//!
//! Now, we can call the `run` method on the `BreadThread` to run a given closure.
//!
//! ```rust
//! # use breadthread::BreadThread;
//! # struct MyTag;
//! # let bt = BreadThread::<'static, MyTag>::new();
//!
//! use breadthread::DirectiveOutput;
//!
//! let input_value = 7;
//! let value = bt.run((), move |()| {
//!     let ret_ty = thread_unsafe_code(input_value);
//!     DirectiveOutput {
//!         thread_safe_value: (),
//!         thread_unsafe_value: ret_ty,
//!         deleted_values: vec![],
//!     }
//! });
//!
//! # fn thread_unsafe_code(_i: i32) -> i32 { 0 }
//! ```
//!
//! `bt.run()` expects the closure to return a [`DirectiveOutput`], which consists of:
//!
//! - A `thread_safe_value` that is required to be `Send` and `Sync`.
//! - A `thread_unsafe_value` that may be neither. Once `value` resolves, this
//!   value will be wrapped in an [`Object`], which allows it to be sent to
//!   other threads, but only used in the driving thread. To use this value again, pass it
//!   into the `bt.run()` method in place of the empty tuple, and the closure receives
//!   the raw value again. Tuples and slices of `Object`s can be returned using this
//!   strategy. Note that values of this kind have to implement [`Compatible`].
//! - A `deleted_values` list of values to be deleted from the thread's internal bank
//!   that keeps track of the values that are valid for it. By default, values returned
//!   are added to the "valid" list, so if you don't expect to use the value again, you
//!   can add it to the `deleted_values` list.
//!
//! `value` is of type [`Value`], and resolves to a tuple of `thread_safe_value` and the
//! safe version of `thread_unsafe_value`. It can be resolved in one of three ways:
//!
//! - Poll for whether or not it's resolved using `value.resolve()`.
//! - Wait for it to be resolved by parking the thread, using `value.wait()`.
//! - With the `async` feature enabled, `Value` implements [`Future`].
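//!
//! The first two options correspond to polling and blocking. The following is a minimal,
//! `std`-only sketch of the difference. It uses a hypothetical one-shot `Slot` type
//! rather than this crate's `Value`, and it blocks on a condition variable rather than
//! parking the thread.
//!
//! ```rust
//! use std::sync::{Arc, Condvar, Mutex};
//!
//! // Hypothetical one-shot slot: filled once by a producer, read by the caller.
//! struct Slot<T> {
//!     value: Mutex<Option<T>>,
//!     ready: Condvar,
//! }
//!
//! impl<T> Slot<T> {
//!     fn new() -> Arc<Self> {
//!         Arc::new(Slot {
//!             value: Mutex::new(None),
//!             ready: Condvar::new(),
//!         })
//!     }
//!
//!     // Called by the producer to publish the result and wake any waiter.
//!     fn fill(&self, value: T) {
//!         *self.value.lock().unwrap() = Some(value);
//!         self.ready.notify_all();
//!     }
//!
//!     // Non-blocking poll: takes the result if it is already there.
//!     fn try_take(&self) -> Option<T> {
//!         self.value.lock().unwrap().take()
//!     }
//!
//!     // Blocking wait: sleeps until the producer has called `fill`.
//!     fn wait(&self) -> T {
//!         let mut guard = self.value.lock().unwrap();
//!         loop {
//!             if let Some(value) = guard.take() {
//!                 return value;
//!             }
//!             guard = self.ready.wait(guard).unwrap();
//!         }
//!     }
//! }
//! ```
//!
//! Polling suits callers that have other work to do in the meantime, while blocking is
//! simpler when the caller cannot proceed without the result.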
//!
//! ## Safety
//!
//! Using a tag ensures that the bread thread is only ever handed values that are
//! tagged as valid for it. As long as two threads do not have the same tag, this
//! validation is done at compile time, and the only real overhead is in sending and
//! receiving values between the two threads.
//!
//! If more than one thread is created with the same tag, by default the library panics.
//! If this behavior is not desired, enable the `fallback` feature. With it enabled,
//! threads that share a tag instead keep track, at runtime, of which values are valid
//! for which thread.
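//!
//! Detecting a shared tag comes down to counting live users per tag type. The following
//! is a minimal, `std`-only sketch of such a registry keyed by `TypeId`. The names
//! `registry`, `register` and `unregister` are hypothetical, and the crate's own
//! bookkeeping differs in its details.
//!
//! ```rust
//! use std::any::TypeId;
//! use std::collections::HashMap;
//! use std::sync::{Mutex, OnceLock};
//!
//! // Hypothetical global registry: number of live users per tag type.
//! fn registry() -> &'static Mutex<HashMap<TypeId, usize>> {
//!     static REGISTRY: OnceLock<Mutex<HashMap<TypeId, usize>>> = OnceLock::new();
//!     REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
//! }
//!
//! // Registers one more user of `Tag` and returns the new live count.
//! fn register<Tag: 'static>() -> usize {
//!     let mut map = registry().lock().unwrap();
//!     let count = map.entry(TypeId::of::<Tag>()).or_insert(0);
//!     *count += 1;
//!     *count
//! }
//!
//! // Records that one user of `Tag` has gone away.
//! fn unregister<Tag: 'static>() {
//!     let mut map = registry().lock().unwrap();
//!     if let Some(count) = map.get_mut(&TypeId::of::<Tag>()) {
//!         *count = count.saturating_sub(1);
//!     }
//! }
//! ```
//!
//! A count above one returned from `register` corresponds to the situation in which
//! this crate panics or, with `fallback` enabled, switches to runtime tracking.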
//!
//! ## `no_std`
//!
//! The `std` feature is enabled by default. Without `std`, this crate only relies on the
//! `alloc` crate. However, certain changes are made both externally and internally.
//!
//! - `BreadThread::new()`, `Driver::drive()` and `Value::wait()` are not present.
//! - Internally, the data structures use spinlock-based APIs instead of ones based on
//!   system synchronization. This is often undesirable behavior.
//!
//! It is recommended to use the `std` feature unless it is necessary to use this crate
//! in a `no_std` environment.
//!
//! [`rayon`]: https://crates.io/crates/rayon
//! [`Future`]: std::future::Future

#![no_std]
#![deprecated = "It is probably a bad idea to use this crate"]

extern crate alloc;

#[cfg(feature = "std")]
extern crate std;

use ahash::RandomState;
use alloc::boxed::Box;
use core::{any::TypeId, marker::PhantomData, mem};
use hashbrown::{hash_map::Entry, HashMap as HbHashMap, HashSet as HbHashSet};
use sync::{Arc, AtomicUsize, SeqCst};

pub use value::Value;

pub(crate) mod channel;
pub(crate) mod sync;
pub(crate) mod value;

mod current;
mod directive;
mod driver;
mod object;
mod wrapped;

pub use current::if_tagged_thread;
pub use directive::DirectiveOutput;
pub use driver::{Driver, Pinned, Unpinned};
pub use object::{Compatible, Object};
pub use wrapped::Wrapped;

/// A runtime for allowing thread-unsafe code to be run on a designated
/// thread.
pub struct BreadThread<'lt, Tag: 'static> {
    // a channel used to send directives to the thread
    sender: channel::Sender<directive::Directive<'lt>>,
    // count to decrement on drop
    count: Arc<AtomicUsize>,
    _tag: PhantomData<Tag>,
}

impl<'lt, Tag: 'static> Drop for BreadThread<'lt, Tag> {
    fn drop(&mut self) {
        // decrement the count
        self.count.fetch_sub(1, SeqCst);
    }
}

impl<'lt, Tag: 'static> BreadThread<'lt, Tag> {
    /// Creates a new `BreadThread` along with a `Driver`.
    ///
    /// This method can be used to use the current thread as the driving thread,
    /// if that is desired.
    ///
    /// # Panics
    ///
    /// Panics if the tag is already in use by another `BreadThread` and the
    /// `fallback` feature is not enabled.
    pub fn undriven() -> (Self, Driver<'lt, Unpinned>) {
        // check to see if we need to use fallback capabilities
        let id = TypeId::of::<Tag>();
        let fallback = match sync::lock(&*TAGS).entry(id) {
            Entry::Occupied(entry) => {
                let fallback = entry.get();
                fallback.fetch_add(1, SeqCst);
                fallback.clone()
            }
            Entry::Vacant(entry) => {
                let fallback = Arc::new(AtomicUsize::new(1));
                entry.insert(fallback.clone());
                fallback
            }
        };

        // if we don't have to deal with fallbacks, panic if we have more than one
        // thread with this tag active
        #[cfg(not(feature = "fallback"))]
        if fallback.load(SeqCst) > 1 {
            panic!(
                "
cannot create more than one BreadThread with the same tag
enable the `fallback` feature on the `breadthread` crate to allow this
            "
            );
        }

        let (sender, receiver) = channel::channel();
        let bt = Self {
            sender,
            count: fallback.clone(),
            _tag: PhantomData,
        };
        let driver = Driver::new::<Tag>(receiver, fallback);

        (bt, driver)
    }

    /// Send a new directive to the thread to be polled and used.
    ///
    /// # Panics
    ///
    /// Panics if the `Driver` has been dropped or if the directive channel is full.
    pub fn run<
        Input: 'lt + Wrapped<Tag>,
        NtsOutput: 'lt + Send + Sync,
        TsOutput: 'lt + Wrapped<Tag>,
    >(
        &self,
        input: Input,
        op: impl FnOnce(Input::Unwrapped) -> DirectiveOutput<NtsOutput, TsOutput::Unwrapped>
            + Send
            + 'lt,
    ) -> Value<(NtsOutput, TsOutput)> {
        // create the directive and output value
        let (directive, value) = directive::Directive::new(input, op);

        // try to send the directive
        match self.sender.send(directive) {
            Ok(()) => value,
            Err(channel::TrySendError::Disconnected(_)) => {
                panic!("Driver has been dropped, cannot send directive")
            }
            Err(channel::TrySendError::Full(_)) => {
                panic!("{}", CHANNEL_FULL)
            }
        }
    }
}

#[cfg(feature = "std")]
impl<'lt, Tag: 'static> BreadThread<'lt, Tag> {
    /// Create a new `BreadThread` that spawns a new thread that is used
    /// to run the directives.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        static ID_GENERATOR: std::sync::atomic::AtomicUsize =
            std::sync::atomic::AtomicUsize::new(0);
        let id = ID_GENERATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let (this, driver) = BreadThread::undriven();

        // box a function for driving the thread, and cast it to make it static
        // SAFETY: driver will always outlive the bread thread
        let driver: Box<dyn FnOnce() + Send + 'lt> = Box::new(move || driver.pin().drive());
        let driver: *mut (dyn FnOnce() + Send + 'lt) = Box::into_raw(driver);
        let driver: *mut (dyn FnOnce() + Send + 'static) = unsafe { mem::transmute(driver) };
        let driver: Box<dyn FnOnce() + Send + 'static> = unsafe { Box::from_raw(driver as *mut _) };

        std::thread::Builder::new()
            .name(std::format!("breadthread-{}", id))
            .spawn(driver)
            .expect("failed to spawn thread");

        this
    }
}

/// A map from the `TypeId` of each tag in use to a shared counter of the
/// `BreadThread`s created with that tag.
static TAGS: sync::Lazy<sync::Mutex<HashMap<TypeId, Arc<AtomicUsize>>>> =
    sync::Lazy::new(|| sync::Mutex::new(HashMap::with_hasher(RandomState::default())));

type HashSet<K> = HbHashSet<K, RandomState>;
type HashMap<K, V> = HbHashMap<K, V, RandomState>;

// error messages

const CHANNEL_FULL: &str = "
The bounded channel used to send directives to the driving thread is full.

This is unlikely to happen, and should only really happen in one of three cases:

1). The driving thread did not call `Driver::drive()` or `Driver::tick()`.
2). The driving thread is hanging on a directive that is never resolved.
3). Other threads sent too many directives too fast, and the driving thread is unable
    to process all of them.

If resolving one of these three use cases is not possible, set the
`BREADTHREAD_UNBOUNDED_CHANNEL` environment variable, and the channel will be
unbounded.
";