use std::os::unix::io::RawFd;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, io, isize, ptr};
use super::{from_nix_error, timeout_handler, EventData, IoData, TimerList};
use crate::coroutine_impl::run_coroutine;
use crate::scheduler::get_scheduler;
use crate::std::queue::seg_queue::SegQueue as mpsc;
use crate::timeout_list::{now, ns_to_ms};
use libc::{eventfd, EFD_NONBLOCK};
use nix::sys::epoll::*;
use nix::unistd::{close, read, write};
use smallvec::SmallVec;
fn create_eventfd() -> io::Result<RawFd> {
let fd = unsafe { eventfd(0, EFD_NONBLOCK) };
if fd < 0 {
return Err(io::Error::last_os_error());
}
Ok(fd as RawFd)
}
pub type SysEvent = EpollEvent;
struct SingleSelector {
epfd: RawFd,
evfd: RawFd,
timer_list: TimerList,
free_ev: mpsc<Arc<EventData>>,
}
impl SingleSelector {
pub fn new() -> io::Result<Self> {
let mut info = EpollEvent::new(EpollFlags::EPOLLET | EpollFlags::EPOLLIN, 0);
let epfd = epoll_create().map_err(from_nix_error)?;
let evfd = create_eventfd()?;
epoll_ctl(epfd, EpollOp::EpollCtlAdd, evfd, &mut info).map_err(from_nix_error)?;
Ok(SingleSelector {
epfd,
evfd,
free_ev: mpsc::new(),
timer_list: TimerList::new(),
})
}
}
impl Drop for SingleSelector {
fn drop(&mut self) {
let _ = close(self.evfd);
let _ = close(self.epfd);
}
}
pub struct Selector {
vec: Vec<SingleSelector>,
}
impl Selector {
pub fn new(io_workers: usize) -> io::Result<Self> {
let mut s = Selector {
vec: Vec::with_capacity(io_workers),
};
for _ in 0..io_workers {
let ss = SingleSelector::new()?;
s.vec.push(ss);
}
Ok(s)
}
pub fn select(
&self,
id: usize,
events: &mut [SysEvent],
timeout: Option<u64>,
) -> io::Result<Option<u64>> {
let timeout_ms = timeout
.map(|to| cmp::min(ns_to_ms(to), isize::MAX as u64) as isize)
.unwrap_or(-1);
let mask = self.vec.len() + id;
let single_selector = unsafe { self.vec.get_unchecked(id) };
let epfd = single_selector.epfd;
let scheduler = get_scheduler();
scheduler
.workers
.parked
.fetch_or(mask as u64, Ordering::Relaxed);
let n = epoll_wait(epfd, events, timeout_ms).map_err(from_nix_error)?;
scheduler
.workers
.parked
.fetch_and((mask - self.vec.len()) as u64, Ordering::Relaxed);
for event in events[..n].iter() {
if event.data() == 0 {
{
let mut buf = [0u8; 8];
read(single_selector.evfd, &mut buf).ok();
continue;
}
}
let data = unsafe { &mut *(event.data() as *mut EventData) };
data.io_flag.store(true, Ordering::Release);
let co = match data.co.take() {
None => continue,
Some(co) => co,
};
co.prefetch();
data.timer.borrow_mut().take().map(|h| {
unsafe {
h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
}
h.remove()
});
run_coroutine(co);
}
scheduler.run_queued_tasks(id);
self.free_unused_event_data(id);
let next_expire = single_selector
.timer_list
.schedule_timer(now(), &timeout_handler);
Ok(next_expire)
}
#[inline]
pub fn wakeup(&self, id: usize) {
let buf = unsafe { ::std::slice::from_raw_parts(&1u64 as *const u64 as _, 8) };
let ret = write(unsafe { self.vec.get_unchecked(id) }.evfd, buf);
}
#[inline]
pub fn add_fd(&self, io_data: IoData) -> io::Result<IoData> {
let mut info = EpollEvent::new(
EpollFlags::EPOLLIN
| EpollFlags::EPOLLOUT
| EpollFlags::EPOLLRDHUP
| EpollFlags::EPOLLET,
io_data.as_ref() as *const _ as _,
);
let fd = io_data.fd;
let id = fd as usize % self.vec.len();
let single_selector = unsafe { self.vec.get_unchecked(id) };
let epfd = single_selector.epfd;
epoll_ctl(epfd, EpollOp::EpollCtlAdd, fd, &mut info)
.map_err(from_nix_error)
.map(|_| io_data)
}
#[inline]
pub fn del_fd(&self, io_data: &IoData) {
use std::ops::Deref;
let mut info = EpollEvent::empty();
if let Some(h) = io_data.timer.borrow_mut().take() {
unsafe {
h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
}
}
let fd = io_data.fd;
let id = fd as usize % self.vec.len();
let single_selector = unsafe { self.vec.get_unchecked(id) };
let epfd = single_selector.epfd;
epoll_ctl(epfd, EpollOp::EpollCtlDel, fd, &mut info).ok();
single_selector.free_ev.push(io_data.deref().clone());
}
#[inline]
fn free_unused_event_data(&self, id: usize) {
let free_ev = &unsafe { self.vec.get_unchecked(id) }.free_ev;
while free_ev.pop().is_some() {}
}
#[inline]
pub fn add_io_timer(&self, io: &IoData, timeout: Duration) {
let id = io.fd as usize % self.vec.len();
let (h, b_new) = unsafe { self.vec.get_unchecked(id) }
.timer_list
.add_timer(timeout, io.timer_data());
if b_new {
self.wakeup(id);
}
io.timer.borrow_mut().replace(h);
}
}