use std::{
future::Future,
io::Result,
marker::PhantomPinned,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
pin::Pin,
ptr::null_mut,
task::{Context, Poll},
time::{Duration, SystemTime},
};
use io_uring::{opcode, types};
use libc::{CLOCK_MONOTONIC, TFD_NONBLOCK};
use crate::reactor::{Reactor, ReactorIo};
pub struct Timer {
expiration: SystemTime,
io: ReactorIo,
buf: [u8; std::mem::size_of::<u64>()],
fd: OwnedFd,
_phantom: PhantomPinned,
}
impl Timer {
pub fn sleep(d: Duration) -> Result<Self> {
let expiration = SystemTime::now() + d;
let timer = unsafe { libc::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK) };
if timer == -1 {
return Err(std::io::Error::last_os_error());
}
Ok(Self {
expiration,
io: Reactor::new_io(),
buf: [0; std::mem::size_of::<u64>()],
fd: unsafe { OwnedFd::from_raw_fd(timer) },
_phantom: PhantomPinned,
})
}
}
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if SystemTime::now() > self.expiration {
return Poll::Ready(());
}
let this = unsafe { self.get_unchecked_mut() };
this.io
.submit_or_get_result(|| {
let expiration = this.expiration.duration_since(SystemTime::now()).unwrap();
let mut tspec = unsafe { std::mem::zeroed::<libc::itimerspec>() };
tspec.it_value.tv_sec = expiration.as_secs() as _;
tspec.it_value.tv_nsec = expiration.subsec_nanos() as _;
let ret = unsafe {
libc::timerfd_settime(this.fd.as_raw_fd(), 0, &tspec as *const _, null_mut())
};
if ret == -1 {
panic!("timerfd_settime returned error");
}
(
opcode::Read::new(
types::Fd(this.fd.as_raw_fd()),
this.buf.as_mut_ptr(),
this.buf.len() as _,
)
.build(),
cx.waker().clone(),
)
})
.map(|_| ())
}
}
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use crate::task::Executor;
use super::Timer;
#[test]
fn sleep_simple() {
let before = Instant::now();
Executor::block_on(async {
Timer::sleep(Duration::from_secs(1)).unwrap().await;
});
assert!(Instant::now() - before > Duration::from_millis(900));
}
#[test]
fn sleep_multiple_tasks() {
Executor::block_on(async {
let before = Instant::now();
let t1 = Executor::spawn(async {
Timer::sleep(Duration::from_secs(1)).unwrap().await;
});
let t2 = Executor::spawn(async {
Timer::sleep(Duration::from_secs(1)).unwrap().await;
});
let t3 = Executor::spawn(async {
Timer::sleep(Duration::from_secs(2)).unwrap().await;
});
t1.await;
t2.await;
assert!(Instant::now() - before > Duration::from_millis(900));
assert!(Instant::now() - before < Duration::from_millis(1100));
t3.await;
assert!(Instant::now() - before > Duration::from_millis(1900));
assert!(Instant::now() - before < Duration::from_millis(2100));
});
}
#[test]
fn sleep_subtasks() {
let before = Instant::now();
Executor::block_on(async move {
Timer::sleep(Duration::from_secs(1)).unwrap().await;
assert!(Instant::now() - before > Duration::from_millis(900));
assert!(Instant::now() - before < Duration::from_millis(1100));
let t1 = Executor::spawn(async {
Timer::sleep(Duration::from_secs(1)).unwrap().await;
});
let t2 = Executor::spawn(async {
Timer::sleep(Duration::from_secs(1)).unwrap().await;
});
t1.await;
t2.await;
assert!(Instant::now() - before > Duration::from_millis(1900));
assert!(Instant::now() - before < Duration::from_millis(2100));
});
assert!(Instant::now() - before > Duration::from_millis(1900));
assert!(Instant::now() - before < Duration::from_millis(2100));
}
}