use std::{
collections::VecDeque,
future::Future,
mem::ManuallyDrop,
ops::{Deref, DerefMut},
pin::Pin,
ptr::{addr_of_mut, NonNull},
sync::{
atomic::{AtomicU8, Ordering},
Arc, Mutex,
},
task::{Context, Poll, Waker},
thread::{self, Thread},
time::{Duration, Instant},
};
use crate::MaybeFuture;
pub struct Notify {
state: Mutex<NotifyState>,
}
pub enum NotifyState {
None,
Waker(Waker),
Thread(Thread),
}
impl NotifyState {
fn take(&mut self) -> Self {
std::mem::replace(self, NotifyState::None)
}
fn notify(self) {
match self {
NotifyState::None => {}
NotifyState::Waker(waker) => waker.wake(),
NotifyState::Thread(thread) => thread.unpark(),
}
}
}
impl AsRef<Notify> for Notify {
fn as_ref(&self) -> &Notify {
self
}
}
impl Notify {
pub fn new() -> Self {
Self {
state: Mutex::new(NotifyState::None),
}
}
pub fn subscribe(&self, cx: &mut Context) {
*self.state.lock().unwrap() = NotifyState::Waker(cx.waker().clone());
}
pub fn wait<T>(&self, mut check: impl FnMut() -> Option<T>) -> T {
*self.state.lock().unwrap() = NotifyState::Thread(thread::current());
loop {
if let Some(result) = check() {
return result;
}
thread::park();
}
}
pub fn wait_timeout<T>(
&self,
timeout: Duration,
mut check: impl FnMut() -> Option<T>,
) -> Option<T> {
*self.state.lock().unwrap() = NotifyState::Thread(thread::current());
let start = Instant::now();
loop {
if let Some(result) = check() {
return Some(result);
}
let remaining = timeout.checked_sub(start.elapsed())?;
thread::park_timeout(remaining);
}
}
fn take_notify_state(&self) -> NotifyState {
self.state.lock().unwrap().take()
}
}
#[repr(C)]
struct TransferInner<P> {
platform_data: P,
state: AtomicU8,
notify: Arc<dyn AsRef<Notify> + Send + Sync>,
}
const STATE_IDLE: u8 = 0;
const STATE_PENDING: u8 = 1;
const STATE_ABANDONED: u8 = 2;
pub(crate) struct Idle<P>(Box<TransferInner<P>>);
impl<P> Idle<P> {
pub(crate) fn new(notify: Arc<dyn AsRef<Notify> + Send + Sync>, inner: P) -> Idle<P> {
Idle(Box::new(TransferInner {
platform_data: inner,
state: AtomicU8::new(STATE_IDLE),
notify,
}))
}
pub(crate) fn pre_submit(self) -> Pending<P> {
let prev = self.0.state.swap(STATE_PENDING, Ordering::Relaxed);
assert_eq!(prev, STATE_IDLE, "Transfer should be idle when submitted");
Pending {
ptr: unsafe { NonNull::new_unchecked(Box::into_raw(self.0)) },
}
}
pub(crate) fn simulate_complete(self) -> Pending<P> {
Pending {
ptr: unsafe { NonNull::new_unchecked(Box::into_raw(self.0)) },
}
}
}
impl<P> Deref for Idle<P> {
type Target = P;
fn deref(&self) -> &Self::Target {
&self.0.platform_data
}
}
impl<P> DerefMut for Idle<P> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0.platform_data
}
}
pub(crate) struct Pending<P> {
ptr: NonNull<TransferInner<P>>,
}
unsafe impl<P: Send> Send for Pending<P> {}
unsafe impl<P: Sync> Sync for Pending<P> {}
impl<P> Pending<P> {
pub fn as_ptr(&self) -> *mut P {
self.ptr.as_ptr().cast()
}
fn state(&self) -> &AtomicU8 {
unsafe { &*addr_of_mut!((*self.ptr.as_ptr()).state) }
}
pub fn is_complete(&self) -> bool {
match self.state().load(Ordering::Acquire) {
STATE_PENDING => false,
STATE_IDLE => true,
s => panic!("Polling transfer in unexpected state {s}"),
}
}
pub unsafe fn into_idle(self) -> Idle<P> {
debug_assert!(self.is_complete());
let transfer = ManuallyDrop::new(self);
Idle(Box::from_raw(transfer.ptr.as_ptr()))
}
}
pub fn take_completed_from_queue<P>(queue: &mut VecDeque<Pending<P>>) -> Option<Idle<P>> {
if queue.front().expect("no transfer pending").is_complete() {
Some(unsafe { queue.pop_front().unwrap().into_idle() })
} else {
None
}
}
pub fn take_completed_from_option<P>(option: &mut Option<Pending<P>>) -> Option<Idle<P>> {
if option.as_mut().is_some_and(|next| next.is_complete()) {
option.take().map(|t| unsafe { t.into_idle() })
} else {
None
}
}
impl<P> Drop for Pending<P> {
fn drop(&mut self) {
match self.state().swap(STATE_ABANDONED, Ordering::AcqRel) {
STATE_PENDING => { }
STATE_IDLE => {
unsafe { drop(Box::from_raw(self.ptr.as_ptr())) }
}
s => panic!("Dropping transfer in unexpected state {s}"),
}
}
}
pub(crate) unsafe fn notify_completion<P>(transfer: *mut P) {
unsafe {
let transfer = transfer as *mut TransferInner<P>;
let wake = (*transfer).notify.deref().as_ref().take_notify_state();
match (*transfer).state.swap(STATE_IDLE, Ordering::AcqRel) {
STATE_PENDING => wake.notify(),
STATE_ABANDONED => {
drop(Box::from_raw(transfer));
}
s => panic!("Completing transfer in unexpected state {s}"),
}
}
}
pub(crate) struct TransferFuture<D> {
transfer: Option<Pending<D>>,
notify: Arc<Notify>,
}
impl<D> TransferFuture<D> {
pub(crate) fn new(transfer: D, submit: impl FnOnce(Idle<D>) -> Pending<D>) -> Self {
let notify = Arc::new(Notify::new());
let transfer = submit(Idle::new(notify.clone(), transfer));
Self {
transfer: Some(transfer),
notify,
}
}
}
impl<D> Future for TransferFuture<D> {
type Output = Idle<D>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.notify.subscribe(cx);
take_completed_from_option(&mut self.transfer).map_or(Poll::Pending, Poll::Ready)
}
}
impl<D> MaybeFuture for TransferFuture<D>
where
D: Send,
{
fn wait(mut self) -> Self::Output {
self.notify
.wait(|| take_completed_from_option(&mut self.transfer))
}
}