rocketmq_client_v4/protocols/
mq_command.rs

1use atomic_counter::{AtomicCounter, ConsistentCounter};
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3use log::{debug, warn};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::LazyLock;
7use tokio::io::AsyncReadExt;
8use tokio::net::tcp::OwnedReadHalf;
9use tokio::net::TcpStream;
10
11/**
12https://github.com/apache/rocketmq-client-go/blob/master/docs/zh/rocketmq-protocol_zh.md
13
14total frame
15    ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
16    + frame_size | header_length |         header_body        |     body     +
17    ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
18    +   4bytes   |     4bytes    | (21 + r_len + e_len) bytes | remain bytes +
19    ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
20
21item 	type 	description
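frame_size 	int32 	size of one RemotingCommand packet
header_length 	int32 	the high 8 bits give the serialization method of the header; the remaining bits give the actual header length
header_body 	[]byte 	payload of the header; its length depends on the attached remark and properties
body 	[]byte 	payload of the concrete Request/Response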
26
27
28
29header_body
30+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
31+  request_code | l_flag | v_flag | opaque | request_flag |  r_len  |   r_body    |  e_len  |    e_body   +
32+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
33+     2bytes    |  1byte | 2bytes | 4bytes |    4 bytes   | 4 bytes | r_len bytes | 4 bytes | e_len bytes +
34+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
35
36item 	type 	description
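request_code 	int16 	which kind of Request or ResponseCode this is; the concrete category is determined by request_flag
l_flag 	byte 	language flag, identifies the programming language of the request sender
v_flag 	int16 	version flag
request_flag 	int32 	header flag, marks the type and request mode of this RemotingCommand
opaque 	int32 	RequestID of the Request/Response; the Response returned by the broker is matched one-to-one with the Request cached by the client through this value
r_len 	int32 	length of remark; the remark is supplementary information attached to a Request/Response, typically used in a Response to describe the specific error cause
r_body 	[]byte 	payload of remark
e_len 	int32 	length of extended fields, i.e. properties; non-standard fields are stored here and are widely used by various RocketMQ features
e_body 	[]byte 	payload of extended fields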
46
47*/
48
/// Language flag stored in `l_flag` of commands built by this client.
const LANGUAGE_FLAG: i8 = 12; // 12 is defined as rust
/// Version flag stored in `v_flag` of commands built by this client.
pub const VERSION_FLAG: i16 = 63;
/// Shared counter that hands out the `opaque` (request id) of newly built commands.
/// It starts at 200 and is created lazily on first use.
static OPAQUE: LazyLock<ConsistentCounter> = LazyLock::new(|| ConsistentCounter::new(200));

/// Header serialization marker (high byte of the header-length word): header is JSON.
pub const HEADER_SERIALIZE_METHOD_JSON: u8 = 0;
/// Header serialization marker (high byte of the header-length word): header uses the
/// binary RocketMQ layout.
pub const HEADER_SERIALIZE_METHOD_PRIVATE: u8 = 1;
55
56#[derive(Debug, Deserialize, Serialize)]
57#[allow(non_snake_case)]
58pub struct RemotingCommand {
59    //private int code;
60    //     private LanguageCode language = LanguageCode.JAVA;
61    //     private int version = 0;
62    //     private int opaque = requestId.getAndIncrement();
63    //     private int flag = 0;
64    //     private String remark;
65    //     private HashMap<String, String> extFields;
66    pub code: i16,
67    pub language: String,
68    pub version: i32,
69    pub opaque: i32,
70    pub flag: i32,
71    pub remark: Option<String>,
72    pub extFields: HashMap<String, String>,
73    pub serializeTypeCurrentRPC: Option<String>,
74}
75
76impl RemotingCommand {
77    pub fn get_language_i16(&self) -> i16 {
78        return match self.language.as_str() {
79            "JAVA" => 0,
80            _ => 7,
81        };
82    }
83}
84
/// One remoting command: the header fields, the remark, the extended fields and the body.
#[derive(Debug)]
pub struct MqCommand {
    /// Request or response code.
    pub req_code: i16,
    /// Language flag. The constructors set it to `LANGUAGE_FLAG`; commands decoded from a
    /// JSON header get 0.
    pub l_flag: i8,
    /// Version flag. The constructors set it to `VERSION_FLAG`; commands decoded from a
    /// JSON header get 0.
    // NOTE(review): the comment that used to sit here described the language flag and said
    // Rust always sends 7 ("other language"), which does not match `LANGUAGE_FLAG`.
    pub v_flag: i16,
    /// Request id used to pair a response with its request.
    pub opaque: i32,
    /// Header flag bits.
    pub request_flag: i32,
    /// Remark length as recorded when the command was built or decoded.
    /// `to_bytes` does not read this field; it writes `r_body.len()`.
    pub r_len: i32,
    /// Raw remark bytes.
    pub r_body: Vec<u8>,
    /// Extended-fields length as recorded when the command was built or decoded.
    /// `to_bytes` does not read this field; it writes `e_body.len()`.
    pub e_len: i32,
    /// Raw extended-fields bytes. For commands decoded from a JSON header this holds the
    /// JSON text of the extended-field map (or nothing when the map is empty).
    pub e_body: Vec<u8>,
    /// Payload that follows the header in the frame.
    pub body: Vec<u8>,
    /// Header serialization marker: `HEADER_SERIALIZE_METHOD_JSON` or
    /// `HEADER_SERIALIZE_METHOD_PRIVATE`.
    pub header_serialize_method: u8,
}
100
101impl MqCommand {
102    pub fn new() -> MqCommand {
103        return MqCommand {
104            req_code: 0,
105            l_flag: LANGUAGE_FLAG,
106            v_flag: VERSION_FLAG,
107            opaque: OPAQUE.add(1) as i32,
108            request_flag: 0,
109            r_len: 0,
110            r_body: Vec::new(),
111            e_len: 0,
112            e_body: Vec::new(),
113            body: Vec::new(),
114            header_serialize_method: HEADER_SERIALIZE_METHOD_PRIVATE,
115        };
116    }
117
118    pub fn new_with_body(
119        req_code: i16,
120        r_body: Vec<u8>,
121        e_body: Vec<u8>,
122        body: Vec<u8>,
123    ) -> MqCommand {
124        return MqCommand {
125            req_code,
126            l_flag: LANGUAGE_FLAG,
127            v_flag: VERSION_FLAG,
128            opaque: OPAQUE.add(1) as i32,
129            request_flag: 0,
130            r_len: r_body.len() as i32,
131            r_body,
132            e_len: e_body.len() as i32,
133            e_body,
134            body,
135            header_serialize_method: HEADER_SERIALIZE_METHOD_PRIVATE,
136        };
137    }
138
139    pub async fn read_from_stream_with_opaque(broker_stream: &mut TcpStream, opaque: i32) -> Self {
140        for _ in 0..5 {
141            let cmd = Self::read_from_stream(broker_stream).await;
142            if cmd.opaque != opaque {
143                debug!("receive server send extended message, req_code:{}, remark:{:?}, extend:{:?}, body:{:?}",
144            cmd.req_code, String::from_utf8(cmd.r_body), String::from_utf8(cmd.e_body), String::from_utf8(cmd.body));
145            } else {
146                return cmd;
147            }
148        }
149        panic!("read from mq server failed, stop to connect");
150    }
151
152    pub async fn read_from_stream(stream: &mut TcpStream) -> Self {
153        let size = stream.read_i32().await;
154        if size.is_err() {
155            panic!("read command from mq failed! {:?}", size.err());
156        }
157        let size = size.unwrap();
158        let mut buf = vec![0u8; size as usize];
159        let body = stream.read_exact(&mut buf).await;
160        if body.is_err() {
161            panic!("read command data from mq failed!, ignore it");
162        }
163        let mut frame = BytesMut::with_capacity((4 + size) as usize);
164        frame.put_i32(size);
165        frame.put_slice(&buf.to_vec());
166
167        let cmd = Self::convert_bytes_to_mq_command(frame.to_vec());
168        cmd
169    }
170
171    pub async fn read_from_read_half(stream: &mut OwnedReadHalf) -> Option<Self> {
172        let size = stream.read_i32().await;
173        if size.is_err() {
174            warn!("read command from mq failed! {:?}", size.err());
175            return None;
176        }
177        let size = size.unwrap();
178        let mut buf = vec![0u8; size as usize];
179        let body = stream.read_exact(&mut buf).await;
180        if body.is_err() {
181            panic!("read command data from mq failed!, ignore it");
182        }
183        let mut frame = BytesMut::with_capacity((4 + size) as usize);
184        frame.put_i32(size);
185        frame.put_slice(&buf.to_vec());
186
187        let cmd = Self::convert_bytes_to_mq_command(frame.to_vec());
188        Some(cmd)
189    }
190
191    /**
192       convert a command to bytes
193    */
194    pub fn to_bytes(&self) -> Vec<u8> {
195        let mut buf = BytesMut::with_capacity(1024);
196        buf.put_i16(self.req_code);
197        buf.put_i8(self.l_flag);
198        buf.put_i16(self.v_flag);
199        buf.put_i32(self.opaque);
200        buf.put_i32(self.request_flag);
201        buf.put_i32(self.r_body.len() as i32);
202        buf.put_slice(&self.r_body);
203        buf.put_i32(self.e_body.len() as i32);
204        buf.put_slice(&self.e_body);
205        let header_body = buf.to_vec();
206        let header_len: i32 = header_body.len() as i32;
207        let mut header_buf: [u8; 4] = [0; 4];
208        // 0 means json, 1 means rocket mq protocol
209        header_buf[0] = HEADER_SERIALIZE_METHOD_PRIVATE;
210        header_buf[1] = ((header_len >> 16) & 0xFF) as u8;
211        header_buf[2] = ((header_len >> 8) & 0xFF) as u8;
212        header_buf[3] = ((header_len) & 0xFF) as u8;
213        let body_len: i32 = self.body.len() as i32;
214        // frame_size = header_len(4byte) + header_data's len + body_data's len
215        let frame_size: i32 = 4 + header_len + body_len;
216
217        let mut total_frame = BytesMut::with_capacity((4 + frame_size) as usize);
218        total_frame.put_i32(frame_size);
219        total_frame.put_slice(&header_buf);
220        total_frame.put_slice(&header_body);
221        total_frame.put_slice(&self.body);
222        return total_frame.to_vec();
223    }
224
225    pub fn convert_bytes_to_mq_command(bytes: Vec<u8>) -> MqCommand {
226        let mut buf = Bytes::from(bytes);
227        if buf.len() < 4 {
228            panic!("invalid body. the len less than 4!");
229        }
230
231        let frame_size = buf.get_i32();
232        if buf.len() as i32 != frame_size {
233            panic!(
234                "invalid body. the len is not equal to frame_size!, frame_size: {}, buf_len: {}",
235                frame_size,
236                buf.len()
237            );
238        }
239
240        let header_len = buf.get_i32();
241        let header_serialize_method = ((header_len >> 24) & 0xFF) as u8;
242        let header_len = header_len & 0x00FFFFFF;
243        let header_body = buf.copy_to_bytes(header_len as usize).to_vec();
244        let mut head_buf = Bytes::from(header_body);
245        if header_serialize_method == HEADER_SERIALIZE_METHOD_JSON {
246            let remoting_cmd: RemotingCommand = serde_json::from_slice(&head_buf.to_vec()).unwrap();
247            let r_body = if remoting_cmd.remark.is_none() {
248                vec![]
249            } else {
250                Vec::from(remoting_cmd.remark.unwrap().to_string())
251            };
252
253            let ext_fields = if remoting_cmd.extFields.is_empty() {
254                vec![]
255            } else {
256                Vec::from(serde_json::to_string(&remoting_cmd.extFields).unwrap())
257            };
258
259            let body_len = buf.remaining();
260            let body = buf.copy_to_bytes(body_len).to_vec();
261
262            return MqCommand {
263                req_code: remoting_cmd.code,
264                l_flag: 0,
265                v_flag: 0,
266                opaque: remoting_cmd.opaque,
267                request_flag: remoting_cmd.flag,
268                r_len: r_body.len() as i32,
269                r_body,
270                e_len: ext_fields.len() as i32,
271                e_body: ext_fields,
272                body,
273                header_serialize_method,
274            };
275        }
276        let req_code = head_buf.get_i16();
277        let l_flag = head_buf.get_i8();
278        let v_flag = head_buf.get_i16();
279        let opaque = head_buf.get_i32();
280        let request_flag = head_buf.get_i32();
281        let r_len = head_buf.get_i32();
282        let r_body = if r_len > 0 {
283            head_buf.copy_to_bytes(r_len as usize).to_vec()
284        } else {
285            vec![]
286        };
287        let e_len = head_buf.get_i32();
288        let e_body = if e_len > 0 {
289            head_buf.copy_to_bytes(e_len as usize).to_vec()
290        } else {
291            vec![]
292        };
293
294        let body_len = buf.remaining();
295        let body = buf.copy_to_bytes(body_len).to_vec();
296
297        MqCommand {
298            req_code,
299            l_flag,
300            v_flag,
301            opaque,
302            request_flag,
303            r_len,
304            r_body,
305            e_len,
306            e_body,
307            body,
308            header_serialize_method,
309        }
310    }
311
312    pub fn convert_extend_header_to_json(&self) -> String {
313        match self.header_serialize_method {
314            HEADER_SERIALIZE_METHOD_JSON => {
315                warn!("not support header_serialize_method");
316                panic!("not support header_serialize_method")
317            }
318            HEADER_SERIALIZE_METHOD_PRIVATE => {
319                if self.e_len == 0 {
320                    return "{}".to_string();
321                } else {
322                    let mut data = Bytes::from(self.e_body.clone());
323                    let mut map = HashMap::new();
324                    while data.has_remaining() {
325                        let k_len = data.get_i16();
326                        let k_name = data.copy_to_bytes(k_len as usize);
327                        let v_len = data.get_i32();
328                        let v_value = data.copy_to_bytes(v_len as usize);
329                        map.insert(
330                            String::from_utf8(k_name.to_vec()).unwrap(),
331                            String::from_utf8(v_value.to_vec()).unwrap(),
332                        );
333                    }
334
335                    serde_json::to_string(&map).unwrap()
336                }
337            }
338
339            _ => {
340                warn!("not support header_serialize_method");
341                panic!("not support header_serialize_method");
342            }
343        }
344    }
345}
346
#[cfg(test)]
mod test {
    use atomic_counter::{AtomicCounter, ConsistentCounter};

    /// Checks the counter behaviour the `opaque` generation relies on: `add` returns the
    /// value held before the increment, so successive calls yield 0, 1, 2, ...
    #[test]
    fn auto_incr_test() {
        let c = ConsistentCounter::new(0);
        for expected in 0..5 {
            let k = c.add(1) as i32;
            println!("k value is :{k}");
            assert_eq!(k, expected);
        }
        assert_eq!(c.get(), 5);
    }
}