use std::io;
use std::os::unix::io::RawFd;
use libc::{
epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLET, EPOLLIN, EPOLLONESHOT, EPOLLOUT,
EPOLL_CLOEXEC, EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD,
};
use super::ReactorBackend;
use crate::platform::sys::{Event, Events, Interest, RawSource};
const MAX_EVENTS: usize = 1024;
pub(crate) struct EpollReactor {
epoll_fd: RawFd,
}
impl EpollReactor {
pub(crate) fn new() -> io::Result<Self> {
let fd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
if fd == -1 {
return Err(io::Error::last_os_error());
}
Ok(Self { epoll_fd: fd })
}
fn build_event(token: usize, interest: Interest) -> epoll_event {
let mut events: u32 = EPOLLET as u32 | EPOLLONESHOT as u32;
if interest.is_readable() {
events |= EPOLLIN as u32;
}
if interest.is_writable() {
events |= EPOLLOUT as u32;
}
epoll_event {
events,
u64: token as u64,
}
}
}
impl ReactorBackend for EpollReactor {
fn new() -> io::Result<Self> {
EpollReactor::new()
}
fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()> {
let mut ev = Self::build_event(token, interest);
let rc = unsafe { epoll_ctl(self.epoll_fd, EPOLL_CTL_ADD, source, &mut ev) };
if rc == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
fn reregister(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()> {
let mut ev = Self::build_event(token, interest);
let rc = unsafe { epoll_ctl(self.epoll_fd, EPOLL_CTL_MOD, source, &mut ev) };
if rc == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
fn deregister(&self, source: RawSource) -> io::Result<()> {
let rc = unsafe { epoll_ctl(self.epoll_fd, EPOLL_CTL_DEL, source, std::ptr::null_mut()) };
if rc == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
fn poll(&self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize> {
let cap = MAX_EVENTS.max(events.capacity());
let mut raw: Vec<epoll_event> = Vec::with_capacity(cap);
let timeout = match timeout_ms {
Some(ms) => ms.min(i32::MAX as u64) as i32,
None => -1, };
let n = unsafe { epoll_wait(self.epoll_fd, raw.as_mut_ptr(), cap as i32, timeout) };
if n == -1 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
return Ok(0);
}
return Err(err);
}
let n = n as usize;
unsafe { raw.set_len(n) };
events.clear();
events.reserve(n);
for ev in &raw {
let token = ev.u64 as usize;
let readable = ev.events & EPOLLIN as u32 != 0;
let writable = ev.events & EPOLLOUT as u32 != 0;
events.push(Event::new(token, readable, writable));
}
Ok(n)
}
}
impl Drop for EpollReactor {
fn drop(&mut self) {
unsafe { libc::close(self.epoll_fd) };
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::platform::sys::{create_pipe, events_with_capacity};
#[test]
fn epoll_reactor_creates_successfully() {
EpollReactor::new().expect("epoll_create1 should succeed");
}
#[test]
fn register_and_deregister_pipe_read_end() {
let reactor = EpollReactor::new().unwrap();
let (r, w) = create_pipe().unwrap();
reactor
.register(r, 1, Interest::READABLE)
.expect("register failed");
reactor.deregister(r).expect("deregister failed");
unsafe { libc::close(r) };
unsafe { libc::close(w) };
}
#[test]
fn poll_detects_readable_pipe() {
let reactor = EpollReactor::new().unwrap();
let (r, w) = create_pipe().unwrap();
reactor.register(r, 99, Interest::READABLE).unwrap();
let byte: u8 = 1;
unsafe { libc::write(w, &byte as *const u8 as *const _, 1) };
let mut events = events_with_capacity(16);
let n = reactor.poll(&mut events, Some(100)).expect("poll failed");
assert_eq!(n, 1);
assert_eq!(events[0].token, 99);
assert!(events[0].readable);
unsafe { libc::close(r) };
unsafe { libc::close(w) };
}
}