rocketmq_common/common/
topic.rs1use std::sync::LazyLock;
19
20use cheetah_string::CheetahString;
21use dashmap::DashSet;
22
23pub const TOPIC_MAX_LENGTH: usize = 127;
24
25const VALID_CHAR_BIT_MAP: [bool; 128] = {
28 let mut map = [false; 128];
29 map['%' as usize] = true;
30 map['-' as usize] = true;
31 map['_' as usize] = true;
32 map['|' as usize] = true;
33
34 let mut i = b'0';
35 while i <= b'9' {
36 map[i as usize] = true;
37 i += 1;
38 }
39
40 let mut i = b'A';
41 while i <= b'Z' {
42 map[i as usize] = true;
43 i += 1;
44 }
45
46 let mut i = b'a';
47 while i <= b'z' {
48 map[i as usize] = true;
49 i += 1;
50 }
51 map
52};
53
54static SYSTEM_TOPIC_SET: LazyLock<DashSet<CheetahString>> = LazyLock::new(|| {
55 let set = DashSet::new();
56 set.insert(CheetahString::from_static_str(
57 TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
58 ));
59 set.insert(CheetahString::from_static_str(
60 TopicValidator::RMQ_SYS_SCHEDULE_TOPIC,
61 ));
62 set.insert(CheetahString::from_static_str(
63 TopicValidator::RMQ_SYS_BENCHMARK_TOPIC,
64 ));
65 set.insert(CheetahString::from_static_str(
66 TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC,
67 ));
68 set.insert(CheetahString::from_static_str(
69 TopicValidator::RMQ_SYS_TRACE_TOPIC,
70 ));
71 set.insert(CheetahString::from_static_str(
72 TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC,
73 ));
74 set.insert(CheetahString::from_static_str(
75 TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC,
76 ));
77 set.insert(CheetahString::from_static_str(
78 TopicValidator::RMQ_SYS_SELF_TEST_TOPIC,
79 ));
80 set.insert(CheetahString::from_static_str(
81 TopicValidator::RMQ_SYS_OFFSET_MOVED_EVENT,
82 ));
83 set.insert(CheetahString::from_static_str(
84 TopicValidator::RMQ_SYS_ROCKSDB_OFFSET_TOPIC,
85 ));
86 set
87});
88
89static NOT_ALLOWED_SEND_TOPIC_SET: LazyLock<DashSet<CheetahString>> = LazyLock::new(|| {
90 let set = DashSet::new();
91 set.insert(CheetahString::from_static_str(
92 TopicValidator::RMQ_SYS_SCHEDULE_TOPIC,
93 ));
94 set.insert(CheetahString::from_static_str(
95 TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC,
96 ));
97 set.insert(CheetahString::from_static_str(
98 TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC,
99 ));
100 set.insert(CheetahString::from_static_str(
101 TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC,
102 ));
103 set.insert(CheetahString::from_static_str(
104 TopicValidator::RMQ_SYS_SELF_TEST_TOPIC,
105 ));
106 set.insert(CheetahString::from_static_str(
107 TopicValidator::RMQ_SYS_OFFSET_MOVED_EVENT,
108 ));
109 set
110});
111
112pub struct TopicValidator;
113
114impl TopicValidator {
115 pub const AUTO_CREATE_TOPIC_KEY_TOPIC: &'static str = "TBW102";
116 pub const RMQ_SYS_SCHEDULE_TOPIC: &'static str = "SCHEDULE_TOPIC_XXXX";
117 pub const RMQ_SYS_BENCHMARK_TOPIC: &'static str = "BenchmarkTest";
118 pub const RMQ_SYS_TRANS_HALF_TOPIC: &'static str = "RMQ_SYS_TRANS_HALF_TOPIC";
119 pub const RMQ_SYS_TRACE_TOPIC: &'static str = "RMQ_SYS_TRACE_TOPIC";
120 pub const RMQ_SYS_TRANS_OP_HALF_TOPIC: &'static str = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
121 pub const RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC: &'static str = "TRANS_CHECK_MAX_TIME_TOPIC";
122 pub const RMQ_SYS_SELF_TEST_TOPIC: &'static str = "SELF_TEST_TOPIC";
123 pub const RMQ_SYS_OFFSET_MOVED_EVENT: &'static str = "OFFSET_MOVED_EVENT";
124 pub const RMQ_SYS_ROCKSDB_OFFSET_TOPIC: &'static str = "CHECKPOINT_TOPIC";
125
126 pub const SYSTEM_TOPIC_PREFIX: &'static str = "rmq_sys_";
127 pub const SYNC_BROKER_MEMBER_GROUP_PREFIX: &'static str = "rmq_sys_SYNC_BROKER_MEMBER_";
128}
129
130impl TopicValidator {
131 #[inline]
136 pub fn is_topic_or_group_illegal(name: &str) -> bool {
137 name.bytes()
138 .any(|b| (b as usize) >= VALID_CHAR_BIT_MAP.len() || !VALID_CHAR_BIT_MAP[b as usize])
139 }
140
141 #[inline]
148 pub fn validate_topic(topic: &str) -> ValidateTopicResult {
149 if topic.is_empty() || topic.len() > TOPIC_MAX_LENGTH {
151 if topic.is_empty() || topic.trim().is_empty() {
152 const REMARK: &str = "The specified topic is blank.";
153 return ValidateTopicResult {
154 valid: false,
155 remark: CheetahString::from_static_str(REMARK),
156 };
157 }
158 return ValidateTopicResult {
159 valid: false,
160 remark: CheetahString::from(format!(
161 "The specified topic is longer than topic max length {TOPIC_MAX_LENGTH}."
162 )),
163 };
164 }
165
166 if Self::is_topic_or_group_illegal(topic) {
167 const REMARK: &str =
168 "The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
169 return ValidateTopicResult {
170 valid: false,
171 remark: CheetahString::from_static_str(REMARK),
172 };
173 }
174
175 ValidateTopicResult {
176 valid: true,
177 remark: CheetahString::empty(),
178 }
179 }
180
181 #[inline]
182 pub fn is_system_topic(topic: &str) -> bool {
183 if topic.starts_with(TopicValidator::SYSTEM_TOPIC_PREFIX) {
185 return true;
186 }
187 SYSTEM_TOPIC_SET.iter().any(|entry| entry.as_str() == topic)
189 }
190
191 #[inline]
192 pub fn is_not_allowed_send_topic(topic: &str) -> bool {
193 NOT_ALLOWED_SEND_TOPIC_SET
195 .iter()
196 .any(|entry| entry.as_str() == topic)
197 }
198
199 pub fn add_system_topic(system_topic: impl Into<CheetahString>) {
200 SYSTEM_TOPIC_SET.insert(system_topic.into());
201 }
202
203 pub fn get_system_topic_set() -> Vec<CheetahString> {
204 SYSTEM_TOPIC_SET.iter().map(|entry| entry.clone()).collect()
205 }
206
207 pub fn get_not_allowed_send_topic_set() -> Vec<CheetahString> {
208 NOT_ALLOWED_SEND_TOPIC_SET
209 .iter()
210 .map(|entry| entry.clone())
211 .collect()
212 }
213}
214
215pub struct ValidateTopicResult {
216 valid: bool,
217 remark: CheetahString,
218}
219
220impl ValidateTopicResult {
221 #[inline]
222 pub fn valid(&self) -> bool {
223 self.valid
224 }
225
226 #[inline]
227 pub fn remark(&self) -> &CheetahString {
228 &self.remark
229 }
230
231 #[inline]
232 pub fn take_remark(self) -> CheetahString {
233 self.remark
234 }
235
236 #[inline]
237 pub fn set_valid(&mut self, valid: bool) {
238 self.valid = valid;
239 }
240
241 #[inline]
242 pub fn set_remark(&mut self, remark: CheetahString) {
243 self.remark = remark;
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250 #[test]
251 fn validate_topic_with_valid_topic() {
252 let result = TopicValidator::validate_topic("valid_topic");
253 assert!(result.valid());
254 assert_eq!(result.remark(), "");
255 }
256
257 #[test]
258 fn validate_topic_with_empty_topic() {
259 let result = TopicValidator::validate_topic("");
260 assert!(!result.valid());
261 assert_eq!(result.remark(), "The specified topic is blank.");
262 }
263
264 #[test]
265 fn validate_topic_with_illegal_characters() {
266 let result = TopicValidator::validate_topic("invalid@topic");
267 assert!(!result.valid());
268 assert_eq!(
269 result.remark(),
270 "The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$"
271 );
272 }
273
274 #[test]
275 fn validate_topic_with_exceeding_length() {
276 let long_topic = "a".repeat(TOPIC_MAX_LENGTH + 1);
277 let result = TopicValidator::validate_topic(&long_topic);
278 assert!(!result.valid());
279 assert_eq!(
280 result.remark().as_str(),
281 format!(
282 "The specified topic is longer than topic max length {}.",
283 TOPIC_MAX_LENGTH
284 )
285 );
286 }
287
288 #[test]
289 fn is_system_topic_with_system_topic() {
290 assert!(TopicValidator::is_system_topic(
291 TopicValidator::RMQ_SYS_SCHEDULE_TOPIC
292 ));
293 }
294
295 #[test]
296 fn is_system_topic_with_non_system_topic() {
297 assert!(!TopicValidator::is_system_topic("non_system_topic"));
298 }
299
300 #[test]
301 fn is_not_allowed_send_topic_with_not_allowed_topic() {
302 assert!(TopicValidator::is_not_allowed_send_topic(
303 TopicValidator::RMQ_SYS_SCHEDULE_TOPIC
304 ));
305 }
306
307 #[test]
308 fn is_not_allowed_send_topic_with_allowed_topic() {
309 assert!(!TopicValidator::is_not_allowed_send_topic("allowed_topic"));
310 }
311
312 #[test]
313 fn add_system_topic_adds_new_topic() {
314 TopicValidator::add_system_topic("new_system_topic");
315 assert!(TopicValidator::is_system_topic("new_system_topic"));
316 }
317
318 #[test]
319 fn get_system_topic_set_returns_all_system_topics() {
320 let system_topics = TopicValidator::get_system_topic_set();
321 assert!(system_topics
322 .iter()
323 .any(|s| s.as_str() == TopicValidator::RMQ_SYS_SCHEDULE_TOPIC));
324 }
325
326 #[test]
327 fn get_not_allowed_send_topic_set_returns_all_not_allowed_topics() {
328 let not_allowed_topics = TopicValidator::get_not_allowed_send_topic_set();
329 assert!(not_allowed_topics
330 .iter()
331 .any(|s| s.as_str() == TopicValidator::RMQ_SYS_SCHEDULE_TOPIC));
332 }
333}