/// The sending half of a [`Promise`], handed out by [`Promise::new`].
///
/// [`Sender::send`] takes `self` by value, so each sender can deliver at most
/// one value.
#[must_use = "You should call Sender::send with the result"]
pub struct Sender<T>(std::sync::mpsc::Sender<T>);

impl<T> Sender<T> {
    /// Delivers `value` over the wrapped channel, consuming the sender.
    ///
    /// The outcome of the underlying channel send is deliberately discarded,
    /// so this never panics and never reports a failure to the caller.
    pub fn send(self, value: T) {
        let Self(channel) = self;
        // Best effort: if nobody is listening any more the value is simply dropped.
        let _ = channel.send(value);
    }
}
/// A value that may not be available yet.
///
/// A promise is created pending (paired with a [`Sender`]) or already
/// resolved, and can then be polled without blocking, waited on, or consumed
/// to take the value out.
#[must_use]
pub struct Promise<T>
where
    T: Send + 'static,
{
    /// Current state: still waiting on the channel, or holding the value.
    data: PromiseImpl<T>,
}
// Compile-time checks on the auto traits of `Promise` (instantiated with `u32`).
//
// `Promise` must NOT be `Sync`: `&self` methods such as `poll` and
// `block_until_ready` update the promise's internal state when a value
// arrives, so a `&Promise` must never be shared between threads.
static_assertions::assert_not_impl_all!(Promise<u32>: Sync);
// `Promise` must be `Send`, so a promise can be moved to another thread.
static_assertions::assert_impl_all!(Promise<u32>: Send);
impl<T: Send + 'static> Promise<T> {
    /// Creates a pending promise together with the [`Sender`] that resolves it.
    ///
    /// The promise becomes ready once [`Sender::send`] has been called. If the
    /// sender is dropped without sending, the polling and blocking accessors
    /// of the promise panic.
    pub fn new() -> (Sender<T>, Self) {
        let (tx, rx) = std::sync::mpsc::channel();
        (
            Sender(tx),
            Self {
                data: PromiseImpl::pending(rx),
            },
        )
    }

    /// Creates a promise that is already resolved to `value`.
    pub fn from_ready(value: T) -> Self {
        Self {
            data: PromiseImpl::ready(value),
        }
    }

    /// Runs `f` on a newly spawned thread named `thread_name` and resolves the
    /// returned promise with its result.
    ///
    /// The thread is detached (its join handle is dropped). If `f` panics, the
    /// sender is dropped without a value and the promise's accessors panic.
    ///
    /// # Panics
    /// Panics with "Failed to spawn thread" if the thread cannot be created.
    pub fn spawn_thread<F>(thread_name: impl Into<String>, f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let (sender, promise) = Self::new();
        std::thread::Builder::new()
            .name(thread_name.into())
            .spawn(move || sender.send(f()))
            .expect("Failed to spawn thread");
        promise
    }

    /// Spawns a tokio task that runs the blocking closure `f` through
    /// `tokio::task::block_in_place` and resolves the promise with its result.
    ///
    /// NOTE(review): relies on tokio's requirements for `spawn` and
    /// `block_in_place` (an active, suitable runtime) — see the tokio docs.
    #[cfg(feature = "tokio")]
    pub fn spawn_blocking<F>(f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let (sender, promise) = Self::new();
        tokio::task::spawn(async move { sender.send(tokio::task::block_in_place(f)) });
        promise
    }

    /// Spawns a tokio task that awaits `future` and resolves the promise with
    /// its output.
    ///
    /// NOTE(review): relies on tokio's requirement that `spawn` is called from
    /// within a runtime — see the tokio docs.
    #[cfg(feature = "tokio")]
    pub fn spawn_async(future: impl std::future::Future<Output = T> + 'static + Send) -> Self {
        let (sender, promise) = Self::new();
        tokio::task::spawn(async move { sender.send(future.await) });
        promise
    }

    /// Returns the value if it is available, without blocking.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn ready(&self) -> Option<&T> {
        match self.poll() {
            std::task::Poll::Pending => None,
            std::task::Poll::Ready(value) => Some(value),
        }
    }

    /// Returns the value mutably if it is available, without blocking.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn ready_mut(&mut self) -> Option<&mut T> {
        match self.poll_mut() {
            std::task::Poll::Pending => None,
            std::task::Poll::Ready(value) => Some(value),
        }
    }

    /// Takes the value out if it is available; otherwise hands the still
    /// pending promise back as the `Err` variant. Never blocks.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn try_take(self) -> Result<T, Self> {
        self.data.try_take().map_err(|data| Self { data })
    }

    /// Blocks the current thread until the value is available and returns a
    /// reference to it.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn block_until_ready(&self) -> &T {
        self.data.block_until_ready()
    }

    /// Blocks the current thread until the value is available and returns a
    /// mutable reference to it.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn block_until_ready_mut(&mut self) -> &mut T {
        self.data.block_until_ready_mut()
    }

    /// Blocks the current thread until the value is available and returns it
    /// by value, consuming the promise.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn block_and_take(self) -> T {
        self.data.block_and_take()
    }

    /// Checks, without blocking, whether the value has arrived.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn poll(&self) -> std::task::Poll<&T> {
        self.data.poll()
    }

    /// Checks, without blocking, whether the value has arrived, giving mutable
    /// access to it if so.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    pub fn poll_mut(&mut self) -> std::task::Poll<&mut T> {
        self.data.poll_mut()
    }
}

/// The two states a promise can be in.
enum PromiseStatus<T: Send + 'static> {
    /// No value yet; it will arrive on this channel.
    Pending(std::sync::mpsc::Receiver<T>),
    /// The value has arrived (or was supplied up front).
    Ready(T),
}

/// State holder behind [`Promise`].
///
/// The status lives in an `UnsafeCell` because `poll` and `block_until_ready`
/// take `&self` yet have to record a freshly received value. Writing through a
/// pointer obtained by casting `&Self` to `*mut Self` would be undefined
/// behaviour; `UnsafeCell` is the sanctioned way to mutate behind a shared
/// reference. `UnsafeCell` is `!Sync`, so this type stays `!Sync` as well.
struct PromiseImpl<T: Send + 'static>(std::cell::UnsafeCell<PromiseStatus<T>>);

impl<T: Send + 'static> PromiseImpl<T> {
    /// Wraps a receiver whose value has not arrived yet.
    fn pending(rx: std::sync::mpsc::Receiver<T>) -> Self {
        Self(std::cell::UnsafeCell::new(PromiseStatus::Pending(rx)))
    }

    /// Wraps a value that is already available.
    fn ready(value: T) -> Self {
        Self(std::cell::UnsafeCell::new(PromiseStatus::Ready(value)))
    }

    /// Non-blocking receive shared by every polling entry point, so that they
    /// all treat a dropped sender the same way.
    ///
    /// Returns `None` while the channel is merely empty.
    ///
    /// # Panics
    /// Panics if the sender was dropped without sending a value.
    fn try_receive(rx: &std::sync::mpsc::Receiver<T>) -> Option<T> {
        match rx.try_recv() {
            Ok(value) => Some(value),
            Err(std::sync::mpsc::TryRecvError::Empty) => None,
            Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                panic!("The Promise Sender was dropped")
            }
        }
    }

    /// Non-blocking check with exclusive access; entirely safe code.
    fn poll_mut(&mut self) -> std::task::Poll<&mut T> {
        let status = self.0.get_mut();
        if let PromiseStatus::Pending(rx) = status {
            match Self::try_receive(rx) {
                Some(value) => *status = PromiseStatus::Ready(value),
                None => return std::task::Poll::Pending,
            }
        }
        match status {
            PromiseStatus::Ready(value) => std::task::Poll::Ready(value),
            // Either it was `Ready` to begin with or it was just made so above.
            PromiseStatus::Pending(_) => unreachable!(),
        }
    }

    /// Takes the value if available, otherwise returns `self` unchanged.
    fn try_take(self) -> Result<T, Self> {
        match self.0.into_inner() {
            PromiseStatus::Ready(value) => Ok(value),
            PromiseStatus::Pending(rx) => match Self::try_receive(&rx) {
                Some(value) => Ok(value),
                None => Err(Self::pending(rx)),
            },
        }
    }

    /// Non-blocking check through a shared reference.
    #[allow(unsafe_code)]
    fn poll(&self) -> std::task::Poll<&T> {
        let status = self.0.get();
        // SAFETY: the state is inside an `UnsafeCell`, so mutating it behind
        // `&self` is permitted. `Self` is `!Sync`, so no other thread can touch
        // the cell concurrently. The only write happens while the state is
        // `Pending`: references handed to callers point only into a `Ready`
        // payload, which is never overwritten through `&self`, and the
        // temporary borrow of the receiver ends before the write.
        unsafe {
            if let PromiseStatus::Pending(rx) = &*status {
                match Self::try_receive(rx) {
                    Some(value) => *status = PromiseStatus::Ready(value),
                    None => return std::task::Poll::Pending,
                }
            }
            match &*status {
                PromiseStatus::Ready(value) => std::task::Poll::Ready(value),
                PromiseStatus::Pending(_) => unreachable!(),
            }
        }
    }

    /// Blocks until the value is available, with exclusive access; safe code.
    fn block_until_ready_mut(&mut self) -> &mut T {
        let status = self.0.get_mut();
        if let PromiseStatus::Pending(rx) = status {
            let value = rx.recv().expect("The Promise Sender was dropped");
            *status = PromiseStatus::Ready(value);
        }
        match status {
            PromiseStatus::Ready(value) => value,
            PromiseStatus::Pending(_) => unreachable!(),
        }
    }

    /// Blocks until the value is available, through a shared reference.
    #[allow(unsafe_code)]
    fn block_until_ready(&self) -> &T {
        let status = self.0.get();
        // SAFETY: same argument as in `poll` — `UnsafeCell` permits the
        // mutation, `Self` is `!Sync`, the write only happens in the `Pending`
        // state where no caller-visible reference into the cell exists, and
        // the receiver borrow is finished before the write.
        unsafe {
            if let PromiseStatus::Pending(rx) = &*status {
                let value = rx.recv().expect("The Promise Sender was dropped");
                *status = PromiseStatus::Ready(value);
            }
            match &*status {
                PromiseStatus::Ready(value) => value,
                PromiseStatus::Pending(_) => unreachable!(),
            }
        }
    }

    /// Blocks until the value is available and returns it by value.
    fn block_and_take(self) -> T {
        match self.0.into_inner() {
            PromiseStatus::Ready(value) => value,
            PromiseStatus::Pending(rx) => rx.recv().expect("The Promise Sender was dropped"),
        }
    }
}