#![cfg(target_os = "linux")]
use crate::collections::HashMap;
use crate::future::Future;
use crate::io;
use crate::pin::Pin;
use crate::task::{Context, Poll};
use super::executor::with_driver;
use super::ready::{consume_ready, mark_ready};
const EPOLLIN: u32 = 0x0001;
const EPOLLOUT: u32 = 0x0004;
const EPOLLONESHOT: u32 = 1 << 30;
const EPOLL_CTL_ADD: i32 = 1;
const EPOLL_CTL_MOD: i32 = 3;
const EEXIST: i32 = 17;
#[cfg_attr(any(target_arch = "x86", target_arch = "x86_64"), repr(C, packed))]
#[cfg_attr(not(any(target_arch = "x86", target_arch = "x86_64")), repr(C))]
#[derive(Copy, Clone)]
struct EpollEvent {
events: u32,
data: u64,
}
unsafe extern "C" {
fn epoll_create1(flags: i32) -> i32;
fn epoll_ctl(epfd: i32, op: i32, fd: i32, event: *mut EpollEvent) -> i32;
fn epoll_wait(epfd: i32, events: *mut EpollEvent, maxevents: i32, timeout: i32) -> i32;
pub(crate) fn close(fd: i32) -> i32;
}
pub struct Driver {
epfd: i32,
registered_fds: HashMap<i32, ()>,
wakers: HashMap<u64, crate::task::Waker>,
next_token: u64,
}
impl Driver {
pub fn new() -> io::Result<Self> {
let epfd = unsafe { epoll_create1(0) };
if epfd < 0 {
return Err(io::Error::last_os_error());
}
Ok(Self {
epfd,
registered_fds: HashMap::new(),
wakers: HashMap::new(),
next_token: 1,
})
}
fn do_register(&mut self, fd: i32, events: u32, token: u64) -> io::Result<()> {
let mut ev = EpollEvent {
events: events | EPOLLONESHOT,
data: token,
};
let op = if self.registered_fds.contains_key(&fd) {
EPOLL_CTL_MOD
} else {
EPOLL_CTL_ADD
};
let r = unsafe { epoll_ctl(self.epfd, op, fd, &mut ev) };
if r < 0 {
let e = io::Error::last_os_error();
if e.raw_os_error() == Some(EEXIST) {
let r2 = unsafe { epoll_ctl(self.epfd, EPOLL_CTL_MOD, fd, &mut ev) };
if r2 < 0 {
return Err(io::Error::last_os_error());
}
} else {
return Err(e);
}
}
self.registered_fds.insert(fd, ());
Ok(())
}
pub fn register_read(&mut self, fd: i32) -> io::Result<u64> {
let token = self.next_token;
self.next_token += 1;
self.do_register(fd, EPOLLIN, token)?;
Ok(token)
}
pub fn register_write(&mut self, fd: i32) -> io::Result<u64> {
let token = self.next_token;
self.next_token += 1;
self.do_register(fd, EPOLLOUT, token)?;
Ok(token)
}
pub(crate) fn register_waker(&mut self, token: u64, waker: crate::task::Waker) {
self.wakers.insert(token, waker);
}
pub fn poll_nonblocking(&mut self) -> io::Result<bool> {
let mut evbuf = [EpollEvent { events: 0, data: 0 }; 64];
let n = loop {
let n = unsafe {
epoll_wait(
self.epfd,
evbuf.as_mut_ptr(),
evbuf.len() as i32,
0, )
};
if n >= 0 {
break n;
}
let e = io::Error::last_os_error();
if e.kind() == crate::io::ErrorKind::Interrupted {
continue;
}
return Err(e);
};
for ev in &evbuf[..n as usize] {
let token = unsafe { core::ptr::read_unaligned(core::ptr::addr_of!(ev.data)) };
mark_ready(token);
if let Some(waker) = self.wakers.remove(&token) {
waker.wake();
}
}
Ok(n > 0)
}
}
impl Drop for Driver {
fn drop(&mut self) {
unsafe { close(self.epfd) };
}
}
pub struct WaitReadable {
fd: i32,
token: u64,
registered: bool,
}
impl WaitReadable {
pub fn new(fd: i32) -> Self {
Self {
fd,
token: 0,
registered: false,
}
}
}
impl Future for WaitReadable {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.registered {
if consume_ready(self.token) {
return Poll::Ready(Ok(()));
}
let _ = with_driver(|d| d.register_waker(self.token, cx.waker().clone()));
return Poll::Pending;
}
match with_driver(|d| d.register_read(self.fd)) {
Ok(Ok(token)) => {
self.token = token;
self.registered = true;
let _ = with_driver(|d| d.register_waker(token, cx.waker().clone()));
Poll::Pending
}
Ok(Err(e)) | Err(e) => Poll::Ready(Err(e)),
}
}
}
pub struct WaitWritable {
fd: i32,
token: u64,
registered: bool,
}
impl WaitWritable {
pub fn new(fd: i32) -> Self {
Self {
fd,
token: 0,
registered: false,
}
}
}
impl Future for WaitWritable {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.registered {
if consume_ready(self.token) {
return Poll::Ready(Ok(()));
}
let _ = with_driver(|d| d.register_waker(self.token, cx.waker().clone()));
return Poll::Pending;
}
match with_driver(|d| d.register_write(self.fd)) {
Ok(Ok(token)) => {
self.token = token;
self.registered = true;
let _ = with_driver(|d| d.register_waker(token, cx.waker().clone()));
Poll::Pending
}
Ok(Err(e)) | Err(e) => Poll::Ready(Err(e)),
}
}
}