Skip to main content

rocketmq_client_rust/base/
validators.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use cheetah_string::CheetahString;
18use rocketmq_common::common::config::TopicConfig;
19use rocketmq_common::common::constant::PermName;
20use rocketmq_common::common::message::MessageConst;
21use rocketmq_common::common::message::MessageTrait;
22use rocketmq_common::common::topic::TopicValidator;
23use rocketmq_remoting::code::response_code::ResponseCode;
24
25use crate::producer::default_mq_producer::ProducerConfig;
26
/// Stateless namespace for the client-side validation helpers (group names, topic names,
/// outgoing messages and permission settings).
///
/// The type carries no data; everything is exposed as associated constants and functions,
/// e.g. `Validators::check_topic("my_topic")`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Validators;
28
29impl Validators {
30    pub const CHARACTER_MAX_LENGTH: usize = 255;
31    pub const TOPIC_MAX_LENGTH: usize = 127;
32
33    pub fn check_group(group: &str) -> rocketmq_error::RocketMQResult<()> {
34        if group.trim().is_empty() {
35            return Err(mq_client_err!("the specified group is blank"));
36        }
37
38        if group.len() > Self::CHARACTER_MAX_LENGTH {
39            return Err(mq_client_err!(
40                "the specified group is longer than group max length 255."
41            ));
42        }
43
44        if TopicValidator::is_topic_or_group_illegal(group) {
45            return Err(mq_client_err!(format!(
46                "the specified group[{}] contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$",
47                group
48            )));
49        }
50        Ok(())
51    }
52
53    pub fn check_message<M>(msg: Option<&M>, producer_config: &ProducerConfig) -> rocketmq_error::RocketMQResult<()>
54    where
55        M: MessageTrait,
56    {
57        if msg.is_none() {
58            return Err(mq_client_err!(
59                ResponseCode::MessageIllegal as i32,
60                "the message is null".to_string()
61            ));
62        }
63        let msg = msg.unwrap();
64        Self::check_topic(msg.topic())?;
65        Self::is_not_allowed_send_topic(msg.topic())?;
66
67        if msg.get_body().is_none() {
68            return Err(mq_client_err!(
69                ResponseCode::MessageIllegal as i32,
70                "the message body is null".to_string()
71            ));
72        }
73
74        let length = msg.get_body().unwrap().len();
75        if length == 0 {
76            return Err(mq_client_err!(
77                ResponseCode::MessageIllegal as i32,
78                "the message body length is zero".to_string()
79            ));
80        }
81
82        if length > producer_config.max_message_size() as usize {
83            return Err(mq_client_err!(
84                ResponseCode::MessageIllegal as i32,
85                format!(
86                    "the message body size over max value, MAX: {}",
87                    producer_config.max_message_size()
88                )
89            ));
90        }
91
92        let lmq_path = msg.user_property(&CheetahString::from_static_str(
93            MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
94        ));
95        if let Some(value) = lmq_path {
96            if value.contains(std::path::MAIN_SEPARATOR) {
97                return Err(mq_client_err!(
98                    ResponseCode::MessageIllegal as i32,
99                    format!(
100                        "INNER_MULTI_DISPATCH {} can not contains {} character",
101                        value,
102                        std::path::MAIN_SEPARATOR
103                    )
104                ));
105            }
106        }
107
108        Ok(())
109    }
110
111    pub fn check_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
112        if topic.trim().is_empty() {
113            return Err(mq_client_err!("The specified topic is blank"));
114        }
115
116        if topic.len() > Self::TOPIC_MAX_LENGTH {
117            return Err(mq_client_err!(format!(
118                "The specified topic is longer than topic max length {}.",
119                Self::TOPIC_MAX_LENGTH
120            )));
121        }
122
123        if TopicValidator::is_topic_or_group_illegal(topic) {
124            return Err(mq_client_err!(format!(
125                "The specified topic[{}] contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$",
126                topic
127            )));
128        }
129
130        Ok(())
131    }
132
133    pub fn is_system_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
134        if TopicValidator::is_system_topic(topic) {
135            return Err(mq_client_err!(format!(
136                "The topic[{}] is conflict with system topic.",
137                topic
138            )));
139        }
140        Ok(())
141    }
142
143    pub fn is_not_allowed_send_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
144        if TopicValidator::is_not_allowed_send_topic(topic) {
145            return Err(mq_client_err!(format!(
146                "Sending message to topic[{}] is forbidden.",
147                topic
148            )));
149        }
150
151        Ok(())
152    }
153
154    pub fn check_topic_config(topic_config: &TopicConfig) -> rocketmq_error::RocketMQResult<()> {
155        if !PermName::is_valid(topic_config.perm) {
156            return Err(mq_client_err!(
157                ResponseCode::NoPermission as i32,
158                format!("topicPermission value: {} is invalid.", topic_config.perm)
159            ));
160        }
161
162        Ok(())
163    }
164
165    pub fn check_broker_config(broker_config: &HashMap<String, String>) -> rocketmq_error::RocketMQResult<()> {
166        if let Some(broker_permission) = broker_config.get("brokerPermission") {
167            if !PermName::is_valid(broker_permission.parse().unwrap()) {
168                return Err(mq_client_err!(format!(
169                    "brokerPermission value: {} is invalid.",
170                    broker_permission
171                )));
172            }
173        }
174
175        Ok(())
176    }
177}
178
// Unit tests for the string-based validators and the permission checks.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rocketmq_common::common::config::TopicConfig;

    use super::*;

    #[test]
    fn check_group_blank_group() {
        let result = Validators::check_group("");
        assert!(result.is_err());
    }

    #[test]
    fn check_group_whitespace_only_group() {
        // The blank check trims first, so whitespace-only names are rejected too.
        let result = Validators::check_group("   ");
        assert!(result.is_err());
    }

    #[test]
    fn check_group_long_group() {
        let long_group = "a".repeat(256);
        let result = Validators::check_group(&long_group);
        assert!(result.is_err());
    }

    #[test]
    fn check_group_max_length_group() {
        // The limit is inclusive: exactly CHARACTER_MAX_LENGTH bytes is still accepted.
        let group = "a".repeat(Validators::CHARACTER_MAX_LENGTH);
        let result = Validators::check_group(&group);
        assert!(result.is_ok());
    }

    #[test]
    fn check_group_illegal_characters() {
        let result = Validators::check_group("illegal@group");
        assert!(result.is_err());
    }

    #[test]
    fn check_group_valid_group() {
        let result = Validators::check_group("valid_group");
        assert!(result.is_ok());
    }

    #[test]
    fn check_topic_blank_topic() {
        let result = Validators::check_topic("");
        assert!(result.is_err());
    }

    #[test]
    fn check_topic_whitespace_only_topic() {
        // The blank check trims first, so whitespace-only names are rejected too.
        let result = Validators::check_topic("   ");
        assert!(result.is_err());
    }

    #[test]
    fn check_topic_long_topic() {
        let long_topic = "a".repeat(128);
        let result = Validators::check_topic(&long_topic);
        assert!(result.is_err());
    }

    #[test]
    fn check_topic_max_length_topic() {
        // The limit is inclusive: exactly TOPIC_MAX_LENGTH bytes is still accepted.
        let topic = "a".repeat(Validators::TOPIC_MAX_LENGTH);
        let result = Validators::check_topic(&topic);
        assert!(result.is_ok());
    }

    #[test]
    fn check_topic_illegal_characters() {
        let result = Validators::check_topic("illegal@topic");
        assert!(result.is_err());
    }

    #[test]
    fn check_topic_valid_topic() {
        let result = Validators::check_topic("valid_topic");
        assert!(result.is_ok());
    }

    #[test]
    fn check_topic_config_invalid_permission() {
        let topic_config = TopicConfig {
            perm: 999,
            ..Default::default()
        };
        let result = Validators::check_topic_config(&topic_config);
        assert!(result.is_err());
    }

    #[test]
    fn check_topic_config_valid_permission() {
        let topic_config = TopicConfig {
            perm: 6,
            ..Default::default()
        };
        let result = Validators::check_topic_config(&topic_config);
        assert!(result.is_ok());
    }

    #[test]
    fn check_broker_config_invalid_permission() {
        let mut broker_config = HashMap::new();
        broker_config.insert("brokerPermission".to_string(), "999".to_string());
        let result = Validators::check_broker_config(&broker_config);
        assert!(result.is_err());
    }

    #[test]
    fn check_broker_config_valid_permission() {
        let mut broker_config = HashMap::new();
        broker_config.insert("brokerPermission".to_string(), "6".to_string());
        let result = Validators::check_broker_config(&broker_config);
        assert!(result.is_ok());
    }

    #[test]
    fn check_broker_config_missing_permission() {
        // The permission entry is optional; a map without it passes validation.
        let broker_config: HashMap<String, String> = HashMap::new();
        let result = Validators::check_broker_config(&broker_config);
        assert!(result.is_ok());
    }
}