rocketmq_admin_core/admin/
mq_admin_utils.rs1use std::collections::HashMap;
16use std::collections::HashSet;
17
18use cheetah_string::CheetahString;
19use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
20use rocketmq_common::common::config::TopicConfig;
21use rocketmq_error::RocketMQError;
22use rocketmq_error::RocketMQResult;
23use rocketmq_remoting::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem;
24use rocketmq_remoting::protocol::static_topic::topic_config_and_queue_mapping::TopicConfigAndQueueMapping;
25use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
26use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_info::TopicQueueMappingInfo;
27use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_utils::TopicQueueMappingUtils;
28use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
29use rocketmq_rust::ArcMut;
30
31use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
32
33pub struct MQAdminUtils {}
34impl MQAdminUtils {
35 pub async fn get_all_brokers_in_same_cluster(
36 brokers: Vec<CheetahString>,
37 default_mq_admin_ext: &DefaultMQAdminExt,
38 ) -> RocketMQResult<HashSet<CheetahString>> {
39 let cluster_info = default_mq_admin_ext.examine_broker_cluster_info().await?;
40 if cluster_info.cluster_addr_table.is_none() {
41 return Err(rocketmq_error::RocketMQError::Internal(
42 "The Cluster info is empty".to_string(),
43 ));
44 } else if let Some(c_table) = &cluster_info.cluster_addr_table {
45 let mut all_brokers = HashSet::new();
46 for broker in &brokers {
47 if all_brokers.contains(&CheetahString::from(broker)) {
48 continue;
49 }
50 for cluster_brokers in c_table.values() {
51 if cluster_brokers.contains(&CheetahString::from(broker)) {
52 all_brokers.extend(cluster_brokers.clone());
53 break;
54 }
55 }
56 }
57 return Ok(all_brokers);
58 }
59 Err(rocketmq_error::RocketMQError::Internal(
60 "get_all_brokers_in_same_cluster err".to_string(),
61 ))
62 }
63 pub async fn complete_no_target_brokers(
64 mut broker_config_map: HashMap<CheetahString, TopicConfigAndQueueMapping>,
65 default_mq_admin_ext: &DefaultMQAdminExt,
66 ) -> RocketMQResult<()> {
67 let config_mapping = broker_config_map.values_mut().next().cloned();
68 if let Some(config_mapping) = config_mapping {
69 if let Some(topic) = &config_mapping.topic_config.topic_name {
70 if let Some(detail) = &config_mapping.topic_queue_mapping_detail {
71 let queue_num = detail.topic_queue_mapping_info.total_queues;
72 let new_epoch = detail.topic_queue_mapping_info.epoch;
73 let all_brokers = MQAdminUtils::get_all_brokers_in_same_cluster(
74 broker_config_map.keys().cloned().collect(),
75 default_mq_admin_ext,
76 )
77 .await;
78 if let Ok(all_brokers) = &all_brokers {
79 for broker in all_brokers {
80 broker_config_map.entry(broker.clone()).or_insert_with(|| {
81 TopicConfigAndQueueMapping::new(
82 TopicConfig::new(topic.clone()),
83 Some(ArcMut::new(TopicQueueMappingDetail {
84 topic_queue_mapping_info: TopicQueueMappingInfo::new(
85 topic.clone(),
86 queue_num,
87 broker.clone(),
88 new_epoch,
89 ),
90 hosted_queues: None,
91 })),
92 )
93 });
94 }
95 }
96 }
97 }
98 }
99 Ok(())
100 }
101 pub async fn refresh_cluster_info(
102 default_mq_admin_ext: &DefaultMQAdminExt,
103 client_metadata: &ClientMetadata,
104 ) -> RocketMQResult<()> {
105 let cluster_info = default_mq_admin_ext.examine_broker_cluster_info().await;
106 if let Ok(info) = &cluster_info {
107 client_metadata.refresh_cluster_info(Some(info));
108 return Ok(());
109 }
110 Err(RocketMQError::Internal("The Cluster info is empty".to_string()))
111 }
112 pub async fn get_broker_metadata(default_mq_admin_ext: &DefaultMQAdminExt) -> RocketMQResult<ClientMetadata> {
113 let client_metadata = ClientMetadata::new();
114 MQAdminUtils::refresh_cluster_info(default_mq_admin_ext, &client_metadata).await?;
115 Ok(client_metadata)
116 }
117 pub fn check_if_master_alive(brokers: Vec<CheetahString>, client_metadata: &ClientMetadata) -> RocketMQResult<()> {
118 for broker in &brokers {
119 let addr = client_metadata.find_master_broker_addr(broker);
120 if addr.is_none() {
121 return Err(RocketMQError::Internal(format!(
122 "Can't find addr for broker {}",
123 broker
124 )));
125 }
126 }
127 Ok(())
128 }
129 pub async fn update_topic_config_mapping_all(
130 broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
131 default_mq_admin_ext: &DefaultMQAdminExt,
132 force: bool,
133 ) -> RocketMQResult<()> {
134 let client_meta_data = MQAdminUtils::get_broker_metadata(default_mq_admin_ext).await?;
135 MQAdminUtils::check_if_master_alive(broker_config_map.keys().cloned().collect(), &client_meta_data)?;
136 for entry in broker_config_map {
138 let broker = entry.0;
139 let addr = client_meta_data.find_master_broker_addr(broker);
140 let config_mapping = entry.1;
141 if let Some(addr) = &addr {
142 if let Some(topic) = &config_mapping.topic_config.topic_name {
143 if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
144 default_mq_admin_ext
145 .create_static_topic(
146 addr.clone(),
147 topic.clone(),
148 config_mapping.topic_config.clone(),
149 (**mapping_detail).clone(),
150 force,
151 )
152 .await?;
153 }
154 }
155 }
156 }
157 Ok(())
158 }
159 pub async fn examine_topic_config_all(
160 topic: &CheetahString,
161 default_mq_admin_ext: &DefaultMQAdminExt,
162 ) -> RocketMQResult<HashMap<CheetahString, TopicConfigAndQueueMapping>> {
163 let mut broker_config_map = HashMap::new();
164 let client_metadata = ClientMetadata::new();
165 let cluster_info = default_mq_admin_ext.examine_broker_cluster_info().await?;
167 if cluster_info.broker_addr_table.is_some() {
168 client_metadata.refresh_cluster_info(Some(&cluster_info));
169 let keys = {
170 let addr_table = client_metadata.broker_addr_table();
171 let addr_table = addr_table.read();
172 addr_table.keys().cloned().collect::<Vec<CheetahString>>()
173 };
174 for broker in keys {
175 let addr = client_metadata.find_master_broker_addr(&broker);
176 if let Some(addr) = &addr {
177 let mapping = TopicConfigAndQueueMapping::new(
178 default_mq_admin_ext
179 .examine_topic_config(addr.clone(), topic.to_string().into())
180 .await?,
181 None,
182 );
183 broker_config_map.insert(broker.clone(), mapping);
185 }
186 }
187 }
188
189 Ok(broker_config_map)
190 }
191 pub async fn remapping_static_topic(
192 topic: &CheetahString,
193 brokers_to_map_in: &HashSet<CheetahString>,
194 brokers_to_map_out: &HashSet<CheetahString>,
195 broker_config_map: &mut HashMap<CheetahString, TopicConfigAndQueueMapping>,
196 block_seq_size: i64,
197 force: bool,
198 default_mq_admin_ext: &DefaultMQAdminExt,
199 ) -> RocketMQResult<()> {
200 let client_metadata = MQAdminUtils::get_broker_metadata(default_mq_admin_ext).await?;
201 MQAdminUtils::check_if_master_alive(broker_config_map.keys().cloned().collect(), &client_metadata)?;
202
203 for broker in brokers_to_map_in {
204 let addr = client_metadata.find_master_broker_addr(broker);
205 if let Some(addr) = addr {
206 if let Some(config_mapping) = broker_config_map.get(broker) {
207 if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
208 default_mq_admin_ext
209 .create_static_topic(
210 addr,
211 topic.clone(),
212 config_mapping.topic_config.clone(),
213 (**mapping_detail).clone(),
214 force,
215 )
216 .await?;
217 }
218 }
219 }
220 }
221
222 for broker in brokers_to_map_out {
223 let addr = client_metadata.find_master_broker_addr(broker);
224 if let Some(addr) = addr {
225 if let Some(config_mapping) = broker_config_map.get(broker) {
226 if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
227 default_mq_admin_ext
228 .create_static_topic(
229 addr,
230 topic.clone(),
231 config_mapping.topic_config.clone(),
232 (**mapping_detail).clone(),
233 force,
234 )
235 .await?;
236 }
237 }
238 }
239 }
240
241 let mut updates_to_apply: Vec<(CheetahString, i32, Vec<LogicQueueMappingItem>)> = vec![];
242
243 for broker in brokers_to_map_out {
244 let addr = client_metadata.find_master_broker_addr(broker);
245 if let Some(addr) = addr {
246 let stats_table = default_mq_admin_ext
247 .examine_topic_stats(topic.clone(), Some(addr.clone()))
248 .await?;
249 let offset_table = stats_table.get_offset_table();
250
251 if let Some(map_out_config) = broker_config_map.get_mut(broker) {
252 if let Some(mapping_detail) = &mut map_out_config.topic_queue_mapping_detail {
253 if let Some(hosted_queues) = &mut mapping_detail.hosted_queues {
254 for (global_id, items) in hosted_queues.iter_mut() {
255 if items.len() < 2 {
256 continue;
257 }
258 let items_len = items.len();
259 let old_leader = items.get(items_len - 2).cloned();
260 let new_leader = items.last_mut();
261
262 if let (Some(new_leader), Some(old_leader)) = (new_leader, old_leader) {
263 if new_leader.logic_offset > 0 {
264 continue;
265 }
266
267 let old_broker_name = old_leader.bname.clone().unwrap_or_default();
268 let old_queue_id = old_leader.queue_id;
269
270 let topic_offset = offset_table.get(
271 &rocketmq_common::common::message::message_queue::MessageQueue::from_parts(
272 topic.clone(),
273 old_broker_name.clone(),
274 old_queue_id,
275 ),
276 );
277
278 if let Some(topic_offset) = topic_offset {
279 if topic_offset.get_max_offset() < old_leader.start_offset {
280 return Err(RocketMQError::Internal(format!(
281 "The max offset is smaller than the start offset {:?} {}",
282 old_leader,
283 topic_offset.get_max_offset()
284 )));
285 }
286 new_leader.logic_offset = TopicQueueMappingUtils::block_seq_round_up(
287 old_leader
288 .compute_static_queue_offset_strictly(topic_offset.get_max_offset()),
289 block_seq_size,
290 );
291
292 if let Some(new_leader_bname) = &new_leader.bname {
294 updates_to_apply.push((
295 new_leader_bname.clone(),
296 *global_id,
297 items.clone(),
298 ));
299 }
300 } else {
301 return Err(RocketMQError::Internal(format!(
302 "Cannot get the max offset for old leader {:?}",
303 old_leader
304 )));
305 }
306 }
307 }
308 }
309 }
310 }
311 }
312 }
313
314 for (broker_name, global_id, items) in updates_to_apply {
316 if let Some(map_in_config) = broker_config_map.get_mut(&broker_name) {
317 if let Some(map_in_detail) = &mut map_in_config.topic_queue_mapping_detail {
318 if let Some(map_in_queues) = &mut map_in_detail.hosted_queues {
319 map_in_queues.insert(global_id, items);
320 }
321 }
322 }
323 }
324
325 for broker in brokers_to_map_in {
327 let addr = client_metadata.find_master_broker_addr(broker);
328 if let Some(addr) = addr {
329 if let Some(config_mapping) = broker_config_map.get(broker) {
330 if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
331 default_mq_admin_ext
332 .create_static_topic(
333 addr,
334 topic.clone(),
335 config_mapping.topic_config.clone(),
336 (**mapping_detail).clone(),
337 force,
338 )
339 .await?;
340 }
341 }
342 }
343 }
344
345 for (broker, config_mapping) in broker_config_map.iter() {
347 if brokers_to_map_in.contains(broker) || brokers_to_map_out.contains(broker) {
348 continue;
349 }
350 let addr = client_metadata.find_master_broker_addr(broker);
351 if let Some(addr) = addr {
352 if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
353 default_mq_admin_ext
354 .create_static_topic(
355 addr,
356 topic.clone(),
357 config_mapping.topic_config.clone(),
358 (**mapping_detail).clone(),
359 force,
360 )
361 .await?;
362 }
363 }
364 }
365
366 Ok(())
367 }
368}