rocketmq_client_rust/consumer/rebalance_strategy/
allocate_message_queue_averagely.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use cheetah_string::CheetahString;
18use rocketmq_common::common::message::message_queue::MessageQueue;
19
20use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
21use crate::consumer::rebalance_strategy::check;
22
23pub struct AllocateMessageQueueAveragely;
24
25impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragely {
26    fn allocate(
27        &self,
28        consumer_group: &CheetahString,
29        current_cid: &CheetahString,
30        mq_all: &[MessageQueue],
31        cid_all: &[CheetahString],
32    ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
33        let mut result = Vec::new();
34        if !check(consumer_group, current_cid, mq_all, cid_all)? {
35            return Ok(result);
36        }
37
38        let index = cid_all
39            .iter()
40            .position(|cid| cid == current_cid)
41            .unwrap_or(0);
42        let mod_val = mq_all.len() % cid_all.len();
43        let average_size = if mq_all.len() <= cid_all.len() {
44            1
45        } else if mod_val > 0 && index < mod_val {
46            mq_all.len() / cid_all.len() + 1
47        } else {
48            mq_all.len() / cid_all.len()
49        };
50        let start_index = if mod_val > 0 && index < mod_val {
51            index * average_size
52        } else {
53            index * average_size + mod_val
54        };
55        //let range = average_size.min(mq_all.len() - start_index);
56        //fix the bug " subtract with overflow" caused by (mq_all.len() - start_index )
57        let mut range: usize = 0;
58        if mq_all.len() > start_index {
59            range = average_size.min(mq_all.len() - start_index);
60        }
61        //in case of  mq_all.len() < start_index ,  means the customers is much more than queue
62        // so let range ==0 ,the for loop not work, and then no queue alloced to this customerID
63        for i in 0..range {
64            result.push(mq_all[start_index + i].clone());
65        }
66        Ok(result)
67    }
68    fn get_name(&self) -> &'static str {
69        "AVG"
70    }
71}