rocketmq_client_rust/base/
client_config.rs1use std::env;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21use std::time::Duration;
22
23use cheetah_string::CheetahString;
24use rocketmq_common::common::message::message_queue::MessageQueue;
25use rocketmq_common::utils::name_server_address_utils::NameServerAddressUtils;
26use rocketmq_common::utils::name_server_address_utils::NAMESRV_ENDPOINT_PATTERN;
27use rocketmq_common::utils::network_util::NetworkUtil;
28use rocketmq_common::utils::string_utils::StringUtils;
29use rocketmq_common::TimeUtils::get_current_nano;
30use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
31use rocketmq_remoting::protocol::request_type::RequestType;
32use rocketmq_remoting::protocol::LanguageCode;
33
34use crate::base::access_channel::AccessChannel;
35
36#[derive(Clone)]
37pub struct ClientConfig {
38 pub namesrv_addr: Option<CheetahString>,
39 pub client_ip: Option<CheetahString>,
40 pub instance_name: CheetahString,
41 pub client_callback_executor_threads: usize,
42 pub namespace: Option<CheetahString>,
43 pub namespace_initialized: Arc<AtomicBool>,
44 pub namespace_v2: Option<CheetahString>,
45 pub access_channel: AccessChannel,
46 pub poll_name_server_interval: u32,
47 pub heartbeat_broker_interval: u32,
48 pub persist_consumer_offset_interval: u32,
49 pub pull_time_delay_millis_when_exception: u32,
50 pub unit_mode: bool,
51 pub unit_name: Option<CheetahString>,
52 pub decode_read_body: bool,
53 pub decode_decompress_body: bool,
54 pub vip_channel_enabled: bool,
55 pub use_heartbeat_v2: bool,
56 pub use_tls: bool,
57 pub socks_proxy_config: CheetahString,
58 pub mq_client_api_timeout: u64,
59 pub detect_timeout: u32,
60 pub detect_interval: u32,
61 pub language: LanguageCode,
62 pub enable_stream_request_type: bool,
63 pub send_latency_enable: bool,
64 pub start_detector_enable: bool,
65 pub enable_heartbeat_channel_event_listener: bool,
66 pub enable_trace: bool,
67 pub trace_topic: Option<CheetahString>,
68}
69
70impl Default for ClientConfig {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl ClientConfig {
77 pub const SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY: &'static str =
78 "com.rocketmq.sendMessageWithVIPChannel";
79 pub const SOCKS_PROXY_CONFIG: &'static str = "com.rocketmq.socks.proxy.config";
80 pub const DECODE_READ_BODY: &'static str = "com.rocketmq.read.body";
81 pub const DECODE_DECOMPRESS_BODY: &'static str = "com.rocketmq.decompress.body";
82 pub const SEND_LATENCY_ENABLE: &'static str = "com.rocketmq.sendLatencyEnable";
83 pub const START_DETECTOR_ENABLE: &'static str = "com.rocketmq.startDetectorEnable";
84 pub const HEART_BEAT_V2: &'static str = "com.rocketmq.heartbeat.v2";
85
86 pub fn new() -> Self {
87 ClientConfig {
88 namesrv_addr: NameServerAddressUtils::get_name_server_addresses()
89 .map(|addr| addr.into()),
90 client_ip: NetworkUtil::get_local_address().map(|addr| addr.into()),
91 instance_name: env::var("rocketmq.client.name")
92 .unwrap_or_else(|_| "DEFAULT".to_string())
93 .into(),
94 client_callback_executor_threads: num_cpus::get(),
95 namespace: None,
96 namespace_initialized: Arc::new(AtomicBool::new(false)),
97 namespace_v2: None,
98 access_channel: AccessChannel::Local,
99 poll_name_server_interval: Duration::from_secs(30).as_millis() as u32,
100 heartbeat_broker_interval: Duration::from_secs(30).as_millis() as u32,
101 persist_consumer_offset_interval: Duration::from_secs(5).as_millis() as u32,
102 pull_time_delay_millis_when_exception: 1000,
103 unit_mode: false,
104 unit_name: None,
105 decode_read_body: env::var(Self::DECODE_READ_BODY)
106 .unwrap_or_else(|_| "true".to_string())
107 .parse::<bool>()
108 .unwrap_or(true),
109 decode_decompress_body: env::var(Self::DECODE_DECOMPRESS_BODY)
110 .unwrap_or_else(|_| "true".to_string())
111 .parse::<bool>()
112 .unwrap_or(true),
113 vip_channel_enabled: env::var(Self::SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY)
114 .unwrap_or_else(|_| "false".to_string())
115 .parse::<bool>()
116 .unwrap_or(false),
117 use_heartbeat_v2: env::var(Self::HEART_BEAT_V2)
118 .unwrap_or_else(|_| "false".to_string())
119 .parse::<bool>()
120 .unwrap_or(false),
121 use_tls: false,
122 socks_proxy_config: env::var(Self::SOCKS_PROXY_CONFIG)
123 .unwrap_or_else(|_| "{}".to_string())
124 .into(),
125 mq_client_api_timeout: Duration::from_secs(3).as_millis() as u64,
126 detect_timeout: 200,
127 detect_interval: Duration::from_secs(2).as_millis() as u32,
128 language: LanguageCode::RUST,
129 enable_stream_request_type: false,
130 send_latency_enable: env::var(Self::SEND_LATENCY_ENABLE)
131 .unwrap_or_else(|_| "false".to_string())
132 == "false",
133 start_detector_enable: env::var(Self::START_DETECTOR_ENABLE)
134 .unwrap_or_else(|_| "false".to_string())
135 == "false",
136 enable_heartbeat_channel_event_listener: true,
137 enable_trace: false,
138 trace_topic: None,
139 }
140 }
141}
142
143impl ClientConfig {
144 #[inline]
145 pub fn with_namespace(&mut self, resource: &str) -> CheetahString {
146 NamespaceUtil::wrap_namespace(self.get_namespace().unwrap_or_default().as_str(), resource)
147 .into()
148 }
149
150 #[inline]
151 pub fn queue_with_namespace(&mut self, mut queue: MessageQueue) -> MessageQueue {
152 if let Some(namespace) = self.get_namespace() {
153 if !namespace.is_empty() {
154 let topic = CheetahString::from_string(NamespaceUtil::wrap_namespace(
155 namespace.as_str(),
156 queue.get_topic(),
157 ));
158 queue.set_topic(topic);
159 return queue;
160 }
161 }
162 queue
163 }
164
165 #[inline]
166 pub fn get_namespace(&mut self) -> Option<CheetahString> {
167 let namespace_initialized = self.namespace_initialized.load(Ordering::Acquire);
168 if namespace_initialized {
169 return self.namespace.clone();
170 }
171
172 if let Some(ref namespace) = self.namespace {
173 return Some(namespace.clone());
174 }
175
176 if let Some(ref namesrv_addr) = self.namesrv_addr {
177 if NameServerAddressUtils::validate_instance_endpoint(namesrv_addr.as_ref()) {
178 self.namespace =
179 NameServerAddressUtils::parse_instance_id_from_endpoint(namesrv_addr.as_ref())
180 .map(|id| id.into());
181 }
182 }
183 self.namespace_initialized.store(true, Ordering::Release);
184 self.namespace.clone()
185 }
186
187 #[inline]
188 pub fn change_instance_name_to_pid(&mut self) {
189 if self.instance_name == "DEFAULT" {
190 self.instance_name = format!("{}#{}", std::process::id(), get_current_nano()).into();
191 }
192 }
193 #[inline]
194 pub fn set_instance_name(&mut self, instance_name: CheetahString) {
195 self.instance_name = instance_name;
196 }
197 #[inline]
198 pub fn set_namesrv_addr(&mut self, namesrv_addr: CheetahString) {
199 self.namesrv_addr = Some(namesrv_addr);
200 self.namespace_initialized.store(false, Ordering::Release);
201 }
202
203 #[inline]
204 pub fn build_mq_client_id(&self) -> String {
205 let mut sb = String::new();
206 sb.push_str(self.client_ip.as_ref().unwrap());
207
208 sb.push('@');
209 sb.push_str(self.instance_name.as_str());
210 if let Some(unit_name) = &self.unit_name {
211 if !unit_name.is_empty() {
212 sb.push('@');
213 sb.push_str(unit_name.as_str());
214 }
215 }
216
217 if self.enable_stream_request_type {
218 sb.push('@');
219 sb.push_str(RequestType::Stream.to_string().as_str());
220 }
221
222 sb
223 }
224
225 #[inline]
226 pub fn get_namesrv_addr(&self) -> Option<CheetahString> {
227 if StringUtils::is_not_empty_str(self.namesrv_addr.as_deref())
228 && NAMESRV_ENDPOINT_PATTERN.is_match(self.namesrv_addr.as_ref().unwrap().as_str())
229 {
230 NameServerAddressUtils::get_name_srv_addr_from_namesrv_endpoint(
231 self.namesrv_addr.as_ref().unwrap().as_str(),
232 )
233 .map(|addr| addr.into())
234 } else {
235 self.namesrv_addr.clone()
236 }
237 }
238}