rocketmq_common/common/
topic.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License; Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing; software
12 * distributed under the License is distributed on an "AS IS" BASIS;
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND; either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::sync::LazyLock;
19
20use cheetah_string::CheetahString;
21use dashmap::DashSet;
22
23pub const TOPIC_MAX_LENGTH: usize = 127;
24
25/// Pre-computed valid character bitmap for topic/group name validation
26/// Allows: 0-9, a-z, A-Z, %, -, _, |
27const VALID_CHAR_BIT_MAP: [bool; 128] = {
28    let mut map = [false; 128];
29    map['%' as usize] = true;
30    map['-' as usize] = true;
31    map['_' as usize] = true;
32    map['|' as usize] = true;
33
34    let mut i = b'0';
35    while i <= b'9' {
36        map[i as usize] = true;
37        i += 1;
38    }
39
40    let mut i = b'A';
41    while i <= b'Z' {
42        map[i as usize] = true;
43        i += 1;
44    }
45
46    let mut i = b'a';
47    while i <= b'z' {
48        map[i as usize] = true;
49        i += 1;
50    }
51    map
52};
53
54static SYSTEM_TOPIC_SET: LazyLock<DashSet<CheetahString>> = LazyLock::new(|| {
55    let set = DashSet::new();
56    set.insert(CheetahString::from_static_str(
57        TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
58    ));
59    set.insert(CheetahString::from_static_str(
60        TopicValidator::RMQ_SYS_SCHEDULE_TOPIC,
61    ));
62    set.insert(CheetahString::from_static_str(
63        TopicValidator::RMQ_SYS_BENCHMARK_TOPIC,
64    ));
65    set.insert(CheetahString::from_static_str(
66        TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC,
67    ));
68    set.insert(CheetahString::from_static_str(
69        TopicValidator::RMQ_SYS_TRACE_TOPIC,
70    ));
71    set.insert(CheetahString::from_static_str(
72        TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC,
73    ));
74    set.insert(CheetahString::from_static_str(
75        TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC,
76    ));
77    set.insert(CheetahString::from_static_str(
78        TopicValidator::RMQ_SYS_SELF_TEST_TOPIC,
79    ));
80    set.insert(CheetahString::from_static_str(
81        TopicValidator::RMQ_SYS_OFFSET_MOVED_EVENT,
82    ));
83    set.insert(CheetahString::from_static_str(
84        TopicValidator::RMQ_SYS_ROCKSDB_OFFSET_TOPIC,
85    ));
86    set
87});
88
89static NOT_ALLOWED_SEND_TOPIC_SET: LazyLock<DashSet<CheetahString>> = LazyLock::new(|| {
90    let set = DashSet::new();
91    set.insert(CheetahString::from_static_str(
92        TopicValidator::RMQ_SYS_SCHEDULE_TOPIC,
93    ));
94    set.insert(CheetahString::from_static_str(
95        TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC,
96    ));
97    set.insert(CheetahString::from_static_str(
98        TopicValidator::RMQ_SYS_TRANS_OP_HALF_TOPIC,
99    ));
100    set.insert(CheetahString::from_static_str(
101        TopicValidator::RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC,
102    ));
103    set.insert(CheetahString::from_static_str(
104        TopicValidator::RMQ_SYS_SELF_TEST_TOPIC,
105    ));
106    set.insert(CheetahString::from_static_str(
107        TopicValidator::RMQ_SYS_OFFSET_MOVED_EVENT,
108    ));
109    set
110});
111
112pub struct TopicValidator;
113
114impl TopicValidator {
115    pub const AUTO_CREATE_TOPIC_KEY_TOPIC: &'static str = "TBW102";
116    pub const RMQ_SYS_SCHEDULE_TOPIC: &'static str = "SCHEDULE_TOPIC_XXXX";
117    pub const RMQ_SYS_BENCHMARK_TOPIC: &'static str = "BenchmarkTest";
118    pub const RMQ_SYS_TRANS_HALF_TOPIC: &'static str = "RMQ_SYS_TRANS_HALF_TOPIC";
119    pub const RMQ_SYS_TRACE_TOPIC: &'static str = "RMQ_SYS_TRACE_TOPIC";
120    pub const RMQ_SYS_TRANS_OP_HALF_TOPIC: &'static str = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
121    pub const RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC: &'static str = "TRANS_CHECK_MAX_TIME_TOPIC";
122    pub const RMQ_SYS_SELF_TEST_TOPIC: &'static str = "SELF_TEST_TOPIC";
123    pub const RMQ_SYS_OFFSET_MOVED_EVENT: &'static str = "OFFSET_MOVED_EVENT";
124    pub const RMQ_SYS_ROCKSDB_OFFSET_TOPIC: &'static str = "CHECKPOINT_TOPIC";
125
126    pub const SYSTEM_TOPIC_PREFIX: &'static str = "rmq_sys_";
127    pub const SYNC_BROKER_MEMBER_GROUP_PREFIX: &'static str = "rmq_sys_SYNC_BROKER_MEMBER_";
128}
129
130impl TopicValidator {
131    /// Check if topic or group name contains illegal characters
132    ///
133    /// # Performance
134    /// Uses pre-computed bitmap for O(1) character validation
135    #[inline]
136    pub fn is_topic_or_group_illegal(name: &str) -> bool {
137        name.bytes()
138            .any(|b| (b as usize) >= VALID_CHAR_BIT_MAP.len() || !VALID_CHAR_BIT_MAP[b as usize])
139    }
140
141    /// Validate topic name according to RocketMQ rules
142    ///
143    /// # Rules
144    /// - Cannot be blank
145    /// - Must contain only: 0-9, a-z, A-Z, %, -, _, |
146    /// - Length must not exceed TOPIC_MAX_LENGTH (127)
147    #[inline]
148    pub fn validate_topic(topic: &str) -> ValidateTopicResult {
149        // Fast path: check length first (cheaper than trim)
150        if topic.is_empty() || topic.len() > TOPIC_MAX_LENGTH {
151            if topic.is_empty() || topic.trim().is_empty() {
152                const REMARK: &str = "The specified topic is blank.";
153                return ValidateTopicResult {
154                    valid: false,
155                    remark: CheetahString::from_static_str(REMARK),
156                };
157            }
158            return ValidateTopicResult {
159                valid: false,
160                remark: CheetahString::from(format!(
161                    "The specified topic is longer than topic max length {TOPIC_MAX_LENGTH}."
162                )),
163            };
164        }
165
166        if Self::is_topic_or_group_illegal(topic) {
167            const REMARK: &str =
168                "The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$";
169            return ValidateTopicResult {
170                valid: false,
171                remark: CheetahString::from_static_str(REMARK),
172            };
173        }
174
175        ValidateTopicResult {
176            valid: true,
177            remark: CheetahString::empty(),
178        }
179    }
180
181    #[inline]
182    pub fn is_system_topic(topic: &str) -> bool {
183        // Fast path: check prefix first (no lock needed)
184        if topic.starts_with(TopicValidator::SYSTEM_TOPIC_PREFIX) {
185            return true;
186        }
187        // DashSet::contains is lock-free for reads
188        SYSTEM_TOPIC_SET.iter().any(|entry| entry.as_str() == topic)
189    }
190
191    #[inline]
192    pub fn is_not_allowed_send_topic(topic: &str) -> bool {
193        // DashSet iteration is lock-free
194        NOT_ALLOWED_SEND_TOPIC_SET
195            .iter()
196            .any(|entry| entry.as_str() == topic)
197    }
198
199    pub fn add_system_topic(system_topic: impl Into<CheetahString>) {
200        SYSTEM_TOPIC_SET.insert(system_topic.into());
201    }
202
203    pub fn get_system_topic_set() -> Vec<CheetahString> {
204        SYSTEM_TOPIC_SET.iter().map(|entry| entry.clone()).collect()
205    }
206
207    pub fn get_not_allowed_send_topic_set() -> Vec<CheetahString> {
208        NOT_ALLOWED_SEND_TOPIC_SET
209            .iter()
210            .map(|entry| entry.clone())
211            .collect()
212    }
213}
214
215pub struct ValidateTopicResult {
216    valid: bool,
217    remark: CheetahString,
218}
219
220impl ValidateTopicResult {
221    #[inline]
222    pub fn valid(&self) -> bool {
223        self.valid
224    }
225
226    #[inline]
227    pub fn remark(&self) -> &CheetahString {
228        &self.remark
229    }
230
231    #[inline]
232    pub fn take_remark(self) -> CheetahString {
233        self.remark
234    }
235
236    #[inline]
237    pub fn set_valid(&mut self, valid: bool) {
238        self.valid = valid;
239    }
240
241    #[inline]
242    pub fn set_remark(&mut self, remark: CheetahString) {
243        self.remark = remark;
244    }
245}
246
247#[cfg(test)]
248mod tests {
249    use super::*;
250    #[test]
251    fn validate_topic_with_valid_topic() {
252        let result = TopicValidator::validate_topic("valid_topic");
253        assert!(result.valid());
254        assert_eq!(result.remark(), "");
255    }
256
257    #[test]
258    fn validate_topic_with_empty_topic() {
259        let result = TopicValidator::validate_topic("");
260        assert!(!result.valid());
261        assert_eq!(result.remark(), "The specified topic is blank.");
262    }
263
264    #[test]
265    fn validate_topic_with_illegal_characters() {
266        let result = TopicValidator::validate_topic("invalid@topic");
267        assert!(!result.valid());
268        assert_eq!(
269            result.remark(),
270            "The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$"
271        );
272    }
273
274    #[test]
275    fn validate_topic_with_exceeding_length() {
276        let long_topic = "a".repeat(TOPIC_MAX_LENGTH + 1);
277        let result = TopicValidator::validate_topic(&long_topic);
278        assert!(!result.valid());
279        assert_eq!(
280            result.remark().as_str(),
281            format!(
282                "The specified topic is longer than topic max length {}.",
283                TOPIC_MAX_LENGTH
284            )
285        );
286    }
287
288    #[test]
289    fn is_system_topic_with_system_topic() {
290        assert!(TopicValidator::is_system_topic(
291            TopicValidator::RMQ_SYS_SCHEDULE_TOPIC
292        ));
293    }
294
295    #[test]
296    fn is_system_topic_with_non_system_topic() {
297        assert!(!TopicValidator::is_system_topic("non_system_topic"));
298    }
299
300    #[test]
301    fn is_not_allowed_send_topic_with_not_allowed_topic() {
302        assert!(TopicValidator::is_not_allowed_send_topic(
303            TopicValidator::RMQ_SYS_SCHEDULE_TOPIC
304        ));
305    }
306
307    #[test]
308    fn is_not_allowed_send_topic_with_allowed_topic() {
309        assert!(!TopicValidator::is_not_allowed_send_topic("allowed_topic"));
310    }
311
312    #[test]
313    fn add_system_topic_adds_new_topic() {
314        TopicValidator::add_system_topic("new_system_topic");
315        assert!(TopicValidator::is_system_topic("new_system_topic"));
316    }
317
318    #[test]
319    fn get_system_topic_set_returns_all_system_topics() {
320        let system_topics = TopicValidator::get_system_topic_set();
321        assert!(system_topics
322            .iter()
323            .any(|s| s.as_str() == TopicValidator::RMQ_SYS_SCHEDULE_TOPIC));
324    }
325
326    #[test]
327    fn get_not_allowed_send_topic_set_returns_all_not_allowed_topics() {
328        let not_allowed_topics = TopicValidator::get_not_allowed_send_topic_set();
329        assert!(not_allowed_topics
330            .iter()
331            .any(|s| s.as_str() == TopicValidator::RMQ_SYS_SCHEDULE_TOPIC));
332    }
333}