use super::super::{Interest, ReadinessPoll};
use super::NOTIFY_KEY;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::os::fd::{AsFd, BorrowedFd};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::time::Duration;
use std::{io, ptr};
pub struct OsPoller {
epoll_fd: OwnedFd,
notifier: Notifier,
#[cfg(not(target_os = "redox"))]
timer_fd: Option<OwnedFd>,
_not_send: PhantomData<*const ()>,
}
impl OsPoller {
pub fn new() -> io::Result<Self> {
let epoll_fd = {
let fd = syscall!(epoll_create1(libc::EPOLL_CLOEXEC))?;
unsafe { OwnedFd::from_raw_fd(fd) }
};
let notifier = Notifier::new()?;
#[cfg(not(target_os = "redox"))]
let timer_fd = {
let fd: RawFd =
syscall!(timerfd_create(libc::CLOCK_MONOTONIC, libc::TFD_CLOEXEC))?;
unsafe { OwnedFd::from_raw_fd(fd) }
};
let epoll = Self {
epoll_fd,
notifier,
#[cfg(not(target_os = "redox"))]
timer_fd: Some(timer_fd),
_not_send: PhantomData,
};
#[cfg(not(target_os = "redox"))]
if let Some(ref timer_fd) = epoll.timer_fd {
epoll.add(timer_fd.as_raw_fd(), NOTIFY_KEY, Interest::NONE)?;
}
epoll.add(
epoll.notifier.as_fd().as_raw_fd(),
NOTIFY_KEY,
Interest::READ,
)?;
Ok(epoll)
}
}
impl Drop for OsPoller {
fn drop(&mut self) {
#[cfg(not(target_os = "redox"))]
if let Some(timer_fd) = self.timer_fd.take() {
let _ = self.delete(timer_fd.as_fd().as_raw_fd());
}
let _ = self.delete(self.notifier.as_fd().as_raw_fd());
}
}
impl ReadinessPoll for OsPoller {
type NativeEvent = libc::epoll_event;
fn add(&self, fd: RawFd, key: u64, interest: Interest) -> io::Result<()> {
let mut events = 0u32;
if interest.is_readable() {
events |= libc::EPOLLIN as u32;
}
if interest.is_writable() {
events |= libc::EPOLLOUT as u32;
}
events |= libc::EPOLLONESHOT as u32;
let mut event = libc::epoll_event { events, u64: key as u64 };
syscall!(epoll_ctl(
self.epoll_fd.as_raw_fd(),
libc::EPOLL_CTL_ADD,
fd,
&mut event as *mut libc::epoll_event,
))?;
Ok(())
}
fn modify(&self, fd: RawFd, key: u64, interest: Interest) -> io::Result<()> {
let mut events = 0u32;
if interest.is_readable() {
events |= libc::EPOLLIN as u32;
}
if interest.is_writable() {
events |= libc::EPOLLOUT as u32;
}
events |= libc::EPOLLONESHOT as u32;
let mut event = libc::epoll_event { events, u64: key as u64 };
syscall!(epoll_ctl(
self.epoll_fd.as_raw_fd(),
libc::EPOLL_CTL_MOD,
fd,
&mut event as *mut libc::epoll_event,
))?;
Ok(())
}
fn delete(&self, fd: RawFd) -> io::Result<()> {
match syscall!(epoll_ctl(
self.epoll_fd.as_raw_fd(),
libc::EPOLL_CTL_DEL,
fd,
ptr::null_mut(),
)) {
Ok(_) => Ok(()),
Err(err) => Err(match err.raw_os_error() {
Some(libc::EBADF) => io::Error::from_raw_os_error(libc::ENOENT),
_ => err,
}),
}
}
fn delete_timer(&self, _key: u64) -> io::Result<()> {
Err(io::Error::from_raw_os_error(libc::ENOENT))
}
fn wait(
&self,
events: &mut [Self::NativeEvent],
timeout: Option<Duration>,
) -> io::Result<usize> {
#[cfg(not(target_os = "redox"))]
const TS_ZERO: libc::timespec = unsafe {
std::mem::transmute([0u8; std::mem::size_of::<libc::timespec>()])
};
#[allow(dead_code)]
const ITS_ZERO: libc::itimerspec = unsafe {
std::mem::transmute([0u8; std::mem::size_of::<libc::itimerspec>()])
};
#[cfg(not(target_os = "redox"))]
if let Some(ref timer_fd) = self.timer_fd {
let mut new_val = libc::itimerspec {
it_interval: TS_ZERO,
it_value: match timeout {
None => TS_ZERO,
Some(t) => {
let mut ts = TS_ZERO;
ts.tv_sec = t.as_secs() as _;
ts.tv_nsec = t.subsec_nanos() as _;
ts
}
},
};
let mut result = MaybeUninit::<libc::itimerspec>::uninit();
syscall!(timerfd_settime(
timer_fd.as_raw_fd(),
0,
&mut new_val as *mut _,
result.as_mut_ptr()
))?;
self.modify(timer_fd.as_raw_fd(), NOTIFY_KEY, Interest::READ)?;
}
#[cfg(not(target_os = "redox"))]
let timer_fd = &self.timer_fd;
#[cfg(target_os = "redox")]
let timer_fd: Option<core::convert::Infallible> = None;
let timeout = match (timer_fd, timeout) {
(_, Some(t)) if t == Duration::from_secs(0) => Some(TS_ZERO),
(None, Some(t)) => Some(libc::timespec {
tv_sec: t.as_secs() as i64,
tv_nsec: t.subsec_nanos() as i64,
}),
_ => None,
};
let n = syscall!(epoll_pwait2(
self.epoll_fd.as_raw_fd(),
events.as_mut_ptr(),
events.len() as i32,
timeout.as_ref().map(|v| v as *const _).unwrap_or(ptr::null()),
ptr::null_mut()
))?;
self.notifier.clear();
self.modify(
self.notifier.as_fd().as_raw_fd(),
NOTIFY_KEY,
Interest::READ,
)?;
let mut write_idx = 0;
for read_idx in 0..n as usize {
if events[read_idx].u64 != NOTIFY_KEY {
if write_idx != read_idx {
events[write_idx] = events[read_idx];
}
write_idx += 1;
}
}
Ok(write_idx)
}
fn notify(&self) -> io::Result<()> {
self.notifier.notify();
Ok(())
}
fn event_key(event: &Self::NativeEvent) -> u64 {
event.u64
}
fn event_interest(event: &Self::NativeEvent) -> Interest {
let readable = (event.events & libc::EPOLLIN as u32) != 0;
let writable = (event.events & libc::EPOLLOUT as u32) != 0;
let is_hup = (event.events & libc::EPOLLHUP as u32) != 0;
let is_err = (event.events & libc::EPOLLERR as u32) != 0;
let is_rdhup = (event.events & libc::EPOLLRDHUP as u32) != 0;
let error_or_hup = is_hup || is_err || is_rdhup;
let effective_readable = readable || error_or_hup;
match (effective_readable, writable) {
(true, true) => Interest::READ_AND_WRITE,
(true, false) => Interest::READ,
(false, true) => Interest::WRITE,
(false, false) => Interest::READ, }
}
}
enum Notifier {
#[cfg(not(target_os = "redox"))]
EventFd(OwnedFd),
Pipe {
read_pipe: OwnedFd,
write_pipe: OwnedFd,
},
}
impl AsFd for Notifier {
fn as_fd(&self) -> BorrowedFd<'_> {
match self {
#[cfg(not(target_os = "redox"))]
Notifier::EventFd(fd) => fd.as_fd(),
Notifier::Pipe { read_pipe: read, .. } => read.as_fd(),
}
}
}
fn pipe() -> io::Result<(OwnedFd, OwnedFd)> {
let mut result = MaybeUninit::<[OwnedFd; 2]>::uninit();
match syscall!(pipe2(result.as_mut_ptr().cast::<_>(), libc::O_CLOEXEC)) {
Ok(_) => {
let [read, write] = unsafe { result.assume_init() };
Ok((read, write))
}
Err(_) => {
use libc::{F_GETFD, F_SETFD, FD_CLOEXEC};
syscall!(pipe(result.as_mut_ptr().cast::<_>()))?;
let [read, write] = unsafe { result.assume_init() };
let flags = syscall!(fcntl(read.as_raw_fd(), F_GETFD))?;
syscall!(fcntl(read.as_raw_fd(), F_SETFD, flags | FD_CLOEXEC))?;
let flags = syscall!(fcntl(write.as_raw_fd(), F_GETFD))?;
syscall!(fcntl(write.as_raw_fd(), F_SETFD, flags | FD_CLOEXEC))?;
Ok((read, write))
}
}
}
impl Notifier {
fn new() -> io::Result<Self> {
#[cfg(not(target_os = "redox"))]
{
match syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)) {
Ok(fd) => {
let owned = unsafe { OwnedFd::from_raw_fd(fd) };
return Ok(Notifier::EventFd(owned));
}
Err(_err) => {}
}
}
let (read, write) = pipe()?;
let flags = syscall!(fcntl(read.as_raw_fd(), libc::F_GETFL))?;
syscall!(fcntl(read.as_raw_fd(), libc::F_SETFL, flags | libc::O_NONBLOCK))?;
Ok(Notifier::Pipe { read_pipe: read, write_pipe: write })
}
pub fn notify(&self) {
match self {
#[cfg(not(target_os = "redox"))]
Self::EventFd(fd) => {
let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = syscall!(write(
fd.as_raw_fd(),
buf.as_ptr().cast::<libc::c_void>(),
buf.len()
));
}
Self::Pipe { write_pipe, .. } => {
let buf = [0; 1];
syscall!(write(
write_pipe.as_raw_fd(),
buf.as_ptr().cast::<libc::c_void>(),
buf.len()
))
.ok();
}
}
}
fn clear(&self) {
match self {
#[cfg(not(target_os = "redox"))]
Self::EventFd(fd) => {
const SIZE: usize = 8;
let mut buf = [0u8; SIZE];
let _ = syscall!(read(
fd.as_raw_fd(),
buf.as_mut_ptr().cast::<libc::c_void>(),
SIZE
));
}
Self::Pipe { read_pipe, .. } => {
const SIZE: usize = 1024;
while syscall!(read(
read_pipe.as_raw_fd(),
[0u8; SIZE].as_mut_ptr().cast::<libc::c_void>(),
SIZE
))
.is_ok()
{}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
crate::generate_tests!(OsPoller::new().unwrap());
}