1use core::cell::RefCell;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11pub use heapless::binary_heap::{Kind, Max, Min};
12use heapless::BinaryHeap;
13
14use crate::blocking_mutex::raw::RawMutex;
15use crate::blocking_mutex::Mutex;
16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError};
17use crate::waitqueue::WakerRegistration;
18
19pub struct Sender<'ch, M, T, K, const N: usize>
21where
22 T: Ord,
23 K: Kind,
24 M: RawMutex,
25{
26 channel: &'ch PriorityChannel<M, T, K, N>,
27}
28
29impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N>
30where
31 T: Ord,
32 K: Kind,
33 M: RawMutex,
34{
35 fn clone(&self) -> Self {
36 *self
37 }
38}
39
40impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N>
41where
42 T: Ord,
43 K: Kind,
44 M: RawMutex,
45{
46}
47
48impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N>
49where
50 T: Ord,
51 K: Kind,
52 M: RawMutex,
53{
54 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, K, N> {
58 self.channel.send(message)
59 }
60
61 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67
68 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72 self.channel.poll_ready_to_send(cx)
73 }
74
75 pub const fn capacity(&self) -> usize {
79 self.channel.capacity()
80 }
81
82 pub fn free_capacity(&self) -> usize {
86 self.channel.free_capacity()
87 }
88
89 pub fn clear(&self) {
93 self.channel.clear();
94 }
95
96 pub fn len(&self) -> usize {
100 self.channel.len()
101 }
102
103 pub fn is_empty(&self) -> bool {
107 self.channel.is_empty()
108 }
109
110 pub fn is_full(&self) -> bool {
114 self.channel.is_full()
115 }
116}
117
118impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
119where
120 T: Ord,
121 K: Kind,
122 M: RawMutex,
123{
124 fn from(s: Sender<'ch, M, T, K, N>) -> Self {
125 Self { channel: s.channel }
126 }
127}
128
129pub struct Receiver<'ch, M, T, K, const N: usize>
131where
132 T: Ord,
133 K: Kind,
134 M: RawMutex,
135{
136 channel: &'ch PriorityChannel<M, T, K, N>,
137}
138
139impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N>
140where
141 T: Ord,
142 K: Kind,
143 M: RawMutex,
144{
145 fn clone(&self) -> Self {
146 *self
147 }
148}
149
150impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N>
151where
152 T: Ord,
153 K: Kind,
154 M: RawMutex,
155{
156}
157
158impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N>
159where
160 T: Ord,
161 K: Kind,
162 M: RawMutex,
163{
164 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
168 self.channel.receive()
169 }
170
171 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
175 self.channel.try_receive()
176 }
177
178 pub fn try_peek(&self) -> Result<T, TryReceiveError>
182 where
183 T: Clone,
184 {
185 self.channel.try_peek_with_context(None)
186 }
187
188 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
192 self.channel.poll_ready_to_receive(cx)
193 }
194
195 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
199 self.channel.poll_receive(cx)
200 }
201
202 pub fn remove_if<F>(&self, predicate: F)
206 where
207 F: Fn(&T) -> bool,
208 T: Clone,
209 {
210 self.channel.remove_if(predicate)
211 }
212
213 pub const fn capacity(&self) -> usize {
217 self.channel.capacity()
218 }
219
220 pub fn free_capacity(&self) -> usize {
224 self.channel.free_capacity()
225 }
226
227 pub fn clear(&self) {
231 self.channel.clear();
232 }
233
234 pub fn len(&self) -> usize {
238 self.channel.len()
239 }
240
241 pub fn is_empty(&self) -> bool {
245 self.channel.is_empty()
246 }
247
248 pub fn is_full(&self) -> bool {
252 self.channel.is_full()
253 }
254}
255
256impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
257where
258 T: Ord,
259 K: Kind,
260 M: RawMutex,
261{
262 fn from(s: Receiver<'ch, M, T, K, N>) -> Self {
263 Self { channel: s.channel }
264 }
265}
266
267#[must_use = "futures do nothing unless you `.await` or poll them"]
269pub struct ReceiveFuture<'ch, M, T, K, const N: usize>
270where
271 T: Ord,
272 K: Kind,
273 M: RawMutex,
274{
275 channel: &'ch PriorityChannel<M, T, K, N>,
276}
277
278impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N>
279where
280 T: Ord,
281 K: Kind,
282 M: RawMutex,
283{
284 type Output = T;
285
286 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
287 self.channel.poll_receive(cx)
288 }
289}
290
291#[must_use = "futures do nothing unless you `.await` or poll them"]
293pub struct SendFuture<'ch, M, T, K, const N: usize>
294where
295 T: Ord,
296 K: Kind,
297 M: RawMutex,
298{
299 channel: &'ch PriorityChannel<M, T, K, N>,
300 message: Option<T>,
301}
302
303impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N>
304where
305 T: Ord,
306 K: Kind,
307 M: RawMutex,
308{
309 type Output = ();
310
311 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312 match self.message.take() {
313 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
314 Ok(..) => Poll::Ready(()),
315 Err(TrySendError::Full(m)) => {
316 self.message = Some(m);
317 Poll::Pending
318 }
319 },
320 None => panic!("Message cannot be None"),
321 }
322 }
323}
324
325impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N>
326where
327 T: Ord,
328 K: Kind,
329 M: RawMutex,
330{
331}
332
333struct ChannelState<T, K, const N: usize> {
334 queue: BinaryHeap<T, K, N>,
335 receiver_waker: WakerRegistration,
336 senders_waker: WakerRegistration,
337}
338
339impl<T, K, const N: usize> ChannelState<T, K, N>
340where
341 T: Ord,
342 K: Kind,
343{
344 const fn new() -> Self {
345 ChannelState {
346 queue: BinaryHeap::new(),
347 receiver_waker: WakerRegistration::new(),
348 senders_waker: WakerRegistration::new(),
349 }
350 }
351
352 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
353 self.try_receive_with_context(None)
354 }
355
356 fn try_peek(&mut self) -> Result<T, TryReceiveError>
357 where
358 T: Clone,
359 {
360 self.try_peek_with_context(None)
361 }
362
363 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
364 where
365 T: Clone,
366 {
367 if self.queue.len() == self.queue.capacity() {
368 self.senders_waker.wake();
369 }
370
371 if let Some(message) = self.queue.peek() {
372 Ok(message.clone())
373 } else {
374 if let Some(cx) = cx {
375 self.receiver_waker.register(cx.waker());
376 }
377 Err(TryReceiveError::Empty)
378 }
379 }
380
381 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
382 if self.queue.len() == self.queue.capacity() {
383 self.senders_waker.wake();
384 }
385
386 if let Some(message) = self.queue.pop() {
387 Ok(message)
388 } else {
389 if let Some(cx) = cx {
390 self.receiver_waker.register(cx.waker());
391 }
392 Err(TryReceiveError::Empty)
393 }
394 }
395
396 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
397 if self.queue.len() == self.queue.capacity() {
398 self.senders_waker.wake();
399 }
400
401 if let Some(message) = self.queue.pop() {
402 Poll::Ready(message)
403 } else {
404 self.receiver_waker.register(cx.waker());
405 Poll::Pending
406 }
407 }
408
409 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
410 self.receiver_waker.register(cx.waker());
411
412 if !self.queue.is_empty() {
413 Poll::Ready(())
414 } else {
415 Poll::Pending
416 }
417 }
418
419 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
420 self.try_send_with_context(message, None)
421 }
422
423 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
424 match self.queue.push(message) {
425 Ok(()) => {
426 self.receiver_waker.wake();
427 Ok(())
428 }
429 Err(message) => {
430 if let Some(cx) = cx {
431 self.senders_waker.register(cx.waker());
432 }
433 Err(TrySendError::Full(message))
434 }
435 }
436 }
437
438 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
439 self.senders_waker.register(cx.waker());
440
441 if !self.queue.len() == self.queue.capacity() {
442 Poll::Ready(())
443 } else {
444 Poll::Pending
445 }
446 }
447
448 fn clear(&mut self) {
449 if self.queue.len() == self.queue.capacity() {
450 self.senders_waker.wake();
451 }
452 self.queue.clear();
453 }
454
455 fn len(&self) -> usize {
456 self.queue.len()
457 }
458
459 fn is_empty(&self) -> bool {
460 self.queue.is_empty()
461 }
462
463 fn is_full(&self) -> bool {
464 self.queue.len() == self.queue.capacity()
465 }
466}
467
468pub struct PriorityChannel<M, T, K, const N: usize>
479where
480 T: Ord,
481 K: Kind,
482 M: RawMutex,
483{
484 inner: Mutex<M, RefCell<ChannelState<T, K, N>>>,
485}
486
487impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N>
488where
489 T: Ord,
490 K: Kind,
491 M: RawMutex,
492{
493 pub const fn new() -> Self {
503 Self {
504 inner: Mutex::new(RefCell::new(ChannelState::new())),
505 }
506 }
507
508 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, K, N>) -> R) -> R {
509 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
510 }
511
512 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
513 self.lock(|c| c.try_receive_with_context(cx))
514 }
515
516 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
517 where
518 T: Clone,
519 {
520 self.lock(|c| c.try_peek_with_context(cx))
521 }
522
523 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
525 self.lock(|c| c.poll_receive(cx))
526 }
527
528 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
529 self.lock(|c| c.try_send_with_context(m, cx))
530 }
531
532 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
534 self.lock(|c| c.poll_ready_to_receive(cx))
535 }
536
537 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
539 self.lock(|c| c.poll_ready_to_send(cx))
540 }
541
542 pub fn sender(&self) -> Sender<'_, M, T, K, N> {
544 Sender { channel: self }
545 }
546
547 pub fn receiver(&self) -> Receiver<'_, M, T, K, N> {
549 Receiver { channel: self }
550 }
551
552 pub fn send(&self, message: T) -> SendFuture<'_, M, T, K, N> {
557 SendFuture {
558 channel: self,
559 message: Some(message),
560 }
561 }
562
563 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
574 self.lock(|c| c.try_send(message))
575 }
576
577 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
582 ReceiveFuture { channel: self }
583 }
584
585 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
590 self.lock(|c| c.try_receive())
591 }
592
593 pub fn try_peek(&self) -> Result<T, TryReceiveError>
598 where
599 T: Clone,
600 {
601 self.lock(|c| c.try_peek())
602 }
603
604 pub fn remove_if<F>(&self, predicate: F)
606 where
607 F: Fn(&T) -> bool,
608 T: Clone,
609 {
610 self.lock(|c| {
611 let mut new_heap = BinaryHeap::<T, K, N>::new();
612 for item in c.queue.iter() {
613 if !predicate(item) {
614 match new_heap.push(item.clone()) {
615 Ok(_) => (),
616 Err(_) => panic!("Error pushing item to heap"),
617 }
618 }
619 }
620 c.queue = new_heap;
621 });
622 }
623
624 pub const fn capacity(&self) -> usize {
626 N
627 }
628
629 pub fn free_capacity(&self) -> usize {
633 N - self.len()
634 }
635
636 pub fn clear(&self) {
638 self.lock(|c| c.clear());
639 }
640
641 pub fn len(&self) -> usize {
643 self.lock(|c| c.len())
644 }
645
646 pub fn is_empty(&self) -> bool {
648 self.lock(|c| c.is_empty())
649 }
650
651 pub fn is_full(&self) -> bool {
653 self.lock(|c| c.is_full())
654 }
655}
656
657impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N>
660where
661 T: Ord,
662 K: Kind,
663 M: RawMutex,
664{
665 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
666 PriorityChannel::try_send_with_context(self, m, cx)
667 }
668
669 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
670 PriorityChannel::try_receive_with_context(self, cx)
671 }
672
673 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
674 where
675 T: Clone,
676 {
677 PriorityChannel::try_peek_with_context(self, cx)
678 }
679
680 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
681 PriorityChannel::poll_ready_to_send(self, cx)
682 }
683
684 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
685 PriorityChannel::poll_ready_to_receive(self, cx)
686 }
687
688 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
689 PriorityChannel::poll_receive(self, cx)
690 }
691}
692
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use heapless::binary_heap::{Kind, Max};
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Number of unused slots left in the queue of `c`.
    fn free_slots<T, K, const N: usize>(c: &ChannelState<T, K, N>) -> usize
    where
        T: Ord,
        K: Kind,
    {
        c.queue.capacity() - c.queue.len()
    }

    #[test]
    fn sending_once() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(free_slots(&c), 2);
    }

    #[test]
    fn sending_when_full() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        // The rejected message must come back inside the error.
        assert!(matches!(c.try_send(2), Err(TrySendError::Full(2))));
        assert_eq!(free_slots(&c), 0);
    }

    #[test]
    fn send_priority() {
        // With a `Max` heap the largest value is received first, regardless
        // of the order in which the values were sent.
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert!(c.try_send(2).is_ok());
        assert!(c.try_send(3).is_ok());
        assert_eq!(c.try_receive().unwrap(), 3);
        assert_eq!(c.try_receive().unwrap(), 2);
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn receiving_once_with_one_send() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
        assert_eq!(free_slots(&c), 3);
    }

    #[test]
    fn receiving_when_empty() {
        let mut c = ChannelState::<u32, Max, 3>::new();
        assert!(matches!(c.try_receive(), Err(TryReceiveError::Empty)));
        assert_eq!(free_slots(&c), 3);
    }

    #[test]
    fn simple_send_and_receive() {
        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        assert!(c.try_send(1).is_ok());
        // Peeking twice shows that a peek leaves the element in place.
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn cloning() {
        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        let r1 = c.receiver();
        let s1 = c.sender();

        let _ = r1.clone();
        let _ = s1.clone();
    }

    #[test]
    fn dynamic_dispatch() {
        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        let s: DynamicSender<'_, u32> = c.sender().into();
        let r: DynamicReceiver<'_, u32> = c.receiver().into();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
        let c = &*CHANNEL.init(PriorityChannel::new());
        let c2 = c;
        assert!(executor
            .spawn(async move {
                assert!(c2.try_send(1).is_ok());
            })
            .is_ok());
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let c = PriorityChannel::<CriticalSectionRawMutex, u32, Max, 1>::new();
        c.send(1).await;
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 1>> = StaticCell::new();
        let c = &*CHANNEL.init(PriorityChannel::new());
        // Fill the single slot so that both spawned sends find the channel full.
        assert!(c.try_send(1).is_ok());

        let c2 = c;
        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
        let c2 = c;
        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
        // Give the spawned sends time to run; presumably both are parked on
        // the full channel by the time the delay elapses.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(c.receive().await, 1);
        // Keep draining in the background so both pending sends can finish.
        assert!(executor
            .spawn(async move {
                loop {
                    c.receive().await;
                }
            })
            .is_ok());
        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}