#![no_std]
#![deprecated = "It is probably a bad idea to use this crate"]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
use ahash::RandomState;
use alloc::boxed::Box;
use core::{any::TypeId, marker::PhantomData, mem};
use hashbrown::{hash_map::Entry, HashMap as HbHashMap, HashSet as HbHashSet};
use sync::{Arc, AtomicUsize, SeqCst};
pub use value::Value;
pub(crate) mod channel;
pub(crate) mod sync;
pub(crate) mod value;
mod current;
mod directive;
mod driver;
mod object;
mod wrapped;
pub use current::if_tagged_thread;
pub use directive::DirectiveOutput;
pub use driver::{Driver, Pinned, Unpinned};
pub use object::{Compatible, Object};
pub use wrapped::Wrapped;
pub struct BreadThread<'lt, Tag: 'static> {
sender: channel::Sender<directive::Directive<'lt>>,
count: Arc<AtomicUsize>,
_tag: PhantomData<Tag>,
}
impl<'lt, Tag: 'static> Drop for BreadThread<'lt, Tag> {
fn drop(&mut self) {
self.count.fetch_sub(1, SeqCst);
}
}
impl<'lt, Tag: 'static> BreadThread<'lt, Tag> {
pub fn undriven() -> (Self, Driver<'lt, Unpinned>) {
let id = TypeId::of::<Tag>();
let fallback = match sync::lock(&*TAGS).entry(id) {
Entry::Occupied(entry) => {
let fallback = entry.get();
fallback.fetch_add(1, SeqCst);
fallback.clone()
}
Entry::Vacant(entry) => {
let fallback = Arc::new(AtomicUsize::new(1));
entry.insert(fallback.clone());
fallback
}
};
#[cfg(not(feature = "fallback"))]
if fallback.load(SeqCst) > 1 {
panic!(
"
cannot create more than one BreadThread with the same tag
enable the `fallback` feature on the `breadthread` crate to allow this
"
);
}
let (sender, receiver) = channel::channel();
let bt = Self {
sender,
count: fallback.clone(),
_tag: PhantomData,
};
let driver = Driver::new::<Tag>(receiver, fallback);
(bt, driver)
}
pub fn run<
Input: 'lt + Wrapped<Tag>,
NtsOutput: 'lt + Send + Sync,
TsOutput: 'lt + Wrapped<Tag>,
>(
&self,
input: Input,
op: impl FnOnce(Input::Unwrapped) -> DirectiveOutput<NtsOutput, TsOutput::Unwrapped>
+ Send
+ 'lt,
) -> Value<(NtsOutput, TsOutput)> {
let (directive, value) = directive::Directive::new(input, op);
match self.sender.send(directive) {
Ok(()) => value,
Err(channel::TrySendError::Disconnected(_)) => {
panic!("Driver has been dropped, cannot send directive")
}
Err(channel::TrySendError::Full(_)) => {
panic!("{}", CHANNEL_FULL)
}
}
}
}
#[cfg(feature = "std")]
impl<'lt, Tag: 'static> BreadThread<'lt, Tag> {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
static ID_GENERATOR: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
let id = ID_GENERATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let (this, driver) = BreadThread::undriven();
let driver: Box<dyn FnOnce() + Send + 'lt> = Box::new(move || driver.pin().drive());
let driver: *mut (dyn FnOnce() + Send + 'lt) = Box::into_raw(driver);
let driver: *mut (dyn FnOnce() + Send + 'static) = unsafe { mem::transmute(driver) };
let driver: Box<dyn FnOnce() + Send + 'static> = unsafe { Box::from_raw(driver as *mut _) };
std::thread::Builder::new()
.name(std::format!("breadthread-{}", id))
.spawn(driver)
.expect("failed to spawn thread");
this
}
}
static TAGS: sync::Lazy<sync::Mutex<HashMap<TypeId, Arc<AtomicUsize>>>> =
sync::Lazy::new(|| sync::Mutex::new(HashMap::with_hasher(RandomState::default())));
type HashSet<K> = HbHashSet<K, RandomState>;
type HashMap<K, V> = HbHashMap<K, V, RandomState>;
const CHANNEL_FULL: &str = "
The bounded channel used to send directives to the driving thread is full.
This is unlikely to happen, and should only really happen in one of three cases:
1). The driving thread did not call `Driver::drive()` or `Driver::tick()`.
2). The driving thread is hanging on a directive that is never resolved.
3). Other threads sent too many directives too fast, and the driving thread is unable
to process all of them.
If resolving one of these three use cases is not possible, set the
`BREADTHREAD_UNBOUNDED_CHANNEL` environment variable, and the channel will be
unbounded.
";