cdk_common/pub_sub/
remote_consumer.rs

1//! Pub-sub consumer
2//!
3//! Consumers are designed to connect to a producer, through a transport, and subscribe to events.
4use std::collections::{HashMap, VecDeque};
5use std::sync::atomic::AtomicBool;
6use std::sync::Arc;
7use std::time::Duration;
8
9use parking_lot::RwLock;
10use tokio::sync::mpsc;
11use tokio::time::{sleep, Instant};
12
13use super::subscriber::{ActiveSubscription, SubscriptionRequest};
14use super::{Error, Event, Pubsub, Spec};
15use crate::task::spawn;
16
/// Delay before the first reconnection attempt of a failed streaming connection; the delay also
/// grows by this amount after every failure.
const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_secs(2);

/// Upper bound of the streaming reconnection delay.
const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Capacity of the control channel handed to the streaming transport.
const INTERNAL_POLL_SIZE: usize = 1000;

/// Pause after every polling round.
const POLL_SLEEP: Duration = Duration::from_secs(2);
24
25struct UniqueSubscription<S>
26where
27    S: Spec,
28{
29    name: S::SubscriptionId,
30    total_subscribers: usize,
31}
32
33type UniqueSubscriptions<S> = RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>;
34
35type ActiveSubscriptions<S> =
36    RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;
37
38type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
39
40/// Subscription consumer
41pub struct Consumer<T>
42where
43    T: Transport + 'static,
44{
45    transport: T,
46    inner_pubsub: Arc<Pubsub<T::Spec>>,
47    remote_subscriptions: UniqueSubscriptions<T::Spec>,
48    subscriptions: ActiveSubscriptions<T::Spec>,
49    stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
50    still_running: AtomicBool,
51    prefer_polling: bool,
52    /// Cached events
53    ///
54    /// The cached events are useful to share events. The cache is automatically evicted it is
55    /// disconnected from the remote source, meaning the cache is only active while there is an
56    /// active subscription to the remote source, and it remembers the latest event.
57    cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
58}
59
60/// Remote consumer
61pub struct RemoteActiveConsumer<T>
62where
63    T: Transport + 'static,
64{
65    inner: ActiveSubscription<T::Spec>,
66    previous_messages: VecDeque<<T::Spec as Spec>::Event>,
67    consumer: Arc<Consumer<T>>,
68}
69
70impl<T> RemoteActiveConsumer<T>
71where
72    T: Transport + 'static,
73{
74    /// Receives the next event
75    pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
76        if let Some(event) = self.previous_messages.pop_front() {
77            Some(event)
78        } else {
79            self.inner.recv().await
80        }
81    }
82
83    /// Try receive an event or return Noen right away
84    pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
85        if let Some(event) = self.previous_messages.pop_front() {
86            Some(event)
87        } else {
88            self.inner.try_recv()
89        }
90    }
91
92    /// Get the subscription name
93    pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
94        self.inner.name()
95    }
96}
97
98impl<T> Drop for RemoteActiveConsumer<T>
99where
100    T: Transport + 'static,
101{
102    fn drop(&mut self) {
103        let _ = self.consumer.unsubscribe(self.name().clone());
104    }
105}
106
107/// Struct to relay events from Poll and Streams from the external subscription to the local
108/// subscribers
109pub struct InternalRelay<S>
110where
111    S: Spec + 'static,
112{
113    inner: Arc<Pubsub<S>>,
114    cached_events: Arc<RwLock<CacheEvent<S>>>,
115}
116
117impl<S> InternalRelay<S>
118where
119    S: Spec + 'static,
120{
121    /// Relay a remote event locally
122    pub fn send<X>(&self, event: X)
123    where
124        X: Into<S::Event>,
125    {
126        let event = event.into();
127        let mut cached_events = self.cached_events.write();
128
129        for topic in event.get_topics() {
130            cached_events.insert(topic, event.clone());
131        }
132
133        self.inner.publish(event);
134    }
135}
136
137impl<T> Consumer<T>
138where
139    T: Transport + 'static,
140{
141    /// Creates a new instance
142    pub fn new(
143        transport: T,
144        prefer_polling: bool,
145        context: <T::Spec as Spec>::Context,
146    ) -> Arc<Self> {
147        let this = Arc::new(Self {
148            transport,
149            prefer_polling,
150            inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
151            subscriptions: Default::default(),
152            remote_subscriptions: Default::default(),
153            stream_ctrl: RwLock::new(None),
154            cached_events: Default::default(),
155            still_running: true.into(),
156        });
157
158        spawn(Self::stream(this.clone()));
159
160        this
161    }
162
163    async fn stream(instance: Arc<Self>) {
164        let mut stream_supported = true;
165        let mut poll_supported = true;
166
167        let mut backoff = STREAM_CONNECTION_BACKOFF;
168        let mut retry_at = None;
169
170        loop {
171            if (!stream_supported && !poll_supported)
172                || !instance
173                    .still_running
174                    .load(std::sync::atomic::Ordering::Relaxed)
175            {
176                break;
177            }
178
179            if instance.remote_subscriptions.read().is_empty() {
180                sleep(Duration::from_millis(100)).await;
181                continue;
182            }
183
184            if stream_supported
185                && !instance.prefer_polling
186                && retry_at
187                    .map(|retry_at| retry_at < Instant::now())
188                    .unwrap_or(true)
189            {
190                let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
191
192                {
193                    *instance.stream_ctrl.write() = Some(sender);
194                }
195
196                let current_subscriptions = {
197                    instance
198                        .remote_subscriptions
199                        .read()
200                        .iter()
201                        .map(|(key, name)| (name.name.clone(), key.clone()))
202                        .collect::<Vec<_>>()
203                };
204
205                if let Err(err) = instance
206                    .transport
207                    .stream(
208                        receiver,
209                        current_subscriptions,
210                        InternalRelay {
211                            inner: instance.inner_pubsub.clone(),
212                            cached_events: instance.cached_events.clone(),
213                        },
214                    )
215                    .await
216                {
217                    retry_at = Some(Instant::now() + backoff);
218                    backoff =
219                        (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
220
221                    if matches!(err, Error::NotSupported) {
222                        stream_supported = false;
223                    }
224                    tracing::error!("Long connection failed with error {:?}", err);
225                } else {
226                    backoff = STREAM_CONNECTION_BACKOFF;
227                }
228
229                // remove sender to stream, as there is no stream
230                let _ = instance.stream_ctrl.write().take();
231            }
232
233            if poll_supported {
234                let current_subscriptions = {
235                    instance
236                        .remote_subscriptions
237                        .read()
238                        .iter()
239                        .map(|(key, name)| (name.name.clone(), key.clone()))
240                        .collect::<Vec<_>>()
241                };
242
243                if let Err(err) = instance
244                    .transport
245                    .poll(
246                        current_subscriptions,
247                        InternalRelay {
248                            inner: instance.inner_pubsub.clone(),
249                            cached_events: instance.cached_events.clone(),
250                        },
251                    )
252                    .await
253                {
254                    if matches!(err, Error::NotSupported) {
255                        poll_supported = false;
256                    }
257                    tracing::error!("Polling failed with error {:?}", err);
258                }
259
260                sleep(POLL_SLEEP).await;
261            }
262        }
263    }
264
265    /// Unsubscribe from a topic, this is called automatically when RemoteActiveSubscription<T> goes
266    /// out of scope
267    fn unsubscribe(
268        self: &Arc<Self>,
269        subscription_name: <T::Spec as Spec>::SubscriptionId,
270    ) -> Result<(), Error> {
271        let topics = self
272            .subscriptions
273            .write()
274            .remove(&subscription_name)
275            .ok_or(Error::NoSubscription)?;
276
277        let mut remote_subscriptions = self.remote_subscriptions.write();
278
279        for topic in topics {
280            let mut remote_subscription =
281                if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
282                    remote_subscription
283                } else {
284                    continue;
285                };
286
287            remote_subscription.total_subscribers = remote_subscription
288                .total_subscribers
289                .checked_sub(1)
290                .unwrap_or_default();
291
292            if remote_subscription.total_subscribers == 0 {
293                let mut cached_events = self.cached_events.write();
294
295                cached_events.remove(&topic);
296
297                self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
298            } else {
299                remote_subscriptions.insert(topic, remote_subscription);
300            }
301        }
302
303        if remote_subscriptions.is_empty() {
304            self.message_to_stream(StreamCtrl::Stop)?;
305        }
306
307        Ok(())
308    }
309
310    #[inline(always)]
311    fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
312        let to_stream = self.stream_ctrl.read();
313
314        if let Some(to_stream) = to_stream.as_ref() {
315            Ok(to_stream.try_send(message)?)
316        } else {
317            Ok(())
318        }
319    }
320
321    /// Creates a subscription
322    ///
323    /// The subscriptions have two parts:
324    ///
325    /// 1. Will create the subscription to the remote Pubsub service, Any events will be moved to
326    ///    the internal pubsub
327    ///
328    /// 2. The internal subscription to the inner Pubsub. Because all subscriptions are going the
329    ///    transport, once events matches subscriptions, the inner_pubsub will receive the message and
330    ///    broadcasat the event.
331    pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
332    where
333        I: SubscriptionRequest<
334            Topic = <T::Spec as Spec>::Topic,
335            SubscriptionId = <T::Spec as Spec>::SubscriptionId,
336        >,
337    {
338        let subscription_name = request.subscription_name();
339        let topics = request.try_get_topics()?;
340
341        let mut remote_subscriptions = self.remote_subscriptions.write();
342        let mut subscriptions = self.subscriptions.write();
343
344        if subscriptions.get(&subscription_name).is_some() {
345            return Err(Error::NoSubscription);
346        }
347
348        let mut previous_messages = Vec::new();
349        let cached_events = self.cached_events.read();
350
351        for topic in topics.iter() {
352            if let Some(subscription) = remote_subscriptions.get_mut(topic) {
353                subscription.total_subscribers += 1;
354
355                if let Some(v) = cached_events.get(topic).cloned() {
356                    previous_messages.push(v);
357                }
358            } else {
359                let internal_sub_name = self.transport.new_name();
360                remote_subscriptions.insert(
361                    topic.clone(),
362                    UniqueSubscription {
363                        total_subscribers: 1,
364                        name: internal_sub_name.clone(),
365                    },
366                );
367
368                // new subscription is created, so the connection worker should be notified
369                self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
370            }
371        }
372
373        subscriptions.insert(subscription_name, topics);
374        drop(subscriptions);
375
376        Ok(RemoteActiveConsumer {
377            inner: self.inner_pubsub.subscribe(request)?,
378            previous_messages: previous_messages.into(),
379            consumer: self.clone(),
380        })
381    }
382}
383
384impl<T> Drop for Consumer<T>
385where
386    T: Transport + 'static,
387{
388    fn drop(&mut self) {
389        self.still_running
390            .store(false, std::sync::atomic::Ordering::Release);
391        if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
392            let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
393                tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
394            });
395        }
396    }
397}
398
399/// Subscribe Message
400pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
401
402/// Messages sent from the [`Consumer`] to the [`Transport`] background loop.
403pub enum StreamCtrl<S>
404where
405    S: Spec + 'static,
406{
407    /// Add a subscription
408    Subscribe(SubscribeMessage<S>),
409    /// Desuscribe
410    Unsubscribe(S::SubscriptionId),
411    /// Exit the loop
412    Stop,
413}
414
415impl<S> Clone for StreamCtrl<S>
416where
417    S: Spec + 'static,
418{
419    fn clone(&self) -> Self {
420        match self {
421            Self::Subscribe(s) => Self::Subscribe(s.clone()),
422            Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
423            Self::Stop => Self::Stop,
424        }
425    }
426}
427
428/// Transport abstracts how the consumer talks to the remote pubsub.
429///
430/// Implement this on your HTTP/WebSocket client. The transport is responsible for:
431/// - creating unique subscription names,
432/// - keeping a long connection via `stream` **or** performing on-demand `poll`,
433/// - forwarding remote events to `InternalRelay`.
434///
435/// ```ignore
436/// struct WsTransport { /* ... */ }
437/// #[async_trait::async_trait]
438/// impl Transport for WsTransport {
439///     type Topic = MyTopic;
440///     fn new_name(&self) -> <Self::Topic as Topic>::SubscriptionName { 0 }
441///     async fn stream(/* ... */) -> Result<(), Error> { Ok(()) }
442///     async fn poll(/* ... */) -> Result<(), Error> { Ok(()) }
443/// }
444/// ```
445#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
446#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
447pub trait Transport: Send + Sync {
448    /// Spec
449    type Spec: Spec;
450
451    /// Create a new subscription name
452    fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;
453
454    /// Opens a persistent connection and continuously streams events.
455    /// For protocols that support server push (e.g. WebSocket, SSE).
456    async fn stream(
457        &self,
458        subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
459        topics: Vec<SubscribeMessage<Self::Spec>>,
460        reply_to: InternalRelay<Self::Spec>,
461    ) -> Result<(), Error>;
462
463    /// Performs a one-shot fetch of any currently available events.
464    /// Called repeatedly by the consumer when streaming is not available.
465    async fn poll(
466        &self,
467        topics: Vec<SubscribeMessage<Self::Spec>>,
468        reply_to: InternalRelay<Self::Spec>,
469    ) -> Result<(), Error>;
470}
471
472#[cfg(test)]
473mod tests {
474    use std::sync::atomic::{AtomicUsize, Ordering};
475    use std::sync::Arc;
476
477    use tokio::sync::{mpsc, Mutex};
478    use tokio::time::{timeout, Duration};
479
480    use super::{
481        InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
482        INTERNAL_POLL_SIZE,
483    };
484    use crate::pub_sub::remote_consumer::Consumer;
485    use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
486    use crate::pub_sub::{Error, Spec, SubscriptionRequest};
487
488    // ===== Test Event/Topic types =====
489
490    #[derive(Clone, Debug)]
491    enum SubscriptionReq {
492        Foo(String, u64),
493        Bar(String, u64),
494    }
495
496    impl SubscriptionRequest for SubscriptionReq {
497        type Topic = IndexTest;
498
499        type SubscriptionId = String;
500
501        fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
502            Ok(vec![match self {
503                SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
504                SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
505            }])
506        }
507
508        fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
509            Arc::new(match self {
510                SubscriptionReq::Foo(n, _) => n.to_string(),
511                SubscriptionReq::Bar(n, _) => n.to_string(),
512            })
513        }
514    }
515
516    // ===== A controllable in-memory Transport used by tests =====
517
518    /// TestTransport relays messages from a broadcast channel to the Consumer via `InternalRelay`.
519    /// It also forwards Subscribe/Unsubscribe/Stop signals to an observer channel so tests can assert them.
520    struct TestTransport {
521        name_ctr: AtomicUsize,
522        // We forward all transport-loop control messages here so tests can observe them.
523        observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
524        // Whether stream / poll are supported.
525        support_long: bool,
526        support_poll: bool,
527        rx: Mutex<mpsc::Receiver<Message>>,
528    }
529
530    impl TestTransport {
531        fn new(
532            support_long: bool,
533            support_poll: bool,
534        ) -> (
535            Self,
536            mpsc::Sender<Message>,
537            mpsc::Receiver<StreamCtrl<CustomPubSub>>,
538        ) {
539            let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
540            let (observe_ctrl_tx, observe_ctrl_rx) =
541                mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);
542
543            let t = TestTransport {
544                name_ctr: AtomicUsize::new(1),
545                rx: Mutex::new(rx),
546                observe_ctrl_tx,
547                support_long,
548                support_poll,
549            };
550
551            (t, events_tx, observe_ctrl_rx)
552        }
553    }
554
555    #[async_trait::async_trait]
556    impl Transport for TestTransport {
557        type Spec = CustomPubSub;
558
559        fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
560            format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
561        }
562
563        async fn stream(
564            &self,
565            mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
566            topics: Vec<SubscribeMessage<Self::Spec>>,
567            reply_to: InternalRelay<Self::Spec>,
568        ) -> Result<(), Error> {
569            if !self.support_long {
570                return Err(Error::NotSupported);
571            }
572
573            // Each invocation creates a fresh broadcast receiver
574            let mut rx = self.rx.lock().await;
575            let observe = self.observe_ctrl_tx.clone();
576
577            for topic in topics {
578                observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
579            }
580
581            loop {
582                tokio::select! {
583                    // Forward any control (Subscribe/Unsubscribe/Stop) messages so the test can assert them.
584                    Some(ctrl) = subscribe_changes.recv() => {
585                        observe.try_send(ctrl.clone()).unwrap();
586                        if matches!(ctrl, StreamCtrl::Stop) {
587                            break;
588                        }
589                    }
590                    // Relay external events into the inner pubsub
591                    Some(msg) = rx.recv() => {
592                        reply_to.send(msg);
593                    }
594                }
595            }
596
597            Ok(())
598        }
599
600        async fn poll(
601            &self,
602            _topics: Vec<SubscribeMessage<Self::Spec>>,
603            reply_to: InternalRelay<Self::Spec>,
604        ) -> Result<(), Error> {
605            if !self.support_poll {
606                return Err(Error::NotSupported);
607            }
608
609            // On each poll call, drain anything currently pending and return.
610            // (The Consumer calls this repeatedly; first call happens immediately.)
611            let mut rx = self.rx.lock().await;
612            // Non-blocking drain pass: try a few times without sleeping to keep tests snappy
613            for _ in 0..32 {
614                match rx.try_recv() {
615                    Ok(msg) => reply_to.send(msg),
616                    Err(mpsc::error::TryRecvError::Empty) => continue,
617                    Err(mpsc::error::TryRecvError::Disconnected) => break,
618                }
619            }
620            Ok(())
621        }
622    }
623
624    // ===== Helpers =====
625
626    async fn recv_next<T: Transport>(
627        sub: &mut RemoteActiveConsumer<T>,
628        dur_ms: u64,
629    ) -> Option<<T::Spec as Spec>::Event> {
630        timeout(Duration::from_millis(dur_ms), sub.recv())
631            .await
632            .ok()
633            .flatten()
634    }
635
636    async fn expect_ctrl(
637        rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
638        dur_ms: u64,
639        pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
640    ) -> StreamCtrl<CustomPubSub> {
641        timeout(Duration::from_millis(dur_ms), async {
642            loop {
643                if let Some(msg) = rx.recv().await {
644                    if pred(&msg) {
645                        break msg;
646                    }
647                }
648            }
649        })
650        .await
651        .expect("timed out waiting for control message")
652    }
653
654    // ===== Tests =====
655
656    #[tokio::test]
657    async fn stream_delivery_and_unsubscribe_on_drop() {
658        // stream supported, poll supported (doesn't matter; prefer long)
659        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
660
661        // prefer_polling = false so connection loop will try stream first.
662        let consumer = Consumer::new(transport, false, ());
663
664        // Subscribe to Foo(7)
665        let mut sub = consumer
666            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
667            .expect("subscribe ok");
668
669        // We should see a Subscribe(name, topic) forwarded to transport
670        let ctrl = expect_ctrl(
671            &mut ctrl_rx,
672            1000,
673            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
674        )
675        .await;
676        match ctrl {
677            StreamCtrl::Subscribe((name, idx)) => {
678                assert_ne!(name, "t".to_owned());
679                assert_eq!(idx, IndexTest::Foo(7));
680            }
681            _ => unreachable!(),
682        }
683
684        // Send an event that matches Foo(7)
685        events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
686        let got = recv_next::<TestTransport>(&mut sub, 1000)
687            .await
688            .expect("got event");
689        assert_eq!(got, Message { foo: 7, bar: 1 });
690
691        // Dropping the RemoteActiveConsumer should trigger an Unsubscribe(name)
692        drop(sub);
693        let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
694            matches!(m, StreamCtrl::Unsubscribe(_))
695        })
696        .await;
697
698        // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
699        drop(consumer);
700        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
701    }
702
703    #[tokio::test]
704    async fn test_cache_and_invalation() {
705        // stream supported, poll supported (doesn't matter; prefer long)
706        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
707
708        // prefer_polling = false so connection loop will try stream first.
709        let consumer = Consumer::new(transport, false, ());
710
711        // Subscribe to Foo(7)
712        let mut sub_1 = consumer
713            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
714            .expect("subscribe ok");
715
716        // We should see a Subscribe(name, topic) forwarded to transport
717        let ctrl = expect_ctrl(
718            &mut ctrl_rx,
719            1000,
720            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
721        )
722        .await;
723        match ctrl {
724            StreamCtrl::Subscribe((name, idx)) => {
725                assert_ne!(name, "t1".to_owned());
726                assert_eq!(idx, IndexTest::Foo(7));
727            }
728            _ => unreachable!(),
729        }
730
731        // Send an event that matches Foo(7)
732        events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
733        let got = recv_next::<TestTransport>(&mut sub_1, 1000)
734            .await
735            .expect("got event");
736        assert_eq!(got, Message { foo: 7, bar: 1 });
737
738        // Subscribe to Foo(7), should receive the latest message and future messages
739        let mut sub_2 = consumer
740            .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
741            .expect("subscribe ok");
742
743        let got = recv_next::<TestTransport>(&mut sub_2, 1000)
744            .await
745            .expect("got event");
746        assert_eq!(got, Message { foo: 7, bar: 1 });
747
748        // Dropping the RemoteActiveConsumer but not unsubscribe, since sub_2 is still active
749        drop(sub_1);
750
751        // Subscribe to Foo(7), should receive the latest message and future messages
752        let mut sub_3 = consumer
753            .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
754            .expect("subscribe ok");
755
756        // receive cache message
757        let got = recv_next::<TestTransport>(&mut sub_3, 1000)
758            .await
759            .expect("got event");
760        assert_eq!(got, Message { foo: 7, bar: 1 });
761
762        // Send an event that matches Foo(7)
763        events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();
764
765        // receive new message
766        let got = recv_next::<TestTransport>(&mut sub_2, 1000)
767            .await
768            .expect("got event");
769        assert_eq!(got, Message { foo: 7, bar: 2 });
770
771        let got = recv_next::<TestTransport>(&mut sub_3, 1000)
772            .await
773            .expect("got event");
774        assert_eq!(got, Message { foo: 7, bar: 2 });
775
776        drop(sub_2);
777        drop(sub_3);
778
779        let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
780            matches!(m, StreamCtrl::Unsubscribe(_))
781        })
782        .await;
783
784        // The cache should be dropped, so no new messages
785        let mut sub_4 = consumer
786            .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
787            .expect("subscribe ok");
788
789        assert!(
790            recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
791            "Should have not receive any update"
792        );
793
794        drop(sub_4);
795
796        // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
797        let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
798    }
799
800    #[tokio::test]
801    async fn falls_back_to_poll_when_stream_not_supported() {
802        // stream NOT supported, poll supported
803        let (transport, events_tx, _) = TestTransport::new(false, true);
804        // prefer_polling = true nudges the connection loop to poll first, but even if it
805        // tried stream, our transport returns NotSupported and the loop will use poll.
806        let consumer = Consumer::new(transport, true, ());
807
808        // Subscribe to Bar(5)
809        let mut sub = consumer
810            .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
811            .expect("subscribe ok");
812
813        // Inject an event; the poll path should relay it on the first poll iteration
814        events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
815        let got = recv_next::<TestTransport>(&mut sub, 1500)
816            .await
817            .expect("event relayed via polling");
818        assert_eq!(got, Message { foo: 9, bar: 5 });
819    }
820
821    #[tokio::test]
822    async fn multiple_subscribers_share_single_remote_subscription() {
823        // This validates the "coalescing" behavior in Consumer::subscribe where multiple local
824        // subscribers to the same Topic should only create one remote subscription.
825        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
826        let consumer = Consumer::new(transport, false, ());
827
828        // Two local subscriptions to the SAME topic/name pair (different names)
829        let mut a = consumer
830            .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
831            .expect("subscribe A");
832        let _ = expect_ctrl(
833            &mut ctrl_rx,
834            1000,
835            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if  *idx == IndexTest::Foo(1)),
836        )
837        .await;
838
839        let mut b = consumer
840            .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
841            .expect("subscribe B");
842
843        // No second Subscribe should be forwarded for the same topic (coalesced).
844        // Give a little time; if one appears, we'll fail explicitly.
845        if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
846            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
847        {
848            assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
849        }
850
851        // Send one event and ensure BOTH local subscribers receive it.
852        events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
853        let got_a = recv_next::<TestTransport>(&mut a, 1000)
854            .await
855            .expect("A got");
856        let got_b = recv_next::<TestTransport>(&mut b, 1000)
857            .await
858            .expect("B got");
859        assert_eq!(got_a, Message { foo: 1, bar: 42 });
860        assert_eq!(got_b, Message { foo: 1, bar: 42 });
861
862        // Drop B: no Unsubscribe should be sent yet (still one local subscriber).
863        drop(b);
864        if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
865            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
866        {
867            panic!("Should NOT unsubscribe while another local subscriber exists");
868        }
869
870        // Drop A: now remote unsubscribe should occur.
871        drop(a);
872        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
873            matches!(m, StreamCtrl::Unsubscribe(_))
874        })
875        .await;
876
877        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
878    }
879}