embassy_sync/
zerocopy_channel.rs

1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
//! It can be used concurrently by a producer (sender) and a
//! consumer (receiver), i.e. it is an "SPSC channel".
5//!
6//! This queue takes a Mutex type so that various
7//! targets can be attained. For example, a ThreadModeMutex can be used
8//! for single-core Cortex-M targets where messages are only passed
9//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
10//! can also be used for single-core targets where messages are to be
11//! passed from exception mode e.g. out of an interrupt handler.
12//!
//! This module provides a bounded channel that has a limit on the number of
//! messages that it can store. Once this limit is reached, `try_send` returns
//! `None`, while `send` waits until space becomes available again.
16
17use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26/// A bounded zero-copy channel for communicating between asynchronous tasks
27/// with backpressure.
28///
29/// The channel will buffer up to the provided number of messages.  Once the
30/// buffer is full, attempts to `send` new messages will wait until a message is
31/// received from the channel.
32///
33/// All data sent will become available in the same order as it was sent.
34///
35/// The channel requires a buffer of recyclable elements.  Writing to the channel is done through
36/// an `&mut T`.
37pub struct Channel<'a, M: RawMutex, T> {
38    buf: BufferPtr<T>,
39    phantom: PhantomData<&'a mut T>,
40    state: Mutex<M, RefCell<State>>,
41}
42
43impl<'a, M: RawMutex, T> Channel<'a, M, T> {
44    /// Initialize a new [`Channel`].
45    ///
46    /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
47    /// channel's capacity.
48    pub fn new(buf: &'a mut [T]) -> Self {
49        let len = buf.len();
50        assert!(len != 0);
51
52        Self {
53            buf: BufferPtr(buf.as_mut_ptr()),
54            phantom: PhantomData,
55            state: Mutex::new(RefCell::new(State {
56                capacity: len,
57                front: 0,
58                back: 0,
59                full: false,
60                send_waker: WakerRegistration::new(),
61                receive_waker: WakerRegistration::new(),
62            })),
63        }
64    }
65
66    /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
67    ///
68    /// Further Senders and Receivers can be created through [`Sender::borrow`] and
69    /// [`Receiver::borrow`] respectively.
70    pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71        (Sender { channel: self }, Receiver { channel: self })
72    }
73
74    /// Clears all elements in the channel.
75    pub fn clear(&mut self) {
76        self.state.lock(|s| {
77            s.borrow_mut().clear();
78        });
79    }
80
81    /// Returns the number of elements currently in the channel.
82    pub fn len(&self) -> usize {
83        self.state.lock(|s| s.borrow().len())
84    }
85
86    /// Returns whether the channel is empty.
87    pub fn is_empty(&self) -> bool {
88        self.state.lock(|s| s.borrow().is_empty())
89    }
90
91    /// Returns whether the channel is full.
92    pub fn is_full(&self) -> bool {
93        self.state.lock(|s| s.borrow().is_full())
94    }
95}
96
/// Raw pointer to the first element of the buffer a [`Channel`] was built on.
///
/// `repr(transparent)` gives it the same layout as the wrapped `*mut T`.
#[repr(transparent)]
struct BufferPtr<T>(*mut T);

impl<T> BufferPtr<T> {
    /// Returns the pointer `count` elements past the start of the buffer.
    ///
    /// # Safety
    ///
    /// This forwards to `<*mut T>::add`, so the caller must uphold that
    /// method's contract; in particular the resulting pointer must stay
    /// within the buffer this pointer was created from.
    unsafe fn add(&self, count: usize) -> *mut T {
        let base: *mut T = self.0;
        // SAFETY: the caller upholds the contract of `<*mut T>::add`.
        unsafe { base.add(count) }
    }
}

// SAFETY: NOTE(review) — both impls are unconditional in `T`. The pointer is
// only dereferenced by the sender/receiver halves at indices handed out by
// `State`; `Channel` additionally carries a `PhantomData<&'a mut T>`, which is
// presumably what is meant to constrain cross-thread use — confirm.
unsafe impl<T> Send for BufferPtr<T> {}
unsafe impl<T> Sync for BufferPtr<T> {}
108
109/// Send-only access to a [`Channel`].
110pub struct Sender<'a, M: RawMutex, T> {
111    channel: &'a Channel<'a, M, T>,
112}
113
114impl<'a, M: RawMutex, T> Sender<'a, M, T> {
115    /// Creates one further [`Sender`] over the same channel.
116    pub fn borrow(&mut self) -> Sender<'_, M, T> {
117        Sender { channel: self.channel }
118    }
119
120    /// Attempts to send a value over the channel.
121    pub fn try_send(&mut self) -> Option<&mut T> {
122        self.channel.state.lock(|s| {
123            let s = &mut *s.borrow_mut();
124            match s.push_index() {
125                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
126                None => None,
127            }
128        })
129    }
130
131    /// Attempts to send a value over the channel.
132    pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
133        self.channel.state.lock(|s| {
134            let s = &mut *s.borrow_mut();
135            match s.push_index() {
136                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
137                None => {
138                    s.receive_waker.register(cx.waker());
139                    Poll::Pending
140                }
141            }
142        })
143    }
144
145    /// Asynchronously send a value over the channel.
146    pub fn send(&mut self) -> impl Future<Output = &mut T> {
147        poll_fn(|cx| {
148            self.channel.state.lock(|s| {
149                let s = &mut *s.borrow_mut();
150                match s.push_index() {
151                    Some(i) => {
152                        let r = unsafe { &mut *self.channel.buf.add(i) };
153                        Poll::Ready(r)
154                    }
155                    None => {
156                        s.receive_waker.register(cx.waker());
157                        Poll::Pending
158                    }
159                }
160            })
161        })
162    }
163
164    /// Notify the channel that the sending of the value has been finalized.
165    pub fn send_done(&mut self) {
166        self.channel.state.lock(|s| s.borrow_mut().push_done())
167    }
168
169    /// Clears all elements in the channel.
170    pub fn clear(&mut self) {
171        self.channel.state.lock(|s| {
172            s.borrow_mut().clear();
173        });
174    }
175
176    /// Returns the number of elements currently in the channel.
177    pub fn len(&self) -> usize {
178        self.channel.state.lock(|s| s.borrow().len())
179    }
180
181    /// Returns whether the channel is empty.
182    pub fn is_empty(&self) -> bool {
183        self.channel.state.lock(|s| s.borrow().is_empty())
184    }
185
186    /// Returns whether the channel is full.
187    pub fn is_full(&self) -> bool {
188        self.channel.state.lock(|s| s.borrow().is_full())
189    }
190}
191
192/// Receive-only access to a [`Channel`].
193pub struct Receiver<'a, M: RawMutex, T> {
194    channel: &'a Channel<'a, M, T>,
195}
196
197impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
198    /// Creates one further [`Receiver`] over the same channel.
199    pub fn borrow(&mut self) -> Receiver<'_, M, T> {
200        Receiver { channel: self.channel }
201    }
202
203    /// Attempts to receive a value over the channel.
204    pub fn try_receive(&mut self) -> Option<&mut T> {
205        self.channel.state.lock(|s| {
206            let s = &mut *s.borrow_mut();
207            match s.pop_index() {
208                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
209                None => None,
210            }
211        })
212    }
213
214    /// Attempts to asynchronously receive a value over the channel.
215    pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
216        self.channel.state.lock(|s| {
217            let s = &mut *s.borrow_mut();
218            match s.pop_index() {
219                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
220                None => {
221                    s.send_waker.register(cx.waker());
222                    Poll::Pending
223                }
224            }
225        })
226    }
227
228    /// Asynchronously receive a value over the channel.
229    pub fn receive(&mut self) -> impl Future<Output = &mut T> {
230        poll_fn(|cx| {
231            self.channel.state.lock(|s| {
232                let s = &mut *s.borrow_mut();
233                match s.pop_index() {
234                    Some(i) => {
235                        let r = unsafe { &mut *self.channel.buf.add(i) };
236                        Poll::Ready(r)
237                    }
238                    None => {
239                        s.send_waker.register(cx.waker());
240                        Poll::Pending
241                    }
242                }
243            })
244        })
245    }
246
247    /// Notify the channel that the receiving of the value has been finalized.
248    pub fn receive_done(&mut self) {
249        self.channel.state.lock(|s| s.borrow_mut().pop_done())
250    }
251
252    /// Clears all elements in the channel.
253    pub fn clear(&mut self) {
254        self.channel.state.lock(|s| {
255            s.borrow_mut().clear();
256        });
257    }
258
259    /// Returns the number of elements currently in the channel.
260    pub fn len(&self) -> usize {
261        self.channel.state.lock(|s| s.borrow().len())
262    }
263
264    /// Returns whether the channel is empty.
265    pub fn is_empty(&self) -> bool {
266        self.channel.state.lock(|s| s.borrow().is_empty())
267    }
268
269    /// Returns whether the channel is full.
270    pub fn is_full(&self) -> bool {
271        self.channel.state.lock(|s| s.borrow().is_full())
272    }
273}
274
275struct State {
276    /// Maximum number of elements the channel can hold.
277    capacity: usize,
278
279    /// Front index. Always 0..=(N-1)
280    front: usize,
281    /// Back index. Always 0..=(N-1).
282    back: usize,
283
284    /// Used to distinguish "empty" and "full" cases when `front == back`.
285    /// May only be `true` if `front == back`, always `false` otherwise.
286    full: bool,
287
288    send_waker: WakerRegistration,
289    receive_waker: WakerRegistration,
290}
291
292impl State {
293    fn increment(&self, i: usize) -> usize {
294        if i + 1 == self.capacity {
295            0
296        } else {
297            i + 1
298        }
299    }
300
301    fn clear(&mut self) {
302        if self.full {
303            self.receive_waker.wake();
304        }
305        self.front = 0;
306        self.back = 0;
307        self.full = false;
308    }
309
310    fn len(&self) -> usize {
311        if !self.full {
312            if self.back >= self.front {
313                self.back - self.front
314            } else {
315                self.capacity + self.back - self.front
316            }
317        } else {
318            self.capacity
319        }
320    }
321
322    fn is_full(&self) -> bool {
323        self.full
324    }
325
326    fn is_empty(&self) -> bool {
327        self.front == self.back && !self.full
328    }
329
330    fn push_index(&mut self) -> Option<usize> {
331        match self.is_full() {
332            true => None,
333            false => Some(self.back),
334        }
335    }
336
337    fn push_done(&mut self) {
338        assert!(!self.is_full());
339        self.back = self.increment(self.back);
340        if self.back == self.front {
341            self.full = true;
342        }
343        self.send_waker.wake();
344    }
345
346    fn pop_index(&mut self) -> Option<usize> {
347        match self.is_empty() {
348            true => None,
349            false => Some(self.front),
350        }
351    }
352
353    fn pop_done(&mut self) {
354        assert!(!self.is_empty());
355        self.front = self.increment(self.front);
356        self.full = false;
357        self.receive_waker.wake();
358    }
359}