use std::io;
use std::mem::MaybeUninit;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::ptr;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tokio::time::Instant;
/// A one-shot timer backed by a Linux `timerfd` that is registered with the
/// tokio reactor.
///
/// Deadlines are supplied as tokio [`Instant`]s and translated into absolute
/// `CLOCK_MONOTONIC` times using the two epoch readings captured when the
/// timer is constructed.
pub(super) struct Timer {
/// The timerfd, wrapped so the reactor can report read readiness on it.
fd: AsyncFd<OwnedFd>,
/// Tokio-clock reading taken at construction; deadlines are measured
/// relative to this instant.
epoch_tokio: Instant,
/// `CLOCK_MONOTONIC` reading taken at construction, immediately before
/// `epoch_tokio`; the base to which deadline offsets are added.
epoch_monotonic: libc::timespec,
}
impl Timer {
/// Creates a non-blocking, close-on-exec `CLOCK_MONOTONIC` timerfd, registers
/// it with the tokio reactor for read readiness, and records the clock epochs
/// later used to translate deadlines.
///
/// # Panics
///
/// Panics if `timerfd_create` fails or if the descriptor cannot be registered
/// with the reactor.
pub(super) fn new() -> Self {
    let create_flags = libc::TFD_NONBLOCK | libc::TFD_CLOEXEC;
    // SAFETY: `timerfd_create` takes no pointer arguments; its result is
    // validated right below before being used as a descriptor.
    let raw_fd = unsafe { libc::timerfd_create(libc::CLOCK_MONOTONIC, create_flags) };
    assert!(
        raw_fd >= 0,
        "timerfd_create failed: {}",
        io::Error::last_os_error()
    );

    let fd = AsyncFd::with_interest(
        // SAFETY: `raw_fd` was just returned by a successful `timerfd_create`
        // and is not owned by anything else.
        unsafe { OwnedFd::from_raw_fd(raw_fd) },
        Interest::READABLE,
    )
    .expect("registering timerfd with the tokio reactor (is a runtime active?)");

    // The monotonic reading is taken first, then the tokio reading; struct
    // fields are evaluated in the order written, so `Instant::now()` runs last.
    let monotonic_now = clock_gettime_monotonic();
    Self {
        fd,
        epoch_tokio: Instant::now(),
        epoch_monotonic: monotonic_now,
    }
}
/// Programs the timer to expire once at `deadline`.
///
/// The deadline is converted to an absolute `CLOCK_MONOTONIC` time by adding
/// its offset from the construction-time tokio instant to the construction-time
/// monotonic reading. A deadline earlier than that instant is clamped to it.
///
/// # Panics
///
/// Panics if `timerfd_settime` fails.
pub(super) fn arm(&mut self, deadline: Instant) {
    let since_epoch = deadline.saturating_duration_since(self.epoch_tokio);
    let spec = libc::itimerspec {
        // Zero interval: a single expiration, no periodic repeat.
        it_interval: ZERO_TIMESPEC,
        it_value: add_duration(self.epoch_monotonic, since_epoch),
    };
    let raw_fd = self.fd.as_raw_fd();
    // SAFETY: `raw_fd` belongs to `self.fd`, which outlives this call; `spec`
    // is a live `itimerspec`; a null `old_value` pointer is passed because the
    // previous setting is not needed.
    let status = unsafe {
        libc::timerfd_settime(raw_fd, libc::TFD_TIMER_ABSTIME, &spec, ptr::null_mut())
    };
    assert!(
        status >= 0,
        "timerfd_settime failed: {}",
        io::Error::last_os_error()
    );
}
pub(super) fn disarm(&mut self) {
let zero = libc::itimerspec {
it_interval: ZERO_TIMESPEC,
it_value: ZERO_TIMESPEC,
};
unsafe {
libc::timerfd_settime(self.fd.as_raw_fd(), 0, &zero, ptr::null_mut());
}
self.drain();
}
/// Performs one best-effort 8-byte read on the timerfd, retrying only when the
/// read is interrupted (`EINTR`).
///
/// Every other outcome, whether success or a different error, ends the loop
/// and is deliberately ignored.
fn drain(&self) {
    let raw_fd = self.fd.as_raw_fd();
    let mut scratch = [0u8; 8];
    loop {
        // SAFETY: `scratch` is a live, writable buffer and `read` is told its
        // exact length, so it writes at most `scratch.len()` bytes.
        let n = unsafe {
            libc::read(
                raw_fd,
                scratch.as_mut_ptr().cast::<libc::c_void>(),
                scratch.len(),
            )
        };
        let interrupted =
            n < 0 && io::Error::last_os_error().kind() == io::ErrorKind::Interrupted;
        if !interrupted {
            break;
        }
    }
}
/// Polls the timer for expiry.
///
/// Returns `Poll::Ready(())` once a full 8-byte expiration count has been read
/// from the timerfd, and `Poll::Pending` when the reactor reports the
/// descriptor as not readable.
///
/// # Panics
///
/// Panics if `poll_read_ready` reports an error, or if the read fails with
/// anything other than "interrupted" or "would block".
pub(super) fn poll_expired(&mut self, cx: &mut Context<'_>) -> Poll<()> {
    loop {
        let mut ready = match self.fd.poll_read_ready(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(e)) => panic!("timerfd poll_read_ready failed: {e}"),
            Poll::Ready(Ok(guard)) => guard,
        };

        let attempt = ready.try_io(|afd| {
            let mut count = [0u8; 8];
            // SAFETY: `count` is a live, writable buffer and `read` is told
            // its exact length, so it writes at most `count.len()` bytes.
            let n = unsafe {
                libc::read(
                    afd.as_raw_fd(),
                    count.as_mut_ptr().cast::<libc::c_void>(),
                    count.len(),
                )
            };
            match n {
                8 => Ok(()),
                n if n < 0 => Err(io::Error::last_os_error()),
                // A short read is reported as "would block" so the readiness
                // loop runs again instead of signalling an expiration.
                _ => Err(io::Error::from(io::ErrorKind::WouldBlock)),
            }
        });

        match attempt {
            Ok(Ok(())) => return Poll::Ready(()),
            Ok(Err(e)) if e.kind() != io::ErrorKind::Interrupted => {
                panic!("timerfd read failed: {e}")
            }
            // Interrupted read, or `try_io` reported would-block: go around
            // the loop and ask the reactor for readiness again.
            Ok(Err(_)) | Err(_) => {}
        }
    }
}
}
/// An all-zero `timespec` (0 s, 0 ns).
///
/// Used as `it_interval` in every `itimerspec` built in this file, and as
/// `it_value` when disarming the timer.
const ZERO_TIMESPEC: libc::timespec = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
/// Returns the current `CLOCK_MONOTONIC` reading.
///
/// # Panics
///
/// Panics if `clock_gettime` reports an error.
fn clock_gettime_monotonic() -> libc::timespec {
    let mut slot = MaybeUninit::<libc::timespec>::uninit();
    // SAFETY: `slot.as_mut_ptr()` points to writable storage large enough for
    // one `timespec`, which is all `clock_gettime` writes.
    let status = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, slot.as_mut_ptr()) };
    assert!(
        status >= 0,
        "clock_gettime(CLOCK_MONOTONIC) failed: {}",
        io::Error::last_os_error()
    );
    // SAFETY: the call above succeeded, so the kernel has filled in `slot`.
    unsafe { slot.assume_init() }
}
/// Returns `ts + d` as a `timespec`, saturating the seconds field at
/// `time_t::MAX` instead of overflowing.
///
/// `ts` is expected to be normalized (`0 <= tv_nsec < 1_000_000_000`), so at
/// most one carry from nanoseconds into seconds is performed.
///
/// The nanosecond sum is computed in `i64` and converted back to the field's
/// own type. `tv_nsec` is `c_long`, which is 32 bits wide on 32-bit Linux
/// targets; adding an `i64` to it directly does not compile there.
#[allow(clippy::similar_names)]
// On 64-bit targets `tv_nsec` already is `i64`, so the widening and narrowing
// conversions below are no-ops that clippy would flag; they are required on
// targets where the field is narrower.
#[allow(clippy::useless_conversion)]
fn add_duration(ts: libc::timespec, d: Duration) -> libc::timespec {
    const NANOS_PER_SEC: i64 = 1_000_000_000;

    let secs = libc::time_t::try_from(d.as_secs()).unwrap_or(libc::time_t::MAX);
    let mut tv_sec = ts.tv_sec.saturating_add(secs);

    let mut nanos = i64::from(ts.tv_nsec) + i64::from(d.subsec_nanos());
    if nanos >= NANOS_PER_SEC {
        nanos -= NANOS_PER_SEC;
        tv_sec = tv_sec.saturating_add(1);
    }

    libc::timespec {
        tv_sec,
        // After the carry the value is below one second for normalized input,
        // which fits in every integer type `tv_nsec` is declared with.
        tv_nsec: nanos
            .try_into()
            .expect("normalized nanosecond count fits in tv_nsec"),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Seconds and nanoseconds add independently when the nanosecond sum stays
    /// below one second.
    #[test]
    fn add_duration_no_carry() {
        let start = libc::timespec {
            tv_sec: 10,
            tv_nsec: 100,
        };
        let sum = add_duration(start, Duration::new(2, 50));
        assert_eq!((sum.tv_sec, sum.tv_nsec), (12, 150));
    }

    /// A nanosecond sum of one second or more carries into the seconds field.
    #[test]
    fn add_duration_with_carry() {
        let start = libc::timespec {
            tv_sec: 10,
            tv_nsec: 999_999_900,
        };
        let sum = add_duration(start, Duration::new(0, 200));
        assert_eq!((sum.tv_sec, sum.tv_nsec), (11, 100));
    }

    /// Adding a zero duration leaves both fields untouched.
    #[test]
    fn add_duration_zero_is_identity() {
        let start = libc::timespec {
            tv_sec: 42,
            tv_nsec: 1234,
        };
        let sum = add_duration(start, Duration::ZERO);
        assert_eq!((sum.tv_sec, sum.tv_nsec), (start.tv_sec, start.tv_nsec));
    }
}