xCommonLib/protocol/
protocol_v1.rs

1use protobuf::CodedOutputStream;
2
3use crate::serial::request_message::RequestMessage;
4use crate::service::sys_service_api::ServiceKey;
5use super::{MsgType, XID};
6
7const DATA_LEN_POS: u16 = 0;
8const VERSION_POS: u16 = 4;
9const MSG_TYPE_POS: u16 = VERSION_POS + 2;
10const CONN_ID_POS: u16 = MSG_TYPE_POS + 4;
11const REQ_ID_POS: u16 = CONN_ID_POS + 8;
12const TIME_OUT_POS: u16 = REQ_ID_POS + 8;
13const FIX_LEN_POS: u16 = TIME_OUT_POS + 4;
14// len + type
15
16pub struct ProtocolV1Reader<'a> {
17    msg_buffer: &'a [u8],
18}
19
20impl<'a> ProtocolV1Reader<'a> {
21    pub fn new(msg_buffer: &'a [u8]) -> ProtocolV1Reader<'a> {
22        ProtocolV1Reader { msg_buffer }
23    }
24    //
25    pub fn msg_type(&self) -> Option<MsgType> {
26        unsafe {
27            let ptr = self.msg_buffer.as_ptr().offset(MSG_TYPE_POS as isize);
28
29            let bytes = *(ptr as *const [u8; 4]);
30
31            let value = i32::from_le_bytes(bytes);
32
33            MsgType::from(value)
34        }
35    }
36    //
37    pub fn version() -> u16 {
38        1
39    }
40    // 获取连接 id  = 4 字节 ip + 2 字节 端口号
41    pub fn conn_id(&self) -> i64 {
42        unsafe {
43            let ptr = self.msg_buffer.as_ptr().offset(CONN_ID_POS as isize);
44
45            let bytes = *(ptr as *const [u8; 8]);
46
47            i64::from_le_bytes(bytes)
48        }
49    }
50    /**
51     * 获取当前请求的 id
52     */
53    pub fn req_id(&self) -> i64 {
54        unsafe {
55            let ptr = self.msg_buffer.as_ptr().offset(REQ_ID_POS as isize);
56
57            let bytes = *(ptr as *const [u8; 8]);
58
59            i64::from_le_bytes(bytes)
60        }
61    }
62    /**
63     *
64     */
65    pub fn xid(&self) -> XID {
66        XID {
67            conn_id: self.conn_id(),
68            request_id: self.req_id(),
69        }
70    }
71    /**
72     * 超时
73     */
74    pub fn timeout(&self) -> u32 {
75        unsafe {
76            let ptr = self.msg_buffer.as_ptr().offset(TIME_OUT_POS as isize);
77
78            let bytes = *(ptr as *const [u8; 4]);
79
80            u32::from_le_bytes(bytes)
81        }
82    }
83    /**
84     * 获取消息长度
85     */
86    pub fn msg_size(&self) -> u32 {
87        self.msg_buffer.len() as u32
88    }
89
90    pub fn msg_body(&self) -> &'a[u8] {
91        let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
92
93        let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
94
95        let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
96
97        let receiver_key_slice = &self.msg_buffer[skip_bytes..];
98
99        let skip_receiver_bytes = ServiceKey::skip_from_bytes(receiver_key_slice);
100
101        &self.msg_buffer[(skip_bytes + skip_receiver_bytes as usize) as usize..]
102    }
103
104    pub fn sender(&self) -> ServiceKey {
105        let service_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
106
107        let mut sender = ServiceKey::default();
108
109        sender
110            .parse_from_bytes_return_num(service_key_slice)
111            .unwrap();
112
113        sender
114    }
115
116    pub fn receiver(&self) -> ServiceKey {
117        let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
118
119        let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
120
121        let mut receiver = ServiceKey::default();
122
123        let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
124
125        let service_key_slice = &self.msg_buffer[skip_bytes..];
126
127        receiver
128            .parse_from_bytes_return_num(service_key_slice)
129            .unwrap();
130
131        receiver
132    }
133}
134
135pub struct ProtocolV1Writer {
136    pub msg_buffer: Vec<u8>,
137    pub msg_body_size: u32,
138}
139
140impl ProtocolV1Writer {
141    pub fn new(
142        msg_size: u32,
143        req_id: i64,
144        sender: &ServiceKey,
145        receiver: &ServiceKey,
146    ) -> ProtocolV1Writer {
147        let sender_size = sender.compute_size_with_tag_and_len() as u32;
148        
149        let receiver_size = receiver.compute_size_with_tag_and_len() as u32;
150
151        let msg_len = (FIX_LEN_POS as u32 + msg_size + sender_size + receiver_size as u32) as usize;
152
153        let mut msg_buffer: Vec<u8> = Vec::with_capacity(msg_len);
154        unsafe {
155            msg_buffer.set_len(msg_len);
156        }
157
158        let mut writer = ProtocolV1Writer {
159            msg_buffer,
160            msg_body_size: msg_size,
161        };
162        //  写入 xid
163        writer.write_conn_id(0);
164        writer.write_req_id(req_id);
165
166        writer.write_version();
167        writer.write_data_len(msg_len as u32);
168        writer.write_timeout(u32::MAX);
169        //
170
171        writer.write_service_key(sender, receiver);
172
173        writer
174    }
175
176    fn write_service_key(&mut self, sender: &ServiceKey, receiver: &ServiceKey) {
177        //
178        unsafe {
179            let service_key_slice = self.msg_buffer.as_mut_ptr().offset(FIX_LEN_POS as isize);
180
181            let service_key_slice = std::slice::from_raw_parts_mut(
182                service_key_slice,
183                self.msg_buffer.len() - FIX_LEN_POS as usize,
184            );
185
186            let mut os = CodedOutputStream::bytes(service_key_slice);
187
188            sender.serial_with_tag_and_len(&mut os);
189
190            receiver.serial_with_tag_and_len(&mut os);
191
192        }
193    }
194
195    pub fn write_req_id(&mut self, req_id: i64) {
196        unsafe {
197            let bytes = req_id.to_le_bytes();
198
199            let ptr = self.msg_buffer.as_mut_ptr().offset(REQ_ID_POS as isize);
200
201            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
202        }
203    }
204    /**
205     *
206     */
207    pub fn write_conn_id(&mut self, conn_id: i64) {
208        unsafe {
209            let bytes = conn_id.to_le_bytes();
210
211            let ptr = self.msg_buffer.as_mut_ptr().offset(CONN_ID_POS as isize);
212
213            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
214        }
215    }
216
217    pub fn write_conn_id_to_message(message: &mut Vec<u8>, conn_id: i64) {
218        unsafe {
219            let bytes = conn_id.to_le_bytes();
220
221            let ptr = message.as_mut_ptr().offset(CONN_ID_POS as isize);
222
223            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
224        }
225    }
226
227    pub fn write_msg_type(&mut self, msg_type: MsgType) {
228        unsafe {
229            let bytes = (msg_type as i32).to_le_bytes();
230
231            let ptr = self.msg_buffer.as_mut_ptr().offset(MSG_TYPE_POS as isize);
232
233            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
234        }
235    }
236
237    pub fn msg_body_buffer(&mut self) -> &mut [u8] {
238        // msg_body_size
239
240        let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
241
242        &mut self.msg_buffer[msg_body_pos as usize..]
243    }
244
245    pub fn write_msg_body(&mut self, message: &Vec<u8>) {
246        unsafe {
247            let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
248
249            let dst_ptr = self.msg_buffer.as_mut_ptr().offset(msg_body_pos as isize);
250
251            std::ptr::copy_nonoverlapping(message.as_ptr(), dst_ptr, message.len());
252        }
253    }
254
255    pub fn write_msg_body_by_arr(&mut self, message: &[u8]) {
256      unsafe {
257          let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
258
259          let dst_ptr = self.msg_buffer.as_mut_ptr().offset(msg_body_pos as isize);
260
261          std::ptr::copy_nonoverlapping(message.as_ptr(), dst_ptr, message.len());
262      }
263  }
264
265    /**
266     *
267     */
268    pub fn write_timeout(&mut self, timeout: u32) {
269        unsafe {
270            let bytes = timeout.to_le_bytes();
271
272            let ptr = self.msg_buffer.as_mut_ptr().offset(TIME_OUT_POS as isize);
273
274            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
275        }
276    }
277
278    /**
279     *
280     */
281    fn write_version(&mut self) {
282        unsafe {
283            let bytes = (1 as u16).to_le_bytes();
284
285            let ptr = self.msg_buffer.as_mut_ptr().offset(VERSION_POS as isize);
286
287            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 2);
288        }
289    }
290    /**
291     * 写入长度
292     */
293    fn write_data_len(&mut self, data_len: u32) {
294        unsafe {
295            let bytes = data_len.to_le_bytes();
296
297            let ptr = self.msg_buffer.as_mut_ptr().offset(DATA_LEN_POS as isize);
298
299            std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
300        }
301    }
302}