use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll};
use super::{with_reactor, with_reactor_mut};
use crate::platform::sys::{Interest, RawSource};
/// Process-wide counter backing [`next_token`]; the first token handed out is `1`.
static TOKEN_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Sentinel token equal to the largest `usize`; [`next_token`] asserts that it stays below it.
// NOTE(review): presumably reserved for the reactor's own wake-up source — confirm at the use site.
pub(crate) const WAKE_TOKEN: usize = usize::MAX;

/// Sentinel token sitting directly below [`WAKE_TOKEN`].
// NOTE(review): the name suggests it fences off signal-handling tokens — confirm at the use site.
pub(crate) const SIGNAL_TOKEN_GUARD: usize = usize::MAX - 1;

/// Hands out the next token from the process-wide counter.
///
/// Each call returns the counter's current value and advances it by one, so successive calls
/// yield strictly increasing values until the counter wraps.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics once the issued value gets within two of
/// [`WAKE_TOKEN`]. Builds without debug assertions perform no such check.
pub(crate) fn next_token() -> usize {
    // Only the uniqueness of the value matters here, hence the relaxed ordering.
    let issued = TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed);
    debug_assert!(
        // `SIGNAL_TOKEN_GUARD - 1` is the same bound as `WAKE_TOKEN - 2`.
        issued < SIGNAL_TOKEN_GUARD - 1,
        "token counter approaching sentinel values — process has allocated too many IoSources"
    );
    issued
}
/// A raw I/O source together with the token it is registered under in the reactor.
///
/// [`IoSource::new`] registers the source with the reactor, and dropping the value
/// deregisters it again.
pub struct IoSource {
    /// Platform handle that is passed to the reactor for (de)registration.
    raw: RawSource,
    /// Token identifying this source in reactor calls and in the waker table.
    token: usize,
    /// `true` once registration succeeded; swapped back to `false` when the value is dropped.
    registered: AtomicBool,
}
impl IoSource {
    /// Registers `raw` with the reactor under `token` for `interest` and wraps it.
    ///
    /// # Errors
    ///
    /// Returns the reactor's registration error. In that case no `IoSource` is produced, so
    /// nothing is deregistered later.
    pub fn new(raw: RawSource, token: usize, interest: Interest) -> io::Result<Self> {
        // Register first: the value is only ever built in the "registered" state.
        with_reactor(|r| r.register(raw, token, interest))?;
        Ok(Self {
            raw,
            token,
            registered: AtomicBool::new(true),
        })
    }

    /// Replaces the interest this source is registered for in the reactor.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::NotConnected`] if the source is not currently marked as
    /// registered, otherwise whatever the reactor's `reregister` call returns.
    pub fn reregister(&self, interest: Interest) -> io::Result<()> {
        if self.registered.load(Ordering::Acquire) {
            with_reactor(|r| r.reregister(self.raw, self.token, interest))
        } else {
            Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "IoSource: reregister called on unregistered source",
            ))
        }
    }

    /// Returns the underlying platform handle.
    #[inline]
    pub fn raw(&self) -> RawSource {
        self.raw
    }

    /// Returns the token this source was registered under.
    #[inline]
    pub fn token(&self) -> usize {
        self.token
    }

    /// Creates a not-yet-polled future that waits for read interest on this source.
    pub fn readable(&self) -> ReadableFuture<'_> {
        let armed = false;
        ReadableFuture {
            source: self,
            armed,
        }
    }

    /// Creates a not-yet-polled future that waits for write interest on this source.
    pub fn writable(&self) -> WritableFuture<'_> {
        let armed = false;
        WritableFuture {
            source: self,
            armed,
        }
    }
}
impl Drop for IoSource {
    /// Deregisters the source from the reactor on a best-effort basis.
    ///
    /// Nothing happens if the source is not marked as registered. Both the result of the
    /// deregistration call and any panic raised while performing it are discarded.
    fn drop(&mut self) {
        let was_registered = self.registered.swap(false, Ordering::AcqRel);
        if !was_registered {
            return;
        }

        let (raw, token) = (self.raw, self.token);
        // NOTE(review): the unwind guard presumably covers reactor access failing during
        // teardown — confirm against `with_reactor_mut`.
        let deregister = std::panic::AssertUnwindSafe(|| {
            let _ = with_reactor_mut(|r| r.deregister_with_token(raw, token));
        });
        let _ = std::panic::catch_unwind(deregister);
    }
}
/// Future created by [`IoSource::readable`].
///
/// The first poll installs the task's waker as the read waker for the source's token and
/// reregisters the source with `Interest::READABLE`; the next poll resolves to `Ok(())`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableFuture<'a> {
    /// Source whose token and registration are used when the future is polled.
    source: &'a IoSource,
    /// Set by the first poll; once set, polling reports `Ready`.
    armed: bool,
}
impl Future for ReadableFuture<'_> {
    type Output = io::Result<()>;

    /// Arms the future on the first poll and completes on the next one.
    ///
    /// First poll: stores a clone of the task's waker as the read waker for the source's
    /// token, marks the future as armed, reregisters the source with `Interest::READABLE`
    /// and returns `Pending` (or `Ready(Err(_))` if reregistration fails).
    ///
    /// NOTE(review): every later poll returns `Ready(Ok(()))` without consulting the reactor,
    /// so a re-poll triggered by something other than this source also reports readiness.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A shared reference and a `bool` are both `Unpin`, so the pin can be unwrapped.
        let this = self.get_mut();
        if this.armed {
            return Poll::Ready(Ok(()));
        }

        // The waker is installed before the interest is (re)armed.
        let token = this.source.token;
        with_reactor_mut(|reactor| {
            reactor.wakers.set_read_waker(token, cx.waker().clone());
        });
        this.armed = true;

        match this.source.reregister(Interest::READABLE) {
            Ok(()) => Poll::Pending,
            Err(err) => Poll::Ready(Err(err)),
        }
    }
}
/// Future created by [`IoSource::writable`].
///
/// The first poll installs the task's waker as the write waker for the source's token and
/// reregisters the source with `Interest::WRITABLE`; the next poll resolves to `Ok(())`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableFuture<'a> {
    /// Source whose token and registration are used when the future is polled.
    source: &'a IoSource,
    /// Set by the first poll; once set, polling reports `Ready`.
    armed: bool,
}
impl Future for WritableFuture<'_> {
    type Output = io::Result<()>;

    /// Arms the future on the first poll and completes on the next one.
    ///
    /// First poll: stores a clone of the task's waker as the write waker for the source's
    /// token, marks the future as armed, reregisters the source with `Interest::WRITABLE`
    /// and returns `Pending` (or `Ready(Err(_))` if reregistration fails).
    ///
    /// NOTE(review): every later poll returns `Ready(Ok(()))` without consulting the reactor,
    /// so a re-poll triggered by something other than this source also reports readiness.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A shared reference and a `bool` are both `Unpin`, so the pin can be unwrapped.
        let this = self.get_mut();
        if this.armed {
            return Poll::Ready(Ok(()));
        }

        // The waker is installed before the interest is (re)armed.
        let token = this.source.token;
        with_reactor_mut(|reactor| {
            reactor.wakers.set_write_waker(token, cx.waker().clone());
        });
        this.armed = true;

        match this.source.reregister(Interest::WRITABLE) {
            Ok(()) => Poll::Pending,
            Err(err) => Poll::Ready(Err(err)),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Only the Unix-gated tests use the pipe helper; gating the import the same way keeps
    // other targets free of an unused (or unresolved) import.
    #[cfg(unix)]
    use crate::platform::sys::create_pipe;

    /// A freshly constructed source reports itself as registered.
    #[cfg(unix)]
    #[test]
    fn io_source_registers_on_construction() {
        let (r, w) = create_pipe().unwrap();
        let src = IoSource::new(r, next_token(), Interest::READABLE).expect("IoSource::new failed");
        assert!(src.registered.load(Ordering::Acquire));
        drop(src);
        // SAFETY: `r` and `w` were returned by `create_pipe` above, the `IoSource` built on
        // `r` has already been dropped, and each descriptor is closed exactly once.
        unsafe { libc::close(r) };
        unsafe { libc::close(w) };
    }

    /// Dropping a source deregisters it, so the same descriptor can be registered again.
    #[cfg(unix)]
    #[test]
    fn io_source_deregisters_on_drop() {
        let (r, w) = create_pipe().unwrap();
        {
            let src = IoSource::new(r, next_token(), Interest::READABLE).unwrap();
            assert!(src.registered.load(Ordering::Acquire));
            // `src` goes out of scope here, which runs its `Drop` impl.
        }
        let src2 = IoSource::new(r, next_token(), Interest::READABLE)
            .expect("re-register after drop failed");
        drop(src2);
        // SAFETY: `r` and `w` were returned by `create_pipe` above, every `IoSource` built on
        // `r` has already been dropped, and each descriptor is closed exactly once.
        unsafe { libc::close(r) };
        unsafe { libc::close(w) };
    }

    /// Successive tokens are strictly increasing, hence distinct.
    #[test]
    fn next_token_is_unique() {
        let t1 = next_token();
        let t2 = next_token();
        let t3 = next_token();
        assert!(t1 < t2 && t2 < t3);
    }
}