pubsub_rs/
lib.rs

1//! A publish-subscribe system for Rust with async/await support.
2//!
3//! This crate provides a simple yet powerful publish-subscribe (pubsub) system
4//! that allows multiple subscribers to receive messages published to specific topics.
5//! It's designed to be:
6//! - Thread-safe: Uses Arc and DashMap for concurrent access
7//! - Async-friendly: Built with async/await support using async-channel
8//! - Memory efficient: Uses weak references to prevent memory leaks
9//! - Clean shutdown: Automatically cleans up resources when dropped
10//!
11//! # Features
12//! - Multiple subscribers per topic
13//! - Multiple topics per subscriber
14//! - Thread-safe operations
15//! - Async message delivery
16//! - Automatic cleanup of dropped subscribers
17//! - Graceful shutdown handling
18//!
19//! # Basic Usage
20//! ```rust
21//! use pubsub_rs::{Pubsub, PubsubError};
22//!
23//! #[tokio::main]
24//! async fn main() {
25//!     let pubsub = Pubsub::new();
26//!
27//!     // Subscribe to topics
28//!     let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
29//!
30//!     // Publish messages
31//!     pubsub.publish("topic1", "Hello".to_owned()).await;
32//!
33//!     // Receive messages
34//!     let (topic, message) = subscriber.recv().await.unwrap();
35//!     assert_eq!(topic, "topic1");
36//!     assert_eq!(message, "Hello");
37//! }
38//! ```
39//!
40//! # Error Handling
41//! The main error type is `PubsubError`, which occurs when:
42//! - The pubsub system has been closed
43//! - A subscriber tries to receive messages after the pubsub system is dropped
44//!
45//! # Performance Considerations
46//! - Uses DashMap for concurrent topic storage
47//! - Each subscriber has its own async channel
48//! - Message delivery is non-blocking
49//! - Automatic cleanup of dropped subscribers
50//!
51//! # Safety
52//! - All operations are thread-safe
53//! - Uses Arc for shared ownership
54//! - Uses Weak references to prevent memory leaks
55//! - Proper cleanup on drop
56//!
57//! # Examples
58//! See the tests module for more comprehensive usage examples.
59use std::error::Error;
60use std::fmt::Display;
61use std::hash::Hash;
62use std::sync::{Arc, Weak};
63use async_channel::{unbounded, Receiver, Sender};
64use dashmap::DashMap;
65
66#[cfg(test)]
67mod tests;
68
69/// A trait that defines the requirements for types that can be used as Pubsub topics.
70///
71/// Any type that implements Clone, Hash, and Eq can be used as a topic in the Pubsub system.
72/// This includes common types like String, &str, and custom types that implement these traits.
73///
74/// The trait is automatically implemented for all types that satisfy the trait bounds,
75/// so users don't need to explicitly implement it for their types.
76///
77/// # Requirements
78/// - Clone: Topics need to be cloned when creating subscriptions and publishing messages
79/// - Hash: Topics are used as keys in a hash map for efficient lookups
80/// - Eq: Topics need to be comparable for equality when matching subscriptions
81///
82/// # Examples
83/// ```rust
84/// // String implements PubsubTopic
85/// let topic: String = "my_topic".to_owned();
86///
87/// // &str implements PubsubTopic
88/// let topic: &str = "my_topic";
89///
90/// // Custom types can be used if they implement the required traits
91/// #[derive(Clone, Hash, Eq, PartialEq)]
92/// struct CustomTopic {
93///     id: u32,
94///     name: String,
95/// }
96/// ```
97pub trait PubsubTopic: Clone + Hash + Eq {}
98impl<T: Clone + Hash + Eq> PubsubTopic for T {}
99
100/// A publish-subscribe system that allows multiple subscribers to receive messages
101/// published to specific topics.
102///
103/// The Pubsub struct is the main interface for creating topics, publishing messages,
104/// and managing subscriptions. It uses an internal Arc reference to shared state,
105/// allowing multiple clones of the Pubsub instance to share the same underlying
106/// subscription data.
107///
108/// # Type Parameters
109/// * `T` - The topic type, must implement PubsubTopic (Clone + Hash + Eq)
110/// * `P` - The payload type, must implement Clone
111///
112/// # Examples
113/// ```rust
114/// // let pubsub = Pubsub::new();
115/// // let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
116/// // pubsub.publish("topic1", "Hello, world!".to_owned()).await;
117/// ```
118#[derive(Clone)]
119pub struct Pubsub<T: PubsubTopic, P: Clone> {
120    inner: Arc<PubsubInner<T, P>>,
121}
122
123/// Implements the Drop trait for Pubsub to ensure proper cleanup when the last instance is dropped.
124///
125/// When the last strong reference to the Pubsub's inner data is dropped, this implementation:
126/// 1. Checks if this is the last strong reference (Arc::strong_count == 1)
127/// 2. If so, iterates through all topics and their subscribers
128/// 3. Closes each subscriber's channel to prevent them from being stuck waiting for messages
129///
130/// This ensures that any remaining subscribers will receive an error on their next recv() call
131/// rather than blocking indefinitely, allowing them to clean up their resources properly.
132impl<T: PubsubTopic, P: Clone> Drop for Pubsub<T, P> {
133    fn drop(&mut self) {
134        if Arc::strong_count(&self.inner) == 1 {
135            for subs in self.inner.m.iter() {
136                for sub_inner in subs.iter() {
137                    sub_inner.tx.close();
138                }
139            }
140        }
141    }
142}
143
144impl<T: PubsubTopic, P: Clone> Pubsub<T, P> {
145    /// Creates a new Pubsub instance with empty topic subscriptions.
146    ///
147    /// This initializes the internal shared state and returns a new Pubsub instance
148    /// that can be used to manage topics and subscriptions.
149    ///
150    /// # Examples
151    /// ```rust
152    /// // let pubsub = Pubsub::new();
153    /// ```
154    pub fn new() -> Self {
155        Self { inner: Arc::new(PubsubInner::new()) }
156    }
157
158    /// Subscribes to one or more topics and returns a new Subscriber instance.
159    ///
160    /// # Arguments
161    /// * `topics` - A vector of topics to subscribe to. The subscriber will receive messages
162    ///             published to any of these topics.
163    ///
164    /// # Returns
165    /// A new `Subscriber` instance that can be used to receive messages for the subscribed topics.
166    ///
167    /// # Example
168    /// ```rust
169    /// // let pubsub = Pubsub::new();
170    /// // let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
171    /// ```
172    pub async fn subscribe(&self, topics: Vec<T>) -> Subscriber<T, P> {
173        let w = Arc::downgrade(&self.inner);
174        let sub = Subscriber::new(w, topics);
175        self.inner.add_subscriber(&Arc::clone(&sub.inner));
176        sub
177    }
178
179    /// Publishes a message to a specific topic.
180    ///
181    /// # Arguments
182    /// * `topic` - The topic to publish the message to. Subscribers subscribed to this topic
183    ///            will receive the message.
184    /// * `payload` - The message payload to send to subscribers.
185    ///
186    /// # Example
187    /// ```rust
188    /// // let pubsub = Pubsub::new();
189    /// // pubsub.publish("topic1", "Hello, world!".to_owned()).await;
190    /// ```
191    pub async fn publish(&self, topic: T, payload: P) {
192        self.inner.publish(topic, payload).await;
193    }
194}
195
196struct PubsubInner<T: PubsubTopic, P: Clone> {
197    m: DashMap<T, Vec<Arc<SubscriberInner<T, P>>>>,
198}
199
200impl<T: PubsubTopic, P: Clone> PubsubInner<T, P> {
201    fn new() -> Self {
202        Self { m: DashMap::new() }
203    }
204
205    async fn publish(&self, topic: T, payload: P) {
206        if let Some(subs) = self.m.get(&topic) {
207            for sub in subs.iter() {
208                sub.publish(Payload::new(topic.clone(), payload.clone())).await;
209            }
210        }
211    }
212
213    fn add_subscriber(&self, sub: &Arc<SubscriberInner<T, P>>) {
214        for topic in &sub.topics {
215            self.m.entry(topic.clone()).or_insert_with(Vec::new).push(Arc::clone(&sub));
216        }
217    }
218
219    fn remove_subscriber(&self, sub: &Arc<SubscriberInner<T, P>>) {
220        for topic in &sub.topics {
221            if let Some(mut subs) = self.m.get_mut(topic) {
222                subs.retain(|other| !Arc::ptr_eq(sub, other));
223            }
224        }
225    }
226}
227
228struct Payload<T, P> {
229    topic: T,
230    payload: P,
231}
232
233impl<T, P> Payload<T, P> {
234    fn new(topic: T, payload: P) -> Self {
235        Self { topic, payload }
236    }
237}
238
239/// A subscriber that receives messages for subscribed topics from a Pubsub system.
240///
241/// The Subscriber struct represents an active subscription to one or more topics.
242/// It contains an internal Arc reference to shared state that manages the message
243/// channel and subscription information.
244///
245/// # Type Parameters
246/// * `T` - The topic type, must implement PubsubTopic (Clone + Hash + Eq)
247/// * `P` - The payload type, must implement Clone
248///
249/// # Examples
250/// ```rust
251/// use pubsub_rs::Pubsub;
252/// async fn some_fn() {
253///     let pubsub: Pubsub<&str, String> = Pubsub::new();
254///     let subscriber = pubsub.subscribe(vec!["topic1", "topic2"]).await;
255///     let (topic, message) = subscriber.recv().await.unwrap();
256/// }
257/// ```
258#[derive(Clone)]
259pub struct Subscriber<T: PubsubTopic, P: Clone> {
260    inner: Arc<SubscriberInner<T, P>>,
261}
262
263impl<T: PubsubTopic, P: Clone> Subscriber<T, P> {
264    fn new(p: Weak<PubsubInner<T, P>>, topics: Vec<T>) -> Self {
265        let inner = Arc::new(SubscriberInner::new(p, topics));
266        Self { inner }
267    }
268
269    /// Receives the next message for this subscriber.
270    ///
271    /// This async function waits until a message is published to one of the subscriber's
272    /// subscribed topics, then returns a tuple containing:
273    /// 1. The topic the message was published to
274    /// 2. The message payload
275    ///
276    /// # Returns
277    /// * `Ok((T, P))` - A tuple containing the topic and payload if a message is received
278    /// * `Err(PubsubError)` - If the pubsub system has been closed and no more messages will be sent
279    ///
280    /// # Examples
281    /// ```rust
282    /// // let pubsub = Pubsub::new();
283    /// // let subscriber = pubsub.subscribe(vec!["topic1"]).await;
284    /// // pubsub.publish("topic1", "Hello".to_owned()).await;
285    /// // let (topic, message) = subscriber.recv().await.unwrap();
286    /// // assert_eq!(topic, "topic1");
287    /// // assert_eq!(message, "Hello");
288    /// ```
289    pub async fn recv(&self) -> Result<(T, P)> {
290        self.inner.recv().await
291    }
292}
293
294/// When Subscriber is dropped, remove itself from all the Pubsub subscriptions
295impl<T: PubsubTopic, P: Clone> Drop for Subscriber<T, P> {
296    fn drop(&mut self) {
297        if let Some(p) = self.inner.p.upgrade() {
298            p.remove_subscriber(&self.inner);
299        }
300    }
301}
302
303struct SubscriberInner<T: PubsubTopic, P: Clone> {
304    topics: Vec<T>,
305    tx: Sender<Payload<T, P>>,
306    rx: Receiver<Payload<T, P>>,
307    p: Weak<PubsubInner<T, P>>,
308}
309
310impl<T: PubsubTopic, P: Clone> SubscriberInner<T, P> {
311    fn new(p: Weak<PubsubInner<T, P>>, topics: Vec<T>) -> Self {
312        let (tx, rx) = unbounded();
313        Self { topics, tx, rx, p }
314    }
315
316    async fn recv(&self) -> Result<(T, P)> {
317        let Ok(payload) = self.rx.recv().await else {
318            return Err(PubsubError)
319        };
320        Ok((payload.topic, payload.payload))
321    }
322
323    async fn publish(&self, payload: Payload<T, P>) {
324        let _ = self.tx.send(payload).await;
325    }
326}
327
328/// Error type returned when a Pubsub operation fails.
329///
330/// This error occurs when:
331/// - The Pubsub system has been closed and no more messages can be received
332/// - A subscriber attempts to receive a message after the Pubsub system has been dropped
333///
334/// # Examples
335/// ```rust
336/// // let pubsub = Pubsub::new();
337/// // let subscriber = pubsub.subscribe(vec!["topic1"]).await;
338/// // drop(pubsub);
339/// // let result = subscriber.recv().await;
340/// // assert!(matches!(result, Err(PubsubError)));
341/// ```
342#[derive(PartialEq, Eq, Clone, Copy, Debug)]
343pub struct PubsubError;
344
345impl Display for PubsubError {
346    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347        write!(f, "pubsub closed")
348    }
349}
350
351impl Error for PubsubError {}
352
353type Result<T> = std::result::Result<T, PubsubError>;