cdk_common/pub_sub/
subscriber.rs

1//! Active subscription
2use std::fmt::Debug;
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Mutex};
5
6use tokio::sync::mpsc;
7
8use super::pubsub::{SubReceiver, TopicTree};
9use super::{Error, Spec};
10
11/// Subscription request
12pub trait SubscriptionRequest {
13    /// Topics
14    type Topic;
15
16    /// Subscription Id
17    type SubscriptionId;
18
19    /// Try to get topics from the request
20    fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error>;
21
22    /// Get the subscription name
23    fn subscription_name(&self) -> Arc<Self::SubscriptionId>;
24}
25
26/// Active Subscription
27pub struct ActiveSubscription<S>
28where
29    S: Spec + 'static,
30{
31    id: usize,
32    name: Arc<S::SubscriptionId>,
33    active_subscribers: Arc<AtomicUsize>,
34    topics: TopicTree<S>,
35    subscribed_to: Vec<S::Topic>,
36    receiver: Option<SubReceiver<S>>,
37}
38
39impl<S> ActiveSubscription<S>
40where
41    S: Spec + 'static,
42{
43    /// Creates a new instance
44    pub fn new(
45        id: usize,
46        name: Arc<S::SubscriptionId>,
47        active_subscribers: Arc<AtomicUsize>,
48        topics: TopicTree<S>,
49        subscribed_to: Vec<S::Topic>,
50        receiver: Option<SubReceiver<S>>,
51    ) -> Self {
52        Self {
53            id,
54            name,
55            active_subscribers,
56            subscribed_to,
57            topics,
58            receiver,
59        }
60    }
61
62    /// Receives the next event
63    pub async fn recv(&mut self) -> Option<S::Event> {
64        self.receiver.as_mut()?.recv().await.map(|(_, event)| event)
65    }
66
67    /// Try receive an event or return Noen right away
68    pub fn try_recv(&mut self) -> Option<S::Event> {
69        self.receiver
70            .as_mut()?
71            .try_recv()
72            .ok()
73            .map(|(_, event)| event)
74    }
75
76    /// Get the subscription name
77    pub fn name(&self) -> &S::SubscriptionId {
78        &self.name
79    }
80}
81
82impl<S> Drop for ActiveSubscription<S>
83where
84    S: Spec + 'static,
85{
86    fn drop(&mut self) {
87        // remove the listener
88        let mut topics = self.topics.write();
89        for index in self.subscribed_to.drain(..) {
90            topics.remove(&(index, self.id));
91        }
92
93        // decrement the number of active subscribers
94        self.active_subscribers
95            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
96    }
97}
98
99/// Lightweight sink used by producers to send events to subscribers.
100///
101/// You usually do not construct a `Subscriber` directly — it is provided to you in
102/// the [`Spec::fetch_events`] callback so you can backfill a new subscription.
103#[derive(Debug)]
104pub struct Subscriber<S>
105where
106    S: Spec + 'static,
107{
108    subscription: Arc<S::SubscriptionId>,
109    inner: mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
110    latest: Arc<Mutex<Option<S::Event>>>,
111}
112
113impl<S> Clone for Subscriber<S>
114where
115    S: Spec + 'static,
116{
117    fn clone(&self) -> Self {
118        Self {
119            subscription: self.subscription.clone(),
120            inner: self.inner.clone(),
121            latest: self.latest.clone(),
122        }
123    }
124}
125
126impl<S> Subscriber<S>
127where
128    S: Spec + 'static,
129{
130    /// Create a new instance
131    pub fn new(
132        subscription: Arc<S::SubscriptionId>,
133        inner: &mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
134    ) -> Self {
135        Self {
136            inner: inner.clone(),
137            subscription,
138            latest: Arc::new(Mutex::new(None)),
139        }
140    }
141
142    /// Send a message
143    pub fn send(&self, event: S::Event) {
144        let mut latest = if let Ok(reader) = self.latest.lock() {
145            reader
146        } else {
147            let _ = self.inner.try_send((self.subscription.to_owned(), event));
148            return;
149        };
150
151        if let Some(last_event) = latest.replace(event.clone()) {
152            if last_event == event {
153                return;
154            }
155        }
156
157        let _ = self.inner.try_send((self.subscription.to_owned(), event));
158    }
159}