Skip to main content

embassy_sync/
channel.rs

1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an  "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20//! # Example: Message passing between task and interrupt handler
21//!
22//! ```rust
23//! use embassy_sync::channel::Channel;
24//! use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
25//!
26//! static SHARED_CHANNEL: Channel<CriticalSectionRawMutex, u32, 8> = Channel::new();
27//!
28//! fn my_interrupt_handler() {
29//!     // Do some work..
30//!     // ...
31//!     if let Err(e) = SHARED_CHANNEL.sender().try_send(42) {
32//!         // Channel is full..
33//!     }
34//! }
35//!
36//! async fn my_async_task() {
37//!     // ...
38//!     let receiver = SHARED_CHANNEL.receiver();
39//!     loop {
40//!         let data_from_interrupt = receiver.receive().await;
41//!         // Do something with the data.
42//!     }
43//! }
44//! ```
45
46use core::cell::RefCell;
47use core::future::Future;
48use core::pin::Pin;
49use core::task::{Context, Poll};
50
51use heapless::Deque;
52
53use crate::blocking_mutex::Mutex;
54use crate::blocking_mutex::raw::RawMutex;
55use crate::waitqueue::WakerRegistration;
56
57/// Send-only access to a [`Channel`].
58#[derive(Debug)]
59pub struct Sender<'ch, M, T, const N: usize>
60where
61    M: RawMutex,
62{
63    channel: &'ch Channel<M, T, N>,
64}
65
66impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
67where
68    M: RawMutex,
69{
70    fn clone(&self) -> Self {
71        *self
72    }
73}
74
75impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
76
77impl<'ch, M, T, const N: usize> futures_sink::Sink<T> for Sender<'ch, M, T, N>
78where
79    M: RawMutex,
80{
81    type Error = TrySendError<T>;
82
83    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84        self.poll_ready_to_send(cx).map(|()| Ok(()))
85    }
86
87    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
88        self.try_send(item)
89    }
90
91    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
92        Poll::Ready(Ok(()))
93    }
94
95    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96        Poll::Ready(Ok(()))
97    }
98}
99
100impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
101where
102    M: RawMutex,
103{
104    /// Sends a value.
105    ///
106    /// See [`Channel::send()`]
107    pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
108        self.channel.send(message)
109    }
110
111    /// Attempt to immediately send a message.
112    ///
113    /// See [`Channel::send()`]
114    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
115        self.channel.try_send(message)
116    }
117
118    /// Allows a poll_fn to poll until the channel is ready to send
119    ///
120    /// See [`Channel::poll_ready_to_send()`]
121    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
122        self.channel.poll_ready_to_send(cx)
123    }
124
125    /// Returns the maximum number of elements the channel can hold.
126    ///
127    /// See [`Channel::capacity()`]
128    pub const fn capacity(&self) -> usize {
129        self.channel.capacity()
130    }
131
132    /// Returns the free capacity of the channel.
133    ///
134    /// See [`Channel::free_capacity()`]
135    pub fn free_capacity(&self) -> usize {
136        self.channel.free_capacity()
137    }
138
139    /// Clears all elements in the channel.
140    ///
141    /// See [`Channel::clear()`]
142    pub fn clear(&self) {
143        self.channel.clear();
144    }
145
146    /// Returns the number of elements currently in the channel.
147    ///
148    /// See [`Channel::len()`]
149    pub fn len(&self) -> usize {
150        self.channel.len()
151    }
152
153    /// Returns whether the channel is empty.
154    ///
155    /// See [`Channel::is_empty()`]
156    pub fn is_empty(&self) -> bool {
157        self.channel.is_empty()
158    }
159
160    /// Returns whether the channel is full.
161    ///
162    /// See [`Channel::is_full()`]
163    pub fn is_full(&self) -> bool {
164        self.channel.is_full()
165    }
166}
167
168/// Send-only access to a [`Channel`] without knowing channel size.
169pub struct DynamicSender<'ch, T> {
170    pub(crate) channel: &'ch dyn DynamicChannel<T>,
171}
172
173impl<'ch, T> Clone for DynamicSender<'ch, T> {
174    fn clone(&self) -> Self {
175        *self
176    }
177}
178
179impl<'ch, T> Copy for DynamicSender<'ch, T> {}
180
181impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
182where
183    M: RawMutex,
184{
185    fn from(s: Sender<'ch, M, T, N>) -> Self {
186        Self { channel: s.channel }
187    }
188}
189
190impl<'ch, T> DynamicSender<'ch, T> {
191    /// Sends a value.
192    ///
193    /// See [`Channel::send()`]
194    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
195        DynamicSendFuture {
196            channel: self.channel,
197            message: Some(message),
198        }
199    }
200
201    /// Attempt to immediately send a message.
202    ///
203    /// See [`Channel::send()`]
204    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
205        self.channel.try_send_with_context(message, None)
206    }
207
208    /// Allows a poll_fn to poll until the channel is ready to send
209    ///
210    /// See [`Channel::poll_ready_to_send()`]
211    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
212        self.channel.poll_ready_to_send(cx)
213    }
214}
215
216/// Send-only access to a [`Channel`] without knowing channel size.
217/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
218pub struct SendDynamicSender<'ch, T> {
219    pub(crate) channel: &'ch dyn DynamicChannel<T>,
220}
221
222impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
223    fn clone(&self) -> Self {
224        *self
225    }
226}
227
228impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
229unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
230unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
231
232impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
233where
234    M: RawMutex + Sync + Send,
235{
236    fn from(s: Sender<'ch, M, T, N>) -> Self {
237        Self { channel: s.channel }
238    }
239}
240
241impl<'ch, T> SendDynamicSender<'ch, T> {
242    /// Sends a value.
243    ///
244    /// See [`Channel::send()`]
245    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
246        DynamicSendFuture {
247            channel: self.channel,
248            message: Some(message),
249        }
250    }
251
252    /// Attempt to immediately send a message.
253    ///
254    /// See [`Channel::send()`]
255    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
256        self.channel.try_send_with_context(message, None)
257    }
258
259    /// Allows a poll_fn to poll until the channel is ready to send
260    ///
261    /// See [`Channel::poll_ready_to_send()`]
262    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
263        self.channel.poll_ready_to_send(cx)
264    }
265}
266
267/// Receive-only access to a [`Channel`].
268#[derive(Debug)]
269pub struct Receiver<'ch, M, T, const N: usize>
270where
271    M: RawMutex,
272{
273    channel: &'ch Channel<M, T, N>,
274}
275
276impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
277where
278    M: RawMutex,
279{
280    fn clone(&self) -> Self {
281        *self
282    }
283}
284
285impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
286
287impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
288where
289    M: RawMutex,
290{
291    /// Receive the next value.
292    ///
293    /// See [`Channel::receive()`].
294    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
295        self.channel.receive()
296    }
297
298    /// Is a value ready to be received in the channel
299    ///
300    /// See [`Channel::ready_to_receive()`].
301    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
302        self.channel.ready_to_receive()
303    }
304
305    /// Attempt to immediately receive the next value.
306    ///
307    /// See [`Channel::try_receive()`]
308    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
309        self.channel.try_receive()
310    }
311
312    /// Peek at the next value without removing it from the queue.
313    ///
314    /// See [`Channel::try_peek()`]
315    pub fn try_peek(&self) -> Result<T, TryReceiveError>
316    where
317        T: Clone,
318    {
319        self.channel.try_peek()
320    }
321
322    /// Allows a poll_fn to poll until the channel is ready to receive
323    ///
324    /// See [`Channel::poll_ready_to_receive()`]
325    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
326        self.channel.poll_ready_to_receive(cx)
327    }
328
329    /// Poll the channel for the next item
330    ///
331    /// See [`Channel::poll_receive()`]
332    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
333        self.channel.poll_receive(cx)
334    }
335
336    /// Returns the maximum number of elements the channel can hold.
337    ///
338    /// See [`Channel::capacity()`]
339    pub const fn capacity(&self) -> usize {
340        self.channel.capacity()
341    }
342
343    /// Returns the free capacity of the channel.
344    ///
345    /// See [`Channel::free_capacity()`]
346    pub fn free_capacity(&self) -> usize {
347        self.channel.free_capacity()
348    }
349
350    /// Clears all elements in the channel.
351    ///
352    /// See [`Channel::clear()`]
353    pub fn clear(&self) {
354        self.channel.clear();
355    }
356
357    /// Returns the number of elements currently in the channel.
358    ///
359    /// See [`Channel::len()`]
360    pub fn len(&self) -> usize {
361        self.channel.len()
362    }
363
364    /// Returns whether the channel is empty.
365    ///
366    /// See [`Channel::is_empty()`]
367    pub fn is_empty(&self) -> bool {
368        self.channel.is_empty()
369    }
370
371    /// Returns whether the channel is full.
372    ///
373    /// See [`Channel::is_full()`]
374    pub fn is_full(&self) -> bool {
375        self.channel.is_full()
376    }
377}
378
379/// Receive-only access to a [`Channel`] without knowing channel size.
380pub struct DynamicReceiver<'ch, T> {
381    pub(crate) channel: &'ch dyn DynamicChannel<T>,
382}
383
384impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
385    fn clone(&self) -> Self {
386        *self
387    }
388}
389
390impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
391
392impl<'ch, T> DynamicReceiver<'ch, T> {
393    /// Receive the next value.
394    ///
395    /// See [`Channel::receive()`].
396    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
397        DynamicReceiveFuture { channel: self.channel }
398    }
399
400    /// Attempt to immediately receive the next value.
401    ///
402    /// See [`Channel::try_receive()`]
403    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
404        self.channel.try_receive_with_context(None)
405    }
406
407    /// Peek at the next value without removing it from the queue.
408    ///
409    /// See [`Channel::try_peek()`]
410    pub fn try_peek(&self) -> Result<T, TryReceiveError>
411    where
412        T: Clone,
413    {
414        self.channel.try_peek_with_context(None)
415    }
416
417    /// Allows a poll_fn to poll until the channel is ready to receive
418    ///
419    /// See [`Channel::poll_ready_to_receive()`]
420    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
421        self.channel.poll_ready_to_receive(cx)
422    }
423
424    /// Poll the channel for the next item
425    ///
426    /// See [`Channel::poll_receive()`]
427    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
428        self.channel.poll_receive(cx)
429    }
430}
431
432impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
433where
434    M: RawMutex,
435{
436    fn from(s: Receiver<'ch, M, T, N>) -> Self {
437        Self { channel: s.channel }
438    }
439}
440
441/// Receive-only access to a [`Channel`] without knowing channel size.
442/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
443pub struct SendDynamicReceiver<'ch, T> {
444    pub(crate) channel: &'ch dyn DynamicChannel<T>,
445}
446
447/// Receive-only access to a [`Channel`] without knowing channel size.
448/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
449#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
450pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;
451
452impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
453    fn clone(&self) -> Self {
454        *self
455    }
456}
457
458impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
459unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
460unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
461
462impl<'ch, T> SendDynamicReceiver<'ch, T> {
463    /// Receive the next value.
464    ///
465    /// See [`Channel::receive()`].
466    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
467        DynamicReceiveFuture { channel: self.channel }
468    }
469
470    /// Attempt to immediately receive the next value.
471    ///
472    /// See [`Channel::try_receive()`]
473    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
474        self.channel.try_receive_with_context(None)
475    }
476
477    /// Allows a poll_fn to poll until the channel is ready to receive
478    ///
479    /// See [`Channel::poll_ready_to_receive()`]
480    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
481        self.channel.poll_ready_to_receive(cx)
482    }
483
484    /// Poll the channel for the next item
485    ///
486    /// See [`Channel::poll_receive()`]
487    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
488        self.channel.poll_receive(cx)
489    }
490}
491
492impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
493where
494    M: RawMutex + Sync + Send,
495{
496    fn from(s: Receiver<'ch, M, T, N>) -> Self {
497        Self { channel: s.channel }
498    }
499}
500
501impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
502where
503    M: RawMutex,
504{
505    type Item = T;
506
507    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
508        self.channel.poll_receive(cx).map(Some)
509    }
510}
511
512/// Future returned by [`Channel::receive`] and  [`Receiver::receive`].
513#[must_use = "futures do nothing unless you `.await` or poll them"]
514#[derive(Debug)]
515pub struct ReceiveFuture<'ch, M, T, const N: usize>
516where
517    M: RawMutex,
518{
519    channel: &'ch Channel<M, T, N>,
520}
521
522impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
523where
524    M: RawMutex,
525{
526    type Output = T;
527
528    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
529        self.channel.poll_receive(cx)
530    }
531}
532
533/// Future returned by [`Channel::ready_to_receive`] and  [`Receiver::ready_to_receive`].
534#[must_use = "futures do nothing unless you `.await` or poll them"]
535#[derive(Debug)]
536pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
537where
538    M: RawMutex,
539{
540    channel: &'ch Channel<M, T, N>,
541}
542
543impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
544where
545    M: RawMutex,
546{
547    type Output = ();
548
549    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
550        self.channel.poll_ready_to_receive(cx)
551    }
552}
553
554/// Future returned by [`DynamicReceiver::receive`].
555#[must_use = "futures do nothing unless you `.await` or poll them"]
556pub struct DynamicReceiveFuture<'ch, T> {
557    channel: &'ch dyn DynamicChannel<T>,
558}
559
560impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
561    type Output = T;
562
563    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
564        match self.channel.try_receive_with_context(Some(cx)) {
565            Ok(v) => Poll::Ready(v),
566            Err(TryReceiveError::Empty) => Poll::Pending,
567        }
568    }
569}
570
571impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
572    fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
573        Self { channel: value.channel }
574    }
575}
576
577/// Future returned by [`Channel::send`] and  [`Sender::send`].
578#[must_use = "futures do nothing unless you `.await` or poll them"]
579#[derive(Debug)]
580pub struct SendFuture<'ch, M, T, const N: usize>
581where
582    M: RawMutex,
583{
584    channel: &'ch Channel<M, T, N>,
585    message: Option<T>,
586}
587
588impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
589where
590    M: RawMutex,
591{
592    type Output = ();
593
594    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
595        match self.message.take() {
596            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
597                Ok(..) => Poll::Ready(()),
598                Err(TrySendError::Full(m)) => {
599                    self.message = Some(m);
600                    Poll::Pending
601                }
602            },
603            None => panic!("Message cannot be None"),
604        }
605    }
606}
607
608impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
609
610/// Future returned by [`DynamicSender::send`].
611#[must_use = "futures do nothing unless you `.await` or poll them"]
612pub struct DynamicSendFuture<'ch, T> {
613    channel: &'ch dyn DynamicChannel<T>,
614    message: Option<T>,
615}
616
617impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
618    type Output = ();
619
620    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
621        match self.message.take() {
622            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
623                Ok(..) => Poll::Ready(()),
624                Err(TrySendError::Full(m)) => {
625                    self.message = Some(m);
626                    Poll::Pending
627                }
628            },
629            None => panic!("Message cannot be None"),
630        }
631    }
632}
633
634impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
635
636impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
637    fn from(value: SendFuture<'ch, M, T, N>) -> Self {
638        Self {
639            channel: value.channel,
640            message: value.message,
641        }
642    }
643}
644
645pub(crate) trait DynamicChannel<T> {
646    fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
647
648    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
649
650    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
651    where
652        T: Clone;
653
654    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
655    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
656
657    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
658}
659
660/// Error returned by [`try_receive`](Channel::try_receive).
661#[derive(PartialEq, Eq, Clone, Copy, Debug)]
662#[cfg_attr(feature = "defmt", derive(defmt::Format))]
663pub enum TryReceiveError {
664    /// A message could not be received because the channel is empty.
665    Empty,
666}
667
668/// Error returned by [`try_send`](Channel::try_send).
669#[derive(PartialEq, Eq, Clone, Copy, Debug)]
670#[cfg_attr(feature = "defmt", derive(defmt::Format))]
671pub enum TrySendError<T> {
672    /// The data could not be sent on the channel because the channel is
673    /// currently full and sending would require blocking.
674    Full(T),
675}
676
677#[derive(Debug)]
678struct ChannelState<T, const N: usize> {
679    queue: Deque<T, N>,
680    receiver_waker: WakerRegistration,
681    senders_waker: WakerRegistration,
682}
683
684impl<T, const N: usize> ChannelState<T, N> {
685    const fn new() -> Self {
686        ChannelState {
687            queue: Deque::new(),
688            receiver_waker: WakerRegistration::new(),
689            senders_waker: WakerRegistration::new(),
690        }
691    }
692
693    fn try_receive(&mut self) -> Result<T, TryReceiveError> {
694        self.try_receive_with_context(None)
695    }
696
697    fn try_peek(&mut self) -> Result<T, TryReceiveError>
698    where
699        T: Clone,
700    {
701        self.try_peek_with_context(None)
702    }
703
704    fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
705    where
706        T: Clone,
707    {
708        if self.queue.is_full() {
709            self.senders_waker.wake();
710        }
711
712        if let Some(message) = self.queue.front() {
713            Ok(message.clone())
714        } else {
715            if let Some(cx) = cx {
716                self.receiver_waker.register(cx.waker());
717            }
718            Err(TryReceiveError::Empty)
719        }
720    }
721
722    fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
723        if self.queue.is_full() {
724            self.senders_waker.wake();
725        }
726
727        if let Some(message) = self.queue.pop_front() {
728            Ok(message)
729        } else {
730            if let Some(cx) = cx {
731                self.receiver_waker.register(cx.waker());
732            }
733            Err(TryReceiveError::Empty)
734        }
735    }
736
737    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
738        if self.queue.is_full() {
739            self.senders_waker.wake();
740        }
741
742        if let Some(message) = self.queue.pop_front() {
743            Poll::Ready(message)
744        } else {
745            self.receiver_waker.register(cx.waker());
746            Poll::Pending
747        }
748    }
749
750    fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
751        self.receiver_waker.register(cx.waker());
752
753        if !self.queue.is_empty() {
754            Poll::Ready(())
755        } else {
756            Poll::Pending
757        }
758    }
759
760    fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
761        self.try_send_with_context(message, None)
762    }
763
764    fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
765        match self.queue.push_back(message) {
766            Ok(()) => {
767                self.receiver_waker.wake();
768                Ok(())
769            }
770            Err(message) => {
771                if let Some(cx) = cx {
772                    self.senders_waker.register(cx.waker());
773                }
774                Err(TrySendError::Full(message))
775            }
776        }
777    }
778
779    fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
780        self.senders_waker.register(cx.waker());
781
782        if !self.queue.is_full() {
783            Poll::Ready(())
784        } else {
785            Poll::Pending
786        }
787    }
788
789    fn clear(&mut self) {
790        if self.queue.is_full() {
791            self.senders_waker.wake();
792        }
793        self.queue.clear();
794    }
795
796    fn len(&self) -> usize {
797        self.queue.len()
798    }
799
800    fn is_empty(&self) -> bool {
801        self.queue.is_empty()
802    }
803
804    fn is_full(&self) -> bool {
805        self.queue.is_full()
806    }
807}
808
809/// A bounded channel for communicating between asynchronous tasks
810/// with backpressure.
811///
812/// The channel will buffer up to the provided number of messages.  Once the
813/// buffer is full, attempts to `send` new messages will wait until a message is
814/// received from the channel.
815///
816/// All data sent will become available in the same order as it was sent.
817#[derive(Debug)]
818pub struct Channel<M, T, const N: usize>
819where
820    M: RawMutex,
821{
822    inner: Mutex<M, RefCell<ChannelState<T, N>>>,
823}
824
825impl<M, T, const N: usize> Channel<M, T, N>
826where
827    M: RawMutex,
828{
829    /// Establish a new bounded channel. For example, to create one with a NoopMutex:
830    ///
831    /// ```
832    /// use embassy_sync::channel::Channel;
833    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
834    ///
835    /// // Declare a bounded channel of 3 u32s.
836    /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
837    /// ```
838    pub const fn new() -> Self {
839        Self {
840            inner: Mutex::new(RefCell::new(ChannelState::new())),
841        }
842    }
843
844    fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
845        self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
846    }
847
848    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
849        self.lock(|c| c.try_receive_with_context(cx))
850    }
851
852    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
853    where
854        T: Clone,
855    {
856        self.lock(|c| c.try_peek_with_context(cx))
857    }
858
859    /// Poll the channel for the next message
860    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
861        self.lock(|c| c.poll_receive(cx))
862    }
863
864    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
865        self.lock(|c| c.try_send_with_context(m, cx))
866    }
867
868    /// Allows a poll_fn to poll until the channel is ready to receive
869    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
870        self.lock(|c| c.poll_ready_to_receive(cx))
871    }
872
873    /// Allows a poll_fn to poll until the channel is ready to send
874    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
875        self.lock(|c| c.poll_ready_to_send(cx))
876    }
877
878    /// Get a sender for this channel.
879    pub fn sender(&self) -> Sender<'_, M, T, N> {
880        Sender { channel: self }
881    }
882
883    /// Get a receiver for this channel.
884    pub fn receiver(&self) -> Receiver<'_, M, T, N> {
885        Receiver { channel: self }
886    }
887
888    /// Get a sender for this channel using dynamic dispatch.
889    pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
890        DynamicSender { channel: self }
891    }
892
893    /// Get a receiver for this channel using dynamic dispatch.
894    pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
895        DynamicReceiver { channel: self }
896    }
897
898    /// Send a value, waiting until there is capacity.
899    ///
900    /// Sending completes when the value has been pushed to the channel's queue.
901    /// This doesn't mean the value has been received yet.
902    pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
903        SendFuture {
904            channel: self,
905            message: Some(message),
906        }
907    }
908
909    /// Attempt to immediately send a message.
910    ///
911    /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
912    /// buffer is full, instead of waiting.
913    ///
914    /// # Errors
915    ///
916    /// If the channel capacity has been reached, i.e., the channel has `n`
917    /// buffered values where `n` is the argument passed to [`Channel`], then an
918    /// error is returned.
919    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
920        self.lock(|c| c.try_send(message))
921    }
922
923    /// Receive the next value.
924    ///
925    /// If there are no messages in the channel's buffer, this method will
926    /// wait until a message is sent.
927    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
928        ReceiveFuture { channel: self }
929    }
930
931    /// Is a value ready to be received in the channel
932    ///
933    /// If there are no messages in the channel's buffer, this method will
934    /// wait until there is at least one
935    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
936        ReceiveReadyFuture { channel: self }
937    }
938
939    /// Attempt to immediately receive a message.
940    ///
941    /// This method will either receive a message from the channel immediately or return an error
942    /// if the channel is empty.
943    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
944        self.lock(|c| c.try_receive())
945    }
946
947    /// Peek at the next value without removing it from the queue.
948    ///
949    /// This method will either receive a copy of the message from the channel immediately or return
950    /// an error if the channel is empty.
951    pub fn try_peek(&self) -> Result<T, TryReceiveError>
952    where
953        T: Clone,
954    {
955        self.lock(|c| c.try_peek())
956    }
957
958    /// Returns the maximum number of elements the channel can hold.
959    pub const fn capacity(&self) -> usize {
960        N
961    }
962
963    /// Returns the free capacity of the channel.
964    ///
965    /// This is equivalent to `capacity() - len()`
966    pub fn free_capacity(&self) -> usize {
967        N - self.len()
968    }
969
970    /// Clears all elements in the channel.
971    pub fn clear(&self) {
972        self.lock(|c| c.clear());
973    }
974
975    /// Returns the number of elements currently in the channel.
976    pub fn len(&self) -> usize {
977        self.lock(|c| c.len())
978    }
979
980    /// Returns whether the channel is empty.
981    pub fn is_empty(&self) -> bool {
982        self.lock(|c| c.is_empty())
983    }
984
985    /// Returns whether the channel is full.
986    pub fn is_full(&self) -> bool {
987        self.lock(|c| c.is_full())
988    }
989}
990
991/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
992/// tradeoff cost of dynamic dispatch.
993impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
994where
995    M: RawMutex,
996{
997    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
998        Channel::try_send_with_context(self, m, cx)
999    }
1000
1001    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
1002        Channel::try_receive_with_context(self, cx)
1003    }
1004
1005    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
1006    where
1007        T: Clone,
1008    {
1009        Channel::try_peek_with_context(self, cx)
1010    }
1011
1012    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
1013        Channel::poll_ready_to_send(self, cx)
1014    }
1015
1016    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
1017        Channel::poll_ready_to_receive(self, cx)
1018    }
1019
1020    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
1021        Channel::poll_receive(self, cx)
1022    }
1023}
1024
1025impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
1026where
1027    M: RawMutex,
1028{
1029    type Item = T;
1030
1031    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1032        self.poll_receive(cx).map(Some)
1033    }
1034}
1035
1036impl<M, T, const N: usize> futures_sink::Sink<T> for Channel<M, T, N>
1037where
1038    M: RawMutex,
1039{
1040    type Error = TrySendError<T>;
1041
1042    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1043        self.poll_ready_to_send(cx).map(|()| Ok(()))
1044    }
1045
1046    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
1047        self.try_send(item)
1048    }
1049
1050    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1051        Poll::Ready(Ok(()))
1052    }
1053
1054    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1055        Poll::Ready(Ok(()))
1056    }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061    use core::time::Duration;
1062
1063    use futures_executor::ThreadPool;
1064    use futures_timer::Delay;
1065    use futures_util::task::SpawnExt;
1066    use static_cell::StaticCell;
1067
1068    use super::*;
1069    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
1070
1071    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
1072        c.queue.capacity() - c.queue.len()
1073    }
1074
1075    #[test]
1076    fn sending_once() {
1077        let mut c = ChannelState::<u32, 3>::new();
1078        assert!(c.try_send(1).is_ok());
1079        assert_eq!(capacity(&c), 2);
1080    }
1081
1082    #[test]
1083    fn sending_when_full() {
1084        let mut c = ChannelState::<u32, 3>::new();
1085        let _ = c.try_send(1);
1086        let _ = c.try_send(1);
1087        let _ = c.try_send(1);
1088        match c.try_send(2) {
1089            Err(TrySendError::Full(2)) => assert!(true),
1090            _ => assert!(false),
1091        }
1092        assert_eq!(capacity(&c), 0);
1093    }
1094
1095    #[test]
1096    fn receiving_once_with_one_send() {
1097        let mut c = ChannelState::<u32, 3>::new();
1098        assert!(c.try_send(1).is_ok());
1099        assert_eq!(c.try_receive().unwrap(), 1);
1100        assert_eq!(capacity(&c), 3);
1101    }
1102
1103    #[test]
1104    fn receiving_when_empty() {
1105        let mut c = ChannelState::<u32, 3>::new();
1106        match c.try_receive() {
1107            Err(TryReceiveError::Empty) => assert!(true),
1108            _ => assert!(false),
1109        }
1110        assert_eq!(capacity(&c), 3);
1111    }
1112
1113    #[test]
1114    fn simple_send_and_receive() {
1115        let c = Channel::<NoopRawMutex, u32, 3>::new();
1116        assert!(c.try_send(1).is_ok());
1117        assert_eq!(c.try_peek().unwrap(), 1);
1118        assert_eq!(c.try_peek().unwrap(), 1);
1119        assert_eq!(c.try_receive().unwrap(), 1);
1120    }
1121
1122    #[test]
1123    fn cloning() {
1124        let c = Channel::<NoopRawMutex, u32, 3>::new();
1125        let r1 = c.receiver();
1126        let s1 = c.sender();
1127
1128        let _ = r1.clone();
1129        let _ = s1.clone();
1130    }
1131
1132    #[test]
1133    fn dynamic_dispatch_into() {
1134        let c = Channel::<NoopRawMutex, u32, 3>::new();
1135        let s: DynamicSender<'_, u32> = c.sender().into();
1136        let r: DynamicReceiver<'_, u32> = c.receiver().into();
1137
1138        assert!(s.try_send(1).is_ok());
1139        assert_eq!(r.try_receive().unwrap(), 1);
1140    }
1141
1142    #[test]
1143    fn dynamic_dispatch_constructor() {
1144        let c = Channel::<NoopRawMutex, u32, 3>::new();
1145        let s = c.dyn_sender();
1146        let r = c.dyn_receiver();
1147
1148        assert!(s.try_send(1).is_ok());
1149        assert_eq!(r.try_peek().unwrap(), 1);
1150        assert_eq!(r.try_peek().unwrap(), 1);
1151        assert_eq!(r.try_receive().unwrap(), 1);
1152    }
1153
1154    #[futures_test::test]
1155    async fn receiver_receives_given_try_send_async() {
1156        let executor = ThreadPool::new().unwrap();
1157
1158        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
1159        let c = &*CHANNEL.init(Channel::new());
1160        let c2 = c;
1161        assert!(
1162            executor
1163                .spawn(async move {
1164                    assert!(c2.try_send(1).is_ok());
1165                })
1166                .is_ok()
1167        );
1168        assert_eq!(c.receive().await, 1);
1169    }
1170
1171    #[futures_test::test]
1172    async fn sender_send_completes_if_capacity() {
1173        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
1174        c.send(1).await;
1175        assert_eq!(c.receive().await, 1);
1176    }
1177
1178    #[futures_test::test]
1179    async fn senders_sends_wait_until_capacity() {
1180        let executor = ThreadPool::new().unwrap();
1181
1182        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
1183        let c = &*CHANNEL.init(Channel::new());
1184        assert!(c.try_send(1).is_ok());
1185
1186        let c2 = c;
1187        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
1188        let c2 = c;
1189        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
1190        // Wish I could think of a means of determining that the async send is waiting instead.
1191        // However, I've used the debugger to observe that the send does indeed wait.
1192        Delay::new(Duration::from_millis(500)).await;
1193        assert_eq!(c.receive().await, 1);
1194        assert!(
1195            executor
1196                .spawn(async move {
1197                    loop {
1198                        c.receive().await;
1199                    }
1200                })
1201                .is_ok()
1202        );
1203        send_task_1.unwrap().await;
1204        send_task_2.unwrap().await;
1205    }
1206}