xCommonLib/protocol/
protocol_v2.rs1use protobuf::{
2 CodedInputStream, CodedOutputStream,
3};
4
5use crate::{
6 base::status::Status,
7 serial::{api_descriptor::make_tag, request_message::RequestMessage},
8 service::sys_service_api::ServiceKey,
9};
10
11use super::{MsgType, XID};
12
// Byte offsets of the fixed-size header fields inside a protocol v2 message.
// The fields are packed back to back, so every offset is the previous offset
// plus the width of the previous field.

/// Offset of the total message length, stored as a `u32`.
const DATA_LEN_POS: u16 = 0;
/// Offset of the protocol version, stored as a `u16`.
const VERSION_POS: u16 = DATA_LEN_POS + std::mem::size_of::<u32>() as u16;
/// Offset of the message type, stored as an `i32`.
const MSG_TYPE_POS: u16 = VERSION_POS + std::mem::size_of::<u16>() as u16;
/// Offset of the connection id, stored as an `i64`.
const CONN_ID_POS: u16 = MSG_TYPE_POS + std::mem::size_of::<i32>() as u16;
/// Offset of the request id, stored as an `i64`.
const REQ_ID_POS: u16 = CONN_ID_POS + std::mem::size_of::<i64>() as u16;
/// Offset of the timeout, stored as a `u32`.
const TIME_OUT_POS: u16 = REQ_ID_POS + std::mem::size_of::<i64>() as u16;
/// Length of the fixed header, i.e. the offset at which the serialized
/// sender/receiver service keys begin.
const FIX_LEN_POS: u16 = TIME_OUT_POS + std::mem::size_of::<u32>() as u16;
20
/// Read-only view over a complete protocol v2 message.
///
/// The reader borrows the message bytes and never copies or modifies them,
/// so it is cheap to copy around.
#[derive(Debug, Clone, Copy)]
pub struct ProtocolV2Reader<'a> {
    /// The whole message: fixed header, service keys and message body.
    msg_buffer: &'a [u8],
}
25
26impl<'a> ProtocolV2Reader<'a> {
27 pub fn new(msg_buffer: &'a [u8]) -> ProtocolV2Reader<'a> {
28 ProtocolV2Reader { msg_buffer }
29 }
30 pub fn msg_type(&self) -> Option<MsgType> {
32 unsafe {
33 let ptr = self.msg_buffer.as_ptr().offset(MSG_TYPE_POS as isize);
34 let bytes = *(ptr as *const [u8; 4]);
35 let value = i32::from_le_bytes(bytes);
36 MsgType::from(value)
37 }
38 }
39 pub fn version() -> u16 {
41 2
42 }
43 pub fn conn_id(&self) -> i64 {
45 unsafe {
46 let ptr = self.msg_buffer.as_ptr().offset(CONN_ID_POS as isize);
47
48 let bytes = *(ptr as *const [u8; 8]);
49
50 i64::from_le_bytes(bytes)
51 }
52 }
53
54 pub fn req_id(&self) -> i64 {
58 unsafe {
59 let ptr = self.msg_buffer.as_ptr().offset(REQ_ID_POS as isize);
60
61 let bytes = *(ptr as *const [u8; 8]);
62
63 i64::from_le_bytes(bytes)
64 }
65 }
66 pub fn xid(&self) -> XID {
70 XID {
71 conn_id: self.conn_id(),
72 request_id: self.req_id(),
73 }
74 }
75 pub fn timeout(&self) -> u32 {
79 unsafe {
80 let ptr = self.msg_buffer.as_ptr().offset(TIME_OUT_POS as isize);
81
82 let bytes = *(ptr as *const [u8; 4]);
83
84 u32::from_le_bytes(bytes)
85 }
86 }
87 pub fn msg_size(&self) -> u32 {
91 self.msg_buffer.len() as u32
92 }
93
94 pub fn msg_body_without_vrs(&self) -> &'a [u8] {
95
96 let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
97
98 let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
99
100 let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
101
102 let receiver_key_slice = &self.msg_buffer[skip_bytes..];
103
104 let skip_receiver_bytes = ServiceKey::skip_from_bytes(receiver_key_slice);
105
106 &self.msg_buffer
107 [(skip_bytes + skip_receiver_bytes as usize) as usize..(self.msg_buffer.len() - 68)]
108 }
109 pub fn msg_body_with_vrs(&self) -> &'a [u8] {
111
112 let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
113
114 let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
115
116 let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
117
118 let receiver_key_slice = &self.msg_buffer[skip_bytes..];
119
120 let skip_receiver_bytes = ServiceKey::skip_from_bytes(receiver_key_slice);
121
122 &self.msg_buffer[(skip_bytes + skip_receiver_bytes as usize) as usize..]
123 }
124 pub fn sender(&self) -> ServiceKey {
126 let service_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
127
128 let mut sender = ServiceKey::default();
129
130 sender
131 .parse_from_bytes_return_num(service_key_slice)
132 .unwrap();
133
134 sender
135 }
136
137 pub fn receiver(&self) -> ServiceKey {
138 let sender_key_slice = &self.msg_buffer[FIX_LEN_POS as usize..];
139 let skip_sender_bytes = ServiceKey::skip_from_bytes(sender_key_slice);
140 let mut receiver = ServiceKey::default();
141 let skip_bytes = FIX_LEN_POS as usize + skip_sender_bytes as usize;
142 let service_key_slice = &self.msg_buffer[skip_bytes..];
143 receiver
144 .parse_from_bytes_return_num(service_key_slice)
145 .unwrap();
146
147 receiver
148 }
149
150 pub fn api(&self) -> Result<String, Status> {
151 let msg_body = self.msg_body_without_vrs();
152
153 let mut is = CodedInputStream::from_bytes(msg_body);
154
155 let tag = is.read_raw_tag_or_eof();
156
157 if let Err(err) = tag {
158 return Err(Status::error(err.to_string()));
159 }
160
161 let tag = tag.unwrap();
162
163 if tag.is_none() {
164 return Err(Status::error("读取 Api 类型标签出错!".into()));
165 }
166
167 let tag = tag.unwrap();
168 let api_tag = make_tag(1, 2);
169 if tag != api_tag {
170 return Err(Status::error("Api 类型错误!".into()));
171 }
172
173 let api = is.read_string();
174
175 match api {
176 Ok(api) => Ok(api),
177 Err(err) => Err(Status::error(err.to_string())),
178 }
179 }
180}
181
/// Owns the buffer of a protocol v2 message while it is being assembled.
#[derive(Debug, Clone)]
pub struct ProtocolV2Writer {
    /// The whole message: fixed header, service keys and message body.
    pub msg_buffer: Vec<u8>,
    /// Number of bytes at the end of `msg_buffer` reserved for the message body.
    pub msg_body_size: u32,
}
186
187impl ProtocolV2Writer {
188 pub fn new(
189 msg_size: u32,
190 req_id: i64,
191 sender: &ServiceKey,
192 receiver: &ServiceKey,
193 ) -> ProtocolV2Writer {
194 let sender_size = sender.compute_size_with_tag_and_len() as u32;
195
196 let receiver_size = receiver.compute_size_with_tag_and_len() as u32;
197
198 let msg_len =
199 (FIX_LEN_POS as u32 + msg_size + sender_size + receiver_size as u32) as usize;
200
201 let mut msg_buffer: Vec<u8> = Vec::with_capacity(msg_len);
202 unsafe {
203 msg_buffer.set_len(msg_len);
204 }
205
206 let mut writer = ProtocolV2Writer {
207 msg_buffer,
208 msg_body_size: msg_size,
209 };
210 writer.write_conn_id(0);
212 writer.write_req_id(req_id);
213
214 writer.write_version();
215 writer.write_data_len(msg_len as u32);
216 writer.write_timeout(u32::MAX);
217 writer.write_service_key(sender, receiver);
220
221 writer
222 }
223
224 fn write_service_key(&mut self, sender: &ServiceKey, receiver: &ServiceKey) {
225 unsafe {
227 let service_key_slice = self.msg_buffer.as_mut_ptr().offset(FIX_LEN_POS as isize);
228
229 let service_key_slice = std::slice::from_raw_parts_mut(
230 service_key_slice,
231 self.msg_buffer.len() - FIX_LEN_POS as usize,
232 );
233
234 let mut os = CodedOutputStream::bytes(service_key_slice);
235
236 sender.serial_with_tag_and_len(&mut os);
237
238 receiver.serial_with_tag_and_len(&mut os);
239 }
240 }
241
242 pub fn write_req_id(&mut self, req_id: i64) {
243 unsafe {
244 let bytes = req_id.to_le_bytes();
245
246 let ptr = self.msg_buffer.as_mut_ptr().offset(REQ_ID_POS as isize);
247
248 std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
249 }
250 }
251 pub fn write_conn_id(&mut self, conn_id: i64) {
255 unsafe {
256 let bytes = conn_id.to_le_bytes();
257
258 let ptr = self.msg_buffer.as_mut_ptr().offset(CONN_ID_POS as isize);
259
260 std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
261 }
262 }
263
264 pub fn write_conn_id_to_message(message: &mut Vec<u8>, conn_id: i64) {
265 unsafe {
266 let bytes = conn_id.to_le_bytes();
267
268 let ptr = message.as_mut_ptr().offset(CONN_ID_POS as isize);
269
270 std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 8);
271 }
272 }
273
274 pub fn write_msg_type(&mut self, msg_type: MsgType) {
275 unsafe {
276 let bytes = (msg_type as i32).to_le_bytes();
277
278 let ptr = self.msg_buffer.as_mut_ptr().offset(MSG_TYPE_POS as isize);
279
280 std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
281 }
282 }
283
284 pub fn msg_body_buffer(&mut self) -> &mut [u8] {
285 let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
286
287 &mut self.msg_buffer[msg_body_pos as usize..]
288 }
289
290 pub fn write_msg_body(&mut self, message: &Vec<u8>) {
291 unsafe {
292 let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
293
294 let dst_ptr = self.msg_buffer.as_mut_ptr().offset(msg_body_pos as isize);
295
296 std::ptr::copy_nonoverlapping(message.as_ptr(), dst_ptr, message.len());
297 }
298 }
299
300 pub fn write_msg_body_by_arr(&mut self, message: &[u8]) {
301 unsafe {
302 let msg_body_pos = self.msg_buffer.len() - self.msg_body_size as usize;
303
304 let dst_ptr = self.msg_buffer.as_mut_ptr().offset(msg_body_pos as isize);
305
306 std::ptr::copy_nonoverlapping(message.as_ptr(), dst_ptr, message.len());
307 }
308 }
309
310 pub fn write_timeout(&mut self, timeout: u32) {
314 unsafe {
315 let bytes = timeout.to_le_bytes();
316
317 let ptr = self.msg_buffer.as_mut_ptr().offset(TIME_OUT_POS as isize);
318
319 std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
320 }
321 }
322
323 fn write_version(&mut self) {
327 unsafe {
328 let bytes = (2 as u16).to_le_bytes();
329
330 let ptr = self.msg_buffer.as_mut_ptr().offset(VERSION_POS as isize);
331
332 std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 2);
333 }
334 }
335 fn write_data_len(&mut self, data_len: u32) {
339 unsafe {
340 let bytes = data_len.to_le_bytes();
341
342 let ptr = self.msg_buffer.as_mut_ptr().offset(DATA_LEN_POS as isize);
343
344 std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, 4);
345 }
346 }
347}