1use core::cell::RefCell;
47use core::future::Future;
48use core::pin::Pin;
49use core::task::{Context, Poll};
50
51use heapless::Deque;
52
53use crate::blocking_mutex::Mutex;
54use crate::blocking_mutex::raw::RawMutex;
55use crate::waitqueue::WakerRegistration;
56
57#[derive(Debug)]
59pub struct Sender<'ch, M, T, const N: usize>
60where
61 M: RawMutex,
62{
63 channel: &'ch Channel<M, T, N>,
64}
65
66impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
67where
68 M: RawMutex,
69{
70 fn clone(&self) -> Self {
71 *self
72 }
73}
74
75impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
76
77impl<'ch, M, T, const N: usize> futures_sink::Sink<T> for Sender<'ch, M, T, N>
78where
79 M: RawMutex,
80{
81 type Error = TrySendError<T>;
82
83 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84 self.poll_ready_to_send(cx).map(|()| Ok(()))
85 }
86
87 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
88 self.try_send(item)
89 }
90
91 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
92 Poll::Ready(Ok(()))
93 }
94
95 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96 Poll::Ready(Ok(()))
97 }
98}
99
100impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
101where
102 M: RawMutex,
103{
104 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
108 self.channel.send(message)
109 }
110
111 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
115 self.channel.try_send(message)
116 }
117
118 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
122 self.channel.poll_ready_to_send(cx)
123 }
124
125 pub const fn capacity(&self) -> usize {
129 self.channel.capacity()
130 }
131
132 pub fn free_capacity(&self) -> usize {
136 self.channel.free_capacity()
137 }
138
139 pub fn clear(&self) {
143 self.channel.clear();
144 }
145
146 pub fn len(&self) -> usize {
150 self.channel.len()
151 }
152
153 pub fn is_empty(&self) -> bool {
157 self.channel.is_empty()
158 }
159
160 pub fn is_full(&self) -> bool {
164 self.channel.is_full()
165 }
166}
167
168pub struct DynamicSender<'ch, T> {
170 pub(crate) channel: &'ch dyn DynamicChannel<T>,
171}
172
173impl<'ch, T> Clone for DynamicSender<'ch, T> {
174 fn clone(&self) -> Self {
175 *self
176 }
177}
178
179impl<'ch, T> Copy for DynamicSender<'ch, T> {}
180
181impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
182where
183 M: RawMutex,
184{
185 fn from(s: Sender<'ch, M, T, N>) -> Self {
186 Self { channel: s.channel }
187 }
188}
189
190impl<'ch, T> DynamicSender<'ch, T> {
191 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
195 DynamicSendFuture {
196 channel: self.channel,
197 message: Some(message),
198 }
199 }
200
201 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
205 self.channel.try_send_with_context(message, None)
206 }
207
208 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
212 self.channel.poll_ready_to_send(cx)
213 }
214}
215
216pub struct SendDynamicSender<'ch, T> {
219 pub(crate) channel: &'ch dyn DynamicChannel<T>,
220}
221
222impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
223 fn clone(&self) -> Self {
224 *self
225 }
226}
227
228impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
229unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
230unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
231
232impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
233where
234 M: RawMutex + Sync + Send,
235{
236 fn from(s: Sender<'ch, M, T, N>) -> Self {
237 Self { channel: s.channel }
238 }
239}
240
241impl<'ch, T> SendDynamicSender<'ch, T> {
242 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
246 DynamicSendFuture {
247 channel: self.channel,
248 message: Some(message),
249 }
250 }
251
252 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
256 self.channel.try_send_with_context(message, None)
257 }
258
259 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
263 self.channel.poll_ready_to_send(cx)
264 }
265}
266
267#[derive(Debug)]
269pub struct Receiver<'ch, M, T, const N: usize>
270where
271 M: RawMutex,
272{
273 channel: &'ch Channel<M, T, N>,
274}
275
276impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
277where
278 M: RawMutex,
279{
280 fn clone(&self) -> Self {
281 *self
282 }
283}
284
285impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
286
287impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
288where
289 M: RawMutex,
290{
291 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
295 self.channel.receive()
296 }
297
298 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
302 self.channel.ready_to_receive()
303 }
304
305 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
309 self.channel.try_receive()
310 }
311
312 pub fn try_peek(&self) -> Result<T, TryReceiveError>
316 where
317 T: Clone,
318 {
319 self.channel.try_peek()
320 }
321
322 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
326 self.channel.poll_ready_to_receive(cx)
327 }
328
329 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
333 self.channel.poll_receive(cx)
334 }
335
336 pub const fn capacity(&self) -> usize {
340 self.channel.capacity()
341 }
342
343 pub fn free_capacity(&self) -> usize {
347 self.channel.free_capacity()
348 }
349
350 pub fn clear(&self) {
354 self.channel.clear();
355 }
356
357 pub fn len(&self) -> usize {
361 self.channel.len()
362 }
363
364 pub fn is_empty(&self) -> bool {
368 self.channel.is_empty()
369 }
370
371 pub fn is_full(&self) -> bool {
375 self.channel.is_full()
376 }
377}
378
379pub struct DynamicReceiver<'ch, T> {
381 pub(crate) channel: &'ch dyn DynamicChannel<T>,
382}
383
384impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
385 fn clone(&self) -> Self {
386 *self
387 }
388}
389
390impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
391
392impl<'ch, T> DynamicReceiver<'ch, T> {
393 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
397 DynamicReceiveFuture { channel: self.channel }
398 }
399
400 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
404 self.channel.try_receive_with_context(None)
405 }
406
407 pub fn try_peek(&self) -> Result<T, TryReceiveError>
411 where
412 T: Clone,
413 {
414 self.channel.try_peek_with_context(None)
415 }
416
417 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
421 self.channel.poll_ready_to_receive(cx)
422 }
423
424 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
428 self.channel.poll_receive(cx)
429 }
430}
431
432impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
433where
434 M: RawMutex,
435{
436 fn from(s: Receiver<'ch, M, T, N>) -> Self {
437 Self { channel: s.channel }
438 }
439}
440
441pub struct SendDynamicReceiver<'ch, T> {
444 pub(crate) channel: &'ch dyn DynamicChannel<T>,
445}
446
447#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
450pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;
451
452impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
453 fn clone(&self) -> Self {
454 *self
455 }
456}
457
458impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
459unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
460unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
461
462impl<'ch, T> SendDynamicReceiver<'ch, T> {
463 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
467 DynamicReceiveFuture { channel: self.channel }
468 }
469
470 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
474 self.channel.try_receive_with_context(None)
475 }
476
477 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
481 self.channel.poll_ready_to_receive(cx)
482 }
483
484 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
488 self.channel.poll_receive(cx)
489 }
490}
491
492impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
493where
494 M: RawMutex + Sync + Send,
495{
496 fn from(s: Receiver<'ch, M, T, N>) -> Self {
497 Self { channel: s.channel }
498 }
499}
500
501impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
502where
503 M: RawMutex,
504{
505 type Item = T;
506
507 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
508 self.channel.poll_receive(cx).map(Some)
509 }
510}
511
512#[must_use = "futures do nothing unless you `.await` or poll them"]
514#[derive(Debug)]
515pub struct ReceiveFuture<'ch, M, T, const N: usize>
516where
517 M: RawMutex,
518{
519 channel: &'ch Channel<M, T, N>,
520}
521
522impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
523where
524 M: RawMutex,
525{
526 type Output = T;
527
528 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
529 self.channel.poll_receive(cx)
530 }
531}
532
533#[must_use = "futures do nothing unless you `.await` or poll them"]
535#[derive(Debug)]
536pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
537where
538 M: RawMutex,
539{
540 channel: &'ch Channel<M, T, N>,
541}
542
543impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
544where
545 M: RawMutex,
546{
547 type Output = ();
548
549 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
550 self.channel.poll_ready_to_receive(cx)
551 }
552}
553
554#[must_use = "futures do nothing unless you `.await` or poll them"]
556pub struct DynamicReceiveFuture<'ch, T> {
557 channel: &'ch dyn DynamicChannel<T>,
558}
559
560impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
561 type Output = T;
562
563 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
564 match self.channel.try_receive_with_context(Some(cx)) {
565 Ok(v) => Poll::Ready(v),
566 Err(TryReceiveError::Empty) => Poll::Pending,
567 }
568 }
569}
570
571impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
572 fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
573 Self { channel: value.channel }
574 }
575}
576
577#[must_use = "futures do nothing unless you `.await` or poll them"]
579#[derive(Debug)]
580pub struct SendFuture<'ch, M, T, const N: usize>
581where
582 M: RawMutex,
583{
584 channel: &'ch Channel<M, T, N>,
585 message: Option<T>,
586}
587
588impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
589where
590 M: RawMutex,
591{
592 type Output = ();
593
594 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
595 match self.message.take() {
596 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
597 Ok(..) => Poll::Ready(()),
598 Err(TrySendError::Full(m)) => {
599 self.message = Some(m);
600 Poll::Pending
601 }
602 },
603 None => panic!("Message cannot be None"),
604 }
605 }
606}
607
608impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
609
610#[must_use = "futures do nothing unless you `.await` or poll them"]
612pub struct DynamicSendFuture<'ch, T> {
613 channel: &'ch dyn DynamicChannel<T>,
614 message: Option<T>,
615}
616
617impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
618 type Output = ();
619
620 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
621 match self.message.take() {
622 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
623 Ok(..) => Poll::Ready(()),
624 Err(TrySendError::Full(m)) => {
625 self.message = Some(m);
626 Poll::Pending
627 }
628 },
629 None => panic!("Message cannot be None"),
630 }
631 }
632}
633
634impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
635
636impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
637 fn from(value: SendFuture<'ch, M, T, N>) -> Self {
638 Self {
639 channel: value.channel,
640 message: value.message,
641 }
642 }
643}
644
645pub(crate) trait DynamicChannel<T> {
646 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
647
648 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
649
650 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
651 where
652 T: Clone;
653
654 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
655 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
656
657 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
658}
659
660#[derive(PartialEq, Eq, Clone, Copy, Debug)]
662#[cfg_attr(feature = "defmt", derive(defmt::Format))]
663pub enum TryReceiveError {
664 Empty,
666}
667
668#[derive(PartialEq, Eq, Clone, Copy, Debug)]
670#[cfg_attr(feature = "defmt", derive(defmt::Format))]
671pub enum TrySendError<T> {
672 Full(T),
675}
676
677#[derive(Debug)]
678struct ChannelState<T, const N: usize> {
679 queue: Deque<T, N>,
680 receiver_waker: WakerRegistration,
681 senders_waker: WakerRegistration,
682}
683
684impl<T, const N: usize> ChannelState<T, N> {
685 const fn new() -> Self {
686 ChannelState {
687 queue: Deque::new(),
688 receiver_waker: WakerRegistration::new(),
689 senders_waker: WakerRegistration::new(),
690 }
691 }
692
693 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
694 self.try_receive_with_context(None)
695 }
696
697 fn try_peek(&mut self) -> Result<T, TryReceiveError>
698 where
699 T: Clone,
700 {
701 self.try_peek_with_context(None)
702 }
703
704 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
705 where
706 T: Clone,
707 {
708 if self.queue.is_full() {
709 self.senders_waker.wake();
710 }
711
712 if let Some(message) = self.queue.front() {
713 Ok(message.clone())
714 } else {
715 if let Some(cx) = cx {
716 self.receiver_waker.register(cx.waker());
717 }
718 Err(TryReceiveError::Empty)
719 }
720 }
721
722 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
723 if self.queue.is_full() {
724 self.senders_waker.wake();
725 }
726
727 if let Some(message) = self.queue.pop_front() {
728 Ok(message)
729 } else {
730 if let Some(cx) = cx {
731 self.receiver_waker.register(cx.waker());
732 }
733 Err(TryReceiveError::Empty)
734 }
735 }
736
737 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
738 if self.queue.is_full() {
739 self.senders_waker.wake();
740 }
741
742 if let Some(message) = self.queue.pop_front() {
743 Poll::Ready(message)
744 } else {
745 self.receiver_waker.register(cx.waker());
746 Poll::Pending
747 }
748 }
749
750 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
751 self.receiver_waker.register(cx.waker());
752
753 if !self.queue.is_empty() {
754 Poll::Ready(())
755 } else {
756 Poll::Pending
757 }
758 }
759
760 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
761 self.try_send_with_context(message, None)
762 }
763
764 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
765 match self.queue.push_back(message) {
766 Ok(()) => {
767 self.receiver_waker.wake();
768 Ok(())
769 }
770 Err(message) => {
771 if let Some(cx) = cx {
772 self.senders_waker.register(cx.waker());
773 }
774 Err(TrySendError::Full(message))
775 }
776 }
777 }
778
779 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
780 self.senders_waker.register(cx.waker());
781
782 if !self.queue.is_full() {
783 Poll::Ready(())
784 } else {
785 Poll::Pending
786 }
787 }
788
789 fn clear(&mut self) {
790 if self.queue.is_full() {
791 self.senders_waker.wake();
792 }
793 self.queue.clear();
794 }
795
796 fn len(&self) -> usize {
797 self.queue.len()
798 }
799
800 fn is_empty(&self) -> bool {
801 self.queue.is_empty()
802 }
803
804 fn is_full(&self) -> bool {
805 self.queue.is_full()
806 }
807}
808
809#[derive(Debug)]
818pub struct Channel<M, T, const N: usize>
819where
820 M: RawMutex,
821{
822 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
823}
824
825impl<M, T, const N: usize> Channel<M, T, N>
826where
827 M: RawMutex,
828{
829 pub const fn new() -> Self {
839 Self {
840 inner: Mutex::new(RefCell::new(ChannelState::new())),
841 }
842 }
843
844 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
845 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
846 }
847
848 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
849 self.lock(|c| c.try_receive_with_context(cx))
850 }
851
852 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
853 where
854 T: Clone,
855 {
856 self.lock(|c| c.try_peek_with_context(cx))
857 }
858
859 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
861 self.lock(|c| c.poll_receive(cx))
862 }
863
864 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
865 self.lock(|c| c.try_send_with_context(m, cx))
866 }
867
868 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
870 self.lock(|c| c.poll_ready_to_receive(cx))
871 }
872
873 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
875 self.lock(|c| c.poll_ready_to_send(cx))
876 }
877
878 pub fn sender(&self) -> Sender<'_, M, T, N> {
880 Sender { channel: self }
881 }
882
883 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
885 Receiver { channel: self }
886 }
887
888 pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
890 DynamicSender { channel: self }
891 }
892
893 pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
895 DynamicReceiver { channel: self }
896 }
897
898 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
903 SendFuture {
904 channel: self,
905 message: Some(message),
906 }
907 }
908
909 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
920 self.lock(|c| c.try_send(message))
921 }
922
923 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
928 ReceiveFuture { channel: self }
929 }
930
931 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
936 ReceiveReadyFuture { channel: self }
937 }
938
939 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
944 self.lock(|c| c.try_receive())
945 }
946
947 pub fn try_peek(&self) -> Result<T, TryReceiveError>
952 where
953 T: Clone,
954 {
955 self.lock(|c| c.try_peek())
956 }
957
958 pub const fn capacity(&self) -> usize {
960 N
961 }
962
963 pub fn free_capacity(&self) -> usize {
967 N - self.len()
968 }
969
970 pub fn clear(&self) {
972 self.lock(|c| c.clear());
973 }
974
975 pub fn len(&self) -> usize {
977 self.lock(|c| c.len())
978 }
979
980 pub fn is_empty(&self) -> bool {
982 self.lock(|c| c.is_empty())
983 }
984
985 pub fn is_full(&self) -> bool {
987 self.lock(|c| c.is_full())
988 }
989}
990
991impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
994where
995 M: RawMutex,
996{
997 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
998 Channel::try_send_with_context(self, m, cx)
999 }
1000
1001 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
1002 Channel::try_receive_with_context(self, cx)
1003 }
1004
1005 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
1006 where
1007 T: Clone,
1008 {
1009 Channel::try_peek_with_context(self, cx)
1010 }
1011
1012 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
1013 Channel::poll_ready_to_send(self, cx)
1014 }
1015
1016 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
1017 Channel::poll_ready_to_receive(self, cx)
1018 }
1019
1020 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
1021 Channel::poll_receive(self, cx)
1022 }
1023}
1024
1025impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
1026where
1027 M: RawMutex,
1028{
1029 type Item = T;
1030
1031 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1032 self.poll_receive(cx).map(Some)
1033 }
1034}
1035
1036impl<M, T, const N: usize> futures_sink::Sink<T> for Channel<M, T, N>
1037where
1038 M: RawMutex,
1039{
1040 type Error = TrySendError<T>;
1041
1042 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1043 self.poll_ready_to_send(cx).map(|()| Ok(()))
1044 }
1045
1046 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
1047 self.try_send(item)
1048 }
1049
1050 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1051 Poll::Ready(Ok(()))
1052 }
1053
1054 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1055 Poll::Ready(Ok(()))
1056 }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use core::time::Duration;
1062
1063 use futures_executor::ThreadPool;
1064 use futures_timer::Delay;
1065 use futures_util::task::SpawnExt;
1066 use static_cell::StaticCell;
1067
1068 use super::*;
1069 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
1070
1071 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
1072 c.queue.capacity() - c.queue.len()
1073 }
1074
1075 #[test]
1076 fn sending_once() {
1077 let mut c = ChannelState::<u32, 3>::new();
1078 assert!(c.try_send(1).is_ok());
1079 assert_eq!(capacity(&c), 2);
1080 }
1081
1082 #[test]
1083 fn sending_when_full() {
1084 let mut c = ChannelState::<u32, 3>::new();
1085 let _ = c.try_send(1);
1086 let _ = c.try_send(1);
1087 let _ = c.try_send(1);
1088 match c.try_send(2) {
1089 Err(TrySendError::Full(2)) => assert!(true),
1090 _ => assert!(false),
1091 }
1092 assert_eq!(capacity(&c), 0);
1093 }
1094
1095 #[test]
1096 fn receiving_once_with_one_send() {
1097 let mut c = ChannelState::<u32, 3>::new();
1098 assert!(c.try_send(1).is_ok());
1099 assert_eq!(c.try_receive().unwrap(), 1);
1100 assert_eq!(capacity(&c), 3);
1101 }
1102
1103 #[test]
1104 fn receiving_when_empty() {
1105 let mut c = ChannelState::<u32, 3>::new();
1106 match c.try_receive() {
1107 Err(TryReceiveError::Empty) => assert!(true),
1108 _ => assert!(false),
1109 }
1110 assert_eq!(capacity(&c), 3);
1111 }
1112
1113 #[test]
1114 fn simple_send_and_receive() {
1115 let c = Channel::<NoopRawMutex, u32, 3>::new();
1116 assert!(c.try_send(1).is_ok());
1117 assert_eq!(c.try_peek().unwrap(), 1);
1118 assert_eq!(c.try_peek().unwrap(), 1);
1119 assert_eq!(c.try_receive().unwrap(), 1);
1120 }
1121
1122 #[test]
1123 fn cloning() {
1124 let c = Channel::<NoopRawMutex, u32, 3>::new();
1125 let r1 = c.receiver();
1126 let s1 = c.sender();
1127
1128 let _ = r1.clone();
1129 let _ = s1.clone();
1130 }
1131
1132 #[test]
1133 fn dynamic_dispatch_into() {
1134 let c = Channel::<NoopRawMutex, u32, 3>::new();
1135 let s: DynamicSender<'_, u32> = c.sender().into();
1136 let r: DynamicReceiver<'_, u32> = c.receiver().into();
1137
1138 assert!(s.try_send(1).is_ok());
1139 assert_eq!(r.try_receive().unwrap(), 1);
1140 }
1141
1142 #[test]
1143 fn dynamic_dispatch_constructor() {
1144 let c = Channel::<NoopRawMutex, u32, 3>::new();
1145 let s = c.dyn_sender();
1146 let r = c.dyn_receiver();
1147
1148 assert!(s.try_send(1).is_ok());
1149 assert_eq!(r.try_peek().unwrap(), 1);
1150 assert_eq!(r.try_peek().unwrap(), 1);
1151 assert_eq!(r.try_receive().unwrap(), 1);
1152 }
1153
1154 #[futures_test::test]
1155 async fn receiver_receives_given_try_send_async() {
1156 let executor = ThreadPool::new().unwrap();
1157
1158 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
1159 let c = &*CHANNEL.init(Channel::new());
1160 let c2 = c;
1161 assert!(
1162 executor
1163 .spawn(async move {
1164 assert!(c2.try_send(1).is_ok());
1165 })
1166 .is_ok()
1167 );
1168 assert_eq!(c.receive().await, 1);
1169 }
1170
1171 #[futures_test::test]
1172 async fn sender_send_completes_if_capacity() {
1173 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
1174 c.send(1).await;
1175 assert_eq!(c.receive().await, 1);
1176 }
1177
1178 #[futures_test::test]
1179 async fn senders_sends_wait_until_capacity() {
1180 let executor = ThreadPool::new().unwrap();
1181
1182 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
1183 let c = &*CHANNEL.init(Channel::new());
1184 assert!(c.try_send(1).is_ok());
1185
1186 let c2 = c;
1187 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
1188 let c2 = c;
1189 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
1190 Delay::new(Duration::from_millis(500)).await;
1193 assert_eq!(c.receive().await, 1);
1194 assert!(
1195 executor
1196 .spawn(async move {
1197 loop {
1198 c.receive().await;
1199 }
1200 })
1201 .is_ok()
1202 );
1203 send_task_1.unwrap().await;
1204 send_task_2.unwrap().await;
1205 }
1206}