embassy_sync/
priority_channel.rs

1//! A queue for sending values between asynchronous tasks.
2//!
3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue.
4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel.
5
6use core::cell::RefCell;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11pub use heapless::binary_heap::{Kind, Max, Min};
12use heapless::BinaryHeap;
13
14use crate::blocking_mutex::raw::RawMutex;
15use crate::blocking_mutex::Mutex;
16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError};
17use crate::waitqueue::WakerRegistration;
18
/// Send-only access to a [`PriorityChannel`].
///
/// A `Sender` is a thin handle that holds a shared reference to the channel and
/// forwards every call to it.
pub struct Sender<'ch, M, T, K, const N: usize>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    // The channel all methods of this handle delegate to.
    channel: &'ch PriorityChannel<M, T, K, N>,
}
28
impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    /// Returns another handle to the same channel.
    fn clone(&self) -> Self {
        // `Sender` is `Copy`, so a dereference is the entire clone.
        *self
    }
}
39
// Written by hand so that copying the handle places no `Copy` requirement on
// `M`, `T` or `K`: the only field is a shared reference.
impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
}
47
48impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N>
49where
50    T: Ord,
51    K: Kind,
52    M: RawMutex,
53{
54    /// Sends a value.
55    ///
56    /// See [`PriorityChannel::send()`]
57    pub fn send(&self, message: T) -> SendFuture<'ch, M, T, K, N> {
58        self.channel.send(message)
59    }
60
61    /// Attempt to immediately send a message.
62    ///
63    /// See [`PriorityChannel::send()`]
64    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65        self.channel.try_send(message)
66    }
67
68    /// Allows a poll_fn to poll until the channel is ready to send
69    ///
70    /// See [`PriorityChannel::poll_ready_to_send()`]
71    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72        self.channel.poll_ready_to_send(cx)
73    }
74
75    /// Returns the maximum number of elements the channel can hold.
76    ///
77    /// See [`PriorityChannel::capacity()`]
78    pub const fn capacity(&self) -> usize {
79        self.channel.capacity()
80    }
81
82    /// Returns the free capacity of the channel.
83    ///
84    /// See [`PriorityChannel::free_capacity()`]
85    pub fn free_capacity(&self) -> usize {
86        self.channel.free_capacity()
87    }
88
89    /// Clears all elements in the channel.
90    ///
91    /// See [`PriorityChannel::clear()`]
92    pub fn clear(&self) {
93        self.channel.clear();
94    }
95
96    /// Returns the number of elements currently in the channel.
97    ///
98    /// See [`PriorityChannel::len()`]
99    pub fn len(&self) -> usize {
100        self.channel.len()
101    }
102
103    /// Returns whether the channel is empty.
104    ///
105    /// See [`PriorityChannel::is_empty()`]
106    pub fn is_empty(&self) -> bool {
107        self.channel.is_empty()
108    }
109
110    /// Returns whether the channel is full.
111    ///
112    /// See [`PriorityChannel::is_full()`]
113    pub fn is_full(&self) -> bool {
114        self.channel.is_full()
115    }
116}
117
118impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
119where
120    T: Ord,
121    K: Kind,
122    M: RawMutex,
123{
124    fn from(s: Sender<'ch, M, T, K, N>) -> Self {
125        Self { channel: s.channel }
126    }
127}
128
/// Receive-only access to a [`PriorityChannel`].
///
/// A `Receiver` is a thin handle that holds a shared reference to the channel and
/// forwards every call to it.
pub struct Receiver<'ch, M, T, K, const N: usize>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    // The channel all methods of this handle delegate to.
    channel: &'ch PriorityChannel<M, T, K, N>,
}
138
impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    /// Returns another handle to the same channel.
    fn clone(&self) -> Self {
        // `Receiver` is `Copy`, so a dereference is the entire clone.
        *self
    }
}
149
// Written by hand so that copying the handle places no `Copy` requirement on
// `M`, `T` or `K`: the only field is a shared reference.
impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
}
157
158impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N>
159where
160    T: Ord,
161    K: Kind,
162    M: RawMutex,
163{
164    /// Receive the next value.
165    ///
166    /// See [`PriorityChannel::receive()`].
167    pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
168        self.channel.receive()
169    }
170
171    /// Attempt to immediately receive the next value.
172    ///
173    /// See [`PriorityChannel::try_receive()`]
174    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
175        self.channel.try_receive()
176    }
177
178    /// Peek at the next value without removing it from the queue.
179    ///
180    /// See [`PriorityChannel::try_peek()`]
181    pub fn try_peek(&self) -> Result<T, TryReceiveError>
182    where
183        T: Clone,
184    {
185        self.channel.try_peek_with_context(None)
186    }
187
188    /// Allows a poll_fn to poll until the channel is ready to receive
189    ///
190    /// See [`PriorityChannel::poll_ready_to_receive()`]
191    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
192        self.channel.poll_ready_to_receive(cx)
193    }
194
195    /// Poll the channel for the next item
196    ///
197    /// See [`PriorityChannel::poll_receive()`]
198    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
199        self.channel.poll_receive(cx)
200    }
201
202    /// Removes the elements from the channel that satisfy the predicate.
203    ///
204    /// See [`PriorityChannel::remove_if()`]
205    pub fn remove_if<F>(&self, predicate: F)
206    where
207        F: Fn(&T) -> bool,
208        T: Clone,
209    {
210        self.channel.remove_if(predicate)
211    }
212
213    /// Returns the maximum number of elements the channel can hold.
214    ///
215    /// See [`PriorityChannel::capacity()`]
216    pub const fn capacity(&self) -> usize {
217        self.channel.capacity()
218    }
219
220    /// Returns the free capacity of the channel.
221    ///
222    /// See [`PriorityChannel::free_capacity()`]
223    pub fn free_capacity(&self) -> usize {
224        self.channel.free_capacity()
225    }
226
227    /// Clears all elements in the channel.
228    ///
229    /// See [`PriorityChannel::clear()`]
230    pub fn clear(&self) {
231        self.channel.clear();
232    }
233
234    /// Returns the number of elements currently in the channel.
235    ///
236    /// See [`PriorityChannel::len()`]
237    pub fn len(&self) -> usize {
238        self.channel.len()
239    }
240
241    /// Returns whether the channel is empty.
242    ///
243    /// See [`PriorityChannel::is_empty()`]
244    pub fn is_empty(&self) -> bool {
245        self.channel.is_empty()
246    }
247
248    /// Returns whether the channel is full.
249    ///
250    /// See [`PriorityChannel::is_full()`]
251    pub fn is_full(&self) -> bool {
252        self.channel.is_full()
253    }
254}
255
256impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
257where
258    T: Ord,
259    K: Kind,
260    M: RawMutex,
261{
262    fn from(s: Receiver<'ch, M, T, K, N>) -> Self {
263        Self { channel: s.channel }
264    }
265}
266
/// Future returned by [`PriorityChannel::receive`] and  [`Receiver::receive`].
///
/// Resolves to the next message popped from the channel.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReceiveFuture<'ch, M, T, K, const N: usize>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    // The channel polled for a message each time the future is polled.
    channel: &'ch PriorityChannel<M, T, K, N>,
}
277
278impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N>
279where
280    T: Ord,
281    K: Kind,
282    M: RawMutex,
283{
284    type Output = T;
285
286    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
287        self.channel.poll_receive(cx)
288    }
289}
290
/// Future returned by [`PriorityChannel::send`] and  [`Sender::send`].
///
/// Resolves once the message has been pushed into the channel's queue.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendFuture<'ch, M, T, K, const N: usize>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    // The channel the message is pushed into.
    channel: &'ch PriorityChannel<M, T, K, N>,
    // The message still waiting to be sent; `None` once it has been handed to the channel.
    message: Option<T>,
}
302
303impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N>
304where
305    T: Ord,
306    K: Kind,
307    M: RawMutex,
308{
309    type Output = ();
310
311    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312        match self.message.take() {
313            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
314                Ok(..) => Poll::Ready(()),
315                Err(TrySendError::Full(m)) => {
316                    self.message = Some(m);
317                    Poll::Pending
318                }
319            },
320            None => panic!("Message cannot be None"),
321        }
322    }
323}
324
// `poll` only moves `message` out of and back into its `Option`; nothing in this
// file pins it in place, so the future is declared `Unpin` regardless of `T`.
impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
}
332
/// Internal state of a [`PriorityChannel`]: the message heap plus one waker
/// registration for each direction.
struct ChannelState<T, K, const N: usize> {
    /// Buffered messages, ordered by the heap kind `K`.
    queue: BinaryHeap<T, K, N>,
    /// Waker of a task waiting to receive; woken after a successful push.
    receiver_waker: WakerRegistration,
    /// Waker of a task waiting to send; woken when a receive, peek or clear finds the queue full.
    senders_waker: WakerRegistration,
}
338
339impl<T, K, const N: usize> ChannelState<T, K, N>
340where
341    T: Ord,
342    K: Kind,
343{
344    const fn new() -> Self {
345        ChannelState {
346            queue: BinaryHeap::new(),
347            receiver_waker: WakerRegistration::new(),
348            senders_waker: WakerRegistration::new(),
349        }
350    }
351
352    fn try_receive(&mut self) -> Result<T, TryReceiveError> {
353        self.try_receive_with_context(None)
354    }
355
356    fn try_peek(&mut self) -> Result<T, TryReceiveError>
357    where
358        T: Clone,
359    {
360        self.try_peek_with_context(None)
361    }
362
363    fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
364    where
365        T: Clone,
366    {
367        if self.queue.len() == self.queue.capacity() {
368            self.senders_waker.wake();
369        }
370
371        if let Some(message) = self.queue.peek() {
372            Ok(message.clone())
373        } else {
374            if let Some(cx) = cx {
375                self.receiver_waker.register(cx.waker());
376            }
377            Err(TryReceiveError::Empty)
378        }
379    }
380
381    fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
382        if self.queue.len() == self.queue.capacity() {
383            self.senders_waker.wake();
384        }
385
386        if let Some(message) = self.queue.pop() {
387            Ok(message)
388        } else {
389            if let Some(cx) = cx {
390                self.receiver_waker.register(cx.waker());
391            }
392            Err(TryReceiveError::Empty)
393        }
394    }
395
396    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
397        if self.queue.len() == self.queue.capacity() {
398            self.senders_waker.wake();
399        }
400
401        if let Some(message) = self.queue.pop() {
402            Poll::Ready(message)
403        } else {
404            self.receiver_waker.register(cx.waker());
405            Poll::Pending
406        }
407    }
408
409    fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
410        self.receiver_waker.register(cx.waker());
411
412        if !self.queue.is_empty() {
413            Poll::Ready(())
414        } else {
415            Poll::Pending
416        }
417    }
418
419    fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
420        self.try_send_with_context(message, None)
421    }
422
423    fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
424        match self.queue.push(message) {
425            Ok(()) => {
426                self.receiver_waker.wake();
427                Ok(())
428            }
429            Err(message) => {
430                if let Some(cx) = cx {
431                    self.senders_waker.register(cx.waker());
432                }
433                Err(TrySendError::Full(message))
434            }
435        }
436    }
437
438    fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
439        self.senders_waker.register(cx.waker());
440
441        if !self.queue.len() == self.queue.capacity() {
442            Poll::Ready(())
443        } else {
444            Poll::Pending
445        }
446    }
447
448    fn clear(&mut self) {
449        if self.queue.len() == self.queue.capacity() {
450            self.senders_waker.wake();
451        }
452        self.queue.clear();
453    }
454
455    fn len(&self) -> usize {
456        self.queue.len()
457    }
458
459    fn is_empty(&self) -> bool {
460        self.queue.is_empty()
461    }
462
463    fn is_full(&self) -> bool {
464        self.queue.len() == self.queue.capacity()
465    }
466}
467
/// A bounded channel for communicating between asynchronous tasks
/// with backpressure.
///
/// The channel will buffer up to the provided number of messages.  Once the
/// buffer is full, attempts to `send` new messages will wait until a message is
/// received from the channel.
///
/// Sent data may be reordered based on their priority within the channel.
/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
/// containing `u32`s, data sent in the order `[1, 2, 3]` will be received as `[3, 2, 1]`.
pub struct PriorityChannel<M, T, K, const N: usize>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    // Channel state behind a blocking mutex of kind `M`; the `RefCell` provides
    // the mutable access used inside the lock closure.
    inner: Mutex<M, RefCell<ChannelState<T, K, N>>>,
}
486
487impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N>
488where
489    T: Ord,
490    K: Kind,
491    M: RawMutex,
492{
493    /// Establish a new bounded channel. For example, to create one with a NoopMutex:
494    ///
495    /// ```
496    /// use embassy_sync::priority_channel::{PriorityChannel, Max};
497    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
498    ///
499    /// // Declare a bounded channel of 3 u32s.
500    /// let mut channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
501    /// ```
502    pub const fn new() -> Self {
503        Self {
504            inner: Mutex::new(RefCell::new(ChannelState::new())),
505        }
506    }
507
508    fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, K, N>) -> R) -> R {
509        self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
510    }
511
512    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
513        self.lock(|c| c.try_receive_with_context(cx))
514    }
515
516    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
517    where
518        T: Clone,
519    {
520        self.lock(|c| c.try_peek_with_context(cx))
521    }
522
523    /// Poll the channel for the next message
524    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
525        self.lock(|c| c.poll_receive(cx))
526    }
527
528    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
529        self.lock(|c| c.try_send_with_context(m, cx))
530    }
531
532    /// Allows a poll_fn to poll until the channel is ready to receive
533    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
534        self.lock(|c| c.poll_ready_to_receive(cx))
535    }
536
537    /// Allows a poll_fn to poll until the channel is ready to send
538    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
539        self.lock(|c| c.poll_ready_to_send(cx))
540    }
541
542    /// Get a sender for this channel.
543    pub fn sender(&self) -> Sender<'_, M, T, K, N> {
544        Sender { channel: self }
545    }
546
547    /// Get a receiver for this channel.
548    pub fn receiver(&self) -> Receiver<'_, M, T, K, N> {
549        Receiver { channel: self }
550    }
551
552    /// Send a value, waiting until there is capacity.
553    ///
554    /// Sending completes when the value has been pushed to the channel's queue.
555    /// This doesn't mean the value has been received yet.
556    pub fn send(&self, message: T) -> SendFuture<'_, M, T, K, N> {
557        SendFuture {
558            channel: self,
559            message: Some(message),
560        }
561    }
562
563    /// Attempt to immediately send a message.
564    ///
565    /// This method differs from [`send`](PriorityChannel::send) by returning immediately if the channel's
566    /// buffer is full, instead of waiting.
567    ///
568    /// # Errors
569    ///
570    /// If the channel capacity has been reached, i.e., the channel has `n`
571    /// buffered values where `n` is the argument passed to [`PriorityChannel`], then an
572    /// error is returned.
573    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
574        self.lock(|c| c.try_send(message))
575    }
576
577    /// Receive the next value.
578    ///
579    /// If there are no messages in the channel's buffer, this method will
580    /// wait until a message is sent.
581    pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
582        ReceiveFuture { channel: self }
583    }
584
585    /// Attempt to immediately receive a message.
586    ///
587    /// This method will either receive a message from the channel immediately or return an error
588    /// if the channel is empty.
589    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
590        self.lock(|c| c.try_receive())
591    }
592
593    /// Peek at the next value without removing it from the queue.
594    ///
595    /// This method will either receive a copy of the message from the channel immediately or return
596    /// an error if the channel is empty.
597    pub fn try_peek(&self) -> Result<T, TryReceiveError>
598    where
599        T: Clone,
600    {
601        self.lock(|c| c.try_peek())
602    }
603
604    /// Removes elements from the channel based on the given predicate.
605    pub fn remove_if<F>(&self, predicate: F)
606    where
607        F: Fn(&T) -> bool,
608        T: Clone,
609    {
610        self.lock(|c| {
611            let mut new_heap = BinaryHeap::<T, K, N>::new();
612            for item in c.queue.iter() {
613                if !predicate(item) {
614                    match new_heap.push(item.clone()) {
615                        Ok(_) => (),
616                        Err(_) => panic!("Error pushing item to heap"),
617                    }
618                }
619            }
620            c.queue = new_heap;
621        });
622    }
623
624    /// Returns the maximum number of elements the channel can hold.
625    pub const fn capacity(&self) -> usize {
626        N
627    }
628
629    /// Returns the free capacity of the channel.
630    ///
631    /// This is equivalent to `capacity() - len()`
632    pub fn free_capacity(&self) -> usize {
633        N - self.len()
634    }
635
636    /// Clears all elements in the channel.
637    pub fn clear(&self) {
638        self.lock(|c| c.clear());
639    }
640
641    /// Returns the number of elements currently in the channel.
642    pub fn len(&self) -> usize {
643        self.lock(|c| c.len())
644    }
645
646    /// Returns whether the channel is empty.
647    pub fn is_empty(&self) -> bool {
648        self.lock(|c| c.is_empty())
649    }
650
651    /// Returns whether the channel is full.
652    pub fn is_full(&self) -> bool {
653        self.lock(|c| c.is_full())
654    }
655}
656
/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
/// tradeoff cost of dynamic dispatch.
///
/// Every trait method forwards to the method of the same name defined on
/// [`PriorityChannel`] itself.
impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N>
where
    T: Ord,
    K: Kind,
    M: RawMutex,
{
    // NOTE(review): the calls below use the `PriorityChannel::name(self, ..)` path
    // form; this is expected to resolve to the inherent methods rather than back
    // to this trait impl. Keep inherent methods with these names in place.
    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
        PriorityChannel::try_send_with_context(self, m, cx)
    }

    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
        PriorityChannel::try_receive_with_context(self, cx)
    }

    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
    where
        T: Clone,
    {
        PriorityChannel::try_peek_with_context(self, cx)
    }

    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
        PriorityChannel::poll_ready_to_send(self, cx)
    }

    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
        PriorityChannel::poll_ready_to_receive(self, cx)
    }

    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
        PriorityChannel::poll_receive(self, cx)
    }
}
692
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use heapless::binary_heap::{Kind, Max};
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Number of free slots left in the state's queue.
    fn free_slots<T, K, const N: usize>(c: &ChannelState<T, K, N>) -> usize
    where
        T: Ord,
        K: Kind,
    {
        c.queue.capacity() - c.queue.len()
    }

    #[test]
    fn sending_once() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(free_slots(&c), 2);
    }

    #[test]
    fn sending_when_full() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        // The rejected message must be handed back to the caller.
        assert!(matches!(c.try_send(2), Err(TrySendError::Full(2))));
        assert_eq!(free_slots(&c), 0);
    }

    #[test]
    fn send_priority() {
        // Prio channel with kind `Max` sifts larger numbers to the front of the queue
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert!(c.try_send(2).is_ok());
        assert!(c.try_send(3).is_ok());
        assert_eq!(c.try_receive().unwrap(), 3);
        assert_eq!(c.try_receive().unwrap(), 2);
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn receiving_once_with_one_send() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
        assert_eq!(free_slots(&c), 3);
    }

    #[test]
    fn receiving_when_empty() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(matches!(c.try_receive(), Err(TryReceiveError::Empty)));
        assert_eq!(free_slots(&c), 3);
    }

    #[test]
    fn simple_send_and_receive() {
        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        // Peeking must not consume the message.
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn cloning() {
        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        let r1 = c.receiver();
        let s1 = c.sender();

        let _ = r1.clone();
        let _ = s1.clone();
    }

    #[test]
    fn dynamic_dispatch() {
        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        let s: DynamicSender<'_, u32> = c.sender().into();
        let r: DynamicReceiver<'_, u32> = c.receiver().into();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
        let c = &*CHANNEL.init(PriorityChannel::new());
        let c2 = c;
        assert!(executor
            .spawn(async move {
                assert!(c2.try_send(1).is_ok());
            })
            .is_ok());
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let c = PriorityChannel::<CriticalSectionRawMutex, u32, Max, 1>::new();
        c.send(1).await;
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 1>> = StaticCell::new();
        let c = &*CHANNEL.init(PriorityChannel::new());
        assert!(c.try_send(1).is_ok());

        let c2 = c;
        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
        let c2 = c;
        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
        // Wish I could think of a means of determining that the async send is waiting instead.
        // However, I've used the debugger to observe that the send does indeed wait.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(c.receive().await, 1);
        // Keep draining in the background so both blocked sends can complete.
        assert!(executor
            .spawn(async move {
                loop {
                    c.receive().await;
                }
            })
            .is_ok());
        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}