rocketmq_client_v4/protocols/
mod.rs

1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use serde::Serialize;
3use serde_json::Value;
4use std::collections::HashMap;
5
6pub mod body;
7pub mod header;
8
9pub mod request_code;
10pub mod response_code;
11
12pub mod mq_command;
13
14pub trait SerializeDeserialize {
15    fn to_bytes_1(&self) -> Vec<u8>
16    where
17        Self: Serialize,
18    {
19        let value = serde_json::to_value(&self).unwrap();
20        match value {
21            Value::Object(map) => {
22                let mut buf = BytesMut::with_capacity(128);
23                // debug!("header map:{:?}", &map);
24                for (k, v) in map {
25                    let v = if v.is_string() {
26                        v.as_str().unwrap().to_string()
27                    } else if v.is_number() {
28                        v.as_number().unwrap().to_string()
29                    } else if v.is_boolean() {
30                        match v.as_bool().unwrap() {
31                            true => "true".to_string(),
32                            false => "false".to_string(),
33                        }
34                    } else if v.is_null() {
35                        continue;
36                    } else {
37                        panic!("not support type: key:{}, v:{}", k, v);
38                    };
39                    buf.put_i16(k.len() as i16);
40                    buf.put_slice(k.as_bytes());
41                    buf.put_i32(v.len() as i32);
42                    buf.put_slice(v.as_bytes());
43                }
44                buf.to_vec()
45            }
46            _ => {
47                vec![]
48            }
49        }
50    }
51
52    fn bytes_1_to_header(_bytes: Vec<u8>) -> Option<Box<Self>> {
53        None
54    }
55
56    fn bytes_1_to_map(bytes: Vec<u8>) -> HashMap<String, String> {
57        let mut bytes = Bytes::from(bytes);
58        let mut value = HashMap::new();
59        while bytes.remaining() > 0 {
60            let key1_len = bytes.get_i16();
61            let key1 = bytes.copy_to_bytes(key1_len as usize);
62
63            let v1_len = bytes.get_i32();
64            let v1 = bytes.copy_to_bytes(v1_len as usize).to_vec();
65            value.insert(
66                String::from_utf8(key1.to_vec()).unwrap(),
67                String::from_utf8(v1.to_vec()).unwrap(),
68            );
69        }
70        value
71    }
72
73    fn to_json_bytes(&self) -> Vec<u8>
74    where
75        Self: Serialize,
76    {
77        serde_json::to_vec(self).unwrap()
78    }
79}
80
81pub fn fixed_un_standard_json(src: &Vec<u8>) -> Vec<u8> {
82    let lcub = "{".as_bytes()[0]; // 123
83    let rcub = "}".as_bytes()[0]; // 125
84
85    let comma = ",".as_bytes()[0];
86
87    let quot = "\"".as_bytes()[0]; // 34
88
89    let colon = ":".as_bytes()[0];
90
91    let zero = "0".as_bytes()[0];
92    let nine = "9".as_bytes()[0];
93
94    let bsol = 92u8; // \
95
96    let mut dest: Vec<u8> = vec![];
97    let mut quot_count = 0;
98    for i in 0..src.len() {
99        if src[i] == quot {
100            if i > 0 && src[i - 1] != bsol {
101                quot_count = quot_count + 1;
102            }
103        }
104        // start { ,
105
106        if quot_count % 2 == 0 && (src[i] == lcub || src[i] == comma) {
107            if src[i + 1] != quot
108                && src[i + 1] != rcub
109                && (src[i + 1] >= zero && src[i + 1] <= nine)
110            {
111                dest.push(src[i]);
112                dest.push(quot);
113                continue;
114            }
115        }
116
117        if quot_count % 2 == 0 && src[i] == colon {
118            if src[i - 1] != quot {
119                dest.push(quot);
120                dest.push(src[i]);
121                continue;
122            }
123        }
124        dest.push(src[i]);
125    }
126    dest
127}
128
129#[allow(dead_code)]
130const PERM_PRIORITY: i32 = 0x1 << 3;
131const PERM_READ: i32 = 0x1 << 2;
132const PERM_WRITE: i32 = 0x1 << 1;
133
134const PERM_INHERIT: i32 = 0x1 << 0;
135
136pub struct PermName {}
137
138impl PermName {
139    pub fn perm_to_string(perm: i32) -> String {
140        let mut perm_str = String::from("");
141        if Self::is_readable(perm) {
142            perm_str.push_str("R");
143        } else {
144            perm_str.push_str("-");
145        }
146
147        if Self::is_writeable(perm) {
148            perm_str.push_str("W");
149        } else {
150            perm_str.push_str("-");
151        }
152
153        if Self::is_inherited(perm) {
154            perm_str.push_str("X");
155        } else {
156            perm_str.push_str("-");
157        }
158        return perm_str;
159    }
160
161    pub fn is_readable(perm: i32) -> bool {
162        return (perm & PERM_READ) == PERM_READ;
163    }
164
165    pub fn is_writeable(perm: i32) -> bool {
166        return (perm & PERM_WRITE) == PERM_WRITE;
167    }
168
169    pub fn is_inherited(perm: i32) -> bool {
170        return (perm & PERM_INHERIT) == PERM_INHERIT;
171    }
172}
173
174pub struct ConvertUtil;
175
176impl ConvertUtil {
177    pub fn convert_string_bytes_to_i64(bytes: Vec<u8>) -> i64 {
178        let str = String::from_utf8(bytes).unwrap();
179        let ret: i64 = str.parse().unwrap();
180        ret
181    }
182
183    pub fn convert_string_bytes_to_i32(bytes: Vec<u8>) -> i32 {
184        let str = String::from_utf8(bytes).unwrap();
185        let ret: i32 = str.parse().unwrap();
186        ret
187    }
188}
189
190pub fn get_current_time_millis() -> i64 {
191    let now = std::time::SystemTime::now();
192    let duration = now
193        .duration_since(std::time::SystemTime::UNIX_EPOCH)
194        .unwrap();
195    return duration.as_millis() as i64;
196}
197
198pub async fn sleep(millis: u64) {
199    tokio::time::sleep(std::time::Duration::from_millis(millis)).await;
200}
201
202#[cfg(test)]
203mod test {
204    use crate::protocols::fixed_un_standard_json;
205    use serde_json::Value;
206
207    #[test]
208    fn calc_ascii() {
209        let data = r#"{"broke\"rAdd\trTable":{"xd":{"brokerAddrs":{0:"192.168.3.49:10911"},"brokerName":"xd","cluster":"DefaultCluster"}},"clusterAddrTable":{"DefaultCluster":["xd"]}}"#;
210        let data = data.as_bytes();
211        let mut src = vec![];
212        for i in 0..data.len() {
213            src.push(data[i]);
214        }
215        let dest = fixed_un_standard_json(&src);
216        let _val: Value = serde_json::from_slice(&dest).unwrap();
217        println!("dest data:{}", String::from_utf8(dest).unwrap());
218    }
219
220    #[test]
221    fn calc_ascii_1() {
222        let data = r#"{"brokerDatas":[{"brokerAddrs":{0:"172.31.21.98:10911",1:"172.31.25.213:20911"},"brokerName":"broker-b","cluster":"DefaultCluster"},{"brokerAddrs":{0:"172.31.30.236:10911",1:"172.31.21.98:20911"},"brokerName":"broker-c","cluster":"DefaultCluster"},{"brokerAddrs":{0:"172.31.25.213:10911",1:"172.31.30.236:20911"},"brokerName":"broker-a","cluster":"DefaultCluster"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-b","perm":6,"readQueueNums":16,"topicSysFlag":0,"writeQueueNums":16},{"brokerName":"broker-c","perm":6,"readQueueNums":16,"topicSysFlag":0,"writeQueueNums":16},{"brokerName":"broker-a","perm":6,"readQueueNums":16,"topicSysFlag":0,"writeQueueNums":16}]}"#;
223        let data = data.as_bytes();
224        let mut src = vec![];
225        for i in 0..data.len() {
226            src.push(data[i]);
227        }
228        let dest = fixed_un_standard_json(&src);
229        let _val: Value = serde_json::from_slice(&dest).unwrap();
230        println!("dest data:{}", String::from_utf8(dest).unwrap());
231    }
232}