1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
// MIT/Apache2 License
//! A runtime that allows thread-unsafe code to be executed on a designated
//!
//! ## Motivation
//!
//! There are quite a few APIs that are thread unsafe. The one I had in mind
//! while designing this crate is the Windows `winuser` windowing API, but
//! there are many others. While small programs may be able to get away with
//! being thread unsafe, larger programs, with runtimes that require `Safe`
//! bounds, may not.
//!
//! Ordinarily, these programs will have to resort to convoluted systems to
//! ensure that code runs on a desigated "local" thread or thread pool. The
//! goal of this crate is to simplify these systems by providing a runtime
//! that allows thread-unsafe code to be executed on a designated thread.
//!
//! ## Usage
//!
//! First, create a type to use as a [`Tag`]. This type will be used to
//! uniquely identify the thread that the runtime will run on at compile time.
//! This ensures that values that are native to one thread will not be used
//! on another thread.
//!
//! Any `'static` type can be used as a tag, and it's recommended to use a
//! zero-sized type.
//!
//! ```rust
//! struct MyTag;
//! # let _ = MyTag;
//! ```
//!
//! Then, create a [`BreadThread`] type. This is the runtime that directives
//! are sent along. To spawn a new thread to run directives on, use the `new`
//! method.
//!
//! ```rust
//! use breadthread::BreadThread;
//!
//! # struct MyTag;
//! let bt = BreadThread::<'static, MyTag>::new();
//! # let _ = bt;
//! ```
//!
//! However, if you already have a system that you'd like to take advantage of
//! (a dedicated thread pool like [`rayon`], for instance), you can use the
//! `undriven()` method to create both a `BreadThread` and a [`Driver`]. You can
//! transform a thread or thread-like task into the driving thread by calling
//! `drive()` on the `Driver`.
//!
//! ```rust
//! # use breadthread::BreadThread;
//! # struct MyTag;
//! let (bt, driver) = BreadThread::<'static, MyTag>::undriven();
//! my_runtime::spawn_task(move || driver.drive());
//! # let _ = bt;
//! # mod my_runtime {
//! # pub fn spawn_task<F: FnOnce()>(_f: F) {}
//! # }
//! ```
//!
//! Note that the `BreadThread` and `Driver` are parameterized by a lifetime, which
//! in this case is `'static`. The lifetime is used as a bound for the directives that
//! we send to the thread. Consider using this if you want to send a directive that borrows
//! other data.
//!
//! Now, we can call the `run` method on the `BreadThread` to run a given method.
//!
//! ```rust
//! # use breadthread::BreadThread;
//! # struct MyTag;
//! # let bt = BreadThread::<'static, MyTag>::new();
//!
//! use breadthread::DirectiveOutput;
//!
//! let input_value = 7;
//! let value = bt.run((), move |()| {
//! let ret_ty = thread_unsafe_code(input_value);
//! DirectiveOutput {
//! thread_safe_value: (),
//! thread_unsafe_value: ret_ty,
//! deleted_values: vec![],
//! }
//! });
//!
//! # fn thread_unsafe_code(_i: i32) -> i32 { 0 }
//! ```
//!
//! `bt.run()` expects a return type of [`DirectiveOutput`], which consists of:
//!
//! - A `thread_safe_value` that is implied to be `Send` and `Sync`.
//! - A `thread_unsafe_value` that may not be any of these. Once `value` returns, this
//! value will be wrapped in [`Object`], which essentially allows it to be sent to
//! other threads, but only used in the driving thread. To use this value again, pass it
//! into the `bt.run()` method in place of the empty tuple, and it can be used raw
//! again. Tuples and slices of `Object`s can be returned using this strategy.
//! Note that values of this kind have to implement [`Compatible`].
//! - `deleted_values` consists of values to be deleted from the thread's internal bank
//! that keeps track of the values that are valid for it. By default, values returned
//! are added to the "valid" list, so if you don't expect to use the value again, you
//! can add it to the `deleted_values` list.
//!
//! `value` is of type [`Value`], and resolves to a tuple of `thread_safe_value` and the
//! safe version of `thread_unsafe_value`. It can be resolved in one of three ways:
//!
//! - Poll for whether or not it's resolved using `value.resolve()`.
//! - Wait for it to be resolved by parking the thread, using `value.wait()`.
//! - With the `async` feature enabled, `Value` implements [`Future`].
//!
//! ## Safety
//!
//! Using `tag` ensures that the bread thread will only have values that are tagged as
//! valid associated with it. As long as two threads do not have the same tag, this
//! validation is done at compile time, and the only real overhead is in sending and
//! receiving values from the two threads.
//!
//! If more than one thread is created with the same tag, by default the library panics.
//! If this behavior is not desired, enable the `fallback` feature. Instead, when two
//! threads share a tag, they will manually keep track of which values are valid for
//! which thread.
//!
//! ## `no_std`
//!
//! The `std` feature is enabled by default. Without `std`, this crate only relies on the
//! `alloc` crate. However, certain changes are made both externally and internally.
//!
//! - `BreadThread::new()`, `Driv(er::drive()` and `Value::wait()` are not present.
//! - Internally, the data structures use spinlock-based APIs instead of ones based on
//! system synchronization. This is often undesireable behavior.
//!
//! It is recommended to use the `std` feature unless it is necessary to use this crate
//! in a `no_std` environment.
//!
//! [`rayon`]: https://crates.io/crates/rayon
//! [`Future`]: std::future::Future
#![no_std]
#![deprecated = "It is probably a bad idea to use this crate"]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use ahash::RandomState;
use alloc::boxed::Box;
use core::{any::TypeId, marker::PhantomData, mem};
use hashbrown::{hash_map::Entry, HashMap as HbHashMap, HashSet as HbHashSet};
use sync::{Arc, AtomicUsize, SeqCst};
pub use value::Value;
pub(crate) mod channel;
pub(crate) mod sync;
pub(crate) mod value;
mod current;
mod directive;
mod driver;
mod object;
mod wrapped;
pub use current::if_tagged_thread;
pub use directive::DirectiveOutput;
pub use driver::{Driver, Pinned, Unpinned};
pub use object::{Compatible, Object};
pub use wrapped::Wrapped;
/// A runtime for allowing thread unsafe code to be run on a designated
/// thread.
pub struct BreadThread<'lt, Tag: 'static> {
// a channel used to send directives to the thread
sender: channel::Sender<directive::Directive<'lt>>,
// count to decrement on drop
count: Arc<AtomicUsize>,
_tag: PhantomData<Tag>,
}
impl<'lt, Tag: 'static> Drop for BreadThread<'lt, Tag> {
fn drop(&mut self) {
// decrement the count
self.count.fetch_sub(1, SeqCst);
}
}
impl<'lt, Tag: 'static> BreadThread<'lt, Tag> {
/// Creates a new `BreadThread` along with a `Driver`.
///
/// This method can be used to use the current thread as the driving thread,
/// if that is desired.
pub fn undriven() -> (Self, Driver<'lt, Unpinned>) {
// check to see if we need to use fallback capabilities
let id = TypeId::of::<Tag>();
let fallback = match sync::lock(&*TAGS).entry(id) {
Entry::Occupied(entry) => {
let fallback = entry.get();
fallback.fetch_add(1, SeqCst);
fallback.clone()
}
Entry::Vacant(entry) => {
let fallback = Arc::new(AtomicUsize::new(1));
entry.insert(fallback.clone());
fallback
}
};
// if we don't have to deal with fallbacks, panic if we have more than one
// thread with this tag active
#[cfg(not(feature = "fallback"))]
if fallback.load(SeqCst) > 1 {
panic!(
"
cannot create more than one BreadThread with the same tag
enable the `fallback` feature on the `breadthread` crate to allow this
"
);
}
let (sender, receiver) = channel::channel();
let bt = Self {
sender,
count: fallback.clone(),
_tag: PhantomData,
};
let driver = Driver::new::<Tag>(receiver, fallback);
(bt, driver)
}
/// Send a new directive to the thread to be polled and used.
pub fn run<
Input: 'lt + Wrapped<Tag>,
NtsOutput: 'lt + Send + Sync,
TsOutput: 'lt + Wrapped<Tag>,
>(
&self,
input: Input,
op: impl FnOnce(Input::Unwrapped) -> DirectiveOutput<NtsOutput, TsOutput::Unwrapped>
+ Send
+ 'lt,
) -> Value<(NtsOutput, TsOutput)> {
// create the directive and output value
let (directive, value) = directive::Directive::new(input, op);
// try to send the directive
match self.sender.send(directive) {
Ok(()) => value,
Err(channel::TrySendError::Disconnected(_)) => {
panic!("Driver has been dropped, cannot send directive")
}
Err(channel::TrySendError::Full(_)) => {
panic!("{}", CHANNEL_FULL)
}
}
}
}
#[cfg(feature = "std")]
impl<'lt, Tag: 'static> BreadThread<'lt, Tag> {
/// Create a new `BreadThread` that spawns a new thread that is used
/// to run the directives.
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
static ID_GENERATOR: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
let id = ID_GENERATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let (this, driver) = BreadThread::undriven();
// box a function for driving the thread, and cast it to make it static
// SAFETY: driver will always outlive the bread thread
let driver: Box<dyn FnOnce() + Send + 'lt> = Box::new(move || driver.pin().drive());
let driver: *mut (dyn FnOnce() + Send + 'lt) = Box::into_raw(driver);
let driver: *mut (dyn FnOnce() + Send + 'static) = unsafe { mem::transmute(driver) };
let driver: Box<dyn FnOnce() + Send + 'static> = unsafe { Box::from_raw(driver as *mut _) };
std::thread::Builder::new()
.name(std::format!("breadthread-{}", id))
.spawn(driver)
.expect("failed to spawn thread");
this
}
}
/// A hash set containing tags that currently exist.
static TAGS: sync::Lazy<sync::Mutex<HashMap<TypeId, Arc<AtomicUsize>>>> =
sync::Lazy::new(|| sync::Mutex::new(HashMap::with_hasher(RandomState::default())));
type HashSet<K> = HbHashSet<K, RandomState>;
type HashMap<K, V> = HbHashMap<K, V, RandomState>;
// error messages
const CHANNEL_FULL: &str = "
The bounded channel used to send directives to the driving thread is full.
This is unlikely to happen, and should only really happen in one of three cases:
1). The driving thread did not call `Driver::drive()` or `Driver::tick()`.
2). The driving thread is hanging on a directive that is never resolved.
3). Other threads sent too many directives too fast, and the driving thread is unable
to process all of them.
If resolving one of these three use cases is not possible, set the
`BREADTHREAD_UNBOUNDED_CHANNEL` environment variable, and the channel will be
unbounded.
";