embassy_sync/
channel.rs

1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an  "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20//! # Example: Message passing between task and interrupt handler
21//!
22//! ```rust
23//! use embassy_sync::channel::Channel;
24//! use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
25//!
26//! static SHARED_CHANNEL: Channel<CriticalSectionRawMutex, u32, 8> = Channel::new();
27//!
28//! fn my_interrupt_handler() {
29//!     // Do some work..
30//!     // ...
31//!     if let Err(e) = SHARED_CHANNEL.sender().try_send(42) {
32//!         // Channel is full..
33//!     }
34//! }
35//!
36//! async fn my_async_task() {
37//!     // ...
38//!     let receiver = SHARED_CHANNEL.receiver();
39//!     loop {
40//!         let data_from_interrupt = receiver.receive().await;
41//!         // Do something with the data.
42//!     }
43//! }
44//! ```
45
46use core::cell::RefCell;
47use core::future::Future;
48use core::pin::Pin;
49use core::task::{Context, Poll};
50
51use heapless::Deque;
52
53use crate::blocking_mutex::raw::RawMutex;
54use crate::blocking_mutex::Mutex;
55use crate::waitqueue::WakerRegistration;
56
57/// Send-only access to a [`Channel`].
58pub struct Sender<'ch, M, T, const N: usize>
59where
60    M: RawMutex,
61{
62    channel: &'ch Channel<M, T, N>,
63}
64
65impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
66where
67    M: RawMutex,
68{
69    fn clone(&self) -> Self {
70        *self
71    }
72}
73
74impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
75
76impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
77where
78    M: RawMutex,
79{
80    /// Sends a value.
81    ///
82    /// See [`Channel::send()`]
83    pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
84        self.channel.send(message)
85    }
86
87    /// Attempt to immediately send a message.
88    ///
89    /// See [`Channel::send()`]
90    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
91        self.channel.try_send(message)
92    }
93
94    /// Allows a poll_fn to poll until the channel is ready to send
95    ///
96    /// See [`Channel::poll_ready_to_send()`]
97    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
98        self.channel.poll_ready_to_send(cx)
99    }
100
101    /// Returns the maximum number of elements the channel can hold.
102    ///
103    /// See [`Channel::capacity()`]
104    pub const fn capacity(&self) -> usize {
105        self.channel.capacity()
106    }
107
108    /// Returns the free capacity of the channel.
109    ///
110    /// See [`Channel::free_capacity()`]
111    pub fn free_capacity(&self) -> usize {
112        self.channel.free_capacity()
113    }
114
115    /// Clears all elements in the channel.
116    ///
117    /// See [`Channel::clear()`]
118    pub fn clear(&self) {
119        self.channel.clear();
120    }
121
122    /// Returns the number of elements currently in the channel.
123    ///
124    /// See [`Channel::len()`]
125    pub fn len(&self) -> usize {
126        self.channel.len()
127    }
128
129    /// Returns whether the channel is empty.
130    ///
131    /// See [`Channel::is_empty()`]
132    pub fn is_empty(&self) -> bool {
133        self.channel.is_empty()
134    }
135
136    /// Returns whether the channel is full.
137    ///
138    /// See [`Channel::is_full()`]
139    pub fn is_full(&self) -> bool {
140        self.channel.is_full()
141    }
142}
143
144/// Send-only access to a [`Channel`] without knowing channel size.
145pub struct DynamicSender<'ch, T> {
146    pub(crate) channel: &'ch dyn DynamicChannel<T>,
147}
148
149impl<'ch, T> Clone for DynamicSender<'ch, T> {
150    fn clone(&self) -> Self {
151        *self
152    }
153}
154
155impl<'ch, T> Copy for DynamicSender<'ch, T> {}
156
157impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
158where
159    M: RawMutex,
160{
161    fn from(s: Sender<'ch, M, T, N>) -> Self {
162        Self { channel: s.channel }
163    }
164}
165
166impl<'ch, T> DynamicSender<'ch, T> {
167    /// Sends a value.
168    ///
169    /// See [`Channel::send()`]
170    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
171        DynamicSendFuture {
172            channel: self.channel,
173            message: Some(message),
174        }
175    }
176
177    /// Attempt to immediately send a message.
178    ///
179    /// See [`Channel::send()`]
180    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
181        self.channel.try_send_with_context(message, None)
182    }
183
184    /// Allows a poll_fn to poll until the channel is ready to send
185    ///
186    /// See [`Channel::poll_ready_to_send()`]
187    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
188        self.channel.poll_ready_to_send(cx)
189    }
190}
191
192/// Send-only access to a [`Channel`] without knowing channel size.
193/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
194pub struct SendDynamicSender<'ch, T> {
195    pub(crate) channel: &'ch dyn DynamicChannel<T>,
196}
197
198impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
199    fn clone(&self) -> Self {
200        *self
201    }
202}
203
204impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
205unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
206unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
207
208impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
209where
210    M: RawMutex + Sync + Send,
211{
212    fn from(s: Sender<'ch, M, T, N>) -> Self {
213        Self { channel: s.channel }
214    }
215}
216
217impl<'ch, T> SendDynamicSender<'ch, T> {
218    /// Sends a value.
219    ///
220    /// See [`Channel::send()`]
221    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
222        DynamicSendFuture {
223            channel: self.channel,
224            message: Some(message),
225        }
226    }
227
228    /// Attempt to immediately send a message.
229    ///
230    /// See [`Channel::send()`]
231    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
232        self.channel.try_send_with_context(message, None)
233    }
234
235    /// Allows a poll_fn to poll until the channel is ready to send
236    ///
237    /// See [`Channel::poll_ready_to_send()`]
238    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
239        self.channel.poll_ready_to_send(cx)
240    }
241}
242
243/// Receive-only access to a [`Channel`].
244pub struct Receiver<'ch, M, T, const N: usize>
245where
246    M: RawMutex,
247{
248    channel: &'ch Channel<M, T, N>,
249}
250
251impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
252where
253    M: RawMutex,
254{
255    fn clone(&self) -> Self {
256        *self
257    }
258}
259
260impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
261
262impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
263where
264    M: RawMutex,
265{
266    /// Receive the next value.
267    ///
268    /// See [`Channel::receive()`].
269    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
270        self.channel.receive()
271    }
272
273    /// Is a value ready to be received in the channel
274    ///
275    /// See [`Channel::ready_to_receive()`].
276    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
277        self.channel.ready_to_receive()
278    }
279
280    /// Attempt to immediately receive the next value.
281    ///
282    /// See [`Channel::try_receive()`]
283    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
284        self.channel.try_receive()
285    }
286
287    /// Peek at the next value without removing it from the queue.
288    ///
289    /// See [`Channel::try_peek()`]
290    pub fn try_peek(&self) -> Result<T, TryReceiveError>
291    where
292        T: Clone,
293    {
294        self.channel.try_peek()
295    }
296
297    /// Allows a poll_fn to poll until the channel is ready to receive
298    ///
299    /// See [`Channel::poll_ready_to_receive()`]
300    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
301        self.channel.poll_ready_to_receive(cx)
302    }
303
304    /// Poll the channel for the next item
305    ///
306    /// See [`Channel::poll_receive()`]
307    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
308        self.channel.poll_receive(cx)
309    }
310
311    /// Returns the maximum number of elements the channel can hold.
312    ///
313    /// See [`Channel::capacity()`]
314    pub const fn capacity(&self) -> usize {
315        self.channel.capacity()
316    }
317
318    /// Returns the free capacity of the channel.
319    ///
320    /// See [`Channel::free_capacity()`]
321    pub fn free_capacity(&self) -> usize {
322        self.channel.free_capacity()
323    }
324
325    /// Clears all elements in the channel.
326    ///
327    /// See [`Channel::clear()`]
328    pub fn clear(&self) {
329        self.channel.clear();
330    }
331
332    /// Returns the number of elements currently in the channel.
333    ///
334    /// See [`Channel::len()`]
335    pub fn len(&self) -> usize {
336        self.channel.len()
337    }
338
339    /// Returns whether the channel is empty.
340    ///
341    /// See [`Channel::is_empty()`]
342    pub fn is_empty(&self) -> bool {
343        self.channel.is_empty()
344    }
345
346    /// Returns whether the channel is full.
347    ///
348    /// See [`Channel::is_full()`]
349    pub fn is_full(&self) -> bool {
350        self.channel.is_full()
351    }
352}
353
354/// Receive-only access to a [`Channel`] without knowing channel size.
355pub struct DynamicReceiver<'ch, T> {
356    pub(crate) channel: &'ch dyn DynamicChannel<T>,
357}
358
359impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
360    fn clone(&self) -> Self {
361        *self
362    }
363}
364
365impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
366
367impl<'ch, T> DynamicReceiver<'ch, T> {
368    /// Receive the next value.
369    ///
370    /// See [`Channel::receive()`].
371    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
372        DynamicReceiveFuture { channel: self.channel }
373    }
374
375    /// Attempt to immediately receive the next value.
376    ///
377    /// See [`Channel::try_receive()`]
378    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
379        self.channel.try_receive_with_context(None)
380    }
381
382    /// Peek at the next value without removing it from the queue.
383    ///
384    /// See [`Channel::try_peek()`]
385    pub fn try_peek(&self) -> Result<T, TryReceiveError>
386    where
387        T: Clone,
388    {
389        self.channel.try_peek_with_context(None)
390    }
391
392    /// Allows a poll_fn to poll until the channel is ready to receive
393    ///
394    /// See [`Channel::poll_ready_to_receive()`]
395    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
396        self.channel.poll_ready_to_receive(cx)
397    }
398
399    /// Poll the channel for the next item
400    ///
401    /// See [`Channel::poll_receive()`]
402    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
403        self.channel.poll_receive(cx)
404    }
405}
406
407impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
408where
409    M: RawMutex,
410{
411    fn from(s: Receiver<'ch, M, T, N>) -> Self {
412        Self { channel: s.channel }
413    }
414}
415
416/// Receive-only access to a [`Channel`] without knowing channel size.
417/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
418pub struct SendDynamicReceiver<'ch, T> {
419    pub(crate) channel: &'ch dyn DynamicChannel<T>,
420}
421
422/// Receive-only access to a [`Channel`] without knowing channel size.
423/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
424#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
425pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;
426
427impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
428    fn clone(&self) -> Self {
429        *self
430    }
431}
432
433impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
434unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
435unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
436
437impl<'ch, T> SendDynamicReceiver<'ch, T> {
438    /// Receive the next value.
439    ///
440    /// See [`Channel::receive()`].
441    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
442        DynamicReceiveFuture { channel: self.channel }
443    }
444
445    /// Attempt to immediately receive the next value.
446    ///
447    /// See [`Channel::try_receive()`]
448    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
449        self.channel.try_receive_with_context(None)
450    }
451
452    /// Allows a poll_fn to poll until the channel is ready to receive
453    ///
454    /// See [`Channel::poll_ready_to_receive()`]
455    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
456        self.channel.poll_ready_to_receive(cx)
457    }
458
459    /// Poll the channel for the next item
460    ///
461    /// See [`Channel::poll_receive()`]
462    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
463        self.channel.poll_receive(cx)
464    }
465}
466
467impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
468where
469    M: RawMutex + Sync + Send,
470{
471    fn from(s: Receiver<'ch, M, T, N>) -> Self {
472        Self { channel: s.channel }
473    }
474}
475
476impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
477where
478    M: RawMutex,
479{
480    type Item = T;
481
482    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
483        self.channel.poll_receive(cx).map(Some)
484    }
485}
486
487/// Future returned by [`Channel::receive`] and  [`Receiver::receive`].
488#[must_use = "futures do nothing unless you `.await` or poll them"]
489pub struct ReceiveFuture<'ch, M, T, const N: usize>
490where
491    M: RawMutex,
492{
493    channel: &'ch Channel<M, T, N>,
494}
495
496impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
497where
498    M: RawMutex,
499{
500    type Output = T;
501
502    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
503        self.channel.poll_receive(cx)
504    }
505}
506
507/// Future returned by [`Channel::ready_to_receive`] and  [`Receiver::ready_to_receive`].
508#[must_use = "futures do nothing unless you `.await` or poll them"]
509pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
510where
511    M: RawMutex,
512{
513    channel: &'ch Channel<M, T, N>,
514}
515
516impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
517where
518    M: RawMutex,
519{
520    type Output = ();
521
522    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
523        self.channel.poll_ready_to_receive(cx)
524    }
525}
526
527/// Future returned by [`DynamicReceiver::receive`].
528#[must_use = "futures do nothing unless you `.await` or poll them"]
529pub struct DynamicReceiveFuture<'ch, T> {
530    channel: &'ch dyn DynamicChannel<T>,
531}
532
533impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
534    type Output = T;
535
536    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
537        match self.channel.try_receive_with_context(Some(cx)) {
538            Ok(v) => Poll::Ready(v),
539            Err(TryReceiveError::Empty) => Poll::Pending,
540        }
541    }
542}
543
544impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
545    fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
546        Self { channel: value.channel }
547    }
548}
549
550/// Future returned by [`Channel::send`] and  [`Sender::send`].
551#[must_use = "futures do nothing unless you `.await` or poll them"]
552pub struct SendFuture<'ch, M, T, const N: usize>
553where
554    M: RawMutex,
555{
556    channel: &'ch Channel<M, T, N>,
557    message: Option<T>,
558}
559
560impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
561where
562    M: RawMutex,
563{
564    type Output = ();
565
566    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
567        match self.message.take() {
568            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
569                Ok(..) => Poll::Ready(()),
570                Err(TrySendError::Full(m)) => {
571                    self.message = Some(m);
572                    Poll::Pending
573                }
574            },
575            None => panic!("Message cannot be None"),
576        }
577    }
578}
579
580impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
581
582/// Future returned by [`DynamicSender::send`].
583#[must_use = "futures do nothing unless you `.await` or poll them"]
584pub struct DynamicSendFuture<'ch, T> {
585    channel: &'ch dyn DynamicChannel<T>,
586    message: Option<T>,
587}
588
589impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
590    type Output = ();
591
592    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
593        match self.message.take() {
594            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
595                Ok(..) => Poll::Ready(()),
596                Err(TrySendError::Full(m)) => {
597                    self.message = Some(m);
598                    Poll::Pending
599                }
600            },
601            None => panic!("Message cannot be None"),
602        }
603    }
604}
605
606impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
607
608impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
609    fn from(value: SendFuture<'ch, M, T, N>) -> Self {
610        Self {
611            channel: value.channel,
612            message: value.message,
613        }
614    }
615}
616
617pub(crate) trait DynamicChannel<T> {
618    fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
619
620    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
621
622    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
623    where
624        T: Clone;
625
626    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
627    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
628
629    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
630}
631
632/// Error returned by [`try_receive`](Channel::try_receive).
633#[derive(PartialEq, Eq, Clone, Copy, Debug)]
634#[cfg_attr(feature = "defmt", derive(defmt::Format))]
635pub enum TryReceiveError {
636    /// A message could not be received because the channel is empty.
637    Empty,
638}
639
640/// Error returned by [`try_send`](Channel::try_send).
641#[derive(PartialEq, Eq, Clone, Copy, Debug)]
642#[cfg_attr(feature = "defmt", derive(defmt::Format))]
643pub enum TrySendError<T> {
644    /// The data could not be sent on the channel because the channel is
645    /// currently full and sending would require blocking.
646    Full(T),
647}
648
649struct ChannelState<T, const N: usize> {
650    queue: Deque<T, N>,
651    receiver_waker: WakerRegistration,
652    senders_waker: WakerRegistration,
653}
654
655impl<T, const N: usize> ChannelState<T, N> {
656    const fn new() -> Self {
657        ChannelState {
658            queue: Deque::new(),
659            receiver_waker: WakerRegistration::new(),
660            senders_waker: WakerRegistration::new(),
661        }
662    }
663
664    fn try_receive(&mut self) -> Result<T, TryReceiveError> {
665        self.try_receive_with_context(None)
666    }
667
668    fn try_peek(&mut self) -> Result<T, TryReceiveError>
669    where
670        T: Clone,
671    {
672        self.try_peek_with_context(None)
673    }
674
675    fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
676    where
677        T: Clone,
678    {
679        if self.queue.is_full() {
680            self.senders_waker.wake();
681        }
682
683        if let Some(message) = self.queue.front() {
684            Ok(message.clone())
685        } else {
686            if let Some(cx) = cx {
687                self.receiver_waker.register(cx.waker());
688            }
689            Err(TryReceiveError::Empty)
690        }
691    }
692
693    fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
694        if self.queue.is_full() {
695            self.senders_waker.wake();
696        }
697
698        if let Some(message) = self.queue.pop_front() {
699            Ok(message)
700        } else {
701            if let Some(cx) = cx {
702                self.receiver_waker.register(cx.waker());
703            }
704            Err(TryReceiveError::Empty)
705        }
706    }
707
708    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
709        if self.queue.is_full() {
710            self.senders_waker.wake();
711        }
712
713        if let Some(message) = self.queue.pop_front() {
714            Poll::Ready(message)
715        } else {
716            self.receiver_waker.register(cx.waker());
717            Poll::Pending
718        }
719    }
720
721    fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
722        self.receiver_waker.register(cx.waker());
723
724        if !self.queue.is_empty() {
725            Poll::Ready(())
726        } else {
727            Poll::Pending
728        }
729    }
730
731    fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
732        self.try_send_with_context(message, None)
733    }
734
735    fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
736        match self.queue.push_back(message) {
737            Ok(()) => {
738                self.receiver_waker.wake();
739                Ok(())
740            }
741            Err(message) => {
742                if let Some(cx) = cx {
743                    self.senders_waker.register(cx.waker());
744                }
745                Err(TrySendError::Full(message))
746            }
747        }
748    }
749
750    fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
751        self.senders_waker.register(cx.waker());
752
753        if !self.queue.is_full() {
754            Poll::Ready(())
755        } else {
756            Poll::Pending
757        }
758    }
759
760    fn clear(&mut self) {
761        if self.queue.is_full() {
762            self.senders_waker.wake();
763        }
764        self.queue.clear();
765    }
766
767    fn len(&self) -> usize {
768        self.queue.len()
769    }
770
771    fn is_empty(&self) -> bool {
772        self.queue.is_empty()
773    }
774
775    fn is_full(&self) -> bool {
776        self.queue.is_full()
777    }
778}
779
780/// A bounded channel for communicating between asynchronous tasks
781/// with backpressure.
782///
783/// The channel will buffer up to the provided number of messages.  Once the
784/// buffer is full, attempts to `send` new messages will wait until a message is
785/// received from the channel.
786///
787/// All data sent will become available in the same order as it was sent.
788pub struct Channel<M, T, const N: usize>
789where
790    M: RawMutex,
791{
792    inner: Mutex<M, RefCell<ChannelState<T, N>>>,
793}
794
795impl<M, T, const N: usize> Channel<M, T, N>
796where
797    M: RawMutex,
798{
799    /// Establish a new bounded channel. For example, to create one with a NoopMutex:
800    ///
801    /// ```
802    /// use embassy_sync::channel::Channel;
803    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
804    ///
805    /// // Declare a bounded channel of 3 u32s.
806    /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
807    /// ```
808    pub const fn new() -> Self {
809        Self {
810            inner: Mutex::new(RefCell::new(ChannelState::new())),
811        }
812    }
813
814    fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
815        self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
816    }
817
818    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
819        self.lock(|c| c.try_receive_with_context(cx))
820    }
821
822    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
823    where
824        T: Clone,
825    {
826        self.lock(|c| c.try_peek_with_context(cx))
827    }
828
829    /// Poll the channel for the next message
830    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
831        self.lock(|c| c.poll_receive(cx))
832    }
833
834    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
835        self.lock(|c| c.try_send_with_context(m, cx))
836    }
837
838    /// Allows a poll_fn to poll until the channel is ready to receive
839    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
840        self.lock(|c| c.poll_ready_to_receive(cx))
841    }
842
843    /// Allows a poll_fn to poll until the channel is ready to send
844    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
845        self.lock(|c| c.poll_ready_to_send(cx))
846    }
847
848    /// Get a sender for this channel.
849    pub fn sender(&self) -> Sender<'_, M, T, N> {
850        Sender { channel: self }
851    }
852
853    /// Get a receiver for this channel.
854    pub fn receiver(&self) -> Receiver<'_, M, T, N> {
855        Receiver { channel: self }
856    }
857
858    /// Get a sender for this channel using dynamic dispatch.
859    pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
860        DynamicSender { channel: self }
861    }
862
863    /// Get a receiver for this channel using dynamic dispatch.
864    pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
865        DynamicReceiver { channel: self }
866    }
867
868    /// Send a value, waiting until there is capacity.
869    ///
870    /// Sending completes when the value has been pushed to the channel's queue.
871    /// This doesn't mean the value has been received yet.
872    pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
873        SendFuture {
874            channel: self,
875            message: Some(message),
876        }
877    }
878
879    /// Attempt to immediately send a message.
880    ///
881    /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
882    /// buffer is full, instead of waiting.
883    ///
884    /// # Errors
885    ///
886    /// If the channel capacity has been reached, i.e., the channel has `n`
887    /// buffered values where `n` is the argument passed to [`Channel`], then an
888    /// error is returned.
889    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
890        self.lock(|c| c.try_send(message))
891    }
892
893    /// Receive the next value.
894    ///
895    /// If there are no messages in the channel's buffer, this method will
896    /// wait until a message is sent.
897    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
898        ReceiveFuture { channel: self }
899    }
900
901    /// Is a value ready to be received in the channel
902    ///
903    /// If there are no messages in the channel's buffer, this method will
904    /// wait until there is at least one
905    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
906        ReceiveReadyFuture { channel: self }
907    }
908
909    /// Attempt to immediately receive a message.
910    ///
911    /// This method will either receive a message from the channel immediately or return an error
912    /// if the channel is empty.
913    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
914        self.lock(|c| c.try_receive())
915    }
916
917    /// Peek at the next value without removing it from the queue.
918    ///
919    /// This method will either receive a copy of the message from the channel immediately or return
920    /// an error if the channel is empty.
921    pub fn try_peek(&self) -> Result<T, TryReceiveError>
922    where
923        T: Clone,
924    {
925        self.lock(|c| c.try_peek())
926    }
927
928    /// Returns the maximum number of elements the channel can hold.
929    pub const fn capacity(&self) -> usize {
930        N
931    }
932
933    /// Returns the free capacity of the channel.
934    ///
935    /// This is equivalent to `capacity() - len()`
936    pub fn free_capacity(&self) -> usize {
937        N - self.len()
938    }
939
940    /// Clears all elements in the channel.
941    pub fn clear(&self) {
942        self.lock(|c| c.clear());
943    }
944
945    /// Returns the number of elements currently in the channel.
946    pub fn len(&self) -> usize {
947        self.lock(|c| c.len())
948    }
949
950    /// Returns whether the channel is empty.
951    pub fn is_empty(&self) -> bool {
952        self.lock(|c| c.is_empty())
953    }
954
955    /// Returns whether the channel is full.
956    pub fn is_full(&self) -> bool {
957        self.lock(|c| c.is_full())
958    }
959}
960
961/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
962/// tradeoff cost of dynamic dispatch.
963impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
964where
965    M: RawMutex,
966{
967    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
968        Channel::try_send_with_context(self, m, cx)
969    }
970
971    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
972        Channel::try_receive_with_context(self, cx)
973    }
974
975    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
976    where
977        T: Clone,
978    {
979        Channel::try_peek_with_context(self, cx)
980    }
981
982    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
983        Channel::poll_ready_to_send(self, cx)
984    }
985
986    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
987        Channel::poll_ready_to_receive(self, cx)
988    }
989
990    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
991        Channel::poll_receive(self, cx)
992    }
993}
994
995impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
996where
997    M: RawMutex,
998{
999    type Item = T;
1000
1001    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1002        self.poll_receive(cx).map(Some)
1003    }
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008    use core::time::Duration;
1009
1010    use futures_executor::ThreadPool;
1011    use futures_timer::Delay;
1012    use futures_util::task::SpawnExt;
1013    use static_cell::StaticCell;
1014
1015    use super::*;
1016    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
1017
1018    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
1019        c.queue.capacity() - c.queue.len()
1020    }
1021
1022    #[test]
1023    fn sending_once() {
1024        let mut c = ChannelState::<u32, 3>::new();
1025        assert!(c.try_send(1).is_ok());
1026        assert_eq!(capacity(&c), 2);
1027    }
1028
1029    #[test]
1030    fn sending_when_full() {
1031        let mut c = ChannelState::<u32, 3>::new();
1032        let _ = c.try_send(1);
1033        let _ = c.try_send(1);
1034        let _ = c.try_send(1);
1035        match c.try_send(2) {
1036            Err(TrySendError::Full(2)) => assert!(true),
1037            _ => assert!(false),
1038        }
1039        assert_eq!(capacity(&c), 0);
1040    }
1041
1042    #[test]
1043    fn receiving_once_with_one_send() {
1044        let mut c = ChannelState::<u32, 3>::new();
1045        assert!(c.try_send(1).is_ok());
1046        assert_eq!(c.try_receive().unwrap(), 1);
1047        assert_eq!(capacity(&c), 3);
1048    }
1049
1050    #[test]
1051    fn receiving_when_empty() {
1052        let mut c = ChannelState::<u32, 3>::new();
1053        match c.try_receive() {
1054            Err(TryReceiveError::Empty) => assert!(true),
1055            _ => assert!(false),
1056        }
1057        assert_eq!(capacity(&c), 3);
1058    }
1059
1060    #[test]
1061    fn simple_send_and_receive() {
1062        let c = Channel::<NoopRawMutex, u32, 3>::new();
1063        assert!(c.try_send(1).is_ok());
1064        assert_eq!(c.try_peek().unwrap(), 1);
1065        assert_eq!(c.try_peek().unwrap(), 1);
1066        assert_eq!(c.try_receive().unwrap(), 1);
1067    }
1068
1069    #[test]
1070    fn cloning() {
1071        let c = Channel::<NoopRawMutex, u32, 3>::new();
1072        let r1 = c.receiver();
1073        let s1 = c.sender();
1074
1075        let _ = r1.clone();
1076        let _ = s1.clone();
1077    }
1078
1079    #[test]
1080    fn dynamic_dispatch_into() {
1081        let c = Channel::<NoopRawMutex, u32, 3>::new();
1082        let s: DynamicSender<'_, u32> = c.sender().into();
1083        let r: DynamicReceiver<'_, u32> = c.receiver().into();
1084
1085        assert!(s.try_send(1).is_ok());
1086        assert_eq!(r.try_receive().unwrap(), 1);
1087    }
1088
1089    #[test]
1090    fn dynamic_dispatch_constructor() {
1091        let c = Channel::<NoopRawMutex, u32, 3>::new();
1092        let s = c.dyn_sender();
1093        let r = c.dyn_receiver();
1094
1095        assert!(s.try_send(1).is_ok());
1096        assert_eq!(r.try_peek().unwrap(), 1);
1097        assert_eq!(r.try_peek().unwrap(), 1);
1098        assert_eq!(r.try_receive().unwrap(), 1);
1099    }
1100
1101    #[futures_test::test]
1102    async fn receiver_receives_given_try_send_async() {
1103        let executor = ThreadPool::new().unwrap();
1104
1105        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
1106        let c = &*CHANNEL.init(Channel::new());
1107        let c2 = c;
1108        assert!(executor
1109            .spawn(async move {
1110                assert!(c2.try_send(1).is_ok());
1111            })
1112            .is_ok());
1113        assert_eq!(c.receive().await, 1);
1114    }
1115
1116    #[futures_test::test]
1117    async fn sender_send_completes_if_capacity() {
1118        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
1119        c.send(1).await;
1120        assert_eq!(c.receive().await, 1);
1121    }
1122
1123    #[futures_test::test]
1124    async fn senders_sends_wait_until_capacity() {
1125        let executor = ThreadPool::new().unwrap();
1126
1127        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
1128        let c = &*CHANNEL.init(Channel::new());
1129        assert!(c.try_send(1).is_ok());
1130
1131        let c2 = c;
1132        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
1133        let c2 = c;
1134        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
1135        // Wish I could think of a means of determining that the async send is waiting instead.
1136        // However, I've used the debugger to observe that the send does indeed wait.
1137        Delay::new(Duration::from_millis(500)).await;
1138        assert_eq!(c.receive().await, 1);
1139        assert!(executor
1140            .spawn(async move {
1141                loop {
1142                    c.receive().await;
1143                }
1144            })
1145            .is_ok());
1146        send_task_1.unwrap().await;
1147        send_task_2.unwrap().await;
1148    }
1149}