async_fifo/channel/non_blocking.rs
1use core::sync::atomic::AtomicUsize;
2use core::sync::atomic::Ordering::SeqCst;
3use core::array::from_fn;
4
5use alloc::sync::Arc;
6use alloc::vec::Vec;
7
8use crate::fifo::{FifoApi, Storage, TmpArray, BlockSize};
9
10/// Fifo Production Handle
11///
12/// This is the "sending" half of a FIFO channel.
13///
14/// When you call the methods of this object to send N items,
15/// a reservation of N consecutive slots is performed on the
16/// underlying FIFO. Then, once this reservation is negociated,
17/// all items are pushed into the slots sequentially.
18///
19/// Unlike [`Consumer`]s, Producers implement [`Clone`].
20///
21/// # Example
22///
23/// ```rust
24/// let (tx, rx) = async_fifo::new();
25///
26/// // Sending items one by one
27/// tx.send('a');
28/// tx.send('b');
29/// tx.send('c');
30///
31/// // Pouring items from a Vec
32/// let mut vec = vec!['a', 'b', 'c'];
33/// tx.send_iter(vec.drain(..));
34///
35/// // Sending an array of items
36/// let array = ['a', 'b', 'c'];
37/// tx.send_iter(array);
38///
39/// // Sending a slice of primitive items
40/// let slice = ['a', 'b', 'c'].as_slice();
41/// let iter = slice.iter().copied();
42/// tx.send_iter(iter);
43///
44/// // Receiving a total of 12 items
45/// let expected = ['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'];
46/// assert_eq!(rx.try_recv_array(), Some(expected));
47/// assert_eq!(rx.try_recv(), None);
48/// ```
49pub struct Producer<T> {
50 fifo: Arc<dyn FifoApi<T>>,
51 num_prod: Arc<AtomicUsize>,
52}
53
54impl<T> Clone for Producer<T> {
55 fn clone(&self) -> Self {
56 self.num_prod.fetch_add(1, SeqCst);
57 Self {
58 fifo: self.fifo.clone(),
59 num_prod: self.num_prod.clone(),
60 }
61 }
62}
63
64impl<T> Drop for Producer<T> {
65 fn drop(&mut self) {
66 self.num_prod.fetch_sub(1, SeqCst);
67 }
68}
69
70impl<T> Producer<T> {
71 /// Sends a batch of items in the channel, atomically.
72 ///
73 /// This operation is non-blocking and always succeeds immediately.
74 pub fn send_iter<I>(&self, into_iter: I)
75 where
76 I: IntoIterator,
77 I::IntoIter: ExactSizeIterator<Item = T>,
78 {
79 self.fifo.push(&mut into_iter.into_iter());
80 }
81
82 /// Sends one item through the channel.
83 ///
84 /// This operation is non-blocking and always succeeds immediately.
85 pub fn send(&self, item: T) {
86 self.send_iter(core::iter::once(item));
87 }
88}
89
90/// Fifo Consumption Handle
91///
92/// This handle allows you to pull items or batches
93/// of items from the FIFO, atomically.
94///
95/// Under the hood, most methods call [`Self::try_recv_into`],
96/// which knows how many items to pull from the FIFO based on
97/// the constraints set by its [`Storage`] parameter. For instance,
98/// [`Self::try_recv_exact`] has the effect of pulling either
99/// zero or exactly N items, depending on how many items are
100/// available in the FIFO.
101///
102/// Items are pulled in the exact same order as they were pushed.
103#[derive(Clone)]
104pub struct Consumer<T> {
105 fifo: Arc<dyn FifoApi<T>>,
106 num_prod: Arc<AtomicUsize>,
107}
108
109impl<T> Consumer<T> {
110 /// Returns true if all producers have been dropped
111 ///
112 /// # Example
113 ///
114 /// ```rust
115 /// let (tx, rx) = async_fifo::new();
116 /// tx.send('z');
117 ///
118 /// // one remaining producer
119 /// assert_eq!(rx.no_producers(), false);
120 ///
121 /// // drop it
122 /// core::mem::drop(tx);
123 ///
124 /// // all producers are gone
125 /// assert_eq!(rx.no_producers(), true);
126 ///
127 /// // No producer, yes, but one item is still in there.
128 /// assert_eq!(rx.try_recv(), Some('z'));
129 /// assert_eq!(rx.try_recv(), None);
130 ///
131 /// ```
132 pub fn no_producers(&self) -> bool {
133 self.num_prod.load(SeqCst) == 0
134 }
135
136 /// Tries to receive one item.
137 ///
138 /// # Example
139 ///
140 /// ```rust
141 /// let (tx, rx) = async_fifo::new();
142 /// tx.send_iter(['a', 'b', 'c']);
143 ///
144 /// // Receive one by one
145 /// assert_eq!(rx.try_recv(), Some('a'));
146 /// assert_eq!(rx.try_recv(), Some('b'));
147 /// assert_eq!(rx.try_recv(), Some('c'));
148 /// assert_eq!(rx.try_recv(), None);
149 /// ```
150 pub fn try_recv(&self) -> Option<T> {
151 self.try_recv_array().map(|[item]| item)
152 }
153
154 /// Tries to receive as many items as possible, into a vector.
155 ///
156 /// If at least one item is received, the number of
157 /// received items is returned.
158 ///
159 /// # Example
160 ///
161 /// ```rust
162 /// let (tx, rx) = async_fifo::new();
163 /// tx.send_iter(['a', 'b', 'c', 'd']);
164 ///
165 /// // Pull as much as possible into a vec
166 /// let mut bucket = Vec::new();
167 /// assert_eq!(rx.try_recv_many(&mut bucket), Some(4));
168 /// assert_eq!(bucket, ['a', 'b', 'c', 'd']);
169 ///
170 /// assert_eq!(rx.try_recv(), None);
171 /// ```
172 pub fn try_recv_many(&self, vec: &mut Vec<T>) -> Option<usize> {
173 self.try_recv_into(vec).ok()
174 }
175
176 /// Tries to receive exactly `slice.len()` items into a slice.
177 ///
178 /// # Example
179 ///
180 /// ```rust
181 /// let (tx, rx) = async_fifo::new();
182 /// tx.send_iter(['a', 'b', 'c']);
183 ///
184 /// // Pull a specific amount into a slice
185 /// let mut buffer = ['_'; 3];
186 /// assert!(rx.try_recv_exact(&mut buffer).is_some());
187 /// assert_eq!(buffer, ['a', 'b', 'c']);
188 /// assert_eq!(rx.try_recv(), None);
189 /// ```
190 pub fn try_recv_exact(&self, slice: &mut [T]) -> Option<()> {
191 self.try_recv_into(slice).ok()
192 }
193
194 /// Tries to receive exactly `N` items into an array.
195 ///
196 /// # Example
197 ///
198 /// ```rust
199 /// let (tx, rx) = async_fifo::new();
200 /// tx.send_iter(['a', 'b', 'c']);
201 ///
202 /// // Pull a specific amount into an array
203 /// assert_eq!(rx.try_recv_array(), Some(['a', 'b', 'c']));
204 /// assert_eq!(rx.try_recv(), None);
205 /// ```
206 pub fn try_recv_array<const N: usize>(&self) -> Option<[T; N]> {
207 let array = TmpArray {
208 inner: from_fn(|_| None),
209 };
210
211 self.try_recv_into(array).ok()
212 }
213
214 /// Tries to receive some items into custom storage.
215 pub fn try_recv_into<S: Storage<T>>(&self, mut storage: S) -> Result<S::Output, S> {
216 let pushed = self.fifo.pull(&mut storage);
217 storage.finish(pushed)
218 }
219
220 pub(crate) fn fifo(&self) -> &dyn FifoApi<T> {
221 &*self.fifo
222 }
223}
224
225impl<const L: usize, const F: usize> BlockSize<L, F> {
226 pub fn non_blocking<T: 'static>() -> (Producer<T>, Consumer<T>) {
227 let fifo = Self::arc_fifo();
228 let num_prod = Arc::new(AtomicUsize::new(1));
229
230 let producer = Producer {
231 fifo: fifo.clone(),
232 num_prod: num_prod.clone(),
233 };
234
235 let consumer = Consumer {
236 fifo,
237 num_prod,
238 };
239
240 (producer, consumer)
241 }
242}