Skip to main content

rocketmq_admin_core/core/topic/
operations.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Topic operations - Core business logic
16//!
17//! This module contains reusable topic management operations that can be
18//! used by CLI, API, or any other interface.
19
20use std::collections::HashSet;
21
22use cheetah_string::CheetahString;
23use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
24
25use super::types::TopicClusterList;
26use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
27use crate::core::RocketMQResult;
28use crate::core::ToolsError;
29
/// Stateless entry point for topic management operations.
///
/// The type carries no data: every operation is an associated function that
/// receives the admin client explicitly. The common traits are derived so the
/// marker can be printed, copied and default-constructed like any other value.
#[derive(Debug, Clone, Copy, Default)]
pub struct TopicService;

/// Alias for `TopicService`, kept for compatibility
pub type TopicOperations = TopicService;
35
36impl TopicService {
37    /// Get cluster list for a given topic
38    ///
39    /// # Arguments
40    /// * `admin` - Admin client instance
41    /// * `topic` - Topic name
42    ///
43    /// # Returns
44    /// Set of cluster names containing the topic
45    pub async fn get_topic_cluster_list(
46        admin: &mut DefaultMQAdminExt,
47        topic: impl Into<String>,
48    ) -> RocketMQResult<TopicClusterList> {
49        let topic = topic.into();
50        let clusters = admin
51            .get_topic_cluster_list(topic.clone())
52            .await
53            .map_err(|_| ToolsError::topic_not_found(topic.clone()))?;
54
55        Ok(TopicClusterList { clusters })
56    }
57
58    /// Get topic route information
59    ///
60    /// # Arguments
61    /// * `admin` - Admin client instance
62    /// * `topic` - Topic name
63    ///
64    /// # Returns
65    /// Topic route data including broker and queue information
66    pub async fn get_topic_route(
67        admin: &mut DefaultMQAdminExt,
68        topic: impl Into<CheetahString>,
69    ) -> RocketMQResult<Option<rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData>> {
70        let topic = topic.into();
71        Ok(admin
72            .examine_topic_route_info(topic.clone())
73            .await
74            .map_err(|_| ToolsError::topic_not_found(topic.to_string()))?)
75    }
76
77    /// Delete a topic from cluster
78    ///
79    /// # Arguments
80    /// * `admin` - Admin client instance
81    /// * `topic` - Topic name
82    /// * `cluster_name` - Cluster name (optional)
83    ///
84    /// # Returns
85    /// Result indicating success or failure
86    pub async fn delete_topic(
87        admin: &mut DefaultMQAdminExt,
88        topic: impl Into<CheetahString>,
89        cluster_name: impl Into<CheetahString>,
90    ) -> RocketMQResult<()> {
91        let topic = topic.into();
92        let cluster = cluster_name.into();
93
94        Ok(admin.delete_topic(topic.clone(), cluster.clone()).await.map_err(|e| {
95            ToolsError::internal(format!(
96                "Failed to delete topic '{topic}' from cluster '{cluster}': {e}"
97            ))
98        })?)
99    }
100
101    /// Create or update a topic configuration
102    ///
103    /// # Arguments
104    /// * `admin` - Admin client instance
105    /// * `config` - Topic configuration
106    /// * `target` - Target broker address or cluster name
107    ///
108    /// # Returns
109    /// Result indicating success or failure
110    pub async fn create_or_update_topic(
111        admin: &mut DefaultMQAdminExt,
112        config: super::types::TopicConfig,
113        target: super::types::TopicTarget,
114    ) -> RocketMQResult<()> {
115        use rocketmq_common::common::config::TopicConfig as RocketMQTopicConfig;
116        use rocketmq_common::common::TopicFilterType;
117
118        // Convert to internal TopicConfig
119        let internal_config = RocketMQTopicConfig {
120            topic_name: Some(config.topic_name.clone()),
121            read_queue_nums: config.read_queue_nums as u32,
122            write_queue_nums: config.write_queue_nums as u32,
123            perm: config.perm as u32,
124            topic_filter_type: config
125                .topic_filter_type
126                .map(|s| TopicFilterType::from(s.as_str()))
127                .unwrap_or_default(),
128            topic_sys_flag: config.topic_sys_flag.unwrap_or(0) as u32,
129            order: config.order,
130            attributes: std::collections::HashMap::new(),
131        };
132
133        match target {
134            super::types::TopicTarget::Broker(addr) => admin
135                .create_and_update_topic_config(addr, internal_config)
136                .await
137                .map_err(|e| ToolsError::internal(format!("Failed to create/update topic: {e}")).into()),
138            super::types::TopicTarget::Cluster(cluster_name) => {
139                // Get all master brokers in cluster
140                let cluster_info = admin
141                    .examine_broker_cluster_info()
142                    .await
143                    .map_err(|e| ToolsError::internal(format!("Failed to get cluster info: {e}")))?;
144
145                // Find master brokers in the cluster
146                let master_addrs = crate::commands::command_util::CommandUtil::fetch_master_addr_by_cluster_name(
147                    &cluster_info,
148                    &cluster_name,
149                )?;
150
151                if master_addrs.is_empty() {
152                    return Err(ToolsError::ClusterNotFound {
153                        cluster: cluster_name.to_string(),
154                    }
155                    .into());
156                }
157
158                // Create topic on all master brokers
159                for addr in master_addrs {
160                    admin
161                        .create_and_update_topic_config(addr, internal_config.clone())
162                        .await
163                        .map_err(|e| ToolsError::internal(format!("Failed to create/update topic: {e}")))?;
164                }
165
166                Ok(())
167            }
168        }
169    }
170
171    /// Batch get cluster lists for multiple topics
172    ///
173    /// # Arguments
174    /// * `admin` - Admin client instance
175    /// * `topics` - List of topic names
176    ///
177    /// # Returns
178    /// Map of topic name to cluster list
179    pub async fn batch_get_topic_clusters(
180        admin: &mut DefaultMQAdminExt,
181        topics: Vec<String>,
182    ) -> RocketMQResult<std::collections::HashMap<String, HashSet<CheetahString>>> {
183        use futures::future::join_all;
184
185        let futures = topics.iter().map(|topic| admin.get_topic_cluster_list(topic.clone()));
186
187        let results = join_all(futures).await;
188
189        let map = topics
190            .into_iter()
191            .zip(results)
192            .filter_map(|(topic, result)| match result {
193                Ok(clusters) => Some((topic, clusters)),
194                Err(e) => {
195                    tracing::warn!("Failed to get clusters for topic {topic}: {e}");
196                    None
197                }
198            })
199            .collect();
200
201        Ok(map)
202    }
203
204    /// List all topics
205    ///
206    /// # Arguments
207    /// * `admin` - Admin client instance
208    ///
209    /// # Returns
210    /// Set of all topic names
211    pub async fn list_all_topics(admin: &mut DefaultMQAdminExt) -> RocketMQResult<HashSet<CheetahString>> {
212        let topic_list = admin
213            .fetch_all_topic_list()
214            .await
215            .map_err(|e| ToolsError::internal(format!("Failed to fetch topic list: {e}")))?;
216
217        Ok(topic_list.topic_list.into_iter().collect())
218    }
219
220    /// Get topic statistics
221    ///
222    /// # Arguments
223    /// * `admin` - Admin client instance
224    /// * `topic` - Topic name
225    /// * `broker_addr` - Optional broker address
226    ///
227    /// # Returns
228    /// Topic statistics table
229    pub async fn get_topic_stats(
230        admin: &mut DefaultMQAdminExt,
231        topic: impl Into<CheetahString>,
232        broker_addr: Option<CheetahString>,
233    ) -> RocketMQResult<rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable> {
234        admin
235            .examine_topic_stats(topic.into(), broker_addr)
236            .await
237            .map_err(|e| ToolsError::internal(format!("Failed to get topic stats: {e}")).into())
238    }
239
240    /// Update topic permission
241    ///
242    /// # Arguments
243    /// * `admin` - Admin client instance
244    /// * `topic` - Topic name
245    /// * `perm` - New permission value (2=W, 4=R, 6=RW)
246    /// * `target` - Target broker or cluster
247    ///
248    /// # Returns
249    /// Result indicating success or failure
250    pub async fn update_topic_perm(
251        admin: &mut DefaultMQAdminExt,
252        topic: impl Into<CheetahString>,
253        perm: i32,
254        target: super::types::TopicTarget,
255    ) -> RocketMQResult<()> {
256        use rocketmq_common::common::config::TopicConfig as RocketMQTopicConfig;
257
258        let topic = topic.into();
259
260        match target {
261            super::types::TopicTarget::Broker(broker_addr) => {
262                // Get existing config
263                let topic_config = admin
264                    .examine_topic_config(broker_addr.clone(), topic.clone())
265                    .await
266                    .map_err(|e| ToolsError::internal(format!("Failed to get topic config: {e}")))?;
267
268                // Update permission
269                let updated_config = RocketMQTopicConfig {
270                    topic_name: Some(topic.clone()),
271                    read_queue_nums: topic_config.read_queue_nums,
272                    write_queue_nums: topic_config.write_queue_nums,
273                    perm: perm as u32,
274                    topic_filter_type: topic_config.topic_filter_type,
275                    topic_sys_flag: topic_config.topic_sys_flag,
276                    order: topic_config.order,
277                    attributes: std::collections::HashMap::new(),
278                };
279
280                admin
281                    .create_and_update_topic_config(broker_addr, updated_config)
282                    .await
283                    .map_err(|e| ToolsError::internal(format!("Failed to update topic permission: {e}")))?;
284
285                Ok(())
286            }
287            super::types::TopicTarget::Cluster(cluster_name) => {
288                // Get cluster info
289                let cluster_info = admin
290                    .examine_broker_cluster_info()
291                    .await
292                    .map_err(|e| ToolsError::internal(format!("Failed to get cluster info: {e}")))?;
293
294                // Find master brokers
295                let master_addrs = crate::commands::command_util::CommandUtil::fetch_master_addr_by_cluster_name(
296                    &cluster_info,
297                    &cluster_name,
298                )?;
299
300                if master_addrs.is_empty() {
301                    return Err(ToolsError::ClusterNotFound {
302                        cluster: cluster_name.to_string(),
303                    }
304                    .into());
305                }
306
307                // Update on all master brokers
308                for broker_addr in master_addrs {
309                    let topic_config = admin
310                        .examine_topic_config(broker_addr.clone(), topic.clone())
311                        .await
312                        .map_err(|e| ToolsError::internal(format!("Failed to get topic config: {e}")))?;
313
314                    let updated_config = RocketMQTopicConfig {
315                        topic_name: Some(topic.clone()),
316                        read_queue_nums: topic_config.read_queue_nums,
317                        write_queue_nums: topic_config.write_queue_nums,
318                        perm: perm as u32,
319                        topic_filter_type: topic_config.topic_filter_type,
320                        topic_sys_flag: topic_config.topic_sys_flag,
321                        order: topic_config.order,
322                        attributes: std::collections::HashMap::new(),
323                    };
324
325                    admin
326                        .create_and_update_topic_config(broker_addr, updated_config)
327                        .await
328                        .map_err(|e| ToolsError::internal(format!("Failed to update topic permission: {e}")))?;
329                }
330
331                Ok(())
332            }
333        }
334    }
335
336    /// Query message queues allocated for a topic on specific IPs
337    ///
338    /// # Arguments
339    /// * `admin` - Admin client instance
340    /// * `topic` - Topic name
341    /// * `ip_list` - Comma-separated IP addresses
342    ///
343    /// # Returns
344    /// Result indicating success or failure
345    pub async fn query_allocated_mq(
346        admin: &mut DefaultMQAdminExt,
347        topic: impl Into<CheetahString>,
348        ip_list: impl Into<CheetahString>,
349    ) -> RocketMQResult<()> {
350        let topic = topic.into();
351        let ip_list = ip_list.into();
352
353        // Parse IP list
354        let ips: Vec<_> = ip_list.split(',').map(|s| s.trim()).collect();
355
356        // Get topic route
357        let route_opt = admin.examine_topic_route_info(topic.clone()).await?;
358
359        if let Some(route) = route_opt {
360            // Build a simple allocation overview
361            println!("Topic: {topic}");
362            println!("IP List: {}", ips.join(", "));
363            println!("\nMessage Queue Allocation:");
364            println!("Total Queues: {}", route.queue_datas.len());
365            println!("\nBrokers:");
366            for broker in &route.broker_datas {
367                println!("  - {}", broker.broker_name());
368            }
369        } else {
370            println!("No route information found for topic: {topic}");
371        }
372
373        Ok(())
374    }
375
376    /// Create or update order configuration
377    ///
378    /// # Arguments
379    /// * `admin` - Admin client instance
380    /// * `topic` - Topic name
381    /// * `order_conf` - Order configuration (e.g., "broker-a:4;broker-b:4")
382    ///
383    /// # Returns
384    /// Result indicating success or failure
385    pub async fn create_or_update_order_conf(
386        admin: &mut DefaultMQAdminExt,
387        topic: impl Into<CheetahString>,
388        order_conf: impl Into<CheetahString>,
389    ) -> RocketMQResult<()> {
390        const NAMESPACE: &str = "ORDER_TOPIC_CONFIG";
391        admin
392            .create_and_update_kv_config(
393                CheetahString::from_static_str(NAMESPACE),
394                topic.into(),
395                order_conf.into(),
396            )
397            .await
398            .map_err(|e| ToolsError::internal(format!("Failed to update order config: {e}")).into())
399    }
400
401    /// Get order configuration
402    ///
403    /// # Arguments
404    /// * `admin` - Admin client instance
405    /// * `topic` - Topic name
406    ///
407    /// # Returns
408    /// Order configuration string
409    pub async fn get_order_conf(
410        admin: &mut DefaultMQAdminExt,
411        topic: impl Into<CheetahString>,
412    ) -> RocketMQResult<CheetahString> {
413        const NAMESPACE: &str = "ORDER_TOPIC_CONFIG";
414        admin
415            .get_kv_config(CheetahString::from_static_str(NAMESPACE), topic.into())
416            .await
417            .map_err(|e| ToolsError::internal(format!("Failed to get order config: {e}")).into())
418    }
419
420    /// Delete order configuration
421    ///
422    /// # Arguments
423    /// * `admin` - Admin client instance
424    /// * `topic` - Topic name
425    ///
426    /// # Returns
427    /// Result indicating success or failure
428    pub async fn delete_order_conf(
429        admin: &mut DefaultMQAdminExt,
430        topic: impl Into<CheetahString>,
431    ) -> RocketMQResult<()> {
432        const NAMESPACE: &str = "ORDER_TOPIC_CONFIG";
433        admin
434            .delete_kv_config(CheetahString::from_static_str(NAMESPACE), topic.into())
435            .await
436            .map_err(|e| ToolsError::internal(format!("Failed to delete order config: {e}")).into())
437    }
438}
439
#[cfg(test)]
mod tests {
    use super::super::types::TopicConfig;

    /// A `TopicConfig` built from literal values exposes exactly those values.
    #[test]
    fn test_topic_config_creation() {
        let config = TopicConfig {
            topic_name: "test_topic".into(),
            read_queue_nums: 8,
            write_queue_nums: 8,
            perm: 6,
            topic_filter_type: None,
            topic_sys_flag: None,
            order: false,
        };

        // Check each field so the test fails if the struct's shape or the
        // literal conversions change.
        assert_eq!(config.topic_name.to_string(), "test_topic");
        assert_eq!(config.read_queue_nums, 8);
        assert_eq!(config.write_queue_nums, 8);
        assert_eq!(config.perm, 6);
        assert!(config.topic_filter_type.is_none());
        assert!(config.topic_sys_flag.is_none());
        assert!(!config.order);
    }
}
455}