Skip to main content

rocketmq_client_rust/base/
client_config_validation.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Client configuration validation module
16//!
17//! This module provides validation functions for ClientConfig fields,
18//! ensuring that configuration values are within acceptable bounds and
19//! properly formatted.
20
21use rocketmq_error::RocketMQError;
22use rocketmq_error::RocketMQResult;
23
/// Validator for ClientConfig fields
///
/// Provides validation methods for all configuration parameters,
/// ensuring they meet RocketMQ requirements.
///
/// This is a stateless unit type used purely as a namespace for the
/// associated constants and validation functions; the derived traits let it
/// be debug-printed, copied, and default-constructed like any other plain
/// value type.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClientConfigValidator;
29
30impl ClientConfigValidator {
31    // =========================================================================
32    // Validation Constants
33    // =========================================================================
34
35    /// Minimum poll name server interval (10 seconds)
36    pub const MIN_POLL_NAME_SERVER_INTERVAL: u32 = 10_000;
37
38    /// Maximum poll name server interval (10 minutes)
39    pub const MAX_POLL_NAME_SERVER_INTERVAL: u32 = 600_000;
40
41    /// Minimum heartbeat broker interval (10 seconds)
42    pub const MIN_HEARTBEAT_BROKER_INTERVAL: u32 = 10_000;
43
44    /// Maximum heartbeat broker interval (10 minutes)
45    pub const MAX_HEARTBEAT_BROKER_INTERVAL: u32 = 600_000;
46
47    /// Minimum persist consumer offset interval (1 second)
48    pub const MIN_PERSIST_CONSUMER_OFFSET_INTERVAL: u32 = 1_000;
49
50    /// Maximum persist consumer offset interval (1 minute)
51    pub const MAX_PERSIST_CONSUMER_OFFSET_INTERVAL: u32 = 60_000;
52
53    /// Minimum trace message batch number
54    pub const MIN_TRACE_MSG_BATCH_NUM: usize = 1;
55
56    /// Maximum trace message batch number
57    pub const MAX_TRACE_MSG_BATCH_NUM: usize = 10_000;
58
59    /// Minimum max page size in get metadata
60    pub const MIN_MAX_PAGE_SIZE_IN_GET_METADATA: usize = 1;
61
62    /// Maximum max page size in get metadata
63    pub const MAX_MAX_PAGE_SIZE_IN_GET_METADATA: usize = 100_000;
64
65    /// Minimum MQ client API timeout (100ms)
66    pub const MIN_MQ_CLIENT_API_TIMEOUT: u64 = 100;
67
68    /// Maximum MQ client API timeout (60 seconds)
69    pub const MAX_MQ_CLIENT_API_TIMEOUT: u64 = 60_000;
70
71    // =========================================================================
72    // Validation Methods
73    // =========================================================================
74
75    /// Validate poll name server interval
76    ///
77    /// Ensures the interval is between 10 seconds and 10 minutes.
78    pub fn validate_poll_name_server_interval(interval: u32) -> RocketMQResult<()> {
79        if !(Self::MIN_POLL_NAME_SERVER_INTERVAL..=Self::MAX_POLL_NAME_SERVER_INTERVAL).contains(&interval) {
80            return Err(RocketMQError::ConfigInvalidValue {
81                key: "poll_name_server_interval",
82                value: interval.to_string(),
83                reason: format!(
84                    "must be between {} and {} milliseconds",
85                    Self::MIN_POLL_NAME_SERVER_INTERVAL,
86                    Self::MAX_POLL_NAME_SERVER_INTERVAL
87                ),
88            });
89        }
90        Ok(())
91    }
92
93    /// Validate heartbeat broker interval
94    ///
95    /// Ensures the interval is between 10 seconds and 10 minutes.
96    pub fn validate_heartbeat_broker_interval(interval: u32) -> RocketMQResult<()> {
97        if !(Self::MIN_HEARTBEAT_BROKER_INTERVAL..=Self::MAX_HEARTBEAT_BROKER_INTERVAL).contains(&interval) {
98            return Err(RocketMQError::ConfigInvalidValue {
99                key: "heartbeat_broker_interval",
100                value: interval.to_string(),
101                reason: format!(
102                    "must be between {} and {} milliseconds",
103                    Self::MIN_HEARTBEAT_BROKER_INTERVAL,
104                    Self::MAX_HEARTBEAT_BROKER_INTERVAL
105                ),
106            });
107        }
108        Ok(())
109    }
110
111    /// Validate persist consumer offset interval
112    ///
113    /// Ensures the interval is between 1 second and 1 minute.
114    pub fn validate_persist_consumer_offset_interval(interval: u32) -> RocketMQResult<()> {
115        if !(Self::MIN_PERSIST_CONSUMER_OFFSET_INTERVAL..=Self::MAX_PERSIST_CONSUMER_OFFSET_INTERVAL)
116            .contains(&interval)
117        {
118            return Err(RocketMQError::ConfigInvalidValue {
119                key: "persist_consumer_offset_interval",
120                value: interval.to_string(),
121                reason: format!(
122                    "must be between {} and {} milliseconds",
123                    Self::MIN_PERSIST_CONSUMER_OFFSET_INTERVAL,
124                    Self::MAX_PERSIST_CONSUMER_OFFSET_INTERVAL
125                ),
126            });
127        }
128        Ok(())
129    }
130
131    /// Validate trace message batch number
132    ///
133    /// Ensures the batch number is between 1 and 10,000.
134    pub fn validate_trace_msg_batch_num(num: usize) -> RocketMQResult<()> {
135        if !(Self::MIN_TRACE_MSG_BATCH_NUM..=Self::MAX_TRACE_MSG_BATCH_NUM).contains(&num) {
136            return Err(RocketMQError::ConfigInvalidValue {
137                key: "trace_msg_batch_num",
138                value: num.to_string(),
139                reason: format!(
140                    "must be between {} and {}",
141                    Self::MIN_TRACE_MSG_BATCH_NUM,
142                    Self::MAX_TRACE_MSG_BATCH_NUM
143                ),
144            });
145        }
146        Ok(())
147    }
148
149    /// Validate max page size in get metadata
150    ///
151    /// Ensures the page size is between 1 and 100,000.
152    pub fn validate_max_page_size_in_get_metadata(size: usize) -> RocketMQResult<()> {
153        if !(Self::MIN_MAX_PAGE_SIZE_IN_GET_METADATA..=Self::MAX_MAX_PAGE_SIZE_IN_GET_METADATA).contains(&size) {
154            return Err(RocketMQError::ConfigInvalidValue {
155                key: "max_page_size_in_get_metadata",
156                value: size.to_string(),
157                reason: format!(
158                    "must be between {} and {}",
159                    Self::MIN_MAX_PAGE_SIZE_IN_GET_METADATA,
160                    Self::MAX_MAX_PAGE_SIZE_IN_GET_METADATA
161                ),
162            });
163        }
164        Ok(())
165    }
166
167    /// Validate MQ client API timeout
168    ///
169    /// Ensures the timeout is between 100ms and 60 seconds.
170    pub fn validate_mq_client_api_timeout(timeout: u64) -> RocketMQResult<()> {
171        if !(Self::MIN_MQ_CLIENT_API_TIMEOUT..=Self::MAX_MQ_CLIENT_API_TIMEOUT).contains(&timeout) {
172            return Err(RocketMQError::ConfigInvalidValue {
173                key: "mq_client_api_timeout",
174                value: timeout.to_string(),
175                reason: format!(
176                    "must be between {} and {} milliseconds",
177                    Self::MIN_MQ_CLIENT_API_TIMEOUT,
178                    Self::MAX_MQ_CLIENT_API_TIMEOUT
179                ),
180            });
181        }
182        Ok(())
183    }
184
185    /// Validate concurrent heartbeat thread pool size
186    ///
187    /// Ensures the thread pool size is greater than 0 and doesn't exceed
188    /// a reasonable maximum (CPU cores * 4).
189    pub fn validate_concurrent_heartbeat_thread_pool_size(size: usize) -> RocketMQResult<()> {
190        if size == 0 {
191            return Err(RocketMQError::ConfigInvalidValue {
192                key: "concurrent_heartbeat_thread_pool_size",
193                value: size.to_string(),
194                reason: "must be greater than 0".to_string(),
195            });
196        }
197
198        // Get CPU count with a reasonable default if num_cpus fails
199        #[allow(clippy::redundant_closure)]
200        let cpu_count = std::panic::catch_unwind(|| num_cpus::get()).unwrap_or(4);
201        let max_size = cpu_count * 4;
202
203        if size > max_size {
204            return Err(RocketMQError::ConfigInvalidValue {
205                key: "concurrent_heartbeat_thread_pool_size",
206                value: size.to_string(),
207                reason: format!("should not exceed {} (CPU cores * 4)", max_size),
208            });
209        }
210
211        Ok(())
212    }
213
214    /// Validate client callback executor threads
215    ///
216    /// Ensures the thread count is greater than 0.
217    pub fn validate_client_callback_executor_threads(threads: usize) -> RocketMQResult<()> {
218        if threads == 0 {
219            return Err(RocketMQError::ConfigInvalidValue {
220                key: "client_callback_executor_threads",
221                value: threads.to_string(),
222                reason: "must be greater than 0".to_string(),
223            });
224        }
225
226        // Get CPU count with a reasonable default if num_cpus fails
227        #[allow(clippy::redundant_closure)]
228        let cpu_count = std::panic::catch_unwind(|| num_cpus::get()).unwrap_or(4);
229        let max_threads = cpu_count * 8;
230
231        if threads > max_threads {
232            return Err(RocketMQError::ConfigInvalidValue {
233                key: "client_callback_executor_threads",
234                value: threads.to_string(),
235                reason: format!("should not exceed {} (CPU cores * 8)", max_threads),
236            });
237        }
238
239        Ok(())
240    }
241}
242
#[cfg(test)]
mod tests {
    // Unit tests for `ClientConfigValidator`. Each range validator is checked
    // at both inclusive bounds and at the first value outside each bound; the
    // thread validators are checked only with values whose outcome does not
    // depend on the CPU count of the machine running the tests.
    use super::*;

    #[test]
    fn test_validate_poll_name_server_interval_valid() {
        assert!(ClientConfigValidator::validate_poll_name_server_interval(30_000).is_ok());
        assert!(ClientConfigValidator::validate_poll_name_server_interval(10_000).is_ok());
        assert!(ClientConfigValidator::validate_poll_name_server_interval(600_000).is_ok());
    }

    #[test]
    fn test_validate_poll_name_server_interval_invalid() {
        assert!(ClientConfigValidator::validate_poll_name_server_interval(1_000).is_err());
        assert!(ClientConfigValidator::validate_poll_name_server_interval(700_000).is_err());
        // First values outside the inclusive bounds.
        assert!(ClientConfigValidator::validate_poll_name_server_interval(9_999).is_err());
        assert!(ClientConfigValidator::validate_poll_name_server_interval(600_001).is_err());
    }

    #[test]
    fn test_validate_heartbeat_broker_interval_valid() {
        assert!(ClientConfigValidator::validate_heartbeat_broker_interval(30_000).is_ok());
        assert!(ClientConfigValidator::validate_heartbeat_broker_interval(10_000).is_ok());
        assert!(ClientConfigValidator::validate_heartbeat_broker_interval(600_000).is_ok());
    }

    #[test]
    fn test_validate_heartbeat_broker_interval_invalid() {
        assert!(ClientConfigValidator::validate_heartbeat_broker_interval(1_000).is_err());
        assert!(ClientConfigValidator::validate_heartbeat_broker_interval(700_000).is_err());
        // First values outside the inclusive bounds.
        assert!(ClientConfigValidator::validate_heartbeat_broker_interval(9_999).is_err());
        assert!(ClientConfigValidator::validate_heartbeat_broker_interval(600_001).is_err());
    }

    #[test]
    fn test_validate_persist_consumer_offset_interval_valid() {
        assert!(ClientConfigValidator::validate_persist_consumer_offset_interval(5_000).is_ok());
        assert!(ClientConfigValidator::validate_persist_consumer_offset_interval(1_000).is_ok());
        assert!(ClientConfigValidator::validate_persist_consumer_offset_interval(60_000).is_ok());
    }

    #[test]
    fn test_validate_persist_consumer_offset_interval_invalid() {
        assert!(ClientConfigValidator::validate_persist_consumer_offset_interval(0).is_err());
        assert!(ClientConfigValidator::validate_persist_consumer_offset_interval(999).is_err());
        assert!(ClientConfigValidator::validate_persist_consumer_offset_interval(60_001).is_err());
    }

    #[test]
    fn test_validate_trace_msg_batch_num_valid() {
        assert!(ClientConfigValidator::validate_trace_msg_batch_num(1).is_ok());
        assert!(ClientConfigValidator::validate_trace_msg_batch_num(10).is_ok());
        assert!(ClientConfigValidator::validate_trace_msg_batch_num(10_000).is_ok());
    }

    #[test]
    fn test_validate_trace_msg_batch_num_invalid() {
        assert!(ClientConfigValidator::validate_trace_msg_batch_num(0).is_err());
        assert!(ClientConfigValidator::validate_trace_msg_batch_num(10_001).is_err());
    }

    #[test]
    fn test_validate_max_page_size_in_get_metadata_valid() {
        assert!(ClientConfigValidator::validate_max_page_size_in_get_metadata(1).is_ok());
        assert!(ClientConfigValidator::validate_max_page_size_in_get_metadata(2_000).is_ok());
        assert!(ClientConfigValidator::validate_max_page_size_in_get_metadata(100_000).is_ok());
    }

    #[test]
    fn test_validate_max_page_size_in_get_metadata_invalid() {
        assert!(ClientConfigValidator::validate_max_page_size_in_get_metadata(0).is_err());
        assert!(ClientConfigValidator::validate_max_page_size_in_get_metadata(100_001).is_err());
    }

    #[test]
    fn test_validate_concurrent_heartbeat_thread_pool_size_valid() {
        // Any machine has at least one core, so the limit is at least 4.
        assert!(ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(1).is_ok());
        assert!(ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(4).is_ok());
    }

    #[test]
    fn test_validate_concurrent_heartbeat_thread_pool_size_invalid() {
        assert!(ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(0).is_err());
        // Far above any CPU-derived limit regardless of the host.
        assert!(ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(usize::MAX).is_err());
    }

    #[test]
    fn test_validate_client_callback_executor_threads_valid() {
        // Any machine has at least one core, so the limit is at least 8.
        assert!(ClientConfigValidator::validate_client_callback_executor_threads(1).is_ok());
        assert!(ClientConfigValidator::validate_client_callback_executor_threads(8).is_ok());
    }

    #[test]
    fn test_validate_client_callback_executor_threads_invalid() {
        assert!(ClientConfigValidator::validate_client_callback_executor_threads(0).is_err());
        // Far above any CPU-derived limit regardless of the host.
        assert!(ClientConfigValidator::validate_client_callback_executor_threads(usize::MAX).is_err());
    }

    #[test]
    fn test_validate_mq_client_api_timeout_valid() {
        assert!(ClientConfigValidator::validate_mq_client_api_timeout(3_000).is_ok());
        assert!(ClientConfigValidator::validate_mq_client_api_timeout(100).is_ok());
        assert!(ClientConfigValidator::validate_mq_client_api_timeout(60_000).is_ok());
    }

    #[test]
    fn test_validate_mq_client_api_timeout_invalid() {
        assert!(ClientConfigValidator::validate_mq_client_api_timeout(50).is_err());
        assert!(ClientConfigValidator::validate_mq_client_api_timeout(70_000).is_err());
        // First values outside the inclusive bounds.
        assert!(ClientConfigValidator::validate_mq_client_api_timeout(99).is_err());
        assert!(ClientConfigValidator::validate_mq_client_api_timeout(60_001).is_err());
    }
}