pop_consumer/
pop_consumer.rs1use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
18use rocketmq_client_rust::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
19use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
20use rocketmq_client_rust::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
21use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
22use rocketmq_common::common::message::message_ext::MessageExt;
23use rocketmq_error::RocketMQResult;
24use rocketmq_rust::rocketmq;
25use tracing::info;
26
27pub const MESSAGE_COUNT: usize = 1;
28pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4";
29pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
30pub const TOPIC: &str = "TopicTest";
31pub const TAG: &str = "*";
32
33#[rocketmq::main]
34pub async fn main() -> RocketMQResult<()> {
35 rocketmq_common::log::init_logger()?;
37
38 let builder = DefaultMQPushConsumer::builder();
40
41 let mut consumer = builder
42 .consumer_group(CONSUMER_GROUP.to_string())
43 .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
44 .client_rebalance(false)
46 .build();
47 consumer.subscribe(TOPIC, "*")?;
48 consumer.register_message_listener_concurrently(MyMessageListener);
49 consumer.start().await?;
50 let _ = tokio::signal::ctrl_c().await;
51 Ok(())
52}
53
54pub struct MyMessageListener;
55
56impl MessageListenerConcurrently for MyMessageListener {
57 fn consume_message(
58 &self,
59 msgs: &[&MessageExt],
60 _context: &ConsumeConcurrentlyContext,
61 ) -> RocketMQResult<ConsumeConcurrentlyStatus> {
62 for msg in msgs {
63 info!("Receive message: {:?}", msg);
64 }
65 Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
66 }
67}