rocketmq_client_v4/protocols/
mq_command.rs1use atomic_counter::{AtomicCounter, ConsistentCounter};
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3use log::{debug, warn};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::LazyLock;
7use tokio::io::AsyncReadExt;
8use tokio::net::tcp::OwnedReadHalf;
9use tokio::net::TcpStream;
10
11const LANGUAGE_FLAG: i8 = 12; pub const VERSION_FLAG: i16 = 63;
51static OPAQUE: LazyLock<ConsistentCounter> = LazyLock::new(|| ConsistentCounter::new(200));
52
53pub const HEADER_SERIALIZE_METHOD_JSON: u8 = 0;
54pub const HEADER_SERIALIZE_METHOD_PRIVATE: u8 = 1;
55
56#[derive(Debug, Deserialize, Serialize)]
57#[allow(non_snake_case)]
58pub struct RemotingCommand {
59 pub code: i16,
67 pub language: String,
68 pub version: i32,
69 pub opaque: i32,
70 pub flag: i32,
71 pub remark: Option<String>,
72 pub extFields: HashMap<String, String>,
73 pub serializeTypeCurrentRPC: Option<String>,
74}
75
76impl RemotingCommand {
77 pub fn get_language_i16(&self) -> i16 {
78 return match self.language.as_str() {
79 "JAVA" => 0,
80 _ => 7,
81 };
82 }
83}
84
/// One protocol command: the header fields plus the raw payload.
///
/// Produced locally through the constructors or decoded from a received frame.
#[derive(Debug)]
pub struct MqCommand {
    /// Request or response code.
    pub req_code: i16,
    /// Language flag (one byte on the wire).
    pub l_flag: i8,
    /// Protocol version flag.
    pub v_flag: i16,
    /// Correlation id used to pair a response with its request.
    pub opaque: i32,
    /// Flag word of the command.
    pub request_flag: i32,
    /// Length recorded for `r_body`.
    pub r_len: i32,
    /// Remark bytes.
    pub r_body: Vec<u8>,
    /// Length recorded for `e_body`.
    pub e_len: i32,
    /// Encoded extension fields.
    pub e_body: Vec<u8>,
    /// Payload that follows the header.
    pub body: Vec<u8>,
    /// Header encoding marker (`HEADER_SERIALIZE_METHOD_JSON` or
    /// `HEADER_SERIALIZE_METHOD_PRIVATE`).
    pub header_serialize_method: u8,
}
100
101impl MqCommand {
102 pub fn new() -> MqCommand {
103 return MqCommand {
104 req_code: 0,
105 l_flag: LANGUAGE_FLAG,
106 v_flag: VERSION_FLAG,
107 opaque: OPAQUE.add(1) as i32,
108 request_flag: 0,
109 r_len: 0,
110 r_body: Vec::new(),
111 e_len: 0,
112 e_body: Vec::new(),
113 body: Vec::new(),
114 header_serialize_method: HEADER_SERIALIZE_METHOD_PRIVATE,
115 };
116 }
117
118 pub fn new_with_body(
119 req_code: i16,
120 r_body: Vec<u8>,
121 e_body: Vec<u8>,
122 body: Vec<u8>,
123 ) -> MqCommand {
124 return MqCommand {
125 req_code,
126 l_flag: LANGUAGE_FLAG,
127 v_flag: VERSION_FLAG,
128 opaque: OPAQUE.add(1) as i32,
129 request_flag: 0,
130 r_len: r_body.len() as i32,
131 r_body,
132 e_len: e_body.len() as i32,
133 e_body,
134 body,
135 header_serialize_method: HEADER_SERIALIZE_METHOD_PRIVATE,
136 };
137 }
138
139 pub async fn read_from_stream_with_opaque(broker_stream: &mut TcpStream, opaque: i32) -> Self {
140 for _ in 0..5 {
141 let cmd = Self::read_from_stream(broker_stream).await;
142 if cmd.opaque != opaque {
143 debug!("receive server send extended message, req_code:{}, remark:{:?}, extend:{:?}, body:{:?}",
144 cmd.req_code, String::from_utf8(cmd.r_body), String::from_utf8(cmd.e_body), String::from_utf8(cmd.body));
145 } else {
146 return cmd;
147 }
148 }
149 panic!("read from mq server failed, stop to connect");
150 }
151
152 pub async fn read_from_stream(stream: &mut TcpStream) -> Self {
153 let size = stream.read_i32().await;
154 if size.is_err() {
155 panic!("read command from mq failed! {:?}", size.err());
156 }
157 let size = size.unwrap();
158 let mut buf = vec![0u8; size as usize];
159 let body = stream.read_exact(&mut buf).await;
160 if body.is_err() {
161 panic!("read command data from mq failed!, ignore it");
162 }
163 let mut frame = BytesMut::with_capacity((4 + size) as usize);
164 frame.put_i32(size);
165 frame.put_slice(&buf.to_vec());
166
167 let cmd = Self::convert_bytes_to_mq_command(frame.to_vec());
168 cmd
169 }
170
171 pub async fn read_from_read_half(stream: &mut OwnedReadHalf) -> Option<Self> {
172 let size = stream.read_i32().await;
173 if size.is_err() {
174 warn!("read command from mq failed! {:?}", size.err());
175 return None;
176 }
177 let size = size.unwrap();
178 let mut buf = vec![0u8; size as usize];
179 let body = stream.read_exact(&mut buf).await;
180 if body.is_err() {
181 panic!("read command data from mq failed!, ignore it");
182 }
183 let mut frame = BytesMut::with_capacity((4 + size) as usize);
184 frame.put_i32(size);
185 frame.put_slice(&buf.to_vec());
186
187 let cmd = Self::convert_bytes_to_mq_command(frame.to_vec());
188 Some(cmd)
189 }
190
191 pub fn to_bytes(&self) -> Vec<u8> {
195 let mut buf = BytesMut::with_capacity(1024);
196 buf.put_i16(self.req_code);
197 buf.put_i8(self.l_flag);
198 buf.put_i16(self.v_flag);
199 buf.put_i32(self.opaque);
200 buf.put_i32(self.request_flag);
201 buf.put_i32(self.r_body.len() as i32);
202 buf.put_slice(&self.r_body);
203 buf.put_i32(self.e_body.len() as i32);
204 buf.put_slice(&self.e_body);
205 let header_body = buf.to_vec();
206 let header_len: i32 = header_body.len() as i32;
207 let mut header_buf: [u8; 4] = [0; 4];
208 header_buf[0] = HEADER_SERIALIZE_METHOD_PRIVATE;
210 header_buf[1] = ((header_len >> 16) & 0xFF) as u8;
211 header_buf[2] = ((header_len >> 8) & 0xFF) as u8;
212 header_buf[3] = ((header_len) & 0xFF) as u8;
213 let body_len: i32 = self.body.len() as i32;
214 let frame_size: i32 = 4 + header_len + body_len;
216
217 let mut total_frame = BytesMut::with_capacity((4 + frame_size) as usize);
218 total_frame.put_i32(frame_size);
219 total_frame.put_slice(&header_buf);
220 total_frame.put_slice(&header_body);
221 total_frame.put_slice(&self.body);
222 return total_frame.to_vec();
223 }
224
225 pub fn convert_bytes_to_mq_command(bytes: Vec<u8>) -> MqCommand {
226 let mut buf = Bytes::from(bytes);
227 if buf.len() < 4 {
228 panic!("invalid body. the len less than 4!");
229 }
230
231 let frame_size = buf.get_i32();
232 if buf.len() as i32 != frame_size {
233 panic!(
234 "invalid body. the len is not equal to frame_size!, frame_size: {}, buf_len: {}",
235 frame_size,
236 buf.len()
237 );
238 }
239
240 let header_len = buf.get_i32();
241 let header_serialize_method = ((header_len >> 24) & 0xFF) as u8;
242 let header_len = header_len & 0x00FFFFFF;
243 let header_body = buf.copy_to_bytes(header_len as usize).to_vec();
244 let mut head_buf = Bytes::from(header_body);
245 if header_serialize_method == HEADER_SERIALIZE_METHOD_JSON {
246 let remoting_cmd: RemotingCommand = serde_json::from_slice(&head_buf.to_vec()).unwrap();
247 let r_body = if remoting_cmd.remark.is_none() {
248 vec![]
249 } else {
250 Vec::from(remoting_cmd.remark.unwrap().to_string())
251 };
252
253 let ext_fields = if remoting_cmd.extFields.is_empty() {
254 vec![]
255 } else {
256 Vec::from(serde_json::to_string(&remoting_cmd.extFields).unwrap())
257 };
258
259 let body_len = buf.remaining();
260 let body = buf.copy_to_bytes(body_len).to_vec();
261
262 return MqCommand {
263 req_code: remoting_cmd.code,
264 l_flag: 0,
265 v_flag: 0,
266 opaque: remoting_cmd.opaque,
267 request_flag: remoting_cmd.flag,
268 r_len: r_body.len() as i32,
269 r_body,
270 e_len: ext_fields.len() as i32,
271 e_body: ext_fields,
272 body,
273 header_serialize_method,
274 };
275 }
276 let req_code = head_buf.get_i16();
277 let l_flag = head_buf.get_i8();
278 let v_flag = head_buf.get_i16();
279 let opaque = head_buf.get_i32();
280 let request_flag = head_buf.get_i32();
281 let r_len = head_buf.get_i32();
282 let r_body = if r_len > 0 {
283 head_buf.copy_to_bytes(r_len as usize).to_vec()
284 } else {
285 vec![]
286 };
287 let e_len = head_buf.get_i32();
288 let e_body = if e_len > 0 {
289 head_buf.copy_to_bytes(e_len as usize).to_vec()
290 } else {
291 vec![]
292 };
293
294 let body_len = buf.remaining();
295 let body = buf.copy_to_bytes(body_len).to_vec();
296
297 MqCommand {
298 req_code,
299 l_flag,
300 v_flag,
301 opaque,
302 request_flag,
303 r_len,
304 r_body,
305 e_len,
306 e_body,
307 body,
308 header_serialize_method,
309 }
310 }
311
312 pub fn convert_extend_header_to_json(&self) -> String {
313 match self.header_serialize_method {
314 HEADER_SERIALIZE_METHOD_JSON => {
315 warn!("not support header_serialize_method");
316 panic!("not support header_serialize_method")
317 }
318 HEADER_SERIALIZE_METHOD_PRIVATE => {
319 if self.e_len == 0 {
320 return "{}".to_string();
321 } else {
322 let mut data = Bytes::from(self.e_body.clone());
323 let mut map = HashMap::new();
324 while data.has_remaining() {
325 let k_len = data.get_i16();
326 let k_name = data.copy_to_bytes(k_len as usize);
327 let v_len = data.get_i32();
328 let v_value = data.copy_to_bytes(v_len as usize);
329 map.insert(
330 String::from_utf8(k_name.to_vec()).unwrap(),
331 String::from_utf8(v_value.to_vec()).unwrap(),
332 );
333 }
334
335 serde_json::to_string(&map).unwrap()
336 }
337 }
338
339 _ => {
340 warn!("not support header_serialize_method");
341 panic!("not support header_serialize_method");
342 }
343 }
344 }
345}
346
#[cfg(test)]
mod test {
    use atomic_counter::{AtomicCounter, ConsistentCounter};

    /// Smoke test: draws five values from a fresh counter and prints each one.
    #[test]
    fn auto_incr_test() {
        let c = ConsistentCounter::new(0);
        // Each iteration performs exactly one `add(1)` on the counter.
        (0..5).map(|_| c.add(1) as i32).for_each(|k| {
            println!("k value is :{k}");
        });
    }
}
359}