Skip to main content

yash_env/waker/
queue.rs

1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2026 WATANABE Yuki
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17//! Items for managing time-based wakers in the virtual system
18
19use super::WakerEntry;
20use std::cell::Cell;
21use std::collections::{BTreeSet, HashMap};
22use std::rc::Weak;
23use std::task::Waker;
24use std::time::Instant;
25
26/// Priority queue of scheduled wakers to wake up processes at specific times
27///
28/// This struct represents a priority queue of scheduled wakers, where each
29/// waker is associated with a specific time at which a process should be woken
30/// up.
31///
32/// The queue is effectively an extension of [`WakerSet`] ordered by wake time,
33/// and is (currently) implemented as a pair of a `BTreeSet` and a `HashMap` to
34/// allow efficient insertion and deduplication of wakers. See the documentation
35/// of [`WakerSet`] for more details on the data structure of wakers and the
36/// rationale behind it.
37///
38/// Like [`WakerSet`], wakers in this queue are compared by their pointer
39/// addresses, and dead wakers in the queue may be automatically removed as a
40/// side effect of other operations.
41///
42/// [`WakerSet`]: super::WakerSet
43#[derive(Clone, Debug, Default)]
44pub struct ScheduledWakerQueue {
45    /// Set of scheduled wakers ordered by wake time
46    wakers_by_time: BTreeSet<(Instant, WakerEntry)>,
47    /// Map from wakers to their scheduled wake times for efficient deduplication
48    waker_to_time: HashMap<WakerEntry, Instant>,
49}
50
51impl ScheduledWakerQueue {
52    /// Creates a new empty `ScheduledWakerQueue`.
53    #[inline(always)]
54    #[must_use]
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    /// Returns the number of scheduled wakers in the queue.
60    ///
61    /// Wakers may become dead over time, so the actual number of valid wakers
62    /// may be less than this count.
63    #[inline(always)]
64    #[must_use]
65    pub fn len(&self) -> usize {
66        self.wakers_by_time.len()
67    }
68
69    /// Checks if the queue is empty.
70    ///
71    /// Wakers may become dead over time, so there may be no valid wakers even
72    /// if this method returns `false`.
73    #[inline(always)]
74    #[must_use]
75    pub fn is_empty(&self) -> bool {
76        self.wakers_by_time.is_empty()
77    }
78
79    /// Clears all scheduled wakers from the queue.
80    pub fn clear(&mut self) {
81        self.wakers_by_time.clear();
82        self.waker_to_time.clear();
83
84        #[cfg(debug_assertions)]
85        self.validate();
86    }
87
88    /// Pushes a new scheduled waker into the queue.
89    ///
90    /// This method adds a new scheduled waker to the priority queue so that the
91    /// associated process can be woken up at the specified time.
92    ///
93    /// Returns `true` if the waker was successfully added to the queue, or
94    /// `false` if it was already present or dead, in which case the passed weak
95    /// reference is dropped.
96    ///
97    /// The amortized time complexity of this method is O(log(n)). If the queue is
98    /// full, it will first clean up dead wakers and possibly reallocate to
99    /// optimize the capacity for future insertions, which will cost O(n log(n))
100    /// time. Because of this cleanup, the number of wakers in the queue may
101    /// decrease after calling this method, regardless of whether the new waker
102    /// was added or not.
103    pub fn push(&mut self, wake_time: Instant, waker: Weak<Cell<Option<Waker>>>) -> bool {
104        let waker_entry = WakerEntry(waker);
105        if !waker_entry.is_alive() {
106            return false;
107        }
108
109        // Do some cleanup before insertion.
110        self.trim_to_next_wake_time();
111        if self.len() == self.waker_to_time.capacity() {
112            // The hash map is full. Before it increases its capacity, we try to
113            // clean up dead wakers from the queue to make room for new entries.
114            self.wakers_by_time.retain(|(_wake_time, waker_entry)| {
115                if waker_entry.is_alive() {
116                    true
117                } else {
118                    self.waker_to_time.remove(waker_entry);
119                    false
120                }
121            });
122
123            // If we have removed substantially many wakers, we can also shrink
124            // the capacity to save memory. This is not strictly necessary, but
125            // it can help prevent the hash map from growing too large if many
126            // wakers are added and removed over time.
127            self.waker_to_time
128                .shrink_to(std::cmp::max(8, self.len() * 2));
129
130            // For amortized O(n log(n)) time complexity, we make sure the next
131            // cleanup will not occur until the number of wakers doubles again.
132            self.waker_to_time.reserve(self.len());
133        }
134
135        // Now we can insert the new waker.
136        let pushed = match self.waker_to_time.get_mut(&waker_entry) {
137            None => {
138                self.waker_to_time.insert(waker_entry.clone(), wake_time);
139                self.wakers_by_time.insert((wake_time, waker_entry))
140            }
141            Some(wake_time_entry) => {
142                if *wake_time_entry <= wake_time {
143                    // The existing entry is earlier than the new one,
144                    // so we ignore the new entry
145                    false
146                } else {
147                    // The new entry is earlier than the existing one,
148                    // so we replace the existing entry with the new one
149                    let old_wake_time = std::mem::replace(wake_time_entry, wake_time);
150                    let waker_entry = self
151                        .wakers_by_time
152                        .take(&(old_wake_time, waker_entry))
153                        .unwrap()
154                        .1;
155                    self.wakers_by_time.insert((wake_time, waker_entry))
156                }
157            }
158        };
159
160        #[cfg(debug_assertions)]
161        self.validate();
162
163        pushed
164    }
165
166    /// Returns the next scheduled wake time, if any.
167    ///
168    /// This method peeks at the priority queue to find the scheduled waker with
169    /// the earliest wake time. If the queue is not empty, it returns the wake
170    /// time of that waker; otherwise, it returns `None`.
171    ///
172    /// If you have a mutable reference to the queue, you can use
173    /// [`trim_to_next_wake_time`](Self::trim_to_next_wake_time) instead of this
174    /// method to remove dead wakers as the queue is traversed to find the next
175    /// wake time.
176    pub fn next_wake_time(&self) -> Option<Instant> {
177        self.wakers_by_time
178            .iter()
179            .find(|&(_, entry)| entry.is_alive())
180            .map(|(wake_time, _)| *wake_time)
181    }
182
183    /// Trims dead wakers to find the next wake time.
184    ///
185    /// This method removes dead wakers from the beginning of the priority queue
186    /// until it finds a live waker or the queue becomes empty. The return value
187    /// is the wake time of the first live waker in the queue after trimming, or
188    /// `None` if the queue is empty.
189    ///
190    /// This method is solely for optimization purposes and does not affect the
191    /// correctness of the queue. Using this method instead of
192    /// [`next_wake_time`](Self::next_wake_time) can help avoid unnecessary
193    /// processing of dead wakers, particularly when `next_wake_time` is
194    /// followed by [`wake`](Self::wake) that will remove dead wakers anyway.
195    ///
196    /// The [`push`](Self::push) method will also call this method to clean up
197    /// dead wakers before inserting a new waker, so you don't need to call this
198    /// method manually in most cases.
199    pub fn trim_to_next_wake_time(&mut self) -> Option<Instant> {
200        let mut next_wake_time = None;
201        while let Some((wake_time, waker_entry)) = self.wakers_by_time.first() {
202            if waker_entry.is_alive() {
203                next_wake_time = Some(*wake_time);
204                break;
205            }
206            self.waker_to_time.remove(waker_entry);
207            self.wakers_by_time.pop_first();
208        }
209
210        #[cfg(debug_assertions)]
211        self.validate();
212
213        next_wake_time
214    }
215
216    /// Wakes up processes whose scheduled wake time has been reached.
217    ///
218    /// This method checks the priority queue for any scheduled wakers whose
219    /// wake time is less than or equal to the current time (`now`). For each
220    /// such waker, it takes the waker from the `Cell` and calls `wake()` on it
221    /// to wake up the associated process. After waking up the process, the item
222    /// is removed from the queue.
223    pub fn wake(&mut self, now: Instant) {
224        while let Some((wake_time, waker_entry)) = self.wakers_by_time.first() {
225            if *wake_time > now {
226                break;
227            }
228            self.waker_to_time.remove(waker_entry);
229            let waker_entry = self.wakers_by_time.pop_first().unwrap().1;
230            if let Some(waker) = waker_entry.0.upgrade().and_then(|cell| cell.take()) {
231                waker.wake();
232            }
233        }
234
235        #[cfg(debug_assertions)]
236        self.validate();
237    }
238
239    /// Validates the internal consistency of the queue.
240    #[cfg(debug_assertions)]
241    fn validate(&self) {
242        assert_eq!(self.wakers_by_time.len(), self.waker_to_time.len());
243        for (wake_time, entry) in &self.wakers_by_time {
244            assert_eq!(*wake_time, self.waker_to_time[entry]);
245        }
246        for (entry, wake_time) in &self.waker_to_time {
247            assert!(self.wakers_by_time.contains(&(*wake_time, entry.clone())));
248        }
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helper::WakeFlag;
    use std::rc::Rc;
    use std::sync::Arc;
    use std::time::Duration;

    /// Strongly owned waker cell whose weak references are pushed to the queue
    type SharedWaker = Rc<Cell<Option<Waker>>>;

    /// Creates a live waker cell that holds a no-op waker.
    fn dummy_waker() -> SharedWaker {
        let noop = Waker::noop().clone();
        Rc::new(Cell::new(Some(noop)))
    }

    /// Creates a live waker cell along with the `WakeFlag` its waker is built
    /// from, so that tests can check whether the waker has been woken.
    fn flagged_waker() -> (Arc<WakeFlag>, SharedWaker) {
        let flag = Arc::new(WakeFlag::new());
        let waker = Waker::from(Arc::clone(&flag));
        (flag, Rc::new(Cell::new(Some(waker))))
    }

    #[test]
    fn queue_is_initially_empty() {
        let queue = ScheduledWakerQueue::new();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn pushed_wakers_are_stored_in_queue() {
        let mut queue = ScheduledWakerQueue::new();
        let wakers = [dummy_waker(), dummy_waker()];

        for (index, waker) in wakers.iter().enumerate() {
            let pushed = queue.push(Instant::now(), Rc::downgrade(waker));
            assert!(pushed);
            assert!(!queue.is_empty());
            assert_eq!(queue.len(), index + 1);
            assert_eq!(Rc::strong_count(waker), 1);
            assert_eq!(Rc::weak_count(waker), 2);
        }
    }

    #[test]
    fn queue_is_empty_after_cleared() {
        let mut queue = ScheduledWakerQueue::new();
        let waker = dummy_waker();
        queue.push(Instant::now(), Rc::downgrade(&waker));

        queue.clear();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn pushing_existing_waker_with_earlier_wake_time_discards_existing_waker() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let earlier = now + Duration::from_secs(3);
        let later = now + Duration::from_secs(5);
        let waker = dummy_waker();
        queue.push(later, Rc::downgrade(&waker));

        assert!(queue.push(earlier, Rc::downgrade(&waker)));
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.next_wake_time(), Some(earlier));
    }

    #[test]
    fn pushing_existing_waker_with_later_wake_time_discards_new_waker() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let earlier = now + Duration::from_secs(3);
        let later = now + Duration::from_secs(5);
        let waker = dummy_waker();
        queue.push(earlier, Rc::downgrade(&waker));

        assert!(!queue.push(later, Rc::downgrade(&waker)));
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.next_wake_time(), Some(earlier));
    }

    #[test]
    fn pushing_dead_waker_is_noop() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();

        // A weak reference that never pointed to anything
        assert!(!queue.push(now, Weak::new()));
        assert!(queue.is_empty());

        // A weak reference to a cell whose waker has been taken away
        let emptied = dummy_waker();
        emptied.take();
        assert!(!queue.push(now, Rc::downgrade(&emptied)));
        assert!(queue.is_empty());
    }

    #[test]
    fn next_wake_time_returns_none_if_empty() {
        let queue = ScheduledWakerQueue::new();
        assert_eq!(queue.next_wake_time(), None);
    }

    #[test]
    fn next_wake_time_returns_earliest_pending_waker_time() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let wakers = [dummy_waker(), dummy_waker(), dummy_waker()];
        // (delay of the pushed waker, expected delay of the next wake time)
        let steps = [(5, 5), (3, 3), (10, 3)];

        assert_eq!(queue.next_wake_time(), None);

        for (waker, (delay, expected)) in wakers.iter().zip(steps) {
            queue.push(now + Duration::from_secs(delay), Rc::downgrade(waker));
            assert_eq!(
                queue.next_wake_time(),
                Some(now + Duration::from_secs(expected))
            );
        }
    }

    #[test]
    fn next_wake_time_ignores_dead_wakers() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let live_time = now + Duration::from_secs(5);
        let dropped = dummy_waker();
        let emptied = dummy_waker();
        let live = dummy_waker();
        queue.push(now, Rc::downgrade(&dropped));
        queue.push(now, Rc::downgrade(&emptied));
        queue.push(live_time, Rc::downgrade(&live));
        drop(dropped);
        emptied.take();

        assert_eq!(queue.next_wake_time(), Some(live_time));
    }

    #[test]
    fn trim_to_next_wake_time_removes_leading_dead_wakers() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let live_time = now + Duration::from_secs(5);
        let dropped = dummy_waker();
        let emptied = dummy_waker();
        let live = dummy_waker();
        queue.push(now, Rc::downgrade(&dropped));
        queue.push(now + Duration::from_secs(3), Rc::downgrade(&emptied));
        queue.push(live_time, Rc::downgrade(&live));
        drop(dropped);
        emptied.take();

        queue.trim_to_next_wake_time();
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.next_wake_time(), Some(live_time));
    }

    #[test]
    fn trim_to_next_wake_time_returns_next_wake_time() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let live_time = now + Duration::from_secs(5);
        let dropped = dummy_waker();
        let emptied = dummy_waker();
        let live = dummy_waker();
        queue.push(now, Rc::downgrade(&dropped));
        queue.push(now + Duration::from_secs(3), Rc::downgrade(&emptied));
        queue.push(live_time, Rc::downgrade(&live));
        drop(dropped);
        emptied.take();

        assert_eq!(queue.trim_to_next_wake_time(), Some(live_time));

        drop(live);
        assert_eq!(queue.trim_to_next_wake_time(), None);
    }

    #[test]
    fn wake_removes_all_wakers_up_to_given_time() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let at = |secs| now + Duration::from_secs(secs);

        let waker_1 = dummy_waker();
        let waker_2 = dummy_waker();
        let waker_3 = dummy_waker();
        queue.push(at(3), Rc::downgrade(&waker_1));
        queue.push(at(5), Rc::downgrade(&waker_2));
        queue.push(at(6), Rc::downgrade(&waker_3));

        // The first two wakers should be removed, but not the third one
        queue.wake(at(5));
        assert_eq!(queue.len(), 1);
        assert_eq!(Rc::weak_count(&waker_1), 0);
        assert_eq!(Rc::weak_count(&waker_2), 0);
        assert_eq!(Rc::weak_count(&waker_3), 2);
    }

    #[test]
    fn wake_activates_all_wakers_up_to_given_time() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let at = |secs| now + Duration::from_secs(secs);

        let (wake_flag_1, waker_1) = flagged_waker();
        let (wake_flag_2, waker_2) = flagged_waker();
        let (wake_flag_3, waker_3) = flagged_waker();
        queue.push(at(3), Rc::downgrade(&waker_1));
        queue.push(at(5), Rc::downgrade(&waker_2));
        queue.push(at(6), Rc::downgrade(&waker_3));

        // The first two wakers should be triggered, but not the third one
        queue.wake(at(5));
        assert!(wake_flag_1.is_woken());
        assert!(wake_flag_2.is_woken());
        assert!(!wake_flag_3.is_woken());
    }

    #[test]
    fn complex_pushes_and_wakes() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let at = |secs| now + Duration::from_secs(secs);

        let (wake_flag_1, waker_1) = flagged_waker();
        let (wake_flag_2, waker_2) = flagged_waker();
        let (wake_flag_3, waker_3) = flagged_waker();

        queue.push(at(5), Rc::downgrade(&waker_1));
        queue.push(at(3), Rc::downgrade(&waker_2));
        queue.push(at(10), Rc::downgrade(&waker_3));

        // The first two wakers should be triggered, but not the third one
        queue.wake(at(5));
        assert!(wake_flag_1.is_woken());
        assert!(wake_flag_2.is_woken());
        assert!(!wake_flag_3.is_woken());

        // After waking, the next wake time should be the third one
        assert_eq!(queue.next_wake_time(), Some(at(10)));

        // The third waker should be triggered now
        queue.wake(at(15));
        assert!(wake_flag_3.is_woken());

        // After waking all, the next wake time should be None
        assert_eq!(queue.next_wake_time(), None);
    }

    #[test]
    fn push_trims_earliest_dead_entries() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let at = |secs| now + Duration::from_secs(secs);

        // Only the flags of the last two wakers are examined below.
        let (_, waker_1) = flagged_waker();
        let (_, waker_2) = flagged_waker();
        let (wake_flag_3, waker_3) = flagged_waker();
        let (wake_flag_4, waker_4) = flagged_waker();

        queue.push(at(3), Rc::downgrade(&waker_1));
        queue.push(at(5), Rc::downgrade(&waker_2));
        queue.push(at(7), Rc::downgrade(&waker_3));

        // Manually wake the first two wakers to simulate them being woken by other conditions
        waker_1.take().unwrap().wake();
        waker_2.take().unwrap().wake();

        // Now push a new waker, which should trigger the cleanup of the dead entries
        queue.push(at(1), Rc::downgrade(&waker_4));
        assert_eq!(queue.len(), 2);

        // The remaining wakers should be the third and fourth ones
        queue.wake(at(10));
        assert!(wake_flag_3.is_woken());
        assert!(wake_flag_4.is_woken());
    }

    #[test]
    fn push_cleans_up_all_dead_entries_if_full() {
        let mut queue = ScheduledWakerQueue::new();
        let now = Instant::now();
        let at = |secs| now + Duration::from_secs(secs);

        let waker_1 = dummy_waker();
        let waker_2 = dummy_waker();
        let waker_3 = dummy_waker();
        let waker_4 = dummy_waker();

        queue.waker_to_time.reserve(10);
        queue.push(at(3), Rc::downgrade(&waker_1));
        // Fill the queue, leaving one free slot, with wakers that die as soon
        // as they have been pushed. They are all scheduled after the first
        // waker, so trimming from the front does not remove them.
        while queue.len() + 1 < queue.waker_to_time.capacity() {
            let short_lived = dummy_waker();
            let nanos = queue.len() as u32;
            queue.push(now + Duration::new(3, nanos), Rc::downgrade(&short_lived));
        }
        queue.push(at(4), Rc::downgrade(&waker_2));
        assert_eq!(queue.len(), queue.waker_to_time.capacity());

        // The next push should trigger cleanup of expired entries
        queue.push(at(5), Rc::downgrade(&waker_3));
        assert_eq!(queue.len(), 3);

        // Manually wake the last waker to simulate it being woken by other conditions
        waker_3.take().unwrap().wake();

        // Another push does not trigger cleanup since the capacity is not yet reached
        queue.push(at(6), Rc::downgrade(&waker_4));
        assert_eq!(queue.len(), 4);
    }
}