rocketmq_common/common/
mix_all.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::env;
20
21use cheetah_string::CheetahString;
22use once_cell::sync::Lazy;
23
// ---------------------------------------------------------------------------
// Environment-variable names and configuration-property keys.
//
// NOTE(review): `NAMESRV_ADDR_PROPERTY` and `WS_DOMAIN_NAME` use the spelling
// `rocketmq.rocketmq-namesrv.*`, whereas `get_ws_addr` in this file reads the
// key `rocketmq.namesrv.domain`. Confirm which spelling is intended.
// ---------------------------------------------------------------------------
pub const ROCKETMQ_HOME_ENV: &str = "ROCKETMQ_HOME";
pub const ROCKETMQ_HOME_PROPERTY: &str = "rocketmq.home.dir";
pub const NAMESRV_ADDR_ENV: &str = "NAMESRV_ADDR";
pub const NAMESRV_ADDR_PROPERTY: &str = "rocketmq.rocketmq-namesrv.addr";
pub const MESSAGE_COMPRESS_TYPE: &str = "rocketmq.message.compressType";
pub const MESSAGE_COMPRESS_LEVEL: &str = "rocketmq.message.compressLevel";
// Fallback host used by `get_ws_addr` when no name-server domain is configured.
pub const DEFAULT_NAMESRV_ADDR_LOOKUP: &str = "jmenv.tbsite.net";
pub const WS_DOMAIN_NAME: &str = "rocketmq.rocketmq-namesrv.domain";

// ---------------------------------------------------------------------------
// Built-in producer / consumer group names.
//
// The consumer-side names (together with `CID_SYS_RMQ_TRANS` further down) are
// the exact-match set checked by `is_sys_consumer_group_for_no_cold_read_limit`;
// `CID_RMQ_SYS_PREFIX` is the prefix checked by `is_sys_consumer_group`.
// ---------------------------------------------------------------------------
pub const DEFAULT_PRODUCER_GROUP: &str = "DEFAULT_PRODUCER";
pub const DEFAULT_CONSUMER_GROUP: &str = "DEFAULT_CONSUMER";
pub const TOOLS_CONSUMER_GROUP: &str = "TOOLS_CONSUMER";
pub const SCHEDULE_CONSUMER_GROUP: &str = "SCHEDULE_CONSUMER";
pub const FILTERSRV_CONSUMER_GROUP: &str = "FILTERSRV_CONSUMER";
// Note the leading double underscore in the value.
pub const MONITOR_CONSUMER_GROUP: &str = "__MONITOR_CONSUMER";
pub const CLIENT_INNER_PRODUCER_GROUP: &str = "CLIENT_INNER_PRODUCER";
pub const SELF_TEST_PRODUCER_GROUP: &str = "SELF_TEST_P_GROUP";
pub const SELF_TEST_CONSUMER_GROUP: &str = "SELF_TEST_C_GROUP";
pub const ONS_HTTP_PROXY_GROUP: &str = "CID_ONS-HTTP-PROXY";
pub const CID_ONSAPI_PERMISSION_GROUP: &str = "CID_ONSAPI_PERMISSION";
pub const CID_ONSAPI_OWNER_GROUP: &str = "CID_ONSAPI_OWNER";
pub const CID_ONSAPI_PULL_GROUP: &str = "CID_ONSAPI_PULL";
pub const CID_RMQ_SYS_PREFIX: &str = "CID_RMQ_SYS_";

// ---------------------------------------------------------------------------
// Miscellaneous keys, numeric ids and sizes. Their consumers are outside this
// file, so their exact semantics are not documented here.
// ---------------------------------------------------------------------------
pub const IS_SUPPORT_HEART_BEAT_V2: &str = "IS_SUPPORT_HEART_BEAT_V2";
pub const IS_SUB_CHANGE: &str = "IS_SUB_CHANGE";
pub const DEFAULT_CHARSET: &str = "UTF-8";
pub const MASTER_ID: u64 = 0;
pub const FIRST_SLAVE_ID: u64 = 1;
pub const FIRST_BROKER_CONTROLLER_ID: u64 = 1;
pub const UNIT_PRE_SIZE_FOR_MSG: i32 = 28;
pub const ALL_ACK_IN_SYNC_STATE_SET: i32 = -1;

// ---------------------------------------------------------------------------
// Topic prefixes / suffixes and message flags.
//
// `RETRY_GROUP_TOPIC_PREFIX` and `DLQ_GROUP_TOPIC_PREFIX` are prepended to a
// consumer group name by `get_retry_topic` / `get_dlq_topic`; `LMQ_PREFIX` is
// the prefix tested by `is_lmq`.
// ---------------------------------------------------------------------------
pub const RETRY_GROUP_TOPIC_PREFIX: &str = "%RETRY%";
pub const DLQ_GROUP_TOPIC_PREFIX: &str = "%DLQ%";
pub const REPLY_TOPIC_POSTFIX: &str = "REPLY_TOPIC";
pub const UNIQUE_MSG_QUERY_FLAG: &str = "_UNIQUE_KEY_QUERY";
pub const DEFAULT_TRACE_REGION_ID: &str = "DefaultRegion";
pub const CONSUME_CONTEXT_TYPE: &str = "ConsumeContextType";
// Shares the `CID_RMQ_SYS_` prefix, so it also satisfies `is_sys_consumer_group`.
pub const CID_SYS_RMQ_TRANS: &str = "CID_RMQ_SYS_TRANS";
pub const ACL_CONF_TOOLS_FILE: &str = "/conf/tools.yml";
pub const REPLY_MESSAGE_FLAG: &str = "reply";
pub const LMQ_PREFIX: &str = "%LMQ%";
pub const LMQ_QUEUE_ID: u64 = 0;
pub const MULTI_DISPATCH_QUEUE_SPLITTER: &str = ",";
pub const REQ_T: &str = "ReqT";

// ---------------------------------------------------------------------------
// Zone-related environment-variable names, property keys and attribute names.
// ---------------------------------------------------------------------------
pub const ROCKETMQ_ZONE_ENV: &str = "ROCKETMQ_ZONE";
pub const ROCKETMQ_ZONE_PROPERTY: &str = "rocketmq.zone";
pub const ROCKETMQ_ZONE_MODE_ENV: &str = "ROCKETMQ_ZONE_MODE";
pub const ROCKETMQ_ZONE_MODE_PROPERTY: &str = "rocketmq.zone.mode";
pub const ZONE_NAME: &str = "__ZONE_NAME";
pub const ZONE_MODE: &str = "__ZONE_MODE";

// ---------------------------------------------------------------------------
// Reserved names carrying the `__…__` marker convention.
// ---------------------------------------------------------------------------
pub const LOGICAL_QUEUE_MOCK_BROKER_PREFIX: &str = "__syslo__";
pub const METADATA_SCOPE_GLOBAL: &str = "__global__";
pub const LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST: &str = "__syslo__none__";
76pub static MULTI_PATH_SPLITTER: Lazy<String> =
77    Lazy::new(|| env::var("rocketmq.broker.multiPathSplitter").unwrap_or_else(|_| ",".to_string()));
78
79pub fn is_sys_consumer_group(consumer_group: &str) -> bool {
80    consumer_group.starts_with(CID_RMQ_SYS_PREFIX)
81}
82
83pub fn is_sys_consumer_group_for_no_cold_read_limit(consumer_group: &str) -> bool {
84    if consumer_group == DEFAULT_CONSUMER_GROUP
85        || consumer_group == TOOLS_CONSUMER_GROUP
86        || consumer_group == SCHEDULE_CONSUMER_GROUP
87        || consumer_group == FILTERSRV_CONSUMER_GROUP
88        || consumer_group == MONITOR_CONSUMER_GROUP
89        || consumer_group == SELF_TEST_CONSUMER_GROUP
90        || consumer_group == ONS_HTTP_PROXY_GROUP
91        || consumer_group == CID_ONSAPI_PERMISSION_GROUP
92        || consumer_group == CID_ONSAPI_OWNER_GROUP
93        || consumer_group == CID_ONSAPI_PULL_GROUP
94        || consumer_group == CID_SYS_RMQ_TRANS
95        || consumer_group.starts_with(CID_RMQ_SYS_PREFIX)
96    {
97        return true;
98    }
99    false
100}
101
102pub fn get_retry_topic(consumer_group: &str) -> String {
103    format!("{RETRY_GROUP_TOPIC_PREFIX}{consumer_group}")
104}
105
106pub fn get_dlq_topic(consumer_group: &str) -> String {
107    format!("{DLQ_GROUP_TOPIC_PREFIX}{consumer_group}")
108}
109
110pub fn is_lmq(lmq_meta_data: Option<&str>) -> bool {
111    match lmq_meta_data {
112        Some(data) => data.starts_with(LMQ_PREFIX),
113        None => false,
114    }
115}
116
117pub fn get_ws_addr() -> String {
118    let ws_domain_name = env::var("rocketmq.namesrv.domain")
119        .unwrap_or_else(|_| DEFAULT_NAMESRV_ADDR_LOOKUP.to_string());
120    let ws_domain_subgroup =
121        env::var("rocketmq.namesrv.domain.subgroup").unwrap_or_else(|_| "nsaddr".to_string());
122    let mut ws_addr = format!("http://{ws_domain_name}:8080/rocketmq/{ws_domain_subgroup}");
123
124    if ws_domain_name.contains(':') {
125        ws_addr = format!("http://{ws_domain_name}/rocketmq/{ws_domain_subgroup}");
126    }
127
128    ws_addr
129}
130
131pub fn broker_vip_channel(is_broker_vip_channel: bool, broker_addr: &str) -> CheetahString {
132    if is_broker_vip_channel {
133        if let Some(split) = broker_addr.rfind(':') {
134            let ip = &broker_addr[..split];
135            if let Ok(port) = broker_addr[split + 1..].parse::<i32>() {
136                let broker_addr_new = format!("{}:{}", ip, port - 2);
137                return CheetahString::from_string(broker_addr_new);
138            }
139        }
140    }
141    CheetahString::from_slice(broker_addr)
142}
143
/// Formats a byte count as a short human-readable string.
///
/// * `si == true` uses powers of 1000 with prefixes `K`, `M`, `G`, … (e.g. `"1.5 GB"`).
/// * `si == false` uses powers of 1024 with prefixes `Ki`, `Mi`, `Gi`, … (e.g. `"1.5 KiB"`).
///
/// Values below one unit (including zero and negatives) are printed as
/// `"<n> B"` without scaling; larger values are printed with one decimal.
pub fn human_readable_byte_count(bytes: i64, si: bool) -> String {
    const PREFIXES: [char; 6] = ['K', 'M', 'G', 'T', 'P', 'E'];

    let value = bytes as f64;
    let base: f64 = if si { 1000.0 } else { 1024.0 };
    if value < base {
        return format!("{value} B");
    }

    // Number of whole `base` powers contained in `value`; at least 1 here.
    let exponent = (value.ln() / base.ln()).floor() as i32;
    let letter = PREFIXES[(exponent - 1) as usize];
    let scaled = value / base.powi(exponent);

    if si {
        format!("{scaled:.1} {letter}B")
    } else {
        format!("{scaled:.1} {letter}iB")
    }
}
159
160pub fn string_to_properties(input: &str) -> Option<HashMap<CheetahString, CheetahString>> {
161    let mut properties = HashMap::new();
162
163    for line in input.lines() {
164        let line = line.trim();
165        if line.is_empty() || line.starts_with('#') {
166            // Skip empty lines or comments
167            continue;
168        }
169
170        if let Some((key, value)) = line.split_once('=') {
171            // Convert key and value to CheetahString
172            let key = CheetahString::from(key.trim());
173            let value = CheetahString::from(value.trim());
174            properties.insert(key, value);
175        } else {
176            return None; // Return None if the line isn't in `key=value` format
177        }
178    }
179
180    Some(properties)
181}
182
183pub fn properties_to_string(properties: &HashMap<CheetahString, CheetahString>) -> CheetahString {
184    properties
185        .iter()
186        .map(|(key, value)| format!("{}={}", key.as_str(), value.as_str()))
187        .collect::<Vec<String>>()
188        .join("\n")
189        .into()
190}
191
#[cfg(test)]
mod tests {
    //! Unit tests for the helpers in this module.
    //!
    //! Group-name checks go through the public constants so that the real
    //! values (e.g. `"__MONITOR_CONSUMER"`) are exercised, not strings that
    //! merely resemble the constants' identifiers.

    use super::*;

    #[test]
    fn identifies_sys_consumer_group() {
        assert!(is_sys_consumer_group("CID_RMQ_SYS_SOME_GROUP"));
        assert!(is_sys_consumer_group(CID_SYS_RMQ_TRANS));
        assert!(!is_sys_consumer_group("NON_SYS_GROUP"));
        assert!(!is_sys_consumer_group(""));
    }

    #[test]
    fn identifies_sys_consumer_group_for_no_cold_read_limit() {
        // Every exact-match constant must be accepted by its actual value.
        for group in [
            DEFAULT_CONSUMER_GROUP,
            TOOLS_CONSUMER_GROUP,
            SCHEDULE_CONSUMER_GROUP,
            FILTERSRV_CONSUMER_GROUP,
            MONITOR_CONSUMER_GROUP,
            SELF_TEST_CONSUMER_GROUP,
            ONS_HTTP_PROXY_GROUP,
            CID_ONSAPI_PERMISSION_GROUP,
            CID_ONSAPI_OWNER_GROUP,
            CID_ONSAPI_PULL_GROUP,
            CID_SYS_RMQ_TRANS,
        ] {
            assert!(
                is_sys_consumer_group_for_no_cold_read_limit(group),
                "{group} should be recognised"
            );
        }

        // Anything carrying the system prefix is accepted as well.
        assert!(is_sys_consumer_group_for_no_cold_read_limit(
            "CID_RMQ_SYS_SOME_GROUP"
        ));

        // These look like constant names but are not the constants' values.
        for group in [
            "MONITOR_CONSUMER",
            "SELF_TEST_CONSUMER",
            "ONS_HTTP_PROXY_GROUP",
            "NON_SYS_GROUP",
        ] {
            assert!(
                !is_sys_consumer_group_for_no_cold_read_limit(group),
                "{group} should not be recognised"
            );
        }
    }

    #[test]
    fn generates_retry_topic_for_consumer_group() {
        let consumer_group = "test_group";
        let expected = format!("{}{}", RETRY_GROUP_TOPIC_PREFIX, consumer_group);
        assert_eq!(get_retry_topic(consumer_group), expected);
    }

    #[test]
    fn generates_retry_topic_for_empty_consumer_group() {
        let consumer_group = "";
        let expected = RETRY_GROUP_TOPIC_PREFIX.to_string();
        assert_eq!(get_retry_topic(consumer_group), expected);
    }

    #[test]
    fn generates_dlq_topic_for_consumer_group() {
        assert_eq!(get_dlq_topic("test_group"), "%DLQ%test_group");
        assert_eq!(get_dlq_topic(""), DLQ_GROUP_TOPIC_PREFIX);
    }

    #[test]
    fn returns_true_for_lmq_prefixed_metadata() {
        let lmq_meta_data = Some("%LMQ%SpecificInfo");
        assert!(is_lmq(lmq_meta_data));
    }

    #[test]
    fn returns_false_for_non_lmq_prefixed_metadata() {
        let lmq_meta_data = Some("NonLMQSpecificInfo");
        assert!(!is_lmq(lmq_meta_data));
    }

    #[test]
    fn returns_false_for_none_metadata() {
        assert!(!is_lmq(None));
    }

    #[test]
    fn broker_vip_channel_shifts_port_only_when_enabled() {
        assert_eq!(
            broker_vip_channel(true, "127.0.0.1:10911").as_str(),
            "127.0.0.1:10909"
        );
        assert_eq!(
            broker_vip_channel(false, "127.0.0.1:10911").as_str(),
            "127.0.0.1:10911"
        );
    }

    #[test]
    fn broker_vip_channel_keeps_unparseable_address() {
        assert_eq!(broker_vip_channel(true, "localhost").as_str(), "localhost");
        assert_eq!(
            broker_vip_channel(true, "localhost:abc").as_str(),
            "localhost:abc"
        );
    }

    #[test]
    fn formats_human_readable_byte_counts() {
        assert_eq!(human_readable_byte_count(512, true), "512 B");
        assert_eq!(human_readable_byte_count(1023, false), "1023 B");
        assert_eq!(human_readable_byte_count(1000, true), "1.0 KB");
        assert_eq!(human_readable_byte_count(1536, false), "1.5 KiB");
    }

    #[test]
    fn test_string_to_properties_valid_input() {
        let input = r#"
             # This is a comment
             key1=value1
             key2 = value2
             key3=value3
         "#;

        let result = string_to_properties(input).expect("Parsing should succeed");
        let mut expected = HashMap::new();
        expected.insert(CheetahString::from("key1"), CheetahString::from("value1"));
        expected.insert(CheetahString::from("key2"), CheetahString::from("value2"));
        expected.insert(CheetahString::from("key3"), CheetahString::from("value3"));

        assert_eq!(result, expected);
    }

    #[test]
    fn test_string_to_properties_invalid_line() {
        let input = r#"
             key1=value1
             invalid_line
         "#;

        let result = string_to_properties(input);
        assert!(result.is_none(), "Parsing should fail for invalid input");
    }

    #[test]
    fn properties_to_string_renders_single_entry() {
        let mut properties = HashMap::new();
        properties.insert(CheetahString::from("key1"), CheetahString::from("value1"));
        assert_eq!(properties_to_string(&properties).as_str(), "key1=value1");
    }

    #[test]
    fn properties_to_string_renders_empty_map() {
        let properties: HashMap<CheetahString, CheetahString> = HashMap::new();
        assert_eq!(properties_to_string(&properties).as_str(), "");
    }
}
303}