1use std::collections::HashMap;
19use std::fmt::Display;
20
21use cheetah_string::CheetahString;
22use serde::Deserialize;
23use serde::Serialize;
24
25use super::TopicFilterType;
26use crate::common::attribute::topic_message_type::TopicMessageType;
27use crate::common::attribute::Attribute;
28use crate::common::constant::PermName;
29use crate::TopicAttributes::TopicAttributes;
30
31#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
32#[serde(rename_all = "camelCase")]
33pub struct TopicConfig {
34 pub topic_name: Option<CheetahString>,
35 pub read_queue_nums: u32,
36 pub write_queue_nums: u32,
37 pub perm: u32,
38 pub topic_filter_type: TopicFilterType,
39 pub topic_sys_flag: u32,
40 pub order: bool,
41 pub attributes: HashMap<CheetahString, CheetahString>,
42}
43
44impl Display for TopicConfig {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(
47 f,
48 "TopicConfig {{ topic_name: {:?}, read_queue_nums: {}, write_queue_nums: {}, perm: \
49 {}, topic_filter_type: {}, topic_sys_flag: {}, order: {}, attributes: {:?} }}",
50 self.topic_name,
51 self.read_queue_nums,
52 self.write_queue_nums,
53 self.perm,
54 self.topic_filter_type,
55 self.topic_sys_flag,
56 self.order,
57 self.attributes
58 )
59 }
60}
61
62impl Default for TopicConfig {
63 fn default() -> Self {
64 Self {
65 topic_name: None,
66 read_queue_nums: Self::DEFAULT_READ_QUEUE_NUMS,
67 write_queue_nums: Self::DEFAULT_WRITE_QUEUE_NUMS,
68 perm: PermName::PERM_READ | PermName::PERM_WRITE,
69 topic_filter_type: TopicFilterType::SingleTag,
70 topic_sys_flag: 0,
71 order: false,
72 attributes: HashMap::new(),
73 }
74 }
75}
76
77impl TopicConfig {
78 const DEFAULT_READ_QUEUE_NUMS: u32 = 16;
79 const DEFAULT_WRITE_QUEUE_NUMS: u32 = 16;
80 const SEPARATOR: &'static str = " ";
81
82 pub fn get_topic_message_type(&self) -> TopicMessageType {
83 if self.attributes.is_empty() {
84 return TopicMessageType::Normal;
85 }
86 let content = self
87 .attributes
88 .get(TopicAttributes::topic_message_type_attribute().name());
89 if let Some(content) = content {
90 return TopicMessageType::from(content.to_string());
91 }
92 TopicMessageType::Normal
93 }
94
95 pub fn new(topic_name: impl Into<CheetahString>) -> Self {
96 TopicConfig {
97 topic_name: Some(topic_name.into()),
98 ..Self::default()
99 }
100 }
101
102 pub fn with_queues(
103 topic_name: impl Into<CheetahString>,
104 read_queue_nums: u32,
105 write_queue_nums: u32,
106 ) -> Self {
107 Self {
108 read_queue_nums,
109 write_queue_nums,
110 ..Self::new(topic_name)
111 }
112 }
113
114 pub fn with_perm(
115 topic_name: impl Into<CheetahString>,
116 read_queue_nums: u32,
117 write_queue_nums: u32,
118 perm: u32,
119 ) -> Self {
120 Self {
121 read_queue_nums,
122 write_queue_nums,
123 perm,
124 ..Self::new(topic_name)
125 }
126 }
127
128 pub fn with_sys_flag(
129 topic_name: impl Into<CheetahString>,
130 read_queue_nums: u32,
131 write_queue_nums: u32,
132 perm: u32,
133 topic_sys_flag: u32,
134 ) -> Self {
135 Self {
136 read_queue_nums,
137 write_queue_nums,
138 perm,
139 topic_sys_flag,
140 ..Self::new(topic_name)
141 }
142 }
143
144 pub fn encode(&self) -> String {
145 let mut sb = String::new();
146 sb.push_str(
147 self.topic_name
148 .clone()
149 .unwrap_or(CheetahString::empty())
150 .as_str(),
151 );
152 sb.push_str(Self::SEPARATOR);
153 sb.push_str(&self.read_queue_nums.to_string());
154 sb.push_str(Self::SEPARATOR);
155 sb.push_str(&self.write_queue_nums.to_string());
156 sb.push_str(Self::SEPARATOR);
157 sb.push_str(&self.perm.to_string());
158 sb.push_str(Self::SEPARATOR);
159 sb.push_str(&format!("{}", self.topic_filter_type));
160 sb.push_str(Self::SEPARATOR);
161 if !self.attributes.is_empty() {
162 sb.push_str(&serde_json::to_string(&self.attributes).unwrap());
163 }
164 sb.trim().to_string()
165 }
166
167 pub fn decode(&mut self, input: &str) -> bool {
168 let parts: Vec<&str> = input.split(Self::SEPARATOR).collect();
169 if parts.len() >= 5 {
170 self.topic_name = Some(parts[0].into());
171 self.read_queue_nums = parts[1].parse().unwrap_or(Self::DEFAULT_READ_QUEUE_NUMS);
172 self.write_queue_nums = parts[2].parse().unwrap_or(Self::DEFAULT_WRITE_QUEUE_NUMS);
173 self.perm = parts[3]
174 .parse()
175 .unwrap_or(PermName::PERM_READ | PermName::PERM_WRITE);
176 self.topic_filter_type = From::from(parts[4]);
177 if parts.len() >= 6 {
178 if let Ok(attrs) = serde_json::from_str(parts[5]) {
179 self.attributes = attrs
180 }
181 }
182 true
183 } else {
184 false
185 }
186 }
187
188 pub fn get_read_queue_nums(&self) -> u32 {
189 self.read_queue_nums
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use std::collections::HashMap;
196
197 use cheetah_string::CheetahString;
198
199 use super::*;
200
201 #[test]
202 fn default_topic_config() {
203 let config = TopicConfig::default();
204 assert_eq!(config.topic_name, None);
205 assert_eq!(config.read_queue_nums, TopicConfig::DEFAULT_READ_QUEUE_NUMS);
206 assert_eq!(
207 config.write_queue_nums,
208 TopicConfig::DEFAULT_WRITE_QUEUE_NUMS
209 );
210 assert_eq!(config.perm, PermName::PERM_READ | PermName::PERM_WRITE);
211 assert_eq!(config.topic_filter_type, TopicFilterType::SingleTag);
212 assert_eq!(config.topic_sys_flag, 0);
213 assert!(!config.order);
214 assert!(config.attributes.is_empty());
215 }
216
217 #[test]
218 fn new_topic_config() {
219 let topic_name = CheetahString::from("test_topic");
220 let config = TopicConfig::new(topic_name.clone());
221 assert_eq!(config.topic_name, Some(topic_name));
222 }
223
224 #[test]
225 fn with_queues_topic_config() {
226 let topic_name = CheetahString::from("test_topic");
227 let config = TopicConfig::with_queues(topic_name.clone(), 8, 8);
228 assert_eq!(config.topic_name, Some(topic_name));
229 assert_eq!(config.read_queue_nums, 8);
230 assert_eq!(config.write_queue_nums, 8);
231 }
232
233 #[test]
234 fn with_perm_topic_config() {
235 let topic_name = CheetahString::from("test_topic");
236 let config = TopicConfig::with_perm(topic_name.clone(), 8, 8, PermName::PERM_READ);
237 assert_eq!(config.topic_name, Some(topic_name));
238 assert_eq!(config.read_queue_nums, 8);
239 assert_eq!(config.write_queue_nums, 8);
240 assert_eq!(config.perm, PermName::PERM_READ);
241 }
242
243 #[test]
244 fn with_sys_flag_topic_config() {
245 let topic_name = CheetahString::from("test_topic");
246 let config = TopicConfig::with_sys_flag(topic_name.clone(), 8, 8, PermName::PERM_READ, 1);
247 assert_eq!(config.topic_name, Some(topic_name));
248 assert_eq!(config.read_queue_nums, 8);
249 assert_eq!(config.write_queue_nums, 8);
250 assert_eq!(config.perm, PermName::PERM_READ);
251 assert_eq!(config.topic_sys_flag, 1);
252 }
253
254 #[test]
255 fn encode_topic_config() {
256 let topic_name = CheetahString::from("test_topic");
257 let mut attributes = HashMap::new();
258 attributes.insert(CheetahString::from("key"), CheetahString::from("value"));
259 let config = TopicConfig {
260 topic_name: Some(topic_name.clone()),
261 read_queue_nums: 8,
262 write_queue_nums: 8,
263 perm: PermName::PERM_READ,
264 topic_filter_type: TopicFilterType::SingleTag,
265 topic_sys_flag: 1,
266 order: false,
267 attributes,
268 };
269 let encoded = config.encode();
270 assert!(encoded.contains("test_topic 8 8 4 SINGLE_TAG {\"key\":\"value\"}"));
271 }
272
273 #[test]
274 fn decode_topic_config() {
275 let mut config = TopicConfig::default();
276 let input = "test_topic 8 8 2 SingleTag {\"key\":\"value\"}";
277 let result = config.decode(input);
278 assert!(result);
279 assert_eq!(config.topic_name, Some(CheetahString::from("test_topic")));
280 assert_eq!(config.read_queue_nums, 8);
281 assert_eq!(config.write_queue_nums, 8);
282 assert_eq!(config.perm, PermName::PERM_WRITE);
283 assert_eq!(config.topic_filter_type, TopicFilterType::SingleTag);
284 assert_eq!(
285 config.attributes.get(&CheetahString::from("key")),
286 Some(&CheetahString::from("value"))
287 );
288 }
289
290 #[test]
291 fn get_topic_message_type_normal() {
292 let config = TopicConfig::default();
293 assert_eq!(config.get_topic_message_type(), TopicMessageType::Normal);
294 }
295
296 #[test]
297 fn get_topic_message_type_from_attributes() {
298 let mut config = TopicConfig::default();
299 config.attributes.insert(
300 CheetahString::from(TopicAttributes::topic_message_type_attribute().name()),
301 CheetahString::from("Normal"),
302 );
303 assert_eq!(config.get_topic_message_type(), TopicMessageType::Normal);
304 }
305}