cdk_common/pub_sub/
pubsub.rs1use std::cmp::Ordering;
4use std::collections::{BTreeMap, HashSet};
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use tokio::sync::mpsc;
10
11use super::subscriber::{ActiveSubscription, SubscriptionRequest};
12use super::{Error, Event, Spec, Subscriber};
13use crate::task::spawn;
14
15pub const DEFAULT_CHANNEL_SIZE: usize = 10_000;
17
18pub type SubReceiver<S> = mpsc::Receiver<(Arc<<S as Spec>::SubscriptionId>, <S as Spec>::Event)>;
20
21pub type TopicTree<T> = Arc<
23 RwLock<
24 BTreeMap<
25 (<T as Spec>::Topic, usize),
27 Subscriber<T>,
28 >,
29 >,
30>;
31
32#[allow(missing_debug_implementations)]
34pub struct Pubsub<S>
35where
36 S: Spec + 'static,
37{
38 inner: Arc<S>,
39 listeners_topics: TopicTree<S>,
40 unique_subscription_counter: AtomicUsize,
41 active_subscribers: Arc<AtomicUsize>,
42}
43
44impl<S> Pubsub<S>
45where
46 S: Spec + 'static,
47{
48 pub fn new(inner: Arc<S>) -> Self {
50 Self {
51 inner,
52 listeners_topics: Default::default(),
53 unique_subscription_counter: 0.into(),
54 active_subscribers: Arc::new(0.into()),
55 }
56 }
57
58 pub fn active_subscribers(&self) -> usize {
60 self.active_subscribers
61 .load(std::sync::atomic::Ordering::Relaxed)
62 }
63
64 #[inline(always)]
66 fn publish_internal(event: S::Event, listeners_index: &TopicTree<S>) -> Result<(), Error> {
67 let index_storage = listeners_index.read();
68
69 let mut sent = HashSet::new();
70 for topic in event.get_topics() {
71 for ((subscription_index, unique_id), sender) in
72 index_storage.range((topic.clone(), 0)..)
73 {
74 if subscription_index.cmp(&topic) != Ordering::Equal {
75 break;
76 }
77 if sent.contains(&unique_id) {
78 continue;
79 }
80 sent.insert(unique_id);
81 sender.send(event.clone());
82 }
83 }
84
85 Ok(())
86 }
87
88 #[inline(always)]
90 pub fn publish<E>(&self, event: E)
91 where
92 E: Into<S::Event>,
93 {
94 let topics = self.listeners_topics.clone();
95 let event = event.into();
96
97 spawn(async move {
98 let _ = Self::publish_internal(event, &topics);
99 });
100 }
101
102 #[inline(always)]
107 pub fn publish_now<E>(&self, event: E) -> Result<(), Error>
108 where
109 E: Into<S::Event>,
110 {
111 let event = event.into();
112 Self::publish_internal(event, &self.listeners_topics)
113 }
114
115 #[inline(always)]
117 pub fn subscribe_with<I>(
118 &self,
119 request: I,
120 sender: &mpsc::Sender<(Arc<I::SubscriptionId>, S::Event)>,
121 receiver: Option<SubReceiver<S>>,
122 ) -> Result<ActiveSubscription<S>, Error>
123 where
124 I: SubscriptionRequest<
125 Topic = <S::Event as Event>::Topic,
126 SubscriptionId = S::SubscriptionId,
127 >,
128 {
129 let subscription_name = request.subscription_name();
130 let sender = Subscriber::new(subscription_name.clone(), sender);
131 let mut index_storage = self.listeners_topics.write();
132 let subscription_internal_id = self
133 .unique_subscription_counter
134 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
135
136 self.active_subscribers
137 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
138
139 let subscribed_to = request.try_get_topics()?;
140
141 for index in subscribed_to.iter() {
142 index_storage.insert((index.clone(), subscription_internal_id), sender.clone());
143 }
144 drop(index_storage);
145
146 let inner = self.inner.clone();
147 let subscribed_to_for_spawn = subscribed_to.clone();
148
149 spawn(async move {
150 inner.fetch_events(subscribed_to_for_spawn, sender).await;
152 });
153
154 Ok(ActiveSubscription::new(
155 subscription_internal_id,
156 subscription_name,
157 self.active_subscribers.clone(),
158 self.listeners_topics.clone(),
159 subscribed_to,
160 receiver,
161 ))
162 }
163
164 pub fn subscribe<I>(&self, request: I) -> Result<ActiveSubscription<S>, Error>
166 where
167 I: SubscriptionRequest<
168 Topic = <S::Event as Event>::Topic,
169 SubscriptionId = S::SubscriptionId,
170 >,
171 {
172 let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
173 self.subscribe_with(request, &sender, Some(receiver))
174 }
175}