1use std::collections::{HashMap, VecDeque};
5use std::sync::atomic::AtomicBool;
6use std::sync::Arc;
7use std::time::Duration;
8
9use parking_lot::RwLock;
10use tokio::sync::mpsc;
11use tokio::time::{sleep, Instant};
12
13use super::subscriber::{ActiveSubscription, SubscriptionRequest};
14use super::{Error, Event, Pubsub, Spec};
15use crate::task::spawn;
16
17const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_millis(2_000);
18
19const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_millis(30_000);
20
21const INTERNAL_POLL_SIZE: usize = 1_000;
22
23const POLL_SLEEP: Duration = Duration::from_millis(2_000);
24
25struct UniqueSubscription<S>
26where
27 S: Spec,
28{
29 name: S::SubscriptionId,
30 total_subscribers: usize,
31}
32
33type UniqueSubscriptions<S> = Arc<RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>>;
34
35type ActiveSubscriptions<S> =
36 RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;
37
38type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
39
40#[allow(missing_debug_implementations)]
42pub struct Consumer<T>
43where
44 T: Transport + 'static,
45{
46 transport: T,
47 inner_pubsub: Arc<Pubsub<T::Spec>>,
48 remote_subscriptions: UniqueSubscriptions<T::Spec>,
49 subscriptions: ActiveSubscriptions<T::Spec>,
50 stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
51 still_running: AtomicBool,
52 prefer_polling: bool,
53 cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
59}
60
61#[allow(missing_debug_implementations)]
63pub struct RemoteActiveConsumer<T>
64where
65 T: Transport + 'static,
66{
67 inner: ActiveSubscription<T::Spec>,
68 previous_messages: VecDeque<<T::Spec as Spec>::Event>,
69 consumer: Arc<Consumer<T>>,
70}
71
72impl<T> RemoteActiveConsumer<T>
73where
74 T: Transport + 'static,
75{
76 pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
78 if let Some(event) = self.previous_messages.pop_front() {
79 Some(event)
80 } else {
81 self.inner.recv().await
82 }
83 }
84
85 pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
87 if let Some(event) = self.previous_messages.pop_front() {
88 Some(event)
89 } else {
90 self.inner.try_recv()
91 }
92 }
93
94 pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
96 self.inner.name()
97 }
98}
99
100impl<T> Drop for RemoteActiveConsumer<T>
101where
102 T: Transport + 'static,
103{
104 fn drop(&mut self) {
105 let _ = self.consumer.unsubscribe(self.name().clone());
106 }
107}
108
109#[allow(missing_debug_implementations)]
112pub struct InternalRelay<S>
113where
114 S: Spec + 'static,
115{
116 inner: Arc<Pubsub<S>>,
117 remote_subscriptions: UniqueSubscriptions<S>,
118 cached_events: Arc<RwLock<CacheEvent<S>>>,
119}
120
121impl<S> InternalRelay<S>
122where
123 S: Spec + 'static,
124{
125 pub fn send<X>(&self, event: X)
127 where
128 X: Into<S::Event>,
129 {
130 let event = event.into();
131
132 {
133 let active_topics = self.remote_subscriptions.read();
134 let mut cached_events = self.cached_events.write();
135
136 for topic in event.get_topics() {
137 if active_topics.contains_key(&topic) {
138 cached_events.insert(topic, event.clone());
139 }
140 }
141 }
142
143 self.inner.publish(event);
144 }
145}
146
147impl<T> Consumer<T>
148where
149 T: Transport + 'static,
150{
151 pub fn new(
153 transport: T,
154 prefer_polling: bool,
155 context: <T::Spec as Spec>::Context,
156 ) -> Arc<Self> {
157 let this = Arc::new(Self {
158 transport,
159 prefer_polling,
160 inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
161 subscriptions: Default::default(),
162 remote_subscriptions: Default::default(),
163 stream_ctrl: RwLock::new(None),
164 cached_events: Default::default(),
165 still_running: true.into(),
166 });
167
168 spawn(Self::stream(this.clone()));
169
170 this
171 }
172
173 async fn stream(instance: Arc<Self>) {
174 let mut stream_supported = true;
175 let mut poll_supported = true;
176
177 let mut backoff = STREAM_CONNECTION_BACKOFF;
178 let mut retry_at = None;
179
180 loop {
181 if (!stream_supported && !poll_supported)
182 || !instance
183 .still_running
184 .load(std::sync::atomic::Ordering::Relaxed)
185 {
186 break;
187 }
188
189 if instance.remote_subscriptions.read().is_empty() {
190 sleep(Duration::from_millis(100)).await;
191 continue;
192 }
193
194 if stream_supported
195 && !instance.prefer_polling
196 && retry_at
197 .map(|retry_at| retry_at < Instant::now())
198 .unwrap_or(true)
199 {
200 let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
201
202 {
203 *instance.stream_ctrl.write() = Some(sender);
204 }
205
206 let current_subscriptions = {
207 instance
208 .remote_subscriptions
209 .read()
210 .iter()
211 .map(|(key, name)| (name.name.clone(), key.clone()))
212 .collect::<Vec<_>>()
213 };
214
215 if let Err(err) = instance
216 .transport
217 .stream(
218 receiver,
219 current_subscriptions,
220 InternalRelay {
221 inner: instance.inner_pubsub.clone(),
222 remote_subscriptions: instance.remote_subscriptions.clone(),
223 cached_events: instance.cached_events.clone(),
224 },
225 )
226 .await
227 {
228 retry_at = Some(Instant::now() + backoff);
229 backoff =
230 (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
231
232 if matches!(err, Error::NotSupported) {
233 stream_supported = false;
234 }
235 tracing::error!("Long connection failed with error {:?}", err);
236 } else {
237 backoff = STREAM_CONNECTION_BACKOFF;
238 }
239
240 let _ = instance.stream_ctrl.write().take();
242 }
243
244 if poll_supported {
245 let current_subscriptions = {
246 instance
247 .remote_subscriptions
248 .read()
249 .iter()
250 .map(|(key, name)| (name.name.clone(), key.clone()))
251 .collect::<Vec<_>>()
252 };
253
254 if let Err(err) = instance
255 .transport
256 .poll(
257 current_subscriptions,
258 InternalRelay {
259 inner: instance.inner_pubsub.clone(),
260 remote_subscriptions: instance.remote_subscriptions.clone(),
261 cached_events: instance.cached_events.clone(),
262 },
263 )
264 .await
265 {
266 if matches!(err, Error::NotSupported) {
267 poll_supported = false;
268 }
269 tracing::error!("Polling failed with error {:?}", err);
270 }
271
272 sleep(POLL_SLEEP).await;
273 }
274 }
275 }
276
277 fn unsubscribe(
280 self: &Arc<Self>,
281 subscription_name: <T::Spec as Spec>::SubscriptionId,
282 ) -> Result<(), Error> {
283 let topics = self
284 .subscriptions
285 .write()
286 .remove(&subscription_name)
287 .ok_or(Error::NoSubscription)?;
288
289 let mut remote_subscriptions = self.remote_subscriptions.write();
290
291 for topic in topics {
292 let mut remote_subscription =
293 if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
294 remote_subscription
295 } else {
296 continue;
297 };
298
299 remote_subscription.total_subscribers =
300 remote_subscription.total_subscribers.saturating_sub(1);
301
302 if remote_subscription.total_subscribers == 0 {
303 let mut cached_events = self.cached_events.write();
304
305 cached_events.remove(&topic);
306
307 self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
308 } else {
309 remote_subscriptions.insert(topic, remote_subscription);
310 }
311 }
312
313 if remote_subscriptions.is_empty() {
314 self.cached_events.write().clear();
315 self.message_to_stream(StreamCtrl::Stop)?;
316 }
317
318 Ok(())
319 }
320
321 #[inline(always)]
322 fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
323 let to_stream = self.stream_ctrl.read();
324
325 if let Some(to_stream) = to_stream.as_ref() {
326 Ok(to_stream.try_send(message)?)
327 } else {
328 Ok(())
329 }
330 }
331
332 pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
343 where
344 I: SubscriptionRequest<
345 Topic = <T::Spec as Spec>::Topic,
346 SubscriptionId = <T::Spec as Spec>::SubscriptionId,
347 >,
348 {
349 let subscription_name = request.subscription_name();
350 let topics = request.try_get_topics()?;
351
352 let mut remote_subscriptions = self.remote_subscriptions.write();
353 let mut subscriptions = self.subscriptions.write();
354
355 if subscriptions.get(&subscription_name).is_some() {
356 return Err(Error::NoSubscription);
357 }
358
359 let mut previous_messages = Vec::new();
360 let cached_events = self.cached_events.read();
361
362 for topic in topics.iter() {
363 if let Some(subscription) = remote_subscriptions.get_mut(topic) {
364 subscription.total_subscribers += 1;
365
366 if let Some(v) = cached_events.get(topic).cloned() {
367 previous_messages.push(v);
368 }
369 } else {
370 let internal_sub_name = self.transport.new_name();
371 remote_subscriptions.insert(
372 topic.clone(),
373 UniqueSubscription {
374 total_subscribers: 1,
375 name: internal_sub_name.clone(),
376 },
377 );
378
379 self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
381 }
382 }
383
384 subscriptions.insert(subscription_name, topics);
385 drop(subscriptions);
386
387 Ok(RemoteActiveConsumer {
388 inner: self.inner_pubsub.subscribe(request)?,
389 previous_messages: previous_messages.into(),
390 consumer: self.clone(),
391 })
392 }
393}
394
395impl<T> Drop for Consumer<T>
396where
397 T: Transport + 'static,
398{
399 fn drop(&mut self) {
400 self.still_running
401 .store(false, std::sync::atomic::Ordering::Release);
402 if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
403 let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
404 tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
405 });
406 }
407 }
408}
409
410pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
412
413#[allow(missing_debug_implementations)]
415pub enum StreamCtrl<S>
416where
417 S: Spec + 'static,
418{
419 Subscribe(SubscribeMessage<S>),
421 Unsubscribe(S::SubscriptionId),
423 Stop,
425}
426
427impl<S> Clone for StreamCtrl<S>
428where
429 S: Spec + 'static,
430{
431 fn clone(&self) -> Self {
432 match self {
433 Self::Subscribe(s) => Self::Subscribe(s.clone()),
434 Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
435 Self::Stop => Self::Stop,
436 }
437 }
438}
439
440#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
458#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
459pub trait Transport: Send + Sync {
460 type Spec: Spec;
462
463 fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;
465
466 async fn stream(
469 &self,
470 subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
471 topics: Vec<SubscribeMessage<Self::Spec>>,
472 reply_to: InternalRelay<Self::Spec>,
473 ) -> Result<(), Error>;
474
475 async fn poll(
478 &self,
479 topics: Vec<SubscribeMessage<Self::Spec>>,
480 reply_to: InternalRelay<Self::Spec>,
481 ) -> Result<(), Error>;
482}
483
484#[cfg(test)]
485mod tests {
486 use std::sync::atomic::{AtomicUsize, Ordering};
487 use std::sync::Arc;
488
489 use tokio::sync::{mpsc, Mutex};
490 use tokio::time::{timeout, Duration};
491
492 use super::{
493 InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
494 INTERNAL_POLL_SIZE,
495 };
496 use crate::pub_sub::remote_consumer::Consumer;
497 use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
498 use crate::pub_sub::{Error, Spec, SubscriptionRequest};
499
500 #[derive(Clone, Debug)]
503 enum SubscriptionReq {
504 Foo(String, u64),
505 Bar(String, u64),
506 }
507
508 impl SubscriptionRequest for SubscriptionReq {
509 type Topic = IndexTest;
510
511 type SubscriptionId = String;
512
513 fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
514 Ok(vec![match self {
515 SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
516 SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
517 }])
518 }
519
520 fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
521 Arc::new(match self {
522 SubscriptionReq::Foo(n, _) => n.to_string(),
523 SubscriptionReq::Bar(n, _) => n.to_string(),
524 })
525 }
526 }
527
528 struct TestTransport {
533 name_ctr: AtomicUsize,
534 observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
536 support_long: bool,
538 support_poll: bool,
539 rx: Mutex<mpsc::Receiver<Message>>,
540 }
541
542 impl TestTransport {
543 fn new(
544 support_long: bool,
545 support_poll: bool,
546 ) -> (
547 Self,
548 mpsc::Sender<Message>,
549 mpsc::Receiver<StreamCtrl<CustomPubSub>>,
550 ) {
551 let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
552 let (observe_ctrl_tx, observe_ctrl_rx) =
553 mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);
554
555 let t = TestTransport {
556 name_ctr: AtomicUsize::new(1),
557 rx: Mutex::new(rx),
558 observe_ctrl_tx,
559 support_long,
560 support_poll,
561 };
562
563 (t, events_tx, observe_ctrl_rx)
564 }
565 }
566
567 #[async_trait::async_trait]
568 impl Transport for TestTransport {
569 type Spec = CustomPubSub;
570
571 fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
572 format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
573 }
574
575 async fn stream(
576 &self,
577 mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
578 topics: Vec<SubscribeMessage<Self::Spec>>,
579 reply_to: InternalRelay<Self::Spec>,
580 ) -> Result<(), Error> {
581 if !self.support_long {
582 return Err(Error::NotSupported);
583 }
584
585 let mut rx = self.rx.lock().await;
587 let observe = self.observe_ctrl_tx.clone();
588
589 for topic in topics {
590 observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
591 }
592
593 loop {
594 tokio::select! {
595 Some(ctrl) = subscribe_changes.recv() => {
597 observe.try_send(ctrl.clone()).unwrap();
598 if matches!(ctrl, StreamCtrl::Stop) {
599 break;
600 }
601 }
602 Some(msg) = rx.recv() => {
604 reply_to.send(msg);
605 }
606 }
607 }
608
609 Ok(())
610 }
611
612 async fn poll(
613 &self,
614 _topics: Vec<SubscribeMessage<Self::Spec>>,
615 reply_to: InternalRelay<Self::Spec>,
616 ) -> Result<(), Error> {
617 if !self.support_poll {
618 return Err(Error::NotSupported);
619 }
620
621 let mut rx = self.rx.lock().await;
624 for _ in 0..32 {
626 match rx.try_recv() {
627 Ok(msg) => reply_to.send(msg),
628 Err(mpsc::error::TryRecvError::Empty) => continue,
629 Err(mpsc::error::TryRecvError::Disconnected) => break,
630 }
631 }
632 Ok(())
633 }
634 }
635
636 async fn recv_next<T: Transport>(
639 sub: &mut RemoteActiveConsumer<T>,
640 dur_ms: u64,
641 ) -> Option<<T::Spec as Spec>::Event> {
642 timeout(Duration::from_millis(dur_ms), sub.recv())
643 .await
644 .ok()
645 .flatten()
646 }
647
648 async fn expect_ctrl(
649 rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
650 dur_ms: u64,
651 pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
652 ) -> StreamCtrl<CustomPubSub> {
653 timeout(Duration::from_millis(dur_ms), async {
654 loop {
655 if let Some(msg) = rx.recv().await {
656 if pred(&msg) {
657 break msg;
658 }
659 }
660 }
661 })
662 .await
663 .expect("timed out waiting for control message")
664 }
665
666 #[tokio::test]
669 async fn stream_delivery_and_unsubscribe_on_drop() {
670 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
672
673 let consumer = Consumer::new(transport, false, ());
675
676 let mut sub = consumer
678 .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
679 .expect("subscribe ok");
680
681 let ctrl = expect_ctrl(
683 &mut ctrl_rx,
684 1000,
685 |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
686 )
687 .await;
688 match ctrl {
689 StreamCtrl::Subscribe((name, idx)) => {
690 assert_ne!(name, "t".to_owned());
691 assert_eq!(idx, IndexTest::Foo(7));
692 }
693 _ => unreachable!(),
694 }
695
696 events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
698 let got = recv_next::<TestTransport>(&mut sub, 1000)
699 .await
700 .expect("got event");
701 assert_eq!(got, Message { foo: 7, bar: 1 });
702
703 drop(sub);
705 let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
706 matches!(m, StreamCtrl::Unsubscribe(_))
707 })
708 .await;
709
710 drop(consumer);
712 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
713 }
714
715 #[tokio::test]
716 async fn test_cache_and_invalation() {
717 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
719
720 let consumer = Consumer::new(transport, false, ());
722
723 let mut sub_1 = consumer
725 .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
726 .expect("subscribe ok");
727
728 let ctrl = expect_ctrl(
730 &mut ctrl_rx,
731 1000,
732 |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
733 )
734 .await;
735 match ctrl {
736 StreamCtrl::Subscribe((name, idx)) => {
737 assert_ne!(name, "t1".to_owned());
738 assert_eq!(idx, IndexTest::Foo(7));
739 }
740 _ => unreachable!(),
741 }
742
743 events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
745 let got = recv_next::<TestTransport>(&mut sub_1, 1000)
746 .await
747 .expect("got event");
748 assert_eq!(got, Message { foo: 7, bar: 1 });
749
750 let mut sub_2 = consumer
752 .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
753 .expect("subscribe ok");
754
755 let got = recv_next::<TestTransport>(&mut sub_2, 1000)
756 .await
757 .expect("got event");
758 assert_eq!(got, Message { foo: 7, bar: 1 });
759
760 drop(sub_1);
762
763 let mut sub_3 = consumer
765 .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
766 .expect("subscribe ok");
767
768 let got = recv_next::<TestTransport>(&mut sub_3, 1000)
770 .await
771 .expect("got event");
772 assert_eq!(got, Message { foo: 7, bar: 1 });
773
774 events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();
776
777 let got = recv_next::<TestTransport>(&mut sub_2, 1000)
779 .await
780 .expect("got event");
781 assert_eq!(got, Message { foo: 7, bar: 2 });
782
783 let got = recv_next::<TestTransport>(&mut sub_3, 1000)
784 .await
785 .expect("got event");
786 assert_eq!(got, Message { foo: 7, bar: 2 });
787
788 drop(sub_2);
789 drop(sub_3);
790
791 let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
792 matches!(m, StreamCtrl::Unsubscribe(_))
793 })
794 .await;
795
796 let mut sub_4 = consumer
798 .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
799 .expect("subscribe ok");
800
801 assert!(
802 recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
803 "Should have not receive any update"
804 );
805
806 drop(sub_4);
807
808 let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
810 }
811
812 #[tokio::test]
813 async fn cache_ignores_orphan_event_topics() {
814 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
815 let consumer = Consumer::new(transport, false, ());
816
817 let mut sub = consumer
818 .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
819 .expect("subscribe ok");
820 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
821 matches!(m, StreamCtrl::Subscribe(_))
822 })
823 .await;
824
825 events_tx.send(Message { foo: 7, bar: 99 }).await.unwrap();
826 let _ = recv_next::<TestTransport>(&mut sub, 1000)
827 .await
828 .expect("got event");
829
830 drop(sub);
831 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
832 matches!(m, StreamCtrl::Unsubscribe(_))
833 })
834 .await;
835
836 let cache = consumer.cached_events.read();
837 assert!(
838 cache.is_empty(),
839 "cache leaked entries for orphan topics: {:?}",
840 cache.keys().collect::<Vec<_>>()
841 );
842 }
843
844 #[tokio::test]
845 async fn falls_back_to_poll_when_stream_not_supported() {
846 let (transport, events_tx, _) = TestTransport::new(false, true);
848 let consumer = Consumer::new(transport, true, ());
851
852 let mut sub = consumer
854 .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
855 .expect("subscribe ok");
856
857 events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
859 let got = recv_next::<TestTransport>(&mut sub, 1500)
860 .await
861 .expect("event relayed via polling");
862 assert_eq!(got, Message { foo: 9, bar: 5 });
863 }
864
865 #[tokio::test]
866 async fn multiple_subscribers_share_single_remote_subscription() {
867 let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
870 let consumer = Consumer::new(transport, false, ());
871
872 let mut a = consumer
874 .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
875 .expect("subscribe A");
876 let _ = expect_ctrl(
877 &mut ctrl_rx,
878 1000,
879 |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(1)),
880 )
881 .await;
882
883 let mut b = consumer
884 .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
885 .expect("subscribe B");
886
887 if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
890 timeout(Duration::from_millis(400), ctrl_rx.recv()).await
891 {
892 assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
893 }
894
895 events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
897 let got_a = recv_next::<TestTransport>(&mut a, 1000)
898 .await
899 .expect("A got");
900 let got_b = recv_next::<TestTransport>(&mut b, 1000)
901 .await
902 .expect("B got");
903 assert_eq!(got_a, Message { foo: 1, bar: 42 });
904 assert_eq!(got_b, Message { foo: 1, bar: 42 });
905
906 drop(b);
908 if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
909 timeout(Duration::from_millis(400), ctrl_rx.recv()).await
910 {
911 panic!("Should NOT unsubscribe while another local subscriber exists");
912 }
913
914 drop(a);
916 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
917 matches!(m, StreamCtrl::Unsubscribe(_))
918 })
919 .await;
920
921 let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
922 }
923}