rocketmq_client_rust/base/
validators.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::config::TopicConfig;
21use rocketmq_common::common::constant::PermName;
22use rocketmq_common::common::message::MessageConst;
23use rocketmq_common::common::message::MessageTrait;
24use rocketmq_common::common::topic::TopicValidator;
25use rocketmq_error::mq_client_err;
26use rocketmq_remoting::code::response_code::ResponseCode;
27
28use crate::producer::default_mq_producer::ProducerConfig;
29
30pub struct Validators;
31
32impl Validators {
33    pub const CHARACTER_MAX_LENGTH: usize = 255;
34    pub const TOPIC_MAX_LENGTH: usize = 127;
35
36    pub fn check_group(group: &str) -> rocketmq_error::RocketMQResult<()> {
37        if group.trim().is_empty() {
38            return mq_client_err!("the specified group is blank");
39        }
40
41        if group.len() > Self::CHARACTER_MAX_LENGTH {
42            return mq_client_err!("the specified group is longer than group max length 255.");
43        }
44
45        if TopicValidator::is_topic_or_group_illegal(group) {
46            return mq_client_err!(format!(
47                "the specified group[{}] contains illegal characters, allowing only \
48                 ^[%|a-zA-Z0-9_-]+$",
49                group
50            ));
51        }
52        Ok(())
53    }
54
55    pub fn check_message<M>(
56        msg: Option<&M>,
57        producer_config: &ProducerConfig,
58    ) -> rocketmq_error::RocketMQResult<()>
59    where
60        M: MessageTrait,
61    {
62        if msg.is_none() {
63            return mq_client_err!(
64                ResponseCode::MessageIllegal as i32,
65                "the message is null".to_string()
66            );
67        }
68        let msg = msg.unwrap();
69        Self::check_topic(msg.get_topic())?;
70        Self::is_not_allowed_send_topic(msg.get_topic())?;
71
72        if msg.get_body().is_none() {
73            return mq_client_err!(
74                ResponseCode::MessageIllegal as i32,
75                "the message body is null".to_string()
76            );
77        }
78
79        let length = msg.get_body().unwrap().len();
80        if length == 0 {
81            return mq_client_err!(
82                ResponseCode::MessageIllegal as i32,
83                "the message body length is zero".to_string()
84            );
85        }
86
87        if length > producer_config.max_message_size() as usize {
88            return mq_client_err!(
89                ResponseCode::MessageIllegal as i32,
90                format!(
91                    "the message body size over max value, MAX: {}",
92                    producer_config.max_message_size()
93                )
94            );
95        }
96
97        let lmq_path = msg.get_user_property(&CheetahString::from_static_str(
98            MessageConst::PROPERTY_INNER_MULTI_DISPATCH,
99        ));
100        if let Some(value) = lmq_path {
101            if value.contains(std::path::MAIN_SEPARATOR) {
102                return mq_client_err!(
103                    ResponseCode::MessageIllegal as i32,
104                    format!(
105                        "INNER_MULTI_DISPATCH {} can not contains {} character",
106                        value,
107                        std::path::MAIN_SEPARATOR
108                    )
109                );
110            }
111        }
112
113        Ok(())
114    }
115
116    pub fn check_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
117        if topic.trim().is_empty() {
118            return mq_client_err!("The specified topic is blank");
119        }
120
121        if topic.len() > Self::TOPIC_MAX_LENGTH {
122            return mq_client_err!(format!(
123                "The specified topic is longer than topic max length {}.",
124                Self::TOPIC_MAX_LENGTH
125            ));
126        }
127
128        if TopicValidator::is_topic_or_group_illegal(topic) {
129            return mq_client_err!(format!(
130                "The specified topic[{}] contains illegal characters, allowing only \
131                 ^[%|a-zA-Z0-9_-]+$",
132                topic
133            ));
134        }
135
136        Ok(())
137    }
138
139    pub fn is_system_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
140        if TopicValidator::is_system_topic(topic) {
141            return mq_client_err!(format!(
142                "The topic[{}] is conflict with system topic.",
143                topic
144            ));
145        }
146        Ok(())
147    }
148
149    pub fn is_not_allowed_send_topic(topic: &str) -> rocketmq_error::RocketMQResult<()> {
150        if TopicValidator::is_not_allowed_send_topic(topic) {
151            return mq_client_err!(format!("Sending message to topic[{}] is forbidden.", topic));
152        }
153
154        Ok(())
155    }
156
157    pub fn check_topic_config(topic_config: &TopicConfig) -> rocketmq_error::RocketMQResult<()> {
158        if !PermName::is_valid(topic_config.perm) {
159            return mq_client_err!(
160                ResponseCode::NoPermission as i32,
161                format!("topicPermission value: {} is invalid.", topic_config.perm)
162            );
163        }
164
165        Ok(())
166    }
167
168    pub fn check_broker_config(
169        broker_config: &HashMap<String, String>,
170    ) -> rocketmq_error::RocketMQResult<()> {
171        if let Some(broker_permission) = broker_config.get("brokerPermission") {
172            if !PermName::is_valid(broker_permission.parse().unwrap()) {
173                return mq_client_err!(format!(
174                    "brokerPermission value: {} is invalid.",
175                    broker_permission
176                ));
177            }
178        }
179
180        Ok(())
181    }
182}
183
184#[cfg(test)]
185mod tests {
186    use std::collections::HashMap;
187
188    use rocketmq_common::common::config::TopicConfig;
189
190    use super::*;
191
192    #[test]
193    fn check_group_blank_group() {
194        let result = Validators::check_group("");
195        assert!(result.is_err());
196    }
197
198    #[test]
199    fn check_group_long_group() {
200        let long_group = "a".repeat(256);
201        let result = Validators::check_group(&long_group);
202        assert!(result.is_err());
203    }
204
205    #[test]
206    fn check_group_illegal_characters() {
207        let result = Validators::check_group("illegal@group");
208        assert!(result.is_err());
209    }
210
211    #[test]
212    fn check_group_valid_group() {
213        let result = Validators::check_group("valid_group");
214        assert!(result.is_ok());
215    }
216
217    #[test]
218    fn check_topic_blank_topic() {
219        let result = Validators::check_topic("");
220        assert!(result.is_err());
221    }
222
223    #[test]
224    fn check_topic_long_topic() {
225        let long_topic = "a".repeat(128);
226        let result = Validators::check_topic(&long_topic);
227        assert!(result.is_err());
228    }
229
230    #[test]
231    fn check_topic_illegal_characters() {
232        let result = Validators::check_topic("illegal@topic");
233        assert!(result.is_err());
234    }
235
236    #[test]
237    fn check_topic_valid_topic() {
238        let result = Validators::check_topic("valid_topic");
239        assert!(result.is_ok());
240    }
241
242    #[test]
243    fn check_topic_config_invalid_permission() {
244        let topic_config = TopicConfig {
245            perm: 999,
246            ..Default::default()
247        };
248        let result = Validators::check_topic_config(&topic_config);
249        assert!(result.is_err());
250    }
251
252    #[test]
253    fn check_topic_config_valid_permission() {
254        let topic_config = TopicConfig {
255            perm: 6,
256            ..Default::default()
257        };
258        let result = Validators::check_topic_config(&topic_config);
259        assert!(result.is_ok());
260    }
261
262    #[test]
263    fn check_broker_config_invalid_permission() {
264        let mut broker_config = HashMap::new();
265        broker_config.insert("brokerPermission".to_string(), "999".to_string());
266        let result = Validators::check_broker_config(&broker_config);
267        assert!(result.is_err());
268    }
269
270    #[test]
271    fn check_broker_config_valid_permission() {
272        let mut broker_config = HashMap::new();
273        broker_config.insert("brokerPermission".to_string(), "6".to_string());
274        let result = Validators::check_broker_config(&broker_config);
275        assert!(result.is_ok());
276    }
277}