async_fifo/channel/
async_api.rs

1use core::ops::Deref;
2use core::mem::drop;
3use alloc::vec::Vec;
4
5use crate::fifo::BlockSize;
6
7use super::subscription::Subscribers;
8use super::non_blocking::{Producer, Consumer};
9use super::future::{RecvOne, RecvArray, FillMany, FillExact};
10
11/// Asynchronous FIFO sender
12pub struct Sender<T> {
13    producer: Option<Producer<T>>,
14    subscribers: Subscribers,
15}
16
17// I don't understand why deriving requires T: Clone
18impl<T> Clone for Sender<T> {
19    fn clone(&self) -> Self {
20        Self {
21            producer: self.producer.clone(),
22            subscribers: self.subscribers.clone(),
23        }
24    }
25}
26
27impl<T> Sender<T> {
28    #[doc(hidden)]
29    /// This `send_iter` variant returns the number of woken-up receivers
30    pub fn send_iter_2<I>(&self, into_iter: I) -> usize
31    where
32        I: IntoIterator,
33        I::IntoIter: ExactSizeIterator<Item = T>,
34    {
35        let producer = self.producer.as_ref();
36        producer.unwrap().send_iter(into_iter);
37        self.subscribers.notify_all()
38    }
39
40    /// Sends a batch of items in the channel, atomically.
41    ///
42    /// This operation is non-blocking and always succeeds immediately.
43    /// When the call returns, all items are ready to be received.
44    pub fn send_iter<I>(&self, into_iter: I)
45    where
46        I: IntoIterator,
47        I::IntoIter: ExactSizeIterator<Item = T>,
48    {
49        self.send_iter_2(into_iter);
50    }
51
52    /// Sends one item through the channel.
53    ///
54    /// This operation is non-blocking and always succeeds immediately.
55    pub fn send(&self, item: T) {
56        self.send_iter_2(core::iter::once(item));
57    }
58}
59
60impl<T> Drop for Sender<T> {
61    fn drop(&mut self) {
62        drop(self.producer.take());
63        self.subscribers.notify_all();
64    }
65}
66
67/// Asynchronous FIFO receiver
68#[derive(Clone)]
69pub struct Receiver<T> {
70    consumer: Consumer<T>,
71    subscribers: Subscribers,
72    pub(super) last_wake_count: Option<usize>,
73}
74
75impl<T> Deref for Receiver<T> {
76    type Target = Consumer<T>;
77
78    fn deref(&self) -> &Consumer<T> {
79        &self.consumer
80    }
81}
82
83impl<T> Receiver<T> {
84    pub(super) fn subscribers(&self) -> &Subscribers {
85        &self.subscribers
86    }
87}
88
89impl<T: Unpin> Receiver<T> {
90    /// Returns true if all senders have been dropped
91    ///
92    /// # Example
93    ///
94    /// ```rust
95    /// let (tx, mut rx) = async_fifo::new();
96    /// tx.send('z');
97    ///
98    /// // one remaining sender
99    /// assert_eq!(rx.no_senders(), false);
100    ///
101    /// // drop it
102    /// core::mem::drop(tx);
103    ///
104    /// // all senders are gone
105    /// assert_eq!(rx.no_senders(), true);
106    ///
107    /// // No sender, yes, but one item is still in there.
108    /// assert_eq!(rx.try_recv(), Some('z'));
109    /// assert_eq!(rx.try_recv(), None);
110    ///
111    /// ```
112    pub fn no_senders(&self) -> bool {
113        self.consumer.no_producers()
114    }
115
116    /// Receives one item, asynchronously.
117    ///
118    /// # Example
119    ///
120    /// ```rust
121    /// # use async_fifo::{block_on, Closed};
122    /// # block_on(async {
123    /// let (tx, mut rx) = async_fifo::new();
124    /// tx.send_iter(['a', 'b', 'c']);
125    ///
126    /// // Receive one by one
127    /// assert_eq!(rx.recv().await, Ok('a'));
128    /// assert_eq!(rx.recv().await, Ok('b'));
129    /// assert_eq!(rx.recv().await, Ok('c'));
130    ///
131    /// core::mem::drop(tx);
132    /// assert_eq!(rx.recv().await, Err(Closed));
133    /// # });
134    /// ```
135    pub fn recv(&mut self) -> RecvOne<'_, T> {
136        self.into_recv()
137    }
138
139    /// Receives as many items as possible, into a vector, asynchronously.
140    ///
141    /// The number of received items is returned.
142    ///
143    /// # Example
144    ///
145    /// ```rust
146    /// # use async_fifo::{block_on, Closed};
147    /// # block_on(async {
148    /// let (tx, mut rx) = async_fifo::new();
149    /// tx.send_iter(['a', 'b', 'c', 'd']);
150    /// 
151    /// // Pull as much as possible into a vec
152    /// let mut bucket = Vec::new();
153    /// assert_eq!(rx.recv_many(&mut bucket).await, Ok(4));
154    /// assert_eq!(bucket, ['a', 'b', 'c', 'd']);
155    ///
156    /// core::mem::drop(tx);
157    /// assert_eq!(rx.recv_many(&mut bucket).await, Err(Closed));
158    /// # });
159    /// ```
160    pub fn recv_many<'a>(&'a mut self, vec: &'a mut Vec<T>) -> FillMany<'a, T> {
161        self.into_fill(vec)
162    }
163
164    /// Receives exactly `slice.len()` items into a slice, asynchronously.
165    ///
166    /// # Example
167    ///
168    /// ```rust
169    /// # use async_fifo::{block_on, Closed};
170    /// # block_on(async {
171    /// let (tx, mut rx) = async_fifo::new();
172    /// tx.send_iter(['a', 'b', 'c']);
173    /// 
174    /// // Pull a specific amount into a slice
175    /// let mut buffer = ['_'; 3];
176    /// assert_eq!(rx.recv_exact(&mut buffer).await, Ok(3));
177    /// assert_eq!(buffer, ['a', 'b', 'c']);
178    ///
179    /// core::mem::drop(tx);
180    /// assert_eq!(rx.recv_exact(&mut buffer).await, Err(Closed));
181    /// # });
182    /// ```
183    pub fn recv_exact<'a>(&'a mut self, slice: &'a mut [T]) -> FillExact<'a, T> {
184        self.into_fill(slice)
185    }
186
187    /// Receives exactly `N` items into an array, asynchronously.
188    ///
189    /// # Example
190    ///
191    /// ```rust
192    /// # use async_fifo::{block_on, Closed};
193    /// # block_on(async {
194    /// let (tx, mut rx) = async_fifo::new();
195    /// tx.send_iter(['a', 'b', 'c']);
196    /// 
197    /// // Pull a specific amount into an array
198    /// assert_eq!(rx.recv_array().await, Ok(['a', 'b', 'c']));
199    ///
200    /// core::mem::drop(tx);
201    /// assert_eq!(rx.recv_array::<3>().await, Err(Closed));
202    /// # });
203    /// ```
204    pub fn recv_array<const N: usize>(&mut self) -> RecvArray<'_, N, T> {
205        self.into_recv()
206    }
207}
208
209impl<const L: usize, const F: usize> BlockSize<L, F> {
210    pub fn channel<T: 'static>() -> (Sender<T>, Receiver<T>) {
211        let (producer, consumer) = Self::non_blocking();
212
213        let subscribers = Subscribers::default();
214
215        let sender = Sender {
216            producer: Some(producer),
217            subscribers: subscribers.clone(),
218        };
219
220        let receiver = Receiver {
221            consumer,
222            last_wake_count: None,
223            subscribers: subscribers.clone(),
224        };
225
226        (sender, receiver)
227    }
228}