// tokio/runtime/scheduler/util/time_alt.rs
use crate::runtime::scheduler::driver;
use crate::runtime::time_alt::cancellation_queue::{Receiver, Sender};
use crate::runtime::time_alt::{EntryHandle, RegistrationQueue, WakeQueue, Wheel};
use std::time::Duration;
5
/// Returns the smaller of two optional durations.
///
/// `None` means "no duration": when only one side is `Some`, that side is
/// returned unchanged, and `None` is returned only when both sides are `None`.
pub(crate) fn min_duration(a: Option<Duration>, b: Option<Duration>) -> Option<Duration> {
    match (a, b) {
        (Some(dur_a), Some(dur_b)) => Some(std::cmp::min(dur_a, dur_b)),
        // At most one side is present here, so whichever side is not the
        // matched `None` (possibly `None` itself) is the answer.
        (only, None) | (None, only) => only,
    }
}
14
15pub(crate) fn process_registration_queue(
16 registration_queue: &mut RegistrationQueue,
17 wheel: &mut Wheel,
18 tx: &Sender,
19 wake_queue: &mut WakeQueue,
20) {
21 while let Some(hdl) = registration_queue.pop_front() {
22 if hdl.deadline() <= wheel.elapsed() {
23 unsafe {
24 wake_queue.push_front(hdl);
25 }
26 } else {
27 unsafe {
29 wheel.insert(hdl, tx.clone());
30 }
31 }
32 }
33}
34
35pub(crate) fn insert_inject_timers(
36 wheel: &mut Wheel,
37 tx: &Sender,
38 inject: Vec<EntryHandle>,
39 wake_queue: &mut WakeQueue,
40) {
41 for hdl in inject {
42 if hdl.deadline() <= wheel.elapsed() {
43 unsafe {
44 wake_queue.push_front(hdl);
45 }
46 } else {
47 unsafe {
49 wheel.insert(hdl, tx.clone());
50 }
51 }
52 }
53}
54
55pub(crate) fn remove_cancelled_timers(wheel: &mut Wheel, rx: &mut Receiver) {
56 for hdl in rx.recv_all() {
57 debug_assert!(hdl.is_cancelled());
58
59 if hdl.deadline() > wheel.elapsed() {
60 unsafe {
62 wheel.remove(hdl);
63 }
64 }
65 }
66}
67
68pub(crate) fn next_expiration_time(wheel: &Wheel, drv_hdl: &driver::Handle) -> Option<Duration> {
69 drv_hdl.with_time(|maybe_time_hdl| {
70 let Some(time_hdl) = maybe_time_hdl else {
71 return None;
73 };
74
75 let clock = drv_hdl.clock();
76 let time_source = time_hdl.time_source();
77
78 wheel.next_expiration_time().map(|tick| {
79 let now = time_source.now(clock);
80 time_source.tick_to_duration(tick.saturating_sub(now))
81 })
82 })
83}
84
/// Reports whether the clock may be auto-advanced for this park.
///
/// Returns `true` only when all of the following hold: `with_time` supplies a
/// time handle, `duration` is `Some`, and `drv_hdl.clock().can_auto_advance()`
/// is `true`.
#[cfg(feature = "test-util")]
pub(crate) fn pre_auto_advance(drv_hdl: &driver::Handle, duration: Option<Duration>) -> bool {
    drv_hdl.with_time(|maybe_time_hdl| {
        // Short-circuiting keeps the clock from being consulted unless both a
        // time handle and a duration are present.
        maybe_time_hdl.is_some() && duration.is_some() && drv_hdl.clock().can_auto_advance()
    })
}
105
106pub(crate) fn process_expired_timers(
107 wheel: &mut Wheel,
108 drv_hdl: &driver::Handle,
109 wake_queue: &mut WakeQueue,
110) {
111 drv_hdl.with_time(|maybe_time_hdl| {
112 let Some(time_hdl) = maybe_time_hdl else {
113 return;
115 };
116
117 let clock = drv_hdl.clock();
118 let time_source = time_hdl.time_source();
119
120 let now = time_source.now(clock);
121 time_hdl.process_at_time_alt(wheel, now, wake_queue);
122 });
123}
124
125pub(crate) fn shutdown_local_timers(
126 wheel: &mut Wheel,
127 rx: &mut Receiver,
128 inject: Vec<EntryHandle>,
129 drv_hdl: &driver::Handle,
130) {
131 drv_hdl.with_time(|maybe_time_hdl| {
132 let Some(time_hdl) = maybe_time_hdl else {
133 return;
135 };
136
137 remove_cancelled_timers(wheel, rx);
138 time_hdl.shutdown_alt(wheel);
139
140 let mut wake_queue = WakeQueue::new();
141 for hdl in inject.into_iter().filter(|hdl| !hdl.is_cancelled()) {
143 unsafe {
144 wake_queue.push_front(hdl);
145 }
146 }
147
148 wake_queue.wake_all();
149 });
150}
151
/// Advances the clock by the park duration when auto-advance applies.
///
/// Nothing happens unless `with_time` supplies a time handle, `duration` is
/// `Some`, the clock reports `can_auto_advance()`, and the time handle's
/// `did_wake()` returns `false`.
///
/// # Panics
///
/// Panics with the returned message if `clock.advance` fails.
#[cfg(feature = "test-util")]
pub(crate) fn post_auto_advance(drv_hdl: &driver::Handle, duration: Option<Duration>) {
    drv_hdl.with_time(|maybe_time_hdl| {
        let (Some(time_hdl), Some(park_duration)) = (maybe_time_hdl, duration) else {
            return;
        };

        let clock = drv_hdl.clock();
        // `did_wake()` is queried only after `can_auto_advance()` returned true,
        // the same order the short-circuiting `&&` form would use.
        if !clock.can_auto_advance() || time_hdl.did_wake() {
            return;
        }

        if let Err(msg) = clock.advance(park_duration) {
            panic!("{msg}");
        }
    })
}
170
171#[cfg(not(feature = "test-util"))]
172pub(crate) fn pre_auto_advance(_drv_hdl: &driver::Handle, _duration: Option<Duration>) -> bool {
173 false
174}
175
176#[cfg(not(feature = "test-util"))]
177pub(crate) fn post_auto_advance(_drv_hdl: &driver::Handle, _duration: Option<Duration>) {
178 }