cdk_common/pub_sub/pubsub.rs

//! Pub-sub producer
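//!
//! Events are indexed by topic in a [`TopicTree`]: a `BTreeMap` keyed by
//! `(topic, unique subscription id)`. Publishing performs one range scan per
//! topic of the event and deduplicates deliveries, so a subscription matching
//! several topics receives the event only once. Subscribers receive events over
//! a bounded mpsc channel.
//!
//! A minimal usage sketch. `MySpec`, `MyRequest` and `my_event` are hypothetical
//! placeholders for an implementation of [`Spec`] and [`SubscriptionRequest`] and
//! for a value convertible into the spec's event type; they are not defined in
//! this module:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! let pubsub = Pubsub::new(Arc::new(MySpec::default()));
//!
//! // Subscribe with the default buffered channel (`DEFAULT_CHANNEL_SIZE` entries).
//! let subscription = pubsub.subscribe(MyRequest::default())?;
//!
//! // Broadcast from a spawned task, or synchronously on the caller's thread.
//! pubsub.publish(my_event.clone());
//! pubsub.publish_now(my_event)?;
//! ```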

use std::cmp::Ordering;
use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use parking_lot::RwLock;
use tokio::sync::mpsc;

use super::subscriber::{ActiveSubscription, SubscriptionRequest};
use super::{Error, Event, Spec, Subscriber};
use crate::task::spawn;

/// Default channel size for subscription buffering
pub const DEFAULT_CHANNEL_SIZE: usize = 10_000;

/// Subscriber Receiver
pub type SubReceiver<S> = mpsc::Receiver<(Arc<<S as Spec>::SubscriptionId>, <S as Spec>::Event)>;

/// Internal Index Tree
pub type TopicTree<T> = Arc<
    RwLock<
        BTreeMap<
            // Index with a subscription unique ID
            (<T as Spec>::Topic, usize),
            Subscriber<T>,
        >,
    >,
>;

/// Pub-sub manager
pub struct Pubsub<S>
where
    S: Spec + 'static,
{
    inner: Arc<S>,
    listeners_topics: TopicTree<S>,
    unique_subscription_counter: AtomicUsize,
    active_subscribers: Arc<AtomicUsize>,
}

impl<S> Pubsub<S>
where
    S: Spec + 'static,
{
    /// Create a new instance
    pub fn new(inner: Arc<S>) -> Self {
        Self {
            inner,
            listeners_topics: Default::default(),
            unique_subscription_counter: 0.into(),
            active_subscribers: Arc::new(0.into()),
        }
    }

    /// Total number of active subscribers; this is not the number of topics being subscribed to
    pub fn active_subscribers(&self) -> usize {
        self.active_subscribers
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Publish an event to all listeners
    #[inline(always)]
    fn publish_internal(event: S::Event, listeners_index: &TopicTree<S>) -> Result<(), Error> {
        let index_storage = listeners_index.read();

        let mut sent = HashSet::new();
        for topic in event.get_topics() {
            // Keys are ordered by (topic, unique id); scan from (topic, 0) and stop
            // once the topic part of the key no longer matches.
            for ((subscription_index, unique_id), sender) in
                index_storage.range((topic.clone(), 0)..)
            {
                if subscription_index.cmp(&topic) != Ordering::Equal {
                    break;
                }
                // A subscription matching several topics of this event gets it only once.
                if sent.contains(&unique_id) {
                    continue;
                }
                sent.insert(unique_id);
                sender.send(event.clone());
            }
        }

        Ok(())
    }

    /// Broadcast an event to all listeners
    ///
    /// The event is dispatched from a spawned task; use [`Pubsub::publish_now`] to
    /// deliver it synchronously on the current thread
    #[inline(always)]
    pub fn publish<E>(&self, event: E)
    where
        E: Into<S::Event>,
    {
        let topics = self.listeners_topics.clone();
        let event = event.into();

        spawn(async move {
            let _ = Self::publish_internal(event, &topics);
        });
    }

    /// Broadcast an event to all listeners right away, blocking the current thread
    ///
    /// Unlike [`Pubsub::publish`], the event is delivered synchronously instead of
    /// being dispatched from a spawned task
    #[inline(always)]
    pub fn publish_now<E>(&self, event: E) -> Result<(), Error>
    where
        E: Into<S::Event>,
    {
        let event = event.into();
        Self::publish_internal(event, &self.listeners_topics)
    }

    /// Subscribe, providing a custom mpsc sender/receiver
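    ///
    /// A sketch of wiring a caller-owned channel; `request` is a placeholder value
    /// of a type implementing [`SubscriptionRequest`]:
    ///
    /// ```ignore
    /// let (sender, receiver) = tokio::sync::mpsc::channel(DEFAULT_CHANNEL_SIZE);
    /// let subscription = pubsub.subscribe_with(request, &sender, Some(receiver))?;
    /// ```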
    #[inline(always)]
    pub fn subscribe_with<I>(
        &self,
        request: I,
        sender: &mpsc::Sender<(Arc<I::SubscriptionId>, S::Event)>,
        receiver: Option<SubReceiver<S>>,
    ) -> Result<ActiveSubscription<S>, Error>
    where
        I: SubscriptionRequest<
            Topic = <S::Event as Event>::Topic,
            SubscriptionId = S::SubscriptionId,
        >,
    {
        let subscription_name = request.subscription_name();
        let sender = Subscriber::new(subscription_name.clone(), sender);
        let mut index_storage = self.listeners_topics.write();
        let subscription_internal_id = self
            .unique_subscription_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        self.active_subscribers
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let subscribed_to = request.try_get_topics()?;

        for index in subscribed_to.iter() {
            index_storage.insert((index.clone(), subscription_internal_id), sender.clone());
        }
        drop(index_storage);

        let inner = self.inner.clone();
        let subscribed_to_for_spawn = subscribed_to.clone();

        spawn(async move {
            // TODO: Ignore topics broadcast from `fetch_events` _if_ any real-time event has already been broadcast.
            inner.fetch_events(subscribed_to_for_spawn, sender).await;
        });

        Ok(ActiveSubscription::new(
            subscription_internal_id,
            subscription_name,
            self.active_subscribers.clone(),
            self.listeners_topics.clone(),
            subscribed_to,
            receiver,
        ))
    }

    /// Subscribe using a buffered channel of the default size ([`DEFAULT_CHANNEL_SIZE`])
    pub fn subscribe<I>(&self, request: I) -> Result<ActiveSubscription<S>, Error>
    where
        I: SubscriptionRequest<
            Topic = <S::Event as Event>::Topic,
            SubscriptionId = S::SubscriptionId,
        >,
    {
        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        self.subscribe_with(request, &sender, Some(receiver))
    }
}