Skip to main content

rocketmq_client_rust/consumer/rebalance_strategy/
allocate_message_queue_averagely.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_common::common::message::message_queue::MessageQueue;
17
18use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
19use crate::consumer::rebalance_strategy::check;
20
21pub struct AllocateMessageQueueAveragely;
22
23impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragely {
24    fn allocate(
25        &self,
26        consumer_group: &CheetahString,
27        current_cid: &CheetahString,
28        mq_all: &[MessageQueue],
29        cid_all: &[CheetahString],
30    ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
31        let mut result = Vec::new();
32        if !check(consumer_group, current_cid, mq_all, cid_all)? {
33            return Ok(result);
34        }
35
36        let index = cid_all.iter().position(|cid| cid == current_cid).unwrap_or(0);
37        let mod_val = mq_all.len() % cid_all.len();
38        let average_size = if mq_all.len() <= cid_all.len() {
39            1
40        } else if mod_val > 0 && index < mod_val {
41            mq_all.len() / cid_all.len() + 1
42        } else {
43            mq_all.len() / cid_all.len()
44        };
45        let start_index = if mod_val > 0 && index < mod_val {
46            index * average_size
47        } else {
48            index * average_size + mod_val
49        };
50        //let range = average_size.min(mq_all.len() - start_index);
51        //fix the bug " subtract with overflow" caused by (mq_all.len() - start_index )
52        let mut range: usize = 0;
53        if mq_all.len() > start_index {
54            range = average_size.min(mq_all.len() - start_index);
55        }
56        //in case of  mq_all.len() < start_index ,  means the customers is much more than queue
57        // so let range ==0 ,the for loop not work, and then no queue alloced to this customerID
58        for i in 0..range {
59            result.push(mq_all[start_index + i].clone());
60        }
61        Ok(result)
62    }
63    fn get_name(&self) -> &'static str {
64        "AVG"
65    }
66}