consumer/
consumer.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
18use rocketmq_client_rust::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
19use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
20use rocketmq_client_rust::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
21use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
22use rocketmq_common::common::message::message_ext::MessageExt;
23use rocketmq_error::RocketMQResult;
24use rocketmq_rust::rocketmq;
25use tracing::info;
26
// --- Connection settings -------------------------------------------------

/// Address (`host:port`) of the name server the consumer is configured with.
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
/// Consumer group name handed to the consumer builder.
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_4";

// --- Subscription ----------------------------------------------------------

/// Topic the example subscribes to.
pub const TOPIC: &str = "TopicTest";
/// Tag filter expression; `*` is the expression `main` subscribes with
/// (presumably "match every tag" — confirm against the RocketMQ filter docs).
pub const TAG: &str = "*";

// --- Misc ------------------------------------------------------------------

/// Message count for the example; not referenced in the visible part of this file.
pub const MESSAGE_COUNT: usize = 1;
32
33#[rocketmq::main]
34pub async fn main() -> RocketMQResult<()> {
35    //init logger
36    rocketmq_common::log::init_logger()?;
37
38    // create a producer builder with default configuration
39    let builder = DefaultMQPushConsumer::builder();
40
41    let mut consumer = builder
42        .consumer_group(CONSUMER_GROUP.to_string())
43        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
44        .build();
45    consumer.subscribe(TOPIC, "*")?;
46    consumer.register_message_listener_concurrently(MyMessageListener);
47    consumer.start().await?;
48    let _ = tokio::signal::ctrl_c().await;
49    Ok(())
50}
51
52pub struct MyMessageListener;
53
54impl MessageListenerConcurrently for MyMessageListener {
55    fn consume_message(
56        &self,
57        msgs: &[&MessageExt],
58        _context: &ConsumeConcurrentlyContext,
59    ) -> RocketMQResult<ConsumeConcurrentlyStatus> {
60        for msg in msgs {
61            info!("Receive message: {:?}", msg);
62        }
63        Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
64    }
65}