1use std::env;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering;
18use std::sync::Arc;
19use std::time::Duration;
20
21use cheetah_string::CheetahString;
22use rocketmq_common::common::message::message_queue::MessageQueue;
23use rocketmq_common::utils::name_server_address_utils::NameServerAddressUtils;
24use rocketmq_common::utils::name_server_address_utils::NAMESRV_ENDPOINT_PATTERN;
25use rocketmq_common::utils::network_util::NetworkUtil;
26use rocketmq_common::utils::string_utils::StringUtils;
27use rocketmq_common::TimeUtils::current_nano;
28use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
29use rocketmq_remoting::protocol::request_type::RequestType;
30use rocketmq_remoting::protocol::LanguageCode;
31
32use crate::base::access_channel::AccessChannel;
33
34#[derive(Clone)]
35pub struct ClientConfig {
36 pub namesrv_addr: Option<CheetahString>,
37 pub client_ip: Option<CheetahString>,
38 pub instance_name: CheetahString,
39 pub client_callback_executor_threads: usize,
40 pub namespace: Option<CheetahString>,
41 pub namespace_initialized: Arc<AtomicBool>,
42 pub namespace_v2: Option<CheetahString>,
43 pub access_channel: AccessChannel,
44 pub poll_name_server_interval: u32,
45 pub heartbeat_broker_interval: u32,
46 pub persist_consumer_offset_interval: u32,
47 pub pull_time_delay_millis_when_exception: u32,
48 pub unit_mode: bool,
49 pub unit_name: Option<CheetahString>,
50 pub decode_read_body: bool,
51 pub decode_decompress_body: bool,
52 pub vip_channel_enabled: bool,
53 pub use_heartbeat_v2: bool,
54 pub enable_concurrent_heartbeat: bool,
55 pub use_tls: bool,
56 pub socks_proxy_config: CheetahString,
57 pub mq_client_api_timeout: u64,
58 pub detect_timeout: u32,
59 pub detect_interval: u32,
60 pub language: LanguageCode,
61 pub enable_stream_request_type: bool,
62 pub send_latency_enable: bool,
63 pub start_detector_enable: bool,
64 pub enable_heartbeat_channel_event_listener: bool,
65 pub enable_trace: bool,
66 pub trace_topic: Option<CheetahString>,
67 pub trace_msg_batch_num: usize,
68 pub max_page_size_in_get_metadata: usize,
69 pub concurrent_heartbeat_thread_pool_size: usize,
74}
75
76impl Default for ClientConfig {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl ClientConfig {
83 pub const SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY: &'static str = "com.rocketmq.sendMessageWithVIPChannel";
84 pub const SOCKS_PROXY_CONFIG: &'static str = "com.rocketmq.socks.proxy.config";
85 pub const DECODE_READ_BODY: &'static str = "com.rocketmq.read.body";
86 pub const DECODE_DECOMPRESS_BODY: &'static str = "com.rocketmq.decompress.body";
87 pub const SEND_LATENCY_ENABLE: &'static str = "com.rocketmq.sendLatencyEnable";
88 pub const START_DETECTOR_ENABLE: &'static str = "com.rocketmq.startDetectorEnable";
89 pub const HEART_BEAT_V2: &'static str = "com.rocketmq.heartbeat.v2";
90 pub const ENABLE_CONCURRENT_HEARTBEAT: &'static str = "com.rocketmq.enableConcurrentHeartbeat";
91
92 pub fn new() -> Self {
93 ClientConfig {
94 namesrv_addr: NameServerAddressUtils::get_name_server_addresses().map(|addr| addr.into()),
95 client_ip: NetworkUtil::get_local_address().map(|addr| addr.into()),
96 instance_name: env::var("rocketmq.client.name")
97 .unwrap_or_else(|_| "DEFAULT".to_string())
98 .into(),
99 client_callback_executor_threads: num_cpus::get(),
100 namespace: None,
101 namespace_initialized: Arc::new(AtomicBool::new(false)),
102 namespace_v2: None,
103 access_channel: AccessChannel::Local,
104 poll_name_server_interval: Duration::from_secs(30).as_millis() as u32,
105 heartbeat_broker_interval: Duration::from_secs(30).as_millis() as u32,
106 persist_consumer_offset_interval: Duration::from_secs(5).as_millis() as u32,
107 pull_time_delay_millis_when_exception: 1000,
108 unit_mode: false,
109 unit_name: None,
110 decode_read_body: env::var(Self::DECODE_READ_BODY)
111 .unwrap_or_else(|_| "true".to_string())
112 .parse::<bool>()
113 .unwrap_or(true),
114 decode_decompress_body: env::var(Self::DECODE_DECOMPRESS_BODY)
115 .unwrap_or_else(|_| "true".to_string())
116 .parse::<bool>()
117 .unwrap_or(true),
118 vip_channel_enabled: env::var(Self::SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY)
119 .unwrap_or_else(|_| "false".to_string())
120 .parse::<bool>()
121 .unwrap_or(false),
122 use_heartbeat_v2: env::var(Self::HEART_BEAT_V2)
123 .unwrap_or_else(|_| "false".to_string())
124 .parse::<bool>()
125 .unwrap_or(false),
126 enable_concurrent_heartbeat: env::var(Self::ENABLE_CONCURRENT_HEARTBEAT)
127 .unwrap_or_else(|_| "false".to_string())
128 .parse::<bool>()
129 .unwrap_or(false),
130 use_tls: false,
131 socks_proxy_config: env::var(Self::SOCKS_PROXY_CONFIG)
132 .unwrap_or_else(|_| "{}".to_string())
133 .into(),
134 mq_client_api_timeout: Duration::from_secs(3).as_millis() as u64,
135 detect_timeout: 200,
136 detect_interval: Duration::from_secs(2).as_millis() as u32,
137 language: LanguageCode::RUST,
138 enable_stream_request_type: false,
139 send_latency_enable: env::var(Self::SEND_LATENCY_ENABLE)
140 .unwrap_or_else(|_| "false".to_string())
141 .parse::<bool>()
142 .unwrap_or(false),
143 start_detector_enable: env::var(Self::START_DETECTOR_ENABLE)
144 .unwrap_or_else(|_| "false".to_string())
145 .parse::<bool>()
146 .unwrap_or(false),
147 enable_heartbeat_channel_event_listener: true,
148 enable_trace: false,
149 trace_topic: None,
150 trace_msg_batch_num: 10,
151 max_page_size_in_get_metadata: 2000,
152 concurrent_heartbeat_thread_pool_size: num_cpus::get(),
153 }
154 }
155}
156
157impl ClientConfig {
158 #[inline]
159 pub fn with_namespace(&mut self, resource: impl Into<CheetahString>) -> CheetahString {
160 let resource = resource.into();
161 let namespace = self.get_namespace().unwrap_or_default();
162
163 if namespace.is_empty() {
165 return resource;
166 }
167
168 if NamespaceUtil::is_already_with_namespace(resource.as_str(), namespace.as_str()) {
170 return resource;
171 }
172
173 NamespaceUtil::wrap_namespace(namespace, resource)
174 }
175
176 #[inline]
177 pub fn queue_with_namespace(&mut self, mut queue: MessageQueue) -> MessageQueue {
178 if let Some(namespace) = self.get_namespace() {
179 if !namespace.is_empty() {
180 let topic = NamespaceUtil::wrap_namespace(namespace.as_str(), queue.topic_str());
181 queue.set_topic(topic);
182 return queue;
183 }
184 }
185 queue
186 }
187
188 #[inline]
189 pub fn get_namespace(&mut self) -> Option<CheetahString> {
190 let namespace_initialized = self.namespace_initialized.load(Ordering::Acquire);
191 if namespace_initialized {
192 return self.namespace.clone();
193 }
194
195 if let Some(ref namespace) = self.namespace {
196 return Some(namespace.clone());
197 }
198
199 if let Some(ref namesrv_addr) = self.namesrv_addr {
200 if NameServerAddressUtils::validate_instance_endpoint(namesrv_addr.as_ref()) {
201 self.namespace =
202 NameServerAddressUtils::parse_instance_id_from_endpoint(namesrv_addr.as_ref()).map(|id| id.into());
203 }
204 }
205 self.namespace_initialized.store(true, Ordering::Release);
206 self.namespace.clone()
207 }
208
209 #[inline]
210 pub fn change_instance_name_to_pid(&mut self) {
211 if self.instance_name == "DEFAULT" {
212 self.instance_name = format!("{}#{}", std::process::id(), current_nano()).into();
213 }
214 }
215 #[inline]
216 pub fn set_instance_name(&mut self, instance_name: CheetahString) {
217 self.instance_name = instance_name;
218 }
219 #[inline]
220 pub fn set_namesrv_addr(&mut self, namesrv_addr: CheetahString) {
221 self.namesrv_addr = Some(namesrv_addr);
222 self.namespace_initialized.store(false, Ordering::Release);
223 }
224
225 #[inline]
226 pub fn build_mq_client_id(&self) -> String {
227 let estimated_capacity = self.client_ip.as_ref().map(|ip| ip.len()).unwrap_or(15)
229 + self.instance_name.len()
230 + self.unit_name.as_ref().map(|un| un.len() + 1).unwrap_or(0)
231 + if self.enable_stream_request_type { 8 } else { 0 }
232 + 3; let mut sb = String::with_capacity(estimated_capacity);
235 if let Some(ref client_ip) = self.client_ip {
236 sb.push_str(client_ip.as_str());
237 }
238
239 sb.push('@');
240 sb.push_str(self.instance_name.as_str());
241 if let Some(ref unit_name) = self.unit_name {
242 if !unit_name.is_empty() {
243 sb.push('@');
244 sb.push_str(unit_name.as_str());
245 }
246 }
247
248 if self.enable_stream_request_type {
249 sb.push('@');
250 sb.push_str(RequestType::Stream.to_string().as_str());
251 }
252
253 sb
254 }
255
256 #[inline]
257 pub fn get_namesrv_addr(&self) -> Option<CheetahString> {
258 if StringUtils::is_not_empty_str(self.namesrv_addr.as_deref())
259 && NAMESRV_ENDPOINT_PATTERN.is_match(self.namesrv_addr.as_ref().unwrap().as_str())
260 {
261 NameServerAddressUtils::get_name_srv_addr_from_namesrv_endpoint(
262 self.namesrv_addr.as_ref().unwrap().as_str(),
263 )
264 .map(|addr| addr.into())
265 } else {
266 self.namesrv_addr.clone()
267 }
268 }
269
270 #[inline]
273 pub fn get_client_ip(&self) -> Option<&CheetahString> {
274 self.client_ip.as_ref()
275 }
276
277 #[inline]
278 pub fn set_client_ip(&mut self, client_ip: CheetahString) {
279 self.client_ip = Some(client_ip);
280 }
281
282 #[inline]
283 pub fn get_instance_name(&self) -> &CheetahString {
284 &self.instance_name
285 }
286
287 #[inline]
288 pub fn get_client_callback_executor_threads(&self) -> usize {
289 self.client_callback_executor_threads
290 }
291
292 #[inline]
293 pub fn set_client_callback_executor_threads(&mut self, threads: usize) {
294 self.client_callback_executor_threads = threads;
295 }
296
297 #[inline]
298 pub fn get_namespace_v2(&self) -> Option<&CheetahString> {
299 self.namespace_v2.as_ref()
300 }
301
302 #[inline]
303 pub fn set_namespace_v2(&mut self, namespace_v2: CheetahString) {
304 self.namespace_v2 = Some(namespace_v2);
305 }
306
307 #[inline]
308 pub fn set_namespace(&mut self, namespace: CheetahString) {
309 self.namespace = Some(namespace);
310 self.namespace_initialized.store(true, Ordering::Release);
311 }
312
313 #[inline]
314 pub fn get_access_channel(&self) -> AccessChannel {
315 self.access_channel
316 }
317
318 #[inline]
319 pub fn set_access_channel(&mut self, access_channel: AccessChannel) {
320 self.access_channel = access_channel;
321 }
322
323 #[inline]
324 pub fn get_poll_name_server_interval(&self) -> u32 {
325 self.poll_name_server_interval
326 }
327
328 #[inline]
329 pub fn set_poll_name_server_interval(&mut self, interval: u32) {
330 self.poll_name_server_interval = interval;
331 }
332
333 #[inline]
334 pub fn get_heartbeat_broker_interval(&self) -> u32 {
335 self.heartbeat_broker_interval
336 }
337
338 #[inline]
339 pub fn set_heartbeat_broker_interval(&mut self, interval: u32) {
340 self.heartbeat_broker_interval = interval;
341 }
342
343 #[inline]
344 pub fn get_persist_consumer_offset_interval(&self) -> u32 {
345 self.persist_consumer_offset_interval
346 }
347
348 #[inline]
349 pub fn set_persist_consumer_offset_interval(&mut self, interval: u32) {
350 self.persist_consumer_offset_interval = interval;
351 }
352
353 #[inline]
354 pub fn get_pull_time_delay_millis_when_exception(&self) -> u32 {
355 self.pull_time_delay_millis_when_exception
356 }
357
358 #[inline]
359 pub fn set_pull_time_delay_millis_when_exception(&mut self, delay: u32) {
360 self.pull_time_delay_millis_when_exception = delay;
361 }
362
363 #[inline]
364 pub fn get_unit_name(&self) -> Option<&CheetahString> {
365 self.unit_name.as_ref()
366 }
367
368 #[inline]
369 pub fn set_unit_name(&mut self, unit_name: CheetahString) {
370 self.unit_name = Some(unit_name);
371 }
372
373 #[inline]
374 pub fn is_unit_mode(&self) -> bool {
375 self.unit_mode
376 }
377
378 #[inline]
379 pub fn set_unit_mode(&mut self, unit_mode: bool) {
380 self.unit_mode = unit_mode;
381 }
382
383 #[inline]
384 pub fn is_decode_read_body(&self) -> bool {
385 self.decode_read_body
386 }
387
388 #[inline]
389 pub fn set_decode_read_body(&mut self, decode_read_body: bool) {
390 self.decode_read_body = decode_read_body;
391 }
392
393 #[inline]
394 pub fn is_decode_decompress_body(&self) -> bool {
395 self.decode_decompress_body
396 }
397
398 #[inline]
399 pub fn set_decode_decompress_body(&mut self, decode_decompress_body: bool) {
400 self.decode_decompress_body = decode_decompress_body;
401 }
402
403 #[inline]
404 pub fn is_vip_channel_enabled(&self) -> bool {
405 self.vip_channel_enabled
406 }
407
408 #[inline]
409 pub fn set_vip_channel_enabled(&mut self, enabled: bool) {
410 self.vip_channel_enabled = enabled;
411 }
412
413 #[inline]
414 pub fn is_use_heartbeat_v2(&self) -> bool {
415 self.use_heartbeat_v2
416 }
417
418 #[inline]
419 pub fn set_use_heartbeat_v2(&mut self, use_heartbeat_v2: bool) {
420 self.use_heartbeat_v2 = use_heartbeat_v2;
421 }
422
423 #[inline]
424 pub fn is_use_tls(&self) -> bool {
425 self.use_tls
426 }
427
428 #[inline]
429 pub fn set_use_tls(&mut self, use_tls: bool) {
430 self.use_tls = use_tls;
431 }
432
433 #[inline]
434 pub fn get_socks_proxy_config(&self) -> &CheetahString {
435 &self.socks_proxy_config
436 }
437
438 #[inline]
439 pub fn set_socks_proxy_config(&mut self, config: CheetahString) {
440 self.socks_proxy_config = config;
441 }
442
443 #[inline]
444 pub fn get_language(&self) -> LanguageCode {
445 self.language
446 }
447
448 #[inline]
449 pub fn set_language(&mut self, language: LanguageCode) {
450 self.language = language;
451 }
452
453 #[inline]
454 pub fn get_mq_client_api_timeout(&self) -> u64 {
455 self.mq_client_api_timeout
456 }
457
458 #[inline]
459 pub fn set_mq_client_api_timeout(&mut self, timeout: u64) {
460 self.mq_client_api_timeout = timeout;
461 }
462
463 #[inline]
464 pub fn get_detect_timeout(&self) -> u32 {
465 self.detect_timeout
466 }
467
468 #[inline]
469 pub fn set_detect_timeout(&mut self, timeout: u32) {
470 self.detect_timeout = timeout;
471 }
472
473 #[inline]
474 pub fn get_detect_interval(&self) -> u32 {
475 self.detect_interval
476 }
477
478 #[inline]
479 pub fn set_detect_interval(&mut self, interval: u32) {
480 self.detect_interval = interval;
481 }
482
483 #[inline]
484 pub fn is_enable_stream_request_type(&self) -> bool {
485 self.enable_stream_request_type
486 }
487
488 #[inline]
489 pub fn set_enable_stream_request_type(&mut self, enabled: bool) {
490 self.enable_stream_request_type = enabled;
491 }
492
493 #[inline]
494 pub fn is_send_latency_enable(&self) -> bool {
495 self.send_latency_enable
496 }
497
498 #[inline]
499 pub fn set_send_latency_enable(&mut self, enabled: bool) {
500 self.send_latency_enable = enabled;
501 }
502
503 #[inline]
504 pub fn is_start_detector_enable(&self) -> bool {
505 self.start_detector_enable
506 }
507
508 #[inline]
509 pub fn set_start_detector_enable(&mut self, enabled: bool) {
510 self.start_detector_enable = enabled;
511 }
512
513 #[inline]
514 pub fn is_enable_heartbeat_channel_event_listener(&self) -> bool {
515 self.enable_heartbeat_channel_event_listener
516 }
517
518 #[inline]
519 pub fn set_enable_heartbeat_channel_event_listener(&mut self, enabled: bool) {
520 self.enable_heartbeat_channel_event_listener = enabled;
521 }
522
523 #[inline]
524 pub fn is_enable_trace(&self) -> bool {
525 self.enable_trace
526 }
527
528 #[inline]
529 pub fn set_enable_trace(&mut self, enabled: bool) {
530 self.enable_trace = enabled;
531 }
532
533 #[inline]
534 pub fn get_trace_topic(&self) -> Option<&CheetahString> {
535 self.trace_topic.as_ref()
536 }
537
538 #[inline]
539 pub fn set_trace_topic(&mut self, topic: CheetahString) {
540 self.trace_topic = Some(topic);
541 }
542
543 #[inline]
544 pub fn get_trace_msg_batch_num(&self) -> usize {
545 self.trace_msg_batch_num
546 }
547
548 #[inline]
549 pub fn set_trace_msg_batch_num(&mut self, num: usize) {
550 self.trace_msg_batch_num = num;
551 }
552
553 #[inline]
554 pub fn get_max_page_size_in_get_metadata(&self) -> usize {
555 self.max_page_size_in_get_metadata
556 }
557
558 #[inline]
559 pub fn set_max_page_size_in_get_metadata(&mut self, size: usize) {
560 self.max_page_size_in_get_metadata = size;
561 }
562
563 #[inline]
564 pub fn get_concurrent_heartbeat_thread_pool_size(&self) -> usize {
565 self.concurrent_heartbeat_thread_pool_size
566 }
567
568 #[inline]
569 pub fn set_concurrent_heartbeat_thread_pool_size(&mut self, size: usize) {
570 self.concurrent_heartbeat_thread_pool_size = size;
571 }
572
573 #[inline]
577 pub fn clone_client_config(&self) -> Self {
578 self.clone()
579 }
580
581 pub fn reset_client_config(&mut self, other: &ClientConfig) {
583 self.namesrv_addr = other.namesrv_addr.clone();
584 self.client_ip = other.client_ip.clone();
585 self.instance_name = other.instance_name.clone();
586 self.client_callback_executor_threads = other.client_callback_executor_threads;
587 self.namespace = other.namespace.clone();
588 self.namespace_v2 = other.namespace_v2.clone();
589 self.access_channel = other.access_channel;
590 self.poll_name_server_interval = other.poll_name_server_interval;
591 self.heartbeat_broker_interval = other.heartbeat_broker_interval;
592 self.persist_consumer_offset_interval = other.persist_consumer_offset_interval;
593 self.pull_time_delay_millis_when_exception = other.pull_time_delay_millis_when_exception;
594 self.unit_mode = other.unit_mode;
595 self.unit_name = other.unit_name.clone();
596 self.decode_read_body = other.decode_read_body;
597 self.decode_decompress_body = other.decode_decompress_body;
598 self.vip_channel_enabled = other.vip_channel_enabled;
599 self.use_heartbeat_v2 = other.use_heartbeat_v2;
600 self.use_tls = other.use_tls;
601 self.socks_proxy_config = other.socks_proxy_config.clone();
602 self.language = other.language;
603 self.mq_client_api_timeout = other.mq_client_api_timeout;
604 self.detect_timeout = other.detect_timeout;
605 self.detect_interval = other.detect_interval;
606 self.enable_stream_request_type = other.enable_stream_request_type;
607 self.send_latency_enable = other.send_latency_enable;
608 self.start_detector_enable = other.start_detector_enable;
609 self.enable_heartbeat_channel_event_listener = other.enable_heartbeat_channel_event_listener;
610 self.enable_trace = other.enable_trace;
611 self.trace_topic = other.trace_topic.clone();
612 self.trace_msg_batch_num = other.trace_msg_batch_num;
613 self.max_page_size_in_get_metadata = other.max_page_size_in_get_metadata;
614 self.concurrent_heartbeat_thread_pool_size = other.concurrent_heartbeat_thread_pool_size;
615 }
616
617 #[inline]
619 #[deprecated(note = "Use with_namespace for namespace wrapping")]
620 pub fn without_namespace(&mut self, resource: &str) -> CheetahString {
621 if let Some(namespace) = self.get_namespace().as_deref() {
622 NamespaceUtil::without_namespace_with_namespace(resource, namespace).into()
623 } else {
624 NamespaceUtil::without_namespace(resource).into()
625 }
626 }
627
628 #[inline]
643 pub fn builder() -> crate::base::client_config_builder::ClientConfigBuilder {
644 crate::base::client_config_builder::ClientConfigBuilder::new()
645 }
646}
647
648impl std::fmt::Display for ClientConfig {
649 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
650 write!(
651 f,
652 "ClientConfig {{ namesrv_addr: {:?}, client_ip: {:?}, instance_name: {}, \
653 client_callback_executor_threads: {}, namespace: {:?}, namespace_v2: {:?}, access_channel: {:?}, \
654 poll_name_server_interval: {}, heartbeat_broker_interval: {}, persist_consumer_offset_interval: {}, \
655 pull_time_delay_millis_when_exception: {}, unit_mode: {}, unit_name: {:?}, decode_read_body: {}, \
656 decode_decompress_body: {}, vip_channel_enabled: {}, use_heartbeat_v2: {}, use_tls: {}, \
657 socks_proxy_config: {}, mq_client_api_timeout: {}, detect_timeout: {}, detect_interval: {}, language: \
658 {:?}, enable_stream_request_type: {}, send_latency_enable: {}, start_detector_enable: {}, \
659 enable_heartbeat_channel_event_listener: {}, enable_trace: {}, trace_topic: {:?}, trace_msg_batch_num: \
660 {}, max_page_size_in_get_metadata: {}, enable_concurrent_heartbeat: {}, \
661 concurrent_heartbeat_thread_pool_size: {} }}",
662 self.namesrv_addr,
663 self.client_ip,
664 self.instance_name,
665 self.client_callback_executor_threads,
666 self.namespace,
667 self.namespace_v2,
668 self.access_channel,
669 self.poll_name_server_interval,
670 self.heartbeat_broker_interval,
671 self.persist_consumer_offset_interval,
672 self.pull_time_delay_millis_when_exception,
673 self.unit_mode,
674 self.unit_name,
675 self.decode_read_body,
676 self.decode_decompress_body,
677 self.vip_channel_enabled,
678 self.use_heartbeat_v2,
679 self.use_tls,
680 self.socks_proxy_config,
681 self.mq_client_api_timeout,
682 self.detect_timeout,
683 self.detect_interval,
684 self.language,
685 self.enable_stream_request_type,
686 self.send_latency_enable,
687 self.start_detector_enable,
688 self.enable_heartbeat_channel_event_listener,
689 self.enable_trace,
690 self.trace_topic,
691 self.trace_msg_batch_num,
692 self.max_page_size_in_get_metadata,
693 self.enable_concurrent_heartbeat,
694 self.concurrent_heartbeat_thread_pool_size
695 )
696 }
697}