1use std::collections::HashMap;
16use std::fmt::Display;
17
18use cheetah_string::CheetahString;
19use serde::Deserialize;
20use serde::Serialize;
21
22use super::TopicFilterType;
23use crate::common::attribute::topic_message_type::TopicMessageType;
24use crate::common::attribute::Attribute;
25use crate::common::constant::PermName;
26use crate::TopicAttributes::TopicAttributes;
27
28#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
29#[serde(rename_all = "camelCase")]
30pub struct TopicConfig {
31 pub topic_name: Option<CheetahString>,
32 pub read_queue_nums: u32,
33 pub write_queue_nums: u32,
34 pub perm: u32,
35 pub topic_filter_type: TopicFilterType,
36 pub topic_sys_flag: u32,
37 pub order: bool,
38 pub attributes: HashMap<CheetahString, CheetahString>,
39}
40
41impl Display for TopicConfig {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 write!(
44 f,
45 "TopicConfig {{ topic_name: {:?}, read_queue_nums: {}, write_queue_nums: {}, perm: {}, topic_filter_type: \
46 {}, topic_sys_flag: {}, order: {}, attributes: {:?} }}",
47 self.topic_name,
48 self.read_queue_nums,
49 self.write_queue_nums,
50 self.perm,
51 self.topic_filter_type,
52 self.topic_sys_flag,
53 self.order,
54 self.attributes
55 )
56 }
57}
58
59impl Default for TopicConfig {
60 fn default() -> Self {
61 Self {
62 topic_name: None,
63 read_queue_nums: Self::DEFAULT_READ_QUEUE_NUMS,
64 write_queue_nums: Self::DEFAULT_WRITE_QUEUE_NUMS,
65 perm: PermName::PERM_READ | PermName::PERM_WRITE,
66 topic_filter_type: TopicFilterType::SingleTag,
67 topic_sys_flag: 0,
68 order: false,
69 attributes: HashMap::new(),
70 }
71 }
72}
73
74impl TopicConfig {
75 const DEFAULT_READ_QUEUE_NUMS: u32 = 16;
76 const DEFAULT_WRITE_QUEUE_NUMS: u32 = 16;
77 const SEPARATOR: &'static str = " ";
78
79 pub fn get_topic_message_type(&self) -> TopicMessageType {
80 if self.attributes.is_empty() {
81 return TopicMessageType::Normal;
82 }
83 let content = self
84 .attributes
85 .get(TopicAttributes::topic_message_type_attribute().name());
86 if let Some(content) = content {
87 return TopicMessageType::from(content.to_string());
88 }
89 TopicMessageType::Normal
90 }
91
92 pub fn new(topic_name: impl Into<CheetahString>) -> Self {
93 TopicConfig {
94 topic_name: Some(topic_name.into()),
95 ..Self::default()
96 }
97 }
98
99 pub fn with_queues(topic_name: impl Into<CheetahString>, read_queue_nums: u32, write_queue_nums: u32) -> Self {
100 Self {
101 read_queue_nums,
102 write_queue_nums,
103 ..Self::new(topic_name)
104 }
105 }
106
107 pub fn with_perm(
108 topic_name: impl Into<CheetahString>,
109 read_queue_nums: u32,
110 write_queue_nums: u32,
111 perm: u32,
112 ) -> Self {
113 Self {
114 read_queue_nums,
115 write_queue_nums,
116 perm,
117 ..Self::new(topic_name)
118 }
119 }
120
121 pub fn with_sys_flag(
122 topic_name: impl Into<CheetahString>,
123 read_queue_nums: u32,
124 write_queue_nums: u32,
125 perm: u32,
126 topic_sys_flag: u32,
127 ) -> Self {
128 Self {
129 read_queue_nums,
130 write_queue_nums,
131 perm,
132 topic_sys_flag,
133 ..Self::new(topic_name)
134 }
135 }
136
137 pub fn encode(&self) -> String {
138 let mut sb = String::new();
139 sb.push_str(self.topic_name.clone().unwrap_or(CheetahString::empty()).as_str());
140 sb.push_str(Self::SEPARATOR);
141 sb.push_str(&self.read_queue_nums.to_string());
142 sb.push_str(Self::SEPARATOR);
143 sb.push_str(&self.write_queue_nums.to_string());
144 sb.push_str(Self::SEPARATOR);
145 sb.push_str(&self.perm.to_string());
146 sb.push_str(Self::SEPARATOR);
147 sb.push_str(&format!("{}", self.topic_filter_type));
148 sb.push_str(Self::SEPARATOR);
149 if !self.attributes.is_empty() {
150 sb.push_str(&serde_json::to_string(&self.attributes).unwrap());
151 }
152 sb.trim().to_string()
153 }
154
155 pub fn decode(&mut self, input: &str) -> bool {
156 let parts: Vec<&str> = input.split(Self::SEPARATOR).collect();
157 if parts.len() >= 5 {
158 self.topic_name = Some(parts[0].into());
159 self.read_queue_nums = parts[1].parse().unwrap_or(Self::DEFAULT_READ_QUEUE_NUMS);
160 self.write_queue_nums = parts[2].parse().unwrap_or(Self::DEFAULT_WRITE_QUEUE_NUMS);
161 self.perm = parts[3].parse().unwrap_or(PermName::PERM_READ | PermName::PERM_WRITE);
162 self.topic_filter_type = From::from(parts[4]);
163 if parts.len() >= 6 {
164 if let Ok(attrs) = serde_json::from_str(parts[5]) {
165 self.attributes = attrs
166 }
167 }
168 true
169 } else {
170 false
171 }
172 }
173
174 pub fn get_read_queue_nums(&self) -> u32 {
175 self.read_queue_nums
176 }
177}
178
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;

    use super::*;

    /// Topic name shared by the constructor and codec tests.
    const TOPIC: &str = "test_topic";

    /// Builds the shared topic name as a `CheetahString`.
    fn topic() -> CheetahString {
        CheetahString::from(TOPIC)
    }

    #[test]
    fn default_topic_config() {
        let TopicConfig {
            topic_name,
            read_queue_nums,
            write_queue_nums,
            perm,
            topic_filter_type,
            topic_sys_flag,
            order,
            attributes,
        } = TopicConfig::default();

        assert_eq!(topic_name, None);
        assert_eq!(read_queue_nums, TopicConfig::DEFAULT_READ_QUEUE_NUMS);
        assert_eq!(write_queue_nums, TopicConfig::DEFAULT_WRITE_QUEUE_NUMS);
        assert_eq!(perm, PermName::PERM_READ | PermName::PERM_WRITE);
        assert_eq!(topic_filter_type, TopicFilterType::SingleTag);
        assert_eq!(topic_sys_flag, 0);
        assert!(!order);
        assert!(attributes.is_empty());
    }

    #[test]
    fn new_topic_config() {
        let config = TopicConfig::new(topic());
        assert_eq!(config.topic_name, Some(topic()));
    }

    #[test]
    fn with_queues_topic_config() {
        let config = TopicConfig::with_queues(topic(), 8, 8);
        assert_eq!(config.topic_name, Some(topic()));
        assert_eq!(config.read_queue_nums, 8);
        assert_eq!(config.write_queue_nums, 8);
    }

    #[test]
    fn with_perm_topic_config() {
        let config = TopicConfig::with_perm(topic(), 8, 8, PermName::PERM_READ);
        assert_eq!(config.topic_name, Some(topic()));
        assert_eq!(config.read_queue_nums, 8);
        assert_eq!(config.write_queue_nums, 8);
        assert_eq!(config.perm, PermName::PERM_READ);
    }

    #[test]
    fn with_sys_flag_topic_config() {
        let config = TopicConfig::with_sys_flag(topic(), 8, 8, PermName::PERM_READ, 1);
        assert_eq!(config.topic_name, Some(topic()));
        assert_eq!(config.read_queue_nums, 8);
        assert_eq!(config.write_queue_nums, 8);
        assert_eq!(config.perm, PermName::PERM_READ);
        assert_eq!(config.topic_sys_flag, 1);
    }

    #[test]
    fn encode_topic_config() {
        let attributes = HashMap::from([(CheetahString::from("key"), CheetahString::from("value"))]);
        let config = TopicConfig {
            topic_name: Some(topic()),
            read_queue_nums: 8,
            write_queue_nums: 8,
            perm: PermName::PERM_READ,
            topic_filter_type: TopicFilterType::SingleTag,
            topic_sys_flag: 1,
            order: false,
            attributes,
        };

        let encoded = config.encode();

        assert!(encoded.contains("test_topic 8 8 4 SINGLE_TAG {\"key\":\"value\"}"));
    }

    #[test]
    fn decode_topic_config() {
        let mut config = TopicConfig::default();

        assert!(config.decode("test_topic 8 8 2 SingleTag {\"key\":\"value\"}"));

        assert_eq!(config.topic_name, Some(topic()));
        assert_eq!(config.read_queue_nums, 8);
        assert_eq!(config.write_queue_nums, 8);
        assert_eq!(config.perm, PermName::PERM_WRITE);
        assert_eq!(config.topic_filter_type, TopicFilterType::SingleTag);
        let expected_value = CheetahString::from("value");
        assert_eq!(
            config.attributes.get(&CheetahString::from("key")),
            Some(&expected_value)
        );
    }

    #[test]
    fn get_topic_message_type_normal() {
        assert_eq!(
            TopicConfig::default().get_topic_message_type(),
            TopicMessageType::Normal
        );
    }

    #[test]
    fn get_topic_message_type_from_attributes() {
        let attribute_name = CheetahString::from(TopicAttributes::topic_message_type_attribute().name());
        let mut config = TopicConfig::default();
        config.attributes.insert(attribute_name, CheetahString::from("Normal"));

        assert_eq!(config.get_topic_message_type(), TopicMessageType::Normal);
    }
}