embassy_sync/pubsub/
mod.rs

1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers.
2
3#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
24///
25/// Any published message can be read by all subscribers.
26/// A publisher can choose how it sends its message.
27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30///   in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31///   an error to indicate that it has lagged.
32///
33/// ## Example
34///
35/// ```
36/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex;
37/// # use embassy_sync::pubsub::WaitResult;
38/// # use embassy_sync::pubsub::PubSubChannel;
39/// # use futures_executor::block_on;
40/// # let test = async {
41/// // Create the channel. This can be static as well
42/// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
43///
44/// // This is a generic subscriber with a direct reference to the channel
45/// let mut sub0 = channel.subscriber().unwrap();
46/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel
47/// let mut sub1 = channel.dyn_subscriber().unwrap();
48///
49/// let pub0 = channel.publisher().unwrap();
50///
51/// // Publish a message, but wait if the queue is full
52/// pub0.publish(42).await;
53///
54/// // Publish a message, but if the queue is full, just kick out the oldest message.
55/// // This may cause some subscribers to miss a message
56/// pub0.publish_immediate(43);
57///
58/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
59/// assert_eq!(sub0.next_message().await, WaitResult::Message(42));
60/// assert_eq!(sub1.next_message().await, WaitResult::Message(42));
61///
62/// // Wait again, but this time ignore any Lag results
63/// assert_eq!(sub0.next_message_pure().await, 43);
64/// assert_eq!(sub1.next_message_pure().await, 43);
65///
66/// // There's also a polling interface
67/// assert_eq!(sub0.try_next_message(), None);
68/// assert_eq!(sub1.try_next_message(), None);
69/// # };
70/// #
71/// # block_on(test);
72/// ```
73///
74#[derive(Debug)]
75pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
76    inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
77}
78
79impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
80    PubSubChannel<M, T, CAP, SUBS, PUBS>
81{
82    /// Create a new channel
83    pub const fn new() -> Self {
84        Self {
85            inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
86        }
87    }
88
89    /// Create a new subscriber. It will only receive messages that are published after its creation.
90    ///
91    /// If there are no subscriber slots left, an error will be returned.
92    pub fn subscriber(&self) -> Result<Subscriber<'_, M, T, CAP, SUBS, PUBS>, Error> {
93        self.inner.lock(|inner| {
94            let mut s = inner.borrow_mut();
95
96            if s.subscriber_count >= SUBS {
97                Err(Error::MaximumSubscribersReached)
98            } else {
99                s.subscriber_count += 1;
100                Ok(Subscriber(Sub::new(s.next_message_id, self)))
101            }
102        })
103    }
104
105    /// Create a new subscriber. It will only receive messages that are published after its creation.
106    ///
107    /// If there are no subscriber slots left, an error will be returned.
108    pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
109        self.inner.lock(|inner| {
110            let mut s = inner.borrow_mut();
111
112            if s.subscriber_count >= SUBS {
113                Err(Error::MaximumSubscribersReached)
114            } else {
115                s.subscriber_count += 1;
116                Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
117            }
118        })
119    }
120
121    /// Create a new publisher
122    ///
123    /// If there are no publisher slots left, an error will be returned.
124    pub fn publisher(&self) -> Result<Publisher<'_, M, T, CAP, SUBS, PUBS>, Error> {
125        self.inner.lock(|inner| {
126            let mut s = inner.borrow_mut();
127
128            if s.publisher_count >= PUBS {
129                Err(Error::MaximumPublishersReached)
130            } else {
131                s.publisher_count += 1;
132                Ok(Publisher(Pub::new(self)))
133            }
134        })
135    }
136
137    /// Create a new publisher
138    ///
139    /// If there are no publisher slots left, an error will be returned.
140    pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
141        self.inner.lock(|inner| {
142            let mut s = inner.borrow_mut();
143
144            if s.publisher_count >= PUBS {
145                Err(Error::MaximumPublishersReached)
146            } else {
147                s.publisher_count += 1;
148                Ok(DynPublisher(Pub::new(self)))
149            }
150        })
151    }
152
153    /// Create a new publisher that can only send immediate messages.
154    /// This kind of publisher does not take up a publisher slot.
155    pub fn immediate_publisher(&self) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS> {
156        ImmediatePublisher(ImmediatePub::new(self))
157    }
158
159    /// Create a new publisher that can only send immediate messages.
160    /// This kind of publisher does not take up a publisher slot.
161    pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T> {
162        DynImmediatePublisher(ImmediatePub::new(self))
163    }
164
165    /// Returns the maximum number of elements the channel can hold.
166    pub const fn capacity(&self) -> usize {
167        CAP
168    }
169
170    /// Returns the free capacity of the channel.
171    ///
172    /// This is equivalent to `capacity() - len()`
173    pub fn free_capacity(&self) -> usize {
174        CAP - self.len()
175    }
176
177    /// Clears all elements in the channel.
178    pub fn clear(&self) {
179        self.inner.lock(|inner| inner.borrow_mut().clear());
180    }
181
182    /// Returns the number of elements currently in the channel.
183    pub fn len(&self) -> usize {
184        self.inner.lock(|inner| inner.borrow().len())
185    }
186
187    /// Returns whether the channel is empty.
188    pub fn is_empty(&self) -> bool {
189        self.inner.lock(|inner| inner.borrow().is_empty())
190    }
191
192    /// Returns whether the channel is full.
193    pub fn is_full(&self) -> bool {
194        self.inner.lock(|inner| inner.borrow().is_full())
195    }
196}
197
198impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
199    for PubSubChannel<M, T, CAP, SUBS, PUBS>
200{
201    fn publish_immediate(&self, message: T) {
202        self.inner.lock(|s| {
203            let mut s = s.borrow_mut();
204            s.publish_immediate(message)
205        })
206    }
207
208    fn capacity(&self) -> usize {
209        self.capacity()
210    }
211
212    fn is_full(&self) -> bool {
213        self.is_full()
214    }
215}
216
217impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
218    for PubSubChannel<M, T, CAP, SUBS, PUBS>
219{
220    fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
221        self.inner.lock(|s| {
222            let mut s = s.borrow_mut();
223
224            // Check if we can read a message
225            match s.get_message(*next_message_id) {
226                // Yes, so we are done polling
227                Some(WaitResult::Message(message)) => {
228                    *next_message_id += 1;
229                    Poll::Ready(WaitResult::Message(message))
230                }
231                // No, so we need to reregister our waker and sleep again
232                None => {
233                    if let Some(cx) = cx {
234                        s.subscriber_wakers.register(cx.waker());
235                    }
236                    Poll::Pending
237                }
238                // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
239                Some(WaitResult::Lagged(amount)) => {
240                    *next_message_id += amount;
241                    Poll::Ready(WaitResult::Lagged(amount))
242                }
243            }
244        })
245    }
246
247    fn available(&self, next_message_id: u64) -> u64 {
248        self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
249    }
250
251    fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
252        self.inner.lock(|s| {
253            let mut s = s.borrow_mut();
254            // Try to publish the message
255            match s.try_publish(message) {
256                // We did it, we are ready
257                Ok(()) => Ok(()),
258                // The queue is full, so we need to reregister our waker and go to sleep
259                Err(message) => {
260                    if let Some(cx) = cx {
261                        s.publisher_wakers.register(cx.waker());
262                    }
263                    Err(message)
264                }
265            }
266        })
267    }
268
269    fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
270        self.inner.lock(|s| {
271            let mut s = s.borrow_mut();
272            s.unregister_subscriber(subscriber_next_message_id)
273        })
274    }
275
276    fn unregister_publisher(&self) {
277        self.inner.lock(|s| {
278            let mut s = s.borrow_mut();
279            s.unregister_publisher()
280        })
281    }
282
283    fn free_capacity(&self) -> usize {
284        self.free_capacity()
285    }
286
287    fn clear(&self) {
288        self.clear();
289    }
290
291    fn len(&self) -> usize {
292        self.len()
293    }
294
295    fn is_empty(&self) -> bool {
296        self.is_empty()
297    }
298}
299
300/// Internal state for the PubSub channel
301#[derive(Debug)]
302struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
303    /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
304    queue: Deque<(T, usize), CAP>,
305    /// Every message has an id.
306    /// Don't worry, we won't run out.
307    /// If a million messages were published every second, then the ID's would run out in about 584942 years.
308    next_message_id: u64,
309    /// Collection of wakers for Subscribers that are waiting.  
310    subscriber_wakers: MultiWakerRegistration<SUBS>,
311    /// Collection of wakers for Publishers that are waiting.  
312    publisher_wakers: MultiWakerRegistration<PUBS>,
313    /// The amount of subscribers that are active
314    subscriber_count: usize,
315    /// The amount of publishers that are active
316    publisher_count: usize,
317}
318
319impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
320    /// Create a new internal channel state
321    const fn new() -> Self {
322        Self {
323            queue: Deque::new(),
324            next_message_id: 0,
325            subscriber_wakers: MultiWakerRegistration::new(),
326            publisher_wakers: MultiWakerRegistration::new(),
327            subscriber_count: 0,
328            publisher_count: 0,
329        }
330    }
331
332    fn try_publish(&mut self, message: T) -> Result<(), T> {
333        if self.subscriber_count == 0 {
334            // We don't need to publish anything because there is no one to receive it
335            return Ok(());
336        }
337
338        if self.queue.is_full() {
339            return Err(message);
340        }
341        // We just did a check for this
342        self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
343
344        self.next_message_id += 1;
345
346        // Wake all of the subscribers
347        self.subscriber_wakers.wake();
348
349        Ok(())
350    }
351
352    fn publish_immediate(&mut self, message: T) {
353        // Make space in the queue if required
354        if self.queue.is_full() {
355            self.queue.pop_front();
356        }
357
358        // This will succeed because we made sure there is space
359        self.try_publish(message).ok().unwrap();
360    }
361
362    fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
363        let start_id = self.next_message_id - self.queue.len() as u64;
364
365        if message_id < start_id {
366            return Some(WaitResult::Lagged(start_id - message_id));
367        }
368
369        let current_message_index = (message_id - start_id) as usize;
370
371        if current_message_index >= self.queue.len() {
372            return None;
373        }
374
375        // We've checked that the index is valid
376        let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
377
378        // We're reading this item, so decrement the counter
379        queue_item.1 -= 1;
380
381        let message = if current_message_index == 0 && queue_item.1 == 0 {
382            let (message, _) = self.queue.pop_front().unwrap();
383            self.publisher_wakers.wake();
384            // Return pop'd message without clone
385            message
386        } else {
387            queue_item.0.clone()
388        };
389
390        Some(WaitResult::Message(message))
391    }
392
393    fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
394        self.subscriber_count -= 1;
395
396        // All messages that haven't been read yet by this subscriber must have their counter decremented
397        let start_id = self.next_message_id - self.queue.len() as u64;
398        if subscriber_next_message_id >= start_id {
399            let current_message_index = (subscriber_next_message_id - start_id) as usize;
400            self.queue
401                .iter_mut()
402                .skip(current_message_index)
403                .for_each(|(_, counter)| *counter -= 1);
404
405            let mut wake_publishers = false;
406            while let Some((_, count)) = self.queue.front() {
407                if *count == 0 {
408                    self.queue.pop_front().unwrap();
409                    wake_publishers = true;
410                } else {
411                    break;
412                }
413            }
414
415            if wake_publishers {
416                self.publisher_wakers.wake();
417            }
418        }
419    }
420
421    fn unregister_publisher(&mut self) {
422        self.publisher_count -= 1;
423    }
424
425    fn clear(&mut self) {
426        if self.is_full() {
427            self.publisher_wakers.wake();
428        }
429        self.queue.clear();
430    }
431
432    fn len(&self) -> usize {
433        self.queue.len()
434    }
435
436    fn is_empty(&self) -> bool {
437        self.queue.is_empty()
438    }
439
440    fn is_full(&self) -> bool {
441        self.queue.is_full()
442    }
443}
444
445/// Error type for the [PubSubChannel]
446#[derive(Debug, PartialEq, Eq, Clone, Copy)]
447#[cfg_attr(feature = "defmt", derive(defmt::Format))]
448pub enum Error {
449    /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
450    /// the capacity of the channels must be increased.
451    MaximumSubscribersReached,
452    /// All publisher slots are used. To add another publisher, first another publisher must be dropped or
453    /// the capacity of the channels must be increased.
454    MaximumPublishersReached,
455}
456
457trait SealedPubSubBehavior<T> {
458    /// Try to get a message from the queue with the given message id.
459    ///
460    /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers.
461    fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
462
463    /// Get the amount of messages that are between the given the next_message_id and the most recent message.
464    /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged.
465    fn available(&self, next_message_id: u64) -> u64;
466
467    /// Try to publish a message to the queue.
468    ///
469    /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
470    fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
471
472    /// Returns the free capacity of the channel.
473    ///
474    /// This is equivalent to `capacity() - len()`
475    fn free_capacity(&self) -> usize;
476
477    /// Clears all elements in the channel.
478    fn clear(&self);
479
480    /// Returns the number of elements currently in the channel.
481    fn len(&self) -> usize;
482
483    /// Returns whether the channel is empty.
484    fn is_empty(&self) -> bool;
485
486    /// Let the channel know that a subscriber has dropped
487    fn unregister_subscriber(&self, subscriber_next_message_id: u64);
488
489    /// Let the channel know that a publisher has dropped
490    fn unregister_publisher(&self);
491}
492
493/// 'Middle level' behaviour of the pubsub channel.
494/// This trait is used so that Sub and Pub can be generic over the channel.
495#[allow(private_bounds)]
496pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
497    /// Publish a message immediately
498    fn publish_immediate(&self, message: T);
499
500    /// Returns the maximum number of elements the channel can hold.
501    fn capacity(&self) -> usize;
502
503    /// Returns whether the channel is full.
504    fn is_full(&self) -> bool;
505}
506
507/// The result of the subscriber wait procedure
508#[derive(Debug, Clone, PartialEq, Eq)]
509#[cfg_attr(feature = "defmt", derive(defmt::Format))]
510pub enum WaitResult<T> {
511    /// The subscriber did not receive all messages and lagged by the given amount of messages.
512    /// (This is the amount of messages that were missed)
513    Lagged(u64),
514    /// A message was received
515    Message(T),
516}
517
518#[cfg(test)]
519mod tests {
520    use super::*;
521    use crate::blocking_mutex::raw::NoopRawMutex;
522
523    #[futures_test::test]
524    async fn dyn_pub_sub_works() {
525        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
526
527        let mut sub0 = channel.dyn_subscriber().unwrap();
528        let mut sub1 = channel.dyn_subscriber().unwrap();
529        let pub0 = channel.dyn_publisher().unwrap();
530
531        pub0.publish(42).await;
532
533        assert_eq!(sub0.next_message().await, WaitResult::Message(42));
534        assert_eq!(sub1.next_message().await, WaitResult::Message(42));
535
536        assert_eq!(sub0.try_next_message(), None);
537        assert_eq!(sub1.try_next_message(), None);
538    }
539
540    #[futures_test::test]
541    async fn all_subscribers_receive() {
542        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
543
544        let mut sub0 = channel.subscriber().unwrap();
545        let mut sub1 = channel.subscriber().unwrap();
546        let pub0 = channel.publisher().unwrap();
547
548        pub0.publish(42).await;
549
550        assert_eq!(sub0.next_message().await, WaitResult::Message(42));
551        assert_eq!(sub1.next_message().await, WaitResult::Message(42));
552
553        assert_eq!(sub0.try_next_message(), None);
554        assert_eq!(sub1.try_next_message(), None);
555    }
556
557    #[futures_test::test]
558    async fn lag_when_queue_full_on_immediate_publish() {
559        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
560
561        let mut sub0 = channel.subscriber().unwrap();
562        let pub0 = channel.publisher().unwrap();
563
564        pub0.publish_immediate(42);
565        pub0.publish_immediate(43);
566        pub0.publish_immediate(44);
567        pub0.publish_immediate(45);
568        pub0.publish_immediate(46);
569        pub0.publish_immediate(47);
570
571        assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
572        assert_eq!(sub0.next_message().await, WaitResult::Message(44));
573        assert_eq!(sub0.next_message().await, WaitResult::Message(45));
574        assert_eq!(sub0.next_message().await, WaitResult::Message(46));
575        assert_eq!(sub0.next_message().await, WaitResult::Message(47));
576        assert_eq!(sub0.try_next_message(), None);
577    }
578
579    #[test]
580    fn limited_subs_and_pubs() {
581        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
582
583        let sub0 = channel.subscriber();
584        let sub1 = channel.subscriber();
585        let sub2 = channel.subscriber();
586        let sub3 = channel.subscriber();
587        let sub4 = channel.subscriber();
588
589        assert!(sub0.is_ok());
590        assert!(sub1.is_ok());
591        assert!(sub2.is_ok());
592        assert!(sub3.is_ok());
593        assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);
594
595        drop(sub0);
596
597        let sub5 = channel.subscriber();
598        assert!(sub5.is_ok());
599
600        // publishers
601
602        let pub0 = channel.publisher();
603        let pub1 = channel.publisher();
604        let pub2 = channel.publisher();
605        let pub3 = channel.publisher();
606        let pub4 = channel.publisher();
607
608        assert!(pub0.is_ok());
609        assert!(pub1.is_ok());
610        assert!(pub2.is_ok());
611        assert!(pub3.is_ok());
612        assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);
613
614        drop(pub0);
615
616        let pub5 = channel.publisher();
617        assert!(pub5.is_ok());
618    }
619
620    #[test]
621    fn publisher_wait_on_full_queue() {
622        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
623
624        let pub0 = channel.publisher().unwrap();
625
626        // There are no subscribers, so the queue will never be full
627        assert_eq!(pub0.try_publish(0), Ok(()));
628        assert_eq!(pub0.try_publish(0), Ok(()));
629        assert_eq!(pub0.try_publish(0), Ok(()));
630        assert_eq!(pub0.try_publish(0), Ok(()));
631        assert_eq!(pub0.try_publish(0), Ok(()));
632
633        let sub0 = channel.subscriber().unwrap();
634
635        assert_eq!(pub0.try_publish(0), Ok(()));
636        assert_eq!(pub0.try_publish(0), Ok(()));
637        assert_eq!(pub0.try_publish(0), Ok(()));
638        assert_eq!(pub0.try_publish(0), Ok(()));
639        assert!(pub0.is_full());
640        assert_eq!(pub0.try_publish(0), Err(0));
641
642        drop(sub0);
643    }
644
645    #[futures_test::test]
646    async fn correct_available() {
647        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
648
649        let sub0 = channel.subscriber().unwrap();
650        let mut sub1 = channel.subscriber().unwrap();
651        let pub0 = channel.publisher().unwrap();
652
653        assert_eq!(sub0.available(), 0);
654        assert_eq!(sub1.available(), 0);
655
656        pub0.publish(42).await;
657
658        assert_eq!(sub0.available(), 1);
659        assert_eq!(sub1.available(), 1);
660
661        sub1.next_message().await;
662
663        assert_eq!(sub1.available(), 0);
664
665        pub0.publish(42).await;
666
667        assert_eq!(sub0.available(), 2);
668        assert_eq!(sub1.available(), 1);
669    }
670
671    #[futures_test::test]
672    async fn correct_len() {
673        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
674
675        let mut sub0 = channel.subscriber().unwrap();
676        let mut sub1 = channel.subscriber().unwrap();
677        let pub0 = channel.publisher().unwrap();
678
679        assert!(sub0.is_empty());
680        assert!(sub1.is_empty());
681        assert!(pub0.is_empty());
682        assert_eq!(pub0.free_capacity(), 4);
683        assert_eq!(pub0.len(), 0);
684
685        pub0.publish(42).await;
686
687        assert_eq!(pub0.free_capacity(), 3);
688        assert_eq!(pub0.len(), 1);
689
690        pub0.publish(42).await;
691
692        assert_eq!(pub0.free_capacity(), 2);
693        assert_eq!(pub0.len(), 2);
694
695        sub0.next_message().await;
696        sub0.next_message().await;
697
698        assert_eq!(pub0.free_capacity(), 2);
699        assert_eq!(pub0.len(), 2);
700
701        sub1.next_message().await;
702        assert_eq!(pub0.free_capacity(), 3);
703        assert_eq!(pub0.len(), 1);
704
705        sub1.next_message().await;
706        assert_eq!(pub0.free_capacity(), 4);
707        assert_eq!(pub0.len(), 0);
708    }
709
710    #[futures_test::test]
711    async fn empty_channel_when_last_subscriber_is_dropped() {
712        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
713
714        let pub0 = channel.publisher().unwrap();
715        let mut sub0 = channel.subscriber().unwrap();
716        let mut sub1 = channel.subscriber().unwrap();
717
718        assert_eq!(4, pub0.free_capacity());
719
720        pub0.publish(1).await;
721        pub0.publish(2).await;
722
723        assert_eq!(2, channel.free_capacity());
724
725        assert_eq!(1, sub0.try_next_message_pure().unwrap());
726        assert_eq!(2, sub0.try_next_message_pure().unwrap());
727
728        assert_eq!(2, channel.free_capacity());
729
730        drop(sub0);
731
732        assert_eq!(2, channel.free_capacity());
733
734        assert_eq!(1, sub1.try_next_message_pure().unwrap());
735
736        assert_eq!(3, channel.free_capacity());
737
738        drop(sub1);
739
740        assert_eq!(4, channel.free_capacity());
741    }
742
743    struct CloneCallCounter(usize);
744
745    impl Clone for CloneCallCounter {
746        fn clone(&self) -> Self {
747            Self(self.0 + 1)
748        }
749    }
750
751    #[futures_test::test]
752    async fn skip_clone_for_last_message() {
753        let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
754        let pub0 = channel.publisher().unwrap();
755        let mut sub0 = channel.subscriber().unwrap();
756        let mut sub1 = channel.subscriber().unwrap();
757
758        pub0.publish(CloneCallCounter(0)).await;
759
760        assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
761        assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
762    }
763
764    #[futures_test::test]
765    async fn publisher_sink() {
766        use futures_util::{SinkExt, StreamExt};
767
768        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
769
770        let mut sub = channel.subscriber().unwrap();
771
772        let publ = channel.publisher().unwrap();
773        let mut sink = publ.sink();
774
775        sink.send(0).await.unwrap();
776        assert_eq!(0, sub.try_next_message_pure().unwrap());
777
778        sink.send(1).await.unwrap();
779        assert_eq!(1, sub.try_next_message_pure().unwrap());
780
781        sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
782            .await
783            .unwrap();
784        assert_eq!(0, sub.try_next_message_pure().unwrap());
785        assert_eq!(1, sub.try_next_message_pure().unwrap());
786        assert_eq!(2, sub.try_next_message_pure().unwrap());
787        assert_eq!(3, sub.try_next_message_pure().unwrap());
788    }
789}