Expand description
§Rust PubSub
A thread-safe, in-memory publish-subscribe library for Rust, designed for efficient and flexible inter-thread communication. It supports both manual message receiving and callback-based subscriptions, with configurable queue depth and overwrite behavior.
§Features
- Thread-Safe: Uses `crossbeam-channel` for safe message passing between threads.
- Flexible Subscriptions: Supports manual message receiving and callback-based subscriptions.
- Configurable Channels: Allows customization of queue depth and overwrite behavior per subscription.
- Type-Safe Messaging: Supports any `Send + Sync + Clone + 'static` type for messages.
- Timeout Support: Provides blocking, non-blocking, and timeout-based message receiving and publishing.
§功能(中文)
- 线程安全:使用 `crossbeam-channel` 实现线程间安全消息传递。
- 灵活订阅:支持手动消息接收和基于回调的订阅模式。
- 可配置通道:允许为每个订阅配置队列深度和覆盖行为。
- 类型安全消息:支持任何满足 `Send + Sync + Clone + 'static` 的消息类型。
- 超时支持:提供阻塞、非阻塞和带超时的消息接收与发布。
§API Overview
The library offers two main subscription methods:
- `subscribe_manual`: Returns a receiver that lets you poll for messages manually.
- `subscribe`: Takes a callback closure that processes messages in a dedicated thread. Because the closure runs in its own thread rather than the caller's, it is ideal for operations that shouldn’t block the main thread.
§Usage Examples
§1. Manual Subscription with Non-Blocking Receive
use rust_pubsub::{PubSub, TopicConfig};
let pubsub = PubSub::instance();
let topic = "test_topic";
let config = TopicConfig::new(10, false); // Queue depth 10, no overwrite
// Create publisher
let topic_id = pubsub.create_publisher(topic);
// Subscribe manually
let receiver = pubsub.subscribe_manual::<String>(topic, config);
// Publish a message
pubsub.publish(topic_id, "Hello, World!".to_string());
// Try to receive the message
if let Some(msg) = receiver.try_recv() {
println!("Received: {}", msg);
}
§2. Callback-Based Subscription
use rust_pubsub::{PubSub, TopicConfig};
let pubsub = PubSub::instance();
let topic = "callback_topic";
let config = TopicConfig::new(5, true); // Queue depth 5, overwrite enabled
// Create publisher
let topic_id = pubsub.create_publisher(topic);
// Subscribe with a callback
let subscriber_id = pubsub.subscribe::<String, _>(topic, config, |msg: &String| {
println!("Callback received: {}", msg);
});
// Publish a message
pubsub.publish(topic_id, "Callback message".to_string());
// Wait briefly to ensure callback executes
std::thread::sleep(std::time::Duration::from_millis(100));
// Unsubscribe
pubsub.unsubscribe(&subscriber_id);
§3. Non-Blocking Callback Processing in Dedicated Thread
This example demonstrates how the `subscribe` method’s callback is processed in its own dedicated thread, allowing your main thread to continue execution without being blocked by message processing.
use rust_pubsub::{PubSub, TopicConfig};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
// Create a shared counter to demonstrate the callback running in a separate thread
let counter = Arc::new(Mutex::new(0));
let counter_clone = counter.clone();
let pubsub = PubSub::instance();
let topic = "thread_topic";
let config = TopicConfig::new(5, true);
let topic_id = pubsub.create_publisher(topic);
// Subscribe with a callback that will increment the counter
pubsub.subscribe::<i32, _>(topic, config, move |msg: &i32| {
// This closure runs in a dedicated thread
println!("Processing message: {} in a dedicated thread", msg);
// Simulate some processing time
thread::sleep(Duration::from_millis(500));
// Update the shared counter
let mut count = counter_clone.lock().unwrap();
*count += 1;
println!("Counter updated to: {}", *count);
});
// Publish multiple messages
for i in 1..=5 {
pubsub.publish(topic_id, i);
// Main thread can continue doing work without waiting for message processing
println!("Main thread: Published message {}", i);
}
// Main thread can do other work while callbacks are processed in background
println!("Main thread: Continuing with other work immediately");
// Wait briefly to allow some callback processing to occur
thread::sleep(Duration::from_millis(1000));
// Check how many messages were processed
let final_count = *counter.lock().unwrap();
println!("Messages processed so far: {}", final_count);
§4. Publishing with Timeout
use rust_pubsub::{PubSub, TopicConfig};
let pubsub = PubSub::instance();
let topic = "timeout_topic";
let config = TopicConfig::new(1, false); // Queue depth 1, no overwrite
// Create publisher
let topic_id = pubsub.create_publisher(topic);
// Subscribe manually
let receiver = pubsub.subscribe_manual::<i32>(topic, config);
// Publish with timeout (100ms)
pubsub.publish_with_timeout(topic_id, 42, Some(100));
// Receive with timeout (100ms)
if let Some(msg) = receiver.recv_timeout(Some(100)) {
println!("Received: {}", msg);
}
§5. Overwrite Mode with Full Queue
use rust_pubsub::{PubSub, TopicConfig};
let pubsub = PubSub::instance();
let topic = "overwrite_topic";
let config = TopicConfig::new(2, true); // Queue depth 2, overwrite enabled
// Create publisher
let topic_id = pubsub.create_publisher(topic);
// Subscribe manually
let receiver = pubsub.subscribe_manual::<String>( topic, config);
// Publish multiple messages to fill queue
pubsub.publish(topic_id, "Message 1".to_string());
pubsub.publish(topic_id, "Message 2".to_string());
pubsub.publish(topic_id, "Message 3".to_string()); // Overwrites oldest
// Receive messages
while let Some(msg) = receiver.try_recv() {
println!("Received: {}", msg);
}
§6. Multiple Subscribers
use rust_pubsub::{PubSub, TopicConfig};
let pubsub = PubSub::instance();
let topic = "multi_subscriber_topic";
let config = TopicConfig::new(10, false);
// Create publisher
let topic_id = pubsub.create_publisher(topic);
// Subscribe multiple times
let receiver1 = pubsub.subscribe_manual::<String>(topic, config.clone());
let receiver2 = pubsub.subscribe_manual::<String>(topic, config.clone());
// Publish a message
pubsub.publish(topic_id, "Broadcast message".to_string());
// Receive from both subscribers
println!("Receiver 1: {:?}", receiver1.try_recv());
println!("Receiver 2: {:?}", receiver2.try_recv());
§Installation
Add the following to your `Cargo.toml`:
[dependencies]
rust-pubsub = "0.1.0"
§安装(中文)
在您的 `Cargo.toml` 中添加以下内容:
[dependencies]
rust-pubsub = "0.1.0"
§License
Licensed under either of Apache License, Version 2.0 or MIT license at your option.