use std::
{
borrow::Cow,
fmt,
os::{
fd::{AsFd, AsRawFd, BorrowedFd}, unix::prelude::RawFd
},
sync::TryLockResult,
task::Poll
};
use crate::timer_portable::{timer::{FdTimerRead, ModeTimeType}, TimerExpMode};
use crossbeam_utils::atomic::AtomicCell;
use nix::
{
errno::Errno,
fcntl::{self, FcntlArg, OFlag},
libc::timespec,
sys::event::{EvFlags, EventFilter, FilterFlag, KEvent, Kqueue}
};
use crate::
{
map_portable_err,
portable_err,
timer_portable::
{
portable_error::TimerPortResult,
TimerFlags,
TimerType
},
FdTimerCom,
TimerReadRes
};
use super::kqueue_itimerspecs_specific::itimerspeckenent;
/// Timer backed by a kqueue instance.
///
/// The mutable state lives in `AtomicCell`s, so every method in this file
/// operates through `&self`.
#[derive(Debug)]
pub struct TimerFdInternal
{
/// Human readable timer name; exposed through `AsRef<str>`, compared by
/// `PartialEq<str>` and used in error messages.
label: Cow<'static, str>,
/// The kqueue instance all `kevent()` calls of this timer go through. Its raw
/// descriptor is what `Display`, `AsFd`, `AsRawFd` and the comparison impls use.
timer_fd: Kqueue,
/// Current timer flags; the `TFD_NONBLOCK` bit selects between a polling and a
/// blocking `read()`.
timer_flags: AtomicCell<TimerFlags>,
/// Pending `(data, fflags)` pair for a timer event that `read()` registers
/// after the first event was received. Stored by `set_time()` when the
/// requested mode is an interval with a delay, cleared by `unset_time()`.
it_delay_int: AtomicCell<Option<(i64, FilterFlag)>>,
}
impl fmt::Display for TimerFdInternal
{
    /// Prints the raw descriptor number of the underlying kqueue.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let raw_fd = self.timer_fd.as_fd().as_raw_fd();

        write!(f, "{}", raw_fd)
    }
}
impl AsFd for TimerFdInternal
{
    /// Borrows the descriptor of the underlying kqueue.
    fn as_fd(&self) -> BorrowedFd<'_>
    {
        let kq = &self.timer_fd;

        kq.as_fd()
    }
}
impl AsRawFd for TimerFdInternal
{
    /// Returns the raw descriptor number of the underlying kqueue.
    fn as_raw_fd(&self) -> RawFd
    {
        let borrowed = self.timer_fd.as_fd();

        borrowed.as_raw_fd()
    }
}
// Marker impl without methods: the comparison itself is provided by the
// `PartialEq` impl for `TimerFdInternal`.
impl Eq for TimerFdInternal {}
impl PartialEq for TimerFdInternal
{
    /// Two timers are equal when their kqueue raw descriptor numbers are equal.
    fn eq(&self, other: &Self) -> bool
    {
        let lhs = self.timer_fd.as_fd().as_raw_fd();
        let rhs = other.timer_fd.as_fd().as_raw_fd();

        lhs == rhs
    }
}
impl PartialEq<RawFd> for TimerFdInternal
{
    /// Compares the kqueue raw descriptor number of this timer with `other`.
    fn eq(&self, other: &RawFd) -> bool
    {
        let own_fd = self.timer_fd.as_fd().as_raw_fd();

        own_fd == *other
    }
}
impl PartialEq<str> for TimerFdInternal
{
    /// Compares the timer label with `other`.
    fn eq(&self, other: &str) -> bool
    {
        let label: &str = &self.label;

        label == other
    }
}
impl AsRef<str> for TimerFdInternal
{
fn as_ref(&self) -> &str
{
return &self.label;
}
}
impl Ord for TimerFdInternal
{
    /// Orders timers by the raw descriptor number of their kqueue.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering
    {
        let lhs = self.timer_fd.as_fd().as_raw_fd();
        let rhs = other.timer_fd.as_fd().as_raw_fd();

        Ord::cmp(&lhs, &rhs)
    }
}
impl PartialOrd for TimerFdInternal
{
    /// Always yields `Some`, delegating to the total order of the `Ord` impl.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
    {
        let ordering = Ord::cmp(self, other);

        Some(ordering)
    }
}
impl FdTimerRead for TimerFdInternal
{
    /// Fetches a timer event from the kqueue and reports its `data` value.
    ///
    /// When `TFD_NONBLOCK` is set the kqueue is polled with a zero timeout,
    /// otherwise `kevent()` is called without a timeout.
    ///
    /// # Returns
    ///
    /// * `TimerReadRes::Ok(n)` - one event was received; `n` is the event's `data`
    ///   field cast to `u64` (presumably the expiration count, see kqueue(2)
    ///   `EVFILT_TIMER` - confirm).
    /// * `TimerReadRes::WouldBlock` - no event was returned in non-blocking mode.
    /// * `TimerReadRes::Cancelled` - no event was returned in blocking mode, or
    ///   `kevent()` failed with `ECANCELED`.
    ///
    /// # Errors
    ///
    /// Any other `kevent()` failure except `EINTR` (which is retried), including
    /// a failure to register the deferred timer event after the first one.
    fn read(&self) -> TimerPortResult<TimerReadRes<u64>>
    {
        // Snapshot the mode once. The same value selects the timeout and later
        // interprets an empty result, so a concurrent `set_nonblocking()` cannot
        // make the two decisions disagree.
        let nonblocking = self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK);

        let timeout =
            if nonblocking
            {
                Some(timespec{ tv_sec: 0, tv_nsec: 0 })
            }
            else
            {
                None
            };

        loop
        {
            // Passed as the event list with an empty change list, i.e. this is the
            // slot the received event is read back from.
            let mut ev_list =
                [
                    KEvent::new(
                        self.timer_fd.as_fd().as_raw_fd() as usize,
                        EventFilter::EVFILT_TIMER,
                        EvFlags::empty(),
                        FilterFlag::empty(),
                        0,
                        0
                    )
                ];

            match self.timer_fd.kevent(&[], ev_list.as_mut_slice(), timeout)
            {
                Ok(1) =>
                {
                    let overl = ev_list[0].data();

                    // A deferred timer event is taken exactly once: `swap(None)`
                    // removes it so later reads do not register it again.
                    if let Some((data, fflags)) = self.it_delay_int.swap(None)
                    {
                        let ev =
                            KEvent::new(
                                0,
                                EventFilter::EVFILT_TIMER,
                                EvFlags::EV_ADD | EvFlags::EV_ENABLE,
                                fflags,
                                data as isize,
                                0
                            );

                        self
                            .timer_fd
                            .kevent(&[ev], &mut [], None)
                            .map_err(|e|
                                map_portable_err!(e, "timer: '{}' kevent() failed to set interval after delay", self)
                            )?;
                    }

                    return Ok(TimerReadRes::Ok(overl as u64));
                },
                Ok(0) =>
                {
                    if nonblocking
                    {
                        return Ok(TimerReadRes::WouldBlock);
                    }
                    else
                    {
                        return Ok(TimerReadRes::Cancelled);
                    }
                },
                // Interrupted by a signal: retry the call.
                Err(Errno::EINTR) => continue,
                Err(Errno::ECANCELED) => return Ok(TimerReadRes::Cancelled),
                Err(e) =>
                {
                    portable_err!(e, "read timer overflow error for timer: '{}'", self.label)
                },
                // Any other event count is not handled explicitly (the event list
                // holds a single slot); try again as before.
                Ok(_) => continue,
            }
        }
    }
}
impl FdTimerCom for TimerFdInternal
{
    /// Creates a new kqueue instance for the timer.
    ///
    /// # Arguments
    ///
    /// * `label` - timer name stored in the instance and used in error messages.
    /// * `_timer_type` - not used by this implementation.
    /// * `timer_flags` - initial flags, e.g. `TFD_NONBLOCK`.
    ///
    /// # Errors
    ///
    /// Returned when `Kqueue::new()` fails.
    fn new(label: Cow<'static, str>, _timer_type: TimerType, timer_flags: TimerFlags) -> TimerPortResult<Self>
    {
        let timer_fd =
            Kqueue::new()
                .map_err(|e|
                    map_portable_err!(e,
                        "KQueue timer: '{}' init failed!", label)
                )?;

        Ok(
            Self
            {
                label,
                timer_fd,
                timer_flags: AtomicCell::new(timer_flags),
                it_delay_int: AtomicCell::new(None),
            }
        )
    }

    /// Programs the timer according to `timer_exp`.
    ///
    /// `timer_exp` is converted with `itimerspeckenent::try_from` and its
    /// `it_value` event is submitted to the kqueue. For an interval with a delay
    /// the second event description is remembered so that `read()` can register
    /// it after the first event was received.
    ///
    /// # Errors
    ///
    /// Returned when the conversion or the `kevent()` call fails.
    fn set_time<TIMERTYPE: ModeTimeType>(&self, timer_exp: TimerExpMode<TIMERTYPE>) -> TimerPortResult<()>
    {
        let it_ent = itimerspeckenent::try_from(&timer_exp)?;

        self
            .timer_fd
            .kevent(&[it_ent.it_value], &mut [], None)
            .map_err(|e|
                map_portable_err!(e, "timer: '{}' set_time() kevent failed", self)
            )?;

        // Always overwrite the deferred event: a value left behind by an earlier
        // `set_time()` would otherwise be registered by the next `read()` even
        // though the new mode does not ask for it.
        let deferred =
            if it_ent.is_interv_with_delay()
            {
                it_ent.it_delay_int
            }
            else
            {
                None
            };

        self.it_delay_int.store(deferred);

        Ok(())
    }

    /// Disarms the timer by deleting its event from the kqueue.
    ///
    /// A pending deferred event is dropped as well. A missing event (`ENOENT`)
    /// is not treated as an error.
    ///
    /// # Errors
    ///
    /// Returned when `kevent()` fails with anything other than `ENOENT`.
    fn unset_time(&self) -> TimerPortResult<()>
    {
        self.it_delay_int.store(None);

        let ev =
            KEvent::new(
                0,
                EventFilter::EVFILT_TIMER,
                EvFlags::EV_DELETE,
                FilterFlag::empty(),
                0,
                0
            );

        let Err(errn) =
            self
                .timer_fd
                .kevent(&[ev], &mut [], None)
        else
        {
            return Ok(());
        };

        if errn != Errno::ENOENT
        {
            portable_err!(errn, "timer: '{}' unset_time() kevent failed", self)
        }

        Ok(())
    }

    /// Sets or clears the `TFD_NONBLOCK` flag.
    ///
    /// # Errors
    ///
    /// `EACCES` when the stored flags were modified between the load and the
    /// swap. The new flags have already been written at that point.
    fn set_nonblocking(&self, flag: bool) -> TimerPortResult<()>
    {
        let old_timer_flags = self.timer_flags.load();

        let mut timer_flags = old_timer_flags;
        timer_flags.set(TimerFlags::TFD_NONBLOCK, flag);

        if self.timer_flags.swap(timer_flags) != old_timer_flags
        {
            // The error has to be returned; merely constructing it reports nothing.
            return Err(map_portable_err!(Errno::EACCES, "changed from another thread"));
        }

        Ok(())
    }

    /// Tells whether the `TFD_NONBLOCK` flag is currently set. Never fails.
    fn is_nonblocking(&self) -> TimerPortResult<bool>
    {
        let current_flags = self.timer_flags.load();

        Ok(current_flags.intersects(TimerFlags::TFD_NONBLOCK))
    }
}
impl Future for &TimerFdInternal
{
    type Output = TimerPortResult<TimerReadRes<u64>>;

    /// Polls the timer through `read()`.
    ///
    /// Resolves with an `EINVAL` error when `TFD_NONBLOCK` is not set. While
    /// `read()` reports `WouldBlock` the waker is woken immediately and
    /// `Pending` is returned; every other result resolves the future.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
    {
        let nonblocking = self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK);

        if !nonblocking
        {
            return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
        }

        match self.read()
        {
            Ok(TimerReadRes::WouldBlock) =>
            {
                cx.waker().wake_by_ref();

                Poll::Pending
            },
            outcome => Poll::Ready(outcome),
        }
    }
}
impl Future for TimerFdInternal
{
    type Output = TimerPortResult<TimerReadRes<u64>>;

    /// Polls the timer through `read()`.
    ///
    /// Resolves with an `EINVAL` error when `TFD_NONBLOCK` is not set. While
    /// `read()` reports `WouldBlock` the waker is woken immediately and
    /// `Pending` is returned; every other result resolves the future.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
    {
        let nonblocking = self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK);

        if !nonblocking
        {
            return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
        }

        match self.read()
        {
            Ok(TimerReadRes::WouldBlock) =>
            {
                cx.waker().wake_by_ref();

                Poll::Pending
            },
            outcome => Poll::Ready(outcome),
        }
    }
}