1#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23#[derive(Debug)]
75pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
76 inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
77}
78
79impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
80 PubSubChannel<M, T, CAP, SUBS, PUBS>
81{
82 pub const fn new() -> Self {
84 Self {
85 inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
86 }
87 }
88
89 pub fn subscriber(&self) -> Result<Subscriber<'_, M, T, CAP, SUBS, PUBS>, Error> {
93 self.inner.lock(|inner| {
94 let mut s = inner.borrow_mut();
95
96 if s.subscriber_count >= SUBS {
97 Err(Error::MaximumSubscribersReached)
98 } else {
99 s.subscriber_count += 1;
100 Ok(Subscriber(Sub::new(s.next_message_id, self)))
101 }
102 })
103 }
104
105 pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
109 self.inner.lock(|inner| {
110 let mut s = inner.borrow_mut();
111
112 if s.subscriber_count >= SUBS {
113 Err(Error::MaximumSubscribersReached)
114 } else {
115 s.subscriber_count += 1;
116 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
117 }
118 })
119 }
120
121 pub fn publisher(&self) -> Result<Publisher<'_, M, T, CAP, SUBS, PUBS>, Error> {
125 self.inner.lock(|inner| {
126 let mut s = inner.borrow_mut();
127
128 if s.publisher_count >= PUBS {
129 Err(Error::MaximumPublishersReached)
130 } else {
131 s.publisher_count += 1;
132 Ok(Publisher(Pub::new(self)))
133 }
134 })
135 }
136
137 pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
141 self.inner.lock(|inner| {
142 let mut s = inner.borrow_mut();
143
144 if s.publisher_count >= PUBS {
145 Err(Error::MaximumPublishersReached)
146 } else {
147 s.publisher_count += 1;
148 Ok(DynPublisher(Pub::new(self)))
149 }
150 })
151 }
152
153 pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS> {
156 ImmediatePublisher(ImmediatePub::new(self))
157 }
158
159 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T> {
162 DynImmediatePublisher(ImmediatePub::new(self))
163 }
164
165 pub const fn capacity(&self) -> usize {
167 CAP
168 }
169
170 pub fn free_capacity(&self) -> usize {
174 CAP - self.len()
175 }
176
177 pub fn clear(&self) {
179 self.inner.lock(|inner| inner.borrow_mut().clear());
180 }
181
182 pub fn len(&self) -> usize {
184 self.inner.lock(|inner| inner.borrow().len())
185 }
186
187 pub fn is_empty(&self) -> bool {
189 self.inner.lock(|inner| inner.borrow().is_empty())
190 }
191
192 pub fn is_full(&self) -> bool {
194 self.inner.lock(|inner| inner.borrow().is_full())
195 }
196}
197
198impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
199 for PubSubChannel<M, T, CAP, SUBS, PUBS>
200{
201 fn publish_immediate(&self, message: T) {
202 self.inner.lock(|s| {
203 let mut s = s.borrow_mut();
204 s.publish_immediate(message)
205 })
206 }
207
208 fn capacity(&self) -> usize {
209 self.capacity()
210 }
211
212 fn is_full(&self) -> bool {
213 self.is_full()
214 }
215}
216
217impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
218 for PubSubChannel<M, T, CAP, SUBS, PUBS>
219{
220 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
221 self.inner.lock(|s| {
222 let mut s = s.borrow_mut();
223
224 match s.get_message(*next_message_id) {
226 Some(WaitResult::Message(message)) => {
228 *next_message_id += 1;
229 Poll::Ready(WaitResult::Message(message))
230 }
231 None => {
233 if let Some(cx) = cx {
234 s.subscriber_wakers.register(cx.waker());
235 }
236 Poll::Pending
237 }
238 Some(WaitResult::Lagged(amount)) => {
240 *next_message_id += amount;
241 Poll::Ready(WaitResult::Lagged(amount))
242 }
243 }
244 })
245 }
246
247 fn available(&self, next_message_id: u64) -> u64 {
248 self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
249 }
250
251 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
252 self.inner.lock(|s| {
253 let mut s = s.borrow_mut();
254 match s.try_publish(message) {
256 Ok(()) => Ok(()),
258 Err(message) => {
260 if let Some(cx) = cx {
261 s.publisher_wakers.register(cx.waker());
262 }
263 Err(message)
264 }
265 }
266 })
267 }
268
269 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
270 self.inner.lock(|s| {
271 let mut s = s.borrow_mut();
272 s.unregister_subscriber(subscriber_next_message_id)
273 })
274 }
275
276 fn unregister_publisher(&self) {
277 self.inner.lock(|s| {
278 let mut s = s.borrow_mut();
279 s.unregister_publisher()
280 })
281 }
282
283 fn free_capacity(&self) -> usize {
284 self.free_capacity()
285 }
286
287 fn clear(&self) {
288 self.clear();
289 }
290
291 fn len(&self) -> usize {
292 self.len()
293 }
294
295 fn is_empty(&self) -> bool {
296 self.is_empty()
297 }
298}
299
300#[derive(Debug)]
302struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
303 queue: Deque<(T, usize), CAP>,
305 next_message_id: u64,
309 subscriber_wakers: MultiWakerRegistration<SUBS>,
311 publisher_wakers: MultiWakerRegistration<PUBS>,
313 subscriber_count: usize,
315 publisher_count: usize,
317}
318
319impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
320 const fn new() -> Self {
322 Self {
323 queue: Deque::new(),
324 next_message_id: 0,
325 subscriber_wakers: MultiWakerRegistration::new(),
326 publisher_wakers: MultiWakerRegistration::new(),
327 subscriber_count: 0,
328 publisher_count: 0,
329 }
330 }
331
332 fn try_publish(&mut self, message: T) -> Result<(), T> {
333 if self.subscriber_count == 0 {
334 return Ok(());
336 }
337
338 if self.queue.is_full() {
339 return Err(message);
340 }
341 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
343
344 self.next_message_id += 1;
345
346 self.subscriber_wakers.wake();
348
349 Ok(())
350 }
351
352 fn publish_immediate(&mut self, message: T) {
353 if self.queue.is_full() {
355 self.queue.pop_front();
356 }
357
358 self.try_publish(message).ok().unwrap();
360 }
361
362 fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
363 let start_id = self.next_message_id - self.queue.len() as u64;
364
365 if message_id < start_id {
366 return Some(WaitResult::Lagged(start_id - message_id));
367 }
368
369 let current_message_index = (message_id - start_id) as usize;
370
371 if current_message_index >= self.queue.len() {
372 return None;
373 }
374
375 let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
377
378 queue_item.1 -= 1;
380
381 let message = if current_message_index == 0 && queue_item.1 == 0 {
382 let (message, _) = self.queue.pop_front().unwrap();
383 self.publisher_wakers.wake();
384 message
386 } else {
387 queue_item.0.clone()
388 };
389
390 Some(WaitResult::Message(message))
391 }
392
393 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
394 self.subscriber_count -= 1;
395
396 let start_id = self.next_message_id - self.queue.len() as u64;
398 if subscriber_next_message_id >= start_id {
399 let current_message_index = (subscriber_next_message_id - start_id) as usize;
400 self.queue
401 .iter_mut()
402 .skip(current_message_index)
403 .for_each(|(_, counter)| *counter -= 1);
404
405 let mut wake_publishers = false;
406 while let Some((_, count)) = self.queue.front() {
407 if *count == 0 {
408 self.queue.pop_front().unwrap();
409 wake_publishers = true;
410 } else {
411 break;
412 }
413 }
414
415 if wake_publishers {
416 self.publisher_wakers.wake();
417 }
418 }
419 }
420
421 fn unregister_publisher(&mut self) {
422 self.publisher_count -= 1;
423 }
424
425 fn clear(&mut self) {
426 if self.is_full() {
427 self.publisher_wakers.wake();
428 }
429 self.queue.clear();
430 }
431
432 fn len(&self) -> usize {
433 self.queue.len()
434 }
435
436 fn is_empty(&self) -> bool {
437 self.queue.is_empty()
438 }
439
440 fn is_full(&self) -> bool {
441 self.queue.is_full()
442 }
443}
444
/// Reasons why registering a new handle on a channel can fail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
    /// The channel already has its maximum number of subscribers (the `SUBS` parameter)
    /// registered.
    MaximumSubscribersReached,
    /// The channel already has its maximum number of publishers (the `PUBS` parameter)
    /// registered.
    MaximumPublishersReached,
}
456
457trait SealedPubSubBehavior<T> {
458 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
462
463 fn available(&self, next_message_id: u64) -> u64;
466
467 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
471
472 fn free_capacity(&self) -> usize;
476
477 fn clear(&self);
479
480 fn len(&self) -> usize;
482
483 fn is_empty(&self) -> bool;
485
486 fn unregister_subscriber(&self, subscriber_next_message_id: u64);
488
489 fn unregister_publisher(&self);
491}
492
493#[allow(private_bounds)]
496pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
497 fn publish_immediate(&self, message: T);
499
500 fn capacity(&self) -> usize;
502
503 fn is_full(&self) -> bool;
505}
506
/// What a subscriber obtains when it reads from the channel.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum WaitResult<T> {
    /// The subscriber fell behind and missed messages that are no longer queued; the payload is
    /// the number of messages missed.
    Lagged(u64),
    /// A message was received.
    Message(T),
}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocking_mutex::raw::NoopRawMutex;

    /// The channel shape most tests use: `u32` messages, capacity 4, up to 4 subscribers and
    /// 4 publishers.
    type TestChannel = PubSubChannel<NoopRawMutex, u32, 4, 4, 4>;

    /// Dynamic handles deliver one published message to each subscriber exactly once.
    #[futures_test::test]
    async fn dyn_pub_sub_works() {
        let channel = TestChannel::new();

        let mut first = channel.dyn_subscriber().unwrap();
        let mut second = channel.dyn_subscriber().unwrap();
        let publisher = channel.dyn_publisher().unwrap();

        publisher.publish(42).await;

        assert_eq!(first.next_message().await, WaitResult::Message(42));
        assert_eq!(second.next_message().await, WaitResult::Message(42));

        // Nothing is left once each subscriber has read the message.
        assert_eq!(first.try_next_message(), None);
        assert_eq!(second.try_next_message(), None);
    }

    /// Statically typed handles deliver one published message to each subscriber exactly once.
    #[futures_test::test]
    async fn all_subscribers_receive() {
        let channel = TestChannel::new();

        let mut first = channel.subscriber().unwrap();
        let mut second = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        publisher.publish(42).await;

        assert_eq!(first.next_message().await, WaitResult::Message(42));
        assert_eq!(second.next_message().await, WaitResult::Message(42));

        assert_eq!(first.try_next_message(), None);
        assert_eq!(second.try_next_message(), None);
    }

    /// Immediate publishing into a full queue evicts the oldest messages, which the subscriber
    /// observes as a lag before reading the surviving ones.
    #[futures_test::test]
    async fn lag_when_queue_full_on_immediate_publish() {
        let channel = TestChannel::new();

        let mut subscriber = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        // Six messages into a queue of four: 42 and 43 are evicted.
        for value in 42..=47 {
            publisher.publish_immediate(value);
        }

        assert_eq!(subscriber.try_next_message(), Some(WaitResult::Lagged(2)));
        for expected in 44..=47 {
            assert_eq!(subscriber.next_message().await, WaitResult::Message(expected));
        }
        assert_eq!(subscriber.try_next_message(), None);
    }

    /// Registration fails beyond `SUBS`/`PUBS` handles and works again after a handle is dropped.
    #[test]
    fn limited_subs_and_pubs() {
        let channel = TestChannel::new();

        let sub_a = channel.subscriber();
        let sub_b = channel.subscriber();
        let sub_c = channel.subscriber();
        let sub_d = channel.subscriber();
        let sub_overflow = channel.subscriber();

        assert!(sub_a.is_ok());
        assert!(sub_b.is_ok());
        assert!(sub_c.is_ok());
        assert!(sub_d.is_ok());
        assert_eq!(sub_overflow.err(), Some(Error::MaximumSubscribersReached));

        // Dropping a subscriber releases its slot.
        drop(sub_a);

        let sub_replacement = channel.subscriber();
        assert!(sub_replacement.is_ok());

        let pub_a = channel.publisher();
        let pub_b = channel.publisher();
        let pub_c = channel.publisher();
        let pub_d = channel.publisher();
        let pub_overflow = channel.publisher();

        assert!(pub_a.is_ok());
        assert!(pub_b.is_ok());
        assert!(pub_c.is_ok());
        assert!(pub_d.is_ok());
        assert_eq!(pub_overflow.err(), Some(Error::MaximumPublishersReached));

        // Dropping a publisher releases its slot.
        drop(pub_a);

        let pub_replacement = channel.publisher();
        assert!(pub_replacement.is_ok());
    }

    /// Publishing only fails on a full queue once a subscriber exists to hold messages in it.
    #[test]
    fn publisher_wait_on_full_queue() {
        let channel = TestChannel::new();

        let publisher = channel.publisher().unwrap();

        // Without subscribers every publish succeeds, even beyond the capacity.
        for _ in 0..5 {
            assert_eq!(publisher.try_publish(0), Ok(()));
        }

        let subscriber = channel.subscriber().unwrap();

        // With a subscriber the queue fills up after four messages.
        for _ in 0..4 {
            assert_eq!(publisher.try_publish(0), Ok(()));
        }
        assert!(publisher.is_full());
        assert_eq!(publisher.try_publish(0), Err(0));

        drop(subscriber);
    }

    /// `available` reports, per subscriber, how many messages are still unread.
    #[futures_test::test]
    async fn correct_available() {
        let channel = TestChannel::new();

        let idle = channel.subscriber().unwrap();
        let mut reader = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        assert_eq!(idle.available(), 0);
        assert_eq!(reader.available(), 0);

        publisher.publish(42).await;

        assert_eq!(idle.available(), 1);
        assert_eq!(reader.available(), 1);

        reader.next_message().await;

        assert_eq!(reader.available(), 0);

        publisher.publish(42).await;

        assert_eq!(idle.available(), 2);
        assert_eq!(reader.available(), 1);
    }

    /// `len` and `free_capacity` only change once every subscriber has read a message.
    #[futures_test::test]
    async fn correct_len() {
        let channel = TestChannel::new();

        let mut first = channel.subscriber().unwrap();
        let mut second = channel.subscriber().unwrap();
        let publisher = channel.publisher().unwrap();

        assert!(first.is_empty());
        assert!(second.is_empty());
        assert!(publisher.is_empty());
        assert_eq!(publisher.free_capacity(), 4);
        assert_eq!(publisher.len(), 0);

        publisher.publish(42).await;

        assert_eq!(publisher.free_capacity(), 3);
        assert_eq!(publisher.len(), 1);

        publisher.publish(42).await;

        assert_eq!(publisher.free_capacity(), 2);
        assert_eq!(publisher.len(), 2);

        // Only one of the two subscribers has read: nothing is released yet.
        first.next_message().await;
        first.next_message().await;

        assert_eq!(publisher.free_capacity(), 2);
        assert_eq!(publisher.len(), 2);

        // The second subscriber is the last reader of each message.
        second.next_message().await;
        assert_eq!(publisher.free_capacity(), 3);
        assert_eq!(publisher.len(), 1);

        second.next_message().await;
        assert_eq!(publisher.free_capacity(), 4);
        assert_eq!(publisher.len(), 0);
    }

    /// Dropping subscribers releases the messages that were only waiting for them.
    #[futures_test::test]
    async fn empty_channel_when_last_subscriber_is_dropped() {
        let channel = TestChannel::new();

        let publisher = channel.publisher().unwrap();
        let mut first = channel.subscriber().unwrap();
        let mut second = channel.subscriber().unwrap();

        assert_eq!(4, publisher.free_capacity());

        publisher.publish(1).await;
        publisher.publish(2).await;

        assert_eq!(2, channel.free_capacity());

        assert_eq!(1, first.try_next_message_pure().unwrap());
        assert_eq!(2, first.try_next_message_pure().unwrap());

        // The second subscriber has not read anything yet.
        assert_eq!(2, channel.free_capacity());

        drop(first);

        assert_eq!(2, channel.free_capacity());

        assert_eq!(1, second.try_next_message_pure().unwrap());

        assert_eq!(3, channel.free_capacity());

        // The remaining message was only waiting for the second subscriber.
        drop(second);

        assert_eq!(4, channel.free_capacity());
    }

    /// Message type that records how many times it has been cloned.
    struct CloneCallCounter(usize);

    impl Clone for CloneCallCounter {
        fn clone(&self) -> Self {
            let Self(clones_so_far) = self;
            Self(clones_so_far + 1)
        }
    }

    /// The last reader of a message receives the stored value itself, not a clone.
    #[futures_test::test]
    async fn skip_clone_for_last_message() {
        let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
        let publisher = channel.publisher().unwrap();
        let mut first = channel.subscriber().unwrap();
        let mut second = channel.subscriber().unwrap();

        publisher.publish(CloneCallCounter(0)).await;

        // The first reader gets a clone, the last reader gets the original.
        assert_eq!(1, first.try_next_message_pure().unwrap().0);
        assert_eq!(0, second.try_next_message_pure().unwrap().0);
    }

    /// A publisher can be driven through the `Sink` interface.
    #[futures_test::test]
    async fn publisher_sink() {
        use futures_util::{SinkExt, StreamExt};

        let channel = TestChannel::new();

        let mut subscriber = channel.subscriber().unwrap();

        let publisher = channel.publisher().unwrap();
        let mut sink = publisher.sink();

        sink.send(0).await.unwrap();
        assert_eq!(0, subscriber.try_next_message_pure().unwrap());

        sink.send(1).await.unwrap();
        assert_eq!(1, subscriber.try_next_message_pure().unwrap());

        sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
            .await
            .unwrap();
        for expected in 0..4 {
            assert_eq!(expected, subscriber.try_next_message_pure().unwrap());
        }
    }
}