// cdk_common/pub_sub/remote_consumer.rs

//! Pub-sub consumer
//!
//! Consumers connect to a producer through a transport and subscribe to events.
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;

use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio::time::{sleep, Instant};

use super::subscriber::{ActiveSubscription, SubscriptionRequest};
use super::{Error, Event, Pubsub, Spec};
use crate::task::spawn;

/// Delay before the first retry of a failed streaming connection. The same amount is added to the
/// delay after every further failure.
const STREAM_CONNECTION_BACKOFF: Duration = Duration::from_millis(2_000);

/// Upper bound for the delay between streaming connection retries.
const STREAM_CONNECTION_MAX_BACKOFF: Duration = Duration::from_millis(30_000);

/// Capacity of the control channel handed to a running stream.
const INTERNAL_POLL_SIZE: usize = 1_000;

/// Pause after every poll attempt.
const POLL_SLEEP: Duration = Duration::from_millis(2_000);

25struct UniqueSubscription<S>
26where
27    S: Spec,
28{
29    name: S::SubscriptionId,
30    total_subscribers: usize,
31}
32
33type UniqueSubscriptions<S> = RwLock<HashMap<<S as Spec>::Topic, UniqueSubscription<S>>>;
34
35type ActiveSubscriptions<S> =
36    RwLock<HashMap<Arc<<S as Spec>::SubscriptionId>, Vec<<S as Spec>::Topic>>>;
37
38type CacheEvent<S> = HashMap<<<S as Spec>::Event as Event>::Topic, <S as Spec>::Event>;
39
40/// Subscription consumer
41#[allow(missing_debug_implementations)]
42pub struct Consumer<T>
43where
44    T: Transport + 'static,
45{
46    transport: T,
47    inner_pubsub: Arc<Pubsub<T::Spec>>,
48    remote_subscriptions: UniqueSubscriptions<T::Spec>,
49    subscriptions: ActiveSubscriptions<T::Spec>,
50    stream_ctrl: RwLock<Option<mpsc::Sender<StreamCtrl<T::Spec>>>>,
51    still_running: AtomicBool,
52    prefer_polling: bool,
53    /// Cached events
54    ///
55    /// The cached events are useful to share events. The cache is automatically evicted it is
56    /// disconnected from the remote source, meaning the cache is only active while there is an
57    /// active subscription to the remote source, and it remembers the latest event.
58    cached_events: Arc<RwLock<CacheEvent<T::Spec>>>,
59}
60
61/// Remote consumer
62#[allow(missing_debug_implementations)]
63pub struct RemoteActiveConsumer<T>
64where
65    T: Transport + 'static,
66{
67    inner: ActiveSubscription<T::Spec>,
68    previous_messages: VecDeque<<T::Spec as Spec>::Event>,
69    consumer: Arc<Consumer<T>>,
70}
71
72impl<T> RemoteActiveConsumer<T>
73where
74    T: Transport + 'static,
75{
76    /// Receives the next event
77    pub async fn recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
78        if let Some(event) = self.previous_messages.pop_front() {
79            Some(event)
80        } else {
81            self.inner.recv().await
82        }
83    }
84
85    /// Try receive an event or return None right away
86    pub fn try_recv(&mut self) -> Option<<T::Spec as Spec>::Event> {
87        if let Some(event) = self.previous_messages.pop_front() {
88            Some(event)
89        } else {
90            self.inner.try_recv()
91        }
92    }
93
94    /// Get the subscription name
95    pub fn name(&self) -> &<T::Spec as Spec>::SubscriptionId {
96        self.inner.name()
97    }
98}
99
100impl<T> Drop for RemoteActiveConsumer<T>
101where
102    T: Transport + 'static,
103{
104    fn drop(&mut self) {
105        let _ = self.consumer.unsubscribe(self.name().clone());
106    }
107}
108
109/// Struct to relay events from Poll and Streams from the external subscription to the local
110/// subscribers
111#[allow(missing_debug_implementations)]
112pub struct InternalRelay<S>
113where
114    S: Spec + 'static,
115{
116    inner: Arc<Pubsub<S>>,
117    cached_events: Arc<RwLock<CacheEvent<S>>>,
118}
119
120impl<S> InternalRelay<S>
121where
122    S: Spec + 'static,
123{
124    /// Relay a remote event locally
125    pub fn send<X>(&self, event: X)
126    where
127        X: Into<S::Event>,
128    {
129        let event = event.into();
130        let mut cached_events = self.cached_events.write();
131
132        for topic in event.get_topics() {
133            cached_events.insert(topic, event.clone());
134        }
135
136        self.inner.publish(event);
137    }
138}
139
140impl<T> Consumer<T>
141where
142    T: Transport + 'static,
143{
144    /// Creates a new instance
145    pub fn new(
146        transport: T,
147        prefer_polling: bool,
148        context: <T::Spec as Spec>::Context,
149    ) -> Arc<Self> {
150        let this = Arc::new(Self {
151            transport,
152            prefer_polling,
153            inner_pubsub: Arc::new(Pubsub::new(T::Spec::new_instance(context))),
154            subscriptions: Default::default(),
155            remote_subscriptions: Default::default(),
156            stream_ctrl: RwLock::new(None),
157            cached_events: Default::default(),
158            still_running: true.into(),
159        });
160
161        spawn(Self::stream(this.clone()));
162
163        this
164    }
165
166    async fn stream(instance: Arc<Self>) {
167        let mut stream_supported = true;
168        let mut poll_supported = true;
169
170        let mut backoff = STREAM_CONNECTION_BACKOFF;
171        let mut retry_at = None;
172
173        loop {
174            if (!stream_supported && !poll_supported)
175                || !instance
176                    .still_running
177                    .load(std::sync::atomic::Ordering::Relaxed)
178            {
179                break;
180            }
181
182            if instance.remote_subscriptions.read().is_empty() {
183                sleep(Duration::from_millis(100)).await;
184                continue;
185            }
186
187            if stream_supported
188                && !instance.prefer_polling
189                && retry_at
190                    .map(|retry_at| retry_at < Instant::now())
191                    .unwrap_or(true)
192            {
193                let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
194
195                {
196                    *instance.stream_ctrl.write() = Some(sender);
197                }
198
199                let current_subscriptions = {
200                    instance
201                        .remote_subscriptions
202                        .read()
203                        .iter()
204                        .map(|(key, name)| (name.name.clone(), key.clone()))
205                        .collect::<Vec<_>>()
206                };
207
208                if let Err(err) = instance
209                    .transport
210                    .stream(
211                        receiver,
212                        current_subscriptions,
213                        InternalRelay {
214                            inner: instance.inner_pubsub.clone(),
215                            cached_events: instance.cached_events.clone(),
216                        },
217                    )
218                    .await
219                {
220                    retry_at = Some(Instant::now() + backoff);
221                    backoff =
222                        (backoff + STREAM_CONNECTION_BACKOFF).min(STREAM_CONNECTION_MAX_BACKOFF);
223
224                    if matches!(err, Error::NotSupported) {
225                        stream_supported = false;
226                    }
227                    tracing::error!("Long connection failed with error {:?}", err);
228                } else {
229                    backoff = STREAM_CONNECTION_BACKOFF;
230                }
231
232                // remove sender to stream, as there is no stream
233                let _ = instance.stream_ctrl.write().take();
234            }
235
236            if poll_supported {
237                let current_subscriptions = {
238                    instance
239                        .remote_subscriptions
240                        .read()
241                        .iter()
242                        .map(|(key, name)| (name.name.clone(), key.clone()))
243                        .collect::<Vec<_>>()
244                };
245
246                if let Err(err) = instance
247                    .transport
248                    .poll(
249                        current_subscriptions,
250                        InternalRelay {
251                            inner: instance.inner_pubsub.clone(),
252                            cached_events: instance.cached_events.clone(),
253                        },
254                    )
255                    .await
256                {
257                    if matches!(err, Error::NotSupported) {
258                        poll_supported = false;
259                    }
260                    tracing::error!("Polling failed with error {:?}", err);
261                }
262
263                sleep(POLL_SLEEP).await;
264            }
265        }
266    }
267
268    /// Unsubscribe from a topic, this is called automatically when RemoteActiveSubscription<T> goes
269    /// out of scope
270    fn unsubscribe(
271        self: &Arc<Self>,
272        subscription_name: <T::Spec as Spec>::SubscriptionId,
273    ) -> Result<(), Error> {
274        let topics = self
275            .subscriptions
276            .write()
277            .remove(&subscription_name)
278            .ok_or(Error::NoSubscription)?;
279
280        let mut remote_subscriptions = self.remote_subscriptions.write();
281
282        for topic in topics {
283            let mut remote_subscription =
284                if let Some(remote_subscription) = remote_subscriptions.remove(&topic) {
285                    remote_subscription
286                } else {
287                    continue;
288                };
289
290            remote_subscription.total_subscribers =
291                remote_subscription.total_subscribers.saturating_sub(1);
292
293            if remote_subscription.total_subscribers == 0 {
294                let mut cached_events = self.cached_events.write();
295
296                cached_events.remove(&topic);
297
298                self.message_to_stream(StreamCtrl::Unsubscribe(remote_subscription.name.clone()))?;
299            } else {
300                remote_subscriptions.insert(topic, remote_subscription);
301            }
302        }
303
304        if remote_subscriptions.is_empty() {
305            self.message_to_stream(StreamCtrl::Stop)?;
306        }
307
308        Ok(())
309    }
310
311    #[inline(always)]
312    fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
313        let to_stream = self.stream_ctrl.read();
314
315        if let Some(to_stream) = to_stream.as_ref() {
316            Ok(to_stream.try_send(message)?)
317        } else {
318            Ok(())
319        }
320    }
321
322    /// Creates a subscription
323    ///
324    /// The subscriptions have two parts:
325    ///
326    /// 1. Will create the subscription to the remote Pubsub service, Any events will be moved to
327    ///    the internal pubsub
328    ///
329    /// 2. The internal subscription to the inner Pubsub. Because all subscriptions are going the
330    ///    transport, once events matches subscriptions, the inner_pubsub will receive the message and
331    ///    broadcasat the event.
332    pub fn subscribe<I>(self: &Arc<Self>, request: I) -> Result<RemoteActiveConsumer<T>, Error>
333    where
334        I: SubscriptionRequest<
335            Topic = <T::Spec as Spec>::Topic,
336            SubscriptionId = <T::Spec as Spec>::SubscriptionId,
337        >,
338    {
339        let subscription_name = request.subscription_name();
340        let topics = request.try_get_topics()?;
341
342        let mut remote_subscriptions = self.remote_subscriptions.write();
343        let mut subscriptions = self.subscriptions.write();
344
345        if subscriptions.get(&subscription_name).is_some() {
346            return Err(Error::NoSubscription);
347        }
348
349        let mut previous_messages = Vec::new();
350        let cached_events = self.cached_events.read();
351
352        for topic in topics.iter() {
353            if let Some(subscription) = remote_subscriptions.get_mut(topic) {
354                subscription.total_subscribers += 1;
355
356                if let Some(v) = cached_events.get(topic).cloned() {
357                    previous_messages.push(v);
358                }
359            } else {
360                let internal_sub_name = self.transport.new_name();
361                remote_subscriptions.insert(
362                    topic.clone(),
363                    UniqueSubscription {
364                        total_subscribers: 1,
365                        name: internal_sub_name.clone(),
366                    },
367                );
368
369                // new subscription is created, so the connection worker should be notified
370                self.message_to_stream(StreamCtrl::Subscribe((internal_sub_name, topic.clone())))?;
371            }
372        }
373
374        subscriptions.insert(subscription_name, topics);
375        drop(subscriptions);
376
377        Ok(RemoteActiveConsumer {
378            inner: self.inner_pubsub.subscribe(request)?,
379            previous_messages: previous_messages.into(),
380            consumer: self.clone(),
381        })
382    }
383}
384
385impl<T> Drop for Consumer<T>
386where
387    T: Transport + 'static,
388{
389    fn drop(&mut self) {
390        self.still_running
391            .store(false, std::sync::atomic::Ordering::Release);
392        if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
393            let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
394                tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
395            });
396        }
397    }
398}
399
400/// Subscribe Message
401pub type SubscribeMessage<S> = (<S as Spec>::SubscriptionId, <S as Spec>::Topic);
402
403/// Messages sent from the [`Consumer`] to the [`Transport`] background loop.
404#[allow(missing_debug_implementations)]
405pub enum StreamCtrl<S>
406where
407    S: Spec + 'static,
408{
409    /// Add a subscription
410    Subscribe(SubscribeMessage<S>),
411    /// Desuscribe
412    Unsubscribe(S::SubscriptionId),
413    /// Exit the loop
414    Stop,
415}
416
417impl<S> Clone for StreamCtrl<S>
418where
419    S: Spec + 'static,
420{
421    fn clone(&self) -> Self {
422        match self {
423            Self::Subscribe(s) => Self::Subscribe(s.clone()),
424            Self::Unsubscribe(u) => Self::Unsubscribe(u.clone()),
425            Self::Stop => Self::Stop,
426        }
427    }
428}
429
430/// Transport abstracts how the consumer talks to the remote pubsub.
431///
432/// Implement this on your HTTP/WebSocket client. The transport is responsible for:
433/// - creating unique subscription names,
434/// - keeping a long connection via `stream` **or** performing on-demand `poll`,
435/// - forwarding remote events to `InternalRelay`.
436///
437/// ```ignore
438/// struct WsTransport { /* ... */ }
439/// #[async_trait::async_trait]
440/// impl Transport for WsTransport {
441///     type Topic = MyTopic;
442///     fn new_name(&self) -> <Self::Topic as Topic>::SubscriptionName { 0 }
443///     async fn stream(/* ... */) -> Result<(), Error> { Ok(()) }
444///     async fn poll(/* ... */) -> Result<(), Error> { Ok(()) }
445/// }
446/// ```
447#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
448#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
449pub trait Transport: Send + Sync {
450    /// Spec
451    type Spec: Spec;
452
453    /// Create a new subscription name
454    fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;
455
456    /// Opens a persistent connection and continuously streams events.
457    /// For protocols that support server push (e.g. WebSocket, SSE).
458    async fn stream(
459        &self,
460        subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
461        topics: Vec<SubscribeMessage<Self::Spec>>,
462        reply_to: InternalRelay<Self::Spec>,
463    ) -> Result<(), Error>;
464
465    /// Performs a one-shot fetch of any currently available events.
466    /// Called repeatedly by the consumer when streaming is not available.
467    async fn poll(
468        &self,
469        topics: Vec<SubscribeMessage<Self::Spec>>,
470        reply_to: InternalRelay<Self::Spec>,
471    ) -> Result<(), Error>;
472}
473
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use tokio::sync::{mpsc, Mutex};
    use tokio::time::{timeout, Duration};

    use super::{
        InternalRelay, RemoteActiveConsumer, StreamCtrl, SubscribeMessage, Transport,
        INTERNAL_POLL_SIZE,
    };
    use crate::pub_sub::remote_consumer::Consumer;
    use crate::pub_sub::test::{CustomPubSub, IndexTest, Message};
    use crate::pub_sub::{Error, Spec, SubscriptionRequest};

    // ===== Test Event/Topic types =====

    /// Subscription request carrying the local subscription name and the index to watch.
    #[derive(Clone, Debug)]
    enum SubscriptionReq {
        Foo(String, u64),
        Bar(String, u64),
    }

    impl SubscriptionRequest for SubscriptionReq {
        type Topic = IndexTest;

        type SubscriptionId = String;

        fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
            Ok(vec![match self {
                SubscriptionReq::Foo(_, n) => IndexTest::Foo(*n),
                SubscriptionReq::Bar(_, n) => IndexTest::Bar(*n),
            }])
        }

        fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
            Arc::new(match self {
                SubscriptionReq::Foo(n, _) => n.to_string(),
                SubscriptionReq::Bar(n, _) => n.to_string(),
            })
        }
    }

    // ===== A controllable in-memory Transport used by tests =====

    /// TestTransport relays messages from an mpsc channel to the Consumer via `InternalRelay`.
    /// It also forwards Subscribe/Unsubscribe/Stop signals to an observer channel so tests can assert them.
    struct TestTransport {
        name_ctr: AtomicUsize,
        // We forward all transport-loop control messages here so tests can observe them.
        observe_ctrl_tx: mpsc::Sender<StreamCtrl<CustomPubSub>>,
        // Whether stream / poll are supported.
        support_long: bool,
        support_poll: bool,
        // Events injected by the test; shared by `stream` and `poll`.
        rx: Mutex<mpsc::Receiver<Message>>,
    }

    impl TestTransport {
        /// Returns the transport, the sender used to inject events and the receiver on which the
        /// forwarded control messages can be observed.
        fn new(
            support_long: bool,
            support_poll: bool,
        ) -> (
            Self,
            mpsc::Sender<Message>,
            mpsc::Receiver<StreamCtrl<CustomPubSub>>,
        ) {
            let (events_tx, rx) = mpsc::channel::<Message>(INTERNAL_POLL_SIZE);
            let (observe_ctrl_tx, observe_ctrl_rx) =
                mpsc::channel::<StreamCtrl<_>>(INTERNAL_POLL_SIZE);

            let t = TestTransport {
                name_ctr: AtomicUsize::new(1),
                rx: Mutex::new(rx),
                observe_ctrl_tx,
                support_long,
                support_poll,
            };

            (t, events_tx, observe_ctrl_rx)
        }
    }

    #[async_trait::async_trait]
    impl Transport for TestTransport {
        type Spec = CustomPubSub;

        fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId {
            format!("sub-{}", self.name_ctr.fetch_add(1, Ordering::Relaxed))
        }

        async fn stream(
            &self,
            mut subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
            topics: Vec<SubscribeMessage<Self::Spec>>,
            reply_to: InternalRelay<Self::Spec>,
        ) -> Result<(), Error> {
            if !self.support_long {
                return Err(Error::NotSupported);
            }

            // Each invocation takes exclusive access to the shared event receiver
            let mut rx = self.rx.lock().await;
            let observe = self.observe_ctrl_tx.clone();

            for topic in topics {
                observe.try_send(StreamCtrl::Subscribe(topic)).unwrap();
            }

            loop {
                tokio::select! {
                    // Forward any control (Subscribe/Unsubscribe/Stop) messages so the test can assert them.
                    Some(ctrl) = subscribe_changes.recv() => {
                        observe.try_send(ctrl.clone()).unwrap();
                        if matches!(ctrl, StreamCtrl::Stop) {
                            break;
                        }
                    }
                    // Relay external events into the inner pubsub
                    Some(msg) = rx.recv() => {
                        reply_to.send(msg);
                    }
                    // Both channels are closed: nothing can arrive any more. Without this arm
                    // `select!` panics once every branch is disabled.
                    else => break,
                }
            }

            Ok(())
        }

        async fn poll(
            &self,
            _topics: Vec<SubscribeMessage<Self::Spec>>,
            reply_to: InternalRelay<Self::Spec>,
        ) -> Result<(), Error> {
            if !self.support_poll {
                return Err(Error::NotSupported);
            }

            // On each poll call, drain anything currently pending and return.
            // (The Consumer calls this repeatedly; first call happens immediately.)
            let mut rx = self.rx.lock().await;
            // Non-blocking drain pass: try a few times without sleeping to keep tests snappy
            for _ in 0..32 {
                match rx.try_recv() {
                    Ok(msg) => reply_to.send(msg),
                    Err(mpsc::error::TryRecvError::Empty) => continue,
                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                }
            }
            Ok(())
        }
    }

    // ===== Helpers =====

    /// Waits up to `dur_ms` milliseconds for the next event of `sub`.
    async fn recv_next<T: Transport>(
        sub: &mut RemoteActiveConsumer<T>,
        dur_ms: u64,
    ) -> Option<<T::Spec as Spec>::Event> {
        timeout(Duration::from_millis(dur_ms), sub.recv())
            .await
            .ok()
            .flatten()
    }

    /// Waits up to `dur_ms` milliseconds for a control message accepted by `pred`, skipping the
    /// ones that do not match. Panics on timeout or when the channel closes first.
    async fn expect_ctrl(
        rx: &mut mpsc::Receiver<StreamCtrl<CustomPubSub>>,
        dur_ms: u64,
        pred: impl Fn(&StreamCtrl<CustomPubSub>) -> bool,
    ) -> StreamCtrl<CustomPubSub> {
        timeout(Duration::from_millis(dur_ms), async {
            loop {
                // A closed channel can never produce the message; fail right away instead of
                // looping until the timeout.
                let msg = rx
                    .recv()
                    .await
                    .expect("control channel closed before the expected message");
                if pred(&msg) {
                    break msg;
                }
            }
        })
        .await
        .expect("timed out waiting for control message")
    }

    // ===== Tests =====

    #[tokio::test]
    async fn stream_delivery_and_unsubscribe_on_drop() {
        // stream supported, poll supported (doesn't matter; prefer long)
        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);

        // prefer_polling = false so connection loop will try stream first.
        let consumer = Consumer::new(transport, false, ());

        // Subscribe to Foo(7)
        let mut sub = consumer
            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
            .expect("subscribe ok");

        // We should see a Subscribe(name, topic) forwarded to transport
        let ctrl = expect_ctrl(
            &mut ctrl_rx,
            1000,
            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
        )
        .await;
        match ctrl {
            StreamCtrl::Subscribe((name, idx)) => {
                // The remote name is generated by the transport, not taken from the request
                assert_ne!(name, "t".to_owned());
                assert_eq!(idx, IndexTest::Foo(7));
            }
            _ => unreachable!(),
        }

        // Send an event that matches Foo(7)
        events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
        let got = recv_next::<TestTransport>(&mut sub, 1000)
            .await
            .expect("got event");
        assert_eq!(got, Message { foo: 7, bar: 1 });

        // Dropping the RemoteActiveConsumer should trigger an Unsubscribe(name)
        drop(sub);
        let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
            matches!(m, StreamCtrl::Unsubscribe(_))
        })
        .await;

        // The last subscription is gone, so a Stop must have been sent and the transport loop
        // exits cleanly; dropping the Consumer must not lose it.
        drop(consumer);
        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
    }

    #[tokio::test]
    async fn test_cache_and_invalation() {
        // stream supported, poll supported (doesn't matter; prefer long)
        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);

        // prefer_polling = false so connection loop will try stream first.
        let consumer = Consumer::new(transport, false, ());

        // Subscribe to Foo(7)
        let mut sub_1 = consumer
            .subscribe(SubscriptionReq::Foo("t".to_owned(), 7))
            .expect("subscribe ok");

        // We should see a Subscribe(name, topic) forwarded to transport
        let ctrl = expect_ctrl(
            &mut ctrl_rx,
            1000,
            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if *idx == IndexTest::Foo(7)),
        )
        .await;
        match ctrl {
            StreamCtrl::Subscribe((name, idx)) => {
                // The remote name is generated by the transport, not taken from the request
                assert_ne!(name, "t".to_owned());
                assert_eq!(idx, IndexTest::Foo(7));
            }
            _ => unreachable!(),
        }

        // Send an event that matches Foo(7)
        events_tx.send(Message { foo: 7, bar: 1 }).await.unwrap();
        let got = recv_next::<TestTransport>(&mut sub_1, 1000)
            .await
            .expect("got event");
        assert_eq!(got, Message { foo: 7, bar: 1 });

        // Subscribe to Foo(7), should receive the latest message and future messages
        let mut sub_2 = consumer
            .subscribe(SubscriptionReq::Foo("t2".to_owned(), 7))
            .expect("subscribe ok");

        let got = recv_next::<TestTransport>(&mut sub_2, 1000)
            .await
            .expect("got event");
        assert_eq!(got, Message { foo: 7, bar: 1 });

        // Dropping sub_1 must not unsubscribe remotely, since sub_2 is still active
        drop(sub_1);

        // Subscribe to Foo(7), should receive the latest message and future messages
        let mut sub_3 = consumer
            .subscribe(SubscriptionReq::Foo("t3".to_owned(), 7))
            .expect("subscribe ok");

        // receive cache message
        let got = recv_next::<TestTransport>(&mut sub_3, 1000)
            .await
            .expect("got event");
        assert_eq!(got, Message { foo: 7, bar: 1 });

        // Send an event that matches Foo(7)
        events_tx.send(Message { foo: 7, bar: 2 }).await.unwrap();

        // receive new message
        let got = recv_next::<TestTransport>(&mut sub_2, 1000)
            .await
            .expect("got event");
        assert_eq!(got, Message { foo: 7, bar: 2 });

        let got = recv_next::<TestTransport>(&mut sub_3, 1000)
            .await
            .expect("got event");
        assert_eq!(got, Message { foo: 7, bar: 2 });

        drop(sub_2);
        drop(sub_3);

        let _ctrl = expect_ctrl(&mut ctrl_rx, 1000, |m| {
            matches!(m, StreamCtrl::Unsubscribe(_))
        })
        .await;

        // The cache should be dropped, so no new messages
        let mut sub_4 = consumer
            .subscribe(SubscriptionReq::Foo("t4".to_owned(), 7))
            .expect("subscribe ok");

        assert!(
            recv_next::<TestTransport>(&mut sub_4, 1000).await.is_none(),
            "Should have not receive any update"
        );

        drop(sub_4);

        // A Stop is sent whenever the last remote subscription goes away, so the transport loop
        // exits cleanly
        let _ = expect_ctrl(&mut ctrl_rx, 2000, |m| matches!(m, StreamCtrl::Stop)).await;
    }

    #[tokio::test]
    async fn falls_back_to_poll_when_stream_not_supported() {
        // stream NOT supported, poll supported
        let (transport, events_tx, _) = TestTransport::new(false, true);
        // prefer_polling = true nudges the connection loop to poll first, but even if it
        // tried stream, our transport returns NotSupported and the loop will use poll.
        let consumer = Consumer::new(transport, true, ());

        // Subscribe to Bar(5)
        let mut sub = consumer
            .subscribe(SubscriptionReq::Bar("t".to_owned(), 5))
            .expect("subscribe ok");

        // Inject an event; the poll path should relay it on the first poll iteration
        events_tx.send(Message { foo: 9, bar: 5 }).await.unwrap();
        let got = recv_next::<TestTransport>(&mut sub, 1500)
            .await
            .expect("event relayed via polling");
        assert_eq!(got, Message { foo: 9, bar: 5 });
    }

    #[tokio::test]
    async fn multiple_subscribers_share_single_remote_subscription() {
        // This validates the "coalescing" behavior in Consumer::subscribe where multiple local
        // subscribers to the same Topic should only create one remote subscription.
        let (transport, events_tx, mut ctrl_rx) = TestTransport::new(true, true);
        let consumer = Consumer::new(transport, false, ());

        // Two local subscriptions to the SAME topic under different subscription names
        let mut a = consumer
            .subscribe(SubscriptionReq::Foo("t".to_owned(), 1))
            .expect("subscribe A");
        let _ = expect_ctrl(
            &mut ctrl_rx,
            1000,
            |m| matches!(m, StreamCtrl::Subscribe((_, idx)) if  *idx == IndexTest::Foo(1)),
        )
        .await;

        let mut b = consumer
            .subscribe(SubscriptionReq::Foo("b".to_owned(), 1))
            .expect("subscribe B");

        // No second Subscribe should be forwarded for the same topic (coalesced).
        // Give a little time; if one appears, we'll fail explicitly.
        if let Ok(Some(StreamCtrl::Subscribe((_, idx)))) =
            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
        {
            assert_ne!(idx, IndexTest::Foo(1), "should not resubscribe same topic");
        }

        // Send one event and ensure BOTH local subscribers receive it.
        events_tx.send(Message { foo: 1, bar: 42 }).await.unwrap();
        let got_a = recv_next::<TestTransport>(&mut a, 1000)
            .await
            .expect("A got");
        let got_b = recv_next::<TestTransport>(&mut b, 1000)
            .await
            .expect("B got");
        assert_eq!(got_a, Message { foo: 1, bar: 42 });
        assert_eq!(got_b, Message { foo: 1, bar: 42 });

        // Drop B: no Unsubscribe should be sent yet (still one local subscriber).
        drop(b);
        if let Ok(Some(StreamCtrl::Unsubscribe(_))) =
            timeout(Duration::from_millis(400), ctrl_rx.recv()).await
        {
            panic!("Should NOT unsubscribe while another local subscriber exists");
        }

        // Drop A: now remote unsubscribe should occur.
        drop(a);
        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| {
            matches!(m, StreamCtrl::Unsubscribe(_))
        })
        .await;

        let _ = expect_ctrl(&mut ctrl_rx, 1000, |m| matches!(m, StreamCtrl::Stop)).await;
    }
}