#![cfg(feature = "std")]
#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::usize;
use cfg_if::cfg_if;
/// Calls the `libc` function named by `$fn` with the parenthesised argument
/// list `$args` and converts its return value into an `io::Result`.
///
/// A return value of `-1` becomes `Err` carrying `io::Error::last_os_error()`;
/// any other value is passed through unchanged inside `Ok`.
#[cfg(unix)]
macro_rules! syscall {
    ($fn:ident $args:tt) => {{
        // SAFETY: nothing is checked here; the caller of the macro is
        // responsible for passing arguments that are valid for the function.
        match unsafe { libc::$fn $args } {
            -1 => Err(std::io::Error::last_os_error()),
            ret => Ok(ret),
        }
    }};
}
// Select the platform-specific backend module and expose it under the common
// alias `sys`. Exactly one branch is compiled; the rest of this file only ever
// refers to `sys::Poller` and `sys::Events`.
cfg_if! {
if #[cfg(any(target_os = "linux", target_os = "android"))] {
// Linux and Android: the `epoll` module.
mod epoll;
use epoll as sys;
} else if #[cfg(any(
target_os = "illumos",
target_os = "solaris",
))] {
// illumos and Solaris: the `port` module.
mod port;
use port as sys;
} else if #[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))] {
// macOS, iOS and the BSD family: the `kqueue` module.
mod kqueue;
use kqueue as sys;
} else if #[cfg(target_os = "windows")] {
// Windows: the `wepoll` module.
mod wepoll;
use wepoll as sys;
} else {
// Any other target has no backend, so fail the build with a clear message.
compile_error!("polling does not support this target OS");
}
}
// Reserved event key, equal to `usize::MAX`. `Poller::interest` rejects this
// value and `Poller::wait` filters out events carrying it.
// NOTE(review): not referenced by name in this file; presumably the backend
// modules use it to tag their internal wake-up event — confirm.
const NOTIFY_KEY: usize = std::usize::MAX;
/// Describes readability and/or writability for the I/O source identified by `key`.
///
/// An `Event` is passed to [`Poller::interest`] and is the element type of the
/// vector filled in by [`Poller::wait`].
///
/// The type is plain data, so it is `Copy` and can be compared for equality.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Event {
    /// Caller-chosen key identifying the I/O source this event refers to.
    ///
    /// [`Poller::interest`] rejects the value `usize::MAX`.
    pub key: usize,
    /// Whether the event concerns readability.
    pub readable: bool,
    /// Whether the event concerns writability.
    pub writable: bool,
}

impl Event {
    /// Creates an event for `key` with both `readable` and `writable` set.
    pub fn all(key: usize) -> Event {
        Self {
            key,
            readable: true,
            writable: true,
        }
    }

    /// Creates an event for `key` with only `readable` set.
    pub fn readable(key: usize) -> Event {
        Self {
            key,
            readable: true,
            writable: false,
        }
    }

    /// Creates an event for `key` with only `writable` set.
    pub fn writable(key: usize) -> Event {
        Self {
            key,
            readable: false,
            writable: true,
        }
    }

    /// Creates an event for `key` with neither `readable` nor `writable` set.
    pub fn none(key: usize) -> Event {
        Self {
            key,
            readable: false,
            writable: false,
        }
    }
}
/// Public polling handle that wraps the platform backend selected as `sys`.
pub struct Poller {
/// The platform-specific poller that all operations are forwarded to.
poller: sys::Poller,
/// Backend event buffer. `wait` acquires it with `try_lock`, so only one
/// thread at a time waits on the backend.
events: Mutex<sys::Events>,
/// Set by `notify` and cleared by `wait` after the backend returns, so that
/// repeated `notify` calls trigger at most one backend notification in between.
notified: AtomicBool,
}
impl Poller {
    /// Creates a new poller on top of the platform backend.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend constructor, if any.
    pub fn new() -> io::Result<Poller> {
        Ok(Poller {
            poller: sys::Poller::new()?,
            events: Mutex::new(sys::Events::new()),
            notified: AtomicBool::new(false),
        })
    }

    /// Registers `source` with the backend poller.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend, if any.
    pub fn insert(&self, source: impl Source) -> io::Result<()> {
        self.poller.insert(source.raw())
    }

    /// Forwards the interest described by `event` for `source` to the backend.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] if `event.key` is `usize::MAX`,
    /// because [`Poller::wait`] filters out events carrying that key.
    /// Otherwise returns the error reported by the backend, if any.
    pub fn interest(&self, source: impl Source, event: Event) -> io::Result<()> {
        if event.key == usize::MAX {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "the key is not allowed to be `usize::MAX`",
            ));
        }
        self.poller.interest(source.raw(), event)
    }

    /// Removes `source` from the backend poller.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend, if any.
    pub fn remove(&self, source: impl Source) -> io::Result<()> {
        self.poller.remove(source.raw())
    }

    /// Waits on the backend and appends the reported events to `events`.
    ///
    /// `timeout` is passed to the backend unchanged. Existing contents of
    /// `events` are kept; new events are pushed after them, and events whose
    /// key is `usize::MAX` are skipped.
    ///
    /// Returns the number of events appended. If the internal event buffer
    /// cannot be locked without blocking, this returns `Ok(0)` immediately.
    /// That happens when another thread is already inside `wait`, and also
    /// when the mutex is poisoned, since the two cases are not distinguished.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend wait call, if any.
    pub fn wait(&self, events: &mut Vec<Event>, timeout: Option<Duration>) -> io::Result<usize> {
        log::trace!("Poller::wait(_, {:?})", timeout);

        if let Ok(mut lock) = self.events.try_lock() {
            self.poller.wait(&mut lock, timeout)?;

            // The backend has returned, so re-arm `notify` for the next wait.
            self.notified.swap(false, Ordering::SeqCst);

            // Count only what this call adds; `events` may already hold entries.
            let len = events.len();
            events.extend(lock.iter().filter(|ev| ev.key != usize::MAX));
            Ok(events.len() - len)
        } else {
            log::trace!("wait: skipping because another thread is already waiting on I/O");
            Ok(0)
        }
    }

    /// Asks the backend to issue a notification, unless one is already pending.
    ///
    /// The `notified` flag is flipped from `false` to `true` atomically, and
    /// only the caller that wins that transition contacts the backend. The
    /// flag is cleared again by [`Poller::wait`] after the backend returns.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the backend notification call, if any.
    pub fn notify(&self) -> io::Result<()> {
        log::trace!("Poller::notify()");

        // `compare_exchange` replaces the deprecated `compare_and_swap`;
        // `Ok` means the previous value was `false` and this call set it.
        let won_transition = self
            .notified
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
        if won_transition {
            self.poller.notify()?;
        }
        Ok(())
    }
}
impl fmt::Debug for Poller {
    /// Formats the poller by delegating to the backend poller's `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.poller, f)
    }
}
// Define the `Source` trait for the current platform family. The trait has the
// same name on both, but `raw` returns the platform's own raw handle type.
cfg_if! {
if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, RawFd};
/// An I/O source that can hand out a raw file descriptor.
pub trait Source {
/// Returns the raw file descriptor of this source.
fn raw(&self) -> RawFd;
}
// A raw file descriptor is returned as-is.
impl Source for RawFd {
fn raw(&self) -> RawFd {
*self
}
}
// A reference to any `AsRawFd` type yields that type's descriptor.
impl<T: AsRawFd> Source for &T {
fn raw(&self) -> RawFd {
self.as_raw_fd()
}
}
} else if #[cfg(windows)] {
use std::os::windows::io::{AsRawSocket, RawSocket};
/// An I/O source that can hand out a raw socket.
pub trait Source {
/// Returns the raw socket of this source.
fn raw(&self) -> RawSocket;
}
// A raw socket is returned as-is.
impl Source for RawSocket {
fn raw(&self) -> RawSocket {
*self
}
}
// A reference to any `AsRawSocket` type yields that type's socket.
impl<T: AsRawSocket> Source for &T {
fn raw(&self) -> RawSocket {
self.as_raw_socket()
}
}
}
}