Skip to main content

rocketmq_client_rust/base/
client_config_builder.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! ClientConfig builder module
16//!
17//! This module provides a fluent builder API for constructing ClientConfig instances,
18//! making configuration more readable and allowing for validation before building.
19
20use cheetah_string::CheetahString;
21use rocketmq_error::RocketMQResult;
22use rocketmq_remoting::protocol::LanguageCode;
23
24use super::access_channel::AccessChannel;
25use super::client_config::ClientConfig;
26use super::client_config_validation::ClientConfigValidator;
27
28/// Builder for creating [`ClientConfig`] instances with a fluent API
29///
30/// # Example
31///
32/// ```rust
33/// use rocketmq_client_rust::base::client_config::ClientConfig;
34///
35/// let config = ClientConfig::builder()
36///     .namesrv_addr("localhost:9876")
37///     .instance_name("my_producer")
38///     .enable_tls(true)
39///     .poll_name_server_interval(60_000)
40///     .build()
41///     .unwrap();
42/// ```
43pub struct ClientConfigBuilder {
44    config: ClientConfig,
45}
46
47impl Default for ClientConfigBuilder {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl ClientConfigBuilder {
54    /// Creates a new builder with default configuration
55    pub fn new() -> Self {
56        Self {
57            config: ClientConfig::new(),
58        }
59    }
60
61    // ========================================================================
62    // Name server configuration
63    // ========================================================================
64
65    /// Sets the name server address
66    pub fn namesrv_addr(mut self, addr: impl Into<CheetahString>) -> Self {
67        self.config.set_namesrv_addr(addr.into());
68        self
69    }
70
71    // ========================================================================
72    // Client identification
73    // ========================================================================
74
75    /// Sets the client IP address
76    pub fn client_ip(mut self, ip: impl Into<CheetahString>) -> Self {
77        self.config.set_client_ip(ip.into());
78        self
79    }
80
81    /// Sets the instance name
82    pub fn instance_name(mut self, name: impl Into<CheetahString>) -> Self {
83        self.config.set_instance_name(name.into());
84        self
85    }
86
87    // ========================================================================
88    // Thread pool configuration--only java client has these options,
89    // but we keep them for compatibility and future use
90    // ========================================================================
91
92    /// Sets the number of client callback executor threads
93    pub fn client_callback_executor_threads(mut self, threads: usize) -> Self {
94        self.config.set_client_callback_executor_threads(threads);
95        self
96    }
97
98    /// Sets the concurrent heartbeat thread pool size
99    pub fn concurrent_heartbeat_thread_pool_size(mut self, size: usize) -> Self {
100        self.config.set_concurrent_heartbeat_thread_pool_size(size);
101        self
102    }
103
104    // ========================================================================
105    // Namespace configuration
106    // ========================================================================
107
108    /// Sets the namespace (deprecated in Java, kept for compatibility)
109    pub fn namespace(mut self, namespace: impl Into<CheetahString>) -> Self {
110        self.config.set_namespace(namespace.into());
111        self
112    }
113
114    /// Sets the namespace v2
115    pub fn namespace_v2(mut self, namespace: impl Into<CheetahString>) -> Self {
116        self.config.set_namespace_v2(namespace.into());
117        self
118    }
119
120    // ========================================================================
121    // Access channel
122    // ========================================================================
123
124    /// Sets the access channel (Local or Cloud)
125    pub fn access_channel(mut self, channel: AccessChannel) -> Self {
126        self.config.set_access_channel(channel);
127        self
128    }
129
130    // ========================================================================
131    // Interval configuration
132    // ========================================================================
133
134    /// Sets the poll name server interval in milliseconds
135    pub fn poll_name_server_interval(mut self, millis: u32) -> Self {
136        self.config.set_poll_name_server_interval(millis);
137        self
138    }
139
140    /// Sets the heartbeat broker interval in milliseconds
141    pub fn heartbeat_broker_interval(mut self, millis: u32) -> Self {
142        self.config.set_heartbeat_broker_interval(millis);
143        self
144    }
145
146    /// Sets the persist consumer offset interval in milliseconds
147    pub fn persist_consumer_offset_interval(mut self, millis: u32) -> Self {
148        self.config.set_persist_consumer_offset_interval(millis);
149        self
150    }
151
152    // ========================================================================
153    // Timeout configuration
154    // ========================================================================
155
156    /// Sets the pull time delay in milliseconds when exception occurs
157    pub fn pull_time_delay_millis_when_exception(mut self, millis: u32) -> Self {
158        self.config.set_pull_time_delay_millis_when_exception(millis);
159        self
160    }
161
162    /// Sets the MQ client API timeout in milliseconds
163    pub fn mq_client_api_timeout(mut self, millis: u64) -> Self {
164        self.config.set_mq_client_api_timeout(millis);
165        self
166    }
167
168    /// Sets the detect timeout in milliseconds
169    pub fn detect_timeout(mut self, millis: u32) -> Self {
170        self.config.set_detect_timeout(millis);
171        self
172    }
173
174    /// Sets the detect interval in milliseconds
175    pub fn detect_interval(mut self, millis: u32) -> Self {
176        self.config.set_detect_interval(millis);
177        self
178    }
179
180    // ========================================================================
181    // Boolean flags
182    // ========================================================================
183
184    /// Sets unit mode enabled
185    pub fn enable_unit_mode(mut self, enabled: bool) -> Self {
186        self.config.set_unit_mode(enabled);
187        self
188    }
189
190    /// Sets VIP channel enabled
191    pub fn enable_vip_channel(mut self, enabled: bool) -> Self {
192        self.config.set_vip_channel_enabled(enabled);
193        self
194    }
195
196    /// Sets heartbeat v2 enabled
197    pub fn enable_heartbeat_v2(mut self, enabled: bool) -> Self {
198        self.config.set_use_heartbeat_v2(enabled);
199        self
200    }
201
202    /// Sets concurrent heartbeat enabled
203    pub fn enable_concurrent_heartbeat(mut self, enabled: bool) -> Self {
204        self.config.enable_concurrent_heartbeat = enabled;
205        self
206    }
207
208    /// Sets TLS enabled
209    pub fn enable_tls(mut self, enabled: bool) -> Self {
210        self.config.set_use_tls(enabled);
211        self
212    }
213
214    /// Sets decode read body enabled
215    pub fn enable_decode_read_body(mut self, enabled: bool) -> Self {
216        self.config.set_decode_read_body(enabled);
217        self
218    }
219
220    /// Sets decode decompress body enabled
221    pub fn enable_decode_decompress_body(mut self, enabled: bool) -> Self {
222        self.config.set_decode_decompress_body(enabled);
223        self
224    }
225
226    /// Sets stream request type enabled
227    pub fn enable_stream_request_type(mut self, enabled: bool) -> Self {
228        self.config.set_enable_stream_request_type(enabled);
229        self
230    }
231
232    /// Sets send latency enabled
233    pub fn enable_send_latency(mut self, enabled: bool) -> Self {
234        self.config.set_send_latency_enable(enabled);
235        self
236    }
237
238    /// Sets start detector enabled
239    pub fn enable_start_detector(mut self, enabled: bool) -> Self {
240        self.config.set_start_detector_enable(enabled);
241        self
242    }
243
244    /// Sets heartbeat channel event listener enabled
245    pub fn enable_heartbeat_channel_event_listener(mut self, enabled: bool) -> Self {
246        self.config.set_enable_heartbeat_channel_event_listener(enabled);
247        self
248    }
249
250    /// Sets trace enabled
251    pub fn enable_trace(mut self, enabled: bool) -> Self {
252        self.config.set_enable_trace(enabled);
253        self
254    }
255
256    // ========================================================================
257    // Other configuration
258    // ========================================================================
259
260    /// Sets the unit name
261    pub fn unit_name(mut self, name: impl Into<CheetahString>) -> Self {
262        self.config.set_unit_name(name.into());
263        self
264    }
265
266    /// Sets the SOCKS proxy config
267    pub fn socks_proxy_config(mut self, config: impl Into<CheetahString>) -> Self {
268        self.config.set_socks_proxy_config(config.into());
269        self
270    }
271
272    /// Sets the language code
273    pub fn language(mut self, language: LanguageCode) -> Self {
274        self.config.set_language(language);
275        self
276    }
277
278    /// Sets the trace topic
279    pub fn trace_topic(mut self, topic: impl Into<CheetahString>) -> Self {
280        self.config.set_trace_topic(topic.into());
281        self
282    }
283
284    /// Sets the trace message batch number
285    pub fn trace_msg_batch_num(mut self, num: usize) -> Self {
286        self.config.set_trace_msg_batch_num(num);
287        self
288    }
289
290    /// Sets the max page size in get metadata
291    pub fn max_page_size_in_get_metadata(mut self, size: usize) -> Self {
292        self.config.set_max_page_size_in_get_metadata(size);
293        self
294    }
295
296    // ========================================================================
297    // Build methods
298    // ========================================================================
299
300    /// Builds the ClientConfig with validation
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if any configuration value is invalid.
305    pub fn build(self) -> RocketMQResult<ClientConfig> {
306        // Validate configuration before returning
307        self.validate()?;
308        Ok(self.config)
309    }
310
311    /// Builds the ClientConfig without validation (internal use only)
312    ///
313    /// # Safety
314    ///
315    /// This method skips validation and should only be used when you're
316    /// certain all values are correct.
317    #[doc(hidden)]
318    pub fn build_unvalidated(self) -> ClientConfig {
319        self.config
320    }
321
322    /// Validates the current configuration
323    fn validate(&self) -> RocketMQResult<()> {
324        // Validate intervals
325        ClientConfigValidator::validate_poll_name_server_interval(self.config.poll_name_server_interval)?;
326        ClientConfigValidator::validate_heartbeat_broker_interval(self.config.heartbeat_broker_interval)?;
327        ClientConfigValidator::validate_persist_consumer_offset_interval(self.config.persist_consumer_offset_interval)?;
328
329        // Validate timeouts
330        ClientConfigValidator::validate_mq_client_api_timeout(self.config.mq_client_api_timeout)?;
331
332        // Validate numeric limits
333        ClientConfigValidator::validate_trace_msg_batch_num(self.config.trace_msg_batch_num)?;
334        ClientConfigValidator::validate_max_page_size_in_get_metadata(self.config.max_page_size_in_get_metadata)?;
335
336        // Validate thread pool sizes
337        ClientConfigValidator::validate_client_callback_executor_threads(self.config.client_callback_executor_threads)?;
338
339        // Only validate concurrent heartbeat thread pool size if concurrent heartbeat is enabled
340        if self.config.enable_concurrent_heartbeat {
341            ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(
342                self.config.concurrent_heartbeat_thread_pool_size,
343            )?;
344        }
345
346        Ok(())
347    }
348}
349
350#[cfg(test)]
351mod tests {
352    use super::*;
353
354    #[test]
355    fn test_builder_basic() {
356        let config = ClientConfig::builder()
357            .namesrv_addr("localhost:9876")
358            .instance_name("test_instance")
359            .build()
360            .unwrap();
361
362        assert_eq!(config.get_namesrv_addr().unwrap().as_str(), "localhost:9876");
363        assert_eq!(config.get_instance_name().as_str(), "test_instance");
364    }
365
366    #[test]
367    fn test_builder_with_all_options() {
368        let config = ClientConfig::builder()
369            .namesrv_addr("localhost:9876")
370            .instance_name("test")
371            .client_ip("127.0.0.1")
372            .poll_name_server_interval(60_000)
373            .heartbeat_broker_interval(30_000)
374            .enable_tls(true)
375            .enable_concurrent_heartbeat(true)
376            .concurrent_heartbeat_thread_pool_size(4)
377            .build()
378            .unwrap();
379
380        assert_eq!(config.get_namesrv_addr().unwrap().as_str(), "localhost:9876");
381        assert_eq!(config.get_instance_name().as_str(), "test");
382        assert_eq!(config.get_client_ip().unwrap().as_str(), "127.0.0.1");
383        assert_eq!(config.poll_name_server_interval, 60_000);
384        assert_eq!(config.heartbeat_broker_interval, 30_000);
385        assert!(config.use_tls);
386        assert!(config.enable_concurrent_heartbeat);
387        assert_eq!(config.concurrent_heartbeat_thread_pool_size, 4);
388    }
389
390    #[test]
391    fn test_builder_validation_invalid_poll_interval() {
392        let result = ClientConfig::builder()
393            .poll_name_server_interval(100) // Too small
394            .build();
395
396        assert!(result.is_err());
397    }
398
399    #[test]
400    fn test_builder_validation_invalid_heartbeat_interval() {
401        let result = ClientConfig::builder()
402            .heartbeat_broker_interval(100) // Too small
403            .build();
404
405        assert!(result.is_err());
406    }
407
408    #[test]
409    fn test_builder_validation_invalid_thread_pool_size() {
410        let result = ClientConfig::builder()
411            .enable_concurrent_heartbeat(true)
412            .concurrent_heartbeat_thread_pool_size(0) // Invalid
413            .build();
414
415        assert!(result.is_err());
416    }
417
418    #[test]
419    fn test_builder_validation_invalid_trace_msg_batch_num() {
420        let result = ClientConfig::builder().trace_msg_batch_num(0).build();
421
422        assert!(result.is_err());
423    }
424
425    #[test]
426    fn test_builder_unvalidated() {
427        // This should not panic even with invalid values
428        let config = ClientConfig::builder()
429            .poll_name_server_interval(100) // Invalid
430            .build_unvalidated();
431
432        assert_eq!(config.poll_name_server_interval, 100);
433    }
434}