extern crate alloc;
use std::alloc::{Layout, dealloc};
use std::future::Future;
use std::mem::ManuallyDrop;
use std::panic::{RefUnwindSafe, UnwindSafe};
use crate::loom_exports::sync::atomic::{self, Ordering};
use super::Task;
use super::runnable::Runnable;
use super::util::{RunOnDrop, runnable_exists};
use super::{CLOSED, POLLING, REF_INC, REF_MASK};
/// Table of type-erased entry points used by a `Promise`.
///
/// Both entries take the untyped task pointer stored in the promise. They are
/// filled in by `Promise::new_unchecked` with functions monomorphized for the
/// concrete task type.
#[derive(Debug)]
struct VTable<U: Send + 'static> {
/// Invoked by `Promise::poll`; reports the current `Stage` of the task.
poll: unsafe fn(*const ()) -> Stage<U>,
/// Invoked when the `Promise` is dropped.
drop: unsafe fn(*const ()),
}
/// Attempts to retrieve the output of the task behind `ptr`.
///
/// The state word of the task is inspected atomically:
/// - if neither `POLLING` nor `CLOSED` is set, `CLOSED` is set and the output
///   is moved out of the task and returned as `Stage::Ready`;
/// - if `CLOSED` is already set, `Stage::Cancelled` is returned;
/// - otherwise (`POLLING` set, `CLOSED` clear), `Stage::Pending` is returned.
///
/// # Safety
///
/// `ptr` is dereferenced as a `Task<F, S, T>`, so it must point to a live task
/// of exactly that type.
unsafe fn poll<F, S, T>(ptr: *const ()) -> Stage<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable, T) + Send + Sync + 'static,
    T: Clone + Send + Sync + 'static,
{
    // SAFETY: the caller guarantees that `ptr` designates a live
    // `Task<F, S, T>`.
    let task = unsafe { &*(ptr as *const Task<F, S, T>) };

    // Try to claim the output by setting `CLOSED`, which is only allowed when
    // neither `POLLING` nor `CLOSED` is currently set.
    //
    // Ordering: Acquire on success, presumably so that the write of the output
    // is visible before it is moved out below (assumes the writer published it
    // with a Release operation -- confirm against the executor side).
    let claim = task
        .state
        .fetch_update(Ordering::Acquire, Ordering::Relaxed, |current| {
            (current & (POLLING | CLOSED) == 0).then_some(current | CLOSED)
        });

    match claim {
        Ok(_) => {
            // SAFETY: the successful update above set `CLOSED`, so no later
            // call can pass the same check and take the output a second time.
            // NOTE(review): assumes the output slot is initialized whenever
            // both `POLLING` and `CLOSED` are clear.
            let output = task
                .core
                .with_mut(|core| unsafe { ManuallyDrop::take(&mut (*core).output) });

            Stage::Ready(output)
        }
        // `CLOSED` was already set: there is no output to hand out.
        Err(observed) if observed & CLOSED == CLOSED => Stage::Cancelled,
        // Only `POLLING` was set: the output has not been produced yet.
        Err(_) => Stage::Pending,
    }
}
/// Releases the reference held by a promise on the task behind `ptr`.
///
/// The reference count stored in the state word is decremented by `REF_INC`.
/// If the count was exactly one reference and `runnable_exists` reports no
/// runnable for the previous state, the task is torn down: depending on the
/// flags, the future or the output is dropped in place, and the allocation is
/// then released.
///
/// # Safety
///
/// `ptr` is dereferenced as a `Task<F, S, T>` and may be deallocated with the
/// layout of that type, so it must point to a live task of exactly that type
/// and must not be used through this reference afterwards.
unsafe fn drop<F, S, T>(ptr: *const ())
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable, T) + Send + Sync + 'static,
    T: Clone + Send + Sync + 'static,
{
    // SAFETY: the caller guarantees that `ptr` designates a live
    // `Task<F, S, T>`.
    let task = unsafe { &*(ptr as *const Task<F, S, T>) };

    // Ordering: Release, so that everything done through this reference
    // happens-before the teardown performed by whoever releases last.
    let previous = task.state.fetch_sub(REF_INC, Ordering::Release);

    // Nothing more to do unless this was the last reference and no runnable
    // remains.
    let was_last_reference = previous & REF_MASK == REF_INC;
    if !was_last_reference || runnable_exists(previous) {
        return;
    }

    // Ordering: pairs with the Release decrements above so that the latest
    // state of the future or output is visible before it is dropped.
    atomic::fence(Ordering::Acquire);

    // The guard runs when this function exits, including by unwinding, so the
    // allocation is released even if dropping the future or output panics.
    //
    // SAFETY (closure): NOTE(review): assumes the task was allocated with
    // `Layout::new::<Task<F, S, T>>()` -- confirm against the allocation site.
    let _dealloc_guard = RunOnDrop::new(|| unsafe {
        dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>())
    });

    let polling = previous & POLLING == POLLING;
    let closed = previous & CLOSED != 0;
    match (polling, closed) {
        // `POLLING` set: the future is dropped in place.
        // SAFETY: NOTE(review): assumes the future slot is still initialized
        // while `POLLING` is set.
        (true, _) => task
            .core
            .with_mut(|core| unsafe { ManuallyDrop::drop(&mut (*core).future) }),
        // Neither flag set: the output is dropped in place.
        // SAFETY: NOTE(review): assumes the output slot is initialized and was
        // not taken while both flags are clear.
        (false, false) => task
            .core
            .with_mut(|core| unsafe { ManuallyDrop::drop(&mut (*core).output) }),
        // `CLOSED` set without `POLLING`: nothing is dropped here.
        (false, true) => {}
    }
}
/// Progress of a task as observed through a promise.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) enum Stage<T> {
    /// The output is available and carried by this variant.
    Ready(T),
    /// No output is available yet.
    Pending,
    /// The output is not, or is no longer, available.
    Cancelled,
}

impl<U> Stage<U> {
    /// Transforms the value of a `Ready` stage with `f`.
    ///
    /// `Pending` and `Cancelled` are passed through unchanged and `f` is not
    /// called for them.
    #[allow(unused)]
    pub(crate) fn map<V, F>(self, f: F) -> Stage<V>
    where
        F: FnOnce(U) -> V,
    {
        let value = match self {
            Self::Pending => return Stage::Pending,
            Self::Cancelled => return Stage::Cancelled,
            Self::Ready(value) => value,
        };

        Stage::Ready(f(value))
    }

    /// Returns `true` if the stage is `Ready`.
    #[allow(unused)]
    #[inline]
    pub(crate) fn is_ready(&self) -> bool {
        match self {
            Self::Ready(_) => true,
            Self::Pending | Self::Cancelled => false,
        }
    }

    /// Returns `true` if the stage is `Pending`.
    #[allow(unused)]
    #[inline]
    pub(crate) fn is_pending(&self) -> bool {
        match self {
            Self::Pending => true,
            Self::Ready(_) | Self::Cancelled => false,
        }
    }

    /// Returns `true` if the stage is `Cancelled`.
    #[allow(unused)]
    #[inline]
    pub(crate) fn is_cancelled(&self) -> bool {
        match self {
            Self::Cancelled => true,
            Self::Ready(_) | Self::Pending => false,
        }
    }
}
/// Handle that can poll a task for its output of type `U`.
///
/// The concrete task type is erased: the promise only keeps an untyped pointer
/// to the task together with a table of functions that know its real type.
/// Dropping the promise calls the `drop` entry of that table.
#[derive(Debug)]
pub(crate) struct Promise<U: Send + 'static> {
/// Type-erased pointer to the task; passed back to the vtable functions.
task: *const (),
/// Entry points monomorphized for the concrete type of the task.
vtable: &'static VTable<U>,
}
impl<U: Send + 'static> Promise<U> {
pub(super) unsafe fn new_unchecked<F, S, T>(task: *const Task<F, S, T>) -> Self
where
F: Future<Output = U> + Send + 'static,
S: Fn(Runnable, T) + Send + Sync + 'static,
T: Clone + Send + Sync + 'static,
{
Self {
task: task as *const (),
vtable: &VTable::<U> {
poll: poll::<F, S, T>,
drop: drop::<F, S, T>,
},
}
}
#[allow(unused)]
pub(crate) fn poll(&self) -> Stage<U> {
unsafe { (self.vtable.poll)(self.task) }
}
}
impl<U: Send + 'static> Drop for Promise<U> {
    /// Hands the task pointer to the `drop` entry of the vtable.
    fn drop(&mut self) {
        let release = self.vtable.drop;

        // SAFETY: `task` and `vtable` were paired by `new_unchecked`, whose
        // contract requires the pointer to designate a live task of the type
        // the vtable was instantiated for. The pointer is not used again
        // after this call.
        unsafe { release(self.task) }
    }
}
// The raw `task` pointer makes `Promise` neither `Send` nor unwind-safe by
// default, so these traits are implemented by hand.
//
// SAFETY: NOTE(review): relies on the pointed-to task being safe to poll and
// release from another thread; `new_unchecked` only accepts tasks whose
// future, output, schedule function and tag are all `Send` -- confirm that the
// state word alone arbitrates access to the task's core.
unsafe impl<U: Send + 'static> Send for Promise<U> {}
// NOTE(review): presumably sound because a promise only observes the task
// through its atomic state word -- confirm.
impl<U: Send + 'static> UnwindSafe for Promise<U> {}
impl<U: Send + 'static> RefUnwindSafe for Promise<U> {}