cdk_common/pub_sub/
pubsub.rs1use std::cmp::Ordering;
4use std::collections::{BTreeMap, HashSet};
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9use tokio::sync::mpsc;
10
11use super::subscriber::{ActiveSubscription, SubscriptionRequest};
12use super::{Error, Event, Spec, Subscriber};
13use crate::task::spawn;
14
15pub const DEFAULT_CHANNEL_SIZE: usize = 10_000;
17
18pub type SubReceiver<S> = mpsc::Receiver<(Arc<<S as Spec>::SubscriptionId>, <S as Spec>::Event)>;
20
21pub type TopicTree<T> = Arc<
23 RwLock<
24 BTreeMap<
25 (<T as Spec>::Topic, usize),
27 Subscriber<T>,
28 >,
29 >,
30>;
31
32pub struct Pubsub<S>
34where
35 S: Spec + 'static,
36{
37 inner: Arc<S>,
38 listeners_topics: TopicTree<S>,
39 unique_subscription_counter: AtomicUsize,
40 active_subscribers: Arc<AtomicUsize>,
41}
42
43impl<S> Pubsub<S>
44where
45 S: Spec + 'static,
46{
47 pub fn new(inner: Arc<S>) -> Self {
49 Self {
50 inner,
51 listeners_topics: Default::default(),
52 unique_subscription_counter: 0.into(),
53 active_subscribers: Arc::new(0.into()),
54 }
55 }
56
57 pub fn active_subscribers(&self) -> usize {
59 self.active_subscribers
60 .load(std::sync::atomic::Ordering::Relaxed)
61 }
62
63 #[inline(always)]
65 fn publish_internal(event: S::Event, listeners_index: &TopicTree<S>) -> Result<(), Error> {
66 let index_storage = listeners_index.read();
67
68 let mut sent = HashSet::new();
69 for topic in event.get_topics() {
70 for ((subscription_index, unique_id), sender) in
71 index_storage.range((topic.clone(), 0)..)
72 {
73 if subscription_index.cmp(&topic) != Ordering::Equal {
74 break;
75 }
76 if sent.contains(&unique_id) {
77 continue;
78 }
79 sent.insert(unique_id);
80 sender.send(event.clone());
81 }
82 }
83
84 Ok(())
85 }
86
87 #[inline(always)]
89 pub fn publish<E>(&self, event: E)
90 where
91 E: Into<S::Event>,
92 {
93 let topics = self.listeners_topics.clone();
94 let event = event.into();
95
96 spawn(async move {
97 let _ = Self::publish_internal(event, &topics);
98 });
99 }
100
101 #[inline(always)]
106 pub fn publish_now<E>(&self, event: E) -> Result<(), Error>
107 where
108 E: Into<S::Event>,
109 {
110 let event = event.into();
111 Self::publish_internal(event, &self.listeners_topics)
112 }
113
114 #[inline(always)]
116 pub fn subscribe_with<I>(
117 &self,
118 request: I,
119 sender: &mpsc::Sender<(Arc<I::SubscriptionId>, S::Event)>,
120 receiver: Option<SubReceiver<S>>,
121 ) -> Result<ActiveSubscription<S>, Error>
122 where
123 I: SubscriptionRequest<
124 Topic = <S::Event as Event>::Topic,
125 SubscriptionId = S::SubscriptionId,
126 >,
127 {
128 let subscription_name = request.subscription_name();
129 let sender = Subscriber::new(subscription_name.clone(), sender);
130 let mut index_storage = self.listeners_topics.write();
131 let subscription_internal_id = self
132 .unique_subscription_counter
133 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
134
135 self.active_subscribers
136 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
137
138 let subscribed_to = request.try_get_topics()?;
139
140 for index in subscribed_to.iter() {
141 index_storage.insert((index.clone(), subscription_internal_id), sender.clone());
142 }
143 drop(index_storage);
144
145 let inner = self.inner.clone();
146 let subscribed_to_for_spawn = subscribed_to.clone();
147
148 spawn(async move {
149 inner.fetch_events(subscribed_to_for_spawn, sender).await;
151 });
152
153 Ok(ActiveSubscription::new(
154 subscription_internal_id,
155 subscription_name,
156 self.active_subscribers.clone(),
157 self.listeners_topics.clone(),
158 subscribed_to,
159 receiver,
160 ))
161 }
162
163 pub fn subscribe<I>(&self, request: I) -> Result<ActiveSubscription<S>, Error>
165 where
166 I: SubscriptionRequest<
167 Topic = <S::Event as Event>::Topic,
168 SubscriptionId = S::SubscriptionId,
169 >,
170 {
171 let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
172 self.subscribe_with(request, &sender, Some(receiver))
173 }
174}