rocketmq_client_rust/base/
validators.rs1use std::collections::HashMap;
16
17use cheetah_string::CheetahString;
18use rocketmq_common::common::config::TopicConfig;
19use rocketmq_common::common::constant::PermName;
20use rocketmq_common::common::message::MessageConst;
21use rocketmq_common::common::message::MessageTrait;
22use rocketmq_common::common::topic::TopicValidator;
23use rocketmq_remoting::code::response_code::ResponseCode;
24
25use crate::producer::default_mq_producer::ProducerConfig;
26
27pub struct Validators;
28
29impl Validators {
30 pub const CHARACTER_MAX_LENGTH: usize = 255;
31 pub const TOPIC_MAX_LENGTH: usize = 127;
32
33 pub fn check_group(group: &str) -> rocketmq_error::RocketMQResult<()> {
34 if group.trim().is_empty() {
35 return Err(mq_client_err!("the specified group is blank"));
36 }
37
38 if group.len() > Self::CHARACTER_MAX_LENGTH {
39 return Err(mq_client_err!(
40 "the specified group is longer than group max length 255."
41 ));
42 }
43
44 if TopicValidator::is_topic_or_group_illegal(group) {
45 return Err(mq_client_err!(format!(
46 "the specified group[{}] contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$",
47 group
48 )));
49 }
50 Ok(())
51 }
52
53 pub fn check_message<M>(msg: Option<&M>, producer_config: &ProducerConfig) -> rocketmq_error::RocketMQResult<()>
54 where
55 M: MessageTrait,
56 {
57 if msg.is_none() {
58 return Err(mq_client_err!(
59 ResponseCode::MessageIllegal as i32,
60 "the message is null".to_string()
61 ));
62 }
63 let msg = msg.unwrap();
64 Self::check_topic(msg.topic())?;
65 Self::is_not_allowed_send_topic(msg.topic())?;
66
67 if msg.get_body().is_none() {
68 return Err(mq_client_err!(
69 ResponseCode::MessageIllegal as i32,
70 "the message body is null".to_string()
71 ));
72 }
73
74 let length = msg.get_body().unwrap().len();
75 if length == 0 {
76 return Err(mq_client_err!(
77 ResponseCode::MessageIllegal as i32,
78 "the message body length is zero".to_string()
79 ));
80 }
81
82 if length > producer_config.max_message_size() as usize {
83 return Err(mq_client_err!(
84 ResponseCode::MessageIllegal as i32,
85 format!(
86 "the message body size over max value, MAX: {}",
87 producer_config.max_message_size()
88 )
89 ));
90 }
91
92 let lmq_path = msg.user_property(&CheetahString::from_static_str(
93 MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
94 ));
95 if let Some(value) = lmq_path {
96 if value.contains(std::path::MAIN_SEPARATOR) {
97 return Err(mq_client_err!(
98 ResponseCode::MessageIllegal as i32,
99 format!(
100 "INNER_MULTI_DISPATCH {} can not contains {} character",
101 value,
102 std::path::MAIN_SEPARATOR
103 )
104 ));
105 }
106 }
107
108 Ok(())
109 }
110
111 pub fn check_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
112 if topic.trim().is_empty() {
113 return Err(mq_client_err!("The specified topic is blank"));
114 }
115
116 if topic.len() > Self::TOPIC_MAX_LENGTH {
117 return Err(mq_client_err!(format!(
118 "The specified topic is longer than topic max length {}.",
119 Self::TOPIC_MAX_LENGTH
120 )));
121 }
122
123 if TopicValidator::is_topic_or_group_illegal(topic) {
124 return Err(mq_client_err!(format!(
125 "The specified topic[{}] contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$",
126 topic
127 )));
128 }
129
130 Ok(())
131 }
132
133 pub fn is_system_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
134 if TopicValidator::is_system_topic(topic) {
135 return Err(mq_client_err!(format!(
136 "The topic[{}] is conflict with system topic.",
137 topic
138 )));
139 }
140 Ok(())
141 }
142
143 pub fn is_not_allowed_send_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
144 if TopicValidator::is_not_allowed_send_topic(topic) {
145 return Err(mq_client_err!(format!(
146 "Sending message to topic[{}] is forbidden.",
147 topic
148 )));
149 }
150
151 Ok(())
152 }
153
154 pub fn check_topic_config(topic_config: &TopicConfig) -> rocketmq_error::RocketMQResult<()> {
155 if !PermName::is_valid(topic_config.perm) {
156 return Err(mq_client_err!(
157 ResponseCode::NoPermission as i32,
158 format!("topicPermission value: {} is invalid.", topic_config.perm)
159 ));
160 }
161
162 Ok(())
163 }
164
165 pub fn check_broker_config(broker_config: &HashMap<String, String>) -> rocketmq_error::RocketMQResult<()> {
166 if let Some(broker_permission) = broker_config.get("brokerPermission") {
167 if !PermName::is_valid(broker_permission.parse().unwrap()) {
168 return Err(mq_client_err!(format!(
169 "brokerPermission value: {} is invalid.",
170 broker_permission
171 )));
172 }
173 }
174
175 Ok(())
176 }
177}
178
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rocketmq_common::common::config::TopicConfig;

    use super::*;

    /// Builds a broker configuration holding only a `brokerPermission` entry.
    fn broker_config_with(permission: &str) -> HashMap<String, String> {
        let mut config = HashMap::new();
        config.insert("brokerPermission".to_string(), permission.to_string());
        config
    }

    // ---- check_group ----

    #[test]
    fn check_group_blank_group() {
        assert!(Validators::check_group("").is_err());
    }

    #[test]
    fn check_group_long_group() {
        // 256 bytes: one more than the group length limit.
        let oversized = "a".repeat(256);
        assert!(Validators::check_group(&oversized).is_err());
    }

    #[test]
    fn check_group_illegal_characters() {
        assert!(Validators::check_group("illegal@group").is_err());
    }

    #[test]
    fn check_group_valid_group() {
        assert!(Validators::check_group("valid_group").is_ok());
    }

    // ---- check_topic ----

    #[test]
    fn check_topic_blank_topic() {
        assert!(Validators::check_topic("").is_err());
    }

    #[test]
    fn check_topic_long_topic() {
        // 128 bytes: one more than the topic length limit.
        let oversized = "a".repeat(128);
        assert!(Validators::check_topic(&oversized).is_err());
    }

    #[test]
    fn check_topic_illegal_characters() {
        assert!(Validators::check_topic("illegal@topic").is_err());
    }

    #[test]
    fn check_topic_valid_topic() {
        assert!(Validators::check_topic("valid_topic").is_ok());
    }

    // ---- check_topic_config ----

    #[test]
    fn check_topic_config_invalid_permission() {
        let config = TopicConfig {
            perm: 999,
            ..Default::default()
        };
        assert!(Validators::check_topic_config(&config).is_err());
    }

    #[test]
    fn check_topic_config_valid_permission() {
        let config = TopicConfig {
            perm: 6,
            ..Default::default()
        };
        assert!(Validators::check_topic_config(&config).is_ok());
    }

    // ---- check_broker_config ----

    #[test]
    fn check_broker_config_invalid_permission() {
        let config = broker_config_with("999");
        assert!(Validators::check_broker_config(&config).is_err());
    }

    #[test]
    fn check_broker_config_valid_permission() {
        let config = broker_config_with("6");
        assert!(Validators::check_broker_config(&config).is_ok());
    }
}