use std::
{
fmt,
os::fd::{AsRawFd, RawFd},
sync::
{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc
}
};
use std::collections::BTreeMap;
use std::sync::RwLock;
use nix::{errno::Errno, libc::{self}, sys::eventfd::EventFd};
use nix::sys::eventfd::EfdFlags;
use crate::
{
error::{TimerErrorType, TimerResult},
map_timer_err,
timer_err,
timer_portable::
{
poll::{PollInterrupt, TimerPollOps}, AsTimerId, FdTimerMarker, PollEventType, TimerId
}
};
use crate::timer_portable::timer::FdTimerRead;
use crate::timer_portable::TimerFd;
static ID_GEN: AtomicU32 = AtomicU32::new(0);
#[derive(Debug)]
pub struct TimerEventWatch
{
id: u32,
wakeup_event: Arc<EventFd>,
polling_flag: AtomicBool,
timers: RwLock<BTreeMap<TimerId, TimerFd>>
}
impl fmt::Display for TimerEventWatch
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "fake_fd:{}, cnt:{}", self.id,
self.timers.try_read().map_or("locked".to_string(), |f| f.len().to_string())
)
}
}
impl Eq for TimerEventWatch{}
impl PartialEq for TimerEventWatch
{
fn eq(&self, other: &Self) -> bool
{
return self.id == other.id;
}
}
impl TimerPollOps for TimerEventWatch
{
fn new() -> TimerResult<Self>
{
let wakeup_event =
EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
.map_err(|e|
map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
)?;
let mut pollfds =
[
libc::pollfd
{
fd: wakeup_event.as_raw_fd(),
events: libc::POLLIN,
revents: 0,
},
]
.to_vec();
pollfds.sort_by(|a, b| a.fd.cmp(&b.fd));
return Ok(
Self
{
id: ID_GEN.fetch_add(1, Ordering::SeqCst),
wakeup_event: Arc::new(wakeup_event),
polling_flag: AtomicBool::new(false),
timers: RwLock::new(BTreeMap::new()),
}
);
}
fn add(&self, timer: TimerFd) -> TimerResult<()>
{
let mut timers_lock =
self
.timers
.write()
.map_or_else(|e| e.into_inner(), |v| v);
let false =
timers_lock.contains_key(&timer.as_timer_id())
else
{
timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate",
timer)
};
timers_lock.insert(timer.as_timer_id(), timer);
return Ok(());
}
fn delete<FD: FdTimerMarker>(&self, timer: &FD) -> TimerResult<()>
{
let mut timers_lock =
self
.timers
.write()
.map_or_else(|e| e.into_inner(), |v| v);
let true =
timers_lock.contains_key(&timer.as_timer_id())
else
{
timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason does not exist",
timer.as_timer_id())
};
let _ = timers_lock.remove(&timer.as_timer_id());
return Ok(());
}
fn poll(&self, timeout: Option<i32>) -> TimerResult<Option<Vec<PollEventType>>>
{
if self.polling_flag.swap(true, Ordering::SeqCst) == true
{
timer_err!(TimerErrorType::EPollAlreadyPolling,
"epoll fd: '{}' other thread already polling", self.id);
}
let timers_lock =
self
.timers
.read()
.map_or_else(|e| e.into_inner(), |v| v);
let mut pollfds: Vec<libc::pollfd> =
timers_lock
.iter()
.map(|timer|
{
libc::pollfd
{
fd: timer.1.as_raw_fd(),
events: libc::POLLIN,
revents: 0,
}
}
)
.collect();
drop(timers_lock);
let wakeup_fd =
libc::pollfd
{
fd: self.wakeup_event.as_raw_fd(),
events: libc::POLLIN,
revents: 0,
};
pollfds.insert(0, wakeup_fd);
pollfds.sort_by(|a, b| a.fd.cmp(&b.fd));
let res =
unsafe
{
libc::poll(
pollfds.as_mut_ptr().cast(),
pollfds.len() as libc::nfds_t,
timeout.map_or(-1, |f| f),
)
};
self.polling_flag.store(false, Ordering::SeqCst);
if res < 0
{
let errn = Errno::last();
if errn != Errno::EINTR
{
timer_err!(TimerErrorType::EPoll(errn), "poll {} error", self.id);
}
else
{
return Ok(None);
}
} else if res == 0
{
return Ok(None);
}
else
{
let timers_read =
self
.timers
.read()
.map_or_else(|e| e.into_inner(), |v| v);
let mut poll_res: Vec<PollEventType> = Vec::with_capacity(res as usize);
for event in pollfds.iter()
{
if event.revents & libc::POLLIN == libc::POLLIN
{
if event.fd as RawFd == self.wakeup_event.as_raw_fd()
{
let _ = self.wakeup_event.read();
}
else
{
let Some(timer) =
timers_read
.get(&TimerId::from(event.fd)).map(|c| c)
else
{
continue;
};
match timer.read()
{
Ok(res) =>
poll_res.push(PollEventType::TimerRes(timer.as_timer_id(), res)),
Err(e) =>
poll_res.push(PollEventType::SubError(timer.as_timer_id(), e)),
}
}
}
else if event.revents & (libc::POLLHUP | libc::POLLERR | libc::POLLRDHUP | libc::POLLNVAL) > 0
{
}
}
if poll_res.is_empty() == false
{
return Ok(Some(poll_res));
}
else
{
return Ok(None);
}
}
}
fn get_count(&self) -> usize
{
return
self
.timers
.read()
.map_or_else(|e| e.into_inner(), |v| v)
.len();
}
fn get_poll_interruptor(&self) -> PollInterrupt
{
return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
}
fn interrupt_poll(&self) -> bool
{
return self.wakeup_event.write(1).is_ok();
}
}