broadcast_consumer/
push_consumer.rs1use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
18use rocketmq_client_rust::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
19use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
20use rocketmq_client_rust::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
21use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
22use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
23use rocketmq_common::common::message::message_ext::MessageExt;
24use rocketmq_error::RocketMQResult;
25use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
26use rocketmq_rust::rocketmq;
27use tracing::info;
28
29pub const MESSAGE_COUNT: usize = 1;
30pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_1";
31pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
32pub const TOPIC: &str = "TopicTest";
33pub const SUB_EXPRESSION: &str = "*";
35
36#[rocketmq::main]
37pub async fn main() -> RocketMQResult<()> {
38 rocketmq_common::log::init_logger()?;
40
41 let builder = DefaultMQPushConsumer::builder();
43
44 let mut consumer = builder
45 .consumer_group(CONSUMER_GROUP.to_string())
46 .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
47 .message_model(MessageModel::Broadcasting)
48 .build();
49 consumer.subscribe(TOPIC, SUB_EXPRESSION)?;
50 consumer.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset);
51 consumer.register_message_listener_concurrently(MyMessageListener);
52 consumer.start().await?;
53 let _ = tokio::signal::ctrl_c().await;
54 Ok(())
55}
56
57pub struct MyMessageListener;
58
59impl MessageListenerConcurrently for MyMessageListener {
60 fn consume_message(
61 &self,
62 msgs: &[&MessageExt],
63 _context: &ConsumeConcurrentlyContext,
64 ) -> RocketMQResult<ConsumeConcurrentlyStatus> {
65 for msg in msgs {
66 info!("Receive message: {:?}", msg);
67 }
68 Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
69 }
70}