use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;
use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};
/// Readiness state and waiting tasks for a single I/O resource.
///
/// The struct holds three pieces of state:
/// * intrusive list pointers so the value itself can be a node in a
///   `linked_list` (the owning list is not visible in this file),
/// * a packed atomic word (`readiness`) laid out by the `READINESS`, `TICK`
///   and `SHUTDOWN` bit-pack constants,
/// * a mutex-protected set of wakers (`waiters`).
// The `cfg_attr(.., repr(align(N)))` attributes below select exactly one
// alignment per target architecture:
//   128 on x86_64 / aarch64 / powerpc64,
//   32  on arm / mips / mips64 / sparc / hexagon,
//   16  on m68k,
//   256 on s390x,
//   64  on every other architecture.
// NOTE(review): these look like per-architecture cache-line sizes chosen to
// avoid false sharing between adjacent `ScheduledIo` values — confirm.
#[derive(Debug)]
#[cfg_attr(
any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
),
repr(align(128))
)]
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "sparc",
target_arch = "hexagon",
),
repr(align(32))
)]
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// Fallback: any architecture not named above.
#[cfg_attr(
not(any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "sparc",
target_arch = "hexagon",
target_arch = "m68k",
target_arch = "s390x",
)),
repr(align(64))
)]
pub(crate) struct ScheduledIo {
// Intrusive list pointers; wrapped in `UnsafeCell` so they can be mutated
// through a shared reference by whoever owns the list.
pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,
// Packed word: readiness bits, driver tick and shutdown flag.
readiness: AtomicUsize,
// Wakers for tasks waiting on this resource.
waiters: Mutex<Waiters>,
}
/// Intrusive linked list of `Waiter` nodes, using the `linked_list::Link`
/// implementation for `Waiter` to determine the node target type.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// Everything guarded by `ScheduledIo::waiters`.
#[derive(Debug, Default)]
struct Waiters {
// Waiters registered by `Readiness` futures (see `ScheduledIo::readiness`).
list: WaitList,
// Single waker slot used by `poll_readiness` with `Direction::Read`.
reader: Option<Waker>,
// Single waker slot used by `poll_readiness` with `Direction::Write`.
writer: Option<Waker>,
}
/// One node of the intrusive waiter list; embedded inside a `Readiness`
/// future.
#[derive(Debug)]
struct Waiter {
// Intrusive list links.
pointers: linked_list::Pointers<Waiter>,
// Waker of the task polling the owning future; taken by `wake`.
waker: Option<Waker>,
// Interest this waiter is waiting for; set once at construction.
interest: Interest,
// Set to `true` by `wake` when it takes the waker.
is_ready: bool,
// Makes the type `!Unpin`: the node is referenced by address while linked.
_p: PhantomPinned,
}
// Generates `Waiter::addr_of_pointers`, which maps a `NonNull<Waiter>` to a
// `NonNull` pointing at its `pointers` field. The method body is produced by
// the `generate_addr_of_methods!` macro (defined elsewhere).
// NOTE(review): presumably the macro derives the field address without
// creating an intermediate reference to the whole `Waiter` — confirm against
// the macro definition.
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
}
}
/// Future returned by `ScheduledIo::readiness_fut`; resolves to a
/// `ReadyEvent` for the interest stored in its `waiter`.
struct Readiness<'a> {
// The I/O resource being waited on.
scheduled_io: &'a ScheduledIo,
// Current step of the state machine driven by `Future::poll`.
state: State,
// List node for this future. It lives in an `UnsafeCell` because, once
// linked, `ScheduledIo::wake` mutates it through a raw pointer while
// holding the `waiters` lock.
waiter: UnsafeCell<Waiter>,
}
/// Steps of the `Readiness` future.
enum State {
// Not yet polled; the waiter is not in the list.
Init,
// The waiter has been pushed onto the list and a waker is stored.
Waiting,
// Readiness was observed or the waiter was woken; the next step returns
// `Poll::Ready`.
Done,
}
// Bit layout of `ScheduledIo::readiness`, built by chaining `bit::Pack`
// fields from the least significant bit upwards:
//
// | SHUTDOWN | TICK    | READINESS |
// |  1 bit   | 15 bits | 16 bits   |
//
// NOTE(review): widths are taken from the arguments to `least_significant`
// and `then`; confirm against `bit::Pack`.
const READINESS: bit::Pack = bit::Pack::least_significant(16);
const TICK: bit::Pack = READINESS.then(15);
const SHUTDOWN: bit::Pack = TICK.then(1);
impl Default for ScheduledIo {
    /// Creates an unlinked `ScheduledIo` with a zeroed readiness word (no
    /// readiness bits, tick 0, not shut down) and no registered waiters.
    fn default() -> Self {
        let linked_list_pointers = UnsafeCell::new(linked_list::Pointers::new());
        let readiness = AtomicUsize::new(0);
        let waiters = Mutex::new(Waiters::default());

        Self {
            linked_list_pointers,
            readiness,
            waiters,
        }
    }
}
impl ScheduledIo {
/// Returns the `mio::Token` for this `ScheduledIo`.
///
/// The token wraps the `usize` produced by
/// `super::EXPOSE_IO.expose_provenance(self)`.
pub(crate) fn token(&self) -> mio::Token {
    let exposed = super::EXPOSE_IO.expose_provenance(self);
    mio::Token(exposed)
}
/// Marks this resource as shut down and wakes every waiter.
///
/// The SHUTDOWN bit is OR-ed into the readiness word first, so tasks woken
/// by the subsequent `wake(Ready::ALL)` observe the flag when they re-check.
pub(super) fn shutdown(&self) {
    self.readiness.fetch_or(SHUTDOWN.pack(1, 0), AcqRel);
    self.wake(Ready::ALL);
}
/// Atomically rewrites the readiness bits and the tick of the packed word.
///
/// * `tick_op` — `Tick::Set` advances the stored tick by one (wrapping within
///   the TICK field); `Tick::Clear(t)` only applies when the low 8 bits of the
///   stored tick equal `t`, otherwise the update is abandoned because the
///   caller holds a stale event.
/// * `f` — maps the current readiness set to the new one. It may be invoked
///   more than once, because `fetch_update` retries on contention.
///
/// The SHUTDOWN bit is carried over unchanged. Shutdown is terminal, and the
/// assertion below explicitly allows `Tick::Clear` after shutdown, so a late
/// clear must not erase the flag and leave later polls waiting forever.
pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
    // The result is intentionally ignored: `Err` only means the closure
    // returned `None` (stale `Tick::Clear`), which is a silent no-op.
    let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
        // Once shut down, the only permitted operation is clearing readiness.
        debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));
        const MAX_TICK: usize = TICK.max_value() + 1;
        let tick = TICK.unpack(curr);
        let new_tick = match tick_op {
            // Stale event: the tick moved on since the event was produced.
            Tick::Clear(t) if tick as u8 != t => return None,
            Tick::Clear(t) => t as usize,
            Tick::Set => tick.wrapping_add(1) % MAX_TICK,
        };
        let ready = Ready::from_usize(READINESS.unpack(curr));
        // Start from the new readiness bits and re-apply the current shutdown
        // flag, so that packing the tick on top does not drop it.
        let base = SHUTDOWN.pack(SHUTDOWN.unpack(curr), f(ready).as_usize());
        Some(TICK.pack(new_tick, base))
    });
}
/// Wakes every task whose interest is satisfied by `ready`.
///
/// Wakers are collected into a `WakeList` while the `waiters` lock is held
/// and are only invoked after the lock has been released.
pub(super) fn wake(&self, ready: Ready) {
let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
// Dedicated reader slot (filled by `poll_readiness` with `Direction::Read`).
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers.push(waker);
}
}
// Dedicated writer slot (filled by `poll_readiness` with `Direction::Write`).
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers.push(waker);
}
}
// Drain list waiters in batches bounded by the capacity of `wakers`.
'outer: loop {
// Unlinks (lazily, as the iterator advances) each waiter whose interest
// is satisfied by `ready`.
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
while wakers.can_push() {
match iter.next() {
Some(waiter) => {
// SAFETY (review): the `waiters` lock is held here, and every other
// access to a linked waiter in this file also happens under that
// lock; the owning `Readiness` unlinks the node in its `Drop`
// under the same lock.
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
// Tells the owning future it was notified (read in `State::Waiting`).
waiter.is_ready = true;
wakers.push(waker);
}
}
None => {
// No more matching waiters: leave with the lock still held.
break 'outer;
}
}
}
// The batch is full: release the lock, wake, then re-lock and continue
// draining. Wakers are never invoked while the lock is held.
drop(waiters);
wakers.wake_all();
waiters = self.waiters.lock();
}
// Release the lock before waking the final (possibly partial) batch.
drop(waiters);
wakers.wake_all();
}
/// Takes a snapshot of the packed readiness word and returns it as a
/// `ReadyEvent`, with the readiness set restricted to `interest`.
///
/// This never blocks and never registers a waker.
pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
    let snapshot = self.readiness.load(Acquire);

    let tick = TICK.unpack(snapshot) as u8;
    let ready = interest.mask() & Ready::from_usize(READINESS.unpack(snapshot));
    let is_shutdown = SHUTDOWN.unpack(snapshot) != 0;

    ReadyEvent {
        tick,
        ready,
        is_shutdown,
    }
}
/// Polls for readiness in `direction`, storing the task's waker in the
/// per-direction slot if the resource is not ready yet.
///
/// Returns `Poll::Ready` as soon as any bit in `direction.mask()` is set or
/// the resource has been shut down. On shutdown observed after registering
/// the waker, the whole direction mask is reported as ready.
pub(super) fn poll_readiness(
    &self,
    cx: &mut Context<'_>,
    direction: Direction,
) -> Poll<ReadyEvent> {
    // Fast path: already ready (or shut down) without touching the lock.
    let snapshot = self.readiness.load(Acquire);
    let ready = direction.mask() & Ready::from_usize(READINESS.unpack(snapshot));
    let is_shutdown = SHUTDOWN.unpack(snapshot) != 0;
    if is_shutdown || !ready.is_empty() {
        return Poll::Ready(ReadyEvent {
            tick: TICK.unpack(snapshot) as u8,
            ready,
            is_shutdown,
        });
    }

    // Slow path: register the waker under the lock.
    let mut guard = self.waiters.lock();
    let slot = match direction {
        Direction::Read => &mut guard.reader,
        Direction::Write => &mut guard.writer,
    };
    if let Some(stored) = slot {
        // Reuse the existing slot; `clone_from` updates it in place.
        stored.clone_from(cx.waker());
    } else {
        *slot = Some(cx.waker().clone());
    }

    // Re-check: readiness may have changed while the lock was being taken,
    // in which case the wake-up could already have happened.
    let snapshot = self.readiness.load(Acquire);
    let observed = direction.mask() & Ready::from_usize(READINESS.unpack(snapshot));
    let is_shutdown = SHUTDOWN.unpack(snapshot) != 0;

    let ready = if is_shutdown {
        direction.mask()
    } else if observed.is_empty() {
        return Poll::Pending;
    } else {
        observed
    };

    Poll::Ready(ReadyEvent {
        tick: TICK.unpack(snapshot) as u8,
        ready,
        is_shutdown,
    })
}
/// Clears the readiness bits reported by `event`, except the
/// `READ_CLOSED` / `WRITE_CLOSED` bits, which are never cleared here.
///
/// The clear is tagged with `event.tick`, so `set_readiness` ignores it when
/// the stored tick no longer matches.
pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
    let clearable = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
    let tick_op = Tick::Clear(event.tick);
    self.set_readiness(tick_op, |current| current - clearable);
}
/// Drops the wakers stored in the reader and writer slots without waking
/// them. Waiters in the intrusive list are left untouched.
pub(crate) fn clear_wakers(&self) {
    let mut guard = self.waiters.lock();
    guard.reader = None;
    guard.writer = None;
}
}
impl Drop for ScheduledIo {
/// Wakes every remaining waiter (`Ready::ALL` satisfies all slots checked in
/// `wake`) before the value goes away.
fn drop(&mut self) {
self.wake(Ready::ALL);
}
}
// NOTE(review): these impls are needed because the `UnsafeCell` field (and
// the raw pointers inside the intrusive lists) suppress the auto traits.
// Within this file, `readiness` is atomic and `waiters` is behind a `Mutex`;
// soundness of `linked_list_pointers` depends on how the owning list (not
// visible here) synchronizes access — confirm.
unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}
impl ScheduledIo {
/// Waits until the resource is ready for `interest` (or shut down) and
/// returns the corresponding `ReadyEvent`.
///
/// Unlike `poll_readiness`, each call uses its own list waiter, so it is not
/// limited to the single reader and writer waker slots.
pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
    let fut = self.readiness_fut(interest);
    fut.await
}
/// Builds the `Readiness` future for `interest` in its initial state.
///
/// The embedded waiter starts unlinked, with no waker and `is_ready` unset;
/// it is only pushed onto the waiter list on first poll.
fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
    let waiter = Waiter {
        pointers: linked_list::Pointers::new(),
        waker: None,
        is_ready: false,
        interest,
        _p: PhantomPinned,
    };

    Readiness {
        scheduled_io: self,
        state: State::Init,
        waiter: UnsafeCell::new(waiter),
    }
}
}
// Makes `Waiter` usable as a node of the intrusive `LinkedList`. Handles are
// plain `NonNull<Waiter>` pointers, so the list does not own its nodes.
// NOTE(review): the `unsafe impl` relies on nodes staying at a fixed address
// while linked; in this file that is supported by `PhantomPinned` on `Waiter`
// and by `Readiness::drop` unlinking the node — confirm against the
// `linked_list::Link` safety contract.
unsafe impl linked_list::Link for Waiter {
type Handle = NonNull<Waiter>;
type Target = Waiter;
// A handle already is the raw node pointer.
fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
*handle
}
// Inverse of `as_raw`: the pointer is returned unchanged.
unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
ptr
}
// Locates the `pointers` field of the node via the generated accessor.
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
Waiter::addr_of_pointers(target)
}
}
impl Future for Readiness<'_> {
type Output = ReadyEvent;
/// Drives the `Init -> Waiting -> Done` state machine.
///
/// * `Init`: check readiness without the lock, then again with the lock;
///   if still not ready, store the waker and link the waiter.
/// * `Waiting`: under the lock, either observe `is_ready` (set by
///   `ScheduledIo::wake`) or refresh the stored waker and stay pending.
/// * `Done`: reload the readiness word and return it.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::sync::atomic::Ordering::SeqCst;
let (scheduled_io, state, waiter) = {
// SAFETY (review): only references to the fields are taken; nothing is
// moved out of `self`, so the embedded waiter keeps its address.
let me = unsafe { self.get_unchecked_mut() };
(me.scheduled_io, &mut me.state, &me.waiter)
};
loop {
match *state {
State::Init => {
// Optimistic check without taking the lock.
let curr = scheduled_io.readiness.load(SeqCst);
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// SAFETY (review): `interest` is only written when the `Waiter` is
// constructed in `readiness_fut`; no code in this file mutates it.
let interest = unsafe { (*waiter.get()).interest };
let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);
if !ready.is_empty() || is_shutdown {
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent {
tick,
ready,
is_shutdown,
});
}
// Not ready: take the lock and check again, so a concurrent `wake`
// (which also takes this lock) cannot slip in between the check and
// the insertion below.
let mut waiters = scheduled_io.waiters.lock();
let curr = scheduled_io.readiness.load(SeqCst);
let mut ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// On shutdown, report everything in `interest` as ready.
if is_shutdown {
ready = Ready::ALL;
}
let ready = ready.intersection(interest);
if !ready.is_empty() || is_shutdown {
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent {
tick,
ready,
is_shutdown,
});
}
// Still not ready: store the waker and link the waiter.
// SAFETY (review): the waiter has not been pushed onto the list yet, so
// no other thread can reach it through the list; the lock is held.
let waker = unsafe { &mut (*waiter.get()).waker };
let old = waker.replace(cx.waker().clone());
debug_assert!(old.is_none(), "waker should be None at the first poll");
// SAFETY: `UnsafeCell::get` never returns a null pointer.
waiters
.list
.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
*state = State::Waiting;
// Falls through to `State::Waiting` on the next loop iteration.
}
State::Waiting => {
// The waiter is linked, so its fields may be touched by `wake`; they
// are only accessed here while holding the lock.
let waiters = scheduled_io.waiters.lock();
// SAFETY (review): access happens under the `waiters` lock.
let w = unsafe { &mut *waiter.get() };
if w.is_ready {
// `wake` took our waker and flagged the waiter.
*state = State::Done;
} else {
// Not notified yet: make sure the stored waker matches the current
// task, then keep waiting. The `unwrap` relies on `wake` setting
// `is_ready` whenever it takes the waker.
w.waker.as_mut().unwrap().clone_from(cx.waker());
return Poll::Pending;
}
// Explicit drop marks the end of the critical section.
drop(waiters);
}
State::Done => {
// Reload the word: the returned tick and readiness reflect the
// current state, not necessarily the event that woke the task.
let curr = scheduled_io.readiness.load(Acquire);
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
let tick = TICK.unpack(curr) as u8;
// SAFETY (review): `interest` is never mutated after construction.
let interest = unsafe { (*waiter.get()).interest };
// No emptiness check is made here, so the returned `ready` set can be
// empty if readiness was cleared in the meantime.
let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);
return Poll::Ready(ReadyEvent {
tick,
ready,
is_shutdown,
});
}
}
}
}
}
impl Drop for Readiness<'_> {
    /// Unlinks this future's waiter from the waiter list, under the
    /// `waiters` lock, so the list never points at freed memory.
    fn drop(&mut self) {
        // SAFETY: `UnsafeCell::get` never returns a null pointer.
        let node = unsafe { NonNull::new_unchecked(self.waiter.get()) };

        let mut guard = self.scheduled_io.waiters.lock();
        // SAFETY (review): this waiter is only ever pushed onto
        // `scheduled_io.waiters.list` (see `Future::poll`), which is the
        // list it is removed from here.
        unsafe { guard.list.remove(node) };
    }
}
// NOTE(review): required because `UnsafeCell<Waiter>` (and the raw pointers
// inside `Waiter`) suppress the auto traits. In this file every access to a
// linked waiter's mutable fields happens under `ScheduledIo::waiters` —
// confirm that this is sufficient for the intended use.
unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}