use bytes::{Buf, BufMut, BytesMut};
use std::convert::TryInto;
use tokio::io;
use tokio_util::codec::{Decoder, Encoder};
/// Number of bytes occupied by the length prefix that starts every frame.
///
/// Declared as a `const` (rather than a `static`) because it is a plain
/// compile-time value that never needs a fixed memory address.
const LEN_SIZE: usize = 8;

/// Returns the total number of bytes a frame occupies when it carries a
/// payload of `msg_size` bytes: the length prefix plus the payload itself.
///
/// The addition is unchecked, so callers passing a size derived from
/// untrusted data must make sure `LEN_SIZE + msg_size` cannot overflow.
#[inline]
fn frame_size(msg_size: usize) -> usize {
    LEN_SIZE + msg_size
}
/// Stateless codec that frames byte payloads with a fixed-size length prefix.
///
/// The type has no fields, so copies of it are interchangeable and always
/// compare equal.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DistantCodec;
impl<'a> Encoder<&'a [u8]> for DistantCodec {
type Error = io::Error;
fn encode(&mut self, item: &'a [u8], dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.reserve(frame_size(item.len()));
dst.put_u64(item.len() as u64);
dst.put(item);
Ok(())
}
}
impl Decoder for DistantCodec {
type Item = Vec<u8>;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() <= LEN_SIZE {
return Ok(None);
}
let msg_len = u64::from_be_bytes(src[..LEN_SIZE].try_into().unwrap());
if msg_len == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Frame cannot have msg len of 0",
));
}
let frame_len = frame_size(msg_len as usize);
if src.len() >= frame_len {
let data = src[LEN_SIZE..frame_len].to_vec();
src.advance(frame_len);
Ok(Some(data))
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Panics unless `outcome` is `Ok(None)`, i.e. the decoder asked for more data.
    fn assert_needs_more_data(outcome: Result<Option<Vec<u8>>, io::Error>) {
        if let Ok(None) = outcome {
            return;
        }
        panic!(
            "decoder.decode(...) wanted Ok(None), but got {:?}",
            outcome
        );
    }

    #[test]
    fn encoder_should_encode_byte_slice_with_frame_size() {
        let mut codec = DistantCodec;
        let mut out = BytesMut::new();

        // First frame: 8-byte big-endian length (3) followed by the payload
        codec.encode(&[1, 2, 3], &mut out).unwrap();
        let first_frame = vec![0, 0, 0, 0, 0, 0, 0, 3, 1, 2, 3];
        assert_eq!(out, first_frame);

        // Second frame is appended directly after the first one
        codec.encode(&[4, 5, 6, 7, 8, 9], &mut out).unwrap();
        let both_frames = vec![
            0, 0, 0, 0, 0, 0, 0, 3, 1, 2, 3, // first frame
            0, 0, 0, 0, 0, 0, 0, 6, 4, 5, 6, 7, 8, 9, // second frame
        ];
        assert_eq!(out, both_frames);
    }

    #[test]
    fn decoder_should_return_none_if_received_data_smaller_than_frame_length_field() {
        let mut codec = DistantCodec;
        let mut input = BytesMut::new();

        // Fill the buffer with exactly LEN_SIZE bytes (0, 1, 2, ...), which is
        // enough for a length prefix but leaves no room for any payload
        (0..LEN_SIZE).for_each(|i| input.put_u8(i as u8));

        assert_needs_more_data(codec.decode(&mut input));
    }

    #[test]
    fn decoder_should_return_none_if_received_data_is_not_a_full_frame() {
        let mut codec = DistantCodec;
        let mut input = BytesMut::new();

        // Only the length prefix, announcing a four-byte payload
        input.put_u64(4);
        assert_needs_more_data(codec.decode(&mut input));

        // Three of the four announced payload bytes: still incomplete
        input.put_slice(&[1, 2, 3]);
        assert_needs_more_data(codec.decode(&mut input));
    }

    #[test]
    fn decoder_should_decode_and_return_next_frame_if_available() {
        let mut codec = DistantCodec;
        let mut input = BytesMut::new();

        // One complete frame: length prefix of 4 plus four payload bytes
        input.put_u64(4);
        input.put_slice(&[1, 2, 3, 4]);

        match codec.decode(&mut input) {
            Ok(Some(payload)) => assert_eq!(payload, [1, 2, 3, 4]),
            other => panic!(
                "decoder.decode(...) wanted Ok(Vec[1, 2, 3, 4]), but got {:?}",
                other
            ),
        }
    }

    #[test]
    fn decoder_should_properly_remove_decoded_frame_from_byte_buffer() {
        let mut codec = DistantCodec;
        let mut input = BytesMut::new();

        // One complete frame followed by a single byte of the next frame
        input.put_u64(4);
        input.put_slice(&[1, 2, 3, 4]);
        input.put_u8(123);

        match codec.decode(&mut input) {
            Ok(Some(payload)) => {
                assert_eq!(payload, [1, 2, 3, 4]);
                // Only the trailing byte must remain in the buffer
                assert_eq!(input, vec![123]);
            }
            other => panic!(
                "decoder.decode(...) wanted Ok(Vec[1, 2, 3, 4]), but got {:?}",
                other
            ),
        }
    }

    #[test]
    fn decoder_should_return_error_if_frame_has_msg_len_of_zero() {
        let mut codec = DistantCodec;
        let mut input = BytesMut::new();

        // Length prefix of zero followed by one stray byte
        input.put_u64(0);
        input.put_u8(1);

        match codec.decode(&mut input) {
            Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidData),
            other => panic!(
                "decoder.decode(...) wanted Err(io::ErrorKind::InvalidData), but got {:?}",
                other
            ),
        }
    }
}