use crate::{reactor::Reactor, task::JoinHandle, GlommioError, TaskQueueHandle};
use pin_project_lite::pin_project;
use std::{
cell::RefCell,
future::Future,
pin::Pin,
rc::{Rc, Weak},
task::{Context, Poll},
time::{Duration, Instant},
};
/// Module-local shorthand for `crate::Result` with its second type parameter
/// fixed to `()`.
type Result<T> = crate::Result<T, ()>;
/// State behind a [`Timer`], held in an `Rc<RefCell<_>>` so that it can also be
/// referenced by a [`TimerActionOnce`] after the timer itself has been moved
/// into a spawned task.
#[derive(Debug)]
struct Inner {
/// Identifier passed to the reactor's `insert_timer` / `remove_timer` calls.
id: u64,
/// Set to `true` once `poll` has inserted this timer into the reactor.
/// Nothing in this file sets it back to `false`.
is_charged: bool,
/// Instant at (or after) which polling the timer reports completion.
when: Instant,
/// Weak handle to the reactor; upgraded each time the reactor is needed.
reactor: Weak<Reactor>,
}
impl Inner {
    /// Moves the deadline to `dur` from now.
    ///
    /// If the timer has been charged, it is first removed from the reactor.
    /// When that removal hands back a waker, the timer is inserted again under
    /// the same id with the new deadline and that waker. When no waker comes
    /// back, only the stored deadline changes.
    ///
    /// # Panics
    ///
    /// Panics if the timer is charged and the reactor can no longer be
    /// upgraded from its weak handle.
    fn reset(&mut self, dur: Duration) {
        // Keep the upgraded reactor next to the waker so the re-insertion
        // below talks to the very same reactor the timer was removed from.
        let rearm = if self.is_charged {
            let reactor = self.reactor.upgrade().unwrap();
            let waker = reactor.remove_timer(self.id);
            waker.map(|waker| (reactor, waker))
        } else {
            None
        };

        self.when = Instant::now() + dur;

        if let Some((reactor, waker)) = rearm {
            reactor.insert_timer(self.id, self.when, waker);
        }
    }
}
/// A future that becomes ready once the current time reaches its deadline,
/// yielding that deadline as an `Instant`.
///
/// The timer is only inserted into the reactor when it is polled before the
/// deadline; dropping a timer that was inserted removes it from the reactor.
#[derive(Debug)]
pub struct Timer {
/// Shared state; also cloned by `TimerActionOnce` so the deadline can be
/// changed while the timer is owned by a spawned task.
inner: Rc<RefCell<Inner>>,
}
impl Timer {
    /// Creates a timer whose deadline is `dur` from now.
    ///
    /// A fresh id is obtained from the current executor's reactor through
    /// `register_timer`. The timer is not inserted into the reactor here; that
    /// happens when it is first polled before its deadline.
    pub fn new(dur: Duration) -> Timer {
        let reactor = crate::executor().reactor();
        let id = reactor.register_timer();
        let when = Instant::now() + dur;
        let state = Inner {
            id,
            is_charged: false,
            when,
            reactor: Rc::downgrade(&reactor),
        };
        Timer {
            inner: Rc::new(RefCell::new(state)),
        }
    }

    /// Creates a timer with deadline `dur` from now around an `id` the caller
    /// already obtained from the reactor.
    fn from_id(id: u64, dur: Duration) -> Timer {
        let when = Instant::now() + dur;
        let reactor = Rc::downgrade(&crate::executor().reactor());
        let state = Inner {
            id,
            is_charged: false,
            when,
            reactor,
        };
        Timer {
            inner: Rc::new(RefCell::new(state)),
        }
    }

    /// Moves the deadline to `dur` from now; see `Inner::reset` for how a
    /// timer that is already charged is re-inserted into the reactor.
    pub fn reset(&mut self, dur: Duration) {
        self.inner.borrow_mut().reset(dur);
    }
}
impl Drop for Timer {
    /// Removes the timer from the reactor if it was ever charged.
    ///
    /// A reactor that can no longer be upgraded is silently ignored, so
    /// dropping a timer after the reactor is gone does not panic.
    fn drop(&mut self) {
        let state = self.inner.borrow_mut();
        if !state.is_charged {
            // Never inserted into the reactor: nothing to undo.
            return;
        }
        if let Some(reactor) = state.reactor.upgrade() {
            reactor.remove_timer(state.id);
        }
    }
}
impl Future for Timer {
    /// The deadline the timer was waiting for.
    type Output = Instant;

    /// Reports `Ready(deadline)` once the current time has reached the
    /// deadline. Otherwise inserts the timer into the reactor with the
    /// current waker, marks it charged, and reports `Pending`.
    ///
    /// # Panics
    ///
    /// Panics if the reactor can no longer be upgraded from its weak handle.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.inner.borrow_mut();
        let deadline = state.when;

        if Instant::now() < deadline {
            // Not due yet: insert the timer under its id with the current
            // waker and deadline.
            let reactor = state.reactor.upgrade().unwrap();
            reactor.insert_timer(state.id, deadline, cx.waker().clone());
            state.is_charged = true;
            return Poll::Pending;
        }

        // Due: remove the timer from the reactor.
        state.reactor.upgrade().unwrap().remove_timer(state.id);
        Poll::Ready(deadline)
    }
}
/// Handle to an action that runs once after a timer expires.
///
/// The action is spawned as a task that first awaits a [`Timer`] and then the
/// user's future; this handle allows re-arming the timer, cancelling the task,
/// or joining on its result.
#[derive(Debug)]
pub struct TimerActionOnce<T> {
/// Detached handle of the spawned task; awaiting it yields `Option<T>`.
handle: JoinHandle<T>,
/// State shared with the `Timer` owned by the spawned task.
inner: Rc<RefCell<Inner>>,
/// Weak handle to the reactor, used by `destroy` to remove the timer.
reactor: Weak<Reactor>,
}
/// Handle to an action that runs repeatedly, with a timer wait between runs.
///
/// The spawned task creates a new [`Timer`] for every wait, each time with the
/// same `timer_id`.
#[derive(Debug)]
pub struct TimerActionRepeat {
/// Detached handle of the spawned task.
handle: JoinHandle<()>,
/// Reactor timer id shared by every wait of the repeating task.
timer_id: u64,
/// Weak handle to the reactor, used by `destroy` to remove the timer.
reactor: Weak<Reactor>,
}
impl<T: 'static> TimerActionOnce<T> {
    /// Schedules `action` to run once `when` has elapsed, on the current task
    /// queue of the calling executor.
    ///
    /// # Panics
    ///
    /// Panics if [`TimerActionOnce::do_in_into`] returns an error.
    pub fn do_in(when: Duration, action: impl Future<Output = T> + 'static) -> TimerActionOnce<T> {
        Self::do_in_into(when, action, crate::executor().current_task_queue()).unwrap()
    }

    /// Schedules `action` to run once `when` has elapsed, in task queue `tq`.
    ///
    /// A timer id is registered with the reactor and a task is spawned that
    /// first awaits the timer and then `action`. The returned handle shares
    /// the timer's state, so the deadline can later be moved with
    /// [`TimerActionOnce::rearm_in`] / [`TimerActionOnce::rearm_at`].
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `crate::spawn_local_into`.
    pub fn do_in_into(
        when: Duration,
        action: impl Future<Output = T> + 'static,
        tq: TaskQueueHandle,
    ) -> Result<TimerActionOnce<T>> {
        let reactor = crate::executor().reactor();
        let timer_id = reactor.register_timer();
        let timer = Timer::from_id(timer_id, when);
        // Keep a handle on the timer state before the timer moves into the task.
        let inner = Rc::clone(&timer.inner);

        let task = crate::spawn_local_into(
            async move {
                timer.await;
                action.await
            },
            tq,
        )?;

        Ok(TimerActionOnce {
            handle: task.detach(),
            inner,
            reactor: Rc::downgrade(&reactor),
        })
    }

    /// Schedules `action` to run at instant `when`, on the current task queue
    /// of the calling executor. An instant in the past is treated as "now".
    ///
    /// # Panics
    ///
    /// Panics if [`TimerActionOnce::do_at_into`] returns an error.
    pub fn do_at(when: Instant, action: impl Future<Output = T> + 'static) -> TimerActionOnce<T> {
        Self::do_at_into(when, action, crate::executor().current_task_queue()).unwrap()
    }

    /// Schedules `action` to run at instant `when`, in task queue `tq`.
    ///
    /// The instant is converted to a delay relative to now; an instant in the
    /// past yields a zero delay.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by [`TimerActionOnce::do_in_into`].
    pub fn do_at_into(
        when: Instant,
        action: impl Future<Output = T> + 'static,
        tq: TaskQueueHandle,
    ) -> Result<TimerActionOnce<T>> {
        // Saturates to zero when `when` is not later than now.
        let dur = when.saturating_duration_since(Instant::now());
        Self::do_in_into(dur, action, tq)
    }

    /// Calls [`TimerActionOnce::destroy`] and then awaits the task handle,
    /// discarding whatever it yields.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`TimerActionOnce::destroy`].
    pub async fn cancel(self) {
        self.destroy();
        self.join().await;
    }

    /// Removes the timer from the reactor and cancels the spawned task's
    /// handle, without waiting for the task.
    ///
    /// # Panics
    ///
    /// Panics if the reactor can no longer be upgraded from its weak handle.
    pub fn destroy(&self) {
        self.reactor
            .upgrade()
            .unwrap()
            .remove_timer(self.inner.borrow().id);
        self.handle.cancel();
    }

    /// Awaits the spawned task and returns what its handle yields.
    ///
    /// In this file's tests that is `Some(value)` when the action ran to
    /// completion and `None` after `destroy` was called before it ran.
    pub async fn join(self) -> Option<T> {
        self.handle.await
    }

    /// Moves the timer's deadline to `dur` from now.
    ///
    /// The timer is only re-inserted into the reactor if it is charged and the
    /// reactor still hands back a waker for it; otherwise just the stored
    /// deadline changes.
    ///
    /// # Panics
    ///
    /// Panics if the timer is charged and the reactor can no longer be
    /// upgraded from its weak handle.
    pub fn rearm_in(&self, dur: Duration) {
        self.inner.borrow_mut().reset(dur);
    }

    /// Moves the timer's deadline to instant `when`; an instant in the past is
    /// treated as "now". See [`TimerActionOnce::rearm_in`].
    pub fn rearm_at(&self, when: Instant) {
        // Saturates to zero when `when` is not later than now.
        self.rearm_in(when.saturating_duration_since(Instant::now()));
    }
}
impl TimerActionRepeat {
    /// Spawns, in task queue `tq`, a task that repeatedly runs the future
    /// produced by `action_gen`.
    ///
    /// After each run, `Some(period)` makes the task wait on a timer of
    /// `period` before running again, and `None` ends the loop. Every wait
    /// uses the same reactor timer id, registered once here.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `crate::spawn_local_into`.
    pub fn repeat_into<G, F>(action_gen: G, tq: TaskQueueHandle) -> Result<TimerActionRepeat>
    where
        G: Fn() -> F + 'static,
        F: Future<Output = Option<Duration>> + 'static,
    {
        let reactor = crate::executor().reactor();
        let timer_id = reactor.register_timer();

        let repeating = async move {
            loop {
                match action_gen().await {
                    Some(period) => {
                        Timer::from_id(timer_id, period).await;
                    }
                    None => break,
                }
            }
        };
        let task = crate::spawn_local_into(repeating, tq)?;

        Ok(TimerActionRepeat {
            handle: task.detach(),
            timer_id,
            reactor: Rc::downgrade(&reactor),
        })
    }

    /// Same as [`TimerActionRepeat::repeat_into`], using the current task
    /// queue of the calling executor.
    ///
    /// # Panics
    ///
    /// Panics if [`TimerActionRepeat::repeat_into`] returns an error.
    pub fn repeat<G, F>(action_gen: G) -> TimerActionRepeat
    where
        G: Fn() -> F + 'static,
        F: Future<Output = Option<Duration>> + 'static,
    {
        let tq = crate::executor().current_task_queue();
        Self::repeat_into(action_gen, tq).unwrap()
    }

    /// Calls [`TimerActionRepeat::destroy`] and then awaits the task handle,
    /// discarding whatever it yields.
    pub async fn cancel(self) {
        self.destroy();
        self.join().await;
    }

    /// Removes the timer from the reactor and cancels the spawned task's
    /// handle, without waiting for the task.
    ///
    /// # Panics
    ///
    /// Panics if the reactor can no longer be upgraded from its weak handle.
    pub fn destroy(&self) {
        let reactor = self.reactor.upgrade().unwrap();
        reactor.remove_timer(self.timer_id);
        self.handle.cancel();
    }

    /// Awaits the spawned task and returns what its handle yields.
    ///
    /// In this file's tests that is `Some(())` when the loop ended on its own
    /// and `None` after `destroy` was called.
    pub async fn join(self) -> Option<()> {
        let outcome = self.handle.await;
        outcome.map(|_| ())
    }
}
// `Timeout` pairs a fallible future with a `Timer`. Polling it drives the
// wrapped future first and, while that is pending, the timer; if the timer
// completes first the result is `Err(GlommioError::TimedOut(dur))`.
// Only plain `//` comments are used here so the tokens seen by `pin_project!`
// are unchanged.
pin_project! {
#[derive(Debug)]
pub(super) struct Timeout<F, T>
where
F: Future<Output = Result<T>>,
{
// The wrapped future; pinned because it is polled in place.
#[pin]
pub(super) future: F,
// Timer created from `dur` when the `Timeout` is constructed.
#[pin]
pub(super) timeout: Timer,
// The timeout duration, reported back inside the `TimedOut` error.
pub(super) dur: Duration,
}
}
impl<F, T> Timeout<F, T>
where
    F: Future<Output = Result<T>>,
{
    /// Wraps `future` together with a freshly created [`Timer`] of `dur`.
    ///
    /// The timer's deadline is computed here, at construction, not when the
    /// `Timeout` is first polled.
    pub(super) fn new(future: F, dur: Duration) -> Self {
        let timeout = Timer::new(dur);
        Self {
            future,
            timeout,
            dur,
        }
    }
}
impl<F, T> Future for Timeout<F, T>
where
    F: Future<Output = Result<T>>,
{
    type Output = Result<T>;

    /// Polls the wrapped future first and returns its result as soon as it is
    /// ready. Only while it is pending is the timer polled; a ready timer
    /// turns into `Err(GlommioError::TimedOut(dur))`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // The wrapped future takes precedence: if both are ready on the same
        // poll, its output is returned rather than a timeout.
        let outcome = this.future.poll(cx);
        if outcome.is_ready() {
            return outcome;
        }

        match this.timeout.poll(cx) {
            Poll::Ready(_) => Poll::Ready(Err(GlommioError::TimedOut(*this.dur))),
            Poll::Pending => Poll::Pending,
        }
    }
}
// Tests for `Timer`, `Timeout`, `TimerActionOnce` and `TimerActionRepeat`.
//
// `test_executor!` and `make_shared_var_mut!` are macros defined elsewhere in
// the crate. From their use here, `test_executor!` runs the given future(s) on
// an executor, and `make_shared_var_mut!(0, a, b)` binds `a` and `b` to the
// same shared, mutably borrowable value initialised to 0 — presumably an
// `Rc<RefCell<_>>`; confirm against the macro definition.
//
// Most assertions are wall-clock based (`Instant::elapsed`) and therefore
// depend on the executor making timely progress.
#[cfg(test)]
mod test {
use super::*;
use crate::LocalExecutorBuilder;
use std::{
cell::{Cell, RefCell},
rc::Rc,
};
// A future that finishes (1ms) well before the timeout (50ms) yields its own
// value, and the whole thing takes less than the timeout.
#[test]
fn timeout_does_not_expire() {
test_executor!(async move {
let now = Instant::now();
let res = Timeout::new(
async move {
Timer::new(Duration::from_millis(1)).await;
Ok(5)
},
Duration::from_millis(50),
)
.await
.unwrap();
let elapsed = now.elapsed();
assert_eq!(res, 5);
assert!(elapsed.as_millis() >= 1);
assert!(elapsed.as_millis() < 50, "{}", elapsed.as_millis());
});
}
// A future slower (100ms) than the timeout (10ms) yields `TimedOut` carrying
// the configured duration, after at least the timeout but before the future
// would have finished.
#[test]
fn timeout_expires() {
test_executor!(async move {
let now = Instant::now();
let dur = Duration::from_millis(10);
let err = Timeout::new(
async move {
Timer::new(Duration::from_millis(100)).await;
Ok(5)
},
dur,
)
.await
.unwrap_err();
assert!(now.elapsed().as_millis() >= 10);
assert!(now.elapsed().as_millis() < 100);
assert_eq!(format!("{}", err), "Operation timed out after 10ms");
match err {
GlommioError::TimedOut(d) => assert_eq!(d, dur),
_ => unreachable!(),
}
});
}
// When the timeout fires, the wrapped future is dropped without completing:
// `Foo::drop` sets the tracker to 10, while the future's own `set(2)` must
// never be the final value.
#[test]
fn timeout_expiration_cancels_future() {
test_executor!(async move {
struct Foo {
val: Rc<Cell<usize>>,
}
impl Drop for Foo {
fn drop(&mut self) {
self.val.set(10);
}
}
let tracker = Rc::new(Cell::new(0));
let f = Foo {
val: tracker.clone(),
};
let dur = Duration::from_millis(10);
let _err = Timeout::new(
async move {
Timer::new(Duration::from_millis(100)).await;
f.val.set(2);
Ok(5)
},
dur,
)
.await
.unwrap_err();
assert_eq!(tracker.get(), 10);
});
}
// Awaiting a 100ms timer takes at least 100ms.
#[test]
fn basic_timer_works() {
test_executor!(async move {
let now = Instant::now();
Timer::new(Duration::from_millis(100)).await;
assert!(now.elapsed().as_millis() >= 100)
});
}
// `do_at` with an instant 50ms in the future has run by the time a 100ms
// timer completes.
#[test]
fn basic_timer_action_instant_works() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let when = Instant::now()
.checked_add(Duration::from_millis(50))
.unwrap();
let _ = TimerActionOnce::do_at(when, async move {
*(exec1.borrow_mut()) = 1;
});
Timer::new(Duration::from_millis(100)).await;
assert_eq!(*(exec2.borrow()), 1);
});
}
// `do_at` with an instant in the past runs the action after a single yield
// of the task queue.
#[test]
fn basic_timer_action_instant_past_works() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let when = Instant::now()
.checked_sub(Duration::from_millis(50))
.unwrap();
let _ = TimerActionOnce::do_at(when, async move {
*(exec1.borrow_mut()) = 1;
});
crate::executor().yield_task_queue_now().await;
assert_eq!(*(exec2.borrow()), 1);
});
}
// `do_in` with a 50ms delay has run by the time a 100ms timer completes.
#[test]
fn basic_timer_action_works() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let _ = TimerActionOnce::do_in(Duration::from_millis(50), async move {
*(exec1.borrow_mut()) = 1;
});
Timer::new(Duration::from_millis(100)).await;
assert_eq!(*(exec2.borrow()), 1);
});
}
// Re-arming (for a past instant) an action whose timer has already fired and
// whose body is still running does not disturb it: the action still returns
// 1 and the total time covers both 50ms waits.
#[test]
fn basic_timer_rearm_pending_timer_for_the_past_ok() {
test_executor!(async move {
let now = Instant::now();
let action: TimerActionOnce<usize> =
TimerActionOnce::do_in(Duration::from_millis(50), async move {
Timer::new(Duration::from_millis(50)).await;
1
});
Timer::new(Duration::from_millis(60)).await;
action.rearm_at(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
let ret = action.join().await;
assert_eq!(ret.unwrap(), 1);
assert!(now.elapsed().as_millis() >= 100);
});
}
// Re-arming an action that has already executed (far into the future) does
// not make `join` wait: the result is available within 10ms.
#[test]
fn basic_timer_rearm_executed_action_ok() {
test_executor!(async move {
let action: TimerActionOnce<usize> =
TimerActionOnce::do_in(Duration::from_millis(1), async move { 1 });
Timer::new(Duration::from_millis(10)).await;
action.rearm_at(
Instant::now()
.checked_add(Duration::from_secs(100))
.unwrap(),
);
let now = Instant::now();
let ret = action.join().await;
assert_eq!(ret.unwrap(), 1);
assert!(now.elapsed().as_millis() <= 10);
});
}
// Re-arming a not-yet-fired action from 10ms to 100ms delays it: `join`
// returns only after at least 100ms.
#[test]
fn basic_timer_rearm_future_timer_ok() {
test_executor!(async move {
let now = Instant::now();
let action: TimerActionOnce<usize> =
TimerActionOnce::do_in(Duration::from_millis(10), async move { 1 });
action.rearm_in(Duration::from_millis(100));
let ret = action.join().await;
assert_eq!(ret.unwrap(), 1);
assert!(now.elapsed().as_millis() >= 100);
});
}
// `join` yields the action's return value, and not before the delay elapsed.
#[test]
fn basic_timer_action_return_ok() {
test_executor!(async move {
let now = Instant::now();
let action: TimerActionOnce<usize> =
TimerActionOnce::do_in(Duration::from_millis(50), async move { 1 });
let ret = action.join().await;
assert_eq!(ret.unwrap(), 1);
assert!(now.elapsed().as_millis() >= 50);
});
}
// After `destroy`, `join` yields `None` and returns before the original
// delay would have elapsed.
#[test]
fn basic_timer_action_join_reflects_cancel() {
test_executor!(async move {
let now = Instant::now();
let action: TimerActionOnce<usize> =
TimerActionOnce::do_in(Duration::from_millis(50), async move { 1 });
action.destroy();
let ret = action.join().await;
assert!(ret.is_none());
assert!(now.elapsed().as_millis() < 50);
});
}
// Cancelling after the task had a chance to start waiting (one yield) stops
// the action from ever running.
#[test]
fn basic_timer_action_cancel_works() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let action = TimerActionOnce::do_in(Duration::from_millis(50), async move {
*(exec1.borrow_mut()) = 1;
});
crate::executor().yield_task_queue_now().await;
action.cancel().await;
Timer::new(Duration::from_millis(100)).await;
assert_eq!(*(exec2.borrow()), 0);
});
}
// `destroy` called right after creation stops the action from ever running;
// joining afterwards still completes.
#[test]
fn basic_timer_action_destroy_works() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let action = TimerActionOnce::do_in(Duration::from_millis(50), async move {
*(exec1.borrow_mut()) = 1;
});
action.destroy();
Timer::new(Duration::from_millis(100)).await;
assert_eq!(*(exec2.borrow()), 0);
action.join().await;
});
}
// `destroy` on an action that is already running stops it part-way: the
// counter has advanced past 1 but never reaches its final value of 11.
#[test]
fn basic_timer_action_destroy_cancel_initiated_action() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let action = TimerActionOnce::do_in(Duration::from_millis(10), async move {
*(exec1.borrow_mut()) = 1;
for _ in 0..10 {
Timer::new(Duration::from_millis(10)).await;
*(exec1.borrow_mut()) += 1;
}
});
Timer::new(Duration::from_millis(50)).await;
action.destroy();
action.join().await;
assert!(*(exec2.borrow()) > 1);
assert_ne!(*(exec2.borrow()), 11);
});
}
// A task the action spawned and detached is not affected by destroying the
// action: it has not finished at destroy time but does reach 11 later.
#[test]
fn basic_timer_action_destroy_detached_spawn_survives() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let action = TimerActionOnce::do_in(Duration::from_millis(10), async move {
crate::spawn_local(async move {
*(exec1.borrow_mut()) = 1;
for _ in 0..10 {
Timer::new(Duration::from_millis(10)).await;
*(exec1.borrow_mut()) += 1;
}
})
.detach();
});
Timer::new(Duration::from_millis(50)).await;
action.destroy();
action.join().await;
assert_ne!(*(exec2.borrow()), 11);
Timer::new(Duration::from_millis(100)).await;
assert_eq!(*(exec2.borrow()), 11);
});
}
// Cancelling an action that already ran (1ms delay, cancelled after 10ms)
// does not undo its effect.
#[test]
fn basic_timer_action_cancel_fails_if_fired() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let action = TimerActionOnce::do_in(Duration::from_millis(1), async move {
*(exec1.borrow_mut()) = 1;
});
Timer::new(Duration::from_millis(10)).await;
action.cancel().await;
Timer::new(Duration::from_millis(90)).await;
assert_eq!(*(exec2.borrow()), 1);
});
}
// A repeating action that returns `None` on its 10th run stops by itself:
// the counter is exactly 10 and `join` yields `Some`.
#[test]
fn basic_timer_action_repeat_works() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let repeat = TimerActionRepeat::repeat(move || {
let ex = exec1.clone();
async move {
*(ex.borrow_mut()) += 1;
if (*ex.borrow()) == 10 {
None
} else {
Some(Duration::from_millis(5))
}
}
});
Timer::new(Duration::from_millis(100)).await;
let value = *(exec2.borrow());
assert!(value == 10);
let v = repeat.join().await;
assert!(v.is_some());
});
}
// After cancelling a never-ending repeating action, the counter no longer
// changes.
#[test]
fn basic_timer_action_repeat_cancellation_works() {
make_shared_var_mut!(0, exec1, exec2);
test_executor!(async move {
let action = TimerActionRepeat::repeat(move || {
let ex = exec1.clone();
async move {
*(ex.borrow_mut()) += 1;
Some(Duration::from_millis(10))
}
});
Timer::new(Duration::from_millis(50)).await;
action.cancel().await;
let old_value = *(exec2.borrow());
Timer::new(Duration::from_millis(50)).await;
assert_eq!(*(exec2.borrow()), old_value);
});
}
// After `destroy`, joining a repeating action yields `None`.
#[test]
fn basic_timer_action_repeat_destruction_works() {
test_executor!(async move {
let action =
TimerActionRepeat::repeat(move || async move { Some(Duration::from_millis(10)) });
action.destroy();
let v = action.join().await;
assert!(v.is_none());
});
}
// The executor's main future returns while a re-armed timer action is still
// unfinished; the executor thread must still join cleanly. Going by the test
// name this guards against a leak — presumably checked by external tooling,
// since nothing here asserts on memory.
#[test]
fn test_memory_leak_unfinished_timer() {
let handle = LocalExecutorBuilder::default()
.spawn(|| async move {
let action = TimerActionOnce::do_in(Duration::from_millis(100), async move {
println!("hello");
});
action.rearm_in(Duration::from_millis(100));
})
.unwrap();
handle.join().unwrap();
}
}