The Rust Implementation of Apache RocketMQ Client
Overview
This project is the Rust implementation of Apache RocketMQ client. It provides feature parity with the RocketMQ Java client, supporting all major Producer capabilities.
Producer Features
The RocketMQ Rust client provides comprehensive Producer functionality through two main implementations:
| Feature | DefaultMQProducer | TransactionMQProducer |
|---|---|---|
| Basic Send (Sync/Async/Oneway) | ✅ | ✅ |
| Send to Specific Queue | ✅ | ✅ |
| Send with Selector | ✅ | ✅ |
| Batch Send | ✅ | ✅ |
| Request-Reply (RPC) | ✅ | ✅ |
| Message Recall | ✅ | ✅ |
| Transaction Messages | ❌ | ✅ |
| Auto Batch Sending | ✅ | ✅ |
| Backpressure Control | ✅ | ✅ |
Detailed Feature List
1. Basic Sending Methods
- Synchronous Send: Blocks until receive broker response
- Asynchronous Send: Non-blocking with callback
- Oneway Send: Fire-and-forget, no response expected
- Timeout Support: Configurable per-request timeout
2. Queue Selection
- Auto Selection: Default load balancing across queues
- Specific Queue: Send to designated MessageQueue
- Custom Selector: Implement
MessageQueueSelectorfor custom routing logic
3. Batch Sending
- Manual Batch: Send multiple messages together
- Auto Batch: Automatic batching with configurable thresholds
- Batch to Queue: Send batches to specific queues
4. Request-Reply Pattern (RPC)
- Synchronous Request: Send request and wait for response
- Asynchronous Request: Non-blocking with callback
- Request with Selector: Route requests via custom selector
- Request to Queue: Send requests to specific queues
5. Transaction Messages (TransactionMQProducer only)
- Local Transaction Execution: Execute transaction logic locally
- Transaction Commit/Rollback: Full transaction state management
- Transaction Listener: Custom transaction behavior via
TransactionListenertrait
6. Advanced Features
- Message Recall: Recall messages by topic and handle
- Compression: Automatic compression for large messages
- Backpressure: Configurable async backpressure control
- Namespace Support: Multi-tenant namespace isolation
- Trace Integration: Message tracing support
How to send message
First, start the RocketMQ NameServer and Broker services.
For more examples, you can check here
Send a single message
use DefaultMQProducer;
use MQProducer;
use Result;
use Message;
use rocketmq;
pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";
pub async
Send batch messages
use DefaultMQProducer;
use MQProducer;
use Message;
use rocketmq;
pub const PRODUCER_GROUP: &str = "BatchProducerGroupName";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";
pub async
Send RPC messages
use DefaultMQProducer;
use MQProducer;
use Result;
use Message;
use rocketmq;
pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "RequestTopic";
pub const TAG: &str = "TagA";
pub async
Send transaction messages
use TransactionMQProducer;
use TransactionListener;
use CheetahString;
use Message;
use rocketmq;
pub const PRODUCER_GROUP: &str = "transaction_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TransactionTopic";
// Custom transaction listener
;
pub async
Send with custom selector
use DefaultMQProducer;
use MQProducer;
use MessageQueue;
use MessageTrait;
use Message;
use rocketmq;
pub const PRODUCER_GROUP: &str = "selector_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "SelectorTopic";
pub async
Message recall
use DefaultMQProducer;
use MQProducer;
use Message;
use CheetahString;
use rocketmq;
pub const PRODUCER_GROUP: &str = "recall_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "RecallTopic";
pub async
Consumer Performance Best Practices
High-Throughput Message Consumption with Zero-Copy API
The RocketMQ Rust client provides zero-copy polling APIs for maximum performance when consuming messages at high throughput (10,000+ msg/s).
Performance Comparison
| Method | Throughput | Memory Overhead | Use Case |
|---|---|---|---|
poll() |
~1,000 msg/s | High (clones all messages) | Simple scenarios, need to store messages |
poll_zero_copy() |
10,000+ msg/s | Minimal (no cloning) | High throughput, read-only processing |
Why Use Zero-Copy?
Regular poll() overhead:
// ⚠️ Each poll with 32 messages of 2KB each = ~90KB allocation + copy
// At 100 polls/sec = ~9MB/s memory allocation rate
let messages = consumer.poll.await; // Clones every message!
Zero-copy poll_zero_copy() advantage:
// ✅ Zero heap allocations, 10x+ faster
let messages = consumer.poll_zero_copy.await; // No cloning!
Usage Examples
Example 1: High-Throughput Processing
For processing messages without storing them (forward, parse, aggregate, etc.):
use DefaultLitePullConsumer;
use LitePullConsumer;
async
Example 2: Selective Cloning
When you need to store only some messages:
// ✅ Best practice: Zero-copy first, clone only what you need
let messages = consumer.poll_zero_copy.await;
// Filter and clone only important messages
let important: = messages.into_iter
.filter
.map // Clone only filtered messages
.collect;
// Store only important messages
store.save;
Example 3: When to Use Regular poll()
Use regular poll() when you need to store all messages:
// ✅ Appropriate use of poll() - need to store all messages
let messages = consumer.poll.await; // Clones all
message_store.save_all; // Messages outlive poll scope
Complete Examples
Use the zero-copy API in your application code. Here's how:
Basic high-throughput pattern: Basic high-throughput pattern:
loop
**SeImplementation Notes
**Key Takeaways:ng pattern:
let messages = consumer.poll_zero_copy.await;
let important = messages.into_iter
.filter
.map
.collect;
Implementation Notes
- Default choice for high throughput: Use
poll_zero_copy()orpoll_with_timeout_zero_copy() - Read-only processing: No need to clone - process messages directly
- Selective storage: Clone only the messages you need to keep
- Legacy compatibility: Regular
poll()still available for simple use cases
For detailed API documentation, see the LitePullConsumer trait documentation.