embassy_time_queue_utils/queue_integrated.rs
1//! Timer queue operations.
2use core::cell::Cell;
3use core::cmp::min;
4use core::ptr::NonNull;
5use core::task::Waker;
6
7use embassy_executor_timer_queue::TimerQueueItem;
8
9/// An item in the timer queue.
10#[derive(Default)]
11struct QueueItem {
12 /// The next item in the queue.
13 ///
14 /// If this field contains `Some`, the item is in the queue. The last item in the queue has a
15 /// value of `Some(dangling_pointer)`
16 pub next: Cell<Option<NonNull<QueueItem>>>,
17
18 /// The time at which this item expires.
19 pub expires_at: u64,
20
21 /// The registered waker. If Some, the item is enqueued in the timer queue.
22 pub waker: Option<Waker>,
23}
24
25unsafe impl Sync for QueueItem {}
26
27/// A timer queue, with items integrated into tasks.
28///
29/// # Safety
30///
31/// **This Queue is only safe when there is a single integrated queue in the system.**
32///
33/// If there are multiple integrated queues, additional checks are necessary to ensure that a Waker
34/// is not attempted to be enqueued in multiple queues.
35pub struct Queue {
36 head: Cell<Option<NonNull<QueueItem>>>,
37}
38
39impl core::fmt::Debug for Queue {
40 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
41 f.debug_struct("Queue").finish()
42 }
43}
44
45unsafe impl Send for Queue {}
46unsafe impl Sync for Queue {}
47
48impl Queue {
49 /// Creates a new timer queue.
50 pub const fn new() -> Self {
51 Self { head: Cell::new(None) }
52 }
53
54 /// Schedules a task to run at a specific time.
55 ///
56 /// If this function returns `true`, the caller should find the next expiration time and set
57 /// a new alarm for that time.
58 pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool {
59 let item = unsafe {
60 // Safety: the `&mut self`, along with the Safety note of the Queue, are sufficient to
61 // ensure that this function creates the only mutable reference to the queue item.
62 TimerQueueItem::from_embassy_waker(waker)
63 };
64 self.schedule_wake_queue_item(at, item, waker)
65 }
66
67 /// Schedules a task to run at a specific time, using a timer queue item as storage.
68 ///
69 /// If this function returns `true`, the caller should find the next expiration time and set
70 /// a new alarm for that time.
71 ///
72 /// This function assumes that the timer queue item is associated with the provided waker.
73 /// It can be used to support cases where there may not be an embassy task behind the waker.
74 /// One such case is integration with other async runtimes, be it a simple `block_on`
75 /// implementation or something more complex.
76 pub fn schedule_wake_queue_item(&mut self, at: u64, item: &mut TimerQueueItem, waker: &Waker) -> bool {
77 let item = unsafe { item.as_mut::<QueueItem>() };
78 match item.waker.as_ref() {
79 Some(_) if at <= item.expires_at => {
80 // If expiration is sooner than previously set, update.
81 item.expires_at = at;
82 // The waker is always stored in its own queue item, so we don't need to update it.
83
84 // Trigger a queue update in case this item can be immediately dequeued.
85 true
86 }
87 Some(_) => {
88 // Queue item does not need to be updated, the task will be scheduled to be woken
89 // before the new expiration.
90 false
91 }
92 None => {
93 // If not in the queue, add it and update.
94 let mut item_ptr = NonNull::from(item);
95 let prev = self.head.replace(Some(item_ptr));
96
97 let item = unsafe { item_ptr.as_mut() };
98
99 item.expires_at = at;
100 item.waker = Some(waker.clone());
101 item.next.set(prev);
102 // The default implementation doesn't care about the
103 // opaque payload, leave it unchanged.
104
105 true
106 }
107 }
108 }
109
110 /// Dequeues expired timers and returns the next alarm time.
111 ///
112 /// The provided callback will be called for each expired task. Tasks that never expire
113 /// will be removed, but the callback will not be called.
114 pub fn next_expiration(&mut self, now: u64) -> u64 {
115 let mut next_expiration = u64::MAX;
116
117 self.retain(|item| {
118 if item.expires_at <= now {
119 // Timer expired, process task.
120 if let Some(waker) = item.waker.take() {
121 waker.wake();
122 }
123 false
124 } else {
125 // Timer didn't yet expire, or never expires.
126 next_expiration = min(next_expiration, item.expires_at);
127 item.expires_at != u64::MAX
128 }
129 });
130
131 next_expiration
132 }
133
134 fn retain(&mut self, mut f: impl FnMut(&mut QueueItem) -> bool) {
135 let mut prev = &self.head;
136 while let Some(mut p) = prev.get() {
137 let mut item = unsafe { p.as_mut() };
138
139 if f(&mut item) {
140 // Skip to next
141 prev = &item.next;
142 } else {
143 // Remove it
144 prev.set(item.next.get());
145 item.next.set(None);
146 // Presence of a waker is used by schedule_wake to determine
147 // if the item is part of the queue or not,
148 // so ensure there is no waker for items removed from the queue.
149 item.waker = None;
150 }
151 }
152 }
153}