Skip to main content

rocketmq_client_rust/base/
client_config.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering;
18use std::sync::Arc;
19use std::time::Duration;
20
21use cheetah_string::CheetahString;
22use rocketmq_common::common::message::message_queue::MessageQueue;
23use rocketmq_common::utils::name_server_address_utils::NameServerAddressUtils;
24use rocketmq_common::utils::name_server_address_utils::NAMESRV_ENDPOINT_PATTERN;
25use rocketmq_common::utils::network_util::NetworkUtil;
26use rocketmq_common::utils::string_utils::StringUtils;
27use rocketmq_common::TimeUtils::current_nano;
28use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
29use rocketmq_remoting::protocol::request_type::RequestType;
30use rocketmq_remoting::protocol::LanguageCode;
31
32use crate::base::access_channel::AccessChannel;
33
34#[derive(Clone)]
35pub struct ClientConfig {
36    pub namesrv_addr: Option<CheetahString>,
37    pub client_ip: Option<CheetahString>,
38    pub instance_name: CheetahString,
39    pub client_callback_executor_threads: usize,
40    pub namespace: Option<CheetahString>,
41    pub namespace_initialized: Arc<AtomicBool>,
42    pub namespace_v2: Option<CheetahString>,
43    pub access_channel: AccessChannel,
44    pub poll_name_server_interval: u32,
45    pub heartbeat_broker_interval: u32,
46    pub persist_consumer_offset_interval: u32,
47    pub pull_time_delay_millis_when_exception: u32,
48    pub unit_mode: bool,
49    pub unit_name: Option<CheetahString>,
50    pub decode_read_body: bool,
51    pub decode_decompress_body: bool,
52    pub vip_channel_enabled: bool,
53    pub use_heartbeat_v2: bool,
54    pub enable_concurrent_heartbeat: bool,
55    pub use_tls: bool,
56    pub socks_proxy_config: CheetahString,
57    pub mq_client_api_timeout: u64,
58    pub detect_timeout: u32,
59    pub detect_interval: u32,
60    pub language: LanguageCode,
61    pub enable_stream_request_type: bool,
62    pub send_latency_enable: bool,
63    pub start_detector_enable: bool,
64    pub enable_heartbeat_channel_event_listener: bool,
65    pub enable_trace: bool,
66    pub trace_topic: Option<CheetahString>,
67    pub trace_msg_batch_num: usize,
68    pub max_page_size_in_get_metadata: usize,
69    /// Thread pool size for concurrent heartbeat operations.
70    /// Only effective when enable_concurrent_heartbeat is true.
71    /// Default: number of CPU cores (matches Java: Runtime.getRuntime().availableProcessors())
72    // java client has this option, but we keep it for compatibility and future will remove it if it's not needed
73    pub concurrent_heartbeat_thread_pool_size: usize,
74}
75
76impl Default for ClientConfig {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl ClientConfig {
83    pub const SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY: &'static str = "com.rocketmq.sendMessageWithVIPChannel";
84    pub const SOCKS_PROXY_CONFIG: &'static str = "com.rocketmq.socks.proxy.config";
85    pub const DECODE_READ_BODY: &'static str = "com.rocketmq.read.body";
86    pub const DECODE_DECOMPRESS_BODY: &'static str = "com.rocketmq.decompress.body";
87    pub const SEND_LATENCY_ENABLE: &'static str = "com.rocketmq.sendLatencyEnable";
88    pub const START_DETECTOR_ENABLE: &'static str = "com.rocketmq.startDetectorEnable";
89    pub const HEART_BEAT_V2: &'static str = "com.rocketmq.heartbeat.v2";
90    pub const ENABLE_CONCURRENT_HEARTBEAT: &'static str = "com.rocketmq.enableConcurrentHeartbeat";
91
92    pub fn new() -> Self {
93        ClientConfig {
94            namesrv_addr: NameServerAddressUtils::get_name_server_addresses().map(|addr| addr.into()),
95            client_ip: NetworkUtil::get_local_address().map(|addr| addr.into()),
96            instance_name: env::var("rocketmq.client.name")
97                .unwrap_or_else(|_| "DEFAULT".to_string())
98                .into(),
99            client_callback_executor_threads: num_cpus::get(),
100            namespace: None,
101            namespace_initialized: Arc::new(AtomicBool::new(false)),
102            namespace_v2: None,
103            access_channel: AccessChannel::Local,
104            poll_name_server_interval: Duration::from_secs(30).as_millis() as u32,
105            heartbeat_broker_interval: Duration::from_secs(30).as_millis() as u32,
106            persist_consumer_offset_interval: Duration::from_secs(5).as_millis() as u32,
107            pull_time_delay_millis_when_exception: 1000,
108            unit_mode: false,
109            unit_name: None,
110            decode_read_body: env::var(Self::DECODE_READ_BODY)
111                .unwrap_or_else(|_| "true".to_string())
112                .parse::<bool>()
113                .unwrap_or(true),
114            decode_decompress_body: env::var(Self::DECODE_DECOMPRESS_BODY)
115                .unwrap_or_else(|_| "true".to_string())
116                .parse::<bool>()
117                .unwrap_or(true),
118            vip_channel_enabled: env::var(Self::SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY)
119                .unwrap_or_else(|_| "false".to_string())
120                .parse::<bool>()
121                .unwrap_or(false),
122            use_heartbeat_v2: env::var(Self::HEART_BEAT_V2)
123                .unwrap_or_else(|_| "false".to_string())
124                .parse::<bool>()
125                .unwrap_or(false),
126            enable_concurrent_heartbeat: env::var(Self::ENABLE_CONCURRENT_HEARTBEAT)
127                .unwrap_or_else(|_| "false".to_string())
128                .parse::<bool>()
129                .unwrap_or(false),
130            use_tls: false,
131            socks_proxy_config: env::var(Self::SOCKS_PROXY_CONFIG)
132                .unwrap_or_else(|_| "{}".to_string())
133                .into(),
134            mq_client_api_timeout: Duration::from_secs(3).as_millis() as u64,
135            detect_timeout: 200,
136            detect_interval: Duration::from_secs(2).as_millis() as u32,
137            language: LanguageCode::RUST,
138            enable_stream_request_type: false,
139            send_latency_enable: env::var(Self::SEND_LATENCY_ENABLE)
140                .unwrap_or_else(|_| "false".to_string())
141                .parse::<bool>()
142                .unwrap_or(false),
143            start_detector_enable: env::var(Self::START_DETECTOR_ENABLE)
144                .unwrap_or_else(|_| "false".to_string())
145                .parse::<bool>()
146                .unwrap_or(false),
147            enable_heartbeat_channel_event_listener: true,
148            enable_trace: false,
149            trace_topic: None,
150            trace_msg_batch_num: 10,
151            max_page_size_in_get_metadata: 2000,
152            concurrent_heartbeat_thread_pool_size: num_cpus::get(),
153        }
154    }
155}
156
157impl ClientConfig {
158    #[inline]
159    pub fn with_namespace(&mut self, resource: impl Into<CheetahString>) -> CheetahString {
160        let resource = resource.into();
161        let namespace = self.get_namespace().unwrap_or_default();
162
163        // Fast path: no namespace needed, return resource directly
164        if namespace.is_empty() {
165            return resource;
166        }
167
168        // Fast path: resource already has namespace, return directly
169        if NamespaceUtil::is_already_with_namespace(resource.as_str(), namespace.as_str()) {
170            return resource;
171        }
172
173        NamespaceUtil::wrap_namespace(namespace, resource)
174    }
175
176    #[inline]
177    pub fn queue_with_namespace(&mut self, mut queue: MessageQueue) -> MessageQueue {
178        if let Some(namespace) = self.get_namespace() {
179            if !namespace.is_empty() {
180                let topic = NamespaceUtil::wrap_namespace(namespace.as_str(), queue.topic_str());
181                queue.set_topic(topic);
182                return queue;
183            }
184        }
185        queue
186    }
187
188    #[inline]
189    pub fn get_namespace(&mut self) -> Option<CheetahString> {
190        let namespace_initialized = self.namespace_initialized.load(Ordering::Acquire);
191        if namespace_initialized {
192            return self.namespace.clone();
193        }
194
195        if let Some(ref namespace) = self.namespace {
196            return Some(namespace.clone());
197        }
198
199        if let Some(ref namesrv_addr) = self.namesrv_addr {
200            if NameServerAddressUtils::validate_instance_endpoint(namesrv_addr.as_ref()) {
201                self.namespace =
202                    NameServerAddressUtils::parse_instance_id_from_endpoint(namesrv_addr.as_ref()).map(|id| id.into());
203            }
204        }
205        self.namespace_initialized.store(true, Ordering::Release);
206        self.namespace.clone()
207    }
208
209    #[inline]
210    pub fn change_instance_name_to_pid(&mut self) {
211        if self.instance_name == "DEFAULT" {
212            self.instance_name = format!("{}#{}", std::process::id(), current_nano()).into();
213        }
214    }
215    #[inline]
216    pub fn set_instance_name(&mut self, instance_name: CheetahString) {
217        self.instance_name = instance_name;
218    }
219    #[inline]
220    pub fn set_namesrv_addr(&mut self, namesrv_addr: CheetahString) {
221        self.namesrv_addr = Some(namesrv_addr);
222        self.namespace_initialized.store(false, Ordering::Release);
223    }
224
225    #[inline]
226    pub fn build_mq_client_id(&self) -> String {
227        // Pre-allocate capacity to avoid reallocations
228        let estimated_capacity = self.client_ip.as_ref().map(|ip| ip.len()).unwrap_or(15)
229            + self.instance_name.len()
230            + self.unit_name.as_ref().map(|un| un.len() + 1).unwrap_or(0)
231            + if self.enable_stream_request_type { 8 } else { 0 }
232            + 3; // For '@' separators
233
234        let mut sb = String::with_capacity(estimated_capacity);
235        if let Some(ref client_ip) = self.client_ip {
236            sb.push_str(client_ip.as_str());
237        }
238
239        sb.push('@');
240        sb.push_str(self.instance_name.as_str());
241        if let Some(ref unit_name) = self.unit_name {
242            if !unit_name.is_empty() {
243                sb.push('@');
244                sb.push_str(unit_name.as_str());
245            }
246        }
247
248        if self.enable_stream_request_type {
249            sb.push('@');
250            sb.push_str(RequestType::Stream.to_string().as_str());
251        }
252
253        sb
254    }
255
256    #[inline]
257    pub fn get_namesrv_addr(&self) -> Option<CheetahString> {
258        if StringUtils::is_not_empty_str(self.namesrv_addr.as_deref())
259            && NAMESRV_ENDPOINT_PATTERN.is_match(self.namesrv_addr.as_ref().unwrap().as_str())
260        {
261            NameServerAddressUtils::get_name_srv_addr_from_namesrv_endpoint(
262                self.namesrv_addr.as_ref().unwrap().as_str(),
263            )
264            .map(|addr| addr.into())
265        } else {
266            self.namesrv_addr.clone()
267        }
268    }
269
270    // ============ Comprehensive Getters and Setters ============
271
272    #[inline]
273    pub fn get_client_ip(&self) -> Option<&CheetahString> {
274        self.client_ip.as_ref()
275    }
276
277    #[inline]
278    pub fn set_client_ip(&mut self, client_ip: CheetahString) {
279        self.client_ip = Some(client_ip);
280    }
281
282    #[inline]
283    pub fn get_instance_name(&self) -> &CheetahString {
284        &self.instance_name
285    }
286
287    #[inline]
288    pub fn get_client_callback_executor_threads(&self) -> usize {
289        self.client_callback_executor_threads
290    }
291
292    #[inline]
293    pub fn set_client_callback_executor_threads(&mut self, threads: usize) {
294        self.client_callback_executor_threads = threads;
295    }
296
297    #[inline]
298    pub fn get_namespace_v2(&self) -> Option<&CheetahString> {
299        self.namespace_v2.as_ref()
300    }
301
302    #[inline]
303    pub fn set_namespace_v2(&mut self, namespace_v2: CheetahString) {
304        self.namespace_v2 = Some(namespace_v2);
305    }
306
307    #[inline]
308    pub fn set_namespace(&mut self, namespace: CheetahString) {
309        self.namespace = Some(namespace);
310        self.namespace_initialized.store(true, Ordering::Release);
311    }
312
313    #[inline]
314    pub fn get_access_channel(&self) -> AccessChannel {
315        self.access_channel
316    }
317
318    #[inline]
319    pub fn set_access_channel(&mut self, access_channel: AccessChannel) {
320        self.access_channel = access_channel;
321    }
322
323    #[inline]
324    pub fn get_poll_name_server_interval(&self) -> u32 {
325        self.poll_name_server_interval
326    }
327
328    #[inline]
329    pub fn set_poll_name_server_interval(&mut self, interval: u32) {
330        self.poll_name_server_interval = interval;
331    }
332
333    #[inline]
334    pub fn get_heartbeat_broker_interval(&self) -> u32 {
335        self.heartbeat_broker_interval
336    }
337
338    #[inline]
339    pub fn set_heartbeat_broker_interval(&mut self, interval: u32) {
340        self.heartbeat_broker_interval = interval;
341    }
342
343    #[inline]
344    pub fn get_persist_consumer_offset_interval(&self) -> u32 {
345        self.persist_consumer_offset_interval
346    }
347
348    #[inline]
349    pub fn set_persist_consumer_offset_interval(&mut self, interval: u32) {
350        self.persist_consumer_offset_interval = interval;
351    }
352
353    #[inline]
354    pub fn get_pull_time_delay_millis_when_exception(&self) -> u32 {
355        self.pull_time_delay_millis_when_exception
356    }
357
358    #[inline]
359    pub fn set_pull_time_delay_millis_when_exception(&mut self, delay: u32) {
360        self.pull_time_delay_millis_when_exception = delay;
361    }
362
363    #[inline]
364    pub fn get_unit_name(&self) -> Option<&CheetahString> {
365        self.unit_name.as_ref()
366    }
367
368    #[inline]
369    pub fn set_unit_name(&mut self, unit_name: CheetahString) {
370        self.unit_name = Some(unit_name);
371    }
372
373    #[inline]
374    pub fn is_unit_mode(&self) -> bool {
375        self.unit_mode
376    }
377
378    #[inline]
379    pub fn set_unit_mode(&mut self, unit_mode: bool) {
380        self.unit_mode = unit_mode;
381    }
382
383    #[inline]
384    pub fn is_decode_read_body(&self) -> bool {
385        self.decode_read_body
386    }
387
388    #[inline]
389    pub fn set_decode_read_body(&mut self, decode_read_body: bool) {
390        self.decode_read_body = decode_read_body;
391    }
392
393    #[inline]
394    pub fn is_decode_decompress_body(&self) -> bool {
395        self.decode_decompress_body
396    }
397
398    #[inline]
399    pub fn set_decode_decompress_body(&mut self, decode_decompress_body: bool) {
400        self.decode_decompress_body = decode_decompress_body;
401    }
402
403    #[inline]
404    pub fn is_vip_channel_enabled(&self) -> bool {
405        self.vip_channel_enabled
406    }
407
408    #[inline]
409    pub fn set_vip_channel_enabled(&mut self, enabled: bool) {
410        self.vip_channel_enabled = enabled;
411    }
412
413    #[inline]
414    pub fn is_use_heartbeat_v2(&self) -> bool {
415        self.use_heartbeat_v2
416    }
417
418    #[inline]
419    pub fn set_use_heartbeat_v2(&mut self, use_heartbeat_v2: bool) {
420        self.use_heartbeat_v2 = use_heartbeat_v2;
421    }
422
423    #[inline]
424    pub fn is_use_tls(&self) -> bool {
425        self.use_tls
426    }
427
428    #[inline]
429    pub fn set_use_tls(&mut self, use_tls: bool) {
430        self.use_tls = use_tls;
431    }
432
433    #[inline]
434    pub fn get_socks_proxy_config(&self) -> &CheetahString {
435        &self.socks_proxy_config
436    }
437
438    #[inline]
439    pub fn set_socks_proxy_config(&mut self, config: CheetahString) {
440        self.socks_proxy_config = config;
441    }
442
443    #[inline]
444    pub fn get_language(&self) -> LanguageCode {
445        self.language
446    }
447
448    #[inline]
449    pub fn set_language(&mut self, language: LanguageCode) {
450        self.language = language;
451    }
452
453    #[inline]
454    pub fn get_mq_client_api_timeout(&self) -> u64 {
455        self.mq_client_api_timeout
456    }
457
458    #[inline]
459    pub fn set_mq_client_api_timeout(&mut self, timeout: u64) {
460        self.mq_client_api_timeout = timeout;
461    }
462
463    #[inline]
464    pub fn get_detect_timeout(&self) -> u32 {
465        self.detect_timeout
466    }
467
468    #[inline]
469    pub fn set_detect_timeout(&mut self, timeout: u32) {
470        self.detect_timeout = timeout;
471    }
472
473    #[inline]
474    pub fn get_detect_interval(&self) -> u32 {
475        self.detect_interval
476    }
477
478    #[inline]
479    pub fn set_detect_interval(&mut self, interval: u32) {
480        self.detect_interval = interval;
481    }
482
483    #[inline]
484    pub fn is_enable_stream_request_type(&self) -> bool {
485        self.enable_stream_request_type
486    }
487
488    #[inline]
489    pub fn set_enable_stream_request_type(&mut self, enabled: bool) {
490        self.enable_stream_request_type = enabled;
491    }
492
493    #[inline]
494    pub fn is_send_latency_enable(&self) -> bool {
495        self.send_latency_enable
496    }
497
498    #[inline]
499    pub fn set_send_latency_enable(&mut self, enabled: bool) {
500        self.send_latency_enable = enabled;
501    }
502
503    #[inline]
504    pub fn is_start_detector_enable(&self) -> bool {
505        self.start_detector_enable
506    }
507
508    #[inline]
509    pub fn set_start_detector_enable(&mut self, enabled: bool) {
510        self.start_detector_enable = enabled;
511    }
512
513    #[inline]
514    pub fn is_enable_heartbeat_channel_event_listener(&self) -> bool {
515        self.enable_heartbeat_channel_event_listener
516    }
517
518    #[inline]
519    pub fn set_enable_heartbeat_channel_event_listener(&mut self, enabled: bool) {
520        self.enable_heartbeat_channel_event_listener = enabled;
521    }
522
523    #[inline]
524    pub fn is_enable_trace(&self) -> bool {
525        self.enable_trace
526    }
527
528    #[inline]
529    pub fn set_enable_trace(&mut self, enabled: bool) {
530        self.enable_trace = enabled;
531    }
532
533    #[inline]
534    pub fn get_trace_topic(&self) -> Option<&CheetahString> {
535        self.trace_topic.as_ref()
536    }
537
538    #[inline]
539    pub fn set_trace_topic(&mut self, topic: CheetahString) {
540        self.trace_topic = Some(topic);
541    }
542
543    #[inline]
544    pub fn get_trace_msg_batch_num(&self) -> usize {
545        self.trace_msg_batch_num
546    }
547
548    #[inline]
549    pub fn set_trace_msg_batch_num(&mut self, num: usize) {
550        self.trace_msg_batch_num = num;
551    }
552
553    #[inline]
554    pub fn get_max_page_size_in_get_metadata(&self) -> usize {
555        self.max_page_size_in_get_metadata
556    }
557
558    #[inline]
559    pub fn set_max_page_size_in_get_metadata(&mut self, size: usize) {
560        self.max_page_size_in_get_metadata = size;
561    }
562
563    #[inline]
564    pub fn get_concurrent_heartbeat_thread_pool_size(&self) -> usize {
565        self.concurrent_heartbeat_thread_pool_size
566    }
567
568    #[inline]
569    pub fn set_concurrent_heartbeat_thread_pool_size(&mut self, size: usize) {
570        self.concurrent_heartbeat_thread_pool_size = size;
571    }
572
573    // ============ Utility Methods ============
574
575    /// Clones the configuration
576    #[inline]
577    pub fn clone_client_config(&self) -> Self {
578        self.clone()
579    }
580
581    /// Resets client config from another instance
582    pub fn reset_client_config(&mut self, other: &ClientConfig) {
583        self.namesrv_addr = other.namesrv_addr.clone();
584        self.client_ip = other.client_ip.clone();
585        self.instance_name = other.instance_name.clone();
586        self.client_callback_executor_threads = other.client_callback_executor_threads;
587        self.namespace = other.namespace.clone();
588        self.namespace_v2 = other.namespace_v2.clone();
589        self.access_channel = other.access_channel;
590        self.poll_name_server_interval = other.poll_name_server_interval;
591        self.heartbeat_broker_interval = other.heartbeat_broker_interval;
592        self.persist_consumer_offset_interval = other.persist_consumer_offset_interval;
593        self.pull_time_delay_millis_when_exception = other.pull_time_delay_millis_when_exception;
594        self.unit_mode = other.unit_mode;
595        self.unit_name = other.unit_name.clone();
596        self.decode_read_body = other.decode_read_body;
597        self.decode_decompress_body = other.decode_decompress_body;
598        self.vip_channel_enabled = other.vip_channel_enabled;
599        self.use_heartbeat_v2 = other.use_heartbeat_v2;
600        self.use_tls = other.use_tls;
601        self.socks_proxy_config = other.socks_proxy_config.clone();
602        self.language = other.language;
603        self.mq_client_api_timeout = other.mq_client_api_timeout;
604        self.detect_timeout = other.detect_timeout;
605        self.detect_interval = other.detect_interval;
606        self.enable_stream_request_type = other.enable_stream_request_type;
607        self.send_latency_enable = other.send_latency_enable;
608        self.start_detector_enable = other.start_detector_enable;
609        self.enable_heartbeat_channel_event_listener = other.enable_heartbeat_channel_event_listener;
610        self.enable_trace = other.enable_trace;
611        self.trace_topic = other.trace_topic.clone();
612        self.trace_msg_batch_num = other.trace_msg_batch_num;
613        self.max_page_size_in_get_metadata = other.max_page_size_in_get_metadata;
614        self.concurrent_heartbeat_thread_pool_size = other.concurrent_heartbeat_thread_pool_size;
615    }
616
617    /// Deprecated: Use with_namespace instead
618    #[inline]
619    #[deprecated(note = "Use with_namespace for namespace wrapping")]
620    pub fn without_namespace(&mut self, resource: &str) -> CheetahString {
621        if let Some(namespace) = self.get_namespace().as_deref() {
622            NamespaceUtil::without_namespace_with_namespace(resource, namespace).into()
623        } else {
624            NamespaceUtil::without_namespace(resource).into()
625        }
626    }
627
628    /// Creates a new builder for constructing a ClientConfig with a fluent API
629    ///
630    /// # Example
631    ///
632    /// ```rust
633    /// use rocketmq_client_rust::base::client_config::ClientConfig;
634    ///
635    /// let config = ClientConfig::builder()
636    ///     .namesrv_addr("localhost:9876")
637    ///     .instance_name("my_producer")
638    ///     .enable_tls(true)
639    ///     .build()
640    ///     .unwrap();
641    /// ```
642    #[inline]
643    pub fn builder() -> crate::base::client_config_builder::ClientConfigBuilder {
644        crate::base::client_config_builder::ClientConfigBuilder::new()
645    }
646}
647
648impl std::fmt::Display for ClientConfig {
649    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
650        write!(
651            f,
652            "ClientConfig {{ namesrv_addr: {:?}, client_ip: {:?}, instance_name: {}, \
653             client_callback_executor_threads: {}, namespace: {:?}, namespace_v2: {:?}, access_channel: {:?}, \
654             poll_name_server_interval: {}, heartbeat_broker_interval: {}, persist_consumer_offset_interval: {}, \
655             pull_time_delay_millis_when_exception: {}, unit_mode: {}, unit_name: {:?}, decode_read_body: {}, \
656             decode_decompress_body: {}, vip_channel_enabled: {}, use_heartbeat_v2: {}, use_tls: {}, \
657             socks_proxy_config: {}, mq_client_api_timeout: {}, detect_timeout: {}, detect_interval: {}, language: \
658             {:?}, enable_stream_request_type: {}, send_latency_enable: {}, start_detector_enable: {}, \
659             enable_heartbeat_channel_event_listener: {}, enable_trace: {}, trace_topic: {:?}, trace_msg_batch_num: \
660             {}, max_page_size_in_get_metadata: {}, enable_concurrent_heartbeat: {}, \
661             concurrent_heartbeat_thread_pool_size: {} }}",
662            self.namesrv_addr,
663            self.client_ip,
664            self.instance_name,
665            self.client_callback_executor_threads,
666            self.namespace,
667            self.namespace_v2,
668            self.access_channel,
669            self.poll_name_server_interval,
670            self.heartbeat_broker_interval,
671            self.persist_consumer_offset_interval,
672            self.pull_time_delay_millis_when_exception,
673            self.unit_mode,
674            self.unit_name,
675            self.decode_read_body,
676            self.decode_decompress_body,
677            self.vip_channel_enabled,
678            self.use_heartbeat_v2,
679            self.use_tls,
680            self.socks_proxy_config,
681            self.mq_client_api_timeout,
682            self.detect_timeout,
683            self.detect_interval,
684            self.language,
685            self.enable_stream_request_type,
686            self.send_latency_enable,
687            self.start_detector_enable,
688            self.enable_heartbeat_channel_event_listener,
689            self.enable_trace,
690            self.trace_topic,
691            self.trace_msg_batch_num,
692            self.max_page_size_in_get_metadata,
693            self.enable_concurrent_heartbeat,
694            self.concurrent_heartbeat_thread_pool_size
695        )
696    }
697}