use std::
{
collections::{HashSet},
fmt,
os::fd::{AsFd, AsRawFd, RawFd},
sync::
{
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
Arc
}
};
use std::collections::BTreeMap;
use std::sync::RwLock;
use instance_copy_on_write::ICoW;
use nix::{errno::Errno, libc::{self}, sys::eventfd::EventFd};
use nix::libc::pollfd;
use nix::sys::eventfd::EfdFlags;
use crate::
{
error::{TimerErrorType, TimerResult},
map_timer_err,
timer_err,
timer_portable::
{
poll::{PollInterrupt, TimerPollOps},
PollEventType
}
};
use crate::timer_portable::timer::FdTimerRead;
use crate::timer_portable::TimerFd;
/// Process-wide counter; `TimerEventWatch::new` performs a `fetch_add(1)` on it
/// and stores the previous value as the new instance's `id`.
static ID_GEN: AtomicU32 = AtomicU32::new(0);
/// A `poll(2)`-based watcher over a set of timer file descriptors.
///
/// Besides the registered timers, every poll set also contains an eventfd
/// (`wakeup_event`) that `interrupt_poll` writes to in order to wake a thread
/// blocked in `poll`.
#[derive(Debug)]
pub struct TimerEventWatch
{
/// Identifier taken from `ID_GEN`; used for equality, `Display` and error
/// messages. It is not a real file descriptor.
id: u32,
/// Eventfd added to every poll set. Held in an `Arc` so that
/// `get_poll_interruptor` can hand out a `Weak` reference to it.
wakeup_event: Arc<EventFd>,
/// `true` while a thread is inside the `poll(2)` call; a second concurrent
/// `poll` is rejected instead of blocking.
polling_flag: AtomicBool,
/// Registered timers keyed by their raw file descriptor.
timers: RwLock<BTreeMap<RawFd, TimerFd>>
}
/// Renders the watcher as `fake_fd:<id>, cnt:<n>`.
///
/// `<n>` is the number of registered timers. The timer map is only
/// *try*-locked so that formatting never blocks: when the lock is currently
/// held for writing the count is shown as `locked`.
impl fmt::Display for TimerEventWatch
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        // A poisoned lock is recovered (as every other method of this type
        // does) instead of being reported as "locked"; only a genuinely
        // contended lock yields `None`.
        let count: Option<usize> =
            match self.timers.try_read()
            {
                Ok(guard) => Some(guard.len()),
                Err(std::sync::TryLockError::Poisoned(poisoned)) =>
                    Some(poisoned.into_inner().len()),
                Err(std::sync::TryLockError::WouldBlock) => None,
            };

        match count
        {
            Some(cnt) => write!(f, "fake_fd:{}, cnt:{}", self.id, cnt),
            None => write!(f, "fake_fd:{}, cnt:{}", self.id, "locked"),
        }
    }
}
/// Two watchers compare equal exactly when their `id` fields match; the
/// wakeup eventfd, polling flag and registered timers are not considered.
impl PartialEq for TimerEventWatch
{
    fn eq(&self, other: &Self) -> bool
    {
        self.id == other.id
    }
}

// Comparison is on a plain `u32`, so the relation is a full equivalence.
impl Eq for TimerEventWatch {}
impl TimerPollOps for TimerEventWatch
{
/// Creates an empty watcher with a fresh `id` and its own wakeup eventfd.
///
/// The eventfd is created with `EFD_CLOEXEC | EFD_NONBLOCK`. No timers are
/// registered yet; the pollfd array is assembled on each `poll` call, so
/// nothing else has to be prepared here.
///
/// # Errors
///
/// Returns a `TimerErrorType::EPoll` error carrying the errno when the
/// eventfd cannot be created.
fn new() -> TimerResult<Self>
{
    let wakeup_event =
        EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
            .map_err(|e|
                map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
            )?;

    return Ok(
        Self
        {
            // `fetch_add` returns the previous value, so ids start at 0.
            id: ID_GEN.fetch_add(1, Ordering::SeqCst),
            wakeup_event: Arc::new(wakeup_event),
            polling_flag: AtomicBool::new(false),
            timers: RwLock::new(BTreeMap::new()),
        }
    );
}
/// Registers `timer` with this watcher, keyed by its raw file descriptor.
///
/// The watcher takes ownership of the timer. A poisoned lock is recovered
/// rather than treated as an error.
///
/// # Errors
///
/// Returns a `TimerErrorType::Duplicate` error when a timer with the same
/// raw fd is already registered; the map is left untouched in that case.
fn add(&self, timer: TimerFd) -> TimerResult<()>
{
    let mut registry =
        self.timers
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

    let raw_fd = timer.as_raw_fd();

    if registry.contains_key(&raw_fd)
    {
        timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate",
            timer);
    }

    registry.insert(raw_fd, timer);

    Ok(())
}
/// Unregisters the timer whose raw file descriptor equals that of `timer`.
///
/// The removed timer instance is dropped while the write lock is held. A
/// poisoned lock is recovered rather than treated as an error.
///
/// # Errors
///
/// Returns an error when no timer with that raw fd is registered.
fn delete<FD: AsFd>(&self, timer: FD) -> TimerResult<()>
{
    let raw_fd = timer.as_fd().as_raw_fd();

    let mut registry =
        self.timers
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

    // A single `remove` both checks for presence and deletes the entry.
    if registry.remove(&raw_fd).is_none()
    {
        // NOTE(review): `Duplicate` is kept because callers may match on it,
        // but a dedicated "not found" error kind would describe this case
        // better - confirm whether `TimerErrorType` offers one.
        timer_err!(TimerErrorType::Duplicate, "can not delete timer {} from epoll, reason not found",
            raw_fd);
    }

    return Ok(());
}
/// Waits in `poll(2)` on the wakeup eventfd plus every registered timer and
/// reads the timers that became readable.
///
/// # Arguments
///
/// * `timeout` - forwarded to `poll(2)` (milliseconds, per `poll(2)`);
///   `None` is passed as `-1`, i.e. wait without a time limit.
///
/// # Returns
///
/// * `Ok(Some(events))` - one entry per readable timer, in ascending fd
///   order: `PollEventType::TimerRes` when reading the timer succeeded,
///   `PollEventType::SubError` when it failed.
/// * `Ok(None)` - the call timed out, was interrupted by a signal (`EINTR`),
///   or was woken only through the wakeup eventfd.
///
/// # Errors
///
/// * `TimerErrorType::EPollAlreadyPolling` - another thread is currently
///   inside this method.
/// * `TimerErrorType::EPoll(errno)` - `poll(2)` failed with anything other
///   than `EINTR`.
fn poll(&self, timeout: Option<i32>) -> TimerResult<Option<Vec<PollEventType>>>
{
    // Only one poller at a time: `swap` returns the previous flag value.
    if self.polling_flag.swap(true, Ordering::SeqCst)
    {
        timer_err!(TimerErrorType::EPollAlreadyPolling,
            "epoll fd: '{}' other thread already polling", self.id);
    }

    let wakeup_raw_fd = self.wakeup_event.as_raw_fd();

    // Snapshot the registered fds and release the lock before blocking so
    // that `add`/`delete` are not stalled for the duration of the poll.
    let mut pollfds: Vec<libc::pollfd> =
    {
        let timers =
            self.timers
                .read()
                .unwrap_or_else(|poisoned| poisoned.into_inner());

        let mut fds = Vec::with_capacity(timers.len() + 1);

        fds.push(libc::pollfd { fd: wakeup_raw_fd, events: libc::POLLIN, revents: 0 });

        // `BTreeMap` iterates in ascending key order, so timer entries are
        // already sorted by fd and no extra sort is required.
        fds.extend(
            timers
                .keys()
                .map(|&fd| libc::pollfd { fd, events: libc::POLLIN, revents: 0 })
        );

        fds
    };

    // SAFETY: `pollfds` is a live, exclusively borrowed vector; the pointer
    // and length describe exactly its initialised elements, and `poll(2)`
    // only writes the `revents` field of each of them.
    let res =
        unsafe
        {
            libc::poll(
                pollfds.as_mut_ptr(),
                pollfds.len() as libc::nfds_t,
                timeout.unwrap_or(-1),
            )
        };

    // Capture errno immediately, before any other call can overwrite it.
    let poll_errno = if res < 0 { Some(Errno::last()) } else { None };

    self.polling_flag.store(false, Ordering::SeqCst);

    if let Some(errn) = poll_errno
    {
        if errn != Errno::EINTR
        {
            timer_err!(TimerErrorType::EPoll(errn), "poll {} error", self.id);
        }

        // Interrupted by a signal: report "nothing happened".
        return Ok(None);
    }

    if res == 0
    {
        // Timed out.
        return Ok(None);
    }

    let timers_read =
        self.timers
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

    // `res` is strictly positive here, so the cast cannot wrap.
    let mut poll_res: Vec<PollEventType> = Vec::with_capacity(res as usize);

    for event in pollfds.iter()
    {
        // NOTE(review): entries flagged only with POLLHUP / POLLERR /
        // POLLNVAL are skipped without being reported to the caller.
        if event.revents & libc::POLLIN != libc::POLLIN
        {
            continue;
        }

        if event.fd == wakeup_raw_fd
        {
            // Drain the eventfd counter; the result is not needed.
            let _ = self.wakeup_event.read();
            continue;
        }

        // The timer may have been deleted while we were blocked in poll.
        let Some(timer) = timers_read.get(&event.fd)
        else
        {
            continue;
        };

        match timer.read()
        {
            Ok(res) =>
                poll_res.push(PollEventType::TimerRes(timer.as_raw_fd(), res)),
            Err(e) =>
                poll_res.push(PollEventType::SubError(timer.as_raw_fd(), e)),
        }
    }

    if poll_res.is_empty()
    {
        return Ok(None);
    }

    return Ok(Some(poll_res));
}
/// Returns the number of timers currently registered.
///
/// Takes the read lock for the duration of the call; a poisoned lock is
/// recovered rather than treated as an error.
fn get_count(&self) -> usize
{
    return
        self
            .timers
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .len();
}
/// Builds a `PollInterrupt` from a weak reference to the wakeup eventfd.
///
/// Because only a `Weak` is handed out, the returned value does not keep the
/// eventfd alive.
fn get_poll_interruptor(&self) -> PollInterrupt
{
    let weak_wakeup = Arc::downgrade(&self.wakeup_event);

    PollInterrupt::new(weak_wakeup)
}
/// Writes `1` to the wakeup eventfd, which `poll` watches for readability.
///
/// Returns `true` when the write succeeded and `false` otherwise; the
/// underlying error is discarded.
fn interrupt_poll(&self) -> bool
{
    matches!(self.wakeup_event.write(1), Ok(_))
}
}