#![cfg(any(
target_os = "macos",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd"
))]
use crate::collections::HashMap;
use crate::future::Future;
use crate::io;
use crate::pin::Pin;
use crate::task::{Context, Poll, Waker};
use super::executor::with_driver;
use super::ready::{consume_ready, mark_ready};
/// Rust mirror of the C `struct kevent` exchanged with the kernel through `kevent()`.
///
/// The struct is `#[repr(C)]` and is handed to the kernel by pointer, so the field
/// order and widths below are part of the ABI and must not be rearranged.
///
/// NOTE(review): this is the six-field layout with pointer-sized `data`. FreeBSD 12+
/// appears to use a 64-bit `data` followed by `ext: [u64; 4]`, and NetBSD appears to
/// use 32-bit `filter`/`flags` — confirm against each target's `<sys/event.h>`. If the
/// kernel's struct is larger than this one, it would write past the end of any
/// `eventlist` buffer sized in units of this type.
#[repr(C)]
#[derive(Copy, Clone)]
struct Kevent {
/// Identifier of the event source; this file always stores a file descriptor here.
ident: usize,
/// Which kernel filter the entry refers to (`EVFILT_READ` / `EVFILT_WRITE` in this file).
filter: i16,
/// Action flags for a change entry (`EV_ADD | EV_ONESHOT` in this file).
flags: u16,
/// Filter-specific flags; always submitted as 0 and never read back here.
fflags: u32,
/// Filter-specific data; always submitted as 0 and never read back here.
data: isize,
/// Opaque user value; this file stores the descriptor here on registration and reads
/// it back from returned events as the readiness token.
udata: usize,
}
/// Rust mirror of the C `struct timespec`, used only as the `timeout` argument of `kevent()`.
///
/// NOTE(review): both fields are fixed at 64 bits, which assumes `time_t` and `long`
/// are 64-bit on every supported target — confirm before enabling any 32-bit target.
#[repr(C)]
struct Timespec {
/// Whole seconds.
tv_sec: i64,
/// Additional nanoseconds.
tv_nsec: i64,
}
// Raw libc entry points used by the driver. The signatures are written by hand, so
// they must stay in sync with the `Kevent` / `Timespec` mirrors declared in this file.
unsafe extern "C" {
/// Creates a new kernel event queue. Callers in this file treat a negative return
/// value as failure and read the cause from the thread's last OS error.
fn kqueue() -> i32;
/// Submits `nchanges` entries from `changelist` to queue `kq` and/or receives up to
/// `nevents` entries into `eventlist`, bounded by `timeout`.
///
/// Callers in this file treat a negative return as failure (cause in the last OS
/// error) and a non-negative return as the number of entries written to `eventlist`.
fn kevent(
kq: i32,
changelist: *const Kevent,
nchanges: i32,
eventlist: *mut Kevent,
nevents: i32,
timeout: *const Timespec,
) -> i32;
/// Closes file descriptor `fd`.
fn close(fd: i32) -> i32;
}
// kqueue filter identifiers and action flags (see kqueue(2)).
// NOTE(review): these numeric values are hard-coded; NetBSD appears to number its
// filters differently — confirm against each target's `<sys/event.h>`.

/// Filter used by `Driver::register_read`.
const EVFILT_READ: i16 = -1;
/// Filter used by `Driver::register_write`.
const EVFILT_WRITE: i16 = -2;
/// Change-entry flag: add the event to the queue.
const EV_ADD: u16 = 1 << 0;
/// Change-entry flag: report the event once only (per kqueue(2)).
const EV_ONESHOT: u16 = 1 << 4;
/// kqueue-backed readiness driver.
///
/// Owns one kqueue descriptor and the wakers of tasks that are waiting for an event.
pub struct Driver {
    /// Wakers waiting for readiness, keyed by the token carried in an event's `udata`.
    wakers: HashMap<usize, Waker>,
    /// Descriptor returned by `kqueue()`.
    kq_fd: i32,
}
impl Driver {
pub fn new() -> io::Result<Self> {
let kq_fd = unsafe { kqueue() };
if kq_fd < 0 {
return Err(io::Error::last_os_error());
}
Ok(Self {
kq_fd,
wakers: HashMap::new(),
})
}
fn register_filter(&self, fd: i32, filter: i16) -> io::Result<()> {
let ev = Kevent {
ident: fd as usize,
filter,
flags: EV_ADD | EV_ONESHOT,
fflags: 0,
data: 0,
udata: fd as usize,
};
let r = unsafe {
kevent(
self.kq_fd,
&ev,
1,
core::ptr::null_mut(),
0,
core::ptr::null(),
)
};
if r < 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
pub fn register_read(&self, fd: i32) -> io::Result<()> {
self.register_filter(fd, EVFILT_READ)
}
pub fn register_write(&self, fd: i32) -> io::Result<()> {
self.register_filter(fd, EVFILT_WRITE)
}
pub(crate) fn register_waker(&mut self, token: usize, waker: Waker) {
self.wakers.insert(token, waker);
}
pub fn poll_nonblocking(&mut self) -> io::Result<bool> {
let zero_timeout = Timespec {
tv_sec: 0,
tv_nsec: 0,
};
let mut evbuf = [Kevent {
ident: 0,
filter: 0,
flags: 0,
fflags: 0,
data: 0,
udata: 0,
}; 64];
let n = unsafe {
kevent(
self.kq_fd,
core::ptr::null(),
0,
evbuf.as_mut_ptr(),
evbuf.len() as i32,
&zero_timeout,
)
};
if n < 0 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::Interrupted {
return Ok(false);
}
return Err(e);
}
for ev in &evbuf[..n as usize] {
let token = ev.udata;
mark_ready(token as u64);
if let Some(waker) = self.wakers.remove(&token) {
waker.wake();
}
}
Ok(n > 0)
}
}
impl Drop for Driver {
    /// Closes the kqueue descriptor owned by this driver; the result of `close` is ignored.
    fn drop(&mut self) {
        let fd = self.kq_fd;
        // SAFETY: `fd` is the descriptor stored at construction; `Driver` is neither
        // `Copy` nor `Clone`, so this is the only place in this file that closes it.
        let _ = unsafe { close(fd) };
    }
}
/// Future that completes once the driver reports a readiness event for `fd`
/// registered through `Driver::register_read`.
pub struct WaitReadable {
    /// Descriptor being waited on; also used as the readiness token.
    fd: i32,
    /// Whether the read filter has already been submitted to the driver.
    registered: bool,
}

impl WaitReadable {
    /// Builds a not-yet-registered wait for `fd`; nothing is submitted until first poll.
    pub fn new(fd: i32) -> Self {
        let registered = false;
        WaitReadable { fd, registered }
    }
}
impl Future for WaitReadable {
    type Output = io::Result<()>;

    /// Resolves with `Ok(())` once readiness has been recorded for this descriptor's token.
    ///
    /// While not ready, the read filter is submitted on the first poll only, and the
    /// current task's waker is stored with the driver on every poll. Both steps happen
    /// inside a single driver access, and the waker is stored only after the filter is
    /// armed, so a failed registration leaves no stale waker in the driver.
    ///
    /// The token is the descriptor alone, so a `WaitWritable` on the same descriptor
    /// shares this future's readiness flag and waker slot.
    ///
    /// # Errors
    ///
    /// - The OS error from registering the read filter.
    /// - `"kqueue: no driver"` whenever no driver is available, including on re-polls:
    ///   returning `Pending` without a stored waker would leave the task asleep forever.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let fd = self.fd;
        let token = fd as usize;
        if consume_ready(token as u64) {
            return Poll::Ready(Ok(()));
        }

        let needs_arming = !self.registered;
        let outcome = with_driver(|d| {
            let armed = if needs_arming {
                d.register_read(fd)
            } else {
                Ok(())
            };
            // Only park the waker once the kernel registration is known to exist.
            if armed.is_ok() {
                d.register_waker(token, cx.waker().clone());
            }
            armed
        });

        match outcome {
            Some(Ok(())) => {
                self.registered = true;
                Poll::Pending
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            None => Poll::Ready(Err(io::Error::other("kqueue: no driver"))),
        }
    }
}
/// Future that completes once the driver reports a readiness event for `fd`
/// registered through `Driver::register_write`.
pub struct WaitWritable {
    /// Descriptor being waited on; also used as the readiness token.
    fd: i32,
    /// Whether the write filter has already been submitted to the driver.
    registered: bool,
}

impl WaitWritable {
    /// Builds a not-yet-registered wait for `fd`; nothing is submitted until first poll.
    pub fn new(fd: i32) -> Self {
        let registered = false;
        WaitWritable { fd, registered }
    }
}
impl Future for WaitWritable {
    type Output = io::Result<()>;

    /// Resolves with `Ok(())` once readiness has been recorded for this descriptor's token.
    ///
    /// While not ready, the write filter is submitted on the first poll only, and the
    /// current task's waker is stored with the driver on every poll. Both steps happen
    /// inside a single driver access, and the waker is stored only after the filter is
    /// armed, so a failed registration leaves no stale waker in the driver.
    ///
    /// The token is the descriptor alone, so a `WaitReadable` on the same descriptor
    /// shares this future's readiness flag and waker slot.
    ///
    /// # Errors
    ///
    /// - The OS error from registering the write filter.
    /// - `"kqueue: no driver"` whenever no driver is available, including on re-polls:
    ///   returning `Pending` without a stored waker would leave the task asleep forever.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let fd = self.fd;
        let token = fd as usize;
        if consume_ready(token as u64) {
            return Poll::Ready(Ok(()));
        }

        let needs_arming = !self.registered;
        let outcome = with_driver(|d| {
            let armed = if needs_arming {
                d.register_write(fd)
            } else {
                Ok(())
            };
            // Only park the waker once the kernel registration is known to exist.
            if armed.is_ok() {
                d.register_waker(token, cx.waker().clone());
            }
            armed
        });

        match outcome {
            Some(Ok(())) => {
                self.registered = true;
                Poll::Pending
            }
            Some(Err(e)) => Poll::Ready(Err(e)),
            None => Poll::Ready(Err(io::Error::other("kqueue: no driver"))),
        }
    }
}