#![allow(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_sign_loss
)]
#![allow(clippy::comparison_chain)]
use std::io;
use std::mem;
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::ptr;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tokio::time::Instant;
const TIMER_IDENT: usize = 1;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const TIMER_FFLAGS_EXTRA: u32 = libc::NOTE_CRITICAL;
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
const TIMER_FFLAGS_EXTRA: u32 = 0;
pub(super) struct Timer {
fd: AsyncFd<OwnedFd>,
}
impl Timer {
pub(super) fn new() -> Self {
let raw = unsafe { libc::kqueue() };
assert!(raw >= 0, "kqueue() failed: {}", io::Error::last_os_error());
let owned = unsafe { OwnedFd::from_raw_fd(raw) };
set_cloexec(owned.as_raw_fd());
let fd = AsyncFd::with_interest(owned, Interest::READABLE)
.expect("registering kqueue fd with the tokio reactor (is a runtime active?)");
Self { fd }
}
pub(super) fn arm(&mut self, deadline: Instant) {
let delta = deadline.saturating_duration_since(Instant::now());
let (data, fflags) = duration_to_timer(delta);
let mut kev: libc::kevent = unsafe { mem::zeroed() };
kev.ident = TIMER_IDENT as _;
kev.filter = libc::EVFILT_TIMER;
kev.flags = libc::EV_ADD | libc::EV_ONESHOT;
kev.fflags = fflags | TIMER_FFLAGS_EXTRA;
kev.data = data as _;
let rc = unsafe {
libc::kevent(
self.fd.as_raw_fd(),
&kev,
1,
ptr::null_mut(),
0,
ptr::null(),
)
};
assert!(
rc >= 0,
"kevent(arm) failed: {}",
io::Error::last_os_error()
);
}
pub(super) fn disarm(&mut self) {
let mut kev: libc::kevent = unsafe { mem::zeroed() };
kev.ident = TIMER_IDENT as _;
kev.filter = libc::EVFILT_TIMER;
kev.flags = libc::EV_DELETE;
unsafe {
libc::kevent(
self.fd.as_raw_fd(),
&kev,
1,
ptr::null_mut(),
0,
ptr::null(),
);
}
self.drain();
}
fn drain(&self) {
let zero = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
let mut buf: [libc::kevent; 4] = unsafe { mem::zeroed() };
loop {
let rc = unsafe {
libc::kevent(
self.fd.as_raw_fd(),
ptr::null(),
0,
buf.as_mut_ptr(),
buf.len() as libc::c_int,
&zero,
)
};
if rc < 0 {
if io::Error::last_os_error().kind() == io::ErrorKind::Interrupted {
continue;
}
break;
}
if (rc as usize) < buf.len() {
break;
}
}
}
pub(super) fn poll_expired(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
let mut guard = match self.fd.poll_read_ready(cx) {
Poll::Ready(Ok(g)) => g,
Poll::Ready(Err(e)) => panic!("kqueue poll_read_ready failed: {e}"),
Poll::Pending => return Poll::Pending,
};
let zero = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
let mut buf: [libc::kevent; 4] = unsafe { mem::zeroed() };
let result = guard.try_io(|inner| {
let rc = unsafe {
libc::kevent(
inner.as_raw_fd(),
ptr::null(),
0,
buf.as_mut_ptr(),
buf.len() as libc::c_int,
&zero,
)
};
if rc < 0 {
Err(io::Error::last_os_error())
} else if rc == 0 {
Err(io::Error::from(io::ErrorKind::WouldBlock))
} else {
Ok(())
}
});
match result {
Ok(Ok(())) => return Poll::Ready(()),
Ok(Err(e)) if e.kind() == io::ErrorKind::Interrupted => {}
Ok(Err(e)) => panic!("kevent(poll) failed: {e}"),
Err(_would_block) => {}
}
}
}
}
fn set_cloexec(fd: libc::c_int) {
unsafe {
let flags = libc::fcntl(fd, libc::F_GETFD);
if flags >= 0 {
libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
}
}
}
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "dragonfly",
))]
fn duration_to_timer(d: Duration) -> (i64, u32) {
let ns = d.as_nanos().max(1);
(clamp_i64(ns), libc::NOTE_NSECONDS)
}
#[cfg(not(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "dragonfly",
)))]
fn duration_to_timer(d: Duration) -> (i64, u32) {
let ms = d.as_nanos().div_ceil(1_000_000).max(1);
(clamp_i64(ms), 0)
}
fn clamp_i64(value: u128) -> i64 {
i64::try_from(value).unwrap_or(i64::MAX)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn clamp_i64_within_range() {
assert_eq!(clamp_i64(0), 0);
assert_eq!(clamp_i64(1_000_000), 1_000_000);
assert_eq!(clamp_i64(i64::MAX as u128), i64::MAX);
}
#[test]
fn clamp_i64_saturates() {
assert_eq!(clamp_i64(i64::MAX as u128 + 1), i64::MAX);
assert_eq!(clamp_i64(u128::MAX), i64::MAX);
}
#[test]
fn duration_to_timer_zero_is_one_unit() {
let (data, _) = duration_to_timer(Duration::ZERO);
assert_eq!(data, 1);
}
#[test]
fn duration_to_timer_uses_expected_unit() {
let (data, fflags) = duration_to_timer(Duration::from_micros(2_500));
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "dragonfly",
))]
{
assert_eq!(fflags, libc::NOTE_NSECONDS);
assert_eq!(data, 2_500_000);
}
#[cfg(not(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "dragonfly",
)))]
{
assert_eq!(fflags, 0);
assert_eq!(data, 3);
}
}
#[test]
fn duration_to_timer_rounds_ms_up() {
let (data, _) = duration_to_timer(Duration::from_nanos(1));
assert_eq!(data, 1);
}
}