tokio_muxt/
lib.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{ready, Context, Poll},
5    time::Duration,
6};
7
8use pin_project::pin_project;
9
10use tokio::time::{Instant, Sleep};
11
/// Timer for a limited set of events that are represented by their ordinals.
/// It multiplexes over a single tokio [Sleep] instance.
/// Deadlines for the same event are coalesced, either to the earliest or latest one, depending on the `CoalesceMode`, if it has not yet fired.
///
/// Deadlines are stored inline in a fixed-size array of size `N`, and the ordinals are used to index into it,
/// so the maximum supported ordinal will be `N - 1`. The implementation is designed for small `N` (think single digits).
///
/// Mapping between ordinals and events is up to the user.
#[pin_project(project = MuxTimerProj)]
#[derive(Debug)]
pub struct MuxTimer<const N: usize> {
    /// Pending deadline for each event ordinal; `None` means nothing is scheduled for it.
    deadlines: [Option<Instant>; N],
    /// The single underlying timer, reset to the deadline of whichever event is armed.
    #[pin]
    sleep: Sleep,
    /// Ordinal of the event `sleep` is currently armed for. The out-of-range value `N`
    /// is used as the "not armed" sentinel.
    armed_ordinal: usize,
}
28
/// How to handle coalescing deadlines for the same event ordinal.
///
/// The mode is a plain, field-less value, so it is cheap to copy, compare and log.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CoalesceMode {
    /// Retain the earliest deadline for the event.
    Earliest,

    /// Retain the latest deadline for the event.
    Latest,
}
37
38impl<const N: usize> Default for MuxTimer<N> {
39    fn default() -> Self {
40        Self {
41            deadlines: [None; N],
42            sleep: tokio::time::sleep(Duration::ZERO),
43            armed_ordinal: N,
44        }
45    }
46}
47
48impl<const N: usize> MuxTimer<N> {
49    /// Fire timer for event with `ordinal` after `timeout` duration.
50    /// Returns `true` if the timer was armed, `false` if it was already armed for the same event and provided `CoalesceMode`.
51    pub fn fire_after(
52        self: Pin<&mut Self>,
53        ordinal: impl Into<usize>,
54        timeout: Duration,
55        coalesce_mode: CoalesceMode,
56    ) -> bool {
57        self.fire_at(ordinal, Instant::now() + timeout, coalesce_mode)
58    }
59
60    /// Fire timer for event with `ordinal` at `deadline`.
61    /// Returns `true` if the timer was armed, `false` if it was already armed for the same event and provided `CoalesceMode`.
62    pub fn fire_at(
63        self: Pin<&mut Self>,
64        ordinal: impl Into<usize>,
65        deadline: Instant,
66        coalesce_mode: CoalesceMode,
67    ) -> bool {
68        let ordinal = ordinal.into();
69        if self.deadlines[ordinal].is_some_and(|d| match coalesce_mode {
70            CoalesceMode::Earliest => d < deadline,
71            CoalesceMode::Latest => d > deadline,
72        }) {
73            return false;
74        }
75
76        let current_deadline = self.deadline();
77        let mut this = self.project();
78        this.deadlines[ordinal] = Some(deadline);
79
80        match coalesce_mode {
81            CoalesceMode::Earliest => {
82                if current_deadline.map_or(true, |d| deadline < d) {
83                    this.arm(ordinal, deadline)
84                }
85            }
86            CoalesceMode::Latest => {
87                match current_deadline {
88                    None => this.arm(ordinal, deadline),
89                    Some(_) if *this.armed_ordinal == ordinal => {
90                        // The currently armed event is the one we are pushing back, so
91                        // rearm with the new soonest event.
92                        let (next_ordinal, next_deadline) =
93                            this.soonest_event().expect("soonest event");
94                        this.arm(next_ordinal, next_deadline);
95                    }
96                    Some(_) => {
97                        // There's a deadline, but it's not for the current ordinal, so do nothing.
98                    }
99                }
100            }
101        }
102        true
103    }
104
105    /// Cancel an event. Returns `true` if timer had a future event for the cancelled ordinal, `false`
106    /// otherwise.
107    ///
108    /// The timer will become disarmed if the last event is cancelled.
109    pub fn cancel(self: Pin<&mut Self>, ordinal: impl Into<usize>) -> bool {
110        let ordinal = ordinal.into();
111        if self.deadlines[ordinal].is_some() {
112            let mut this = self.project();
113            this.deadlines[ordinal] = None;
114            if *this.armed_ordinal == ordinal {
115                if let Some((next_ordinal, next_deadline)) = this.soonest_event() {
116                    // Rearm with next soonest event.
117                    this.arm(next_ordinal, next_deadline);
118                } else {
119                    // Cancelled the last event. Disarm the timer.
120                    *this.armed_ordinal = N;
121                }
122            };
123            true
124        } else {
125            false
126        }
127    }
128
129    /// Returns whether the timer is armed.
130    pub fn is_armed(&self) -> bool {
131        self.armed_ordinal < N
132    }
133
134    /// Returns the next deadline, if armed.
135    pub fn deadline(&self) -> Option<Instant> {
136        (self.armed_ordinal < N).then(|| self.sleep.deadline())
137    }
138
139    /// Returns all current deadlines, which can be indexed by event ordinals.
140    pub fn deadlines(&self) -> &[Option<Instant>; N] {
141        &self.deadlines
142    }
143}
144
145impl<'pin, const N: usize> MuxTimerProj<'pin, N> {
146    fn arm(&mut self, ordinal: usize, deadline: Instant) {
147        self.sleep.as_mut().reset(deadline);
148        *self.armed_ordinal = ordinal;
149    }
150
151    fn soonest_event(&self) -> Option<(usize, Instant)> {
152        self.deadlines
153            .iter()
154            .enumerate()
155            .filter_map(|(ordinal, slot)| slot.map(|deadline| (ordinal, deadline)))
156            .min_by(|(_, x), (_, y)| x.cmp(y))
157    }
158}
159
160/// Wait for the next event and return its ordinal, along with that event's deadline.
161/// Panics if the timer is not armed.
162impl<const N: usize> Future for MuxTimer<N> {
163    type Output = (usize, Instant);
164
165    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166        assert!(self.armed_ordinal < N);
167        let mut this = self.project();
168        ready!(this.sleep.as_mut().poll(cx));
169        let fired_ordinal = std::mem::replace(this.armed_ordinal, N);
170        let fired_deadline = this.deadlines[fired_ordinal].take().expect("armed");
171        assert_eq!(fired_deadline, this.sleep.deadline());
172        if let Some((ordinal, deadline)) = this.soonest_event() {
173            this.arm(ordinal, deadline);
174        }
175        Poll::Ready((fired_ordinal, fired_deadline))
176    }
177}
178
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::pin;
    use tokio::time::Instant;

    use super::{CoalesceMode, MuxTimer};

    const EVENT_A: usize = 0;
    const EVENT_B: usize = 1;
    const EVENT_C: usize = 2;

    /// Shorthand for a duration of `millis` milliseconds.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// Events fire in deadline order, regardless of the order they were scheduled in.
    #[tokio::main(flavor = "current_thread", start_paused = true)]
    #[test]
    async fn firing_order() {
        let timer: MuxTimer<3> = MuxTimer::default();
        pin!(timer);

        assert_eq!(timer.deadline(), None);

        let armed_c = timer
            .as_mut()
            .fire_after(EVENT_C, ms(100), CoalesceMode::Earliest);
        assert!(armed_c);
        let armed_b = timer
            .as_mut()
            .fire_after(EVENT_B, ms(50), CoalesceMode::Earliest);
        assert!(armed_b);
        let armed_a = timer
            .as_mut()
            .fire_after(EVENT_A, ms(150), CoalesceMode::Earliest);
        assert!(armed_a);

        let (event, instant_b) = timer.as_mut().await;
        assert_eq!(event, EVENT_B);

        let (event, instant_c) = timer.as_mut().await;
        assert_eq!(instant_c.duration_since(instant_b).as_millis(), 50);
        assert_eq!(event, EVENT_C);

        let (event, instant_a) = timer.as_mut().await;
        assert_eq!(instant_a.duration_since(instant_c).as_millis(), 50);
        assert_eq!(event, EVENT_A);

        assert_eq!(timer.deadline(), None);
    }

    /// With `Earliest`, only a sooner deadline replaces the pending one.
    #[tokio::main(flavor = "current_thread", start_paused = true)]
    #[test]
    async fn rearming_earliest() {
        let timer: MuxTimer<3> = MuxTimer::default();
        pin!(timer);

        let start = Instant::now();
        let first = timer
            .as_mut()
            .fire_after(EVENT_A, ms(100), CoalesceMode::Earliest);
        assert!(first);
        let later = timer
            .as_mut()
            .fire_after(EVENT_A, ms(200), CoalesceMode::Earliest);
        assert!(!later);
        let sooner = timer
            .as_mut()
            .fire_after(EVENT_A, ms(50), CoalesceMode::Earliest);
        assert!(sooner);

        let (event, instant) = timer.as_mut().await;
        assert_eq!(event, EVENT_A);
        assert_eq!(instant.duration_since(start), ms(50));
        assert_eq!(timer.deadline(), None);
    }

    /// With `Latest`, only a later deadline replaces the pending one.
    #[tokio::main(flavor = "current_thread", start_paused = true)]
    #[test]
    async fn rearming_latest() {
        let timer: MuxTimer<3> = MuxTimer::default();
        pin!(timer);

        let start = Instant::now();
        let first = timer
            .as_mut()
            .fire_after(EVENT_A, ms(100), CoalesceMode::Latest);
        assert!(first);
        let later = timer
            .as_mut()
            .fire_after(EVENT_A, ms(200), CoalesceMode::Latest);
        assert!(later);
        let sooner = timer
            .as_mut()
            .fire_after(EVENT_A, ms(50), CoalesceMode::Latest);
        assert!(!sooner);

        let (event, instant) = timer.as_mut().await;
        assert_eq!(event, EVENT_A);
        assert_eq!(instant.duration_since(start), ms(200));
        assert_eq!(timer.deadline(), None);
    }

    /// Mixing coalesce modes across two events keeps the timer on the soonest deadline.
    #[tokio::main(flavor = "current_thread", start_paused = true)]
    #[test]
    async fn rearming_interleaved() {
        let timer: MuxTimer<3> = MuxTimer::default();
        pin!(timer);

        let start = Instant::now();

        // EVENT_A ends up at 200ms: pushed back once, then a sooner request is rejected.
        let a_first = timer
            .as_mut()
            .fire_after(EVENT_A, ms(100), CoalesceMode::Latest);
        assert!(a_first);
        let a_later = timer
            .as_mut()
            .fire_after(EVENT_A, ms(200), CoalesceMode::Latest);
        assert!(a_later);
        let a_sooner = timer
            .as_mut()
            .fire_after(EVENT_A, ms(50), CoalesceMode::Latest);
        assert!(!a_sooner);

        // EVENT_B is pulled in to 100ms with `Earliest`, then pushed to 150ms with `Latest`.
        let b_first = timer
            .as_mut()
            .fire_after(EVENT_B, ms(1000), CoalesceMode::Earliest);
        assert!(b_first);
        let b_sooner = timer
            .as_mut()
            .fire_after(EVENT_B, ms(100), CoalesceMode::Earliest);
        assert!(b_sooner);
        let b_rejected = timer
            .as_mut()
            .fire_after(EVENT_B, ms(500), CoalesceMode::Earliest);
        assert!(!b_rejected);
        let b_pushed = timer
            .as_mut()
            .fire_after(EVENT_B, ms(150), CoalesceMode::Latest);
        assert!(b_pushed);

        let (event, instant) = timer.as_mut().await;
        assert_eq!(event, EVENT_B);
        assert_eq!(instant.duration_since(start), ms(150));

        let (event, instant) = timer.as_mut().await;
        assert_eq!(event, EVENT_A);
        assert_eq!(instant.duration_since(start), ms(200));
        assert_eq!(timer.deadline(), None);
    }

    /// Cancelling the armed event hands the timer over to the remaining event.
    #[tokio::main(flavor = "current_thread", start_paused = true)]
    #[test]
    async fn cancellation() {
        let timer: MuxTimer<3> = MuxTimer::default();
        pin!(timer);

        let armed_a = timer
            .as_mut()
            .fire_after(EVENT_A, ms(100), CoalesceMode::Latest);
        assert!(armed_a);

        let armed_b = timer
            .as_mut()
            .fire_after(EVENT_B, Duration::from_secs(1), CoalesceMode::Latest);
        assert!(armed_b);

        // The first cancel removes the event; the second finds nothing left to cancel.
        let cancelled = timer.as_mut().cancel(EVENT_A);
        assert!(cancelled);
        let cancelled_again = timer.as_mut().cancel(EVENT_A);
        assert!(!cancelled_again);

        let (event, _) = timer.as_mut().await;
        assert_eq!(event, EVENT_B);
    }
}