Skip to main content

cdk_common/pub_sub/
subscriber.rs

//! Active subscriptions and the subscriber sink used to deliver events to them.
2use std::fmt::Debug;
3use std::sync::atomic::AtomicUsize;
4use std::sync::{Arc, Mutex};
5
6use tokio::sync::mpsc;
7
8use super::pubsub::{SubReceiver, TopicTree};
9use super::{Error, Spec};
10
11/// Subscription request
12pub trait SubscriptionRequest {
13    /// Topics
14    type Topic;
15
16    /// Subscription Id
17    type SubscriptionId;
18
19    /// Try to get topics from the request
20    fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error>;
21
22    /// Get the subscription name
23    fn subscription_name(&self) -> Arc<Self::SubscriptionId>;
24}
25
26/// Active Subscription
27#[allow(missing_debug_implementations)]
28pub struct ActiveSubscription<S>
29where
30    S: Spec + 'static,
31{
32    id: usize,
33    name: Arc<S::SubscriptionId>,
34    active_subscribers: Arc<AtomicUsize>,
35    topics: TopicTree<S>,
36    subscribed_to: Vec<S::Topic>,
37    receiver: Option<SubReceiver<S>>,
38}
39
40impl<S> ActiveSubscription<S>
41where
42    S: Spec + 'static,
43{
44    /// Creates a new instance
45    pub fn new(
46        id: usize,
47        name: Arc<S::SubscriptionId>,
48        active_subscribers: Arc<AtomicUsize>,
49        topics: TopicTree<S>,
50        subscribed_to: Vec<S::Topic>,
51        receiver: Option<SubReceiver<S>>,
52    ) -> Self {
53        Self {
54            id,
55            name,
56            active_subscribers,
57            subscribed_to,
58            topics,
59            receiver,
60        }
61    }
62
63    /// Receives the next event
64    pub async fn recv(&mut self) -> Option<S::Event> {
65        self.receiver.as_mut()?.recv().await.map(|(_, event)| event)
66    }
67
68    /// Try receive an event or return None right away
69    pub fn try_recv(&mut self) -> Option<S::Event> {
70        self.receiver
71            .as_mut()?
72            .try_recv()
73            .ok()
74            .map(|(_, event)| event)
75    }
76
77    /// Get the subscription name
78    pub fn name(&self) -> &S::SubscriptionId {
79        &self.name
80    }
81}
82
83impl<S> Drop for ActiveSubscription<S>
84where
85    S: Spec + 'static,
86{
87    fn drop(&mut self) {
88        // remove the listener
89        let mut topics = self.topics.write();
90        for index in self.subscribed_to.drain(..) {
91            topics.remove(&(index, self.id));
92        }
93
94        // decrement the number of active subscribers
95        self.active_subscribers
96            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
97    }
98}
99
100/// Lightweight sink used by producers to send events to subscribers.
101///
102/// You usually do not construct a `Subscriber` directly — it is provided to you in
103/// the [`Spec::fetch_events`] callback so you can backfill a new subscription.
104#[derive(Debug)]
105pub struct Subscriber<S>
106where
107    S: Spec + 'static,
108{
109    subscription: Arc<S::SubscriptionId>,
110    inner: mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
111    latest: Arc<Mutex<Option<S::Event>>>,
112}
113
114impl<S> Clone for Subscriber<S>
115where
116    S: Spec + 'static,
117{
118    fn clone(&self) -> Self {
119        Self {
120            subscription: self.subscription.clone(),
121            inner: self.inner.clone(),
122            latest: self.latest.clone(),
123        }
124    }
125}
126
127impl<S> Subscriber<S>
128where
129    S: Spec + 'static,
130{
131    /// Create a new instance
132    pub fn new(
133        subscription: Arc<S::SubscriptionId>,
134        inner: &mpsc::Sender<(Arc<S::SubscriptionId>, S::Event)>,
135    ) -> Self {
136        Self {
137            inner: inner.clone(),
138            subscription,
139            latest: Arc::new(Mutex::new(None)),
140        }
141    }
142
143    /// Send a message
144    pub fn send(&self, event: S::Event) {
145        let mut latest = if let Ok(reader) = self.latest.lock() {
146            reader
147        } else {
148            let _ = self.inner.try_send((self.subscription.to_owned(), event));
149            return;
150        };
151
152        if let Some(last_event) = latest.replace(event.clone()) {
153            if last_event == event {
154                return;
155            }
156        }
157
158        let _ = self.inner.try_send((self.subscription.to_owned(), event));
159    }
160}