mod os;
#[cfg(target_os = "linux")]
use os::epoll as sys;
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "tvos",
target_os = "watchos",
target_os = "freebsd",
target_os = "dragonfly",
target_os = "openbsd",
target_os = "netbsd"
))]
use os::kqueue as sys;
#[cfg(test)]
pub(crate) mod tests;
use core::slice;
use std::collections::HashMap;
use std::io;
use std::os::fd::RawFd;
use std::time::Duration;
#[inline]
fn get_errno() -> libc::c_int {
#[cfg(target_os = "linux")]
unsafe {
*libc::__errno_location()
}
#[cfg(not(target_os = "linux"))]
unsafe {
*libc::__error()
}
}
/// Converts the `c_int` return value of a syscall into this module's result
/// encoding: non-negative values pass through unchanged, while a negative
/// return is replaced by the negated current `errno`.
#[inline]
fn syscall_result(ret: libc::c_int) -> isize {
    match ret {
        failed if failed < 0 => -(get_errno() as isize),
        ok => ok as isize,
    }
}
/// Same encoding as `syscall_result`, for syscalls that return `ssize_t`
/// (byte counts): non-negative values pass through, a negative return is
/// replaced by the negated current `errno`.
#[inline]
fn syscall_result_ssize(ret: libc::ssize_t) -> isize {
    match ret {
        failed if failed < 0 => -(get_errno() as isize),
        transferred => transferred,
    }
}
use crate::backends::pollingv2::interest::Interest;
use crate::backends::{IoBackend, OpCompleted};
mod interest;
/// Interface implemented by the platform readiness backend that this module
/// imports as `sys` (`os::epoll` on Linux, `os::kqueue` on the BSD/Apple
/// targets listed at the top of the file).
///
/// The notes on each method describe how `Poller` in this file uses it; the
/// exact OS-level behaviour lives in the backend implementation.
pub trait ReadinessPoll {
/// Raw event record filled in by `wait` and decoded by `event_key` /
/// `event_interest`.
type NativeEvent;
/// Starts watching `fd` for `interest`, tagging resulting events with `key`.
///
/// `Poller::push` passes the operation id as `key`. For `Interest::TIMER`
/// it passes the timeout in milliseconds in the `fd` slot instead of a
/// descriptor.
fn add(&self, fd: RawFd, key: u64, interest: Interest) -> io::Result<()>;
/// Updates the registration of `fd`.
///
/// `Poller::wait_timeout` calls this after an operation reported
/// `EAGAIN`/`EWOULDBLOCK`/`EINPROGRESS` — presumably to re-arm the
/// registration; confirm against the backend.
fn modify(&self, fd: RawFd, key: u64, interest: Interest) -> io::Result<()>;
/// Stops watching `fd`. Called once a non-timer operation has completed.
fn delete(&self, fd: RawFd) -> io::Result<()>;
/// Removes the timer registered under `key`. Called once a timer event has
/// completed.
fn delete_timer(&self, key: u64) -> io::Result<()>;
/// Waits for events, writing them into `events`.
///
/// The returned count is treated by `Poller::wait_timeout` as the number of
/// leading entries of `events` that were filled in. `timeout` is forwarded
/// unchanged from the caller.
fn wait(
&self,
events: &mut [Self::NativeEvent],
timeout: Option<Duration>,
) -> io::Result<usize>;
/// Not called in this file. NOTE(review): presumably wakes a blocked
/// `wait` from another thread — confirm against the backend.
fn notify(&self) -> io::Result<()>;
/// Extracts the key that was supplied to `add`/`modify` from a native event.
fn event_key(event: &Self::NativeEvent) -> u64;
/// Extracts the interest reported by a native event.
fn event_interest(event: &Self::NativeEvent) -> Interest;
}
/// Platform-independent view of one native event, produced by
/// `Events::get_event`.
#[derive(Debug, Clone, Copy)]
pub struct Event {
/// Key decoded from the native event via `ReadinessPoll::event_key`.
pub key: u64,
/// Interest decoded from the native event via
/// `ReadinessPoll::event_interest`.
pub interest: Interest,
}
/// Buffer of native events for the selected OS backend.
///
/// The vector's length is the number of valid events; its capacity is the
/// size of the buffer handed to the backend's `wait`.
pub struct Events {
// Backing storage; elements are the backend's raw event records.
events: Vec<<sys::OsPoller as ReadinessPoll>::NativeEvent>,
}
/// Exposes the currently valid events (`len()` entries, not the full
/// capacity) as a mutable slice of native event records.
impl AsMut<[<sys::OsPoller as ReadinessPoll>::NativeEvent]> for Events {
    fn as_mut(&mut self) -> &mut [<sys::OsPoller as ReadinessPoll>::NativeEvent] {
        self.events.as_mut_slice()
    }
}
impl Default for Events {
fn default() -> Self {
Self::with_capacity(512)
}
}
impl Events {
    /// Creates a buffer of `capacity` zero-initialised native events.
    ///
    /// The buffer starts out *full* (`len() == capacity`), so the `AsMut`
    /// view covers the whole allocation until `clear`/`set_len` shrink it.
    pub fn with_capacity(capacity: usize) -> Self {
        // SAFETY: relies on the all-zero bit pattern being a valid
        // `NativeEvent` — TODO confirm for every backend.
        let blank: <sys::OsPoller as ReadinessPoll>::NativeEvent =
            unsafe { std::mem::zeroed() };
        Self { events: vec![blank; capacity] }
    }

    /// Iterates over the valid events, decoded into [`Event`] values.
    pub fn iter(&self) -> EventsIter<'_> {
        EventsIter { index: 0, events: self }
    }

    /// Number of valid events currently stored.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// `true` when no valid events are stored.
    pub fn is_empty(&self) -> bool {
        self.events.len() == 0
    }

    /// Returns the whole allocation (capacity, not length) as a mutable slice
    /// so a backend can write events into it.
    ///
    /// # Safety
    ///
    /// Every slot up to `capacity()` must hold an initialised `NativeEvent`.
    /// NOTE(review): this appears to rely on `with_capacity` having zeroed
    /// the full allocation — confirm no other constructor exists.
    unsafe fn as_raw_buf(
        &mut self,
    ) -> &mut [<sys::OsPoller as ReadinessPoll>::NativeEvent] {
        let slots = self.events.capacity();
        let base = self.events.as_mut_ptr();
        unsafe { slice::from_raw_parts_mut(base, slots) }
    }

    /// Marks the first `len` slots as valid events.
    ///
    /// # Panics
    ///
    /// Panics if `len` exceeds the buffer capacity.
    ///
    /// # Safety
    ///
    /// The first `len` slots must contain initialised events.
    unsafe fn set_len(&mut self, len: usize) {
        let capacity = self.events.capacity();
        assert!(len <= capacity, "set_len: len must be <= capacity");
        unsafe { self.events.set_len(len) }
    }

    /// Forgets all valid events; the allocation is kept.
    fn clear(&mut self) {
        self.events.clear();
    }

    /// Decodes the native event at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not below `len()`.
    fn get_event(&self, index: usize) -> Event {
        assert!(index < self.events.len(), "get_event: index out of bounds");
        let raw = &self.events[index];
        Event {
            key: sys::OsPoller::event_key(raw),
            interest: sys::OsPoller::event_interest(raw),
        }
    }
}
/// Iterator over the decoded events of an [`Events`] buffer, created by
/// `Events::iter`.
pub struct EventsIter<'a> {
// Buffer being iterated.
events: &'a Events,
// Position of the next native event to decode.
index: usize,
}
impl<'a> Iterator for EventsIter<'a> {
    type Item = Event;

    /// Yields the next decoded event, silently skipping every event whose key
    /// is `u64::MAX`.
    ///
    /// NOTE(review): `u64::MAX` looks like a reserved wake-up key — confirm
    /// against the backend's `notify`.
    fn next(&mut self) -> Option<Self::Item> {
        while self.index < self.events.len() {
            let candidate = self.events.get_event(self.index);
            self.index += 1;
            if candidate.key != u64::MAX {
                return Some(candidate);
            }
        }
        None
    }
}
/// Result of an operation that `Poller::push` executed synchronously; it is
/// queued until the next `wait_timeout` call reports it.
struct ImmediateCompletion {
// Operation id supplied to `push`.
id: u64,
// Syscall result: non-negative on success, negated errno on failure.
result: isize,
}
/// Readiness-based I/O backend built on the platform poller selected as `sys`.
///
/// A default-constructed `Poller` is not usable until `IoBackend::init` has
/// been called; the `sys()` / `fd_map()` accessors panic before that.
#[derive(Default)]
pub struct Poller {
// OS poller handle; `None` until `init` runs.
sys: Option<sys::OsPoller>,
// Operation id -> value registered with the OS poller. For timer operations
// the slot holds the timeout in milliseconds rather than a descriptor.
// `None` until `init` runs.
fd_map: Option<HashMap<u64, RawFd>>,
// Operations that are registered and waiting for a readiness event.
op_map: std::collections::HashMap<u64, crate::op::Op>,
// Native event buffer handed to the OS poller on every wait.
events: Events,
// Results of operations executed synchronously inside `push`.
immediate: Vec<ImmediateCompletion>,
// Scratch buffer whose contents are returned by `wait_timeout`.
completed: Vec<OpCompleted>,
}
impl Poller {
pub fn new() -> Self {
Self::default()
}
#[inline]
fn sys(&self) -> &sys::OsPoller {
self.sys.as_ref().expect("Poller not initialized - call init() first")
}
#[inline]
fn fd_map(&mut self) -> &mut HashMap<u64, RawFd> {
self.fd_map.as_mut().expect("Poller not initialized - call init() first")
}
fn run_op_on_event(op: &crate::op::Op) -> isize {
use crate::op::Op;
use std::os::fd::AsRawFd;
match op {
Op::Read { fd, buffer } => {
let fd = fd.as_raw_fd();
let crate::op::RawBuf { ptr, len } =
unsafe { buffer.peek::<crate::op::RawBuf>() };
syscall_result_ssize(unsafe { libc::read(fd, ptr as *mut _, len) })
}
Op::Write { fd, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe { libc::write(fd, ptr as *const _, len) })
}
Op::ReadAt { fd, offset, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::pread(fd, ptr as *mut _, len, *offset)
})
}
Op::WriteAt { fd, offset, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::pwrite(fd, ptr as *const _, len, *offset)
})
}
Op::Send { fd, flags, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::send(fd, ptr as *const _, len, *flags)
})
}
Op::Recv { fd, flags, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::recv(fd, ptr as *mut _, len, *flags)
})
}
Op::Accept { fd, addr, len } => unsafe {
syscall_result(libc::accept(fd.as_raw_fd(), *addr as *mut _, *len))
},
Op::Timeout { .. } => 0,
Op::Connect { fd, addr, len, connect_called } => {
let fd = fd.as_raw_fd();
unsafe {
let ret = libc::connect(fd, *addr as *const libc::sockaddr, *len);
if ret < 0 {
let err = get_errno();
if err == libc::EINPROGRESS {
return -(err as isize);
}
if err == libc::EISCONN && *connect_called {
return 0;
}
return -(err as isize);
}
ret as isize
}
}
_ => panic!("run_op_on_event called for non-event op"),
}
}
fn run_op_blocking(op: crate::op::Op) -> isize {
use crate::op::Op;
use std::os::fd::AsRawFd;
match op {
Op::Read { fd, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe { libc::read(fd, ptr as *mut _, len) })
}
Op::Write { fd, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe { libc::write(fd, ptr as *const _, len) })
}
Op::ReadAt { fd, offset, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::pread(fd, ptr as *mut _, len, offset)
})
}
Op::WriteAt { fd, offset, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::pwrite(fd, ptr as *const _, len, offset)
})
}
Op::Send { fd, flags, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::send(fd, ptr as *const _, len, flags)
})
}
Op::Recv { fd, flags, buffer } => {
let fd = fd.as_raw_fd();
let (ptr, len) = unsafe { buffer.peek::<(*mut u8, usize)>() };
syscall_result_ssize(unsafe {
libc::recv(fd, ptr as *mut _, len, flags)
})
}
Op::Accept { fd, addr, len } => unsafe {
syscall_result(libc::accept(fd.as_raw_fd(), addr as *mut _, len))
},
Op::Connect { fd, addr, len, connect_called } => {
let fd = fd.as_raw_fd();
unsafe {
let ret = libc::connect(fd, addr as *const libc::sockaddr, len);
if ret < 0 {
let err = get_errno();
if err == libc::EINPROGRESS {
return -(err as isize);
}
if err == libc::EISCONN && connect_called {
return 0;
}
return -(err as isize);
}
ret as isize
}
}
Op::Bind { fd, addr, addrlen } => unsafe {
syscall_result(libc::bind(
fd.as_raw_fd(),
addr as *const libc::sockaddr,
addrlen,
))
},
Op::Listen { fd, backlog } => unsafe {
syscall_result(libc::listen(fd.as_raw_fd(), backlog))
},
Op::Shutdown { fd, how } => unsafe {
syscall_result(libc::shutdown(fd.as_raw_fd(), how))
},
Op::Socket { domain, ty, proto } => unsafe {
syscall_result(libc::socket(domain, ty, proto))
},
Op::OpenAt { dir_fd, path, flags } => unsafe {
syscall_result(libc::openat(dir_fd.as_raw_fd(), path, flags))
},
Op::Close { fd } => unsafe { syscall_result(libc::close(fd)) },
Op::Fsync { fd } => unsafe {
syscall_result(libc::fsync(fd.as_raw_fd()))
},
Op::Truncate { fd, size } => unsafe {
syscall_result(libc::ftruncate(fd.as_raw_fd(), size as libc::off_t))
},
Op::LinkAt { old_dir_fd, old_path, new_dir_fd, new_path } => unsafe {
syscall_result(libc::linkat(
old_dir_fd.as_raw_fd(),
old_path,
new_dir_fd.as_raw_fd(),
new_path,
0,
))
},
Op::SymlinkAt { target, linkpath, dir_fd } => unsafe {
syscall_result(libc::symlinkat(target, dir_fd.as_raw_fd(), linkpath))
},
#[cfg(target_os = "linux")]
Op::Tee { fd_in, fd_out, size } => unsafe {
syscall_result_ssize(libc::tee(
fd_in.as_raw_fd(),
fd_out.as_raw_fd(),
size as libc::size_t,
0,
))
},
Op::Timeout { duration, .. } => {
std::thread::sleep(duration);
0
}
Op::Nop => 0,
}
}
}
impl IoBackend for Poller {
    /// Creates the OS poller and sizes the internal tables for roughly `cap`
    /// in-flight operations.
    ///
    /// The native event buffer holds between 1 and 4096 entries; a
    /// zero-length buffer would leave `wait_timeout` unable to receive any
    /// event.
    ///
    /// # Errors
    ///
    /// Returns the error from creating the OS poller.
    fn init(&mut self, cap: usize) -> io::Result<()> {
        self.sys = Some(sys::OsPoller::new()?);
        self.fd_map = Some(HashMap::with_capacity(cap));
        self.op_map = HashMap::with_capacity(cap);
        self.events = Events::with_capacity(cap.clamp(1, 4096));
        self.immediate = Vec::with_capacity(64);
        self.completed = Vec::with_capacity(cap.min(256));
        Ok(())
    }

    /// Submits `op` under `id`.
    ///
    /// * File, metadata and socket-setup operations run synchronously; their
    ///   result is queued and reported by the next `wait_timeout`.
    /// * `Send`/`Recv`/`Accept` (and `Tee` on Linux) are registered with the
    ///   OS poller and executed when the descriptor becomes ready.
    /// * `Connect` is attempted once; if it reports `EINPROGRESS` the socket
    ///   is registered for writability and the op is kept for completion.
    /// * `Timeout` is registered as a timer.
    ///
    /// # Errors
    ///
    /// Returns the registration error for `Connect` and `Timeout`; in that
    /// case no bookkeeping for `id` is left behind.
    ///
    /// # Panics
    ///
    /// Panics if `init` has not been called.
    fn push(&mut self, id: u64, op: crate::op::Op) -> io::Result<()> {
        use crate::backends::pollingv2::interest::Interest;
        use crate::op::Op;
        use std::os::fd::AsRawFd;

        let registration = match &op {
            Op::Send { fd, .. } => Some((fd.as_raw_fd(), Interest::WRITE)),
            Op::Recv { fd, .. } => Some((fd.as_raw_fd(), Interest::READ)),
            Op::Accept { fd, .. } => Some((fd.as_raw_fd(), Interest::READ)),
            #[cfg(target_os = "linux")]
            Op::Tee { fd_in, .. } => {
                Some((fd_in.as_raw_fd(), Interest::READ_AND_WRITE))
            }
            // Handled separately below.
            Op::Connect { .. } | Op::Timeout { .. } => None,
            Op::ReadAt { .. }
            | Op::WriteAt { .. }
            | Op::Read { .. }
            | Op::Write { .. }
            | Op::Bind { .. }
            | Op::Listen { .. }
            | Op::Shutdown { .. }
            | Op::Socket { .. }
            | Op::OpenAt { .. }
            | Op::Close { .. }
            | Op::Fsync { .. }
            | Op::Truncate { .. }
            | Op::LinkAt { .. }
            | Op::SymlinkAt { .. }
            | Op::Nop => {
                let result = Poller::run_op_blocking(op);
                self.immediate.push(ImmediateCompletion { id, result });
                return Ok(());
            }
        };

        if let Some((fd, interest)) = registration {
            // Register first and record the op only on success, so a failed
            // registration never leaves stale map entries behind.
            if let Err(e) = self.sys().add(fd, id, interest) {
                let errno = e.raw_os_error().unwrap_or(libc::EIO);
                // Existing policy: run the op directly to obtain its own
                // error; if it does not fail, report the registration error.
                // NOTE(review): a successful fallback run has already had
                // its side effect (data sent/received, connection accepted)
                // yet is still reported as failed — confirm this is intended.
                let result = Poller::run_op_blocking(op);
                let final_result =
                    if result < 0 { result } else { -(errno as isize) };
                self
                    .immediate
                    .push(ImmediateCompletion { id, result: final_result });
                return Ok(());
            }
            self.fd_map().insert(id, fd);
            self.op_map.insert(id, op);
            return Ok(());
        }

        let timer_ms = match &op {
            // Saturate instead of truncating: a plain `as` cast wraps for
            // durations beyond `RawFd::MAX` milliseconds.
            Op::Timeout { duration, .. } => Some(
                RawFd::try_from(duration.as_millis()).unwrap_or(RawFd::MAX),
            ),
            _ => None,
        };
        if let Some(duration_ms) = timer_ms {
            self.sys().add(duration_ms, id, Interest::TIMER)?;
            // The timer backend takes the timeout in the `fd` slot; keep it
            // in `fd_map` so completion bookkeeping matches fd-based ops.
            self.fd_map().insert(id, duration_ms);
            self.op_map.insert(id, op);
            return Ok(());
        }

        // Only `Connect` reaches this point. Attempt it by reference so the
        // op survives when the connection is still in progress.
        let result = Poller::run_op_on_event(&op);
        if result != -(libc::EINPROGRESS as isize) {
            self.immediate.push(ImmediateCompletion { id, result });
            return Ok(());
        }

        let mut op = op;
        let fd = match &mut op {
            Op::Connect { fd, connect_called, .. } => {
                // The retry issued on writability returns EISCONN once the
                // handshake succeeded; this flag makes that count as success.
                *connect_called = true;
                fd.as_raw_fd()
            }
            _ => unreachable!("only Connect ops reach deferred connect handling"),
        };
        self.sys().add(fd, id, Interest::WRITE)?;
        self.fd_map().insert(id, fd);
        self.op_map.insert(id, op);
        Ok(())
    }

    /// Nothing is batched by this backend; always reports zero flushed ops.
    fn flush(&mut self) -> io::Result<usize> {
        Ok(0)
    }

    /// Collects completions: first everything `push` finished synchronously,
    /// then every registered operation whose descriptor became ready.
    ///
    /// When synchronous completions are already pending the OS poller is only
    /// polled (zero timeout), so they are never delayed by an idle wait;
    /// otherwise `timeout` is forwarded as given.
    ///
    /// Operations that report `EAGAIN`/`EWOULDBLOCK`/`EINPROGRESS` stay
    /// registered and are retried on a later event.
    ///
    /// # Errors
    ///
    /// Returns the wait error only when there is nothing to report; errors
    /// from updating or removing a registration are returned as-is.
    ///
    /// # Panics
    ///
    /// Panics if `init` has not been called, or if an event arrives for an
    /// operation id that has no recorded fd or op.
    fn wait_timeout(
        &mut self,
        timeout: Option<Duration>,
    ) -> io::Result<&[OpCompleted]> {
        self.completed.clear();
        self.completed.extend(
            self
                .immediate
                .drain(..)
                .map(|imm| OpCompleted::new(imm.id, imm.result)),
        );

        // Results are already available: do not block on top of them.
        let timeout = if self.completed.is_empty() {
            timeout
        } else {
            Some(Duration::ZERO)
        };

        self.events.clear();
        // Field access (not `self.sys()`) keeps this borrow disjoint from the
        // mutable borrow of `self.events` below.
        let poller = self
            .sys
            .as_ref()
            .expect("Poller not initialized - call init() first");
        // SAFETY: see `Events::as_raw_buf`; the buffer spans the allocation
        // created by `Events::with_capacity`.
        let buf = unsafe { self.events.as_raw_buf() };
        let ready = match poller.wait(buf, timeout) {
            Ok(n) => n,
            Err(_) if !self.completed.is_empty() => {
                return Ok(self.completed.as_slice());
            }
            Err(e) => return Err(e),
        };
        // SAFETY: `wait` reported `ready` entries as written; `set_len`
        // asserts the count does not exceed the capacity.
        unsafe { self.events.set_len(ready) };

        // Index-based walk: `Event` is `Copy`, so no borrow of `self.events`
        // is held across the body and no temporary Vec is needed.
        for index in 0..self.events.len() {
            let event = self.events.get_event(index);
            let operation_id = event.key;
            if operation_id == u64::MAX {
                continue;
            }

            let Some(entry_fd) = self.fd_map().get(&operation_id).copied() else {
                panic!("couldn't find fd for operation {}", operation_id);
            };
            let Some(op) = self.op_map.remove(&operation_id) else {
                panic!("couldn't find op for operation {}", operation_id);
            };

            let result = Poller::run_op_on_event(&op);
            if result < 0 {
                let errno = (-result) as i32;
                if errno == libc::EAGAIN
                    || errno == libc::EWOULDBLOCK
                    || errno == libc::EINPROGRESS
                {
                    // Not actually ready: keep the op and wait again.
                    self.op_map.insert(operation_id, op);
                    self.sys().modify(entry_fd, operation_id, event.interest)?;
                    continue;
                }
            }

            if event.interest.is_timer() {
                self.sys().delete_timer(operation_id)?;
            } else {
                self.sys().delete(entry_fd)?;
            }
            let was_deleted = self.fd_map().remove(&operation_id).is_some();
            assert!(was_deleted);
            self.completed.push(OpCompleted::new(operation_id, result));
        }

        Ok(self.completed.as_slice())
    }
}
// Smoke tests local to this module.
#[cfg(test)]
mod unit_tests {
use super::*;
// Creating and initialising a poller must succeed on the host platform.
#[test]
fn test_init() {
let mut backend = Poller::new();
backend.init(64).unwrap();
}
}