1use std::{
8 num::NonZeroU64,
9 sync::atomic::{AtomicU64, Ordering},
10};
11
12use futures_intrusive::timer::{Clock, Timer, TimerService};
13
14pub use futures_intrusive::timer::TimerFuture;
15use instant::Duration;
16
17use crate::executor::executor_handle;
18
/// Pairs a [`TimerService`] with a cached copy of the earliest deadline that
/// has been recorded for it.
#[derive(Debug)]
pub(crate) struct ExecutorTimer {
    /// Service that creates the timer futures and expires them when checked.
    service: TimerService,
    /// Earliest recorded deadline on the `instant::now()` timeline; `0` means
    /// that no deadline is currently recorded.
    next_expiration: AtomicU64,
}
24
25impl ExecutorTimer {
26 pub fn new() -> Self {
27 struct InstantClock;
28
29 impl Clock for InstantClock {
30 fn now(&self) -> u64 {
31 instant::now() as u64
32 }
33 }
34
35 Self {
36 service: TimerService::new(&InstantClock),
37 next_expiration: AtomicU64::new(0),
38 }
39 }
40
41 pub fn update_next(&self) -> UpdateState {
42 let next = self.next_expiration.load(Ordering::Acquire);
43 if next == 0 {
44 return UpdateState::None;
45 }
46
47 let now = instant::now() as u64;
48
49 if next <= now {
50 self.service.check_expirations();
51 self.next_expiration.store(
52 self.service.next_expiration().unwrap_or(0),
53 Ordering::Release,
54 );
55
56 UpdateState::Triggered
57 } else {
58 UpdateState::WaitTimeout(NonZeroU64::new(next - now).unwrap())
59 }
60 }
61
62 pub fn delay(&self, delay: Duration) -> TimerFuture {
63 self.deadline(instant::now() as u64 + delay.as_millis() as u64)
64 }
65
66 pub fn deadline(&self, timestamp: u64) -> TimerFuture {
67 let future = self.service.deadline(timestamp);
68
69 let _ = self
70 .next_expiration
71 .fetch_update(Ordering::Release, Ordering::Acquire, |next| {
72 if next == 0 || next > timestamp {
73 Some(timestamp)
74 } else {
75 None
76 }
77 });
78
79 future
80 }
81}
82
/// Outcome of comparing the recorded timer deadline with the current time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum UpdateState {
    /// No deadline is currently recorded.
    None,
    /// The recorded deadline had been reached and expirations were checked.
    Triggered,
    /// Time remaining until the recorded deadline, in the units of the
    /// `instant::now()` clock.
    WaitTimeout(NonZeroU64),
}
89
/// Returns the timer future produced by the current executor handle's `wait`
/// method for `delay`.
///
/// NOTE(review): presumably the future completes once `delay` has elapsed —
/// confirm against the handle's `wait` implementation.
pub fn wait(delay: Duration) -> TimerFuture<'static> {
    executor_handle().wait(delay)
}
94
/// Returns the timer future produced by the current executor handle's
/// `wait_deadline` method for the absolute `timestamp`.
///
/// NOTE(review): `timestamp` is presumably expressed on the same clock as
/// `instant::now()` (milliseconds) — confirm against the handle's
/// `wait_deadline` implementation.
pub fn wait_deadline(timestamp: u64) -> TimerFuture<'static> {
    executor_handle().wait_deadline(timestamp)
}