rocketmq_controller/metadata/
topic.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::sync::Arc;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::info;
24
25use crate::config::ControllerConfig;
26use crate::error::ControllerError;
27use crate::error::Result;
28
29/// Topic configuration
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct TopicConfig {
32    /// Topic name
33    pub topic_name: String,
34
35    /// Number of read queues
36    pub read_queue_nums: u32,
37
38    /// Number of write queues
39    pub write_queue_nums: u32,
40
41    /// Permission
42    pub perm: u32,
43
44    /// Topic filter type
45    pub topic_filter_type: u32,
46
47    /// Topic system flag
48    pub topic_sys_flag: u32,
49
50    /// Order
51    pub order: bool,
52
53    /// Attributes
54    pub attributes: serde_json::Value,
55}
56
57/// Topic information
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct TopicInfo {
60    /// Topic name
61    pub name: String,
62
63    /// Number of read queues
64    pub read_queue_nums: u32,
65
66    /// Number of write queues
67    pub write_queue_nums: u32,
68
69    /// Permission
70    pub perm: u32,
71
72    /// Topic system flag
73    pub topic_sys_flag: u32,
74
75    /// Broker addresses that have this topic
76    pub brokers: Vec<String>,
77
78    /// Additional metadata
79    pub metadata: serde_json::Value,
80}
81
82/// Topic manager
83pub struct TopicManager {
84    /// Topics: topic_name -> TopicInfo
85    topics: Arc<DashMap<String, TopicInfo>>,
86
87    /// Configuration
88    #[allow(dead_code)]
89    config: Arc<ControllerConfig>,
90}
91
92impl TopicManager {
93    /// Create a new topic manager
94    pub fn new(config: Arc<ControllerConfig>) -> Self {
95        Self {
96            topics: Arc::new(DashMap::new()),
97            config,
98        }
99    }
100
101    /// Start the topic manager
102    pub async fn start(&self) -> Result<()> {
103        info!("Starting topic manager");
104        Ok(())
105    }
106
107    /// Shutdown the topic manager
108    pub async fn shutdown(&self) -> Result<()> {
109        info!("Shutting down topic manager");
110        self.topics.clear();
111        Ok(())
112    }
113
114    /// Create a topic from config
115    pub async fn create_topic(&self, config: TopicConfig) -> Result<()> {
116        info!("Creating topic: {}", config.topic_name);
117
118        if config.topic_name.is_empty() {
119            return Err(ControllerError::InvalidRequest(
120                "Topic name cannot be empty".to_string(),
121            ));
122        }
123
124        // Convert config to info
125        let info = TopicInfo {
126            name: config.topic_name.clone(),
127            read_queue_nums: config.read_queue_nums,
128            write_queue_nums: config.write_queue_nums,
129            perm: config.perm,
130            topic_sys_flag: config.topic_sys_flag,
131            brokers: Vec::new(),
132            metadata: config.attributes,
133        };
134
135        self.topics.insert(info.name.clone(), info);
136        Ok(())
137    }
138
139    /// Update a topic from config
140    pub async fn update_topic(&self, config: TopicConfig) -> Result<()> {
141        info!("Updating topic: {}", config.topic_name);
142
143        if config.topic_name.is_empty() {
144            return Err(ControllerError::InvalidRequest(
145                "Topic name cannot be empty".to_string(),
146            ));
147        }
148
149        // Check if topic exists
150        if !self.topics.contains_key(&config.topic_name) {
151            return Err(ControllerError::MetadataNotFound {
152                key: config.topic_name.clone(),
153            });
154        }
155
156        // Convert config to info (preserving brokers list)
157        let old_brokers = self
158            .topics
159            .get(&config.topic_name)
160            .map(|v| v.brokers.clone())
161            .unwrap_or_default();
162
163        let info = TopicInfo {
164            name: config.topic_name.clone(),
165            read_queue_nums: config.read_queue_nums,
166            write_queue_nums: config.write_queue_nums,
167            perm: config.perm,
168            topic_sys_flag: config.topic_sys_flag,
169            brokers: old_brokers,
170            metadata: config.attributes,
171        };
172
173        self.topics.insert(info.name.clone(), info);
174        Ok(())
175    }
176
177    /// Create or update a topic
178    pub async fn create_or_update_topic(&self, info: TopicInfo) -> Result<()> {
179        info!("Creating/updating topic: {}", info.name);
180
181        if info.name.is_empty() {
182            return Err(ControllerError::InvalidRequest(
183                "Topic name cannot be empty".to_string(),
184            ));
185        }
186
187        self.topics.insert(info.name.clone(), info);
188        Ok(())
189    }
190
191    /// Delete a topic
192    pub async fn delete_topic(&self, topic_name: &str) -> Result<()> {
193        info!("Deleting topic: {}", topic_name);
194
195        self.topics
196            .remove(topic_name)
197            .ok_or_else(|| ControllerError::MetadataNotFound {
198                key: topic_name.to_string(),
199            })?;
200
201        Ok(())
202    }
203
204    /// Get topic information
205    pub async fn get_topic(&self, topic_name: &str) -> Result<TopicInfo> {
206        self.topics
207            .get(topic_name)
208            .map(|entry| entry.value().clone())
209            .ok_or_else(|| ControllerError::MetadataNotFound {
210                key: topic_name.to_string(),
211            })
212    }
213
214    /// List all topics
215    pub async fn list_topics(&self) -> Vec<TopicInfo> {
216        self.topics
217            .iter()
218            .map(|entry| entry.value().clone())
219            .collect()
220    }
221
222    /// List topics by broker
223    pub async fn list_topics_by_broker(&self, broker_name: &str) -> Vec<TopicInfo> {
224        self.topics
225            .iter()
226            .filter(|entry| entry.value().brokers.contains(&broker_name.to_string()))
227            .map(|entry| entry.value().clone())
228            .collect()
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235
236    #[tokio::test]
237    async fn test_topic_creation() {
238        let config = Arc::new(ControllerConfig::test_config());
239
240        let manager = TopicManager::new(config);
241
242        let info = TopicInfo {
243            name: "TestTopic".to_string(),
244            read_queue_nums: 4,
245            write_queue_nums: 4,
246            perm: 6,
247            topic_sys_flag: 0,
248            brokers: vec!["broker-a".to_string()],
249            metadata: serde_json::json!({}),
250        };
251
252        assert!(manager.create_or_update_topic(info.clone()).await.is_ok());
253        assert!(manager.get_topic("TestTopic").await.is_ok());
254    }
255}