1#![expect(
14 clippy::unused_async,
15 reason = "PubSubManager API is async-contract — callers .await; backend may swap to remote in future"
16)]
17
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
20
21use tokio::sync::broadcast;
22
23use crate::resource::{MessageType, SubscriptionMessage};
24
25const DEFAULT_CHANNEL_CAPACITY: usize = 256;
27
28#[derive(Debug, Clone)]
33pub struct PubSubManager {
34 topics: Arc<dashmap::DashMap<String, broadcast::Sender<SubscriptionMessage>>>,
36 capacity: usize,
38 cleanup_counter: Arc<AtomicU64>,
40}
41
42impl PubSubManager {
43 #[must_use]
45 pub fn new(capacity: usize) -> Self {
46 Self {
47 topics: Arc::new(dashmap::DashMap::new()),
48 capacity,
49 cleanup_counter: Arc::new(AtomicU64::new(0)),
50 }
51 }
52
53 #[must_use]
55 pub fn with_default_capacity() -> Self {
56 Self::new(DEFAULT_CHANNEL_CAPACITY)
57 }
58
59 pub async fn subscribe(&self, topic: &str) -> broadcast::Receiver<SubscriptionMessage> {
61 if self
63 .cleanup_counter
64 .fetch_add(1, AtomicOrdering::Relaxed)
65 .is_multiple_of(100)
66 {
67 self.topics.retain(|_, sender| sender.receiver_count() > 0);
68 }
69
70 if let Some(sender) = self.topics.get(topic) {
71 return sender.subscribe();
72 }
73
74 let entry = self
75 .topics
76 .entry(topic.to_owned())
77 .or_insert_with(|| broadcast::channel(self.capacity).0);
78 entry.value().subscribe()
79 }
80
81 pub async fn publish(&self, topic: &str, message: SubscriptionMessage) -> usize {
83 self.topics
84 .get(topic)
85 .map_or(0, |sender| sender.send(message).unwrap_or(0))
86 }
87
88 pub async fn notify_update(&self, table: &str, id: &str, data: &serde_json::Value) {
91 if self.topics.is_empty() {
92 return;
93 }
94 let record_topic = format!("{table}/{id}");
95 let has_record_sub = self.topics.contains_key(&record_topic);
96 let has_table_sub = self.topics.contains_key(table);
97 if !has_record_sub && !has_table_sub {
98 return;
99 }
100 let message = SubscriptionMessage {
101 message_type: MessageType::Update,
102 data: data.clone(),
103 id: Some(id.to_owned()),
104 timestamp: chrono::Utc::now(),
105 };
106 if has_record_sub {
107 self.publish(&record_topic, message.clone()).await;
108 }
109 if has_table_sub {
110 self.publish(table, message).await;
111 }
112 }
113
114 pub async fn notify_delete(&self, table: &str, id: &str) {
117 if self.topics.is_empty() {
118 return;
119 }
120 let record_topic = format!("{table}/{id}");
121 let has_record_sub = self.topics.contains_key(&record_topic);
122 let has_table_sub = self.topics.contains_key(table);
123 if !has_record_sub && !has_table_sub {
124 return;
125 }
126 let message = SubscriptionMessage {
127 message_type: MessageType::Delete,
128 data: serde_json::json!({"id": id}),
129 id: Some(id.to_owned()),
130 timestamp: chrono::Utc::now(),
131 };
132 if has_record_sub {
133 self.publish(&record_topic, message.clone()).await;
134 }
135 if has_table_sub {
136 self.publish(table, message).await;
137 }
138 }
139
140 pub async fn notify_publish(&self, topic: &str, data: serde_json::Value) {
143 if self.topics.is_empty() || !self.topics.contains_key(topic) {
144 return;
145 }
146 let message = SubscriptionMessage {
147 message_type: MessageType::Publish,
148 data,
149 id: None,
150 timestamp: chrono::Utc::now(),
151 };
152 self.publish(topic, message).await;
153 }
154
155 pub async fn topic_count(&self) -> usize {
157 self.topics.len()
158 }
159
160 #[must_use]
164 pub fn has_topics(&self) -> bool {
165 !self.topics.is_empty()
166 }
167
168 pub async fn subscriber_count(&self, topic: &str) -> usize {
170 self.topics
171 .get(topic)
172 .map_or(0, |entry| entry.receiver_count())
173 }
174
175 pub async fn has_topic(&self, topic: &str) -> bool {
177 self.topics.contains_key(topic)
178 }
179
180 pub async fn remove_topic(&self, topic: &str) -> bool {
182 self.topics.remove(topic).is_some()
183 }
184
185 pub async fn cleanup_empty_topics(&self) -> usize {
187 let before = self.topics.len();
188 self.topics.retain(|_, sender| sender.receiver_count() > 0);
189 before - self.topics.len()
190 }
191
192 pub async fn topics(&self) -> Vec<String> {
194 self.topics.iter().map(|e| e.key().clone()).collect()
195 }
196}
197
198impl Default for PubSubManager {
199 fn default() -> Self {
200 Self::with_default_capacity()
201 }
202}