use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt};
use bytes::{BufMut, BytesMut};
use std::fmt::Debug;
use tokio_util::codec::{Decoder, Encoder};
use crate::communication::{CodecError, InterProcessMessage, MessageMetadata};
/// Size in bytes of the fixed frame header: two `u32` length fields
/// (metadata size followed by data size).
const HEADER_SIZE: usize = 2 * std::mem::size_of::<u32>();
/// Progress of the decoder through the frame that is currently being read.
#[derive(Debug)]
enum DecodeStatus {
    /// Waiting for the fixed-size header that carries both section lengths.
    Header,
    /// The header has been read; waiting for `metadata_size` bytes of metadata.
    Metadata {
        /// Length in bytes of the metadata section.
        metadata_size: usize,
        /// Length in bytes of the data section that follows the metadata.
        data_size: usize,
    },
    /// The metadata has been read; waiting for `data_size` bytes of data.
    Data {
        /// Length in bytes of the data section.
        data_size: usize,
    },
}
/// Codec that converts between [`InterProcessMessage`]s and length-prefixed
/// byte frames.
///
/// A frame consists of a header holding two big-endian `u32` lengths
/// (metadata size, then data size), followed by the bincode-serialized
/// [`MessageMetadata`] and finally the data bytes.
#[derive(Debug)]
pub struct MessageCodec {
    /// Which section of the current frame the decoder is waiting for.
    status: DecodeStatus,
    /// Metadata of the frame being decoded; stored once the metadata section
    /// has been parsed and taken out again when the data section arrives.
    msg_metadata: Option<MessageMetadata>,
}
impl MessageCodec {
    /// Creates a codec whose decoder starts out waiting for a frame header
    /// and holds no pending metadata.
    pub fn new() -> Self {
        Self {
            status: DecodeStatus::Header,
            msg_metadata: None,
        }
    }
}
impl Decoder for MessageCodec {
type Item = InterProcessMessage;
type Error = CodecError;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<InterProcessMessage>, CodecError> {
match self.status {
DecodeStatus::Header => {
if buf.len() >= HEADER_SIZE {
let header = buf.split_to(HEADER_SIZE);
let metadata_size = NetworkEndian::read_u32(&header[0..4]) as usize;
let data_size = NetworkEndian::read_u32(&header[4..8]) as usize;
self.status = DecodeStatus::Metadata {
metadata_size,
data_size,
};
buf.reserve(metadata_size + data_size + HEADER_SIZE);
self.decode(buf)
} else {
Ok(None)
}
}
DecodeStatus::Metadata {
metadata_size,
data_size,
} => {
if buf.len() >= metadata_size {
let metadata_bytes = buf.split_to(metadata_size);
let metadata: MessageMetadata =
bincode::deserialize(&metadata_bytes).map_err(CodecError::BincodeError)?;
self.msg_metadata = Some(metadata);
self.status = DecodeStatus::Data { data_size };
self.decode(buf)
} else {
Ok(None)
}
}
DecodeStatus::Data { data_size } => {
if buf.len() >= data_size {
let bytes = buf.split_to(data_size);
let msg = InterProcessMessage::new_serialized(
bytes,
self.msg_metadata.take().unwrap(),
);
self.status = DecodeStatus::Header;
Ok(Some(msg))
} else {
Ok(None)
}
}
}
}
}
impl Encoder<InterProcessMessage> for MessageCodec {
type Error = CodecError;
fn encode(&mut self, msg: InterProcessMessage, buf: &mut BytesMut) -> Result<(), CodecError> {
let (metadata, data) = match msg {
InterProcessMessage::Deserialized { metadata, data } => (metadata, data),
InterProcessMessage::Serialized {
metadata: _,
bytes: _,
} => unreachable!(),
};
let metadata_size = bincode::serialized_size(&metadata).map_err(CodecError::from)?;
let data_size = data.serialized_size().unwrap();
buf.reserve(HEADER_SIZE + metadata_size as usize + data_size);
let mut writer = buf.writer();
writer.write_u32::<NetworkEndian>(metadata_size as u32)?;
writer.write_u32::<NetworkEndian>(data_size as u32)?;
bincode::serialize_into(&mut writer, &metadata).map_err(CodecError::from)?;
data.encode_into(buf).unwrap();
Ok(())
}
}
impl Default for MessageCodec {
fn default() -> Self {
Self::new()
}
}