Skip to main content

cdk_common/pub_sub/
pubsub.rs

1//! Pub-sub producer
2
3use std::cmp::Ordering;
4use std::collections::{BTreeMap, HashSet};
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use tokio::sync::mpsc;
10
11use super::subscriber::{ActiveSubscription, SubscriptionRequest};
12use super::{Error, Event, Spec, Subscriber};
13use crate::task::spawn;
14
/// Capacity of the bounded `mpsc` channel that [`Pubsub::subscribe`] creates
/// for each new subscription.
pub const DEFAULT_CHANNEL_SIZE: usize = 10_000;
17
18/// Subscriber Receiver
19pub type SubReceiver<S> = mpsc::Receiver<(Arc<<S as Spec>::SubscriptionId>, <S as Spec>::Event)>;
20
21/// Internal Index Tree
22pub type TopicTree<T> = Arc<
23    RwLock<
24        BTreeMap<
25            // Index with a subscription unique ID
26            (<T as Spec>::Topic, usize),
27            Subscriber<T>,
28        >,
29    >,
30>;
31
32/// Manager
33#[allow(missing_debug_implementations)]
34pub struct Pubsub<S>
35where
36    S: Spec + 'static,
37{
38    inner: Arc<S>,
39    listeners_topics: TopicTree<S>,
40    unique_subscription_counter: AtomicUsize,
41    active_subscribers: Arc<AtomicUsize>,
42}
43
44impl<S> Pubsub<S>
45where
46    S: Spec + 'static,
47{
48    /// Create a new instance
49    pub fn new(inner: Arc<S>) -> Self {
50        Self {
51            inner,
52            listeners_topics: Default::default(),
53            unique_subscription_counter: 0.into(),
54            active_subscribers: Arc::new(0.into()),
55        }
56    }
57
58    /// Total number of active subscribers, it is not the number of active topics being subscribed
59    pub fn active_subscribers(&self) -> usize {
60        self.active_subscribers
61            .load(std::sync::atomic::Ordering::Relaxed)
62    }
63
64    /// Publish an event to all listenrs
65    #[inline(always)]
66    fn publish_internal(event: S::Event, listeners_index: &TopicTree<S>) -> Result<(), Error> {
67        let index_storage = listeners_index.read();
68
69        let mut sent = HashSet::new();
70        for topic in event.get_topics() {
71            for ((subscription_index, unique_id), sender) in
72                index_storage.range((topic.clone(), 0)..)
73            {
74                if subscription_index.cmp(&topic) != Ordering::Equal {
75                    break;
76                }
77                if sent.contains(&unique_id) {
78                    continue;
79                }
80                sent.insert(unique_id);
81                sender.send(event.clone());
82            }
83        }
84
85        Ok(())
86    }
87
88    /// Broadcast an event to all listeners
89    #[inline(always)]
90    pub fn publish<E>(&self, event: E)
91    where
92        E: Into<S::Event>,
93    {
94        let topics = self.listeners_topics.clone();
95        let event = event.into();
96
97        spawn(async move {
98            let _ = Self::publish_internal(event, &topics);
99        });
100    }
101
102    /// Broadcast an event to all listeners right away, blocking the current thread
103    ///
104    /// This function takes an Arc to the storage struct, the event_id, the kind
105    /// and the vent to broadcast
106    #[inline(always)]
107    pub fn publish_now<E>(&self, event: E) -> Result<(), Error>
108    where
109        E: Into<S::Event>,
110    {
111        let event = event.into();
112        Self::publish_internal(event, &self.listeners_topics)
113    }
114
115    /// Subscribe proving custom sender/receiver mpsc
116    #[inline(always)]
117    pub fn subscribe_with<I>(
118        &self,
119        request: I,
120        sender: &mpsc::Sender<(Arc<I::SubscriptionId>, S::Event)>,
121        receiver: Option<SubReceiver<S>>,
122    ) -> Result<ActiveSubscription<S>, Error>
123    where
124        I: SubscriptionRequest<
125            Topic = <S::Event as Event>::Topic,
126            SubscriptionId = S::SubscriptionId,
127        >,
128    {
129        let subscription_name = request.subscription_name();
130        let sender = Subscriber::new(subscription_name.clone(), sender);
131        let mut index_storage = self.listeners_topics.write();
132        let subscription_internal_id = self
133            .unique_subscription_counter
134            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
135
136        self.active_subscribers
137            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
138
139        let subscribed_to = request.try_get_topics()?;
140
141        for index in subscribed_to.iter() {
142            index_storage.insert((index.clone(), subscription_internal_id), sender.clone());
143        }
144        drop(index_storage);
145
146        let inner = self.inner.clone();
147        let subscribed_to_for_spawn = subscribed_to.clone();
148
149        spawn(async move {
150            // TODO: Ignore topics broadcasted from fetch_events _if_ any real time has been broadcasted already.
151            inner.fetch_events(subscribed_to_for_spawn, sender).await;
152        });
153
154        Ok(ActiveSubscription::new(
155            subscription_internal_id,
156            subscription_name,
157            self.active_subscribers.clone(),
158            self.listeners_topics.clone(),
159            subscribed_to,
160            receiver,
161        ))
162    }
163
164    /// Subscribe
165    pub fn subscribe<I>(&self, request: I) -> Result<ActiveSubscription<S>, Error>
166    where
167        I: SubscriptionRequest<
168            Topic = <S::Event as Event>::Topic,
169            SubscriptionId = S::SubscriptionId,
170        >,
171    {
172        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
173        self.subscribe_with(request, &sender, Some(receiver))
174    }
175}