1use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
18use rocketmq_client_rust::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
19use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
20use rocketmq_client_rust::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
21use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
22use rocketmq_common::common::message::message_ext::MessageExt;
23use rocketmq_error::RocketMQResult;
24use rocketmq_rust::rocketmq;
25use tracing::info;
26
/// Address (`host:port`) handed to the consumer builder as the name server address.
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
/// Consumer group name handed to the consumer builder.
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4";

/// Topic the example consumer subscribes to.
pub const TOPIC: &str = "TopicTest";
/// Tag filter expression for the subscription (`*` is the RocketMQ wildcard tag expression).
pub const TAG: &str = "*";

/// Message count setting for the example; not referenced by the code visible in this file.
pub const MESSAGE_COUNT: usize = 1;
32
33#[rocketmq::main]
34pub async fn main() -> RocketMQResult<()> {
35 rocketmq_common::log::init_logger()?;
37
38 let builder = DefaultMQPushConsumer::builder();
40
41 let mut consumer = builder
42 .consumer_group(CONSUMER_GROUP.to_string())
43 .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
44 .build();
45 consumer.subscribe(TOPIC, "*")?;
46 consumer.register_message_listener_concurrently(MyMessageListener);
47 consumer.start().await?;
48 let _ = tokio::signal::ctrl_c().await;
49 Ok(())
50}
51
52pub struct MyMessageListener;
53
54impl MessageListenerConcurrently for MyMessageListener {
55 fn consume_message(
56 &self,
57 msgs: &[&MessageExt],
58 _context: &ConsumeConcurrentlyContext,
59 ) -> RocketMQResult<ConsumeConcurrentlyStatus> {
60 for msg in msgs {
61 info!("Receive message: {:?}", msg);
62 }
63 Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
64 }
65}