1use std::collections::{HashMap, VecDeque};
5use std::sync::atomic::AtomicBool;
6use std::sync::Arc;
7use std::time::Duration;
8
9use parking_lot::RwLock;
10use tokio::sync::mpsc;
11use tokio::time::{sleep, Instant};
12
13use super::subscriber::{ActiveSubscription, SubscriptionRequest};
14use super::{Error, Event, Pubsub, Spec};
15use crate::task::spawn;
16
/// Delay before a failed streaming connection is retried; the same amount is
/// added to the delay after every further failure.
const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_secs(2);

/// Upper bound for the streaming reconnection delay.
const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Capacity of the control channel created for every streaming connection.
const INTERNAL_POLL_SIZE: usize = 1000;

/// Pause taken after every polling round.
const POLL_SLEEP: Duration = Duration::from_secs(2);
24
25struct UniqueSubscription<S>
26where
27 S: Spec,
28{
29 name: S::SubscriptionId,
30 total_subscribers: usize,
31}
32
/// Remote subscriptions keyed by topic; a topic appears once regardless of
/// how many local subscriptions refer to it.
type UniqueSubscriptions<S> = RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>;

/// Topics requested by every local subscription, keyed by the subscription id.
type ActiveSubscriptions<S> =
    RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;

/// Last event stored for each topic.
type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
39
40pub struct Consumer<T>
42where
43 T: Transport + 'static,
44{
45 transport: T,
46 inner_pubsub: Arc<Pubsub<T::Spec>>,
47 remote_subscriptions: UniqueSubscriptions<T::Spec>,
48 subscriptions: ActiveSubscriptions<T::Spec>,
49 stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
50 still_running: AtomicBool,
51 prefer_polling: bool,
52 cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
58}
59
60pub struct RemoteActiveConsumer<T>
62where
63 T: Transport + 'static,
64{
65 inner: ActiveSubscription<T::Spec>,
66 previous_messages: VecDeque<<T::Spec as Spec>::Event>,
67 consumer: Arc<Consumer<T>>,
68}
69
70impl<T> RemoteActiveConsumer<T>
71where
72 T: Transport + 'static,
73{
74 pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
76 if let Some(event) = self.previous_messages.pop_front() {
77 Some(event)
78 } else {
79 self.inner.recv().await
80 }
81 }
82
83 pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
85 if let Some(event) = self.previous_messages.pop_front() {
86 Some(event)
87 } else {
88 self.inner.try_recv()
89 }
90 }
91
92 pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
94 self.inner.name()
95 }
96}
97
98impl<T> Drop for RemoteActiveConsumer<T>
99where
100 T: Transport + 'static,
101{
102 fn drop(&mut self) {
103 let _ = self.consumer.unsubscribe(self.name().clone());
104 }
105}
106
107pub struct InternalRelay<S>
110where
111 S: Spec + 'static,
112{
113 inner: Arc<Pubsub<S>>,
114 cached_events: Arc<RwLock<CacheEvent<S>>>,
115}
116
117impl<S> InternalRelay<S>
118where
119 S: Spec + 'static,
120{
121 pub fn send<X>(&self, event: X)
123 where
124 X: Into<S::Event>,
125 {
126 let event = event.into();
127 let mut cached_events = self.cached_events.write();
128
129 for topic in event.get_topics() {
130 cached_events.insert(topic, event.clone());
131 }
132
133 self.inner.publish(event);
134 }
135}
136
137impl<T> Consumer<T>
138where
139 T: Transport + 'static,
140{
141 pub fn new(
143 transport: T,
144 prefer_polling: bool,
145 context: <T::Spec as Spec>::Context,
146 ) -> Arc<Self> {
147 let this = Arc::new(Self {
148 transport,
149 prefer_polling,
150 inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
151 subscriptions: Default::default(),
152 remote_subscriptions: Default::default(),
153 stream_ctrl: RwLock::new(None),
154 cached_events: Default::default(),
155 still_running: true.into(),
156 });
157
158 spawn(Self::stream(this.clone()));
159
160 this
161 }
162
163 async fn stream(instance: Arc<Self>) {
164 let mut stream_supported = true;
165 let mut poll_supported = true;
166
167 let mut backoff = STREAM_CONNECTION_BACKOFF;
168 let mut retry_at = None;
169
170 loop {
171 if (!stream_supported && !poll_supported)
172 || !instance
173 .still_running
174 .load(std::sync::atomic::Ordering::Relaxed)
175 {
176 break;
177 }
178
179 if instance.remote_subscriptions.read().is_empty() {
180 sleep(Duration::from_millis(100)).await;
181 continue;
182 }
183
184 if stream_supported
185 && !instance.prefer_polling
186 && retry_at
187 .map(|retry_at| retry_at < Instant::now())
188 .unwrap_or(true)
189 {
190 let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
191
192 {
193 *instance.stream_ctrl.write() = Some(sender);
194 }
195
196 let current_subscriptions = {
197 instance
198 .remote_subscriptions
199 .read()
200 .iter()
201 .map(|(key, name)| (name.name.clone(), key.clone()))
202 .collect::<Vec<_>>()
203 };
204
205 if let Err(err) = instance
206 .transport
207 .stream(
208 receiver,
209 current_subscriptions,
210 InternalRelay {
211 inner: instance.inner_pubsub.clone(),
212 cached_events: instance.cached_events.clone(),
213 },
214 )
215 .await
216 {
217 retry_at = Some(Instant::now() + backoff);
218 backoff =
219 (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
220
221 if matches!(err, Error::NotSupported) {
222 stream_supported = false;
223 }
224 tracing::error!("Long connection failed with error {:?}", err);
225 } else {
226 backoff = STREAM_CONNECTION_BACKOFF;
227 }
228
229 let _ = instance.stream_ctrl.write().take();
231 }
232
233 if poll_supported {
234 let current_subscriptions = {
235 instance
236 .remote_subscriptions
237 .read()
238 .iter()
239 .map(|(key, name)| (name.name.clone(), key.clone()))
240 .collect::<Vec<_>>()
241 };
242
243 if let Err(err) = instance
244 .transport
245 .poll(
246 current_subscriptions,
247 InternalRelay {
248 inner: instance.inner_pubsub.clone(),
249 cached_events: instance.cached_events.clone(),
250 },
251 )
252 .await
253 {
254 if matches!(err, Error::NotSupported) {
255 poll_supported = false;
256 }
257 tracing::error!("Polling failed with error {:?}", err);
258 }
259
260 sleep(POLL_SLEEP).await;
261 }
262 }
263 }
264
265 fn unsubscribe(
268 self: &Arc<Self>,
269 subscription_name: <T::Spec as Spec>::SubscriptionId,
270 ) -> Result<(), Error> {
271 let topics = self
272 .subscriptions
273 .write()
274 .remove(&subscription_name)
275 .ok_or(Error::NoSubscription)?;
276
277 let mut remote_subscriptions = self.remote_subscriptions.write();
278
279 for topic in topics {
280 let mut remote_subscription =
281 if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
282 remote_subscription
283 } else {
284 continue;
285 };
286
287 remote_subscription.total_subscribers = remote_subscription
288 .total_subscribers
289 .checked_sub(1)
290 .unwrap_or_default();
291
292 if remote_subscription.total_subscribers == 0 {
293 let mut cached_events = self.cached_events.write();
294
295 cached_events.remove(&topic);
296
297 self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
298 } else {
299 remote_subscriptions.insert(topic, remote_subscription);
300 }
301 }
302
303 if remote_subscriptions.is_empty() {
304 self.message_to_stream(StreamCtrl::Stop)?;
305 }
306
307 Ok(())
308 }
309
310 #[inline(always)]
311 fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
312 let to_stream = self.stream_ctrl.read();
313
314 if let Some(to_stream) = to_stream.as_ref() {
315 Ok(to_stream.try_send(message)?)
316 } else {
317 Ok(())
318 }
319 }
320
321 pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
332 where
333 I: SubscriptionRequest<
334 Topic = <T::Spec as Spec>::Topic,
335 SubscriptionId = <T::Spec as Spec>::SubscriptionId,
336 >,
337 {
338 let subscription_name = request.subscription_name();
339 let topics = request.try_get_topics()?;
340
341 let mut remote_subscriptions = self.remote_subscriptions.write();
342 let mut subscriptions = self.subscriptions.write();
343
344 if subscriptions.get(&subscription_name).is_some() {
345 return Err(Error::NoSubscription);
346 }
347
348 let mut previous_messages = Vec::new();
349 let cached_events = self.cached_events.read();
350
351 for topic in topics.iter() {
352 if let Some(subscription) = remote_subscriptions.get_mut(topic) {
353 subscription.total_subscribers += 1;
354
355 if let Some(v) = cached_events.get(topic).cloned() {
356 previous_messages.push(v);
357 }
358 } else {
359 let internal_sub_name = self.transport.new_name();
360 remote_subscriptions.insert(
361 topic.clone(),
362 UniqueSubscription {
363 total_subscribers: 1,
364 name: internal_sub_name.clone(),
365 },
366 );
367
368 self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
370 }
371 }
372
373 subscriptions.insert(subscription_name, topics);
374 drop(subscriptions);
375
376 Ok(RemoteActiveConsumer {
377 inner: self.inner_pubsub.subscribe(request)?,
378 previous_messages: previous_messages.into(),
379 consumer: self.clone(),
380 })
381 }
382}
383
384impl<T> Drop for Consumer<T>
385where
386 T: Transport + 'static,
387{
388 fn drop(&mut self) {
389 self.still_running
390 .store(false, std::sync::atomic::Ordering::Release);
391 if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
392 let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
393 tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
394 });
395 }
396 }
397}
398
/// A remote subscription described as `(subscription id, topic)`.
pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
401
402pub enum StreamCtrl<S>
404where
405 S: Spec + 'static,
406{
407 Subscribe(SubscribeMessage<S>),
409 Unsubscribe(S::SubscriptionId),
411 Stop,
413}
414
415impl<S> Clone for StreamCtrl<S>
416where
417 S: Spec + 'static,
418{
419 fn clone(&self) -> Self {
420 match self {
421 Self::Subscribe(s) => Self::Subscribe(s.clone()),
422 Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
423 Self::Stop => Self::Stop,
424 }
425 }
426}
427
/// Connection to the remote event source used by [`Consumer`].
///
/// The `async_trait` expansion drops the `Send` requirement on `wasm32`.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait Transport: Send + Sync {
    /// Pub/sub specification (topics, events, ids) served by this transport.
    type Spec: Spec;

    /// Returns the name for a new remote subscription; the consumer calls it
    /// once for every topic it starts tracking.
    fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;

    /// Runs a streaming connection.
    ///
    /// `topics` holds the subscriptions known when the stream is started,
    /// later changes arrive through `subscribe_changes`, and received events
    /// are delivered through `reply_to`. The consumer stops trying to stream
    /// once this returns `Error::NotSupported`; other errors are retried
    /// after a backoff.
    async fn stream(
        &self,
        subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
        topics: Vec<SubscribeMessage<Self::Spec>>,
        reply_to: InternalRelay<Self::Spec>,
    ) -> Result<(), Error>;

    /// Runs one polling round for `topics`, delivering events through
    /// `reply_to`. The consumer stops polling once this returns
    /// `Error::NotSupported`.
    async fn poll(
        &self,
        topics: Vec<SubscribeMessage<Self::Spec>>,
        reply_to: InternalRelay<Self::Spec>,
    ) -> Result<(), Error>;
}
471
472#[cfg(test)]
473mod tests {
474 use std::sync::atomic::{AtomicUsize, Ordering};
475 use std::sync::Arc;
476
477 use tokio::sync::{mpsc, Mutex};
478 use tokio::time::{timeout, Duration};
479
480 use super::{
481 InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
482 INTERNAL_POLL_SIZE,
483 };
484 use crate::pub_sub::remote_consumer::Consumer;
485 use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
486 use crate::pub_sub::{Error, Spec, SubscriptionRequest};
487
    /// Subscription request used by the tests: a subscription name plus the
    /// index value of the single topic to follow.
    #[derive(Clone, Debug)]
    enum SubscriptionReq {
        /// Follows `IndexTest::Foo` with the given value.
        Foo(String, u64),
        /// Follows `IndexTest::Bar` with the given value.
        Bar(String, u64),
    }
495
496 impl SubscriptionRequest for SubscriptionReq {
497 type Topic = IndexTest;
498
499 type SubscriptionId = String;
500
501 fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
502 Ok(vec![match self {
503 SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
504 SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
505 }])
506 }
507
508 fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
509 Arc::new(match self {
510 SubscriptionReq::Foo(n, _) => n.to_string(),
511 SubscriptionReq::Bar(n, _) => n.to_string(),
512 })
513 }
514 }
515
    /// In-memory transport driven by the tests.
    struct TestTransport {
        /// Counter used to generate `sub-N` subscription names.
        name_ctr: AtomicUsize,
        /// Receives a copy of every control message seen by the stream, so
        /// tests can assert on them.
        observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
        /// When false, `stream` returns `Error::NotSupported`.
        support_long: bool,
        /// When false, `poll` returns `Error::NotSupported`.
        support_poll: bool,
        /// Events injected by the test, relayed by `stream` and `poll`.
        rx: Mutex<mpsc::Receiver<Message>>,
    }
529
530 impl TestTransport {
531 fn new(
532 support_long: bool,
533 support_poll: bool,
534 ) -> (
535 Self,
536 mpsc::Sender<Message>,
537 mpsc::Receiver<StreamCtrl<CustomPubSub>>,
538 ) {
539 let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
540 let (observe_ctrl_tx, observe_ctrl_rx) =
541 mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);
542
543 let t = TestTransport {
544 name_ctr: AtomicUsize::new(1),
545 rx: Mutex::new(rx),
546 observe_ctrl_tx,
547 support_long,
548 support_poll,
549 };
550
551 (t, events_tx, observe_ctrl_rx)
552 }
553 }
554
555 #[async_trait::async_trait]
556 impl Transport for TestTransport {
557 type Spec = CustomPubSub;
558
559 fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
560 format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
561 }
562
563 async fn stream(
564 &self,
565 mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
566 topics: Vec<SubscribeMessage<Self::Spec>>,
567 reply_to: InternalRelay<Self::Spec>,
568 ) -> Result<(), Error> {
569 if !self.support_long {
570 return Err(Error::NotSupported);
571 }
572
573 let mut rx = self.rx.lock().await;
575 let observe = self.observe_ctrl_tx.clone();
576
577 for topic in topics {
578 observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
579 }
580
581 loop {
582 tokio::select! {
583 Some(ctrl) = subscribe_changes.recv() => {
585 observe.try_send(ctrl.clone()).unwrap();
586 if matches!(ctrl, StreamCtrl::Stop) {
587 break;
588 }
589 }
590 Some(msg) = rx.recv() => {
592 reply_to.send(msg);
593 }
594 }
595 }
596
597 Ok(())
598 }
599
600 async fn poll(
601 &self,
602 _topics: Vec<SubscribeMessage<Self::Spec>>,
603 reply_to: InternalRelay<Self::Spec>,
604 ) -> Result<(), Error> {
605 if !self.support_poll {
606 return Err(Error::NotSupported);
607 }
608
609 let mut rx = self.rx.lock().await;
612 for _ in 0..32 {
614 match rx.try_recv() {
615 Ok(msg) => reply_to.send(msg),
616 Err(mpsc::error::TryRecvError::Empty) => continue,
617 Err(mpsc::error::TryRecvError::Disconnected) => break,
618 }
619 }
620 Ok(())
621 }
622 }
623
624 async fn recv_next<T: Transport>(
627 sub: &mut RemoteActiveConsumer<T>,
628 dur_ms: u64,
629 ) -> Option<<T::Spec as Spec>::Event> {
630 timeout(Duration::from_millis(dur_ms), sub.recv())
631 .await
632 .ok()
633 .flatten()
634 }
635
636 async fn expect_ctrl(
637 rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
638 dur_ms: u64,
639 pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
640 ) -> StreamCtrl<CustomPubSub> {
641 timeout(Duration::from_millis(dur_ms), async {
642 loop {
643 if let Some(msg) = rx.recv().await {
644 if pred(&msg) {
645 break msg;
646 }
647 }
648 }
649 })
650 .await
651 .expect("timed out waiting for control message")
652 }
653
    /// Streaming path: a subscription is announced to the transport, events
    /// are delivered, and dropping the subscription produces `Unsubscribe`
    /// followed by `Stop`.
    #[tokio::test]
    async fn stream_delivery_and_unsubscribe_on_drop() {
        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);

        // Streaming is preferred (prefer_polling = false).
        let consumer = Consumer::new(transport, false, ());

        let mut sub = consumer
            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
            .expect("subscribe ok");

        // The transport must see a Subscribe for the requested topic.
        let ctrl = expect_ctrl(
            &mut ctrl_rx,
            1000,
            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
        )
        .await;
        match ctrl {
            StreamCtrl::Subscribe((name, idx)) => {
                // The remote name is generated by the transport, it is not
                // the local subscription name.
                assert_ne!(name, "t".to_owned());
                assert_eq!(idx, IndexTest::Foo(7));
            }
            _ => unreachable!(),
        }

        events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
        let got = recv_next::<TestTransport>(&mut sub, 1000)
            .await
            .expect("got event");
        assert_eq!(got, Message { foo: 7, bar: 1 });

        // Dropping the only subscriber releases the remote subscription.
        drop(sub);
        let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
            matches!(m, StreamCtrl::Unsubscribe(_))
        })
        .await;

        // With no remote subscription left a Stop must have been sent.
        drop(consumer);
        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
    }
702
703 #[tokio::test]
704 async fn test_cache_and_invalation() {
705 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
707
708 let consumer = Consumer::new(transport, false, ());
710
711 let mut sub_1 = consumer
713 .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
714 .expect("subscribe ok");
715
716 let ctrl = expect_ctrl(
718 &mut ctrl_rx,
719 1000,
720 |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
721 )
722 .await;
723 match ctrl {
724 StreamCtrl::Subscribe((name, idx)) => {
725 assert_ne!(name, "t1".to_owned());
726 assert_eq!(idx, IndexTest::Foo(7));
727 }
728 _ => unreachable!(),
729 }
730
731 events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
733 let got = recv_next::<TestTransport>(&mut sub_1, 1000)
734 .await
735 .expect("got event");
736 assert_eq!(got, Message { foo: 7, bar: 1 });
737
738 let mut sub_2 = consumer
740 .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
741 .expect("subscribe ok");
742
743 let got = recv_next::<TestTransport>(&mut sub_2, 1000)
744 .await
745 .expect("got event");
746 assert_eq!(got, Message { foo: 7, bar: 1 });
747
748 drop(sub_1);
750
751 let mut sub_3 = consumer
753 .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
754 .expect("subscribe ok");
755
756 let got = recv_next::<TestTransport>(&mut sub_3, 1000)
758 .await
759 .expect("got event");
760 assert_eq!(got, Message { foo: 7, bar: 1 });
761
762 events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();
764
765 let got = recv_next::<TestTransport>(&mut sub_2, 1000)
767 .await
768 .expect("got event");
769 assert_eq!(got, Message { foo: 7, bar: 2 });
770
771 let got = recv_next::<TestTransport>(&mut sub_3, 1000)
772 .await
773 .expect("got event");
774 assert_eq!(got, Message { foo: 7, bar: 2 });
775
776 drop(sub_2);
777 drop(sub_3);
778
779 let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
780 matches!(m, StreamCtrl::Unsubscribe(_))
781 })
782 .await;
783
784 let mut sub_4 = consumer
786 .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
787 .expect("subscribe ok");
788
789 assert!(
790 recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
791 "Should have not receive any update"
792 );
793
794 drop(sub_4);
795
796 let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
798 }
799
    /// With a transport that cannot stream and a consumer that prefers
    /// polling, events still reach the subscriber through `poll`.
    #[tokio::test]
    async fn falls_back_to_poll_when_stream_not_supported() {
        // Streaming unsupported, polling supported; the control observer is
        // not needed here.
        let (transport, events_tx, _) = TestTransport::new(false, true);
        let consumer = Consumer::new(transport, true, ());

        let mut sub = consumer
            .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
            .expect("subscribe ok");

        events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
        let got = recv_next::<TestTransport>(&mut sub, 1500)
            .await
            .expect("event relayed via polling");
        assert_eq!(got, Message { foo: 9, bar: 5 });
    }
820
    /// Two local subscribers of the same topic share one remote
    /// subscription: a single Subscribe is sent, both receive the event, and
    /// Unsubscribe is only sent when the last of them is dropped.
    #[tokio::test]
    async fn multiple_subscribers_share_single_remote_subscription() {
        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
        let consumer = Consumer::new(transport, false, ());

        let mut a = consumer
            .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
            .expect("subscribe A");
        let _ = expect_ctrl(
            &mut ctrl_rx,
            1000,
            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(1)),
        )
        .await;

        let mut b = consumer
            .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
            .expect("subscribe B");

        // If any Subscribe shows up now, it must not be for the shared topic.
        if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
        {
            assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
        }

        events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
        let got_a = recv_next::<TestTransport>(&mut a, 1000)
            .await
            .expect("A got");
        let got_b = recv_next::<TestTransport>(&mut b, 1000)
            .await
            .expect("B got");
        assert_eq!(got_a, Message { foo: 1, bar: 42 });
        assert_eq!(got_b, Message { foo: 1, bar: 42 });

        // `a` still uses the topic, so nothing may be sent to the transport.
        drop(b);
        if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
        {
            panic!("Should NOT unsubscribe while another local subscriber exists");
        }

        // Last subscriber gone: Unsubscribe, then Stop.
        drop(a);
        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
            matches!(m, StreamCtrl::Unsubscribe(_))
        })
        .await;

        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
    }
879}