xCommonLib/protocol/
protocol_v2.rs

1use protobuf::{
2    CodedInputStream, CodedOutputStream,
3};
4
5use crate::{
6    base::status::Status,
7    serial::{api_descriptor::make_tag, request_message::RequestMessage},
8    service::sys_service_api::ServiceKey,
9};
10
11use super::{MsgType, XID};
12
// Byte offsets of the fixed-size header fields. Each offset is the previous
// field's offset plus the width of the integer stored there; every field is
// encoded little-endian by the reader/writer below.

/// Offset of the total message length (`u32`).
const DATA_LEN_POS: u16 = 0;
/// Offset of the protocol version (`u16`).
const VERSION_POS: u16 = DATA_LEN_POS + std::mem::size_of::<u32>() as u16;
/// Offset of the message type (`i32`).
const MSG_TYPE_POS: u16 = VERSION_POS + std::mem::size_of::<u16>() as u16;
/// Offset of the connection id (`i64`).
const CONN_ID_POS: u16 = MSG_TYPE_POS + std::mem::size_of::<i32>() as u16;
/// Offset of the request id (`i64`).
const REQ_ID_POS: u16 = CONN_ID_POS + std::mem::size_of::<i64>() as u16;
/// Offset of the timeout (`u32`).
const TIME_OUT_POS: u16 = REQ_ID_POS + std::mem::size_of::<i64>() as u16;
/// Length of the fixed header, i.e. the offset of the first variable-length field.
const FIX_LEN_POS: u16 = TIME_OUT_POS + std::mem::size_of::<u32>() as u16;
20
/// Borrowing reader over one encoded protocol-v2 message (the protocol
/// version that supports signatures).
///
/// The reader only holds a shared slice, so it is cheap to copy.
#[derive(Debug, Clone, Copy)]
pub struct ProtocolV2Reader<'a> {
    /// The complete encoded message; header offsets are relative to its start.
    msg_buffer: &'a [u8],
}
25
26impl<'a> ProtocolV2Reader<'a> {
27    pub fn new(msg_buffer: &'a [u8]) -> ProtocolV2Reader<'a> {
28        ProtocolV2Reader { msg_buffer }
29    }
30    //
31    pub fn msg_type(&self) -> Option<MsgType> {
32        unsafe {
33            let ptr = self.msg_buffer.as_ptr().offset(MSG_TYPE_POS as isize);
34            let bytes = *(ptr as *const [u8; 4]);
35            let value = i32::from_le_bytes(bytes);
36            MsgType::from(value)
37        }
38    }
39    //
40    pub fn version() -> u16 {
41        2
42    }
43    // 获取连接 id  = 4 字节 ip + 2 字节 端口号
44    pub fn conn_id(&self) -> i64 {
45        unsafe {
46            let ptr = self.msg_buffer.as_ptr().offset(CONN_ID_POS as isize);
47
48            let bytes = *(ptr as *const [u8; 8]);
49
50            i64::from_le_bytes(bytes)
51        }
52    }
53
54    /**
55     * 获取当前请求的 id
56     */
57    pub fn req_id(&self) -> i64 {
58        unsafe {
59            let ptr = self.msg_buffer.as_ptr().offset(REQ_ID_POS as isize);
60
61            let bytes = *(ptr as *const [u8; 8]);
62
63            i64::from_le_bytes(bytes)
64        }
65    }
66    /**
67     *
68     */
69    pub fn xid(&self) -> XID {
70        XID {
71            conn_id: self.conn_id(),
72            request_id: self.req_id(),
73        }
74    }
75    /**
76     * 超时
77     */
78    pub fn timeout(&self) -> u32 {
79        unsafe {
80            let ptr = self.msg_buffer.as_ptr().offset(TIME_OUT_POS as isize);
81
82            let bytes = *(ptr as *const [u8; 4]);
83
84            u32::from_le_bytes(bytes)
85        }
86    }
87    /**
88     * 获取消息长度
89     */
90    pub fn msg_size(&self) -> u32 {
91        self.msg_buffer.len() as u32
92    }
93
94    pub fn msg_body_without_vrs(&self) -> &'a [u8] {
95
96        let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
97
98        let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
99
100        let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
101
102        let receiver_key_slice = &self.msg_buffer[skip_bytes..];
103
104        let skip_receiver_bytes = ServiceKey::skip_from_bytes(receiver_key_slice);
105
106        &self.msg_buffer
107            [(skip_bytes + skip_receiver_bytes as usize) as usize..(self.msg_buffer.len() - 68)]
108    }
109    //
110    pub fn msg_body_with_vrs(&self) -> &'a [u8] {
111
112        let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
113        
114        let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
115
116        let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
117
118        let receiver_key_slice = &self.msg_buffer[skip_bytes..];
119
120        let skip_receiver_bytes = ServiceKey::skip_from_bytes(receiver_key_slice);
121
122        &self.msg_buffer[(skip_bytes + skip_receiver_bytes as usize) as usize..]
123    }
124    //
125    pub fn sender(&self) -> ServiceKey {
126        let service_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
127
128        let mut sender = ServiceKey::default();
129
130        sender
131            .parse_from_bytes_return_num(service_key_slice)
132            .unwrap();
133
134        sender
135    }
136
137    pub fn receiver(&self) -> ServiceKey {
138        let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
139        let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
140        let mut receiver = ServiceKey::default();
141        let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
142        let service_key_slice = &self.msg_buffer[skip_bytes..];
143        receiver
144            .parse_from_bytes_return_num(service_key_slice)
145            .unwrap();
146
147        receiver
148    }
149
150    pub fn api(&self) -> Result<String, Status> {
151        let msg_body = self.msg_body_without_vrs();
152
153        let mut is = CodedInputStream::from_bytes(msg_body);
154
155        let tag = is.read_raw_tag_or_eof();
156
157        if let Err(err) = tag {
158            return Err(Status::error(err.to_string()));
159        }
160
161        let tag = tag.unwrap();
162
163        if tag.is_none() {
164            return Err(Status::error("读取 Api 类型标签出错!".into()));
165        }
166
167        let tag = tag.unwrap();
168        let api_tag = make_tag(1, 2);
169        if tag != api_tag {
170            return Err(Status::error("Api 类型错误!".into()));
171        }
172
173        let api = is.read_string();
174
175        match api {
176            Ok(api) => Ok(api),
177            Err(err) => Err(Status::error(err.to_string())),
178        }
179    }
180}
181
/// Owning builder for one encoded protocol-v2 message.
#[derive(Debug, Clone)]
pub struct ProtocolV2Writer {
    /// The complete encoded message: fixed header, service keys, then body.
    pub msg_buffer: Vec<u8>,
    /// Number of bytes reserved for the message body at the end of `msg_buffer`.
    pub msg_body_size: u32,
}
186
187impl ProtocolV2Writer {
188    pub fn new(
189        msg_size: u32,
190        req_id: i64,
191        sender: &ServiceKey,
192        receiver: &ServiceKey,
193    ) -> ProtocolV2Writer {
194        let sender_size = sender.compute_size_with_tag_and_len() as u32;
195
196        let receiver_size = receiver.compute_size_with_tag_and_len() as u32;
197
198        let msg_len =
199            (FIX_LEN_POS as u32 + msg_size + sender_size + receiver_size as u32) as usize;
200
201        let mut msg_buffer: Vec<u8> = Vec::with_capacity(msg_len);
202        unsafe {
203            msg_buffer.set_len(msg_len);
204        }
205
206        let mut writer = ProtocolV2Writer {
207            msg_buffer,
208            msg_body_size: msg_size,
209        };
210        //  写入 xid
211        writer.write_conn_id(0);
212        writer.write_req_id(req_id);
213
214        writer.write_version();
215        writer.write_data_len(msg_len as u32);
216        writer.write_timeout(u32::MAX);
217        //
218
219        writer.write_service_key(sender, receiver);
220
221        writer
222    }
223
224    fn write_service_key(&mut self, sender: &ServiceKey, receiver: &ServiceKey) {
225        //
226        unsafe {
227            let service_key_slice = self.msg_buffer.as_mut_ptr().offset(FIX_LEN_POS as isize);
228
229            let service_key_slice = std::slice::from_raw_parts_mut(
230                service_key_slice,
231                self.msg_buffer.len() - FIX_LEN_POS as usize,
232            );
233
234            let mut os = CodedOutputStream::bytes(service_key_slice);
235
236            sender.serial_with_tag_and_len(&mut os);
237
238            receiver.serial_with_tag_and_len(&mut os);
239        }
240    }
241
242    pub fn write_req_id(&mut self, req_id: i64) {
243        unsafe {
244            let bytes = req_id.to_le_bytes();
245
246            let ptr = self.msg_buffer.as_mut_ptr().offset(REQ_ID_POS as isize);
247
248            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
249        }
250    }
251    /**
252     *
253     */
254    pub fn write_conn_id(&mut self, conn_id: i64) {
255        unsafe {
256            let bytes = conn_id.to_le_bytes();
257
258            let ptr = self.msg_buffer.as_mut_ptr().offset(CONN_ID_POS as isize);
259
260            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
261        }
262    }
263
264    pub fn write_conn_id_to_message(message: &mut Vec<u8>, conn_id: i64) {
265        unsafe {
266            let bytes = conn_id.to_le_bytes();
267
268            let ptr = message.as_mut_ptr().offset(CONN_ID_POS as isize);
269
270            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
271        }
272    }
273
274    pub fn write_msg_type(&mut self, msg_type: MsgType) {
275        unsafe {
276            let bytes = (msg_type as i32).to_le_bytes();
277
278            let ptr = self.msg_buffer.as_mut_ptr().offset(MSG_TYPE_POS as isize);
279
280            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
281        }
282    }
283
284    pub fn msg_body_buffer(&mut self) -> &mut [u8] {
285        let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
286
287        &mut self.msg_buffer[msg_body_pos as usize..]
288    }
289
290    pub fn write_msg_body(&mut self, message: &Vec<u8>) {
291        unsafe {
292            let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
293
294            let dst_ptr = self.msg_buffer.as_mut_ptr().offset(msg_body_pos as isize);
295
296            std::ptr::copy_nonoverlapping(message.as_ptr(), dst_ptr, message.len());
297        }
298    }
299
300    pub fn write_msg_body_by_arr(&mut self, message: &[u8]) {
301        unsafe {
302            let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
303
304            let dst_ptr = self.msg_buffer.as_mut_ptr().offset(msg_body_pos as isize);
305
306            std::ptr::copy_nonoverlapping(message.as_ptr(), dst_ptr, message.len());
307        }
308    }
309
310    /**
311     *
312     */
313    pub fn write_timeout(&mut self, timeout: u32) {
314        unsafe {
315            let bytes = timeout.to_le_bytes();
316
317            let ptr = self.msg_buffer.as_mut_ptr().offset(TIME_OUT_POS as isize);
318
319            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
320        }
321    }
322
323    /**
324     *
325     */
326    fn write_version(&mut self) {
327        unsafe {
328            let bytes = (2 as u16).to_le_bytes();
329
330            let ptr = self.msg_buffer.as_mut_ptr().offset(VERSION_POS as isize);
331
332            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 2);
333        }
334    }
335    /**
336     * 写入长度
337     */
338    fn write_data_len(&mut self, data_len: u32) {
339        unsafe {
340            let bytes = data_len.to_le_bytes();
341
342            let ptr = self.msg_buffer.as_mut_ptr().offset(DATA_LEN_POS as isize);
343
344            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
345        }
346    }
347}