async_fifo/channel/async_api.rs
1use core::ops::Deref;
2use core::mem::drop;
3use alloc::vec::Vec;
4
5use crate::fifo::BlockSize;
6
7use super::subscription::Subscribers;
8use super::non_blocking::{Producer, Consumer};
9use super::future::{RecvOne, RecvArray, FillMany, FillExact};
10
11/// Asynchronous FIFO sender
12pub struct Sender<T> {
13 producer: Option<Producer<T>>,
14 subscribers: Subscribers,
15}
16
17// I don't understand why deriving requires T: Clone
18impl<T> Clone for Sender<T> {
19 fn clone(&self) -> Self {
20 Self {
21 producer: self.producer.clone(),
22 subscribers: self.subscribers.clone(),
23 }
24 }
25}
26
27impl<T> Sender<T> {
28 #[doc(hidden)]
29 /// This `send_iter` variant returns the number of woken-up receivers
30 pub fn send_iter_2<I>(&self, into_iter: I) -> usize
31 where
32 I: IntoIterator,
33 I::IntoIter: ExactSizeIterator<Item = T>,
34 {
35 let producer = self.producer.as_ref();
36 producer.unwrap().send_iter(into_iter);
37 self.subscribers.notify_all()
38 }
39
40 /// Sends a batch of items in the channel, atomically.
41 ///
42 /// This operation is non-blocking and always succeeds immediately.
43 /// When the call returns, all items are ready to be received.
44 pub fn send_iter<I>(&self, into_iter: I)
45 where
46 I: IntoIterator,
47 I::IntoIter: ExactSizeIterator<Item = T>,
48 {
49 self.send_iter_2(into_iter);
50 }
51
52 /// Sends one item through the channel.
53 ///
54 /// This operation is non-blocking and always succeeds immediately.
55 pub fn send(&self, item: T) {
56 self.send_iter_2(core::iter::once(item));
57 }
58}
59
60impl<T> Drop for Sender<T> {
61 fn drop(&mut self) {
62 drop(self.producer.take());
63 self.subscribers.notify_all();
64 }
65}
66
67/// Asynchronous FIFO receiver
68#[derive(Clone)]
69pub struct Receiver<T> {
70 consumer: Consumer<T>,
71 subscribers: Subscribers,
72 pub(super) last_wake_count: Option<usize>,
73}
74
75impl<T> Deref for Receiver<T> {
76 type Target = Consumer<T>;
77
78 fn deref(&self) -> &Consumer<T> {
79 &self.consumer
80 }
81}
82
83impl<T> Receiver<T> {
84 pub(super) fn subscribers(&self) -> &Subscribers {
85 &self.subscribers
86 }
87}
88
89impl<T: Unpin> Receiver<T> {
90 /// Returns true if all senders have been dropped
91 ///
92 /// # Example
93 ///
94 /// ```rust
95 /// let (tx, mut rx) = async_fifo::new();
96 /// tx.send('z');
97 ///
98 /// // one remaining sender
99 /// assert_eq!(rx.no_senders(), false);
100 ///
101 /// // drop it
102 /// core::mem::drop(tx);
103 ///
104 /// // all senders are gone
105 /// assert_eq!(rx.no_senders(), true);
106 ///
107 /// // No sender, yes, but one item is still in there.
108 /// assert_eq!(rx.try_recv(), Some('z'));
109 /// assert_eq!(rx.try_recv(), None);
110 ///
111 /// ```
112 pub fn no_senders(&self) -> bool {
113 self.consumer.no_producers()
114 }
115
116 /// Receives one item, asynchronously.
117 ///
118 /// # Example
119 ///
120 /// ```rust
121 /// # use async_fifo::{block_on, Closed};
122 /// # block_on(async {
123 /// let (tx, mut rx) = async_fifo::new();
124 /// tx.send_iter(['a', 'b', 'c']);
125 ///
126 /// // Receive one by one
127 /// assert_eq!(rx.recv().await, Ok('a'));
128 /// assert_eq!(rx.recv().await, Ok('b'));
129 /// assert_eq!(rx.recv().await, Ok('c'));
130 ///
131 /// core::mem::drop(tx);
132 /// assert_eq!(rx.recv().await, Err(Closed));
133 /// # });
134 /// ```
135 pub fn recv(&mut self) -> RecvOne<'_, T> {
136 self.into_recv()
137 }
138
139 /// Receives as many items as possible, into a vector, asynchronously.
140 ///
141 /// The number of received items is returned.
142 ///
143 /// # Example
144 ///
145 /// ```rust
146 /// # use async_fifo::{block_on, Closed};
147 /// # block_on(async {
148 /// let (tx, mut rx) = async_fifo::new();
149 /// tx.send_iter(['a', 'b', 'c', 'd']);
150 ///
151 /// // Pull as much as possible into a vec
152 /// let mut bucket = Vec::new();
153 /// assert_eq!(rx.recv_many(&mut bucket).await, Ok(4));
154 /// assert_eq!(bucket, ['a', 'b', 'c', 'd']);
155 ///
156 /// core::mem::drop(tx);
157 /// assert_eq!(rx.recv_many(&mut bucket).await, Err(Closed));
158 /// # });
159 /// ```
160 pub fn recv_many<'a>(&'a mut self, vec: &'a mut Vec<T>) -> FillMany<'a, T> {
161 self.into_fill(vec)
162 }
163
164 /// Receives exactly `slice.len()` items into a slice, asynchronously.
165 ///
166 /// # Example
167 ///
168 /// ```rust
169 /// # use async_fifo::{block_on, Closed};
170 /// # block_on(async {
171 /// let (tx, mut rx) = async_fifo::new();
172 /// tx.send_iter(['a', 'b', 'c']);
173 ///
174 /// // Pull a specific amount into a slice
175 /// let mut buffer = ['_'; 3];
176 /// assert_eq!(rx.recv_exact(&mut buffer).await, Ok(3));
177 /// assert_eq!(buffer, ['a', 'b', 'c']);
178 ///
179 /// core::mem::drop(tx);
180 /// assert_eq!(rx.recv_exact(&mut buffer).await, Err(Closed));
181 /// # });
182 /// ```
183 pub fn recv_exact<'a>(&'a mut self, slice: &'a mut [T]) -> FillExact<'a, T> {
184 self.into_fill(slice)
185 }
186
187 /// Receives exactly `N` items into an array, asynchronously.
188 ///
189 /// # Example
190 ///
191 /// ```rust
192 /// # use async_fifo::{block_on, Closed};
193 /// # block_on(async {
194 /// let (tx, mut rx) = async_fifo::new();
195 /// tx.send_iter(['a', 'b', 'c']);
196 ///
197 /// // Pull a specific amount into an array
198 /// assert_eq!(rx.recv_array().await, Ok(['a', 'b', 'c']));
199 ///
200 /// core::mem::drop(tx);
201 /// assert_eq!(rx.recv_array::<3>().await, Err(Closed));
202 /// # });
203 /// ```
204 pub fn recv_array<const N: usize>(&mut self) -> RecvArray<'_, N, T> {
205 self.into_recv()
206 }
207}
208
209impl<const L: usize, const F: usize> BlockSize<L, F> {
210 pub fn channel<T: 'static>() -> (Sender<T>, Receiver<T>) {
211 let (producer, consumer) = Self::non_blocking();
212
213 let subscribers = Subscribers::default();
214
215 let sender = Sender {
216 producer: Some(producer),
217 subscribers: subscribers.clone(),
218 };
219
220 let receiver = Receiver {
221 consumer,
222 last_wake_count: None,
223 subscribers: subscribers.clone(),
224 };
225
226 (sender, receiver)
227 }
228}