rocketmq_client_rust/base/
client_config_builder.rs1use cheetah_string::CheetahString;
21use rocketmq_error::RocketMQResult;
22use rocketmq_remoting::protocol::LanguageCode;
23
24use super::access_channel::AccessChannel;
25use super::client_config::ClientConfig;
26use super::client_config_validation::ClientConfigValidator;
27
28pub struct ClientConfigBuilder {
44 config: ClientConfig,
45}
46
47impl Default for ClientConfigBuilder {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl ClientConfigBuilder {
54 pub fn new() -> Self {
56 Self {
57 config: ClientConfig::new(),
58 }
59 }
60
61 pub fn namesrv_addr(mut self, addr: impl Into<CheetahString>) -> Self {
67 self.config.set_namesrv_addr(addr.into());
68 self
69 }
70
71 pub fn client_ip(mut self, ip: impl Into<CheetahString>) -> Self {
77 self.config.set_client_ip(ip.into());
78 self
79 }
80
81 pub fn instance_name(mut self, name: impl Into<CheetahString>) -> Self {
83 self.config.set_instance_name(name.into());
84 self
85 }
86
87 pub fn client_callback_executor_threads(mut self, threads: usize) -> Self {
94 self.config.set_client_callback_executor_threads(threads);
95 self
96 }
97
98 pub fn concurrent_heartbeat_thread_pool_size(mut self, size: usize) -> Self {
100 self.config.set_concurrent_heartbeat_thread_pool_size(size);
101 self
102 }
103
104 pub fn namespace(mut self, namespace: impl Into<CheetahString>) -> Self {
110 self.config.set_namespace(namespace.into());
111 self
112 }
113
114 pub fn namespace_v2(mut self, namespace: impl Into<CheetahString>) -> Self {
116 self.config.set_namespace_v2(namespace.into());
117 self
118 }
119
120 pub fn access_channel(mut self, channel: AccessChannel) -> Self {
126 self.config.set_access_channel(channel);
127 self
128 }
129
130 pub fn poll_name_server_interval(mut self, millis: u32) -> Self {
136 self.config.set_poll_name_server_interval(millis);
137 self
138 }
139
140 pub fn heartbeat_broker_interval(mut self, millis: u32) -> Self {
142 self.config.set_heartbeat_broker_interval(millis);
143 self
144 }
145
146 pub fn persist_consumer_offset_interval(mut self, millis: u32) -> Self {
148 self.config.set_persist_consumer_offset_interval(millis);
149 self
150 }
151
152 pub fn pull_time_delay_millis_when_exception(mut self, millis: u32) -> Self {
158 self.config.set_pull_time_delay_millis_when_exception(millis);
159 self
160 }
161
162 pub fn mq_client_api_timeout(mut self, millis: u64) -> Self {
164 self.config.set_mq_client_api_timeout(millis);
165 self
166 }
167
168 pub fn detect_timeout(mut self, millis: u32) -> Self {
170 self.config.set_detect_timeout(millis);
171 self
172 }
173
174 pub fn detect_interval(mut self, millis: u32) -> Self {
176 self.config.set_detect_interval(millis);
177 self
178 }
179
180 pub fn enable_unit_mode(mut self, enabled: bool) -> Self {
186 self.config.set_unit_mode(enabled);
187 self
188 }
189
190 pub fn enable_vip_channel(mut self, enabled: bool) -> Self {
192 self.config.set_vip_channel_enabled(enabled);
193 self
194 }
195
196 pub fn enable_heartbeat_v2(mut self, enabled: bool) -> Self {
198 self.config.set_use_heartbeat_v2(enabled);
199 self
200 }
201
202 pub fn enable_concurrent_heartbeat(mut self, enabled: bool) -> Self {
204 self.config.enable_concurrent_heartbeat = enabled;
205 self
206 }
207
208 pub fn enable_tls(mut self, enabled: bool) -> Self {
210 self.config.set_use_tls(enabled);
211 self
212 }
213
214 pub fn enable_decode_read_body(mut self, enabled: bool) -> Self {
216 self.config.set_decode_read_body(enabled);
217 self
218 }
219
220 pub fn enable_decode_decompress_body(mut self, enabled: bool) -> Self {
222 self.config.set_decode_decompress_body(enabled);
223 self
224 }
225
226 pub fn enable_stream_request_type(mut self, enabled: bool) -> Self {
228 self.config.set_enable_stream_request_type(enabled);
229 self
230 }
231
232 pub fn enable_send_latency(mut self, enabled: bool) -> Self {
234 self.config.set_send_latency_enable(enabled);
235 self
236 }
237
238 pub fn enable_start_detector(mut self, enabled: bool) -> Self {
240 self.config.set_start_detector_enable(enabled);
241 self
242 }
243
244 pub fn enable_heartbeat_channel_event_listener(mut self, enabled: bool) -> Self {
246 self.config.set_enable_heartbeat_channel_event_listener(enabled);
247 self
248 }
249
250 pub fn enable_trace(mut self, enabled: bool) -> Self {
252 self.config.set_enable_trace(enabled);
253 self
254 }
255
256 pub fn unit_name(mut self, name: impl Into<CheetahString>) -> Self {
262 self.config.set_unit_name(name.into());
263 self
264 }
265
266 pub fn socks_proxy_config(mut self, config: impl Into<CheetahString>) -> Self {
268 self.config.set_socks_proxy_config(config.into());
269 self
270 }
271
272 pub fn language(mut self, language: LanguageCode) -> Self {
274 self.config.set_language(language);
275 self
276 }
277
278 pub fn trace_topic(mut self, topic: impl Into<CheetahString>) -> Self {
280 self.config.set_trace_topic(topic.into());
281 self
282 }
283
284 pub fn trace_msg_batch_num(mut self, num: usize) -> Self {
286 self.config.set_trace_msg_batch_num(num);
287 self
288 }
289
290 pub fn max_page_size_in_get_metadata(mut self, size: usize) -> Self {
292 self.config.set_max_page_size_in_get_metadata(size);
293 self
294 }
295
296 pub fn build(self) -> RocketMQResult<ClientConfig> {
306 self.validate()?;
308 Ok(self.config)
309 }
310
311 #[doc(hidden)]
318 pub fn build_unvalidated(self) -> ClientConfig {
319 self.config
320 }
321
322 fn validate(&self) -> RocketMQResult<()> {
324 ClientConfigValidator::validate_poll_name_server_interval(self.config.poll_name_server_interval)?;
326 ClientConfigValidator::validate_heartbeat_broker_interval(self.config.heartbeat_broker_interval)?;
327 ClientConfigValidator::validate_persist_consumer_offset_interval(self.config.persist_consumer_offset_interval)?;
328
329 ClientConfigValidator::validate_mq_client_api_timeout(self.config.mq_client_api_timeout)?;
331
332 ClientConfigValidator::validate_trace_msg_batch_num(self.config.trace_msg_batch_num)?;
334 ClientConfigValidator::validate_max_page_size_in_get_metadata(self.config.max_page_size_in_get_metadata)?;
335
336 ClientConfigValidator::validate_client_callback_executor_threads(self.config.client_callback_executor_threads)?;
338
339 if self.config.enable_concurrent_heartbeat {
341 ClientConfigValidator::validate_concurrent_heartbeat_thread_pool_size(
342 self.config.concurrent_heartbeat_thread_pool_size,
343 )?;
344 }
345
346 Ok(())
347 }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// A short chain yields a config carrying the supplied address and instance name.
    #[test]
    fn test_builder_basic() {
        let built = ClientConfig::builder()
            .namesrv_addr("localhost:9876")
            .instance_name("test_instance")
            .build()
            .unwrap();

        let namesrv = built.get_namesrv_addr().unwrap();
        assert_eq!(namesrv.as_str(), "localhost:9876");
        assert_eq!(built.get_instance_name().as_str(), "test_instance");
    }

    /// Several setters combined: every value set on the builder is readable on the result.
    #[test]
    fn test_builder_with_all_options() {
        let built = ClientConfig::builder()
            .namesrv_addr("localhost:9876")
            .instance_name("test")
            .client_ip("127.0.0.1")
            .poll_name_server_interval(60_000)
            .heartbeat_broker_interval(30_000)
            .enable_tls(true)
            .enable_concurrent_heartbeat(true)
            .concurrent_heartbeat_thread_pool_size(4)
            .build()
            .unwrap();

        // String-valued settings.
        let namesrv = built.get_namesrv_addr().unwrap();
        assert_eq!(namesrv.as_str(), "localhost:9876");
        assert_eq!(built.get_instance_name().as_str(), "test");
        let client_ip = built.get_client_ip().unwrap();
        assert_eq!(client_ip.as_str(), "127.0.0.1");

        // Numeric and boolean settings.
        assert_eq!(built.poll_name_server_interval, 60_000);
        assert_eq!(built.heartbeat_broker_interval, 30_000);
        assert!(built.use_tls);
        assert!(built.enable_concurrent_heartbeat);
        assert_eq!(built.concurrent_heartbeat_thread_pool_size, 4);
    }

    /// `build` is expected to reject a poll interval of 100.
    #[test]
    fn test_builder_validation_invalid_poll_interval() {
        let outcome = ClientConfig::builder().poll_name_server_interval(100).build();

        assert!(outcome.is_err());
    }

    /// `build` is expected to reject a heartbeat interval of 100.
    #[test]
    fn test_builder_validation_invalid_heartbeat_interval() {
        let outcome = ClientConfig::builder().heartbeat_broker_interval(100).build();

        assert!(outcome.is_err());
    }

    /// With concurrent heartbeat enabled, a pool size of 0 is expected to be rejected.
    #[test]
    fn test_builder_validation_invalid_thread_pool_size() {
        let outcome = ClientConfig::builder()
            .enable_concurrent_heartbeat(true)
            .concurrent_heartbeat_thread_pool_size(0)
            .build();

        assert!(outcome.is_err());
    }

    /// `build` is expected to reject a trace batch size of 0.
    #[test]
    fn test_builder_validation_invalid_trace_msg_batch_num() {
        let outcome = ClientConfig::builder().trace_msg_batch_num(0).build();

        assert!(outcome.is_err());
    }

    /// `build_unvalidated` returns the value that the validated path rejects.
    #[test]
    fn test_builder_unvalidated() {
        let built = ClientConfig::builder()
            .poll_name_server_interval(100)
            .build_unvalidated();

        assert_eq!(built.poll_name_server_interval, 100);
    }
}