ora_timer/
lib.rs

1//! Low-level timer implementations.
2
3use std::{cmp::Reverse, collections::BinaryHeap, thread, time::Duration};
4
5use minstant::Instant;
6use resolution::{MillisecondResolution, Resolution};
7
8extern crate alloc;
9
10pub mod resolution;
11pub mod wheel;
12
13/// A delayed item with a delay duration
14/// and an arbitrary payload.
15pub struct Delayed<T>(T, Duration);
16
17impl<T> Delayed<T> {
18    /// Create a new delayed item.
19    pub fn new(item: T, delay: Duration) -> Self {
20        Self(item, delay)
21    }
22}
23
24impl<T> PartialEq for Delayed<T> {
25    fn eq(&self, other: &Self) -> bool {
26        self.1 == other.1
27    }
28}
29
30impl<T> Eq for Delayed<T> {}
31
32impl<T> PartialOrd for Delayed<T> {
33    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
34        Some(self.1.cmp(&other.1))
35    }
36}
37
38impl<T> Ord for Delayed<T> {
39    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
40        self.1.cmp(&other.1)
41    }
42}
43
44/// Options for the timer loop.
45#[derive(Debug, Clone, Copy)]
46pub struct TimerOptions {
47    /// The threshold to the next tick at which the timer loop
48    /// will sleep to reduce CPU usage.
49    ///
50    /// Set to `Duration::ZERO` to always keep
51    /// the timer in a busy loop achieving
52    /// the lowest possible latency at the
53    /// cost of high CPU usage.
54    pub sleep_threshold: Duration,
55    /// The rate at which bookkeeping is run
56    /// while the timer loop is idle or waiting
57    /// for the next tick.
58    pub bookkeeping_interval: Duration,
59}
60
61impl Default for TimerOptions {
62    fn default() -> Self {
63        Self {
64            sleep_threshold: Duration::from_millis(20),
65            bookkeeping_interval: Duration::from_millis(500),
66        }
67    }
68}
69
70/// Run a timer loop with a hierarchical timing wheel
71/// algorithm and the given resolution.
72///
73/// The precision of the timer loop is constrained by the
74/// resolution used (among other factors) but generally
75/// provides predictable latency even for large
76/// numbers of scheduled jobs.
77#[allow(clippy::cast_possible_truncation)]
78pub fn run_hierarchical_timer<T, R: Resolution>(
79    options: TimerOptions,
80    mut callback: impl FnMut(&mut Vec<Delayed<T>>, &mut Vec<T>) -> TimerLoopAction,
81) {
82    let mut wheel = wheel::TimingWheel::<T, R>::new();
83    let TimerOptions {
84        sleep_threshold,
85        bookkeeping_interval,
86    } = options;
87
88    macro_rules! run_callback {
89        ($wheel:tt, $new_jobs:tt, $ready_jobs:tt) => {{
90            let action = callback(&mut $new_jobs, &mut $ready_jobs);
91            $ready_jobs.clear();
92            let got_new_jobs = !$new_jobs.is_empty();
93            if got_new_jobs {
94                for new in $new_jobs.drain(..) {
95                    if let Some(t) = $wheel.insert(new.0, new.1) {
96                        $ready_jobs.push(t);
97                    }
98                }
99            }
100
101            match action {
102                TimerLoopAction::Continue => {}
103                TimerLoopAction::Stop => {
104                    return;
105                }
106                TimerLoopAction::StopWhenIdle => {
107                    if $wheel.is_empty() {
108                        return;
109                    }
110                }
111            }
112
113            got_new_jobs
114        }};
115    }
116
117    let mut last_tick = Instant::now();
118    let mut new_jobs = Vec::<Delayed<T>>::new();
119    let mut ready_jobs = Vec::new();
120    loop {
121        run_callback!(wheel, new_jobs, ready_jobs);
122
123        let now = Instant::now();
124        let elapsed = now - last_tick;
125        let elapsed_steps = R::whole_steps(&elapsed);
126
127        if elapsed_steps == 0 {
128            continue;
129        }
130
131        let mut can_skip_steps = wheel.can_skip();
132        can_skip_steps = can_skip_steps.min(elapsed_steps as u32);
133
134        if can_skip_steps > 0 {
135            wheel.skip(can_skip_steps);
136        }
137
138        let tick_steps = elapsed_steps - u64::from(can_skip_steps);
139
140        for _ in 0..tick_steps {
141            wheel.tick_with(&mut ready_jobs);
142        }
143        wheel.gc(0xF_FFFF);
144
145        last_tick = now;
146
147        if wheel.is_empty() {
148            thread::sleep(bookkeeping_interval);
149            continue;
150        }
151
152        let can_skip_steps = wheel.can_skip();
153        let sleep_delay = MillisecondResolution::steps_as_duration(u64::from(can_skip_steps));
154
155        // We continue the timer loop earlier than the next tick
156        // to reduce the chance of skipping a tick.
157        let mut wait_duration = sleep_delay / 2;
158
159        loop {
160            let got_new_jobs = run_callback!(wheel, new_jobs, ready_jobs);
161            if got_new_jobs {
162                continue;
163            }
164
165            if sleep_threshold == Duration::ZERO
166                || wait_duration == Duration::ZERO
167                || bookkeeping_interval == Duration::ZERO
168                || wait_duration < sleep_threshold
169            {
170                break;
171            }
172
173            let poll_duration = wait_duration.min(bookkeeping_interval);
174
175            thread::sleep(poll_duration);
176            wait_duration -= poll_duration;
177        }
178    }
179}
180
181/// Run a timer loop backed by a binary heap structure.
182///
183/// The precision of the timer loop is not constrained
184/// by the algorithm itself but by the resolution of the
185/// time source.
186///
187/// This algorithm also has lower bookkeeping overhead
188/// when it is not contended and offers a very low minimum latency
189/// however performance degrades as the number of scheduled jobs increases.
190pub fn run_binary_heap_timer<T>(
191    options: TimerOptions,
192    mut callback: impl FnMut(&mut Vec<Delayed<T>>, &mut Vec<T>) -> TimerLoopAction,
193) {
194    let mut heap: BinaryHeap<Reverse<Delayed<T>>> = BinaryHeap::new();
195    let TimerOptions {
196        sleep_threshold,
197        bookkeeping_interval,
198    } = options;
199
200    macro_rules! run_callback {
201        ($heap:tt, $new_jobs:tt, $ready_jobs:tt, $elapsed:tt) => {{
202            let action = callback(&mut $new_jobs, &mut $ready_jobs);
203            $ready_jobs.clear();
204            let got_new_jobs = !$new_jobs.is_empty();
205            if got_new_jobs {
206                for mut new in $new_jobs.drain(..) {
207                    if new.1 == Duration::ZERO {
208                        $ready_jobs.push(new.0);
209                        continue;
210                    }
211
212                    new.1 += $elapsed;
213                    $heap.push(Reverse(new));
214                }
215            }
216
217            match action {
218                TimerLoopAction::Continue => {}
219                TimerLoopAction::Stop => {
220                    return;
221                }
222                TimerLoopAction::StopWhenIdle => {
223                    if $heap.is_empty() {
224                        return;
225                    }
226                }
227            }
228
229            got_new_jobs
230        }};
231    }
232
233    let start = Instant::now();
234    let mut elapsed = Duration::ZERO;
235    let mut new_jobs = Vec::<Delayed<T>>::new();
236    let mut ready_jobs = Vec::new();
237
238    loop {
239        run_callback!(heap, new_jobs, ready_jobs, elapsed);
240
241        let now = Instant::now();
242        elapsed = now - start;
243
244        while let Some(Reverse(job)) = heap.peek() {
245            if job.1 <= elapsed {
246                let Reverse(job) = unsafe { heap.pop().unwrap_unchecked() };
247                ready_jobs.push(job.0);
248            } else {
249                break;
250            }
251        }
252
253        if heap.is_empty() {
254            thread::sleep(bookkeeping_interval);
255            continue;
256        }
257
258        let sleep_delay = heap
259            .peek()
260            .map(|Reverse(job)| job.1 - elapsed)
261            .unwrap_or_default();
262
263        // We continue the timer loop earlier than the next tick
264        // to reduce the chance of skipping a tick.
265        let mut wait_duration = sleep_delay / 2;
266
267        loop {
268            let got_new_jobs = run_callback!(heap, new_jobs, ready_jobs, elapsed);
269            if got_new_jobs {
270                continue;
271            }
272
273            if wait_duration == Duration::ZERO
274                || bookkeeping_interval == Duration::ZERO
275                || wait_duration <= sleep_threshold
276            {
277                break;
278            }
279
280            let poll_duration = wait_duration.min(bookkeeping_interval);
281
282            thread::sleep(poll_duration);
283            wait_duration -= poll_duration;
284        }
285    }
286}
287
288/// A desired action to be taken by the timer loop.
289#[must_use]
290pub enum TimerLoopAction {
291    /// Continue the timer loop.
292    Continue,
293    /// Stop the timer loop immediately.
294    Stop,
295    /// Stop the timer loop when it becomes idle.
296    ///
297    /// Note that this does not prevent new jobs from being added,
298    /// and the timer loop will continue to run until it becomes idle.
299    StopWhenIdle,
300}