rocketmq_client_rust/base/
client_config_validation.rs1use rocketmq_error::RocketMQError;
22use rocketmq_error::RocketMQResult;
23
24pub struct ClientConfigValidator;
29
30impl ClientConfigValidator {
31 pub const MIN_POLL_NAME_SERVER_INTERVAL: u32 = 10_000;
37
38 pub const MAX_POLL_NAME_SERVER_INTERVAL: u32 = 600_000;
40
41 pub const MIN_HEARTBEAT_BROKER_INTERVAL: u32 = 10_000;
43
44 pub const MAX_HEARTBEAT_BROKER_INTERVAL: u32 = 600_000;
46
47 pub const MIN_PERSIST_CONSUMER_OFFSET_INTERVAL: u32 = 1_000;
49
50 pub const MAX_PERSIST_CONSUMER_OFFSET_INTERVAL: u32 = 60_000;
52
53 pub const MIN_TRACE_MSG_BATCH_NUM: usize = 1;
55
56 pub const MAX_TRACE_MSG_BATCH_NUM: usize = 10_000;
58
59 pub const MIN_MAX_PAGE_SIZE_IN_GET_METADATA: usize = 1;
61
62 pub const MAX_MAX_PAGE_SIZE_IN_GET_METADATA: usize = 100_000;
64
65 pub const MIN_MQ_CLIENT_API_TIMEOUT: u64 = 100;
67
68 pub const MAX_MQ_CLIENT_API_TIMEOUT: u64 = 60_000;
70
71 pub fn validate_poll_name_server_interval(interval: u32) -> RocketMQResult<()> {
79 if !(Self::MIN_POLL_NAME_SERVER_INTERVAL..=Self::MAX_POLL_NAME_SERVER_INTERVAL).contains(&interval) {
80 return Err(RocketMQError::ConfigInvalidValue {
81 key: "poll_name_server_interval",
82 value: interval.to_string(),
83 reason: format!(
84 "must be between {} and {} milliseconds",
85 Self::MIN_POLL_NAME_SERVER_INTERVAL,
86 Self::MAX_POLL_NAME_SERVER_INTERVAL
87 ),
88 });
89 }
90 Ok(())
91 }
92
93 pub fn validate_heartbeat_broker_interval(interval: u32) -> RocketMQResult<()> {
97 if !(Self::MIN_HEARTBEAT_BROKER_INTERVAL..=Self::MAX_HEARTBEAT_BROKER_INTERVAL).contains(&interval) {
98 return Err(RocketMQError::ConfigInvalidValue {
99 key: "heartbeat_broker_interval",
100 value: interval.to_string(),
101 reason: format!(
102 "must be between {} and {} milliseconds",
103 Self::MIN_HEARTBEAT_BROKER_INTERVAL,
104 Self::MAX_HEARTBEAT_BROKER_INTERVAL
105 ),
106 });
107 }
108 Ok(())
109 }
110
111 pub fn validate_persist_consumer_offset_interval(interval: u32) -> RocketMQResult<()> {
115 if !(Self::MIN_PERSIST_CONSUMER_OFFSET_INTERVAL..=Self::MAX_PERSIST_CONSUMER_OFFSET_INTERVAL)
116 .contains(&interval)
117 {
118 return Err(RocketMQError::ConfigInvalidValue {
119 key: "persist_consumer_offset_interval",
120 value: interval.to_string(),
121 reason: format!(
122 "must be between {} and {} milliseconds",
123 Self::MIN_PERSIST_CONSUMER_OFFSET_INTERVAL,
124 Self::MAX_PERSIST_CONSUMER_OFFSET_INTERVAL
125 ),
126 });
127 }
128 Ok(())
129 }
130
131 pub fn validate_trace_msg_batch_num(num: usize) -> RocketMQResult<()> {
135 if !(Self::MIN_TRACE_MSG_BATCH_NUM..=Self::MAX_TRACE_MSG_BATCH_NUM).contains(&num) {
136 return Err(RocketMQError::ConfigInvalidValue {
137 key: "trace_msg_batch_num",
138 value: num.to_string(),
139 reason: format!(
140 "must be between {} and {}",
141 Self::MIN_TRACE_MSG_BATCH_NUM,
142 Self::MAX_TRACE_MSG_BATCH_NUM
143 ),
144 });
145 }
146 Ok(())
147 }
148
149 pub fn validate_max_page_size_in_get_metadata(size: usize) -> RocketMQResult<()> {
153 if !(Self::MIN_MAX_PAGE_SIZE_IN_GET_METADATA..=Self::MAX_MAX_PAGE_SIZE_IN_GET_METADATA).contains(&size) {
154 return Err(RocketMQError::ConfigInvalidValue {
155 key: "max_page_size_in_get_metadata",
156 value: size.to_string(),
157 reason: format!(
158 "must be between {} and {}",
159 Self::MIN_MAX_PAGE_SIZE_IN_GET_METADATA,
160 Self::MAX_MAX_PAGE_SIZE_IN_GET_METADATA
161 ),
162 });
163 }
164 Ok(())
165 }
166
167 pub fn validate_mq_client_api_timeout(timeout: u64) -> RocketMQResult<()> {
171 if !(Self::MIN_MQ_CLIENT_API_TIMEOUT..=Self::MAX_MQ_CLIENT_API_TIMEOUT).contains(&timeout) {
172 return Err(RocketMQError::ConfigInvalidValue {
173 key: "mq_client_api_timeout",
174 value: timeout.to_string(),
175 reason: format!(
176 "must be between {} and {} milliseconds",
177 Self::MIN_MQ_CLIENT_API_TIMEOUT,
178 Self::MAX_MQ_CLIENT_API_TIMEOUT
179 ),
180 });
181 }
182 Ok(())
183 }
184
185 pub fn validate_concurrent_heartbeat_thread_pool_size(size: usize) -> RocketMQResult<()> {
190 if size == 0 {
191 return Err(RocketMQError::ConfigInvalidValue {
192 key: "concurrent_heartbeat_thread_pool_size",
193 value: size.to_string(),
194 reason: "must be greater than 0".to_string(),
195 });
196 }
197
198 #[allow(clippy::redundant_closure)]
200 let cpu_count = std::panic::catch_unwind(|| num_cpus::get()).unwrap_or(4);
201 let max_size = cpu_count * 4;
202
203 if size > max_size {
204 return Err(RocketMQError::ConfigInvalidValue {
205 key: "concurrent_heartbeat_thread_pool_size",
206 value: size.to_string(),
207 reason: format!("should not exceed {} (CPU cores * 4)", max_size),
208 });
209 }
210
211 Ok(())
212 }
213
214 pub fn validate_client_callback_executor_threads(threads: usize) -> RocketMQResult<()> {
218 if threads == 0 {
219 return Err(RocketMQError::ConfigInvalidValue {
220 key: "client_callback_executor_threads",
221 value: threads.to_string(),
222 reason: "must be greater than 0".to_string(),
223 });
224 }
225
226 #[allow(clippy::redundant_closure)]
228 let cpu_count = std::panic::catch_unwind(|| num_cpus::get()).unwrap_or(4);
229 let max_threads = cpu_count * 8;
230
231 if threads > max_threads {
232 return Err(RocketMQError::ConfigInvalidValue {
233 key: "client_callback_executor_threads",
234 value: threads.to_string(),
235 reason: format!("should not exceed {} (CPU cores * 8)", max_threads),
236 });
237 }
238
239 Ok(())
240 }
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246
247 #[test]
248 fn test_validate_poll_name_server_interval_valid() {
249 assert!(ClientConfigValidator::validate_poll_name_server_interval(30_000).is_ok());
250 assert!(ClientConfigValidator::validate_poll_name_server_interval(10_000).is_ok());
251 assert!(ClientConfigValidator::validate_poll_name_server_interval(600_000).is_ok());
252 }
253
254 #[test]
255 fn test_validate_poll_name_server_interval_invalid() {
256 assert!(ClientConfigValidator::validate_poll_name_server_interval(1_000).is_err());
257 assert!(ClientConfigValidator::validate_poll_name_server_interval(700_000).is_err());
258 }
259
260 #[test]
261 fn test_validate_heartbeat_broker_interval_valid() {
262 assert!(ClientConfigValidator::validate_heartbeat_broker_interval(30_000).is_ok());
263 assert!(ClientConfigValidator::validate_heartbeat_broker_interval(10_000).is_ok());
264 assert!(ClientConfigValidator::validate_heartbeat_broker_interval(600_000).is_ok());
265 }
266
267 #[test]
268 fn test_validate_heartbeat_broker_interval_invalid() {
269 assert!(ClientConfigValidator::validate_heartbeat_broker_interval(1_000).is_err());
270 assert!(ClientConfigValidator::validate_heartbeat_broker_interval(700_000).is_err());
271 }
272
273 #[test]
274 fn test_validate_trace_msg_batch_num_valid() {
275 assert!(ClientConfigValidator::validate_trace_msg_batch_num(1).is_ok());
276 assert!(ClientConfigValidator::validate_trace_msg_batch_num(10).is_ok());
277 assert!(ClientConfigValidator::validate_trace_msg_batch_num(10_000).is_ok());
278 }
279
280 #[test]
281 fn test_validate_trace_msg_batch_num_invalid() {
282 assert!(ClientConfigValidator::validate_trace_msg_batch_num(0).is_err());
283 assert!(ClientConfigValidator::validate_trace_msg_batch_num(10_001).is_err());
284 }
285
286 #[test]
287 fn test_validate_concurrent_heartbeat_thread_pool_size_valid() {
288 assert!(ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(1).is_ok());
289 assert!(ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(4).is_ok());
290 }
291
292 #[test]
293 fn test_validate_mq_client_api_timeout_valid() {
294 assert!(ClientConfigValidator::validate_mq_client_api_timeout(3_000).is_ok());
295 assert!(ClientConfigValidator::validate_mq_client_api_timeout(100).is_ok());
296 assert!(ClientConfigValidator::validate_mq_client_api_timeout(60_000).is_ok());
297 }
298
299 #[test]
300 fn test_validate_mq_client_api_timeout_invalid() {
301 assert!(ClientConfigValidator::validate_mq_client_api_timeout(50).is_err());
302 assert!(ClientConfigValidator::validate_mq_client_api_timeout(70_000).is_err());
303 }
304}