nb_executor/
collections.rs1use super::{EventMask, Signals};
2use core::convert::Infallible;
3use heapless::mpmc::MpMcQueue;
4use nb::Error::WouldBlock;
5
6#[cfg_attr(docsrs, doc(cfg(feature = "heapless")))]
11pub struct Mpmc<T, const N: usize>(MpMcQueue<T, N>);
12
13impl<T, const N: usize> Default for Mpmc<T, N> {
14 fn default() -> Self {
15 Mpmc(Default::default())
16 }
17}
18
19impl<T, const N: usize> Mpmc<T, N> {
20 pub const fn new() -> Self {
22 Mpmc(MpMcQueue::new())
23 }
24
25 pub fn inner(&self) -> &MpMcQueue<T, N> {
27 &self.0
28 }
29
30 pub async fn enqueue<S: EventMask>(&self, item: T, signals: &Signals<'_, S>, ev: S) {
35 let mut item = Some(item);
36 let queued = signals.drive_infallible(ev, move || {
37 self.try_enqueue(item.take().ok_or(WouldBlock)?, signals, ev)
38 .map_err(|returned| {
39 item = Some(returned);
40 WouldBlock
41 })
42 });
43
44 queued.await
45 }
46
47 pub fn try_enqueue<S: EventMask>(
52 &self,
53 item: T,
54 signals: &Signals<'_, S>,
55 ev: S,
56 ) -> Result<(), T> {
57 let result = self.0.enqueue(item);
58 if let Ok(()) = result {
59 signals.raise(ev);
60 }
61
62 result
63 }
64
65 pub async fn dequeue<S: EventMask>(&self, signals: &Signals<'_, S>, ev: S) -> T {
71 signals
72 .drive_infallible(ev, || self.try_dequeue(signals, ev))
73 .await
74 }
75
76 pub fn try_dequeue<S: EventMask>(
80 &self,
81 signals: &Signals<'_, S>,
82 ev: S,
83 ) -> nb::Result<T, Infallible> {
84 let item = self.0.dequeue().ok_or(WouldBlock)?;
85
86 signals.raise(ev);
87 Ok(item)
88 }
89}
90
91impl<T, const N: usize> From<MpMcQueue<T, N>> for Mpmc<T, N> {
92 fn from(queue: MpMcQueue<T, N>) -> Self {
93 Mpmc(queue)
94 }
95}
96
97impl<T, const N: usize> From<Mpmc<T, N>> for MpMcQueue<T, N> {
98 fn from(queue: Mpmc<T, N>) -> Self {
99 queue.0
100 }
101}