coerce_rt/actor/scheduler/
timer.rs1use crate::actor::message::{Handler, Message};
2use crate::actor::{Actor, ActorRef};
3use log::trace;
4
5use std::time::{Duration, Instant};
6use uuid::Uuid;
7
/// Marker trait for the message type that a [`Timer`] sends to its actor on
/// every tick.
///
/// The trait declares no items of its own; it only names [`Message`] as a
/// supertrait.
pub trait TimerTick: Message {}
9
10impl<T: TimerTick> Message for T
11where
12 T: 'static + Sync + Send,
13{
14 type Result = ();
15}
16
/// Handle to a timer task started with [`Timer::start`].
///
/// Its only use is to request that the task stop, via [`Timer::stop`].
pub struct Timer {
    /// Sending half of the oneshot channel whose receiving half is handed to
    /// `timer_loop` when the timer is started.
    stop: tokio::sync::oneshot::Sender<bool>,
}
20
21impl Timer {
22 pub fn start<A: Actor, T: TimerTick>(actor: ActorRef<A>, tick: Duration, msg: T) -> Timer
23 where
24 A: 'static + Handler<T> + Sync + Send,
25 T: 'static + Clone + Sync + Send,
26 T::Result: 'static + Sync + Send,
27 {
28 let (stop, stop_rx) = tokio::sync::oneshot::channel();
29 tokio::spawn(timer_loop(tick, msg, actor, stop_rx));
30
31 Timer { stop }
32 }
33
34 pub fn stop(self) -> bool {
35 if self.stop.send(true).is_ok() {
36 true
37 } else {
38 false
39 }
40 }
41}
42
43pub async fn timer_loop<A: Actor, T: TimerTick>(
44 tick: Duration,
45 msg: T,
46 mut actor: ActorRef<A>,
47 mut stop_rx: tokio::sync::oneshot::Receiver<bool>,
48) where
49 A: 'static + Handler<T> + Sync + Send,
50 T: 'static + Clone + Sync + Send,
51 T::Result: 'static + Sync + Send,
52{
53 let mut interval = tokio::time::interval_at(tokio::time::Instant::now(), tick);
54 let timer_id = Uuid::new_v4();
55
56 interval.tick().await;
57
58 trace!(target: "Timer", "{} - timer starting", &timer_id);
59
60 loop {
61 if stop_rx.try_recv().is_ok() {
62 break;
63 }
64
65 trace!(target: "Timer", "{} - timer tick", &timer_id);
66
67 let now = Instant::now();
68
69 if actor.send(msg.clone()).await.is_err() {
70 break;
71 }
72
73 trace!(target: "Timer", "{} - tick res received in {}ms", &timer_id, now.elapsed().as_millis());
74 interval.tick().await;
75 }
76
77 trace!(target: "Timer", "{} - timer finished", timer_id);
78}