vertx_rust/vertx/
message.rs

1use std::convert::TryInto;
2use std::sync::Arc;
3use std::ops::Deref;
4use std::panic::RefUnwindSafe;
5use std::sync::atomic::{AtomicBool, AtomicI16, Ordering};
6use atomic_refcell::AtomicRefCell;
7use crate::vertx::message::Body::{ByteArray, Byte, Short};
8
9#[derive(Default, Debug)]
10pub struct MessageInner {
11    //Destination sub address
12    pub(crate) address: Option<String>,
13    //Replay sub address
14    pub(crate) replay: Option<String>,
15    //Binary body content
16    pub(crate) body: Arc<Body>,
17    //Protocol version
18    #[allow(dead_code)]
19    pub(crate) protocol_version: i32,
20    //System codec id
21    #[allow(dead_code)]
22    pub(crate) system_codec_id: i32,
23    //Port to replay message
24    pub(crate) port: i32,
25    //Host to replay message
26    pub(crate) host: String,
27    //Headers
28    #[allow(dead_code)]
29    pub(crate) headers: i32,
30    //Message send as publish to all nodes in sub
31    pub(crate) publish: bool,
32}
33
34//Message used in event bus in standalone instance and cluster
35#[derive(Default, Debug)]
36pub struct Message {
37    pub(crate) inner: AtomicRefCell<MessageInner>,
38    pub(crate) invoke: AtomicBool,
39    pub(crate) invoke_count: AtomicI16,
40}
41
42impl RefUnwindSafe for Message {}
43impl RefUnwindSafe for MessageInner {}
44
45#[derive(Clone, Debug)]
46pub enum Body {
47
48    Byte(u8),
49    Short(i16),
50    Int(i32),
51    Long(i64),
52    Float(f32),
53    Double(f64),
54    String(String),
55    ByteArray(Vec<u8>),
56    Boolean(bool),
57    Char(char),
58    Null,
59    Ping,
60    Panic(String)
61}
62
63impl Body {
64
65    #[inline]
66    pub fn is_null(&self) -> bool {
67        matches!(self, Body::Null)
68    }
69
70    #[inline]
71    pub fn as_bool(&self) -> Result<bool, &str> {
72        match self {
73            Body::Boolean(s) => Ok(*s),
74            _ => Err("Body type is not a bool")
75        }
76    }
77
78    #[inline]
79    pub fn as_f64(&self) -> Result<f64, &str> {
80        match self {
81            Body::Double(s) => Ok(*s),
82            _ => Err("Body type is not a f64")
83        }
84    }
85
86    #[inline]
87    pub fn as_f32(&self) -> Result<f32, &str> {
88        match self {
89            Body::Float(s) => Ok(*s),
90            _ => Err("Body type is not a f32")
91        }
92    }
93
94    #[inline]
95    pub fn as_i64(&self) -> Result<i64, &str> {
96        match self {
97            Body::Long(s) => Ok(*s),
98            _ => Err("Body type is not a i64")
99        }
100    }
101    
102    #[inline]
103    pub fn as_i32(&self) -> Result<i32, &str> {
104        match self {
105            Body::Int(s) => Ok(*s),
106            _ => Err("Body type is not a i32")
107        }
108    }
109
110    #[inline]
111    pub fn as_i16(&self) -> Result<i16, &str> {
112        match self {
113            Short(s) => Ok(*s),
114            _ => Err("Body type is not a i16")
115        }
116    }
117
118    #[inline]
119    pub fn as_u8(&self) -> Result<u8, &str> {
120        match self {
121            Byte(s) => Ok(*s),
122            _ => Err("Body type is not a u8")
123        }
124    }
125
126    #[inline]
127    pub fn as_string(&self) -> Result<&String, &str> {
128        match self {
129            Body::String(s) => Ok(s),
130            _ => Err("Body type is not a String")
131        }
132    }
133
134    #[inline]
135    pub fn as_bytes(&self) -> Result<&Vec<u8>, &str> {
136        match self {
137            ByteArray(s) => Ok(s),
138            _ => Err("Body type is not a Byte Array")
139        }
140    }
141
142    #[inline]
143    pub fn as_panic(&self) -> Result<&String, &str> {
144        match self {
145            Body::Panic(s) => Ok(s),
146            _ => Err("Body type is not a Panic")
147        }
148    }
149}
150
151impl Default for Body {
152    fn default() -> Self {
153        Self::Null
154    }
155}
156
157impl Message {
158    #[inline]
159    pub fn body(&self) -> Arc<Body> {
160        self.inner.borrow().body.clone()
161    }
162
163    pub fn address(&self) -> Option<String> {
164        self.inner.borrow().address.clone()
165    }
166
167    pub fn replay(&self) -> Option<String> {
168        self.inner.borrow().replay.clone()
169    }
170
171    //Reply message to event bus
172    #[inline]
173    pub fn reply(&self, data: Body) {
174        let mut inner = self.inner.borrow_mut();
175        inner.body = Arc::new(data);
176        inner.address = inner.replay.clone();
177        inner.replay = None;
178        self.invoke.store(false, Ordering::SeqCst);
179    }
180
181    pub fn generate() -> Message {
182        let inner = MessageInner {
183            address: Some("test.01".to_string()),
184            replay: Some(format!(
185                "__vertx.reply.{}",
186                uuid::Uuid::new_v4().to_string()
187            )),
188            body: Arc::new(Body::String(uuid::Uuid::new_v4().to_string())),
189            port: 44532_i32,
190            host: "localhost".to_string(),
191            ..Default::default()
192        };
193        Message {
194            inner: AtomicRefCell::new(inner),
195            invoke: AtomicBool::new(false),
196            invoke_count: AtomicI16::new(0)
197        }
198    }
199}
200
201//Implementation of deserialize byte array to message
202impl From<Vec<u8>> for Message {
203    #[inline]
204    fn from(msg: Vec<u8>) -> Self {
205        let mut idx = 1;
206        let system_codec_id = i8::from_be_bytes(msg[idx..idx + 1].try_into().unwrap()) as i32;
207        idx += 2;
208        let len_addr = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
209        idx += 4;
210        let address = String::from_utf8(msg[idx..idx + len_addr].to_vec()).unwrap();
211        idx += len_addr;
212        let len_replay = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
213        idx += 4;
214        let mut replay = None;
215        if len_replay > 0 {
216            let replay_str = String::from_utf8(msg[idx..idx + len_replay].to_vec()).unwrap();
217            idx += len_replay;
218            replay = Some(replay_str);
219        }
220        let port = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap());
221        idx += 4;
222        let len_host = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
223        idx += 4;
224        let host = String::from_utf8(msg[idx..idx + len_host].to_vec()).unwrap();
225        idx += len_host;
226        let headers = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap());
227        idx += 4;
228        let body;
229        match system_codec_id {
230            99 => {
231                let len_body = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
232                idx += 4;
233                let body_array = msg[idx..idx + len_body].to_vec();
234                body = Body::Panic(String::from_utf8(body_array).unwrap())
235            }
236            0 => {
237                body = Body::Null
238            },
239            1 => {
240                body = Body::Ping
241            }
242            2 => {
243                body = Body::Byte(u8::from_be_bytes(msg[idx..idx + 1].try_into().unwrap()))
244            }
245            3 => {
246                body = Body::Boolean(i8::from_be_bytes(msg[idx..idx + 1].try_into().unwrap()) == 1)
247            },
248            4 => {
249                body = Body::Short(i16::from_be_bytes(msg[idx..idx + 2].try_into().unwrap()))
250            }
251            5 => {
252                body = Body::Int(i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()))
253            },
254            6 => {
255                body = Body::Long(i64::from_be_bytes(msg[idx..idx + 8].try_into().unwrap()))
256            },
257            7 => {
258                body = Body::Float(f32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()))
259            },
260            8 => {
261                body = Body::Double(f64::from_be_bytes(msg[idx..idx + 8].try_into().unwrap()))
262            },
263            9 => {
264                let len_body = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
265                idx += 4;
266                let body_array = msg[idx..idx + len_body].to_vec();
267                body = Body::String(String::from_utf8(body_array).unwrap())
268            },
269            10 => {
270                body = Body::Char(char::from_u32(i16::from_be_bytes(msg[idx..idx + 2].try_into().unwrap()) as u32).unwrap())
271            }
272            12 => {
273                let len_body = i32::from_be_bytes(msg[idx..idx + 4].try_into().unwrap()) as usize;
274                idx += 4;
275                let body_array = msg[idx..idx + len_body].to_vec();
276                body = Body::ByteArray(body_array)
277            },
278            _ => panic!("system_codec_id: {} not supported", system_codec_id)
279        }
280
281        let inner = MessageInner {
282            address: Some(address),
283            replay,
284            port,
285            host,
286            headers,
287            body: Arc::new(body),
288            system_codec_id,
289            ..Default::default()
290        };
291        Message {
292            inner: AtomicRefCell::new(inner),
293            invoke: AtomicBool::new(false),
294            invoke_count: AtomicI16::new(0)
295        }
296    }
297}
298
299impl Message {
300    //Serialize message to byte array
301    #[inline]
302    pub fn to_vec(&self) -> Result<Vec<u8>, &str> {
303        let mut data = Vec::with_capacity(2048);
304        data.push(1);
305
306        match self.inner.borrow().body.deref() {
307            Body::Int(_) => {
308                data.push(5);
309            }
310            Body::Long(_) => {
311                data.push(6);
312            }
313            Body::Float(_) => {
314                data.push(7);
315            }
316            Body::Double(_) => {
317                data.push(8);
318            }
319            Body::String(_) => {
320                data.push(9);
321            }
322            Body::ByteArray(_) => {
323                data.push(12);
324            }
325            Body::Boolean(_) => {
326                data.push(3);
327            }
328            Body::Null => {
329                data.push(0);
330            }
331            Body::Byte(_) => {
332                data.push(2);
333            }
334            Body::Short(_) => {
335                data.push(4);
336            }
337            Body::Char(_) => {
338                data.push(10);
339            }
340            Body::Ping => {
341                data.push(1);
342            }
343            Body::Panic(_) => {
344                data.push(99);
345            }
346        };
347
348        data.push(0);
349        if let Some(address) = &self.address() {
350            data.extend_from_slice(&(address.len() as i32).to_be_bytes());
351            data.extend_from_slice(address.as_bytes());
352        }
353        match &self.replay() {
354            Some(addr) => {
355                data.extend_from_slice(&(addr.len() as i32).to_be_bytes());
356                data.extend_from_slice(addr.as_bytes());
357            }
358            None => {
359                data.extend_from_slice(&(0_i32).to_be_bytes());
360            }
361        }
362        data.extend_from_slice(&self.inner.borrow().port.to_be_bytes());
363        data.extend_from_slice(&(self.inner.borrow().host.len() as i32).to_be_bytes());
364        data.extend_from_slice(self.inner.borrow().host.as_bytes());
365        data.extend_from_slice(&(4_i32).to_be_bytes());
366
367        match self.inner.borrow().body.deref() {
368            Body::Int(b) => {
369                data.extend_from_slice(b.to_be_bytes().as_slice());
370            }
371            Body::Long(b) => {
372                data.extend_from_slice(b.to_be_bytes().as_slice());
373            }
374            Body::Float(b) => {
375                data.extend_from_slice(b.to_be_bytes().as_slice());
376            }
377            Body::Double(b) => {
378                data.extend_from_slice(b.to_be_bytes().as_slice());
379            }
380            Body::String(b) => {
381                data.extend_from_slice(&(b.len() as i32).to_be_bytes());
382                data.extend_from_slice(b.as_bytes());
383            }
384            Body::ByteArray(b) => {
385                data.extend_from_slice(&(b.len() as i32).to_be_bytes());
386                data.extend_from_slice(b.as_slice());
387            }
388            Body::Boolean(b) => {
389                if *b {
390                    data.extend_from_slice((1_i8).to_be_bytes().as_slice())
391                } else {
392                    data.extend_from_slice((0_i8).to_be_bytes().as_slice())
393                }
394            }
395            Body::Byte(b) => {
396                data.push(*b);
397            }
398            Body::Short(b) => {
399                data.extend_from_slice(b.to_be_bytes().as_slice());
400            }
401            Body::Char(b) => {
402                data.extend_from_slice((((*b) as u32) as i16).to_be_bytes().as_slice());
403            }
404            Body::Panic(b) => {
405                data.extend_from_slice(&(b.len() as i32).to_be_bytes());
406                data.extend_from_slice(b.as_bytes());
407            }
408            _ => {}
409        };
410
411        let len = ((data.len()) as i32).to_be_bytes();
412        for idx in 0..4 {
413            data.insert(idx, len[idx]);
414        }
415        Ok(data)
416    }
417}