1use std::collections::{HashMap, VecDeque};
5use std::sync::atomic::AtomicBool;
6use std::sync::Arc;
7use std::time::Duration;
8
9use parking_lot::RwLock;
10use tokio::sync::mpsc;
11use tokio::time::{sleep, Instant};
12
13use super::subscriber::{ActiveSubscription, SubscriptionRequest};
14use super::{Error, Event, Pubsub, Spec};
15use crate::task::spawn;
16
17const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_millis(2_000);
18
19const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_millis(30_000);
20
21const INTERNAL_POLL_SIZE: usize = 1_000;
22
23const POLL_SLEEP: Duration = Duration::from_millis(2_000);
24
25struct UniqueSubscription<S>
26where
27 S: Spec,
28{
29 name: S::SubscriptionId,
30 total_subscribers: usize,
31}
32
33type UniqueSubscriptions<S> = RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>;
34
35type ActiveSubscriptions<S> =
36 RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;
37
38type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
39
40#[allow(missing_debug_implementations)]
42pub struct Consumer<T>
43where
44 T: Transport + 'static,
45{
46 transport: T,
47 inner_pubsub: Arc<Pubsub<T::Spec>>,
48 remote_subscriptions: UniqueSubscriptions<T::Spec>,
49 subscriptions: ActiveSubscriptions<T::Spec>,
50 stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
51 still_running: AtomicBool,
52 prefer_polling: bool,
53 cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
59}
60
61#[allow(missing_debug_implementations)]
63pub struct RemoteActiveConsumer<T>
64where
65 T: Transport + 'static,
66{
67 inner: ActiveSubscription<T::Spec>,
68 previous_messages: VecDeque<<T::Spec as Spec>::Event>,
69 consumer: Arc<Consumer<T>>,
70}
71
72impl<T> RemoteActiveConsumer<T>
73where
74 T: Transport + 'static,
75{
76 pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
78 if let Some(event) = self.previous_messages.pop_front() {
79 Some(event)
80 } else {
81 self.inner.recv().await
82 }
83 }
84
85 pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
87 if let Some(event) = self.previous_messages.pop_front() {
88 Some(event)
89 } else {
90 self.inner.try_recv()
91 }
92 }
93
94 pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
96 self.inner.name()
97 }
98}
99
100impl<T> Drop for RemoteActiveConsumer<T>
101where
102 T: Transport + 'static,
103{
104 fn drop(&mut self) {
105 let _ = self.consumer.unsubscribe(self.name().clone());
106 }
107}
108
109#[allow(missing_debug_implementations)]
112pub struct InternalRelay<S>
113where
114 S: Spec + 'static,
115{
116 inner: Arc<Pubsub<S>>,
117 cached_events: Arc<RwLock<CacheEvent<S>>>,
118}
119
120impl<S> InternalRelay<S>
121where
122 S: Spec + 'static,
123{
124 pub fn send<X>(&self, event: X)
126 where
127 X: Into<S::Event>,
128 {
129 let event = event.into();
130 let mut cached_events = self.cached_events.write();
131
132 for topic in event.get_topics() {
133 cached_events.insert(topic, event.clone());
134 }
135
136 self.inner.publish(event);
137 }
138}
139
140impl<T> Consumer<T>
141where
142 T: Transport + 'static,
143{
144 pub fn new(
146 transport: T,
147 prefer_polling: bool,
148 context: <T::Spec as Spec>::Context,
149 ) -> Arc<Self> {
150 let this = Arc::new(Self {
151 transport,
152 prefer_polling,
153 inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
154 subscriptions: Default::default(),
155 remote_subscriptions: Default::default(),
156 stream_ctrl: RwLock::new(None),
157 cached_events: Default::default(),
158 still_running: true.into(),
159 });
160
161 spawn(Self::stream(this.clone()));
162
163 this
164 }
165
166 async fn stream(instance: Arc<Self>) {
167 let mut stream_supported = true;
168 let mut poll_supported = true;
169
170 let mut backoff = STREAM_CONNECTION_BACKOFF;
171 let mut retry_at = None;
172
173 loop {
174 if (!stream_supported && !poll_supported)
175 || !instance
176 .still_running
177 .load(std::sync::atomic::Ordering::Relaxed)
178 {
179 break;
180 }
181
182 if instance.remote_subscriptions.read().is_empty() {
183 sleep(Duration::from_millis(100)).await;
184 continue;
185 }
186
187 if stream_supported
188 && !instance.prefer_polling
189 && retry_at
190 .map(|retry_at| retry_at < Instant::now())
191 .unwrap_or(true)
192 {
193 let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
194
195 {
196 *instance.stream_ctrl.write() = Some(sender);
197 }
198
199 let current_subscriptions = {
200 instance
201 .remote_subscriptions
202 .read()
203 .iter()
204 .map(|(key, name)| (name.name.clone(), key.clone()))
205 .collect::<Vec<_>>()
206 };
207
208 if let Err(err) = instance
209 .transport
210 .stream(
211 receiver,
212 current_subscriptions,
213 InternalRelay {
214 inner: instance.inner_pubsub.clone(),
215 cached_events: instance.cached_events.clone(),
216 },
217 )
218 .await
219 {
220 retry_at = Some(Instant::now() + backoff);
221 backoff =
222 (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
223
224 if matches!(err, Error::NotSupported) {
225 stream_supported = false;
226 }
227 tracing::error!("Long connection failed with error {:?}", err);
228 } else {
229 backoff = STREAM_CONNECTION_BACKOFF;
230 }
231
232 let _ = instance.stream_ctrl.write().take();
234 }
235
236 if poll_supported {
237 let current_subscriptions = {
238 instance
239 .remote_subscriptions
240 .read()
241 .iter()
242 .map(|(key, name)| (name.name.clone(), key.clone()))
243 .collect::<Vec<_>>()
244 };
245
246 if let Err(err) = instance
247 .transport
248 .poll(
249 current_subscriptions,
250 InternalRelay {
251 inner: instance.inner_pubsub.clone(),
252 cached_events: instance.cached_events.clone(),
253 },
254 )
255 .await
256 {
257 if matches!(err, Error::NotSupported) {
258 poll_supported = false;
259 }
260 tracing::error!("Polling failed with error {:?}", err);
261 }
262
263 sleep(POLL_SLEEP).await;
264 }
265 }
266 }
267
268 fn unsubscribe(
271 self: &Arc<Self>,
272 subscription_name: <T::Spec as Spec>::SubscriptionId,
273 ) -> Result<(), Error> {
274 let topics = self
275 .subscriptions
276 .write()
277 .remove(&subscription_name)
278 .ok_or(Error::NoSubscription)?;
279
280 let mut remote_subscriptions = self.remote_subscriptions.write();
281
282 for topic in topics {
283 let mut remote_subscription =
284 if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
285 remote_subscription
286 } else {
287 continue;
288 };
289
290 remote_subscription.total_subscribers =
291 remote_subscription.total_subscribers.saturating_sub(1);
292
293 if remote_subscription.total_subscribers == 0 {
294 let mut cached_events = self.cached_events.write();
295
296 cached_events.remove(&topic);
297
298 self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
299 } else {
300 remote_subscriptions.insert(topic, remote_subscription);
301 }
302 }
303
304 if remote_subscriptions.is_empty() {
305 self.message_to_stream(StreamCtrl::Stop)?;
306 }
307
308 Ok(())
309 }
310
311 #[inline(always)]
312 fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
313 let to_stream = self.stream_ctrl.read();
314
315 if let Some(to_stream) = to_stream.as_ref() {
316 Ok(to_stream.try_send(message)?)
317 } else {
318 Ok(())
319 }
320 }
321
322 pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
333 where
334 I: SubscriptionRequest<
335 Topic = <T::Spec as Spec>::Topic,
336 SubscriptionId = <T::Spec as Spec>::SubscriptionId,
337 >,
338 {
339 let subscription_name = request.subscription_name();
340 let topics = request.try_get_topics()?;
341
342 let mut remote_subscriptions = self.remote_subscriptions.write();
343 let mut subscriptions = self.subscriptions.write();
344
345 if subscriptions.get(&subscription_name).is_some() {
346 return Err(Error::NoSubscription);
347 }
348
349 let mut previous_messages = Vec::new();
350 let cached_events = self.cached_events.read();
351
352 for topic in topics.iter() {
353 if let Some(subscription) = remote_subscriptions.get_mut(topic) {
354 subscription.total_subscribers += 1;
355
356 if let Some(v) = cached_events.get(topic).cloned() {
357 previous_messages.push(v);
358 }
359 } else {
360 let internal_sub_name = self.transport.new_name();
361 remote_subscriptions.insert(
362 topic.clone(),
363 UniqueSubscription {
364 total_subscribers: 1,
365 name: internal_sub_name.clone(),
366 },
367 );
368
369 self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
371 }
372 }
373
374 subscriptions.insert(subscription_name, topics);
375 drop(subscriptions);
376
377 Ok(RemoteActiveConsumer {
378 inner: self.inner_pubsub.subscribe(request)?,
379 previous_messages: previous_messages.into(),
380 consumer: self.clone(),
381 })
382 }
383}
384
385impl<T> Drop for Consumer<T>
386where
387 T: Transport + 'static,
388{
389 fn drop(&mut self) {
390 self.still_running
391 .store(false, std::sync::atomic::Ordering::Release);
392 if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
393 let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
394 tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
395 });
396 }
397 }
398}
399
400pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
402
403#[allow(missing_debug_implementations)]
405pub enum StreamCtrl<S>
406where
407 S: Spec + 'static,
408{
409 Subscribe(SubscribeMessage<S>),
411 Unsubscribe(S::SubscriptionId),
413 Stop,
415}
416
417impl<S> Clone for StreamCtrl<S>
418where
419 S: Spec + 'static,
420{
421 fn clone(&self) -> Self {
422 match self {
423 Self::Subscribe(s) => Self::Subscribe(s.clone()),
424 Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
425 Self::Stop => Self::Stop,
426 }
427 }
428}
429
430#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
448#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
449pub trait Transport: Send + Sync {
450 type Spec: Spec;
452
453 fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;
455
456 async fn stream(
459 &self,
460 subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
461 topics: Vec<SubscribeMessage<Self::Spec>>,
462 reply_to: InternalRelay<Self::Spec>,
463 ) -> Result<(), Error>;
464
465 async fn poll(
468 &self,
469 topics: Vec<SubscribeMessage<Self::Spec>>,
470 reply_to: InternalRelay<Self::Spec>,
471 ) -> Result<(), Error>;
472}
473
474#[cfg(test)]
475mod tests {
476 use std::sync::atomic::{AtomicUsize, Ordering};
477 use std::sync::Arc;
478
479 use tokio::sync::{mpsc, Mutex};
480 use tokio::time::{timeout, Duration};
481
482 use super::{
483 InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
484 INTERNAL_POLL_SIZE,
485 };
486 use crate::pub_sub::remote_consumer::Consumer;
487 use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
488 use crate::pub_sub::{Error, Spec, SubscriptionRequest};
489
490 #[derive(Clone, Debug)]
493 enum SubscriptionReq {
494 Foo(String, u64),
495 Bar(String, u64),
496 }
497
498 impl SubscriptionRequest for SubscriptionReq {
499 type Topic = IndexTest;
500
501 type SubscriptionId = String;
502
503 fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
504 Ok(vec![match self {
505 SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
506 SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
507 }])
508 }
509
510 fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
511 Arc::new(match self {
512 SubscriptionReq::Foo(n, _) => n.to_string(),
513 SubscriptionReq::Bar(n, _) => n.to_string(),
514 })
515 }
516 }
517
518 struct TestTransport {
523 name_ctr: AtomicUsize,
524 observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
526 support_long: bool,
528 support_poll: bool,
529 rx: Mutex<mpsc::Receiver<Message>>,
530 }
531
532 impl TestTransport {
533 fn new(
534 support_long: bool,
535 support_poll: bool,
536 ) -> (
537 Self,
538 mpsc::Sender<Message>,
539 mpsc::Receiver<StreamCtrl<CustomPubSub>>,
540 ) {
541 let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
542 let (observe_ctrl_tx, observe_ctrl_rx) =
543 mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);
544
545 let t = TestTransport {
546 name_ctr: AtomicUsize::new(1),
547 rx: Mutex::new(rx),
548 observe_ctrl_tx,
549 support_long,
550 support_poll,
551 };
552
553 (t, events_tx, observe_ctrl_rx)
554 }
555 }
556
557 #[async_trait::async_trait]
558 impl Transport for TestTransport {
559 type Spec = CustomPubSub;
560
561 fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
562 format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
563 }
564
565 async fn stream(
566 &self,
567 mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
568 topics: Vec<SubscribeMessage<Self::Spec>>,
569 reply_to: InternalRelay<Self::Spec>,
570 ) -> Result<(), Error> {
571 if !self.support_long {
572 return Err(Error::NotSupported);
573 }
574
575 let mut rx = self.rx.lock().await;
577 let observe = self.observe_ctrl_tx.clone();
578
579 for topic in topics {
580 observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
581 }
582
583 loop {
584 tokio::select! {
585 Some(ctrl) = subscribe_changes.recv() => {
587 observe.try_send(ctrl.clone()).unwrap();
588 if matches!(ctrl, StreamCtrl::Stop) {
589 break;
590 }
591 }
592 Some(msg) = rx.recv() => {
594 reply_to.send(msg);
595 }
596 }
597 }
598
599 Ok(())
600 }
601
602 async fn poll(
603 &self,
604 _topics: Vec<SubscribeMessage<Self::Spec>>,
605 reply_to: InternalRelay<Self::Spec>,
606 ) -> Result<(), Error> {
607 if !self.support_poll {
608 return Err(Error::NotSupported);
609 }
610
611 let mut rx = self.rx.lock().await;
614 for _ in 0..32 {
616 match rx.try_recv() {
617 Ok(msg) => reply_to.send(msg),
618 Err(mpsc::error::TryRecvError::Empty) => continue,
619 Err(mpsc::error::TryRecvError::Disconnected) => break,
620 }
621 }
622 Ok(())
623 }
624 }
625
626 async fn recv_next<T: Transport>(
629 sub: &mut RemoteActiveConsumer<T>,
630 dur_ms: u64,
631 ) -> Option<<T::Spec as Spec>::Event> {
632 timeout(Duration::from_millis(dur_ms), sub.recv())
633 .await
634 .ok()
635 .flatten()
636 }
637
638 async fn expect_ctrl(
639 rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
640 dur_ms: u64,
641 pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
642 ) -> StreamCtrl<CustomPubSub> {
643 timeout(Duration::from_millis(dur_ms), async {
644 loop {
645 if let Some(msg) = rx.recv().await {
646 if pred(&msg) {
647 break msg;
648 }
649 }
650 }
651 })
652 .await
653 .expect("timed out waiting for control message")
654 }
655
656 #[tokio::test]
659 async fn stream_delivery_and_unsubscribe_on_drop() {
660 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
662
663 let consumer = Consumer::new(transport, false, ());
665
666 let mut sub = consumer
668 .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
669 .expect("subscribe ok");
670
671 let ctrl = expect_ctrl(
673 &mut ctrl_rx,
674 1000,
675 |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
676 )
677 .await;
678 match ctrl {
679 StreamCtrl::Subscribe((name, idx)) => {
680 assert_ne!(name, "t".to_owned());
681 assert_eq!(idx, IndexTest::Foo(7));
682 }
683 _ => unreachable!(),
684 }
685
686 events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
688 let got = recv_next::<TestTransport>(&mut sub, 1000)
689 .await
690 .expect("got event");
691 assert_eq!(got, Message { foo: 7, bar: 1 });
692
693 drop(sub);
695 let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
696 matches!(m, StreamCtrl::Unsubscribe(_))
697 })
698 .await;
699
700 drop(consumer);
702 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
703 }
704
705 #[tokio::test]
706 async fn test_cache_and_invalation() {
707 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
709
710 let consumer = Consumer::new(transport, false, ());
712
713 let mut sub_1 = consumer
715 .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
716 .expect("subscribe ok");
717
718 let ctrl = expect_ctrl(
720 &mut ctrl_rx,
721 1000,
722 |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
723 )
724 .await;
725 match ctrl {
726 StreamCtrl::Subscribe((name, idx)) => {
727 assert_ne!(name, "t1".to_owned());
728 assert_eq!(idx, IndexTest::Foo(7));
729 }
730 _ => unreachable!(),
731 }
732
733 events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
735 let got = recv_next::<TestTransport>(&mut sub_1, 1000)
736 .await
737 .expect("got event");
738 assert_eq!(got, Message { foo: 7, bar: 1 });
739
740 let mut sub_2 = consumer
742 .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
743 .expect("subscribe ok");
744
745 let got = recv_next::<TestTransport>(&mut sub_2, 1000)
746 .await
747 .expect("got event");
748 assert_eq!(got, Message { foo: 7, bar: 1 });
749
750 drop(sub_1);
752
753 let mut sub_3 = consumer
755 .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
756 .expect("subscribe ok");
757
758 let got = recv_next::<TestTransport>(&mut sub_3, 1000)
760 .await
761 .expect("got event");
762 assert_eq!(got, Message { foo: 7, bar: 1 });
763
764 events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();
766
767 let got = recv_next::<TestTransport>(&mut sub_2, 1000)
769 .await
770 .expect("got event");
771 assert_eq!(got, Message { foo: 7, bar: 2 });
772
773 let got = recv_next::<TestTransport>(&mut sub_3, 1000)
774 .await
775 .expect("got event");
776 assert_eq!(got, Message { foo: 7, bar: 2 });
777
778 drop(sub_2);
779 drop(sub_3);
780
781 let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
782 matches!(m, StreamCtrl::Unsubscribe(_))
783 })
784 .await;
785
786 let mut sub_4 = consumer
788 .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
789 .expect("subscribe ok");
790
791 assert!(
792 recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
793 "Should have not receive any update"
794 );
795
796 drop(sub_4);
797
798 let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
800 }
801
802 #[tokio::test]
803 async fn falls_back_to_poll_when_stream_not_supported() {
804 let (transport, events_tx, _) = TestTransport::new(false, true);
806 let consumer = Consumer::new(transport, true, ());
809
810 let mut sub = consumer
812 .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
813 .expect("subscribe ok");
814
815 events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
817 let got = recv_next::<TestTransport>(&mut sub, 1500)
818 .await
819 .expect("event relayed via polling");
820 assert_eq!(got, Message { foo: 9, bar: 5 });
821 }
822
823 #[tokio::test]
824 async fn multiple_subscribers_share_single_remote_subscription() {
825 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
828 let consumer = Consumer::new(transport, false, ());
829
830 let mut a = consumer
832 .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
833 .expect("subscribe A");
834 let _ = expect_ctrl(
835 &mut ctrl_rx,
836 1000,
837 |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(1)),
838 )
839 .await;
840
841 let mut b = consumer
842 .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
843 .expect("subscribe B");
844
845 if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
848 timeout(Duration::from_millis(400), ctrl_rx.recv()).await
849 {
850 assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
851 }
852
853 events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
855 let got_a = recv_next::<TestTransport>(&mut a, 1000)
856 .await
857 .expect("A got");
858 let got_b = recv_next::<TestTransport>(&mut b, 1000)
859 .await
860 .expect("B got");
861 assert_eq!(got_a, Message { foo: 1, bar: 42 });
862 assert_eq!(got_b, Message { foo: 1, bar: 42 });
863
864 drop(b);
866 if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
867 timeout(Duration::from_millis(400), ctrl_rx.recv()).await
868 {
869 panic!("Should NOT unsubscribe while another local subscriber exists");
870 }
871
872 drop(a);
874 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
875 matches!(m, StreamCtrl::Unsubscribe(_))
876 })
877 .await;
878
879 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
880 }
881}