use std::io;
use std::os::unix::io::RawFd;
use libc::{
kevent, kqueue, timespec, EVFILT_READ, EVFILT_WRITE, EV_ADD, EV_CLEAR, EV_DELETE, EV_ERROR,
};
use super::ReactorBackend;
use crate::platform::sys::{Event, Events, Interest, RawSource};
/// Lower bound on the size of the kernel event buffer allocated per `poll` call;
/// the buffer is sized to the larger of this value and the caller's `Events` capacity.
const MAX_EVENTS: usize = 1024;
/// Reactor backend that multiplexes I/O readiness through a kqueue descriptor.
pub(crate) struct KqueueReactor {
    /// Descriptor returned by `kqueue()`; closed when the reactor is dropped.
    kq_fd: RawFd,
}
impl KqueueReactor {
    /// Creates a reactor backed by a freshly allocated kernel event queue.
    ///
    /// The descriptor is marked close-on-exec so that it is not leaked into
    /// programs started through `exec`.
    ///
    /// # Errors
    ///
    /// Returns the OS error if `kqueue()` or the subsequent `fcntl()` fails.
    pub(crate) fn new() -> io::Result<Self> {
        // SAFETY: `kqueue()` takes no arguments and returns a descriptor or -1.
        let fd = unsafe { kqueue() };
        if fd == -1 {
            return Err(io::Error::last_os_error());
        }
        // Take ownership first so that `Drop` closes the descriptor if the
        // `fcntl` call below fails.
        let reactor = Self { kq_fd: fd };
        // SAFETY: `fd` is a valid, open descriptor owned by `reactor`.
        if unsafe { libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) } == -1 {
            // The error is captured before `reactor` is dropped, so the
            // `close()` in `Drop` cannot clobber `errno` first.
            return Err(io::Error::last_os_error());
        }
        Ok(reactor)
    }

    /// Submits `changes` to the kqueue without retrieving any events.
    ///
    /// An empty changelist is a no-op and performs no syscall.
    ///
    /// # Errors
    ///
    /// Because no event list is supplied, the kernel reports the first failing
    /// change through `errno`; that error is returned. `EINTR` is not treated
    /// as a failure (see the comment in the body).
    fn kevent_submit(&self, changes: &[libc::kevent]) -> io::Result<()> {
        if changes.is_empty() {
            return Ok(());
        }
        // SAFETY: `changes` is a valid slice for the duration of the call and
        // the kernel writes nothing back because the event list is null with
        // length 0. Callers in this file pass at most two changes, so the
        // length cast cannot truncate.
        let rc = unsafe {
            kevent(
                self.kq_fd,
                changes.as_ptr(),
                changes.len() as libc::c_int,
                std::ptr::null_mut(),
                0,
                std::ptr::null(),
            )
        };
        if rc == -1 {
            let err = io::Error::last_os_error();
            // kevent(2) on FreeBSD documents that when the call fails with
            // EINTR all changes in the changelist have already been applied,
            // so an interruption is not a registration failure.
            if err.kind() != io::ErrorKind::Interrupted {
                return Err(err);
            }
        }
        Ok(())
    }

    /// Builds a change entry that adds `filter` for `source`.
    ///
    /// The entry carries `EV_ADD | EV_CLEAR` and stores `token` in `udata`,
    /// from where `poll` reads it back.
    #[inline]
    fn make_add_event(source: RawSource, filter: i16, token: usize) -> libc::kevent {
        libc::kevent {
            ident: source as libc::uintptr_t,
            filter,
            flags: EV_ADD | EV_CLEAR,
            fflags: 0,
            data: 0,
            // The token is smuggled through the opaque user-data pointer; it
            // is never dereferenced.
            udata: token as *mut libc::c_void,
        }
    }

    /// Builds a change entry that removes `filter` for `source` (`EV_DELETE`).
    #[inline]
    fn make_del_event(source: RawSource, filter: i16) -> libc::kevent {
        libc::kevent {
            ident: source as libc::uintptr_t,
            filter,
            flags: EV_DELETE,
            fflags: 0,
            data: 0,
            udata: std::ptr::null_mut(),
        }
    }
}
impl ReactorBackend for KqueueReactor {
fn new() -> io::Result<Self> {
KqueueReactor::new()
}
fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()> {
let mut changes: Vec<libc::kevent> = Vec::with_capacity(2);
if interest.is_readable() {
changes.push(Self::make_add_event(source, EVFILT_READ, token));
}
if interest.is_writable() {
changes.push(Self::make_add_event(source, EVFILT_WRITE, token));
}
self.kevent_submit(&changes)
}
fn reregister(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()> {
self.register(source, token, interest)
}
fn deregister(&self, source: RawSource) -> io::Result<()> {
let changes = [
Self::make_del_event(source, EVFILT_READ),
Self::make_del_event(source, EVFILT_WRITE),
];
let mut out: [libc::kevent; 2] = unsafe { std::mem::zeroed() };
let rc = unsafe {
kevent(
self.kq_fd,
changes.as_ptr(),
changes.len() as i32,
out.as_mut_ptr(),
out.len() as i32,
std::ptr::null(),
)
};
if rc == -1 {
return Err(io::Error::last_os_error());
}
for entry in &out[..rc as usize] {
if entry.flags & EV_ERROR != 0 && entry.data != libc::ENOENT as isize {
return Err(io::Error::from_raw_os_error(entry.data as i32));
}
}
Ok(())
}
fn poll(&self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize> {
let cap = MAX_EVENTS.max(events.capacity());
let mut raw: Vec<libc::kevent> = Vec::with_capacity(cap);
let ts_storage;
let ts_ptr = match timeout_ms {
Some(ms) => {
ts_storage = timespec {
tv_sec: (ms / 1000) as libc::time_t,
tv_nsec: ((ms % 1000) * 1_000_000) as libc::c_long,
};
&ts_storage as *const timespec
}
None => std::ptr::null(), };
let n = unsafe {
kevent(
self.kq_fd,
std::ptr::null(), 0,
raw.as_mut_ptr(),
cap as i32,
ts_ptr,
)
};
if n == -1 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
return Ok(0);
}
return Err(err);
}
let n = n as usize;
unsafe { raw.set_len(n) };
events.clear();
events.reserve(n);
for kev in &raw {
if kev.flags & EV_ERROR != 0 {
continue;
}
let token = kev.udata as usize;
let readable = kev.filter == EVFILT_READ;
let writable = kev.filter == EVFILT_WRITE;
events.push(Event::new(token, readable, writable));
}
Ok(events.len())
}
}
impl Drop for KqueueReactor {
    /// Closes the kqueue descriptor; the result of `close()` is ignored.
    fn drop(&mut self) {
        let kq = self.kq_fd;
        // SAFETY: `kq` was obtained from `kqueue()` in `new` and is closed
        // only here.
        unsafe {
            libc::close(kq);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::sys::{create_pipe, events_with_capacity};

    /// Creating the reactor must succeed on a kqueue-capable system.
    #[test]
    fn kqueue_reactor_creates_successfully() {
        KqueueReactor::new().expect("kqueue() should succeed");
    }

    /// A pipe's read end can be registered and then deregistered again.
    #[test]
    fn register_and_deregister_pipe_read_end() {
        let reactor = KqueueReactor::new().unwrap();
        let (r, w) = create_pipe().unwrap();
        reactor
            .register(r, 1, Interest::READABLE)
            .expect("register failed");
        reactor.deregister(r).expect("deregister failed");
        unsafe { libc::close(r) };
        unsafe { libc::close(w) };
    }

    /// Writing to a pipe makes its registered read end show up as readable.
    #[test]
    fn poll_detects_readable_pipe() {
        let reactor = KqueueReactor::new().unwrap();
        let (r, w) = create_pipe().unwrap();
        reactor.register(r, 77, Interest::READABLE).unwrap();

        let byte: u8 = 0xFF;
        let written = unsafe { libc::write(w, &byte as *const u8 as *const _, 1) };
        // Fail here, with a clear message, rather than at the poll assertion
        // below if the write itself did not go through.
        assert_eq!(written, 1, "write to pipe failed");

        let mut events = events_with_capacity(16);
        let n = reactor.poll(&mut events, Some(200)).expect("poll failed");
        assert!(n >= 1);
        let ev = events
            .iter()
            .find(|e| e.token == 77)
            .expect("token 77 not found");
        assert!(ev.readable);

        unsafe { libc::close(r) };
        unsafe { libc::close(w) };
    }
}