rocketmq_client_rust/consumer/rebalance_strategy/allocate_message_queue_averagely.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use cheetah_string::CheetahString;
18use rocketmq_common::common::message::message_queue::MessageQueue;
19
20use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
21use crate::consumer::rebalance_strategy::check;
22
23pub struct AllocateMessageQueueAveragely;
24
25impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragely {
26 fn allocate(
27 &self,
28 consumer_group: &CheetahString,
29 current_cid: &CheetahString,
30 mq_all: &[MessageQueue],
31 cid_all: &[CheetahString],
32 ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
33 let mut result = Vec::new();
34 if !check(consumer_group, current_cid, mq_all, cid_all)? {
35 return Ok(result);
36 }
37
38 let index = cid_all
39 .iter()
40 .position(|cid| cid == current_cid)
41 .unwrap_or(0);
42 let mod_val = mq_all.len() % cid_all.len();
43 let average_size = if mq_all.len() <= cid_all.len() {
44 1
45 } else if mod_val > 0 && index < mod_val {
46 mq_all.len() / cid_all.len() + 1
47 } else {
48 mq_all.len() / cid_all.len()
49 };
50 let start_index = if mod_val > 0 && index < mod_val {
51 index * average_size
52 } else {
53 index * average_size + mod_val
54 };
55 //let range = average_size.min(mq_all.len() - start_index);
56 //fix the bug " subtract with overflow" caused by (mq_all.len() - start_index )
57 let mut range: usize = 0;
58 if mq_all.len() > start_index {
59 range = average_size.min(mq_all.len() - start_index);
60 }
61 //in case of mq_all.len() < start_index , means the customers is much more than queue
62 // so let range ==0 ,the for loop not work, and then no queue alloced to this customerID
63 for i in 0..range {
64 result.push(mq_all[start_index + i].clone());
65 }
66 Ok(result)
67 }
68 fn get_name(&self) -> &'static str {
69 "AVG"
70 }
71}