broadcast_consumer/
push_consumer.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
18use rocketmq_client_rust::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
19use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
20use rocketmq_client_rust::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
21use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
22use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
23use rocketmq_common::common::message::message_ext::MessageExt;
24use rocketmq_error::RocketMQResult;
25use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
26use rocketmq_rust::rocketmq;
27use tracing::info;
28
/// Message count setting for the example.
///
/// Not referenced by the code visible in this file; kept because it is public.
pub const MESSAGE_COUNT: usize = 1;

/// Consumer group name handed to the consumer builder in `main`.
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_1";

/// Name server address handed to the consumer builder in `main`.
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";

/// Topic the consumer subscribes to.
pub const TOPIC: &str = "TopicTest";

/// Subscription expression passed to `subscribe` together with [`TOPIC`].
///
/// A tag expression such as `"TagA || TagC || TagD"` can be substituted for
/// the `"*"` used here.
pub const SUB_EXPRESSION: &str = "*";
35
36#[rocketmq::main]
37pub async fn main() -> RocketMQResult<()> {
38    //init logger
39    rocketmq_common::log::init_logger()?;
40
41    // create a producer builder with default configuration
42    let builder = DefaultMQPushConsumer::builder();
43
44    let mut consumer = builder
45        .consumer_group(CONSUMER_GROUP.to_string())
46        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
47        .message_model(MessageModel::Broadcasting)
48        .build();
49    consumer.subscribe(TOPIC, SUB_EXPRESSION)?;
50    consumer.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset);
51    consumer.register_message_listener_concurrently(MyMessageListener);
52    consumer.start().await?;
53    let _ = tokio::signal::ctrl_c().await;
54    Ok(())
55}
56
57pub struct MyMessageListener;
58
59impl MessageListenerConcurrently for MyMessageListener {
60    fn consume_message(
61        &self,
62        msgs: &[&MessageExt],
63        _context: &ConsumeConcurrentlyContext,
64    ) -> RocketMQResult<ConsumeConcurrentlyStatus> {
65        for msg in msgs {
66            info!("Receive message: {:?}", msg);
67        }
68        Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
69    }
70}