rocketmq_controller/metadata/
topic.rs1use std::sync::Arc;
19
20use dashmap::DashMap;
21use serde::Deserialize;
22use serde::Serialize;
23use tracing::info;
24
25use crate::config::ControllerConfig;
26use crate::error::ControllerError;
27use crate::error::Result;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct TopicConfig {
32 pub topic_name: String,
34
35 pub read_queue_nums: u32,
37
38 pub write_queue_nums: u32,
40
41 pub perm: u32,
43
44 pub topic_filter_type: u32,
46
47 pub topic_sys_flag: u32,
49
50 pub order: bool,
52
53 pub attributes: serde_json::Value,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct TopicInfo {
60 pub name: String,
62
63 pub read_queue_nums: u32,
65
66 pub write_queue_nums: u32,
68
69 pub perm: u32,
71
72 pub topic_sys_flag: u32,
74
75 pub brokers: Vec<String>,
77
78 pub metadata: serde_json::Value,
80}
81
82pub struct TopicManager {
84 topics: Arc<DashMap<String, TopicInfo>>,
86
87 #[allow(dead_code)]
89 config: Arc<ControllerConfig>,
90}
91
92impl TopicManager {
93 pub fn new(config: Arc<ControllerConfig>) -> Self {
95 Self {
96 topics: Arc::new(DashMap::new()),
97 config,
98 }
99 }
100
101 pub async fn start(&self) -> Result<()> {
103 info!("Starting topic manager");
104 Ok(())
105 }
106
107 pub async fn shutdown(&self) -> Result<()> {
109 info!("Shutting down topic manager");
110 self.topics.clear();
111 Ok(())
112 }
113
114 pub async fn create_topic(&self, config: TopicConfig) -> Result<()> {
116 info!("Creating topic: {}", config.topic_name);
117
118 if config.topic_name.is_empty() {
119 return Err(ControllerError::InvalidRequest(
120 "Topic name cannot be empty".to_string(),
121 ));
122 }
123
124 let info = TopicInfo {
126 name: config.topic_name.clone(),
127 read_queue_nums: config.read_queue_nums,
128 write_queue_nums: config.write_queue_nums,
129 perm: config.perm,
130 topic_sys_flag: config.topic_sys_flag,
131 brokers: Vec::new(),
132 metadata: config.attributes,
133 };
134
135 self.topics.insert(info.name.clone(), info);
136 Ok(())
137 }
138
139 pub async fn update_topic(&self, config: TopicConfig) -> Result<()> {
141 info!("Updating topic: {}", config.topic_name);
142
143 if config.topic_name.is_empty() {
144 return Err(ControllerError::InvalidRequest(
145 "Topic name cannot be empty".to_string(),
146 ));
147 }
148
149 if !self.topics.contains_key(&config.topic_name) {
151 return Err(ControllerError::MetadataNotFound {
152 key: config.topic_name.clone(),
153 });
154 }
155
156 let old_brokers = self
158 .topics
159 .get(&config.topic_name)
160 .map(|v| v.brokers.clone())
161 .unwrap_or_default();
162
163 let info = TopicInfo {
164 name: config.topic_name.clone(),
165 read_queue_nums: config.read_queue_nums,
166 write_queue_nums: config.write_queue_nums,
167 perm: config.perm,
168 topic_sys_flag: config.topic_sys_flag,
169 brokers: old_brokers,
170 metadata: config.attributes,
171 };
172
173 self.topics.insert(info.name.clone(), info);
174 Ok(())
175 }
176
177 pub async fn create_or_update_topic(&self, info: TopicInfo) -> Result<()> {
179 info!("Creating/updating topic: {}", info.name);
180
181 if info.name.is_empty() {
182 return Err(ControllerError::InvalidRequest(
183 "Topic name cannot be empty".to_string(),
184 ));
185 }
186
187 self.topics.insert(info.name.clone(), info);
188 Ok(())
189 }
190
191 pub async fn delete_topic(&self, topic_name: &str) -> Result<()> {
193 info!("Deleting topic: {}", topic_name);
194
195 self.topics
196 .remove(topic_name)
197 .ok_or_else(|| ControllerError::MetadataNotFound {
198 key: topic_name.to_string(),
199 })?;
200
201 Ok(())
202 }
203
204 pub async fn get_topic(&self, topic_name: &str) -> Result<TopicInfo> {
206 self.topics
207 .get(topic_name)
208 .map(|entry| entry.value().clone())
209 .ok_or_else(|| ControllerError::MetadataNotFound {
210 key: topic_name.to_string(),
211 })
212 }
213
214 pub async fn list_topics(&self) -> Vec<TopicInfo> {
216 self.topics
217 .iter()
218 .map(|entry| entry.value().clone())
219 .collect()
220 }
221
222 pub async fn list_topics_by_broker(&self, broker_name: &str) -> Vec<TopicInfo> {
224 self.topics
225 .iter()
226 .filter(|entry| entry.value().brokers.contains(&broker_name.to_string()))
227 .map(|entry| entry.value().clone())
228 .collect()
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// A topic inserted through `create_or_update_topic` can be fetched by
    /// name afterwards.
    #[tokio::test]
    async fn test_topic_creation() {
        let manager = TopicManager::new(Arc::new(ControllerConfig::test_config()));

        let topic = TopicInfo {
            name: String::from("TestTopic"),
            read_queue_nums: 4,
            write_queue_nums: 4,
            perm: 6,
            topic_sys_flag: 0,
            brokers: vec![String::from("broker-a")],
            metadata: serde_json::json!({}),
        };

        let upserted = manager.create_or_update_topic(topic.clone()).await;
        assert!(upserted.is_ok());

        let fetched = manager.get_topic("TestTopic").await;
        assert!(fetched.is_ok());
    }
}
255}