1use std::future::Future;
6use std::mem;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use futures::future::FutureExt;
11use futures::{pin_mut, select_biased};
12
13use crate::{send, upcast, Actor, ActorResult, Addr, AddrLike, WeakAddr};
14
/// A runtime that is able to hand out timer futures.
///
/// [`Timer`] calls [`SupportsTimers::delay`] every time it needs to wait for
/// a deadline before sending a tick to an actor.
pub trait SupportsTimers {
    /// The future type returned by [`SupportsTimers::delay`].
    ///
    /// It yields `()` and has to be `Send + 'static` because the timer moves
    /// it into a future that is handed over to an actor address.
    type Delay: Future<Output = ()> + Send + 'static;

    /// Builds a future that is expected to complete once `deadline` has been
    /// reached.
    fn delay(&self, deadline: Instant) -> Self::Delay;
}
24
25#[async_trait]
42pub trait Tick: Actor {
43 async fn tick(&mut self) -> ActorResult<()>;
45}
46
/// Publicly visible snapshot of what a [`Timer`] is currently scheduled to do.
///
/// The variant order is significant: the derived ordering compares variants
/// in declaration order before comparing their fields.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TimerState {
    /// Nothing is scheduled.
    Inactive,
    /// A single tick is scheduled.
    Timeout {
        /// The instant the tick is due.
        deadline: Instant,
    },
    /// Ticks are scheduled repeatedly.
    Interval {
        /// The instant the next tick is due.
        deadline: Instant,
        /// The spacing between consecutive deadlines.
        interval: Duration,
    },
}
67
68impl TimerState {
69 pub fn deadline(&self) -> Option<Instant> {
72 match *self {
73 TimerState::Inactive => None,
74 TimerState::Timeout { deadline } => Some(deadline),
75 TimerState::Interval { deadline, .. } => Some(deadline),
76 }
77 }
78 pub fn interval(&self) -> Option<Duration> {
81 match *self {
82 TimerState::Inactive | TimerState::Timeout { .. } => None,
83 TimerState::Interval { interval, .. } => Some(interval),
84 }
85 }
86}
87
88impl Default for TimerState {
89 fn default() -> Self {
90 Self::Inactive
91 }
92}
93
94#[derive(Debug)]
95enum InternalTimerState {
96 Inactive,
97 Timeout {
98 deadline: Instant,
99 },
100 IntervalWeak {
101 addr: WeakAddr<dyn Tick>,
102 deadline: Instant,
103 interval: Duration,
104 },
105 IntervalStrong {
106 addr: Addr<dyn Tick>,
107 deadline: Instant,
108 interval: Duration,
109 },
110}
111
112impl Default for InternalTimerState {
113 fn default() -> Self {
114 Self::Inactive
115 }
116}
117
118impl InternalTimerState {
119 fn public_state(&self) -> TimerState {
120 match *self {
121 InternalTimerState::Inactive => TimerState::Inactive,
122 InternalTimerState::Timeout { deadline } => TimerState::Timeout { deadline },
123 InternalTimerState::IntervalWeak {
124 deadline, interval, ..
125 }
126 | InternalTimerState::IntervalStrong {
127 deadline, interval, ..
128 } => TimerState::Interval { deadline, interval },
129 }
130 }
131}
132
133#[derive(Debug, Default)]
135pub struct Timer<R> {
136 runtime: R,
137 state: InternalTimerState,
138}
139
140impl<R: SupportsTimers> Timer<R> {
141 pub fn new(runtime: R) -> Self {
143 Self {
144 runtime,
145 state: InternalTimerState::Inactive,
146 }
147 }
148 pub fn state(&self) -> TimerState {
150 self.state.public_state()
151 }
152 pub fn is_active(&self) -> bool {
154 self.state() != TimerState::Inactive
155 }
156 pub fn clear(&mut self) {
158 self.state = InternalTimerState::Inactive;
159 }
160 pub fn tick(&mut self) -> bool {
162 match mem::replace(&mut self.state, InternalTimerState::Inactive) {
163 InternalTimerState::Inactive => false,
164 InternalTimerState::Timeout { deadline } => {
165 if deadline <= Instant::now() {
166 true
167 } else {
168 self.state = InternalTimerState::Timeout { deadline };
169 false
170 }
171 }
172 InternalTimerState::IntervalWeak {
173 deadline,
174 interval,
175 addr,
176 } => {
177 if deadline <= Instant::now() {
178 self.set_interval_at_weak_internal(addr, deadline + interval, interval);
179 true
180 } else {
181 self.state = InternalTimerState::IntervalWeak {
182 deadline,
183 interval,
184 addr,
185 };
186 false
187 }
188 }
189 InternalTimerState::IntervalStrong {
190 deadline,
191 interval,
192 addr,
193 } => {
194 if deadline <= Instant::now() {
195 self.set_interval_at_strong_internal(addr, deadline + interval, interval);
196 true
197 } else {
198 self.state = InternalTimerState::IntervalStrong {
199 deadline,
200 interval,
201 addr,
202 };
203 false
204 }
205 }
206 }
207 }
208 fn set_interval_at_weak_internal(
209 &mut self,
210 addr: WeakAddr<dyn Tick>,
211 start: Instant,
212 interval: Duration,
213 ) {
214 let addr2 = addr.clone();
215 let delay = self.runtime.delay(start);
216 addr.send_fut(async move {
217 delay.await;
218 send!(addr2.tick());
219 });
220
221 self.state = InternalTimerState::IntervalWeak {
222 deadline: start,
223 interval,
224 addr,
225 };
226 }
227 fn set_interval_at_strong_internal(
228 &mut self,
229 addr: Addr<dyn Tick>,
230 start: Instant,
231 interval: Duration,
232 ) {
233 let addr2 = addr.clone();
234 let delay = self.runtime.delay(start);
235 addr.send_fut(async move {
236 delay.await;
237 send!(addr2.tick());
238 });
239
240 self.state = InternalTimerState::IntervalStrong {
241 deadline: start,
242 interval,
243 addr,
244 };
245 }
246 fn set_timeout_internal<T: Tick + ?Sized>(
247 &mut self,
248 addr: impl AddrLike<Actor = T>,
249 deadline: Instant,
250 ) {
251 let addr2 = addr.clone();
252 let delay = self.runtime.delay(deadline);
253 addr.send_fut(async move {
254 delay.await;
255 send!(addr2.tick());
256 });
257
258 self.state = InternalTimerState::Timeout { deadline };
259 }
260 fn run_with_timeout_internal<
261 T: Tick + ?Sized,
262 A: AddrLike<Actor = T>,
263 F: Future<Output = ()> + Send + 'static,
264 >(
265 &mut self,
266 addr: A,
267 deadline: Instant,
268 f: impl FnOnce(A) -> F + Send + 'static,
269 ) {
270 let addr2 = addr.clone();
271 let delay = self.runtime.delay(deadline).fuse();
272
273 addr.send_fut(async move {
274 pin_mut!(delay);
275 if select_biased! {
276 _ = f(addr2.clone()).fuse() => true,
277 _ = delay => false,
278 } {
279 delay.await;
281 }
282 send!(addr2.tick());
283 });
284
285 self.state = InternalTimerState::Timeout { deadline };
286 }
287
288 pub fn set_interval_at_weak<T: Tick>(
291 &mut self,
292 addr: WeakAddr<T>,
293 start: Instant,
294 interval: Duration,
295 ) {
296 self.set_interval_at_weak_internal(upcast!(addr), start, interval);
297 }
298 pub fn set_interval_at_strong<T: Tick>(
301 &mut self,
302 addr: Addr<T>,
303 start: Instant,
304 interval: Duration,
305 ) {
306 self.set_interval_at_strong_internal(upcast!(addr), start, interval);
307 }
308 pub fn set_interval_weak<T: Tick>(&mut self, addr: WeakAddr<T>, interval: Duration) {
311 self.set_interval_at_weak_internal(upcast!(addr), Instant::now(), interval);
312 }
313 pub fn set_interval_strong<T: Tick>(&mut self, addr: Addr<T>, interval: Duration) {
316 self.set_interval_at_strong_internal(upcast!(addr), Instant::now(), interval);
317 }
318 pub fn set_timeout_weak<T: Tick>(&mut self, addr: WeakAddr<T>, deadline: Instant) {
321 self.set_timeout_internal(addr, deadline);
322 }
323 pub fn set_timeout_strong<T: Tick>(&mut self, addr: Addr<T>, deadline: Instant) {
326 self.set_timeout_internal(addr, deadline);
327 }
328 pub fn set_timeout_for_weak<T: Tick>(&mut self, addr: WeakAddr<T>, duration: Duration) {
331 self.set_timeout_internal(addr, Instant::now() + duration);
332 }
333 pub fn set_timeout_for_strong<T: Tick>(&mut self, addr: Addr<T>, duration: Duration) {
336 self.set_timeout_internal(addr, Instant::now() + duration);
337 }
338 pub fn run_with_timeout_weak<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
343 &mut self,
344 addr: WeakAddr<T>,
345 deadline: Instant,
346 f: impl FnOnce(WeakAddr<T>) -> F + Send + 'static,
347 ) {
348 self.run_with_timeout_internal(addr, deadline, f);
349 }
350 pub fn run_with_timeout_strong<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
355 &mut self,
356 addr: Addr<T>,
357 deadline: Instant,
358 f: impl FnOnce(Addr<T>) -> F + Send + 'static,
359 ) {
360 self.run_with_timeout_internal(addr, deadline, f);
361 }
362 pub fn run_with_timeout_for_weak<T: Tick + ?Sized, F: Future<Output = ()> + Send + 'static>(
367 &mut self,
368 addr: WeakAddr<T>,
369 duration: Duration,
370 f: impl FnOnce(WeakAddr<T>) -> F + Send + 'static,
371 ) {
372 self.run_with_timeout_internal(addr, Instant::now() + duration, f);
373 }
374 pub fn run_with_timeout_for_strong<
379 T: Tick + ?Sized,
380 F: Future<Output = ()> + Send + 'static,
381 >(
382 &mut self,
383 addr: Addr<T>,
384 duration: Duration,
385 f: impl FnOnce(Addr<T>) -> F + Send + 'static,
386 ) {
387 self.run_with_timeout_internal(addr, Instant::now() + duration, f);
388 }
389}