rocketmq_client_rust/base/
client_config.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::env;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21use std::time::Duration;
22
23use cheetah_string::CheetahString;
24use rocketmq_common::common::message::message_queue::MessageQueue;
25use rocketmq_common::utils::name_server_address_utils::NameServerAddressUtils;
26use rocketmq_common::utils::name_server_address_utils::NAMESRV_ENDPOINT_PATTERN;
27use rocketmq_common::utils::network_util::NetworkUtil;
28use rocketmq_common::utils::string_utils::StringUtils;
29use rocketmq_common::TimeUtils::get_current_nano;
30use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
31use rocketmq_remoting::protocol::request_type::RequestType;
32use rocketmq_remoting::protocol::LanguageCode;
33
34use crate::base::access_channel::AccessChannel;
35
36#[derive(Clone)]
37pub struct ClientConfig {
38    pub namesrv_addr: Option<CheetahString>,
39    pub client_ip: Option<CheetahString>,
40    pub instance_name: CheetahString,
41    pub client_callback_executor_threads: usize,
42    pub namespace: Option<CheetahString>,
43    pub namespace_initialized: Arc<AtomicBool>,
44    pub namespace_v2: Option<CheetahString>,
45    pub access_channel: AccessChannel,
46    pub poll_name_server_interval: u32,
47    pub heartbeat_broker_interval: u32,
48    pub persist_consumer_offset_interval: u32,
49    pub pull_time_delay_millis_when_exception: u32,
50    pub unit_mode: bool,
51    pub unit_name: Option<CheetahString>,
52    pub decode_read_body: bool,
53    pub decode_decompress_body: bool,
54    pub vip_channel_enabled: bool,
55    pub use_heartbeat_v2: bool,
56    pub use_tls: bool,
57    pub socks_proxy_config: CheetahString,
58    pub mq_client_api_timeout: u64,
59    pub detect_timeout: u32,
60    pub detect_interval: u32,
61    pub language: LanguageCode,
62    pub enable_stream_request_type: bool,
63    pub send_latency_enable: bool,
64    pub start_detector_enable: bool,
65    pub enable_heartbeat_channel_event_listener: bool,
66    pub enable_trace: bool,
67    pub trace_topic: Option<CheetahString>,
68}
69
70impl Default for ClientConfig {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl ClientConfig {
77    pub const SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY: &'static str =
78        "com.rocketmq.sendMessageWithVIPChannel";
79    pub const SOCKS_PROXY_CONFIG: &'static str = "com.rocketmq.socks.proxy.config";
80    pub const DECODE_READ_BODY: &'static str = "com.rocketmq.read.body";
81    pub const DECODE_DECOMPRESS_BODY: &'static str = "com.rocketmq.decompress.body";
82    pub const SEND_LATENCY_ENABLE: &'static str = "com.rocketmq.sendLatencyEnable";
83    pub const START_DETECTOR_ENABLE: &'static str = "com.rocketmq.startDetectorEnable";
84    pub const HEART_BEAT_V2: &'static str = "com.rocketmq.heartbeat.v2";
85
86    pub fn new() -> Self {
87        ClientConfig {
88            namesrv_addr: NameServerAddressUtils::get_name_server_addresses()
89                .map(|addr| addr.into()),
90            client_ip: NetworkUtil::get_local_address().map(|addr| addr.into()),
91            instance_name: env::var("rocketmq.client.name")
92                .unwrap_or_else(|_| "DEFAULT".to_string())
93                .into(),
94            client_callback_executor_threads: num_cpus::get(),
95            namespace: None,
96            namespace_initialized: Arc::new(AtomicBool::new(false)),
97            namespace_v2: None,
98            access_channel: AccessChannel::Local,
99            poll_name_server_interval: Duration::from_secs(30).as_millis() as u32,
100            heartbeat_broker_interval: Duration::from_secs(30).as_millis() as u32,
101            persist_consumer_offset_interval: Duration::from_secs(5).as_millis() as u32,
102            pull_time_delay_millis_when_exception: 1000,
103            unit_mode: false,
104            unit_name: None,
105            decode_read_body: env::var(Self::DECODE_READ_BODY)
106                .unwrap_or_else(|_| "true".to_string())
107                .parse::<bool>()
108                .unwrap_or(true),
109            decode_decompress_body: env::var(Self::DECODE_DECOMPRESS_BODY)
110                .unwrap_or_else(|_| "true".to_string())
111                .parse::<bool>()
112                .unwrap_or(true),
113            vip_channel_enabled: env::var(Self::SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY)
114                .unwrap_or_else(|_| "false".to_string())
115                .parse::<bool>()
116                .unwrap_or(false),
117            use_heartbeat_v2: env::var(Self::HEART_BEAT_V2)
118                .unwrap_or_else(|_| "false".to_string())
119                .parse::<bool>()
120                .unwrap_or(false),
121            use_tls: false,
122            socks_proxy_config: env::var(Self::SOCKS_PROXY_CONFIG)
123                .unwrap_or_else(|_| "{}".to_string())
124                .into(),
125            mq_client_api_timeout: Duration::from_secs(3).as_millis() as u64,
126            detect_timeout: 200,
127            detect_interval: Duration::from_secs(2).as_millis() as u32,
128            language: LanguageCode::RUST,
129            enable_stream_request_type: false,
130            send_latency_enable: env::var(Self::SEND_LATENCY_ENABLE)
131                .unwrap_or_else(|_| "false".to_string())
132                == "false",
133            start_detector_enable: env::var(Self::START_DETECTOR_ENABLE)
134                .unwrap_or_else(|_| "false".to_string())
135                == "false",
136            enable_heartbeat_channel_event_listener: true,
137            enable_trace: false,
138            trace_topic: None,
139        }
140    }
141}
142
143impl ClientConfig {
144    #[inline]
145    pub fn with_namespace(&mut self, resource: &str) -> CheetahString {
146        NamespaceUtil::wrap_namespace(self.get_namespace().unwrap_or_default().as_str(), resource)
147            .into()
148    }
149
150    #[inline]
151    pub fn queue_with_namespace(&mut self, mut queue: MessageQueue) -> MessageQueue {
152        if let Some(namespace) = self.get_namespace() {
153            if !namespace.is_empty() {
154                let topic = CheetahString::from_string(NamespaceUtil::wrap_namespace(
155                    namespace.as_str(),
156                    queue.get_topic(),
157                ));
158                queue.set_topic(topic);
159                return queue;
160            }
161        }
162        queue
163    }
164
165    #[inline]
166    pub fn get_namespace(&mut self) -> Option<CheetahString> {
167        let namespace_initialized = self.namespace_initialized.load(Ordering::Acquire);
168        if namespace_initialized {
169            return self.namespace.clone();
170        }
171
172        if let Some(ref namespace) = self.namespace {
173            return Some(namespace.clone());
174        }
175
176        if let Some(ref namesrv_addr) = self.namesrv_addr {
177            if NameServerAddressUtils::validate_instance_endpoint(namesrv_addr.as_ref()) {
178                self.namespace =
179                    NameServerAddressUtils::parse_instance_id_from_endpoint(namesrv_addr.as_ref())
180                        .map(|id| id.into());
181            }
182        }
183        self.namespace_initialized.store(true, Ordering::Release);
184        self.namespace.clone()
185    }
186
187    #[inline]
188    pub fn change_instance_name_to_pid(&mut self) {
189        if self.instance_name == "DEFAULT" {
190            self.instance_name = format!("{}#{}", std::process::id(), get_current_nano()).into();
191        }
192    }
193    #[inline]
194    pub fn set_instance_name(&mut self, instance_name: CheetahString) {
195        self.instance_name = instance_name;
196    }
197    #[inline]
198    pub fn set_namesrv_addr(&mut self, namesrv_addr: CheetahString) {
199        self.namesrv_addr = Some(namesrv_addr);
200        self.namespace_initialized.store(false, Ordering::Release);
201    }
202
203    #[inline]
204    pub fn build_mq_client_id(&self) -> String {
205        let mut sb = String::new();
206        sb.push_str(self.client_ip.as_ref().unwrap());
207
208        sb.push('@');
209        sb.push_str(self.instance_name.as_str());
210        if let Some(unit_name) = &self.unit_name {
211            if !unit_name.is_empty() {
212                sb.push('@');
213                sb.push_str(unit_name.as_str());
214            }
215        }
216
217        if self.enable_stream_request_type {
218            sb.push('@');
219            sb.push_str(RequestType::Stream.to_string().as_str());
220        }
221
222        sb
223    }
224
225    #[inline]
226    pub fn get_namesrv_addr(&self) -> Option<CheetahString> {
227        if StringUtils::is_not_empty_str(self.namesrv_addr.as_deref())
228            && NAMESRV_ENDPOINT_PATTERN.is_match(self.namesrv_addr.as_ref().unwrap().as_str())
229        {
230            NameServerAddressUtils::get_name_srv_addr_from_namesrv_endpoint(
231                self.namesrv_addr.as_ref().unwrap().as_str(),
232            )
233            .map(|addr| addr.into())
234        } else {
235            self.namesrv_addr.clone()
236        }
237    }
238}