rocketmq_common/common/
mix_all.rs1use std::collections::HashMap;
19use std::env;
20
21use cheetah_string::CheetahString;
22use once_cell::sync::Lazy;
23
24pub const ROCKETMQ_HOME_ENV: &str = "ROCKETMQ_HOME";
25pub const ROCKETMQ_HOME_PROPERTY: &str = "rocketmq.home.dir";
26pub const NAMESRV_ADDR_ENV: &str = "NAMESRV_ADDR";
27pub const NAMESRV_ADDR_PROPERTY: &str = "rocketmq.rocketmq-namesrv.addr";
28pub const MESSAGE_COMPRESS_TYPE: &str = "rocketmq.message.compressType";
29pub const MESSAGE_COMPRESS_LEVEL: &str = "rocketmq.message.compressLevel";
30pub const DEFAULT_NAMESRV_ADDR_LOOKUP: &str = "jmenv.tbsite.net";
31pub const WS_DOMAIN_NAME: &str = "rocketmq.rocketmq-namesrv.domain";
32pub const DEFAULT_PRODUCER_GROUP: &str = "DEFAULT_PRODUCER";
33pub const DEFAULT_CONSUMER_GROUP: &str = "DEFAULT_CONSUMER";
34pub const TOOLS_CONSUMER_GROUP: &str = "TOOLS_CONSUMER";
35pub const SCHEDULE_CONSUMER_GROUP: &str = "SCHEDULE_CONSUMER";
36pub const FILTERSRV_CONSUMER_GROUP: &str = "FILTERSRV_CONSUMER";
37pub const MONITOR_CONSUMER_GROUP: &str = "__MONITOR_CONSUMER";
38pub const CLIENT_INNER_PRODUCER_GROUP: &str = "CLIENT_INNER_PRODUCER";
39pub const SELF_TEST_PRODUCER_GROUP: &str = "SELF_TEST_P_GROUP";
40pub const SELF_TEST_CONSUMER_GROUP: &str = "SELF_TEST_C_GROUP";
41pub const ONS_HTTP_PROXY_GROUP: &str = "CID_ONS-HTTP-PROXY";
42pub const CID_ONSAPI_PERMISSION_GROUP: &str = "CID_ONSAPI_PERMISSION";
43pub const CID_ONSAPI_OWNER_GROUP: &str = "CID_ONSAPI_OWNER";
44pub const CID_ONSAPI_PULL_GROUP: &str = "CID_ONSAPI_PULL";
45pub const CID_RMQ_SYS_PREFIX: &str = "CID_RMQ_SYS_";
46pub const IS_SUPPORT_HEART_BEAT_V2: &str = "IS_SUPPORT_HEART_BEAT_V2";
47pub const IS_SUB_CHANGE: &str = "IS_SUB_CHANGE";
48pub const DEFAULT_CHARSET: &str = "UTF-8";
49pub const MASTER_ID: u64 = 0;
50pub const FIRST_SLAVE_ID: u64 = 1;
51pub const FIRST_BROKER_CONTROLLER_ID: u64 = 1;
52pub const UNIT_PRE_SIZE_FOR_MSG: i32 = 28;
53pub const ALL_ACK_IN_SYNC_STATE_SET: i32 = -1;
54pub const RETRY_GROUP_TOPIC_PREFIX: &str = "%RETRY%";
55pub const DLQ_GROUP_TOPIC_PREFIX: &str = "%DLQ%";
56pub const REPLY_TOPIC_POSTFIX: &str = "REPLY_TOPIC";
57pub const UNIQUE_MSG_QUERY_FLAG: &str = "_UNIQUE_KEY_QUERY";
58pub const DEFAULT_TRACE_REGION_ID: &str = "DefaultRegion";
59pub const CONSUME_CONTEXT_TYPE: &str = "ConsumeContextType";
60pub const CID_SYS_RMQ_TRANS: &str = "CID_RMQ_SYS_TRANS";
61pub const ACL_CONF_TOOLS_FILE: &str = "/conf/tools.yml";
62pub const REPLY_MESSAGE_FLAG: &str = "reply";
63pub const LMQ_PREFIX: &str = "%LMQ%";
64pub const LMQ_QUEUE_ID: u64 = 0;
65pub const MULTI_DISPATCH_QUEUE_SPLITTER: &str = ",";
66pub const REQ_T: &str = "ReqT";
67pub const ROCKETMQ_ZONE_ENV: &str = "ROCKETMQ_ZONE";
68pub const ROCKETMQ_ZONE_PROPERTY: &str = "rocketmq.zone";
69pub const ROCKETMQ_ZONE_MODE_ENV: &str = "ROCKETMQ_ZONE_MODE";
70pub const ROCKETMQ_ZONE_MODE_PROPERTY: &str = "rocketmq.zone.mode";
71pub const ZONE_NAME: &str = "__ZONE_NAME";
72pub const ZONE_MODE: &str = "__ZONE_MODE";
73pub const LOGICAL_QUEUE_MOCK_BROKER_PREFIX: &str = "__syslo__";
74pub const METADATA_SCOPE_GLOBAL: &str = "__global__";
75pub const LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST: &str = "__syslo__none__";
76pub static MULTI_PATH_SPLITTER: Lazy<String> =
77 Lazy::new(|| env::var("rocketmq.broker.multiPathSplitter").unwrap_or_else(|_| ",".to_string()));
78
79pub fn is_sys_consumer_group(consumer_group: &str) -> bool {
80 consumer_group.starts_with(CID_RMQ_SYS_PREFIX)
81}
82
83pub fn is_sys_consumer_group_for_no_cold_read_limit(consumer_group: &str) -> bool {
84 if consumer_group == DEFAULT_CONSUMER_GROUP
85 || consumer_group == TOOLS_CONSUMER_GROUP
86 || consumer_group == SCHEDULE_CONSUMER_GROUP
87 || consumer_group == FILTERSRV_CONSUMER_GROUP
88 || consumer_group == MONITOR_CONSUMER_GROUP
89 || consumer_group == SELF_TEST_CONSUMER_GROUP
90 || consumer_group == ONS_HTTP_PROXY_GROUP
91 || consumer_group == CID_ONSAPI_PERMISSION_GROUP
92 || consumer_group == CID_ONSAPI_OWNER_GROUP
93 || consumer_group == CID_ONSAPI_PULL_GROUP
94 || consumer_group == CID_SYS_RMQ_TRANS
95 || consumer_group.starts_with(CID_RMQ_SYS_PREFIX)
96 {
97 return true;
98 }
99 false
100}
101
102pub fn get_retry_topic(consumer_group: &str) -> String {
103 format!("{RETRY_GROUP_TOPIC_PREFIX}{consumer_group}")
104}
105
106pub fn get_dlq_topic(consumer_group: &str) -> String {
107 format!("{DLQ_GROUP_TOPIC_PREFIX}{consumer_group}")
108}
109
110pub fn is_lmq(lmq_meta_data: Option<&str>) -> bool {
111 match lmq_meta_data {
112 Some(data) => data.starts_with(LMQ_PREFIX),
113 None => false,
114 }
115}
116
117pub fn get_ws_addr() -> String {
118 let ws_domain_name = env::var("rocketmq.namesrv.domain")
119 .unwrap_or_else(|_| DEFAULT_NAMESRV_ADDR_LOOKUP.to_string());
120 let ws_domain_subgroup =
121 env::var("rocketmq.namesrv.domain.subgroup").unwrap_or_else(|_| "nsaddr".to_string());
122 let mut ws_addr = format!("http://{ws_domain_name}:8080/rocketmq/{ws_domain_subgroup}");
123
124 if ws_domain_name.contains(':') {
125 ws_addr = format!("http://{ws_domain_name}/rocketmq/{ws_domain_subgroup}");
126 }
127
128 ws_addr
129}
130
131pub fn broker_vip_channel(is_broker_vip_channel: bool, broker_addr: &str) -> CheetahString {
132 if is_broker_vip_channel {
133 if let Some(split) = broker_addr.rfind(':') {
134 let ip = &broker_addr[..split];
135 if let Ok(port) = broker_addr[split + 1..].parse::<i32>() {
136 let broker_addr_new = format!("{}:{}", ip, port - 2);
137 return CheetahString::from_string(broker_addr_new);
138 }
139 }
140 }
141 CheetahString::from_slice(broker_addr)
142}
143
144pub fn human_readable_byte_count(bytes: i64, si: bool) -> String {
145 let bytes = bytes as f64;
146 let unit = if si { 1000.0 } else { 1024.0 };
147 if bytes < unit {
148 return format!("{bytes} B");
149 }
150 let exp = (bytes.ln() / unit.ln()).floor() as i32;
151 let pre = ['K', 'M', 'G', 'T', 'P', 'E'][(exp - 1) as usize];
152 let pre = if si {
153 pre.to_string()
154 } else {
155 format!("{pre}i")
156 };
157 format!("{:.1} {}B", bytes / unit.powi(exp), pre)
158}
159
160pub fn string_to_properties(input: &str) -> Option<HashMap<CheetahString, CheetahString>> {
161 let mut properties = HashMap::new();
162
163 for line in input.lines() {
164 let line = line.trim();
165 if line.is_empty() || line.starts_with('#') {
166 continue;
168 }
169
170 if let Some((key, value)) = line.split_once('=') {
171 let key = CheetahString::from(key.trim());
173 let value = CheetahString::from(value.trim());
174 properties.insert(key, value);
175 } else {
176 return None; }
178 }
179
180 Some(properties)
181}
182
183pub fn properties_to_string(properties: &HashMap<CheetahString, CheetahString>) -> CheetahString {
184 properties
185 .iter()
186 .map(|(key, value)| format!("{}={}", key.as_str(), value.as_str()))
187 .collect::<Vec<String>>()
188 .join("\n")
189 .into()
190}
191
192#[cfg(test)]
193mod tests {
194 use super::*;
195
196 #[test]
197 fn identifies_sys_consumer_group() {
198 assert!(is_sys_consumer_group("CID_RMQ_SYS_SOME_GROUP"));
199 assert!(!is_sys_consumer_group("NON_SYS_GROUP"));
200 }
201
202 #[test]
203 fn identifies_sys_consumer_group_for_no_cold_read_limit() {
204 assert!(is_sys_consumer_group_for_no_cold_read_limit(
205 "DEFAULT_CONSUMER"
206 ));
207 assert!(is_sys_consumer_group_for_no_cold_read_limit(
208 "TOOLS_CONSUMER"
209 ));
210 assert!(is_sys_consumer_group_for_no_cold_read_limit(
211 "SCHEDULE_CONSUMER"
212 ));
213 assert!(is_sys_consumer_group_for_no_cold_read_limit(
214 "FILTERSRV_CONSUMER"
215 ));
216 assert!(!is_sys_consumer_group_for_no_cold_read_limit(
217 "MONITOR_CONSUMER"
218 ));
219 assert!(!is_sys_consumer_group_for_no_cold_read_limit(
220 "SELF_TEST_CONSUMER"
221 ));
222 assert!(!is_sys_consumer_group_for_no_cold_read_limit(
223 "ONS_HTTP_PROXY_GROUP"
224 ));
225 assert!(is_sys_consumer_group_for_no_cold_read_limit(
226 "CID_ONSAPI_PERMISSION"
227 ));
228 assert!(is_sys_consumer_group_for_no_cold_read_limit(
229 "CID_ONSAPI_OWNER"
230 ));
231 assert!(is_sys_consumer_group_for_no_cold_read_limit(
232 "CID_ONSAPI_PULL"
233 ));
234 assert!(is_sys_consumer_group_for_no_cold_read_limit(
235 "CID_RMQ_SYS_TRANS"
236 ));
237 assert!(is_sys_consumer_group_for_no_cold_read_limit(
238 "CID_RMQ_SYS_SOME_GROUP"
239 ));
240 assert!(!is_sys_consumer_group_for_no_cold_read_limit(
241 "NON_SYS_GROUP"
242 ));
243 }
244
245 #[test]
246 fn generates_retry_topic_for_consumer_group() {
247 let consumer_group = "test_group";
248 let expected = format!("{}{}", RETRY_GROUP_TOPIC_PREFIX, consumer_group);
249 assert_eq!(get_retry_topic(consumer_group), expected);
250 }
251
252 #[test]
253 fn generates_retry_topic_for_empty_consumer_group() {
254 let consumer_group = "";
255 let expected = RETRY_GROUP_TOPIC_PREFIX.to_string();
256 assert_eq!(get_retry_topic(consumer_group), expected);
257 }
258
259 #[test]
260 fn returns_true_for_lmq_prefixed_metadata() {
261 let lmq_meta_data = Some("%LMQ%SpecificInfo");
262 assert!(is_lmq(lmq_meta_data));
263 }
264
265 #[test]
266 fn returns_false_for_non_lmq_prefixed_metadata() {
267 let lmq_meta_data = Some("NonLMQSpecificInfo");
268 assert!(!is_lmq(lmq_meta_data));
269 }
270 #[test]
271 fn returns_false_for_none_metadata() {
272 assert!(!is_lmq(None));
273 }
274
275 #[test]
276 fn test_string_to_properties_valid_input() {
277 let input = r#"
278 # This is a comment
279 key1=value1
280 key2 = value2
281 key3=value3
282 "#;
283
284 let result = string_to_properties(input).expect("Parsing should succeed");
285 let mut expected = HashMap::new();
286 expected.insert(CheetahString::from("key1"), CheetahString::from("value1"));
287 expected.insert(CheetahString::from("key2"), CheetahString::from("value2"));
288 expected.insert(CheetahString::from("key3"), CheetahString::from("value3"));
289
290 assert_eq!(result, expected);
291 }
292
293 #[test]
294 fn test_string_to_properties_invalid_line() {
295 let input = r#"
296 key1=value1
297 invalid_line
298 "#;
299
300 let result = string_to_properties(input);
301 assert!(result.is_none(), "Parsing should fail for invalid input");
302 }
303}