rust_pubsub/lib.rs
1//! # Rust PubSub
2//!
3//! A thread-safe, in-memory publish-subscribe library for Rust, designed for efficient and flexible
4//! inter-thread communication. It supports both manual message receiving and callback-based subscriptions,
5//! with configurable queue depth and overwrite behavior.
6//!
7//! ## Features
8//!
9//! - **Thread-Safe**: Uses `crossbeam-channel` for safe message passing between threads.
10//! - **Flexible Subscriptions**: Supports manual message receiving and callback-based subscriptions.
11//! - **Configurable Channels**: Allows customization of queue depth and overwrite behavior per subscription.
12//! - **Type-Safe Messaging**: Supports any `Send + Sync + Clone + 'static` type for messages.
13//! - **Timeout Support**: Provides blocking, non-blocking, and timeout-based message receiving and publishing.
14//!
15//! ## 功能(中文)
16//!
17//! - **线程安全**:使用 `crossbeam-channel` 实现线程间安全消息传递。
18//! - **灵活订阅**:支持手动消息接收和基于回调的订阅模式。
19//! - **可配置通道**:允许为每个订阅配置队列深度和覆盖行为。
20//! - **类型安全消息**:支持任何满足 `Send + Sync + Clone + 'static` 的消息类型。
21//! - **超时支持**:提供阻塞、非阻塞和带超时的消息接收与发布。
22//!
23//! ## API Overview
24//!
25//! The library offers two main subscription methods:
26//!
27//! - **`subscribe_manual`**: Returns a receiver that allows manually polling for messages.
28//! - **`subscribe`**: Takes a callback closure that processes messages in a dedicated thread.
29//! This non-blocking approach runs the provided closure in its own thread, making it ideal
30//! for operations that shouldn't block the main thread.
31//!
32//! ## Usage Examples
33//!
34//! ### 1. Manual Subscription with Non-Blocking Receive
35//!
36//! ```rust
37//! use rust_pubsub::{PubSub, TopicConfig};
38//!
39//! let pubsub = PubSub::instance();
40//! let topic = "test_topic";
41//! let config = TopicConfig::new(10, false); // Queue depth 10, no overwrite
42//!
43//! // Create publisher
44//! let topic_id = pubsub.create_publisher(topic);
45//!
46//! // Subscribe manually
47//! let receiver = pubsub.subscribe_manual::<String>(topic, config);
48//!
49//! // Publish a message
50//! pubsub.publish(topic_id, "Hello, World!".to_string());
51//!
52//! // Try to receive the message
53//! if let Some(msg) = receiver.try_recv() {
54//! println!("Received: {}", msg);
55//! }
56//! ```
57//!
58//! ### 2. Callback-Based Subscription
59//!
60//! ```rust
61//! use rust_pubsub::{PubSub, TopicConfig};
62//!
63//! let pubsub = PubSub::instance();
64//! let topic = "callback_topic";
65//! let config = TopicConfig::new(5, true); // Queue depth 5, overwrite enabled
66//!
67//! // Create publisher
68//! let topic_id = pubsub.create_publisher(topic);
69//!
70//! // Subscribe with a callback
71//! let subscriber_id = pubsub.subscribe::<String, _>(topic, config, |msg: &String| {
72//! println!("Callback received: {}", msg);
73//! });
74//!
75//! // Publish a message
76//! pubsub.publish(topic_id, "Callback message".to_string());
77//!
78//! // Wait briefly to ensure callback executes
79//! std::thread::sleep(std::time::Duration::from_millis(100));
80//!
81//! // Unsubscribe
82//! pubsub.unsubscribe(&subscriber_id);
83//! ```
84//!
85//! ### 3. Non-Blocking Callback Processing in Dedicated Thread
86//!
87//! This example demonstrates how the `subscribe` method's callback is processed in its own dedicated thread,
88//! allowing your main thread to continue execution without being blocked by message processing.
89//!
90//! ```rust
91//! use rust_pubsub::{PubSub, TopicConfig};
92//! use std::sync::{Arc, Mutex};
93//! use std::thread;
94//! use std::time::Duration;
95//!
96//! // Create a shared counter to demonstrate the callback running in a separate thread
97//! let counter = Arc::new(Mutex::new(0));
98//! let counter_clone = counter.clone();
99//!
100//! let pubsub = PubSub::instance();
101//! let topic = "thread_topic";
102//! let config = TopicConfig::new(5, true);
103//! let topic_id = pubsub.create_publisher(topic);
104//!
105//! // Subscribe with a callback that will increment the counter
106//! pubsub.subscribe::<i32, _>(topic, config, move |msg: &i32| {
107//! // This closure runs in a dedicated thread
108//! println!("Processing message: {} in a dedicated thread", msg);
109//!
110//! // Simulate some processing time
111//! thread::sleep(Duration::from_millis(500));
112//!
113//! // Update the shared counter
114//! let mut count = counter_clone.lock().unwrap();
115//! *count += 1;
116//! println!("Counter updated to: {}", *count);
117//! });
118//!
119//! // Publish multiple messages
120//! for i in 1..=5 {
121//! pubsub.publish(topic_id, i);
122//!
123//! // Main thread can continue doing work without waiting for message processing
124//! println!("Main thread: Published message {}", i);
125//! }
126//!
127//! // Main thread can do other work while callbacks are processed in background
128//! println!("Main thread: Continuing with other work immediately");
129//!
130//! // Wait briefly to allow some callback processing to occur
131//! thread::sleep(Duration::from_millis(1000));
132//!
133//! // Check how many messages were processed
134//! let final_count = *counter.lock().unwrap();
135//! println!("Messages processed so far: {}", final_count);
136//! ```
137//!
138//! ### 4. Publishing with Timeout
139//!
140//! ```rust
141//! use rust_pubsub::{PubSub, TopicConfig};
142//!
143//! let pubsub = PubSub::instance();
144//! let topic = "timeout_topic";
145//! let config = TopicConfig::new(1, false); // Queue depth 1, no overwrite
146//!
147//! // Create publisher
148//! let topic_id = pubsub.create_publisher(topic);
149//!
150//! // Subscribe manually
151//! let receiver = pubsub.subscribe_manual::<i32>(topic, config);
152//!
153//! // Publish with timeout (100ms)
154//! pubsub.publish_with_timeout(topic_id, 42, Some(100));
155//!
156//! // Receive with timeout (100ms)
157//! if let Some(msg) = receiver.recv_timeout(Some(100)) {
158//! println!("Received: {}", msg);
159//! }
160//! ```
161//!
162//! ### 5. Overwrite Mode with Full Queue
163//!
164//! ```rust
165//! use rust_pubsub::{PubSub, TopicConfig};
166//!
167//! let pubsub = PubSub::instance();
168//! let topic = "overwrite_topic";
169//! let config = TopicConfig::new(2, true); // Queue depth 2, overwrite enabled
170//!
171//! // Create publisher
172//! let topic_id = pubsub.create_publisher(topic);
173//!
174//! // Subscribe manually
175//! let receiver = pubsub.subscribe_manual::<String>( topic, config);
176//!
177//! // Publish multiple messages to fill queue
178//! pubsub.publish(topic_id, "Message 1".to_string());
179//! pubsub.publish(topic_id, "Message 2".to_string());
180//! pubsub.publish(topic_id, "Message 3".to_string()); // Overwrites oldest
181//!
182//! // Receive messages
183//! while let Some(msg) = receiver.try_recv() {
184//! println!("Received: {}", msg);
185//! }
186//! ```
187//!
188//! ### 6. Multiple Subscribers
189//!
190//! ```rust
191//! use rust_pubsub::{PubSub, TopicConfig};
192//!
193//! let pubsub = PubSub::instance();
194//! let topic = "multi_subscriber_topic";
195//! let config = TopicConfig::new(10, false);
196//!
197//! // Create publisher
198//! let topic_id = pubsub.create_publisher(topic);
199//!
200//! // Subscribe multiple times
201//! let receiver1 = pubsub.subscribe_manual::<String>(topic, config.clone());
202//! let receiver2 = pubsub.subscribe_manual::<String>(topic, config.clone());
203//!
204//! // Publish a message
205//! pubsub.publish(topic_id, "Broadcast message".to_string());
206//!
207//! // Receive from both subscribers
208//! println!("Receiver 1: {:?}", receiver1.try_recv());
209//! println!("Receiver 2: {:?}", receiver2.try_recv());
210//! ```
211//!
212//! ## Installation
213//!
214//! Add the following to your `Cargo.toml`:
215//!
216//! ```toml
217//! [dependencies]
218//! rust-pubsub = "0.1.0"
219//! ```
220//!
221//! ## 安装(中文)
222//!
223//! 在您的 `Cargo.toml` 中添加以下内容:
224//!
225//! ```toml
226//! [dependencies]
227//! rust-pubsub = "0.1.0"
228//! ```
229//!
230//! ## License
231//!
232//! Licensed under either of Apache License, Version 2.0 or MIT license at your option.
233
234use crossbeam_channel::{Receiver, Sender, bounded};
235use lazy_static::lazy_static;
236use std::any::Any;
237use std::collections::HashMap;
238use std::sync::{Arc, Mutex};
239use std::time::Duration;
240use uuid::Uuid;
241
242// Your original code follows here, unchanged
243lazy_static! {
244 static ref PUBSUB: Arc<PubSub> = Arc::new(PubSub::new());
245}
246
247#[derive(Clone)]
248pub struct TopicConfig {
249 queue_depth: usize,
250 overwrite: bool,
251}
252
253impl TopicConfig {
254 pub fn new(queue_depth: usize, overwrite: bool) -> Self {
255 TopicConfig {
256 queue_depth,
257 overwrite,
258 }
259 }
260}
261
262#[derive(Clone)]
263struct MessageWrapper {
264 data: Arc<dyn Any + Send + Sync>,
265}
266
267#[derive(Clone)]
268struct ChannelPair {
269 sender: Sender<MessageWrapper>,
270 receiver: Receiver<MessageWrapper>,
271 config: TopicConfig,
272 subscriber_id: String,
273}
274
275impl ChannelPair {
276 fn new(
277 sender: Sender<MessageWrapper>,
278 receiver: Receiver<MessageWrapper>,
279 config: TopicConfig,
280 subscriber_id: String,
281 ) -> Self {
282 ChannelPair {
283 sender,
284 receiver,
285 config,
286 subscriber_id,
287 }
288 }
289}
290
291struct TopicData {
292 #[allow(dead_code)]
293 name: String,
294 channel_pairs: Vec<ChannelPair>,
295}
296
297struct SubscriberData {
298 topic: String,
299 #[allow(dead_code)]
300 receiver: Receiver<MessageWrapper>,
301 #[allow(dead_code)]
302 callback: Option<Arc<dyn Fn(&dyn Any) + Send + Sync>>,
303}
304
305#[derive(Clone)]
306pub struct ManualReceiver<T: 'static> {
307 receiver: Receiver<MessageWrapper>,
308 subscriber_id: String,
309 pubsub: Arc<PubSub>,
310 _marker: std::marker::PhantomData<T>,
311}
312
313impl<T: Clone + 'static> ManualReceiver<T> {
314 pub fn try_recv(&self) -> Option<T> {
315 let msg = self.receiver.try_recv().ok();
316
317 match msg {
318 Some(msg) => {
319 if let Some(data) = msg.downcast::<T>() {
320 return Some(data.to_owned());
321 }
322 None
323 }
324 None => None,
325 }
326 }
327
328 pub fn recv(&self) -> Option<T> {
329 self.recv_timeout(None)
330 }
331
332 pub fn recv_timeout(&self, timeout_ms: Option<u64>) -> Option<T> {
333 let msg = match timeout_ms {
334 Some(ms) => self.receiver.recv_timeout(Duration::from_millis(ms)).ok(),
335 None => self.receiver.recv().ok(),
336 };
337
338 match msg {
339 Some(msg) => {
340 if let Some(data) = msg.downcast::<T>() {
341 return Some(data.to_owned());
342 }
343 None
344 }
345 None => None,
346 }
347 }
348
349 pub fn unsubscribe(self) {
350 self.pubsub.unsubscribe(&self.subscriber_id);
351 }
352}
353
354impl MessageWrapper {
355 fn new<T: Send + Sync + Clone + 'static>(data: T) -> Self {
356 MessageWrapper {
357 data: Arc::new(data),
358 }
359 }
360
361 fn downcast<T: 'static>(&self) -> Option<&T> {
362 self.data.downcast_ref::<T>()
363 }
364}
365
366pub struct PubSub {
367 topics: Mutex<Vec<TopicData>>,
368 topic_map: Mutex<HashMap<String, usize>>,
369 subscribers: Mutex<HashMap<String, SubscriberData>>,
370}
371
372impl PubSub {
373 fn new() -> Self {
374 PubSub {
375 topics: Mutex::new(Vec::new()),
376 topic_map: Mutex::new(HashMap::new()),
377 subscribers: Mutex::new(HashMap::new()),
378 }
379 }
380
381 pub fn instance() -> Arc<PubSub> {
382 PUBSUB.clone()
383 }
384
385 pub fn create_publisher(&self, topic: &str) -> usize {
386 let mut topic_map = self.topic_map.lock().unwrap();
387
388 if let Some(&index) = topic_map.get(topic) {
389 return index;
390 }
391
392 let mut topics = self.topics.lock().unwrap();
393 let new_index = topics.len();
394
395 topics.push(TopicData {
396 name: topic.to_string(),
397 channel_pairs: Vec::new(),
398 });
399
400 topic_map.insert(topic.to_string(), new_index);
401
402 new_index
403 }
404
405 pub fn subscribe_manual<T: Send + Sync + Clone + 'static>(
406 &self,
407 topic: &str,
408 config: TopicConfig,
409 ) -> ManualReceiver<T>
410 where
411 T: 'static,
412 {
413 let subscriber_id = Uuid::new_v4().to_string();
414 let (tx, rx) = bounded(config.queue_depth);
415 let topic_str = topic.to_string();
416
417 let topic_index = self.create_publisher(topic);
418
419 {
420 let mut topics = self.topics.lock().unwrap();
421 topics[topic_index].channel_pairs.push(ChannelPair::new(
422 tx,
423 rx.clone(),
424 config.clone(),
425 subscriber_id.clone(),
426 ));
427 }
428
429 {
430 self.subscribers.lock().unwrap().insert(
431 subscriber_id.clone(),
432 SubscriberData {
433 topic: topic_str.clone(),
434 receiver: rx.clone(),
435 callback: None,
436 },
437 );
438 }
439
440 ManualReceiver {
441 receiver: rx,
442 subscriber_id,
443 pubsub: PubSub::instance(),
444 _marker: std::marker::PhantomData,
445 }
446 }
447
448 pub fn subscribe<T, F>(&self, topic: &str, config: TopicConfig, callback: F) -> String
449 where
450 T: Send + Sync + Clone + 'static,
451 F: Fn(&T) + Send + Sync + 'static,
452 {
453 let subscriber_id = Uuid::new_v4().to_string();
454 let (tx, rx) = bounded(config.queue_depth);
455 let topic_str = topic.to_string();
456
457 let topic_index = self.create_publisher(topic);
458
459 {
460 let mut topics = self.topics.lock().unwrap();
461 topics[topic_index].channel_pairs.push(ChannelPair::new(
462 tx,
463 rx.clone(),
464 config.clone(),
465 subscriber_id.clone(),
466 ));
467 }
468
469 let callback_wrapper: Arc<dyn Fn(&dyn Any) + Send + Sync> =
470 Arc::new(move |data: &dyn Any| {
471 if let Some(t) = data.downcast_ref::<T>() {
472 callback(t);
473 }
474 });
475
476 {
477 self.subscribers.lock().unwrap().insert(
478 subscriber_id.clone(),
479 SubscriberData {
480 topic: topic_str.clone(),
481 receiver: rx.clone(),
482 callback: Some(callback_wrapper.clone()),
483 },
484 );
485 }
486
487 let rx_clone = rx.clone();
488 let callback_for_thread = callback_wrapper.clone();
489 std::thread::spawn(move || {
490 while let Ok(msg) = rx_clone.recv() {
491 if let Some(data) = msg.downcast::<T>() {
492 callback_for_thread(data);
493 }
494 }
495 });
496
497 subscriber_id
498 }
499
500 pub fn try_publish<T: Send + Sync + Clone + 'static>(&self, topic_id: usize, message: T) {
501 let msg = MessageWrapper::new(message);
502
503 let channel_pairs = {
504 let topics = self.topics.lock().unwrap();
505
506 if topic_id >= topics.len() {
507 return;
508 }
509
510 if topics[topic_id].channel_pairs.is_empty() {
511 return;
512 }
513
514 topics[topic_id].channel_pairs.clone()
515 };
516
517 for pair in channel_pairs.iter() {
518 if pair.config.overwrite {
519 while pair.sender.is_full() {
520 let _ = pair.receiver.try_recv();
521 }
522 }
523
524 let _ = pair.sender.try_send(msg.clone());
525 }
526 }
527
528 pub fn publish<T: Send + Sync + Clone + 'static>(&self, topic_id: usize, message: T) {
529 self.publish_with_timeout(topic_id, message, None);
530 }
531
532 pub fn publish_with_timeout<T: Send + Sync + Clone + 'static>(
533 &self,
534 topic_id: usize,
535 message: T,
536 max_wait_ms: Option<u64>,
537 ) {
538 let msg = MessageWrapper::new(message);
539
540 let channel_pairs = {
541 let topics = self.topics.lock().unwrap();
542
543 if topic_id >= topics.len() {
544 return;
545 }
546
547 if topics[topic_id].channel_pairs.is_empty() {
548 return;
549 }
550
551 topics[topic_id].channel_pairs.clone()
552 };
553
554 for pair in channel_pairs.iter() {
555 if pair.config.overwrite {
556 while pair.sender.is_full() {
557 let _ = pair.receiver.try_recv();
558 }
559 let _ = pair.sender.try_send(msg.clone());
560 } else {
561 match max_wait_ms {
562 Some(ms) => {
563 let _ = pair
564 .sender
565 .send_timeout(msg.clone(), Duration::from_millis(ms));
566 }
567 None => {
568 let _ = pair.sender.send(msg.clone());
569 }
570 }
571 }
572 }
573 }
574
575 pub fn unsubscribe(&self, subscriber_id: &str) {
576 let topic_opt = {
577 let mut subscribers = self.subscribers.lock().unwrap();
578 if let Some(data) = subscribers.remove(subscriber_id) {
579 Some(data.topic)
580 } else {
581 None
582 }
583 };
584
585 if let Some(topic) = topic_opt {
586 let topic_index_opt = {
587 let topic_map = self.topic_map.lock().unwrap();
588 topic_map.get(&topic).cloned()
589 };
590
591 if let Some(topic_index) = topic_index_opt {
592 let mut topics = self.topics.lock().unwrap();
593 if let Some(topic_data) = topics.get_mut(topic_index) {
594 topic_data
595 .channel_pairs
596 .retain(|pair| pair.subscriber_id != subscriber_id);
597
598 if topic_data.channel_pairs.is_empty() {
599 let mut topic_map = self.topic_map.lock().unwrap();
600 topic_map.remove(&topic);
601 }
602 }
603 }
604 }
605 }
606}