use std::io;
use std::os::fd::AsFd;
use std::os::fd::BorrowedFd;
use std::sync::atomic::Ordering;
use std::sync::Arc;
#[cfg(feature = "io_timeout")]
use std::time::Duration;
use super::{from_nix_error, EventData, IoData};
#[cfg(feature = "io_timeout")]
use super::{timeout_handler, TimerList};
use crate::scheduler::Scheduler;
#[cfg(feature = "io_timeout")]
use crate::timeout_list::now;
use may_queue::mpsc::Queue;
use nix::sys::epoll::*;
use nix::sys::eventfd::*;
use nix::unistd::{read, write};
use smallvec::SmallVec;
/// Event record filled in by the selector's wait call; on this backend it is
/// simply nix's `EpollEvent`.
pub type SysEvent = EpollEvent;
/// Per-worker selector state: one epoll instance plus the helpers that
/// accompany it.
struct SingleSelector {
// The epoll instance that file descriptors are added to / waited on.
epoll: Epoll,
// Eventfd registered in `epoll` with user data 0; writing to it wakes up a
// blocked wait.
evfd: EventFd,
// Timers for I/O operations handled by this selector (only with the
// `io_timeout` feature).
#[cfg(feature = "io_timeout")]
timer_list: TimerList,
// Event data of deregistered fds; entries are pushed on delete and drained
// after a wait pass has processed its events.
free_ev: Queue<Arc<EventData>>,
}
impl SingleSelector {
    /// Creates a selector: a close-on-exec epoll instance with a non-blocking,
    /// close-on-exec eventfd already registered for readability.
    ///
    /// # Errors
    ///
    /// Returns the OS error if creating the epoll instance, creating the
    /// eventfd, or registering the eventfd fails.
    pub fn new() -> io::Result<Self> {
        let poller = Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC)?;
        let waker = EventFd::from_flags(EfdFlags::EFD_NONBLOCK | EfdFlags::EFD_CLOEXEC)?;

        // User data 0 identifies the wakeup eventfd among the reported events.
        let wakeup_event = EpollEvent::new(EpollFlags::EPOLLIN, 0);
        poller.add(waker.as_fd(), wakeup_event)?;

        let selector = Self {
            epoll: poller,
            evfd: waker,
            free_ev: Queue::new(),
            #[cfg(feature = "io_timeout")]
            timer_list: TimerList::new(),
        };
        Ok(selector)
    }
}
/// Collection of per-worker selectors, indexed by worker id.
///
/// File descriptors are mapped to an entry by `fd % vec.len()`.
pub(crate) struct Selector {
// One `SingleSelector` per I/O worker; stored inline for up to 128 workers.
vec: SmallVec<[SingleSelector; 128]>,
}
impl Selector {
/// Creates a selector set with one `SingleSelector` per I/O worker.
///
/// # Errors
///
/// Returns the first error produced while creating a `SingleSelector`;
/// selectors created before the failure are dropped.
pub fn new(io_workers: usize) -> io::Result<Self> {
    let vec = (0..io_workers)
        .map(|_| SingleSelector::new())
        .collect::<io::Result<_>>()?;
    Ok(Selector { vec })
}
/// Waits for events on selector `id` and dispatches the coroutines that
/// became ready.
///
/// * `scheduler` - used to collect global work on wakeup, to schedule ready
///   coroutines (with `work_steal`) and to run queued tasks afterwards.
/// * `id` - index of the selector to wait on; must be `< self.vec.len()`.
/// * `events` - caller-provided buffer that receives the epoll events.
/// * `_timeout` - maximum wait; it is divided by 1_000_000 (rounding up) to
///   obtain milliseconds, so it is presumably given in nanoseconds. `None`
///   blocks indefinitely. Ignored without the `io_timeout` feature.
///
/// Returns the value reported by the timer list's `schedule_timer` (with
/// `io_timeout`), or `None` when the feature is disabled.
///
/// # Errors
///
/// Propagates the error returned by the epoll wait.
///
/// # Panics
///
/// Panics if `id` is out of range.
#[inline]
pub fn select(
    &self,
    scheduler: &Scheduler,
    id: usize,
    events: &mut [SysEvent],
    _timeout: Option<u64>,
) -> io::Result<Option<u64>> {
    assert!(id < self.vec.len());

    // A timeout that does not fit epoll's millisecond range is clamped to the
    // longest representable wait instead of panicking: waking up early is
    // harmless because a fresh expiry is handed back to the caller below.
    #[cfg(feature = "io_timeout")]
    let timeout_ms = _timeout.map_or(EpollTimeout::NONE, |to| {
        EpollTimeout::try_from(to.div_ceil(1_000_000)).unwrap_or(EpollTimeout::MAX)
    });
    #[cfg(not(feature = "io_timeout"))]
    let timeout_ms = EpollTimeout::NONE;

    let single_selector = &self.vec[id];
    let n = single_selector.epoll.wait(events, timeout_ms)?;

    for event in &events[..n] {
        if event.data() == 0 {
            // User data 0 is the wakeup eventfd: drain its counter (best
            // effort) and pick up globally queued work.
            let mut buf = [0u8; 8];
            read(single_selector.evfd.as_fd(), &mut buf).ok();
            scheduler.collect_global(id);
            continue;
        }

        // SAFETY: a non-zero token is the `EventData` address stored when the
        // fd was registered. NOTE(review): this relies on the event data
        // staying alive until the deferred free at the end of this pass —
        // confirm against the owners of `IoData`.
        let data = unsafe { &mut *(event.data() as *mut EventData) };

        // Publish the readiness bits before handing the coroutine over.
        let ready = event.events().bits() as usize;
        data.io_flag.fetch_or(ready, Ordering::Release);

        // No coroutine is parked on this fd right now; the flag is enough.
        let Some(co) = data.co.take() else {
            continue;
        };

        // The I/O completed, so detach and cancel its pending timeout.
        #[cfg(feature = "io_timeout")]
        if let Some(h) = data.timer.borrow_mut().take() {
            // SAFETY: NOTE(review): assumes the timer entry is only touched
            // from this worker while the handle is held — confirm.
            unsafe {
                h.with_mut_data(|value| value.data.event_data = std::ptr::null_mut());
            }
            h.remove();
        }

        #[cfg(feature = "work_steal")]
        scheduler.schedule_with_id(co, id);
        #[cfg(not(feature = "work_steal"))]
        crate::coroutine_impl::run_coroutine(co);
    }

    scheduler.run_queued_tasks(id);

    // Event data of deleted fds can be released now that this batch of
    // events has been processed.
    self.free_unused_event_data(id);

    #[cfg(feature = "io_timeout")]
    let next_expire = single_selector
        .timer_list
        .schedule_timer(now(), &timeout_handler);
    #[cfg(not(feature = "io_timeout"))]
    let next_expire = None;

    Ok(next_expire)
}
/// Wakes up the worker blocked on selector `id` by writing to its eventfd.
///
/// The write result is only traced; a failed write is not reported to the
/// caller.
///
/// # Panics
///
/// Panics if `id` is out of range.
#[inline]
pub fn wakeup(&self, id: usize) {
    // eventfd interprets the 8 bytes as an integer in host byte order, so the
    // increment must be encoded natively (little-endian bytes would add 2^56
    // on big-endian targets).
    let buf = 1u64.to_ne_bytes();
    let ret = write(&self.vec[id].evfd, &buf);
    trace!("wakeup id={:?}, ret={:?}", id, ret);
}
#[inline]
pub fn add_fd(&self, io_data: IoData) -> io::Result<IoData> {
let info = EpollEvent::new(
EpollFlags::EPOLLIN
| EpollFlags::EPOLLOUT
| EpollFlags::EPOLLRDHUP
| EpollFlags::EPOLLET,
io_data.as_ref() as *const _ as _,
);
let fd = io_data.fd;
let id = fd as usize % self.vec.len();
let single_selector = &self.vec[id];
let epoll = &single_selector.epoll;
info!("add fd to epoll select, fd={:?}", fd);
epoll
.add(unsafe { BorrowedFd::borrow_raw(fd) }, info)
.map_err(from_nix_error)
.map(|_| io_data)
}
/// Changes the interest set of an already registered fd.
///
/// With `is_read` the fd is watched for `EPOLLIN | EPOLLRDHUP`, otherwise
/// for `EPOLLOUT | EPOLLHUP`; both variants are edge-triggered and keep the
/// event data address as user data.
///
/// # Errors
///
/// Returns the converted OS error if the modification fails.
#[inline]
pub fn mod_fd(&self, io_data: &IoData, is_read: bool) -> io::Result<()> {
    let interest = if is_read {
        EpollFlags::EPOLLIN | EpollFlags::EPOLLRDHUP | EpollFlags::EPOLLET
    } else {
        EpollFlags::EPOLLOUT | EpollFlags::EPOLLHUP | EpollFlags::EPOLLET
    };
    let mut event = EpollEvent::new(interest, io_data.as_ref() as *const _ as _);

    let fd = io_data.fd;
    let selector = &self.vec[fd as usize % self.vec.len()];
    info!("mod fd to epoll select, fd={:?}, is_read={}", fd, is_read);

    // SAFETY: NOTE(review): assumes `fd` is still open while this call runs —
    // confirm against the owner of `io_data`.
    let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
    selector.epoll.modify(borrowed, &mut event).map_err(from_nix_error)
}
/// Deregisters the fd of `io_data` from its selector.
///
/// Any pending I/O timer is detached first (its event-data pointer is
/// cleared). A failure to delete the fd from epoll is ignored. A clone of
/// the event data is queued on the selector's free list so it is released
/// later by `free_unused_event_data` rather than immediately.
#[inline]
pub fn del_fd(&self, io_data: &IoData) {
    #[cfg(feature = "io_timeout")]
    if let Some(timer) = io_data.timer.borrow_mut().take() {
        // SAFETY: NOTE(review): assumes nothing else accesses the timer entry
        // while the handle is held here — confirm.
        unsafe {
            timer.with_mut_data(|entry| entry.data.event_data = std::ptr::null_mut());
        }
    }

    let raw_fd = io_data.fd;
    let owner = &self.vec[raw_fd as usize % self.vec.len()];
    info!("del fd from epoll select, fd={:?}", raw_fd);

    // SAFETY: NOTE(review): assumes `raw_fd` has not been closed yet — confirm.
    let borrowed = unsafe { BorrowedFd::borrow_raw(raw_fd) };
    // Best effort: the result of the delete is deliberately discarded.
    let _ = owner.epoll.delete(borrowed);

    owner.free_ev.push((*io_data).clone());
}
/// Drains the free list of selector `id`, dropping every queued event data
/// reference. Keeps popping in bulk until a pop comes back empty.
#[inline]
fn free_unused_event_data(&self, id: usize) {
    let pending = &self.vec[id].free_ev;
    loop {
        if pending.bulk_pop().is_empty() {
            break;
        }
    }
}
/// Arms a timeout for the I/O described by `io` on the selector that owns
/// its fd (`fd % self.vec.len()`), and stores the timer handle in `io.timer`,
/// replacing any previous one.
///
/// The selector is woken up when the timer list reports the entry as "new"
/// (presumably meaning the wait deadline has to be recomputed — verify
/// against `TimerList::add_timer`).
#[inline]
#[cfg(feature = "io_timeout")]
pub fn add_io_timer(&self, io: &IoData, timeout: Duration) {
    let id = io.fd as usize % self.vec.len();
    let timers = &self.vec[id].timer_list;

    let (handle, needs_wakeup) = timers.add_timer(timeout, io.timer_data());
    if needs_wakeup {
        self.wakeup(id);
    }

    io.timer.borrow_mut().replace(handle);
}
}