nb_executor/
collections.rs

1use super::{EventMask, Signals};
2use core::convert::Infallible;
3use heapless::mpmc::MpMcQueue;
4use nb::Error::WouldBlock;
5
6/// Multi-producer, multi-consumer, fixed-capacity async queue.
7///
8/// This wraps a [`MpMcQueue`] from the `heapless` crate, adding support for
9/// asynchronous enqueue and dequeue operations. `N` must be a power of two.
10#[cfg_attr(docsrs, doc(cfg(feature = "heapless")))]
11pub struct Mpmc<T, const N: usize>(MpMcQueue<T, N>);
12
13impl<T, const N: usize> Default for Mpmc<T, N> {
14    fn default() -> Self {
15        Mpmc(Default::default())
16    }
17}
18
19impl<T, const N: usize> Mpmc<T, N> {
20    /// Creates an empty queue.
21    pub const fn new() -> Self {
22        Mpmc(MpMcQueue::new())
23    }
24
25    /// Accesses the inner [`MpMcQueue`].
26    pub fn inner(&self) -> &MpMcQueue<T, N> {
27        &self.0
28    }
29
30    /// Adds an item to the queue.
31    ///
32    /// If the queue is full, this will wait on `ev` until a successful insertion.
33    /// After inserting the item, `ev` will be raised in order to signal producers.
34    pub async fn enqueue<S: EventMask>(&self, item: T, signals: &Signals<'_, S>, ev: S) {
35        let mut item = Some(item);
36        let queued = signals.drive_infallible(ev, move || {
37            self.try_enqueue(item.take().ok_or(WouldBlock)?, signals, ev)
38                .map_err(|returned| {
39                    item = Some(returned);
40                    WouldBlock
41                })
42        });
43
44        queued.await
45    }
46
47    /// Attempts to add an item to the queue.
48    ///
49    /// If the queue is currently full, the item is returned in the `Err` variant.
50    /// See also [`Mpmc::enqueue()`].
51    pub fn try_enqueue<S: EventMask>(
52        &self,
53        item: T,
54        signals: &Signals<'_, S>,
55        ev: S,
56    ) -> Result<(), T> {
57        let result = self.0.enqueue(item);
58        if let Ok(()) = result {
59            signals.raise(ev);
60        }
61
62        result
63    }
64
65    /// Removes an item from the queue.
66    ///
67    /// If the queue is empty, this will wait on `ev` until an item is enqueued.
68    /// After removing an item, `ev` will be raised to notify producers of available
69    /// space in the queue.
70    pub async fn dequeue<S: EventMask>(&self, signals: &Signals<'_, S>, ev: S) -> T {
71        signals
72            .drive_infallible(ev, || self.try_dequeue(signals, ev))
73            .await
74    }
75
76    /// Attempts to remove an item from the queue.
77    ///
78    /// This will return `WouldBlock` if the queue is empty. See also [`Mpmc::dequeue()`].
79    pub fn try_dequeue<S: EventMask>(
80        &self,
81        signals: &Signals<'_, S>,
82        ev: S,
83    ) -> nb::Result<T, Infallible> {
84        let item = self.0.dequeue().ok_or(WouldBlock)?;
85
86        signals.raise(ev);
87        Ok(item)
88    }
89}
90
91impl<T, const N: usize> From<MpMcQueue<T, N>> for Mpmc<T, N> {
92    fn from(queue: MpMcQueue<T, N>) -> Self {
93        Mpmc(queue)
94    }
95}
96
97impl<T, const N: usize> From<Mpmc<T, N>> for MpMcQueue<T, N> {
98    fn from(queue: Mpmc<T, N>) -> Self {
99        queue.0
100    }
101}