Skip to main content

cdk_common/pub_sub/
remote_consumer.rs

1//! Pub-sub consumer
2//!
3//! Consumers are designed to connect to a producer, through a transport, and subscribe to events.
4use std::collections::{HashMap, VecDeque};
5use std::sync::atomic::AtomicBool;
6use std::sync::Arc;
7use std::time::Duration;
8
9use parking_lot::RwLock;
10use tokio::sync::mpsc;
11use tokio::time::{sleep, Instant};
12
13use super::subscriber::{ActiveSubscription, SubscriptionRequest};
14use super::{Error, Event, Pubsub, Spec};
15use crate::task::spawn;
16
/// Initial delay before a failed streaming connection is retried; the delay also grows by this
/// amount after every consecutive failure.
const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_secs(2);

/// Upper bound for the streaming reconnection delay.
const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Capacity of the control channel handed to the transport's streaming loop.
const INTERNAL_POLL_SIZE: usize = 1000;

/// Pause between two consecutive polling rounds.
const POLL_SLEEP: Duration = Duration::from_secs(2);
24
25struct UniqueSubscription<S>
26where
27    S: Spec,
28{
29    name: S::SubscriptionId,
30    total_subscribers: usize,
31}
32
33type UniqueSubscriptions<S> = Arc<RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>>;
34
35type ActiveSubscriptions<S> =
36    RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;
37
38type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
39
40/// Subscription consumer
41#[allow(missing_debug_implementations)]
42pub struct Consumer<T>
43where
44    T: Transport + 'static,
45{
46    transport: T,
47    inner_pubsub: Arc<Pubsub<T::Spec>>,
48    remote_subscriptions: UniqueSubscriptions<T::Spec>,
49    subscriptions: ActiveSubscriptions<T::Spec>,
50    stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
51    still_running: AtomicBool,
52    prefer_polling: bool,
53    /// Cached events
54    ///
55    /// The cached events are useful to share events. The cache is automatically evicted it is
56    /// disconnected from the remote source, meaning the cache is only active while there is an
57    /// active subscription to the remote source, and it remembers the latest event.
58    cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
59}
60
61/// Remote consumer
62#[allow(missing_debug_implementations)]
63pub struct RemoteActiveConsumer<T>
64where
65    T: Transport + 'static,
66{
67    inner: ActiveSubscription<T::Spec>,
68    previous_messages: VecDeque<<T::Spec as Spec>::Event>,
69    consumer: Arc<Consumer<T>>,
70}
71
72impl<T> RemoteActiveConsumer<T>
73where
74    T: Transport + 'static,
75{
76    /// Receives the next event
77    pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
78        if let Some(event) = self.previous_messages.pop_front() {
79            Some(event)
80        } else {
81            self.inner.recv().await
82        }
83    }
84
85    /// Try receive an event or return None right away
86    pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
87        if let Some(event) = self.previous_messages.pop_front() {
88            Some(event)
89        } else {
90            self.inner.try_recv()
91        }
92    }
93
94    /// Get the subscription name
95    pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
96        self.inner.name()
97    }
98}
99
100impl<T> Drop for RemoteActiveConsumer<T>
101where
102    T: Transport + 'static,
103{
104    fn drop(&mut self) {
105        let _ = self.consumer.unsubscribe(self.name().clone());
106    }
107}
108
109/// Struct to relay events from Poll and Streams from the external subscription to the local
110/// subscribers
111#[allow(missing_debug_implementations)]
112pub struct InternalRelay<S>
113where
114    S: Spec + 'static,
115{
116    inner: Arc<Pubsub<S>>,
117    remote_subscriptions: UniqueSubscriptions<S>,
118    cached_events: Arc<RwLock<CacheEvent<S>>>,
119}
120
121impl<S> InternalRelay<S>
122where
123    S: Spec + 'static,
124{
125    /// Relay a remote event locally
126    pub fn send<X>(&self, event: X)
127    where
128        X: Into<S::Event>,
129    {
130        let event = event.into();
131
132        {
133            let active_topics = self.remote_subscriptions.read();
134            let mut cached_events = self.cached_events.write();
135
136            for topic in event.get_topics() {
137                if active_topics.contains_key(&topic) {
138                    cached_events.insert(topic, event.clone());
139                }
140            }
141        }
142
143        self.inner.publish(event);
144    }
145}
146
147impl<T> Consumer<T>
148where
149    T: Transport + 'static,
150{
151    /// Creates a new instance
152    pub fn new(
153        transport: T,
154        prefer_polling: bool,
155        context: <T::Spec as Spec>::Context,
156    ) -> Arc<Self> {
157        let this = Arc::new(Self {
158            transport,
159            prefer_polling,
160            inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
161            subscriptions: Default::default(),
162            remote_subscriptions: Default::default(),
163            stream_ctrl: RwLock::new(None),
164            cached_events: Default::default(),
165            still_running: true.into(),
166        });
167
168        spawn(Self::stream(this.clone()));
169
170        this
171    }
172
173    async fn stream(instance: Arc<Self>) {
174        let mut stream_supported = true;
175        let mut poll_supported = true;
176
177        let mut backoff = STREAM_CONNECTION_BACKOFF;
178        let mut retry_at = None;
179
180        loop {
181            if (!stream_supported && !poll_supported)
182                || !instance
183                    .still_running
184                    .load(std::sync::atomic::Ordering::Relaxed)
185            {
186                break;
187            }
188
189            if instance.remote_subscriptions.read().is_empty() {
190                sleep(Duration::from_millis(100)).await;
191                continue;
192            }
193
194            if stream_supported
195                && !instance.prefer_polling
196                && retry_at
197                    .map(|retry_at| retry_at < Instant::now())
198                    .unwrap_or(true)
199            {
200                let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
201
202                {
203                    *instance.stream_ctrl.write() = Some(sender);
204                }
205
206                let current_subscriptions = {
207                    instance
208                        .remote_subscriptions
209                        .read()
210                        .iter()
211                        .map(|(key, name)| (name.name.clone(), key.clone()))
212                        .collect::<Vec<_>>()
213                };
214
215                if let Err(err) = instance
216                    .transport
217                    .stream(
218                        receiver,
219                        current_subscriptions,
220                        InternalRelay {
221                            inner: instance.inner_pubsub.clone(),
222                            remote_subscriptions: instance.remote_subscriptions.clone(),
223                            cached_events: instance.cached_events.clone(),
224                        },
225                    )
226                    .await
227                {
228                    retry_at = Some(Instant::now() + backoff);
229                    backoff =
230                        (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
231
232                    if matches!(err, Error::NotSupported) {
233                        stream_supported = false;
234                    }
235                    tracing::error!("Long connection failed with error {:?}", err);
236                } else {
237                    backoff = STREAM_CONNECTION_BACKOFF;
238                }
239
240                // remove sender to stream, as there is no stream
241                let _ = instance.stream_ctrl.write().take();
242            }
243
244            if poll_supported {
245                let current_subscriptions = {
246                    instance
247                        .remote_subscriptions
248                        .read()
249                        .iter()
250                        .map(|(key, name)| (name.name.clone(), key.clone()))
251                        .collect::<Vec<_>>()
252                };
253
254                if let Err(err) = instance
255                    .transport
256                    .poll(
257                        current_subscriptions,
258                        InternalRelay {
259                            inner: instance.inner_pubsub.clone(),
260                            remote_subscriptions: instance.remote_subscriptions.clone(),
261                            cached_events: instance.cached_events.clone(),
262                        },
263                    )
264                    .await
265                {
266                    if matches!(err, Error::NotSupported) {
267                        poll_supported = false;
268                    }
269                    tracing::error!("Polling failed with error {:?}", err);
270                }
271
272                sleep(POLL_SLEEP).await;
273            }
274        }
275    }
276
277    /// Unsubscribe from a topic, this is called automatically when RemoteActiveSubscription<T> goes
278    /// out of scope
279    fn unsubscribe(
280        self: &Arc<Self>,
281        subscription_name: <T::Spec as Spec>::SubscriptionId,
282    ) -> Result<(), Error> {
283        let topics = self
284            .subscriptions
285            .write()
286            .remove(&subscription_name)
287            .ok_or(Error::NoSubscription)?;
288
289        let mut remote_subscriptions = self.remote_subscriptions.write();
290
291        for topic in topics {
292            let mut remote_subscription =
293                if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
294                    remote_subscription
295                } else {
296                    continue;
297                };
298
299            remote_subscription.total_subscribers =
300                remote_subscription.total_subscribers.saturating_sub(1);
301
302            if remote_subscription.total_subscribers == 0 {
303                let mut cached_events = self.cached_events.write();
304
305                cached_events.remove(&topic);
306
307                self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
308            } else {
309                remote_subscriptions.insert(topic, remote_subscription);
310            }
311        }
312
313        if remote_subscriptions.is_empty() {
314            self.cached_events.write().clear();
315            self.message_to_stream(StreamCtrl::Stop)?;
316        }
317
318        Ok(())
319    }
320
321    #[inline(always)]
322    fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
323        let to_stream = self.stream_ctrl.read();
324
325        if let Some(to_stream) = to_stream.as_ref() {
326            Ok(to_stream.try_send(message)?)
327        } else {
328            Ok(())
329        }
330    }
331
332    /// Creates a subscription
333    ///
334    /// The subscriptions have two parts:
335    ///
336    /// 1. Will create the subscription to the remote Pubsub service, Any events will be moved to
337    ///    the internal pubsub
338    ///
339    /// 2. The internal subscription to the inner Pubsub. Because all subscriptions are going the
340    ///    transport, once events matches subscriptions, the inner_pubsub will receive the message and
341    ///    broadcasat the event.
342    pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
343    where
344        I: SubscriptionRequest<
345            Topic = <T::Spec as Spec>::Topic,
346            SubscriptionId = <T::Spec as Spec>::SubscriptionId,
347        >,
348    {
349        let subscription_name = request.subscription_name();
350        let topics = request.try_get_topics()?;
351
352        let mut remote_subscriptions = self.remote_subscriptions.write();
353        let mut subscriptions = self.subscriptions.write();
354
355        if subscriptions.get(&subscription_name).is_some() {
356            return Err(Error::NoSubscription);
357        }
358
359        let mut previous_messages = Vec::new();
360        let cached_events = self.cached_events.read();
361
362        for topic in topics.iter() {
363            if let Some(subscription) = remote_subscriptions.get_mut(topic) {
364                subscription.total_subscribers += 1;
365
366                if let Some(v) = cached_events.get(topic).cloned() {
367                    previous_messages.push(v);
368                }
369            } else {
370                let internal_sub_name = self.transport.new_name();
371                remote_subscriptions.insert(
372                    topic.clone(),
373                    UniqueSubscription {
374                        total_subscribers: 1,
375                        name: internal_sub_name.clone(),
376                    },
377                );
378
379                // new subscription is created, so the connection worker should be notified
380                self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
381            }
382        }
383
384        subscriptions.insert(subscription_name, topics);
385        drop(subscriptions);
386
387        Ok(RemoteActiveConsumer {
388            inner: self.inner_pubsub.subscribe(request)?,
389            previous_messages: previous_messages.into(),
390            consumer: self.clone(),
391        })
392    }
393}
394
395impl<T> Drop for Consumer<T>
396where
397    T: Transport + 'static,
398{
399    fn drop(&mut self) {
400        self.still_running
401            .store(false, std::sync::atomic::Ordering::Release);
402        if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
403            let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
404                tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
405            });
406        }
407    }
408}
409
410/// Subscribe Message
411pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
412
413/// Messages sent from the [`Consumer`] to the [`Transport`] background loop.
414#[allow(missing_debug_implementations)]
415pub enum StreamCtrl<S>
416where
417    S: Spec + 'static,
418{
419    /// Add a subscription
420    Subscribe(SubscribeMessage<S>),
421    /// Desuscribe
422    Unsubscribe(S::SubscriptionId),
423    /// Exit the loop
424    Stop,
425}
426
427impl<S> Clone for StreamCtrl<S>
428where
429    S: Spec + 'static,
430{
431    fn clone(&self) -> Self {
432        match self {
433            Self::Subscribe(s) => Self::Subscribe(s.clone()),
434            Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
435            Self::Stop => Self::Stop,
436        }
437    }
438}
439
440/// Transport abstracts how the consumer talks to the remote pubsub.
441///
442/// Implement this on your HTTP/WebSocket client. The transport is responsible for:
443/// - creating unique subscription names,
444/// - keeping a long connection via `stream` **or** performing on-demand `poll`,
445/// - forwarding remote events to `InternalRelay`.
446///
447/// ```ignore
448/// struct WsTransport { /* ... */ }
449/// #[async_trait::async_trait]
450/// impl Transport for WsTransport {
451///     type Topic = MyTopic;
452///     fn new_name(&self) -> <Self::Topic as Topic>::SubscriptionName { 0 }
453///     async fn stream(/* ... */) -> Result<(), Error> { Ok(()) }
454///     async fn poll(/* ... */) -> Result<(), Error> { Ok(()) }
455/// }
456/// ```
457#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
458#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
459pub trait Transport: Send + Sync {
460    /// Spec
461    type Spec: Spec;
462
463    /// Create a new subscription name
464    fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;
465
466    /// Opens a persistent connection and continuously streams events.
467    /// For protocols that support server push (e.g. WebSocket, SSE).
468    async fn stream(
469        &self,
470        subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
471        topics: Vec<SubscribeMessage<Self::Spec>>,
472        reply_to: InternalRelay<Self::Spec>,
473    ) -> Result<(), Error>;
474
475    /// Performs a one-shot fetch of any currently available events.
476    /// Called repeatedly by the consumer when streaming is not available.
477    async fn poll(
478        &self,
479        topics: Vec<SubscribeMessage<Self::Spec>>,
480        reply_to: InternalRelay<Self::Spec>,
481    ) -> Result<(), Error>;
482}
483
484#[cfg(test)]
485mod tests {
486    use std::sync::atomic::{AtomicUsize, Ordering};
487    use std::sync::Arc;
488
489    use tokio::sync::{mpsc, Mutex};
490    use tokio::time::{timeout, Duration};
491
492    use super::{
493        InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
494        INTERNAL_POLL_SIZE,
495    };
496    use crate::pub_sub::remote_consumer::Consumer;
497    use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
498    use crate::pub_sub::{Error, Spec, SubscriptionRequest};
499
500    // ===== Test Event/Topic types =====
501
502    #[derive(Clone, Debug)]
503    enum SubscriptionReq {
504        Foo(String, u64),
505        Bar(String, u64),
506    }
507
508    impl SubscriptionRequest for SubscriptionReq {
509        type Topic = IndexTest;
510
511        type SubscriptionId = String;
512
513        fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
514            Ok(vec![match self {
515                SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
516                SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
517            }])
518        }
519
520        fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
521            Arc::new(match self {
522                SubscriptionReq::Foo(n, _) => n.to_string(),
523                SubscriptionReq::Bar(n, _) => n.to_string(),
524            })
525        }
526    }
527
528    // ===== A controllable in-memory Transport used by tests =====
529
530    /// TestTransport relays messages from a broadcast channel to the Consumer via `InternalRelay`.
531    /// It also forwards Subscribe/Unsubscribe/Stop signals to an observer channel so tests can assert them.
532    struct TestTransport {
533        name_ctr: AtomicUsize,
534        // We forward all transport-loop control messages here so tests can observe them.
535        observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
536        // Whether stream / poll are supported.
537        support_long: bool,
538        support_poll: bool,
539        rx: Mutex<mpsc::Receiver<Message>>,
540    }
541
542    impl TestTransport {
543        fn new(
544            support_long: bool,
545            support_poll: bool,
546        ) -> (
547            Self,
548            mpsc::Sender<Message>,
549            mpsc::Receiver<StreamCtrl<CustomPubSub>>,
550        ) {
551            let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
552            let (observe_ctrl_tx, observe_ctrl_rx) =
553                mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);
554
555            let t = TestTransport {
556                name_ctr: AtomicUsize::new(1),
557                rx: Mutex::new(rx),
558                observe_ctrl_tx,
559                support_long,
560                support_poll,
561            };
562
563            (t, events_tx, observe_ctrl_rx)
564        }
565    }
566
567    #[async_trait::async_trait]
568    impl Transport for TestTransport {
569        type Spec = CustomPubSub;
570
571        fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
572            format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
573        }
574
575        async fn stream(
576            &self,
577            mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
578            topics: Vec<SubscribeMessage<Self::Spec>>,
579            reply_to: InternalRelay<Self::Spec>,
580        ) -> Result<(), Error> {
581            if !self.support_long {
582                return Err(Error::NotSupported);
583            }
584
585            // Each invocation creates a fresh broadcast receiver
586            let mut rx = self.rx.lock().await;
587            let observe = self.observe_ctrl_tx.clone();
588
589            for topic in topics {
590                observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
591            }
592
593            loop {
594                tokio::select! {
595                    // Forward any control (Subscribe/Unsubscribe/Stop) messages so the test can assert them.
596                    Some(ctrl) = subscribe_changes.recv() => {
597                        observe.try_send(ctrl.clone()).unwrap();
598                        if matches!(ctrl, StreamCtrl::Stop) {
599                            break;
600                        }
601                    }
602                    // Relay external events into the inner pubsub
603                    Some(msg) = rx.recv() => {
604                        reply_to.send(msg);
605                    }
606                }
607            }
608
609            Ok(())
610        }
611
612        async fn poll(
613            &self,
614            _topics: Vec<SubscribeMessage<Self::Spec>>,
615            reply_to: InternalRelay<Self::Spec>,
616        ) -> Result<(), Error> {
617            if !self.support_poll {
618                return Err(Error::NotSupported);
619            }
620
621            // On each poll call, drain anything currently pending and return.
622            // (The Consumer calls this repeatedly; first call happens immediately.)
623            let mut rx = self.rx.lock().await;
624            // Non-blocking drain pass: try a few times without sleeping to keep tests snappy
625            for _ in 0..32 {
626                match rx.try_recv() {
627                    Ok(msg) => reply_to.send(msg),
628                    Err(mpsc::error::TryRecvError::Empty) => continue,
629                    Err(mpsc::error::TryRecvError::Disconnected) => break,
630                }
631            }
632            Ok(())
633        }
634    }
635
636    // ===== Helpers =====
637
638    async fn recv_next<T: Transport>(
639        sub: &mut RemoteActiveConsumer<T>,
640        dur_ms: u64,
641    ) -> Option<<T::Spec as Spec>::Event> {
642        timeout(Duration::from_millis(dur_ms), sub.recv())
643            .await
644            .ok()
645            .flatten()
646    }
647
648    async fn expect_ctrl(
649        rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
650        dur_ms: u64,
651        pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
652    ) -> StreamCtrl<CustomPubSub> {
653        timeout(Duration::from_millis(dur_ms), async {
654            loop {
655                if let Some(msg) = rx.recv().await {
656                    if pred(&msg) {
657                        break msg;
658                    }
659                }
660            }
661        })
662        .await
663        .expect("timed out waiting for control message")
664    }
665
666    // ===== Tests =====
667
668    #[tokio::test]
669    async fn stream_delivery_and_unsubscribe_on_drop() {
670        // stream supported, poll supported (doesn't matter; prefer long)
671        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
672
673        // prefer_polling = false so connection loop will try stream first.
674        let consumer = Consumer::new(transport, false, ());
675
676        // Subscribe to Foo(7)
677        let mut sub = consumer
678            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
679            .expect("subscribe ok");
680
681        // We should see a Subscribe(name, topic) forwarded to transport
682        let ctrl = expect_ctrl(
683            &mut ctrl_rx,
684            1000,
685            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
686        )
687        .await;
688        match ctrl {
689            StreamCtrl::Subscribe((name, idx)) => {
690                assert_ne!(name, "t".to_owned());
691                assert_eq!(idx, IndexTest::Foo(7));
692            }
693            _ => unreachable!(),
694        }
695
696        // Send an event that matches Foo(7)
697        events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
698        let got = recv_next::<TestTransport>(&mut sub, 1000)
699            .await
700            .expect("got event");
701        assert_eq!(got, Message { foo: 7, bar: 1 });
702
703        // Dropping the RemoteActiveConsumer should trigger an Unsubscribe(name)
704        drop(sub);
705        let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
706            matches!(m, StreamCtrl::Unsubscribe(_))
707        })
708        .await;
709
710        // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
711        drop(consumer);
712        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
713    }
714
715    #[tokio::test]
716    async fn test_cache_and_invalation() {
717        // stream supported, poll supported (doesn't matter; prefer long)
718        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
719
720        // prefer_polling = false so connection loop will try stream first.
721        let consumer = Consumer::new(transport, false, ());
722
723        // Subscribe to Foo(7)
724        let mut sub_1 = consumer
725            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
726            .expect("subscribe ok");
727
728        // We should see a Subscribe(name, topic) forwarded to transport
729        let ctrl = expect_ctrl(
730            &mut ctrl_rx,
731            1000,
732            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
733        )
734        .await;
735        match ctrl {
736            StreamCtrl::Subscribe((name, idx)) => {
737                assert_ne!(name, "t1".to_owned());
738                assert_eq!(idx, IndexTest::Foo(7));
739            }
740            _ => unreachable!(),
741        }
742
743        // Send an event that matches Foo(7)
744        events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
745        let got = recv_next::<TestTransport>(&mut sub_1, 1000)
746            .await
747            .expect("got event");
748        assert_eq!(got, Message { foo: 7, bar: 1 });
749
750        // Subscribe to Foo(7), should receive the latest message and future messages
751        let mut sub_2 = consumer
752            .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
753            .expect("subscribe ok");
754
755        let got = recv_next::<TestTransport>(&mut sub_2, 1000)
756            .await
757            .expect("got event");
758        assert_eq!(got, Message { foo: 7, bar: 1 });
759
760        // Dropping the RemoteActiveConsumer but not unsubscribe, since sub_2 is still active
761        drop(sub_1);
762
763        // Subscribe to Foo(7), should receive the latest message and future messages
764        let mut sub_3 = consumer
765            .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
766            .expect("subscribe ok");
767
768        // receive cache message
769        let got = recv_next::<TestTransport>(&mut sub_3, 1000)
770            .await
771            .expect("got event");
772        assert_eq!(got, Message { foo: 7, bar: 1 });
773
774        // Send an event that matches Foo(7)
775        events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();
776
777        // receive new message
778        let got = recv_next::<TestTransport>(&mut sub_2, 1000)
779            .await
780            .expect("got event");
781        assert_eq!(got, Message { foo: 7, bar: 2 });
782
783        let got = recv_next::<TestTransport>(&mut sub_3, 1000)
784            .await
785            .expect("got event");
786        assert_eq!(got, Message { foo: 7, bar: 2 });
787
788        drop(sub_2);
789        drop(sub_3);
790
791        let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
792            matches!(m, StreamCtrl::Unsubscribe(_))
793        })
794        .await;
795
796        // The cache should be dropped, so no new messages
797        let mut sub_4 = consumer
798            .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
799            .expect("subscribe ok");
800
801        assert!(
802            recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
803            "Should have not receive any update"
804        );
805
806        drop(sub_4);
807
808        // Drop the Consumer -> Stop is sent so the transport loop exits cleanly
809        let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
810    }
811
812    #[tokio::test]
813    async fn cache_ignores_orphan_event_topics() {
814        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
815        let consumer = Consumer::new(transport, false, ());
816
817        let mut sub = consumer
818            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
819            .expect("subscribe ok");
820        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
821            matches!(m, StreamCtrl::Subscribe(_))
822        })
823        .await;
824
825        events_tx.send(Message { foo: 7, bar: 99 }).await.unwrap();
826        let _ = recv_next::<TestTransport>(&mut sub, 1000)
827            .await
828            .expect("got event");
829
830        drop(sub);
831        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
832            matches!(m, StreamCtrl::Unsubscribe(_))
833        })
834        .await;
835
836        let cache = consumer.cached_events.read();
837        assert!(
838            cache.is_empty(),
839            "cache leaked entries for orphan topics: {:?}",
840            cache.keys().collect::<Vec<_>>()
841        );
842    }
843
844    #[tokio::test]
845    async fn falls_back_to_poll_when_stream_not_supported() {
846        // stream NOT supported, poll supported
847        let (transport, events_tx, _) = TestTransport::new(false, true);
848        // prefer_polling = true nudges the connection loop to poll first, but even if it
849        // tried stream, our transport returns NotSupported and the loop will use poll.
850        let consumer = Consumer::new(transport, true, ());
851
852        // Subscribe to Bar(5)
853        let mut sub = consumer
854            .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
855            .expect("subscribe ok");
856
857        // Inject an event; the poll path should relay it on the first poll iteration
858        events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
859        let got = recv_next::<TestTransport>(&mut sub, 1500)
860            .await
861            .expect("event relayed via polling");
862        assert_eq!(got, Message { foo: 9, bar: 5 });
863    }
864
865    #[tokio::test]
866    async fn multiple_subscribers_share_single_remote_subscription() {
867        // This validates the "coalescing" behavior in Consumer::subscribe where multiple local
868        // subscribers to the same Topic should only create one remote subscription.
869        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
870        let consumer = Consumer::new(transport, false, ());
871
872        // Two local subscriptions to the SAME topic/name pair (different names)
873        let mut a = consumer
874            .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
875            .expect("subscribe A");
876        let _ = expect_ctrl(
877            &mut ctrl_rx,
878            1000,
879            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if  *idx == IndexTest::Foo(1)),
880        )
881        .await;
882
883        let mut b = consumer
884            .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
885            .expect("subscribe B");
886
887        // No second Subscribe should be forwarded for the same topic (coalesced).
888        // Give a little time; if one appears, we'll fail explicitly.
889        if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
890            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
891        {
892            assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
893        }
894
895        // Send one event and ensure BOTH local subscribers receive it.
896        events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
897        let got_a = recv_next::<TestTransport>(&mut a, 1000)
898            .await
899            .expect("A got");
900        let got_b = recv_next::<TestTransport>(&mut b, 1000)
901            .await
902            .expect("B got");
903        assert_eq!(got_a, Message { foo: 1, bar: 42 });
904        assert_eq!(got_b, Message { foo: 1, bar: 42 });
905
906        // Drop B: no Unsubscribe should be sent yet (still one local subscriber).
907        drop(b);
908        if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
909            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
910        {
911            panic!("Should NOT unsubscribe while another local subscriber exists");
912        }
913
914        // Drop A: now remote unsubscribe should occur.
915        drop(a);
916        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
917            matches!(m, StreamCtrl::Unsubscribe(_))
918        })
919        .await;
920
921        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
922    }
923}