rocketmq_remoting/protocol/static_topic/
topic_queue_mapping_utils.rs

1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::collections::HashSet;
4
5use cheetah_string::CheetahString;
6use rand::seq::SliceRandom;
7use rocketmq_common::common::config::TopicConfig;
8/*
9 * Licensed to the Apache Software Foundation (ASF) under one or more
10 * contributor license agreements.  See the NOTICE file distributed with
11 * this work for additional information regarding copyright ownership.
12 * The ASF licenses this file to You under the Apache License, Version 2.0
13 * (the "License"); you may not use this file except in compliance with
14 * the License.  You may obtain a copy of the License at
15 *
16 *     http://www.apache.org/licenses/LICENSE-2.0
17 *
18 * Unless required by applicable law or agreed to in writing, software
19 * distributed under the License is distributed on an "AS IS" BASIS,
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21 * See the License for the specific language governing permissions and
22 * limitations under the License.
23 */
24use rocketmq_common::common::mix_all;
25use rocketmq_common::EnvUtils::EnvUtils;
26use rocketmq_common::FileUtils::string_to_file;
27use rocketmq_common::TimeUtils::get_current_millis;
28use rocketmq_error::RocketMQError;
29use rocketmq_error::RocketMQResult;
30use rocketmq_rust::ArcMut;
31
32use crate::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem;
33use crate::protocol::static_topic::topic_config_and_queue_mapping::TopicConfigAndQueueMapping;
34use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
35use crate::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
36use crate::protocol::static_topic::topic_queue_mapping_one::TopicQueueMappingOne;
37use crate::protocol::static_topic::topic_remapping_detail_wrapper;
38use crate::protocol::static_topic::topic_remapping_detail_wrapper::TopicRemappingDetailWrapper;
39use crate::protocol::RemotingSerializable;
40
/// Stateless namespace type: every helper in its `impl` block is an associated
/// function, so no value of this type is ever needed to call them.
pub struct TopicQueueMappingUtils;
42
43impl TopicQueueMappingUtils {
44    pub fn find_logic_queue_mapping_item(
45        mapping_items: &[LogicQueueMappingItem],
46        logic_offset: i64,
47        ignore_negative: bool,
48    ) -> Option<&LogicQueueMappingItem> {
49        if mapping_items.is_empty() {
50            return None;
51        }
52        // Could use binary search to improve performance
53        for i in (0..mapping_items.len()).rev() {
54            let item = &mapping_items[i];
55            if ignore_negative && item.logic_offset < 0 {
56                continue;
57            }
58            if logic_offset >= item.logic_offset {
59                return Some(item);
60            }
61        }
62        // If not found, maybe out of range, return the first one
63        for item in mapping_items.iter() {
64            if ignore_negative && item.logic_offset < 0 {
65                continue;
66            } else {
67                return Some(item);
68            }
69        }
70        None
71    }
72
73    pub fn find_next<'a>(
74        items: &'a [LogicQueueMappingItem],
75        current_item: Option<&'a LogicQueueMappingItem>,
76        ignore_negative: bool,
77    ) -> Option<&'a LogicQueueMappingItem> {
78        if items.is_empty() || current_item.is_none() {
79            return None;
80        }
81        let current_item = current_item.unwrap();
82        for i in 0..items.len() {
83            let item = &items[i];
84            if ignore_negative && item.logic_offset < 0 {
85                continue;
86            }
87            if item.gen == current_item.gen {
88                if i < items.len() - 1 {
89                    let next_item = &items[i + 1];
90                    if ignore_negative && next_item.logic_offset < 0 {
91                        return None;
92                    } else {
93                        return Some(next_item);
94                    }
95                } else {
96                    return None;
97                }
98            }
99        }
100        None
101    }
102
103    pub fn get_mock_broker_name(scope: &str) -> CheetahString {
104        assert!(!scope.is_empty(), "Scope cannot be null");
105
106        if scope == mix_all::METADATA_SCOPE_GLOBAL {
107            format!(
108                "{}{}",
109                mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX,
110                &scope[2..]
111            )
112            .into()
113        } else {
114            format!("{}{}", mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX, scope).into()
115        }
116    }
117    pub fn get_mapping_detail_from_config(
118        configs: Vec<TopicConfigAndQueueMapping>,
119    ) -> RocketMQResult<Vec<TopicQueueMappingDetail>> {
120        let mut detail_list = vec![];
121        for config_mapping in &configs {
122            if let Some(detail) = &config_mapping.topic_queue_mapping_detail {
123                detail_list.push((**detail).clone());
124            }
125        }
126        Ok(detail_list)
127    }
128    pub fn check_name_epoch_num_consistence(
129        topic: &CheetahString,
130        broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
131    ) -> RocketMQResult<(i64, i32)> {
132        if broker_config_map.is_empty() {
133            return Err(RocketMQError::Internal(
134                "check_name_epoch_num_consistence get empty config map".to_string(),
135            ));
136        }
137        //make sure it is not null
138        let mut max_epoch = -1;
139        let mut max_num = -1;
140        let scope = CheetahString::new();
141        for entry in broker_config_map {
142            let broker = entry.0;
143            let config_mapping = entry.1;
144            if config_mapping.topic_queue_mapping_detail.is_none() {
145                return Err(RocketMQError::Internal(format!(
146                    "Mapping info should not be null in broker {}",
147                    broker
148                )));
149            }
150            let mapping_detail = &config_mapping.topic_queue_mapping_detail;
151            if let Some(mapping_detail) = &mapping_detail {
152                if let Some(broker_name) = &mapping_detail.topic_queue_mapping_info.bname {
153                    if !broker.eq(broker_name) {
154                        return Err(RocketMQError::Internal(format!(
155                            "The broker name is not equal {} != {} ",
156                            broker, broker_name
157                        )));
158                    }
159                    if mapping_detail.topic_queue_mapping_info.dirty {
160                        return Err(RocketMQError::Internal(format!(
161                            "The mapping info is dirty in broker  {}",
162                            broker
163                        )));
164                    }
165                    if let Some(top) = &config_mapping.topic_config.topic_name {
166                        if let Some(mapping_top) = &mapping_detail.topic_queue_mapping_info.topic {
167                            if !top.eq(mapping_top) {
168                                return Err(RocketMQError::Internal(format!(
169                                    "The topic name is inconsistent in broker  {}",
170                                    broker
171                                )));
172                            }
173                            if !topic.eq(mapping_top) {
174                                return Err(RocketMQError::Internal(format!(
175                                    "The topic name is not match for broker  {}",
176                                    broker
177                                )));
178                            }
179                            if let Some(m_scope) = &mapping_detail.topic_queue_mapping_info.scope {
180                                if !scope.eq(m_scope) {
181                                    return Err(RocketMQError::Internal(format!(
182                                        "scope does not match {} != {} in {}",
183                                        m_scope, scope, broker
184                                    )));
185                                }
186                                if max_epoch != -1
187                                    && max_epoch != mapping_detail.topic_queue_mapping_info.epoch
188                                {
189                                    return Err(RocketMQError::Internal(format!(
190                                        "epoch does not match {} != {} in {}",
191                                        max_epoch,
192                                        mapping_detail.topic_queue_mapping_info.epoch,
193                                        broker_name
194                                    )));
195                                } else {
196                                    max_epoch = mapping_detail.topic_queue_mapping_info.epoch;
197                                }
198                                if max_num != -1
199                                    && max_num
200                                        != mapping_detail.topic_queue_mapping_info.total_queues
201                                {
202                                    return Err(RocketMQError::Internal(format!(
203                                        "total queue number does not match {} != {} in {}",
204                                        max_num,
205                                        mapping_detail.topic_queue_mapping_info.total_queues,
206                                        broker_name
207                                    )));
208                                } else {
209                                    max_num = mapping_detail.topic_queue_mapping_info.total_queues;
210                                }
211                                return Ok((max_epoch, max_num));
212                            }
213                        }
214                    }
215                }
216            }
217        }
218        Err(RocketMQError::Internal(
219            "check_name_epoch_num_consistence err ! maybe some var is none".to_string(),
220        ))
221    }
222    pub fn check_if_reuse_physical_queue(
223        mapping_ones: &Vec<TopicQueueMappingOne>,
224    ) -> RocketMQResult<()> {
225        let mut physical_queue_id_map = HashMap::new();
226        for mapping_one in mapping_ones {
227            for item in mapping_one.items() {
228                if let Some(bname) = &item.bname {
229                    let physical_queue_id = format!("{} - {}", bname, item.queue_id);
230                    physical_queue_id_map
231                        .entry(physical_queue_id.clone())
232                        .or_insert(mapping_one.clone());
233                    if let Some(id) = physical_queue_id_map.get(&physical_queue_id) {
234                        return Err(RocketMQError::Internal(format!(
235                            "Topic {} global queue id {} and {} shared the same physical queue {}",
236                            mapping_one.topic(),
237                            mapping_one.global_id(),
238                            id.global_id(),
239                            physical_queue_id
240                        )));
241                    }
242                }
243            }
244        }
245        Ok(())
246    }
247
248    pub fn check_logic_queue_mapping_item_offset(
249        items: &[LogicQueueMappingItem],
250    ) -> RocketMQResult<()> {
251        if items.is_empty() {
252            return Err(RocketMQError::Internal(
253                "check_logic_queue_mapping_item_offset input items is empty".to_string(),
254            ));
255        }
256        let mut last_gen = -1;
257        let mut last_offset = -1;
258        for i in items.len() - 1..=0 {
259            let item = &items[i];
260            if item.start_offset < 0 || item.gen < 0 || item.queue_id < 0 {
261                return Err(RocketMQError::Internal(
262                    "The field is illegal, should not be negative".to_string(),
263                ));
264            }
265            if items.len() >= 2 && i <= items.len() - 2 && items[i].logic_offset < 0 {
266                return Err(RocketMQError::Internal(
267                    "The non-latest item has negative logic offset".to_string(),
268                ));
269            }
270            if last_gen != -1 && item.gen >= last_gen {
271                return Err(RocketMQError::Internal(
272                    "The gen does not increase monotonically".to_string(),
273                ));
274            }
275
276            if item.end_offset != -1 && item.end_offset < item.start_offset {
277                return Err(RocketMQError::Internal(
278                    "The endOffset is smaller than the start offset".to_string(),
279                ));
280            }
281
282            if last_offset != -1 && item.logic_offset != -1 {
283                if item.logic_offset >= last_offset {
284                    return Err(RocketMQError::Internal(
285                        "The base logic offset does not increase monotonically".to_string(),
286                    ));
287                }
288                if item.compute_max_static_queue_offset() >= last_offset {
289                    return Err(RocketMQError::Internal(
290                        "The max logic offset does not increase monotonically".to_string(),
291                    ));
292                }
293            }
294            last_gen = item.gen;
295            last_offset = item.logic_offset;
296        }
297        Ok(())
298    }
299    pub fn get_leader_item(
300        items: &[LogicQueueMappingItem],
301    ) -> RocketMQResult<LogicQueueMappingItem> {
302        if items.is_empty() {
303            return Err(RocketMQError::Internal(
304                "get_leader_item failed with empty items".to_string(),
305            ));
306        }
307        if let Some(i) = items.last() {
308            return Ok(i.clone());
309        }
310        Err(RocketMQError::Internal(
311            "get_leader_item failed with empty items".to_string(),
312        ))
313    }
314    pub fn get_leader_broker(items: &[LogicQueueMappingItem]) -> RocketMQResult<CheetahString> {
315        let item = TopicQueueMappingUtils::get_leader_item(items)?;
316        if let Some(bname) = &item.bname {
317            return Ok(bname.to_string().into());
318        }
319        Err(RocketMQError::Internal(
320            "get_leader_broker fn get Item with None bname".to_string(),
321        ))
322    }
323    pub fn check_and_build_mapping_items(
324        mut mapping_detail_list: Vec<TopicQueueMappingDetail>,
325        replace: bool,
326        check_consistence: bool,
327    ) -> RocketMQResult<HashMap<i32, TopicQueueMappingOne>> {
328        mapping_detail_list.sort_by(|o1, o2| {
329            let sub = o1.topic_queue_mapping_info.epoch - o2.topic_queue_mapping_info.epoch;
330            if sub > 0 {
331                return std::cmp::Ordering::Greater;
332            } else if sub < 0 {
333                return std::cmp::Ordering::Less;
334            }
335            std::cmp::Ordering::Equal
336        });
337
338        let mut max_num = 0;
339        let mut global_id_map = HashMap::new();
340        for mapping_detail in &mapping_detail_list {
341            if mapping_detail.topic_queue_mapping_info.total_queues > max_num {
342                max_num = mapping_detail.topic_queue_mapping_info.total_queues;
343            }
344            if let Some(queue) = &mapping_detail.hosted_queues {
345                for entry in queue {
346                    let global_id = entry.0;
347                    TopicQueueMappingUtils::check_logic_queue_mapping_item_offset(entry.1)?;
348                    let leader_broker_name = TopicQueueMappingUtils::get_leader_broker(entry.1)?;
349                    if let Some(broker_name) = &mapping_detail.topic_queue_mapping_info.bname {
350                        if !leader_broker_name.eq(broker_name) {
351                            //not the leader
352                            continue;
353                        }
354
355                        if global_id_map.contains_key(global_id) {
356                            if !replace {
357                                return Err(RocketMQError::Internal(format!(
358                                    "The queue id is duplicated in broker {} {}",
359                                    leader_broker_name, broker_name
360                                )));
361                            }
362                        } else if let Some(top) = &mapping_detail.topic_queue_mapping_info.topic {
363                            global_id_map.insert(
364                                *global_id,
365                                TopicQueueMappingOne::new(
366                                    mapping_detail.clone(),
367                                    top.clone().into(),
368                                    broker_name.clone().into(),
369                                    *global_id,
370                                    entry.1.clone(),
371                                ),
372                            );
373                        }
374                    }
375                }
376            }
377        }
378
379        if check_consistence {
380            if max_num as usize != global_id_map.len() {
381                return Err(RocketMQError::Internal(format!(
382                    "The total queue number in config does not match the real hosted queues {} != \
383                     {}",
384                    max_num,
385                    global_id_map.len()
386                )));
387            }
388            for i in 0..max_num {
389                if !global_id_map.contains_key(&i) {
390                    return Err(RocketMQError::Internal(format!(
391                        "The queue number {} is not in globalIdMap",
392                        i
393                    )));
394                }
395            }
396        }
397        let values = global_id_map.values().cloned().collect();
398        TopicQueueMappingUtils::check_if_reuse_physical_queue(&values)?;
399        Ok(global_id_map)
400    }
401    pub fn write_to_temp(
402        wrapper: &TopicRemappingDetailWrapper,
403        after: bool,
404    ) -> RocketMQResult<CheetahString> {
405        let topic = wrapper.topic();
406        let data = wrapper.serialize_json()?;
407        let mut suffix = topic_remapping_detail_wrapper::SUFFIX_BEFORE;
408        if after {
409            suffix = topic_remapping_detail_wrapper::SUFFIX_AFTER;
410        }
411        if let Some(tmpdir) = &EnvUtils::get_property("java.io.tmpdir") {
412            let file_name = format!("{}/{}-{}{}", tmpdir, topic, wrapper.get_epoch(), suffix);
413            string_to_file(&data, &file_name)?;
414            return Ok(file_name.into());
415        }
416
417        Err(RocketMQError::Internal("write file failed".to_string()))
418    }
419    pub fn check_target_brokers_complete(
420        target_brokers: &HashSet<CheetahString>,
421        broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
422    ) -> RocketMQResult<()> {
423        for broker in broker_config_map.keys() {
424            if let Some(mapping) = broker_config_map.get(broker) {
425                if let Some(detail) = mapping.get_topic_queue_mapping_detail() {
426                    if let Some(queues) = &detail.hosted_queues {
427                        if queues.is_empty() {
428                            continue;
429                        }
430                    }
431                }
432            }
433            if !target_brokers.contains(broker) {
434                return Err(RocketMQError::Internal(format!(
435                    "The existed broker {} does not in target brokers ",
436                    broker
437                )));
438            }
439        }
440
441        Ok(())
442    }
443    pub fn check_physical_queue_consistence(
444        broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
445    ) -> RocketMQResult<()> {
446        for entry in broker_config_map {
447            let config_mapping = entry.1;
448            if let Some(detail) = config_mapping.get_topic_queue_mapping_detail() {
449                if config_mapping.topic_config.read_queue_nums
450                    < config_mapping.topic_config.write_queue_nums
451                {
452                    return Err(RocketMQError::Internal(
453                        "Read queues is smaller than write queues".to_string(),
454                    ));
455                }
456                if let Some(queues) = &detail.hosted_queues {
457                    for items in queues.values() {
458                        for item in items {
459                            if item.start_offset != 0 {
460                                return Err(RocketMQError::Internal(
461                                    "The start offset does not begin from 0".to_string(),
462                                ));
463                            }
464                            if let Some(bname) = &item.bname {
465                                let topic_config =
466                                    broker_config_map.get(&CheetahString::from(bname));
467                                if topic_config.is_none() {
468                                    return Err(RocketMQError::Internal(
469                                        "The broker of item does not exist".to_string(),
470                                    ));
471                                } else if let Some(topic_config) = topic_config {
472                                    if item.queue_id
473                                        >= topic_config.topic_config.write_queue_nums as i32
474                                    {
475                                        return Err(RocketMQError::Internal(
476                                            "The physical queue id is overflow the write queues"
477                                                .to_string(),
478                                        ));
479                                    }
480                                }
481                            }
482                        }
483                    }
484                }
485            }
486        }
487        Ok(())
488    }
489
490    pub fn create_topic_config_mapping(
491        topic: &str,
492        queue_num: i32,
493        target_brokers: &HashSet<CheetahString>,
494        broker_config_map: &mut HashMap<CheetahString, TopicConfigAndQueueMapping>,
495    ) -> RocketMQResult<TopicRemappingDetailWrapper> {
496        TopicQueueMappingUtils::check_target_brokers_complete(target_brokers, broker_config_map)?;
497        let mut global_id_map = HashMap::new();
498        let mut max_epoch_and_num = (get_current_millis(), queue_num);
499        if !broker_config_map.is_empty() {
500            let new_max_epoch_and_num = TopicQueueMappingUtils::check_name_epoch_num_consistence(
501                &CheetahString::from(topic),
502                broker_config_map,
503            )?;
504            max_epoch_and_num.0 = new_max_epoch_and_num.0 as u64;
505            max_epoch_and_num.1 = new_max_epoch_and_num.1;
506            let mut detail_list = vec![];
507            detail_list.extend(TopicQueueMappingUtils::get_mapping_detail_from_config(
508                broker_config_map.values().cloned().collect(),
509            )?);
510            global_id_map =
511                TopicQueueMappingUtils::check_and_build_mapping_items(detail_list, false, true)?;
512            TopicQueueMappingUtils::check_if_reuse_physical_queue(
513                &global_id_map.values().cloned().collect(),
514            )?;
515            TopicQueueMappingUtils::check_physical_queue_consistence(broker_config_map)?;
516        }
517        if (queue_num as usize) < global_id_map.len() {
518            return Err(RocketMQError::Internal(format!(
519                "Cannot decrease the queue num for static topic {} < {}",
520                queue_num,
521                global_id_map.len()
522            )));
523        }
524        //check the queue number
525        if (queue_num as usize) == global_id_map.len() {
526            return Err(RocketMQError::Internal(
527                "The topic queue num is equal the existed queue num, do nothing".to_string(),
528            ));
529        }
530
531        //the check is ok, now do the mapping allocation
532        let mut broker_num_map = HashMap::new();
533        for broker in target_brokers {
534            broker_num_map.insert(broker.clone(), 0);
535        }
536        let mut old_id_to_broker = HashMap::new();
537        for entry in &global_id_map {
538            let leader_broker = entry.1.bname();
539            old_id_to_broker.insert(*entry.0, CheetahString::from(leader_broker));
540            if !broker_num_map.contains_key(leader_broker) {
541                broker_num_map.insert(leader_broker.into(), 1);
542            } else {
543                broker_num_map.insert(leader_broker.into(), broker_num_map[leader_broker] + 1);
544            }
545        }
546        let mut allocator = MappingAllocator::new(old_id_to_broker, broker_num_map, HashMap::new());
547        allocator.up_to_num(queue_num);
548        let new_id_to_broker = allocator.id_to_broker();
549
550        //construct the topic configAndMapping
551        let new_epoch = (max_epoch_and_num.0 + 1000).max(get_current_millis());
552        for e in new_id_to_broker {
553            let queue_id = e.0;
554            let broker = e.1;
555            if global_id_map.contains_key(queue_id) {
556                //ignore the exited
557                continue;
558            }
559            let mut config_mapping;
560            if !broker_config_map.contains_key(broker) {
561                config_mapping = TopicConfigAndQueueMapping::new(
562                    TopicConfig::new(topic),
563                    Some(ArcMut::new(TopicQueueMappingDetail {
564                        topic_queue_mapping_info: TopicQueueMappingInfo::new(
565                            topic.into(),
566                            0,
567                            broker.into(),
568                            get_current_millis() as i64,
569                        ),
570                        hosted_queues: None,
571                    })),
572                );
573                config_mapping.topic_config.write_queue_nums = 1;
574                config_mapping.topic_config.read_queue_nums = 1;
575                broker_config_map.insert(broker.clone(), config_mapping.clone());
576            } else {
577                config_mapping = broker_config_map[broker].clone();
578                config_mapping.topic_config.write_queue_nums += 1;
579                config_mapping.topic_config.read_queue_nums += 1;
580            }
581            let mapping_item = LogicQueueMappingItem {
582                gen: 0,
583                queue_id: config_mapping.topic_config.write_queue_nums as i32,
584                bname: Some(broker.clone()),
585                logic_offset: 0,
586                start_offset: 0,
587                end_offset: -1,
588                time_of_start: -1,
589                time_of_end: -1,
590            };
591            if let Some(detail) = config_mapping.topic_queue_mapping_detail {
592                TopicQueueMappingDetail::put_mapping_info(
593                    detail.clone(),
594                    *queue_id,
595                    vec![mapping_item],
596                );
597            }
598        }
599
600        // set the topic config
601        for entry in &mut *broker_config_map {
602            let config_mapping = entry.1;
603            if let Some(detail) = &mut config_mapping.topic_queue_mapping_detail {
604                detail.topic_queue_mapping_info.epoch = new_epoch as i64;
605                detail.topic_queue_mapping_info.total_queues = queue_num;
606            }
607        }
608        //double check the config
609
610        TopicQueueMappingUtils::check_name_epoch_num_consistence(
611            &CheetahString::from(topic),
612            broker_config_map,
613        )?;
614        global_id_map = TopicQueueMappingUtils::check_and_build_mapping_items(
615            TopicQueueMappingUtils::get_mapping_detail_from_config(
616                broker_config_map.values().cloned().collect(),
617            )?,
618            false,
619            true,
620        )?;
621        TopicQueueMappingUtils::check_if_reuse_physical_queue(
622            &global_id_map.values().cloned().collect(),
623        )?;
624        TopicQueueMappingUtils::check_physical_queue_consistence(broker_config_map)?;
625        let map = broker_config_map
626            .iter()
627            .map(|(k, v)| (CheetahString::from_string(k.to_string()), v.clone()))
628            .collect();
629        Ok(TopicRemappingDetailWrapper::new(
630            topic.to_string().into(),
631            topic_remapping_detail_wrapper::TYPE_CREATE_OR_UPDATE
632                .to_string()
633                .into(),
634            new_epoch,
635            map,
636            HashSet::new(),
637            HashSet::new(),
638        ))
639    }
640}
641pub struct MappingAllocator {
642    broker_num_map: HashMap<CheetahString, i32>,
643    id_to_broker: HashMap<i32, CheetahString>,
644    //used for remapping
645    broker_num_map_before_remapping: HashMap<CheetahString, i32>,
646    current_index: i32,
647    least_brokers: Vec<CheetahString>,
648}
649impl MappingAllocator {
650    pub fn new(
651        id_to_broker: HashMap<i32, CheetahString>,
652        broker_num_map: HashMap<CheetahString, i32>,
653        broker_num_map_before_remapping: HashMap<CheetahString, i32>,
654    ) -> Self {
655        Self {
656            id_to_broker,
657            broker_num_map,
658            broker_num_map_before_remapping,
659            current_index: 0,
660            least_brokers: vec![],
661        }
662    }
663
664    fn fresh_state(&mut self) {
665        let mut min_num = i32::MAX;
666        for entry in &self.broker_num_map {
667            if *entry.1 < min_num {
668                self.least_brokers.clear();
669                self.least_brokers.push(entry.0.clone());
670                min_num = *entry.1;
671            } else if *entry.1 == min_num {
672                self.least_brokers.push(entry.0.clone());
673            }
674        }
675        //reduce the remapping
676        if !self.broker_num_map_before_remapping.is_empty() {
677            self.least_brokers.sort_by(|o1, o2| {
678                let mut i1 = 0;
679                let mut i2 = 0;
680                if self.broker_num_map_before_remapping.contains_key(o1) {
681                    if let Some(s) = self.broker_num_map_before_remapping.get(o1) {
682                        i1 = *s;
683                    }
684                }
685                if self.broker_num_map_before_remapping.contains_key(o2) {
686                    if let Some(s) = self.broker_num_map_before_remapping.get(o2) {
687                        i2 = *s;
688                    }
689                }
690                if i1 - i2 > 0 {
691                    return std::cmp::Ordering::Greater;
692                } else if i1 - i2 < 0 {
693                    return Ordering::Less;
694                }
695                Ordering::Equal
696            });
697        } else {
698            //reduce the imbalance
699            let mut rng = rand::rng();
700            self.least_brokers.shuffle(&mut rng);
701        }
702        self.current_index = (self.least_brokers.len() - 1) as i32;
703    }
704    fn next_broker(&mut self) -> CheetahString {
705        if self.least_brokers.is_empty() {
706            self.fresh_state();
707        }
708        let tmp_index = self.current_index as usize % self.least_brokers.len();
709        self.least_brokers.remove(tmp_index)
710    }
711
712    pub fn broker_num_map(&self) -> &HashMap<CheetahString, i32> {
713        &self.broker_num_map
714    }
715
716    pub fn up_to_num(&mut self, max_queue_num: i32) {
717        let curr_size = self.id_to_broker.len();
718        if (max_queue_num as usize) <= curr_size {
719            return;
720        }
721        for i in curr_size..(max_queue_num as usize) {
722            let next_broker = self.next_broker();
723            if self.broker_num_map.contains_key(&next_broker) {
724                self.broker_num_map
725                    .insert(next_broker.clone(), self.broker_num_map[&next_broker] + 1);
726            } else {
727                self.broker_num_map.insert(next_broker.clone(), 1);
728            }
729            self.id_to_broker.insert(i as i32, next_broker);
730        }
731    }
732
733    pub fn id_to_broker(&self) -> &HashMap<i32, CheetahString> {
734        &self.id_to_broker
735    }
736}