1use std::any::Any;
19use std::collections::HashMap;
20use std::time::Duration;
21
22use cheetah_string::CheetahString;
23use lazy_static::lazy_static;
24use serde::Deserialize;
25use serde::Serialize;
26
27use crate::common::broker::broker_role::BrokerRole;
28use crate::common::constant::PermName;
29use crate::common::message::message_enum::MessageRequestMode;
30use crate::common::mix_all;
31use crate::common::mix_all::NAMESRV_ADDR_PROPERTY;
32use crate::common::server::config::ServerConfig;
33use crate::common::topic::TopicValidator;
34
35const DEFAULT_CLUSTER_NAME: &str = "DefaultCluster";
36
37lazy_static! {
38 pub static ref LOCAL_HOST_NAME: Option<String> = match hostname::get() {
39 Ok(hostname) => {
40 Some(hostname.to_string_lossy().to_string())
41 }
42 Err(_) => {
43 None
44 }
45 };
46 pub static ref NAMESRV_ADDR: Option<String> =
47 std::env::var(NAMESRV_ADDR_PROPERTY).map_or(Some("127.0.0.1:9876".to_string()), Some);
48}
49
50mod defaults {
52 use super::*;
53
54 pub fn broker_name() -> CheetahString {
56 default_broker_name().into()
57 }
58
59 pub fn broker_cluster_name() -> CheetahString {
60 DEFAULT_CLUSTER_NAME.to_string().into()
61 }
62
63 pub fn broker_id() -> u64 {
64 mix_all::MASTER_ID
65 }
66
67 pub fn broker_identity() -> BrokerIdentity {
69 BrokerIdentity::new()
70 }
71
72 pub fn topic_queue_config() -> TopicQueueConfig {
73 TopicQueueConfig::default()
74 }
75
76 pub fn timer_wheel_config() -> TimerWheelConfig {
77 TimerWheelConfig::default()
78 }
79
80 pub fn broker_server_config() -> ServerConfig {
81 ServerConfig::default()
82 }
83
84 pub fn broker_ip1() -> CheetahString {
85 match local_ip_address::local_ip() {
86 Ok(local_ip) => local_ip.to_string().into(),
87 Err(_) => "127.0.0.1".to_string().into(),
88 }
89 }
90
91 pub fn broker_ip2() -> Option<CheetahString> {
92 match local_ip_address::local_ip() {
93 Ok(local_ip) => Some(local_ip.to_string().into()),
94 Err(_) => None,
95 }
96 }
97
98 pub fn listen_port() -> u32 {
99 10911
100 }
101
102 pub fn msg_trace_topic_name() -> CheetahString {
103 CheetahString::from_static_str(TopicValidator::RMQ_SYS_TRACE_TOPIC)
104 }
105
106 pub fn region_id() -> CheetahString {
107 CheetahString::from_static_str(mix_all::DEFAULT_TRACE_REGION_ID)
108 }
109
110 pub fn trace_on() -> bool {
111 true
112 }
113
114 pub fn broker_permission() -> u32 {
115 PermName::PERM_WRITE | PermName::PERM_READ
116 }
117
118 pub fn store_path_root_dir() -> CheetahString {
119 dirs::home_dir()
120 .unwrap_or_default()
121 .join("store")
122 .to_string_lossy()
123 .into_owned()
124 .into()
125 }
126
127 pub fn split_registration_size() -> i32 {
128 800
129 }
130
131 pub fn register_broker_timeout_mills() -> i32 {
132 24000
133 }
134
135 pub fn commercial_size_per_msg() -> i32 {
136 4 * 1024
137 }
138
139 pub fn auto_create_topic_enable() -> bool {
140 true
141 }
142
143 pub fn enable_single_topic_register() -> bool {
144 true
145 }
146
147 pub fn broker_topic_enable() -> bool {
148 true
149 }
150
151 pub fn cluster_topic_enable() -> bool {
152 true
153 }
154
155 pub fn revive_queue_num() -> u32 {
156 8
157 }
158
159 pub fn enable_detail_stat() -> bool {
160 true
161 }
162
163 pub fn flush_consumer_offset_interval() -> u64 {
164 1000 * 5
165 }
166
167 pub fn force_register() -> bool {
168 true
169 }
170
171 pub fn register_name_server_period() -> u64 {
172 1000 * 30
173 }
174
175 pub fn namesrv_addr() -> Option<CheetahString> {
176 NAMESRV_ADDR.clone().map(|addr| addr.into())
177 }
178
179 pub fn lite_pull_message_enable() -> bool {
180 true
181 }
182
183 pub fn auto_create_subscription_group() -> bool {
184 true
185 }
186
187 pub fn channel_expired_timeout() -> u64 {
188 1000 * 120
189 }
190
191 pub fn subscription_expired_timeout() -> u64 {
192 1000 * 60 * 10
193 }
194
195 pub fn use_server_side_reset_offset() -> bool {
196 true
197 }
198
199 pub fn consumer_offset_update_version_step() -> i64 {
200 500
201 }
202
203 pub fn enable_broadcast_offset_store() -> bool {
204 true
205 }
206
207 pub fn transfer_msg_by_heap() -> bool {
208 true
209 }
210
211 pub fn short_polling_time_mills() -> u64 {
212 1000
213 }
214
215 pub fn long_polling_enable() -> bool {
216 true
217 }
218
219 pub fn max_error_rate_of_bloom_filter() -> i32 {
220 20
221 }
222
223 pub fn expect_consumer_num_use_filter() -> i32 {
224 32
225 }
226
227 pub fn bit_map_length_consume_queue_ext() -> i32 {
228 64
229 }
230
231 pub fn forward_timeout() -> u64 {
232 3 * 1000
233 }
234
235 pub fn validate_system_topic_when_update_topic() -> bool {
236 true
237 }
238
239 pub fn store_reply_message_enable() -> bool {
240 true
241 }
242
243 pub fn transaction_timeout() -> u64 {
244 6_000
245 }
246
247 pub fn transaction_op_msg_max_size() -> i32 {
248 4096
249 }
250
251 pub fn default_message_request_mode() -> MessageRequestMode {
252 MessageRequestMode::Pull
253 }
254
255 pub fn default_pop_share_queue_num() -> i32 {
256 -1
257 }
258
259 pub fn load_balance_poll_name_server_interval() -> u64 {
260 30_000
261 }
262
263 pub fn server_load_balancer_enable() -> bool {
264 true
265 }
266
267 pub fn retrieve_message_from_pop_retry_topic_v1() -> bool {
268 true
269 }
270
271 pub fn pop_from_retry_probability() -> i32 {
272 20
273 }
274
275 pub fn init_pop_offset_by_check_msg_in_mem() -> bool {
276 true
277 }
278
279 pub fn pop_ck_stay_buffer_time_out() -> u64 {
280 3_000
281 }
282
283 pub fn pop_ck_stay_buffer_time() -> u64 {
284 10_000
285 }
286
287 pub fn broker_role() -> BrokerRole {
288 BrokerRole::AsyncMaster
289 }
290
291 pub fn revive_interval() -> u64 {
292 1000
293 }
294
295 pub fn revive_max_slow() -> u64 {
296 3
297 }
298
299 pub fn revive_scan_time() -> u64 {
300 10_000
301 }
302
303 pub fn commercial_base_count() -> i32 {
304 1
305 }
306
307 pub fn broker_not_active_timeout_millis() -> i64 {
308 10_000
309 }
310
311 pub fn sync_broker_member_group_period() -> u64 {
312 1_000
313 }
314
315 pub fn pop_polling_map_size() -> usize {
316 100000
317 }
318
319 pub fn max_pop_polling_size() -> u64 {
320 100000
321 }
322
323 pub fn pop_polling_size() -> usize {
324 1024
325 }
326
327 pub fn pop_inflight_message_threshold() -> i64 {
328 10_000
329 }
330
331 pub fn pop_ck_max_buffer_size() -> i64 {
332 200_000
333 }
334
335 pub fn pop_ck_offset_max_queue_size() -> u64 {
336 20_000
337 }
338
339 pub fn delay_offset_update_version_step() -> u64 {
340 200
341 }
342
343 pub fn revive_ack_wait_ms() -> u64 {
344 Duration::from_secs(3 * 60).as_millis() as u64
345 }
346
347 pub fn os_page_cache_busy_timeout_mills() -> u64 {
348 1000
349 }
350
351 pub fn default_topic_queue_nums() -> u32 {
352 8
353 }
354 pub fn transaction_check_interval() -> u64 {
355 30_000
356 }
357
358 pub fn transaction_check_max() -> u32 {
359 15
360 }
361
362 pub fn transaction_op_batch_interval() -> u64 {
363 3000
364 }
365
366 pub fn compatible_with_old_name_srv() -> bool {
367 true
368 }
369
370 #[inline]
371 pub fn broker_heartbeat_interval() -> u64 {
372 1000
373 }
374}
375
376#[derive(Debug, Serialize, Deserialize, Clone)]
377#[serde(rename_all = "camelCase")]
378pub struct BrokerIdentity {
379 #[serde(default = "defaults::broker_name")]
380 pub broker_name: CheetahString,
381
382 #[serde(default = "defaults::broker_cluster_name")]
383 pub broker_cluster_name: CheetahString,
384
385 #[serde(default = "defaults::broker_id")]
386 pub broker_id: u64,
387
388 #[serde(default)]
389 pub is_broker_container: bool,
390
391 #[serde(default)]
392 pub is_in_broker_container: bool,
393}
394
395impl Default for BrokerIdentity {
396 fn default() -> Self {
397 BrokerIdentity::new()
398 }
399}
400
401impl BrokerIdentity {
402 pub fn new() -> Self {
403 let broker_name = default_broker_name();
404 let broker_cluster_name = String::from(DEFAULT_CLUSTER_NAME);
405 let broker_id = mix_all::MASTER_ID;
406 let is_broker_container = false;
407
408 BrokerIdentity {
409 broker_name: CheetahString::from_string(broker_name),
410 broker_cluster_name: CheetahString::from_string(broker_cluster_name),
411 broker_id,
412 is_broker_container,
413 is_in_broker_container: false,
414 }
415 }
416
417 fn new_with_container(is_broker_container: bool) -> Self {
418 let mut identity = BrokerIdentity::new();
419 identity.is_broker_container = is_broker_container;
420 identity
421 }
422
423 fn new_with_params(broker_cluster_name: String, broker_name: String, broker_id: u64) -> Self {
424 BrokerIdentity {
425 broker_name: CheetahString::from_string(broker_name),
426 broker_cluster_name: CheetahString::from_string(broker_cluster_name),
427 broker_id,
428 is_broker_container: false,
429 is_in_broker_container: false,
430 }
431 }
432
433 fn new_with_container_params(
434 broker_cluster_name: String,
435 broker_name: String,
436 broker_id: u64,
437 is_in_broker_container: bool,
438 ) -> Self {
439 BrokerIdentity {
440 broker_name: CheetahString::from_string(broker_name),
441 broker_cluster_name: CheetahString::from_string(broker_cluster_name),
442 broker_id,
443 is_broker_container: true,
444 is_in_broker_container,
445 }
446 }
447
448 pub fn get_canonical_name(&self) -> String {
449 match self.is_broker_container {
450 true => "BrokerContainer".to_string(),
451 false => {
452 format!(
453 "{}_{}_{}",
454 self.broker_cluster_name, self.broker_name, self.broker_id
455 )
456 }
457 }
458 }
459}
460
461#[derive(Debug, Serialize, Deserialize, Clone)]
462#[serde(rename_all = "camelCase")]
463pub struct BrokerConfig {
464 #[serde(default = "defaults::broker_identity")]
465 pub broker_identity: BrokerIdentity,
466
467 #[serde(default = "defaults::topic_queue_config")]
468 pub topic_queue_config: TopicQueueConfig,
469
470 #[serde(default = "defaults::timer_wheel_config")]
471 pub timer_wheel_config: TimerWheelConfig,
472
473 #[serde(default = "defaults::broker_server_config")]
474 pub broker_server_config: ServerConfig,
475
476 #[serde(default = "defaults::broker_ip1")]
477 pub broker_ip1: CheetahString,
478
479 #[serde(default = "defaults::broker_ip2")]
480 pub broker_ip2: Option<CheetahString>,
481
482 #[serde(default = "defaults::listen_port")]
483 pub listen_port: u32,
484
485 #[serde(default)]
486 pub trace_topic_enable: bool,
487
488 #[serde(default = "defaults::msg_trace_topic_name")]
489 pub msg_trace_topic_name: CheetahString,
490
491 #[serde(default)]
492 pub enable_controller_mode: bool,
493
494 #[serde(default = "defaults::broker_name")]
495 pub broker_name: CheetahString,
496
497 #[serde(default = "defaults::region_id")]
498 pub region_id: CheetahString,
499
500 #[serde(default = "defaults::trace_on")]
501 pub trace_on: bool,
502
503 #[serde(default = "defaults::broker_permission")]
504 pub broker_permission: u32,
505
506 #[serde(default)]
507 pub async_send_enable: bool, #[serde(default = "defaults::store_path_root_dir")]
510 pub store_path_root_dir: CheetahString,
511
512 #[serde(default)]
513 pub enable_split_registration: bool,
514
515 #[serde(default = "defaults::split_registration_size")]
516 pub split_registration_size: i32,
517
518 #[serde(default = "defaults::register_broker_timeout_mills")]
519 pub register_broker_timeout_mills: i32,
520
521 #[serde(default)]
522 pub is_in_broker_container: bool,
523
524 #[serde(default = "defaults::commercial_size_per_msg")]
525 pub commercial_size_per_msg: i32,
526
527 #[serde(default)]
528 pub recover_concurrently: bool,
529
530 #[serde(default)]
531 pub duplication_enable: bool,
532
533 #[serde(default)]
534 pub start_accept_send_request_time_stamp: i64,
535
536 #[serde(default = "defaults::auto_create_topic_enable")]
537 pub auto_create_topic_enable: bool,
538
539 #[serde(default = "defaults::enable_single_topic_register")]
540 pub enable_single_topic_register: bool,
541
542 #[serde(default = "defaults::broker_topic_enable")]
543 pub broker_topic_enable: bool,
544
545 #[serde(default = "defaults::cluster_topic_enable")]
546 pub cluster_topic_enable: bool,
547
548 #[serde(default = "defaults::revive_queue_num")]
549 pub revive_queue_num: u32,
550
551 #[serde(default)]
552 pub enable_slave_acting_master: bool,
553
554 #[serde(default)]
555 pub reject_transaction_message: bool,
556
557 #[serde(default = "defaults::enable_detail_stat")]
558 pub enable_detail_stat: bool,
559
560 #[serde(default = "defaults::flush_consumer_offset_interval")]
561 pub flush_consumer_offset_interval: u64,
562
563 #[serde(default = "defaults::force_register")]
564 pub force_register: bool,
565
566 #[serde(default = "defaults::register_name_server_period")]
567 pub register_name_server_period: u64,
568
569 #[serde(default)]
570 pub skip_pre_online: bool,
571
572 #[serde(default = "defaults::namesrv_addr")]
573 pub namesrv_addr: Option<CheetahString>,
574
575 #[serde(default)]
576 pub fetch_name_srv_addr_by_dns_lookup: bool,
577
578 #[serde(default = "defaults::lite_pull_message_enable")]
579 pub lite_pull_message_enable: bool,
580
581 #[serde(default = "defaults::auto_create_subscription_group")]
582 pub auto_create_subscription_group: bool,
583
584 #[serde(default = "defaults::channel_expired_timeout")]
585 pub channel_expired_timeout: u64,
586
587 #[serde(default = "defaults::subscription_expired_timeout")]
588 pub subscription_expired_timeout: u64,
589
590 #[serde(default)]
591 pub enable_property_filter: bool,
592
593 #[serde(default)]
594 pub filter_support_retry: bool,
595
596 #[serde(default = "defaults::use_server_side_reset_offset")]
597 pub use_server_side_reset_offset: bool,
598
599 #[serde(default)]
600 pub slave_read_enable: bool,
601
602 #[serde(default = "defaults::commercial_base_count")]
603 pub commercial_base_count: i32,
604
605 #[serde(default)]
606 pub reject_pull_consumer_enable: bool,
607
608 #[serde(default = "defaults::consumer_offset_update_version_step")]
609 pub consumer_offset_update_version_step: i64,
610
611 #[serde(default = "defaults::enable_broadcast_offset_store")]
612 pub enable_broadcast_offset_store: bool,
613
614 #[serde(default = "defaults::transfer_msg_by_heap")]
615 pub transfer_msg_by_heap: bool,
616
617 #[serde(default = "defaults::short_polling_time_mills")]
618 pub short_polling_time_mills: u64,
619
620 #[serde(default = "defaults::long_polling_enable")]
621 pub long_polling_enable: bool,
622
623 #[serde(default = "defaults::max_error_rate_of_bloom_filter")]
624 pub max_error_rate_of_bloom_filter: i32,
625
626 #[serde(default = "defaults::expect_consumer_num_use_filter")]
627 pub expect_consumer_num_use_filter: i32,
628
629 #[serde(default = "defaults::bit_map_length_consume_queue_ext")]
630 pub bit_map_length_consume_queue_ext: i32,
631
632 #[serde(default = "defaults::validate_system_topic_when_update_topic")]
633 pub validate_system_topic_when_update_topic: bool,
634
635 #[serde(default)]
636 pub enable_mixed_message_type: bool,
637
638 #[serde(default)]
639 pub auto_delete_unused_stats: bool,
640
641 #[serde(default = "defaults::forward_timeout")]
642 pub forward_timeout: u64,
643
644 #[serde(default = "defaults::store_reply_message_enable")]
645 pub store_reply_message_enable: bool,
646
647 #[serde(default)]
648 pub lock_in_strict_mode: bool,
649
650 #[serde(default = "defaults::transaction_timeout")]
651 pub transaction_timeout: u64,
652
653 #[serde(default = "defaults::transaction_op_msg_max_size")]
654 pub transaction_op_msg_max_size: i32,
655
656 #[serde(default = "defaults::default_message_request_mode")]
657 pub default_message_request_mode: MessageRequestMode,
658
659 #[serde(default = "defaults::default_pop_share_queue_num")]
660 pub default_pop_share_queue_num: i32,
661
662 #[serde(default = "defaults::load_balance_poll_name_server_interval")]
663 pub load_balance_poll_name_server_interval: u64,
664
665 #[serde(default = "defaults::server_load_balancer_enable")]
666 pub server_load_balancer_enable: bool,
667
668 #[serde(default)]
669 pub enable_remote_escape: bool,
670
671 #[serde(default)]
672 pub enable_pop_log: bool,
673
674 #[serde(default)]
675 pub enable_retry_topic_v2: bool,
676
677 #[serde(default = "defaults::retrieve_message_from_pop_retry_topic_v1")]
678 pub retrieve_message_from_pop_retry_topic_v1: bool,
679
680 #[serde(default = "defaults::pop_from_retry_probability")]
681 pub pop_from_retry_probability: i32,
682
683 #[serde(default)]
684 pub pop_response_return_actual_retry_topic: bool,
685
686 #[serde(default = "defaults::init_pop_offset_by_check_msg_in_mem")]
687 pub init_pop_offset_by_check_msg_in_mem: bool,
688
689 #[serde(default)]
690 pub enable_pop_buffer_merge: bool,
691
692 #[serde(default = "defaults::pop_ck_stay_buffer_time_out")]
693 pub pop_ck_stay_buffer_time_out: u64,
694
695 #[serde(default = "defaults::pop_ck_stay_buffer_time")]
696 pub pop_ck_stay_buffer_time: u64,
697
698 #[serde(default = "defaults::broker_role")]
699 pub broker_role: BrokerRole,
700
701 #[serde(default)]
702 pub enable_pop_batch_ack: bool,
703
704 #[serde(default = "defaults::revive_interval")]
705 pub revive_interval: u64,
706
707 #[serde(default = "defaults::revive_max_slow")]
708 pub revive_max_slow: u64,
709
710 #[serde(default = "defaults::revive_scan_time")]
711 pub revive_scan_time: u64,
712
713 #[serde(default)]
714 pub enable_skip_long_awaiting_ack: bool,
715
716 #[serde(default)]
717 pub skip_when_ck_re_put_reach_max_times: bool,
718
719 #[serde(default)]
720 pub compressed_register: bool,
721
722 #[serde(default = "defaults::broker_not_active_timeout_millis")]
723 pub broker_not_active_timeout_millis: i64,
724
725 #[serde(default = "defaults::sync_broker_member_group_period")]
726 pub sync_broker_member_group_period: u64,
727
728 #[serde(default = "defaults::pop_polling_map_size")]
729 pub pop_polling_map_size: usize,
730
731 #[serde(default = "defaults::max_pop_polling_size")]
732 pub max_pop_polling_size: u64,
733
734 #[serde(default = "defaults::pop_polling_size")]
735 pub pop_polling_size: usize,
736
737 #[serde(default)]
738 pub enable_pop_message_threshold: bool,
739
740 #[serde(default = "defaults::pop_inflight_message_threshold")]
741 pub pop_inflight_message_threshold: i64,
742
743 #[serde(default = "defaults::pop_ck_max_buffer_size")]
744 pub pop_ck_max_buffer_size: i64,
745
746 #[serde(default = "defaults::pop_ck_offset_max_queue_size")]
747 pub pop_ck_offset_max_queue_size: u64,
748
749 #[serde(default = "defaults::delay_offset_update_version_step")]
750 pub delay_offset_update_version_step: u64,
751
752 #[serde(default = "defaults::revive_ack_wait_ms")]
753 pub revive_ack_wait_ms: u64,
754
755 #[serde(default)]
756 pub enable_calc_filter_bit_map: bool,
757
758 #[serde(default = "defaults::transaction_check_interval")]
759 pub transaction_check_interval: u64,
760
761 #[serde(default = "defaults::transaction_check_max")]
762 pub transaction_check_max: u32,
763
764 #[serde(default = "defaults::transaction_op_batch_interval")]
765 pub transaction_op_batch_interval: u64,
766
767 #[serde(default = "defaults::compatible_with_old_name_srv")]
768 pub compatible_with_old_name_srv: bool,
769
770 #[serde(default = "defaults::broker_heartbeat_interval")]
771 pub broker_heartbeat_interval: u64,
772}
773
774impl Default for BrokerConfig {
775 fn default() -> Self {
776 let broker_identity = BrokerIdentity::new();
777 let local_ip = local_ip_address::local_ip().unwrap();
778 let broker_ip1 = local_ip.to_string().into();
779 let broker_ip2 = Some(local_ip.to_string().into());
780 let listen_port = 10911;
781
782 BrokerConfig {
783 broker_identity,
784 topic_queue_config: TopicQueueConfig::default(),
785 timer_wheel_config: TimerWheelConfig::default(),
786 broker_server_config: Default::default(),
787 broker_ip1,
788 broker_ip2,
789 listen_port,
790 trace_topic_enable: false,
791 msg_trace_topic_name: CheetahString::from_static_str(
792 TopicValidator::RMQ_SYS_TRACE_TOPIC,
793 ),
794 enable_controller_mode: false,
795 broker_name: default_broker_name().into(),
796 region_id: CheetahString::from_static_str(mix_all::DEFAULT_TRACE_REGION_ID),
797 trace_on: true,
798 broker_permission: PermName::PERM_WRITE | PermName::PERM_READ,
799 async_send_enable: false,
800 store_path_root_dir: dirs::home_dir()
801 .unwrap()
802 .join("store")
803 .to_string_lossy()
804 .into_owned()
805 .into(),
806 enable_split_registration: false,
807 split_registration_size: 800,
808 register_broker_timeout_mills: 24000,
809 is_in_broker_container: false,
810 commercial_size_per_msg: 4 * 1024,
811 recover_concurrently: false,
812 duplication_enable: false,
813 start_accept_send_request_time_stamp: 0,
814 auto_create_topic_enable: true,
815 enable_single_topic_register: true,
816 broker_topic_enable: true,
817 cluster_topic_enable: true,
818 revive_queue_num: 8,
819 enable_slave_acting_master: false,
820 reject_transaction_message: false,
821 enable_detail_stat: true,
822 flush_consumer_offset_interval: 1000 * 5,
823 force_register: true,
824 register_name_server_period: 1000 * 30,
825 skip_pre_online: false,
826 namesrv_addr: NAMESRV_ADDR.clone().map(|addr| addr.into()),
827 fetch_name_srv_addr_by_dns_lookup: false,
828 lite_pull_message_enable: true,
829 auto_create_subscription_group: true,
830 channel_expired_timeout: 1000 * 120,
831 subscription_expired_timeout: 1000 * 60 * 10,
832 enable_property_filter: false,
833 filter_support_retry: false,
834 use_server_side_reset_offset: true,
835 slave_read_enable: false,
836 commercial_base_count: 1,
837 reject_pull_consumer_enable: false,
838 consumer_offset_update_version_step: 500,
839 enable_broadcast_offset_store: true,
840 transfer_msg_by_heap: true,
841 short_polling_time_mills: 1000,
842 long_polling_enable: true,
843 max_error_rate_of_bloom_filter: 20,
844 expect_consumer_num_use_filter: 32,
845 bit_map_length_consume_queue_ext: 64,
846 forward_timeout: 3 * 1000,
847 validate_system_topic_when_update_topic: true,
848 enable_mixed_message_type: false,
849 auto_delete_unused_stats: false,
850 store_reply_message_enable: true,
851 lock_in_strict_mode: false,
852 transaction_timeout: 6_000,
853 transaction_op_msg_max_size: 4096,
854 default_message_request_mode: MessageRequestMode::Pull,
855 default_pop_share_queue_num: -1,
856 load_balance_poll_name_server_interval: 30_000,
857 server_load_balancer_enable: true,
858 enable_remote_escape: false,
859 enable_pop_log: false,
860 enable_retry_topic_v2: false,
861 retrieve_message_from_pop_retry_topic_v1: true,
862 pop_from_retry_probability: 20,
863 pop_response_return_actual_retry_topic: false,
864 init_pop_offset_by_check_msg_in_mem: true,
865 enable_pop_buffer_merge: false,
866 pop_ck_stay_buffer_time_out: 3_000,
867 pop_ck_stay_buffer_time: 10_000,
868 broker_role: BrokerRole::AsyncMaster,
869 enable_pop_batch_ack: false,
870 revive_interval: 1000,
871 revive_max_slow: 3,
872 revive_scan_time: 10_000,
873 enable_skip_long_awaiting_ack: false,
874 skip_when_ck_re_put_reach_max_times: false,
875 compressed_register: false,
876 broker_not_active_timeout_millis: 10_000,
877 sync_broker_member_group_period: 1_000,
878 pop_polling_map_size: 100000,
879 max_pop_polling_size: 100000,
880 pop_polling_size: 1024,
881 enable_pop_message_threshold: false,
882 pop_inflight_message_threshold: 10_000,
883 pop_ck_max_buffer_size: 200_000,
884 pop_ck_offset_max_queue_size: 20_000,
885 delay_offset_update_version_step: 200,
886 revive_ack_wait_ms: Duration::from_secs(3 * 60).as_millis() as u64,
887 enable_calc_filter_bit_map: false,
888 transaction_check_interval: 30_000,
889 transaction_check_max: 15,
890 transaction_op_batch_interval: 3_000,
891 compatible_with_old_name_srv: true,
892 broker_heartbeat_interval: 1000,
893 }
894 }
895}
896
897impl BrokerConfig {
898 pub fn broker_name(&self) -> CheetahString {
899 self.broker_name.clone()
900 }
901
902 pub fn broker_ip1(&self) -> CheetahString {
903 self.broker_ip1.clone()
904 }
905
906 pub fn broker_ip2(&self) -> Option<CheetahString> {
907 self.broker_ip2.clone()
908 }
909
910 pub fn listen_port(&self) -> u32 {
911 self.listen_port
912 }
913
914 pub fn trace_topic_enable(&self) -> bool {
915 self.trace_topic_enable
916 }
917
918 pub fn broker_server_config(&self) -> &ServerConfig {
919 &self.broker_server_config
920 }
921
922 pub fn region_id(&self) -> &str {
923 self.region_id.as_str()
924 }
925
926 #[inline]
927 pub fn broker_permission(&self) -> u32 {
928 self.broker_permission
929 }
930
931 pub fn get_broker_addr(&self) -> String {
932 format!("{}:{}", self.broker_ip1, self.listen_port)
933 }
934
935 pub fn get_start_accept_send_request_time_stamp(&self) -> i64 {
936 self.start_accept_send_request_time_stamp
937 }
938
939 pub fn get_properties(&self) -> HashMap<CheetahString, CheetahString> {
940 let mut properties = HashMap::new();
941 properties.insert("brokerName".into(), self.broker_name.clone());
942 properties.insert(
943 "brokerClusterName".into(),
944 self.broker_identity.broker_cluster_name.clone(),
945 );
946 properties.insert(
947 "brokerId".into(),
948 self.broker_identity.broker_id.to_string().into(),
949 );
950 properties.insert(
951 "isBrokerContainer".into(),
952 self.broker_identity.is_broker_container.to_string().into(),
953 );
954 properties.insert(
955 "defaultTopicQueueNums".into(),
956 self.topic_queue_config
957 .default_topic_queue_nums
958 .to_string()
959 .into(),
960 );
961 properties.insert(
962 "timerWheelEnable".into(),
963 self.timer_wheel_config
964 .timer_wheel_enable
965 .to_string()
966 .into(),
967 );
968 properties.insert(
969 "bindAddress".into(),
970 self.broker_server_config
971 .bind_address
972 .clone()
973 .to_string()
974 .into(),
975 );
976 properties.insert(
977 "brokerIp1".into(),
978 self.broker_ip1.clone().to_string().into(),
979 );
980 properties.insert(
981 "brokerIp2".into(),
982 self.broker_ip2.clone().unwrap_or_default(),
983 );
984 properties.insert("listenPort".into(), self.listen_port.to_string().into());
985 properties.insert(
986 "traceTopicEnable".into(),
987 self.trace_topic_enable.to_string().into(),
988 );
989 properties.insert(
990 "msgTraceTopicName".into(),
991 self.msg_trace_topic_name.clone(),
992 );
993 properties.insert(
994 "enableControllerMode".into(),
995 self.enable_controller_mode.to_string().into(),
996 );
997 properties.insert("regionId".into(), self.region_id.clone());
998 properties.insert("brokerName".into(), self.broker_name.clone());
999 properties.insert("traceOn".into(), self.trace_on.to_string().into());
1000 properties.insert(
1001 "brokerPermission".into(),
1002 self.broker_permission.to_string().into(),
1003 );
1004 properties.insert(
1005 "asyncSendEnable".into(),
1006 self.async_send_enable.to_string().into(),
1007 );
1008 properties.insert("storePathRootDir".into(), self.store_path_root_dir.clone());
1009 properties.insert(
1010 "enableSplitRegistration".into(),
1011 self.enable_split_registration.to_string().into(),
1012 );
1013 properties.insert(
1014 "splitRegistrationSize".into(),
1015 self.split_registration_size.to_string().into(),
1016 );
1017 properties.insert(
1018 "registerBrokerTimeoutMills".into(),
1019 self.register_broker_timeout_mills.to_string().into(),
1020 );
1021 properties.insert(
1022 "isInBrokerContainer".into(),
1023 self.is_in_broker_container.to_string().into(),
1024 );
1025 properties.insert(
1026 "commercialSizePerMsg".into(),
1027 self.commercial_size_per_msg.to_string().into(),
1028 );
1029 properties.insert(
1030 "recoverConcurrently".into(),
1031 self.recover_concurrently.to_string().into(),
1032 );
1033 properties.insert(
1034 "duplicationEnable".into(),
1035 self.duplication_enable.to_string().into(),
1036 );
1037 properties.insert(
1038 "startAcceptSendRequestTimeStamp".into(),
1039 self.start_accept_send_request_time_stamp.to_string().into(),
1040 );
1041 properties.insert(
1042 "autoCreateTopicEnable".into(),
1043 self.auto_create_topic_enable.to_string().into(),
1044 );
1045 properties.insert(
1046 "enableSingleTopicRegister".into(),
1047 self.enable_single_topic_register.to_string().into(),
1048 );
1049 properties.insert(
1050 "brokerTopicEnable".into(),
1051 self.broker_topic_enable.to_string().into(),
1052 );
1053 properties.insert(
1054 "clusterTopicEnable".into(),
1055 self.cluster_topic_enable.to_string().into(),
1056 );
1057 properties.insert(
1058 "reviveQueueNum".into(),
1059 self.revive_queue_num.to_string().into(),
1060 );
1061 properties.insert(
1062 "enableSlaveActingMaster".into(),
1063 self.enable_slave_acting_master.to_string().into(),
1064 );
1065 properties.insert(
1066 "rejectTransactionMessage".into(),
1067 self.reject_transaction_message.to_string().into(),
1068 );
1069 properties.insert(
1070 "enableDetailStat".into(),
1071 self.enable_detail_stat.to_string().into(),
1072 );
1073 properties.insert(
1074 "flushConsumerOffsetInterval".into(),
1075 self.flush_consumer_offset_interval.to_string().into(),
1076 );
1077 properties.insert(
1078 "forceRegister".into(),
1079 self.force_register.to_string().into(),
1080 );
1081 properties.insert(
1082 "registerNameServerPeriod".into(),
1083 self.register_name_server_period.to_string().into(),
1084 );
1085 properties.insert(
1086 "skipPreOnline".into(),
1087 self.skip_pre_online.to_string().into(),
1088 );
1089 properties.insert(
1090 "namesrvAddr".into(),
1091 self.namesrv_addr.clone().unwrap_or_default(),
1092 );
1093 properties.insert(
1094 "fetchNameSrvAddrByDnsLookup".into(),
1095 self.fetch_name_srv_addr_by_dns_lookup.to_string().into(),
1096 );
1097 properties.insert(
1098 "litePullMessageEnable".into(),
1099 self.lite_pull_message_enable.to_string().into(),
1100 );
1101 properties.insert(
1102 "autoCreateSubscriptionGroup".into(),
1103 self.auto_create_subscription_group.to_string().into(),
1104 );
1105 properties.insert(
1106 "channelExpiredTimeout".into(),
1107 self.channel_expired_timeout.to_string().into(),
1108 );
1109 properties.insert(
1110 "subscriptionExpiredTimeout".into(),
1111 self.subscription_expired_timeout.to_string().into(),
1112 );
1113 properties.insert(
1114 "enablePropertyFilter".into(),
1115 self.enable_property_filter.to_string().into(),
1116 );
1117 properties.insert(
1118 "filterSupportRetry".into(),
1119 self.filter_support_retry.to_string().into(),
1120 );
1121 properties.insert(
1122 "useServerSideResetOffset".into(),
1123 self.use_server_side_reset_offset.to_string().into(),
1124 );
1125 properties.insert(
1126 "slaveReadEnable".into(),
1127 self.slave_read_enable.to_string().into(),
1128 );
1129 properties.insert(
1130 "commercialBaseCount".into(),
1131 self.commercial_base_count.to_string().into(),
1132 );
1133 properties.insert(
1134 "rejectPullConsumerEnable".into(),
1135 self.reject_pull_consumer_enable.to_string().into(),
1136 );
1137 properties.insert(
1138 "consumerOffsetUpdateVersionStep".into(),
1139 self.consumer_offset_update_version_step.to_string().into(),
1140 );
1141 properties.insert(
1142 "enableBroadcastOffsetStore".into(),
1143 self.enable_broadcast_offset_store.to_string().into(),
1144 );
1145 properties.insert(
1146 "transferMsgByHeap".into(),
1147 self.transfer_msg_by_heap.to_string().into(),
1148 );
1149 properties.insert(
1150 "shortPollingTimeMills".into(),
1151 self.short_polling_time_mills.to_string().into(),
1152 );
1153 properties.insert(
1154 "longPollingEnable".into(),
1155 self.long_polling_enable.to_string().into(),
1156 );
1157 properties.insert(
1158 "maxErrorRateOfBloomFilter".into(),
1159 self.max_error_rate_of_bloom_filter.to_string().into(),
1160 );
1161 properties.insert(
1162 "expectConsumerNumUseFilter".into(),
1163 self.expect_consumer_num_use_filter.to_string().into(),
1164 );
1165 properties.insert(
1166 "bitMapLengthConsumeQueueExt".into(),
1167 self.bit_map_length_consume_queue_ext.to_string().into(),
1168 );
1169 properties.insert(
1170 "validateSystemTopicWhenUpdateTopic".into(),
1171 self.validate_system_topic_when_update_topic
1172 .to_string()
1173 .into(),
1174 );
1175 properties.insert(
1176 "enableMixedMessageType".into(),
1177 self.enable_mixed_message_type.to_string().into(),
1178 );
1179 properties.insert(
1180 "autoDeleteUnusedStats".into(),
1181 self.auto_delete_unused_stats.to_string().into(),
1182 );
1183 properties.insert(
1184 "forwardTimeout".into(),
1185 self.forward_timeout.to_string().into(),
1186 );
1187 properties
1188 }
1189}
1190
1191pub fn default_broker_name() -> String {
1192 LOCAL_HOST_NAME
1193 .clone()
1194 .unwrap_or_else(|| "DEFAULT_BROKER".to_string())
1195}
1196
1197#[derive(Debug, Serialize, Deserialize, Clone)]
1198#[serde(rename_all = "camelCase")]
1199pub struct TopicQueueConfig {
1200 #[serde(default = "defaults::default_topic_queue_nums")]
1201 pub default_topic_queue_nums: u32,
1202}
1203
1204impl Default for TopicQueueConfig {
1205 fn default() -> Self {
1206 TopicQueueConfig {
1207 default_topic_queue_nums: 8,
1208 }
1209 }
1210}
1211
1212#[derive(Debug, Serialize, Deserialize, Default, Clone)]
1213#[serde(rename_all = "camelCase")]
1214pub struct TimerWheelConfig {
1215 #[serde(default)]
1216 pub timer_wheel_enable: bool,
1217}