use std::
{
collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{Arc, RwLock, atomic::{AtomicBool, Ordering}}
};
use nix::
{
errno::Errno, poll::PollTimeout, sys::
{
epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags},
eventfd::{EfdFlags, EventFd}
}
};
use crate::
{
error::{TimerError, TimerErrorType, TimerResult},
map_timer_err,
timer_err,
timer_portable::
{
AsTimerId, FdTimerMarker, TimerFd, TimerId, poll::{ PollEventType, PollInterrupt, TimerPollOps}, timer::FdTimerRead
}
};
/// Epoll-based watcher over a set of timers.
///
/// Timers are registered in one epoll instance together with an internal
/// eventfd that is used to wake up a thread blocked in `poll`.
#[derive(Debug)]
pub struct TimerEventWatch
{
/// Epoll instance in which the wakeup eventfd and every added timer are
/// registered for `EPOLLIN`.
epoll: Epoll,
/// Eventfd registered in `epoll`; writing to it interrupts a pending poll.
/// Kept in an `Arc` so that a weak handle can be given out to interruptors.
wakeup_event: Arc<EventFd>,
/// Set to `true` for the duration of a `poll` call; a second concurrent
/// `poll` is rejected while it is set.
polling_flag: AtomicBool,
/// Timers currently registered in `epoll`, keyed by their timer id.
timers: RwLock<BTreeMap<TimerId, TimerFd>>,
}
/// Human readable summary in the form `fd:<epoll fd>, cnt:<timer count>`.
impl fmt::Display for TimerEventWatch
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let epoll_fd = self.epoll.0.as_fd().as_raw_fd();

        // The timer map is only probed, never waited for: when the read lock
        // can not be taken right now the count is reported as "locked".
        let timers_cnt =
            match self.timers.try_read()
            {
                Ok(timers) => timers.len().to_string(),
                Err(_) => "locked".to_string(),
            };

        write!(f, "fd:{}, cnt:{}", epoll_fd, timers_cnt)
    }
}
/// Two watchers are considered equal when they wrap the same epoll
/// file descriptor; the registered timers are not compared.
impl PartialEq for TimerEventWatch
{
    fn eq(&self, other: &Self) -> bool
    {
        let own_fd = self.epoll.0.as_raw_fd();
        let other_fd = other.epoll.0.as_raw_fd();

        own_fd == other_fd
    }
}

impl Eq for TimerEventWatch {}
impl TimerPollOps for TimerEventWatch
{
fn new() -> TimerResult<Self>
{
let epoll =
Epoll::new(EpollCreateFlags::empty())
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
)?;
let wakeup_event =
EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
)?;
epoll
.add(
&wakeup_event,
EpollEvent::new(EpollFlags::EPOLLIN,wakeup_event.as_raw_fd() as u64)
)
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
)?;
return Ok(
Self
{
epoll: epoll,
wakeup_event: Arc::new(wakeup_event),
polling_flag: AtomicBool::new(false),
timers: RwLock::new(BTreeMap::new())
}
);
}
fn add(&self, timer: TimerFd) -> TimerResult<()>
{
let mut timers_lock =
self
.timers
.write()
.map_or_else(|e| e.into_inner(), |v| v);
let false =
timers_lock.contains_key(&timer.as_timer_id())
else
{
timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate",
timer)
};
self
.epoll
.add(
&timer,
EpollEvent::new(EpollFlags::EPOLLIN, timer.as_timer_id().0 as u64)
)
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
)?;
timers_lock.insert(timer.as_timer_id(), timer);
return Ok(());
}
fn delete<FD: FdTimerMarker>(&self, timer: &FD) -> TimerResult<()>
{
let mut timers_lock =
self
.timers
.write()
.map_or_else(|e| e.into_inner(), |v| v);
let true =
timers_lock.contains_key(&timer.as_timer_id())
else
{
timer_err!(TimerErrorType::Duplicate, "can not remove timer {} to epoll, reason duplicate",
timer.as_fd().as_raw_fd())
};
if let Err(ern) =
self
.epoll
.delete(&timer)
{
if ern != Errno::ENOENT
{
timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} from epoll",
timer.as_timer_id());
}
}
let Some(_) = timers_lock.remove(&timer.as_timer_id())
else
{
timer_err!(TimerErrorType::NotFound, "timer {} not found in the list", timer.as_fd().as_raw_fd())
};
return Ok(());
}
fn poll(&self, timeout: Option<i32>) -> TimerResult<Option<Vec<PollEventType>>>
{
if self.polling_flag.swap(true, Ordering::SeqCst) == true
{
timer_err!(TimerErrorType::EPollAlreadyPolling,
"epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
}
let mut events = vec![EpollEvent::empty(); self.get_count() + 1];
let mut poll_funct = ||
{
let poll_timeout =
timeout
.map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
.map_err(|e|
map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)
)?;
let evs_res =
self
.epoll
.wait(events.as_mut_slice(), poll_timeout)
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "poll error")
)?;
return Ok(evs_res);
};
let poll_res: Result<usize, TimerError> = poll_funct();
self.polling_flag.store(false, Ordering::SeqCst);
if let Err(err) = poll_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
{
return Ok(None);
}
let evs = poll_res?;
if evs == 0
{
return Ok(None);
}
else
{
let timers_read =
self
.timers
.read()
.map_or_else(|e| e.into_inner(), |v| v);
let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);
for event
in events[..evs].iter().filter(|ev| ev.events().intersects(EpollFlags::EPOLLIN))
{
if event.data() as RawFd == self.wakeup_event.as_raw_fd()
{
let _ = self.wakeup_event.read();
}
else
{
let Some(timer) =
timers_read
.get(&(TimerId::from(event.data()))).map(|c| c)
else
{
continue;
};
match timer.read()
{
Ok(res) =>
poll_res.push(PollEventType::TimerRes(timer.as_timer_id(), res)),
Err(e) =>
poll_res.push(PollEventType::SubError(timer.as_timer_id(), e)),
}
}
}
if poll_res.is_empty() == false
{
return Ok(Some(poll_res));
}
else
{
return Ok(None);
}
}
}
fn get_count(&self) -> usize
{
return
self
.timers
.read()
.map_or_else(|e| e.into_inner(), |v| v)
.len();
}
fn get_poll_interruptor(&self) -> PollInterrupt
{
return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
}
fn interrupt_poll(&self) -> bool
{
return self.wakeup_event.write(self.wakeup_event.as_raw_fd() as u64).is_ok();
}
}