use super::{
super::{Interest, ReadinessPoll},
NOTIFY_KEY,
};
use std::cell::RefCell;
use std::marker::PhantomData;
use std::time::Duration;
use std::{
collections::HashSet,
os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
};
use std::{io, ptr};
const NOTIFY_IDENT: usize = NOTIFY_KEY as usize;
pub struct OsPoller {
kq_fd: OwnedFd,
registered_fds: RefCell<HashSet<RawFd>>,
registered_timers: RefCell<HashSet<u64>>,
notify: notify::Notify,
_not_send: PhantomData<*const ()>,
}
impl OsPoller {
pub fn new() -> io::Result<Self> {
let kqueue_fd = unsafe { OwnedFd::from_raw_fd(syscall!(kqueue())?) };
syscall!(fcntl(kqueue_fd.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?;
let kqueue = Self {
kq_fd: kqueue_fd,
registered_fds: RefCell::new(HashSet::new()),
registered_timers: RefCell::new(HashSet::new()),
notify: notify::Notify::new()?,
_not_send: PhantomData,
};
kqueue.notify.register(&kqueue)?;
Ok(kqueue)
}
pub(crate) fn submit_changes(
&self,
changelist: &[<Self as ReadinessPoll>::NativeEvent],
) -> io::Result<()> {
let changes = changelist;
let nchanges = changes.len();
let mut eventlist: Vec<<Self as ReadinessPoll>::NativeEvent> =
Vec::with_capacity(nchanges);
let spare = eventlist.spare_capacity_mut();
let nevents = spare.len();
syscall!(kevent(
self.kq_fd.as_raw_fd(),
changes.as_ptr(),
nchanges as libc::c_int,
spare.as_mut_ptr().cast(),
nevents as libc::c_int,
ptr::null(),
))?;
for ev in &eventlist {
if (ev.flags & libc::EV_ERROR) != 0 {
let err_code = ev.data as i32;
if err_code != 0 && err_code != libc::ENOENT && err_code != libc::EPIPE
{
return Err(io::Error::from_raw_os_error(err_code));
}
}
}
Ok(())
}
fn change_interests_batched(
&self,
fd: RawFd,
key: usize,
interest: Interest,
) -> io::Result<()> {
if interest.is_timer() {
let duration_ms = fd;
let kev = libc::kevent {
ident: key as libc::uintptr_t,
filter: libc::EVFILT_TIMER,
flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT,
fflags: 0,
data: duration_ms as isize,
udata: key as *mut libc::c_void,
};
syscall!(kevent(
self.kq_fd.as_raw_fd(),
&kev as *const libc::kevent,
1,
ptr::null_mut(),
0,
ptr::null(),
))?;
return Ok(());
}
let mut changes: [libc::kevent; 2] = unsafe { std::mem::zeroed() };
let mut n = 0;
if interest.is_readable() {
changes[n] = libc::kevent {
ident: fd as libc::uintptr_t,
filter: libc::EVFILT_READ,
flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT,
fflags: 0,
data: 0,
udata: key as *mut libc::c_void,
};
n += 1;
}
if interest.is_writable() {
assert!(n < 2, "Must have space for WRITE event");
changes[n] = libc::kevent {
ident: fd as libc::uintptr_t,
filter: libc::EVFILT_WRITE,
flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT,
fflags: 0,
data: 0,
udata: key as *mut libc::c_void,
};
n += 1;
}
assert!(n <= 2, "n must be 0, 1, or 2, got {}", n);
if n > 0 {
let ret = syscall!(kevent(
self.kq_fd.as_raw_fd(),
changes.as_ptr(),
n as i32,
ptr::null_mut(),
0,
ptr::null(),
))?;
assert_eq!(
ret, 0,
"kevent with no output events should return 0, got {}",
ret
);
}
Ok(())
}
fn delete_interest(&self, fd: RawFd, filter: i16) -> io::Result<()> {
let kev = libc::kevent {
ident: fd as libc::uintptr_t,
filter,
flags: libc::EV_DELETE,
fflags: 0,
data: 0,
udata: ptr::null_mut(),
};
let result = syscall!(kevent(
self.kq_fd.as_raw_fd(),
&kev as *const libc::kevent,
1,
std::ptr::null_mut(),
0,
std::ptr::null(),
));
match result {
Err(err) if !utils::is_not_found_error(&err) => Err(err),
_ => Ok(()),
}
}
}
impl ReadinessPoll for OsPoller {
type NativeEvent = libc::kevent;
fn add(&self, fd: RawFd, key: u64, interest: Interest) -> io::Result<()> {
if interest.is_timer() {
if !self.registered_timers.borrow_mut().insert(key) {
return Err(io::Error::from_raw_os_error(libc::EEXIST));
}
match self.change_interests_batched(fd, key as usize, interest) {
Ok(()) => Ok(()),
Err(e) => {
self.registered_timers.borrow_mut().remove(&key);
Err(e)
}
}
} else {
if !self.registered_fds.borrow_mut().insert(fd) {
return Err(io::Error::from_raw_os_error(libc::EEXIST));
}
match self.change_interests_batched(fd, key as usize, interest) {
Ok(()) => {
assert!(
self.registered_fds.borrow().contains(&fd),
"fd should be in registered_fds after successful add"
);
Ok(())
}
Err(e) => {
let removed = self.registered_fds.borrow_mut().remove(&fd);
assert!(
removed,
"fd should have been in registered_fds for rollback"
);
Err(e)
}
}
}
}
fn modify(&self, fd: RawFd, key: u64, interest: Interest) -> io::Result<()> {
if interest.is_timer() {
if !self.registered_timers.borrow().contains(&key) {
return Err(io::Error::from_raw_os_error(libc::ENOENT));
}
self.change_interests_batched(fd, key as usize, interest)
} else {
if !self.registered_fds.borrow().contains(&fd) {
return Err(io::Error::from_raw_os_error(libc::ENOENT));
}
let result = self.change_interests_batched(fd, key as usize, interest);
assert!(
self.registered_fds.borrow().contains(&fd),
"fd should still be in registered_fds after modify"
);
result
}
}
fn delete(&self, fd: RawFd) -> io::Result<()> {
if !self.registered_fds.borrow_mut().remove(&fd) {
return Err(io::Error::from_raw_os_error(libc::ENOENT));
}
assert!(
!self.registered_fds.borrow().contains(&fd),
"fd should not be in registered_fds after remove"
);
let read_result = self.delete_interest(fd, libc::EVFILT_READ);
let write_result = self.delete_interest(fd, libc::EVFILT_WRITE);
match (read_result, write_result) {
(Ok(()), _) | (_, Ok(())) => Ok(()),
(Err(read_err), Err(write_err)) => {
if !utils::is_not_found_error(&read_err) {
Err(read_err)
} else if !utils::is_not_found_error(&write_err) {
Err(write_err)
} else {
Ok(())
}
}
}
}
fn delete_timer(&self, key: u64) -> io::Result<()> {
if !self.registered_timers.borrow_mut().remove(&key) {
return Err(io::Error::from_raw_os_error(libc::ENOENT));
}
let kev = libc::kevent {
ident: key as libc::uintptr_t,
filter: libc::EVFILT_TIMER,
flags: libc::EV_DELETE,
fflags: 0,
data: 0,
udata: ptr::null_mut(),
};
let result = syscall!(kevent(
self.kq_fd.as_raw_fd(),
&kev as *const libc::kevent,
1,
std::ptr::null_mut(),
0,
std::ptr::null(),
));
match result {
Err(err) if !utils::is_not_found_error(&err) => Err(err),
_ => Ok(()),
}
}
fn wait(
&self,
events: &mut [Self::NativeEvent],
timeout: Option<Duration>,
) -> io::Result<usize> {
let timeout_storage = utils::timeout_to_timespec(timeout);
let timeout_ptr = timeout_storage
.as_ref()
.map_or(std::ptr::null(), |ts| ts as *const libc::timespec);
let ret = syscall!(kevent(
self.kq_fd.as_raw_fd(),
std::ptr::null(),
0,
events.as_mut_ptr(),
events.len() as i32,
timeout_ptr,
))?;
let n = ret as usize;
assert!(
n <= events.len(),
"kevent returned more events ({}) than buffer size ({})",
n,
events.len()
);
Ok(n)
}
fn notify(&self) -> io::Result<()> {
let kev = libc::kevent {
ident: NOTIFY_IDENT as libc::uintptr_t,
filter: libc::EVFILT_USER,
flags: 0,
fflags: libc::NOTE_TRIGGER, data: 0,
udata: NOTIFY_IDENT as *mut libc::c_void,
};
syscall!(kevent(
self.kq_fd.as_raw_fd(),
&kev as *const libc::kevent,
1,
ptr::null_mut(),
0,
ptr::null(),
))?;
Ok(())
}
fn event_key(event: &Self::NativeEvent) -> u64 {
event.udata as u64
}
fn event_interest(event: &Self::NativeEvent) -> Interest {
match event.filter {
libc::EVFILT_READ => Interest::READ,
libc::EVFILT_WRITE => Interest::WRITE,
libc::EVFILT_TIMER => Interest::TIMER,
_ => Interest::READ, }
}
}
#[cfg(test)]
mod tests {
use super::*;
crate::generate_tests!(OsPoller::new().unwrap());
}
mod utils {
use std::io;
use std::time::Duration;
pub fn duration_to_timespec(duration: Duration) -> libc::timespec {
libc::timespec {
tv_sec: duration.as_secs() as libc::time_t,
tv_nsec: duration.subsec_nanos() as libc::c_long,
}
}
pub fn timeout_to_timespec(
timeout: Option<Duration>,
) -> Option<libc::timespec> {
timeout.map(duration_to_timespec)
}
pub fn is_not_found_error(err: &io::Error) -> bool {
err.raw_os_error() == Some(libc::ENOENT)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_duration_to_timespec() {
let dur = Duration::from_millis(1500);
let ts = duration_to_timespec(dur);
assert_eq!(ts.tv_sec, 1);
assert_eq!(ts.tv_nsec, 500_000_000);
}
#[test]
fn test_timeout_to_timespec_some() {
let dur = Duration::from_millis(2500);
let timeout = Some(dur);
let storage = timeout_to_timespec(timeout);
assert!(storage.is_some());
let ts = storage.unwrap();
assert_eq!(ts.tv_sec, 2);
assert_eq!(ts.tv_nsec, 500_000_000);
let ptr = &ts as *const libc::timespec;
assert!(!ptr.is_null());
unsafe {
assert_eq!((*ptr).tv_sec, 2);
assert_eq!((*ptr).tv_nsec, 500_000_000);
}
}
#[test]
fn test_timeout_to_timespec_none() {
let storage = timeout_to_timespec(None);
assert!(storage.is_none());
}
#[test]
fn test_is_not_found_error() {
let err = io::Error::from_raw_os_error(libc::ENOENT);
assert!(is_not_found_error(&err));
let err = io::Error::from_raw_os_error(libc::EINVAL);
assert!(!is_not_found_error(&err));
}
}
}
#[cfg(any(
target_os = "freebsd",
target_os = "dragonfly",
target_vendor = "apple",
))]
#[allow(dead_code)]
mod notify {
use super::*;
use crate::backends::pollingv2::os::NOTIFY_KEY;
use std::io;
#[derive(Debug)]
pub(super) struct Notify;
impl Notify {
pub(super) fn new() -> io::Result<Self> {
Ok(Self)
}
pub(super) fn register(&self, poller: &OsPoller) -> io::Result<()> {
poller.submit_changes(&[libc::kevent {
ident: NOTIFY_IDENT as libc::uintptr_t,
filter: libc::EVFILT_USER,
flags: (libc::EV_ADD | libc::EV_RECEIPT | libc::EV_CLEAR) as _,
fflags: 0,
data: 0,
udata: NOTIFY_KEY as *mut _,
}])
}
pub(super) fn reregister(&self, _poller: &OsPoller) -> io::Result<()> {
Ok(())
}
pub(super) fn notify(&self, poller: &OsPoller) -> io::Result<()> {
poller.submit_changes(&[libc::kevent {
ident: 0,
filter: libc::EVFILT_USER as _,
flags: (libc::EV_ADD | libc::EV_RECEIPT) as _,
fflags: libc::NOTE_TRIGGER,
data: 0,
udata: NOTIFY_KEY as *mut _,
}])?;
Ok(())
}
pub(super) fn deregister(&self, poller: &OsPoller) -> io::Result<()> {
poller.submit_changes(&[libc::kevent {
ident: 0,
filter: libc::EVFILT_USER as _,
flags: (libc::EV_DELETE | libc::EV_RECEIPT) as _,
fflags: 0,
data: 0,
udata: NOTIFY_KEY as *mut _,
}])
}
}
}
#[cfg(not(any(
target_os = "freebsd",
target_os = "dragonfly",
target_vendor = "apple",
)))]
mod notify {
use super::Poller;
use crate::{Event, NOTIFY_KEY, PollMode};
use std::io::{self, prelude::*};
#[cfg(feature = "tracing")]
use std::os::unix::io::BorrowedFd;
use std::os::unix::{
io::{AsFd, AsRawFd},
net::UnixStream,
};
#[derive(Debug)]
pub(super) struct Notify {
read_stream: UnixStream,
write_stream: UnixStream,
}
impl Notify {
pub(super) fn new() -> io::Result<Self> {
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
write_stream.set_nonblocking(true)?;
Ok(Self { read_stream, write_stream })
}
pub(super) fn register(&self, poller: &Poller) -> io::Result<()> {
unsafe {
poller.add(
self.read_stream.as_raw_fd(),
Event::readable(NOTIFY_KEY),
PollMode::Oneshot,
)
}
}
pub(super) fn reregister(&self, poller: &Poller) -> io::Result<()> {
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
poller.modify(
self.read_stream.as_fd(),
Event::readable(NOTIFY_KEY),
PollMode::Oneshot,
)
}
#[allow(clippy::unused_io_amount)]
pub(super) fn notify(&self, _poller: &Poller) -> io::Result<()> {
(&self.write_stream).write(&[1])?;
Ok(())
}
pub(super) fn deregister(&self, poller: &Poller) -> io::Result<()> {
poller.delete(self.read_stream.as_fd())
}
#[cfg(feature = "tracing")]
pub(super) fn has_fd(&self, fd: BorrowedFd<'_>) -> bool {
self.read_stream.as_raw_fd() == fd.as_raw_fd()
}
}
}