Skip to main content

rust_pubsub/
lib.rs

1//! # Rust PubSub
2//!
3//! A thread-safe, in-memory publish-subscribe library for Rust, designed for efficient and flexible
4//! inter-thread communication. It supports both manual message receiving and callback-based subscriptions,
5//! with configurable queue depth and overwrite behavior.
6//!
7//! ## Features
8//!
9//! - **Thread-Safe**: Uses `crossbeam-channel` for safe message passing between threads.
10//! - **Flexible Subscriptions**: Supports manual message receiving and callback-based subscriptions.
11//! - **Configurable Channels**: Allows customization of queue depth and overwrite behavior per subscription.
12//! - **Type-Safe Messaging**: Supports any `Send + Sync + Clone + 'static` type for messages.
13//! - **Timeout Support**: Provides blocking, non-blocking, and timeout-based message receiving and publishing.
14//!
15//! ## 功能(中文)
16//!
17//! - **线程安全**:使用 `crossbeam-channel` 实现线程间安全消息传递。
18//! - **灵活订阅**:支持手动消息接收和基于回调的订阅模式。
19//! - **可配置通道**:允许为每个订阅配置队列深度和覆盖行为。
20//! - **类型安全消息**:支持任何满足 `Send + Sync + Clone + 'static` 的消息类型。
21//! - **超时支持**:提供阻塞、非阻塞和带超时的消息接收与发布。
22//!
23//! ## API Overview
24//!
25//! The library offers two main subscription methods:
26//!
27//! - **`subscribe_manual`**: Returns a receiver that allows manually polling for messages.
28//! - **`subscribe`**: Takes a callback closure that processes messages in a dedicated thread.
29//!   This non-blocking approach runs the provided closure in its own thread, making it ideal
30//!   for operations that shouldn't block the main thread.
31//!
32//! ## Usage Examples
33//!
34//! ### 1. Manual Subscription with Non-Blocking Receive
35//!
36//! ```rust
37//! use rust_pubsub::{PubSub, TopicConfig};
38//!
39//! let pubsub = PubSub::instance();
40//! let topic = "test_topic";
41//! let config = TopicConfig::new(10, false); // Queue depth 10, no overwrite
42//!
43//! // Create publisher
44//! let topic_id = pubsub.create_publisher(topic);
45//!
46//! // Subscribe manually
47//! let receiver = pubsub.subscribe_manual::<String>(topic, config);
48//!
49//! // Publish a message
50//! pubsub.publish(topic_id, "Hello, World!".to_string());
51//!
52//! // Try to receive the message
53//! if let Some(msg) = receiver.try_recv() {
54//!     println!("Received: {}", msg);
55//! }
56//! ```
57//!
58//! ### 2. Callback-Based Subscription
59//!
60//! ```rust
61//! use rust_pubsub::{PubSub, TopicConfig};
62//!
63//! let pubsub = PubSub::instance();
64//! let topic = "callback_topic";
65//! let config = TopicConfig::new(5, true); // Queue depth 5, overwrite enabled
66//!
67//! // Create publisher
68//! let topic_id = pubsub.create_publisher(topic);
69//!
70//! // Subscribe with a callback
71//! let subscriber_id = pubsub.subscribe::<String, _>(topic, config, |msg: &String| {
72//!     println!("Callback received: {}", msg);
73//! });
74//!
75//! // Publish a message
76//! pubsub.publish(topic_id, "Callback message".to_string());
77//!
78//! // Wait briefly to ensure callback executes
79//! std::thread::sleep(std::time::Duration::from_millis(100));
80//!
81//! // Unsubscribe
82//! pubsub.unsubscribe(&subscriber_id);
83//! ```
84//!
85//! ### 3. Non-Blocking Callback Processing in Dedicated Thread
86//!
87//! This example demonstrates how the `subscribe` method's callback is processed in its own dedicated thread,
88//! allowing your main thread to continue execution without being blocked by message processing.
89//!
90//! ```rust
91//! use rust_pubsub::{PubSub, TopicConfig};
92//! use std::sync::{Arc, Mutex};
93//! use std::thread;
94//! use std::time::Duration;
95//!
96//! // Create a shared counter to demonstrate the callback running in a separate thread
97//! let counter = Arc::new(Mutex::new(0));
98//! let counter_clone = counter.clone();
99//!
100//! let pubsub = PubSub::instance();
101//! let topic = "thread_topic";
102//! let config = TopicConfig::new(5, true);
103//! let topic_id = pubsub.create_publisher(topic);
104//!
105//! // Subscribe with a callback that will increment the counter
106//! pubsub.subscribe::<i32, _>(topic, config, move |msg: &i32| {
107//!     // This closure runs in a dedicated thread
108//!     println!("Processing message: {} in a dedicated thread", msg);
109//!     
110//!     // Simulate some processing time
111//!     thread::sleep(Duration::from_millis(500));
112//!     
113//!     // Update the shared counter
114//!     let mut count = counter_clone.lock().unwrap();
115//!     *count += 1;
116//!     println!("Counter updated to: {}", *count);
117//! });
118//!
119//! // Publish multiple messages
120//! for i in 1..=5 {
121//!     pubsub.publish(topic_id, i);
122//!     
123//!     // Main thread can continue doing work without waiting for message processing
124//!     println!("Main thread: Published message {}", i);
125//! }
126//!
127//! // Main thread can do other work while callbacks are processed in background
128//! println!("Main thread: Continuing with other work immediately");
129//!
130//! // Wait briefly to allow some callback processing to occur
131//! thread::sleep(Duration::from_millis(1000));
132//!
133//! // Check how many messages were processed
134//! let final_count = *counter.lock().unwrap();
135//! println!("Messages processed so far: {}", final_count);
136//! ```
137//!
138//! ### 4. Publishing with Timeout
139//!
140//! ```rust
141//! use rust_pubsub::{PubSub, TopicConfig};
142//!
143//! let pubsub = PubSub::instance();
144//! let topic = "timeout_topic";
145//! let config = TopicConfig::new(1, false); // Queue depth 1, no overwrite
146//!
147//! // Create publisher
148//! let topic_id = pubsub.create_publisher(topic);
149//!
150//! // Subscribe manually
151//! let receiver = pubsub.subscribe_manual::<i32>(topic, config);
152//!
153//! // Publish with timeout (100ms)
154//! pubsub.publish_with_timeout(topic_id, 42, Some(100));
155//!
156//! // Receive with timeout (100ms)
157//! if let Some(msg) = receiver.recv_timeout(Some(100)) {
158//!     println!("Received: {}", msg);
159//! }
160//! ```
161//!
162//! ### 5. Overwrite Mode with Full Queue
163//!
164//! ```rust
165//! use rust_pubsub::{PubSub, TopicConfig};
166//!
167//! let pubsub = PubSub::instance();
168//! let topic = "overwrite_topic";
169//! let config = TopicConfig::new(2, true); // Queue depth 2, overwrite enabled
170//!
171//! // Create publisher
172//! let topic_id = pubsub.create_publisher(topic);
173//!
174//! // Subscribe manually
//! let receiver = pubsub.subscribe_manual::<String>(topic, config);
176//!
177//! // Publish multiple messages to fill queue
178//! pubsub.publish(topic_id, "Message 1".to_string());
179//! pubsub.publish(topic_id, "Message 2".to_string());
180//! pubsub.publish(topic_id, "Message 3".to_string()); // Overwrites oldest
181//!
182//! // Receive messages
183//! while let Some(msg) = receiver.try_recv() {
184//!     println!("Received: {}", msg);
185//! }
186//! ```
187//!
188//! ### 6. Multiple Subscribers
189//!
190//! ```rust
191//! use rust_pubsub::{PubSub, TopicConfig};
192//!
193//! let pubsub = PubSub::instance();
194//! let topic = "multi_subscriber_topic";
195//! let config = TopicConfig::new(10, false);
196//!
197//! // Create publisher
198//! let topic_id = pubsub.create_publisher(topic);
199//!
200//! // Subscribe multiple times
201//! let receiver1 = pubsub.subscribe_manual::<String>(topic, config.clone());
202//! let receiver2 = pubsub.subscribe_manual::<String>(topic, config.clone());
203//!
204//! // Publish a message
205//! pubsub.publish(topic_id, "Broadcast message".to_string());
206//!
207//! // Receive from both subscribers
208//! println!("Receiver 1: {:?}", receiver1.try_recv());
209//! println!("Receiver 2: {:?}", receiver2.try_recv());
210//! ```
211//!
212//! ## Installation
213//!
214//! Add the following to your `Cargo.toml`:
215//!
216//! ```toml
217//! [dependencies]
218//! rust-pubsub = "0.1.0"
219//! ```
220//!
221//! ## 安装(中文)
222//!
223//! 在您的 `Cargo.toml` 中添加以下内容:
224//!
225//! ```toml
226//! [dependencies]
227//! rust-pubsub = "0.1.0"
228//! ```
229//!
230//! ## License
231//!
232//! Licensed under either of Apache License, Version 2.0 or MIT license at your option.
233
234use crossbeam_channel::{Receiver, Sender, bounded};
235use lazy_static::lazy_static;
236use std::any::Any;
237use std::collections::HashMap;
238use std::sync::{Arc, Mutex};
239use std::time::Duration;
240use uuid::Uuid;
241
242// Your original code follows here, unchanged
243lazy_static! {
244    static ref PUBSUB: Arc<PubSub> = Arc::new(PubSub::new());
245}
246
/// Per-subscription channel settings.
///
/// A `TopicConfig` is supplied when subscribing and is stored alongside the
/// subscriber's channel so the publish methods can consult it.
#[derive(Clone)]
pub struct TopicConfig {
    /// Capacity handed to the bounded channel created for the subscriber.
    queue_depth: usize,
    /// When `true`, publishing evicts queued messages from a full channel
    /// instead of waiting for space or discarding the new message.
    overwrite: bool,
}

impl TopicConfig {
    /// Builds a configuration from a queue depth and an overwrite flag.
    pub fn new(queue_depth: usize, overwrite: bool) -> Self {
        Self {
            queue_depth,
            overwrite,
        }
    }
}
261
/// Type-erased message envelope placed on subscriber channels.
///
/// The payload lives behind an `Arc`, so cloning the wrapper (done once per
/// subscriber when publishing) shares the payload instead of copying it.
#[derive(Clone)]
struct MessageWrapper {
    /// The published value, erased to `dyn Any` so one channel type can carry
    /// any message type.
    data: Arc<dyn Any + Send + Sync>,
}
266
267#[derive(Clone)]
268struct ChannelPair {
269    sender: Sender<MessageWrapper>,
270    receiver: Receiver<MessageWrapper>,
271    config: TopicConfig,
272    subscriber_id: String,
273}
274
275impl ChannelPair {
276    fn new(
277        sender: Sender<MessageWrapper>,
278        receiver: Receiver<MessageWrapper>,
279        config: TopicConfig,
280        subscriber_id: String,
281    ) -> Self {
282        ChannelPair {
283            sender,
284            receiver,
285            config,
286            subscriber_id,
287        }
288    }
289}
290
291struct TopicData {
292    #[allow(dead_code)]
293    name: String,
294    channel_pairs: Vec<ChannelPair>,
295}
296
297struct SubscriberData {
298    topic: String,
299    #[allow(dead_code)]
300    receiver: Receiver<MessageWrapper>,
301    #[allow(dead_code)]
302    callback: Option<Arc<dyn Fn(&dyn Any) + Send + Sync>>,
303}
304
305#[derive(Clone)]
306pub struct ManualReceiver<T: 'static> {
307    receiver: Receiver<MessageWrapper>,
308    subscriber_id: String,
309    pubsub: Arc<PubSub>,
310    _marker: std::marker::PhantomData<T>,
311}
312
313impl<T: Clone + 'static> ManualReceiver<T> {
314    pub fn try_recv(&self) -> Option<T> {
315        let msg = self.receiver.try_recv().ok();
316
317        match msg {
318            Some(msg) => {
319                if let Some(data) = msg.downcast::<T>() {
320                    return Some(data.to_owned());
321                }
322                None
323            }
324            None => None,
325        }
326    }
327
328    pub fn recv(&self) -> Option<T> {
329        self.recv_timeout(None)
330    }
331
332    pub fn recv_timeout(&self, timeout_ms: Option<u64>) -> Option<T> {
333        let msg = match timeout_ms {
334            Some(ms) => self.receiver.recv_timeout(Duration::from_millis(ms)).ok(),
335            None => self.receiver.recv().ok(),
336        };
337
338        match msg {
339            Some(msg) => {
340                if let Some(data) = msg.downcast::<T>() {
341                    return Some(data.to_owned());
342                }
343                None
344            }
345            None => None,
346        }
347    }
348
349    pub fn unsubscribe(self) {
350        self.pubsub.unsubscribe(&self.subscriber_id);
351    }
352}
353
354impl MessageWrapper {
355    fn new<T: Send + Sync + Clone + 'static>(data: T) -> Self {
356        MessageWrapper {
357            data: Arc::new(data),
358        }
359    }
360
361    fn downcast<T: 'static>(&self) -> Option<&T> {
362        self.data.downcast_ref::<T>()
363    }
364}
365
366pub struct PubSub {
367    topics: Mutex<Vec<TopicData>>,
368    topic_map: Mutex<HashMap<String, usize>>,
369    subscribers: Mutex<HashMap<String, SubscriberData>>,
370}
371
372impl PubSub {
373    fn new() -> Self {
374        PubSub {
375            topics: Mutex::new(Vec::new()),
376            topic_map: Mutex::new(HashMap::new()),
377            subscribers: Mutex::new(HashMap::new()),
378        }
379    }
380
381    pub fn instance() -> Arc<PubSub> {
382        PUBSUB.clone()
383    }
384
385    pub fn create_publisher(&self, topic: &str) -> usize {
386        let mut topic_map = self.topic_map.lock().unwrap();
387
388        if let Some(&index) = topic_map.get(topic) {
389            return index;
390        }
391
392        let mut topics = self.topics.lock().unwrap();
393        let new_index = topics.len();
394
395        topics.push(TopicData {
396            name: topic.to_string(),
397            channel_pairs: Vec::new(),
398        });
399
400        topic_map.insert(topic.to_string(), new_index);
401
402        new_index
403    }
404
405    pub fn subscribe_manual<T: Send + Sync + Clone + 'static>(
406        &self,
407        topic: &str,
408        config: TopicConfig,
409    ) -> ManualReceiver<T>
410    where
411        T: 'static,
412    {
413        let subscriber_id = Uuid::new_v4().to_string();
414        let (tx, rx) = bounded(config.queue_depth);
415        let topic_str = topic.to_string();
416
417        let topic_index = self.create_publisher(topic);
418
419        {
420            let mut topics = self.topics.lock().unwrap();
421            topics[topic_index].channel_pairs.push(ChannelPair::new(
422                tx,
423                rx.clone(),
424                config.clone(),
425                subscriber_id.clone(),
426            ));
427        }
428
429        {
430            self.subscribers.lock().unwrap().insert(
431                subscriber_id.clone(),
432                SubscriberData {
433                    topic: topic_str.clone(),
434                    receiver: rx.clone(),
435                    callback: None,
436                },
437            );
438        }
439
440        ManualReceiver {
441            receiver: rx,
442            subscriber_id,
443            pubsub: PubSub::instance(),
444            _marker: std::marker::PhantomData,
445        }
446    }
447
448    pub fn subscribe<T, F>(&self, topic: &str, config: TopicConfig, callback: F) -> String
449    where
450        T: Send + Sync + Clone + 'static,
451        F: Fn(&T) + Send + Sync + 'static,
452    {
453        let subscriber_id = Uuid::new_v4().to_string();
454        let (tx, rx) = bounded(config.queue_depth);
455        let topic_str = topic.to_string();
456
457        let topic_index = self.create_publisher(topic);
458
459        {
460            let mut topics = self.topics.lock().unwrap();
461            topics[topic_index].channel_pairs.push(ChannelPair::new(
462                tx,
463                rx.clone(),
464                config.clone(),
465                subscriber_id.clone(),
466            ));
467        }
468
469        let callback_wrapper: Arc<dyn Fn(&dyn Any) + Send + Sync> =
470            Arc::new(move |data: &dyn Any| {
471                if let Some(t) = data.downcast_ref::<T>() {
472                    callback(t);
473                }
474            });
475
476        {
477            self.subscribers.lock().unwrap().insert(
478                subscriber_id.clone(),
479                SubscriberData {
480                    topic: topic_str.clone(),
481                    receiver: rx.clone(),
482                    callback: Some(callback_wrapper.clone()),
483                },
484            );
485        }
486
487        let rx_clone = rx.clone();
488        let callback_for_thread = callback_wrapper.clone();
489        std::thread::spawn(move || {
490            while let Ok(msg) = rx_clone.recv() {
491                if let Some(data) = msg.downcast::<T>() {
492                    callback_for_thread(data);
493                }
494            }
495        });
496
497        subscriber_id
498    }
499
500    pub fn try_publish<T: Send + Sync + Clone + 'static>(&self, topic_id: usize, message: T) {
501        let msg = MessageWrapper::new(message);
502
503        let channel_pairs = {
504            let topics = self.topics.lock().unwrap();
505
506            if topic_id >= topics.len() {
507                return;
508            }
509
510            if topics[topic_id].channel_pairs.is_empty() {
511                return;
512            }
513
514            topics[topic_id].channel_pairs.clone()
515        };
516
517        for pair in channel_pairs.iter() {
518            if pair.config.overwrite {
519                while pair.sender.is_full() {
520                    let _ = pair.receiver.try_recv();
521                }
522            }
523
524            let _ = pair.sender.try_send(msg.clone());
525        }
526    }
527
528    pub fn publish<T: Send + Sync + Clone + 'static>(&self, topic_id: usize, message: T) {
529        self.publish_with_timeout(topic_id, message, None);
530    }
531
532    pub fn publish_with_timeout<T: Send + Sync + Clone + 'static>(
533        &self,
534        topic_id: usize,
535        message: T,
536        max_wait_ms: Option<u64>,
537    ) {
538        let msg = MessageWrapper::new(message);
539
540        let channel_pairs = {
541            let topics = self.topics.lock().unwrap();
542
543            if topic_id >= topics.len() {
544                return;
545            }
546
547            if topics[topic_id].channel_pairs.is_empty() {
548                return;
549            }
550
551            topics[topic_id].channel_pairs.clone()
552        };
553
554        for pair in channel_pairs.iter() {
555            if pair.config.overwrite {
556                while pair.sender.is_full() {
557                    let _ = pair.receiver.try_recv();
558                }
559                let _ = pair.sender.try_send(msg.clone());
560            } else {
561                match max_wait_ms {
562                    Some(ms) => {
563                        let _ = pair
564                            .sender
565                            .send_timeout(msg.clone(), Duration::from_millis(ms));
566                    }
567                    None => {
568                        let _ = pair.sender.send(msg.clone());
569                    }
570                }
571            }
572        }
573    }
574
575    pub fn unsubscribe(&self, subscriber_id: &str) {
576        let topic_opt = {
577            let mut subscribers = self.subscribers.lock().unwrap();
578            if let Some(data) = subscribers.remove(subscriber_id) {
579                Some(data.topic)
580            } else {
581                None
582            }
583        };
584
585        if let Some(topic) = topic_opt {
586            let topic_index_opt = {
587                let topic_map = self.topic_map.lock().unwrap();
588                topic_map.get(&topic).cloned()
589            };
590
591            if let Some(topic_index) = topic_index_opt {
592                let mut topics = self.topics.lock().unwrap();
593                if let Some(topic_data) = topics.get_mut(topic_index) {
594                    topic_data
595                        .channel_pairs
596                        .retain(|pair| pair.subscriber_id != subscriber_id);
597
598                    if topic_data.channel_pairs.is_empty() {
599                        let mut topic_map = self.topic_map.lock().unwrap();
600                        topic_map.remove(&topic);
601                    }
602                }
603            }
604        }
605    }
606}