use std::
{
borrow::Cow,
fmt,
os::{fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd}, unix::prelude::RawFd},
sync::{Arc, RwLock, TryLockResult, Weak},
task::Poll
};
use crate::
{
timer_portable::
{
timer::{itimerspec, ModeTimeType},
TimerExpMode
},
AbsoluteTime
};
use nix::
{
errno::Errno, fcntl::{self, FcntlArg, OFlag}, libc::{self}
};
use crate::
{
map_portable_err,
portable_err,
timer_portable::
{
portable_error::TimerPortResult,
TimerFlags,
TimerType
},
FdTimerCom,
TimerReadRes
};
use crate::timer_portable::timer::FdTimerRead;
#[derive(Debug)]
pub struct TimerFdInternal
{
label: Cow<'static, str>,
timer_fd: OwnedFd,
}
impl fmt::Display for TimerFdInternal
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "{}", self.timer_fd.as_fd().as_raw_fd())
}
}
impl AsFd for TimerFdInternal
{
fn as_fd(&self) -> BorrowedFd<'_>
{
return self.timer_fd.as_fd();
}
}
impl AsRawFd for TimerFdInternal
{
fn as_raw_fd(&self) -> RawFd
{
return self.timer_fd.as_raw_fd();
}
}
impl Eq for TimerFdInternal {}
impl PartialEq for TimerFdInternal
{
fn eq(&self, other: &Self) -> bool
{
return self.timer_fd.as_fd().as_raw_fd() == other.timer_fd.as_fd().as_raw_fd();
}
}
impl PartialEq<RawFd> for TimerFdInternal
{
fn eq(&self, other: &RawFd) -> bool
{
return self.timer_fd.as_fd().as_raw_fd() == *other;
}
}
impl PartialEq<str> for TimerFdInternal
{
fn eq(&self, other: &str) -> bool
{
return self.label == other;
}
}
impl AsRef<str> for TimerFdInternal
{
fn as_ref(&self) -> &str
{
return &self.label;
}
}
impl Ord for TimerFdInternal
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering
{
return self.timer_fd.as_fd().as_raw_fd().cmp(&other.timer_fd.as_fd().as_raw_fd());
}
}
impl PartialOrd for TimerFdInternal
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
{
return Some(self.cmp(other));
}
}
impl FdTimerRead for TimerFdInternal
{
fn read(&self) -> TimerPortResult<TimerReadRes<u64>>
{
let mut timer_overfl = [0u8; size_of::<u64>()];
loop
{
let ret =
nix::unistd::read(&self.timer_fd, &mut timer_overfl);
if let Ok(_) = ret
{
let overl: u64 = u64::from_ne_bytes(timer_overfl);
return Ok(TimerReadRes::Ok(overl));
}
else if let Err(Errno::EINTR) = ret
{
continue;
}
else if let Err(Errno::EAGAIN) = ret
{
return Ok(TimerReadRes::WouldBlock);
}
else if let Err(Errno::ECANCELED) = ret
{
return Ok(TimerReadRes::Cancelled);
}
else if let Err(e) = ret
{
portable_err!(e, "read timer overflow error for timer: '{}'", self.label)
}
}
}
}
impl FdTimerCom for TimerFdInternal
{
fn new(label: Cow<'static, str>, timer_type: TimerType, timer_flags: TimerFlags) -> TimerPortResult<Self>
{
let timer_fd =
unsafe { libc::timerfd_create(timer_type.into(), timer_flags.bits())};
return Ok(
Self
{
label:
label,
timer_fd:
unsafe { OwnedFd::from_raw_fd(timer_fd) },
}
);
}
fn set_time<TIMERTYPE: ModeTimeType>(&self, timer_exp: TimerExpMode<TIMERTYPE>) -> TimerPortResult<()>
{
let flags = TIMERTYPE::get_flags().bits();
let timer_value: itimerspec = timer_exp.into();
let res =
unsafe
{
libc::timerfd_settime(self.timer_fd.as_raw_fd(), flags,
&timer_value, core::ptr::null_mut())
};
if res == -1
{
portable_err!(Errno::last(), "can not init timer: '{}'", self.label);
}
return Ok(());
}
fn unset_time(&self) -> TimerPortResult<()>
{
let mut timer_value: itimerspec = unsafe { std::mem::zeroed() };
let res =
unsafe
{
libc::timerfd_gettime(self.timer_fd.as_raw_fd(),
&mut timer_value as *mut _ as *mut itimerspec)
};
if res == -1
{
portable_err!(Errno::last(), "can not timerfd_gettime for timer: '{}'", self.label);
}
let timer_mode = TimerExpMode::<AbsoluteTime>::from(timer_value);
if TimerExpMode::<AbsoluteTime>::reset() == timer_mode
{
return Ok(());
}
let timer_value: itimerspec = TimerExpMode::<AbsoluteTime>::reset().into();
let res =
unsafe
{
libc::timerfd_settime(self.timer_fd.as_raw_fd(), 0,
&timer_value, core::ptr::null_mut())
};
if res == -1
{
portable_err!(Errno::last(), "can not unset timer: '{}'", self.label);
}
return Ok(());
}
fn set_nonblocking(&self, flag: bool) -> TimerPortResult<()>
{
let mut fl =
OFlag::from_bits_retain(
fcntl::fcntl(&self.timer_fd, FcntlArg::F_GETFL)
.map_err(|e|
map_portable_err!(e, "timer: '{}', fcntl F_GETFL failed", self.label)
)?
);
fl.set(OFlag::O_NONBLOCK, flag);
fcntl::fcntl(&self.timer_fd, FcntlArg::F_SETFL(fl))
.map_err(|e|
map_portable_err!(e, "timer: '{}', fcntl F_SETFL failed", self.label)
)?;
return Ok(());
}
fn is_nonblocking(&self) -> TimerPortResult<bool>
{
let fl =
OFlag::from_bits_retain(
fcntl::fcntl(&self.timer_fd, FcntlArg::F_GETFL)
.map_err(|e|
map_portable_err!(e, "timer: '{}', fcntl F_GETFL failed", self.label)
)?
);
return Ok(fl.intersects(OFlag::O_NONBLOCK));
}
}
impl Future for &TimerFdInternal
{
type Output = TimerPortResult<TimerReadRes<u64>>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
{
if self.is_nonblocking()?== false
{
return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
}
let res = self.read();
if let Ok(TimerReadRes::WouldBlock) = res
{
cx.waker().wake_by_ref();
return Poll::Pending;
}
else
{
return Poll::Ready(res);
}
}
}
impl Future for TimerFdInternal
{
type Output = TimerPortResult<TimerReadRes<u64>>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
{
if self.is_nonblocking()? == false
{
return Poll::Ready(Err(map_portable_err!(Errno::EINVAL, "timer fd is in blocking mode")));
}
let res = self.read();
if let Ok(TimerReadRes::WouldBlock) = res
{
cx.waker().wake_by_ref();
return Poll::Pending;
}
else
{
return Poll::Ready(res);
}
}
}
#[cfg(test)]
mod tests
{
use std::time::{Duration, Instant};
use tokio::io::{unix::AsyncFd, AsyncRead, Interest};
use crate::{common, timer_portable::{timer::AbsoluteTime, TimerExpMode}, RelativeTime};
use super::*;
#[test]
fn test1()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::empty()).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res =
timer
.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
let ovf = timer.read().unwrap().unwrap();
let ts = chrono::offset::Local::now().timestamp();
let e = s.elapsed();
assert_eq!(ovf, 1);
assert_eq!(ts, snow);
println!("elapsed: {:?}, ts: {}", e, ts);
assert_eq!((e.as_millis() <= 3100), true);
println!("Success");
return;
}
#[test]
fn test1_1()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::empty()).unwrap();
let timer1 =
TimerFdInternal::new(Cow::Borrowed("test2"), TimerType::CLOCK_REALTIME,
TimerFlags::TFD_NONBLOCK).unwrap();
let ts = common::get_current_timestamp();
println!("{} 1: {}, 2:{}", ts.timestamp(), ts.timestamp() + 3, ts.timestamp() + 2);
let timer1_time =
TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::from(ts) + RelativeTime::new_time(3, 0));
let time2_time =
TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::from(ts) + RelativeTime::new_time(4, 0));
timer.set_time(timer1_time).unwrap();
let res = timer.read();
let tss = common::get_current_timestamp();
println!("{:?} ts: {}", res, tss.timestamp());
let ts = common::get_current_timestamp();
let res = timer1.read();
println!("{:?} ts: {}", res, ts.timestamp());
println!("Success");
return;
}
#[tokio::test]
async fn test2_fut()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::TFD_NONBLOCK).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res =
timer
.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
tokio::select! {
ovf = timer => {
let ts = chrono::offset::Local::now().timestamp();
let e = s.elapsed();
assert_eq!(ovf, Ok(TimerReadRes::Ok(1)));
assert_eq!(ts, snow);
println!("timeout e: {:?}, ts:{} snow:{}", e, ts, snow);
}
}
}
#[tokio::test]
async fn test3_tokio()
{
let tm = TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME, TimerFlags::TFD_NONBLOCK).unwrap();
tm.set_nonblocking(true).unwrap();
let mut timer: AsyncFd<TimerFdInternal> =
AsyncFd::new(tm).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res =
timer
.get_mut()
.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
tokio::select! {
read_guard_res = timer.ready(Interest::READABLE) =>
{
let read_guard = read_guard_res.unwrap();
let res = read_guard.get_inner().read();
let ts = chrono::offset::Local::now().timestamp();
let e = s.elapsed();
assert_eq!(res, Ok(TimerReadRes::Ok(1)));
assert_eq!(ts, snow);
println!("timeout e: {:?}, ts:{} snow:{}", e, ts, snow);
}
}
}
#[test]
fn test4_cancel()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME,
TimerFlags::TFD_NONBLOCK).unwrap();
let now = chrono::offset::Local::now().timestamp();
let snow = now + 3;
let s = Instant::now();
let timer_mode1 =
TimerExpMode::<AbsoluteTime>::new_oneshot(
AbsoluteTime::new_time(snow, 0).unwrap()
);
let res =
timer
.set_time(timer_mode1);
println!("timer was set: '{}' '{}'", now, snow);
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
let ovf = timer.read().unwrap();
println!("{}", ovf);
timer.unset_time().unwrap();
let ovf = timer.read().unwrap();
println!("{}", ovf);
}
#[test]
fn test5_preiodic_with_delay()
{
let timer =
TimerFdInternal::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME, TimerFlags::empty()).unwrap();
let timer_mode1 =
TimerExpMode
::<RelativeTime>
::new_interval_with_init_delay(
RelativeTime::new_time(1, 0),
RelativeTime::new_time(0, 500_000_000)
);
timer.set_time(timer_mode1).unwrap();
let start = AbsoluteTime::now();
let res = timer.read().unwrap();
let end = AbsoluteTime::now();
let diff = end - start;
println!("timer was set s: '{}' e:'{}', diff = '{}' res: '{}'", start, end, diff, res);
assert_eq!(diff.get_sec(), 1);
assert_eq!(res, TimerReadRes::Ok(1));
let res = timer.read().unwrap();
let end2 = AbsoluteTime::now();
let diff = end2 - end;
println!("timer was set s: '{}' e:'{}', diff = '{}' res: '{}'", end, end2, diff, res);
assert_eq!(diff.get_sec(), 0);
assert!(diff.get_nsec() >= 499_930_000 && diff.get_nsec() <= 500_400_000);
assert_eq!(res, TimerReadRes::Ok(1));
let res = timer.read().unwrap();
let end3 = AbsoluteTime::now();
let diff = end3 - end2;
println!("timer was set s: '{}' e:'{}', diff = '{}' res: '{}'", end2, end3, diff, res);
assert_eq!(diff.get_sec(), 0);
assert!(diff.get_nsec() >= 499_930_000 && diff.get_nsec() <= 500_400_000);
assert_eq!(res, TimerReadRes::Ok(1));
timer.unset_time().unwrap();
timer.set_nonblocking(true).unwrap();
std::thread::sleep(Duration::from_millis(1000));
let res = timer.read().unwrap();
assert_eq!(res, TimerReadRes::WouldBlock);
}
}