use std::{
alloc::{
Layout, alloc, dealloc, handle_alloc_error
}, cell::UnsafeCell, marker::PhantomData, mem::{
MaybeUninit,
transmute,
}, ptr::NonNull, sync::atomic::{AtomicU8, Ordering}
};
/// Strategies that decide how a worker closure gets launched.
pub mod strategy {
    /// Describes how to launch a worker and what to hand back to the caller.
    pub trait SpawnStrategy {
        /// Shape of the value returned by [`SpawnStrategy::spawn`], built around
        /// the pass-through value of type `T`.
        type Return<T>;

        /// Launches `worker` and returns `with`, packaged as this strategy dictates.
        fn spawn<F: FnOnce() + Send + 'static, T>(with: T, worker: F) -> Self::Return<T>;
    }

    /// Strategy that launches every worker with `std::thread::spawn`.
    pub enum Std {}

    impl SpawnStrategy for Std {
        /// The pass-through value together with the join handle of the new thread.
        type Return<T> = (T, ::std::thread::JoinHandle<()>);

        #[inline]
        fn spawn<F: FnOnce() + Send + 'static, T>(with: T, worker: F) -> Self::Return<T> {
            (with, ::std::thread::spawn(worker))
        }
    }

    /// Strategy that launches every worker with `rayon::spawn`.
    #[cfg(feature = "rayon")]
    pub enum Rayon {}

    #[cfg(feature = "rayon")]
    impl SpawnStrategy for Rayon {
        /// Only the pass-through value; `rayon::spawn` yields no handle here.
        type Return<T> = T;

        #[inline]
        fn spawn<F: FnOnce() + Send + 'static, T>(with: T, worker: F) -> Self::Return<T> {
            rayon::spawn(worker);
            with
        }
    }

    /// Strategy used by default: [`Std`] when the `rayon` feature is disabled.
    #[cfg(not(feature = "rayon"))]
    pub type DefaultStrategy = Std;

    /// Strategy used by default: [`Rayon`] when the `rayon` feature is enabled.
    #[cfg(feature = "rayon")]
    pub type DefaultStrategy = Rayon;
}
/// Lifecycle of the shared response slot, stored as a `u8` inside `AtomicState`.
///
/// The discriminants of `Taken`, `Waiting` and `AvailableSoon` equal those of the
/// same-named `Reason` variants; `AtomicState::take_if_ready` converts between the
/// two enums, so the numbering of both must stay in sync.
#[repr(u8, align(1))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum State {
/// Set by `AtomicState::take_if_ready` when it claims a `Ready` response.
Taken = 0,
/// Initial value written by `AtomicState::new`.
Waiting = 1,
/// Stored by `Responder::respond` immediately before it writes the response.
AvailableSoon = 2,
/// Stored by `Responder::respond` after the response has been written.
Ready = 3,
}
/// Why `Pending::try_recv` could not hand out a response.
///
/// Each variant carries the same discriminant as the same-named `State` variant:
/// the value is derived from the `State` observed when the `Ready -> Taken`
/// exchange fails, so the two enums must keep matching numbering.
#[repr(u8, align(1))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Reason {
/// The state was `Taken`: the response had already been claimed.
Taken = 0,
/// The state was `Waiting`: no response has been written.
Waiting = 1,
/// The state was `AvailableSoon`: the responder has started, but not finished, publishing.
AvailableSoon = 2,
}
/// A `State` stored in an `AtomicU8` so it can be read and updated through `&self`.
#[repr(transparent)]
#[derive(Debug)]
struct AtomicState(AtomicU8);
impl AtomicState {
#[inline(always)]
const fn new() -> Self {
Self(AtomicU8::new(unsafe { transmute(State::Waiting) }))
}
#[inline(always)]
fn store(&self, state: State) {
self.0.store(unsafe { transmute(state) }, Ordering::Release);
}
#[must_use]
#[inline(always)]
fn load(&self) -> State {
unsafe { transmute(self.0.load(Ordering::Acquire)) }
}
#[inline(always)]
fn compare_exchange(&self, current: State, new: State) -> Result<State, State> {
let current: u8 = unsafe { transmute(current) };
let new: u8 = unsafe { transmute(new) };
match self.0.compare_exchange(current, new, Ordering::AcqRel, Ordering::Relaxed) {
Ok(previous) => Ok(unsafe { transmute(previous) }),
Err(previous) => Err(unsafe { transmute(previous) }),
}
}
#[inline(always)]
fn is_ready(&self) -> bool {
self.compare_exchange(
State::Ready,
State::Ready,
).is_ok()
}
#[inline(always)]
fn take_if_ready(&self) -> Result<(), Reason> {
match self.compare_exchange(State::Ready, State::Taken) {
Ok(_) => Ok(()),
Err(reason) => Err(unsafe { transmute(reason) })
}
}
}
/// Heap block shared by the two handles of a pair.
#[repr(C)]
struct Inner<R> {
/// Slot for the response; holds an initialised `R` only while `state` is `Ready`
/// (written by `Responder::respond`, moved out by `Pending::try_recv`).
response: UnsafeCell<MaybeUninit<R>>,
/// Number of live handles; `alloc_new` starts it at 2.
ref_count: AtomicU8,
/// Current position in the `State` lifecycle.
state: AtomicState,
}
/// Allocation, reference counting and teardown of the shared block.
impl<R> Inner<R> {
/// Layout passed to both `alloc` and `dealloc` for an `Inner<R>`.
const LAYOUT: Layout = Layout::new::<Self>();
/// Heap-allocates an `Inner<R>` with an uninitialised response slot, a reference
/// count of 2 and the state produced by `AtomicState::new`.
///
/// Calls `handle_alloc_error` when the allocator returns null.
fn alloc_new() -> NonNull<Inner<R>> {
// SAFETY: `Inner<R>` always contains the two one-byte atomics, so `LAYOUT` has a
// non-zero size as `alloc` requires.
let ptr = unsafe { alloc(Self::LAYOUT).cast() };
let Some(raw) = NonNull::new(ptr) else {
handle_alloc_error(Self::LAYOUT);
};
// SAFETY: `raw` is non-null and was just allocated with the layout of `Self`, so
// it is valid for a write of one `Self`.
unsafe {
raw.write(Self {
response: UnsafeCell::new(MaybeUninit::uninit()),
ref_count: AtomicU8::new(2),
state: AtomicState::new(),
});
}
raw
}
/// Decrements the reference count and, if the count was 1 before the decrement,
/// tears the block down via `drop_and_dealloc`.
///
/// # Safety
///
/// `raw` is dereferenced and may be freed by this call, so it must point to a
/// live `Inner<R>` and the caller must not use it afterwards.
unsafe fn decrement_ref_count_and_maybe_free(raw: NonNull<Self>) {
let inner_ref = unsafe { raw.as_ref() };
// `fetch_sub` returns the previous value: 1 means this was the last reference.
if inner_ref.ref_count.fetch_sub(1, Ordering::AcqRel) == 1 {
unsafe { Self::drop_and_dealloc(raw); }
}
}
/// Drops a still-stored response (state `Ready`), then drops the `Inner` in place
/// and releases its memory with `LAYOUT`.
///
/// # Panics
///
/// Panics through `unreachable!` if the state is `AvailableSoon`.
///
/// # Safety
///
/// `raw` is accessed through `&mut` and then deallocated, so it must point to a
/// live `Inner<R>` that nothing else is accessing.
unsafe fn drop_and_dealloc(mut raw: NonNull<Self>) {
{
let inner_mut = unsafe { raw.as_mut() };
let state = inner_mut.state.load();
use State::*;
match state {
// Nothing stored: either never written or already moved out.
Taken | Waiting => (),
AvailableSoon => unreachable!("Invalid state on cleanup."),
// A response was written but never taken; drop it here.
Ready => unsafe { inner_mut.response.get_mut().assume_init_drop() },
}
}
unsafe { raw.drop_in_place() };
unsafe { dealloc(raw.as_ptr().cast(), Self::LAYOUT); }
}
}
/// Uninhabited marker types that tag a `Handle` with its role.
mod marker {
/// Implemented by the role markers usable as `Handle`'s `Type` parameter.
pub trait HandleType: Send + Sized + 'static {}
/// Role marker used by the `SendHandle` alias.
#[derive(Debug)]
pub enum Sender {}
/// Role marker used by the `RecvHandle` alias.
#[derive(Debug)]
pub enum Receiver {}
impl HandleType for Sender {}
impl HandleType for Receiver {}
}
/// One reference to a shared `Inner<R>`, tagged with a role marker.
///
/// Dropping a handle decrements the block's reference count (see the `Drop` impl).
#[repr(transparent)]
#[derive(Debug)]
struct Handle<R, Type: marker::HandleType> {
/// Pointer to the shared block.
raw: NonNull<Inner<R>>,
/// Carries the role marker in the type without storing a value of it.
_phantom: PhantomData<*const Type>,
}
/// Handle tagged with the `Sender` marker; held by `Responder`.
type SendHandle<R> = Handle<R, marker::Sender>;
/// Handle tagged with the `Receiver` marker; held by `Pending`.
type RecvHandle<R> = Handle<R, marker::Receiver>;
/// What strategy `S` returns when it is given a `Pending<R>` as pass-through value.
type SpawnOutput<R, S> = <S as strategy::SpawnStrategy>::Return<Pending<R>>;
impl<R: Send + 'static, Type: marker::HandleType> Handle<R, Type> {
#[must_use]
#[inline(always)]
fn from_raw(raw: NonNull<Inner<R>>) -> Self {
Self {
raw,
_phantom: PhantomData,
}
}
}
impl<R, Type: marker::HandleType> Drop for Handle<R, Type> {
    /// Gives up this handle's reference; the shared block is freed when the
    /// decrement reports that it was the last one.
    fn drop(&mut self) {
        let shared = self.raw;
        // SAFETY: `shared` is the pointer this handle was built from, and the
        // handle is not used again after this call.
        unsafe { Inner::<R>::decrement_ref_count_and_maybe_free(shared) }
    }
}
/// Receiving half of a pair created by `pair`: polls for the response with
/// `is_ready` and claims it with `try_recv`.
#[repr(transparent)]
#[derive(Debug)]
pub struct Pending<R: Send + 'static> {
/// Receiver-tagged reference to the shared block.
handle: RecvHandle<R>,
}
/// Sending half of a pair created by `pair`: publishes the response by being
/// consumed in `respond`.
#[repr(transparent)]
#[derive(Debug)]
pub struct Responder<R: Send + 'static> {
/// Sender-tagged reference to the shared block.
handle: SendHandle<R>,
}
impl<R: Send + 'static> Responder<R> {
#[inline(always)]
pub fn respond(self, result: R) {
let inner_ref = unsafe {
self.handle.raw.as_ref()
};
inner_ref.state.store(State::AvailableSoon);
unsafe {
inner_ref.response.get().write(MaybeUninit::new(result));
}
inner_ref.state.store(State::Ready);
}
}
impl<R: Send + 'static> Pending<R> {
    /// Returns `true` once a response has been published and not yet taken.
    #[must_use]
    #[inline]
    pub fn is_ready(&self) -> bool {
        // SAFETY: `self.handle` keeps the shared block alive while `self` exists.
        let shared = unsafe { self.handle.raw.as_ref() };
        shared.state.is_ready()
    }

    /// Moves the response out if one is ready.
    ///
    /// # Errors
    ///
    /// Returns the [`Reason`] reported by the state cell when the response
    /// cannot be claimed: not written yet, being written, or already taken.
    #[inline]
    pub fn try_recv(&self) -> Result<R, Reason> {
        // SAFETY: `self.handle` keeps the shared block alive while `self` exists.
        let shared = unsafe { self.handle.raw.as_ref() };
        shared.state.take_if_ready()?;
        // SAFETY: the `Ready -> Taken` exchange succeeded, so the slot holds an
        // initialised value and this call is the only one that claimed it.
        Ok(unsafe { shared.response.get().read().assume_init() })
    }
}
// `Handle` holds a `NonNull` and a `PhantomData<*const _>`, neither of which is
// `Send` or `Sync`, so thread-safety is asserted by hand for the handle and for
// the two public wrappers around it.
//
// NOTE(review): soundness rests on all shared access going through the atomic
// `ref_count` and `state` fields, with the response slot written only before
// `Ready` is stored and read only after `Ready -> Taken` succeeds — confirm when
// changing the state protocol.
unsafe impl<R: Send + 'static, Type: marker::HandleType> Send for Handle<R, Type> {}
unsafe impl<R: Send + Sync + 'static, Type: marker::HandleType> Sync for Handle<R, Type> {}
unsafe impl<R: Send + 'static> Send for Pending<R> {}
unsafe impl<R: Send + Sync + 'static> Sync for Pending<R> {}
unsafe impl<R: Send + 'static> Send for Responder<R> {}
unsafe impl<R: Send + Sync + 'static> Sync for Responder<R> {}
/// Creates a connected [`Responder`] / [`Pending`] pair backed by one shared
/// allocation.
#[must_use]
#[inline]
pub fn pair<R>() -> (Responder<R>, Pending<R>)
where
    R: Send + 'static,
{
    // One block, two handles: `alloc_new` starts the reference count at 2.
    let shared = Inner::<R>::alloc_new();
    let responder = Responder { handle: Handle::from_raw(shared) };
    let pending = Pending { handle: Handle::from_raw(shared) };
    (responder, pending)
}
/// Launches `worker` through strategy `S` and returns what the strategy yields
/// for the [`Pending`] that will receive the worker's return value.
#[must_use]
#[inline]
pub fn spawn<S, R, F>(worker: F) -> SpawnOutput<R, S>
where
    S: strategy::SpawnStrategy,
    R: Send + 'static,
    F: FnOnce() -> R + Send + 'static,
{
    let (tx, rx) = pair::<R>();
    // The job runs the worker and publishes its result through the responder.
    S::spawn(rx, #[inline(always)] move || tx.respond(worker()))
}
/// Shorthand for [`spawn`] with the [`strategy::Std`] strategy; returns the
/// [`Pending`] together with the thread's join handle.
#[must_use]
#[inline]
pub fn spawn_std<R, F>(worker: F) -> SpawnOutput<R, strategy::Std>
where
    R: Send + 'static,
    F: FnOnce() -> R + Send + 'static,
{
    spawn::<strategy::Std, _, _>(worker)
}
/// Shorthand for [`spawn`] with the [`strategy::Rayon`] strategy; the worker is
/// launched through `rayon::spawn` and only the [`Pending`] is returned.
#[cfg(feature = "rayon")]
#[must_use]
#[inline]
pub fn spawn_rayon<R, F>(worker: F) -> SpawnOutput<R, strategy::Rayon>
where
    // These bounds are required by `Pending<R>` in the return type and by
    // `spawn`; without them the function does not compile.
    R: Send + 'static,
    F: FnOnce() -> R + Send + 'static,
{
    spawn::<strategy::Rayon, R, F>(worker)
}