ordermessage_consumer/
ordermessage_consumer.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::sync::atomic::AtomicI64;
18use std::sync::Arc;
19
20use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
21#[allow(unused_imports)]
22use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
23use rocketmq_client_rust::consumer::listener::consume_orderly_context::ConsumeOrderlyContext;
24use rocketmq_client_rust::consumer::listener::consume_orderly_status::ConsumeOrderlyStatus;
25use rocketmq_client_rust::consumer::listener::message_listener_orderly::MessageListenerOrderly;
26use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
27use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
28use rocketmq_common::common::message::message_ext::MessageExt;
29use rocketmq_error::RocketMQResult;
30use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
31use rocketmq_rust::rocketmq;
32use tracing::info;
33
34pub const MESSAGE_COUNT: usize = 1;
35pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_3";
36pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
37pub const TOPIC: &str = "TopicTest";
38pub const TAG: &str = "*";
39
40#[rocketmq::main]
41pub async fn main() -> RocketMQResult<()> {
42    //init logger
43    rocketmq_common::log::init_logger()?;
44
45    // create a producer builder with default configuration
46    let builder = DefaultMQPushConsumer::builder();
47
48    let mut consumer = builder
49        .consumer_group(CONSUMER_GROUP.to_string())
50        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
51        .message_model(MessageModel::Clustering)
52        .build();
53    consumer.subscribe(TOPIC, TAG)?;
54    consumer.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset);
55    consumer.register_message_listener_orderly(MyMessageListener::new());
56    consumer.start().await?;
57    let _ = tokio::signal::ctrl_c().await;
58    Ok(())
59}
60
61pub struct MyMessageListener {
62    consume_times: Arc<AtomicI64>,
63}
64
65impl Default for MyMessageListener {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl MyMessageListener {
72    pub fn new() -> Self {
73        Self {
74            consume_times: Arc::new(AtomicI64::new(0)),
75        }
76    }
77}
78
79impl MessageListenerOrderly for MyMessageListener {
80    fn consume_message(
81        &self,
82        msgs: &[&MessageExt],
83        context: &mut ConsumeOrderlyContext,
84    ) -> RocketMQResult<ConsumeOrderlyStatus> {
85        context.set_auto_commit(true);
86        for msg in msgs {
87            println!("Receive message: {:?}", msg);
88            info!("Receive message: {:?}", msg);
89        }
90        if self
91            .consume_times
92            .load(std::sync::atomic::Ordering::Acquire)
93            % 2
94            == 0
95        {
96            return Ok(ConsumeOrderlyStatus::Success);
97        } else if self
98            .consume_times
99            .load(std::sync::atomic::Ordering::Acquire)
100            % 5
101            == 0
102        {
103            context.set_suspend_current_queue_time_millis(3000);
104            return Ok(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment);
105        }
106        Ok(ConsumeOrderlyStatus::Success)
107    }
108}