use crate::messaging::{Error, MsgId, MsgKind, Result};
use bincode::{
config::{BigEndian, FixintEncoding, WithOtherEndian, WithOtherIntEncoding},
Options,
};
use bytes::{BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
use custom_debug::Debug as CustomDebug;
use std::io::Write;
/// Version of the messaging protocol. It is stamped into every header built
/// with `WireMsgHeader::new` and is the only version `WireMsgHeader::from`
/// accepts when parsing.
const MESSAGING_PROTO_VERSION: u16 = 1u16;
/// Header of a wire message: the protocol version together with the
/// message envelope.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct WireMsgHeader {
// Protocol version of this header; kept private, set by `new` and `from`.
version: u16,
/// Envelope carrying the message id and kind.
pub msg_envelope: MsgEnvelope,
}
/// The part of the header that is encoded with MessagePack (via `rmp_serde`)
/// and placed right after the fixed-size header metadata.
#[derive(CustomDebug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub struct MsgEnvelope {
/// Identifier of the message.
pub msg_id: MsgId,
/// Kind of the message.
pub kind: MsgKind,
}
/// Metadata found at the very start of a serialized header. It is encoded
/// with `BINCODE_OPTIONS`, and the field order here is the order on the wire.
#[derive(Debug, Deserialize, Serialize)]
struct HeaderMeta {
// Total length in bytes of the serialized header: this metadata plus the
// encoded message envelope that follows it.
header_len: u16,
// Messaging protocol version the header was written with.
version: u16,
}
impl HeaderMeta {
const SIZE: usize = size_of::<Self>();
fn header_len(&self) -> usize {
self.header_len.into()
}
}
lazy_static! {
    /// Bincode configuration shared by header (de)serialization: default
    /// options switched to big-endian byte order and fixed-width integers.
    static ref BINCODE_OPTIONS: WithOtherIntEncoding<
        WithOtherEndian<bincode::DefaultOptions, BigEndian>,
        FixintEncoding,
    > = {
        let big_endian = bincode::DefaultOptions::new().with_big_endian();
        big_endian.with_fixint_encoding()
    };
}
impl WireMsgHeader {
pub fn new(msg_id: MsgId, kind: MsgKind) -> Self {
Self {
version: MESSAGING_PROTO_VERSION,
msg_envelope: MsgEnvelope { msg_id, kind },
}
}
pub fn from(bytes: Bytes) -> Result<Self> {
let bytes_len = bytes.len();
let meta: HeaderMeta = BINCODE_OPTIONS
.allow_trailing_bytes()
.deserialize(&bytes)
.map_err(|err| Error::FailedToParse(format!("invalid message header: {err}")))?;
if meta.header_len() > bytes_len {
return Err(Error::FailedToParse(format!(
"not enough bytes received ({bytes_len}) to deserialize wire message header",
)));
}
if meta.version != MESSAGING_PROTO_VERSION {
return Err(Error::UnsupportedVersion(meta.version));
}
let msg_envelope_bytes = &bytes[HeaderMeta::SIZE..meta.header_len()];
let msg_envelope: MsgEnvelope =
rmp_serde::from_slice(msg_envelope_bytes).map_err(|err| {
Error::FailedToParse(format!(
"source authority couldn't be deserialized from the header: {err}",
))
})?;
let header = Self {
version: meta.version,
msg_envelope,
};
Ok(header)
}
pub fn serialize(&self) -> Result<Bytes> {
let msg_envelope_vec = rmp_serde::to_vec_named(&self.msg_envelope).map_err(|err| {
Error::Serialisation(format!(
"could not serialize message envelope with Msgpack: {err}",
))
})?;
let meta = HeaderMeta {
header_len: (HeaderMeta::SIZE + msg_envelope_vec.len()) as u16,
version: self.version,
};
let mut buffer_writer = BytesMut::new().writer();
BINCODE_OPTIONS
.serialize_into(&mut buffer_writer, &meta)
.map_err(|err| {
Error::Serialisation(format!(
"header metadata couldn't be serialized into the header: {err}",
))
})?;
buffer_writer
.write(&msg_envelope_vec)
.map_err(|_| Error::Serialisation("ups".to_string()))?;
Ok(buffer_writer.into_inner().freeze())
}
}