rocketmq_client_v4/protocols/
mod.rs1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use serde::Serialize;
3use serde_json::Value;
4use std::collections::HashMap;
5
6pub mod body;
7pub mod header;
8
9pub mod request_code;
10pub mod response_code;
11
12pub mod mq_command;
13
14pub trait SerializeDeserialize {
15 fn to_bytes_1(&self) -> Vec<u8>
16 where
17 Self: Serialize,
18 {
19 let value = serde_json::to_value(&self).unwrap();
20 match value {
21 Value::Object(map) => {
22 let mut buf = BytesMut::with_capacity(128);
23 for (k, v) in map {
25 let v = if v.is_string() {
26 v.as_str().unwrap().to_string()
27 } else if v.is_number() {
28 v.as_number().unwrap().to_string()
29 } else if v.is_boolean() {
30 match v.as_bool().unwrap() {
31 true => "true".to_string(),
32 false => "false".to_string(),
33 }
34 } else if v.is_null() {
35 continue;
36 } else {
37 panic!("not support type: key:{}, v:{}", k, v);
38 };
39 buf.put_i16(k.len() as i16);
40 buf.put_slice(k.as_bytes());
41 buf.put_i32(v.len() as i32);
42 buf.put_slice(v.as_bytes());
43 }
44 buf.to_vec()
45 }
46 _ => {
47 vec![]
48 }
49 }
50 }
51
52 fn bytes_1_to_header(_bytes: Vec<u8>) -> Option<Box<Self>> {
53 None
54 }
55
56 fn bytes_1_to_map(bytes: Vec<u8>) -> HashMap<String, String> {
57 let mut bytes = Bytes::from(bytes);
58 let mut value = HashMap::new();
59 while bytes.remaining() > 0 {
60 let key1_len = bytes.get_i16();
61 let key1 = bytes.copy_to_bytes(key1_len as usize);
62
63 let v1_len = bytes.get_i32();
64 let v1 = bytes.copy_to_bytes(v1_len as usize).to_vec();
65 value.insert(
66 String::from_utf8(key1.to_vec()).unwrap(),
67 String::from_utf8(v1.to_vec()).unwrap(),
68 );
69 }
70 value
71 }
72
73 fn to_json_bytes(&self) -> Vec<u8>
74 where
75 Self: Serialize,
76 {
77 serde_json::to_vec(self).unwrap()
78 }
79}
80
/// Rewrites JSON whose object keys are bare integers into standard JSON.
///
/// Input such as `{0:"a",1:"b"}` is turned into `{"0":"a","1":"b"}` by wrapping
/// every run of ASCII digits that directly follows `{` or `,` and is directly
/// followed by `:` in double quotes. Everything else is copied through unchanged,
/// so input that is already valid JSON (including arrays of numbers and
/// whitespace around `:`) is returned as is.
///
/// String literals are tracked with a proper escape state, so `\"` and `\\`
/// inside a string never confuse the scan, and bytes inside strings are never
/// modified. The function never panics, whatever the input.
///
/// The parameter stays `&Vec<u8>` to keep the existing public signature.
pub fn fixed_un_standard_json(src: &Vec<u8>) -> Vec<u8> {
    // A few spare bytes for the quotes that get inserted.
    let mut dest: Vec<u8> = Vec::with_capacity(src.len() + 16);
    let mut in_string = false;
    let mut escaped = false;
    let mut pos = 0;

    while pos < src.len() {
        let byte = src[pos];
        pos += 1;
        dest.push(byte);

        if in_string {
            if escaped {
                // The byte after a backslash is always part of the string.
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                in_string = false;
            }
            continue;
        }

        match byte {
            b'"' => in_string = true,
            b'{' | b',' => {
                // Only a digit run terminated by ':' is a key; `[1,2]` is left alone.
                let key_len = bare_numeric_key_len(&src[pos..]);
                if key_len > 0 {
                    dest.push(b'"');
                    dest.extend_from_slice(&src[pos..pos + key_len]);
                    dest.push(b'"');
                    pos += key_len;
                }
            }
            _ => {}
        }
    }
    dest
}

/// Returns the length of the unquoted numeric key at the start of `rest`, or 0
/// when `rest` does not begin with ASCII digits immediately followed by `:`.
fn bare_numeric_key_len(rest: &[u8]) -> usize {
    let digits = rest.iter().take_while(|b| b.is_ascii_digit()).count();
    if digits > 0 && rest.get(digits) == Some(&b':') {
        digits
    } else {
        0
    }
}
128
/// Permission bit for priority; not referenced by the helpers in `PermName`.
#[allow(dead_code)]
const PERM_PRIORITY: i32 = 1 << 3;
/// Permission bit for read access.
const PERM_READ: i32 = 1 << 2;
/// Permission bit for write access.
const PERM_WRITE: i32 = 1 << 1;
/// Permission bit for inheritance.
const PERM_INHERIT: i32 = 1 << 0;

/// Namespace for helpers that interpret a permission bit mask.
pub struct PermName {}

impl PermName {
    /// Renders `perm` as a three-character string: `R`, `W` and `X` mark the
    /// read, write and inherit bits respectively, and `-` marks a cleared bit.
    pub fn perm_to_string(perm: i32) -> String {
        let symbol = |enabled: bool, letter: char| if enabled { letter } else { '-' };
        [
            symbol(Self::is_readable(perm), 'R'),
            symbol(Self::is_writeable(perm), 'W'),
            symbol(Self::is_inherited(perm), 'X'),
        ]
        .iter()
        .collect()
    }

    /// Returns `true` when the read bit is set in `perm`.
    pub fn is_readable(perm: i32) -> bool {
        perm & PERM_READ != 0
    }

    /// Returns `true` when the write bit is set in `perm`.
    pub fn is_writeable(perm: i32) -> bool {
        perm & PERM_WRITE != 0
    }

    /// Returns `true` when the inherit bit is set in `perm`.
    pub fn is_inherited(perm: i32) -> bool {
        perm & PERM_INHERIT != 0
    }
}
173
/// Namespace for converting textual byte payloads into integers.
pub struct ConvertUtil;

impl ConvertUtil {
    /// Parses `bytes` as the UTF-8 decimal representation of an `i64`.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is not valid UTF-8 or does not parse as an `i64`.
    pub fn convert_string_bytes_to_i64(bytes: Vec<u8>) -> i64 {
        String::from_utf8(bytes).unwrap().parse::<i64>().unwrap()
    }

    /// Parses `bytes` as the UTF-8 decimal representation of an `i32`.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is not valid UTF-8 or does not parse as an `i32`.
    pub fn convert_string_bytes_to_i32(bytes: Vec<u8>) -> i32 {
        String::from_utf8(bytes).unwrap().parse::<i32>().unwrap()
    }
}
189
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub fn get_current_time_millis() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis() as i64
}
197
198pub async fn sleep(millis: u64) {
199 tokio::time::sleep(std::time::Duration::from_millis(millis)).await;
200}
201
#[cfg(test)]
mod test {
    use crate::protocols::fixed_un_standard_json;
    use serde_json::Value;

    /// A payload with a bare numeric key and escaped characters inside a key
    /// must parse as JSON after being fixed.
    #[test]
    fn calc_ascii() {
        let src = r#"{"broke\"rAdd\trTable":{"xd":{"brokerAddrs":{0:"192.168.3.49:10911"},"brokerName":"xd","cluster":"DefaultCluster"}},"clusterAddrTable":{"DefaultCluster":["xd"]}}"#
            .as_bytes()
            .to_vec();
        let dest = fixed_un_standard_json(&src);
        let _val: Value = serde_json::from_slice(&dest).unwrap();
        println!("dest data:{}", String::from_utf8(dest).unwrap());
    }

    /// A payload with several bare numeric keys per object and plain numeric
    /// values must parse as JSON after being fixed.
    #[test]
    fn calc_ascii_1() {
        let src = r#"{"brokerDatas":[{"brokerAddrs":{0:"172.31.21.98:10911",1:"172.31.25.213:20911"},"brokerName":"broker-b","cluster":"DefaultCluster"},{"brokerAddrs":{0:"172.31.30.236:10911",1:"172.31.21.98:20911"},"brokerName":"broker-c","cluster":"DefaultCluster"},{"brokerAddrs":{0:"172.31.25.213:10911",1:"172.31.30.236:20911"},"brokerName":"broker-a","cluster":"DefaultCluster"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-b","perm":6,"readQueueNums":16,"topicSysFlag":0,"writeQueueNums":16},{"brokerName":"broker-c","perm":6,"readQueueNums":16,"topicSysFlag":0,"writeQueueNums":16},{"brokerName":"broker-a","perm":6,"readQueueNums":16,"topicSysFlag":0,"writeQueueNums":16}]}"#
            .as_bytes()
            .to_vec();
        let dest = fixed_un_standard_json(&src);
        let _val: Value = serde_json::from_slice(&dest).unwrap();
        println!("dest data:{}", String::from_utf8(dest).unwrap());
    }
}