Crate rocketmq

source ·
Expand description

The Rust Implementation of Apache RocketMQ Client

Here is the official rust client for Apache RocketMQ providing async/await API powered by tokio runtime.

Different from the remoting-based client, the current implementation is based on separating architecture for computing and storage, which is the more recommended way to access the RocketMQ service.

Here are some preparations you may need to know: RocketMQ Quick Start.

Examples

Basic usage:

Producer

use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;

#[tokio::main]
async fn main() {
    // recommend to specify which topic(s) you would like to send message to
    // producer will prefetch topic route when start and failed fast if topic not exist
    let mut producer_option = ProducerOption::default();
    producer_option.set_topics(vec!["test_topic"]);

    // set which rocketmq proxy to connect
    let mut client_option = ClientOption::default();
    client_option.set_access_url("localhost:8081");

    // build and start producer
    let mut  producer = Producer::new(producer_option, client_option).unwrap();
    producer.start().await.unwrap();

    // build message
    let message = MessageBuilder::builder()
        .set_topic("test_topic")
        .set_tag("test_tag")
        .set_body("hello world".as_bytes().to_vec())
        .build()
        .unwrap();

    // send message to rocketmq proxy
    let result = producer.send(message).await;
    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
    println!(
        "send message success, message_id={}",
        result.unwrap().message_id()
    );
}

Simple Consumer

use rocketmq::conf::{ClientOption, SimpleConsumerOption};
use rocketmq::model::common::{FilterExpression, FilterType};
use rocketmq::SimpleConsumer;

#[tokio::main]
async fn main() {
    // recommend to specify which topic(s) you would like to send message to
    // simple consumer will prefetch topic route when start and failed fast if topic not exist
    let mut consumer_option = SimpleConsumerOption::default();
    consumer_option.set_topics(vec!["test_topic"]);
    consumer_option.set_consumer_group("SimpleConsumerGroup");

    // set which rocketmq proxy to connect
    let mut client_option = ClientOption::default();
    client_option.set_access_url("localhost:8081");

    // build and start simple consumer
    let mut  consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
    consumer.start().await.unwrap();

    // pop message from rocketmq proxy
    let receive_result = consumer
        .receive(
            "test_topic".to_string(),
            &FilterExpression::new(FilterType::Tag, "test_tag"),
        )
        .await;
    debug_assert!(
        receive_result.is_ok(),
        "receive message failed: {:?}",
        receive_result.unwrap_err()
    );

    let messages = receive_result.unwrap();
    for message in messages {
        println!("receive message: {:?}", message);
        // ack message to rocketmq proxy
        let ack_result = consumer.ack(&message).await;
        debug_assert!(
            ack_result.is_ok(),
            "ack message failed: {:?}",
            ack_result.unwrap_err()
        );
    }
}

Modules

  • Configuration of RocketMQ rust client.
  • Error data model of RocketMQ rust client.
  • Data model of RocketMQ rust client.

Structs

  • Producer is the core struct, to which application developers should turn, when publishing messages to RocketMQ proxy.
  • SimpleConsumer is a lightweight consumer to consume messages from RocketMQ proxy.