rocketmq_client_rust/base/
validators.rs1use std::collections::HashMap;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::config::TopicConfig;
21use rocketmq_common::common::constant::PermName;
22use rocketmq_common::common::message::MessageConst;
23use rocketmq_common::common::message::MessageTrait;
24use rocketmq_common::common::topic::TopicValidator;
25use rocketmq_error::mq_client_err;
26use rocketmq_remoting::code::response_code::ResponseCode;
27
28use crate::producer::default_mq_producer::ProducerConfig;
29
30pub struct Validators;
31
32impl Validators {
33 pub const CHARACTER_MAX_LENGTH: usize = 255;
34 pub const TOPIC_MAX_LENGTH: usize = 127;
35
36 pub fn check_group(group: &str) -> rocketmq_error::RocketMQResult<()> {
37 if group.trim().is_empty() {
38 return mq_client_err!("the specified group is blank");
39 }
40
41 if group.len() > Self::CHARACTER_MAX_LENGTH {
42 return mq_client_err!("the specified group is longer than group max length 255.");
43 }
44
45 if TopicValidator::is_topic_or_group_illegal(group) {
46 return mq_client_err!(format!(
47 "the specified group[{}] contains illegal characters, allowing only \
48 ^[%|a-zA-Z0-9_-]+$",
49 group
50 ));
51 }
52 Ok(())
53 }
54
55 pub fn check_message<M>(
56 msg: Option<&M>,
57 producer_config: &ProducerConfig,
58 ) -> rocketmq_error::RocketMQResult<()>
59 where
60 M: MessageTrait,
61 {
62 if msg.is_none() {
63 return mq_client_err!(
64 ResponseCode::MessageIllegal as i32,
65 "the message is null".to_string()
66 );
67 }
68 let msg = msg.unwrap();
69 Self::check_topic(msg.get_topic())?;
70 Self::is_not_allowed_send_topic(msg.get_topic())?;
71
72 if msg.get_body().is_none() {
73 return mq_client_err!(
74 ResponseCode::MessageIllegal as i32,
75 "the message body is null".to_string()
76 );
77 }
78
79 let length = msg.get_body().unwrap().len();
80 if length == 0 {
81 return mq_client_err!(
82 ResponseCode::MessageIllegal as i32,
83 "the message body length is zero".to_string()
84 );
85 }
86
87 if length > producer_config.max_message_size() as usize {
88 return mq_client_err!(
89 ResponseCode::MessageIllegal as i32,
90 format!(
91 "the message body size over max value, MAX: {}",
92 producer_config.max_message_size()
93 )
94 );
95 }
96
97 let lmq_path = msg.get_user_property(&CheetahString::from_static_str(
98 MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
99 ));
100 if let Some(value) = lmq_path {
101 if value.contains(std::path::MAIN_SEPARATOR) {
102 return mq_client_err!(
103 ResponseCode::MessageIllegal as i32,
104 format!(
105 "INNER_MULTI_DISPATCH {} can not contains {} character",
106 value,
107 std::path::MAIN_SEPARATOR
108 )
109 );
110 }
111 }
112
113 Ok(())
114 }
115
116 pub fn check_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
117 if topic.trim().is_empty() {
118 return mq_client_err!("The specified topic is blank");
119 }
120
121 if topic.len() > Self::TOPIC_MAX_LENGTH {
122 return mq_client_err!(format!(
123 "The specified topic is longer than topic max length {}.",
124 Self::TOPIC_MAX_LENGTH
125 ));
126 }
127
128 if TopicValidator::is_topic_or_group_illegal(topic) {
129 return mq_client_err!(format!(
130 "The specified topic[{}] contains illegal characters, allowing only \
131 ^[%|a-zA-Z0-9_-]+$",
132 topic
133 ));
134 }
135
136 Ok(())
137 }
138
139 pub fn is_system_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
140 if TopicValidator::is_system_topic(topic) {
141 return mq_client_err!(format!(
142 "The topic[{}] is conflict with system topic.",
143 topic
144 ));
145 }
146 Ok(())
147 }
148
149 pub fn is_not_allowed_send_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
150 if TopicValidator::is_not_allowed_send_topic(topic) {
151 return mq_client_err!(format!("Sending message to topic[{}] is forbidden.", topic));
152 }
153
154 Ok(())
155 }
156
157 pub fn check_topic_config(topic_config: &TopicConfig) -> rocketmq_error::RocketMQResult<()> {
158 if !PermName::is_valid(topic_config.perm) {
159 return mq_client_err!(
160 ResponseCode::NoPermission as i32,
161 format!("topicPermission value: {} is invalid.", topic_config.perm)
162 );
163 }
164
165 Ok(())
166 }
167
168 pub fn check_broker_config(
169 broker_config: &HashMap<String, String>,
170 ) -> rocketmq_error::RocketMQResult<()> {
171 if let Some(broker_permission) = broker_config.get("brokerPermission") {
172 if !PermName::is_valid(broker_permission.parse().unwrap()) {
173 return mq_client_err!(format!(
174 "brokerPermission value: {} is invalid.",
175 broker_permission
176 ));
177 }
178 }
179
180 Ok(())
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use std::collections::HashMap;
187
188 use rocketmq_common::common::config::TopicConfig;
189
190 use super::*;
191
192 #[test]
193 fn check_group_blank_group() {
194 let result = Validators::check_group("");
195 assert!(result.is_err());
196 }
197
198 #[test]
199 fn check_group_long_group() {
200 let long_group = "a".repeat(256);
201 let result = Validators::check_group(&long_group);
202 assert!(result.is_err());
203 }
204
205 #[test]
206 fn check_group_illegal_characters() {
207 let result = Validators::check_group("illegal@group");
208 assert!(result.is_err());
209 }
210
211 #[test]
212 fn check_group_valid_group() {
213 let result = Validators::check_group("valid_group");
214 assert!(result.is_ok());
215 }
216
217 #[test]
218 fn check_topic_blank_topic() {
219 let result = Validators::check_topic("");
220 assert!(result.is_err());
221 }
222
223 #[test]
224 fn check_topic_long_topic() {
225 let long_topic = "a".repeat(128);
226 let result = Validators::check_topic(&long_topic);
227 assert!(result.is_err());
228 }
229
230 #[test]
231 fn check_topic_illegal_characters() {
232 let result = Validators::check_topic("illegal@topic");
233 assert!(result.is_err());
234 }
235
236 #[test]
237 fn check_topic_valid_topic() {
238 let result = Validators::check_topic("valid_topic");
239 assert!(result.is_ok());
240 }
241
242 #[test]
243 fn check_topic_config_invalid_permission() {
244 let topic_config = TopicConfig {
245 perm: 999,
246 ..Default::default()
247 };
248 let result = Validators::check_topic_config(&topic_config);
249 assert!(result.is_err());
250 }
251
252 #[test]
253 fn check_topic_config_valid_permission() {
254 let topic_config = TopicConfig {
255 perm: 6,
256 ..Default::default()
257 };
258 let result = Validators::check_topic_config(&topic_config);
259 assert!(result.is_ok());
260 }
261
262 #[test]
263 fn check_broker_config_invalid_permission() {
264 let mut broker_config = HashMap::new();
265 broker_config.insert("brokerPermission".to_string(), "999".to_string());
266 let result = Validators::check_broker_config(&broker_config);
267 assert!(result.is_err());
268 }
269
270 #[test]
271 fn check_broker_config_valid_permission() {
272 let mut broker_config = HashMap::new();
273 broker_config.insert("brokerPermission".to_string(), "6".to_string());
274 let result = Validators::check_broker_config(&broker_config);
275 assert!(result.is_ok());
276 }
277}