use std::future::Future;
use std::mem;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::future::FutureExt;
use futures::{pin_mut, select_biased};
use crate::{send, upcast, Actor, ActorResult, Addr, AddrLike, WeakAddr};
/// Abstraction over a runtime that can hand out delay futures for a given deadline.
///
/// `Timer<R>` requires `R: SupportsTimers` and calls [`SupportsTimers::delay`] every time
/// it schedules a tick.
pub trait SupportsTimers {
/// Future returned by [`SupportsTimers::delay`]. The timer awaits it before sending a tick.
type Delay: Future<Output = ()> + Send + 'static;
/// Creates a delay future for `deadline`.
///
/// NOTE(review): presumably the future resolves once `deadline` has been reached; the
/// exact contract is defined by the implementor — confirm there.
fn delay(&self, deadline: Instant) -> Self::Delay;
}
/// Actors that can be driven by a timer.
///
/// The timer's scheduling code sends `tick()` to the address it was given once the
/// associated delay future has completed.
#[async_trait]
pub trait Tick: Actor {
/// Handler for a tick message sent by the timer.
async fn tick(&mut self) -> ActorResult<()>;
}
/// Public snapshot of what a timer is currently scheduled to do.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TimerState {
    /// Nothing is scheduled.
    Inactive,
    /// A single tick is scheduled.
    Timeout {
        /// Instant the tick is scheduled for.
        deadline: Instant,
    },
    /// Repeating ticks are scheduled.
    Interval {
        /// Instant the next tick is scheduled for.
        deadline: Instant,
        /// Spacing between consecutive deadlines.
        interval: Duration,
    },
}

impl TimerState {
    /// Returns the instant of the next scheduled tick, or `None` for [`TimerState::Inactive`].
    pub fn deadline(&self) -> Option<Instant> {
        match self {
            Self::Timeout { deadline } | Self::Interval { deadline, .. } => Some(*deadline),
            Self::Inactive => None,
        }
    }

    /// Returns the repeat period for [`TimerState::Interval`], and `None` for every other state.
    pub fn interval(&self) -> Option<Duration> {
        if let Self::Interval { interval, .. } = self {
            Some(*interval)
        } else {
            None
        }
    }
}

impl Default for TimerState {
    /// A fresh state has nothing scheduled.
    fn default() -> Self {
        TimerState::Inactive
    }
}
/// Private bookkeeping behind [`TimerState`].
///
/// The interval variants additionally store the address that is handed back to the
/// scheduling helpers when the interval is re-armed.
#[derive(Debug)]
enum InternalTimerState {
    /// Nothing is scheduled.
    Inactive,
    /// A single tick is scheduled for `deadline`.
    Timeout {
        deadline: Instant,
    },
    /// Repeating ticks, holding a `WeakAddr` to the ticking actor.
    IntervalWeak {
        addr: WeakAddr<dyn Tick>,
        deadline: Instant,
        interval: Duration,
    },
    /// Repeating ticks, holding an `Addr` to the ticking actor.
    IntervalStrong {
        addr: Addr<dyn Tick>,
        deadline: Instant,
        interval: Duration,
    },
}

impl Default for InternalTimerState {
    /// A fresh state has nothing scheduled.
    fn default() -> Self {
        InternalTimerState::Inactive
    }
}

impl InternalTimerState {
    /// Projects the internal state onto the public [`TimerState`], dropping the stored address.
    fn public_state(&self) -> TimerState {
        match self {
            Self::Inactive => TimerState::Inactive,
            Self::Timeout { deadline } => TimerState::Timeout {
                deadline: *deadline,
            },
            // Weak and strong intervals look the same from the outside.
            Self::IntervalWeak {
                deadline, interval, ..
            }
            | Self::IntervalStrong {
                deadline, interval, ..
            } => TimerState::Interval {
                deadline: *deadline,
                interval: *interval,
            },
        }
    }
}
/// Timer that schedules tick messages for an actor, using delay futures created by the
/// runtime `R`.
#[derive(Debug, Default)]
pub struct Timer<R> {
/// Runtime used to create the delay future for each scheduled tick.
runtime: R,
/// What is currently scheduled; exposed publicly as a `TimerState` snapshot.
state: InternalTimerState,
}
impl<R: SupportsTimers> Timer<R> {
pub fn new(runtime: R) -> Self {
Self {
runtime,
state: InternalTimerState::Inactive,
}
}
/// Returns a public snapshot of what the timer is currently scheduled to do.
pub fn state(&self) -> TimerState {
self.state.public_state()
}
pub fn is_active(&self) -> bool {
self.state() != TimerState::Inactive
}
/// Resets the timer to the inactive state.
///
/// Only the recorded state is reset here; `tick` returns `false` while the timer is inactive.
pub fn clear(&mut self) {
    self.state = InternalTimerState::default();
}
/// Processes an incoming tick and reports whether the timer has actually elapsed.
///
/// Returns `false`, leaving the state untouched, when nothing is scheduled or when the
/// current deadline has not been reached yet. Returns `true` once the deadline has
/// passed: a timeout then becomes inactive, while an interval is re-armed for
/// `deadline + interval` (measured from the previous deadline, not from the current time).
pub fn tick(&mut self) -> bool {
    let is_due = match self.state().deadline() {
        Some(deadline) => deadline <= Instant::now(),
        None => false,
    };
    if !is_due {
        return false;
    }

    // Take ownership of the state so a stored address can be moved into the re-arm call.
    match mem::replace(&mut self.state, InternalTimerState::Inactive) {
        // A one-shot timeout simply stays inactive. `Inactive` has no deadline and
        // therefore never gets this far.
        InternalTimerState::Inactive | InternalTimerState::Timeout { .. } => {}
        InternalTimerState::IntervalWeak {
            addr,
            deadline,
            interval,
        } => {
            self.set_interval_at_weak_internal(addr, deadline + interval, interval);
        }
        InternalTimerState::IntervalStrong {
            addr,
            deadline,
            interval,
        } => {
            self.set_interval_at_strong_internal(addr, deadline + interval, interval);
        }
    }
    true
}
/// Arms a repeating timer on a type-erased weak address.
///
/// Creates a delay for `start`, passes `addr.send_fut` a future that awaits the delay
/// and then sends `tick()`, and records the state as `IntervalWeak` with `start` as
/// the deadline.
fn set_interval_at_weak_internal(
    &mut self,
    addr: WeakAddr<dyn Tick>,
    start: Instant,
    interval: Duration,
) {
    let wake_up = self.runtime.delay(start);
    // The future needs its own handle; `addr` itself is stored in the state below.
    let target = addr.clone();
    addr.send_fut(async move {
        wake_up.await;
        send!(target.tick());
    });
    self.state = InternalTimerState::IntervalWeak {
        addr,
        deadline: start,
        interval,
    };
}
/// Arms a repeating timer on a type-erased strong address.
///
/// Creates a delay for `start`, passes `addr.send_fut` a future that awaits the delay
/// and then sends `tick()`, and records the state as `IntervalStrong` with `start` as
/// the deadline.
fn set_interval_at_strong_internal(
    &mut self,
    addr: Addr<dyn Tick>,
    start: Instant,
    interval: Duration,
) {
    let wake_up = self.runtime.delay(start);
    // The future needs its own handle; `addr` itself is stored in the state below.
    let target = addr.clone();
    addr.send_fut(async move {
        wake_up.await;
        send!(target.tick());
    });
    self.state = InternalTimerState::IntervalStrong {
        addr,
        deadline: start,
        interval,
    };
}
/// Arms a one-shot timer for any address-like handle.
///
/// Creates a delay for `deadline`, passes `addr.send_fut` a future that awaits the
/// delay and then sends `tick()`, and records the state as `Timeout { deadline }`.
fn set_timeout_internal<T: Tick + ?Sized>(
    &mut self,
    addr: impl AddrLike<Actor = T>,
    deadline: Instant,
) {
    let wake_up = self.runtime.delay(deadline);
    // One handle moves into the future, the other is used to submit it.
    let target = addr.clone();
    addr.send_fut(async move {
        wake_up.await;
        send!(target.tick());
    });
    self.state = InternalTimerState::Timeout { deadline };
}
/// Runs the future produced by `f` under a deadline and sends a tick afterwards.
///
/// The future passed to `addr.send_fut` races `f(addr)` against the delay for
/// `deadline`, with `f`'s future listed first in the biased select. If `f`'s future
/// finishes first, the rest of the delay is still awaited; if the delay finishes first,
/// `f`'s future is not polled any further. In both cases `tick()` is sent once the
/// deadline has passed. The state is recorded as `Timeout { deadline }`.
fn run_with_timeout_internal<
    T: Tick + ?Sized,
    A: AddrLike<Actor = T>,
    F: Future<Output = ()> + Send + 'static,
>(
    &mut self,
    addr: A,
    deadline: Instant,
    f: impl FnOnce(A) -> F + Send + 'static,
) {
    let target = addr.clone();
    // Fused so it can be used in the select and awaited again afterwards.
    let timeout = self.runtime.delay(deadline).fuse();
    addr.send_fut(async move {
        pin_mut!(timeout);
        let finished_early = select_biased! {
            _ = f(target.clone()).fuse() => true,
            _ = timeout => false,
        };
        if finished_early {
            // The work is done, but the tick must not be sent before the deadline.
            timeout.await;
        }
        send!(target.tick());
    });
    self.state = InternalTimerState::Timeout { deadline };
}
/// Schedules repeating ticks for `addr`, with the first deadline at `start` and each
/// following deadline `interval` after the previous one.
///
/// The address is converted with `upcast!` before being stored, and the timer state
/// becomes `TimerState::Interval`.
pub fn set_interval_at_weak<T: Tick>(
&mut self,
addr: WeakAddr<T>,
start: Instant,
interval: Duration,
) {
self.set_interval_at_weak_internal(upcast!(addr), start, interval);
}
/// Schedules repeating ticks for `addr`, with the first deadline at `start` and each
/// following deadline `interval` after the previous one.
///
/// The address is converted with `upcast!` and stored as an `Addr` inside the timer,
/// and the timer state becomes `TimerState::Interval`.
pub fn set_interval_at_strong<T: Tick>(
&mut self,
addr: Addr<T>,
start: Instant,
interval: Duration,
) {
self.set_interval_at_strong_internal(upcast!(addr), start, interval);
}
/// Schedules repeating ticks for `addr` with the first deadline set to the current
/// instant and each following deadline `interval` after the previous one.
pub fn set_interval_weak<T: Tick>(&mut self, addr: WeakAddr<T>, interval: Duration) {
    let start = Instant::now();
    self.set_interval_at_weak(addr, start, interval);
}
/// Schedules repeating ticks for `addr` with the first deadline set to the current
/// instant and each following deadline `interval` after the previous one.
pub fn set_interval_strong<T: Tick>(&mut self, addr: Addr<T>, interval: Duration) {
    let start = Instant::now();
    self.set_interval_at_strong(addr, start, interval);
}
/// Schedules a single tick for `addr` at `deadline`.
///
/// The timer state becomes `TimerState::Timeout { deadline }`, overwriting whatever
/// was recorded before.
///
/// `T` may be unsized (for example `dyn Tick`), consistent with the
/// `run_with_timeout_*` methods; the underlying helper never required `T: Sized`.
pub fn set_timeout_weak<T: Tick + ?Sized>(&mut self, addr: WeakAddr<T>, deadline: Instant) {
    self.set_timeout_internal(addr, deadline);
}
/// Schedules a single tick for `addr` at `deadline`.
///
/// The timer state becomes `TimerState::Timeout { deadline }`, overwriting whatever
/// was recorded before.
///
/// `T` may be unsized (for example `dyn Tick`), consistent with the
/// `run_with_timeout_*` methods; the underlying helper never required `T: Sized`.
pub fn set_timeout_strong<T: Tick + ?Sized>(&mut self, addr: Addr<T>, deadline: Instant) {
    self.set_timeout_internal(addr, deadline);
}
/// Schedules a single tick for `addr` once `duration` has elapsed from now.
///
/// The deadline is computed as `Instant::now() + duration` and the timer state becomes
/// `TimerState::Timeout` with that deadline.
///
/// `T` may be unsized (for example `dyn Tick`), consistent with the
/// `run_with_timeout_*` methods; the underlying helper never required `T: Sized`.
pub fn set_timeout_for_weak<T: Tick + ?Sized>(
    &mut self,
    addr: WeakAddr<T>,
    duration: Duration,
) {
    let deadline = Instant::now() + duration;
    self.set_timeout_internal(addr, deadline);
}
/// Schedules a single tick for `addr` once `duration` has elapsed from now.
///
/// The deadline is computed as `Instant::now() + duration` and the timer state becomes
/// `TimerState::Timeout` with that deadline.
///
/// `T` may be unsized (for example `dyn Tick`), consistent with the
/// `run_with_timeout_*` methods; the underlying helper never required `T: Sized`.
pub fn set_timeout_for_strong<T: Tick + ?Sized>(
    &mut self,
    addr: Addr<T>,
    duration: Duration,
) {
    let deadline = Instant::now() + duration;
    self.set_timeout_internal(addr, deadline);
}
/// Runs the future produced by `f` with `deadline` as its time limit, then sends a tick.
///
/// `f` receives a clone of `addr`. The tick is sent once the deadline's delay has
/// completed, whether or not the future finished earlier. The timer state becomes
/// `TimerState::Timeout { deadline }`.
pub fn run_with_timeout_weak<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
&mut self,
addr: WeakAddr<T>,
deadline: Instant,
f: impl FnOnce(WeakAddr<T>) -> F + Send + 'static,
) {
self.run_with_timeout_internal(addr, deadline, f);
}
/// Runs the future produced by `f` with `deadline` as its time limit, then sends a tick.
///
/// `f` receives a clone of `addr`. The tick is sent once the deadline's delay has
/// completed, whether or not the future finished earlier. The timer state becomes
/// `TimerState::Timeout { deadline }`.
pub fn run_with_timeout_strong<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
&mut self,
addr: Addr<T>,
deadline: Instant,
f: impl FnOnce(Addr<T>) -> F + Send + 'static,
) {
self.run_with_timeout_internal(addr, deadline, f);
}
/// Runs the future produced by `f` with a time limit of `duration` from now, then sends
/// a tick.
///
/// The deadline is computed as `Instant::now() + duration`; everything else behaves
/// like `run_with_timeout_weak` called with that deadline.
pub fn run_with_timeout_for_weak<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
    &mut self,
    addr: WeakAddr<T>,
    duration: Duration,
    f: impl FnOnce(WeakAddr<T>) -> F + Send + 'static,
) {
    let deadline = Instant::now() + duration;
    self.run_with_timeout_weak(addr, deadline, f);
}
/// Runs the future produced by `f` with a time limit of `duration` from now, then sends
/// a tick.
///
/// The deadline is computed as `Instant::now() + duration`; everything else behaves
/// like `run_with_timeout_strong` called with that deadline.
pub fn run_with_timeout_for_strong<
    T: Tick + ?Sized,
    F: Future<Output = ()> + Send + 'static,
>(
    &mut self,
    addr: Addr<T>,
    duration: Duration,
    f: impl FnOnce(Addr<T>) -> F + Send + 'static,
) {
    let deadline = Instant::now() + duration;
    self.run_with_timeout_strong(addr, deadline, f);
}
}