use crate::common::constants::SLICE;
use crate::common::CondvarBlocker;
use dashmap::{DashMap, DashSet};
use once_cell::sync::Lazy;
use std::ffi::c_int;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
pub(crate) trait Interest: Copy {
fn read(token: usize) -> Self;
fn write(token: usize) -> Self;
fn read_and_write(token: usize) -> Self;
}
pub(crate) trait Event {
fn get_token(&self) -> usize;
fn readable(&self) -> bool;
fn writable(&self) -> bool;
}
static TOKEN_FD: Lazy<DashMap<usize, c_int>> = Lazy::new(DashMap::new);
static READABLE_RECORDS: Lazy<DashSet<c_int>> = Lazy::new(DashSet::new);
static READABLE_TOKEN_RECORDS: Lazy<DashMap<c_int, usize>> = Lazy::new(DashMap::new);
static WRITABLE_RECORDS: Lazy<DashSet<c_int>> = Lazy::new(DashSet::new);
static WRITABLE_TOKEN_RECORDS: Lazy<DashMap<c_int, usize>> = Lazy::new(DashMap::new);
pub(crate) trait EventIterator<E: Event> {
fn iterator<'a>(&'a self) -> impl Iterator<Item = &'a E>
where
E: 'a;
}
pub(crate) trait Selector<I: Interest, E: Event, S: EventIterator<E>> {
fn select(&self, events: &mut S, timeout: Option<Duration>) -> std::io::Result<()> {
if self
.waiting()
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
self.blocker().block(timeout.unwrap_or(SLICE));
return Ok(());
}
let result = self.do_select(events, timeout);
self.waiting().store(false, Ordering::Release);
for event in events.iterator() {
let token = event.get_token();
let fd = TOKEN_FD.remove(&token).map_or(0, |r| r.1);
if event.readable() {
_ = READABLE_TOKEN_RECORDS.remove(&fd);
}
if event.writable() {
_ = WRITABLE_TOKEN_RECORDS.remove(&fd);
}
}
result
}
fn add_read_event(&self, fd: c_int, token: usize) -> std::io::Result<()> {
if READABLE_RECORDS.contains(&fd) {
return Ok(());
}
if WRITABLE_RECORDS.contains(&fd) {
let interests = I::read_and_write(token);
self.reregister(fd, token, interests)
.or(self.register(fd, token, interests))
} else {
self.register(fd, token, I::read(token))
}?;
_ = READABLE_RECORDS.insert(fd);
_ = READABLE_TOKEN_RECORDS.insert(fd, token);
Ok(())
}
fn add_write_event(&self, fd: c_int, token: usize) -> std::io::Result<()> {
if WRITABLE_RECORDS.contains(&fd) {
return Ok(());
}
if READABLE_RECORDS.contains(&fd) {
let interests = I::read_and_write(token);
self.reregister(fd, token, interests)
.or(self.register(fd, token, interests))
} else {
self.register(fd, token, I::write(token))
}?;
_ = WRITABLE_RECORDS.insert(fd);
_ = WRITABLE_TOKEN_RECORDS.insert(fd, token);
Ok(())
}
fn del_event(&self, fd: c_int) -> std::io::Result<()> {
if READABLE_RECORDS.contains(&fd) || WRITABLE_RECORDS.contains(&fd) {
let token = READABLE_TOKEN_RECORDS
.remove(&fd)
.or(WRITABLE_TOKEN_RECORDS.remove(&fd))
.map_or(0, |r| r.1);
self.deregister(fd, token)?;
_ = READABLE_RECORDS.remove(&fd);
_ = WRITABLE_RECORDS.remove(&fd);
}
Ok(())
}
fn del_read_event(&self, fd: c_int) -> std::io::Result<()> {
if READABLE_RECORDS.contains(&fd) {
if WRITABLE_RECORDS.contains(&fd) {
let token = WRITABLE_TOKEN_RECORDS.get(&fd).map_or(0, |r| *r.value());
self.reregister(fd, token, I::write(token))?;
assert!(
READABLE_RECORDS.remove(&fd).is_some(),
"Clean READABLE_RECORDS failed !"
);
_ = READABLE_TOKEN_RECORDS.remove(&fd);
} else {
self.del_event(fd)?;
}
}
Ok(())
}
fn del_write_event(&self, fd: c_int) -> std::io::Result<()> {
if WRITABLE_RECORDS.contains(&fd) {
if READABLE_RECORDS.contains(&fd) {
let token = READABLE_TOKEN_RECORDS.get(&fd).map_or(0, |r| *r.value());
self.reregister(fd, token, I::read(token))?;
assert!(
WRITABLE_RECORDS.remove(&fd).is_some(),
"Clean WRITABLE_RECORDS failed !"
);
_ = WRITABLE_TOKEN_RECORDS.remove(&fd);
} else {
self.del_event(fd)?;
}
}
Ok(())
}
fn register(&self, fd: c_int, token: usize, interests: I) -> std::io::Result<()> {
self.do_register(fd, token, interests).map(|()| {
_ = TOKEN_FD.insert(token, fd);
})
}
fn reregister(&self, fd: c_int, token: usize, interests: I) -> std::io::Result<()> {
self.do_reregister(fd, token, interests).map(|()| {
_ = TOKEN_FD.insert(token, fd);
})
}
fn deregister(&self, fd: c_int, token: usize) -> std::io::Result<()> {
self.do_deregister(fd, token).map(|()| {
_ = TOKEN_FD.remove(&token);
})
}
fn waiting(&self) -> &AtomicBool;
fn blocker(&self) -> &CondvarBlocker;
fn do_select(&self, events: &mut S, timeout: Option<Duration>) -> std::io::Result<()>;
fn do_register(&self, fd: c_int, token: usize, interests: I) -> std::io::Result<()>;
fn do_reregister(&self, fd: c_int, token: usize, interests: I) -> std::io::Result<()>;
fn do_deregister(&self, fd: c_int, token: usize) -> std::io::Result<()>;
}
#[cfg(unix)]
pub(super) use {mio::Events, mio_adapter::Poller};
#[cfg(unix)]
mod mio_adapter;
#[cfg(windows)]
pub(super) use {polling_adapter::Events, polling_adapter::Poller};
#[cfg(windows)]
mod polling_adapter;