§The Rust Implementation of Apache RocketMQ Client
This is the official Rust client for Apache RocketMQ, providing an async/await API powered by the Tokio runtime.
Unlike the remoting-based client, this implementation is built on an architecture that separates computing from storage, which is the recommended way to access the RocketMQ service.
For the preparations you may need before getting started, see: RocketMQ Quick Start.
§Examples
Basic usage:
§Producer
use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;
#[tokio::main]
async fn main() {
    // It is recommended to specify the topic(s) you will send messages to.
    // The producer prefetches the topic route on start and fails fast if a topic does not exist.
    let mut producer_option = ProducerOption::default();
    producer_option.set_topics(vec!["test_topic"]);
    // set the RocketMQ proxy to connect to
    let mut client_option = ClientOption::default();
    client_option.set_access_url("localhost:8081");
    // build and start the producer
    let mut producer = Producer::new(producer_option, client_option).unwrap();
    producer.start().await.unwrap();
    // build the message
    let message = MessageBuilder::builder()
        .set_topic("test_topic")
        .set_tag("test_tag")
        .set_body("hello world".as_bytes().to_vec())
        .build()
        .unwrap();
    // send the message to the RocketMQ proxy
    let result = producer.send(message).await;
    debug_assert!(result.is_ok(), "send message failed: {:?}", result);
    println!(
        "send message success, message_id={}",
        result.unwrap().message_id()
    );
}
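The example above checks the send result with `debug_assert!`, which is compiled out of release builds by default, so a failed send would then surface as a panic from `unwrap()`. A minimal sketch of handling the `Result` explicitly follows. It uses only the standard library; `SendReceipt` and `describe_send` are hypothetical stand-ins for illustration and are not part of the `rocketmq` crate.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for the receipt returned by a successful send.
/// This is NOT a type from the rocketmq crate.
#[derive(Debug)]
struct SendReceipt {
    message_id: String,
}

/// Turn a send result into a log line instead of panicking on failure.
fn describe_send<E: Debug>(result: &Result<SendReceipt, E>) -> String {
    match result {
        Ok(receipt) => format!("send message success, message_id={}", receipt.message_id),
        Err(error) => format!("send message failed: {:?}", error),
    }
}

fn main() {
    // Both outcomes are simulated here; no RocketMQ proxy is contacted.
    let ok: Result<SendReceipt, String> = Ok(SendReceipt {
        message_id: "id-1".to_string(),
    });
    let failed: Result<SendReceipt, String> = Err("topic not found".to_string());

    println!("{}", describe_send(&ok));
    println!("{}", describe_send(&failed));
}
```

In real code the same `match` could be applied directly to the value returned by `producer.send(message).await`, so that a failure is logged or retried rather than causing a panic.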
§Simple Consumer
use rocketmq::conf::{ClientOption, SimpleConsumerOption};
use rocketmq::model::common::{FilterExpression, FilterType};
use rocketmq::SimpleConsumer;
#[tokio::main]
async fn main() {
    // It is recommended to specify the topic(s) you will receive messages from.
    // The simple consumer prefetches the topic route on start and fails fast if a topic does not exist.
    let mut consumer_option = SimpleConsumerOption::default();
    consumer_option.set_topics(vec!["test_topic"]);
    consumer_option.set_consumer_group("SimpleConsumerGroup");
    // set the RocketMQ proxy to connect to
    let mut client_option = ClientOption::default();
    client_option.set_access_url("localhost:8081");
    // build and start the simple consumer
    let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
    consumer.start().await.unwrap();
    // receive messages from the RocketMQ proxy
    let receive_result = consumer
        .receive(
            "test_topic".to_string(),
            &FilterExpression::new(FilterType::Tag, "test_tag"),
        )
        .await;
    debug_assert!(
        receive_result.is_ok(),
        "receive message failed: {:?}",
        receive_result.unwrap_err()
    );
    let messages = receive_result.unwrap();
    for message in messages {
        println!("receive message: {:?}", message);
        // ack the message to the RocketMQ proxy
        let ack_result = consumer.ack(&message).await;
        debug_assert!(
            ack_result.is_ok(),
            "ack message failed: {:?}",
            ack_result.unwrap_err()
        );
    }
}
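The `FilterExpression` in the example above selects messages by tag. In RocketMQ, a tag expression can name a single tag, join several tags with `||`, or use `*` to match every message, and the filtering is applied on the RocketMQ side rather than in application code. The sketch below only models that matching rule in plain Rust to make the semantics easy to see; `tag_matches` is a hypothetical helper and is not part of the `rocketmq` crate.

```rust
/// Hypothetical model of RocketMQ tag matching, for illustration only.
/// `expression` is a tag filter such as "test_tag", "tag_a || tag_b" or "*".
fn tag_matches(expression: &str, tag: &str) -> bool {
    let expression = expression.trim();
    // "*" subscribes to every message regardless of its tag.
    if expression == "*" {
        return true;
    }
    // Otherwise the message tag must equal one of the "||"-separated tags.
    expression
        .split("||")
        .map(str::trim)
        .any(|candidate| candidate == tag)
}

fn main() {
    let cases = [
        ("test_tag", "test_tag"),
        ("tag_a || tag_b", "tag_b"),
        ("tag_a || tag_b", "tag_c"),
        ("*", "anything"),
    ];
    for (expression, tag) in cases {
        println!(
            "expression={:?} tag={:?} matches={}",
            expression,
            tag,
            tag_matches(expression, tag)
        );
    }
}
```

Under this model, the consumer example above would receive only messages tagged `test_tag`, which is the tag the producer example sets.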
Modules§
- conf
- Configuration of RocketMQ rust client.
- error
- Error data model of RocketMQ rust client.
- model
- Data model of RocketMQ rust client.
Structs§
- Producer
- Producer is the core struct that application developers should use to publish messages to the RocketMQ proxy.
- SimpleConsumer
- SimpleConsumer is a lightweight consumer for consuming messages from the RocketMQ proxy.