pubsub_rs/lib.rs
1//! A publish-subscribe system for Rust with async/await support.
2//!
3//! This crate provides a simple yet powerful publish-subscribe (pubsub) system
4//! that allows multiple subscribers to receive messages published to specific topics.
5//! It's designed to be:
6//! - Thread-safe: Uses Arc and DashMap for concurrent access
7//! - Async-friendly: Built with async/await support using async-channel
8//! - Memory efficient: Uses weak references to prevent memory leaks
9//! - Clean shutdown: Automatically cleans up resources when dropped
10//!
11//! # Features
12//! - Multiple subscribers per topic
13//! - Multiple topics per subscriber
14//! - Thread-safe operations
15//! - Async message delivery
16//! - Automatic cleanup of dropped subscribers
17//! - Graceful shutdown handling
18//!
19//! # Basic Usage
20//! ```rust
21//! use pubsub_rs::{Pubsub, PubsubError};
22//!
23//! #[tokio::main]
24//! async fn main() {
25//! let pubsub = Pubsub::new();
26//!
27//! // Subscribe to topics
28//! let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
29//!
30//! // Publish messages
31//! pubsub.publish("topic1", "Hello".to_owned()).await;
32//!
33//! // Receive messages
34//! let (topic, message) = subscriber.recv().await.unwrap();
35//! assert_eq!(topic, "topic1");
36//! assert_eq!(message, "Hello");
37//! }
38//! ```
39//!
40//! # Error Handling
41//! The main error type is `PubsubError`, which occurs when:
42//! - The pubsub system has been closed
43//! - A subscriber tries to receive messages after the pubsub system is dropped
44//!
45//! # Performance Considerations
46//! - Uses DashMap for concurrent topic storage
47//! - Each subscriber has its own async channel
48//! - Message delivery is non-blocking
49//! - Automatic cleanup of dropped subscribers
50//!
51//! # Safety
52//! - All operations are thread-safe
53//! - Uses Arc for shared ownership
54//! - Uses Weak references to prevent memory leaks
55//! - Proper cleanup on drop
56//!
57//! # Examples
58//! See the tests module for more comprehensive usage examples.
59use std::error::Error;
60use std::fmt::Display;
61use std::hash::Hash;
62use std::sync::{Arc, Weak};
63use async_channel::{unbounded, Receiver, Sender};
64use dashmap::DashMap;
65
66#[cfg(test)]
67mod tests;
68
/// Marker trait for values that can identify a topic in the pubsub system.
///
/// A topic type has to be:
/// - [`Clone`]: a copy of the topic is stored for every subscription and attached to
///   every delivered message,
/// - [`Hash`] and [`Eq`]: topics are the keys of the internal subscription map.
///
/// A blanket implementation covers every type that meets these bounds (`String`, `&str`,
/// integers, suitable user-defined types, ...), so the trait never needs to be implemented
/// by hand.
///
/// # Examples
/// ```rust
/// use pubsub_rs::PubsubTopic;
///
/// fn assert_topic<T: PubsubTopic>(_topic: &T) {}
///
/// // Built-in types qualify out of the box.
/// assert_topic(&"my_topic");
/// assert_topic(&String::from("my_topic"));
///
/// // So does any custom type deriving the required traits.
/// #[derive(Clone, Hash, PartialEq, Eq)]
/// struct CustomTopic {
///     id: u32,
///     name: String,
/// }
/// assert_topic(&CustomTopic { id: 1, name: "orders".to_owned() });
/// ```
pub trait PubsubTopic: Clone + Hash + Eq {}

impl<T> PubsubTopic for T where T: Clone + Hash + Eq {}
99
100/// A publish-subscribe system that allows multiple subscribers to receive messages
101/// published to specific topics.
102///
103/// The Pubsub struct is the main interface for creating topics, publishing messages,
104/// and managing subscriptions. It uses an internal Arc reference to shared state,
105/// allowing multiple clones of the Pubsub instance to share the same underlying
106/// subscription data.
107///
108/// # Type Parameters
109/// * `T` - The topic type, must implement PubsubTopic (Clone + Hash + Eq)
110/// * `P` - The payload type, must implement Clone
111///
112/// # Examples
113/// ```rust
114/// // let pubsub = Pubsub::new();
115/// // let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
116/// // pubsub.publish("topic1", "Hello, world!".to_owned()).await;
117/// ```
118#[derive(Clone)]
119pub struct Pubsub<T: PubsubTopic, P: Clone> {
120 inner: Arc<PubsubInner<T, P>>,
121}
122
123/// Implements the Drop trait for Pubsub to ensure proper cleanup when the last instance is dropped.
124///
125/// When the last strong reference to the Pubsub's inner data is dropped, this implementation:
126/// 1. Checks if this is the last strong reference (Arc::strong_count == 1)
127/// 2. If so, iterates through all topics and their subscribers
128/// 3. Closes each subscriber's channel to prevent them from being stuck waiting for messages
129///
130/// This ensures that any remaining subscribers will receive an error on their next recv() call
131/// rather than blocking indefinitely, allowing them to clean up their resources properly.
132impl<T: PubsubTopic, P: Clone> Drop for Pubsub<T, P> {
133 fn drop(&mut self) {
134 if Arc::strong_count(&self.inner) == 1 {
135 for subs in self.inner.m.iter() {
136 for sub_inner in subs.iter() {
137 sub_inner.tx.close();
138 }
139 }
140 }
141 }
142}
143
144impl<T: PubsubTopic, P: Clone> Pubsub<T, P> {
145 /// Creates a new Pubsub instance with empty topic subscriptions.
146 ///
147 /// This initializes the internal shared state and returns a new Pubsub instance
148 /// that can be used to manage topics and subscriptions.
149 ///
150 /// # Examples
151 /// ```rust
152 /// // let pubsub = Pubsub::new();
153 /// ```
154 pub fn new() -> Self {
155 Self { inner: Arc::new(PubsubInner::new()) }
156 }
157
158 /// Subscribes to one or more topics and returns a new Subscriber instance.
159 ///
160 /// # Arguments
161 /// * `topics` - A vector of topics to subscribe to. The subscriber will receive messages
162 /// published to any of these topics.
163 ///
164 /// # Returns
165 /// A new `Subscriber` instance that can be used to receive messages for the subscribed topics.
166 ///
167 /// # Example
168 /// ```rust
169 /// // let pubsub = Pubsub::new();
170 /// // let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
171 /// ```
172 pub async fn subscribe(&self, topics: Vec<T>) -> Subscriber<T, P> {
173 let w = Arc::downgrade(&self.inner);
174 let sub = Subscriber::new(w, topics);
175 self.inner.add_subscriber(&Arc::clone(&sub.inner));
176 sub
177 }
178
179 /// Publishes a message to a specific topic.
180 ///
181 /// # Arguments
182 /// * `topic` - The topic to publish the message to. Subscribers subscribed to this topic
183 /// will receive the message.
184 /// * `payload` - The message payload to send to subscribers.
185 ///
186 /// # Example
187 /// ```rust
188 /// // let pubsub = Pubsub::new();
189 /// // pubsub.publish("topic1", "Hello, world!".to_owned()).await;
190 /// ```
191 pub async fn publish(&self, topic: T, payload: P) {
192 self.inner.publish(topic, payload).await;
193 }
194}
195
196struct PubsubInner<T: PubsubTopic, P: Clone> {
197 m: DashMap<T, Vec<Arc<SubscriberInner<T, P>>>>,
198}
199
200impl<T: PubsubTopic, P: Clone> PubsubInner<T, P> {
201 fn new() -> Self {
202 Self { m: DashMap::new() }
203 }
204
205 async fn publish(&self, topic: T, payload: P) {
206 if let Some(subs) = self.m.get(&topic) {
207 for sub in subs.iter() {
208 sub.publish(Payload::new(topic.clone(), payload.clone())).await;
209 }
210 }
211 }
212
213 fn add_subscriber(&self, sub: &Arc<SubscriberInner<T, P>>) {
214 for topic in &sub.topics {
215 self.m.entry(topic.clone()).or_insert_with(Vec::new).push(Arc::clone(&sub));
216 }
217 }
218
219 fn remove_subscriber(&self, sub: &Arc<SubscriberInner<T, P>>) {
220 for topic in &sub.topics {
221 if let Some(mut subs) = self.m.get_mut(topic) {
222 subs.retain(|other| !Arc::ptr_eq(sub, other));
223 }
224 }
225 }
226}
227
/// A message queued on a subscriber's channel, tagged with the topic it was published to.
struct Payload<T, P> {
    /// Topic the message was published under.
    topic: T,
    /// The message body.
    payload: P,
}

impl<T, P> Payload<T, P> {
    /// Bundles a topic with its message body.
    fn new(topic: T, payload: P) -> Payload<T, P> {
        Payload { topic, payload }
    }
}
238
239/// A subscriber that receives messages for subscribed topics from a Pubsub system.
240///
241/// The Subscriber struct represents an active subscription to one or more topics.
242/// It contains an internal Arc reference to shared state that manages the message
243/// channel and subscription information.
244///
245/// # Type Parameters
246/// * `T` - The topic type, must implement PubsubTopic (Clone + Hash + Eq)
247/// * `P` - The payload type, must implement Clone
248///
249/// # Examples
250/// ```rust
251/// use pubsub_rs::Pubsub;
252/// async fn some_fn() {
253/// let pubsub: Pubsub<&str, String> = Pubsub::new();
254/// let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
255/// let (topic, message) = subscriber.recv().await.unwrap();
256/// }
257/// ```
258#[derive(Clone)]
259pub struct Subscriber<T: PubsubTopic, P: Clone> {
260 inner: Arc<SubscriberInner<T, P>>,
261}
262
263impl<T: PubsubTopic, P: Clone> Subscriber<T, P> {
264 fn new(p: Weak<PubsubInner<T, P>>, topics: Vec<T>) -> Self {
265 let inner = Arc::new(SubscriberInner::new(p, topics));
266 Self { inner }
267 }
268
269 /// Receives the next message for this subscriber.
270 ///
271 /// This async function waits until a message is published to one of the subscriber's
272 /// subscribed topics, then returns a tuple containing:
273 /// 1. The topic the message was published to
274 /// 2. The message payload
275 ///
276 /// # Returns
277 /// * `Ok((T, P))` - A tuple containing the topic and payload if a message is received
278 /// * `Err(PubsubError)` - If the pubsub system has been closed and no more messages will be sent
279 ///
280 /// # Examples
281 /// ```rust
282 /// // let pubsub = Pubsub::new();
283 /// // let subscriber = pubsub.subscribe(vec!["topic1"]).await;
284 /// // pubsub.publish("topic1", "Hello".to_owned()).await;
285 /// // let (topic, message) = subscriber.recv().await.unwrap();
286 /// // assert_eq!(topic, "topic1");
287 /// // assert_eq!(message, "Hello");
288 /// ```
289 pub async fn recv(&self) -> Result<(T, P)> {
290 self.inner.recv().await
291 }
292}
293
294/// When Subscriber is dropped, remove itself from all the Pubsub subscriptions
295impl<T: PubsubTopic, P: Clone> Drop for Subscriber<T, P> {
296 fn drop(&mut self) {
297 if let Some(p) = self.inner.p.upgrade() {
298 p.remove_subscriber(&self.inner);
299 }
300 }
301}
302
303struct SubscriberInner<T: PubsubTopic, P: Clone> {
304 topics: Vec<T>,
305 tx: Sender<Payload<T, P>>,
306 rx: Receiver<Payload<T, P>>,
307 p: Weak<PubsubInner<T, P>>,
308}
309
310impl<T: PubsubTopic, P: Clone> SubscriberInner<T, P> {
311 fn new(p: Weak<PubsubInner<T, P>>, topics: Vec<T>) -> Self {
312 let (tx, rx) = unbounded();
313 Self { topics, tx, rx, p }
314 }
315
316 async fn recv(&self) -> Result<(T, P)> {
317 let Ok(payload) = self.rx.recv().await else {
318 return Err(PubsubError)
319 };
320 Ok((payload.topic, payload.payload))
321 }
322
323 async fn publish(&self, payload: Payload<T, P>) {
324 let _ = self.tx.send(payload).await;
325 }
326}
327
/// Error returned when a message can no longer be received.
///
/// `Subscriber::recv` yields this error when its channel has been closed, which happens
/// once the pubsub system has been dropped. Its `Display` output is `pubsub closed`.
///
/// # Examples
/// ```rust
/// use pubsub_rs::{Pubsub, PubsubError};
///
/// async fn example() {
///     let pubsub: Pubsub<&str, String> = Pubsub::new();
///     let subscriber = pubsub.subscribe(vec!["topic1"]).await;
///     drop(pubsub);
///     let result = subscriber.recv().await;
///     assert!(matches!(result, Err(PubsubError)));
/// }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PubsubError;

impl Display for PubsubError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("pubsub closed")
    }
}

impl Error for PubsubError {}

/// Crate-internal shorthand for results whose error type is [`PubsubError`].
type Result<T> = std::result::Result<T, PubsubError>;