use super::{Interest, Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
use super::Direction;
cfg_io_readiness! {
use crate::util::linked_list::{self, LinkedList};
use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
}
/// Per-resource I/O state: a packed atomic readiness word plus the wakers of
/// the tasks waiting for that readiness to change.
#[derive(Debug)]
pub(crate) struct ScheduledIo {
/// Packed word holding the readiness bits, the driver tick and the
/// generation (see the `READINESS`, `TICK` and `GENERATION` packers).
readiness: AtomicUsize,
/// Registered wakers and the shutdown flag, guarded by a mutex.
waiters: Mutex<Waiters>,
}
cfg_io_readiness! {
/// List of `Waiter` nodes; the nodes are linked through `Waiter::pointers`
/// and referenced by raw `NonNull<Waiter>` handles (see the `Link` impl).
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
}
/// Wakers registered against a `ScheduledIo`; stored behind
/// `ScheduledIo::waiters`.
#[derive(Debug, Default)]
struct Waiters {
/// Waiters registered by `Readiness` futures.
#[cfg(feature = "net")]
list: WaitList,
/// Waker stored by `poll_readiness` for `Direction::Read`.
reader: Option<Waker>,
/// Waker stored by `poll_readiness` for `Direction::Write`.
writer: Option<Waker>,
/// Set when `wake0` runs with `shutdown == true` and never cleared by it;
/// while set, readiness checks made under the lock report ready.
is_shutdown: bool,
}
cfg_io_readiness! {
/// Node in the waiter list; one is embedded in every `Readiness` future.
#[derive(Debug)]
struct Waiter {
/// Link storage handed out by `linked_list::Link::pointers`.
pointers: linked_list::Pointers<Waiter>,
/// Waker of the task awaiting readiness; taken by `wake0` when it notifies
/// this waiter.
waker: Option<Waker>,
/// Readiness this waiter is interested in; `wake0` tests it against the
/// readiness being signalled.
interest: Interest,
/// Set by `wake0` once it has taken this waiter's waker.
is_ready: bool,
/// Opts the node out of `Unpin`; its address is handed to the waiter list.
_p: PhantomPinned,
}
/// Future returned by `ScheduledIo::readiness_fut` (and awaited by
/// `ScheduledIo::readiness`).
struct Readiness<'a> {
/// Resource whose readiness is being awaited.
scheduled_io: &'a ScheduledIo,
/// Progress of this future through registration and notification.
state: State,
/// List node for this future; a raw pointer to it is pushed onto the
/// waiter list when the future has to wait.
waiter: UnsafeCell<Waiter>,
}
/// Progress of a `Readiness` future.
enum State {
/// Not yet registered in the waiter list.
Init,
/// Pushed onto the waiter list and waiting for `is_ready` to be set.
Waiting,
/// Readiness was observed directly, or the waiter was notified.
Done,
}
}
// Bit layout of `ScheduledIo::readiness`. The packers are chained, so
// (assuming `then` allocates the bits directly above its receiver; see
// `util::bit` to confirm) the word is, from the least significant bit up:
// 16 bits of readiness, 8 bits of driver tick, 7 bits of generation.
const READINESS: bit::Pack = bit::Pack::least_significant(16);
const TICK: bit::Pack = READINESS.then(8);
const GENERATION: bit::Pack = TICK.then(7);
// Guards against this module's `GENERATION` packer drifting away from the
// `GENERATION` declared in the parent module.
#[test]
fn test_generations_assert_same() {
assert_eq!(super::GENERATION, GENERATION);
}
impl Entry for ScheduledIo {
    /// Prepares the entry for reuse: bumps the generation by one and stores
    /// it packed into an otherwise zeroed word, so the readiness and tick
    /// bits are cleared at the same time.
    fn reset(&self) {
        let previous = GENERATION.unpack(self.readiness.load(Acquire));

        // `pack_lossy` is used because `previous + 1` may not fit the
        // generation field (presumably it truncates; see `util::bit`).
        let bumped = GENERATION.pack_lossy(previous + 1, 0);

        self.readiness.store(bumped, Release);
    }
}
impl Default for ScheduledIo {
fn default() -> ScheduledIo {
ScheduledIo {
readiness: AtomicUsize::new(0),
waiters: Mutex::new(Default::default()),
}
}
}
impl ScheduledIo {
/// Returns the generation currently stored in the packed readiness word.
pub(crate) fn generation(&self) -> usize {
    let packed = self.readiness.load(Acquire);
    GENERATION.unpack(packed)
}
/// Marks the waiter set as shut down and wakes every registered waiter, by
/// delegating to `wake0` with `Ready::ALL` and the shutdown flag set.
pub(super) fn shutdown(&self) {
self.wake0(Ready::ALL, true)
}
/// Atomically replaces the readiness bits with `f(current readiness)` and
/// stamps the word with the tick carried by `tick`, keeping the generation.
///
/// # Arguments
///
/// - `token`: when `Some`, its generation must equal the generation
///   currently stored in this entry.
/// - `tick`: `Tick::Set(t)` stores `t` unconditionally; `Tick::Clear(t)`
///   only proceeds if `t` is the tick currently stored.
/// - `f`: maps the current readiness to the new readiness. It is re-run on
///   every retry of the compare-and-swap.
///
/// # Errors
///
/// Returns `Err(())` without modifying anything if the token's generation
/// does not match, or if a `Tick::Clear` names a tick other than the stored
/// one.
pub(super) fn set_readiness(
    &self,
    token: Option<usize>,
    tick: Tick,
    f: impl Fn(Ready) -> Ready,
) -> Result<(), ()> {
    let mut observed = self.readiness.load(Acquire);

    loop {
        let generation = GENERATION.unpack(observed);

        // A token from a different generation must not touch this entry.
        let stale_token = token.map_or(false, |tok| GENERATION.unpack(tok) != generation);
        if stale_token {
            return Err(());
        }

        let updated = f(Ready::from_usize(observed));

        // Clearing is only allowed for the tick that is currently stored.
        let tick_value = match tick {
            Tick::Set(t) => t,
            Tick::Clear(t) if TICK.unpack(observed) as u8 == t => t,
            Tick::Clear(_) => return Err(()),
        };

        let with_tick = TICK.pack(tick_value as usize, updated.as_usize());
        let desired = GENERATION.pack(generation, with_tick);

        match self
            .readiness
            .compare_exchange(observed, desired, AcqRel, Acquire)
        {
            Ok(_) => return Ok(()),
            // Another thread changed the word first; retry against its value.
            Err(actual) => observed = actual,
        }
    }
}
/// Wakes every registered waiter whose interest is satisfied by `ready`,
/// without touching the shutdown flag (delegates to `wake0`).
pub(super) fn wake(&self, ready: Ready) {
self.wake0(ready, false);
}
/// Collects the wakers of all waiters satisfied by `ready` and invokes them
/// with the waiters lock released.
///
/// Wakers are gathered into a fixed `NUM_WAKERS`-slot stack array. When the
/// array fills up, the lock is dropped, the batch is woken, and collection
/// resumes under a freshly acquired lock. When `shutdown` is true the waiter
/// set is additionally flagged as shut down.
fn wake0(&self, ready: Ready, shutdown: bool) {
const NUM_WAKERS: usize = 32;
let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
// Number of filled slots at the front of `wakers`.
let mut curr = 0;
let mut waiters = self.waiters.lock();
// `|=` makes the flag sticky: a plain `wake` (shutdown == false) never
// clears it.
waiters.is_shutdown |= shutdown;
// Dedicated reader slot, filled by `poll_readiness`.
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers[curr] = Some(waker);
curr += 1;
}
}
// Dedicated writer slot, filled by `poll_readiness`.
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers[curr] = Some(waker);
curr += 1;
}
}
// At most two slots are used so far, so the array cannot be full yet.
#[cfg(feature = "net")]
'outer: loop {
// Yields the list nodes whose interest is satisfied by `ready`
// (presumably unlinking each node as it is yielded; see
// `LinkedList::drain_filter`). The iterator is rebuilt after every
// relock.
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
while curr < NUM_WAKERS {
match iter.next() {
Some(waiter) => {
// The node is dereferenced while the waiters lock is held.
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
// Tells the `Readiness` future that it was notified.
waiter.is_ready = true;
wakers[curr] = Some(waker);
curr += 1;
}
}
None => {
// No more matching waiters; the final batch is woken after the loop.
break 'outer;
}
}
}
// The array is full: release the lock before running any waker.
drop(waiters);
for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}
curr = 0;
// Reacquire the lock to continue collecting.
waiters = self.waiters.lock();
}
// Release the lock before waking the last (possibly partial) batch.
drop(waiters);
for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}
}
/// Takes a snapshot of the packed word and returns the stored tick together
/// with the readiness bits that fall inside `interest`'s mask.
pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
    let packed = self.readiness.load(Acquire);

    let tick = TICK.unpack(packed) as u8;
    let ready = interest.mask() & Ready::from_usize(READINESS.unpack(packed));

    ReadyEvent { tick, ready }
}
/// Poll-style readiness check for a single direction, using the dedicated
/// `reader` / `writer` waker slots.
///
/// Returns `Poll::Ready` straight away if the direction is already ready.
/// Otherwise the task's waker is stored in the slot for `direction` and the
/// readiness is checked once more under the lock. If the waiter set has been
/// shut down, the whole direction mask is reported as ready.
pub(super) fn poll_readiness(
    &self,
    cx: &mut Context<'_>,
    direction: Direction,
) -> Poll<ReadyEvent> {
    // Fast path: no lock needed when the direction is already ready.
    let snapshot = self.readiness.load(Acquire);
    let ready = direction.mask() & Ready::from_usize(READINESS.unpack(snapshot));
    if !ready.is_empty() {
        return Poll::Ready(ReadyEvent {
            tick: TICK.unpack(snapshot) as u8,
            ready,
        });
    }

    let mut waiters = self.waiters.lock();
    let slot = match direction {
        Direction::Read => &mut waiters.reader,
        Direction::Write => &mut waiters.writer,
    };

    // Only clone the waker when the stored one would not wake this task.
    let registered = match &*slot {
        Some(stored) => stored.will_wake(cx.waker()),
        None => false,
    };
    if !registered {
        *slot = Some(cx.waker().clone());
    }

    // Re-check now that the waker is registered, in case readiness changed
    // while the lock was being taken.
    let snapshot = self.readiness.load(Acquire);
    let ready = direction.mask() & Ready::from_usize(READINESS.unpack(snapshot));
    let tick = TICK.unpack(snapshot) as u8;

    if waiters.is_shutdown {
        return Poll::Ready(ReadyEvent {
            tick,
            ready: direction.mask(),
        });
    }

    if ready.is_empty() {
        Poll::Pending
    } else {
        Poll::Ready(ReadyEvent { tick, ready })
    }
}
pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
}
/// Drops the wakers stored in the dedicated `reader` and `writer` slots
/// without waking them. Waiters in the list are left untouched.
pub(crate) fn clear_wakers(&self) {
    let mut guard = self.waiters.lock();
    guard.reader = None;
    guard.writer = None;
}
}
// Wakes every remaining waiter when the entry is dropped, so that no task is
// left parked on a resource that no longer exists.
impl Drop for ScheduledIo {
fn drop(&mut self) {
self.wake(Ready::ALL);
}
}
// NOTE(review): asserted manually rather than derived. Presumably needed
// because the waiter list stores raw `NonNull<Waiter>` handles, and presumably
// sound because `readiness` is atomic and all waiter state sits behind the
// `waiters` mutex. Confirm against `util::linked_list`.
unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
    /// Async counterpart of `poll_readiness`: resolves once any readiness
    /// covered by `interest` is available. Waiting tasks are tracked in the
    /// waiter list instead of the dedicated reader/writer slots.
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    /// Builds the `Readiness` future in its initial state, with a waiter
    /// node that has no waker yet, is not linked into any list and is not
    /// marked ready.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        let waiter = Waiter {
            pointers: linked_list::Pointers::new(),
            waker: None,
            interest,
            is_ready: false,
            _p: PhantomPinned,
        };

        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(waiter),
        }
    }
}
// Lets `Waiter` nodes be stored in `linked_list::LinkedList`. The handle type
// is the raw `NonNull<Waiter>` itself, so converting between a handle and a
// raw pointer is the identity in both directions.
unsafe impl linked_list::Link for Waiter {
type Handle = NonNull<Waiter>;
type Target = Waiter;
// Handle -> raw pointer: copies the pointer out of the handle.
fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
*handle
}
// Raw pointer -> handle: returns the pointer unchanged.
unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
ptr
}
// Returns a pointer to the node's embedded `pointers` field.
// NOTE(review): `as_mut` requires `target` to point at a live `Waiter`;
// presumably guaranteed by the list's own contract — confirm.
unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
}
}
// State machine behind `ScheduledIo::readiness`:
// `Init` checks readiness (lock-free first, then again under the lock) and
// registers the waiter if nothing is ready; `Waiting` waits for `wake0` to
// flag the waiter; `Done` reports the result.
impl Future for Readiness<'_> {
type Output = ReadyEvent;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::sync::atomic::Ordering::SeqCst;
// Only references to the fields are taken here; nothing is moved out of
// the pinned future.
let (scheduled_io, state, waiter) = unsafe {
let me = self.get_unchecked_mut();
(&me.scheduled_io, &mut me.state, &me.waiter)
};
loop {
match *state {
State::Init => {
// Optimistic check without taking the lock.
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
// `interest` is written once in `readiness_fut`; no code in this
// file modifies it afterwards.
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);
if !ready.is_empty() {
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { ready, tick });
}
// Not ready: take the lock and check again before registering.
let mut waiters = scheduled_io.waiters.lock();
let curr = scheduled_io.readiness.load(SeqCst);
let mut ready = Ready::from_usize(READINESS.unpack(curr));
// After shutdown every interest is treated as ready.
if waiters.is_shutdown {
ready = Ready::ALL;
}
let ready = ready.intersection(interest);
if !ready.is_empty() {
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { ready, tick });
}
// Still not ready: store the waker and link the node, both while the
// waiters lock is held.
unsafe {
(*waiter.get()).waker = Some(cx.waker().clone());
}
// `UnsafeCell::get` never returns null, so `new_unchecked` is fine.
waiters
.list
.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
// The loop continues into the `Waiting` arm, which returns `Pending`
// unless the waiter has already been flagged.
*state = State::Waiting;
}
State::Waiting => {
// The node may be in the list, where `wake0` mutates it under the
// waiters lock, so it is only touched with the lock held.
let waiters = scheduled_io.waiters.lock();
let w = unsafe { &mut *waiter.get() };
if w.is_ready {
// `wake0` flagged this waiter; fall through to `Done`.
*state = State::Done;
} else {
// Not notified yet: refresh the stored waker if the task changed.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
}
return Poll::Pending;
}
// Explicit drop marks the end of the critical section.
drop(waiters);
}
State::Done => {
// The tick is loaded at this point, so it may be newer than the tick
// that was current when the waiter was notified.
let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;
// NOTE(review): accessed without the lock; presumably safe because a
// waiter that reached `Done` is no longer in the list — confirm.
let w = unsafe { &mut *waiter.get() };
// Reports the readiness derived from the whole interest rather than
// the specific bits that triggered the wake-up.
return Poll::Ready(ReadyEvent {
tick,
ready: Ready::from_interest(w.interest),
});
}
}
}
}
}
// Unlinks this future's waiter node from the waiter list before the node's
// memory goes away.
impl Drop for Readiness<'_> {
fn drop(&mut self) {
let mut waiters = self.scheduled_io.waiters.lock();
// Called unconditionally, even if the node was never pushed or was already
// drained by `wake0`.
// NOTE(review): presumably `LinkedList::remove` tolerates a node that is
// not currently linked — confirm against `util::linked_list`.
unsafe {
waiters
.list
.remove(NonNull::new_unchecked(self.waiter.get()))
};
}
}
// NOTE(review): asserted manually; the `UnsafeCell<Waiter>` field prevents
// the auto-derived `Sync`. Presumably sound because the waiter is only
// accessed under the `waiters` lock while it may be shared — confirm.
unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}
}