rocketmq_common/common/broker/
broker_config.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::time::Duration;
21
22use cheetah_string::CheetahString;
23use lazy_static::lazy_static;
24use serde::Deserialize;
25use serde::Serialize;
26
27use crate::common::broker::broker_role::BrokerRole;
28use crate::common::constant::PermName;
29use crate::common::message::message_enum::MessageRequestMode;
30use crate::common::mix_all;
31use crate::common::mix_all::NAMESRV_ADDR_PROPERTY;
32use crate::common::server::config::ServerConfig;
33use crate::common::topic::TopicValidator;
34
35const DEFAULT_CLUSTER_NAME: &str = "DefaultCluster";
36
37lazy_static! {
38    pub static ref LOCAL_HOST_NAME: Option<String> = match hostname::get() {
39        Ok(hostname) => {
40            Some(hostname.to_string_lossy().to_string())
41        }
42        Err(_) => {
43            None
44        }
45    };
46    pub static ref NAMESRV_ADDR: Option<String> =
47        std::env::var(NAMESRV_ADDR_PROPERTY).map_or(Some("127.0.0.1:9876".to_string()), Some);
48}
49
50/// Default value functions for Serde deserialization
51mod defaults {
52    use super::*;
53
54    // BrokerIdentity defaults
55    pub fn broker_name() -> CheetahString {
56        default_broker_name().into()
57    }
58
59    pub fn broker_cluster_name() -> CheetahString {
60        DEFAULT_CLUSTER_NAME.to_string().into()
61    }
62
63    pub fn broker_id() -> u64 {
64        mix_all::MASTER_ID
65    }
66
67    // BrokerConfig defaults
68    pub fn broker_identity() -> BrokerIdentity {
69        BrokerIdentity::new()
70    }
71
72    pub fn topic_queue_config() -> TopicQueueConfig {
73        TopicQueueConfig::default()
74    }
75
76    pub fn timer_wheel_config() -> TimerWheelConfig {
77        TimerWheelConfig::default()
78    }
79
80    pub fn broker_server_config() -> ServerConfig {
81        ServerConfig::default()
82    }
83
84    pub fn broker_ip1() -> CheetahString {
85        match local_ip_address::local_ip() {
86            Ok(local_ip) => local_ip.to_string().into(),
87            Err(_) => "127.0.0.1".to_string().into(),
88        }
89    }
90
91    pub fn broker_ip2() -> Option<CheetahString> {
92        match local_ip_address::local_ip() {
93            Ok(local_ip) => Some(local_ip.to_string().into()),
94            Err(_) => None,
95        }
96    }
97
98    pub fn listen_port() -> u32 {
99        10911
100    }
101
102    pub fn msg_trace_topic_name() -> CheetahString {
103        CheetahString::from_static_str(TopicValidator::RMQ_SYS_TRACE_TOPIC)
104    }
105
106    pub fn region_id() -> CheetahString {
107        CheetahString::from_static_str(mix_all::DEFAULT_TRACE_REGION_ID)
108    }
109
110    pub fn trace_on() -> bool {
111        true
112    }
113
114    pub fn broker_permission() -> u32 {
115        PermName::PERM_WRITE | PermName::PERM_READ
116    }
117
118    pub fn store_path_root_dir() -> CheetahString {
119        dirs::home_dir()
120            .unwrap_or_default()
121            .join("store")
122            .to_string_lossy()
123            .into_owned()
124            .into()
125    }
126
127    pub fn split_registration_size() -> i32 {
128        800
129    }
130
131    pub fn register_broker_timeout_mills() -> i32 {
132        24000
133    }
134
135    pub fn commercial_size_per_msg() -> i32 {
136        4 * 1024
137    }
138
139    pub fn auto_create_topic_enable() -> bool {
140        true
141    }
142
143    pub fn enable_single_topic_register() -> bool {
144        true
145    }
146
147    pub fn broker_topic_enable() -> bool {
148        true
149    }
150
151    pub fn cluster_topic_enable() -> bool {
152        true
153    }
154
155    pub fn revive_queue_num() -> u32 {
156        8
157    }
158
159    pub fn enable_detail_stat() -> bool {
160        true
161    }
162
163    pub fn flush_consumer_offset_interval() -> u64 {
164        1000 * 5
165    }
166
167    pub fn force_register() -> bool {
168        true
169    }
170
171    pub fn register_name_server_period() -> u64 {
172        1000 * 30
173    }
174
175    pub fn namesrv_addr() -> Option<CheetahString> {
176        NAMESRV_ADDR.clone().map(|addr| addr.into())
177    }
178
179    pub fn lite_pull_message_enable() -> bool {
180        true
181    }
182
183    pub fn auto_create_subscription_group() -> bool {
184        true
185    }
186
187    pub fn channel_expired_timeout() -> u64 {
188        1000 * 120
189    }
190
191    pub fn subscription_expired_timeout() -> u64 {
192        1000 * 60 * 10
193    }
194
195    pub fn use_server_side_reset_offset() -> bool {
196        true
197    }
198
199    pub fn consumer_offset_update_version_step() -> i64 {
200        500
201    }
202
203    pub fn enable_broadcast_offset_store() -> bool {
204        true
205    }
206
207    pub fn transfer_msg_by_heap() -> bool {
208        true
209    }
210
211    pub fn short_polling_time_mills() -> u64 {
212        1000
213    }
214
215    pub fn long_polling_enable() -> bool {
216        true
217    }
218
219    pub fn max_error_rate_of_bloom_filter() -> i32 {
220        20
221    }
222
223    pub fn expect_consumer_num_use_filter() -> i32 {
224        32
225    }
226
227    pub fn bit_map_length_consume_queue_ext() -> i32 {
228        64
229    }
230
231    pub fn forward_timeout() -> u64 {
232        3 * 1000
233    }
234
235    pub fn validate_system_topic_when_update_topic() -> bool {
236        true
237    }
238
239    pub fn store_reply_message_enable() -> bool {
240        true
241    }
242
243    pub fn transaction_timeout() -> u64 {
244        6_000
245    }
246
247    pub fn transaction_op_msg_max_size() -> i32 {
248        4096
249    }
250
251    pub fn default_message_request_mode() -> MessageRequestMode {
252        MessageRequestMode::Pull
253    }
254
255    pub fn default_pop_share_queue_num() -> i32 {
256        -1
257    }
258
259    pub fn load_balance_poll_name_server_interval() -> u64 {
260        30_000
261    }
262
263    pub fn server_load_balancer_enable() -> bool {
264        true
265    }
266
267    pub fn retrieve_message_from_pop_retry_topic_v1() -> bool {
268        true
269    }
270
271    pub fn pop_from_retry_probability() -> i32 {
272        20
273    }
274
275    pub fn init_pop_offset_by_check_msg_in_mem() -> bool {
276        true
277    }
278
279    pub fn pop_ck_stay_buffer_time_out() -> u64 {
280        3_000
281    }
282
283    pub fn pop_ck_stay_buffer_time() -> u64 {
284        10_000
285    }
286
287    pub fn broker_role() -> BrokerRole {
288        BrokerRole::AsyncMaster
289    }
290
291    pub fn revive_interval() -> u64 {
292        1000
293    }
294
295    pub fn revive_max_slow() -> u64 {
296        3
297    }
298
299    pub fn revive_scan_time() -> u64 {
300        10_000
301    }
302
303    pub fn commercial_base_count() -> i32 {
304        1
305    }
306
307    pub fn broker_not_active_timeout_millis() -> i64 {
308        10_000
309    }
310
311    pub fn sync_broker_member_group_period() -> u64 {
312        1_000
313    }
314
315    pub fn pop_polling_map_size() -> usize {
316        100000
317    }
318
319    pub fn max_pop_polling_size() -> u64 {
320        100000
321    }
322
323    pub fn pop_polling_size() -> usize {
324        1024
325    }
326
327    pub fn pop_inflight_message_threshold() -> i64 {
328        10_000
329    }
330
331    pub fn pop_ck_max_buffer_size() -> i64 {
332        200_000
333    }
334
335    pub fn pop_ck_offset_max_queue_size() -> u64 {
336        20_000
337    }
338
339    pub fn delay_offset_update_version_step() -> u64 {
340        200
341    }
342
343    pub fn revive_ack_wait_ms() -> u64 {
344        Duration::from_secs(3 * 60).as_millis() as u64
345    }
346
347    pub fn os_page_cache_busy_timeout_mills() -> u64 {
348        1000
349    }
350
351    pub fn default_topic_queue_nums() -> u32 {
352        8
353    }
354    pub fn transaction_check_interval() -> u64 {
355        30_000
356    }
357
358    pub fn transaction_check_max() -> u32 {
359        15
360    }
361
362    pub fn transaction_op_batch_interval() -> u64 {
363        3000
364    }
365
366    pub fn compatible_with_old_name_srv() -> bool {
367        true
368    }
369
370    #[inline]
371    pub fn broker_heartbeat_interval() -> u64 {
372        1000
373    }
374}
375
376#[derive(Debug, Serialize, Deserialize, Clone)]
377#[serde(rename_all = "camelCase")]
378pub struct BrokerIdentity {
379    #[serde(default = "defaults::broker_name")]
380    pub broker_name: CheetahString,
381
382    #[serde(default = "defaults::broker_cluster_name")]
383    pub broker_cluster_name: CheetahString,
384
385    #[serde(default = "defaults::broker_id")]
386    pub broker_id: u64,
387
388    #[serde(default)]
389    pub is_broker_container: bool,
390
391    #[serde(default)]
392    pub is_in_broker_container: bool,
393}
394
395impl Default for BrokerIdentity {
396    fn default() -> Self {
397        BrokerIdentity::new()
398    }
399}
400
401impl BrokerIdentity {
402    pub fn new() -> Self {
403        let broker_name = default_broker_name();
404        let broker_cluster_name = String::from(DEFAULT_CLUSTER_NAME);
405        let broker_id = mix_all::MASTER_ID;
406        let is_broker_container = false;
407
408        BrokerIdentity {
409            broker_name: CheetahString::from_string(broker_name),
410            broker_cluster_name: CheetahString::from_string(broker_cluster_name),
411            broker_id,
412            is_broker_container,
413            is_in_broker_container: false,
414        }
415    }
416
417    fn new_with_container(is_broker_container: bool) -> Self {
418        let mut identity = BrokerIdentity::new();
419        identity.is_broker_container = is_broker_container;
420        identity
421    }
422
423    fn new_with_params(broker_cluster_name: String, broker_name: String, broker_id: u64) -> Self {
424        BrokerIdentity {
425            broker_name: CheetahString::from_string(broker_name),
426            broker_cluster_name: CheetahString::from_string(broker_cluster_name),
427            broker_id,
428            is_broker_container: false,
429            is_in_broker_container: false,
430        }
431    }
432
433    fn new_with_container_params(
434        broker_cluster_name: String,
435        broker_name: String,
436        broker_id: u64,
437        is_in_broker_container: bool,
438    ) -> Self {
439        BrokerIdentity {
440            broker_name: CheetahString::from_string(broker_name),
441            broker_cluster_name: CheetahString::from_string(broker_cluster_name),
442            broker_id,
443            is_broker_container: true,
444            is_in_broker_container,
445        }
446    }
447
448    pub fn get_canonical_name(&self) -> String {
449        match self.is_broker_container {
450            true => "BrokerContainer".to_string(),
451            false => {
452                format!(
453                    "{}_{}_{}",
454                    self.broker_cluster_name, self.broker_name, self.broker_id
455                )
456            }
457        }
458    }
459}
460
461#[derive(Debug, Serialize, Deserialize, Clone)]
462#[serde(rename_all = "camelCase")]
463pub struct BrokerConfig {
464    #[serde(default = "defaults::broker_identity")]
465    pub broker_identity: BrokerIdentity,
466
467    #[serde(default = "defaults::topic_queue_config")]
468    pub topic_queue_config: TopicQueueConfig,
469
470    #[serde(default = "defaults::timer_wheel_config")]
471    pub timer_wheel_config: TimerWheelConfig,
472
473    #[serde(default = "defaults::broker_server_config")]
474    pub broker_server_config: ServerConfig,
475
476    #[serde(default = "defaults::broker_ip1")]
477    pub broker_ip1: CheetahString,
478
479    #[serde(default = "defaults::broker_ip2")]
480    pub broker_ip2: Option<CheetahString>,
481
482    #[serde(default = "defaults::listen_port")]
483    pub listen_port: u32,
484
485    #[serde(default)]
486    pub trace_topic_enable: bool,
487
488    #[serde(default = "defaults::msg_trace_topic_name")]
489    pub msg_trace_topic_name: CheetahString,
490
491    #[serde(default)]
492    pub enable_controller_mode: bool,
493
494    #[serde(default = "defaults::broker_name")]
495    pub broker_name: CheetahString,
496
497    #[serde(default = "defaults::region_id")]
498    pub region_id: CheetahString,
499
500    #[serde(default = "defaults::trace_on")]
501    pub trace_on: bool,
502
503    #[serde(default = "defaults::broker_permission")]
504    pub broker_permission: u32,
505
506    #[serde(default)]
507    pub async_send_enable: bool, //not used in rust version,only for Java compatibility
508
509    #[serde(default = "defaults::store_path_root_dir")]
510    pub store_path_root_dir: CheetahString,
511
512    #[serde(default)]
513    pub enable_split_registration: bool,
514
515    #[serde(default = "defaults::split_registration_size")]
516    pub split_registration_size: i32,
517
518    #[serde(default = "defaults::register_broker_timeout_mills")]
519    pub register_broker_timeout_mills: i32,
520
521    #[serde(default)]
522    pub is_in_broker_container: bool,
523
524    #[serde(default = "defaults::commercial_size_per_msg")]
525    pub commercial_size_per_msg: i32,
526
527    #[serde(default)]
528    pub recover_concurrently: bool,
529
530    #[serde(default)]
531    pub duplication_enable: bool,
532
533    #[serde(default)]
534    pub start_accept_send_request_time_stamp: i64,
535
536    #[serde(default = "defaults::auto_create_topic_enable")]
537    pub auto_create_topic_enable: bool,
538
539    #[serde(default = "defaults::enable_single_topic_register")]
540    pub enable_single_topic_register: bool,
541
542    #[serde(default = "defaults::broker_topic_enable")]
543    pub broker_topic_enable: bool,
544
545    #[serde(default = "defaults::cluster_topic_enable")]
546    pub cluster_topic_enable: bool,
547
548    #[serde(default = "defaults::revive_queue_num")]
549    pub revive_queue_num: u32,
550
551    #[serde(default)]
552    pub enable_slave_acting_master: bool,
553
554    #[serde(default)]
555    pub reject_transaction_message: bool,
556
557    #[serde(default = "defaults::enable_detail_stat")]
558    pub enable_detail_stat: bool,
559
560    #[serde(default = "defaults::flush_consumer_offset_interval")]
561    pub flush_consumer_offset_interval: u64,
562
563    #[serde(default = "defaults::force_register")]
564    pub force_register: bool,
565
566    #[serde(default = "defaults::register_name_server_period")]
567    pub register_name_server_period: u64,
568
569    #[serde(default)]
570    pub skip_pre_online: bool,
571
572    #[serde(default = "defaults::namesrv_addr")]
573    pub namesrv_addr: Option<CheetahString>,
574
575    #[serde(default)]
576    pub fetch_name_srv_addr_by_dns_lookup: bool,
577
578    #[serde(default = "defaults::lite_pull_message_enable")]
579    pub lite_pull_message_enable: bool,
580
581    #[serde(default = "defaults::auto_create_subscription_group")]
582    pub auto_create_subscription_group: bool,
583
584    #[serde(default = "defaults::channel_expired_timeout")]
585    pub channel_expired_timeout: u64,
586
587    #[serde(default = "defaults::subscription_expired_timeout")]
588    pub subscription_expired_timeout: u64,
589
590    #[serde(default)]
591    pub enable_property_filter: bool,
592
593    #[serde(default)]
594    pub filter_support_retry: bool,
595
596    #[serde(default = "defaults::use_server_side_reset_offset")]
597    pub use_server_side_reset_offset: bool,
598
599    #[serde(default)]
600    pub slave_read_enable: bool,
601
602    #[serde(default = "defaults::commercial_base_count")]
603    pub commercial_base_count: i32,
604
605    #[serde(default)]
606    pub reject_pull_consumer_enable: bool,
607
608    #[serde(default = "defaults::consumer_offset_update_version_step")]
609    pub consumer_offset_update_version_step: i64,
610
611    #[serde(default = "defaults::enable_broadcast_offset_store")]
612    pub enable_broadcast_offset_store: bool,
613
614    #[serde(default = "defaults::transfer_msg_by_heap")]
615    pub transfer_msg_by_heap: bool,
616
617    #[serde(default = "defaults::short_polling_time_mills")]
618    pub short_polling_time_mills: u64,
619
620    #[serde(default = "defaults::long_polling_enable")]
621    pub long_polling_enable: bool,
622
623    #[serde(default = "defaults::max_error_rate_of_bloom_filter")]
624    pub max_error_rate_of_bloom_filter: i32,
625
626    #[serde(default = "defaults::expect_consumer_num_use_filter")]
627    pub expect_consumer_num_use_filter: i32,
628
629    #[serde(default = "defaults::bit_map_length_consume_queue_ext")]
630    pub bit_map_length_consume_queue_ext: i32,
631
632    #[serde(default = "defaults::validate_system_topic_when_update_topic")]
633    pub validate_system_topic_when_update_topic: bool,
634
635    #[serde(default)]
636    pub enable_mixed_message_type: bool,
637
638    #[serde(default)]
639    pub auto_delete_unused_stats: bool,
640
641    #[serde(default = "defaults::forward_timeout")]
642    pub forward_timeout: u64,
643
644    #[serde(default = "defaults::store_reply_message_enable")]
645    pub store_reply_message_enable: bool,
646
647    #[serde(default)]
648    pub lock_in_strict_mode: bool,
649
650    #[serde(default = "defaults::transaction_timeout")]
651    pub transaction_timeout: u64,
652
653    #[serde(default = "defaults::transaction_op_msg_max_size")]
654    pub transaction_op_msg_max_size: i32,
655
656    #[serde(default = "defaults::default_message_request_mode")]
657    pub default_message_request_mode: MessageRequestMode,
658
659    #[serde(default = "defaults::default_pop_share_queue_num")]
660    pub default_pop_share_queue_num: i32,
661
662    #[serde(default = "defaults::load_balance_poll_name_server_interval")]
663    pub load_balance_poll_name_server_interval: u64,
664
665    #[serde(default = "defaults::server_load_balancer_enable")]
666    pub server_load_balancer_enable: bool,
667
668    #[serde(default)]
669    pub enable_remote_escape: bool,
670
671    #[serde(default)]
672    pub enable_pop_log: bool,
673
674    #[serde(default)]
675    pub enable_retry_topic_v2: bool,
676
677    #[serde(default = "defaults::retrieve_message_from_pop_retry_topic_v1")]
678    pub retrieve_message_from_pop_retry_topic_v1: bool,
679
680    #[serde(default = "defaults::pop_from_retry_probability")]
681    pub pop_from_retry_probability: i32,
682
683    #[serde(default)]
684    pub pop_response_return_actual_retry_topic: bool,
685
686    #[serde(default = "defaults::init_pop_offset_by_check_msg_in_mem")]
687    pub init_pop_offset_by_check_msg_in_mem: bool,
688
689    #[serde(default)]
690    pub enable_pop_buffer_merge: bool,
691
692    #[serde(default = "defaults::pop_ck_stay_buffer_time_out")]
693    pub pop_ck_stay_buffer_time_out: u64,
694
695    #[serde(default = "defaults::pop_ck_stay_buffer_time")]
696    pub pop_ck_stay_buffer_time: u64,
697
698    #[serde(default = "defaults::broker_role")]
699    pub broker_role: BrokerRole,
700
701    #[serde(default)]
702    pub enable_pop_batch_ack: bool,
703
704    #[serde(default = "defaults::revive_interval")]
705    pub revive_interval: u64,
706
707    #[serde(default = "defaults::revive_max_slow")]
708    pub revive_max_slow: u64,
709
710    #[serde(default = "defaults::revive_scan_time")]
711    pub revive_scan_time: u64,
712
713    #[serde(default)]
714    pub enable_skip_long_awaiting_ack: bool,
715
716    #[serde(default)]
717    pub skip_when_ck_re_put_reach_max_times: bool,
718
719    #[serde(default)]
720    pub compressed_register: bool,
721
722    #[serde(default = "defaults::broker_not_active_timeout_millis")]
723    pub broker_not_active_timeout_millis: i64,
724
725    #[serde(default = "defaults::sync_broker_member_group_period")]
726    pub sync_broker_member_group_period: u64,
727
728    #[serde(default = "defaults::pop_polling_map_size")]
729    pub pop_polling_map_size: usize,
730
731    #[serde(default = "defaults::max_pop_polling_size")]
732    pub max_pop_polling_size: u64,
733
734    #[serde(default = "defaults::pop_polling_size")]
735    pub pop_polling_size: usize,
736
737    #[serde(default)]
738    pub enable_pop_message_threshold: bool,
739
740    #[serde(default = "defaults::pop_inflight_message_threshold")]
741    pub pop_inflight_message_threshold: i64,
742
743    #[serde(default = "defaults::pop_ck_max_buffer_size")]
744    pub pop_ck_max_buffer_size: i64,
745
746    #[serde(default = "defaults::pop_ck_offset_max_queue_size")]
747    pub pop_ck_offset_max_queue_size: u64,
748
749    #[serde(default = "defaults::delay_offset_update_version_step")]
750    pub delay_offset_update_version_step: u64,
751
752    #[serde(default = "defaults::revive_ack_wait_ms")]
753    pub revive_ack_wait_ms: u64,
754
755    #[serde(default)]
756    pub enable_calc_filter_bit_map: bool,
757
758    #[serde(default = "defaults::transaction_check_interval")]
759    pub transaction_check_interval: u64,
760
761    #[serde(default = "defaults::transaction_check_max")]
762    pub transaction_check_max: u32,
763
764    #[serde(default = "defaults::transaction_op_batch_interval")]
765    pub transaction_op_batch_interval: u64,
766
767    #[serde(default = "defaults::compatible_with_old_name_srv")]
768    pub compatible_with_old_name_srv: bool,
769
770    #[serde(default = "defaults::broker_heartbeat_interval")]
771    pub broker_heartbeat_interval: u64,
772}
773
774impl Default for BrokerConfig {
775    fn default() -> Self {
776        let broker_identity = BrokerIdentity::new();
777        let local_ip = local_ip_address::local_ip().unwrap();
778        let broker_ip1 = local_ip.to_string().into();
779        let broker_ip2 = Some(local_ip.to_string().into());
780        let listen_port = 10911;
781
782        BrokerConfig {
783            broker_identity,
784            topic_queue_config: TopicQueueConfig::default(),
785            timer_wheel_config: TimerWheelConfig::default(),
786            broker_server_config: Default::default(),
787            broker_ip1,
788            broker_ip2,
789            listen_port,
790            trace_topic_enable: false,
791            msg_trace_topic_name: CheetahString::from_static_str(
792                TopicValidator::RMQ_SYS_TRACE_TOPIC,
793            ),
794            enable_controller_mode: false,
795            broker_name: default_broker_name().into(),
796            region_id: CheetahString::from_static_str(mix_all::DEFAULT_TRACE_REGION_ID),
797            trace_on: true,
798            broker_permission: PermName::PERM_WRITE | PermName::PERM_READ,
799            async_send_enable: false,
800            store_path_root_dir: dirs::home_dir()
801                .unwrap()
802                .join("store")
803                .to_string_lossy()
804                .into_owned()
805                .into(),
806            enable_split_registration: false,
807            split_registration_size: 800,
808            register_broker_timeout_mills: 24000,
809            is_in_broker_container: false,
810            commercial_size_per_msg: 4 * 1024,
811            recover_concurrently: false,
812            duplication_enable: false,
813            start_accept_send_request_time_stamp: 0,
814            auto_create_topic_enable: true,
815            enable_single_topic_register: true,
816            broker_topic_enable: true,
817            cluster_topic_enable: true,
818            revive_queue_num: 8,
819            enable_slave_acting_master: false,
820            reject_transaction_message: false,
821            enable_detail_stat: true,
822            flush_consumer_offset_interval: 1000 * 5,
823            force_register: true,
824            register_name_server_period: 1000 * 30,
825            skip_pre_online: false,
826            namesrv_addr: NAMESRV_ADDR.clone().map(|addr| addr.into()),
827            fetch_name_srv_addr_by_dns_lookup: false,
828            lite_pull_message_enable: true,
829            auto_create_subscription_group: true,
830            channel_expired_timeout: 1000 * 120,
831            subscription_expired_timeout: 1000 * 60 * 10,
832            enable_property_filter: false,
833            filter_support_retry: false,
834            use_server_side_reset_offset: true,
835            slave_read_enable: false,
836            commercial_base_count: 1,
837            reject_pull_consumer_enable: false,
838            consumer_offset_update_version_step: 500,
839            enable_broadcast_offset_store: true,
840            transfer_msg_by_heap: true,
841            short_polling_time_mills: 1000,
842            long_polling_enable: true,
843            max_error_rate_of_bloom_filter: 20,
844            expect_consumer_num_use_filter: 32,
845            bit_map_length_consume_queue_ext: 64,
846            forward_timeout: 3 * 1000,
847            validate_system_topic_when_update_topic: true,
848            enable_mixed_message_type: false,
849            auto_delete_unused_stats: false,
850            store_reply_message_enable: true,
851            lock_in_strict_mode: false,
852            transaction_timeout: 6_000,
853            transaction_op_msg_max_size: 4096,
854            default_message_request_mode: MessageRequestMode::Pull,
855            default_pop_share_queue_num: -1,
856            load_balance_poll_name_server_interval: 30_000,
857            server_load_balancer_enable: true,
858            enable_remote_escape: false,
859            enable_pop_log: false,
860            enable_retry_topic_v2: false,
861            retrieve_message_from_pop_retry_topic_v1: true,
862            pop_from_retry_probability: 20,
863            pop_response_return_actual_retry_topic: false,
864            init_pop_offset_by_check_msg_in_mem: true,
865            enable_pop_buffer_merge: false,
866            pop_ck_stay_buffer_time_out: 3_000,
867            pop_ck_stay_buffer_time: 10_000,
868            broker_role: BrokerRole::AsyncMaster,
869            enable_pop_batch_ack: false,
870            revive_interval: 1000,
871            revive_max_slow: 3,
872            revive_scan_time: 10_000,
873            enable_skip_long_awaiting_ack: false,
874            skip_when_ck_re_put_reach_max_times: false,
875            compressed_register: false,
876            broker_not_active_timeout_millis: 10_000,
877            sync_broker_member_group_period: 1_000,
878            pop_polling_map_size: 100000,
879            max_pop_polling_size: 100000,
880            pop_polling_size: 1024,
881            enable_pop_message_threshold: false,
882            pop_inflight_message_threshold: 10_000,
883            pop_ck_max_buffer_size: 200_000,
884            pop_ck_offset_max_queue_size: 20_000,
885            delay_offset_update_version_step: 200,
886            revive_ack_wait_ms: Duration::from_secs(3 * 60).as_millis() as u64,
887            enable_calc_filter_bit_map: false,
888            transaction_check_interval: 30_000,
889            transaction_check_max: 15,
890            transaction_op_batch_interval: 3_000,
891            compatible_with_old_name_srv: true,
892            broker_heartbeat_interval: 1000,
893        }
894    }
895}
896
897impl BrokerConfig {
898    pub fn broker_name(&self) -> CheetahString {
899        self.broker_name.clone()
900    }
901
902    pub fn broker_ip1(&self) -> CheetahString {
903        self.broker_ip1.clone()
904    }
905
906    pub fn broker_ip2(&self) -> Option<CheetahString> {
907        self.broker_ip2.clone()
908    }
909
910    pub fn listen_port(&self) -> u32 {
911        self.listen_port
912    }
913
914    pub fn trace_topic_enable(&self) -> bool {
915        self.trace_topic_enable
916    }
917
918    pub fn broker_server_config(&self) -> &ServerConfig {
919        &self.broker_server_config
920    }
921
922    pub fn region_id(&self) -> &str {
923        self.region_id.as_str()
924    }
925
926    #[inline]
927    pub fn broker_permission(&self) -> u32 {
928        self.broker_permission
929    }
930
931    pub fn get_broker_addr(&self) -> String {
932        format!("{}:{}", self.broker_ip1, self.listen_port)
933    }
934
935    pub fn get_start_accept_send_request_time_stamp(&self) -> i64 {
936        self.start_accept_send_request_time_stamp
937    }
938
939    pub fn get_properties(&self) -> HashMap<CheetahString, CheetahString> {
940        let mut properties = HashMap::new();
941        properties.insert("brokerName".into(), self.broker_name.clone());
942        properties.insert(
943            "brokerClusterName".into(),
944            self.broker_identity.broker_cluster_name.clone(),
945        );
946        properties.insert(
947            "brokerId".into(),
948            self.broker_identity.broker_id.to_string().into(),
949        );
950        properties.insert(
951            "isBrokerContainer".into(),
952            self.broker_identity.is_broker_container.to_string().into(),
953        );
954        properties.insert(
955            "defaultTopicQueueNums".into(),
956            self.topic_queue_config
957                .default_topic_queue_nums
958                .to_string()
959                .into(),
960        );
961        properties.insert(
962            "timerWheelEnable".into(),
963            self.timer_wheel_config
964                .timer_wheel_enable
965                .to_string()
966                .into(),
967        );
968        properties.insert(
969            "bindAddress".into(),
970            self.broker_server_config
971                .bind_address
972                .clone()
973                .to_string()
974                .into(),
975        );
976        properties.insert(
977            "brokerIp1".into(),
978            self.broker_ip1.clone().to_string().into(),
979        );
980        properties.insert(
981            "brokerIp2".into(),
982            self.broker_ip2.clone().unwrap_or_default(),
983        );
984        properties.insert("listenPort".into(), self.listen_port.to_string().into());
985        properties.insert(
986            "traceTopicEnable".into(),
987            self.trace_topic_enable.to_string().into(),
988        );
989        properties.insert(
990            "msgTraceTopicName".into(),
991            self.msg_trace_topic_name.clone(),
992        );
993        properties.insert(
994            "enableControllerMode".into(),
995            self.enable_controller_mode.to_string().into(),
996        );
997        properties.insert("regionId".into(), self.region_id.clone());
998        properties.insert("brokerName".into(), self.broker_name.clone());
999        properties.insert("traceOn".into(), self.trace_on.to_string().into());
1000        properties.insert(
1001            "brokerPermission".into(),
1002            self.broker_permission.to_string().into(),
1003        );
1004        properties.insert(
1005            "asyncSendEnable".into(),
1006            self.async_send_enable.to_string().into(),
1007        );
1008        properties.insert("storePathRootDir".into(), self.store_path_root_dir.clone());
1009        properties.insert(
1010            "enableSplitRegistration".into(),
1011            self.enable_split_registration.to_string().into(),
1012        );
1013        properties.insert(
1014            "splitRegistrationSize".into(),
1015            self.split_registration_size.to_string().into(),
1016        );
1017        properties.insert(
1018            "registerBrokerTimeoutMills".into(),
1019            self.register_broker_timeout_mills.to_string().into(),
1020        );
1021        properties.insert(
1022            "isInBrokerContainer".into(),
1023            self.is_in_broker_container.to_string().into(),
1024        );
1025        properties.insert(
1026            "commercialSizePerMsg".into(),
1027            self.commercial_size_per_msg.to_string().into(),
1028        );
1029        properties.insert(
1030            "recoverConcurrently".into(),
1031            self.recover_concurrently.to_string().into(),
1032        );
1033        properties.insert(
1034            "duplicationEnable".into(),
1035            self.duplication_enable.to_string().into(),
1036        );
1037        properties.insert(
1038            "startAcceptSendRequestTimeStamp".into(),
1039            self.start_accept_send_request_time_stamp.to_string().into(),
1040        );
1041        properties.insert(
1042            "autoCreateTopicEnable".into(),
1043            self.auto_create_topic_enable.to_string().into(),
1044        );
1045        properties.insert(
1046            "enableSingleTopicRegister".into(),
1047            self.enable_single_topic_register.to_string().into(),
1048        );
1049        properties.insert(
1050            "brokerTopicEnable".into(),
1051            self.broker_topic_enable.to_string().into(),
1052        );
1053        properties.insert(
1054            "clusterTopicEnable".into(),
1055            self.cluster_topic_enable.to_string().into(),
1056        );
1057        properties.insert(
1058            "reviveQueueNum".into(),
1059            self.revive_queue_num.to_string().into(),
1060        );
1061        properties.insert(
1062            "enableSlaveActingMaster".into(),
1063            self.enable_slave_acting_master.to_string().into(),
1064        );
1065        properties.insert(
1066            "rejectTransactionMessage".into(),
1067            self.reject_transaction_message.to_string().into(),
1068        );
1069        properties.insert(
1070            "enableDetailStat".into(),
1071            self.enable_detail_stat.to_string().into(),
1072        );
1073        properties.insert(
1074            "flushConsumerOffsetInterval".into(),
1075            self.flush_consumer_offset_interval.to_string().into(),
1076        );
1077        properties.insert(
1078            "forceRegister".into(),
1079            self.force_register.to_string().into(),
1080        );
1081        properties.insert(
1082            "registerNameServerPeriod".into(),
1083            self.register_name_server_period.to_string().into(),
1084        );
1085        properties.insert(
1086            "skipPreOnline".into(),
1087            self.skip_pre_online.to_string().into(),
1088        );
1089        properties.insert(
1090            "namesrvAddr".into(),
1091            self.namesrv_addr.clone().unwrap_or_default(),
1092        );
1093        properties.insert(
1094            "fetchNameSrvAddrByDnsLookup".into(),
1095            self.fetch_name_srv_addr_by_dns_lookup.to_string().into(),
1096        );
1097        properties.insert(
1098            "litePullMessageEnable".into(),
1099            self.lite_pull_message_enable.to_string().into(),
1100        );
1101        properties.insert(
1102            "autoCreateSubscriptionGroup".into(),
1103            self.auto_create_subscription_group.to_string().into(),
1104        );
1105        properties.insert(
1106            "channelExpiredTimeout".into(),
1107            self.channel_expired_timeout.to_string().into(),
1108        );
1109        properties.insert(
1110            "subscriptionExpiredTimeout".into(),
1111            self.subscription_expired_timeout.to_string().into(),
1112        );
1113        properties.insert(
1114            "enablePropertyFilter".into(),
1115            self.enable_property_filter.to_string().into(),
1116        );
1117        properties.insert(
1118            "filterSupportRetry".into(),
1119            self.filter_support_retry.to_string().into(),
1120        );
1121        properties.insert(
1122            "useServerSideResetOffset".into(),
1123            self.use_server_side_reset_offset.to_string().into(),
1124        );
1125        properties.insert(
1126            "slaveReadEnable".into(),
1127            self.slave_read_enable.to_string().into(),
1128        );
1129        properties.insert(
1130            "commercialBaseCount".into(),
1131            self.commercial_base_count.to_string().into(),
1132        );
1133        properties.insert(
1134            "rejectPullConsumerEnable".into(),
1135            self.reject_pull_consumer_enable.to_string().into(),
1136        );
1137        properties.insert(
1138            "consumerOffsetUpdateVersionStep".into(),
1139            self.consumer_offset_update_version_step.to_string().into(),
1140        );
1141        properties.insert(
1142            "enableBroadcastOffsetStore".into(),
1143            self.enable_broadcast_offset_store.to_string().into(),
1144        );
1145        properties.insert(
1146            "transferMsgByHeap".into(),
1147            self.transfer_msg_by_heap.to_string().into(),
1148        );
1149        properties.insert(
1150            "shortPollingTimeMills".into(),
1151            self.short_polling_time_mills.to_string().into(),
1152        );
1153        properties.insert(
1154            "longPollingEnable".into(),
1155            self.long_polling_enable.to_string().into(),
1156        );
1157        properties.insert(
1158            "maxErrorRateOfBloomFilter".into(),
1159            self.max_error_rate_of_bloom_filter.to_string().into(),
1160        );
1161        properties.insert(
1162            "expectConsumerNumUseFilter".into(),
1163            self.expect_consumer_num_use_filter.to_string().into(),
1164        );
1165        properties.insert(
1166            "bitMapLengthConsumeQueueExt".into(),
1167            self.bit_map_length_consume_queue_ext.to_string().into(),
1168        );
1169        properties.insert(
1170            "validateSystemTopicWhenUpdateTopic".into(),
1171            self.validate_system_topic_when_update_topic
1172                .to_string()
1173                .into(),
1174        );
1175        properties.insert(
1176            "enableMixedMessageType".into(),
1177            self.enable_mixed_message_type.to_string().into(),
1178        );
1179        properties.insert(
1180            "autoDeleteUnusedStats".into(),
1181            self.auto_delete_unused_stats.to_string().into(),
1182        );
1183        properties.insert(
1184            "forwardTimeout".into(),
1185            self.forward_timeout.to_string().into(),
1186        );
1187        properties
1188    }
1189}
1190
1191pub fn default_broker_name() -> String {
1192    LOCAL_HOST_NAME
1193        .clone()
1194        .unwrap_or_else(|| "DEFAULT_BROKER".to_string())
1195}
1196
1197#[derive(Debug, Serialize, Deserialize, Clone)]
1198#[serde(rename_all = "camelCase")]
1199pub struct TopicQueueConfig {
1200    #[serde(default = "defaults::default_topic_queue_nums")]
1201    pub default_topic_queue_nums: u32,
1202}
1203
1204impl Default for TopicQueueConfig {
1205    fn default() -> Self {
1206        TopicQueueConfig {
1207            default_topic_queue_nums: 8,
1208        }
1209    }
1210}
1211
1212#[derive(Debug, Serialize, Deserialize, Default, Clone)]
1213#[serde(rename_all = "camelCase")]
1214pub struct TimerWheelConfig {
1215    #[serde(default)]
1216    pub timer_wheel_enable: bool,
1217}