crossio_core/
timer.rs

1//! A small, in-memory timer queue used by reactors and runtimes to track
2//! upcoming wakeups. It stays backend-agnostic so every platform can share
3//! the same scheduling logic.
4
5use std::{
6    cmp::Ordering,
7    collections::{BinaryHeap, HashMap},
8    time::{Duration, Instant},
9};
10
/// Opaque handle naming a single timer held by a `TimerQueue`.
///
/// The queue returns one from every scheduling call; hand it back to
/// `TimerQueue::cancel` or `TimerQueue::contains` to refer to that timer.
/// It is a small `Copy` wrapper around a `u64` and can be used as a hash key.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct TimerId(u64);
14
15impl TimerId {
16    fn new(raw: u64) -> Self {
17        TimerId(raw)
18    }
19}
20
21/// Internal representation of a scheduled timer.
22#[derive(Debug, Clone)]
23struct TimerEntry {
24    id: TimerId,
25    deadline: Instant,
26}
27
28/// Wrapper used to turn the `BinaryHeap` into a min-heap on deadline.
29#[derive(Debug)]
30struct HeapItem(TimerEntry);
31
32impl PartialEq for HeapItem {
33    fn eq(&self, other: &Self) -> bool {
34        self.0.deadline.eq(&other.0.deadline) && self.0.id.eq(&other.0.id)
35    }
36}
37
38impl Eq for HeapItem {}
39
40impl PartialOrd for HeapItem {
41    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
42        // Reverse ordering to make the smallest deadline bubble to the top.
43        Some(other.0.deadline.cmp(&self.0.deadline))
44    }
45}
46
47impl Ord for HeapItem {
48    fn cmp(&self, other: &Self) -> Ordering {
49        // Reverse ordering to make the smallest deadline bubble to the top.
50        other.0.deadline.cmp(&self.0.deadline)
51    }
52}
53
54/// Small timer queue backed by a binary heap.
55///
56/// The queue knows nothing about sockets or backends – it simply tracks
57/// deadlines. Reactors can use it to pick reasonable poll timeouts and decide
58/// which tasks to wake up next.
59#[derive(Debug, Default)]
60pub struct TimerQueue {
61    next_id: u64,
62    heap: BinaryHeap<HeapItem>,
63    // Tracks which timers are still active so cancellations can be applied
64    // lazily without mutating the heap in-place.
65    active: HashMap<TimerId, Instant>,
66}
67
68impl TimerQueue {
69    /// Creates an empty timer queue.
70    pub fn new() -> Self {
71        Self::default()
72    }
73
74    /// Returns true when no timers are scheduled.
75    pub fn is_empty(&self) -> bool {
76        self.active.is_empty()
77    }
78
79    /// Returns the number of active timers.
80    pub fn len(&self) -> usize {
81        self.active.len()
82    }
83
84    /// Returns true if the given timer id is still scheduled.
85    pub fn contains(&self, id: TimerId) -> bool {
86        self.active.contains_key(&id)
87    }
88
89    /// Schedules a timer to fire after the provided duration.
90    pub fn schedule_after(&mut self, delay: Duration, now: Instant) -> TimerId {
91        self.schedule_deadline(now + delay)
92    }
93
94    /// Schedules a timer for the given absolute deadline.
95    pub fn schedule_deadline(&mut self, deadline: Instant) -> TimerId {
96        let id = TimerId::new(self.next_id);
97        self.next_id = self.next_id.wrapping_add(1);
98
99        let entry = TimerEntry { id, deadline };
100        self.active.insert(id, deadline);
101        self.heap.push(HeapItem(entry));
102
103        id
104    }
105
106    /// Cancels a previously scheduled timer. Returns true if the timer was
107    /// active at the time of the call.
108    pub fn cancel(&mut self, id: TimerId) -> bool {
109        self.active.remove(&id).is_some()
110    }
111
112    /// Returns the deadline of the next timer to fire, if any.
113    pub fn next_deadline(&mut self) -> Option<Instant> {
114        self.peek_active().map(|entry| entry.deadline)
115    }
116
117    /// Drains all timers whose deadlines are <= `now`, returning their ids in
118    /// chronological order.
119    pub fn poll_expired(&mut self, now: Instant) -> Vec<TimerId> {
120        let mut expired = Vec::new();
121
122        while let Some(entry) = self.peek_active() {
123            if entry.deadline > now {
124                break;
125            }
126
127            let id = entry.id;
128            // Remove from active set so subsequent calls do not surface it
129            // again.
130            self.active.remove(&id);
131            // Remove the corresponding heap entry. At this point we know the
132            // front of the heap matches `entry` because `peek_active` only
133            // returns active timers and does not modify the heap in that
134            // branch.
135            let _ = self.heap.pop();
136            expired.push(id);
137        }
138
139        expired
140    }
141
142    fn peek_active(&mut self) -> Option<TimerEntry> {
143        loop {
144            let head = match self.heap.peek() {
145                Some(item) => item.0.clone(),
146                None => return None,
147            };
148
149            if self.active.contains_key(&head.id) {
150                return Some(head);
151            }
152
153            // Timer was cancelled; remove it from the heap and keep searching.
154            let _ = self.heap.pop();
155        }
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::TimerQueue;
    use std::time::{Duration, Instant};

    /// Shorthand for a `Duration` of `n` milliseconds.
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// Timers come back from `poll_expired` only once their deadline has
    /// passed, and the queue is empty after the last one fires.
    #[test]
    fn schedule_and_expire_in_order() {
        let origin = Instant::now();
        let mut queue = TimerQueue::new();

        let first = queue.schedule_after(ms(10), origin);
        let second = queue.schedule_after(ms(20), origin);
        assert_eq!(queue.len(), 2);

        // At t = 15 ms only the 10 ms timer is due.
        assert_eq!(queue.poll_expired(origin + ms(15)), vec![first]);

        // At t = 25 ms the remaining timer is due as well.
        assert_eq!(queue.poll_expired(origin + ms(25)), vec![second]);
        assert!(queue.is_empty());
    }

    /// A cancelled timer is never reported as expired.
    #[test]
    fn cancellation_prevents_expiry() {
        let origin = Instant::now();
        let mut queue = TimerQueue::new();

        let cancelled = queue.schedule_after(ms(10), origin);
        let _kept = queue.schedule_after(ms(20), origin);
        assert!(queue.cancel(cancelled));

        let fired = queue.poll_expired(origin + ms(25));
        assert_eq!(fired.len(), 1);
        assert_ne!(fired[0], cancelled);
    }

    /// `next_deadline` reports the earliest deadline regardless of the order
    /// in which timers were scheduled.
    #[test]
    fn next_deadline_reports_earliest() {
        let origin = Instant::now();
        let later = origin + ms(30);
        let sooner = origin + ms(10);

        let mut queue = TimerQueue::new();
        queue.schedule_deadline(later);
        queue.schedule_deadline(sooner);

        assert_eq!(queue.next_deadline(), Some(sooner));
    }
}