Skip to main content

yeti_types/
pubsub.rs

1//! PubSub — topic-based publish/subscribe system.
2//!
3//! Provides real-time messaging infrastructure for subscriptions.
4//!
5//! # Async surface
6//!
7//! Many `PubSubManager` methods are `async fn` despite the current
8//! implementation being sync (DashMap + tokio broadcast channels). The
9//! async keyword is part of the public API contract: callers `.await` it
10//! everywhere, and we want the freedom to swap the in-process backend
11//! for a remote one (NATS/Redis) without forcing a workspace-wide call-
12//! site sweep. See `unused_async` note at module scope.
13#![expect(
14    clippy::unused_async,
15    reason = "PubSubManager API is async-contract — callers .await; backend may swap to remote in future"
16)]
17
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
20
21use tokio::sync::broadcast;
22
23use crate::resource::{MessageType, SubscriptionMessage};
24
25/// Default channel capacity for broadcast channels.
26const DEFAULT_CHANNEL_CAPACITY: usize = 256;
27
28/// Topic-based publish/subscribe manager.
29///
30/// Thread-safe manager for real-time subscriptions. Supports topic-based
31/// subscriptions, automatic channel creation, and broadcast to all subscribers.
32#[derive(Debug, Clone)]
33pub struct PubSubManager {
34    /// Topic -> broadcast sender
35    topics: Arc<dashmap::DashMap<String, broadcast::Sender<SubscriptionMessage>>>,
36    /// Default channel capacity for new topics
37    capacity: usize,
38    /// Counter for periodic topic cleanup
39    cleanup_counter: Arc<AtomicU64>,
40}
41
42impl PubSubManager {
43    /// Create a new `PubSubManager` with specified channel capacity.
44    #[must_use]
45    pub fn new(capacity: usize) -> Self {
46        Self {
47            topics: Arc::new(dashmap::DashMap::new()),
48            capacity,
49            cleanup_counter: Arc::new(AtomicU64::new(0)),
50        }
51    }
52
53    /// Create a new `PubSubManager` with default capacity (256).
54    #[must_use]
55    pub fn with_default_capacity() -> Self {
56        Self::new(DEFAULT_CHANNEL_CAPACITY)
57    }
58
59    /// Subscribe to a topic. Returns a receiver for messages.
60    pub async fn subscribe(&self, topic: &str) -> broadcast::Receiver<SubscriptionMessage> {
61        // Periodic cleanup every 100 subscriptions
62        if self
63            .cleanup_counter
64            .fetch_add(1, AtomicOrdering::Relaxed)
65            .is_multiple_of(100)
66        {
67            self.topics.retain(|_, sender| sender.receiver_count() > 0);
68        }
69
70        if let Some(sender) = self.topics.get(topic) {
71            return sender.subscribe();
72        }
73
74        let entry = self
75            .topics
76            .entry(topic.to_owned())
77            .or_insert_with(|| broadcast::channel(self.capacity).0);
78        entry.value().subscribe()
79    }
80
81    /// Publish a message to a topic. Returns number of recipients.
82    pub async fn publish(&self, topic: &str, message: SubscriptionMessage) -> usize {
83        self.topics
84            .get(topic)
85            .map_or(0, |sender| sender.send(message).unwrap_or(0))
86    }
87
88    /// Notify subscribers of a record update (publishes to record and table topics).
89    /// Short-circuits with zero allocation if no subscribers exist.
90    pub async fn notify_update(&self, table: &str, id: &str, data: &serde_json::Value) {
91        if self.topics.is_empty() {
92            return;
93        }
94        let record_topic = format!("{table}/{id}");
95        let has_record_sub = self.topics.contains_key(&record_topic);
96        let has_table_sub = self.topics.contains_key(table);
97        if !has_record_sub && !has_table_sub {
98            return;
99        }
100        let message = SubscriptionMessage {
101            message_type: MessageType::Update,
102            data: data.clone(),
103            id: Some(id.to_owned()),
104            timestamp: chrono::Utc::now(),
105        };
106        if has_record_sub {
107            self.publish(&record_topic, message.clone()).await;
108        }
109        if has_table_sub {
110            self.publish(table, message).await;
111        }
112    }
113
114    /// Notify subscribers of a record deletion.
115    /// Short-circuits with zero allocation if no subscribers exist.
116    pub async fn notify_delete(&self, table: &str, id: &str) {
117        if self.topics.is_empty() {
118            return;
119        }
120        let record_topic = format!("{table}/{id}");
121        let has_record_sub = self.topics.contains_key(&record_topic);
122        let has_table_sub = self.topics.contains_key(table);
123        if !has_record_sub && !has_table_sub {
124            return;
125        }
126        let message = SubscriptionMessage {
127            message_type: MessageType::Delete,
128            data: serde_json::json!({"id": id}),
129            id: Some(id.to_owned()),
130            timestamp: chrono::Utc::now(),
131        };
132        if has_record_sub {
133            self.publish(&record_topic, message.clone()).await;
134        }
135        if has_table_sub {
136            self.publish(table, message).await;
137        }
138    }
139
140    /// Notify subscribers of a custom published message.
141    /// Short-circuits if no subscribers exist for this topic.
142    pub async fn notify_publish(&self, topic: &str, data: serde_json::Value) {
143        if self.topics.is_empty() || !self.topics.contains_key(topic) {
144            return;
145        }
146        let message = SubscriptionMessage {
147            message_type: MessageType::Publish,
148            data,
149            id: None,
150            timestamp: chrono::Utc::now(),
151        };
152        self.publish(topic, message).await;
153    }
154
155    /// Get the number of active topics.
156    pub async fn topic_count(&self) -> usize {
157        self.topics.len()
158    }
159
160    /// Whether any subscription topic is live. Cheap (`DashMap::is_empty`),
161    /// sync — callers on a write hot path gate record decode/notify behind
162    /// this so a write with no subscribers pays a single atomic-load check.
163    #[must_use]
164    pub fn has_topics(&self) -> bool {
165        !self.topics.is_empty()
166    }
167
168    /// Get the number of subscribers for a topic.
169    pub async fn subscriber_count(&self, topic: &str) -> usize {
170        self.topics
171            .get(topic)
172            .map_or(0, |entry| entry.receiver_count())
173    }
174
175    /// Check if a topic exists.
176    pub async fn has_topic(&self, topic: &str) -> bool {
177        self.topics.contains_key(topic)
178    }
179
180    /// Remove a topic. Returns true if it existed.
181    pub async fn remove_topic(&self, topic: &str) -> bool {
182        self.topics.remove(topic).is_some()
183    }
184
185    /// Remove topics with zero active subscribers.
186    pub async fn cleanup_empty_topics(&self) -> usize {
187        let before = self.topics.len();
188        self.topics.retain(|_, sender| sender.receiver_count() > 0);
189        before - self.topics.len()
190    }
191
192    /// List all active topic names.
193    pub async fn topics(&self) -> Vec<String> {
194        self.topics.iter().map(|e| e.key().clone()).collect()
195    }
196}
197
198impl Default for PubSubManager {
199    fn default() -> Self {
200        Self::with_default_capacity()
201    }
202}