1use std::{
2 future::Future,
3 pin::Pin,
4 task::{ready, Context, Poll},
5 time::Duration,
6};
7
8use pin_project::pin_project;
9
10use tokio::time::{Instant, Sleep};
11
12#[pin_project(project = MuxTimerProj)]
21#[derive(Debug)]
22pub struct MuxTimer<const N: usize> {
23 deadlines: [Option<Instant>; N],
24 #[pin]
25 sleep: Sleep,
26 armed_ordinal: usize,
27}
28
29pub enum CoalesceMode {
31 Earliest,
33
34 Latest,
36}
37
38impl<const N: usize> Default for MuxTimer<N> {
39 fn default() -> Self {
40 Self {
41 deadlines: [None; N],
42 sleep: tokio::time::sleep(Duration::ZERO),
43 armed_ordinal: N,
44 }
45 }
46}
47
48impl<const N: usize> MuxTimer<N> {
49 pub fn fire_after(
52 self: Pin<&mut Self>,
53 ordinal: impl Into<usize>,
54 timeout: Duration,
55 coalesce_mode: CoalesceMode,
56 ) -> bool {
57 self.fire_at(ordinal, Instant::now() + timeout, coalesce_mode)
58 }
59
60 pub fn fire_at(
63 self: Pin<&mut Self>,
64 ordinal: impl Into<usize>,
65 deadline: Instant,
66 coalesce_mode: CoalesceMode,
67 ) -> bool {
68 let ordinal = ordinal.into();
69 if self.deadlines[ordinal].is_some_and(|d| match coalesce_mode {
70 CoalesceMode::Earliest => d < deadline,
71 CoalesceMode::Latest => d > deadline,
72 }) {
73 return false;
74 }
75
76 let current_deadline = self.deadline();
77 let mut this = self.project();
78 this.deadlines[ordinal] = Some(deadline);
79
80 match coalesce_mode {
81 CoalesceMode::Earliest => {
82 if current_deadline.map_or(true, |d| deadline < d) {
83 this.arm(ordinal, deadline)
84 }
85 }
86 CoalesceMode::Latest => {
87 match current_deadline {
88 None => this.arm(ordinal, deadline),
89 Some(_) if *this.armed_ordinal == ordinal => {
90 let (next_ordinal, next_deadline) =
93 this.soonest_event().expect("soonest event");
94 this.arm(next_ordinal, next_deadline);
95 }
96 Some(_) => {
97 }
99 }
100 }
101 }
102 true
103 }
104
105 pub fn cancel(self: Pin<&mut Self>, ordinal: impl Into<usize>) -> bool {
110 let ordinal = ordinal.into();
111 if self.deadlines[ordinal].is_some() {
112 let mut this = self.project();
113 this.deadlines[ordinal] = None;
114 if *this.armed_ordinal == ordinal {
115 if let Some((next_ordinal, next_deadline)) = this.soonest_event() {
116 this.arm(next_ordinal, next_deadline);
118 } else {
119 *this.armed_ordinal = N;
121 }
122 };
123 true
124 } else {
125 false
126 }
127 }
128
129 pub fn is_armed(&self) -> bool {
131 self.armed_ordinal < N
132 }
133
134 pub fn deadline(&self) -> Option<Instant> {
136 (self.armed_ordinal < N).then(|| self.sleep.deadline())
137 }
138
139 pub fn deadlines(&self) -> &[Option<Instant>; N] {
141 &self.deadlines
142 }
143}
144
145impl<'pin, const N: usize> MuxTimerProj<'pin, N> {
146 fn arm(&mut self, ordinal: usize, deadline: Instant) {
147 self.sleep.as_mut().reset(deadline);
148 *self.armed_ordinal = ordinal;
149 }
150
151 fn soonest_event(&self) -> Option<(usize, Instant)> {
152 self.deadlines
153 .iter()
154 .enumerate()
155 .filter_map(|(ordinal, slot)| slot.map(|deadline| (ordinal, deadline)))
156 .min_by(|(_, x), (_, y)| x.cmp(y))
157 }
158}
159
160impl<const N: usize> Future for MuxTimer<N> {
163 type Output = (usize, Instant);
164
165 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166 assert!(self.armed_ordinal < N);
167 let mut this = self.project();
168 ready!(this.sleep.as_mut().poll(cx));
169 let fired_ordinal = std::mem::replace(this.armed_ordinal, N);
170 let fired_deadline = this.deadlines[fired_ordinal].take().expect("armed");
171 assert_eq!(fired_deadline, this.sleep.deadline());
172 if let Some((ordinal, deadline)) = this.soonest_event() {
173 this.arm(ordinal, deadline);
174 }
175 Poll::Ready((fired_ordinal, fired_deadline))
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 use std::time::Duration;
182
183 use tokio::pin;
184 use tokio::time::Instant;
185
186 use super::{CoalesceMode, MuxTimer};
187
188 const EVENT_A: usize = 0;
189 const EVENT_B: usize = 1;
190 const EVENT_C: usize = 2;
191
192 #[tokio::main(flavor = "current_thread", start_paused = true)]
193 #[test]
194 async fn firing_order() {
195 let timer: MuxTimer<3> = MuxTimer::default();
196 pin!(timer);
197
198 assert_eq!(timer.deadline(), None);
199
200 assert!(timer.as_mut().fire_after(
201 EVENT_C,
202 Duration::from_millis(100),
203 CoalesceMode::Earliest
204 ));
205 assert!(timer.as_mut().fire_after(
206 EVENT_B,
207 Duration::from_millis(50),
208 CoalesceMode::Earliest
209 ));
210 assert!(timer.as_mut().fire_after(
211 EVENT_A,
212 Duration::from_millis(150),
213 CoalesceMode::Earliest
214 ));
215
216 let (event, instant_b) = timer.as_mut().await;
217 assert_eq!(event, EVENT_B);
218
219 let (event, instant_c) = timer.as_mut().await;
220 assert_eq!(instant_c.duration_since(instant_b).as_millis(), 50);
221 assert_eq!(event, EVENT_C);
222
223 let (event, instant_a) = timer.as_mut().await;
224 assert_eq!(instant_a.duration_since(instant_c).as_millis(), 50);
225 assert_eq!(event, EVENT_A);
226
227 assert_eq!(timer.deadline(), None);
228 }
229
230 #[tokio::main(flavor = "current_thread", start_paused = true)]
231 #[test]
232 async fn rearming_earliest() {
233 let timer: MuxTimer<3> = MuxTimer::default();
234 pin!(timer);
235
236 let start = Instant::now();
237 assert!(timer.as_mut().fire_after(
238 EVENT_A,
239 Duration::from_millis(100),
240 CoalesceMode::Earliest
241 ));
242 assert!(!timer.as_mut().fire_after(
243 EVENT_A,
244 Duration::from_millis(200),
245 CoalesceMode::Earliest
246 ));
247 assert!(timer.as_mut().fire_after(
248 EVENT_A,
249 Duration::from_millis(50),
250 CoalesceMode::Earliest
251 ));
252
253 let (event, instant) = timer.as_mut().await;
254 assert_eq!(event, EVENT_A);
255 assert_eq!(instant.duration_since(start), Duration::from_millis(50));
256 assert_eq!(timer.deadline(), None);
257 }
258
259 #[tokio::main(flavor = "current_thread", start_paused = true)]
260 #[test]
261 async fn rearming_latest() {
262 let timer: MuxTimer<3> = MuxTimer::default();
263 pin!(timer);
264
265 let start = Instant::now();
266 assert!(timer.as_mut().fire_after(
267 EVENT_A,
268 Duration::from_millis(100),
269 CoalesceMode::Latest
270 ));
271 assert!(timer.as_mut().fire_after(
272 EVENT_A,
273 Duration::from_millis(200),
274 CoalesceMode::Latest
275 ));
276 assert!(!timer.as_mut().fire_after(
277 EVENT_A,
278 Duration::from_millis(50),
279 CoalesceMode::Latest
280 ));
281
282 let (event, instant) = timer.as_mut().await;
283 assert_eq!(event, EVENT_A);
284 assert_eq!(instant.duration_since(start), Duration::from_millis(200));
285 assert_eq!(timer.deadline(), None);
286 }
287
288 #[tokio::main(flavor = "current_thread", start_paused = true)]
289 #[test]
290 async fn rearming_interleaved() {
291 let timer: MuxTimer<3> = MuxTimer::default();
292 pin!(timer);
293
294 let start = Instant::now();
295 assert!(timer.as_mut().fire_after(
296 EVENT_A,
297 Duration::from_millis(100),
298 CoalesceMode::Latest
299 ));
300 assert!(timer.as_mut().fire_after(
301 EVENT_A,
302 Duration::from_millis(200),
303 CoalesceMode::Latest
304 ));
305 assert!(!timer.as_mut().fire_after(
306 EVENT_A,
307 Duration::from_millis(50),
308 CoalesceMode::Latest
309 ));
310
311 assert!(timer.as_mut().fire_after(
312 EVENT_B,
313 Duration::from_millis(1000),
314 CoalesceMode::Earliest,
315 ));
316 assert!(timer.as_mut().fire_after(
317 EVENT_B,
318 Duration::from_millis(100),
319 CoalesceMode::Earliest,
320 ));
321 assert!(!timer.as_mut().fire_after(
322 EVENT_B,
323 Duration::from_millis(500),
324 CoalesceMode::Earliest,
325 ));
326 assert!(timer.as_mut().fire_after(
327 EVENT_B,
328 Duration::from_millis(150),
329 CoalesceMode::Latest,
330 ));
331
332 let (event, instant) = timer.as_mut().await;
333 assert_eq!(event, EVENT_B);
334 assert_eq!(instant.duration_since(start), Duration::from_millis(150));
335
336 let (event, instant) = timer.as_mut().await;
337 assert_eq!(event, EVENT_A);
338 assert_eq!(instant.duration_since(start), Duration::from_millis(200));
339 assert_eq!(timer.deadline(), None);
340 }
341
342 #[tokio::main(flavor = "current_thread", start_paused = true)]
343 #[test]
344 async fn cancellation() {
345 let timer: MuxTimer<3> = MuxTimer::default();
346 pin!(timer);
347
348 assert!(timer.as_mut().fire_after(
349 EVENT_A,
350 Duration::from_millis(100),
351 CoalesceMode::Latest
352 ));
353
354 assert!(timer
355 .as_mut()
356 .fire_after(EVENT_B, Duration::from_secs(1), CoalesceMode::Latest));
357
358 assert!(timer.as_mut().cancel(EVENT_A));
359 assert!(!timer.as_mut().cancel(EVENT_A));
360
361 let (event, _) = timer.as_mut().await;
362 assert_eq!(event, EVENT_B);
363 }
364}