rocketmq_client_rust/base/
validators.rs1use std::collections::HashMap;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::config::TopicConfig;
21use rocketmq_common::common::constant::PermName;
22use rocketmq_common::common::message::MessageConst;
23use rocketmq_common::common::message::MessageTrait;
24use rocketmq_common::common::topic::TopicValidator;
25use rocketmq_remoting::code::response_code::ResponseCode;
26
27use crate::producer::default_mq_producer::ProducerConfig;
28
/// Stateless namespace for the client-side validation helpers.
///
/// The type carries no data; every check is exposed as an associated function
/// (for example `Validators::check_topic`).
#[derive(Debug, Clone, Copy, Default)]
pub struct Validators;
30
31impl Validators {
32 pub const CHARACTER_MAX_LENGTH: usize = 255;
33 pub const TOPIC_MAX_LENGTH: usize = 127;
34
35 pub fn check_group(group: &str) -> rocketmq_error::RocketMQResult<()> {
36 if group.trim().is_empty() {
37 return Err(mq_client_err!("the specified group is blank"));
38 }
39
40 if group.len() > Self::CHARACTER_MAX_LENGTH {
41 return Err(mq_client_err!(
42 "the specified group is longer than group max length 255."
43 ));
44 }
45
46 if TopicValidator::is_topic_or_group_illegal(group) {
47 return Err(mq_client_err!(format!(
48 "the specified group[{}] contains illegal characters, allowing only \
49 ^[%|a-zA-Z0-9_-]+$",
50 group
51 )));
52 }
53 Ok(())
54 }
55
56 pub fn check_message<M>(
57 msg: Option<&M>,
58 producer_config: &ProducerConfig,
59 ) -> rocketmq_error::RocketMQResult<()>
60 where
61 M: MessageTrait,
62 {
63 if msg.is_none() {
64 return Err(mq_client_err!(
65 ResponseCode::MessageIllegal as i32,
66 "the message is null".to_string()
67 ));
68 }
69 let msg = msg.unwrap();
70 Self::check_topic(msg.get_topic())?;
71 Self::is_not_allowed_send_topic(msg.get_topic())?;
72
73 if msg.get_body().is_none() {
74 return Err(mq_client_err!(
75 ResponseCode::MessageIllegal as i32,
76 "the message body is null".to_string()
77 ));
78 }
79
80 let length = msg.get_body().unwrap().len();
81 if length == 0 {
82 return Err(mq_client_err!(
83 ResponseCode::MessageIllegal as i32,
84 "the message body length is zero".to_string()
85 ));
86 }
87
88 if length > producer_config.max_message_size() as usize {
89 return Err(mq_client_err!(
90 ResponseCode::MessageIllegal as i32,
91 format!(
92 "the message body size over max value, MAX: {}",
93 producer_config.max_message_size()
94 )
95 ));
96 }
97
98 let lmq_path = msg.get_user_property(&CheetahString::from_static_str(
99 MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
100 ));
101 if let Some(value) = lmq_path {
102 if value.contains(std::path::MAIN_SEPARATOR) {
103 return Err(mq_client_err!(
104 ResponseCode::MessageIllegal as i32,
105 format!(
106 "INNER_MULTI_DISPATCH {} can not contains {} character",
107 value,
108 std::path::MAIN_SEPARATOR
109 )
110 ));
111 }
112 }
113
114 Ok(())
115 }
116
117 pub fn check_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
118 if topic.trim().is_empty() {
119 return Err(mq_client_err!("The specified topic is blank"));
120 }
121
122 if topic.len() > Self::TOPIC_MAX_LENGTH {
123 return Err(mq_client_err!(format!(
124 "The specified topic is longer than topic max length {}.",
125 Self::TOPIC_MAX_LENGTH
126 )));
127 }
128
129 if TopicValidator::is_topic_or_group_illegal(topic) {
130 return Err(mq_client_err!(format!(
131 "The specified topic[{}] contains illegal characters, allowing only \
132 ^[%|a-zA-Z0-9_-]+$",
133 topic
134 )));
135 }
136
137 Ok(())
138 }
139
140 pub fn is_system_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
141 if TopicValidator::is_system_topic(topic) {
142 return Err(mq_client_err!(format!(
143 "The topic[{}] is conflict with system topic.",
144 topic
145 )));
146 }
147 Ok(())
148 }
149
150 pub fn is_not_allowed_send_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
151 if TopicValidator::is_not_allowed_send_topic(topic) {
152 return Err(mq_client_err!(format!(
153 "Sending message to topic[{}] is forbidden.",
154 topic
155 )));
156 }
157
158 Ok(())
159 }
160
161 pub fn check_topic_config(topic_config: &TopicConfig) -> rocketmq_error::RocketMQResult<()> {
162 if !PermName::is_valid(topic_config.perm) {
163 return Err(mq_client_err!(
164 ResponseCode::NoPermission as i32,
165 format!("topicPermission value: {} is invalid.", topic_config.perm)
166 ));
167 }
168
169 Ok(())
170 }
171
172 pub fn check_broker_config(
173 broker_config: &HashMap<String, String>,
174 ) -> rocketmq_error::RocketMQResult<()> {
175 if let Some(broker_permission) = broker_config.get("brokerPermission") {
176 if !PermName::is_valid(broker_permission.parse().unwrap()) {
177 return Err(mq_client_err!(format!(
178 "brokerPermission value: {} is invalid.",
179 broker_permission
180 )));
181 }
182 }
183
184 Ok(())
185 }
186}
187
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rocketmq_common::common::config::TopicConfig;

    use super::*;

    // ---- check_group -------------------------------------------------------------------

    /// An empty group name is rejected.
    #[test]
    fn check_group_blank_group() {
        assert!(Validators::check_group("").is_err());
    }

    /// A group name one byte over the limit is rejected.
    #[test]
    fn check_group_long_group() {
        let oversized = "a".repeat(Validators::CHARACTER_MAX_LENGTH + 1);
        assert!(Validators::check_group(&oversized).is_err());
    }

    /// A group name containing `@` is rejected.
    #[test]
    fn check_group_illegal_characters() {
        assert!(Validators::check_group("illegal@group").is_err());
    }

    /// A well-formed group name is accepted.
    #[test]
    fn check_group_valid_group() {
        assert!(Validators::check_group("valid_group").is_ok());
    }

    // ---- check_topic -------------------------------------------------------------------

    /// An empty topic name is rejected.
    #[test]
    fn check_topic_blank_topic() {
        assert!(Validators::check_topic("").is_err());
    }

    /// A topic name one byte over the limit is rejected.
    #[test]
    fn check_topic_long_topic() {
        let oversized = "a".repeat(Validators::TOPIC_MAX_LENGTH + 1);
        assert!(Validators::check_topic(&oversized).is_err());
    }

    /// A topic name containing `@` is rejected.
    #[test]
    fn check_topic_illegal_characters() {
        assert!(Validators::check_topic("illegal@topic").is_err());
    }

    /// A well-formed topic name is accepted.
    #[test]
    fn check_topic_valid_topic() {
        assert!(Validators::check_topic("valid_topic").is_ok());
    }

    // ---- check_topic_config ------------------------------------------------------------

    /// A permission value of 999 is rejected.
    #[test]
    fn check_topic_config_invalid_permission() {
        let config = TopicConfig {
            perm: 999,
            ..Default::default()
        };
        assert!(Validators::check_topic_config(&config).is_err());
    }

    /// A permission value of 6 is accepted.
    #[test]
    fn check_topic_config_valid_permission() {
        let config = TopicConfig {
            perm: 6,
            ..Default::default()
        };
        assert!(Validators::check_topic_config(&config).is_ok());
    }

    // ---- check_broker_config -----------------------------------------------------------

    /// A `brokerPermission` of "999" is rejected.
    #[test]
    fn check_broker_config_invalid_permission() {
        let config: HashMap<String, String> =
            HashMap::from([("brokerPermission".to_string(), "999".to_string())]);
        assert!(Validators::check_broker_config(&config).is_err());
    }

    /// A `brokerPermission` of "6" is accepted.
    #[test]
    fn check_broker_config_valid_permission() {
        let config: HashMap<String, String> =
            HashMap::from([("brokerPermission".to_string(), "6".to_string())]);
        assert!(Validators::check_broker_config(&config).is_ok());
    }
}