1#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76}
77
78impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
79 PubSubChannel<M, T, CAP, SUBS, PUBS>
80{
81 pub const fn new() -> Self {
83 Self {
84 inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
85 }
86 }
87
88 pub fn subscriber(&self) -> Result<Subscriber<'_, M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut();
94
95 if s.subscriber_count >= SUBS {
96 Err(Error::MaximumSubscribersReached)
97 } else {
98 s.subscriber_count += 1;
99 Ok(Subscriber(Sub::new(s.next_message_id, self)))
100 }
101 })
102 }
103
104 pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
108 self.inner.lock(|inner| {
109 let mut s = inner.borrow_mut();
110
111 if s.subscriber_count >= SUBS {
112 Err(Error::MaximumSubscribersReached)
113 } else {
114 s.subscriber_count += 1;
115 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
116 }
117 })
118 }
119
120 pub fn publisher(&self) -> Result<Publisher<'_, M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut();
126
127 if s.publisher_count >= PUBS {
128 Err(Error::MaximumPublishersReached)
129 } else {
130 s.publisher_count += 1;
131 Ok(Publisher(Pub::new(self)))
132 }
133 })
134 }
135
136 pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
140 self.inner.lock(|inner| {
141 let mut s = inner.borrow_mut();
142
143 if s.publisher_count >= PUBS {
144 Err(Error::MaximumPublishersReached)
145 } else {
146 s.publisher_count += 1;
147 Ok(DynPublisher(Pub::new(self)))
148 }
149 })
150 }
151
152 pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self))
156 }
157
158 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T> {
161 DynImmediatePublisher(ImmediatePub::new(self))
162 }
163
164 pub const fn capacity(&self) -> usize {
166 CAP
167 }
168
169 pub fn free_capacity(&self) -> usize {
173 CAP - self.len()
174 }
175
176 pub fn clear(&self) {
178 self.inner.lock(|inner| inner.borrow_mut().clear());
179 }
180
181 pub fn len(&self) -> usize {
183 self.inner.lock(|inner| inner.borrow().len())
184 }
185
186 pub fn is_empty(&self) -> bool {
188 self.inner.lock(|inner| inner.borrow().is_empty())
189 }
190
191 pub fn is_full(&self) -> bool {
193 self.inner.lock(|inner| inner.borrow().is_full())
194 }
195}
196
197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
198 for PubSubChannel<M, T, CAP, SUBS, PUBS>
199{
200 fn publish_immediate(&self, message: T) {
201 self.inner.lock(|s| {
202 let mut s = s.borrow_mut();
203 s.publish_immediate(message)
204 })
205 }
206
207 fn capacity(&self) -> usize {
208 self.capacity()
209 }
210
211 fn is_full(&self) -> bool {
212 self.is_full()
213 }
214}
215
216impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
217 for PubSubChannel<M, T, CAP, SUBS, PUBS>
218{
219 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
220 self.inner.lock(|s| {
221 let mut s = s.borrow_mut();
222
223 match s.get_message(*next_message_id) {
225 Some(WaitResult::Message(message)) => {
227 *next_message_id += 1;
228 Poll::Ready(WaitResult::Message(message))
229 }
230 None => {
232 if let Some(cx) = cx {
233 s.subscriber_wakers.register(cx.waker());
234 }
235 Poll::Pending
236 }
237 Some(WaitResult::Lagged(amount)) => {
239 *next_message_id += amount;
240 Poll::Ready(WaitResult::Lagged(amount))
241 }
242 }
243 })
244 }
245
246 fn available(&self, next_message_id: u64) -> u64 {
247 self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
248 }
249
250 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
251 self.inner.lock(|s| {
252 let mut s = s.borrow_mut();
253 match s.try_publish(message) {
255 Ok(()) => Ok(()),
257 Err(message) => {
259 if let Some(cx) = cx {
260 s.publisher_wakers.register(cx.waker());
261 }
262 Err(message)
263 }
264 }
265 })
266 }
267
268 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
269 self.inner.lock(|s| {
270 let mut s = s.borrow_mut();
271 s.unregister_subscriber(subscriber_next_message_id)
272 })
273 }
274
275 fn unregister_publisher(&self) {
276 self.inner.lock(|s| {
277 let mut s = s.borrow_mut();
278 s.unregister_publisher()
279 })
280 }
281
282 fn free_capacity(&self) -> usize {
283 self.free_capacity()
284 }
285
286 fn clear(&self) {
287 self.clear();
288 }
289
290 fn len(&self) -> usize {
291 self.len()
292 }
293
294 fn is_empty(&self) -> bool {
295 self.is_empty()
296 }
297}
298
299struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
301 queue: Deque<(T, usize), CAP>,
303 next_message_id: u64,
307 subscriber_wakers: MultiWakerRegistration<SUBS>,
309 publisher_wakers: MultiWakerRegistration<PUBS>,
311 subscriber_count: usize,
313 publisher_count: usize,
315}
316
317impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
318 const fn new() -> Self {
320 Self {
321 queue: Deque::new(),
322 next_message_id: 0,
323 subscriber_wakers: MultiWakerRegistration::new(),
324 publisher_wakers: MultiWakerRegistration::new(),
325 subscriber_count: 0,
326 publisher_count: 0,
327 }
328 }
329
330 fn try_publish(&mut self, message: T) -> Result<(), T> {
331 if self.subscriber_count == 0 {
332 return Ok(());
334 }
335
336 if self.queue.is_full() {
337 return Err(message);
338 }
339 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
341
342 self.next_message_id += 1;
343
344 self.subscriber_wakers.wake();
346
347 Ok(())
348 }
349
350 fn publish_immediate(&mut self, message: T) {
351 if self.queue.is_full() {
353 self.queue.pop_front();
354 }
355
356 self.try_publish(message).ok().unwrap();
358 }
359
360 fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
361 let start_id = self.next_message_id - self.queue.len() as u64;
362
363 if message_id < start_id {
364 return Some(WaitResult::Lagged(start_id - message_id));
365 }
366
367 let current_message_index = (message_id - start_id) as usize;
368
369 if current_message_index >= self.queue.len() {
370 return None;
371 }
372
373 let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
375
376 queue_item.1 -= 1;
378
379 let message = if current_message_index == 0 && queue_item.1 == 0 {
380 let (message, _) = self.queue.pop_front().unwrap();
381 self.publisher_wakers.wake();
382 message
384 } else {
385 queue_item.0.clone()
386 };
387
388 Some(WaitResult::Message(message))
389 }
390
391 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
392 self.subscriber_count -= 1;
393
394 let start_id = self.next_message_id - self.queue.len() as u64;
396 if subscriber_next_message_id >= start_id {
397 let current_message_index = (subscriber_next_message_id - start_id) as usize;
398 self.queue
399 .iter_mut()
400 .skip(current_message_index)
401 .for_each(|(_, counter)| *counter -= 1);
402
403 let mut wake_publishers = false;
404 while let Some((_, count)) = self.queue.front() {
405 if *count == 0 {
406 self.queue.pop_front().unwrap();
407 wake_publishers = true;
408 } else {
409 break;
410 }
411 }
412
413 if wake_publishers {
414 self.publisher_wakers.wake();
415 }
416 }
417 }
418
419 fn unregister_publisher(&mut self) {
420 self.publisher_count -= 1;
421 }
422
423 fn clear(&mut self) {
424 if self.is_full() {
425 self.publisher_wakers.wake();
426 }
427 self.queue.clear();
428 }
429
430 fn len(&self) -> usize {
431 self.queue.len()
432 }
433
434 fn is_empty(&self) -> bool {
435 self.queue.is_empty()
436 }
437
438 fn is_full(&self) -> bool {
439 self.queue.is_full()
440 }
441}
442
443#[derive(Debug, PartialEq, Eq, Clone, Copy)]
445#[cfg_attr(feature = "defmt", derive(defmt::Format))]
446pub enum Error {
447 MaximumSubscribersReached,
450 MaximumPublishersReached,
453}
454
455trait SealedPubSubBehavior<T> {
456 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
460
461 fn available(&self, next_message_id: u64) -> u64;
464
465 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
469
470 fn free_capacity(&self) -> usize;
474
475 fn clear(&self);
477
478 fn len(&self) -> usize;
480
481 fn is_empty(&self) -> bool;
483
484 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
486
487 fn unregister_publisher(&self);
489}
490
491#[allow(private_bounds)]
494pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
495 fn publish_immediate(&self, message: T);
497
498 fn capacity(&self) -> usize;
500
501 fn is_full(&self) -> bool;
503}
504
505#[derive(Debug, Clone, PartialEq, Eq)]
507#[cfg_attr(feature = "defmt", derive(defmt::Format))]
508pub enum WaitResult<T> {
509 Lagged(u64),
512 Message(T),
514}
515
516#[cfg(test)]
517mod tests {
518 use super::*;
519 use crate::blocking_mutex::raw::NoopRawMutex;
520
521 #[futures_test::test]
522 async fn dyn_pub_sub_works() {
523 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
524
525 let mut sub0 = channel.dyn_subscriber().unwrap();
526 let mut sub1 = channel.dyn_subscriber().unwrap();
527 let pub0 = channel.dyn_publisher().unwrap();
528
529 pub0.publish(42).await;
530
531 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
532 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
533
534 assert_eq!(sub0.try_next_message(), None);
535 assert_eq!(sub1.try_next_message(), None);
536 }
537
538 #[futures_test::test]
539 async fn all_subscribers_receive() {
540 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
541
542 let mut sub0 = channel.subscriber().unwrap();
543 let mut sub1 = channel.subscriber().unwrap();
544 let pub0 = channel.publisher().unwrap();
545
546 pub0.publish(42).await;
547
548 assert_eq!(sub0.next_message().await, WaitResult::Message(42));
549 assert_eq!(sub1.next_message().await, WaitResult::Message(42));
550
551 assert_eq!(sub0.try_next_message(), None);
552 assert_eq!(sub1.try_next_message(), None);
553 }
554
555 #[futures_test::test]
556 async fn lag_when_queue_full_on_immediate_publish() {
557 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
558
559 let mut sub0 = channel.subscriber().unwrap();
560 let pub0 = channel.publisher().unwrap();
561
562 pub0.publish_immediate(42);
563 pub0.publish_immediate(43);
564 pub0.publish_immediate(44);
565 pub0.publish_immediate(45);
566 pub0.publish_immediate(46);
567 pub0.publish_immediate(47);
568
569 assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
570 assert_eq!(sub0.next_message().await, WaitResult::Message(44));
571 assert_eq!(sub0.next_message().await, WaitResult::Message(45));
572 assert_eq!(sub0.next_message().await, WaitResult::Message(46));
573 assert_eq!(sub0.next_message().await, WaitResult::Message(47));
574 assert_eq!(sub0.try_next_message(), None);
575 }
576
577 #[test]
578 fn limited_subs_and_pubs() {
579 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
580
581 let sub0 = channel.subscriber();
582 let sub1 = channel.subscriber();
583 let sub2 = channel.subscriber();
584 let sub3 = channel.subscriber();
585 let sub4 = channel.subscriber();
586
587 assert!(sub0.is_ok());
588 assert!(sub1.is_ok());
589 assert!(sub2.is_ok());
590 assert!(sub3.is_ok());
591 assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);
592
593 drop(sub0);
594
595 let sub5 = channel.subscriber();
596 assert!(sub5.is_ok());
597
598 let pub0 = channel.publisher();
601 let pub1 = channel.publisher();
602 let pub2 = channel.publisher();
603 let pub3 = channel.publisher();
604 let pub4 = channel.publisher();
605
606 assert!(pub0.is_ok());
607 assert!(pub1.is_ok());
608 assert!(pub2.is_ok());
609 assert!(pub3.is_ok());
610 assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);
611
612 drop(pub0);
613
614 let pub5 = channel.publisher();
615 assert!(pub5.is_ok());
616 }
617
618 #[test]
619 fn publisher_wait_on_full_queue() {
620 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
621
622 let pub0 = channel.publisher().unwrap();
623
624 assert_eq!(pub0.try_publish(0), Ok(()));
626 assert_eq!(pub0.try_publish(0), Ok(()));
627 assert_eq!(pub0.try_publish(0), Ok(()));
628 assert_eq!(pub0.try_publish(0), Ok(()));
629 assert_eq!(pub0.try_publish(0), Ok(()));
630
631 let sub0 = channel.subscriber().unwrap();
632
633 assert_eq!(pub0.try_publish(0), Ok(()));
634 assert_eq!(pub0.try_publish(0), Ok(()));
635 assert_eq!(pub0.try_publish(0), Ok(()));
636 assert_eq!(pub0.try_publish(0), Ok(()));
637 assert!(pub0.is_full());
638 assert_eq!(pub0.try_publish(0), Err(0));
639
640 drop(sub0);
641 }
642
643 #[futures_test::test]
644 async fn correct_available() {
645 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
646
647 let sub0 = channel.subscriber().unwrap();
648 let mut sub1 = channel.subscriber().unwrap();
649 let pub0 = channel.publisher().unwrap();
650
651 assert_eq!(sub0.available(), 0);
652 assert_eq!(sub1.available(), 0);
653
654 pub0.publish(42).await;
655
656 assert_eq!(sub0.available(), 1);
657 assert_eq!(sub1.available(), 1);
658
659 sub1.next_message().await;
660
661 assert_eq!(sub1.available(), 0);
662
663 pub0.publish(42).await;
664
665 assert_eq!(sub0.available(), 2);
666 assert_eq!(sub1.available(), 1);
667 }
668
669 #[futures_test::test]
670 async fn correct_len() {
671 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
672
673 let mut sub0 = channel.subscriber().unwrap();
674 let mut sub1 = channel.subscriber().unwrap();
675 let pub0 = channel.publisher().unwrap();
676
677 assert!(sub0.is_empty());
678 assert!(sub1.is_empty());
679 assert!(pub0.is_empty());
680 assert_eq!(pub0.free_capacity(), 4);
681 assert_eq!(pub0.len(), 0);
682
683 pub0.publish(42).await;
684
685 assert_eq!(pub0.free_capacity(), 3);
686 assert_eq!(pub0.len(), 1);
687
688 pub0.publish(42).await;
689
690 assert_eq!(pub0.free_capacity(), 2);
691 assert_eq!(pub0.len(), 2);
692
693 sub0.next_message().await;
694 sub0.next_message().await;
695
696 assert_eq!(pub0.free_capacity(), 2);
697 assert_eq!(pub0.len(), 2);
698
699 sub1.next_message().await;
700 assert_eq!(pub0.free_capacity(), 3);
701 assert_eq!(pub0.len(), 1);
702
703 sub1.next_message().await;
704 assert_eq!(pub0.free_capacity(), 4);
705 assert_eq!(pub0.len(), 0);
706 }
707
708 #[futures_test::test]
709 async fn empty_channel_when_last_subscriber_is_dropped() {
710 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
711
712 let pub0 = channel.publisher().unwrap();
713 let mut sub0 = channel.subscriber().unwrap();
714 let mut sub1 = channel.subscriber().unwrap();
715
716 assert_eq!(4, pub0.free_capacity());
717
718 pub0.publish(1).await;
719 pub0.publish(2).await;
720
721 assert_eq!(2, channel.free_capacity());
722
723 assert_eq!(1, sub0.try_next_message_pure().unwrap());
724 assert_eq!(2, sub0.try_next_message_pure().unwrap());
725
726 assert_eq!(2, channel.free_capacity());
727
728 drop(sub0);
729
730 assert_eq!(2, channel.free_capacity());
731
732 assert_eq!(1, sub1.try_next_message_pure().unwrap());
733
734 assert_eq!(3, channel.free_capacity());
735
736 drop(sub1);
737
738 assert_eq!(4, channel.free_capacity());
739 }
740
741 struct CloneCallCounter(usize);
742
743 impl Clone for CloneCallCounter {
744 fn clone(&self) -> Self {
745 Self(self.0 + 1)
746 }
747 }
748
749 #[futures_test::test]
750 async fn skip_clone_for_last_message() {
751 let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
752 let pub0 = channel.publisher().unwrap();
753 let mut sub0 = channel.subscriber().unwrap();
754 let mut sub1 = channel.subscriber().unwrap();
755
756 pub0.publish(CloneCallCounter(0)).await;
757
758 assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
759 assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
760 }
761
762 #[futures_test::test]
763 async fn publisher_sink() {
764 use futures_util::{SinkExt, StreamExt};
765
766 let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
767
768 let mut sub = channel.subscriber().unwrap();
769
770 let publ = channel.publisher().unwrap();
771 let mut sink = publ.sink();
772
773 sink.send(0).await.unwrap();
774 assert_eq!(0, sub.try_next_message_pure().unwrap());
775
776 sink.send(1).await.unwrap();
777 assert_eq!(1, sub.try_next_message_pure().unwrap());
778
779 sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
780 .await
781 .unwrap();
782 assert_eq!(0, sub.try_next_message_pure().unwrap());
783 assert_eq!(1, sub.try_next_message_pure().unwrap());
784 assert_eq!(2, sub.try_next_message_pure().unwrap());
785 assert_eq!(3, sub.try_next_message_pure().unwrap());
786 }
787}