#![cfg(any(
target_os = "macos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly"
))]
use std::cell::UnsafeCell;
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::time::Duration;
use crate::driver::{CompletionEntry, Driver, ERROR_TRANSPORT, Interest, SubmitEntry};
/// Lower bound on the ring capacity: smaller `DriverConfig::entries` values are raised to this.
const MIN_KQUEUE_SIZE: u32 = 32;
/// Ring cursors shared by the submission and completion queues.
///
/// Cursors only ever advance; a slot is located with `cursor & capacity_mask`.
// NOTE(review): `completion_tail` is 32 bits wide (and wraps at `u32::MAX`)
// while `completion_head` is pointer sized, so comparisons between the two
// must be done at `u32` width once the tail has wrapped.
struct KqueueState {
/// Next submission index to hand to the kernel (advanced by `submit`).
submit_head: AtomicUsize,
/// One past the last reserved submission slot (advanced by `get_submission`).
submit_tail: AtomicUsize,
/// Next completion index to be consumed (advanced by `advance_completion`).
completion_head: AtomicUsize,
/// One past the last completion written by `wait_internal`.
completion_tail: AtomicU32,
}
struct CompletionQueue {
entries: Box<[Option<CompletionEntry>]>,
}
unsafe impl Send for CompletionQueue {}
unsafe impl Sync for CompletionQueue {}
impl CompletionQueue {
fn new(capacity: usize) -> Self {
Self {
entries: vec![None; capacity].into_boxed_slice(),
}
}
fn get(&self, index: usize) -> Option<&CompletionEntry> {
self.entries[index].as_ref()
}
unsafe fn set(&self, index: usize, entry: Option<CompletionEntry>) {
let ptr = self.entries.as_ptr().cast_mut();
*ptr.add(index) = entry;
}
}
/// kqueue-backed implementation of [`Driver`].
///
/// Submissions are staged in `submit_queue` and registered with the kernel by
/// `submit`; events returned by `kevent(2)` are written into the completion
/// ring by `wait` / `wait_timeout`.
pub struct KqueueDriver {
/// The kqueue descriptor; closed on drop when non-negative.
kqueue_fd: RawFd,
/// Submission ring storage, handed out mutably through `&self`.
submit_queue: UnsafeCell<Vec<SubmitEntry>>,
/// Completion ring storage.
completion_queue: CompletionQueue,
/// Number of slots in each ring.
capacity: usize,
/// `capacity - 1`, used to map a cursor to a slot index.
capacity_mask: usize,
/// Shared ring cursors.
state: Arc<KqueueState>,
/// Scratch buffer that `kevent(2)` fills with triggered events.
event_buffer: UnsafeCell<Vec<libc::kevent>>,
// Allocated in `with_config` but never read in this file, hence the allow.
#[allow(dead_code)]
change_buffer: UnsafeCell<Vec<libc::kevent>>,
}
// SAFETY(review): these impls assert thread-safety the type does not enforce:
// `submit_queue` and `event_buffer` are mutated through `&self` without any
// locking. Sharing one driver between threads is only sound if callers
// serialise access externally — TODO confirm the intended usage.
unsafe impl Send for KqueueDriver {}
unsafe impl Sync for KqueueDriver {}
impl KqueueDriver {
pub fn new() -> std::io::Result<Self> {
Self::with_config(crate::driver::DriverConfig::default())
}
pub fn with_config(config: crate::driver::DriverConfig) -> std::io::Result<Self> {
let kqueue_fd = unsafe { libc::kqueue() };
if kqueue_fd < 0 {
return Err(std::io::Error::last_os_error());
}
unsafe {
let flags = libc::fcntl(kqueue_fd, libc::F_GETFD);
if flags >= 0 {
libc::fcntl(kqueue_fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
}
}
#[cfg(target_os = "freebsd")]
if let Some(_core) = config.cpu_affinity {
if let Err(e) = Self::set_cpu_affinity(_core) {
eprintln!("Warning: Failed to set CPU affinity: {}", e);
}
}
let capacity = config.entries.max(MIN_KQUEUE_SIZE) as usize;
let capacity_mask = capacity - 1;
Ok(Self {
kqueue_fd,
submit_queue: UnsafeCell::new(vec![SubmitEntry::new(-1, 0, 0); capacity]),
completion_queue: CompletionQueue::new(capacity),
capacity,
capacity_mask,
state: Arc::new(KqueueState {
submit_head: AtomicUsize::new(0),
submit_tail: AtomicUsize::new(0),
completion_head: AtomicUsize::new(0),
completion_tail: AtomicU32::new(0),
}),
event_buffer: UnsafeCell::new(vec![
libc::kevent {
ident: 0,
filter: 0,
flags: 0,
fflags: 0,
data: 0,
udata: std::ptr::null_mut(),
};
capacity
]),
change_buffer: UnsafeCell::new(vec![
libc::kevent {
ident: 0,
filter: 0,
flags: 0,
fflags: 0,
data: 0,
udata: std::ptr::null_mut(),
};
16 ]),
})
}
#[cfg(target_os = "freebsd")]
fn set_cpu_affinity(core: usize) -> std::io::Result<()> {
unsafe {
let mut cpuset: libc::cpuset_t = std::mem::zeroed();
libc::CPU_ZERO(&mut cpuset);
libc::CPU_SET(core % libc::CPU_SETSIZE as usize, &mut cpuset);
let result = libc::cpuset_setaffinity(
libc::CP_WHICH,
libc::CPU_LEVEL_WHICH,
libc::CPU_LEVEL_SIZE,
std::thread::current().id() as libc::pid_t,
std::mem::size_of::<libc::cpuset_t>(),
&cpuset as *const _ as *const _,
);
if result < 0 {
return Err(std::io::Error::last_os_error());
}
}
Ok(())
}
#[inline]
fn submit_pos(&self, index: usize) -> usize {
index & self.capacity_mask
}
#[inline]
fn completion_pos(&self, index: usize) -> usize {
index & self.capacity_mask
}
fn interest_to_kqueue(&self, interest: Interest) -> (i16, u16) {
let mut filter = 0;
let mut flags = libc::EV_ADD | libc::EV_RECEIPT;
if interest.readable {
filter |= libc::EVFILT_READ;
}
if interest.writable {
filter |= libc::EVFILT_WRITE;
}
if interest.oneshot {
flags |= libc::EV_ONESHOT;
}
if interest.edge {
flags |= libc::EV_CLEAR;
}
if interest.priority {
flags |= libc::EV_DISPATCH;
}
(filter, flags)
}
fn wait_internal(&self, timeout_ms: Option<i32>) -> std::io::Result<usize> {
let event_buffer = unsafe { &mut *self.event_buffer.get() };
let ptr = event_buffer.as_mut_ptr();
let len = event_buffer.len() as i32;
let timeout_ptr = if let Some(ms) = timeout_ms {
let mut timeout = libc::timespec {
tv_sec: (ms / 1000) as libc::time_t,
tv_nsec: ((ms % 1000) * 1_000_000) as libc::c_long,
};
&mut timeout as *mut _
} else {
std::ptr::null_mut()
};
let result =
unsafe { libc::kevent(self.kqueue_fd, std::ptr::null(), 0, ptr, len, timeout_ptr) };
if result < 0 {
return Err(std::io::Error::last_os_error());
}
let count = result as usize;
for i in 0..count {
let event = &event_buffer[i];
let tail = self.state.completion_tail.load(Ordering::Acquire) as usize;
let pos = self.completion_pos(tail);
let result = if event.flags & (libc::EV_ERROR | libc::EV_EOF) != 0 {
ERROR_TRANSPORT
} else {
if event.data != 0 {
event.data as i32
} else {
1 }
};
unsafe {
self.completion_queue.set(
pos,
Some(CompletionEntry {
user_data: event.udata as u64,
result,
flags: event.flags as u32,
}),
);
}
self.state
.completion_tail
.store((tail + 1) as u32, Ordering::Release);
}
Ok(count)
}
}
impl Drop for KqueueDriver {
    /// Releases the kqueue descriptor. A negative descriptor (never opened)
    /// is left untouched.
    fn drop(&mut self) {
        let fd = self.kqueue_fd;
        if fd < 0 {
            return;
        }
        // SAFETY: `fd` is the descriptor owned by this driver; the result of
        // `close` is intentionally ignored during drop.
        unsafe {
            libc::close(fd);
        }
    }
}
impl AsRawFd for KqueueDriver {
    /// Returns the underlying kqueue descriptor; ownership stays with the driver.
    fn as_raw_fd(&self) -> RawFd {
        let Self { kqueue_fd, .. } = self;
        *kqueue_fd
    }
}
impl Driver for KqueueDriver {
fn submit(&self) -> std::io::Result<usize> {
let mut submitted = 0;
let head = self.state.submit_head.load(Ordering::Acquire);
let tail = self.state.submit_tail.load(Ordering::Acquire);
let mut idx = head;
while idx != tail {
let pos = self.submit_pos(idx);
let submit_queue = unsafe { &*self.submit_queue.get() };
let entry = &submit_queue[pos];
if entry.fd >= 0 {
let (filter, flags) = match entry.opcode {
crate::driver::opcode::READ => {
(libc::EVFILT_READ, libc::EV_ADD | libc::EV_ONESHOT)
},
crate::driver::opcode::WRITE => {
(libc::EVFILT_WRITE, libc::EV_ADD | libc::EV_ONESHOT)
},
_ => (0, 0),
};
let mut change = libc::kevent {
ident: entry.fd as libc::uintptr_t,
filter,
flags,
fflags: 0,
data: 0,
udata: entry.user_data as *mut _,
};
let result = unsafe {
libc::kevent(
self.kqueue_fd,
&change,
1,
std::ptr::null_mut(),
0,
std::ptr::null_mut(),
)
};
if result < 0 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::NotFound {
change.flags = libc::EV_ADD | libc::EV_ONESHOT;
let add_result = unsafe {
libc::kevent(
self.kqueue_fd,
&change,
1,
std::ptr::null_mut(),
0,
std::ptr::null_mut(),
)
};
if add_result < 0 {
return Err(err);
}
} else {
return Err(err);
}
}
submitted += 1;
}
idx += 1;
}
self.state.submit_head.store(tail, Ordering::Release);
Ok(submitted)
}
fn wait(&self) -> std::io::Result<usize> {
self.wait_internal(None)
}
fn wait_timeout(&self, duration: Duration) -> std::io::Result<(usize, bool)> {
let timeout_ms = duration.as_millis().min(i32::MAX as u128) as i32;
let result = self.wait_internal(Some(timeout_ms))?;
let head = self.state.completion_head.load(Ordering::Acquire) as u32;
let tail = self.state.completion_tail.load(Ordering::Acquire);
Ok((result, head == tail))
}
fn get_submission(&self) -> Option<&mut SubmitEntry> {
let tail = self.state.submit_tail.load(Ordering::Acquire);
let next_tail = tail + 1;
let head = self.state.submit_head.load(Ordering::Acquire);
if next_tail - head > self.capacity {
return None;
}
match self.state.submit_tail.compare_exchange(
tail,
next_tail,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
let pos = self.submit_pos(tail);
unsafe {
let submit_queue = &mut *self.submit_queue.get();
Some(&mut submit_queue[pos])
}
},
Err(_) => None,
}
}
fn get_completion(&self) -> Option<&CompletionEntry> {
let head = self.state.completion_head.load(Ordering::Acquire);
let tail = self.state.completion_tail.load(Ordering::Acquire) as usize;
if head == tail {
return None;
}
let pos = self.completion_pos(head);
self.completion_queue.get(pos)
}
fn advance_completion(&self) {
let head = self.state.completion_head.load(Ordering::Acquire);
let tail = self.state.completion_tail.load(Ordering::Acquire) as usize;
if head != tail {
let pos = self.completion_pos(head);
unsafe {
self.completion_queue.set(pos, None);
}
let new_head = head + 1;
self.state
.completion_head
.store(new_head, Ordering::Release);
}
}
fn register(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
let (filter, flags) = self.interest_to_kqueue(interest);
let change = libc::kevent {
ident: fd as libc::uintptr_t,
filter,
flags,
fflags: 0,
data: 0,
udata: std::ptr::null_mut(),
};
let result = unsafe {
libc::kevent(self.kqueue_fd, &change, 1, std::ptr::null_mut(), 0, std::ptr::null_mut())
};
if result < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
fn deregister(&self, fd: RawFd) -> std::io::Result<()> {
let change = libc::kevent {
ident: fd as libc::uintptr_t,
filter: 0,
flags: libc::EV_DELETE,
fflags: 0,
data: 0,
udata: std::ptr::null_mut(),
};
let result = unsafe {
libc::kevent(self.kqueue_fd, &change, 1, std::ptr::null_mut(), 0, std::ptr::null_mut())
};
if result < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
fn modify(&self, fd: RawFd, interest: Interest) -> std::io::Result<()> {
self.deregister(fd)?;
self.register(fd, interest)
}
fn submission_capacity(&self) -> usize {
self.capacity
}
fn completion_capacity(&self) -> usize {
self.capacity
}
fn supports_operation(&self, opcode: u8) -> bool {
matches!(
opcode,
crate::driver::opcode::READ
| crate::driver::opcode::WRITE
| crate::driver::opcode::CLOSE
)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a driver around an invalid descriptor (-1) so no kqueue is
    /// opened or closed, with `submit_slots` submission entries and a
    /// completion ring of `capacity` slots.
    fn detached_driver(capacity: usize, submit_slots: usize) -> KqueueDriver {
        KqueueDriver {
            kqueue_fd: -1,
            submit_queue: UnsafeCell::new(vec![SubmitEntry::new(-1, 0, 0); submit_slots]),
            completion_queue: CompletionQueue::new(capacity),
            capacity,
            capacity_mask: capacity - 1,
            state: Arc::new(KqueueState {
                submit_head: AtomicUsize::new(0),
                submit_tail: AtomicUsize::new(0),
                completion_head: AtomicUsize::new(0),
                completion_tail: AtomicU32::new(0),
            }),
            event_buffer: UnsafeCell::new(Vec::new()),
            change_buffer: UnsafeCell::new(Vec::new()),
        }
    }

    #[test]
    fn test_kqueue_driver_creation() {
        let driver = detached_driver(256, 256);
        assert_eq!(driver.capacity, 256);
        assert_eq!(driver.capacity_mask, 255);
    }

    #[test]
    fn test_kqueue_driver_with_config() {
        let driver = detached_driver(128, 128);
        assert_eq!(driver.capacity, 128);
    }

    #[test]
    fn test_ring_buffer_positions() {
        let driver = detached_driver(256, 0);
        for (cursor, slot) in [(0, 0), (255, 255), (256, 0), (257, 1)] {
            assert_eq!(driver.submit_pos(cursor), slot);
        }
    }
}