embassy_time_queue_utils/
queue_integrated.rs

1//! Timer queue operations.
2use core::cell::Cell;
3use core::cmp::min;
4use core::ptr::NonNull;
5use core::task::Waker;
6
7use embassy_executor_timer_queue::TimerQueueItem;
8
/// An item in the timer queue.
///
/// Items are chained through `next` into a singly linked list that starts at the queue's head.
#[derive(Default)]
struct QueueItem {
    /// The next item in the queue.
    ///
    /// `None` if this item is the last one in the list, or if it is not linked into the list
    /// at all. This field alone does not tell whether the item is enqueued; `waker` is what the
    /// queue checks for that.
    pub next: Cell<Option<NonNull<QueueItem>>>,

    /// The time at which this item expires. `u64::MAX` is treated as "never expires".
    pub expires_at: u64,

    /// The registered waker. If `Some`, the item is enqueued in the timer queue.
    pub waker: Option<Waker>,
}

// `Cell` and `NonNull` make `QueueItem` `!Sync` by default, hence the manual impl.
// NOTE(review): soundness presumably relies on items only being touched through `Queue`
// methods taking `&mut self` - confirm.
unsafe impl Sync for QueueItem {}
26
27/// A timer queue, with items integrated into tasks.
28///
29/// # Safety
30///
31/// **This Queue is only safe when there is a single integrated queue in the system.**
32///
33/// If there are multiple integrated queues, additional checks are necessary to ensure that a Waker
34/// is not attempted to be enqueued in multiple queues.
35pub struct Queue {
36    head: Cell<Option<NonNull<QueueItem>>>,
37}
38
39impl core::fmt::Debug for Queue {
40    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
41        f.debug_struct("Queue").finish()
42    }
43}
44
45unsafe impl Send for Queue {}
46unsafe impl Sync for Queue {}
47
48impl Queue {
49    /// Creates a new timer queue.
50    pub const fn new() -> Self {
51        Self { head: Cell::new(None) }
52    }
53
54    /// Schedules a task to run at a specific time.
55    ///
56    /// If this function returns `true`, the called should find the next expiration time and set
57    /// a new alarm for that time.
58    pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool {
59        let item = unsafe {
60            // Safety: the `&mut self`, along with the Safety note of the Queue, are sufficient to
61            // ensure that this function creates the only mutable reference to the queue item.
62            TimerQueueItem::from_embassy_waker(waker)
63        };
64        let item = unsafe { item.as_mut::<QueueItem>() };
65        match item.waker.as_ref() {
66            Some(_) if at <= item.expires_at => {
67                // If expiration is sooner than previously set, update.
68                item.expires_at = at;
69                // The waker is always stored in its own queue item, so we don't need to update it.
70
71                // Trigger a queue update in case this item can be immediately dequeued.
72                true
73            }
74            Some(_) => {
75                // Queue item does not need to be updated, the task will be scheduled to be woken
76                // before the new expiration.
77                false
78            }
79            None => {
80                // If not in the queue, add it and update.
81                let mut item_ptr = NonNull::from(item);
82                let prev = self.head.replace(Some(item_ptr));
83
84                let item = unsafe { item_ptr.as_mut() };
85
86                item.expires_at = at;
87                item.waker = Some(waker.clone());
88                item.next.set(prev);
89                // The default implementation doesn't care about the
90                // opaque payload, leave it unchanged.
91
92                true
93            }
94        }
95    }
96
97    /// Dequeues expired timers and returns the next alarm time.
98    ///
99    /// The provided callback will be called for each expired task. Tasks that never expire
100    /// will be removed, but the callback will not be called.
101    pub fn next_expiration(&mut self, now: u64) -> u64 {
102        let mut next_expiration = u64::MAX;
103
104        self.retain(|item| {
105            if item.expires_at <= now {
106                // Timer expired, process task.
107                if let Some(waker) = item.waker.take() {
108                    waker.wake();
109                }
110                false
111            } else {
112                // Timer didn't yet expire, or never expires.
113                next_expiration = min(next_expiration, item.expires_at);
114                item.expires_at != u64::MAX
115            }
116        });
117
118        next_expiration
119    }
120
121    fn retain(&mut self, mut f: impl FnMut(&mut QueueItem) -> bool) {
122        let mut prev = &self.head;
123        while let Some(mut p) = prev.get() {
124            let mut item = unsafe { p.as_mut() };
125
126            if f(&mut item) {
127                // Skip to next
128                prev = &item.next;
129            } else {
130                // Remove it
131                prev.set(item.next.get());
132                item.next.set(None);
133            }
134        }
135    }
136}