rocketmq_client_rust/base/
validators.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::config::TopicConfig;
21use rocketmq_common::common::constant::PermName;
22use rocketmq_common::common::message::MessageConst;
23use rocketmq_common::common::message::MessageTrait;
24use rocketmq_common::common::topic::TopicValidator;
25use rocketmq_remoting::code::response_code::ResponseCode;
26
27use crate::producer::default_mq_producer::ProducerConfig;
28
/// Namespace type for the client-side validation helpers.
///
/// The type carries no data; every check is an associated function that returns
/// `Ok(())` when the input is acceptable and a client error otherwise.
pub struct Validators;
30
31impl Validators {
32    pub const CHARACTER_MAX_LENGTH: usize = 255;
33    pub const TOPIC_MAX_LENGTH: usize = 127;
34
35    pub fn check_group(group: &str) -> rocketmq_error::RocketMQResult<()> {
36        if group.trim().is_empty() {
37            return Err(mq_client_err!("the specified group is blank"));
38        }
39
40        if group.len() > Self::CHARACTER_MAX_LENGTH {
41            return Err(mq_client_err!(
42                "the specified group is longer than group max length 255."
43            ));
44        }
45
46        if TopicValidator::is_topic_or_group_illegal(group) {
47            return Err(mq_client_err!(format!(
48                "the specified group[{}] contains illegal characters, allowing only \
49                 ^[%|a-zA-Z0-9_-]+$",
50                group
51            )));
52        }
53        Ok(())
54    }
55
56    pub fn check_message<M>(
57        msg: Option<&M>,
58        producer_config: &ProducerConfig,
59    ) -> rocketmq_error::RocketMQResult<()>
60    where
61        M: MessageTrait,
62    {
63        if msg.is_none() {
64            return Err(mq_client_err!(
65                ResponseCode::MessageIllegal as i32,
66                "the message is null".to_string()
67            ));
68        }
69        let msg = msg.unwrap();
70        Self::check_topic(msg.get_topic())?;
71        Self::is_not_allowed_send_topic(msg.get_topic())?;
72
73        if msg.get_body().is_none() {
74            return Err(mq_client_err!(
75                ResponseCode::MessageIllegal as i32,
76                "the message body is null".to_string()
77            ));
78        }
79
80        let length = msg.get_body().unwrap().len();
81        if length == 0 {
82            return Err(mq_client_err!(
83                ResponseCode::MessageIllegal as i32,
84                "the message body length is zero".to_string()
85            ));
86        }
87
88        if length > producer_config.max_message_size() as usize {
89            return Err(mq_client_err!(
90                ResponseCode::MessageIllegal as i32,
91                format!(
92                    "the message body size over max value, MAX: {}",
93                    producer_config.max_message_size()
94                )
95            ));
96        }
97
98        let lmq_path = msg.get_user_property(&CheetahString::from_static_str(
99            MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
100        ));
101        if let Some(value) = lmq_path {
102            if value.contains(std::path::MAIN_SEPARATOR) {
103                return Err(mq_client_err!(
104                    ResponseCode::MessageIllegal as i32,
105                    format!(
106                        "INNER_MULTI_DISPATCH {} can not contains {} character",
107                        value,
108                        std::path::MAIN_SEPARATOR
109                    )
110                ));
111            }
112        }
113
114        Ok(())
115    }
116
117    pub fn check_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
118        if topic.trim().is_empty() {
119            return Err(mq_client_err!("The specified topic is blank"));
120        }
121
122        if topic.len() > Self::TOPIC_MAX_LENGTH {
123            return Err(mq_client_err!(format!(
124                "The specified topic is longer than topic max length {}.",
125                Self::TOPIC_MAX_LENGTH
126            )));
127        }
128
129        if TopicValidator::is_topic_or_group_illegal(topic) {
130            return Err(mq_client_err!(format!(
131                "The specified topic[{}] contains illegal characters, allowing only \
132                 ^[%|a-zA-Z0-9_-]+$",
133                topic
134            )));
135        }
136
137        Ok(())
138    }
139
140    pub fn is_system_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
141        if TopicValidator::is_system_topic(topic) {
142            return Err(mq_client_err!(format!(
143                "The topic[{}] is conflict with system topic.",
144                topic
145            )));
146        }
147        Ok(())
148    }
149
150    pub fn is_not_allowed_send_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
151        if TopicValidator::is_not_allowed_send_topic(topic) {
152            return Err(mq_client_err!(format!(
153                "Sending message to topic[{}] is forbidden.",
154                topic
155            )));
156        }
157
158        Ok(())
159    }
160
161    pub fn check_topic_config(topic_config: &TopicConfig) -> rocketmq_error::RocketMQResult<()> {
162        if !PermName::is_valid(topic_config.perm) {
163            return Err(mq_client_err!(
164                ResponseCode::NoPermission as i32,
165                format!("topicPermission value: {} is invalid.", topic_config.perm)
166            ));
167        }
168
169        Ok(())
170    }
171
172    pub fn check_broker_config(
173        broker_config: &HashMap<String, String>,
174    ) -> rocketmq_error::RocketMQResult<()> {
175        if let Some(broker_permission) = broker_config.get("brokerPermission") {
176            if !PermName::is_valid(broker_permission.parse().unwrap()) {
177                return Err(mq_client_err!(format!(
178                    "brokerPermission value: {} is invalid.",
179                    broker_permission
180                )));
181            }
182        }
183
184        Ok(())
185    }
186}
187
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rocketmq_common::common::config::TopicConfig;

    use super::*;

    /// Builds a broker configuration holding only the `brokerPermission` entry.
    fn broker_config_with_permission(value: &str) -> HashMap<String, String> {
        let mut config = HashMap::new();
        config.insert("brokerPermission".to_string(), value.to_string());
        config
    }

    // ---- check_group ----

    #[test]
    fn check_group_blank_group() {
        assert!(Validators::check_group("").is_err());
    }

    #[test]
    fn check_group_long_group() {
        // One byte past the 255-byte limit.
        let oversized = "a".repeat(256);
        assert!(Validators::check_group(&oversized).is_err());
    }

    #[test]
    fn check_group_illegal_characters() {
        assert!(Validators::check_group("illegal@group").is_err());
    }

    #[test]
    fn check_group_valid_group() {
        assert!(Validators::check_group("valid_group").is_ok());
    }

    // ---- check_topic ----

    #[test]
    fn check_topic_blank_topic() {
        assert!(Validators::check_topic("").is_err());
    }

    #[test]
    fn check_topic_long_topic() {
        // One byte past the 127-byte limit.
        let oversized = "a".repeat(128);
        assert!(Validators::check_topic(&oversized).is_err());
    }

    #[test]
    fn check_topic_illegal_characters() {
        assert!(Validators::check_topic("illegal@topic").is_err());
    }

    #[test]
    fn check_topic_valid_topic() {
        assert!(Validators::check_topic("valid_topic").is_ok());
    }

    // ---- check_topic_config ----

    #[test]
    fn check_topic_config_invalid_permission() {
        let config = TopicConfig {
            perm: 999,
            ..Default::default()
        };
        assert!(Validators::check_topic_config(&config).is_err());
    }

    #[test]
    fn check_topic_config_valid_permission() {
        let config = TopicConfig {
            perm: 6,
            ..Default::default()
        };
        assert!(Validators::check_topic_config(&config).is_ok());
    }

    // ---- check_broker_config ----

    #[test]
    fn check_broker_config_invalid_permission() {
        let config = broker_config_with_permission("999");
        assert!(Validators::check_broker_config(&config).is_err());
    }

    #[test]
    fn check_broker_config_valid_permission() {
        let config = broker_config_with_permission("6");
        assert!(Validators::check_broker_config(&config).is_ok());
    }
}