#![allow(unsafe_code)]
#[cfg(any(
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly"
))]
mod kqueue_impl {
use super::super::{Event, Events, Interest, Reactor, Source, Token};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::io;
use std::os::fd::RawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
const WAKE_TOKEN: usize = usize::MAX;
#[derive(Debug)]
struct RegistrationInfo {
raw_fd: RawFd,
interest: Interest,
}
pub struct KqueueReactor {
kq_fd: RawFd,
wake_pipe: (RawFd, RawFd),
wake_pending: AtomicBool,
registrations: Mutex<HashMap<Token, RegistrationInfo>>,
poll_events: Mutex<Vec<libc::kevent>>,
}
const DEFAULT_POLL_EVENTS_CAPACITY: usize = 64;
#[inline]
fn should_resize_poll_events(current: usize, target: usize) -> bool {
current < target || target.checked_mul(4).is_some_and(|t4| current >= t4)
}
impl KqueueReactor {
#[inline]
fn validate_supported_interest(interest: Interest) -> io::Result<()> {
if interest.is_priority() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Interest::PRIORITY is not supported by the raw macOS kqueue reactor",
));
}
if interest.is_dispatch() && interest.is_oneshot() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Interest::DISPATCH and Interest::ONESHOT are mutually exclusive",
));
}
Ok(())
}
#[inline]
fn validate_register_request(
token: Token,
raw_fd: RawFd,
interest: Interest,
) -> io::Result<()> {
Self::validate_supported_interest(interest)?;
if token.0 == WAKE_TOKEN {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"token collides with the reactor wake token",
));
}
if unsafe { libc::fcntl(raw_fd, libc::F_GETFD) } == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
}
#[inline]
fn registration_flags(interest: Interest) -> libc::c_ushort {
let mut flags = libc::EV_ADD as libc::c_ushort;
if interest.is_edge_triggered() {
flags |= libc::EV_CLEAR as libc::c_ushort;
} else if interest.is_dispatch() {
flags |= libc::EV_DISPATCH as libc::c_ushort;
} else {
flags |= libc::EV_ONESHOT as libc::c_ushort;
}
if interest.is_oneshot() {
flags |= libc::EV_ONESHOT as libc::c_ushort;
}
flags
}
pub fn new() -> io::Result<Self> {
let kq_fd = unsafe { libc::kqueue() };
if kq_fd < 0 {
return Err(io::Error::last_os_error());
}
let mut fds = [0i32; 2];
if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
unsafe {
libc::close(kq_fd);
}
return Err(io::Error::last_os_error());
}
let wake_pipe = (fds[0], fds[1]);
for &fd in &[wake_pipe.0, wake_pipe.1] {
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
if flags < 0 {
unsafe {
libc::close(kq_fd);
libc::close(wake_pipe.0);
libc::close(wake_pipe.1);
}
return Err(io::Error::last_os_error());
}
if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } < 0 {
unsafe {
libc::close(kq_fd);
libc::close(wake_pipe.0);
libc::close(wake_pipe.1);
}
return Err(io::Error::last_os_error());
}
}
let reactor = Self {
kq_fd,
wake_pipe,
wake_pending: AtomicBool::new(false),
registrations: Mutex::new(HashMap::new()),
poll_events: Mutex::new(Vec::with_capacity(DEFAULT_POLL_EVENTS_CAPACITY)),
};
reactor.register_wake_pipe()?;
Ok(reactor)
}
fn register_wake_pipe(&self) -> io::Result<()> {
let kev = libc::kevent {
ident: self.wake_pipe.0 as usize,
filter: libc::EVFILT_READ,
flags: libc::EV_ADD | libc::EV_CLEAR,
fflags: 0,
data: 0,
udata: WAKE_TOKEN as *mut libc::c_void,
};
let ret = unsafe {
libc::kevent(
self.kq_fd,
&kev,
1,
std::ptr::null_mut(),
0,
std::ptr::null(),
)
};
if ret < 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
fn drain_wake_pipe(&self) {
let mut buf = [0u8; 64];
loop {
let ret = unsafe {
libc::read(
self.wake_pipe.0,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len(),
)
};
if ret < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
break;
} else if ret == 0 {
break;
}
}
self.wake_pending.store(false, Ordering::Release);
}
fn interest_to_kevents(
fd: RawFd,
token: Token,
interest: Interest,
previous_interest: Option<Interest>,
) -> Vec<libc::kevent> {
let mut kevents = Vec::with_capacity(2);
let flags = Self::registration_flags(interest);
let old_interest = previous_interest.unwrap_or(Interest::NONE);
if interest.is_readable() || old_interest.is_readable() {
kevents.push(libc::kevent {
ident: fd as usize,
filter: libc::EVFILT_READ,
flags: if interest.is_readable() {
flags
} else {
libc::EV_DELETE
},
fflags: 0,
data: 0,
udata: token.0 as *mut libc::c_void,
});
}
if interest.is_writable() || old_interest.is_writable() {
kevents.push(libc::kevent {
ident: fd as usize,
filter: libc::EVFILT_WRITE,
flags: if interest.is_writable() {
flags
} else {
libc::EV_DELETE
},
fflags: 0,
data: 0,
udata: token.0 as *mut libc::c_void,
});
}
kevents
}
fn kevent_to_interest(
filter: i16,
flags: u16,
registered_interest: Option<Interest>,
) -> Interest {
let mut interest = Interest::NONE;
let registered_interest =
registered_interest.unwrap_or(Interest::READABLE | Interest::WRITABLE);
if filter == libc::EVFILT_READ && registered_interest.is_readable() {
interest = interest.add(Interest::READABLE);
}
if filter == libc::EVFILT_WRITE && registered_interest.is_writable() {
interest = interest.add(Interest::WRITABLE);
}
if flags & libc::EV_EOF != 0 {
interest = interest.add(Interest::HUP);
}
if flags & libc::EV_ERROR != 0 {
interest = interest.add(Interest::ERROR);
}
interest
}
}
impl Reactor for KqueueReactor {
fn register(
&self,
source: &dyn Source,
token: Token,
interest: Interest,
) -> io::Result<()> {
let raw_fd = source.as_raw_fd();
Self::validate_register_request(token, raw_fd, interest)?;
let mut regs = self.registrations.lock();
if regs.contains_key(&token) {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"token already registered",
));
}
if regs.values().any(|info| info.raw_fd == raw_fd) {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"fd already registered",
));
}
let kevents = Self::interest_to_kevents(raw_fd, token, interest, None);
if !kevents.is_empty() {
let ret = unsafe {
libc::kevent(
self.kq_fd,
kevents.as_ptr(),
kevents.len() as i32,
std::ptr::null_mut(),
0,
std::ptr::null(),
)
};
if ret < 0 {
return Err(io::Error::last_os_error());
}
}
regs.insert(token, RegistrationInfo { raw_fd, interest });
Ok(())
}
fn modify(&self, token: Token, interest: Interest) -> io::Result<()> {
Self::validate_supported_interest(interest)?;
let mut regs = self.registrations.lock();
let info = regs
.get_mut(&token)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "token not registered"))?;
let old_interest = info.interest;
let kevents =
Self::interest_to_kevents(info.raw_fd, token, interest, Some(old_interest));
if !kevents.is_empty() {
let ret = unsafe {
libc::kevent(
self.kq_fd,
kevents.as_ptr(),
kevents.len() as i32,
std::ptr::null_mut(),
0,
std::ptr::null(),
)
};
if ret < 0 {
return Err(io::Error::last_os_error());
}
}
info.interest = interest;
Ok(())
}
fn deregister(&self, token: Token) -> io::Result<()> {
let mut regs = self.registrations.lock();
let info = regs
.get(&token)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "token not registered"))?;
let raw_fd = info.raw_fd;
let interest = info.interest;
let fd_still_valid = unsafe { libc::fcntl(raw_fd, libc::F_GETFD) } != -1;
let kevents = Self::interest_to_kevents(raw_fd, token, Interest::NONE, Some(interest));
if !kevents.is_empty() {
let ret = unsafe {
libc::kevent(
self.kq_fd,
kevents.as_ptr(),
kevents.len() as i32,
std::ptr::null_mut(),
0,
std::ptr::null(),
)
};
if ret < 0 {
let err = io::Error::last_os_error();
return match err.raw_os_error() {
Some(libc::ENOENT) => {
regs.remove(&token);
Ok(())
}
Some(libc::EBADF) if !fd_still_valid => {
regs.remove(&token);
Ok(())
}
_ => Err(err),
};
}
}
regs.remove(&token);
Ok(())
}
fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
events.clear();
let timespec = timeout.map(|d| libc::timespec {
tv_sec: d.as_secs() as libc::time_t,
tv_nsec: d.subsec_nanos() as libc::c_long,
});
let timeout_ptr = timespec
.as_ref()
.map(|t| t as *const libc::timespec)
.unwrap_or(std::ptr::null());
let requested_capacity = events.capacity().max(1);
let mut kevents = self.poll_events.lock();
let current = kevents.capacity();
let target = requested_capacity;
if should_resize_poll_events(current, target) {
*kevents = Vec::with_capacity(requested_capacity);
} else {
kevents.clear();
}
let ret = unsafe {
libc::kevent(
self.kq_fd,
std::ptr::null(),
0,
kevents.as_mut_ptr(),
kevents.capacity() as i32,
timeout_ptr,
)
};
if ret < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
return Ok(0);
}
return Err(err);
}
unsafe {
kevents.set_len(ret as usize);
}
let regs = self.registrations.lock();
for kev in kevents.iter() {
let token_val = kev.udata as usize;
if token_val == WAKE_TOKEN {
self.drain_wake_pipe();
continue;
}
let token = Token(token_val);
let registered_interest = regs.get(&token).map(|info| info.interest);
let interest = Self::kevent_to_interest(kev.filter, kev.flags, registered_interest);
events.push(Event::new(token, interest));
}
drop(regs);
drop(kevents);
Ok(events.len())
}
fn wake(&self) -> io::Result<()> {
if self
.wake_pending
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let buf = [1u8];
let mut ret;
loop {
ret = unsafe {
libc::write(
self.wake_pipe.1,
buf.as_ptr() as *const libc::c_void,
buf.len(),
)
};
if ret < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
}
break;
}
if ret < 0 {
let err = io::Error::last_os_error();
if err.kind() != io::ErrorKind::WouldBlock {
self.wake_pending.store(false, Ordering::Release);
return Err(err);
}
}
}
Ok(())
}
fn registration_count(&self) -> usize {
self.registrations.lock().len()
}
}
impl Drop for KqueueReactor {
fn drop(&mut self) {
unsafe {
libc::close(self.kq_fd);
libc::close(self.wake_pipe.0);
libc::close(self.wake_pipe.1);
}
}
}
impl std::fmt::Debug for KqueueReactor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let reg_count = self.registrations.lock().len();
f.debug_struct("KqueueReactor")
.field("kq_fd", &self.kq_fd)
.field("wake_pipe", &self.wake_pipe)
.field("registration_count", ®_count)
.finish_non_exhaustive()
}
}
unsafe impl Send for KqueueReactor {}
unsafe impl Sync for KqueueReactor {}
}
#[cfg(any(
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly"
))]
pub use kqueue_impl::KqueueReactor;
#[cfg(not(any(
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly"
)))]
mod stub {
use super::super::{Events, Interest, Reactor, Source, Token};
use std::io;
use std::time::Duration;
#[derive(Debug, Default)]
pub struct KqueueReactor;
impl KqueueReactor {
pub fn new() -> io::Result<Self> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"KqueueReactor is only available on macOS and BSD systems",
))
}
}
impl Reactor for KqueueReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"KqueueReactor is only available on macOS and BSD systems",
))
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"KqueueReactor is only available on macOS and BSD systems",
))
}
fn deregister(&self, _token: Token) -> io::Result<()> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"KqueueReactor is only available on macOS and BSD systems",
))
}
fn poll(&self, _events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"KqueueReactor is only available on macOS and BSD systems",
))
}
fn wake(&self) -> io::Result<()> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"KqueueReactor is only available on macOS and BSD systems",
))
}
fn registration_count(&self) -> usize {
0
}
}
}
#[cfg(not(any(
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly"
)))]
pub use stub::KqueueReactor;
#[cfg(all(
test,
any(
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly"
)
))]
mod tests {
use super::*;
use crate::runtime::reactor::{Events, Interest, Reactor, Token};
use crate::test_utils::init_test_logging;
use std::os::unix::net::UnixStream;
use std::time::Duration;
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
#[test]
fn create_reactor() {
init_test("kqueue_create_reactor");
let reactor = KqueueReactor::new().expect("failed to create reactor");
crate::assert_with_log!(
reactor.is_empty(),
"reactor empty",
true,
reactor.is_empty()
);
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration count",
0usize,
reactor.registration_count()
);
crate::test_complete!("kqueue_create_reactor");
}
#[test]
fn register_and_deregister() {
init_test("kqueue_register_and_deregister");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(42);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration count",
1usize,
reactor.registration_count()
);
crate::assert_with_log!(
!reactor.is_empty(),
"reactor not empty",
false,
reactor.is_empty()
);
reactor.deregister(token).expect("deregister failed");
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration count",
0usize,
reactor.registration_count()
);
crate::assert_with_log!(
reactor.is_empty(),
"reactor empty",
true,
reactor.is_empty()
);
crate::test_complete!("kqueue_register_and_deregister");
}
#[test]
fn deregister_not_found() {
init_test("kqueue_deregister_not_found");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let result = reactor.deregister(Token::new(999));
crate::assert_with_log!(result.is_err(), "deregister fails", true, result.is_err());
let kind = result.unwrap_err().kind();
crate::assert_with_log!(
kind == io::ErrorKind::NotFound,
"not found kind",
io::ErrorKind::NotFound,
kind
);
crate::test_complete!("kqueue_deregister_not_found");
}
#[test]
fn deregister_hard_delete_failure_preserves_bookkeeping_for_retry() {
init_test("kqueue_deregister_hard_delete_failure_preserves_bookkeeping_for_retry");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(78);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
let saved_kq_fd = unsafe { libc::dup(reactor.kq_fd) };
crate::assert_with_log!(saved_kq_fd >= 0, "dup kqueue fd", true, saved_kq_fd >= 0);
let close_result = unsafe { libc::close(reactor.kq_fd) };
crate::assert_with_log!(close_result == 0, "close kqueue fd", 0, close_result);
let err = reactor
.deregister(token)
.expect_err("deregister should fail when kqueue fd is closed");
let errno = err
.raw_os_error()
.expect("closed kqueue failure should preserve errno");
crate::assert_with_log!(
errno == libc::EBADF,
"closed kqueue reports EBADF",
libc::EBADF,
errno
);
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration kept after hard delete failure",
1usize,
reactor.registration_count()
);
let restore_result = unsafe { libc::dup2(saved_kq_fd, reactor.kq_fd) };
crate::assert_with_log!(
restore_result == reactor.kq_fd,
"restore kqueue fd",
reactor.kq_fd,
restore_result
);
let saved_close = unsafe { libc::close(saved_kq_fd) };
crate::assert_with_log!(saved_close == 0, "close saved kqueue fd", 0, saved_close);
reactor
.deregister(token)
.expect("retry deregister after kqueue restore failed");
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration removed after successful retry",
0usize,
reactor.registration_count()
);
crate::test_complete!(
"kqueue_deregister_hard_delete_failure_preserves_bookkeeping_for_retry"
);
}
#[test]
fn modify_interest() {
init_test("kqueue_modify_interest");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
reactor
.modify(token, Interest::WRITABLE)
.expect("modify failed");
let regs = reactor.registrations.lock();
let info = regs.get(&token).unwrap();
crate::assert_with_log!(
info.interest == Interest::WRITABLE,
"interest updated",
Interest::WRITABLE,
info.interest
);
drop(regs);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_modify_interest");
}
#[test]
fn modify_not_found() {
init_test("kqueue_modify_not_found");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let result = reactor.modify(Token::new(999), Interest::READABLE);
crate::assert_with_log!(result.is_err(), "modify fails", true, result.is_err());
let kind = result.unwrap_err().kind();
crate::assert_with_log!(
kind == io::ErrorKind::NotFound,
"not found kind",
io::ErrorKind::NotFound,
kind
);
crate::test_complete!("kqueue_modify_not_found");
}
#[test]
fn wake_unblocks_poll() {
init_test("kqueue_wake_unblocks_poll");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let mut events = Events::with_capacity(64);
let reactor_ref = &reactor;
std::thread::scope(|s| {
s.spawn(|| {
std::thread::sleep(Duration::from_millis(50));
reactor_ref.wake().expect("wake failed");
});
let start = std::time::Instant::now();
let _count = reactor
.poll(&mut events, Some(Duration::from_secs(5)))
.expect("poll failed");
let elapsed = start.elapsed();
crate::assert_with_log!(
elapsed < Duration::from_secs(1),
"poll woke early",
true,
elapsed < Duration::from_secs(1)
);
});
crate::test_complete!("kqueue_wake_unblocks_poll");
}
#[test]
fn poll_timeout() {
init_test("kqueue_poll_timeout");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let mut events = Events::with_capacity(64);
let start = std::time::Instant::now();
let count = reactor
.poll(&mut events, Some(Duration::from_millis(50)))
.expect("poll failed");
let elapsed = start.elapsed();
crate::assert_with_log!(
elapsed >= Duration::from_millis(40),
"elapsed lower bound",
true,
elapsed >= Duration::from_millis(40)
);
crate::assert_with_log!(
elapsed < Duration::from_millis(200),
"elapsed upper bound",
true,
elapsed < Duration::from_millis(200)
);
crate::assert_with_log!(count == 0, "no events", 0usize, count);
crate::test_complete!("kqueue_poll_timeout");
}
#[test]
fn poll_non_blocking() {
init_test("kqueue_poll_non_blocking");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let mut events = Events::with_capacity(64);
let start = std::time::Instant::now();
let count = reactor
.poll(&mut events, Some(Duration::ZERO))
.expect("poll failed");
let elapsed = start.elapsed();
crate::assert_with_log!(
elapsed < Duration::from_millis(10),
"poll returns quickly",
true,
elapsed < Duration::from_millis(10)
);
crate::assert_with_log!(count == 0, "no events", 0usize, count);
crate::test_complete!("kqueue_poll_non_blocking");
}
#[test]
fn poll_writable() {
init_test("kqueue_poll_writable");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::WRITABLE)
.expect("register failed");
let mut events = Events::with_capacity(64);
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
crate::assert_with_log!(count >= 1, "has events", true, count >= 1);
let mut found = false;
for event in events.iter() {
if event.token == token && event.is_writable() {
found = true;
break;
}
}
crate::assert_with_log!(found, "expected writable event for token", true, found);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_poll_writable");
}
#[test]
fn poll_readable() {
init_test("kqueue_poll_readable");
use std::io::Write;
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, mut sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
sock2.write_all(b"hello").expect("write failed");
let mut events = Events::with_capacity(64);
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
crate::assert_with_log!(count >= 1, "has events", true, count >= 1);
let mut found = false;
for event in events.iter() {
if event.token == token && event.is_readable() {
found = true;
break;
}
}
crate::assert_with_log!(found, "expected readable event for token", true, found);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_poll_readable");
}
#[test]
fn duplicate_register_fails() {
init_test("kqueue_duplicate_register_fails");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("first register should succeed");
let result = reactor.register(&sock1, token, Interest::WRITABLE);
crate::assert_with_log!(result.is_err(), "duplicate fails", true, result.is_err());
let kind = result.unwrap_err().kind();
crate::assert_with_log!(
kind == io::ErrorKind::AlreadyExists,
"already exists kind",
io::ErrorKind::AlreadyExists,
kind
);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_duplicate_register_fails");
}
#[test]
fn multiple_registrations() {
init_test("kqueue_multiple_registrations");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _) = UnixStream::pair().expect("failed to create unix stream pair");
let (sock2, _) = UnixStream::pair().expect("failed to create unix stream pair");
let (sock3, _) = UnixStream::pair().expect("failed to create unix stream pair");
reactor
.register(&sock1, Token::new(1), Interest::READABLE)
.expect("register 1 failed");
reactor
.register(&sock2, Token::new(2), Interest::WRITABLE)
.expect("register 2 failed");
reactor
.register(&sock3, Token::new(3), Interest::both())
.expect("register 3 failed");
let count = reactor.registration_count();
crate::assert_with_log!(count == 3, "registration count", 3usize, count);
reactor
.deregister(Token::new(2))
.expect("deregister failed");
let count = reactor.registration_count();
crate::assert_with_log!(count == 2, "after deregister", 2usize, count);
reactor
.deregister(Token::new(1))
.expect("deregister failed");
reactor
.deregister(Token::new(3))
.expect("deregister failed");
let count = reactor.registration_count();
crate::assert_with_log!(count == 0, "after deregister all", 0usize, count);
crate::test_complete!("kqueue_multiple_registrations");
}
#[test]
fn debug_impl() {
init_test("kqueue_debug_impl");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let debug_text = format!("{:?}", reactor);
crate::assert_with_log!(
debug_text.contains("KqueueReactor"),
"debug contains type",
true,
debug_text.contains("KqueueReactor")
);
crate::assert_with_log!(
debug_text.contains("registration_count"),
"debug contains registration_count",
true,
debug_text.contains("registration_count")
);
crate::test_complete!("kqueue_debug_impl");
}
}