embassy_sync/
channel.rs

1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an  "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20
21use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32/// Send-only access to a [`Channel`].
33pub struct Sender<'ch, M, T, const N: usize>
34where
35    M: RawMutex,
36{
37    channel: &'ch Channel<M, T, N>,
38}
39
40impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
41where
42    M: RawMutex,
43{
44    fn clone(&self) -> Self {
45        *self
46    }
47}
48
49impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
50
51impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
52where
53    M: RawMutex,
54{
55    /// Sends a value.
56    ///
57    /// See [`Channel::send()`]
58    pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
59        self.channel.send(message)
60    }
61
62    /// Attempt to immediately send a message.
63    ///
64    /// See [`Channel::send()`]
65    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
66        self.channel.try_send(message)
67    }
68
69    /// Allows a poll_fn to poll until the channel is ready to send
70    ///
71    /// See [`Channel::poll_ready_to_send()`]
72    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73        self.channel.poll_ready_to_send(cx)
74    }
75
76    /// Returns the maximum number of elements the channel can hold.
77    ///
78    /// See [`Channel::capacity()`]
79    pub const fn capacity(&self) -> usize {
80        self.channel.capacity()
81    }
82
83    /// Returns the free capacity of the channel.
84    ///
85    /// See [`Channel::free_capacity()`]
86    pub fn free_capacity(&self) -> usize {
87        self.channel.free_capacity()
88    }
89
90    /// Clears all elements in the channel.
91    ///
92    /// See [`Channel::clear()`]
93    pub fn clear(&self) {
94        self.channel.clear();
95    }
96
97    /// Returns the number of elements currently in the channel.
98    ///
99    /// See [`Channel::len()`]
100    pub fn len(&self) -> usize {
101        self.channel.len()
102    }
103
104    /// Returns whether the channel is empty.
105    ///
106    /// See [`Channel::is_empty()`]
107    pub fn is_empty(&self) -> bool {
108        self.channel.is_empty()
109    }
110
111    /// Returns whether the channel is full.
112    ///
113    /// See [`Channel::is_full()`]
114    pub fn is_full(&self) -> bool {
115        self.channel.is_full()
116    }
117}
118
119/// Send-only access to a [`Channel`] without knowing channel size.
120pub struct DynamicSender<'ch, T> {
121    pub(crate) channel: &'ch dyn DynamicChannel<T>,
122}
123
124impl<'ch, T> Clone for DynamicSender<'ch, T> {
125    fn clone(&self) -> Self {
126        *self
127    }
128}
129
130impl<'ch, T> Copy for DynamicSender<'ch, T> {}
131
132impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
133where
134    M: RawMutex,
135{
136    fn from(s: Sender<'ch, M, T, N>) -> Self {
137        Self { channel: s.channel }
138    }
139}
140
141impl<'ch, T> DynamicSender<'ch, T> {
142    /// Sends a value.
143    ///
144    /// See [`Channel::send()`]
145    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
146        DynamicSendFuture {
147            channel: self.channel,
148            message: Some(message),
149        }
150    }
151
152    /// Attempt to immediately send a message.
153    ///
154    /// See [`Channel::send()`]
155    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
156        self.channel.try_send_with_context(message, None)
157    }
158
159    /// Allows a poll_fn to poll until the channel is ready to send
160    ///
161    /// See [`Channel::poll_ready_to_send()`]
162    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
163        self.channel.poll_ready_to_send(cx)
164    }
165}
166
167/// Send-only access to a [`Channel`] without knowing channel size.
168/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
169pub struct SendDynamicSender<'ch, T> {
170    pub(crate) channel: &'ch dyn DynamicChannel<T>,
171}
172
173impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
174    fn clone(&self) -> Self {
175        *self
176    }
177}
178
179impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
180unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
181unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
182
183impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
184where
185    M: RawMutex + Sync + Send,
186{
187    fn from(s: Sender<'ch, M, T, N>) -> Self {
188        Self { channel: s.channel }
189    }
190}
191
192impl<'ch, T> SendDynamicSender<'ch, T> {
193    /// Sends a value.
194    ///
195    /// See [`Channel::send()`]
196    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
197        DynamicSendFuture {
198            channel: self.channel,
199            message: Some(message),
200        }
201    }
202
203    /// Attempt to immediately send a message.
204    ///
205    /// See [`Channel::send()`]
206    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
207        self.channel.try_send_with_context(message, None)
208    }
209
210    /// Allows a poll_fn to poll until the channel is ready to send
211    ///
212    /// See [`Channel::poll_ready_to_send()`]
213    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
214        self.channel.poll_ready_to_send(cx)
215    }
216}
217
218/// Receive-only access to a [`Channel`].
219pub struct Receiver<'ch, M, T, const N: usize>
220where
221    M: RawMutex,
222{
223    channel: &'ch Channel<M, T, N>,
224}
225
226impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
227where
228    M: RawMutex,
229{
230    fn clone(&self) -> Self {
231        *self
232    }
233}
234
235impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
236
237impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
238where
239    M: RawMutex,
240{
241    /// Receive the next value.
242    ///
243    /// See [`Channel::receive()`].
244    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
245        self.channel.receive()
246    }
247
248    /// Is a value ready to be received in the channel
249    ///
250    /// See [`Channel::ready_to_receive()`].
251    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
252        self.channel.ready_to_receive()
253    }
254
255    /// Attempt to immediately receive the next value.
256    ///
257    /// See [`Channel::try_receive()`]
258    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
259        self.channel.try_receive()
260    }
261
262    /// Peek at the next value without removing it from the queue.
263    ///
264    /// See [`Channel::try_peek()`]
265    pub fn try_peek(&self) -> Result<T, TryReceiveError>
266    where
267        T: Clone,
268    {
269        self.channel.try_peek()
270    }
271
272    /// Allows a poll_fn to poll until the channel is ready to receive
273    ///
274    /// See [`Channel::poll_ready_to_receive()`]
275    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
276        self.channel.poll_ready_to_receive(cx)
277    }
278
279    /// Poll the channel for the next item
280    ///
281    /// See [`Channel::poll_receive()`]
282    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
283        self.channel.poll_receive(cx)
284    }
285
286    /// Returns the maximum number of elements the channel can hold.
287    ///
288    /// See [`Channel::capacity()`]
289    pub const fn capacity(&self) -> usize {
290        self.channel.capacity()
291    }
292
293    /// Returns the free capacity of the channel.
294    ///
295    /// See [`Channel::free_capacity()`]
296    pub fn free_capacity(&self) -> usize {
297        self.channel.free_capacity()
298    }
299
300    /// Clears all elements in the channel.
301    ///
302    /// See [`Channel::clear()`]
303    pub fn clear(&self) {
304        self.channel.clear();
305    }
306
307    /// Returns the number of elements currently in the channel.
308    ///
309    /// See [`Channel::len()`]
310    pub fn len(&self) -> usize {
311        self.channel.len()
312    }
313
314    /// Returns whether the channel is empty.
315    ///
316    /// See [`Channel::is_empty()`]
317    pub fn is_empty(&self) -> bool {
318        self.channel.is_empty()
319    }
320
321    /// Returns whether the channel is full.
322    ///
323    /// See [`Channel::is_full()`]
324    pub fn is_full(&self) -> bool {
325        self.channel.is_full()
326    }
327}
328
329/// Receive-only access to a [`Channel`] without knowing channel size.
330pub struct DynamicReceiver<'ch, T> {
331    pub(crate) channel: &'ch dyn DynamicChannel<T>,
332}
333
334impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
335    fn clone(&self) -> Self {
336        *self
337    }
338}
339
340impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
341
342impl<'ch, T> DynamicReceiver<'ch, T> {
343    /// Receive the next value.
344    ///
345    /// See [`Channel::receive()`].
346    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
347        DynamicReceiveFuture { channel: self.channel }
348    }
349
350    /// Attempt to immediately receive the next value.
351    ///
352    /// See [`Channel::try_receive()`]
353    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
354        self.channel.try_receive_with_context(None)
355    }
356
357    /// Peek at the next value without removing it from the queue.
358    ///
359    /// See [`Channel::try_peek()`]
360    pub fn try_peek(&self) -> Result<T, TryReceiveError>
361    where
362        T: Clone,
363    {
364        self.channel.try_peek_with_context(None)
365    }
366
367    /// Allows a poll_fn to poll until the channel is ready to receive
368    ///
369    /// See [`Channel::poll_ready_to_receive()`]
370    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
371        self.channel.poll_ready_to_receive(cx)
372    }
373
374    /// Poll the channel for the next item
375    ///
376    /// See [`Channel::poll_receive()`]
377    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
378        self.channel.poll_receive(cx)
379    }
380}
381
382impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
383where
384    M: RawMutex,
385{
386    fn from(s: Receiver<'ch, M, T, N>) -> Self {
387        Self { channel: s.channel }
388    }
389}
390
391/// Receive-only access to a [`Channel`] without knowing channel size.
392/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
393pub struct SendableDynamicReceiver<'ch, T> {
394    pub(crate) channel: &'ch dyn DynamicChannel<T>,
395}
396
397impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> {
398    fn clone(&self) -> Self {
399        *self
400    }
401}
402
403impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {}
404unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {}
405unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {}
406
407impl<'ch, T> SendableDynamicReceiver<'ch, T> {
408    /// Receive the next value.
409    ///
410    /// See [`Channel::receive()`].
411    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
412        DynamicReceiveFuture { channel: self.channel }
413    }
414
415    /// Attempt to immediately receive the next value.
416    ///
417    /// See [`Channel::try_receive()`]
418    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
419        self.channel.try_receive_with_context(None)
420    }
421
422    /// Allows a poll_fn to poll until the channel is ready to receive
423    ///
424    /// See [`Channel::poll_ready_to_receive()`]
425    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
426        self.channel.poll_ready_to_receive(cx)
427    }
428
429    /// Poll the channel for the next item
430    ///
431    /// See [`Channel::poll_receive()`]
432    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
433        self.channel.poll_receive(cx)
434    }
435}
436
437impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendableDynamicReceiver<'ch, T>
438where
439    M: RawMutex + Sync + Send,
440{
441    fn from(s: Receiver<'ch, M, T, N>) -> Self {
442        Self { channel: s.channel }
443    }
444}
445
446impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N>
447where
448    M: RawMutex,
449{
450    type Item = T;
451
452    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
453        self.channel.poll_receive(cx).map(Some)
454    }
455}
456
457/// Future returned by [`Channel::receive`] and  [`Receiver::receive`].
458#[must_use = "futures do nothing unless you `.await` or poll them"]
459pub struct ReceiveFuture<'ch, M, T, const N: usize>
460where
461    M: RawMutex,
462{
463    channel: &'ch Channel<M, T, N>,
464}
465
466impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
467where
468    M: RawMutex,
469{
470    type Output = T;
471
472    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
473        self.channel.poll_receive(cx)
474    }
475}
476
477/// Future returned by [`Channel::ready_to_receive`] and  [`Receiver::ready_to_receive`].
478#[must_use = "futures do nothing unless you `.await` or poll them"]
479pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
480where
481    M: RawMutex,
482{
483    channel: &'ch Channel<M, T, N>,
484}
485
486impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
487where
488    M: RawMutex,
489{
490    type Output = ();
491
492    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
493        self.channel.poll_ready_to_receive(cx)
494    }
495}
496
497/// Future returned by [`DynamicReceiver::receive`].
498#[must_use = "futures do nothing unless you `.await` or poll them"]
499pub struct DynamicReceiveFuture<'ch, T> {
500    channel: &'ch dyn DynamicChannel<T>,
501}
502
503impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
504    type Output = T;
505
506    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
507        match self.channel.try_receive_with_context(Some(cx)) {
508            Ok(v) => Poll::Ready(v),
509            Err(TryReceiveError::Empty) => Poll::Pending,
510        }
511    }
512}
513
514impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
515    fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
516        Self { channel: value.channel }
517    }
518}
519
520/// Future returned by [`Channel::send`] and  [`Sender::send`].
521#[must_use = "futures do nothing unless you `.await` or poll them"]
522pub struct SendFuture<'ch, M, T, const N: usize>
523where
524    M: RawMutex,
525{
526    channel: &'ch Channel<M, T, N>,
527    message: Option<T>,
528}
529
530impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
531where
532    M: RawMutex,
533{
534    type Output = ();
535
536    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
537        match self.message.take() {
538            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
539                Ok(..) => Poll::Ready(()),
540                Err(TrySendError::Full(m)) => {
541                    self.message = Some(m);
542                    Poll::Pending
543                }
544            },
545            None => panic!("Message cannot be None"),
546        }
547    }
548}
549
550impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
551
552/// Future returned by [`DynamicSender::send`].
553#[must_use = "futures do nothing unless you `.await` or poll them"]
554pub struct DynamicSendFuture<'ch, T> {
555    channel: &'ch dyn DynamicChannel<T>,
556    message: Option<T>,
557}
558
559impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
560    type Output = ();
561
562    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
563        match self.message.take() {
564            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
565                Ok(..) => Poll::Ready(()),
566                Err(TrySendError::Full(m)) => {
567                    self.message = Some(m);
568                    Poll::Pending
569                }
570            },
571            None => panic!("Message cannot be None"),
572        }
573    }
574}
575
576impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
577
578impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
579    fn from(value: SendFuture<'ch, M, T, N>) -> Self {
580        Self {
581            channel: value.channel,
582            message: value.message,
583        }
584    }
585}
586
587pub(crate) trait DynamicChannel<T> {
588    fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
589
590    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
591
592    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
593    where
594        T: Clone;
595
596    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
597    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
598
599    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
600}
601
602/// Error returned by [`try_receive`](Channel::try_receive).
603#[derive(PartialEq, Eq, Clone, Copy, Debug)]
604#[cfg_attr(feature = "defmt", derive(defmt::Format))]
605pub enum TryReceiveError {
606    /// A message could not be received because the channel is empty.
607    Empty,
608}
609
610/// Error returned by [`try_send`](Channel::try_send).
611#[derive(PartialEq, Eq, Clone, Copy, Debug)]
612#[cfg_attr(feature = "defmt", derive(defmt::Format))]
613pub enum TrySendError<T> {
614    /// The data could not be sent on the channel because the channel is
615    /// currently full and sending would require blocking.
616    Full(T),
617}
618
619struct ChannelState<T, const N: usize> {
620    queue: Deque<T, N>,
621    receiver_waker: WakerRegistration,
622    senders_waker: WakerRegistration,
623}
624
625impl<T, const N: usize> ChannelState<T, N> {
626    const fn new() -> Self {
627        ChannelState {
628            queue: Deque::new(),
629            receiver_waker: WakerRegistration::new(),
630            senders_waker: WakerRegistration::new(),
631        }
632    }
633
634    fn try_receive(&mut self) -> Result<T, TryReceiveError> {
635        self.try_receive_with_context(None)
636    }
637
638    fn try_peek(&mut self) -> Result<T, TryReceiveError>
639    where
640        T: Clone,
641    {
642        self.try_peek_with_context(None)
643    }
644
645    fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
646    where
647        T: Clone,
648    {
649        if self.queue.is_full() {
650            self.senders_waker.wake();
651        }
652
653        if let Some(message) = self.queue.front() {
654            Ok(message.clone())
655        } else {
656            if let Some(cx) = cx {
657                self.receiver_waker.register(cx.waker());
658            }
659            Err(TryReceiveError::Empty)
660        }
661    }
662
663    fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
664        if self.queue.is_full() {
665            self.senders_waker.wake();
666        }
667
668        if let Some(message) = self.queue.pop_front() {
669            Ok(message)
670        } else {
671            if let Some(cx) = cx {
672                self.receiver_waker.register(cx.waker());
673            }
674            Err(TryReceiveError::Empty)
675        }
676    }
677
678    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
679        if self.queue.is_full() {
680            self.senders_waker.wake();
681        }
682
683        if let Some(message) = self.queue.pop_front() {
684            Poll::Ready(message)
685        } else {
686            self.receiver_waker.register(cx.waker());
687            Poll::Pending
688        }
689    }
690
691    fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
692        self.receiver_waker.register(cx.waker());
693
694        if !self.queue.is_empty() {
695            Poll::Ready(())
696        } else {
697            Poll::Pending
698        }
699    }
700
701    fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
702        self.try_send_with_context(message, None)
703    }
704
705    fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
706        match self.queue.push_back(message) {
707            Ok(()) => {
708                self.receiver_waker.wake();
709                Ok(())
710            }
711            Err(message) => {
712                if let Some(cx) = cx {
713                    self.senders_waker.register(cx.waker());
714                }
715                Err(TrySendError::Full(message))
716            }
717        }
718    }
719
720    fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
721        self.senders_waker.register(cx.waker());
722
723        if !self.queue.is_full() {
724            Poll::Ready(())
725        } else {
726            Poll::Pending
727        }
728    }
729
730    fn clear(&mut self) {
731        if self.queue.is_full() {
732            self.senders_waker.wake();
733        }
734        self.queue.clear();
735    }
736
737    fn len(&self) -> usize {
738        self.queue.len()
739    }
740
741    fn is_empty(&self) -> bool {
742        self.queue.is_empty()
743    }
744
745    fn is_full(&self) -> bool {
746        self.queue.is_full()
747    }
748}
749
750/// A bounded channel for communicating between asynchronous tasks
751/// with backpressure.
752///
753/// The channel will buffer up to the provided number of messages.  Once the
754/// buffer is full, attempts to `send` new messages will wait until a message is
755/// received from the channel.
756///
757/// All data sent will become available in the same order as it was sent.
758pub struct Channel<M, T, const N: usize>
759where
760    M: RawMutex,
761{
762    inner: Mutex<M, RefCell<ChannelState<T, N>>>,
763}
764
765impl<M, T, const N: usize> Channel<M, T, N>
766where
767    M: RawMutex,
768{
769    /// Establish a new bounded channel. For example, to create one with a NoopMutex:
770    ///
771    /// ```
772    /// use embassy_sync::channel::Channel;
773    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
774    ///
775    /// // Declare a bounded channel of 3 u32s.
776    /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
777    /// ```
778    pub const fn new() -> Self {
779        Self {
780            inner: Mutex::new(RefCell::new(ChannelState::new())),
781        }
782    }
783
784    fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
785        self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
786    }
787
788    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
789        self.lock(|c| c.try_receive_with_context(cx))
790    }
791
792    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
793    where
794        T: Clone,
795    {
796        self.lock(|c| c.try_peek_with_context(cx))
797    }
798
799    /// Poll the channel for the next message
800    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
801        self.lock(|c| c.poll_receive(cx))
802    }
803
804    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
805        self.lock(|c| c.try_send_with_context(m, cx))
806    }
807
808    /// Allows a poll_fn to poll until the channel is ready to receive
809    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
810        self.lock(|c| c.poll_ready_to_receive(cx))
811    }
812
813    /// Allows a poll_fn to poll until the channel is ready to send
814    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
815        self.lock(|c| c.poll_ready_to_send(cx))
816    }
817
818    /// Get a sender for this channel.
819    pub fn sender(&self) -> Sender<'_, M, T, N> {
820        Sender { channel: self }
821    }
822
823    /// Get a receiver for this channel.
824    pub fn receiver(&self) -> Receiver<'_, M, T, N> {
825        Receiver { channel: self }
826    }
827
828    /// Get a sender for this channel using dynamic dispatch.
829    pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
830        DynamicSender { channel: self }
831    }
832
833    /// Get a receiver for this channel using dynamic dispatch.
834    pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
835        DynamicReceiver { channel: self }
836    }
837
838    /// Send a value, waiting until there is capacity.
839    ///
840    /// Sending completes when the value has been pushed to the channel's queue.
841    /// This doesn't mean the value has been received yet.
842    pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
843        SendFuture {
844            channel: self,
845            message: Some(message),
846        }
847    }
848
849    /// Attempt to immediately send a message.
850    ///
851    /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
852    /// buffer is full, instead of waiting.
853    ///
854    /// # Errors
855    ///
856    /// If the channel capacity has been reached, i.e., the channel has `n`
857    /// buffered values where `n` is the argument passed to [`Channel`], then an
858    /// error is returned.
859    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
860        self.lock(|c| c.try_send(message))
861    }
862
863    /// Receive the next value.
864    ///
865    /// If there are no messages in the channel's buffer, this method will
866    /// wait until a message is sent.
867    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
868        ReceiveFuture { channel: self }
869    }
870
871    /// Is a value ready to be received in the channel
872    ///
873    /// If there are no messages in the channel's buffer, this method will
874    /// wait until there is at least one
875    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
876        ReceiveReadyFuture { channel: self }
877    }
878
879    /// Attempt to immediately receive a message.
880    ///
881    /// This method will either receive a message from the channel immediately or return an error
882    /// if the channel is empty.
883    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
884        self.lock(|c| c.try_receive())
885    }
886
887    /// Peek at the next value without removing it from the queue.
888    ///
889    /// This method will either receive a copy of the message from the channel immediately or return
890    /// an error if the channel is empty.
891    pub fn try_peek(&self) -> Result<T, TryReceiveError>
892    where
893        T: Clone,
894    {
895        self.lock(|c| c.try_peek())
896    }
897
898    /// Returns the maximum number of elements the channel can hold.
899    pub const fn capacity(&self) -> usize {
900        N
901    }
902
903    /// Returns the free capacity of the channel.
904    ///
905    /// This is equivalent to `capacity() - len()`
906    pub fn free_capacity(&self) -> usize {
907        N - self.len()
908    }
909
910    /// Clears all elements in the channel.
911    pub fn clear(&self) {
912        self.lock(|c| c.clear());
913    }
914
915    /// Returns the number of elements currently in the channel.
916    pub fn len(&self) -> usize {
917        self.lock(|c| c.len())
918    }
919
920    /// Returns whether the channel is empty.
921    pub fn is_empty(&self) -> bool {
922        self.lock(|c| c.is_empty())
923    }
924
925    /// Returns whether the channel is full.
926    pub fn is_full(&self) -> bool {
927        self.lock(|c| c.is_full())
928    }
929}
930
931/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
932/// tradeoff cost of dynamic dispatch.
933impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
934where
935    M: RawMutex,
936{
937    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
938        Channel::try_send_with_context(self, m, cx)
939    }
940
941    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
942        Channel::try_receive_with_context(self, cx)
943    }
944
945    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
946    where
947        T: Clone,
948    {
949        Channel::try_peek_with_context(self, cx)
950    }
951
952    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
953        Channel::poll_ready_to_send(self, cx)
954    }
955
956    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
957        Channel::poll_ready_to_receive(self, cx)
958    }
959
960    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
961        Channel::poll_receive(self, cx)
962    }
963}
964
965impl<M, T, const N: usize> futures_util::Stream for Channel<M, T, N>
966where
967    M: RawMutex,
968{
969    type Item = T;
970
971    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
972        self.poll_receive(cx).map(Some)
973    }
974}
975
976#[cfg(test)]
977mod tests {
978    use core::time::Duration;
979
980    use futures_executor::ThreadPool;
981    use futures_timer::Delay;
982    use futures_util::task::SpawnExt;
983    use static_cell::StaticCell;
984
985    use super::*;
986    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
987
988    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
989        c.queue.capacity() - c.queue.len()
990    }
991
992    #[test]
993    fn sending_once() {
994        let mut c = ChannelState::<u32, 3>::new();
995        assert!(c.try_send(1).is_ok());
996        assert_eq!(capacity(&c), 2);
997    }
998
999    #[test]
1000    fn sending_when_full() {
1001        let mut c = ChannelState::<u32, 3>::new();
1002        let _ = c.try_send(1);
1003        let _ = c.try_send(1);
1004        let _ = c.try_send(1);
1005        match c.try_send(2) {
1006            Err(TrySendError::Full(2)) => assert!(true),
1007            _ => assert!(false),
1008        }
1009        assert_eq!(capacity(&c), 0);
1010    }
1011
1012    #[test]
1013    fn receiving_once_with_one_send() {
1014        let mut c = ChannelState::<u32, 3>::new();
1015        assert!(c.try_send(1).is_ok());
1016        assert_eq!(c.try_receive().unwrap(), 1);
1017        assert_eq!(capacity(&c), 3);
1018    }
1019
1020    #[test]
1021    fn receiving_when_empty() {
1022        let mut c = ChannelState::<u32, 3>::new();
1023        match c.try_receive() {
1024            Err(TryReceiveError::Empty) => assert!(true),
1025            _ => assert!(false),
1026        }
1027        assert_eq!(capacity(&c), 3);
1028    }
1029
1030    #[test]
1031    fn simple_send_and_receive() {
1032        let c = Channel::<NoopRawMutex, u32, 3>::new();
1033        assert!(c.try_send(1).is_ok());
1034        assert_eq!(c.try_peek().unwrap(), 1);
1035        assert_eq!(c.try_peek().unwrap(), 1);
1036        assert_eq!(c.try_receive().unwrap(), 1);
1037    }
1038
1039    #[test]
1040    fn cloning() {
1041        let c = Channel::<NoopRawMutex, u32, 3>::new();
1042        let r1 = c.receiver();
1043        let s1 = c.sender();
1044
1045        let _ = r1.clone();
1046        let _ = s1.clone();
1047    }
1048
1049    #[test]
1050    fn dynamic_dispatch_into() {
1051        let c = Channel::<NoopRawMutex, u32, 3>::new();
1052        let s: DynamicSender<'_, u32> = c.sender().into();
1053        let r: DynamicReceiver<'_, u32> = c.receiver().into();
1054
1055        assert!(s.try_send(1).is_ok());
1056        assert_eq!(r.try_receive().unwrap(), 1);
1057    }
1058
1059    #[test]
1060    fn dynamic_dispatch_constructor() {
1061        let c = Channel::<NoopRawMutex, u32, 3>::new();
1062        let s = c.dyn_sender();
1063        let r = c.dyn_receiver();
1064
1065        assert!(s.try_send(1).is_ok());
1066        assert_eq!(r.try_peek().unwrap(), 1);
1067        assert_eq!(r.try_peek().unwrap(), 1);
1068        assert_eq!(r.try_receive().unwrap(), 1);
1069    }
1070
1071    #[futures_test::test]
1072    async fn receiver_receives_given_try_send_async() {
1073        let executor = ThreadPool::new().unwrap();
1074
1075        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
1076        let c = &*CHANNEL.init(Channel::new());
1077        let c2 = c;
1078        assert!(executor
1079            .spawn(async move {
1080                assert!(c2.try_send(1).is_ok());
1081            })
1082            .is_ok());
1083        assert_eq!(c.receive().await, 1);
1084    }
1085
1086    #[futures_test::test]
1087    async fn sender_send_completes_if_capacity() {
1088        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
1089        c.send(1).await;
1090        assert_eq!(c.receive().await, 1);
1091    }
1092
1093    #[futures_test::test]
1094    async fn senders_sends_wait_until_capacity() {
1095        let executor = ThreadPool::new().unwrap();
1096
1097        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
1098        let c = &*CHANNEL.init(Channel::new());
1099        assert!(c.try_send(1).is_ok());
1100
1101        let c2 = c;
1102        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
1103        let c2 = c;
1104        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
1105        // Wish I could think of a means of determining that the async send is waiting instead.
1106        // However, I've used the debugger to observe that the send does indeed wait.
1107        Delay::new(Duration::from_millis(500)).await;
1108        assert_eq!(c.receive().await, 1);
1109        assert!(executor
1110            .spawn(async move {
1111                loop {
1112                    c.receive().await;
1113                }
1114            })
1115            .is_ok());
1116        send_task_1.unwrap().await;
1117        send_task_2.unwrap().await;
1118    }
1119}