rocketmq_admin_core/core/topic/
operations.rs1use std::collections::HashSet;
21
22use cheetah_string::CheetahString;
23use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
24
25use super::types::TopicClusterList;
26use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
27use crate::core::RocketMQResult;
28use crate::core::ToolsError;
29
/// Stateless entry point for topic administration.
///
/// The type carries no data; every operation in this file is exposed as an
/// associated `async fn` that receives the admin client explicitly.
pub struct TopicService;
32
33pub type TopicOperations = TopicService;
35
36impl TopicService {
37 pub async fn get_topic_cluster_list(
46 admin: &mut DefaultMQAdminExt,
47 topic: impl Into<String>,
48 ) -> RocketMQResult<TopicClusterList> {
49 let topic = topic.into();
50 let clusters = admin
51 .get_topic_cluster_list(topic.clone())
52 .await
53 .map_err(|_| ToolsError::topic_not_found(topic.clone()))?;
54
55 Ok(TopicClusterList { clusters })
56 }
57
58 pub async fn get_topic_route(
67 admin: &mut DefaultMQAdminExt,
68 topic: impl Into<CheetahString>,
69 ) -> RocketMQResult<Option<rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData>> {
70 let topic = topic.into();
71 Ok(admin
72 .examine_topic_route_info(topic.clone())
73 .await
74 .map_err(|_| ToolsError::topic_not_found(topic.to_string()))?)
75 }
76
77 pub async fn delete_topic(
87 admin: &mut DefaultMQAdminExt,
88 topic: impl Into<CheetahString>,
89 cluster_name: impl Into<CheetahString>,
90 ) -> RocketMQResult<()> {
91 let topic = topic.into();
92 let cluster = cluster_name.into();
93
94 Ok(admin.delete_topic(topic.clone(), cluster.clone()).await.map_err(|e| {
95 ToolsError::internal(format!(
96 "Failed to delete topic '{topic}' from cluster '{cluster}': {e}"
97 ))
98 })?)
99 }
100
101 pub async fn create_or_update_topic(
111 admin: &mut DefaultMQAdminExt,
112 config: super::types::TopicConfig,
113 target: super::types::TopicTarget,
114 ) -> RocketMQResult<()> {
115 use rocketmq_common::common::config::TopicConfig as RocketMQTopicConfig;
116 use rocketmq_common::common::TopicFilterType;
117
118 let internal_config = RocketMQTopicConfig {
120 topic_name: Some(config.topic_name.clone()),
121 read_queue_nums: config.read_queue_nums as u32,
122 write_queue_nums: config.write_queue_nums as u32,
123 perm: config.perm as u32,
124 topic_filter_type: config
125 .topic_filter_type
126 .map(|s| TopicFilterType::from(s.as_str()))
127 .unwrap_or_default(),
128 topic_sys_flag: config.topic_sys_flag.unwrap_or(0) as u32,
129 order: config.order,
130 attributes: std::collections::HashMap::new(),
131 };
132
133 match target {
134 super::types::TopicTarget::Broker(addr) => admin
135 .create_and_update_topic_config(addr, internal_config)
136 .await
137 .map_err(|e| ToolsError::internal(format!("Failed to create/update topic: {e}")).into()),
138 super::types::TopicTarget::Cluster(cluster_name) => {
139 let cluster_info = admin
141 .examine_broker_cluster_info()
142 .await
143 .map_err(|e| ToolsError::internal(format!("Failed to get cluster info: {e}")))?;
144
145 let master_addrs = crate::commands::command_util::CommandUtil::fetch_master_addr_by_cluster_name(
147 &cluster_info,
148 &cluster_name,
149 )?;
150
151 if master_addrs.is_empty() {
152 return Err(ToolsError::ClusterNotFound {
153 cluster: cluster_name.to_string(),
154 }
155 .into());
156 }
157
158 for addr in master_addrs {
160 admin
161 .create_and_update_topic_config(addr, internal_config.clone())
162 .await
163 .map_err(|e| ToolsError::internal(format!("Failed to create/update topic: {e}")))?;
164 }
165
166 Ok(())
167 }
168 }
169 }
170
171 pub async fn batch_get_topic_clusters(
180 admin: &mut DefaultMQAdminExt,
181 topics: Vec<String>,
182 ) -> RocketMQResult<std::collections::HashMap<String, HashSet<CheetahString>>> {
183 use futures::future::join_all;
184
185 let futures = topics.iter().map(|topic| admin.get_topic_cluster_list(topic.clone()));
186
187 let results = join_all(futures).await;
188
189 let map = topics
190 .into_iter()
191 .zip(results)
192 .filter_map(|(topic, result)| match result {
193 Ok(clusters) => Some((topic, clusters)),
194 Err(e) => {
195 tracing::warn!("Failed to get clusters for topic {topic}: {e}");
196 None
197 }
198 })
199 .collect();
200
201 Ok(map)
202 }
203
204 pub async fn list_all_topics(admin: &mut DefaultMQAdminExt) -> RocketMQResult<HashSet<CheetahString>> {
212 let topic_list = admin
213 .fetch_all_topic_list()
214 .await
215 .map_err(|e| ToolsError::internal(format!("Failed to fetch topic list: {e}")))?;
216
217 Ok(topic_list.topic_list.into_iter().collect())
218 }
219
220 pub async fn get_topic_stats(
230 admin: &mut DefaultMQAdminExt,
231 topic: impl Into<CheetahString>,
232 broker_addr: Option<CheetahString>,
233 ) -> RocketMQResult<rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable> {
234 admin
235 .examine_topic_stats(topic.into(), broker_addr)
236 .await
237 .map_err(|e| ToolsError::internal(format!("Failed to get topic stats: {e}")).into())
238 }
239
240 pub async fn update_topic_perm(
251 admin: &mut DefaultMQAdminExt,
252 topic: impl Into<CheetahString>,
253 perm: i32,
254 target: super::types::TopicTarget,
255 ) -> RocketMQResult<()> {
256 use rocketmq_common::common::config::TopicConfig as RocketMQTopicConfig;
257
258 let topic = topic.into();
259
260 match target {
261 super::types::TopicTarget::Broker(broker_addr) => {
262 let topic_config = admin
264 .examine_topic_config(broker_addr.clone(), topic.clone())
265 .await
266 .map_err(|e| ToolsError::internal(format!("Failed to get topic config: {e}")))?;
267
268 let updated_config = RocketMQTopicConfig {
270 topic_name: Some(topic.clone()),
271 read_queue_nums: topic_config.read_queue_nums,
272 write_queue_nums: topic_config.write_queue_nums,
273 perm: perm as u32,
274 topic_filter_type: topic_config.topic_filter_type,
275 topic_sys_flag: topic_config.topic_sys_flag,
276 order: topic_config.order,
277 attributes: std::collections::HashMap::new(),
278 };
279
280 admin
281 .create_and_update_topic_config(broker_addr, updated_config)
282 .await
283 .map_err(|e| ToolsError::internal(format!("Failed to update topic permission: {e}")))?;
284
285 Ok(())
286 }
287 super::types::TopicTarget::Cluster(cluster_name) => {
288 let cluster_info = admin
290 .examine_broker_cluster_info()
291 .await
292 .map_err(|e| ToolsError::internal(format!("Failed to get cluster info: {e}")))?;
293
294 let master_addrs = crate::commands::command_util::CommandUtil::fetch_master_addr_by_cluster_name(
296 &cluster_info,
297 &cluster_name,
298 )?;
299
300 if master_addrs.is_empty() {
301 return Err(ToolsError::ClusterNotFound {
302 cluster: cluster_name.to_string(),
303 }
304 .into());
305 }
306
307 for broker_addr in master_addrs {
309 let topic_config = admin
310 .examine_topic_config(broker_addr.clone(), topic.clone())
311 .await
312 .map_err(|e| ToolsError::internal(format!("Failed to get topic config: {e}")))?;
313
314 let updated_config = RocketMQTopicConfig {
315 topic_name: Some(topic.clone()),
316 read_queue_nums: topic_config.read_queue_nums,
317 write_queue_nums: topic_config.write_queue_nums,
318 perm: perm as u32,
319 topic_filter_type: topic_config.topic_filter_type,
320 topic_sys_flag: topic_config.topic_sys_flag,
321 order: topic_config.order,
322 attributes: std::collections::HashMap::new(),
323 };
324
325 admin
326 .create_and_update_topic_config(broker_addr, updated_config)
327 .await
328 .map_err(|e| ToolsError::internal(format!("Failed to update topic permission: {e}")))?;
329 }
330
331 Ok(())
332 }
333 }
334 }
335
336 pub async fn query_allocated_mq(
346 admin: &mut DefaultMQAdminExt,
347 topic: impl Into<CheetahString>,
348 ip_list: impl Into<CheetahString>,
349 ) -> RocketMQResult<()> {
350 let topic = topic.into();
351 let ip_list = ip_list.into();
352
353 let ips: Vec<_> = ip_list.split(',').map(|s| s.trim()).collect();
355
356 let route_opt = admin.examine_topic_route_info(topic.clone()).await?;
358
359 if let Some(route) = route_opt {
360 println!("Topic: {topic}");
362 println!("IP List: {}", ips.join(", "));
363 println!("\nMessage Queue Allocation:");
364 println!("Total Queues: {}", route.queue_datas.len());
365 println!("\nBrokers:");
366 for broker in &route.broker_datas {
367 println!(" - {}", broker.broker_name());
368 }
369 } else {
370 println!("No route information found for topic: {topic}");
371 }
372
373 Ok(())
374 }
375
376 pub async fn create_or_update_order_conf(
386 admin: &mut DefaultMQAdminExt,
387 topic: impl Into<CheetahString>,
388 order_conf: impl Into<CheetahString>,
389 ) -> RocketMQResult<()> {
390 const NAMESPACE: &str = "ORDER_TOPIC_CONFIG";
391 admin
392 .create_and_update_kv_config(
393 CheetahString::from_static_str(NAMESPACE),
394 topic.into(),
395 order_conf.into(),
396 )
397 .await
398 .map_err(|e| ToolsError::internal(format!("Failed to update order config: {e}")).into())
399 }
400
401 pub async fn get_order_conf(
410 admin: &mut DefaultMQAdminExt,
411 topic: impl Into<CheetahString>,
412 ) -> RocketMQResult<CheetahString> {
413 const NAMESPACE: &str = "ORDER_TOPIC_CONFIG";
414 admin
415 .get_kv_config(CheetahString::from_static_str(NAMESPACE), topic.into())
416 .await
417 .map_err(|e| ToolsError::internal(format!("Failed to get order config: {e}")).into())
418 }
419
420 pub async fn delete_order_conf(
429 admin: &mut DefaultMQAdminExt,
430 topic: impl Into<CheetahString>,
431 ) -> RocketMQResult<()> {
432 const NAMESPACE: &str = "ORDER_TOPIC_CONFIG";
433 admin
434 .delete_kv_config(CheetahString::from_static_str(NAMESPACE), topic.into())
435 .await
436 .map_err(|e| ToolsError::internal(format!("Failed to delete order config: {e}")).into())
437 }
438}
439
#[cfg(test)]
mod tests {
    use super::super::types::TopicConfig;

    /// Smoke test: a `TopicConfig` can be built from a plain struct literal
    /// with the optional fields left unset. There are no runtime assertions;
    /// the test passes as long as the literal compiles.
    #[test]
    fn test_topic_config_creation() {
        let _sample = TopicConfig {
            topic_name: "test_topic".into(),
            read_queue_nums: 8,
            write_queue_nums: 8,
            perm: 6,
            topic_filter_type: None,
            topic_sys_flag: None,
            order: false,
        };
    }
}
455}