vertx_rust/vertx/
message.rs1use std::convert::TryInto;
2use std::sync::Arc;
3use std::ops::Deref;
4use std::panic::RefUnwindSafe;
5use std::sync::atomic::{AtomicBool, AtomicI16, Ordering};
6use atomic_refcell::AtomicRefCell;
7use crate::vertx::message::Body::{ByteArray, Byte, Short};
8
9#[derive(Default, Debug)]
10pub struct MessageInner {
11 pub(crate) address: Option<String>,
13 pub(crate) replay: Option<String>,
15 pub(crate) body: Arc<Body>,
17 #[allow(dead_code)]
19 pub(crate) protocol_version: i32,
20 #[allow(dead_code)]
22 pub(crate) system_codec_id: i32,
23 pub(crate) port: i32,
25 pub(crate) host: String,
27 #[allow(dead_code)]
29 pub(crate) headers: i32,
30 pub(crate) publish: bool,
32}
33
34#[derive(Default, Debug)]
36pub struct Message {
37 pub(crate) inner: AtomicRefCell<MessageInner>,
38 pub(crate) invoke: AtomicBool,
39 pub(crate) invoke_count: AtomicI16,
40}
41
42impl RefUnwindSafe for Message {}
43impl RefUnwindSafe for MessageInner {}
44
/// Payload of a message.
///
/// Each variant maps to one codec id of the wire format used by the encoder
/// and decoder in this file (the id is given per variant).
#[derive(Clone, Debug)]
pub enum Body {
    /// Single byte (codec id 2).
    Byte(u8),
    /// 16-bit signed integer (codec id 4).
    Short(i16),
    /// 32-bit signed integer (codec id 5).
    Int(i32),
    /// 64-bit signed integer (codec id 6).
    Long(i64),
    /// 32-bit float (codec id 7).
    Float(f32),
    /// 64-bit float (codec id 8).
    Double(f64),
    /// UTF-8 text (codec id 9).
    String(String),
    /// Raw bytes (codec id 12).
    ByteArray(Vec<u8>),
    /// Boolean (codec id 3).
    Boolean(bool),
    /// Character, sent as one 16-bit unit (codec id 10).
    Char(char),
    /// No payload (codec id 0).
    Null,
    /// Payload-less ping (codec id 1).
    Ping,
    /// Text payload sent with codec id 99.
    Panic(String),
}

impl Body {
    /// Returns `true` when the body is [`Body::Null`].
    #[inline]
    pub fn is_null(&self) -> bool {
        matches!(self, Self::Null)
    }

    // The accessors below return `&'static str` errors on purpose: the error
    // text is a literal, so the result of a by-value accessor does not keep
    // `self` borrowed and can outlive a temporary body.

    /// Returns the value of a [`Body::Boolean`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_bool(&self) -> Result<bool, &'static str> {
        match self {
            Self::Boolean(value) => Ok(*value),
            _ => Err("Body type is not a bool"),
        }
    }

    /// Returns the value of a [`Body::Double`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_f64(&self) -> Result<f64, &'static str> {
        match self {
            Self::Double(value) => Ok(*value),
            _ => Err("Body type is not a f64"),
        }
    }

    /// Returns the value of a [`Body::Float`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_f32(&self) -> Result<f32, &'static str> {
        match self {
            Self::Float(value) => Ok(*value),
            _ => Err("Body type is not a f32"),
        }
    }

    /// Returns the value of a [`Body::Long`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_i64(&self) -> Result<i64, &'static str> {
        match self {
            Self::Long(value) => Ok(*value),
            _ => Err("Body type is not a i64"),
        }
    }

    /// Returns the value of a [`Body::Int`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_i32(&self) -> Result<i32, &'static str> {
        match self {
            Self::Int(value) => Ok(*value),
            _ => Err("Body type is not a i32"),
        }
    }

    /// Returns the value of a [`Body::Short`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_i16(&self) -> Result<i16, &'static str> {
        match self {
            Self::Short(value) => Ok(*value),
            _ => Err("Body type is not a i16"),
        }
    }

    /// Returns the value of a [`Body::Byte`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_u8(&self) -> Result<u8, &'static str> {
        match self {
            Self::Byte(value) => Ok(*value),
            _ => Err("Body type is not a u8"),
        }
    }

    /// Borrows the text of a [`Body::String`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_string(&self) -> Result<&String, &'static str> {
        match self {
            Self::String(text) => Ok(text),
            _ => Err("Body type is not a String"),
        }
    }

    /// Borrows the bytes of a [`Body::ByteArray`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_bytes(&self) -> Result<&Vec<u8>, &'static str> {
        match self {
            Self::ByteArray(bytes) => Ok(bytes),
            _ => Err("Body type is not a Byte Array"),
        }
    }

    /// Borrows the text of a [`Body::Panic`].
    ///
    /// # Errors
    /// Returns a static description when the body is another variant.
    #[inline]
    pub fn as_panic(&self) -> Result<&String, &'static str> {
        match self {
            Self::Panic(text) => Ok(text),
            _ => Err("Body type is not a Panic"),
        }
    }
}

impl Default for Body {
    /// The default body is [`Body::Null`].
    fn default() -> Self {
        Self::Null
    }
}
156
157impl Message {
158 #[inline]
159 pub fn body(&self) -> Arc<Body> {
160 self.inner.borrow().body.clone()
161 }
162
163 pub fn address(&self) -> Option<String> {
164 self.inner.borrow().address.clone()
165 }
166
167 pub fn replay(&self) -> Option<String> {
168 self.inner.borrow().replay.clone()
169 }
170
171 #[inline]
173 pub fn reply(&self, data: Body) {
174 let mut inner = self.inner.borrow_mut();
175 inner.body = Arc::new(data);
176 inner.address = inner.replay.clone();
177 inner.replay = None;
178 self.invoke.store(false, Ordering::SeqCst);
179 }
180
181 pub fn generate() -> Message {
182 let inner = MessageInner {
183 address: Some("test.01".to_string()),
184 replay: Some(format!(
185 "__vertx.reply.{}",
186 uuid::Uuid::new_v4().to_string()
187 )),
188 body: Arc::new(Body::String(uuid::Uuid::new_v4().to_string())),
189 port: 44532_i32,
190 host: "localhost".to_string(),
191 ..Default::default()
192 };
193 Message {
194 inner: AtomicRefCell::new(inner),
195 invoke: AtomicBool::new(false),
196 invoke_count: AtomicI16::new(0)
197 }
198 }
199}
200
201impl From<Vec<u8>> for Message {
203 #[inline]
204 fn from(msg: Vec<u8>) -> Self {
205 let mut idx = 1;
206 let system_codec_id = i8::from_be_bytes(msg[idx..idx + 1].try_into().unwrap()) as i32;
207 idx += 2;
208 let len_addr = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
209 idx += 4;
210 let address = String::from_utf8(msg[idx..idx + len_addr].to_vec()).unwrap();
211 idx += len_addr;
212 let len_replay = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
213 idx += 4;
214 let mut replay = None;
215 if len_replay > 0 {
216 let replay_str = String::from_utf8(msg[idx..idx + len_replay].to_vec()).unwrap();
217 idx += len_replay;
218 replay = Some(replay_str);
219 }
220 let port = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap());
221 idx += 4;
222 let len_host = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
223 idx += 4;
224 let host = String::from_utf8(msg[idx..idx + len_host].to_vec()).unwrap();
225 idx += len_host;
226 let headers = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap());
227 idx += 4;
228 let body;
229 match system_codec_id {
230 99 => {
231 let len_body = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
232 idx += 4;
233 let body_array = msg[idx..idx + len_body].to_vec();
234 body = Body::Panic(String::from_utf8(body_array).unwrap())
235 }
236 0 => {
237 body = Body::Null
238 },
239 1 => {
240 body = Body::Ping
241 }
242 2 => {
243 body = Body::Byte(u8::from_be_bytes(msg[idx..idx + 1].try_into().unwrap()))
244 }
245 3 => {
246 body = Body::Boolean(i8::from_be_bytes(msg[idx..idx + 1].try_into().unwrap()) == 1)
247 },
248 4 => {
249 body = Body::Short(i16::from_be_bytes(msg[idx..idx + 2].try_into().unwrap()))
250 }
251 5 => {
252 body = Body::Int(i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()))
253 },
254 6 => {
255 body = Body::Long(i64::from_be_bytes(msg[idx..idx + 8].try_into().unwrap()))
256 },
257 7 => {
258 body = Body::Float(f32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()))
259 },
260 8 => {
261 body = Body::Double(f64::from_be_bytes(msg[idx..idx + 8].try_into().unwrap()))
262 },
263 9 => {
264 let len_body = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
265 idx += 4;
266 let body_array = msg[idx..idx + len_body].to_vec();
267 body = Body::String(String::from_utf8(body_array).unwrap())
268 },
269 10 => {
270 body = Body::Char(char::from_u32(i16::from_be_bytes(msg[idx..idx + 2].try_into().unwrap()) as u32).unwrap())
271 }
272 12 => {
273 let len_body = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
274 idx += 4;
275 let body_array = msg[idx..idx + len_body].to_vec();
276 body = Body::ByteArray(body_array)
277 },
278 _ => panic!("system_codec_id: {} not supported", system_codec_id)
279 }
280
281 let inner = MessageInner {
282 address: Some(address),
283 replay,
284 port,
285 host,
286 headers,
287 body: Arc::new(body),
288 system_codec_id,
289 ..Default::default()
290 };
291 Message {
292 inner: AtomicRefCell::new(inner),
293 invoke: AtomicBool::new(false),
294 invoke_count: AtomicI16::new(0)
295 }
296 }
297}
298
299impl Message {
300 #[inline]
302 pub fn to_vec(&self) -> Result<Vec<u8>, &str> {
303 let mut data = Vec::with_capacity(2048);
304 data.push(1);
305
306 match self.inner.borrow().body.deref() {
307 Body::Int(_) => {
308 data.push(5);
309 }
310 Body::Long(_) => {
311 data.push(6);
312 }
313 Body::Float(_) => {
314 data.push(7);
315 }
316 Body::Double(_) => {
317 data.push(8);
318 }
319 Body::String(_) => {
320 data.push(9);
321 }
322 Body::ByteArray(_) => {
323 data.push(12);
324 }
325 Body::Boolean(_) => {
326 data.push(3);
327 }
328 Body::Null => {
329 data.push(0);
330 }
331 Body::Byte(_) => {
332 data.push(2);
333 }
334 Body::Short(_) => {
335 data.push(4);
336 }
337 Body::Char(_) => {
338 data.push(10);
339 }
340 Body::Ping => {
341 data.push(1);
342 }
343 Body::Panic(_) => {
344 data.push(99);
345 }
346 };
347
348 data.push(0);
349 if let Some(address) = &self.address() {
350 data.extend_from_slice(&(address.len() as i32).to_be_bytes());
351 data.extend_from_slice(address.as_bytes());
352 }
353 match &self.replay() {
354 Some(addr) => {
355 data.extend_from_slice(&(addr.len() as i32).to_be_bytes());
356 data.extend_from_slice(addr.as_bytes());
357 }
358 None => {
359 data.extend_from_slice(&(0_i32).to_be_bytes());
360 }
361 }
362 data.extend_from_slice(&self.inner.borrow().port.to_be_bytes());
363 data.extend_from_slice(&(self.inner.borrow().host.len() as i32).to_be_bytes());
364 data.extend_from_slice(self.inner.borrow().host.as_bytes());
365 data.extend_from_slice(&(4_i32).to_be_bytes());
366
367 match self.inner.borrow().body.deref() {
368 Body::Int(b) => {
369 data.extend_from_slice(b.to_be_bytes().as_slice());
370 }
371 Body::Long(b) => {
372 data.extend_from_slice(b.to_be_bytes().as_slice());
373 }
374 Body::Float(b) => {
375 data.extend_from_slice(b.to_be_bytes().as_slice());
376 }
377 Body::Double(b) => {
378 data.extend_from_slice(b.to_be_bytes().as_slice());
379 }
380 Body::String(b) => {
381 data.extend_from_slice(&(b.len() as i32).to_be_bytes());
382 data.extend_from_slice(b.as_bytes());
383 }
384 Body::ByteArray(b) => {
385 data.extend_from_slice(&(b.len() as i32).to_be_bytes());
386 data.extend_from_slice(b.as_slice());
387 }
388 Body::Boolean(b) => {
389 if *b {
390 data.extend_from_slice((1_i8).to_be_bytes().as_slice())
391 } else {
392 data.extend_from_slice((0_i8).to_be_bytes().as_slice())
393 }
394 }
395 Body::Byte(b) => {
396 data.push(*b);
397 }
398 Body::Short(b) => {
399 data.extend_from_slice(b.to_be_bytes().as_slice());
400 }
401 Body::Char(b) => {
402 data.extend_from_slice((((*b) as u32) as i16).to_be_bytes().as_slice());
403 }
404 Body::Panic(b) => {
405 data.extend_from_slice(&(b.len() as i32).to_be_bytes());
406 data.extend_from_slice(b.as_bytes());
407 }
408 _ => {}
409 };
410
411 let len = ((data.len()) as i32).to_be_bytes();
412 for idx in 0..4 {
413 data.insert(idx, len[idx]);
414 }
415 Ok(data)
416 }
417}