// tokio/runtime/scheduler/util/time_alt.rs

1use crate::runtime::scheduler::driver;
2use crate::runtime::time_alt::cancellation_queue::{Receiver, Sender};
3use crate::runtime::time_alt::{EntryHandle, RegistrationQueue, WakeQueue, Wheel};
4use std::time::Duration;
5
/// Returns the smaller of two optional durations.
///
/// A missing value (`None`) never wins over a present one: if exactly one argument is
/// `Some`, that value is returned. The result is `None` only when both arguments are `None`.
pub(crate) fn min_duration(a: Option<Duration>, b: Option<Duration>) -> Option<Duration> {
    match (a, b) {
        (Some(dur_a), Some(dur_b)) => Some(dur_a.min(dur_b)),
        // At most one side is present here: yield it, or `None` if neither is.
        _ => a.or(b),
    }
}
14
15pub(crate) fn process_registration_queue(
16    registration_queue: &mut RegistrationQueue,
17    wheel: &mut Wheel,
18    tx: &Sender,
19    wake_queue: &mut WakeQueue,
20) {
21    while let Some(hdl) = registration_queue.pop_front() {
22        if hdl.deadline() <= wheel.elapsed() {
23            unsafe {
24                wake_queue.push_front(hdl);
25            }
26        } else {
27            // Safety: the entry is not registered yet
28            unsafe {
29                wheel.insert(hdl, tx.clone());
30            }
31        }
32    }
33}
34
35pub(crate) fn insert_inject_timers(
36    wheel: &mut Wheel,
37    tx: &Sender,
38    inject: Vec<EntryHandle>,
39    wake_queue: &mut WakeQueue,
40) {
41    for hdl in inject {
42        if hdl.deadline() <= wheel.elapsed() {
43            unsafe {
44                wake_queue.push_front(hdl);
45            }
46        } else {
47            // Safety: the entry is not registered yet
48            unsafe {
49                wheel.insert(hdl, tx.clone());
50            }
51        }
52    }
53}
54
55pub(crate) fn remove_cancelled_timers(wheel: &mut Wheel, rx: &mut Receiver) {
56    for hdl in rx.recv_all() {
57        debug_assert!(hdl.is_cancelled());
58
59        if hdl.deadline() > wheel.elapsed() {
60            // Safety: the entry is registered in THIS wheel
61            unsafe {
62                wheel.remove(hdl);
63            }
64        }
65    }
66}
67
68pub(crate) fn next_expiration_time(wheel: &Wheel, drv_hdl: &driver::Handle) -> Option<Duration> {
69    drv_hdl.with_time(|maybe_time_hdl| {
70        let Some(time_hdl) = maybe_time_hdl else {
71            // time driver is not enabled, nothing to do.
72            return None;
73        };
74
75        let clock = drv_hdl.clock();
76        let time_source = time_hdl.time_source();
77
78        wheel.next_expiration_time().map(|tick| {
79            let now = time_source.now(clock);
80            time_source.tick_to_duration(tick.saturating_sub(now))
81        })
82    })
83}
84
/// Reports whether the clock may be auto-advanced (`test-util` builds only).
///
/// Returns `true` only when all of the following hold, checked in this order:
/// the time driver is enabled, `duration` is `Some`, and the driver's clock reports
/// `can_auto_advance()`. Returns `false` otherwise.
#[cfg(feature = "test-util")]
pub(crate) fn pre_auto_advance(drv_hdl: &driver::Handle, duration: Option<Duration>) -> bool {
    drv_hdl.with_time(|maybe_time_hdl| {
        // Short-circuiting keeps the clock untouched when the time driver is disabled
        // or when no duration was supplied.
        maybe_time_hdl.is_some() && duration.is_some() && drv_hdl.clock().can_auto_advance()
    })
}
105
106pub(crate) fn process_expired_timers(
107    wheel: &mut Wheel,
108    drv_hdl: &driver::Handle,
109    wake_queue: &mut WakeQueue,
110) {
111    drv_hdl.with_time(|maybe_time_hdl| {
112        let Some(time_hdl) = maybe_time_hdl else {
113            // time driver is not enabled, nothing to do.
114            return;
115        };
116
117        let clock = drv_hdl.clock();
118        let time_source = time_hdl.time_source();
119
120        let now = time_source.now(clock);
121        time_hdl.process_at_time_alt(wheel, now, wake_queue);
122    });
123}
124
125pub(crate) fn shutdown_local_timers(
126    wheel: &mut Wheel,
127    rx: &mut Receiver,
128    inject: Vec<EntryHandle>,
129    drv_hdl: &driver::Handle,
130) {
131    drv_hdl.with_time(|maybe_time_hdl| {
132        let Some(time_hdl) = maybe_time_hdl else {
133            // time driver is not enabled, nothing to do.
134            return;
135        };
136
137        remove_cancelled_timers(wheel, rx);
138        time_hdl.shutdown_alt(wheel);
139
140        let mut wake_queue = WakeQueue::new();
141        // simply wake all unregistered timers
142        for hdl in inject.into_iter().filter(|hdl| !hdl.is_cancelled()) {
143            unsafe {
144                wake_queue.push_front(hdl);
145            }
146        }
147
148        wake_queue.wake_all();
149    });
150}
151
/// Advances the clock by the park duration when auto-advance applies (`test-util` builds only).
///
/// The clock is advanced by `duration` only when the time driver is enabled, `duration`
/// is `Some`, the clock reports `can_auto_advance()`, and the time handle reports that
/// it did not wake (`did_wake()` is `false`).
///
/// # Panics
///
/// Panics with the returned message if `clock.advance` fails.
#[cfg(feature = "test-util")]
pub(crate) fn post_auto_advance(drv_hdl: &driver::Handle, duration: Option<Duration>) {
    drv_hdl.with_time(|maybe_time_hdl| {
        let Some(time_hdl) = maybe_time_hdl else {
            // time driver is not enabled, nothing to do.
            return;
        };

        let Some(park_duration) = duration else {
            // no park duration was supplied, nothing to advance by.
            return;
        };

        let clock = drv_hdl.clock();
        if clock.can_auto_advance() && !time_hdl.did_wake() {
            if let Err(msg) = clock.advance(park_duration) {
                panic!("{msg}");
            }
        }
    })
}
170
171#[cfg(not(feature = "test-util"))]
172pub(crate) fn pre_auto_advance(_drv_hdl: &driver::Handle, _duration: Option<Duration>) -> bool {
173    false
174}
175
176#[cfg(not(feature = "test-util"))]
177pub(crate) fn post_auto_advance(_drv_hdl: &driver::Handle, _duration: Option<Duration>) {
178    // No-op in non-test util builds
179}