use rocketmq::conf::{ClientOption, SimpleConsumerOption};
use rocketmq::model::common::{FilterExpression, FilterType};
use rocketmq::SimpleConsumer;
/// Example entry point: receive one batch of messages with a `SimpleConsumer`
/// and acknowledge each of them.
///
/// Flow:
/// 1. Build the consumer and client options (topic `test_topic`, group
///    `SimpleConsumerGroup`, endpoint `localhost:8081`, TLS disabled).
/// 2. Create and start the consumer.
/// 3. Issue a single `receive` call filtered by the tag `test_tag`.
/// 4. Print and `ack` every received message; an ack failure is logged and
///    does not stop processing of the remaining messages.
/// 5. Shut the consumer down.
///
/// Every failure is reported on stderr and the function returns normally (no
/// panic). Once `start` has succeeded, `shutdown` is invoked on every path,
/// including a failed receive and an empty batch.
#[tokio::main]
async fn main() {
    // Which topics to subscribe to and under which consumer group.
    let mut consumer_option = SimpleConsumerOption::default();
    consumer_option.set_topics(vec!["test_topic"]);
    consumer_option.set_consumer_group("SimpleConsumerGroup");

    // How to reach the server.
    let mut client_option = ClientOption::default();
    client_option.set_access_url("localhost:8081");
    client_option.set_enable_tls(false);

    // Report a construction error the same way as every other failure in this
    // example instead of panicking.
    let mut consumer = match SimpleConsumer::new(consumer_option, client_option) {
        Ok(consumer) => consumer,
        Err(error) => {
            eprintln!("simple consumer create failed: {:?}", error);
            return;
        }
    };

    if let Err(error) = consumer.start().await {
        eprintln!("simple consumer start failed: {:?}", error);
        return;
    }

    let receive_result = consumer
        .receive(
            "test_topic".to_string(),
            &FilterExpression::new(FilterType::Tag, "test_tag"),
        )
        .await;

    // No early returns from here on: every outcome must fall through to the
    // shutdown call below so the started consumer is always shut down.
    match receive_result {
        Err(error) => eprintln!("receive message failed: {:?}", error),
        Ok(messages) if messages.is_empty() => println!("no message received"),
        Ok(messages) => {
            for message in messages {
                println!("receive message: {:?}", message);
                // A failed ack is logged only; keep acknowledging the rest.
                if let Err(error) = consumer.ack(&message).await {
                    eprintln!(
                        "ack message {} failed: {:?}",
                        message.message_id(),
                        error
                    );
                }
            }
        }
    }

    // Called exactly once, after all other use of the consumer.
    if let Err(error) = consumer.shutdown().await {
        eprintln!("simple consumer shutdown failed: {:?}", error);
    }
}