#![warn(clippy::pedantic)]
#![allow(clippy::match_wild_err_arm)]
#![allow(clippy::single_match_else)]
#![allow(unused_unsafe)]
use std::{
any::{self, Any},
cell::UnsafeCell,
future::Future,
marker::PhantomData,
mem::{self, MaybeUninit},
pin::Pin,
ptr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll, Waker},
thread::{self, Thread},
};
#[cfg(not(feature = "parking_lot"))]
use std::sync;
#[inline]
pub fn two<I, R: Any + Send>(input: I) -> (Sender<I>, Receiver<R>) {
let inner = Arc::new(Inner {
result: UnsafeCell::new(MaybeUninit::uninit()),
is_result_set: AtomicBool::new(false),
tow: Mutex::new(ThreadOrWaker::None),
});
let sender = Sender {
inner: inner.clone(),
input: Some(input),
};
let receiver = Receiver {
inner,
_marker: PhantomData,
};
(sender, receiver)
}
#[inline]
pub fn complete_boxed<R: Any + Send>(boxed: Box<R>) -> Receiver<R> {
Receiver {
inner: Arc::new(Inner {
result: UnsafeCell::new(MaybeUninit::new(boxed)),
is_result_set: AtomicBool::new(true),
tow: Mutex::new(ThreadOrWaker::None),
}),
_marker: PhantomData,
}
}
#[inline]
pub fn complete<R: Any + Send>(result: R) -> Receiver<R> {
complete_boxed(Box::new(result))
}
#[must_use]
pub struct Sender<I> {
inner: Arc<dyn InnerGeneric + Send + Sync + 'static>,
input: Option<I>,
}
impl<I> Sender<I> {
#[inline]
pub fn input(&mut self) -> Option<I> {
self.input.take()
}
#[inline]
pub fn send<T: Any + Send>(self, res: T) {
unsafe {
self.inner.set_result(Box::new(res));
}
self.inner.wake();
}
}
#[must_use]
pub struct Receiver<R> {
inner: Arc<Inner<R>>,
_marker: PhantomData<Option<R>>,
}
impl<R: Any + Send> Receiver<R> {
#[inline]
#[must_use]
pub fn recv(self) -> R {
let res = self.inner.park_until_result();
*res
}
#[inline]
#[must_use]
pub fn wait(self) -> R {
self.recv()
}
}
impl<R: Any + Send> Future for Receiver<R> {
type Output = R;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
match self.inner.is_result_set() {
true => {
Poll::Ready(*unsafe { self.inner.get_result() })
}
false => {
*self.inner.tow.lock() = ThreadOrWaker::Waker(cx.waker().clone());
Poll::Pending
}
}
}
}
struct Inner<T> {
tow: Mutex<ThreadOrWaker>,
is_result_set: AtomicBool,
result: UnsafeCell<MaybeUninit<Box<T>>>,
}
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
impl<T: Any + Send> Inner<T> {
#[inline]
fn is_result_set(&self) -> bool {
self.is_result_set.load(Ordering::Acquire)
}
#[inline]
unsafe fn get_result(&self) -> Box<T> {
unsafe { MaybeUninit::assume_init(ptr::read(self.result.get())) }
}
#[inline]
fn park_until_result(&self) -> Box<T> {
loop {
if self.is_result_set() {
return unsafe { self.get_result() };
}
let cur_thread = thread::current();
*self.tow.lock() = ThreadOrWaker::Thread(cur_thread);
if self.is_result_set() {
return unsafe { self.get_result() };
}
thread::park();
}
}
}
trait InnerGeneric {
unsafe fn set_result(&self, item: Box<dyn Any + Send>);
fn wake(&self);
}
impl<T: Any + Send> InnerGeneric for Inner<T> {
#[inline]
unsafe fn set_result(&self, item: Box<dyn Any + Send>) {
let item = match item.downcast::<T>() {
Err(_) => panic!(
"Passed item is not of expected type \"{}\"",
any::type_name::<T>()
),
Ok(item) => item,
};
unsafe { ptr::write(self.result.get(), MaybeUninit::new(item)) };
self.is_result_set.store(true, Ordering::Release);
}
#[inline]
fn wake(&self) {
let mut lock = self.tow.lock();
match mem::take(&mut *lock) {
ThreadOrWaker::None => (),
ThreadOrWaker::Thread(t) => t.unpark(),
ThreadOrWaker::Waker(w) => w.wake(),
}
}
}
enum ThreadOrWaker {
None,
Thread(Thread),
Waker(Waker),
}
impl Default for ThreadOrWaker {
#[inline]
fn default() -> Self {
Self::None
}
}
#[cfg(feature = "parking_lot")]
#[repr(transparent)]
struct Mutex<T>(parking_lot::Mutex<T>);
#[cfg(feature = "parking_lot")]
impl<T> Mutex<T> {
#[inline]
fn new(val: T) -> Self {
Self(parking_lot::Mutex::new(val))
}
#[inline]
fn lock(&self) -> parking_lot::MutexGuard<'_, T> {
self.0.lock()
}
}
#[cfg(not(feature = "parking_lot"))]
struct Mutex<T>(sync::Mutex<T>);
#[cfg(not(feature = "parking_lot"))]
impl<T> Mutex<T> {
#[inline]
fn new(val: T) -> Self {
Self(sync::Mutex::new(val))
}
#[inline]
fn lock(&self) -> sync::MutexGuard<'_, T> {
self.0.lock().expect("Unable to lock mutex")
}
}