1use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32pub struct Sender<'ch, M, T, const N: usize>
34where
35 M: RawMutex,
36{
37 channel: &'ch Channel<M, T, N>,
38}
39
40impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
41where
42 M: RawMutex,
43{
44 fn clone(&self) -> Self {
45 *self
46 }
47}
48
49impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
50
51impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
52where
53 M: RawMutex,
54{
55 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
59 self.channel.send(message)
60 }
61
62 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
66 self.channel.try_send(message)
67 }
68
69 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73 self.channel.poll_ready_to_send(cx)
74 }
75
76 pub const fn capacity(&self) -> usize {
80 self.channel.capacity()
81 }
82
83 pub fn free_capacity(&self) -> usize {
87 self.channel.free_capacity()
88 }
89
90 pub fn clear(&self) {
94 self.channel.clear();
95 }
96
97 pub fn len(&self) -> usize {
101 self.channel.len()
102 }
103
104 pub fn is_empty(&self) -> bool {
108 self.channel.is_empty()
109 }
110
111 pub fn is_full(&self) -> bool {
115 self.channel.is_full()
116 }
117}
118
119pub struct DynamicSender<'ch, T> {
121 pub(crate) channel: &'ch dyn DynamicChannel<T>,
122}
123
124impl<'ch, T> Clone for DynamicSender<'ch, T> {
125 fn clone(&self) -> Self {
126 *self
127 }
128}
129
130impl<'ch, T> Copy for DynamicSender<'ch, T> {}
131
132impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
133where
134 M: RawMutex,
135{
136 fn from(s: Sender<'ch, M, T, N>) -> Self {
137 Self { channel: s.channel }
138 }
139}
140
141impl<'ch, T> DynamicSender<'ch, T> {
142 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
146 DynamicSendFuture {
147 channel: self.channel,
148 message: Some(message),
149 }
150 }
151
152 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
156 self.channel.try_send_with_context(message, None)
157 }
158
159 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
163 self.channel.poll_ready_to_send(cx)
164 }
165}
166
167pub struct SendDynamicSender<'ch, T> {
170 pub(crate) channel: &'ch dyn DynamicChannel<T>,
171}
172
173impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
174 fn clone(&self) -> Self {
175 *self
176 }
177}
178
179impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
180unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
181unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
182
183impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
184where
185 M: RawMutex + Sync + Send,
186{
187 fn from(s: Sender<'ch, M, T, N>) -> Self {
188 Self { channel: s.channel }
189 }
190}
191
192impl<'ch, T> SendDynamicSender<'ch, T> {
193 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
197 DynamicSendFuture {
198 channel: self.channel,
199 message: Some(message),
200 }
201 }
202
203 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
207 self.channel.try_send_with_context(message, None)
208 }
209
210 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
214 self.channel.poll_ready_to_send(cx)
215 }
216}
217
218pub struct Receiver<'ch, M, T, const N: usize>
220where
221 M: RawMutex,
222{
223 channel: &'ch Channel<M, T, N>,
224}
225
226impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
227where
228 M: RawMutex,
229{
230 fn clone(&self) -> Self {
231 *self
232 }
233}
234
235impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
236
237impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
238where
239 M: RawMutex,
240{
241 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
245 self.channel.receive()
246 }
247
248 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
252 self.channel.ready_to_receive()
253 }
254
255 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
259 self.channel.try_receive()
260 }
261
262 pub fn try_peek(&self) -> Result<T, TryReceiveError>
266 where
267 T: Clone,
268 {
269 self.channel.try_peek()
270 }
271
272 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
276 self.channel.poll_ready_to_receive(cx)
277 }
278
279 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
283 self.channel.poll_receive(cx)
284 }
285
286 pub const fn capacity(&self) -> usize {
290 self.channel.capacity()
291 }
292
293 pub fn free_capacity(&self) -> usize {
297 self.channel.free_capacity()
298 }
299
300 pub fn clear(&self) {
304 self.channel.clear();
305 }
306
307 pub fn len(&self) -> usize {
311 self.channel.len()
312 }
313
314 pub fn is_empty(&self) -> bool {
318 self.channel.is_empty()
319 }
320
321 pub fn is_full(&self) -> bool {
325 self.channel.is_full()
326 }
327}
328
329pub struct DynamicReceiver<'ch, T> {
331 pub(crate) channel: &'ch dyn DynamicChannel<T>,
332}
333
334impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
335 fn clone(&self) -> Self {
336 *self
337 }
338}
339
340impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
341
342impl<'ch, T> DynamicReceiver<'ch, T> {
343 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
347 DynamicReceiveFuture { channel: self.channel }
348 }
349
350 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
354 self.channel.try_receive_with_context(None)
355 }
356
357 pub fn try_peek(&self) -> Result<T, TryReceiveError>
361 where
362 T: Clone,
363 {
364 self.channel.try_peek_with_context(None)
365 }
366
367 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
371 self.channel.poll_ready_to_receive(cx)
372 }
373
374 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
378 self.channel.poll_receive(cx)
379 }
380}
381
382impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
383where
384 M: RawMutex,
385{
386 fn from(s: Receiver<'ch, M, T, N>) -> Self {
387 Self { channel: s.channel }
388 }
389}
390
391pub struct SendableDynamicReceiver<'ch, T> {
394 pub(crate) channel: &'ch dyn DynamicChannel<T>,
395}
396
397impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> {
398 fn clone(&self) -> Self {
399 *self
400 }
401}
402
403impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {}
404unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {}
405unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {}
406
407impl<'ch, T> SendableDynamicReceiver<'ch, T> {
408 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
412 DynamicReceiveFuture { channel: self.channel }
413 }
414
415 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
419 self.channel.try_receive_with_context(None)
420 }
421
422 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
426 self.channel.poll_ready_to_receive(cx)
427 }
428
429 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
433 self.channel.poll_receive(cx)
434 }
435}
436
437impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendableDynamicReceiver<'ch, T>
438where
439 M: RawMutex + Sync + Send,
440{
441 fn from(s: Receiver<'ch, M, T, N>) -> Self {
442 Self { channel: s.channel }
443 }
444}
445
446impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N>
447where
448 M: RawMutex,
449{
450 type Item = T;
451
452 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
453 self.channel.poll_receive(cx).map(Some)
454 }
455}
456
457#[must_use = "futures do nothing unless you `.await` or poll them"]
459pub struct ReceiveFuture<'ch, M, T, const N: usize>
460where
461 M: RawMutex,
462{
463 channel: &'ch Channel<M, T, N>,
464}
465
466impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
467where
468 M: RawMutex,
469{
470 type Output = T;
471
472 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
473 self.channel.poll_receive(cx)
474 }
475}
476
477#[must_use = "futures do nothing unless you `.await` or poll them"]
479pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
480where
481 M: RawMutex,
482{
483 channel: &'ch Channel<M, T, N>,
484}
485
486impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
487where
488 M: RawMutex,
489{
490 type Output = ();
491
492 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
493 self.channel.poll_ready_to_receive(cx)
494 }
495}
496
497#[must_use = "futures do nothing unless you `.await` or poll them"]
499pub struct DynamicReceiveFuture<'ch, T> {
500 channel: &'ch dyn DynamicChannel<T>,
501}
502
503impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
504 type Output = T;
505
506 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
507 match self.channel.try_receive_with_context(Some(cx)) {
508 Ok(v) => Poll::Ready(v),
509 Err(TryReceiveError::Empty) => Poll::Pending,
510 }
511 }
512}
513
514impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
515 fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
516 Self { channel: value.channel }
517 }
518}
519
520#[must_use = "futures do nothing unless you `.await` or poll them"]
522pub struct SendFuture<'ch, M, T, const N: usize>
523where
524 M: RawMutex,
525{
526 channel: &'ch Channel<M, T, N>,
527 message: Option<T>,
528}
529
530impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
531where
532 M: RawMutex,
533{
534 type Output = ();
535
536 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
537 match self.message.take() {
538 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
539 Ok(..) => Poll::Ready(()),
540 Err(TrySendError::Full(m)) => {
541 self.message = Some(m);
542 Poll::Pending
543 }
544 },
545 None => panic!("Message cannot be None"),
546 }
547 }
548}
549
550impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
551
552#[must_use = "futures do nothing unless you `.await` or poll them"]
554pub struct DynamicSendFuture<'ch, T> {
555 channel: &'ch dyn DynamicChannel<T>,
556 message: Option<T>,
557}
558
559impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
560 type Output = ();
561
562 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
563 match self.message.take() {
564 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
565 Ok(..) => Poll::Ready(()),
566 Err(TrySendError::Full(m)) => {
567 self.message = Some(m);
568 Poll::Pending
569 }
570 },
571 None => panic!("Message cannot be None"),
572 }
573 }
574}
575
576impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
577
578impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
579 fn from(value: SendFuture<'ch, M, T, N>) -> Self {
580 Self {
581 channel: value.channel,
582 message: value.message,
583 }
584 }
585}
586
587pub(crate) trait DynamicChannel<T> {
588 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
589
590 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
591
592 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
593 where
594 T: Clone;
595
596 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
597 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
598
599 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
600}
601
602#[derive(PartialEq, Eq, Clone, Copy, Debug)]
604#[cfg_attr(feature = "defmt", derive(defmt::Format))]
605pub enum TryReceiveError {
606 Empty,
608}
609
610#[derive(PartialEq, Eq, Clone, Copy, Debug)]
612#[cfg_attr(feature = "defmt", derive(defmt::Format))]
613pub enum TrySendError<T> {
614 Full(T),
617}
618
619struct ChannelState<T, const N: usize> {
620 queue: Deque<T, N>,
621 receiver_waker: WakerRegistration,
622 senders_waker: WakerRegistration,
623}
624
625impl<T, const N: usize> ChannelState<T, N> {
626 const fn new() -> Self {
627 ChannelState {
628 queue: Deque::new(),
629 receiver_waker: WakerRegistration::new(),
630 senders_waker: WakerRegistration::new(),
631 }
632 }
633
634 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
635 self.try_receive_with_context(None)
636 }
637
638 fn try_peek(&mut self) -> Result<T, TryReceiveError>
639 where
640 T: Clone,
641 {
642 self.try_peek_with_context(None)
643 }
644
645 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
646 where
647 T: Clone,
648 {
649 if self.queue.is_full() {
650 self.senders_waker.wake();
651 }
652
653 if let Some(message) = self.queue.front() {
654 Ok(message.clone())
655 } else {
656 if let Some(cx) = cx {
657 self.receiver_waker.register(cx.waker());
658 }
659 Err(TryReceiveError::Empty)
660 }
661 }
662
663 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
664 if self.queue.is_full() {
665 self.senders_waker.wake();
666 }
667
668 if let Some(message) = self.queue.pop_front() {
669 Ok(message)
670 } else {
671 if let Some(cx) = cx {
672 self.receiver_waker.register(cx.waker());
673 }
674 Err(TryReceiveError::Empty)
675 }
676 }
677
678 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
679 if self.queue.is_full() {
680 self.senders_waker.wake();
681 }
682
683 if let Some(message) = self.queue.pop_front() {
684 Poll::Ready(message)
685 } else {
686 self.receiver_waker.register(cx.waker());
687 Poll::Pending
688 }
689 }
690
691 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
692 self.receiver_waker.register(cx.waker());
693
694 if !self.queue.is_empty() {
695 Poll::Ready(())
696 } else {
697 Poll::Pending
698 }
699 }
700
701 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
702 self.try_send_with_context(message, None)
703 }
704
705 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
706 match self.queue.push_back(message) {
707 Ok(()) => {
708 self.receiver_waker.wake();
709 Ok(())
710 }
711 Err(message) => {
712 if let Some(cx) = cx {
713 self.senders_waker.register(cx.waker());
714 }
715 Err(TrySendError::Full(message))
716 }
717 }
718 }
719
720 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
721 self.senders_waker.register(cx.waker());
722
723 if !self.queue.is_full() {
724 Poll::Ready(())
725 } else {
726 Poll::Pending
727 }
728 }
729
730 fn clear(&mut self) {
731 if self.queue.is_full() {
732 self.senders_waker.wake();
733 }
734 self.queue.clear();
735 }
736
737 fn len(&self) -> usize {
738 self.queue.len()
739 }
740
741 fn is_empty(&self) -> bool {
742 self.queue.is_empty()
743 }
744
745 fn is_full(&self) -> bool {
746 self.queue.is_full()
747 }
748}
749
750pub struct Channel<M, T, const N: usize>
759where
760 M: RawMutex,
761{
762 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
763}
764
765impl<M, T, const N: usize> Channel<M, T, N>
766where
767 M: RawMutex,
768{
769 pub const fn new() -> Self {
779 Self {
780 inner: Mutex::new(RefCell::new(ChannelState::new())),
781 }
782 }
783
784 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
785 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
786 }
787
788 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
789 self.lock(|c| c.try_receive_with_context(cx))
790 }
791
792 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
793 where
794 T: Clone,
795 {
796 self.lock(|c| c.try_peek_with_context(cx))
797 }
798
799 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
801 self.lock(|c| c.poll_receive(cx))
802 }
803
804 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
805 self.lock(|c| c.try_send_with_context(m, cx))
806 }
807
808 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
810 self.lock(|c| c.poll_ready_to_receive(cx))
811 }
812
813 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
815 self.lock(|c| c.poll_ready_to_send(cx))
816 }
817
818 pub fn sender(&self) -> Sender<'_, M, T, N> {
820 Sender { channel: self }
821 }
822
823 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
825 Receiver { channel: self }
826 }
827
828 pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
830 DynamicSender { channel: self }
831 }
832
833 pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
835 DynamicReceiver { channel: self }
836 }
837
838 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
843 SendFuture {
844 channel: self,
845 message: Some(message),
846 }
847 }
848
849 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
860 self.lock(|c| c.try_send(message))
861 }
862
863 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
868 ReceiveFuture { channel: self }
869 }
870
871 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
876 ReceiveReadyFuture { channel: self }
877 }
878
879 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
884 self.lock(|c| c.try_receive())
885 }
886
887 pub fn try_peek(&self) -> Result<T, TryReceiveError>
892 where
893 T: Clone,
894 {
895 self.lock(|c| c.try_peek())
896 }
897
898 pub const fn capacity(&self) -> usize {
900 N
901 }
902
903 pub fn free_capacity(&self) -> usize {
907 N - self.len()
908 }
909
910 pub fn clear(&self) {
912 self.lock(|c| c.clear());
913 }
914
915 pub fn len(&self) -> usize {
917 self.lock(|c| c.len())
918 }
919
920 pub fn is_empty(&self) -> bool {
922 self.lock(|c| c.is_empty())
923 }
924
925 pub fn is_full(&self) -> bool {
927 self.lock(|c| c.is_full())
928 }
929}
930
931impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
934where
935 M: RawMutex,
936{
937 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
938 Channel::try_send_with_context(self, m, cx)
939 }
940
941 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
942 Channel::try_receive_with_context(self, cx)
943 }
944
945 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
946 where
947 T: Clone,
948 {
949 Channel::try_peek_with_context(self, cx)
950 }
951
952 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
953 Channel::poll_ready_to_send(self, cx)
954 }
955
956 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
957 Channel::poll_ready_to_receive(self, cx)
958 }
959
960 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
961 Channel::poll_receive(self, cx)
962 }
963}
964
965impl<M, T, const N: usize> futures_util::Stream for Channel<M, T, N>
966where
967 M: RawMutex,
968{
969 type Item = T;
970
971 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
972 self.poll_receive(cx).map(Some)
973 }
974}
975
976#[cfg(test)]
977mod tests {
978 use core::time::Duration;
979
980 use futures_executor::ThreadPool;
981 use futures_timer::Delay;
982 use futures_util::task::SpawnExt;
983 use static_cell::StaticCell;
984
985 use super::*;
986 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
987
988 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
989 c.queue.capacity() - c.queue.len()
990 }
991
992 #[test]
993 fn sending_once() {
994 let mut c = ChannelState::<u32, 3>::new();
995 assert!(c.try_send(1).is_ok());
996 assert_eq!(capacity(&c), 2);
997 }
998
999 #[test]
1000 fn sending_when_full() {
1001 let mut c = ChannelState::<u32, 3>::new();
1002 let _ = c.try_send(1);
1003 let _ = c.try_send(1);
1004 let _ = c.try_send(1);
1005 match c.try_send(2) {
1006 Err(TrySendError::Full(2)) => assert!(true),
1007 _ => assert!(false),
1008 }
1009 assert_eq!(capacity(&c), 0);
1010 }
1011
1012 #[test]
1013 fn receiving_once_with_one_send() {
1014 let mut c = ChannelState::<u32, 3>::new();
1015 assert!(c.try_send(1).is_ok());
1016 assert_eq!(c.try_receive().unwrap(), 1);
1017 assert_eq!(capacity(&c), 3);
1018 }
1019
1020 #[test]
1021 fn receiving_when_empty() {
1022 let mut c = ChannelState::<u32, 3>::new();
1023 match c.try_receive() {
1024 Err(TryReceiveError::Empty) => assert!(true),
1025 _ => assert!(false),
1026 }
1027 assert_eq!(capacity(&c), 3);
1028 }
1029
1030 #[test]
1031 fn simple_send_and_receive() {
1032 let c = Channel::<NoopRawMutex, u32, 3>::new();
1033 assert!(c.try_send(1).is_ok());
1034 assert_eq!(c.try_peek().unwrap(), 1);
1035 assert_eq!(c.try_peek().unwrap(), 1);
1036 assert_eq!(c.try_receive().unwrap(), 1);
1037 }
1038
1039 #[test]
1040 fn cloning() {
1041 let c = Channel::<NoopRawMutex, u32, 3>::new();
1042 let r1 = c.receiver();
1043 let s1 = c.sender();
1044
1045 let _ = r1.clone();
1046 let _ = s1.clone();
1047 }
1048
1049 #[test]
1050 fn dynamic_dispatch_into() {
1051 let c = Channel::<NoopRawMutex, u32, 3>::new();
1052 let s: DynamicSender<'_, u32> = c.sender().into();
1053 let r: DynamicReceiver<'_, u32> = c.receiver().into();
1054
1055 assert!(s.try_send(1).is_ok());
1056 assert_eq!(r.try_receive().unwrap(), 1);
1057 }
1058
1059 #[test]
1060 fn dynamic_dispatch_constructor() {
1061 let c = Channel::<NoopRawMutex, u32, 3>::new();
1062 let s = c.dyn_sender();
1063 let r = c.dyn_receiver();
1064
1065 assert!(s.try_send(1).is_ok());
1066 assert_eq!(r.try_peek().unwrap(), 1);
1067 assert_eq!(r.try_peek().unwrap(), 1);
1068 assert_eq!(r.try_receive().unwrap(), 1);
1069 }
1070
1071 #[futures_test::test]
1072 async fn receiver_receives_given_try_send_async() {
1073 let executor = ThreadPool::new().unwrap();
1074
1075 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
1076 let c = &*CHANNEL.init(Channel::new());
1077 let c2 = c;
1078 assert!(executor
1079 .spawn(async move {
1080 assert!(c2.try_send(1).is_ok());
1081 })
1082 .is_ok());
1083 assert_eq!(c.receive().await, 1);
1084 }
1085
1086 #[futures_test::test]
1087 async fn sender_send_completes_if_capacity() {
1088 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
1089 c.send(1).await;
1090 assert_eq!(c.receive().await, 1);
1091 }
1092
1093 #[futures_test::test]
1094 async fn senders_sends_wait_until_capacity() {
1095 let executor = ThreadPool::new().unwrap();
1096
1097 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
1098 let c = &*CHANNEL.init(Channel::new());
1099 assert!(c.try_send(1).is_ok());
1100
1101 let c2 = c;
1102 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
1103 let c2 = c;
1104 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
1105 Delay::new(Duration::from_millis(500)).await;
1108 assert_eq!(c.receive().await, 1);
1109 assert!(executor
1110 .spawn(async move {
1111 loop {
1112 c.receive().await;
1113 }
1114 })
1115 .is_ok());
1116 send_task_1.unwrap().await;
1117 send_task_2.unwrap().await;
1118 }
1119}