rocketmq-client-rust 0.8.0

Rust implementation of Apache rocketmq client
Documentation

The Rust Implementation of Apache RocketMQ Client

Overview

This project is the Rust implementation of Apache RocketMQ client. It provides feature parity with the RocketMQ Java client, supporting all major Producer capabilities.

Producer Features

The RocketMQ Rust client provides comprehensive Producer functionality through two main implementations:

Feature DefaultMQProducer TransactionMQProducer
Basic Send (Sync/Async/Oneway)
Send to Specific Queue
Send with Selector
Batch Send
Request-Reply (RPC)
Message Recall
Transaction Messages
Auto Batch Sending
Backpressure Control

Detailed Feature List

1. Basic Sending Methods

  • Synchronous Send: Blocks until receive broker response
  • Asynchronous Send: Non-blocking with callback
  • Oneway Send: Fire-and-forget, no response expected
  • Timeout Support: Configurable per-request timeout

2. Queue Selection

  • Auto Selection: Default load balancing across queues
  • Specific Queue: Send to designated MessageQueue
  • Custom Selector: Implement MessageQueueSelector for custom routing logic

3. Batch Sending

  • Manual Batch: Send multiple messages together
  • Auto Batch: Automatic batching with configurable thresholds
  • Batch to Queue: Send batches to specific queues

4. Request-Reply Pattern (RPC)

  • Synchronous Request: Send request and wait for response
  • Asynchronous Request: Non-blocking with callback
  • Request with Selector: Route requests via custom selector
  • Request to Queue: Send requests to specific queues

5. Transaction Messages (TransactionMQProducer only)

  • Local Transaction Execution: Execute transaction logic locally
  • Transaction Commit/Rollback: Full transaction state management
  • Transaction Listener: Custom transaction behavior via TransactionListener trait

6. Advanced Features

  • Message Recall: Recall messages by topic and handle
  • Compression: Automatic compression for large messages
  • Backpressure: Configurable async backpressure control
  • Namespace Support: Multi-tenant namespace isolation
  • Trace Integration: Message tracing support

How to send message

First, start the RocketMQ NameServer and Broker services.

For more examples, you can check here

Send a single message

use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_client_rust::Result;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> Result<()> {
    //init logger
    rocketmq_common::log::init_logger()?;

    // create a producer builder with default configuration
    let builder = DefaultMQProducer::builder();

    let mut producer = builder
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .build();

    producer.start().await?;

    for _ in 0..10 {
        let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());

        let send_result = producer.send_with_timeout(message, 2000).await?;
        println!("send result: {}", send_result);
    }
    producer.shutdown().await;

    Ok(())
}

Send batch messages

use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;

pub const PRODUCER_GROUP: &str = "BatchProducerGroupName";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
    //init logger
    rocketmq_common::log::init_logger()?;

    // create a producer builder with default configuration
    let builder = DefaultMQProducer::builder();

    let mut producer = builder
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .build();
    producer.start().await?;

    let mut messages = Vec::new();
    messages.push(Message::with_keys(
        TOPIC,
        TAG,
        "OrderID001",
        "Hello world 0".as_bytes(),
    ));
    messages.push(Message::with_keys(
        TOPIC,
        TAG,
        "OrderID002",
        "Hello world 1".as_bytes(),
    ));
    messages.push(Message::with_keys(
        TOPIC,
        TAG,
        "OrderID003",
        "Hello world 2".as_bytes(),
    ));
    let send_result = producer.send_batch(messages).await?;
    println!("send result: {}", send_result);
    Ok(())
}

Send RPC messages

use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_client_rust::Result;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "RequestTopic";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> Result<()> {
    //init logger
    rocketmq_common::log::init_logger()?;

    // create a producer builder with default configuration
    let builder = DefaultMQProducer::builder();

    let mut producer = builder
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .build();

    producer.start().await?;
    let ttl = 3000;
    let message = producer
        .request(
            Message::with_tags(TOPIC, "", "Hello RocketMQ".as_bytes()),
            ttl,
        )
        .await?;
    println!("send result: {:?}", message);
    producer.shutdown().await;

    Ok(())
}

Send transaction messages

use rocketmq_client_rust::producer::transaction_mq_producer::TransactionMQProducer;
use rocketmq_client_rust::producer::transaction_listener::TransactionListener;
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;

pub const PRODUCER_GROUP: &str = "transaction_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TransactionTopic";

// Custom transaction listener
struct MyTransactionListener;

impl TransactionListener for MyTransactionListener {
    fn execute_local_transaction(
        &mut self,
        msg: &dyn rocketmq_common::common::message::MessageTrait,
        arg: &dyn std::any::Any,
    ) -> rocketmq_client_rust::Result<rocketmq_client_rust::producer::transaction_listener::LocalTransactionState> {
        // Implement local transaction logic here
        println!("Executing local transaction for message: {:?}", msg.get_keys());
        Ok(rocketmq_client_rust::producer::transaction_listener::LocalTransactionState::CommitMessage)
    }

    fn check_local_transaction(
        &mut self,
        msg: &dyn rocketmq_common::common::message::MessageTrait,
    ) -> rocketmq_client_rust::Result<rocketmq_client_rust::producer::transaction_listener::LocalTransactionState> {
        // Check transaction status
        Ok(rocketmq_client_rust::producer::transaction_listener::LocalTransactionState::CommitMessage)
    }
}

#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
    rocketmq_common::log::init_logger()?;

    let mut producer = TransactionMQProducer::builder()
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .transaction_listener(MyTransactionListener)
        .build()
        .await?;

    producer.start().await?;

    let msg = Message::with_tags(TOPIC, "", "Hello Transactional RocketMQ".as_bytes());
    let result = producer.send_message_in_transaction(msg, Some("transaction_arg")).await?;

    println!("Transaction send result: {:?}", result);
    producer.shutdown().await;

    Ok(())
}

Send with custom selector

use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_rust::rocketmq;

pub const PRODUCER_GROUP: &str = "selector_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "SelectorTopic";

#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
    rocketmq_common::log::init_logger()?;

    let mut producer = DefaultMQProducer::builder()
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .build();

    producer.start().await?;

    let msg = Message::with_tags(TOPIC, "", "Hello RocketMQ with Selector".as_bytes());

    // Custom queue selector - routes messages based on key
    let selector = |queues: &[MessageQueue], _msg: &dyn MessageTrait, arg: &dyn std::any::Any| {
        if let Some(order_id) = arg.downcast_ref::<String>() {
            // Simple hash-based routing
            let hash = order_id.chars().map(|c| c as usize).sum::<usize>();
            let index = hash % queues.len();
            Some(queues[index].clone())
        } else {
            queues.first().cloned()
        }
    };

    let order_id = "ORDER12345".to_string();
    let result = producer.send_with_selector(msg, selector, order_id).await?;

    println!("Send result: {:?}", result);
    producer.shutdown().await;

    Ok(())
}

Message recall

use rocketmq_client_rust::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_common::common::message::message_single::Message;
use cheetah_string::CheetahString;
use rocketmq_rust::rocketmq;

pub const PRODUCER_GROUP: &str = "recall_producer_group";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "RecallTopic";

#[rocketmq::main]
pub async fn main() -> rocketmq_client_rust::Result<()> {
    rocketmq_common::log::init_logger()?;

    let mut producer = DefaultMQProducer::builder()
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .build();

    producer.start().await?;

    // Send a message
    let msg = Message::with_tags(TOPIC, "", "Hello RocketMQ - Recallable".as_bytes());
    let send_result = producer.send(msg).await?;

    println!("Send result: {:?}", send_result);

    // Recall the message (if recall handle is available)
    // The recall handle is typically returned in the send result for recallable messages
    if let Some(recall_handle) = send_result.and_then(|r| r.recall_handle()) {
        let recall_result = producer.recall_message(
            CheetahString::from_static_str(TOPIC),
            CheetahString::from(recall_handle.as_str())
        ).await?;

        println!("Recall result: {}", recall_result);
    }

    producer.shutdown().await;

    Ok(())
}

Consumer Performance Best Practices

High-Throughput Message Consumption with Zero-Copy API

The RocketMQ Rust client provides zero-copy polling APIs for maximum performance when consuming messages at high throughput (10,000+ msg/s).

Performance Comparison

Method Throughput Memory Overhead Use Case
poll() ~1,000 msg/s High (clones all messages) Simple scenarios, need to store messages
poll_zero_copy() 10,000+ msg/s Minimal (no cloning) High throughput, read-only processing

Why Use Zero-Copy?

Regular poll() overhead:

// ⚠️ Each poll with 32 messages of 2KB each = ~90KB allocation + copy
// At 100 polls/sec = ~9MB/s memory allocation rate
let messages = consumer.poll().await;  // Clones every message!

Zero-copy poll_zero_copy() advantage:

// ✅ Zero heap allocations, 10x+ faster
let messages = consumer.poll_zero_copy().await;  // No cloning!

Usage Examples

Example 1: High-Throughput Processing

For processing messages without storing them (forward, parse, aggregate, etc.):

use rocketmq_client::consumer::default_lite_pull_consumer::DefaultLitePullConsumer;
use rocketmq_client::consumer::lite_pull_consumer::LitePullConsumer;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let consumer = DefaultLitePullConsumer::builder()
        .consumer_group("high_throughput_group")
        .name_server_addr("127.0.0.1:9876")
        .pull_batch_size(32)
        .auto_commit(true)
        .build();

    consumer.start().await?;
    consumer.subscribe("MyTopic").await?;

    loop {
        // ✅ Use zero-copy for maximum performance (10x+ faster)
        let messages = consumer.poll_zero_copy().await;
        
        for msg in &messages {
            // Process without cloning - read-only access
            let topic = msg.get_topic();
            let body = msg.get_body();
            
            // Your processing: parse, forward, aggregate, etc.
            process_message(topic, body).await?;
        }
        // Messages automatically dropped here, no manual cleanup!
    }
}

Example 2: Selective Cloning

When you need to store only some messages:

// ✅ Best practice: Zero-copy first, clone only what you need
let messages = consumer.poll_zero_copy().await;

// Filter and clone only important messages
let important: Vec<MessageExt> = messages.into_iter()
    .filter(|msg| is_important(msg))
    .map(|msg| (*msg).clone())  // Clone only filtered messages
    .collect();

// Store only important messages
store.save(important);

Example 3: When to Use Regular poll()

Use regular poll() when you need to store all messages:

// ✅ Appropriate use of poll() - need to store all messages
let messages = consumer.poll().await;  // Clones all
message_store.save_all(messages);  // Messages outlive poll scope

Complete Examples

Use the zero-copy API in your application code. Here's how:

Basic high-throughput pattern: Basic high-throughput pattern:

loop {
    let messages = consumer.poll_zero_copy().await;
    for msg in &messages {
        process_message(msg).await?;
    }
}

**SeImplementation Notes

**Key Takeaways:ng pattern:

let messages = consumer.poll_zero_copy().await;
let important = messages.into_iter()
    .filter(|m| is_important(m))
    .map(|m| (*m).clone())
    .collect();

Implementation Notes

  1. Default choice for high throughput: Use poll_zero_copy() or poll_with_timeout_zero_copy()
  2. Read-only processing: No need to clone - process messages directly
  3. Selective storage: Clone only the messages you need to keep
  4. Legacy compatibility: Regular poll() still available for simple use cases

For detailed API documentation, see the LitePullConsumer trait documentation.