sp_dto/
lib.rs

1use std::io::Cursor;
2use std::fmt::Debug;
3use bytes::{Buf, BufMut};
4use serde_derive::{Serialize, Deserialize};
5use serde_json::{Value, Error};
6use uuid::Uuid;
7pub use bytes;
8pub use uuid;
9
10#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
11pub struct CmpSpec {
12    pub addr: String,
13    pub tx: String        
14}
15
16impl CmpSpec {
17    // These methods create spec for child components, note this: tx: self.addr.clone(),
18    pub fn new_addr(&self, addr: &str) -> CmpSpec {
19        CmpSpec {
20            addr: addr.to_owned(),
21            tx: self.addr.clone()            
22        }
23    }
24    pub fn add_to_addr(&self, delta: &str) -> CmpSpec {
25        CmpSpec {
26            addr: self.addr.clone() + "." + delta,
27            tx: self.addr.clone()            
28        }
29    }
30}
31
32impl Default for CmpSpec {
33    fn default() -> Self {
34        CmpSpec {
35            addr: String::new(),
36            tx: String::new()
37        }
38    }
39}
40
41#[derive(Debug, Serialize, Deserialize, Clone)]
42pub enum Participator {
43    /// UI component addr, app_addr and client addr
44    Component(String, Option<String>, Option<String>),
45    /// Service process addr running in background somewhere
46    Service(String)
47}
48
49/// At the moment used in case when it is needed to overwrite rpc response receiver
50#[derive(Debug, Serialize, Deserialize, Clone)]
51pub enum RouteSpec {
52    /// No rpc reponse receiver overwrite will happen
53    Simple,
54    /// Rpc reponse receiver overwrite will happen
55    Client(Participator)
56}
57
58#[derive(Debug, Serialize, Deserialize, Clone)]
59pub struct Route {
60    pub source: Participator,
61    pub spec: RouteSpec,
62    pub points: Vec<Participator>
63}
64
65/// The message itself
66#[derive(Debug, Deserialize, Clone)]
67pub struct Message<T> {
68    pub meta: MsgMeta,
69    pub payload: T,
70    pub attachments_data: Vec<u8>
71}
72
73/// The message itself with payload as raw bytes
74#[derive(Debug, Deserialize, Clone)]
75pub struct MessageRaw {
76    pub meta: MsgMeta,
77    pub payload: Vec<u8>,
78    pub attachments_data: Vec<u8>
79}
80
81/// Enum used for returning from processing rpc functions in full message mode
82#[derive(Debug, Serialize, Clone)]
83pub enum Response<T> {
84    Simple(T),
85    Full(T, Vec<(String, u64)>, Vec<u8>)
86}
87
88
89/// Helper function for creating responses.
90pub fn resp<T>(payload: T) -> Result<Response<T>, Box<dyn std::error::Error>> {
91    Ok(Response::Simple(payload))
92}
93
94pub fn resp_full<T>(payload: T, attachments: Vec<(String, u64)>, attachments_data: Vec<u8>) -> Result<Response<T>, Box<dyn std::error::Error>> {
95    Ok(Response::Full(payload, attachments, attachments_data))
96}
97
98pub fn resp_raw(payload: Vec<u8>) -> Result<ResponseRaw, Box<dyn std::error::Error>> {
99    Ok(ResponseRaw::Simple(payload))
100}
101
102pub fn resp_raw_full(payload: Vec<u8>, attachments: Vec<(String, u64)>, attachments_data: Vec<u8>) -> Result<ResponseRaw, Box<dyn std::error::Error>> {
103    Ok(ResponseRaw::Full(payload, attachments, attachments_data))
104}
105
106/// Enum used for returning from processing rpc functions in raw mode
107#[derive(Debug, Serialize, Clone)]
108pub enum ResponseRaw {
109    Simple(Vec<u8>),
110    Full(Vec<u8>, Vec<(String, u64)>, Vec<u8>)
111}
112
113
114/// Message meta data. Message passing protocol is build around this structure.
115#[derive(Debug, Serialize, Deserialize, Clone)]
116pub struct MsgMeta {
117    /// Addr of message sender
118    pub tx: String,
119    /// Addr of message receiver
120    pub rx: String,
121    /// Logic key for message processing
122    pub key: String,
123    /// Defines what kind of message it is
124    pub kind: MsgKind,
125    /// Correlation id is needed for rpc and for message chains
126    pub correlation_id: Uuid,
127    /// Logical message route, receiver are responsible for moving message on.
128    pub route: Route,
129    /// Size of payload, used for deserialization. Also useful for monitoring.
130    pub payload_size: u64,
131    /// Authorization token.
132    pub auth_token: Option<String>,
133    /// Authorization data.
134    pub auth_data: Option<Value>,
135    /// Attachments to message
136	pub attachments: Vec<Attachment>
137}
138
139#[derive(Debug, Serialize, Deserialize, Clone)]
140pub enum MsgKind {
141    Event,
142    RpcRequest,
143    RpcResponse(RpcResult)
144}
145
146#[derive(Debug, Serialize, Deserialize, Clone)]
147pub enum RpcResult {
148    Ok,
149    Err
150}
151
152#[derive(Debug, Serialize, Deserialize, Clone)]
153pub struct Attachment {
154	pub name: String,
155    pub size: u64
156}
157
158impl MsgMeta {
159    /// Payload plus attachments len.
160    pub fn content_len(&self) -> u64 {
161        let mut len = self.payload_size;
162        for attachment in &self.attachments {
163            len = len + attachment.size;
164        }
165        len
166    }
167    /// Attachments len.
168    pub fn attachments_len(&self) -> u64 {
169        let mut len = 0;
170        for attachment in &self.attachments {
171            len = len + attachment.size;
172        }
173        len
174    }
175    /// Attachments sizes.
176    pub fn attachments_sizes(&self) -> Vec<u64> {
177        let mut res = vec![];
178        for attachment in &self.attachments {
179            res.push(attachment.size);
180        }
181        res        
182    }
183    /// Short display of message meta data
184    pub fn display(&self) -> String {
185        format!("{} -> {} {} {:?}", self.tx, self.rx, self.key, self.kind)
186    }
187    /// Get key part, index is zero based, . is used as a separator.
188    pub fn key_part(&self, index: usize) -> Result<&str, String> {
189        let split: Vec<&str> = self.key.split(".").collect();
190
191        if index >= split.len() {
192            return Err("index equals or superior to parts length".to_owned());
193        }
194
195        return Ok(split[index])
196    }
197    /// Compares key part with passed value, index is zero based, . is used as a separator.
198    pub fn match_key_part(&self, index: usize, value: &str) -> Result<bool, String> {
199        let split: Vec<&str> = self.key.split(".").collect();
200
201        if index >= split.len() {
202            return Err("index equals or superior to parts length".to_owned());
203        }
204
205        return Ok(split[index] == value)
206    }
207    /// Get tx part, index is zero based, . is used as a separator.
208    pub fn tx_part(&self, index: usize) -> Result<&str, String> {
209        let split: Vec<&str> = self.tx.split(".").collect();
210
211        if index >= split.len() {
212            return Err("index equals or superior to parts length".to_owned());
213        }
214
215        return Ok(split[index])
216    }
217    /// Compares tx part with passed value, index is zero based, . is used as a separator.
218    pub fn match_tx_part(&self, index: usize, value: &str) -> Result<bool, String> {
219        let split: Vec<&str> = self.tx.split(".").collect();
220
221        if index >= split.len() {
222            return Err("index equals or superior to parts length".to_owned());
223        }
224
225        return Ok(split[index] == value)
226    }    
227    pub fn source_cmp_addr(&self) -> Option<&str> {
228        match &self.route.source {
229            Participator::Component(addr, _, _) => Some(&addr),
230            _ => None
231        }
232    }
233    /// Get source cmp part, index is zero based, . is used as a separator.
234    pub fn source_cmp_part(&self, index: usize) -> Result<&str, String> {
235        let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
236        let split: Vec<&str> = addr.split(".").collect();
237
238        if index >= split.len() {
239            return Err("index equals or superior to parts length".to_owned());
240        }
241
242        return Ok(split[index])
243    }
244    /// Compares source cmp part with passed value, index is zero based, . is used as a separator.
245    pub fn match_source_cmp_part(&self, index: usize, value: &str) -> Result<bool, String> {
246        let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
247        let split: Vec<&str> = addr.split(".").collect();
248
249        if index >= split.len() {
250            return Err("index equals or superior to parts length".to_owned());
251        }
252
253        return Ok(split[index] == value)
254    }
255    /// Get source cmp part before last one, . is used as a separator.
256    pub fn source_cmp_part_before_last(&self) -> Result<&str, String> {
257        let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
258        let split: Vec<&str> = addr.split(".").collect();
259
260        if split.len() < 2 {
261            return Err("parts length is less than 2".to_owned());
262        }
263
264        return Ok(split[split.len() - 2])
265    }
266    pub fn source_svc_addr(&self) -> Option<String> {
267        match &self.route.source {
268            Participator::Service(addr) => Some(addr.clone()),
269            _ => None
270        }
271    }
272    pub fn client_cmp_addr(&self) -> Option<String> {
273        match &self.route.spec {
274            RouteSpec::Client(participator) => {
275                match participator {
276                    Participator::Component(addr, _, _) => Some(addr.clone()),
277                    _ => None
278                }
279            }
280            _ => None
281        }
282    }
283    pub fn client_svc_addr(&self) -> Option<String> {
284        match &self.route.spec {
285            RouteSpec::Client(participator) => {
286                match participator {
287                    Participator::Service(addr) => Some(addr.clone()),
288                    _ => None
289                }
290            }
291            _ => None
292        }
293    }
294}
295
296pub fn event_dto<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
297    let mut payload = serde_json::to_vec(&payload)?;
298    let correlation_id = Uuid::new_v4();
299
300    let msg_meta = MsgMeta {
301        tx,
302        rx,
303        key,
304        kind: MsgKind::Event,
305        correlation_id,
306        route,
307        payload_size: payload.len() as u64,
308        auth_token,
309        auth_data,
310		attachments: vec![]
311    };
312
313    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
314
315    let mut buf = vec![];
316
317    buf.put_u32(msg_meta.len() as u32);
318
319    buf.append(&mut msg_meta);
320    buf.append(&mut payload);
321    
322    Ok(buf)
323}
324
325pub fn event_dto_with_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
326    let mut payload = serde_json::to_vec(&payload)?;
327    let correlation_id = Uuid::new_v4();
328    let msg_meta = MsgMeta {
329        tx,
330        rx,
331        key,
332        kind: MsgKind::Event,
333        correlation_id,
334        route,
335        payload_size: payload.len() as u64,
336        auth_token,
337        auth_data,
338		attachments: vec![]
339    };
340    let payload_size = msg_meta.payload_size;
341    let attachments_sizes = msg_meta.attachments_sizes();
342    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
343    let msg_meta_size = msg_meta.len() as u64;
344    let mut buf = vec![];
345    buf.put_u32(msg_meta.len() as u32);
346    buf.append(&mut msg_meta);
347    buf.append(&mut payload);    
348    Ok((buf, msg_meta_size, payload_size, attachments_sizes))
349}
350
351pub fn reply_to_rpc_dto<T>(tx: String, rx: String, key: String, correlation_id: Uuid, payload: T, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
352    let mut payload = serde_json::to_vec(&payload)?;
353
354    let msg_meta = MsgMeta {
355        tx,
356        rx,
357        key,
358        kind: MsgKind::RpcResponse(result),
359        correlation_id,
360        route,
361        payload_size: payload.len() as u64,
362        auth_token,
363        auth_data,
364		attachments: vec![]
365    };        
366
367    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
368
369    let mut buf = vec![];
370
371    buf.put_u32(msg_meta.len() as u32);
372
373    buf.append(&mut msg_meta);
374    buf.append(&mut payload);
375  
376    Ok(buf)
377}
378
379/*
380    pub fn recv_event(&self) -> Result<(MsgMeta, R), Error> {
381        let (msg_meta, len, data) = self.rx.recv()?;            
382
383        let payload = serde_json::from_slice::<R>(&data[len + 4..])?;        
384
385        Ok((msg_meta, payload))
386    }
387    pub fn recv_rpc_request(&self) -> Result<(MsgMeta, R), Error> {
388        let (msg_meta, len, data) = self.rpc_request_rx.recv()?;            
389
390        let payload = serde_json::from_slice::<R>(&data[len + 4..])?;        
391
392        Ok((msg_meta, payload))
393    }
394    */
395
396pub fn rpc_dto<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
397    let mut payload = serde_json::to_vec(&payload)?;
398    let correlation_id = Uuid::new_v4();
399
400    let msg_meta = MsgMeta {
401        tx,
402        rx,
403        key,
404        kind: MsgKind::RpcRequest,
405        correlation_id,
406        route,
407        payload_size: payload.len() as u64,
408        auth_token,
409        auth_data,
410		attachments: vec![]
411    };
412    
413    let mut msg_meta = serde_json::to_vec(&msg_meta)?;
414
415    let mut buf = vec![];
416
417    buf.put_u32(msg_meta.len() as u32);
418
419    buf.append(&mut msg_meta);
420    buf.append(&mut payload);
421
422    Ok(buf)
423}
424
425pub fn rpc_dto_with_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
426    let mut payload = serde_json::to_vec(&payload)?;
427    let correlation_id = Uuid::new_v4();
428    let msg_meta = MsgMeta {
429        tx,
430        rx,
431        key,
432        kind: MsgKind::RpcRequest,
433        correlation_id,
434        route,
435        payload_size: payload.len() as u64,
436        auth_token,
437        auth_data,
438		attachments: vec![]
439    };
440    let payload_size = msg_meta.payload_size;
441    let attachments_sizes = msg_meta.attachments_sizes();
442    let mut msg_meta = serde_json::to_vec(&msg_meta)?;
443    let msg_meta_size = msg_meta.len() as u64;
444    let mut buf = vec![];
445    buf.put_u32(msg_meta.len() as u32);
446    buf.append(&mut msg_meta);
447    buf.append(&mut payload);
448    Ok((buf, msg_meta_size, payload_size, attachments_sizes))
449}
450
451pub fn rpc_dto_with_correlation_id<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>), Error> where T: Debug, T: serde::Serialize {
452    let mut payload = serde_json::to_vec(&payload)?;
453    let correlation_id = Uuid::new_v4();
454    let msg_meta = MsgMeta {
455        tx,
456        rx,
457        key,
458        kind: MsgKind::RpcRequest,
459        correlation_id,
460        route,
461        payload_size: payload.len() as u64,
462        auth_token,
463        auth_data,
464		attachments: vec![]
465    };    
466    let mut msg_meta = serde_json::to_vec(&msg_meta)?;
467    let mut buf = vec![];
468    buf.put_u32(msg_meta.len() as u32);
469    buf.append(&mut msg_meta);
470    buf.append(&mut payload);
471    Ok((correlation_id, buf))
472}
473
474pub fn rpc_dto_with_correlation_id_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
475    let mut payload = serde_json::to_vec(&payload)?;
476    let correlation_id = Uuid::new_v4();
477    let msg_meta = MsgMeta {
478        tx,
479        rx,
480        key,
481        kind: MsgKind::RpcRequest,
482        correlation_id,
483        route,
484        payload_size: payload.len() as u64,
485        auth_token,
486        auth_data,
487		attachments: vec![]
488    };
489    let payload_size = msg_meta.payload_size;
490    let attachments_sizes = msg_meta.attachments_sizes();
491    let mut msg_meta = serde_json::to_vec(&msg_meta)?;
492    let msg_meta_size = msg_meta.len() as u64;
493    let mut buf = vec![];
494    buf.put_u32(msg_meta.len() as u32);
495    buf.append(&mut msg_meta);
496    buf.append(&mut payload);
497    Ok((correlation_id, buf, msg_meta_size, payload_size, attachments_sizes))
498}
499
500pub fn rpc_dto_with_attachments<T>(tx: String, rx: String, key: String, payload: T, attachments: Vec<(String, Vec<u8>)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
501    let mut payload = serde_json::to_vec(&payload)?;
502    let correlation_id = Uuid::new_v4();
503    let mut attachments_meta = vec![];
504    let mut attachments_payload = vec![];
505
506    for (attachment_name, mut attachment_payload) in attachments {
507        attachments_meta.push(Attachment {
508            name: attachment_name,
509            size: attachment_payload.len() as u64
510        });        
511        attachments_payload.append(&mut attachment_payload);
512    }
513
514    let msg_meta = MsgMeta {
515        tx,
516        rx,
517        key,
518        kind: MsgKind::RpcRequest,
519        correlation_id,
520        route,
521        payload_size: payload.len() as u64,
522        auth_token,
523        auth_data,
524		attachments: attachments_meta
525    };
526
527    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
528
529    let mut buf = vec![];
530
531    buf.put_u32(msg_meta.len() as u32);
532
533    buf.append(&mut msg_meta);
534    buf.append(&mut payload);
535    buf.append(&mut attachments_payload);
536
537    Ok(buf)
538}
539
540pub fn rpc_dto_with_later_attachments<T>(tx: String, rx: String, key: String, payload: T, attachments: Vec<(String, u64)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
541    let mut payload = serde_json::to_vec(&payload)?;
542    let correlation_id = Uuid::new_v4();
543    let mut attachments_meta = vec![];    
544
545    for (attachment_name,attachment_size) in attachments {
546        attachments_meta.push(Attachment {
547            name: attachment_name,
548            size: attachment_size
549        });                
550    }
551
552    let msg_meta = MsgMeta {
553        tx,
554        rx,
555        key,
556        kind: MsgKind::RpcRequest,
557        correlation_id,
558        route,
559        payload_size: payload.len() as u64,
560        auth_token,
561        auth_data,
562		attachments: attachments_meta
563    };
564
565    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
566
567    let mut buf = vec![];
568
569    buf.put_u32(msg_meta.len() as u32);
570
571    buf.append(&mut msg_meta);
572    buf.append(&mut payload);    
573
574    Ok(buf)
575}
576
577/*
578impl MagicBall2 {
579    pub fn new(addr: String, sender: Sender, rx: crossbeam::channel::Receiver<(MsgMeta, usize, Vec<u8>)>, rpc_request_rx: crossbeam::channel::Receiver<(MsgMeta, usize, Vec<u8>)>, rpc_tx: crossbeam::channel::Sender<ClientMsg>) -> MagicBall2 {
580        MagicBall2 {
581            addr,
582            sender,
583            rx,
584            rpc_request_rx,
585            rpc_tx
586        }
587    }
588    */
589
590pub fn event_dto2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {        
591    let correlation_id = Uuid::new_v4();
592    
593    let msg_meta = MsgMeta {
594        tx,
595        rx,
596        key,
597        kind: MsgKind::Event,
598        correlation_id,
599        route,
600        payload_size: payload.len() as u64,
601        auth_token,
602        auth_data,
603		attachments: vec![]
604    };
605
606    let mut msg_meta = serde_json::to_vec(&msg_meta)?;        
607
608    let mut buf = vec![];
609
610    buf.put_u32(msg_meta.len() as u32);
611
612    buf.append(&mut msg_meta);
613    buf.append(&mut payload);
614    
615    Ok(buf)
616}
617
618pub fn reply_to_rpc_dto2_sizes(tx: String, rx: String, key: String, correlation_id: Uuid, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, mut attachments_data: Vec<u8>, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> {
619    let mut attachments_meta = vec![];
620    for (attachment_name,attachment_size) in attachments {
621        attachments_meta.push(Attachment {
622            name: attachment_name,
623            size: attachment_size
624        });                
625    }
626    let msg_meta = MsgMeta {
627        tx,
628        rx,
629        key,
630        kind: MsgKind::RpcResponse(result),
631        correlation_id,
632        route,
633        payload_size: payload.len() as u64,
634        auth_token,
635        auth_data,
636		attachments: attachments_meta
637    };
638    let payload_size = msg_meta.payload_size;
639    let attachments_sizes = msg_meta.attachments_sizes();
640    let mut msg_meta = serde_json::to_vec(&msg_meta)?;
641    let msg_meta_size = msg_meta.len() as u64;     
642    let mut buf = vec![];
643    buf.put_u32(msg_meta.len() as u32);
644    buf.append(&mut msg_meta);
645    buf.append(&mut payload);
646    buf.append(&mut attachments_data);    
647    Ok((buf, msg_meta_size, payload_size, attachments_sizes))
648}
649
650pub fn reply_to_rpc_dto_with_later_attachments2(tx: String, rx: String, key: String, correlation_id: Uuid, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
651    let mut attachments_meta = vec![];
652
653    for (attachment_name,attachment_size) in attachments {
654        attachments_meta.push(Attachment {
655            name: attachment_name,
656            size: attachment_size
657        });                
658    }
659
660    let msg_meta = MsgMeta {
661        tx,
662        rx,
663        key,
664        kind: MsgKind::RpcResponse(result),
665        correlation_id,
666        route,
667        payload_size: payload.len() as u64,
668        auth_token,
669        auth_data,
670		attachments: attachments_meta
671    };
672
673    let mut msg_meta = serde_json::to_vec(&msg_meta)?;        
674
675    let mut buf = vec![];
676
677    buf.put_u32(msg_meta.len() as u32);
678
679    buf.append(&mut msg_meta);
680    buf.append(&mut payload);
681    
682    Ok(buf)
683}
684
685    /*
686    pub fn recv_event(&self) -> Result<(MsgMeta, Vec<u8>), Error> {
687        let (msg_meta, len, data) = self.rx.recv()?;            
688        let payload = &data[len + 4..];        
689
690        Ok((msg_meta, payload.to_vec()))
691    }
692    pub fn recv_rpc_request(&self) -> Result<(MsgMeta, Vec<u8>), Error> {
693        let (msg_meta, len, data) = self.rpc_request_rx.recv()?;                
694        let payload = &data[len + 4..];        
695
696        Ok((msg_meta, payload.to_vec()))
697    }
698    */
699
700pub fn rpc_dto2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
701    let correlation_id = Uuid::new_v4();
702
703    let msg_meta = MsgMeta {
704        tx,
705        rx,
706        key,
707        kind: MsgKind::RpcRequest,
708        correlation_id,
709        route,
710        payload_size: payload.len() as u64,
711        auth_token,
712        auth_data,
713		attachments: vec![]
714    };
715
716    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
717
718    let mut buf = vec![];
719
720    buf.put_u32(msg_meta.len() as u32);
721
722    buf.append(&mut msg_meta);
723    buf.append(&mut payload);
724
725    Ok(buf)
726}
727
728pub fn rpc_dto_with_attachments2(tx: String, rx: String, key: String, mut payload: Vec<u8>, attachments: Vec<(String, Vec<u8>)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
729    let correlation_id = Uuid::new_v4();
730    let mut attachments_meta = vec![];
731    let mut attachments_payload = vec![];
732
733    for (attachment_name, mut attachment_payload) in attachments {
734        attachments_meta.push(Attachment {
735            name: attachment_name,
736            size: attachment_payload.len() as u64
737        });        
738        attachments_payload.append(&mut attachment_payload);
739    }
740
741    let msg_meta = MsgMeta {
742        tx,
743        rx,
744        key,
745        kind: MsgKind::RpcRequest,
746        correlation_id,
747        route,
748        payload_size: payload.len() as u64,
749        auth_token,
750        auth_data,
751		attachments: attachments_meta
752    };
753
754    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
755
756    let mut buf = vec![];
757
758    buf.put_u32(msg_meta.len() as u32);
759
760    buf.append(&mut msg_meta);
761    buf.append(&mut payload);
762    buf.append(&mut attachments_payload);
763
764    Ok(buf)
765}
766
767pub fn rpc_dto_with_later_attachments2(tx: String, rx: String, key: String, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
768    let correlation_id = Uuid::new_v4();
769    let mut attachments_meta = vec![];
770
771    for (attachment_name,attachment_size) in attachments {
772        attachments_meta.push(Attachment {
773            name: attachment_name,
774            size: attachment_size
775        });                
776    }
777
778    let msg_meta = MsgMeta {
779        tx,
780        rx,
781        key,
782        kind: MsgKind::RpcRequest,
783        correlation_id,
784        route,
785        payload_size: payload.len() as u64,
786        auth_token,
787        auth_data,
788		attachments: attachments_meta
789    };
790
791    let mut msg_meta = serde_json::to_vec(&msg_meta)?;    
792
793    let mut buf = vec![];
794
795    buf.put_u32(msg_meta.len() as u32);
796
797    buf.append(&mut msg_meta);
798    buf.append(&mut payload);    
799
800    Ok(buf)
801}
802
803pub fn rpc_dto_with_correlation_id_2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>), Error> {
804    let correlation_id = Uuid::new_v4();
805
806    let msg_meta = MsgMeta {
807        tx,
808        rx,
809        key,
810        kind: MsgKind::RpcRequest,
811        correlation_id,
812        route,
813        payload_size: payload.len() as u64,
814        auth_token,
815        auth_data,
816		attachments: vec![]
817    };
818
819    let mut msg_meta = serde_json::to_vec(&msg_meta)?;        
820
821    let mut buf = vec![];
822
823    buf.put_u32(msg_meta.len() as u32);
824
825    buf.append(&mut msg_meta);
826    buf.append(&mut payload);
827
828    Ok((correlation_id, buf))
829}
830
831pub fn get_msg_meta(data: &[u8]) -> Result<MsgMeta, Error> {
832    let mut buf = Cursor::new(data);
833    let len = buf.get_u32() as usize;
834
835    serde_json::from_slice::<MsgMeta>(&data[4..len + 4])
836}
837
838pub fn get_msg<T>(data: &[u8]) -> Result<(MsgMeta, T, Vec<(String, Vec<u8>)>), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
839    let mut buf = Cursor::new(data);    
840    let len = buf.get_u32();
841    let msg_meta_offset = (len + 4) as usize;
842
843    let msg_meta = serde_json::from_slice::<MsgMeta>(&data[4..msg_meta_offset as usize])?;
844
845    let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
846
847    let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
848
849    let mut attachments = vec![];
850    let mut attachment_offset = payload_offset;
851
852    for attachment in &msg_meta.attachments {
853        let attachment_start = attachment_offset;
854        attachment_offset = attachment_offset + attachment.size as usize;
855        attachments.push((attachment.name.clone(), (&data[attachment_start..attachment_offset]).to_owned()))
856    }
857
858    Ok((msg_meta, payload, attachments))
859}
860
861pub fn get_msg_meta_and_payload<T>(data: &[u8]) -> Result<(MsgMeta, T), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
862    let mut buf = Cursor::new(data);    
863    let len = buf.get_u32();
864    let msg_meta_offset = (len + 4) as usize;
865
866    let msg_meta = serde_json::from_slice::<MsgMeta>(&data[4..msg_meta_offset as usize])?;
867
868    let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
869
870    let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;    
871
872    Ok((msg_meta, payload))
873}
874
875pub fn get_payload<T>(msg_meta: &MsgMeta, data: &[u8]) -> Result<T, Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
876    let mut buf = Cursor::new(data);    
877    let len = buf.get_u32();
878    let msg_meta_offset = (len + 4) as usize;    
879
880    let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
881
882    let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;    
883
884    Ok(payload)
885}
886
887pub fn get_payload_with_attachments<T>(msg_meta: &MsgMeta, data: &[u8]) -> Result<(T, Vec<(String, Vec<u8>)>), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
888    let mut buf = Cursor::new(data);    
889    let len = buf.get_u32();
890    let msg_meta_offset = (len + 4) as usize;
891
892    let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
893
894    let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
895
896    let mut attachments = vec![];
897    let mut attachment_offset = payload_offset;
898
899    for attachment in &msg_meta.attachments {
900        let attachment_start = attachment_offset;
901        attachment_offset = attachment_offset + attachment.size as usize;
902        attachments.push((attachment.name.clone(), (&data[attachment_start..attachment_offset]).to_owned()))
903    }
904
905    Ok((payload, attachments))
906}