1use core::cell::RefCell;
47use core::future::Future;
48use core::pin::Pin;
49use core::task::{Context, Poll};
50
51use heapless::Deque;
52
53use crate::blocking_mutex::raw::RawMutex;
54use crate::blocking_mutex::Mutex;
55use crate::waitqueue::WakerRegistration;
56
57pub struct Sender<'ch, M, T, const N: usize>
59where
60 M: RawMutex,
61{
62 channel: &'ch Channel<M, T, N>,
63}
64
65impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
66where
67 M: RawMutex,
68{
69 fn clone(&self) -> Self {
70 *self
71 }
72}
73
74impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
75
76impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
77where
78 M: RawMutex,
79{
80 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
84 self.channel.send(message)
85 }
86
87 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
91 self.channel.try_send(message)
92 }
93
94 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
98 self.channel.poll_ready_to_send(cx)
99 }
100
101 pub const fn capacity(&self) -> usize {
105 self.channel.capacity()
106 }
107
108 pub fn free_capacity(&self) -> usize {
112 self.channel.free_capacity()
113 }
114
115 pub fn clear(&self) {
119 self.channel.clear();
120 }
121
122 pub fn len(&self) -> usize {
126 self.channel.len()
127 }
128
129 pub fn is_empty(&self) -> bool {
133 self.channel.is_empty()
134 }
135
136 pub fn is_full(&self) -> bool {
140 self.channel.is_full()
141 }
142}
143
/// Sending handle that refers to its channel through `dyn DynamicChannel<T>`,
/// so its type does not mention the channel's mutex type or capacity.
pub struct DynamicSender<'ch, T> {
    /// Type-erased channel that messages are sent into.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

// Implemented by hand so that `T` is not required to be `Clone`.
impl<'ch, T> Clone for DynamicSender<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for DynamicSender<'ch, T> {}
156
157impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
158where
159 M: RawMutex,
160{
161 fn from(s: Sender<'ch, M, T, N>) -> Self {
162 Self { channel: s.channel }
163 }
164}
165
166impl<'ch, T> DynamicSender<'ch, T> {
167 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
171 DynamicSendFuture {
172 channel: self.channel,
173 message: Some(message),
174 }
175 }
176
177 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
181 self.channel.try_send_with_context(message, None)
182 }
183
184 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
188 self.channel.poll_ready_to_send(cx)
189 }
190}
191
/// Type-erased sending handle that is additionally declared `Send` and `Sync`
/// whenever `T: Send`.
pub struct SendDynamicSender<'ch, T> {
    /// Type-erased channel that messages are sent into.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

// Implemented by hand so that `T` is not required to be `Clone`.
impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
// SAFETY: NOTE(review) - the trait object itself carries no `Send`/`Sync`
// bound. The only constructor visible in this file, `From<Sender>`, requires
// `M: RawMutex + Sync + Send`; because `channel` is `pub(crate)`, any other
// construction site in the crate must uphold the same requirement - confirm.
unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
// SAFETY: NOTE(review) - same reasoning as for `Send` above - confirm.
unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
207
208impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
209where
210 M: RawMutex + Sync + Send,
211{
212 fn from(s: Sender<'ch, M, T, N>) -> Self {
213 Self { channel: s.channel }
214 }
215}
216
217impl<'ch, T> SendDynamicSender<'ch, T> {
218 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
222 DynamicSendFuture {
223 channel: self.channel,
224 message: Some(message),
225 }
226 }
227
228 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
232 self.channel.try_send_with_context(message, None)
233 }
234
235 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
239 self.channel.poll_ready_to_send(cx)
240 }
241}
242
243pub struct Receiver<'ch, M, T, const N: usize>
245where
246 M: RawMutex,
247{
248 channel: &'ch Channel<M, T, N>,
249}
250
251impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
252where
253 M: RawMutex,
254{
255 fn clone(&self) -> Self {
256 *self
257 }
258}
259
260impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
261
262impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
263where
264 M: RawMutex,
265{
266 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
270 self.channel.receive()
271 }
272
273 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
277 self.channel.ready_to_receive()
278 }
279
280 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
284 self.channel.try_receive()
285 }
286
287 pub fn try_peek(&self) -> Result<T, TryReceiveError>
291 where
292 T: Clone,
293 {
294 self.channel.try_peek()
295 }
296
297 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
301 self.channel.poll_ready_to_receive(cx)
302 }
303
304 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
308 self.channel.poll_receive(cx)
309 }
310
311 pub const fn capacity(&self) -> usize {
315 self.channel.capacity()
316 }
317
318 pub fn free_capacity(&self) -> usize {
322 self.channel.free_capacity()
323 }
324
325 pub fn clear(&self) {
329 self.channel.clear();
330 }
331
332 pub fn len(&self) -> usize {
336 self.channel.len()
337 }
338
339 pub fn is_empty(&self) -> bool {
343 self.channel.is_empty()
344 }
345
346 pub fn is_full(&self) -> bool {
350 self.channel.is_full()
351 }
352}
353
/// Receiving handle that refers to its channel through
/// `dyn DynamicChannel<T>`, so its type does not mention the channel's mutex
/// type or capacity.
pub struct DynamicReceiver<'ch, T> {
    /// Type-erased channel that messages are received from.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

// Implemented by hand so that `T` is not required to be `Clone`.
impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
366
367impl<'ch, T> DynamicReceiver<'ch, T> {
368 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
372 DynamicReceiveFuture { channel: self.channel }
373 }
374
375 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
379 self.channel.try_receive_with_context(None)
380 }
381
382 pub fn try_peek(&self) -> Result<T, TryReceiveError>
386 where
387 T: Clone,
388 {
389 self.channel.try_peek_with_context(None)
390 }
391
392 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
396 self.channel.poll_ready_to_receive(cx)
397 }
398
399 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
403 self.channel.poll_receive(cx)
404 }
405}
406
407impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
408where
409 M: RawMutex,
410{
411 fn from(s: Receiver<'ch, M, T, N>) -> Self {
412 Self { channel: s.channel }
413 }
414}
415
/// Type-erased receiving handle that is additionally declared `Send` and
/// `Sync` whenever `T: Send`.
pub struct SendDynamicReceiver<'ch, T> {
    /// Type-erased channel that messages are received from.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

/// Former name of [`SendDynamicReceiver`], kept as a deprecated alias.
#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;

// Implemented by hand so that `T` is not required to be `Clone`.
impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
// SAFETY: NOTE(review) - the trait object itself carries no `Send`/`Sync`
// bound. The only constructor visible in this file, `From<Receiver>`, requires
// `M: RawMutex + Sync + Send`; because `channel` is `pub(crate)`, any other
// construction site in the crate must uphold the same requirement - confirm.
unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
// SAFETY: NOTE(review) - same reasoning as for `Send` above - confirm.
unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
436
437impl<'ch, T> SendDynamicReceiver<'ch, T> {
438 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
442 DynamicReceiveFuture { channel: self.channel }
443 }
444
445 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
449 self.channel.try_receive_with_context(None)
450 }
451
452 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
456 self.channel.poll_ready_to_receive(cx)
457 }
458
459 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
463 self.channel.poll_receive(cx)
464 }
465}
466
467impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
468where
469 M: RawMutex + Sync + Send,
470{
471 fn from(s: Receiver<'ch, M, T, N>) -> Self {
472 Self { channel: s.channel }
473 }
474}
475
476impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
477where
478 M: RawMutex,
479{
480 type Item = T;
481
482 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
483 self.channel.poll_receive(cx).map(Some)
484 }
485}
486
487#[must_use = "futures do nothing unless you `.await` or poll them"]
489pub struct ReceiveFuture<'ch, M, T, const N: usize>
490where
491 M: RawMutex,
492{
493 channel: &'ch Channel<M, T, N>,
494}
495
496impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
497where
498 M: RawMutex,
499{
500 type Output = T;
501
502 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
503 self.channel.poll_receive(cx)
504 }
505}
506
507#[must_use = "futures do nothing unless you `.await` or poll them"]
509pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
510where
511 M: RawMutex,
512{
513 channel: &'ch Channel<M, T, N>,
514}
515
516impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
517where
518 M: RawMutex,
519{
520 type Output = ();
521
522 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
523 self.channel.poll_ready_to_receive(cx)
524 }
525}
526
527#[must_use = "futures do nothing unless you `.await` or poll them"]
529pub struct DynamicReceiveFuture<'ch, T> {
530 channel: &'ch dyn DynamicChannel<T>,
531}
532
533impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
534 type Output = T;
535
536 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
537 match self.channel.try_receive_with_context(Some(cx)) {
538 Ok(v) => Poll::Ready(v),
539 Err(TryReceiveError::Empty) => Poll::Pending,
540 }
541 }
542}
543
544impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
545 fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
546 Self { channel: value.channel }
547 }
548}
549
550#[must_use = "futures do nothing unless you `.await` or poll them"]
552pub struct SendFuture<'ch, M, T, const N: usize>
553where
554 M: RawMutex,
555{
556 channel: &'ch Channel<M, T, N>,
557 message: Option<T>,
558}
559
560impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
561where
562 M: RawMutex,
563{
564 type Output = ();
565
566 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
567 match self.message.take() {
568 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
569 Ok(..) => Poll::Ready(()),
570 Err(TrySendError::Full(m)) => {
571 self.message = Some(m);
572 Poll::Pending
573 }
574 },
575 None => panic!("Message cannot be None"),
576 }
577 }
578}
579
580impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
581
582#[must_use = "futures do nothing unless you `.await` or poll them"]
584pub struct DynamicSendFuture<'ch, T> {
585 channel: &'ch dyn DynamicChannel<T>,
586 message: Option<T>,
587}
588
589impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
590 type Output = ();
591
592 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
593 match self.message.take() {
594 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
595 Ok(..) => Poll::Ready(()),
596 Err(TrySendError::Full(m)) => {
597 self.message = Some(m);
598 Poll::Pending
599 }
600 },
601 None => panic!("Message cannot be None"),
602 }
603 }
604}
605
606impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
607
608impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
609 fn from(value: SendFuture<'ch, M, T, N>) -> Self {
610 Self {
611 channel: value.channel,
612 message: value.message,
613 }
614 }
615}
616
/// Object-safe view of a channel carrying `T`, used by the `Dynamic*` handles
/// and futures so they need not name the mutex type or capacity.
///
/// The behaviour notes below describe the `Channel` implementation in this
/// file; other implementors may exist elsewhere in the crate.
pub(crate) trait DynamicChannel<T> {
    /// Tries to queue `message`. On failure the message is handed back inside
    /// the error; `Channel` registers `cx`'s waker (when given) in that case.
    fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;

    /// Tries to take the next message. `Channel` registers `cx`'s waker (when
    /// given) if nothing is queued.
    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;

    /// Tries to clone the next message without removing it. `Channel`
    /// registers `cx`'s waker (when given) if nothing is queued.
    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
    where
        T: Clone;

    /// Polls until the channel can accept a message.
    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
    /// Polls until the channel has a message available.
    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;

    /// Polls for the next message, removing it when ready.
    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
}
631
/// Error returned by the non-blocking receive and peek operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryReceiveError {
    /// The channel held no message.
    Empty,
}
639
/// Error returned by the non-blocking send operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TrySendError<T> {
    /// The channel was full; the rejected message is returned to the caller.
    Full(T),
}
648
/// Mutable state of a channel: the message queue plus one waker registration
/// for each direction.
struct ChannelState<T, const N: usize> {
    /// Queued messages, holding at most `N`; pushed at the back and popped
    /// from the front.
    queue: Deque<T, N>,
    /// Registered when a receive/peek finds the queue empty; woken after a
    /// successful send.
    receiver_waker: WakerRegistration,
    /// Registered when a send finds the queue full; woken when a
    /// receive/peek/clear starts while the queue is full.
    senders_waker: WakerRegistration,
}
654
655impl<T, const N: usize> ChannelState<T, N> {
656 const fn new() -> Self {
657 ChannelState {
658 queue: Deque::new(),
659 receiver_waker: WakerRegistration::new(),
660 senders_waker: WakerRegistration::new(),
661 }
662 }
663
664 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
665 self.try_receive_with_context(None)
666 }
667
668 fn try_peek(&mut self) -> Result<T, TryReceiveError>
669 where
670 T: Clone,
671 {
672 self.try_peek_with_context(None)
673 }
674
675 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
676 where
677 T: Clone,
678 {
679 if self.queue.is_full() {
680 self.senders_waker.wake();
681 }
682
683 if let Some(message) = self.queue.front() {
684 Ok(message.clone())
685 } else {
686 if let Some(cx) = cx {
687 self.receiver_waker.register(cx.waker());
688 }
689 Err(TryReceiveError::Empty)
690 }
691 }
692
693 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
694 if self.queue.is_full() {
695 self.senders_waker.wake();
696 }
697
698 if let Some(message) = self.queue.pop_front() {
699 Ok(message)
700 } else {
701 if let Some(cx) = cx {
702 self.receiver_waker.register(cx.waker());
703 }
704 Err(TryReceiveError::Empty)
705 }
706 }
707
708 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
709 if self.queue.is_full() {
710 self.senders_waker.wake();
711 }
712
713 if let Some(message) = self.queue.pop_front() {
714 Poll::Ready(message)
715 } else {
716 self.receiver_waker.register(cx.waker());
717 Poll::Pending
718 }
719 }
720
721 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
722 self.receiver_waker.register(cx.waker());
723
724 if !self.queue.is_empty() {
725 Poll::Ready(())
726 } else {
727 Poll::Pending
728 }
729 }
730
731 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
732 self.try_send_with_context(message, None)
733 }
734
735 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
736 match self.queue.push_back(message) {
737 Ok(()) => {
738 self.receiver_waker.wake();
739 Ok(())
740 }
741 Err(message) => {
742 if let Some(cx) = cx {
743 self.senders_waker.register(cx.waker());
744 }
745 Err(TrySendError::Full(message))
746 }
747 }
748 }
749
750 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
751 self.senders_waker.register(cx.waker());
752
753 if !self.queue.is_full() {
754 Poll::Ready(())
755 } else {
756 Poll::Pending
757 }
758 }
759
760 fn clear(&mut self) {
761 if self.queue.is_full() {
762 self.senders_waker.wake();
763 }
764 self.queue.clear();
765 }
766
767 fn len(&self) -> usize {
768 self.queue.len()
769 }
770
771 fn is_empty(&self) -> bool {
772 self.queue.is_empty()
773 }
774
775 fn is_full(&self) -> bool {
776 self.queue.is_full()
777 }
778}
779
780pub struct Channel<M, T, const N: usize>
789where
790 M: RawMutex,
791{
792 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
793}
794
795impl<M, T, const N: usize> Channel<M, T, N>
796where
797 M: RawMutex,
798{
799 pub const fn new() -> Self {
809 Self {
810 inner: Mutex::new(RefCell::new(ChannelState::new())),
811 }
812 }
813
814 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
815 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
816 }
817
818 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
819 self.lock(|c| c.try_receive_with_context(cx))
820 }
821
822 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
823 where
824 T: Clone,
825 {
826 self.lock(|c| c.try_peek_with_context(cx))
827 }
828
829 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
831 self.lock(|c| c.poll_receive(cx))
832 }
833
834 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
835 self.lock(|c| c.try_send_with_context(m, cx))
836 }
837
838 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
840 self.lock(|c| c.poll_ready_to_receive(cx))
841 }
842
843 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
845 self.lock(|c| c.poll_ready_to_send(cx))
846 }
847
848 pub fn sender(&self) -> Sender<'_, M, T, N> {
850 Sender { channel: self }
851 }
852
853 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
855 Receiver { channel: self }
856 }
857
858 pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
860 DynamicSender { channel: self }
861 }
862
863 pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
865 DynamicReceiver { channel: self }
866 }
867
868 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
873 SendFuture {
874 channel: self,
875 message: Some(message),
876 }
877 }
878
879 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
890 self.lock(|c| c.try_send(message))
891 }
892
893 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
898 ReceiveFuture { channel: self }
899 }
900
901 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
906 ReceiveReadyFuture { channel: self }
907 }
908
909 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
914 self.lock(|c| c.try_receive())
915 }
916
917 pub fn try_peek(&self) -> Result<T, TryReceiveError>
922 where
923 T: Clone,
924 {
925 self.lock(|c| c.try_peek())
926 }
927
928 pub const fn capacity(&self) -> usize {
930 N
931 }
932
933 pub fn free_capacity(&self) -> usize {
937 N - self.len()
938 }
939
940 pub fn clear(&self) {
942 self.lock(|c| c.clear());
943 }
944
945 pub fn len(&self) -> usize {
947 self.lock(|c| c.len())
948 }
949
950 pub fn is_empty(&self) -> bool {
952 self.lock(|c| c.is_empty())
953 }
954
955 pub fn is_full(&self) -> bool {
957 self.lock(|c| c.is_full())
958 }
959}
960
961impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
964where
965 M: RawMutex,
966{
967 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
968 Channel::try_send_with_context(self, m, cx)
969 }
970
971 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
972 Channel::try_receive_with_context(self, cx)
973 }
974
975 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
976 where
977 T: Clone,
978 {
979 Channel::try_peek_with_context(self, cx)
980 }
981
982 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
983 Channel::poll_ready_to_send(self, cx)
984 }
985
986 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
987 Channel::poll_ready_to_receive(self, cx)
988 }
989
990 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
991 Channel::poll_receive(self, cx)
992 }
993}
994
995impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
996where
997 M: RawMutex,
998{
999 type Item = T;
1000
1001 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1002 self.poll_receive(cx).map(Some)
1003 }
1004}
1005
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Number of free slots left in `c`'s queue.
    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
        c.queue.capacity() - c.queue.len()
    }

    #[test]
    fn sending_once() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(capacity(&c), 2);
    }

    #[test]
    fn sending_when_full() {
        let mut c = ChannelState::<u32, 3>::new();
        // Fill all three slots; each of these sends must succeed.
        for _ in 0..3 {
            assert!(c.try_send(1).is_ok());
        }
        // The rejected message is handed back unchanged.
        assert_eq!(c.try_send(2), Err(TrySendError::Full(2)));
        assert_eq!(capacity(&c), 0);
    }

    #[test]
    fn receiving_once_with_one_send() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn receiving_when_empty() {
        let mut c = ChannelState::<u32, 3>::new();
        assert_eq!(c.try_receive(), Err(TryReceiveError::Empty));
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn simple_send_and_receive() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        // Peeking twice shows the message stays queued until it is received.
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn cloning() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let r1 = c.receiver();
        let s1 = c.sender();

        // Only checks that the explicit `Clone` impls are callable.
        let _ = r1.clone();
        let _ = s1.clone();
    }

    #[test]
    fn dynamic_dispatch_into() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s: DynamicSender<'_, u32> = c.sender().into();
        let r: DynamicReceiver<'_, u32> = c.receiver().into();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[test]
    fn dynamic_dispatch_constructor() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s = c.dyn_sender();
        let r = c.dyn_receiver();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        let c2 = c;
        assert!(executor
            .spawn(async move {
                assert!(c2.try_send(1).is_ok());
            })
            .is_ok());
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
        c.send(1).await;
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        assert!(c.try_send(1).is_ok());

        let c2 = c;
        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
        let c2 = c;
        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
        // NOTE(review): presumably gives both spawned sends time to reach the
        // full channel before the first message is drained - confirm.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(c.receive().await, 1);
        // Keep draining in the background so both pending sends can finish.
        assert!(executor
            .spawn(async move {
                loop {
                    c.receive().await;
                }
            })
            .is_ok());
        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}