use super::*;
use crate::*;
use hierarchical_hash_wheel_timer::Timer as LowlevelTimer;
use std::{
collections::HashMap,
fmt,
rc::Rc,
sync::{Arc, Weak},
time::Duration,
};
use uuid::Uuid;
pub trait TimerRefFactory {
fn timer_ref(&self) -> TimerRef;
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimer(Uuid);
impl ScheduledTimer {
pub fn from_uuid(id: Uuid) -> ScheduledTimer {
ScheduledTimer(id)
}
}
pub trait Timer<C: ComponentDefinition> {
fn schedule_once<F>(&mut self, timeout: Duration, action: F) -> ScheduledTimer
where
F: FnOnce(&mut C, ScheduledTimer) -> Handled + Send + 'static;
fn schedule_periodic<F>(
&mut self,
delay: Duration,
period: Duration,
action: F,
) -> ScheduledTimer
where
F: Fn(&mut C, ScheduledTimer) -> Handled + Send + 'static;
fn cancel_timer(&mut self, handle: ScheduledTimer);
}
pub trait CanCancelTimers {
fn cancel_timer(&self, handle: ScheduledTimer);
}
impl<F> CanCancelTimers for F
where
F: TimerRefFactory,
{
fn cancel_timer(&self, handle: ScheduledTimer) {
self.timer_ref().cancel(&handle.0);
}
}
#[derive(Clone, Debug)]
pub(crate) struct Timeout(pub(crate) Uuid);
pub(crate) enum ExecuteAction<C: ComponentDefinition> {
None,
Periodic(Uuid, Rc<dyn Fn(&mut C, Uuid) -> Handled>),
Once(Uuid, Box<dyn FnOnce(&mut C, Uuid) -> Handled>),
}
pub(crate) struct TimerManager<C: ComponentDefinition> {
timer: timer::TimerRef,
timer_queue: Arc<ConcurrentQueue<Timeout>>,
handles: HashMap<Uuid, TimerHandle<C>>,
}
impl<C: ComponentDefinition> TimerManager<C> {
pub(crate) fn new(timer: timer::TimerRef) -> TimerManager<C> {
TimerManager {
timer,
timer_queue: Arc::new(ConcurrentQueue::new()),
handles: HashMap::new(),
}
}
fn new_ref(&self, component: Weak<dyn CoreContainer>) -> TimerActorRef {
TimerActorRef::new(component, Arc::downgrade(&self.timer_queue))
}
pub(crate) fn try_action(&mut self) -> ExecuteAction<C> {
if let Some(timeout) = self.timer_queue.pop() {
let res = self.handles.remove(&timeout.0);
match res {
Some(TimerHandle::OneShot { action, .. }) => ExecuteAction::Once(timeout.0, action),
Some(TimerHandle::Periodic { action, .. }) => {
let action2 = action.clone();
self.handles.insert(
timeout.0,
TimerHandle::Periodic {
_id: timeout.0,
action,
},
);
ExecuteAction::Periodic(timeout.0, action2)
}
_ => ExecuteAction::None,
}
} else {
ExecuteAction::None
}
}
pub(crate) fn schedule_once<F>(
&mut self,
component: Weak<dyn CoreContainer>,
timeout: Duration,
action: F,
) -> ScheduledTimer
where
F: FnOnce(&mut C, ScheduledTimer) -> Handled + Send + 'static,
{
let id = Uuid::new_v4();
let handle = TimerHandle::OneShot {
_id: id,
action: Box::new(move |new_self, id| action(new_self, ScheduledTimer::from_uuid(id))),
};
self.handles.insert(id, handle);
let tar = self.new_ref(component);
let state = ActorRefState::new(id, tar);
self.timer.schedule_once(timeout, state);
ScheduledTimer::from_uuid(id)
}
pub(crate) fn schedule_periodic<F>(
&mut self,
component: Weak<dyn CoreContainer>,
delay: Duration,
period: Duration,
action: F,
) -> ScheduledTimer
where
F: Fn(&mut C, ScheduledTimer) -> Handled + Send + 'static,
{
let id = Uuid::new_v4();
let handle = TimerHandle::Periodic {
_id: id,
action: Rc::new(move |new_self, id| action(new_self, ScheduledTimer::from_uuid(id))),
};
self.handles.insert(id, handle);
let tar = self.new_ref(component);
let state = ActorRefState::new(id, tar);
self.timer.schedule_periodic(delay, period, state);
ScheduledTimer::from_uuid(id)
}
pub(crate) fn cancel_timer(&mut self, handle: ScheduledTimer) {
self.timer.cancel(&handle.0);
self.handles.remove(&handle.0);
}
}
impl<C: ComponentDefinition> Drop for TimerManager<C> {
fn drop(&mut self) {
for (id, _) in self.handles.drain() {
self.timer.cancel(&id);
}
}
}
pub(crate) enum TimerHandle<C: ComponentDefinition> {
OneShot {
_id: Uuid, action: Box<dyn FnOnce(&mut C, Uuid) -> Handled + Send + 'static>,
},
Periodic {
_id: Uuid, action: Rc<dyn Fn(&mut C, Uuid) -> Handled + Send + 'static>,
},
}
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<C: ComponentDefinition> Send for TimerHandle<C> {}
#[derive(Clone)]
pub(crate) struct TimerActorRef {
component: Weak<dyn CoreContainer>,
msg_queue: Weak<ConcurrentQueue<Timeout>>,
}
impl TimerActorRef {
fn new(
component: Weak<dyn CoreContainer>,
msg_queue: Weak<ConcurrentQueue<Timeout>>,
) -> TimerActorRef {
TimerActorRef {
component,
msg_queue,
}
}
pub(crate) fn enqueue(&self, timeout: Timeout) -> Result<(), QueueingError> {
match (self.msg_queue.upgrade(), self.component.upgrade()) {
(Some(q), Some(c)) => {
let res = c.core().increment_work();
q.push(timeout);
if let SchedulingDecision::Schedule = res {
let system = c.core().system();
system.schedule(c.clone());
}
Ok(())
}
(q, c) => {
eprintln!("Dropping timeout as target (queue? {:?}, component? {:?}) is unavailable: {:?}",
q.is_some(),
c.is_some(),
timeout
);
Err(QueueingError)
}
}
}
}
impl fmt::Debug for TimerActorRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "<timer-actor-ref>")
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct QueueingError;