Skip to main content

rocketmq_admin_core/admin/
mq_admin_utils.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::HashSet;
17
18use cheetah_string::CheetahString;
19use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
20use rocketmq_common::common::config::TopicConfig;
21use rocketmq_error::RocketMQError;
22use rocketmq_error::RocketMQResult;
23use rocketmq_remoting::protocol::static_topic::logic_queue_mapping_item::LogicQueueMappingItem;
24use rocketmq_remoting::protocol::static_topic::topic_config_and_queue_mapping::TopicConfigAndQueueMapping;
25use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
26use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_info::TopicQueueMappingInfo;
27use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_utils::TopicQueueMappingUtils;
28use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
29use rocketmq_rust::ArcMut;
30
31use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
32
33pub struct MQAdminUtils {}
34impl MQAdminUtils {
35    pub async fn get_all_brokers_in_same_cluster(
36        brokers: Vec<CheetahString>,
37        default_mq_admin_ext: &DefaultMQAdminExt,
38    ) -> RocketMQResult<HashSet<CheetahString>> {
39        let cluster_info = default_mq_admin_ext.examine_broker_cluster_info().await?;
40        if cluster_info.cluster_addr_table.is_none() {
41            return Err(rocketmq_error::RocketMQError::Internal(
42                "The Cluster info is empty".to_string(),
43            ));
44        } else if let Some(c_table) = &cluster_info.cluster_addr_table {
45            let mut all_brokers = HashSet::new();
46            for broker in &brokers {
47                if all_brokers.contains(&CheetahString::from(broker)) {
48                    continue;
49                }
50                for cluster_brokers in c_table.values() {
51                    if cluster_brokers.contains(&CheetahString::from(broker)) {
52                        all_brokers.extend(cluster_brokers.clone());
53                        break;
54                    }
55                }
56            }
57            return Ok(all_brokers);
58        }
59        Err(rocketmq_error::RocketMQError::Internal(
60            "get_all_brokers_in_same_cluster err".to_string(),
61        ))
62    }
63    pub async fn complete_no_target_brokers(
64        mut broker_config_map: HashMap<CheetahString, TopicConfigAndQueueMapping>,
65        default_mq_admin_ext: &DefaultMQAdminExt,
66    ) -> RocketMQResult<()> {
67        let config_mapping = broker_config_map.values_mut().next().cloned();
68        if let Some(config_mapping) = config_mapping {
69            if let Some(topic) = &config_mapping.topic_config.topic_name {
70                if let Some(detail) = &config_mapping.topic_queue_mapping_detail {
71                    let queue_num = detail.topic_queue_mapping_info.total_queues;
72                    let new_epoch = detail.topic_queue_mapping_info.epoch;
73                    let all_brokers = MQAdminUtils::get_all_brokers_in_same_cluster(
74                        broker_config_map.keys().cloned().collect(),
75                        default_mq_admin_ext,
76                    )
77                    .await;
78                    if let Ok(all_brokers) = &all_brokers {
79                        for broker in all_brokers {
80                            broker_config_map.entry(broker.clone()).or_insert_with(|| {
81                                TopicConfigAndQueueMapping::new(
82                                    TopicConfig::new(topic.clone()),
83                                    Some(ArcMut::new(TopicQueueMappingDetail {
84                                        topic_queue_mapping_info: TopicQueueMappingInfo::new(
85                                            topic.clone(),
86                                            queue_num,
87                                            broker.clone(),
88                                            new_epoch,
89                                        ),
90                                        hosted_queues: None,
91                                    })),
92                                )
93                            });
94                        }
95                    }
96                }
97            }
98        }
99        Ok(())
100    }
101    pub async fn refresh_cluster_info(
102        default_mq_admin_ext: &DefaultMQAdminExt,
103        client_metadata: &ClientMetadata,
104    ) -> RocketMQResult<()> {
105        let cluster_info = default_mq_admin_ext.examine_broker_cluster_info().await;
106        if let Ok(info) = &cluster_info {
107            client_metadata.refresh_cluster_info(Some(info));
108            return Ok(());
109        }
110        Err(RocketMQError::Internal("The Cluster info is empty".to_string()))
111    }
112    pub async fn get_broker_metadata(default_mq_admin_ext: &DefaultMQAdminExt) -> RocketMQResult<ClientMetadata> {
113        let client_metadata = ClientMetadata::new();
114        MQAdminUtils::refresh_cluster_info(default_mq_admin_ext, &client_metadata).await?;
115        Ok(client_metadata)
116    }
117    pub fn check_if_master_alive(brokers: Vec<CheetahString>, client_metadata: &ClientMetadata) -> RocketMQResult<()> {
118        for broker in &brokers {
119            let addr = client_metadata.find_master_broker_addr(broker);
120            if addr.is_none() {
121                return Err(RocketMQError::Internal(format!(
122                    "Can't find addr for broker {}",
123                    broker
124                )));
125            }
126        }
127        Ok(())
128    }
129    pub async fn update_topic_config_mapping_all(
130        broker_config_map: &HashMap<CheetahString, TopicConfigAndQueueMapping>,
131        default_mq_admin_ext: &DefaultMQAdminExt,
132        force: bool,
133    ) -> RocketMQResult<()> {
134        let client_meta_data = MQAdminUtils::get_broker_metadata(default_mq_admin_ext).await?;
135        MQAdminUtils::check_if_master_alive(broker_config_map.keys().cloned().collect(), &client_meta_data)?;
136        //If some succeed, and others fail, it will cause inconsistent data
137        for entry in broker_config_map {
138            let broker = entry.0;
139            let addr = client_meta_data.find_master_broker_addr(broker);
140            let config_mapping = entry.1;
141            if let Some(addr) = &addr {
142                if let Some(topic) = &config_mapping.topic_config.topic_name {
143                    if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
144                        default_mq_admin_ext
145                            .create_static_topic(
146                                addr.clone(),
147                                topic.clone(),
148                                config_mapping.topic_config.clone(),
149                                (**mapping_detail).clone(),
150                                force,
151                            )
152                            .await?;
153                    }
154                }
155            }
156        }
157        Ok(())
158    }
159    pub async fn examine_topic_config_all(
160        topic: &CheetahString,
161        default_mq_admin_ext: &DefaultMQAdminExt,
162    ) -> RocketMQResult<HashMap<CheetahString, TopicConfigAndQueueMapping>> {
163        let mut broker_config_map = HashMap::new();
164        let client_metadata = ClientMetadata::new();
165        //check all the brokers
166        let cluster_info = default_mq_admin_ext.examine_broker_cluster_info().await?;
167        if cluster_info.broker_addr_table.is_some() {
168            client_metadata.refresh_cluster_info(Some(&cluster_info));
169            let keys = {
170                let addr_table = client_metadata.broker_addr_table();
171                let addr_table = addr_table.read();
172                addr_table.keys().cloned().collect::<Vec<CheetahString>>()
173            };
174            for broker in keys {
175                let addr = client_metadata.find_master_broker_addr(&broker);
176                if let Some(addr) = &addr {
177                    let mapping = TopicConfigAndQueueMapping::new(
178                        default_mq_admin_ext
179                            .examine_topic_config(addr.clone(), topic.to_string().into())
180                            .await?,
181                        None,
182                    );
183                    //allow the config is null
184                    broker_config_map.insert(broker.clone(), mapping);
185                }
186            }
187        }
188
189        Ok(broker_config_map)
190    }
191    pub async fn remapping_static_topic(
192        topic: &CheetahString,
193        brokers_to_map_in: &HashSet<CheetahString>,
194        brokers_to_map_out: &HashSet<CheetahString>,
195        broker_config_map: &mut HashMap<CheetahString, TopicConfigAndQueueMapping>,
196        block_seq_size: i64,
197        force: bool,
198        default_mq_admin_ext: &DefaultMQAdminExt,
199    ) -> RocketMQResult<()> {
200        let client_metadata = MQAdminUtils::get_broker_metadata(default_mq_admin_ext).await?;
201        MQAdminUtils::check_if_master_alive(broker_config_map.keys().cloned().collect(), &client_metadata)?;
202
203        for broker in brokers_to_map_in {
204            let addr = client_metadata.find_master_broker_addr(broker);
205            if let Some(addr) = addr {
206                if let Some(config_mapping) = broker_config_map.get(broker) {
207                    if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
208                        default_mq_admin_ext
209                            .create_static_topic(
210                                addr,
211                                topic.clone(),
212                                config_mapping.topic_config.clone(),
213                                (**mapping_detail).clone(),
214                                force,
215                            )
216                            .await?;
217                    }
218                }
219            }
220        }
221
222        for broker in brokers_to_map_out {
223            let addr = client_metadata.find_master_broker_addr(broker);
224            if let Some(addr) = addr {
225                if let Some(config_mapping) = broker_config_map.get(broker) {
226                    if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
227                        default_mq_admin_ext
228                            .create_static_topic(
229                                addr,
230                                topic.clone(),
231                                config_mapping.topic_config.clone(),
232                                (**mapping_detail).clone(),
233                                force,
234                            )
235                            .await?;
236                    }
237                }
238            }
239        }
240
241        let mut updates_to_apply: Vec<(CheetahString, i32, Vec<LogicQueueMappingItem>)> = vec![];
242
243        for broker in brokers_to_map_out {
244            let addr = client_metadata.find_master_broker_addr(broker);
245            if let Some(addr) = addr {
246                let stats_table = default_mq_admin_ext
247                    .examine_topic_stats(topic.clone(), Some(addr.clone()))
248                    .await?;
249                let offset_table = stats_table.get_offset_table();
250
251                if let Some(map_out_config) = broker_config_map.get_mut(broker) {
252                    if let Some(mapping_detail) = &mut map_out_config.topic_queue_mapping_detail {
253                        if let Some(hosted_queues) = &mut mapping_detail.hosted_queues {
254                            for (global_id, items) in hosted_queues.iter_mut() {
255                                if items.len() < 2 {
256                                    continue;
257                                }
258                                let items_len = items.len();
259                                let old_leader = items.get(items_len - 2).cloned();
260                                let new_leader = items.last_mut();
261
262                                if let (Some(new_leader), Some(old_leader)) = (new_leader, old_leader) {
263                                    if new_leader.logic_offset > 0 {
264                                        continue;
265                                    }
266
267                                    let old_broker_name = old_leader.bname.clone().unwrap_or_default();
268                                    let old_queue_id = old_leader.queue_id;
269
270                                    let topic_offset = offset_table.get(
271                                        &rocketmq_common::common::message::message_queue::MessageQueue::from_parts(
272                                            topic.clone(),
273                                            old_broker_name.clone(),
274                                            old_queue_id,
275                                        ),
276                                    );
277
278                                    if let Some(topic_offset) = topic_offset {
279                                        if topic_offset.get_max_offset() < old_leader.start_offset {
280                                            return Err(RocketMQError::Internal(format!(
281                                                "The max offset is smaller than the start offset {:?} {}",
282                                                old_leader,
283                                                topic_offset.get_max_offset()
284                                            )));
285                                        }
286                                        new_leader.logic_offset = TopicQueueMappingUtils::block_seq_round_up(
287                                            old_leader
288                                                .compute_static_queue_offset_strictly(topic_offset.get_max_offset()),
289                                            block_seq_size,
290                                        );
291
292                                        // collect updates for map_in_config
293                                        if let Some(new_leader_bname) = &new_leader.bname {
294                                            updates_to_apply.push((
295                                                new_leader_bname.clone(),
296                                                *global_id,
297                                                items.clone(),
298                                            ));
299                                        }
300                                    } else {
301                                        return Err(RocketMQError::Internal(format!(
302                                            "Cannot get the max offset for old leader {:?}",
303                                            old_leader
304                                        )));
305                                    }
306                                }
307                            }
308                        }
309                    }
310                }
311            }
312        }
313
314        // apply collected updates to map_in_config
315        for (broker_name, global_id, items) in updates_to_apply {
316            if let Some(map_in_config) = broker_config_map.get_mut(&broker_name) {
317                if let Some(map_in_detail) = &mut map_in_config.topic_queue_mapping_detail {
318                    if let Some(map_in_queues) = &mut map_in_detail.hosted_queues {
319                        map_in_queues.insert(global_id, items);
320                    }
321                }
322            }
323        }
324
325        // write to the new leader with logic offset
326        for broker in brokers_to_map_in {
327            let addr = client_metadata.find_master_broker_addr(broker);
328            if let Some(addr) = addr {
329                if let Some(config_mapping) = broker_config_map.get(broker) {
330                    if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
331                        default_mq_admin_ext
332                            .create_static_topic(
333                                addr,
334                                topic.clone(),
335                                config_mapping.topic_config.clone(),
336                                (**mapping_detail).clone(),
337                                force,
338                            )
339                            .await?;
340                    }
341                }
342            }
343        }
344
345        // write the non-target brokers
346        for (broker, config_mapping) in broker_config_map.iter() {
347            if brokers_to_map_in.contains(broker) || brokers_to_map_out.contains(broker) {
348                continue;
349            }
350            let addr = client_metadata.find_master_broker_addr(broker);
351            if let Some(addr) = addr {
352                if let Some(mapping_detail) = &config_mapping.topic_queue_mapping_detail {
353                    default_mq_admin_ext
354                        .create_static_topic(
355                            addr,
356                            topic.clone(),
357                            config_mapping.topic_config.clone(),
358                            (**mapping_detail).clone(),
359                            force,
360                        )
361                        .await?;
362                }
363            }
364        }
365
366        Ok(())
367    }
368}