use std::
{
collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock}
};
use nix::
{
errno::Errno, libc::timespec, sys::{event::{EvFlags, EventFilter, FilterFlag, KEvent, Kqueue}, eventfd::EventFd}
};
use nix::sys::eventfd::EfdFlags;
use crate::
{
error::{TimerErrorType, TimerResult},
map_timer_err,
timer_err,
timer_portable::{poll::{PollInterrupt, TimerPollOps}, timer::FdTimerRead, PollEventType, TimerFd}
};
/// A kqueue-backed poller for timer file descriptors.
///
/// Holds the kqueue instance, an eventfd registered in that kqueue which is
/// used to interrupt a blocked `poll`, a flag marking that a `poll` call is
/// currently in progress, and the map of timers that were added.
#[derive(Debug)]
pub struct TimerEventWatch
{
/// The kqueue instance (the field keeps the `epoll` name although it stores a `Kqueue`).
epoll: Kqueue,
/// Eventfd registered in the kqueue by `new`; written by `interrupt_poll`
/// and handed out (as a weak reference) by `get_poll_interruptor`.
wakeup_event: Arc<EventFd>,
/// Set to `true` by `poll` while it waits on the kqueue and reset to `false` afterwards.
polling_flag: AtomicBool,
/// Timers added through `add`, keyed by their raw file descriptor.
timers: RwLock<BTreeMap<RawFd, TimerFd>>,
}
impl fmt::Display for TimerEventWatch
{
    /// Formats the instance as `fd:<kqueue fd>, cnt:<timer count>`.
    ///
    /// The timer count is taken with a non-blocking `try_read`; when the
    /// read guard cannot be obtained the text `locked` is printed instead.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let kqueue_fd = self.epoll.as_fd().as_raw_fd();

        let count_text =
            match self.timers.try_read()
            {
                Ok(guard) => guard.len().to_string(),
                Err(_) => "locked".to_string(),
            };

        write!(f, "fd:{}, cnt:{}", kqueue_fd, count_text)
    }
}
impl Eq for TimerEventWatch {}

impl PartialEq for TimerEventWatch
{
    /// Two instances are equal when their kqueue raw file descriptors are equal.
    fn eq(&self, other: &Self) -> bool
    {
        let own_fd = self.epoll.as_fd().as_raw_fd();
        let other_fd = other.epoll.as_fd().as_raw_fd();

        own_fd == other_fd
    }
}
impl TimerPollOps for TimerEventWatch
{
fn new() -> TimerResult<Self>
{
let kqueue =
Kqueue::new()
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "kqueue new() error")
)?;
let wakeup_event =
EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
)?;
let wakeup_kevent =
KEvent::new(wakeup_event.as_raw_fd() as usize, EventFilter::EVFILT_READ,
EvFlags::EV_ADD | EvFlags::EV_ENABLE, FilterFlag::empty(), 0, 0);
kqueue
.kevent(&[wakeup_kevent], &mut [], None)
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "epoll kevent() error")
)?;
return Ok(
Self
{
epoll: kqueue,
wakeup_event: Arc::new(wakeup_event),
polling_flag: AtomicBool::new(false),
timers: RwLock::new(BTreeMap::new())
}
);
}
/// Registers `timer` in the kqueue for read events and stores it in the
/// internal map under its raw file descriptor.
///
/// The write lock on the map is held for the whole call; a poisoned lock
/// is recovered rather than treated as an error.
///
/// # Errors
///
/// * `TimerErrorType::Duplicate` - a timer with the same raw fd is already stored.
/// * `TimerErrorType::EPoll` - the `kevent` registration failed; the timer
///   is not stored in that case.
fn add(&self, timer: TimerFd) -> TimerResult<()>
{
    let mut registered =
        self
            .timers
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

    let raw_fd = timer.as_fd().as_raw_fd();

    if registered.contains_key(&raw_fd)
    {
        timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate",
            timer);
    }

    // The raw fd is also passed in the event's last two arguments, as before.
    let register_timer =
        KEvent::new(
            raw_fd as usize,
            EventFilter::EVFILT_READ,
            EvFlags::EV_ADD | EvFlags::EV_ENABLE,
            FilterFlag::empty(),
            raw_fd as isize,
            raw_fd as isize
        );

    self
        .epoll
        .kevent(&[register_timer], &mut [], None)
        .map_err(|errno|
            map_timer_err!(TimerErrorType::EPoll(errno), "can not add timer {} to kqueue", timer)
        )?;

    // Only remember the timer once the kernel registration succeeded.
    registered.insert(raw_fd, timer);

    Ok(())
}
/// Removes the timer identified by `timer`'s file descriptor from the
/// internal map and unregisters it from the kqueue.
///
/// The map entry is removed first, then an `EV_DELETE` change is submitted.
/// An `ENOENT` result from `kevent` is ignored.
///
/// # Errors
///
/// * `TimerErrorType::Duplicate` - no timer with this raw fd is stored.
///   NOTE(review): the error type is kept unchanged for caller
///   compatibility even though the condition is "not found"; only the
///   message text was corrected.
/// * `TimerErrorType::EPoll` - `kevent` failed with an error other than `ENOENT`.
fn delete<FD: AsFd>(&self, timer: FD) -> TimerResult<()>
{
    let raw_fd = timer.as_fd().as_raw_fd();

    let mut timers_lock =
        self
            .timers
            .write()
            .unwrap_or_else(|e| e.into_inner());

    // A single map operation both checks for presence and removes the entry.
    if timers_lock.remove(&raw_fd).is_none()
    {
        timer_err!(TimerErrorType::Duplicate, "can not delete timer {} from kqueue, reason not found",
            raw_fd);
    }

    let timer_kevent =
        KEvent::new(raw_fd as usize, EventFilter::EVFILT_READ,
            EvFlags::EV_DELETE, FilterFlag::empty(), 0, 0);

    let res =
        self
            .epoll
            .kevent(&[timer_kevent], &mut [], None);

    // ENOENT is not treated as a failure here.
    if let Err(ern) = res && ern != Errno::ENOENT
    {
        timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} from kqueue", raw_fd);
    }

    Ok(())
}
/// Waits on the kqueue for timer events and reads every timer that fired.
///
/// # Arguments
///
/// * `timeout` - maximum wait in milliseconds; `None` passes no timeout to
///   `kevent`. NOTE(review): a negative value produces negative `timespec`
///   fields; confirm whether callers ever pass one.
///
/// # Returns
///
/// * `Ok(Some(events))` - at least one registered timer reported an event;
///   each entry is either the timer's read result or the read error.
/// * `Ok(None)` - the wait returned no events, was interrupted (`EINTR`),
///   or only the wakeup eventfd (or unknown descriptors) fired.
///
/// # Errors
///
/// * `TimerErrorType::EPollAlreadyPolling` - another call is already polling.
/// * `TimerErrorType::EPoll` - `kevent` failed with an error other than `EINTR`.
fn poll(&self, timeout: Option<i32>) -> TimerResult<Option<Vec<PollEventType>>>
{
    // `swap` returns the previous value: `true` means a poll is already in progress.
    if self.polling_flag.swap(true, Ordering::SeqCst)
    {
        timer_err!(TimerErrorType::EPollAlreadyPolling,
            "epoll fd: '{}' other thread already polling", self.epoll.as_fd().as_raw_fd());
    }

    // One slot per stored timer plus one for the wakeup eventfd.
    let revent =
        KEvent::new(0, EventFilter::EVFILT_READ,
            EvFlags::empty(), FilterFlag::empty(), 0, 0);

    let mut events = vec![revent; self.get_count() + 1];

    // Split milliseconds into whole seconds and the remainder in nanoseconds
    // (1 ms = 1_000_000 ns).
    let poll_timeout =
        timeout
            .map(|ms|
                timespec
                {
                    tv_sec: (ms / 1000) as i64,
                    tv_nsec: (ms % 1000) as i64 * 1_000_000,
                }
            );

    let evs_res =
        self
            .epoll
            .kevent(&[], events.as_mut_slice(), poll_timeout)
            .map_err(|e|
                map_timer_err!(TimerErrorType::EPoll(e), "kevent error")
            );

    // Clear the flag before any of the return paths below.
    self.polling_flag.store(false, Ordering::SeqCst);

    // An interrupted wait is reported as "no events", not as an error.
    if let Err(err) = evs_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
    {
        return Ok(None);
    }

    let evs = evs_res?;

    if evs == 0
    {
        return Ok(None);
    }

    let timers_read =
        self
            .timers
            .read()
            .unwrap_or_else(|e| e.into_inner());

    let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);

    for event in &events[..evs]
    {
        let event_fd = event.ident() as RawFd;

        if event_fd == self.wakeup_event.as_raw_fd()
        {
            // Drain the eventfd counter; the value itself is not used.
            let _ = self.wakeup_event.read();

            continue;
        }

        // Events for descriptors that are no longer stored are skipped.
        let Some(timer) = timers_read.get(&event_fd)
        else
        {
            continue;
        };

        match timer.read()
        {
            Ok(res) =>
                poll_res.push(PollEventType::TimerRes(timer.as_raw_fd(), res)),
            Err(e) =>
                poll_res.push(PollEventType::SubError(timer.as_raw_fd(), e)),
        }
    }

    if poll_res.is_empty()
    {
        Ok(None)
    }
    else
    {
        Ok(Some(poll_res))
    }
}
/// Returns the number of timers currently stored in the internal map.
///
/// A poisoned lock is recovered and its contents are still counted.
fn get_count(&self) -> usize
{
    match self.timers.read()
    {
        Ok(guard) => guard.len(),
        Err(poisoned) => poisoned.into_inner().len(),
    }
}
/// Builds a `PollInterrupt` from a weak reference to the wakeup eventfd.
fn get_poll_interruptor(&self) -> PollInterrupt
{
    let weak_wakeup = Arc::downgrade(&self.wakeup_event);

    PollInterrupt::new(weak_wakeup)
}
/// Writes to the wakeup eventfd and reports whether the write succeeded.
///
/// NOTE(review): the value written is the eventfd's own descriptor number;
/// if that descriptor were ever 0 the write would add nothing to the
/// counter - confirm whether a constant such as 1 was intended.
fn interrupt_poll(&self) -> bool
{
    let value = self.wakeup_event.as_raw_fd() as u64;

    self.wakeup_event.write(value).is_ok()
}
}