1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::collections::HashSet;
4
5use cheetah_string::CheetahString;
6use rand::seq::SliceRandom;
7use rocketmq_common::common::config::TopicConfig;
8use rocketmq_common::common::mix_all;
25use rocketmq_common::EnvUtils::EnvUtils;
26use rocketmq_common::FileUtils::string_to_file;
27use rocketmq_common::TimeUtils::get_current_millis;
28use rocketmq_error::RocketMQError;
29use rocketmq_error::RocketMQResult;
30use rocketmq_rust::ArcMut;
31
32use crate::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem;
33use crate::protocol::static_topic::topic_config_and_queue_mapping::TopicConfigAndQueueMapping;
34use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
35use crate::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
36use crate::protocol::static_topic::topic_queue_mapping_one::TopicQueueMappingOne;
37use crate::protocol::static_topic::topic_remapping_detail_wrapper;
38use crate::protocol::static_topic::topic_remapping_detail_wrapper::TopicRemappingDetailWrapper;
39use crate::protocol::RemotingSerializable;
40
41pub struct TopicQueueMappingUtils;
42
43impl TopicQueueMappingUtils {
44 pub fn find_logic_queue_mapping_item(
45 mapping_items: &[LogicQueueMappingItem],
46 logic_offset: i64,
47 ignore_negative: bool,
48 ) -> Option<&LogicQueueMappingItem> {
49 if mapping_items.is_empty() {
50 return None;
51 }
52 for i in (0..mapping_items.len()).rev() {
54 let item = &mapping_items[i];
55 if ignore_negative && item.logic_offset < 0 {
56 continue;
57 }
58 if logic_offset >= item.logic_offset {
59 return Some(item);
60 }
61 }
62 for item in mapping_items.iter() {
64 if ignore_negative && item.logic_offset < 0 {
65 continue;
66 } else {
67 return Some(item);
68 }
69 }
70 None
71 }
72
73 pub fn find_next<'a>(
74 items: &'a [LogicQueueMappingItem],
75 current_item: Option<&'a LogicQueueMappingItem>,
76 ignore_negative: bool,
77 ) -> Option<&'a LogicQueueMappingItem> {
78 if items.is_empty() || current_item.is_none() {
79 return None;
80 }
81 let current_item = current_item.unwrap();
82 for i in 0..items.len() {
83 let item = &items[i];
84 if ignore_negative && item.logic_offset < 0 {
85 continue;
86 }
87 if item.gen == current_item.gen {
88 if i < items.len() - 1 {
89 let next_item = &items[i + 1];
90 if ignore_negative && next_item.logic_offset < 0 {
91 return None;
92 } else {
93 return Some(next_item);
94 }
95 } else {
96 return None;
97 }
98 }
99 }
100 None
101 }
102
103 pub fn get_mock_broker_name(scope: &str) -> CheetahString {
104 assert!(!scope.is_empty(), "Scope cannot be null");
105
106 if scope == mix_all::METADATA_SCOPE_GLOBAL {
107 format!(
108 "{}{}",
109 mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX,
110 &scope[2..]
111 )
112 .into()
113 } else {
114 format!("{}{}", mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX, scope).into()
115 }
116 }
117 pub fn get_mapping_detail_from_config(
118 configs: Vec<TopicConfigAndQueueMapping>,
119 ) -> RocketMQResult<Vec<TopicQueueMappingDetail>> {
120 let mut detail_list = vec![];
121 for config_mapping in &configs {
122 if let Some(detail) = &config_mapping.topic_queue_mapping_detail {
123 detail_list.push((**detail).clone());
124 }
125 }
126 Ok(detail_list)
127 }
128 pub fn check_name_epoch_num_consistence(
129 topic: &CheetahString,
130 broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
131 ) -> RocketMQResult<(i64, i32)> {
132 if broker_config_map.is_empty() {
133 return Err(RocketMQError::Internal(
134 "check_name_epoch_num_consistence get empty config map".to_string(),
135 ));
136 }
137 let mut max_epoch = -1;
139 let mut max_num = -1;
140 let scope = CheetahString::new();
141 for entry in broker_config_map {
142 let broker = entry.0;
143 let config_mapping = entry.1;
144 if config_mapping.topic_queue_mapping_detail.is_none() {
145 return Err(RocketMQError::Internal(format!(
146 "Mapping info should not be null in broker {}",
147 broker
148 )));
149 }
150 let mapping_detail = &config_mapping.topic_queue_mapping_detail;
151 if let Some(mapping_detail) = &mapping_detail {
152 if let Some(broker_name) = &mapping_detail.topic_queue_mapping_info.bname {
153 if !broker.eq(broker_name) {
154 return Err(RocketMQError::Internal(format!(
155 "The broker name is not equal {} != {} ",
156 broker, broker_name
157 )));
158 }
159 if mapping_detail.topic_queue_mapping_info.dirty {
160 return Err(RocketMQError::Internal(format!(
161 "The mapping info is dirty in broker {}",
162 broker
163 )));
164 }
165 if let Some(top) = &config_mapping.topic_config.topic_name {
166 if let Some(mapping_top) = &mapping_detail.topic_queue_mapping_info.topic {
167 if !top.eq(mapping_top) {
168 return Err(RocketMQError::Internal(format!(
169 "The topic name is inconsistent in broker {}",
170 broker
171 )));
172 }
173 if !topic.eq(mapping_top) {
174 return Err(RocketMQError::Internal(format!(
175 "The topic name is not match for broker {}",
176 broker
177 )));
178 }
179 if let Some(m_scope) = &mapping_detail.topic_queue_mapping_info.scope {
180 if !scope.eq(m_scope) {
181 return Err(RocketMQError::Internal(format!(
182 "scope does not match {} != {} in {}",
183 m_scope, scope, broker
184 )));
185 }
186 if max_epoch != -1
187 && max_epoch != mapping_detail.topic_queue_mapping_info.epoch
188 {
189 return Err(RocketMQError::Internal(format!(
190 "epoch does not match {} != {} in {}",
191 max_epoch,
192 mapping_detail.topic_queue_mapping_info.epoch,
193 broker_name
194 )));
195 } else {
196 max_epoch = mapping_detail.topic_queue_mapping_info.epoch;
197 }
198 if max_num != -1
199 && max_num
200 != mapping_detail.topic_queue_mapping_info.total_queues
201 {
202 return Err(RocketMQError::Internal(format!(
203 "total queue number does not match {} != {} in {}",
204 max_num,
205 mapping_detail.topic_queue_mapping_info.total_queues,
206 broker_name
207 )));
208 } else {
209 max_num = mapping_detail.topic_queue_mapping_info.total_queues;
210 }
211 return Ok((max_epoch, max_num));
212 }
213 }
214 }
215 }
216 }
217 }
218 Err(RocketMQError::Internal(
219 "check_name_epoch_num_consistence err ! maybe some var is none".to_string(),
220 ))
221 }
222 pub fn check_if_reuse_physical_queue(
223 mapping_ones: &Vec<TopicQueueMappingOne>,
224 ) -> RocketMQResult<()> {
225 let mut physical_queue_id_map = HashMap::new();
226 for mapping_one in mapping_ones {
227 for item in mapping_one.items() {
228 if let Some(bname) = &item.bname {
229 let physical_queue_id = format!("{} - {}", bname, item.queue_id);
230 physical_queue_id_map
231 .entry(physical_queue_id.clone())
232 .or_insert(mapping_one.clone());
233 if let Some(id) = physical_queue_id_map.get(&physical_queue_id) {
234 return Err(RocketMQError::Internal(format!(
235 "Topic {} global queue id {} and {} shared the same physical queue {}",
236 mapping_one.topic(),
237 mapping_one.global_id(),
238 id.global_id(),
239 physical_queue_id
240 )));
241 }
242 }
243 }
244 }
245 Ok(())
246 }
247
248 pub fn check_logic_queue_mapping_item_offset(
249 items: &[LogicQueueMappingItem],
250 ) -> RocketMQResult<()> {
251 if items.is_empty() {
252 return Err(RocketMQError::Internal(
253 "check_logic_queue_mapping_item_offset input items is empty".to_string(),
254 ));
255 }
256 let mut last_gen = -1;
257 let mut last_offset = -1;
258 for i in items.len() - 1..=0 {
259 let item = &items[i];
260 if item.start_offset < 0 || item.gen < 0 || item.queue_id < 0 {
261 return Err(RocketMQError::Internal(
262 "The field is illegal, should not be negative".to_string(),
263 ));
264 }
265 if items.len() >= 2 && i <= items.len() - 2 && items[i].logic_offset < 0 {
266 return Err(RocketMQError::Internal(
267 "The non-latest item has negative logic offset".to_string(),
268 ));
269 }
270 if last_gen != -1 && item.gen >= last_gen {
271 return Err(RocketMQError::Internal(
272 "The gen does not increase monotonically".to_string(),
273 ));
274 }
275
276 if item.end_offset != -1 && item.end_offset < item.start_offset {
277 return Err(RocketMQError::Internal(
278 "The endOffset is smaller than the start offset".to_string(),
279 ));
280 }
281
282 if last_offset != -1 && item.logic_offset != -1 {
283 if item.logic_offset >= last_offset {
284 return Err(RocketMQError::Internal(
285 "The base logic offset does not increase monotonically".to_string(),
286 ));
287 }
288 if item.compute_max_static_queue_offset() >= last_offset {
289 return Err(RocketMQError::Internal(
290 "The max logic offset does not increase monotonically".to_string(),
291 ));
292 }
293 }
294 last_gen = item.gen;
295 last_offset = item.logic_offset;
296 }
297 Ok(())
298 }
299 pub fn get_leader_item(
300 items: &[LogicQueueMappingItem],
301 ) -> RocketMQResult<LogicQueueMappingItem> {
302 if items.is_empty() {
303 return Err(RocketMQError::Internal(
304 "get_leader_item failed with empty items".to_string(),
305 ));
306 }
307 if let Some(i) = items.last() {
308 return Ok(i.clone());
309 }
310 Err(RocketMQError::Internal(
311 "get_leader_item failed with empty items".to_string(),
312 ))
313 }
314 pub fn get_leader_broker(items: &[LogicQueueMappingItem]) -> RocketMQResult<CheetahString> {
315 let item = TopicQueueMappingUtils::get_leader_item(items)?;
316 if let Some(bname) = &item.bname {
317 return Ok(bname.to_string().into());
318 }
319 Err(RocketMQError::Internal(
320 "get_leader_broker fn get Item with None bname".to_string(),
321 ))
322 }
323 pub fn check_and_build_mapping_items(
324 mut mapping_detail_list: Vec<TopicQueueMappingDetail>,
325 replace: bool,
326 check_consistence: bool,
327 ) -> RocketMQResult<HashMap<i32, TopicQueueMappingOne>> {
328 mapping_detail_list.sort_by(|o1, o2| {
329 let sub = o1.topic_queue_mapping_info.epoch - o2.topic_queue_mapping_info.epoch;
330 if sub > 0 {
331 return std::cmp::Ordering::Greater;
332 } else if sub < 0 {
333 return std::cmp::Ordering::Less;
334 }
335 std::cmp::Ordering::Equal
336 });
337
338 let mut max_num = 0;
339 let mut global_id_map = HashMap::new();
340 for mapping_detail in &mapping_detail_list {
341 if mapping_detail.topic_queue_mapping_info.total_queues > max_num {
342 max_num = mapping_detail.topic_queue_mapping_info.total_queues;
343 }
344 if let Some(queue) = &mapping_detail.hosted_queues {
345 for entry in queue {
346 let global_id = entry.0;
347 TopicQueueMappingUtils::check_logic_queue_mapping_item_offset(entry.1)?;
348 let leader_broker_name = TopicQueueMappingUtils::get_leader_broker(entry.1)?;
349 if let Some(broker_name) = &mapping_detail.topic_queue_mapping_info.bname {
350 if !leader_broker_name.eq(broker_name) {
351 continue;
353 }
354
355 if global_id_map.contains_key(global_id) {
356 if !replace {
357 return Err(RocketMQError::Internal(format!(
358 "The queue id is duplicated in broker {} {}",
359 leader_broker_name, broker_name
360 )));
361 }
362 } else if let Some(top) = &mapping_detail.topic_queue_mapping_info.topic {
363 global_id_map.insert(
364 *global_id,
365 TopicQueueMappingOne::new(
366 mapping_detail.clone(),
367 top.clone().into(),
368 broker_name.clone().into(),
369 *global_id,
370 entry.1.clone(),
371 ),
372 );
373 }
374 }
375 }
376 }
377 }
378
379 if check_consistence {
380 if max_num as usize != global_id_map.len() {
381 return Err(RocketMQError::Internal(format!(
382 "The total queue number in config does not match the real hosted queues {} != \
383 {}",
384 max_num,
385 global_id_map.len()
386 )));
387 }
388 for i in 0..max_num {
389 if !global_id_map.contains_key(&i) {
390 return Err(RocketMQError::Internal(format!(
391 "The queue number {} is not in globalIdMap",
392 i
393 )));
394 }
395 }
396 }
397 let values = global_id_map.values().cloned().collect();
398 TopicQueueMappingUtils::check_if_reuse_physical_queue(&values)?;
399 Ok(global_id_map)
400 }
401 pub fn write_to_temp(
402 wrapper: &TopicRemappingDetailWrapper,
403 after: bool,
404 ) -> RocketMQResult<CheetahString> {
405 let topic = wrapper.topic();
406 let data = wrapper.serialize_json()?;
407 let mut suffix = topic_remapping_detail_wrapper::SUFFIX_BEFORE;
408 if after {
409 suffix = topic_remapping_detail_wrapper::SUFFIX_AFTER;
410 }
411 if let Some(tmpdir) = &EnvUtils::get_property("java.io.tmpdir") {
412 let file_name = format!("{}/{}-{}{}", tmpdir, topic, wrapper.get_epoch(), suffix);
413 string_to_file(&data, &file_name)?;
414 return Ok(file_name.into());
415 }
416
417 Err(RocketMQError::Internal("write file failed".to_string()))
418 }
419 pub fn check_target_brokers_complete(
420 target_brokers: &HashSet<CheetahString>,
421 broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
422 ) -> RocketMQResult<()> {
423 for broker in broker_config_map.keys() {
424 if let Some(mapping) = broker_config_map.get(broker) {
425 if let Some(detail) = mapping.get_topic_queue_mapping_detail() {
426 if let Some(queues) = &detail.hosted_queues {
427 if queues.is_empty() {
428 continue;
429 }
430 }
431 }
432 }
433 if !target_brokers.contains(broker) {
434 return Err(RocketMQError::Internal(format!(
435 "The existed broker {} does not in target brokers ",
436 broker
437 )));
438 }
439 }
440
441 Ok(())
442 }
443 pub fn check_physical_queue_consistence(
444 broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
445 ) -> RocketMQResult<()> {
446 for entry in broker_config_map {
447 let config_mapping = entry.1;
448 if let Some(detail) = config_mapping.get_topic_queue_mapping_detail() {
449 if config_mapping.topic_config.read_queue_nums
450 < config_mapping.topic_config.write_queue_nums
451 {
452 return Err(RocketMQError::Internal(
453 "Read queues is smaller than write queues".to_string(),
454 ));
455 }
456 if let Some(queues) = &detail.hosted_queues {
457 for items in queues.values() {
458 for item in items {
459 if item.start_offset != 0 {
460 return Err(RocketMQError::Internal(
461 "The start offset does not begin from 0".to_string(),
462 ));
463 }
464 if let Some(bname) = &item.bname {
465 let topic_config =
466 broker_config_map.get(&CheetahString::from(bname));
467 if topic_config.is_none() {
468 return Err(RocketMQError::Internal(
469 "The broker of item does not exist".to_string(),
470 ));
471 } else if let Some(topic_config) = topic_config {
472 if item.queue_id
473 >= topic_config.topic_config.write_queue_nums as i32
474 {
475 return Err(RocketMQError::Internal(
476 "The physical queue id is overflow the write queues"
477 .to_string(),
478 ));
479 }
480 }
481 }
482 }
483 }
484 }
485 }
486 }
487 Ok(())
488 }
489
490 pub fn create_topic_config_mapping(
491 topic: &str,
492 queue_num: i32,
493 target_brokers: &HashSet<CheetahString>,
494 broker_config_map: &mut HashMap<CheetahString, TopicConfigAndQueueMapping>,
495 ) -> RocketMQResult<TopicRemappingDetailWrapper> {
496 TopicQueueMappingUtils::check_target_brokers_complete(target_brokers, broker_config_map)?;
497 let mut global_id_map = HashMap::new();
498 let mut max_epoch_and_num = (get_current_millis(), queue_num);
499 if !broker_config_map.is_empty() {
500 let new_max_epoch_and_num = TopicQueueMappingUtils::check_name_epoch_num_consistence(
501 &CheetahString::from(topic),
502 broker_config_map,
503 )?;
504 max_epoch_and_num.0 = new_max_epoch_and_num.0 as u64;
505 max_epoch_and_num.1 = new_max_epoch_and_num.1;
506 let mut detail_list = vec![];
507 detail_list.extend(TopicQueueMappingUtils::get_mapping_detail_from_config(
508 broker_config_map.values().cloned().collect(),
509 )?);
510 global_id_map =
511 TopicQueueMappingUtils::check_and_build_mapping_items(detail_list, false, true)?;
512 TopicQueueMappingUtils::check_if_reuse_physical_queue(
513 &global_id_map.values().cloned().collect(),
514 )?;
515 TopicQueueMappingUtils::check_physical_queue_consistence(broker_config_map)?;
516 }
517 if (queue_num as usize) < global_id_map.len() {
518 return Err(RocketMQError::Internal(format!(
519 "Cannot decrease the queue num for static topic {} < {}",
520 queue_num,
521 global_id_map.len()
522 )));
523 }
524 if (queue_num as usize) == global_id_map.len() {
526 return Err(RocketMQError::Internal(
527 "The topic queue num is equal the existed queue num, do nothing".to_string(),
528 ));
529 }
530
531 let mut broker_num_map = HashMap::new();
533 for broker in target_brokers {
534 broker_num_map.insert(broker.clone(), 0);
535 }
536 let mut old_id_to_broker = HashMap::new();
537 for entry in &global_id_map {
538 let leader_broker = entry.1.bname();
539 old_id_to_broker.insert(*entry.0, CheetahString::from(leader_broker));
540 if !broker_num_map.contains_key(leader_broker) {
541 broker_num_map.insert(leader_broker.into(), 1);
542 } else {
543 broker_num_map.insert(leader_broker.into(), broker_num_map[leader_broker] + 1);
544 }
545 }
546 let mut allocator = MappingAllocator::new(old_id_to_broker, broker_num_map, HashMap::new());
547 allocator.up_to_num(queue_num);
548 let new_id_to_broker = allocator.id_to_broker();
549
550 let new_epoch = (max_epoch_and_num.0 + 1000).max(get_current_millis());
552 for e in new_id_to_broker {
553 let queue_id = e.0;
554 let broker = e.1;
555 if global_id_map.contains_key(queue_id) {
556 continue;
558 }
559 let mut config_mapping;
560 if !broker_config_map.contains_key(broker) {
561 config_mapping = TopicConfigAndQueueMapping::new(
562 TopicConfig::new(topic),
563 Some(ArcMut::new(TopicQueueMappingDetail {
564 topic_queue_mapping_info: TopicQueueMappingInfo::new(
565 topic.into(),
566 0,
567 broker.into(),
568 get_current_millis() as i64,
569 ),
570 hosted_queues: None,
571 })),
572 );
573 config_mapping.topic_config.write_queue_nums = 1;
574 config_mapping.topic_config.read_queue_nums = 1;
575 broker_config_map.insert(broker.clone(), config_mapping.clone());
576 } else {
577 config_mapping = broker_config_map[broker].clone();
578 config_mapping.topic_config.write_queue_nums += 1;
579 config_mapping.topic_config.read_queue_nums += 1;
580 }
581 let mapping_item = LogicQueueMappingItem {
582 gen: 0,
583 queue_id: config_mapping.topic_config.write_queue_nums as i32,
584 bname: Some(broker.clone()),
585 logic_offset: 0,
586 start_offset: 0,
587 end_offset: -1,
588 time_of_start: -1,
589 time_of_end: -1,
590 };
591 if let Some(detail) = config_mapping.topic_queue_mapping_detail {
592 TopicQueueMappingDetail::put_mapping_info(
593 detail.clone(),
594 *queue_id,
595 vec![mapping_item],
596 );
597 }
598 }
599
600 for entry in &mut *broker_config_map {
602 let config_mapping = entry.1;
603 if let Some(detail) = &mut config_mapping.topic_queue_mapping_detail {
604 detail.topic_queue_mapping_info.epoch = new_epoch as i64;
605 detail.topic_queue_mapping_info.total_queues = queue_num;
606 }
607 }
608 TopicQueueMappingUtils::check_name_epoch_num_consistence(
611 &CheetahString::from(topic),
612 broker_config_map,
613 )?;
614 global_id_map = TopicQueueMappingUtils::check_and_build_mapping_items(
615 TopicQueueMappingUtils::get_mapping_detail_from_config(
616 broker_config_map.values().cloned().collect(),
617 )?,
618 false,
619 true,
620 )?;
621 TopicQueueMappingUtils::check_if_reuse_physical_queue(
622 &global_id_map.values().cloned().collect(),
623 )?;
624 TopicQueueMappingUtils::check_physical_queue_consistence(broker_config_map)?;
625 let map = broker_config_map
626 .iter()
627 .map(|(k, v)| (CheetahString::from_string(k.to_string()), v.clone()))
628 .collect();
629 Ok(TopicRemappingDetailWrapper::new(
630 topic.to_string().into(),
631 topic_remapping_detail_wrapper::TYPE_CREATE_OR_UPDATE
632 .to_string()
633 .into(),
634 new_epoch,
635 map,
636 HashSet::new(),
637 HashSet::new(),
638 ))
639 }
640}
641pub struct MappingAllocator {
642 broker_num_map: HashMap<CheetahString, i32>,
643 id_to_broker: HashMap<i32, CheetahString>,
644 broker_num_map_before_remapping: HashMap<CheetahString, i32>,
646 current_index: i32,
647 least_brokers: Vec<CheetahString>,
648}
649impl MappingAllocator {
650 pub fn new(
651 id_to_broker: HashMap<i32, CheetahString>,
652 broker_num_map: HashMap<CheetahString, i32>,
653 broker_num_map_before_remapping: HashMap<CheetahString, i32>,
654 ) -> Self {
655 Self {
656 id_to_broker,
657 broker_num_map,
658 broker_num_map_before_remapping,
659 current_index: 0,
660 least_brokers: vec![],
661 }
662 }
663
664 fn fresh_state(&mut self) {
665 let mut min_num = i32::MAX;
666 for entry in &self.broker_num_map {
667 if *entry.1 < min_num {
668 self.least_brokers.clear();
669 self.least_brokers.push(entry.0.clone());
670 min_num = *entry.1;
671 } else if *entry.1 == min_num {
672 self.least_brokers.push(entry.0.clone());
673 }
674 }
675 if !self.broker_num_map_before_remapping.is_empty() {
677 self.least_brokers.sort_by(|o1, o2| {
678 let mut i1 = 0;
679 let mut i2 = 0;
680 if self.broker_num_map_before_remapping.contains_key(o1) {
681 if let Some(s) = self.broker_num_map_before_remapping.get(o1) {
682 i1 = *s;
683 }
684 }
685 if self.broker_num_map_before_remapping.contains_key(o2) {
686 if let Some(s) = self.broker_num_map_before_remapping.get(o2) {
687 i2 = *s;
688 }
689 }
690 if i1 - i2 > 0 {
691 return std::cmp::Ordering::Greater;
692 } else if i1 - i2 < 0 {
693 return Ordering::Less;
694 }
695 Ordering::Equal
696 });
697 } else {
698 let mut rng = rand::rng();
700 self.least_brokers.shuffle(&mut rng);
701 }
702 self.current_index = (self.least_brokers.len() - 1) as i32;
703 }
704 fn next_broker(&mut self) -> CheetahString {
705 if self.least_brokers.is_empty() {
706 self.fresh_state();
707 }
708 let tmp_index = self.current_index as usize % self.least_brokers.len();
709 self.least_brokers.remove(tmp_index)
710 }
711
712 pub fn broker_num_map(&self) -> &HashMap<CheetahString, i32> {
713 &self.broker_num_map
714 }
715
716 pub fn up_to_num(&mut self, max_queue_num: i32) {
717 let curr_size = self.id_to_broker.len();
718 if (max_queue_num as usize) <= curr_size {
719 return;
720 }
721 for i in curr_size..(max_queue_num as usize) {
722 let next_broker = self.next_broker();
723 if self.broker_num_map.contains_key(&next_broker) {
724 self.broker_num_map
725 .insert(next_broker.clone(), self.broker_num_map[&next_broker] + 1);
726 } else {
727 self.broker_num_map.insert(next_broker.clone(), 1);
728 }
729 self.id_to_broker.insert(i as i32, next_broker);
730 }
731 }
732
733 pub fn id_to_broker(&self) -> &HashMap<i32, CheetahString> {
734 &self.id_to_broker
735 }
736}