embassy_sync/waitqueue/
multi_waker.rs

1use core::task::Waker;
2
3use heapless::Vec;
4
5/// Utility struct to register and wake multiple wakers.
6/// Queue of wakers with a maximum length of `N`.
7/// Intended for waking multiple tasks.
8#[derive(Debug)]
9pub struct MultiWakerRegistration<const N: usize> {
10    wakers: Vec<Waker, N>,
11}
12
13impl<const N: usize> MultiWakerRegistration<N> {
14    /// Create a new empty instance
15    pub const fn new() -> Self {
16        Self { wakers: Vec::new() }
17    }
18
19    /// Register a waker.
20    ///
21    /// If the buffer is full, [wakes all the wakers](Self::wake), clears its buffer and registers the waker.
22    pub fn register(&mut self, w: &Waker) {
23        // If we already have some waker that wakes the same task as `w`, do nothing.
24        // This avoids cloning wakers, and avoids unnecessary mass-wakes.
25        for w2 in &self.wakers {
26            if w.will_wake(w2) {
27                return;
28            }
29        }
30
31        if self.wakers.is_full() {
32            // All waker slots were full. It's a bit inefficient, but we can wake everything.
33            // Any future that is still active will simply reregister.
34            // This won't happen a lot, so it's ok.
35            self.wake();
36        }
37
38        if self.wakers.push(w.clone()).is_err() {
39            // This can't happen unless N=0
40            // (Either `wakers` wasn't full, or it was in which case `wake()` empied it)
41            panic!("tried to push a waker to a zero-length MultiWakerRegistration")
42        }
43    }
44
45    /// Wake all registered wakers. This clears the buffer
46    pub fn wake(&mut self) {
47        // heapless::Vec has no `drain()`, do it unsafely ourselves...
48
49        // First set length to 0, without dropping the contents.
50        // This is necessary for soundness: if wake() panics and we're using panic=unwind.
51        // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state.
52        // (it'll leak wakers, but that's not UB)
53        let len = self.wakers.len();
54        unsafe { self.wakers.set_len(0) }
55
56        for i in 0..len {
57            // Move a waker out of the vec.
58            let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() };
59            // Wake it by value, which consumes (drops) it.
60            waker.wake();
61        }
62    }
63}