// async_fifo/channel/non_blocking.rs

1use core::sync::atomic::AtomicUsize;
2use core::sync::atomic::Ordering::SeqCst;
3use core::array::from_fn;
4
5use alloc::sync::Arc;
6use alloc::vec::Vec;
7
8use crate::fifo::{FifoApi, Storage, TmpArray, BlockSize};
9
10/// Fifo Production Handle
11///
12/// This is the "sending" half of a FIFO channel.
13///
14/// When you call the methods of this object to send N items,
15/// a reservation of N consecutive slots is performed on the
16/// underlying FIFO. Then, once this reservation is negociated,
17/// all items are pushed into the slots sequentially.
18///
19/// Unlike [`Consumer`]s, Producers implement [`Clone`].
20///
21/// # Example
22///
23/// ```rust
24/// let (tx, rx) = async_fifo::new();
25/// 
26/// // Sending items one by one
27/// tx.send('a');
28/// tx.send('b');
29/// tx.send('c');
30/// 
31/// // Pouring items from a Vec
32/// let mut vec = vec!['a', 'b', 'c'];
33/// tx.send_iter(vec.drain(..));
34/// 
35/// // Sending an array of items
36/// let array = ['a', 'b', 'c'];
37/// tx.send_iter(array);
38/// 
39/// // Sending a slice of primitive items
40/// let slice = ['a', 'b', 'c'].as_slice();
41/// let iter = slice.iter().copied();
42/// tx.send_iter(iter);
43/// 
44/// // Receiving a total of 12 items
45/// let expected = ['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'];
46/// assert_eq!(rx.try_recv_array(), Some(expected));
47/// assert_eq!(rx.try_recv(), None);
48/// ```
49pub struct Producer<T> {
50    fifo: Arc<dyn FifoApi<T>>,
51    num_prod: Arc<AtomicUsize>,
52}
53
54impl<T> Clone for Producer<T> {
55    fn clone(&self) -> Self {
56        self.num_prod.fetch_add(1, SeqCst);
57        Self {
58            fifo: self.fifo.clone(),
59            num_prod: self.num_prod.clone(),
60        }
61    }
62}
63
64impl<T> Drop for Producer<T> {
65    fn drop(&mut self) {
66        self.num_prod.fetch_sub(1, SeqCst);
67    }
68}
69
70impl<T> Producer<T> {
71    /// Sends a batch of items in the channel, atomically.
72    ///
73    /// This operation is non-blocking and always succeeds immediately.
74    pub fn send_iter<I>(&self, into_iter: I)
75    where
76        I: IntoIterator,
77        I::IntoIter: ExactSizeIterator<Item = T>,
78    {
79        self.fifo.push(&mut into_iter.into_iter());
80    }
81
82    /// Sends one item through the channel.
83    ///
84    /// This operation is non-blocking and always succeeds immediately.
85    pub fn send(&self, item: T) {
86        self.send_iter(core::iter::once(item));
87    }
88}
89
90/// Fifo Consumption Handle
91///
92/// This handle allows you to pull items or batches
93/// of items from the FIFO, atomically.
94///
95/// Under the hood, most methods call [`Self::try_recv_into`],
96/// which knows how many items to pull from the FIFO based on
97/// the constraints set by its [`Storage`] parameter. For instance,
98/// [`Self::try_recv_exact`] has the effect of pulling either
99/// zero or exactly N items, depending on how many items are
100/// available in the FIFO.
101///
102/// Items are pulled in the exact same order as they were pushed.
103#[derive(Clone)]
104pub struct Consumer<T> {
105    fifo: Arc<dyn FifoApi<T>>,
106    num_prod: Arc<AtomicUsize>,
107}
108
109impl<T> Consumer<T> {
110    /// Returns true if all producers have been dropped
111    ///
112    /// # Example
113    ///
114    /// ```rust
115    /// let (tx, rx) = async_fifo::new();
116    /// tx.send('z');
117    ///
118    /// // one remaining producer
119    /// assert_eq!(rx.no_producers(), false);
120    ///
121    /// // drop it
122    /// core::mem::drop(tx);
123    ///
124    /// // all producers are gone
125    /// assert_eq!(rx.no_producers(), true);
126    ///
127    /// // No producer, yes, but one item is still in there.
128    /// assert_eq!(rx.try_recv(), Some('z'));
129    /// assert_eq!(rx.try_recv(), None);
130    ///
131    /// ```
132    pub fn no_producers(&self) -> bool {
133        self.num_prod.load(SeqCst) == 0
134    }
135
136    /// Tries to receive one item.
137    ///
138    /// # Example
139    ///
140    /// ```rust
141    /// let (tx, rx) = async_fifo::new();
142    /// tx.send_iter(['a', 'b', 'c']);
143    /// 
144    /// // Receive one by one
145    /// assert_eq!(rx.try_recv(), Some('a'));
146    /// assert_eq!(rx.try_recv(), Some('b'));
147    /// assert_eq!(rx.try_recv(), Some('c'));
148    /// assert_eq!(rx.try_recv(), None);
149    /// ```
150    pub fn try_recv(&self) -> Option<T> {
151        self.try_recv_array().map(|[item]| item)
152    }
153
154    /// Tries to receive as many items as possible, into a vector.
155    ///
156    /// If at least one item is received, the number of
157    /// received items is returned.
158    ///
159    /// # Example
160    ///
161    /// ```rust
162    /// let (tx, rx) = async_fifo::new();
163    /// tx.send_iter(['a', 'b', 'c', 'd']);
164    /// 
165    /// // Pull as much as possible into a vec
166    /// let mut bucket = Vec::new();
167    /// assert_eq!(rx.try_recv_many(&mut bucket), Some(4));
168    /// assert_eq!(bucket, ['a', 'b', 'c', 'd']);
169    ///
170    /// assert_eq!(rx.try_recv(), None);
171    /// ```
172    pub fn try_recv_many(&self, vec: &mut Vec<T>) -> Option<usize> {
173        self.try_recv_into(vec).ok()
174    }
175
176    /// Tries to receive exactly `slice.len()` items into a slice.
177    ///
178    /// # Example
179    ///
180    /// ```rust
181    /// let (tx, rx) = async_fifo::new();
182    /// tx.send_iter(['a', 'b', 'c']);
183    /// 
184    /// // Pull a specific amount into a slice
185    /// let mut buffer = ['_'; 3];
186    /// assert!(rx.try_recv_exact(&mut buffer).is_some());
187    /// assert_eq!(buffer, ['a', 'b', 'c']);
188    /// assert_eq!(rx.try_recv(), None);
189    /// ```
190    pub fn try_recv_exact(&self, slice: &mut [T]) -> Option<()> {
191        self.try_recv_into(slice).ok()
192    }
193
194    /// Tries to receive exactly `N` items into an array.
195    ///
196    /// # Example
197    ///
198    /// ```rust
199    /// let (tx, rx) = async_fifo::new();
200    /// tx.send_iter(['a', 'b', 'c']);
201    /// 
202    /// // Pull a specific amount into an array
203    /// assert_eq!(rx.try_recv_array(), Some(['a', 'b', 'c']));
204    /// assert_eq!(rx.try_recv(), None);
205    /// ```
206    pub fn try_recv_array<const N: usize>(&self) -> Option<[T; N]> {
207        let array = TmpArray {
208            inner: from_fn(|_| None),
209        };
210
211        self.try_recv_into(array).ok()
212    }
213
214    /// Tries to receive some items into custom storage.
215    pub fn try_recv_into<S: Storage<T>>(&self, mut storage: S) -> Result<S::Output, S> {
216        let pushed = self.fifo.pull(&mut storage);
217        storage.finish(pushed)
218    }
219
220    pub(crate) fn fifo(&self) -> &dyn FifoApi<T> {
221        &*self.fifo
222    }
223}
224
225impl<const L: usize, const F: usize> BlockSize<L, F> {
226    pub fn non_blocking<T: 'static>() -> (Producer<T>, Consumer<T>) {
227        let fifo = Self::arc_fifo();
228        let num_prod = Arc::new(AtomicUsize::new(1));
229
230        let producer = Producer {
231            fifo: fifo.clone(),
232            num_prod: num_prod.clone(),
233        };
234
235        let consumer = Consumer {
236            fifo,
237            num_prod,
238        };
239
240        (producer, consumer)
241    }
242}