// Fail the build with a clear message on any target OS that is not in the list below.
#[cfg(not(any(
target_os = "linux",
target_os = "android",
target_os = "illumos",
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
target_os = "windows",
)))]
compile_error!("async-io does not support this target OS");
use std::collections::BTreeMap;
use std::fmt;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::task::{Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
use futures_util::future;
use once_cell::sync::Lazy;
use slab::Slab;
/// Number of live `Parker`s: incremented in `Parker::new`, decremented when a `Parker` is
/// dropped, and read by the "async-io" thread spawned in `Reactor::get`.
static PARKER_COUNT: AtomicUsize = AtomicUsize::new(0);
/// Creates a new `Parker` together with an `Unparker` that shares its state.
pub fn pair() -> (Parker, Unparker) {
    let parker = Parker::new();
    let unparker = parker.unparker();
    (parker, unparker)
}
/// Handle used to block the current thread until it is notified or a timeout expires.
pub struct Parker {
// Handle to the shared `Inner` state; cloned by `Parker::unparker`.
unparker: Unparker,
}
impl Parker {
pub fn new() -> Parker {
let parker = Parker {
unparker: Unparker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
lock: Mutex::new(()),
cvar: Condvar::new(),
}),
},
};
PARKER_COUNT.fetch_add(1, Ordering::SeqCst);
parker
}
pub fn park(&self) {
self.unparker.inner.park(None);
}
pub fn park_timeout(&self, timeout: Duration) -> bool {
self.unparker.inner.park(Some(timeout))
}
pub fn park_deadline(&self, deadline: Instant) -> bool {
self.unparker
.inner
.park(Some(deadline.saturating_duration_since(Instant::now())))
}
pub fn unpark(&self) {
self.unparker.unpark()
}
pub fn unparker(&self) -> Unparker {
self.unparker.clone()
}
}
impl Drop for Parker {
    /// Decrements `PARKER_COUNT`, then unparks the reactor's `thread_unparker`.
    fn drop(&mut self) {
        PARKER_COUNT.fetch_sub(1, Ordering::SeqCst);
        // NOTE(review): presumably this lets the helper thread re-read the count promptly.
        let reactor = Reactor::get();
        reactor.thread_unparker.unpark();
    }
}
impl fmt::Debug for Parker {
    /// Writes an opaque placeholder (honouring width/alignment flags) instead of the fields.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.pad("Parker { .. }")
    }
}
/// Cloneable handle that notifies the `Parker` sharing the same `Inner` state.
pub struct Unparker {
// Parking state shared with the `Parker` and every other clone of this handle.
inner: Arc<Inner>,
}
impl Unparker {
pub fn unpark(&self) {
self.inner.unpark()
}
}
impl fmt::Debug for Unparker {
    /// Writes an opaque placeholder (honouring width/alignment flags) instead of the fields.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.pad("Unparker { .. }")
    }
}
impl Clone for Unparker {
fn clone(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
}
// Values stored in `Inner::state`.
// No notification is pending and no thread is parked.
const EMPTY: usize = 0;
// A thread is waiting (or about to wait) on `Inner::cvar`.
const PARKED: usize = 1;
// A thread took the reactor lock and is running (or about to run) `ReactorLock::react`.
const POLLING: usize = 2;
// `unpark` was called and the notification has not been consumed yet.
const NOTIFIED: usize = 3;
/// State shared between a `Parker` and its `Unparker`s.
struct Inner {
// One of `EMPTY`, `PARKED`, `POLLING` or `NOTIFIED`.
state: AtomicUsize,
// Held by `park` while it publishes its state and briefly taken by `unpark` before waking.
lock: Mutex<()>,
// Waited on by `park` when the reactor lock could not be taken.
cvar: Condvar,
}
impl Inner {
    /// Blocks the calling thread until it is notified or `timeout` elapses.
    ///
    /// `None` means "no deadline". Returns `true` if a notification was consumed and `false`
    /// if the call gave up because the timeout was zero or the deadline passed.
    ///
    /// When the reactor lock is free, the thread drives the reactor through
    /// `ReactorLock::react` instead of sleeping on the condition variable.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds a value that the parking protocol does not allow at that point,
    /// or if the internal mutex is poisoned.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // Fast path: a notification is already pending, so consume it and return.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            // Run one non-blocking reactor pass if nobody else is driving it.
            if let Some(reactor_lock) = Reactor::get().try_lock() {
                let _ = reactor_lock.react(Some(Duration::from_secs(0)));
            }
            return true;
        }

        // A zero timeout never blocks; still give the reactor one non-blocking pass.
        if let Some(dur) = timeout {
            if dur == Duration::from_millis(0) {
                if let Some(reactor_lock) = Reactor::get().try_lock() {
                    let _ = reactor_lock.react(Some(Duration::from_secs(0)));
                }
                return false;
            }
        }

        // `Instant + Duration` panics when the sum is not representable (for example a
        // timeout of `Duration::MAX`), so such a timeout is treated as "no deadline".
        let deadline = timeout.and_then(|t| Instant::now().checked_add(t));

        loop {
            // Decide how to wait: poll the reactor if its lock is free, otherwise sleep on
            // the condition variable.
            let reactor_lock = Reactor::get().try_lock();
            let state = match reactor_lock {
                None => PARKED,
                Some(_) => POLLING,
            };

            let mut m = self.lock.lock().unwrap();

            match self
                .state
                .compare_exchange(EMPTY, state, Ordering::SeqCst, Ordering::SeqCst)
            {
                Ok(_) => {}
                Err(NOTIFIED) => {
                    // A notification arrived in the meantime; consume it with a swap so the
                    // latest `unpark` write is the one that is read.
                    let old = self.state.swap(EMPTY, Ordering::SeqCst);
                    assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                    return true;
                }
                Err(n) => panic!("inconsistent park_timeout state: {}", n),
            }

            match deadline {
                None => {
                    match reactor_lock {
                        None => m = self.cvar.wait(m).unwrap(),
                        Some(reactor_lock) => {
                            // Release `lock` while polling so `unpark` is not blocked on it.
                            drop(m);
                            let _ = reactor_lock.react(None);
                            m = self.lock.lock().unwrap();
                        }
                    }

                    match self.state.swap(EMPTY, Ordering::SeqCst) {
                        // Got a notification.
                        NOTIFIED => return true,
                        // Woke up without a notification; go around again.
                        PARKED | POLLING => {}
                        n => panic!("inconsistent state: {}", n),
                    }
                }
                Some(deadline) => {
                    let timeout = deadline.saturating_duration_since(Instant::now());

                    m = match reactor_lock {
                        None => self.cvar.wait_timeout(m, timeout).unwrap().0,
                        Some(reactor_lock) => {
                            // Release `lock` while polling so `unpark` is not blocked on it.
                            drop(m);
                            let _ = reactor_lock.react(Some(timeout));
                            self.lock.lock().unwrap()
                        }
                    };

                    // Reset `state` unconditionally: this either consumes a notification or
                    // clears the parked/polling flag.
                    match self.state.swap(EMPTY, Ordering::SeqCst) {
                        NOTIFIED => return true,
                        PARKED | POLLING => {}
                        n => panic!("inconsistent state: {}", n),
                    }

                    if Instant::now() >= deadline {
                        return false;
                    }
                }
            }

            drop(m);
        }
    }

    /// Records a notification and wakes the parked thread, if any.
    ///
    /// Returns immediately when nobody was waiting (`EMPTY`) or a notification was already
    /// pending (`NOTIFIED`). Otherwise wakes a thread sleeping on the condition variable
    /// (`PARKED`) or interrupts the reactor poll (`POLLING`).
    pub fn unpark(&self) {
        // Always write `NOTIFIED`, even if it is already set, then inspect the old value.
        let state = match self.state.swap(NOTIFIED, Ordering::SeqCst) {
            EMPTY => return,
            NOTIFIED => return,
            state => state,
        };

        // The parking thread holds `lock` from publishing its state until it actually starts
        // waiting on `cvar`; taking and releasing it here waits out that window so the
        // wake-up below cannot be missed.
        drop(self.lock.lock().unwrap());

        if state == PARKED {
            self.cvar.notify_one();
        } else {
            Reactor::get().notify();
        }
    }
}
/// Global I/O and timer reactor, obtained through `Reactor::get`.
pub(crate) struct Reactor {
// Unparks the "async-io" helper thread spawned in `Reactor::get`.
thread_unparker: parking::Unparker,
// Platform-specific polling backend (one of the `sys` modules).
sys: sys::Reactor,
// Incremented once per `ReactorLock::react` call.
ticker: AtomicUsize,
// Registered I/O sources; the slab index doubles as the key given to the backend.
sources: Mutex<Slab<Arc<Source>>>,
// Event buffer filled by `sys::Reactor::wait`; its mutex guard is what `ReactorLock` holds.
events: Mutex<sys::Events>,
// Active timers ordered by `(instant, id)`, each mapped to the waker to fire.
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
// Queued timer insertions/removals, applied to `timers` by `process_timer_ops`.
timer_ops: ConcurrentQueue<TimerOp>,
}
impl Reactor {
    /// Returns the process-wide reactor, creating it on first use.
    ///
    /// Initialization spawns a thread named "async-io" that repeatedly drives the reactor
    /// and backs off with short `park_timeout` sleeps while any `Parker` is alive.
    ///
    /// # Panics
    ///
    /// Panics if the helper thread cannot be spawned or the polling backend cannot be
    /// initialized.
    pub(crate) fn get() -> &'static Reactor {
        static REACTOR: Lazy<Reactor> = Lazy::new(|| {
            let (parker, unparker) = parking::pair();

            let spawned = thread::Builder::new()
                .name("async-io".to_string())
                .spawn(move || {
                    let reactor = Reactor::get();
                    // Back-off counter: bumped when the ticker moved since the previous
                    // iteration, reset otherwise (and on notification / no live parkers).
                    let mut sleeps = 0u64;
                    let mut last_tick = 0;

                    loop {
                        let tick = reactor.ticker.load(Ordering::SeqCst);

                        if last_tick != tick {
                            // Somebody else ran the reactor since the last look.
                            last_tick = tick;
                            sleeps += 1;
                        } else {
                            // Nobody ticked the reactor: try to drive it from this thread,
                            // blocking on the lock once the counter has reached 60.
                            let guard = if sleeps < 60 {
                                reactor.try_lock()
                            } else {
                                Some(reactor.lock())
                            };
                            if let Some(guard) = guard {
                                let _ = guard.react(None);
                                last_tick = reactor.ticker.load(Ordering::SeqCst);
                            }
                            sleeps = 0;
                        }

                        if PARKER_COUNT.load(Ordering::SeqCst) == 0 {
                            sleeps = 0;
                            continue;
                        }

                        // 20us for the first 50 steps, then doubling up to 20 << 9 us.
                        let delay_us = if sleeps < 50 {
                            20
                        } else {
                            20 << (sleeps - 50).min(9)
                        };
                        if parker.park_timeout(Duration::from_micros(delay_us)) {
                            sleeps = 0;
                        }
                    }
                });
            spawned.expect("cannot spawn async-io thread");

            Reactor {
                thread_unparker: unparker,
                sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
                ticker: AtomicUsize::new(0),
                sources: Mutex::new(Slab::new()),
                events: Mutex::new(sys::Events::new()),
                timers: Mutex::new(BTreeMap::new()),
                timer_ops: ConcurrentQueue::bounded(1000),
            }
        });
        &REACTOR
    }

    /// Interrupts a thread blocked in the backend's `wait`.
    ///
    /// # Panics
    ///
    /// Panics if the backend reports a notification failure.
    pub(crate) fn notify(&self) {
        let outcome = self.sys.notify();
        outcome.expect("failed to notify reactor");
    }

    /// Registers a raw I/O handle and returns the `Source` tracking it.
    ///
    /// The slab slot is reserved first so that its index can be used as the backend key;
    /// the slot is only filled once backend registration succeeded.
    pub(crate) fn insert_io(
        &self,
        #[cfg(unix)] raw: RawFd,
        #[cfg(windows)] raw: RawSocket,
    ) -> io::Result<Arc<Source>> {
        let mut sources = self.sources.lock().unwrap();
        let slot = sources.vacant_entry();
        let key = slot.key();

        self.sys.register(raw, key)?;

        let wakers = Mutex::new(Wakers {
            tick_readable: 0,
            tick_writable: 0,
            readers: Vec::new(),
            writers: Vec::new(),
        });
        let source = Arc::new(Source { raw, key, wakers });
        Ok(slot.insert(source).clone())
    }

    /// Removes `source` from the source table and deregisters it from the backend.
    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut sources = self.sources.lock().unwrap();
        drop(sources.remove(source.key));
        self.sys.deregister(source.raw)
    }

    /// Queues a timer that wakes `waker` at `when` and returns the timer's id.
    ///
    /// If the operation queue is full it is drained into the timer map and the push retried.
    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

        loop {
            let op = TimerOp::Insert(when, id, waker.clone());
            if self.timer_ops.push(op).is_ok() {
                break;
            }
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }

        // Wake the polling thread so it takes the new timer into account.
        self.notify();
        id
    }

    /// Queues removal of the timer identified by `(when, id)`.
    ///
    /// If the operation queue is full it is drained into the timer map and the push retried.
    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
        loop {
            if self.timer_ops.push(TimerOp::Remove(when, id)).is_ok() {
                return;
            }
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }
    }

    /// Blocks until the event buffer can be locked and returns the resulting `ReactorLock`.
    fn lock(&self) -> ReactorLock<'_> {
        ReactorLock {
            reactor: self,
            events: self.events.lock().unwrap(),
        }
    }

    /// Returns a `ReactorLock` if the event buffer can be locked right now, `None` otherwise.
    fn try_lock(&self) -> Option<ReactorLock<'_>> {
        match self.events.try_lock() {
            Ok(events) => Some(ReactorLock {
                reactor: self,
                events,
            }),
            Err(_) => None,
        }
    }

    /// Moves the wakers of timers keyed before `(now, 0)` into `wakers`.
    ///
    /// Returns zero if any timer fired, otherwise the time until the earliest remaining
    /// timer, or `None` when no timers are left.
    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);

        let now = Instant::now();

        // Everything ordered before `(now, 0)` is ready; the rest stays in the map.
        let pending = timers.split_off(&(now, 0));
        let ready = mem::replace(&mut *timers, pending);

        let next = if !ready.is_empty() {
            Some(Duration::from_secs(0))
        } else {
            timers
                .keys()
                .next()
                .map(|&(when, _)| when.saturating_duration_since(now))
        };

        // Release the timer lock before touching the caller's list.
        drop(timers);

        wakers.extend(ready.into_iter().map(|(_, waker)| waker));
        next
    }

    /// Applies queued timer operations to `timers`, at most one queue-capacity's worth.
    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
        // Bounding the loop keeps it finite even if other threads keep pushing.
        let limit = self.timer_ops.capacity().unwrap();
        for _ in 0..limit {
            let op = match self.timer_ops.pop() {
                Ok(op) => op,
                Err(_) => break,
            };
            match op {
                TimerOp::Insert(when, id, waker) => {
                    timers.insert((when, id), waker);
                }
                TimerOp::Remove(when, id) => {
                    timers.remove(&(when, id));
                }
            }
        }
    }
}
/// Guard over the reactor's event buffer; created by `Reactor::lock` / `Reactor::try_lock`.
struct ReactorLock<'a> {
// The reactor this guard belongs to.
reactor: &'a Reactor,
// Locked event buffer that the backend's `wait` fills.
events: MutexGuard<'a, sys::Events>,
}
impl ReactorLock<'_> {
    /// Runs one reactor pass: fires due timers, waits for I/O events for at most `timeout`
    /// (or until the next timer, whichever is sooner), and wakes every task that became ready.
    ///
    /// The lock is released before any waker is invoked.
    ///
    /// # Errors
    ///
    /// Returns the error from the backend's `wait` (interruptions are treated as success) or
    /// the first error hit while re-arming a source. Wakers collected during the pass are
    /// woken even when an error is returned, so ready tasks are never silently dropped.
    fn react(mut self, timeout: Option<Duration>) -> io::Result<()> {
        let mut wakers = Vec::new();

        // Fire timers that are already due and learn how long until the next one.
        let next_timer = self.reactor.process_timers(&mut wakers);

        // Wait no longer than the sooner of the caller's timeout and the next timer.
        let timeout = match (next_timer, timeout) {
            (None, None) => None,
            (Some(t), None) | (None, Some(t)) => Some(t),
            (Some(a), Some(b)) => Some(a.min(b)),
        };

        // Bump the ticker before polling; `tick` is the new value.
        let tick = self
            .reactor
            .ticker
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);

        let res = match self.reactor.sys.wait(&mut self.events, timeout) {
            // No I/O events.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // A non-zero timeout elapsed, so timers may have become due meanwhile.
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }
            // At least one I/O event.
            Ok(_) => {
                let sources = self.reactor.sources.lock().unwrap();
                // Remember the first re-arm failure instead of returning early: bailing out
                // here would drop the wakers gathered so far without waking them and would
                // skip the remaining events.
                let mut first_err = None;

                for ev in self.events.iter() {
                    if let Some(source) = sources.get(ev.key) {
                        let mut w = source.wakers.lock().unwrap();

                        if ev.readable {
                            w.tick_readable = tick;
                            wakers.append(&mut w.readers);
                        }
                        if ev.writable {
                            w.tick_writable = tick;
                            wakers.append(&mut w.writers);
                        }

                        // Waiters remain for the direction that did not fire, so re-arm it.
                        if !(w.writers.is_empty() && w.readers.is_empty()) {
                            let rearmed = self.reactor.sys.reregister(
                                source.raw,
                                source.key,
                                !w.readers.is_empty(),
                                !w.writers.is_empty(),
                            );
                            if let Err(err) = rearmed {
                                if first_err.is_none() {
                                    first_err = Some(err);
                                }
                            }
                        }
                    }
                }

                match first_err {
                    Some(err) => Err(err),
                    None => Ok(()),
                }
            }
            // An interrupted wait is not an error.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
            Err(err) => Err(err),
        };

        // Release the reactor lock before running arbitrary waker code.
        drop(self);

        for waker in wakers {
            // A panicking waker must not prevent the remaining ones from running.
            let _ = panic::catch_unwind(|| waker.wake());
        }

        res
    }
}
/// A queued change to the reactor's timer map.
enum TimerOp {
// Add a timer keyed by `(instant, id)` that maps to the given waker.
Insert(Instant, usize, Waker),
// Remove the timer keyed by `(instant, id)`.
Remove(Instant, usize),
}
/// An I/O handle registered in the reactor.
#[derive(Debug)]
pub(crate) struct Source {
// Raw file descriptor (Unix targets).
#[cfg(unix)]
pub(crate) raw: RawFd,
// Raw socket (Windows targets).
#[cfg(windows)]
pub(crate) raw: RawSocket,
// Index of this source in `Reactor::sources`; also the key passed to the polling backend.
key: usize,
// Tasks waiting on this source plus the ticks at which readiness was last recorded.
wakers: Mutex<Wakers>,
}
/// Per-source waiter lists and readiness ticks.
#[derive(Debug)]
struct Wakers {
// Reactor tick at which a readability event was last delivered (0 initially).
tick_readable: usize,
// Reactor tick at which a writability event was last delivered (0 initially).
tick_writable: usize,
// Wakers of tasks waiting for readability.
readers: Vec<Waker>,
// Wakers of tasks waiting for writability.
writers: Vec<Waker>,
}
impl Source {
    /// Waits until the reactor records a new readability event for this source.
    ///
    /// The first poll only registers interest and remembers the current ticks; a later poll
    /// completes once `tick_readable` differs from both remembered values.
    pub(crate) async fn readable(&self) -> io::Result<()> {
        // `(reactor tick, tick_readable)` as observed on the first poll.
        let mut ticks: Option<(usize, usize)> = None;

        future::poll_fn(|cx| {
            let mut wakers = self.wakers.lock().unwrap();

            if let Some((reactor_tick, source_tick)) = ticks {
                let seen = wakers.tick_readable;
                if seen != reactor_tick && seen != source_tick {
                    return Poll::Ready(Ok(()));
                }
            }

            // The first reader has to (re-)arm read interest in the backend.
            if wakers.readers.is_empty() {
                let want_write = !wakers.writers.is_empty();
                Reactor::get()
                    .sys
                    .reregister(self.raw, self.key, true, want_write)?;
            }

            // Avoid storing a duplicate of a waker that is already queued.
            let already_queued = wakers.readers.iter().any(|w| w.will_wake(cx.waker()));
            if !already_queued {
                wakers.readers.push(cx.waker().clone());
            }

            if ticks.is_none() {
                let reactor_tick = Reactor::get().ticker.load(Ordering::SeqCst);
                ticks = Some((reactor_tick, wakers.tick_readable));
            }

            Poll::Pending
        })
        .await
    }

    /// Waits until the reactor records a new writability event for this source.
    ///
    /// The first poll only registers interest and remembers the current ticks; a later poll
    /// completes once `tick_writable` differs from both remembered values.
    pub(crate) async fn writable(&self) -> io::Result<()> {
        // `(reactor tick, tick_writable)` as observed on the first poll.
        let mut ticks: Option<(usize, usize)> = None;

        future::poll_fn(|cx| {
            let mut wakers = self.wakers.lock().unwrap();

            if let Some((reactor_tick, source_tick)) = ticks {
                let seen = wakers.tick_writable;
                if seen != reactor_tick && seen != source_tick {
                    return Poll::Ready(Ok(()));
                }
            }

            // The first writer has to (re-)arm write interest in the backend.
            if wakers.writers.is_empty() {
                let want_read = !wakers.readers.is_empty();
                Reactor::get()
                    .sys
                    .reregister(self.raw, self.key, want_read, true)?;
            }

            // Avoid storing a duplicate of a waker that is already queued.
            let already_queued = wakers.writers.iter().any(|w| w.will_wake(cx.waker()));
            if !already_queued {
                wakers.writers.push(cx.waker().clone());
            }

            if ticks.is_none() {
                let reactor_tick = Reactor::get().ticker.load(Ordering::SeqCst);
                ticks = Some((reactor_tick, wakers.tick_writable));
            }

            Poll::Pending
        })
        .await
    }
}
/// Polling backend built on epoll, with an eventfd used for notifications.
#[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))]
mod sys {
    use std::convert::TryInto;
    use std::io;
    use std::os::unix::io::RawFd;
    use std::time::Duration;

    use crate::sys::epoll::{
        epoll_create1, epoll_ctl, epoll_wait, EpollEvent, EpollFlags, EpollOp,
    };

    /// Calls a `libc` function and turns a `-1` return value into the last OS error.
    macro_rules! syscall {
        ($fn:ident $args:tt) => {{
            let res = unsafe { libc::$fn $args };
            if res == -1 {
                Err(std::io::Error::last_os_error())
            } else {
                Ok(res)
            }
        }};
    }

    /// The epoll instance plus the eventfd that `notify` writes to.
    pub struct Reactor {
        epoll_fd: RawFd,
        event_fd: RawFd,
    }

    impl Reactor {
        /// Creates the epoll instance and the eventfd, and arms the eventfd for reading
        /// under the reserved key `!0`.
        pub fn new() -> io::Result<Reactor> {
            let epoll_fd = epoll_create1()?;
            let event_fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
            let reactor = Reactor { epoll_fd, event_fd };
            reactor.register(event_fd, !0)?;
            reactor.reregister(event_fd, !0, true, false)?;
            Ok(reactor)
        }

        /// Switches `fd` to non-blocking mode and adds it to the epoll set with no interest
        /// flags; interest is set later through `reregister`.
        pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
            let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
            syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
            let ev = &mut EpollEvent::new(0, key as u64);
            epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev))
        }

        /// Sets one-shot interest in readability and/or writability for `fd`.
        pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
            let mut flags = libc::EPOLLONESHOT;
            if read {
                flags |= read_flags();
            }
            if write {
                flags |= write_flags();
            }
            let ev = &mut EpollEvent::new(flags, key as u64);
            epoll_ctl(self.epoll_fd, EpollOp::EpollCtlMod, fd, Some(ev))
        }

        /// Removes `fd` from the epoll set.
        pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
            epoll_ctl(self.epoll_fd, EpollOp::EpollCtlDel, fd, None)
        }

        /// Waits for events for at most `timeout` (`None` blocks indefinitely) and returns
        /// how many were stored in `events`.
        ///
        /// The timeout is converted to whole milliseconds: a non-zero timeout below 1ms is
        /// raised to 1ms, and a timeout larger than `c_int::MAX` milliseconds is clamped to
        /// that maximum. Previously such a timeout failed the integer conversion and became
        /// `-1`, i.e. an unbounded wait.
        pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
            let timeout_ms = timeout
                .map(|t| {
                    if t == Duration::from_millis(0) {
                        t
                    } else {
                        t.max(Duration::from_millis(1))
                    }
                })
                // Clamp before converting so the conversion below cannot fail.
                .map(|t| t.as_millis().min(libc::c_int::max_value() as u128))
                .and_then(|ms| ms.try_into().ok())
                .unwrap_or(-1);
            events.len = epoll_wait(self.epoll_fd, &mut events.list, timeout_ms)?;

            // Drain a pending notification (failure just means there was none) and re-arm
            // the one-shot eventfd registration.
            let mut buf = [0u8; 8];
            let _ = syscall!(read(
                self.event_fd,
                &mut buf[0] as *mut u8 as *mut libc::c_void,
                buf.len()
            ));
            self.reregister(self.event_fd, !0, true, false)?;

            Ok(events.len)
        }

        /// Wakes a thread blocked in `wait` by writing to the eventfd.
        ///
        /// The write result is deliberately ignored and `Ok(())` is always returned.
        pub fn notify(&self) -> io::Result<()> {
            let buf: [u8; 8] = 1u64.to_ne_bytes();
            let _ = syscall!(write(
                self.event_fd,
                &buf[0] as *const u8 as *const libc::c_void,
                buf.len()
            ));
            Ok(())
        }
    }

    /// Flags that count as "readable".
    fn read_flags() -> EpollFlags {
        libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI
    }

    /// Flags that count as "writable".
    fn write_flags() -> EpollFlags {
        libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLERR
    }

    /// Fixed-size buffer of events reported by the last `wait`.
    pub struct Events {
        list: Box<[EpollEvent]>,
        // Number of valid entries at the front of `list`.
        len: usize,
    }

    impl Events {
        /// Creates an empty buffer with room for 1000 events.
        pub fn new() -> Events {
            let list = vec![EpollEvent::empty(); 1000].into_boxed_slice();
            let len = 0;
            Events { list, len }
        }

        /// Iterates over the events stored by the last `wait`.
        pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
            self.list[..self.len].iter().map(|ev| Event {
                readable: (ev.events() & read_flags()) != 0,
                writable: (ev.events() & write_flags()) != 0,
                key: ev.data() as usize,
            })
        }
    }

    /// A readiness event for the source registered under `key`.
    pub struct Event {
        pub readable: bool,
        pub writable: bool,
        pub key: usize,
    }
}
/// Polling backend built on kqueue, with a Unix socket pair used for notifications.
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
))]
mod sys {
    use std::io::{self, Read, Write};
    use std::os::unix::io::{AsRawFd, RawFd};
    use std::os::unix::net::UnixStream;
    use std::time::Duration;

    use crate::sys::event::{kevent_ts, kqueue, KEvent};

    /// Calls a `libc` function and turns a `-1` return value into the last OS error.
    macro_rules! syscall {
        ($fn:ident $args:tt) => {{
            let res = unsafe { libc::$fn $args };
            if res == -1 {
                Err(std::io::Error::last_os_error())
            } else {
                Ok(res)
            }
        }};
    }

    /// The kqueue descriptor plus the socket pair that `notify` writes to.
    pub struct Reactor {
        kqueue_fd: RawFd,
        read_stream: UnixStream,
        write_stream: UnixStream,
    }

    impl Reactor {
        /// Creates the kqueue and the non-blocking socket pair, and arms the read end under
        /// the reserved key `!0`.
        pub fn new() -> io::Result<Reactor> {
            let kqueue_fd = kqueue()?;
            syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
            let (read_stream, write_stream) = UnixStream::pair()?;
            read_stream.set_nonblocking(true)?;
            write_stream.set_nonblocking(true)?;
            let reactor = Reactor {
                kqueue_fd,
                read_stream,
                write_stream,
            };
            reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?;
            Ok(reactor)
        }

        /// Switches `fd` to non-blocking mode. Nothing is added to the kqueue here; filters
        /// are added by `reregister`.
        pub fn register(&self, fd: RawFd, _key: usize) -> io::Result<()> {
            let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
            syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
            Ok(())
        }

        /// Adds or deletes the one-shot read and write filters for `fd` according to `read`
        /// and `write`.
        ///
        /// # Errors
        ///
        /// Returns the first per-filter error reported by the kernel, except `ENOENT`
        /// and `EPIPE`, which are ignored.
        pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
            let mut read_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
            let mut write_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
            if read {
                read_flags |= libc::EV_ADD;
            } else {
                read_flags |= libc::EV_DELETE;
            }
            if write {
                write_flags |= libc::EV_ADD;
            } else {
                write_flags |= libc::EV_DELETE;
            }

            let udata = key as _;
            let changelist = [
                KEvent::new(fd as _, libc::EVFILT_READ, read_flags, 0, 0, udata),
                KEvent::new(fd as _, libc::EVFILT_WRITE, write_flags, 0, 0, udata),
            ];
            let mut eventlist = changelist;
            kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;

            for ev in &eventlist {
                let (flags, data) = (ev.flags(), ev.data());
                // `EV_ERROR` is a bit mask, so test for the bit being set. Comparing the
                // masked value with `1` can never succeed and used to hide every error.
                if (flags & libc::EV_ERROR) != 0
                    && data != 0
                    && data != libc::ENOENT as _
                    && data != libc::EPIPE as _
                {
                    return Err(io::Error::from_raw_os_error(data as _));
                }
            }
            Ok(())
        }

        /// Deletes both filters for `fd`.
        ///
        /// # Errors
        ///
        /// Returns the first per-filter error reported by the kernel, except `ENOENT`,
        /// which is ignored.
        pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
            let flags = libc::EV_DELETE | libc::EV_RECEIPT;
            let changelist = [
                KEvent::new(fd as _, libc::EVFILT_WRITE, flags, 0, 0, 0),
                KEvent::new(fd as _, libc::EVFILT_READ, flags, 0, 0, 0),
            ];
            let mut eventlist = changelist;
            kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;

            for ev in &eventlist {
                let (flags, data) = (ev.flags(), ev.data());
                // Same bit-mask test as in `reregister`.
                if (flags & libc::EV_ERROR) != 0 && data != 0 && data != libc::ENOENT as _ {
                    return Err(io::Error::from_raw_os_error(data as _));
                }
            }
            Ok(())
        }

        /// Waits for events for at most `timeout` (`None` blocks indefinitely) and returns
        /// how many were stored in `events`.
        pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
            let timeout = timeout.map(|t| libc::timespec {
                tv_sec: t.as_secs() as libc::time_t,
                tv_nsec: t.subsec_nanos() as libc::c_long,
            });
            events.len = kevent_ts(self.kqueue_fd, &[], &mut events.list, timeout)?;

            // Drain all pending notification bytes, then re-arm the one-shot read filter.
            while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
            self.reregister(self.read_stream.as_raw_fd(), !0, true, false)?;

            Ok(events.len)
        }

        /// Wakes a thread blocked in `wait` by writing one byte to the socket pair.
        ///
        /// The write result is deliberately ignored and `Ok(())` is always returned.
        pub fn notify(&self) -> io::Result<()> {
            let _ = (&self.write_stream).write(&[1]);
            Ok(())
        }
    }

    /// Fixed-size buffer of events reported by the last `wait`.
    pub struct Events {
        list: Box<[KEvent]>,
        // Number of valid entries at the front of `list`.
        len: usize,
    }

    impl Events {
        /// Creates an empty buffer with room for 1000 events.
        pub fn new() -> Events {
            let flags = 0;
            let event = KEvent::new(0, 0, flags, 0, 0, 0);
            let list = vec![event; 1000].into_boxed_slice();
            let len = 0;
            Events { list, len }
        }

        /// Iterates over the events stored by the last `wait`.
        ///
        /// A read-filter event carrying `EV_EOF` is reported as writable as well.
        pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
            self.list[..self.len].iter().map(|ev| Event {
                readable: ev.filter() == libc::EVFILT_READ,
                writable: ev.filter() == libc::EVFILT_WRITE
                    || (ev.filter() == libc::EVFILT_READ && (ev.flags() & libc::EV_EOF) != 0),
                key: ev.udata() as usize,
            })
        }
    }

    /// A readiness event for the source registered under `key`.
    pub struct Event {
        pub readable: bool,
        pub writable: bool,
        pub key: usize,
    }
}
/// Polling backend built on wepoll.
#[cfg(target_os = "windows")]
mod sys {
    use std::convert::TryInto;
    use std::io;
    use std::os::windows::io::{AsRawSocket, RawSocket};
    use std::time::Duration;

    use wepoll_sys_stjepang as we;
    use winapi::um::winsock2;

    /// Calls a `wepoll` function and turns a `-1` return value into the last OS error.
    macro_rules! syscall {
        ($fn:ident $args:tt) => {{
            let res = unsafe { we::$fn $args };
            if res == -1 {
                Err(std::io::Error::last_os_error())
            } else {
                Ok(res)
            }
        }};
    }

    /// Wrapper around the wepoll handle.
    pub struct Reactor {
        handle: we::HANDLE,
    }

    unsafe impl Send for Reactor {}
    unsafe impl Sync for Reactor {}

    impl Reactor {
        /// Creates the wepoll instance.
        pub fn new() -> io::Result<Reactor> {
            let handle = unsafe { we::epoll_create1(0) };
            if handle.is_null() {
                Err(io::Error::last_os_error())
            } else {
                Ok(Reactor { handle })
            }
        }

        /// Issues a single `epoll_ctl` call for `sock` with the given operation and event.
        fn ctl(
            &self,
            op: libc::c_int,
            sock: RawSocket,
            ev: *mut we::epoll_event,
        ) -> io::Result<()> {
            syscall!(epoll_ctl(self.handle, op, sock as we::SOCKET, ev))?;
            Ok(())
        }

        /// Switches `sock` to non-blocking mode and adds it to the poll set with no
        /// interest flags; interest is set later through `reregister`.
        pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
            let mut nonblocking = true as libc::c_ulong;
            let res = unsafe {
                winsock2::ioctlsocket(
                    sock as winsock2::SOCKET,
                    winsock2::FIONBIO,
                    &mut nonblocking,
                )
            };
            if res != 0 {
                return Err(io::Error::last_os_error());
            }

            let mut ev = we::epoll_event {
                events: 0,
                data: we::epoll_data { u64: key as u64 },
            };
            self.ctl(we::EPOLL_CTL_ADD as libc::c_int, sock, &mut ev)
        }

        /// Sets one-shot interest in readability and/or writability for `sock`.
        pub fn reregister(
            &self,
            sock: RawSocket,
            key: usize,
            read: bool,
            write: bool,
        ) -> io::Result<()> {
            let mut flags = we::EPOLLONESHOT;
            if read {
                flags |= READ_FLAGS;
            }
            if write {
                flags |= WRITE_FLAGS;
            }

            let mut ev = we::epoll_event {
                events: flags as u32,
                data: we::epoll_data { u64: key as u64 },
            };
            self.ctl(we::EPOLL_CTL_MOD as libc::c_int, sock, &mut ev)
        }

        /// Removes `sock` from the poll set.
        pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
            self.ctl(
                we::EPOLL_CTL_DEL as libc::c_int,
                sock,
                std::ptr::null_mut(),
            )
        }

        /// Waits for events for at most `timeout` (`None` blocks indefinitely) and returns
        /// how many were stored in `events`.
        ///
        /// A non-zero timeout below 1ms is raised to 1ms; one that does not fit in a
        /// `c_int` is clamped to the maximum.
        pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
            let timeout_ms = match timeout {
                None => -1,
                Some(t) if t == Duration::from_millis(0) => 0,
                Some(t) => t
                    .max(Duration::from_millis(1))
                    .as_millis()
                    .try_into()
                    .unwrap_or(libc::c_int::max_value()),
            };

            let count = syscall!(epoll_wait(
                self.handle,
                events.list.as_mut_ptr(),
                events.list.len() as libc::c_int,
                timeout_ms,
            ))?;
            events.len = count as usize;
            Ok(events.len)
        }

        /// Wakes a thread blocked in `wait` by posting a completion packet to the handle.
        ///
        /// The result of the call is ignored and `Ok(())` is always returned.
        pub fn notify(&self) -> io::Result<()> {
            unsafe {
                winapi::um::ioapiset::PostQueuedCompletionStatus(
                    self.handle as winapi::um::winnt::HANDLE,
                    0,
                    0,
                    std::ptr::null_mut(),
                );
            }
            Ok(())
        }
    }

    /// Adapter exposing a bare `RawSocket` through `AsRawSocket`.
    struct As(RawSocket);

    impl AsRawSocket for As {
        fn as_raw_socket(&self) -> RawSocket {
            self.0
        }
    }

    /// Flags that count as "readable".
    const READ_FLAGS: u32 =
        we::EPOLLIN | we::EPOLLRDHUP | we::EPOLLHUP | we::EPOLLERR | we::EPOLLPRI;
    /// Flags that count as "writable".
    const WRITE_FLAGS: u32 = we::EPOLLOUT | we::EPOLLHUP | we::EPOLLERR;

    /// Fixed-size buffer of events reported by the last `wait`.
    pub struct Events {
        list: Box<[we::epoll_event]>,
        // Number of valid entries at the front of `list`.
        len: usize,
    }

    unsafe impl Send for Events {}
    unsafe impl Sync for Events {}

    impl Events {
        /// Creates an empty buffer with room for 1000 events.
        pub fn new() -> Events {
            let blank = we::epoll_event {
                events: 0,
                data: we::epoll_data { u64: 0 },
            };
            let list = vec![blank; 1000].into_boxed_slice();
            Events { list, len: 0 }
        }

        /// Iterates over the events stored by the last `wait`.
        pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
            let filled = &self.list[..self.len];
            filled.iter().map(|ev| Event {
                readable: (ev.events & READ_FLAGS) != 0,
                writable: (ev.events & WRITE_FLAGS) != 0,
                key: unsafe { ev.data.u64 } as usize,
            })
        }
    }

    /// A readiness event for the source registered under `key`.
    pub struct Event {
        pub readable: bool,
        pub writable: bool,
        pub key: usize,
    }
}