use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::util::bit;
use crate::util::slab::{Address, Entry, Generation};
use std::sync::atomic::Ordering::{Acquire, AcqRel, SeqCst};
#[derive(Debug)]
pub(crate) struct ScheduledIo {
readiness: AtomicUsize,
pub(crate) reader: AtomicWaker,
pub(crate) writer: AtomicWaker,
}
const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH);
impl Entry for ScheduledIo {
fn generation(&self) -> Generation {
unpack_generation(self.readiness.load(SeqCst))
}
fn reset(&self, generation: Generation) -> bool {
let mut current = self.readiness.load(Acquire);
loop {
if unpack_generation(current) != generation {
return false;
}
let next = PACK.pack(generation.next().to_usize(), 0);
match self.readiness.compare_exchange(
current,
next,
AcqRel,
Acquire,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
drop(self.reader.take_waker());
drop(self.writer.take_waker());
true
}
}
impl Default for ScheduledIo {
fn default() -> ScheduledIo {
ScheduledIo {
readiness: AtomicUsize::new(0),
reader: AtomicWaker::new(),
writer: AtomicWaker::new(),
}
}
}
impl ScheduledIo {
pub(crate) fn get_readiness(&self, address: Address) -> Option<usize> {
let ready = self.readiness.load(Acquire);
if unpack_generation(ready) != address.generation() {
return None;
}
Some(ready & !PACK.mask())
}
pub(crate) fn set_readiness(
&self,
address: Address,
f: impl Fn(usize) -> usize,
) -> Result<usize, ()> {
let generation = address.generation();
let mut current = self.readiness.load(Acquire);
loop {
if unpack_generation(current) != generation {
return Err(());
}
let current_readiness = current & mio::Ready::all().as_usize();
let new = f(current_readiness);
debug_assert!(
new <= !PACK.max_value(),
"new readiness value would overwrite generation bits!"
);
match self.readiness.compare_exchange(
current,
PACK.pack(generation.to_usize(), new),
AcqRel,
Acquire,
) {
Ok(_) => return Ok(current),
Err(actual) => current = actual,
}
}
}
}
impl Drop for ScheduledIo {
fn drop(&mut self) {
self.writer.wake();
self.reader.wake();
}
}
fn unpack_generation(src: usize) -> Generation {
Generation::new(PACK.unpack(src))
}