1use std::io::Cursor;
2use std::fmt::Debug;
3use bytes::{Buf, BufMut};
4use serde_derive::{Serialize, Deserialize};
5use serde_json::{Value, Error};
6use uuid::Uuid;
7pub use bytes;
8pub use uuid;
9
10#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
11pub struct CmpSpec {
12 pub addr: String,
13 pub tx: String
14}
15
16impl CmpSpec {
17 pub fn new_addr(&self, addr: &str) -> CmpSpec {
19 CmpSpec {
20 addr: addr.to_owned(),
21 tx: self.addr.clone()
22 }
23 }
24 pub fn add_to_addr(&self, delta: &str) -> CmpSpec {
25 CmpSpec {
26 addr: self.addr.clone() + "." + delta,
27 tx: self.addr.clone()
28 }
29 }
30}
31
32impl Default for CmpSpec {
33 fn default() -> Self {
34 CmpSpec {
35 addr: String::new(),
36 tx: String::new()
37 }
38 }
39}
40
41#[derive(Debug, Serialize, Deserialize, Clone)]
42pub enum Participator {
43 Component(String, Option<String>, Option<String>),
45 Service(String)
47}
48
49#[derive(Debug, Serialize, Deserialize, Clone)]
51pub enum RouteSpec {
52 Simple,
54 Client(Participator)
56}
57
58#[derive(Debug, Serialize, Deserialize, Clone)]
59pub struct Route {
60 pub source: Participator,
61 pub spec: RouteSpec,
62 pub points: Vec<Participator>
63}
64
65#[derive(Debug, Deserialize, Clone)]
67pub struct Message<T> {
68 pub meta: MsgMeta,
69 pub payload: T,
70 pub attachments_data: Vec<u8>
71}
72
73#[derive(Debug, Deserialize, Clone)]
75pub struct MessageRaw {
76 pub meta: MsgMeta,
77 pub payload: Vec<u8>,
78 pub attachments_data: Vec<u8>
79}
80
81#[derive(Debug, Serialize, Clone)]
83pub enum Response<T> {
84 Simple(T),
85 Full(T, Vec<(String, u64)>, Vec<u8>)
86}
87
88
89pub fn resp<T>(payload: T) -> Result<Response<T>, Box<dyn std::error::Error>> {
91 Ok(Response::Simple(payload))
92}
93
94pub fn resp_full<T>(payload: T, attachments: Vec<(String, u64)>, attachments_data: Vec<u8>) -> Result<Response<T>, Box<dyn std::error::Error>> {
95 Ok(Response::Full(payload, attachments, attachments_data))
96}
97
98pub fn resp_raw(payload: Vec<u8>) -> Result<ResponseRaw, Box<dyn std::error::Error>> {
99 Ok(ResponseRaw::Simple(payload))
100}
101
102pub fn resp_raw_full(payload: Vec<u8>, attachments: Vec<(String, u64)>, attachments_data: Vec<u8>) -> Result<ResponseRaw, Box<dyn std::error::Error>> {
103 Ok(ResponseRaw::Full(payload, attachments, attachments_data))
104}
105
106#[derive(Debug, Serialize, Clone)]
108pub enum ResponseRaw {
109 Simple(Vec<u8>),
110 Full(Vec<u8>, Vec<(String, u64)>, Vec<u8>)
111}
112
113
114#[derive(Debug, Serialize, Deserialize, Clone)]
116pub struct MsgMeta {
117 pub tx: String,
119 pub rx: String,
121 pub key: String,
123 pub kind: MsgKind,
125 pub correlation_id: Uuid,
127 pub route: Route,
129 pub payload_size: u64,
131 pub auth_token: Option<String>,
133 pub auth_data: Option<Value>,
135 pub attachments: Vec<Attachment>
137}
138
139#[derive(Debug, Serialize, Deserialize, Clone)]
140pub enum MsgKind {
141 Event,
142 RpcRequest,
143 RpcResponse(RpcResult)
144}
145
146#[derive(Debug, Serialize, Deserialize, Clone)]
147pub enum RpcResult {
148 Ok,
149 Err
150}
151
152#[derive(Debug, Serialize, Deserialize, Clone)]
153pub struct Attachment {
154 pub name: String,
155 pub size: u64
156}
157
158impl MsgMeta {
159 pub fn content_len(&self) -> u64 {
161 let mut len = self.payload_size;
162 for attachment in &self.attachments {
163 len = len + attachment.size;
164 }
165 len
166 }
167 pub fn attachments_len(&self) -> u64 {
169 let mut len = 0;
170 for attachment in &self.attachments {
171 len = len + attachment.size;
172 }
173 len
174 }
175 pub fn attachments_sizes(&self) -> Vec<u64> {
177 let mut res = vec![];
178 for attachment in &self.attachments {
179 res.push(attachment.size);
180 }
181 res
182 }
183 pub fn display(&self) -> String {
185 format!("{} -> {} {} {:?}", self.tx, self.rx, self.key, self.kind)
186 }
187 pub fn key_part(&self, index: usize) -> Result<&str, String> {
189 let split: Vec<&str> = self.key.split(".").collect();
190
191 if index >= split.len() {
192 return Err("index equals or superior to parts length".to_owned());
193 }
194
195 return Ok(split[index])
196 }
197 pub fn match_key_part(&self, index: usize, value: &str) -> Result<bool, String> {
199 let split: Vec<&str> = self.key.split(".").collect();
200
201 if index >= split.len() {
202 return Err("index equals or superior to parts length".to_owned());
203 }
204
205 return Ok(split[index] == value)
206 }
207 pub fn tx_part(&self, index: usize) -> Result<&str, String> {
209 let split: Vec<&str> = self.tx.split(".").collect();
210
211 if index >= split.len() {
212 return Err("index equals or superior to parts length".to_owned());
213 }
214
215 return Ok(split[index])
216 }
217 pub fn match_tx_part(&self, index: usize, value: &str) -> Result<bool, String> {
219 let split: Vec<&str> = self.tx.split(".").collect();
220
221 if index >= split.len() {
222 return Err("index equals or superior to parts length".to_owned());
223 }
224
225 return Ok(split[index] == value)
226 }
227 pub fn source_cmp_addr(&self) -> Option<&str> {
228 match &self.route.source {
229 Participator::Component(addr, _, _) => Some(&addr),
230 _ => None
231 }
232 }
233 pub fn source_cmp_part(&self, index: usize) -> Result<&str, String> {
235 let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
236 let split: Vec<&str> = addr.split(".").collect();
237
238 if index >= split.len() {
239 return Err("index equals or superior to parts length".to_owned());
240 }
241
242 return Ok(split[index])
243 }
244 pub fn match_source_cmp_part(&self, index: usize, value: &str) -> Result<bool, String> {
246 let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
247 let split: Vec<&str> = addr.split(".").collect();
248
249 if index >= split.len() {
250 return Err("index equals or superior to parts length".to_owned());
251 }
252
253 return Ok(split[index] == value)
254 }
255 pub fn source_cmp_part_before_last(&self) -> Result<&str, String> {
257 let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
258 let split: Vec<&str> = addr.split(".").collect();
259
260 if split.len() < 2 {
261 return Err("parts length is less than 2".to_owned());
262 }
263
264 return Ok(split[split.len() - 2])
265 }
266 pub fn source_svc_addr(&self) -> Option<String> {
267 match &self.route.source {
268 Participator::Service(addr) => Some(addr.clone()),
269 _ => None
270 }
271 }
272 pub fn client_cmp_addr(&self) -> Option<String> {
273 match &self.route.spec {
274 RouteSpec::Client(participator) => {
275 match participator {
276 Participator::Component(addr, _, _) => Some(addr.clone()),
277 _ => None
278 }
279 }
280 _ => None
281 }
282 }
283 pub fn client_svc_addr(&self) -> Option<String> {
284 match &self.route.spec {
285 RouteSpec::Client(participator) => {
286 match participator {
287 Participator::Service(addr) => Some(addr.clone()),
288 _ => None
289 }
290 }
291 _ => None
292 }
293 }
294}
295
296pub fn event_dto<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
297 let mut payload = serde_json::to_vec(&payload)?;
298 let correlation_id = Uuid::new_v4();
299
300 let msg_meta = MsgMeta {
301 tx,
302 rx,
303 key,
304 kind: MsgKind::Event,
305 correlation_id,
306 route,
307 payload_size: payload.len() as u64,
308 auth_token,
309 auth_data,
310 attachments: vec![]
311 };
312
313 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
314
315 let mut buf = vec![];
316
317 buf.put_u32(msg_meta.len() as u32);
318
319 buf.append(&mut msg_meta);
320 buf.append(&mut payload);
321
322 Ok(buf)
323}
324
325pub fn event_dto_with_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
326 let mut payload = serde_json::to_vec(&payload)?;
327 let correlation_id = Uuid::new_v4();
328 let msg_meta = MsgMeta {
329 tx,
330 rx,
331 key,
332 kind: MsgKind::Event,
333 correlation_id,
334 route,
335 payload_size: payload.len() as u64,
336 auth_token,
337 auth_data,
338 attachments: vec![]
339 };
340 let payload_size = msg_meta.payload_size;
341 let attachments_sizes = msg_meta.attachments_sizes();
342 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
343 let msg_meta_size = msg_meta.len() as u64;
344 let mut buf = vec![];
345 buf.put_u32(msg_meta.len() as u32);
346 buf.append(&mut msg_meta);
347 buf.append(&mut payload);
348 Ok((buf, msg_meta_size, payload_size, attachments_sizes))
349}
350
351pub fn reply_to_rpc_dto<T>(tx: String, rx: String, key: String, correlation_id: Uuid, payload: T, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
352 let mut payload = serde_json::to_vec(&payload)?;
353
354 let msg_meta = MsgMeta {
355 tx,
356 rx,
357 key,
358 kind: MsgKind::RpcResponse(result),
359 correlation_id,
360 route,
361 payload_size: payload.len() as u64,
362 auth_token,
363 auth_data,
364 attachments: vec![]
365 };
366
367 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
368
369 let mut buf = vec![];
370
371 buf.put_u32(msg_meta.len() as u32);
372
373 buf.append(&mut msg_meta);
374 buf.append(&mut payload);
375
376 Ok(buf)
377}
378
379pub fn rpc_dto<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
397 let mut payload = serde_json::to_vec(&payload)?;
398 let correlation_id = Uuid::new_v4();
399
400 let msg_meta = MsgMeta {
401 tx,
402 rx,
403 key,
404 kind: MsgKind::RpcRequest,
405 correlation_id,
406 route,
407 payload_size: payload.len() as u64,
408 auth_token,
409 auth_data,
410 attachments: vec![]
411 };
412
413 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
414
415 let mut buf = vec![];
416
417 buf.put_u32(msg_meta.len() as u32);
418
419 buf.append(&mut msg_meta);
420 buf.append(&mut payload);
421
422 Ok(buf)
423}
424
425pub fn rpc_dto_with_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
426 let mut payload = serde_json::to_vec(&payload)?;
427 let correlation_id = Uuid::new_v4();
428 let msg_meta = MsgMeta {
429 tx,
430 rx,
431 key,
432 kind: MsgKind::RpcRequest,
433 correlation_id,
434 route,
435 payload_size: payload.len() as u64,
436 auth_token,
437 auth_data,
438 attachments: vec![]
439 };
440 let payload_size = msg_meta.payload_size;
441 let attachments_sizes = msg_meta.attachments_sizes();
442 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
443 let msg_meta_size = msg_meta.len() as u64;
444 let mut buf = vec![];
445 buf.put_u32(msg_meta.len() as u32);
446 buf.append(&mut msg_meta);
447 buf.append(&mut payload);
448 Ok((buf, msg_meta_size, payload_size, attachments_sizes))
449}
450
451pub fn rpc_dto_with_correlation_id<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>), Error> where T: Debug, T: serde::Serialize {
452 let mut payload = serde_json::to_vec(&payload)?;
453 let correlation_id = Uuid::new_v4();
454 let msg_meta = MsgMeta {
455 tx,
456 rx,
457 key,
458 kind: MsgKind::RpcRequest,
459 correlation_id,
460 route,
461 payload_size: payload.len() as u64,
462 auth_token,
463 auth_data,
464 attachments: vec![]
465 };
466 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
467 let mut buf = vec![];
468 buf.put_u32(msg_meta.len() as u32);
469 buf.append(&mut msg_meta);
470 buf.append(&mut payload);
471 Ok((correlation_id, buf))
472}
473
474pub fn rpc_dto_with_correlation_id_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
475 let mut payload = serde_json::to_vec(&payload)?;
476 let correlation_id = Uuid::new_v4();
477 let msg_meta = MsgMeta {
478 tx,
479 rx,
480 key,
481 kind: MsgKind::RpcRequest,
482 correlation_id,
483 route,
484 payload_size: payload.len() as u64,
485 auth_token,
486 auth_data,
487 attachments: vec![]
488 };
489 let payload_size = msg_meta.payload_size;
490 let attachments_sizes = msg_meta.attachments_sizes();
491 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
492 let msg_meta_size = msg_meta.len() as u64;
493 let mut buf = vec![];
494 buf.put_u32(msg_meta.len() as u32);
495 buf.append(&mut msg_meta);
496 buf.append(&mut payload);
497 Ok((correlation_id, buf, msg_meta_size, payload_size, attachments_sizes))
498}
499
500pub fn rpc_dto_with_attachments<T>(tx: String, rx: String, key: String, payload: T, attachments: Vec<(String, Vec<u8>)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
501 let mut payload = serde_json::to_vec(&payload)?;
502 let correlation_id = Uuid::new_v4();
503 let mut attachments_meta = vec![];
504 let mut attachments_payload = vec![];
505
506 for (attachment_name, mut attachment_payload) in attachments {
507 attachments_meta.push(Attachment {
508 name: attachment_name,
509 size: attachment_payload.len() as u64
510 });
511 attachments_payload.append(&mut attachment_payload);
512 }
513
514 let msg_meta = MsgMeta {
515 tx,
516 rx,
517 key,
518 kind: MsgKind::RpcRequest,
519 correlation_id,
520 route,
521 payload_size: payload.len() as u64,
522 auth_token,
523 auth_data,
524 attachments: attachments_meta
525 };
526
527 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
528
529 let mut buf = vec![];
530
531 buf.put_u32(msg_meta.len() as u32);
532
533 buf.append(&mut msg_meta);
534 buf.append(&mut payload);
535 buf.append(&mut attachments_payload);
536
537 Ok(buf)
538}
539
540pub fn rpc_dto_with_later_attachments<T>(tx: String, rx: String, key: String, payload: T, attachments: Vec<(String, u64)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
541 let mut payload = serde_json::to_vec(&payload)?;
542 let correlation_id = Uuid::new_v4();
543 let mut attachments_meta = vec![];
544
545 for (attachment_name,attachment_size) in attachments {
546 attachments_meta.push(Attachment {
547 name: attachment_name,
548 size: attachment_size
549 });
550 }
551
552 let msg_meta = MsgMeta {
553 tx,
554 rx,
555 key,
556 kind: MsgKind::RpcRequest,
557 correlation_id,
558 route,
559 payload_size: payload.len() as u64,
560 auth_token,
561 auth_data,
562 attachments: attachments_meta
563 };
564
565 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
566
567 let mut buf = vec![];
568
569 buf.put_u32(msg_meta.len() as u32);
570
571 buf.append(&mut msg_meta);
572 buf.append(&mut payload);
573
574 Ok(buf)
575}
576
577pub fn event_dto2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
591 let correlation_id = Uuid::new_v4();
592
593 let msg_meta = MsgMeta {
594 tx,
595 rx,
596 key,
597 kind: MsgKind::Event,
598 correlation_id,
599 route,
600 payload_size: payload.len() as u64,
601 auth_token,
602 auth_data,
603 attachments: vec![]
604 };
605
606 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
607
608 let mut buf = vec![];
609
610 buf.put_u32(msg_meta.len() as u32);
611
612 buf.append(&mut msg_meta);
613 buf.append(&mut payload);
614
615 Ok(buf)
616}
617
618pub fn reply_to_rpc_dto2_sizes(tx: String, rx: String, key: String, correlation_id: Uuid, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, mut attachments_data: Vec<u8>, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> {
619 let mut attachments_meta = vec![];
620 for (attachment_name,attachment_size) in attachments {
621 attachments_meta.push(Attachment {
622 name: attachment_name,
623 size: attachment_size
624 });
625 }
626 let msg_meta = MsgMeta {
627 tx,
628 rx,
629 key,
630 kind: MsgKind::RpcResponse(result),
631 correlation_id,
632 route,
633 payload_size: payload.len() as u64,
634 auth_token,
635 auth_data,
636 attachments: attachments_meta
637 };
638 let payload_size = msg_meta.payload_size;
639 let attachments_sizes = msg_meta.attachments_sizes();
640 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
641 let msg_meta_size = msg_meta.len() as u64;
642 let mut buf = vec![];
643 buf.put_u32(msg_meta.len() as u32);
644 buf.append(&mut msg_meta);
645 buf.append(&mut payload);
646 buf.append(&mut attachments_data);
647 Ok((buf, msg_meta_size, payload_size, attachments_sizes))
648}
649
650pub fn reply_to_rpc_dto_with_later_attachments2(tx: String, rx: String, key: String, correlation_id: Uuid, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
651 let mut attachments_meta = vec![];
652
653 for (attachment_name,attachment_size) in attachments {
654 attachments_meta.push(Attachment {
655 name: attachment_name,
656 size: attachment_size
657 });
658 }
659
660 let msg_meta = MsgMeta {
661 tx,
662 rx,
663 key,
664 kind: MsgKind::RpcResponse(result),
665 correlation_id,
666 route,
667 payload_size: payload.len() as u64,
668 auth_token,
669 auth_data,
670 attachments: attachments_meta
671 };
672
673 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
674
675 let mut buf = vec![];
676
677 buf.put_u32(msg_meta.len() as u32);
678
679 buf.append(&mut msg_meta);
680 buf.append(&mut payload);
681
682 Ok(buf)
683}
684
685 pub fn rpc_dto2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
701 let correlation_id = Uuid::new_v4();
702
703 let msg_meta = MsgMeta {
704 tx,
705 rx,
706 key,
707 kind: MsgKind::RpcRequest,
708 correlation_id,
709 route,
710 payload_size: payload.len() as u64,
711 auth_token,
712 auth_data,
713 attachments: vec![]
714 };
715
716 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
717
718 let mut buf = vec![];
719
720 buf.put_u32(msg_meta.len() as u32);
721
722 buf.append(&mut msg_meta);
723 buf.append(&mut payload);
724
725 Ok(buf)
726}
727
728pub fn rpc_dto_with_attachments2(tx: String, rx: String, key: String, mut payload: Vec<u8>, attachments: Vec<(String, Vec<u8>)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
729 let correlation_id = Uuid::new_v4();
730 let mut attachments_meta = vec![];
731 let mut attachments_payload = vec![];
732
733 for (attachment_name, mut attachment_payload) in attachments {
734 attachments_meta.push(Attachment {
735 name: attachment_name,
736 size: attachment_payload.len() as u64
737 });
738 attachments_payload.append(&mut attachment_payload);
739 }
740
741 let msg_meta = MsgMeta {
742 tx,
743 rx,
744 key,
745 kind: MsgKind::RpcRequest,
746 correlation_id,
747 route,
748 payload_size: payload.len() as u64,
749 auth_token,
750 auth_data,
751 attachments: attachments_meta
752 };
753
754 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
755
756 let mut buf = vec![];
757
758 buf.put_u32(msg_meta.len() as u32);
759
760 buf.append(&mut msg_meta);
761 buf.append(&mut payload);
762 buf.append(&mut attachments_payload);
763
764 Ok(buf)
765}
766
767pub fn rpc_dto_with_later_attachments2(tx: String, rx: String, key: String, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
768 let correlation_id = Uuid::new_v4();
769 let mut attachments_meta = vec![];
770
771 for (attachment_name,attachment_size) in attachments {
772 attachments_meta.push(Attachment {
773 name: attachment_name,
774 size: attachment_size
775 });
776 }
777
778 let msg_meta = MsgMeta {
779 tx,
780 rx,
781 key,
782 kind: MsgKind::RpcRequest,
783 correlation_id,
784 route,
785 payload_size: payload.len() as u64,
786 auth_token,
787 auth_data,
788 attachments: attachments_meta
789 };
790
791 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
792
793 let mut buf = vec![];
794
795 buf.put_u32(msg_meta.len() as u32);
796
797 buf.append(&mut msg_meta);
798 buf.append(&mut payload);
799
800 Ok(buf)
801}
802
803pub fn rpc_dto_with_correlation_id_2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>), Error> {
804 let correlation_id = Uuid::new_v4();
805
806 let msg_meta = MsgMeta {
807 tx,
808 rx,
809 key,
810 kind: MsgKind::RpcRequest,
811 correlation_id,
812 route,
813 payload_size: payload.len() as u64,
814 auth_token,
815 auth_data,
816 attachments: vec![]
817 };
818
819 let mut msg_meta = serde_json::to_vec(&msg_meta)?;
820
821 let mut buf = vec![];
822
823 buf.put_u32(msg_meta.len() as u32);
824
825 buf.append(&mut msg_meta);
826 buf.append(&mut payload);
827
828 Ok((correlation_id, buf))
829}
830
831pub fn get_msg_meta(data: &[u8]) -> Result<MsgMeta, Error> {
832 let mut buf = Cursor::new(data);
833 let len = buf.get_u32() as usize;
834
835 serde_json::from_slice::<MsgMeta>(&data[4..len + 4])
836}
837
838pub fn get_msg<T>(data: &[u8]) -> Result<(MsgMeta, T, Vec<(String, Vec<u8>)>), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
839 let mut buf = Cursor::new(data);
840 let len = buf.get_u32();
841 let msg_meta_offset = (len + 4) as usize;
842
843 let msg_meta = serde_json::from_slice::<MsgMeta>(&data[4..msg_meta_offset as usize])?;
844
845 let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
846
847 let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
848
849 let mut attachments = vec![];
850 let mut attachment_offset = payload_offset;
851
852 for attachment in &msg_meta.attachments {
853 let attachment_start = attachment_offset;
854 attachment_offset = attachment_offset + attachment.size as usize;
855 attachments.push((attachment.name.clone(), (&data[attachment_start..attachment_offset]).to_owned()))
856 }
857
858 Ok((msg_meta, payload, attachments))
859}
860
861pub fn get_msg_meta_and_payload<T>(data: &[u8]) -> Result<(MsgMeta, T), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
862 let mut buf = Cursor::new(data);
863 let len = buf.get_u32();
864 let msg_meta_offset = (len + 4) as usize;
865
866 let msg_meta = serde_json::from_slice::<MsgMeta>(&data[4..msg_meta_offset as usize])?;
867
868 let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
869
870 let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
871
872 Ok((msg_meta, payload))
873}
874
875pub fn get_payload<T>(msg_meta: &MsgMeta, data: &[u8]) -> Result<T, Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
876 let mut buf = Cursor::new(data);
877 let len = buf.get_u32();
878 let msg_meta_offset = (len + 4) as usize;
879
880 let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
881
882 let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
883
884 Ok(payload)
885}
886
887pub fn get_payload_with_attachments<T>(msg_meta: &MsgMeta, data: &[u8]) -> Result<(T, Vec<(String, Vec<u8>)>), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
888 let mut buf = Cursor::new(data);
889 let len = buf.get_u32();
890 let msg_meta_offset = (len + 4) as usize;
891
892 let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
893
894 let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
895
896 let mut attachments = vec![];
897 let mut attachment_offset = payload_offset;
898
899 for attachment in &msg_meta.attachments {
900 let attachment_start = attachment_offset;
901 attachment_offset = attachment_offset + attachment.size as usize;
902 attachments.push((attachment.name.clone(), (&data[attachment_start..attachment_offset]).to_owned()))
903 }
904
905 Ok((payload, attachments))
906}