#![allow(unsafe_code)]
use super::{Event, Events, Interest, Reactor, Source, Token};
use parking_lot::Mutex;
use polling::{Event as PollEvent, Events as PollingEvents, PollMode, Poller};
use std::collections::HashMap;
use std::io;
use std::num::NonZeroUsize;
use std::os::fd::BorrowedFd;
use std::time::Duration;
#[derive(Debug)]
struct RegistrationInfo {
raw_fd: i32,
interest: Interest,
}
pub struct KqueueReactor {
poller: Poller,
registrations: Mutex<HashMap<Token, RegistrationInfo>>,
poll_events: Mutex<PollingEvents>,
}
const DEFAULT_POLL_EVENTS_CAPACITY: usize = 64;
#[inline]
fn should_resize_poll_events(current: usize, target: usize) -> bool {
current < target || target.checked_mul(4).is_some_and(|t4| current >= t4)
}
impl KqueueReactor {
#[inline]
fn validate_supported_interest(interest: Interest) -> io::Result<()> {
if interest.is_dispatch() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Interest::DISPATCH is not supported by the kqueue reactor",
));
}
if interest.is_priority() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Interest::PRIORITY is not supported by the kqueue reactor",
));
}
Ok(())
}
#[inline]
const fn interest_to_poll_mode(interest: Interest) -> PollMode {
let single_shot = interest.is_oneshot();
if interest.is_edge_triggered() {
if single_shot {
PollMode::EdgeOneshot
} else {
PollMode::Edge
}
} else {
PollMode::Oneshot
}
}
pub fn new() -> io::Result<Self> {
let poller = Poller::new()?;
Ok(Self {
poller,
registrations: Mutex::new(HashMap::new()),
poll_events: Mutex::new(PollingEvents::with_capacity(
NonZeroUsize::new(DEFAULT_POLL_EVENTS_CAPACITY).expect("non-zero capacity"),
)),
})
}
fn interest_to_poll_event(token: Token, interest: Interest) -> PollEvent {
let key = token.0;
let readable = interest.is_readable();
let writable = interest.is_writable();
match (readable, writable) {
(true, true) => PollEvent::all(key),
(true, false) => PollEvent::readable(key),
(false, true) => PollEvent::writable(key),
(false, false) => PollEvent::none(key),
}
}
fn poll_event_to_interest(
event: &PollEvent,
registered_interest: Option<Interest>,
) -> Interest {
let mut observed_readiness = Interest::NONE;
if event.readable {
observed_readiness = observed_readiness.add(Interest::READABLE);
}
if event.writable {
observed_readiness = observed_readiness.add(Interest::WRITABLE);
}
let mut interest = match registered_interest {
Some(registered) => observed_readiness & registered,
None => observed_readiness,
};
let eof_for_single_direction = event.readable
&& event.writable
&& registered_interest
.is_some_and(|registered| registered.is_readable() ^ registered.is_writable());
if eof_for_single_direction {
interest = interest.add(Interest::HUP);
}
interest
}
}
impl Reactor for KqueueReactor {
fn register(&self, source: &dyn Source, token: Token, interest: Interest) -> io::Result<()> {
Self::validate_supported_interest(interest)?;
let raw_fd = source.as_raw_fd();
let mut regs = self.registrations.lock();
if regs.contains_key(&token) {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"token already registered",
));
}
if regs.values().any(|info| info.raw_fd == raw_fd) {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"fd already registered",
));
}
if unsafe { libc::fcntl(raw_fd, libc::F_GETFD) } == -1 {
return Err(io::Error::last_os_error());
}
let event = Self::interest_to_poll_event(token, interest);
let borrowed_fd = unsafe { BorrowedFd::borrow_raw(raw_fd) };
let mode = Self::interest_to_poll_mode(interest);
unsafe { self.poller.add_with_mode(&borrowed_fd, event, mode)? };
regs.insert(token, RegistrationInfo { raw_fd, interest });
Ok(())
}
fn modify(&self, token: Token, interest: Interest) -> io::Result<()> {
Self::validate_supported_interest(interest)?;
let mut regs = self.registrations.lock();
let entry = match regs.entry(token) {
std::collections::hash_map::Entry::Occupied(entry) => entry,
std::collections::hash_map::Entry::Vacant(_) => {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
));
}
};
let raw_fd = entry.get().raw_fd;
let event = Self::interest_to_poll_event(token, interest);
let borrowed_fd = unsafe { BorrowedFd::borrow_raw(raw_fd) };
let mode = Self::interest_to_poll_mode(interest);
let result = match self.poller.modify_with_mode(&borrowed_fd, event, mode) {
Ok(()) => {
entry.into_mut().interest = interest;
Ok(())
}
Err(err) => match err.raw_os_error() {
Some(libc::ENOENT) => {
entry.remove();
Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
))
}
Some(libc::EBADF) => {
let fd_still_valid = unsafe { libc::fcntl(raw_fd, libc::F_GETFD) } != -1;
if fd_still_valid {
Err(err)
} else {
entry.remove();
Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
))
}
}
_ => Err(err),
},
};
result
}
fn deregister(&self, token: Token) -> io::Result<()> {
let mut regs = self.registrations.lock();
let info = regs
.get(&token)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "token not registered"))?;
let borrowed_fd = unsafe { BorrowedFd::borrow_raw(info.raw_fd) };
let fd_still_valid = unsafe { libc::fcntl(info.raw_fd, libc::F_GETFD) } != -1;
match self.poller.delete(&borrowed_fd) {
Ok(()) => {
regs.remove(&token);
Ok(())
}
Err(err) => match err.raw_os_error() {
Some(libc::ENOENT) => {
regs.remove(&token);
Ok(())
}
Some(libc::EBADF) if !fd_still_valid => {
regs.remove(&token);
Ok(())
}
_ => Err(err),
},
}
}
fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
events.clear();
let requested_capacity = NonZeroUsize::new(events.capacity().max(1)).expect("max(1)");
let mut poll_events = self.poll_events.lock();
let current = poll_events.capacity().get();
let target = requested_capacity.get();
if should_resize_poll_events(current, target) {
*poll_events = PollingEvents::with_capacity(requested_capacity);
} else {
poll_events.clear();
}
self.poller.wait(&mut poll_events, timeout)?;
let registrations = self.registrations.lock();
for poll_event in poll_events.iter() {
let token = Token(poll_event.key);
let registered_interest = registrations.get(&token).map(|info| info.interest);
let interest = Self::poll_event_to_interest(&poll_event, registered_interest);
events.push(Event::new(token, interest));
}
drop(registrations);
drop(poll_events);
Ok(events.len())
}
fn wake(&self) -> io::Result<()> {
self.poller.notify()
}
fn registration_count(&self) -> usize {
self.registrations.lock().len()
}
}
impl std::fmt::Debug for KqueueReactor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let reg_count = self.registrations.lock().len();
f.debug_struct("KqueueReactor")
.field("registration_count", ®_count)
.finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::init_test_logging;
use nix::unistd::{close, dup};
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::time::Duration;
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
#[derive(Debug)]
struct RawFdSource(RawFd);
impl AsRawFd for RawFdSource {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
#[derive(Debug)]
struct FdRestoreGuard {
target_fd: RawFd,
saved_fd: Option<RawFd>,
}
impl FdRestoreGuard {
fn new(target_fd: RawFd, saved_fd: RawFd) -> Self {
Self {
target_fd,
saved_fd: Some(saved_fd),
}
}
fn restore(&mut self) -> (i32, i32) {
let saved_fd = self
.saved_fd
.take()
.expect("restore guard must hold a saved fd");
let restore_result = unsafe { libc::dup2(saved_fd, self.target_fd) };
let close_saved = unsafe { libc::close(saved_fd) };
(restore_result, close_saved)
}
}
impl Drop for FdRestoreGuard {
fn drop(&mut self) {
let Some(saved_fd) = self.saved_fd.take() else {
return;
};
let _ = unsafe { libc::dup2(saved_fd, self.target_fd) };
let _ = unsafe { libc::close(saved_fd) };
}
}
#[test]
fn interest_to_poll_mode_mapping() {
init_test("kqueue_interest_to_poll_mode_mapping");
crate::assert_with_log!(
KqueueReactor::interest_to_poll_mode(Interest::READABLE) == PollMode::Oneshot,
"default oneshot mode",
PollMode::Oneshot,
KqueueReactor::interest_to_poll_mode(Interest::READABLE)
);
crate::assert_with_log!(
KqueueReactor::interest_to_poll_mode(Interest::READABLE.with_edge_triggered())
== PollMode::Edge,
"edge mode",
PollMode::Edge,
KqueueReactor::interest_to_poll_mode(Interest::READABLE.with_edge_triggered())
);
crate::assert_with_log!(
KqueueReactor::interest_to_poll_mode(Interest::READABLE.with_oneshot())
== PollMode::Oneshot,
"oneshot mode",
PollMode::Oneshot,
KqueueReactor::interest_to_poll_mode(Interest::READABLE.with_oneshot())
);
crate::assert_with_log!(
KqueueReactor::interest_to_poll_mode(
Interest::READABLE.with_edge_triggered().with_oneshot()
) == PollMode::EdgeOneshot,
"edge oneshot mode",
PollMode::EdgeOneshot,
KqueueReactor::interest_to_poll_mode(
Interest::READABLE.with_edge_triggered().with_oneshot()
)
);
crate::test_complete!("kqueue_interest_to_poll_mode_mapping");
}
#[test]
fn dispatch_interest_is_rejected() {
init_test("kqueue_dispatch_interest_is_rejected");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let register_err = reactor
.register(&sock1, Token::new(77), Interest::dispatch())
.expect_err("dispatch register should be rejected");
crate::assert_with_log!(
register_err.kind() == io::ErrorKind::InvalidInput,
"register rejects unsupported dispatch interest",
io::ErrorKind::InvalidInput,
register_err.kind()
);
reactor
.register(&sock1, Token::new(78), Interest::READABLE)
.expect("plain register should succeed");
let modify_err = reactor
.modify(Token::new(78), Interest::dispatch())
.expect_err("dispatch modify should be rejected");
crate::assert_with_log!(
modify_err.kind() == io::ErrorKind::InvalidInput,
"modify rejects unsupported dispatch interest",
io::ErrorKind::InvalidInput,
modify_err.kind()
);
reactor
.deregister(Token::new(78))
.expect("cleanup deregister should succeed");
crate::test_complete!("kqueue_dispatch_interest_is_rejected");
}
#[test]
fn priority_interest_is_rejected() {
init_test("kqueue_priority_interest_is_rejected");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let register_err = reactor
.register(
&sock1,
Token::new(79),
Interest::READABLE.add(Interest::PRIORITY),
)
.expect_err("priority register should be rejected");
crate::assert_with_log!(
register_err.kind() == io::ErrorKind::InvalidInput,
"register rejects unsupported priority interest",
io::ErrorKind::InvalidInput,
register_err.kind()
);
reactor
.register(&sock1, Token::new(80), Interest::READABLE)
.expect("plain register should succeed");
let modify_err = reactor
.modify(Token::new(80), Interest::READABLE.add(Interest::PRIORITY))
.expect_err("priority modify should be rejected");
crate::assert_with_log!(
modify_err.kind() == io::ErrorKind::InvalidInput,
"modify rejects unsupported priority interest",
io::ErrorKind::InvalidInput,
modify_err.kind()
);
reactor
.deregister(Token::new(80))
.expect("cleanup deregister should succeed");
crate::test_complete!("kqueue_priority_interest_is_rejected");
}
#[test]
fn create_reactor() {
init_test("kqueue_create_reactor");
let reactor = KqueueReactor::new().expect("failed to create reactor");
crate::assert_with_log!(
reactor.is_empty(),
"reactor empty",
true,
reactor.is_empty()
);
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration count",
0usize,
reactor.registration_count()
);
crate::test_complete!("kqueue_create_reactor");
}
#[test]
fn register_and_deregister() {
init_test("kqueue_register_and_deregister");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(42);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration count",
1usize,
reactor.registration_count()
);
crate::assert_with_log!(
!reactor.is_empty(),
"reactor not empty",
false,
reactor.is_empty()
);
reactor.deregister(token).expect("deregister failed");
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration count",
0usize,
reactor.registration_count()
);
crate::assert_with_log!(
reactor.is_empty(),
"reactor empty",
true,
reactor.is_empty()
);
crate::test_complete!("kqueue_register_and_deregister");
}
#[test]
fn deregister_not_found() {
init_test("kqueue_deregister_not_found");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let result = reactor.deregister(Token::new(999));
crate::assert_with_log!(result.is_err(), "deregister fails", true, result.is_err());
let kind = result.unwrap_err().kind();
crate::assert_with_log!(
kind == io::ErrorKind::NotFound,
"not found kind",
io::ErrorKind::NotFound,
kind
);
crate::test_complete!("kqueue_deregister_not_found");
}
#[test]
fn modify_interest() {
init_test("kqueue_modify_interest");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
reactor
.modify(token, Interest::WRITABLE)
.expect("modify failed");
let regs = reactor.registrations.lock();
let info = regs.get(&token).unwrap();
crate::assert_with_log!(
info.interest == Interest::WRITABLE,
"interest updated",
Interest::WRITABLE,
info.interest
);
drop(regs);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_modify_interest");
}
#[test]
fn modify_not_found() {
init_test("kqueue_modify_not_found");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let result = reactor.modify(Token::new(999), Interest::READABLE);
crate::assert_with_log!(result.is_err(), "modify fails", true, result.is_err());
let kind = result.unwrap_err().kind();
crate::assert_with_log!(
kind == io::ErrorKind::NotFound,
"not found kind",
io::ErrorKind::NotFound,
kind
);
crate::test_complete!("kqueue_modify_not_found");
}
#[test]
fn wake_unblocks_poll() {
init_test("kqueue_wake_unblocks_poll");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let mut events = Events::with_capacity(64);
let reactor_ref = &reactor;
std::thread::scope(|s| {
s.spawn(|| {
std::thread::sleep(Duration::from_millis(50));
reactor_ref.wake().expect("wake failed");
});
let start = std::time::Instant::now();
let _count = reactor
.poll(&mut events, Some(Duration::from_secs(5)))
.expect("poll failed");
let elapsed = start.elapsed();
crate::assert_with_log!(
elapsed < Duration::from_secs(1),
"poll woke early",
true,
elapsed < Duration::from_secs(1)
);
});
crate::test_complete!("kqueue_wake_unblocks_poll");
}
#[test]
fn poll_timeout() {
init_test("kqueue_poll_timeout");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let mut events = Events::with_capacity(64);
let start = std::time::Instant::now();
let count = reactor
.poll(&mut events, Some(Duration::from_millis(50)))
.expect("poll failed");
let elapsed = start.elapsed();
crate::assert_with_log!(
elapsed >= Duration::from_millis(40),
"elapsed lower bound",
true,
elapsed >= Duration::from_millis(40)
);
crate::assert_with_log!(
elapsed < Duration::from_millis(200),
"elapsed upper bound",
true,
elapsed < Duration::from_millis(200)
);
crate::assert_with_log!(count == 0, "no events", 0usize, count);
crate::test_complete!("kqueue_poll_timeout");
}
#[test]
fn poll_non_blocking() {
init_test("kqueue_poll_non_blocking");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let mut events = Events::with_capacity(64);
let start = std::time::Instant::now();
let count = reactor
.poll(&mut events, Some(Duration::ZERO))
.expect("poll failed");
let elapsed = start.elapsed();
crate::assert_with_log!(
elapsed < Duration::from_millis(10),
"poll returns quickly",
true,
elapsed < Duration::from_millis(10)
);
crate::assert_with_log!(count == 0, "no events", 0usize, count);
crate::test_complete!("kqueue_poll_non_blocking");
}
#[test]
fn poll_writable() {
init_test("kqueue_poll_writable");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::WRITABLE)
.expect("register failed");
let mut events = Events::with_capacity(64);
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
crate::assert_with_log!(count >= 1, "has events", true, count >= 1);
let mut found = false;
for event in events.iter() {
if event.token == token && event.is_writable() {
found = true;
break;
}
}
crate::assert_with_log!(found, "expected writable event for token", true, found);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_poll_writable");
}
#[test]
fn poll_readable() {
init_test("kqueue_poll_readable");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, mut sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
sock2.write_all(b"hello").expect("write failed");
let mut events = Events::with_capacity(64);
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
crate::assert_with_log!(count >= 1, "has events", true, count >= 1);
let mut found = false;
for event in events.iter() {
if event.token == token && event.is_readable() {
found = true;
break;
}
}
crate::assert_with_log!(found, "expected readable event for token", true, found);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_poll_readable");
}
#[test]
fn edge_triggered_requires_drain() {
init_test("kqueue_edge_triggered_requires_drain");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (mut read_sock, mut write_sock) =
UnixStream::pair().expect("failed to create unix stream pair");
read_sock
.set_nonblocking(true)
.expect("failed to set nonblocking");
let token = Token::new(7);
reactor
.register(&read_sock, token, Interest::READABLE.with_edge_triggered())
.expect("register failed");
write_sock.write_all(b"hello").expect("write failed");
let mut events = Events::with_capacity(64);
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
crate::assert_with_log!(count >= 1, "has events", true, count >= 1);
let mut buf = [0_u8; 1];
let read_count = read_sock.read(&mut buf).expect("read failed");
crate::assert_with_log!(read_count == 1, "read one byte", 1usize, read_count);
let count = reactor
.poll(&mut events, Some(Duration::ZERO))
.expect("poll failed");
crate::assert_with_log!(count == 0, "no new edge before drain", 0usize, count);
let mut drain_buf = [0_u8; 16];
loop {
match read_sock.read(&mut drain_buf) {
Ok(0) => break,
Ok(_) => continue,
Err(err) if err.kind() == io::ErrorKind::WouldBlock => break,
Err(err) => unreachable!("drain failed: {err}"),
}
}
write_sock.write_all(b"world").expect("write failed");
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
crate::assert_with_log!(count >= 1, "edge after new data", true, count >= 1);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_edge_triggered_requires_drain");
}
#[test]
fn default_oneshot_requires_rearm_for_new_edges() {
init_test("kqueue_default_oneshot_requires_rearm_for_new_edges");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (mut read_sock, mut write_sock) =
UnixStream::pair().expect("failed to create unix stream pair");
read_sock
.set_nonblocking(true)
.expect("failed to set nonblocking");
let token = Token::new(8);
reactor
.register(&read_sock, token, Interest::READABLE)
.expect("register failed");
write_sock.write_all(b"hello").expect("write failed");
let mut events = Events::with_capacity(64);
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("first poll failed");
crate::assert_with_log!(count >= 1, "first poll has events", true, count >= 1);
let mut found = false;
for event in events.iter() {
if event.token == token && event.is_readable() {
found = true;
break;
}
}
crate::assert_with_log!(found, "first poll found readable event", true, found);
let mut buf = [0_u8; 16];
let read_count = read_sock.read(&mut buf).expect("drain read failed");
crate::assert_with_log!(read_count == 5, "drained first payload", 5usize, read_count);
write_sock.write_all(b"world").expect("second write failed");
events.clear();
let count = reactor
.poll(&mut events, Some(Duration::ZERO))
.expect("second poll failed");
crate::assert_with_log!(
count == 0,
"oneshot stays silent until rearm",
0usize,
count
);
reactor
.modify(token, Interest::READABLE)
.expect("rearm modify failed");
events.clear();
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("third poll failed");
crate::assert_with_log!(count >= 1, "rearmed poll has events", true, count >= 1);
let mut found_after_rearm = false;
for event in events.iter() {
if event.token == token && event.is_readable() {
found_after_rearm = true;
break;
}
}
crate::assert_with_log!(
found_after_rearm,
"rearmed poll found readable event",
true,
found_after_rearm
);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_default_oneshot_requires_rearm_for_new_edges");
}
#[test]
fn duplicate_register_fails() {
init_test("kqueue_duplicate_register_fails");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(1);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("first register should succeed");
let result = reactor.register(&sock1, token, Interest::WRITABLE);
crate::assert_with_log!(result.is_err(), "duplicate fails", true, result.is_err());
let kind = result.unwrap_err().kind();
crate::assert_with_log!(
kind == io::ErrorKind::AlreadyExists,
"already exists kind",
io::ErrorKind::AlreadyExists,
kind
);
reactor.deregister(token).expect("deregister failed");
crate::test_complete!("kqueue_duplicate_register_fails");
}
#[test]
fn register_invalid_fd_fails() {
init_test("kqueue_register_invalid_fd_fails");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let dup_fd = dup(&sock1).expect("dup failed").into_raw_fd();
close(unsafe { std::os::unix::io::OwnedFd::from_raw_fd(dup_fd) }).expect("close failed");
let invalid = RawFdSource(dup_fd);
let result = reactor.register(&invalid, Token::new(99), Interest::READABLE);
crate::assert_with_log!(
result.is_err(),
"invalid fd register",
true,
result.is_err()
);
crate::test_complete!("kqueue_register_invalid_fd_fails");
}
#[test]
fn deregister_closed_fd_is_best_effort() {
init_test("kqueue_deregister_closed_fd_is_best_effort");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(77);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
drop(sock1);
let result = reactor.deregister(token);
crate::assert_with_log!(
result.is_ok(),
"closed fd cleanup succeeds",
true,
result.is_ok()
);
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration removed from bookkeeping",
0usize,
reactor.registration_count()
);
crate::test_complete!("kqueue_deregister_closed_fd_is_best_effort");
}
#[test]
fn deregister_hard_delete_failure_preserves_bookkeeping_for_retry() {
init_test("kqueue_deregister_hard_delete_failure_preserves_bookkeeping_for_retry");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _sock2) = UnixStream::pair().expect("failed to create unix stream pair");
let token = Token::new(78);
reactor
.register(&sock1, token, Interest::READABLE)
.expect("register failed");
let poller_fd = reactor.poller.as_raw_fd();
let borrowed = unsafe { std::os::unix::io::BorrowedFd::borrow_raw(poller_fd) };
let saved_poller_fd = dup(borrowed).expect("dup poller fd failed").into_raw_fd();
let mut poller_restore = FdRestoreGuard::new(poller_fd, saved_poller_fd);
let close_result = unsafe { libc::close(poller_fd) };
crate::assert_with_log!(close_result == 0, "close poller fd", 0, close_result);
let err = reactor
.deregister(token)
.expect_err("deregister should fail when poller fd is closed");
let errno = err
.raw_os_error()
.expect("poller close failure should preserve errno");
crate::assert_with_log!(
errno == libc::EBADF,
"closed poller reports EBADF",
libc::EBADF,
errno
);
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration kept after hard delete failure",
1usize,
reactor.registration_count()
);
let (restore_result, close_saved) = poller_restore.restore();
crate::assert_with_log!(
restore_result == poller_fd,
"restore poller fd",
poller_fd,
restore_result
);
crate::assert_with_log!(close_saved == 0, "close saved poller fd", 0, close_saved);
reactor
.deregister(token)
.expect("retry deregister after poller restore failed");
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration removed after successful retry",
0usize,
reactor.registration_count()
);
crate::test_complete!(
"kqueue_deregister_hard_delete_failure_preserves_bookkeeping_for_retry"
);
}
#[test]
fn reused_fd_cannot_register_under_new_token_until_stale_token_removed() {
init_test("kqueue_reused_fd_cannot_register_under_new_token_until_stale_token_removed");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (old_sock, _old_peer) = UnixStream::pair().expect("failed to create unix stream pair");
let stale_fd = old_sock.as_raw_fd();
let stale_token = Token::new(87);
reactor
.register(&old_sock, stale_token, Interest::READABLE)
.expect("stale registration failed");
drop(old_sock);
let (new_sock, mut write_peer) =
UnixStream::pair().expect("failed to create second unix stream pair");
let new_sock_fd = new_sock.as_raw_fd();
let dup_result = unsafe { libc::dup2(new_sock_fd, stale_fd) };
crate::assert_with_log!(
dup_result == stale_fd,
"dup2 reused stale fd slot",
stale_fd,
dup_result
);
let reused_source = RawFdSource(stale_fd);
let new_token = Token::new(88);
let duplicate_result = reactor.register(&reused_source, new_token, Interest::READABLE);
crate::assert_with_log!(
duplicate_result.is_err(),
"duplicate fd registration rejected while stale token exists",
true,
duplicate_result.is_err()
);
let duplicate_kind = duplicate_result.unwrap_err().kind();
crate::assert_with_log!(
duplicate_kind == io::ErrorKind::AlreadyExists,
"duplicate fd reports already exists",
io::ErrorKind::AlreadyExists,
duplicate_kind
);
reactor
.deregister(stale_token)
.expect("stale token deregister should succeed");
reactor
.register(&reused_source, new_token, Interest::READABLE)
.expect("register reused fd after stale cleanup failed");
write_peer.write_all(b"x").expect("write failed");
let mut events = Events::with_capacity(8);
let count = reactor
.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
crate::assert_with_log!(count >= 1, "has events", true, count >= 1);
let mut found = false;
for event in events.iter() {
if event.token == new_token && event.is_readable() {
found = true;
break;
}
}
crate::assert_with_log!(found, "readable event for reused fd token", true, found);
reactor
.deregister(new_token)
.expect("deregister reused fd token failed");
if stale_fd != new_sock_fd {
let close_result = unsafe { libc::close(stale_fd) };
crate::assert_with_log!(close_result == 0, "close duplicated fd", 0, close_result);
}
crate::test_complete!(
"kqueue_reused_fd_cannot_register_under_new_token_until_stale_token_removed"
);
}
#[test]
fn multiple_registrations() {
init_test("kqueue_multiple_registrations");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let (sock1, _) = UnixStream::pair().expect("failed to create unix stream pair");
let (sock2, _) = UnixStream::pair().expect("failed to create unix stream pair");
let (sock3, _) = UnixStream::pair().expect("failed to create unix stream pair");
reactor
.register(&sock1, Token::new(1), Interest::READABLE)
.expect("register 1 failed");
reactor
.register(&sock2, Token::new(2), Interest::WRITABLE)
.expect("register 2 failed");
reactor
.register(&sock3, Token::new(3), Interest::both())
.expect("register 3 failed");
let count = reactor.registration_count();
crate::assert_with_log!(count == 3, "registration count", 3usize, count);
reactor
.deregister(Token::new(2))
.expect("deregister failed");
let count = reactor.registration_count();
crate::assert_with_log!(count == 2, "after deregister", 2usize, count);
reactor
.deregister(Token::new(1))
.expect("deregister failed");
reactor
.deregister(Token::new(3))
.expect("deregister failed");
let count = reactor.registration_count();
crate::assert_with_log!(count == 0, "after deregister all", 0usize, count);
crate::test_complete!("kqueue_multiple_registrations");
}
#[test]
fn interest_to_poll_event_mapping() {
init_test("kqueue_interest_to_poll_event_mapping");
let event = KqueueReactor::interest_to_poll_event(Token::new(1), Interest::READABLE);
crate::assert_with_log!(event.readable, "readable set", true, event.readable);
crate::assert_with_log!(!event.writable, "writable unset", false, event.writable);
let event = KqueueReactor::interest_to_poll_event(Token::new(2), Interest::WRITABLE);
crate::assert_with_log!(!event.readable, "readable unset", false, event.readable);
crate::assert_with_log!(event.writable, "writable set", true, event.writable);
let event = KqueueReactor::interest_to_poll_event(Token::new(3), Interest::both());
crate::assert_with_log!(event.readable, "readable set", true, event.readable);
crate::assert_with_log!(event.writable, "writable set", true, event.writable);
let event = KqueueReactor::interest_to_poll_event(Token::new(4), Interest::NONE);
crate::assert_with_log!(!event.readable, "readable unset", false, event.readable);
crate::assert_with_log!(!event.writable, "writable unset", false, event.writable);
crate::test_complete!("kqueue_interest_to_poll_event_mapping");
}
#[test]
fn poll_event_to_interest_mapping() {
init_test("kqueue_poll_event_to_interest_mapping");
let event = PollEvent::all(1);
let interest = KqueueReactor::poll_event_to_interest(&event, Some(Interest::both()));
crate::assert_with_log!(
interest.is_readable(),
"all readable",
true,
interest.is_readable()
);
crate::assert_with_log!(
interest.is_writable(),
"all writable",
true,
interest.is_writable()
);
let event = PollEvent::readable(2);
let interest = KqueueReactor::poll_event_to_interest(&event, Some(Interest::READABLE));
crate::assert_with_log!(
interest.is_readable(),
"readable set",
true,
interest.is_readable()
);
crate::assert_with_log!(
!interest.is_writable(),
"writable unset",
false,
interest.is_writable()
);
let event = PollEvent::writable(3);
let interest = KqueueReactor::poll_event_to_interest(&event, Some(Interest::WRITABLE));
crate::assert_with_log!(
!interest.is_readable(),
"readable unset",
false,
interest.is_readable()
);
crate::assert_with_log!(
interest.is_writable(),
"writable set",
true,
interest.is_writable()
);
let read_eof_event = PollEvent::all(4);
let interest =
KqueueReactor::poll_event_to_interest(&read_eof_event, Some(Interest::READABLE));
crate::assert_with_log!(
interest.is_readable(),
"read eof stays readable",
true,
interest.is_readable()
);
crate::assert_with_log!(
!interest.is_writable(),
"read eof does not invent writable readiness",
false,
interest.is_writable()
);
crate::assert_with_log!(
interest.is_hup(),
"read eof becomes hangup for readable-only registration",
true,
interest.is_hup()
);
let write_eof_event = PollEvent::all(5);
let interest =
KqueueReactor::poll_event_to_interest(&write_eof_event, Some(Interest::WRITABLE));
crate::assert_with_log!(
!interest.is_readable(),
"write eof does not invent readable readiness",
false,
interest.is_readable()
);
crate::assert_with_log!(
interest.is_writable(),
"write eof stays writable",
true,
interest.is_writable()
);
crate::assert_with_log!(
interest.is_hup(),
"write eof becomes hangup for writable-only registration",
true,
interest.is_hup()
);
crate::test_complete!("kqueue_poll_event_to_interest_mapping");
}
#[test]
fn debug_impl() {
init_test("kqueue_debug_impl");
let reactor = KqueueReactor::new().expect("failed to create reactor");
let debug_text = format!("{:?}", reactor);
crate::assert_with_log!(
debug_text.contains("KqueueReactor"),
"debug contains type",
true,
debug_text.contains("KqueueReactor")
);
crate::assert_with_log!(
debug_text.contains("registration_count"),
"debug contains registration_count",
true,
debug_text.contains("registration_count")
);
crate::test_complete!("kqueue_debug_impl");
}
}