use std::io::Cursor;
use std::fmt::Debug;
use bytes::{Buf, BufMut};
use serde_derive::{Serialize, Deserialize};
use serde_json::{Value, Error};
use uuid::Uuid;
pub use bytes;
pub use uuid;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct CmpSpec {
pub addr: String,
pub tx: String
}
impl CmpSpec {
pub fn new_addr(&self, addr: &str) -> CmpSpec {
CmpSpec {
addr: addr.to_owned(),
tx: self.addr.clone()
}
}
pub fn add_to_addr(&self, delta: &str) -> CmpSpec {
CmpSpec {
addr: self.addr.clone() + "." + delta,
tx: self.addr.clone()
}
}
}
impl Default for CmpSpec {
fn default() -> Self {
CmpSpec {
addr: String::new(),
tx: String::new()
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Participator {
Component(String, Option<String>, Option<String>),
Service(String)
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum RouteSpec {
Simple,
Client(Participator)
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Route {
pub source: Participator,
pub spec: RouteSpec,
pub points: Vec<Participator>
}
#[derive(Debug, Deserialize, Clone)]
pub struct Message<T> {
pub meta: MsgMeta,
pub payload: T,
pub attachments_data: Vec<u8>
}
#[derive(Debug, Deserialize, Clone)]
pub struct MessageRaw {
pub meta: MsgMeta,
pub payload: Vec<u8>,
pub attachments_data: Vec<u8>
}
#[derive(Debug, Serialize, Clone)]
pub enum Response<T> {
Simple(T),
Full(T, Vec<(String, u64)>, Vec<u8>)
}
pub fn resp<T>(payload: T) -> Result<Response<T>, Box<dyn std::error::Error>> {
Ok(Response::Simple(payload))
}
pub fn resp_full<T>(payload: T, attachments: Vec<(String, u64)>, attachments_data: Vec<u8>) -> Result<Response<T>, Box<dyn std::error::Error>> {
Ok(Response::Full(payload, attachments, attachments_data))
}
pub fn resp_raw(payload: Vec<u8>) -> Result<ResponseRaw, Box<dyn std::error::Error>> {
Ok(ResponseRaw::Simple(payload))
}
pub fn resp_raw_full(payload: Vec<u8>, attachments: Vec<(String, u64)>, attachments_data: Vec<u8>) -> Result<ResponseRaw, Box<dyn std::error::Error>> {
Ok(ResponseRaw::Full(payload, attachments, attachments_data))
}
#[derive(Debug, Serialize, Clone)]
pub enum ResponseRaw {
Simple(Vec<u8>),
Full(Vec<u8>, Vec<(String, u64)>, Vec<u8>)
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MsgMeta {
pub tx: String,
pub rx: String,
pub key: String,
pub kind: MsgKind,
pub correlation_id: Uuid,
pub route: Route,
pub payload_size: u64,
pub auth_token: Option<String>,
pub auth_data: Option<Value>,
pub attachments: Vec<Attachment>
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum MsgKind {
Event,
RpcRequest,
RpcResponse(RpcResult)
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum RpcResult {
Ok,
Err
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Attachment {
pub name: String,
pub size: u64
}
impl MsgMeta {
pub fn content_len(&self) -> u64 {
let mut len = self.payload_size;
for attachment in &self.attachments {
len = len + attachment.size;
}
len
}
pub fn attachments_len(&self) -> u64 {
let mut len = 0;
for attachment in &self.attachments {
len = len + attachment.size;
}
len
}
pub fn attachments_sizes(&self) -> Vec<u64> {
let mut res = vec![];
for attachment in &self.attachments {
res.push(attachment.size);
}
res
}
pub fn display(&self) -> String {
format!("{} -> {} {} {:?}", self.tx, self.rx, self.key, self.kind)
}
pub fn key_part(&self, index: usize) -> Result<&str, String> {
let split: Vec<&str> = self.key.split(".").collect();
if index >= split.len() {
return Err("index equals or superior to parts length".to_owned());
}
return Ok(split[index])
}
pub fn match_key_part(&self, index: usize, value: &str) -> Result<bool, String> {
let split: Vec<&str> = self.key.split(".").collect();
if index >= split.len() {
return Err("index equals or superior to parts length".to_owned());
}
return Ok(split[index] == value)
}
pub fn tx_part(&self, index: usize) -> Result<&str, String> {
let split: Vec<&str> = self.tx.split(".").collect();
if index >= split.len() {
return Err("index equals or superior to parts length".to_owned());
}
return Ok(split[index])
}
pub fn match_tx_part(&self, index: usize, value: &str) -> Result<bool, String> {
let split: Vec<&str> = self.tx.split(".").collect();
if index >= split.len() {
return Err("index equals or superior to parts length".to_owned());
}
return Ok(split[index] == value)
}
pub fn source_cmp_addr(&self) -> Option<&str> {
match &self.route.source {
Participator::Component(addr, _, _) => Some(&addr),
_ => None
}
}
pub fn source_cmp_part(&self, index: usize) -> Result<&str, String> {
let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
let split: Vec<&str> = addr.split(".").collect();
if index >= split.len() {
return Err("index equals or superior to parts length".to_owned());
}
return Ok(split[index])
}
pub fn match_source_cmp_part(&self, index: usize, value: &str) -> Result<bool, String> {
let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
let split: Vec<&str> = addr.split(".").collect();
if index >= split.len() {
return Err("index equals or superior to parts length".to_owned());
}
return Ok(split[index] == value)
}
pub fn source_cmp_part_before_last(&self) -> Result<&str, String> {
let addr = self.source_cmp_addr().ok_or("Not a cmp source".to_owned())?;
let split: Vec<&str> = addr.split(".").collect();
if split.len() < 2 {
return Err("parts length is less than 2".to_owned());
}
return Ok(split[split.len() - 2])
}
pub fn source_svc_addr(&self) -> Option<String> {
match &self.route.source {
Participator::Service(addr) => Some(addr.clone()),
_ => None
}
}
pub fn client_cmp_addr(&self) -> Option<String> {
match &self.route.spec {
RouteSpec::Client(participator) => {
match participator {
Participator::Component(addr, _, _) => Some(addr.clone()),
_ => None
}
}
_ => None
}
}
pub fn client_svc_addr(&self) -> Option<String> {
match &self.route.spec {
RouteSpec::Client(participator) => {
match participator {
Participator::Service(addr) => Some(addr.clone()),
_ => None
}
}
_ => None
}
}
}
pub fn event_dto<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::Event,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn event_dto_with_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::Event,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let payload_size = msg_meta.payload_size;
let attachments_sizes = msg_meta.attachments_sizes();
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let msg_meta_size = msg_meta.len() as u64;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok((buf, msg_meta_size, payload_size, attachments_sizes))
}
pub fn reply_to_rpc_dto<T>(tx: String, rx: String, key: String, correlation_id: Uuid, payload: T, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcResponse(result),
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn rpc_dto<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn rpc_dto_with_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let payload_size = msg_meta.payload_size;
let attachments_sizes = msg_meta.attachments_sizes();
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let msg_meta_size = msg_meta.len() as u64;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok((buf, msg_meta_size, payload_size, attachments_sizes))
}
pub fn rpc_dto_with_correlation_id<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>), Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok((correlation_id, buf))
}
pub fn rpc_dto_with_correlation_id_sizes<T>(tx: String, rx: String, key: String, payload: T, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>, u64, u64, Vec<u64>), Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let payload_size = msg_meta.payload_size;
let attachments_sizes = msg_meta.attachments_sizes();
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let msg_meta_size = msg_meta.len() as u64;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok((correlation_id, buf, msg_meta_size, payload_size, attachments_sizes))
}
pub fn rpc_dto_with_attachments<T>(tx: String, rx: String, key: String, payload: T, attachments: Vec<(String, Vec<u8>)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let mut attachments_meta = vec![];
let mut attachments_payload = vec![];
for (attachment_name, mut attachment_payload) in attachments {
attachments_meta.push(Attachment {
name: attachment_name,
size: attachment_payload.len() as u64
});
attachments_payload.append(&mut attachment_payload);
}
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: attachments_meta
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
buf.append(&mut attachments_payload);
Ok(buf)
}
pub fn rpc_dto_with_later_attachments<T>(tx: String, rx: String, key: String, payload: T, attachments: Vec<(String, u64)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> where T: Debug, T: serde::Serialize {
let mut payload = serde_json::to_vec(&payload)?;
let correlation_id = Uuid::new_v4();
let mut attachments_meta = vec![];
for (attachment_name,attachment_size) in attachments {
attachments_meta.push(Attachment {
name: attachment_name,
size: attachment_size
});
}
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: attachments_meta
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn event_dto2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::Event,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn reply_to_rpc_dto2_sizes(tx: String, rx: String, key: String, correlation_id: Uuid, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, mut attachments_data: Vec<u8>, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Vec<u8>, u64, u64, Vec<u64>), Error> {
let mut attachments_meta = vec![];
for (attachment_name,attachment_size) in attachments {
attachments_meta.push(Attachment {
name: attachment_name,
size: attachment_size
});
}
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcResponse(result),
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: attachments_meta
};
let payload_size = msg_meta.payload_size;
let attachments_sizes = msg_meta.attachments_sizes();
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let msg_meta_size = msg_meta.len() as u64;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
buf.append(&mut attachments_data);
Ok((buf, msg_meta_size, payload_size, attachments_sizes))
}
pub fn reply_to_rpc_dto_with_later_attachments2(tx: String, rx: String, key: String, correlation_id: Uuid, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, result: RpcResult, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
let mut attachments_meta = vec![];
for (attachment_name,attachment_size) in attachments {
attachments_meta.push(Attachment {
name: attachment_name,
size: attachment_size
});
}
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcResponse(result),
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: attachments_meta
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn rpc_dto2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn rpc_dto_with_attachments2(tx: String, rx: String, key: String, mut payload: Vec<u8>, attachments: Vec<(String, Vec<u8>)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
let correlation_id = Uuid::new_v4();
let mut attachments_meta = vec![];
let mut attachments_payload = vec![];
for (attachment_name, mut attachment_payload) in attachments {
attachments_meta.push(Attachment {
name: attachment_name,
size: attachment_payload.len() as u64
});
attachments_payload.append(&mut attachment_payload);
}
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: attachments_meta
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
buf.append(&mut attachments_payload);
Ok(buf)
}
pub fn rpc_dto_with_later_attachments2(tx: String, rx: String, key: String, mut payload: Vec<u8>, attachments: Vec<(String, u64)>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<Vec<u8>, Error> {
let correlation_id = Uuid::new_v4();
let mut attachments_meta = vec![];
for (attachment_name,attachment_size) in attachments {
attachments_meta.push(Attachment {
name: attachment_name,
size: attachment_size
});
}
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: attachments_meta
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok(buf)
}
pub fn rpc_dto_with_correlation_id_2(tx: String, rx: String, key: String, mut payload: Vec<u8>, route: Route, auth_token: Option<String>, auth_data: Option<Value>) -> Result<(Uuid, Vec<u8>), Error> {
let correlation_id = Uuid::new_v4();
let msg_meta = MsgMeta {
tx,
rx,
key,
kind: MsgKind::RpcRequest,
correlation_id,
route,
payload_size: payload.len() as u64,
auth_token,
auth_data,
attachments: vec![]
};
let mut msg_meta = serde_json::to_vec(&msg_meta)?;
let mut buf = vec![];
buf.put_u32(msg_meta.len() as u32);
buf.append(&mut msg_meta);
buf.append(&mut payload);
Ok((correlation_id, buf))
}
pub fn get_msg_meta(data: &[u8]) -> Result<MsgMeta, Error> {
let mut buf = Cursor::new(data);
let len = buf.get_u32() as usize;
serde_json::from_slice::<MsgMeta>(&data[4..len + 4])
}
pub fn get_msg<T>(data: &[u8]) -> Result<(MsgMeta, T, Vec<(String, Vec<u8>)>), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
let mut buf = Cursor::new(data);
let len = buf.get_u32();
let msg_meta_offset = (len + 4) as usize;
let msg_meta = serde_json::from_slice::<MsgMeta>(&data[4..msg_meta_offset as usize])?;
let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
let mut attachments = vec![];
let mut attachment_offset = payload_offset;
for attachment in &msg_meta.attachments {
let attachment_start = attachment_offset;
attachment_offset = attachment_offset + attachment.size as usize;
attachments.push((attachment.name.clone(), (&data[attachment_start..attachment_offset]).to_owned()))
}
Ok((msg_meta, payload, attachments))
}
pub fn get_msg_meta_and_payload<T>(data: &[u8]) -> Result<(MsgMeta, T), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
let mut buf = Cursor::new(data);
let len = buf.get_u32();
let msg_meta_offset = (len + 4) as usize;
let msg_meta = serde_json::from_slice::<MsgMeta>(&data[4..msg_meta_offset as usize])?;
let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
Ok((msg_meta, payload))
}
pub fn get_payload<T>(msg_meta: &MsgMeta, data: &[u8]) -> Result<T, Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
let mut buf = Cursor::new(data);
let len = buf.get_u32();
let msg_meta_offset = (len + 4) as usize;
let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
Ok(payload)
}
pub fn get_payload_with_attachments<T>(msg_meta: &MsgMeta, data: &[u8]) -> Result<(T, Vec<(String, Vec<u8>)>), Error> where T: Debug, T: serde::Serialize, for<'de> T: serde::Deserialize<'de> {
let mut buf = Cursor::new(data);
let len = buf.get_u32();
let msg_meta_offset = (len + 4) as usize;
let payload_offset = msg_meta_offset + msg_meta.payload_size as usize;
let payload = serde_json::from_slice::<T>(&data[msg_meta_offset..payload_offset])?;
let mut attachments = vec![];
let mut attachment_offset = payload_offset;
for attachment in &msg_meta.attachments {
let attachment_start = attachment_offset;
attachment_offset = attachment_offset + attachment.size as usize;
attachments.push((attachment.name.clone(), (&data[attachment_start..attachment_offset]).to_owned()))
}
Ok((payload, attachments))
}