use crate::bytes::{BufMut, BytesMut};
use crate::codec::{Decoder, Encoder};
use std::io;
#[derive(Debug, Clone)]
pub struct LengthDelimitedCodec {
builder: LengthDelimitedCodecBuilder,
state: DecodeState,
}
#[derive(Debug, Clone)]
pub struct LengthDelimitedCodecBuilder {
length_field_offset: usize,
length_field_length: usize,
length_adjustment: isize,
num_skip: usize,
max_frame_length: usize,
big_endian: bool,
}
#[derive(Debug, Clone, Copy)]
enum DecodeState {
Head,
Data(usize),
}
fn max_length_field_value(length_field_length: usize) -> io::Result<u64> {
match length_field_length {
1..=7 => Ok((1u64 << (length_field_length * 8)) - 1),
8 => Ok(u64::MAX),
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid length_field_length",
)),
}
}
impl LengthDelimitedCodec {
#[inline]
#[must_use]
pub fn new() -> Self {
Self::builder().new_codec()
}
#[inline]
#[must_use]
pub fn builder() -> LengthDelimitedCodecBuilder {
LengthDelimitedCodecBuilder {
length_field_offset: 0,
length_field_length: 4,
length_adjustment: 0,
num_skip: 4,
max_frame_length: 8 * 1024 * 1024,
big_endian: true,
}
}
}
impl Default for LengthDelimitedCodec {
fn default() -> Self {
Self::new()
}
}
impl LengthDelimitedCodecBuilder {
#[inline]
#[must_use]
pub fn length_field_offset(mut self, val: usize) -> Self {
self.length_field_offset = val;
self
}
#[inline]
#[must_use]
pub fn length_field_length(mut self, val: usize) -> Self {
self.length_field_length = val;
self
}
#[inline]
#[must_use]
pub fn length_adjustment(mut self, val: isize) -> Self {
self.length_adjustment = val;
self
}
#[inline]
#[must_use]
pub fn num_skip(mut self, val: usize) -> Self {
self.num_skip = val;
self
}
#[inline]
#[must_use]
pub fn max_frame_length(mut self, val: usize) -> Self {
self.max_frame_length = val;
self
}
#[inline]
#[must_use]
pub fn big_endian(mut self) -> Self {
self.big_endian = true;
self
}
#[inline]
#[must_use]
pub fn little_endian(mut self) -> Self {
self.big_endian = false;
self
}
#[inline]
#[must_use]
pub fn new_codec(self) -> LengthDelimitedCodec {
assert!(
(1..=8).contains(&self.length_field_length),
"length_field_length must be 1..=8"
);
LengthDelimitedCodec {
builder: self,
state: DecodeState::Head,
}
}
}
impl LengthDelimitedCodec {
fn decode_head(&self, src: &BytesMut) -> io::Result<u64> {
let offset = self.builder.length_field_offset;
let len = self.builder.length_field_length;
let end = offset.saturating_add(len);
if src.len() < end {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"not enough bytes for length field",
));
}
let bytes = &src[offset..end];
let mut value: u64 = 0;
if self.builder.big_endian {
for &b in bytes {
value = (value << 8) | u64::from(b);
}
} else {
for (shift, &b) in bytes.iter().enumerate() {
value |= u64::from(b) << (shift * 8);
}
}
Ok(value)
}
fn adjusted_frame_len(&self, len: u64) -> io::Result<usize> {
let len_i64 = i64::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "length exceeds i64"))?;
let adjustment = i64::try_from(self.builder.length_adjustment).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "length adjustment exceeds i64")
})?;
let adjusted = len_i64
.checked_add(adjustment)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "length overflow"))?;
if adjusted < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"negative frame length",
));
}
let len_usize = usize::try_from(adjusted)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "length exceeds usize"))?;
if len_usize > self.builder.max_frame_length {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"frame length exceeds max_frame_length",
));
}
Ok(len_usize)
}
}
impl Decoder for LengthDelimitedCodec {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
loop {
match self.state {
DecodeState::Head => {
let header_len = self
.builder
.length_field_offset
.saturating_add(self.builder.length_field_length);
if src.len() < header_len {
return Ok(None);
}
let raw_len = self.decode_head(src)?;
let frame_len = self.adjusted_frame_len(raw_len)?;
let total_frame_len = header_len.checked_add(frame_len).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidData, "frame length overflow")
})?;
let retained_len = total_frame_len
.checked_sub(self.builder.num_skip)
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"num_skip exceeds total frame length",
)
})?;
if src.len() < self.builder.num_skip {
return Ok(None);
}
if self.builder.num_skip > 0 {
let _ = src.split_to(self.builder.num_skip);
}
self.state = DecodeState::Data(retained_len);
}
DecodeState::Data(frame_len) => {
if src.len() < frame_len {
return Ok(None);
}
let data = src.split_to(frame_len);
self.state = DecodeState::Head;
return Ok(Some(data));
}
}
}
}
}
impl Encoder<BytesMut> for LengthDelimitedCodec {
type Error = io::Error;
fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> {
let frame_len = item.len();
let adjustment = i64::try_from(self.builder.length_adjustment).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "length adjustment exceeds i64")
})?;
let frame_len_i64 = i64::try_from(frame_len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "frame length exceeds i64"))?;
let adjusted_len = frame_len_i64
.checked_sub(adjustment)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "length underflow"))?;
if adjusted_len < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"negative encoded length",
));
}
let length_to_encode = u64::try_from(adjusted_len).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "encoded length exceeds u64")
})?;
let max_length_value = max_length_field_value(self.builder.length_field_length)?;
if length_to_encode > max_length_value {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"encoded length exceeds length_field_length capacity",
));
}
if frame_len > self.builder.max_frame_length {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"frame length exceeds max_frame_length",
));
}
let header_len = self
.builder
.length_field_offset
.saturating_add(self.builder.length_field_length);
dst.reserve(header_len + frame_len);
for _ in 0..self.builder.length_field_offset {
dst.put_u8(0);
}
if self.builder.big_endian {
match self.builder.length_field_length {
1 => dst.put_u8(length_to_encode as u8),
2 => dst.put_u16(length_to_encode as u16),
3 => {
dst.put_u8((length_to_encode >> 16) as u8);
dst.put_u16(length_to_encode as u16);
}
4 => dst.put_u32(length_to_encode as u32),
5 => {
dst.put_u8((length_to_encode >> 32) as u8);
dst.put_u32(length_to_encode as u32);
}
6 => {
dst.put_u16((length_to_encode >> 32) as u16);
dst.put_u32(length_to_encode as u32);
}
7 => {
dst.put_u8((length_to_encode >> 48) as u8);
dst.put_u16((length_to_encode >> 32) as u16);
dst.put_u32(length_to_encode as u32);
}
8 => dst.put_u64(length_to_encode),
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid length_field_length",
));
}
}
} else {
match self.builder.length_field_length {
1 => dst.put_u8(length_to_encode as u8),
2 => dst.put_u16_le(length_to_encode as u16),
3 => {
dst.put_u16_le(length_to_encode as u16);
dst.put_u8((length_to_encode >> 16) as u8);
}
4 => dst.put_u32_le(length_to_encode as u32),
5 => {
dst.put_u32_le(length_to_encode as u32);
dst.put_u8((length_to_encode >> 32) as u8);
}
6 => {
dst.put_u32_le(length_to_encode as u32);
dst.put_u16_le((length_to_encode >> 32) as u16);
}
7 => {
dst.put_u32_le(length_to_encode as u32);
dst.put_u16_le((length_to_encode >> 32) as u16);
dst.put_u8((length_to_encode >> 48) as u8);
}
8 => dst.put_u64_le(length_to_encode),
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid length_field_length",
));
}
}
}
dst.put_slice(&item);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bytes::BytesMut;
#[test]
fn test_length_delimited_decode() {
let mut codec = LengthDelimitedCodec::new();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(5);
buf.put_slice(b"hello");
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame[..], b"hello");
assert!(buf.is_empty());
}
#[test]
fn test_length_delimited_partial() {
let mut codec = LengthDelimitedCodec::new();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(5);
buf.put_slice(b"he");
assert!(codec.decode(&mut buf).unwrap().is_none());
buf.put_slice(b"llo");
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame[..], b"hello");
}
#[test]
fn test_length_delimited_adjustment() {
let mut codec = LengthDelimitedCodec::builder()
.length_adjustment(2)
.num_skip(4)
.new_codec();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(3);
buf.put_slice(b"hello");
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame[..], b"hello");
}
#[test]
fn test_length_delimited_max_frame_length() {
let mut codec = LengthDelimitedCodec::builder()
.max_frame_length(4)
.new_codec();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(5);
buf.put_slice(b"hello");
let err = codec.decode(&mut buf).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
#[test]
fn codec_debug() {
let codec = LengthDelimitedCodec::new();
let dbg = format!("{codec:?}");
assert!(dbg.contains("LengthDelimitedCodec"));
}
#[test]
fn codec_clone() {
let codec = LengthDelimitedCodec::builder()
.max_frame_length(1024)
.new_codec();
let cloned = codec;
let dbg = format!("{cloned:?}");
assert!(dbg.contains("LengthDelimitedCodec"));
}
#[test]
fn codec_default() {
let codec = LengthDelimitedCodec::default();
let dbg = format!("{codec:?}");
assert!(dbg.contains("8388608"));
}
#[test]
fn builder_debug() {
let builder = LengthDelimitedCodec::builder();
let dbg = format!("{builder:?}");
assert!(dbg.contains("LengthDelimitedCodecBuilder"));
}
#[test]
fn builder_clone() {
let builder = LengthDelimitedCodec::builder().max_frame_length(512);
let cloned = builder;
let dbg = format!("{cloned:?}");
assert!(dbg.contains("512"));
}
#[test]
fn builder_all_setters() {
let codec = LengthDelimitedCodec::builder()
.length_field_offset(2)
.length_field_length(2)
.length_adjustment(-2)
.num_skip(4)
.max_frame_length(4096)
.big_endian()
.new_codec();
let dbg = format!("{codec:?}");
assert!(dbg.contains("4096"));
}
#[test]
fn builder_little_endian_decode() {
let mut codec = LengthDelimitedCodec::builder().little_endian().new_codec();
let mut buf = BytesMut::new();
buf.put_u8(3);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_slice(b"abc");
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame[..], b"abc");
}
#[test]
fn builder_length_field_length_2() {
let mut codec = LengthDelimitedCodec::builder()
.length_field_length(2)
.num_skip(2)
.new_codec();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(4);
buf.put_slice(b"data");
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame[..], b"data");
}
#[test]
fn builder_length_field_offset() {
let mut codec = LengthDelimitedCodec::builder()
.length_field_offset(2)
.num_skip(6)
.new_codec();
let mut buf = BytesMut::new();
buf.put_u8(0xAA);
buf.put_u8(0xBB);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(3);
buf.put_slice(b"xyz");
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame[..], b"xyz");
}
#[test]
fn builder_num_skip_zero_retains_entire_header_and_keeps_alignment() {
let mut codec = LengthDelimitedCodec::builder().num_skip(0).new_codec();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(1);
buf.put_slice(b"a");
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(1);
buf.put_slice(b"b");
let frame1 = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame1[..], &[0, 0, 0, 1, b'a']);
let frame2 = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame2[..], &[0, 0, 0, 1, b'b']);
assert!(buf.is_empty());
}
#[test]
fn builder_partial_header_retention_returns_remaining_prefix_bytes() {
let mut codec = LengthDelimitedCodec::builder().num_skip(2).new_codec();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(3);
buf.put_slice(b"hey");
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&frame[..], &[0, 3, b'h', b'e', b'y']);
assert!(buf.is_empty());
}
#[test]
fn decode_empty_frame() {
let mut codec = LengthDelimitedCodec::new();
let mut buf = BytesMut::new();
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
let frame = codec.decode(&mut buf).unwrap().unwrap();
assert!(frame.is_empty());
}
#[test]
fn length_delimited_codec_debug_clone() {
let codec = LengthDelimitedCodec::new();
let cloned = codec.clone();
let dbg = format!("{codec:?}");
assert!(dbg.contains("LengthDelimitedCodec"));
let dbg2 = format!("{cloned:?}");
assert_eq!(dbg, dbg2);
}
#[test]
fn length_delimited_codec_builder_debug_clone() {
let builder = LengthDelimitedCodec::builder();
let cloned = builder.clone();
let dbg = format!("{builder:?}");
assert!(dbg.contains("LengthDelimitedCodecBuilder"));
let dbg2 = format!("{cloned:?}");
assert_eq!(dbg, dbg2);
}
#[test]
fn test_encode_basic() {
let mut codec = LengthDelimitedCodec::new();
let mut dst = BytesMut::new();
let data = BytesMut::from("hello");
codec.encode(data, &mut dst).unwrap();
assert_eq!(dst.len(), 9);
assert_eq!(&dst[0..4], &[0, 0, 0, 5]);
assert_eq!(&dst[4..9], b"hello");
}
#[test]
fn test_encode_little_endian() {
let mut codec = LengthDelimitedCodec::builder().little_endian().new_codec();
let mut dst = BytesMut::new();
let data = BytesMut::from("hi");
codec.encode(data, &mut dst).unwrap();
assert_eq!(dst.len(), 6);
assert_eq!(&dst[0..4], &[2, 0, 0, 0]);
assert_eq!(&dst[4..6], b"hi");
}
#[test]
fn test_encode_max_frame_length_rejection() {
let mut codec = LengthDelimitedCodec::builder()
.max_frame_length(3)
.new_codec();
let mut dst = BytesMut::new();
let data = BytesMut::from("toolong");
let err = codec.encode(data, &mut dst).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(err.to_string().contains("max_frame_length"));
}
#[test]
fn test_encode_rejects_length_that_exceeds_length_field_capacity() {
let mut codec = LengthDelimitedCodec::builder()
.length_field_length(1)
.length_adjustment(-32)
.num_skip(1)
.max_frame_length(512)
.new_codec();
let mut dst = BytesMut::from(&b"existing"[..]);
let original = dst.clone();
let payload = BytesMut::from(vec![0xAB; 240].as_slice());
let err = codec.encode(payload, &mut dst).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(
err.to_string()
.contains("encoded length exceeds length_field_length capacity")
);
assert_eq!(dst, original, "encode must not partially mutate dst");
}
#[derive(Debug, Clone)]
struct MetamorphicTestConfig {
length_field_offset: usize,
length_field_length: usize,
length_adjustment: isize,
num_skip: usize,
max_frame_length: usize,
big_endian: bool,
}
impl Default for MetamorphicTestConfig {
fn default() -> Self {
Self {
length_field_offset: 0,
length_field_length: 4,
length_adjustment: 0,
num_skip: 4,
max_frame_length: 8 * 1024 * 1024,
big_endian: true,
}
}
}
impl MetamorphicTestConfig {
fn build_codec(&self) -> LengthDelimitedCodec {
let mut builder = LengthDelimitedCodec::builder()
.length_field_offset(self.length_field_offset)
.length_field_length(self.length_field_length)
.length_adjustment(self.length_adjustment)
.num_skip(self.num_skip)
.max_frame_length(self.max_frame_length);
if self.big_endian {
builder = builder.big_endian();
} else {
builder = builder.little_endian();
}
builder.new_codec()
}
}
trait DetRngExt {
fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize;
fn gen_range_inclusive(&mut self, range: std::ops::RangeInclusive<usize>) -> usize;
}
impl DetRngExt for crate::util::det_rng::DetRng {
fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize {
if range.is_empty() {
range.start
} else {
range.start + (self.next_u64() as usize % (range.end - range.start))
}
}
fn gen_range_inclusive(&mut self, range: std::ops::RangeInclusive<usize>) -> usize {
self.gen_range(*range.start()..*range.end() + 1)
}
}
fn generate_test_payload(rng: &mut crate::util::det_rng::DetRng, size: usize) -> BytesMut {
let mut data = BytesMut::with_capacity(size);
for _ in 0..size {
data.put_u8((rng.next_u64() % 256) as u8);
}
data
}
fn generate_test_configs(
rng: &mut crate::util::det_rng::DetRng,
count: usize,
) -> Vec<MetamorphicTestConfig> {
(0..count)
.map(|_| MetamorphicTestConfig {
length_field_offset: rng.gen_range(0..3),
length_field_length: rng.gen_range_inclusive(1..=4),
length_adjustment: (rng.next_u64() % 21) as isize - 10,
num_skip: rng.gen_range(0..8),
max_frame_length: rng.gen_range_inclusive(100..=1024),
big_endian: (rng.next_u64() % 2) == 0,
})
.collect()
}
#[test]
fn metamorphic_round_trip_property() {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let mut rng = crate::util::det_rng::DetRng::new(seed);
let configs = generate_test_configs(&mut rng, 20);
for config in configs {
let mut encoder = config.build_codec();
let mut decoder = config.build_codec();
for size in [0, 1, 10, 100, 255] {
if size <= config.max_frame_length {
let original_payload = generate_test_payload(&mut rng, size);
let mut encoded = BytesMut::new();
if encoder
.encode(original_payload.clone(), &mut encoded)
.is_ok()
{
let header_len = config.length_field_offset + config.length_field_length;
let total_frame_len = header_len + original_payload.len();
match decoder.decode(&mut encoded) {
Ok(Some(frame)) => {
if config.num_skip >= header_len {
let skipped_payload = config.num_skip - header_len;
let expected_payload = &original_payload[skipped_payload..];
assert_eq!(
&frame[..],
expected_payload,
"Round-trip failed for config {:?}, size {}",
config,
size
);
} else {
let payload_start = header_len - config.num_skip;
assert_eq!(
&frame[payload_start..],
&original_payload[..],
"Round-trip payload mismatch for config {:?}, size {}",
config,
size
);
}
}
Err(err) => {
assert!(
config.num_skip > total_frame_len,
"Decode error {err:?} for decodable config {:?}, size {}",
config,
size
);
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(
err.to_string()
.contains("num_skip exceeds total frame length"),
"Unexpected decode error for config {:?}, size {}: {err:?}",
config,
size
);
}
Ok(None) => panic!(
"Complete encoded frame did not decode for config {:?}, size {}",
config, size
),
}
}
}
}
}
}
#[test]
fn metamorphic_partial_frame_state_preservation() {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let mut rng = crate::util::det_rng::DetRng::new(seed);
let config = MetamorphicTestConfig::default();
let payload = generate_test_payload(&mut rng, 20);
let mut encoder = config.build_codec();
let mut encoded = BytesMut::new();
encoder.encode(payload.clone(), &mut encoded).unwrap();
for split_point in 1..encoded.len() {
let mut decoder1 = config.build_codec();
let mut decoder2 = config.build_codec();
let mut part1 = encoded.clone();
let part2 = part1.split_off(split_point);
let result1_partial = decoder1.decode(&mut part1).unwrap();
assert!(
result1_partial.is_none(),
"Partial frame should return None"
);
part1.extend_from_slice(&part2);
let result1_complete = decoder1.decode(&mut part1).unwrap();
let mut complete_data = encoded.clone();
let result2_complete = decoder2.decode(&mut complete_data).unwrap();
assert_eq!(
result1_complete, result2_complete,
"Partial frame handling changed result at split point {}",
split_point
);
}
}
fn encode_length_prefix_for_test(
length: u64,
length_field_length: usize,
big_endian: bool,
) -> Vec<u8> {
let max_value = max_length_field_value(length_field_length).unwrap();
assert!(
length <= max_value,
"test length {} exceeds {}-byte field capacity {}",
length,
length_field_length,
max_value
);
let bytes = if big_endian {
length.to_be_bytes()
} else {
length.to_le_bytes()
};
if big_endian {
bytes[bytes.len() - length_field_length..].to_vec()
} else {
bytes[..length_field_length].to_vec()
}
}
fn decode_all_available_frames(
codec: &mut LengthDelimitedCodec,
src: &mut BytesMut,
) -> io::Result<Vec<BytesMut>> {
let mut decoded = Vec::new();
while let Some(frame) = codec.decode(src)? {
decoded.push(frame);
}
Ok(decoded)
}
#[test]
fn metamorphic_max_frame_size_rejections() {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let mut rng = crate::util::det_rng::DetRng::new(seed);
for max_len in [10, 50, 100, 500] {
let config = MetamorphicTestConfig {
max_frame_length: max_len,
..Default::default()
};
let mut encoder = config.build_codec();
let mut decoder = config.build_codec();
for size in [max_len - 1, max_len, max_len + 1, max_len * 2] {
let payload = generate_test_payload(&mut rng, size);
let mut encoded = BytesMut::new();
let encode_result = encoder.encode(payload, &mut encoded);
if size > max_len {
assert!(
encode_result.is_err(),
"Encoder should reject frame size {} > max_len {}",
size,
max_len
);
assert_eq!(
encode_result.unwrap_err().kind(),
io::ErrorKind::InvalidData
);
} else {
assert!(
encode_result.is_ok(),
"Encoder should accept frame size {} <= max_len {}",
size,
max_len
);
let decode_result = decoder.decode(&mut encoded);
assert!(
decode_result.is_ok(),
"Decoder should accept frame that encoder produced"
);
}
}
let mut decoder_direct = config.build_codec();
let mut crafted_frame = BytesMut::new();
let oversized_len = max_len + 100;
crafted_frame.put_u32(oversized_len as u32);
let decode_result = decoder_direct.decode(&mut crafted_frame);
assert!(
decode_result.is_err(),
"Decoder should reject oversized frame length in header"
);
}
}
#[test]
fn metamorphic_decoder_boundary_fragmentation_preserves_multi_frame_arrival_order() {
let configs = [
MetamorphicTestConfig {
length_field_length: 1,
num_skip: 1,
max_frame_length: 31,
big_endian: true,
..Default::default()
},
MetamorphicTestConfig {
length_field_length: 1,
num_skip: 1,
max_frame_length: 31,
big_endian: false,
..Default::default()
},
MetamorphicTestConfig {
length_field_length: 2,
num_skip: 2,
max_frame_length: 257,
big_endian: true,
..Default::default()
},
MetamorphicTestConfig {
length_field_length: 4,
num_skip: 4,
max_frame_length: 257,
big_endian: false,
..Default::default()
},
];
for config in configs {
let sizes = [
0,
1,
config.max_frame_length.saturating_sub(1),
config.max_frame_length,
];
let payloads: Vec<BytesMut> = sizes
.into_iter()
.enumerate()
.map(|(index, size)| {
let fill = b'A'.saturating_add(index as u8);
let mut payload = BytesMut::with_capacity(size);
payload.resize(size, fill);
payload
})
.collect();
let mut encoder = config.build_codec();
let mut encoded_stream = BytesMut::new();
for payload in &payloads {
encoder
.encode(payload.clone(), &mut encoded_stream)
.expect("boundary payload must encode");
}
let mut reference_decoder = config.build_codec();
let mut reference_stream = encoded_stream.clone();
let expected =
decode_all_available_frames(&mut reference_decoder, &mut reference_stream).unwrap();
assert!(
reference_stream.is_empty(),
"reference decode should drain the full stream for {config:?}"
);
assert_eq!(
expected, payloads,
"reference decode should preserve boundary payload order for {config:?}"
);
let mut incremental_decoder = config.build_codec();
let mut fragmented_stream = BytesMut::new();
let mut fragmented_source = encoded_stream.clone();
let mut actual = Vec::new();
while !fragmented_source.is_empty() {
let next_byte = fragmented_source.split_to(1);
fragmented_stream.extend_from_slice(&next_byte);
actual.extend(
decode_all_available_frames(&mut incremental_decoder, &mut fragmented_stream)
.unwrap(),
);
}
assert_eq!(
actual, expected,
"byte-wise fragmentation changed arrival order for {config:?}"
);
assert!(
fragmented_stream.is_empty(),
"fragmented decoder should drain all buffered bytes for {config:?}"
);
}
}
#[test]
fn metamorphic_decoder_boundary_oversized_prefixes_reject_without_consuming_header() {
for (length_field_length, big_endian, max_frame_length) in [
(1, true, 31usize),
(1, false, 31usize),
(2, true, 257usize),
(2, false, 257usize),
(4, true, 257usize),
(8, false, 257usize),
] {
let config = MetamorphicTestConfig {
length_field_length,
num_skip: length_field_length,
max_frame_length,
big_endian,
..Default::default()
};
let mut exact_boundary_decoder = config.build_codec();
let exact_header = encode_length_prefix_for_test(
max_frame_length as u64,
length_field_length,
big_endian,
);
let mut exact_boundary = BytesMut::from(exact_header.as_slice());
assert!(
exact_boundary_decoder
.decode(&mut exact_boundary)
.expect("exact-max header should not error without payload")
.is_none(),
"exact-max frame should wait for payload bytes for {:?}",
config
);
assert_eq!(
exact_boundary.as_ref(),
&[] as &[u8],
"exact-max header should be consumed once the decoder enters payload-wait state for {:?}",
config
);
let exact_payload = vec![0xAB; max_frame_length];
exact_boundary.extend_from_slice(&exact_payload);
let decoded = exact_boundary_decoder
.decode(&mut exact_boundary)
.expect("exact-max payload should decode")
.expect("exact-max payload should be emitted");
assert_eq!(
decoded.as_ref(),
exact_payload.as_slice(),
"exact-max payload bytes changed for {:?}",
config
);
assert!(
exact_boundary.is_empty(),
"exact-max decode should drain the buffer for {:?}",
config
);
let max_value = max_length_field_value(length_field_length).unwrap();
let mut oversized_lengths = Vec::new();
for candidate in [
max_frame_length as u64 + 1,
max_frame_length as u64 + 17,
max_frame_length as u64 * 2 + 1,
max_value,
] {
if candidate > max_frame_length as u64
&& candidate <= max_value
&& !oversized_lengths.contains(&candidate)
{
oversized_lengths.push(candidate);
}
}
for oversized_length in oversized_lengths {
let header = encode_length_prefix_for_test(
oversized_length,
length_field_length,
big_endian,
);
let mut decoder = config.build_codec();
let mut src = BytesMut::from(header.as_slice());
let error = decoder
.decode(&mut src)
.expect_err("oversized prefix must be rejected");
assert_eq!(
error.kind(),
io::ErrorKind::InvalidData,
"oversized prefix must raise InvalidData for {:?}",
config
);
assert_eq!(
src.as_ref(),
header.as_slice(),
"oversized-prefix rejection must not consume header bytes for {:?}",
config
);
}
}
}
#[test]
fn metamorphic_byte_order_consistency() {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let mut rng = crate::util::det_rng::DetRng::new(seed);
let payload = generate_test_payload(&mut rng, 100);
let base_config = MetamorphicTestConfig {
length_field_length: 4,
..Default::default()
};
let big_endian_config = MetamorphicTestConfig {
big_endian: true,
..base_config
};
let little_endian_config = MetamorphicTestConfig {
big_endian: false,
..base_config
};
let mut be_encoder = big_endian_config.build_codec();
let mut le_encoder = little_endian_config.build_codec();
let mut be_encoded = BytesMut::new();
let mut le_encoded = BytesMut::new();
be_encoder.encode(payload.clone(), &mut be_encoded).unwrap();
le_encoder.encode(payload.clone(), &mut le_encoded).unwrap();
if payload.len() > 255 {
assert_ne!(
be_encoded, le_encoded,
"Big-endian and little-endian should produce different wire formats"
);
let be_len =
u32::from_be_bytes([be_encoded[0], be_encoded[1], be_encoded[2], be_encoded[3]]);
let le_len =
u32::from_le_bytes([le_encoded[0], le_encoded[1], le_encoded[2], le_encoded[3]]);
assert_eq!(
be_len, le_len,
"Length values should be equal when interpreted correctly"
);
}
let mut be_decoder = big_endian_config.build_codec();
let mut le_decoder = little_endian_config.build_codec();
let be_decoded = be_decoder.decode(&mut be_encoded).unwrap().unwrap();
let le_decoded = le_decoder.decode(&mut le_encoded).unwrap().unwrap();
let header_len = base_config.length_field_offset + base_config.length_field_length;
let payload_start = if base_config.num_skip >= header_len {
0
} else {
header_len - base_config.num_skip
};
assert_eq!(
&be_decoded[payload_start..],
&le_decoded[payload_start..],
"Decoded payloads should be identical regardless of byte order"
);
}
#[test]
fn metamorphic_lab_runtime_replay_identical() {
const SEED: u64 = 0x1234_5678_9ABC_DEF0;
let results: Vec<Vec<BytesMut>> = (0..3)
.map(|_| {
let mut rng = crate::util::det_rng::DetRng::new(SEED);
let mut frames = Vec::new();
for _ in 0..10 {
let config = MetamorphicTestConfig {
length_field_length: 2,
num_skip: 2,
max_frame_length: 1000,
big_endian: (rng.next_u64() % 2) == 0,
..Default::default()
};
let payload_size = rng.gen_range(1..100);
let payload = generate_test_payload(&mut rng, payload_size);
let mut encoder = config.build_codec();
let mut decoder = config.build_codec();
let mut encoded = BytesMut::new();
if encoder.encode(payload, &mut encoded).is_ok() {
if let Ok(Some(frame)) = decoder.decode(&mut encoded) {
frames.push(frame);
}
}
}
frames
})
.collect();
for i in 1..results.len() {
assert_eq!(
results[0], results[i],
"Run {} produced different results than run 0 - non-deterministic behavior detected",
i
);
}
assert!(
!results[0].is_empty(),
"Should have processed some frames successfully"
);
}
#[test]
fn metamorphic_composite_round_trip_with_partial_frames() {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let mut rng = crate::util::det_rng::DetRng::new(seed);
let config = MetamorphicTestConfig::default();
let payload = generate_test_payload(&mut rng, 50);
let mut encoder = config.build_codec();
let mut encoded = BytesMut::new();
encoder.encode(payload.clone(), &mut encoded).unwrap();
let mut decoder = config.build_codec();
let mut remaining = encoded.clone();
let mut accumulated = BytesMut::new();
while !remaining.is_empty() {
let chunk_size = rng.gen_range(1..remaining.len() + 1).min(remaining.len());
let chunk = remaining.split_to(chunk_size);
accumulated.put_slice(&chunk);
if let Ok(Some(frame)) = decoder.decode(&mut accumulated) {
let header_len = config.length_field_offset + config.length_field_length;
let payload_start = if config.num_skip >= header_len {
0
} else {
header_len - config.num_skip
};
assert_eq!(
&frame[payload_start..],
&payload[..],
"Composite round-trip with partial frames failed"
);
break;
}
}
}
#[test]
fn metamorphic_cross_configuration_compatibility() {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let mut rng = crate::util::det_rng::DetRng::new(seed);
let base_config = MetamorphicTestConfig {
length_field_length: 4,
num_skip: 4,
max_frame_length: 1000,
..Default::default()
};
let configs = vec![
base_config.clone(),
MetamorphicTestConfig {
big_endian: false,
..base_config.clone()
},
MetamorphicTestConfig {
length_field_offset: 2,
num_skip: 6,
..base_config.clone()
},
];
let payload = generate_test_payload(&mut rng, 30);
for config in &configs {
let mut encoder = config.build_codec();
let mut decoder = config.build_codec();
let mut encoded = BytesMut::new();
encoder.encode(payload.clone(), &mut encoded).unwrap();
let decoded = decoder.decode(&mut encoded).unwrap().unwrap();
let header_len = config.length_field_offset + config.length_field_length;
let payload_start = if config.num_skip >= header_len {
0
} else {
header_len - config.num_skip
};
assert_eq!(
&decoded[payload_start..],
&payload[..],
"Configuration {:?} failed round-trip test",
config
);
}
}
}