rocketmq_client_rust/consumer/rebalance_strategy/
allocate_message_queue_averagely.rs1use cheetah_string::CheetahString;
16use rocketmq_common::common::message::message_queue::MessageQueue;
17
18use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
19use crate::consumer::rebalance_strategy::check;
20
/// Rebalance strategy that divides the message queues into contiguous,
/// near-equal shares, one share per consumer.
///
/// The type carries no state; all inputs are supplied per call through its
/// `AllocateMessageQueueStrategy` implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct AllocateMessageQueueAveragely;
22
23impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragely {
24 fn allocate(
25 &self,
26 consumer_group: &CheetahString,
27 current_cid: &CheetahString,
28 mq_all: &[MessageQueue],
29 cid_all: &[CheetahString],
30 ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
31 let mut result = Vec::new();
32 if !check(consumer_group, current_cid, mq_all, cid_all)? {
33 return Ok(result);
34 }
35
36 let index = cid_all.iter().position(|cid| cid == current_cid).unwrap_or(0);
37 let mod_val = mq_all.len() % cid_all.len();
38 let average_size = if mq_all.len() <= cid_all.len() {
39 1
40 } else if mod_val > 0 && index < mod_val {
41 mq_all.len() / cid_all.len() + 1
42 } else {
43 mq_all.len() / cid_all.len()
44 };
45 let start_index = if mod_val > 0 && index < mod_val {
46 index * average_size
47 } else {
48 index * average_size + mod_val
49 };
50 let mut range: usize = 0;
53 if mq_all.len() > start_index {
54 range = average_size.min(mq_all.len() - start_index);
55 }
56 for i in 0..range {
59 result.push(mq_all[start_index + i].clone());
60 }
61 Ok(result)
62 }
63 fn get_name(&self) -> &'static str {
64 "AVG"
65 }
66}