use std::
{
borrow::Cow,
fmt,
os::{
fd::{AsFd, AsRawFd, BorrowedFd}, unix::prelude::RawFd
},
sync::TryLockResult,
task::Poll
};
use crate::timer_portable::{timer::{FdTimerRead, ModeTimeType}, TimerExpMode};
use crossbeam_utils::atomic::AtomicCell;
use nix::
{
errno::Errno,
fcntl::{self, FcntlArg, OFlag},
libc::timespec,
sys::event::{EvFlags, EventFilter, FilterFlag, KEvent, Kqueue}
};
use crate::
{
map_portable_err,
portable_err,
timer_portable::
{
portable_error::TimerPortResult,
TimerFlags,
TimerType
},
FdTimerCom,
TimerReadRes
};
use super::kqueue_itimerspecs_specific::itimerspeckenent;
/// A timer backed by a kqueue instance and its `EVFILT_TIMER` filter.
///
/// The kqueue descriptor is what the `AsFd`/`AsRawFd`, equality and ordering
/// implementations of this type operate on.
#[derive(Debug)]
pub struct TimerFdInternal
{
/// Human readable name of the timer; exposed through `AsRef<str>`, compared
/// by `PartialEq<str>` and used in error messages.
label: Cow<'static, str>,
/// The kqueue instance on which the timer event is registered and polled.
timer_fd: Kqueue,
/// Current timer flags. `TFD_NONBLOCK` is consulted when reading to decide
/// whether the kqueue is polled with a zero timeout or waited on.
timer_flags: AtomicCell<TimerFlags>,
/// Pending `(data, filter flags)` pair. When present, it is taken by the
/// next successful read and used to register the timer event again.
it_delay_int: AtomicCell<Option<(i64, FilterFlag)>>,
}
impl fmt::Display for TimerFdInternal
{
    /// Prints the raw descriptor number of the underlying kqueue.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let raw_fd = self.timer_fd.as_fd().as_raw_fd();

        f.write_fmt(format_args!("{}", raw_fd))
    }
}
impl AsFd for TimerFdInternal
{
    /// Borrows the descriptor of the underlying kqueue.
    fn as_fd(&self) -> BorrowedFd<'_>
    {
        let kqueue = &self.timer_fd;

        kqueue.as_fd()
    }
}
impl AsRawFd for TimerFdInternal
{
    /// Returns the raw descriptor number of the underlying kqueue.
    fn as_raw_fd(&self) -> RawFd
    {
        let borrowed = self.timer_fd.as_fd();

        borrowed.as_raw_fd()
    }
}
// Equality is defined by the raw descriptor number (see `PartialEq` below).
impl Eq for TimerFdInternal {}

impl PartialEq for TimerFdInternal
{
    /// Two timers are equal when their kqueue descriptors have the same raw
    /// number.
    fn eq(&self, other: &Self) -> bool
    {
        let lhs = self.timer_fd.as_fd().as_raw_fd();
        let rhs = other.timer_fd.as_fd().as_raw_fd();

        lhs == rhs
    }
}
impl PartialEq<RawFd> for TimerFdInternal
{
    /// Compares the raw number of the kqueue descriptor with `other`.
    fn eq(&self, other: &RawFd) -> bool
    {
        let own_fd: RawFd = self.timer_fd.as_fd().as_raw_fd();

        own_fd == *other
    }
}
impl PartialEq<str> for TimerFdInternal
{
    /// Compares the timer's label with `other`.
    fn eq(&self, other: &str) -> bool
    {
        let name: &str = &self.label;

        name == other
    }
}
impl AsRef<str> for TimerFdInternal
{
    /// Returns the timer's label.
    fn as_ref(&self) -> &str
    {
        let name: &str = &self.label;

        name
    }
}
impl Ord for TimerFdInternal
{
    /// Orders timers by the raw number of their kqueue descriptors.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering
    {
        let lhs = self.timer_fd.as_fd().as_raw_fd();
        let rhs = other.timer_fd.as_fd().as_raw_fd();

        Ord::cmp(&lhs, &rhs)
    }
}
impl PartialOrd for TimerFdInternal
{
    /// Always yields `Some`, delegating to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
    {
        let ordering = Ord::cmp(self, other);

        Some(ordering)
    }
}
impl FdTimerRead for TimerFdInternal
{
    /// Fetches one timer event from the kqueue.
    ///
    /// When `TFD_NONBLOCK` is set the kqueue is polled with a zero timeout,
    /// otherwise `kevent()` is called without a timeout.
    ///
    /// # Returns
    ///
    /// * `TimerReadRes::Ok(n)` - one event was received; `n` is the event's
    ///   `data` value cast to `u64`.
    /// * `TimerReadRes::WouldBlock` - no event was received and
    ///   `TFD_NONBLOCK` is set.
    /// * `TimerReadRes::Cancelled` - no event was received while
    ///   `TFD_NONBLOCK` is not set, or `kevent()` failed with `ECANCELED`.
    ///
    /// # Errors
    ///
    /// Any other `kevent()` failure except `EINTR` (which is retried), as
    /// well as a failure to register the pending interval event.
    fn read(&self) -> TimerPortResult<TimerReadRes<u64>>
    {
        // The timeout is decided once, before the retry loop.
        let timeout =
            self
                .timer_flags
                .load()
                .intersects(TimerFlags::TFD_NONBLOCK)
                .then_some(timespec{ tv_sec: 0, tv_nsec: 0 });

        loop
        {
            // Output buffer for a single event.
            let mut received =
                [
                    KEvent::new(
                        self.timer_fd.as_fd().as_raw_fd() as usize,
                        EventFilter::EVFILT_TIMER,
                        EvFlags::empty(),
                        FilterFlag::empty(),
                        0,
                        0
                    )
                ];

            let outcome =
                self.timer_fd.kevent(&[], received.as_mut_slice(), timeout);

            match outcome
            {
                Ok(1) =>
                {
                    let expirations = received[0].data();

                    // A pending interval is consumed here and registered
                    // on the kqueue in place of the event that just fired.
                    if let Some((period, period_flags)) = self.it_delay_int.swap(None)
                    {
                        let rearm =
                            KEvent::new(
                                0,
                                EventFilter::EVFILT_TIMER,
                                EvFlags::EV_ADD | EvFlags::EV_ENABLE,
                                period_flags,
                                period as isize,
                                0
                            );

                        self
                            .timer_fd
                            .kevent(&[rearm], &mut [], None)
                            .map_err(|e|
                                map_portable_err!(e, "timer: '{}' kevent() failed to set interval after delay", self)
                            )?;
                    }

                    return Ok(TimerReadRes::Ok(expirations as u64));
                },
                Ok(0) =>
                {
                    // The flags are re-read here rather than derived from
                    // `timeout`.
                    let nonblocking =
                        self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK);

                    return Ok(
                        if nonblocking
                        {
                            TimerReadRes::WouldBlock
                        }
                        else
                        {
                            TimerReadRes::Cancelled
                        }
                    );
                },
                Err(Errno::EINTR) =>
                {
                    // Interrupted by a signal: retry.
                    continue;
                },
                Err(Errno::ECANCELED) =>
                {
                    return Ok(TimerReadRes::Cancelled);
                },
                Err(e) =>
                {
                    portable_err!(e, "read timer overflow error for timer: '{}'", self.label)
                },
                Ok(_) =>
                {
                    // Any other event count is ignored and the call is
                    // repeated.
                }
            }
        }
    }
}
impl FdTimerCom for TimerFdInternal
{
    /// Creates a new, unarmed timer backed by a fresh kqueue instance.
    ///
    /// # Arguments
    ///
    /// * `label` - a name of the timer, used in diagnostics.
    /// * `_timer_type` - not used by this implementation.
    /// * `timer_flags` - initial flags; `TFD_NONBLOCK` selects the
    ///   non-blocking behaviour when reading.
    ///
    /// # Errors
    ///
    /// Returns an error if the kqueue instance cannot be created.
    fn new(label: Cow<'static, str>, _timer_type: TimerType, timer_flags: TimerFlags) -> TimerPortResult<Self>
    {
        let timer_fd =
            Kqueue::new()
                .map_err(|e|
                    map_portable_err!(e,
                        "KQueue timer: '{}' init failed!", label)
                )?;

        Ok(
            Self
            {
                label,
                timer_fd,
                timer_flags: AtomicCell::new(timer_flags),
                it_delay_int: AtomicCell::new(None),
            }
        )
    }

    /// Arms the timer according to `timer_exp`.
    ///
    /// The first expiration event is registered on the kqueue immediately.
    /// For an interval with an initial delay the interval part is stored and
    /// registered by the read path after the first expiration is received.
    ///
    /// # Errors
    ///
    /// Returns an error if `timer_exp` cannot be converted into kqueue
    /// events or if `kevent()` fails.
    fn set_time<TIMERTYPE: ModeTimeType>(&self, timer_exp: TimerExpMode<TIMERTYPE>) -> TimerPortResult<()>
    {
        let it_ent = itimerspeckenent::try_from(&timer_exp)?;

        self
            .timer_fd
            .kevent(&[it_ent.it_value], &mut [], None)
            .map_err(|e|
                map_portable_err!(e, "timer: '{}' set_time() kevent failed", self)
            )?;

        // Always overwrite the pending interval: a value left over from a
        // previous arming would otherwise be registered after the first
        // expiration of this, unrelated, setting.
        let pending_interval =
            if it_ent.is_interv_with_delay()
            {
                it_ent.it_delay_int
            }
            else
            {
                None
            };

        self.it_delay_int.store(pending_interval);

        Ok(())
    }

    /// Disarms the timer and forgets any pending interval.
    ///
    /// An absent timer event (`ENOENT`) is not treated as an error.
    ///
    /// # Errors
    ///
    /// Returns an error if `kevent()` fails with anything except `ENOENT`.
    fn unset_time(&self) -> TimerPortResult<()>
    {
        self.it_delay_int.store(None);

        let ev =
            KEvent::new(
                0,
                EventFilter::EVFILT_TIMER,
                EvFlags::EV_DELETE,
                FilterFlag::empty(),
                0,
                0
            );

        let Err(errn) =
            self
                .timer_fd
                .kevent(&[ev], &mut [], None)
            else
            {
                return Ok(());
            };

        if errn != Errno::ENOENT
        {
            portable_err!(errn, "timer: '{}' unset_time() kevent failed", self)
        }

        Ok(())
    }

    /// Sets or clears the `TFD_NONBLOCK` flag.
    ///
    /// # Errors
    ///
    /// Returns `EACCES` if the flags were modified by another thread between
    /// loading and replacing them. NOTE: the new value has already been
    /// written at that point.
    fn set_nonblocking(&self, flag: bool) -> TimerPortResult<()>
    {
        let old_timer_flags = self.timer_flags.load();

        let mut timer_flags = old_timer_flags;
        timer_flags.set(TimerFlags::TFD_NONBLOCK, flag);

        if self.timer_flags.swap(timer_flags) != old_timer_flags
        {
            // The error must be returned; constructing it alone has no effect.
            return Err(map_portable_err!(Errno::EACCES, "changed from another thread"));
        }

        Ok(())
    }

    /// Reports whether the `TFD_NONBLOCK` flag is currently set.
    fn is_nonblocking(&self) -> TimerPortResult<bool>
    {
        let current_flags = self.timer_flags.load();

        Ok(current_flags.intersects(TimerFlags::TFD_NONBLOCK))
    }
}
impl Future for &TimerFdInternal
{
    type Output = TimerPortResult<TimerReadRes<u64>>;

    /// Polls the timer through a shared reference.
    ///
    /// Resolves with an `EINVAL` error if `TFD_NONBLOCK` is not set.
    /// Otherwise a read is attempted: `WouldBlock` wakes the task right away
    /// and yields `Pending`, any other outcome resolves the future.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
    {
        let nonblocking =
            self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK);

        if !nonblocking
        {
            return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
        }

        match self.read()
        {
            Ok(TimerReadRes::WouldBlock) =>
            {
                // Request another poll immediately.
                cx.waker().wake_by_ref();

                Poll::Pending
            },
            outcome =>
                Poll::Ready(outcome)
        }
    }
}
impl Future for TimerFdInternal
{
    type Output = TimerPortResult<TimerReadRes<u64>>;

    /// Polls the timer by value.
    ///
    /// Resolves with an `EINVAL` error if `TFD_NONBLOCK` is not set.
    /// Otherwise a read is attempted: `WouldBlock` wakes the task right away
    /// and yields `Pending`, any other outcome resolves the future.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
    {
        let nonblocking =
            self.timer_flags.load().intersects(TimerFlags::TFD_NONBLOCK);

        if !nonblocking
        {
            return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
        }

        match self.read()
        {
            Ok(TimerReadRes::WouldBlock) =>
            {
                // Request another poll immediately.
                cx.waker().wake_by_ref();

                Poll::Pending
            },
            outcome =>
                Poll::Ready(outcome)
        }
    }
}
// Tests for the kqueue based timer. They arm real timers and compare
// against the wall clock, so each of them runs for several seconds.
#[cfg(test)]
mod tests
{
use std::time::{Duration, Instant};
use tokio::io::{unix::AsyncFd, Interest};
use crate::{common, timer_portable::{timer::AbsoluteTime, TimerExpMode}, RelativeTime};
use super::*;
// Blocking one-shot timer armed at an absolute time 3 seconds ahead.
// Expects a read result of 1 and the wall clock second to equal the
// requested expiration second.
#[test]
fn test1()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::empty()).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res =
timer
.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
// Blocks until the timer event is delivered.
let ovf = timer.read().unwrap().unwrap();
let ts = chrono::offset::Local::now().timestamp();
let e = s.elapsed();
assert_eq!(ovf, 1);
assert_eq!(ts, snow);
println!("elapsed: {:?}, ts: {}", e, ts);
assert_eq!((e.as_millis() <= 3100), true);
println!("Success");
return;
}
// Arms the same blocking timer twice in a row (one-shot, 3 seconds ahead
// each time) and reads it after each arming. Only checks that neither
// `set_time` nor `read` fails.
#[test]
fn test1_1()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::empty()).unwrap();
let ts = common::get_current_timestamp();
let timer1_time =
TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::from(ts) + RelativeTime::new_time(3, 0));
timer.set_time(timer1_time).unwrap();
let res = timer.read().unwrap();
let ts = common::get_current_timestamp();
let timer1_time =
TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::from(ts) + RelativeTime::new_time(3, 0));
timer.set_time(timer1_time).unwrap();
let res = timer.read().unwrap();
println!("Success");
return;
}
// Awaits a non-blocking timer directly through its `Future` implementation
// (the timer is consumed by value inside `select!`).
#[tokio::test]
async fn test2_fut()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::TFD_NONBLOCK).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res =
timer
.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
tokio::select! {
ovf = timer => {
let ts = chrono::offset::Local::now().timestamp();
let e = s.elapsed();
assert_eq!(ovf, Ok(TimerReadRes::Ok(1)));
assert_eq!(ts, snow);
println!("timeout e: {:?}, ts:{} snow:{}", e, ts, snow);
}
}
}
// Wraps a non-blocking timer into tokio's `AsyncFd`, waits for read
// readiness and then reads the timer once.
#[tokio::test]
async fn test3_tokio()
{
let tm = TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME, TimerFlags::TFD_NONBLOCK).unwrap();
tm.set_nonblocking(true).unwrap();
let mut timer: AsyncFd<TimerFdInternal> =
AsyncFd::with_interest(tm, Interest::READABLE).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res =
timer
.get_mut()
.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
tokio::select! {
read_guard_res = timer.ready(Interest::READABLE) =>
{
let read_guard = read_guard_res.unwrap();
let res = read_guard.get_inner().read();
let ts = chrono::offset::Local::now().timestamp();
let e = s.elapsed();
assert_eq!(res, Ok(TimerReadRes::Ok(1)));
assert_eq!(ts, snow);
println!("timeout e: {:?}, ts:{} snow:{}", e, ts, snow);
}
}
}
// Arms a non-blocking one-shot timer, disarms it before the expiration
// time and verifies that a read performed after that time reports
// `WouldBlock`.
#[test]
fn test4_cancel()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::TFD_NONBLOCK).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res = timer.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
// Read right after arming; the result is only printed, not asserted.
let ovf = timer.read().unwrap();
println!("{}", ovf);
timer.unset_time().unwrap();
// Wait past the original expiration time.
std::thread::sleep(Duration::from_secs(3));
let ovf = timer.read().unwrap();
println!("{}", ovf);
assert_eq!(ovf, TimerReadRes::WouldBlock);
}
// Interval timer with an initial delay: the first read is expected about
// 1 second after arming, the following two about 0.5 second apart. After
// disarming and switching to non-blocking mode a read reports `WouldBlock`.
#[test]
fn test5_preiodic_with_delay()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME, TimerFlags::empty()).unwrap();
let timer_mode1 =
TimerExpMode
::<RelativeTime>
::new_interval_with_init_delay(
RelativeTime::new_time(1, 0),
RelativeTime::new_time(0, 500_000_000)
);
timer.set_time(timer_mode1).unwrap();
let start = AbsoluteTime::now();
// First expiration.
let res = timer.read().unwrap();
let end = AbsoluteTime::now();
let diff = end - start;
println!("timer was set s: '{}' e:'{}', diff = '{}' res: '{}'", start, end, diff, res);
assert_eq!(diff.get_sec(), 1);
assert_eq!(res, TimerReadRes::Ok(1));
// Second expiration, measured from the first one.
let res = timer.read().unwrap();
let end2 = AbsoluteTime::now();
let diff = end2 - end;
println!("timer was set s: '{}' e:'{}', diff = '{}' res: '{}'", end, end2, diff, res);
assert_eq!(diff.get_sec(), 0);
assert!(diff.get_nsec() >= 499_930_000 && diff.get_nsec() <= 500_400_000);
assert_eq!(res, TimerReadRes::Ok(1));
// Third expiration, measured from the second one.
let res = timer.read().unwrap();
let end3 = AbsoluteTime::now();
let diff = end3 - end2;
println!("timer was set s: '{}' e:'{}', diff = '{}' res: '{}'", end2, end3, diff, res);
assert_eq!(diff.get_sec(), 0);
assert!(diff.get_nsec() >= 499_930_000 && diff.get_nsec() <= 500_400_000);
assert_eq!(res, TimerReadRes::Ok(1));
// After disarming, no further events are expected.
timer.unset_time().unwrap();
timer.set_nonblocking(true).unwrap();
std::thread::sleep(Duration::from_millis(1000));
let res = timer.read().unwrap();
assert_eq!(res, TimerReadRes::WouldBlock);
}
}