use crate::bytes::{BufMut, Bytes, BytesMut};
use crate::codec::{Decoder, Encoder};
use std::fmt;
use super::status::GrpcError;
pub use super::DEFAULT_MAX_MESSAGE_SIZE;
/// Size in bytes of the header that precedes every gRPC message on the wire:
/// a 1-byte compression flag followed by a 4-byte big-endian payload length.
pub const MESSAGE_HEADER_SIZE: usize = 5;
/// A single length-prefixed gRPC message: the compression flag from the frame
/// header together with the payload bytes that follow it.
#[derive(Debug, Clone)]
pub struct GrpcMessage {
/// `true` when the frame's compression flag byte is `1`.
pub compressed: bool,
/// Payload bytes exactly as carried in the frame; the framing codec does not
/// compress or decompress them.
pub data: Bytes,
}
impl GrpcMessage {
#[must_use]
pub fn new(data: Bytes) -> Self {
Self {
compressed: false,
data,
}
}
#[must_use]
pub fn compressed(data: Bytes) -> Self {
Self {
compressed: true,
data,
}
}
}
/// Codec for the gRPC length-prefixed message framing, with independent
/// payload size limits for the encode and decode directions.
#[derive(Debug)]
pub struct GrpcCodec {
// Largest payload (in bytes) that `encode` will accept.
max_encode_message_size: usize,
// Largest declared payload length (in bytes) that `decode` will accept.
max_decode_message_size: usize,
}
impl GrpcCodec {
#[must_use]
pub fn new() -> Self {
Self::with_message_size_limits(DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE)
}
#[must_use]
pub fn with_max_size(max_message_size: usize) -> Self {
Self::with_message_size_limits(max_message_size, max_message_size)
}
#[must_use]
pub fn with_message_size_limits(
max_encode_message_size: usize,
max_decode_message_size: usize,
) -> Self {
Self {
max_encode_message_size,
max_decode_message_size,
}
}
#[must_use]
pub fn max_message_size(&self) -> usize {
self.max_encode_message_size
.max(self.max_decode_message_size)
}
#[must_use]
pub fn max_encode_message_size(&self) -> usize {
self.max_encode_message_size
}
#[must_use]
pub fn max_decode_message_size(&self) -> usize {
self.max_decode_message_size
}
}
impl Default for GrpcCodec {
fn default() -> Self {
Self::new()
}
}
impl Decoder for GrpcCodec {
    type Item = GrpcMessage;
    type Error = GrpcError;

    /// Tries to extract one complete frame from the front of `src`.
    ///
    /// Returns `Ok(None)` while the header or the payload is still incomplete.
    /// Returns a protocol error for a compression flag other than `0` or `1`,
    /// and `GrpcError::MessageTooLarge` when the declared length exceeds the
    /// decode limit. Both error paths return before any bytes are consumed.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let available = src.len();
        if available < MESSAGE_HEADER_SIZE {
            return Ok(None);
        }

        // Byte 0 is the compression flag; only 0 and 1 are valid.
        let flag = src[0];
        if flag > 1 {
            return Err(GrpcError::protocol(format!(
                "invalid gRPC compression flag: {flag}"
            )));
        }
        let compressed = flag == 1;

        // Bytes 1..=4 hold the payload length in big-endian order.
        let prefix = [src[1], src[2], src[3], src[4]];
        let length = u32::from_be_bytes(prefix) as usize;
        if length > self.max_decode_message_size {
            return Err(GrpcError::MessageTooLarge);
        }

        let frame_len = MESSAGE_HEADER_SIZE.saturating_add(length);
        if available < frame_len {
            return Ok(None);
        }

        // The whole frame is buffered: discard the header, keep the payload.
        let _ = src.split_to(MESSAGE_HEADER_SIZE);
        let data = src.split_to(length).freeze();
        let message = GrpcMessage { compressed, data };
        Ok(Some(message))
    }
}
impl Encoder<GrpcMessage> for GrpcCodec {
    type Error = GrpcError;

    /// Appends `item` to `dst` as one length-prefixed frame: the compression
    /// flag byte, the 4-byte big-endian payload length, then the payload.
    ///
    /// # Errors
    ///
    /// Returns `GrpcError::MessageTooLarge` when the payload exceeds the encode
    /// limit or its length does not fit the 32-bit length prefix. All checks
    /// run before anything is written, so `dst` is left unmodified on error.
    fn encode(&mut self, item: GrpcMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let payload_len = item.data.len();
        if payload_len > self.max_encode_message_size {
            return Err(GrpcError::MessageTooLarge);
        }
        // Convert the length up front: failing after the flag byte has been
        // written would leave a partial header in the caller's buffer.
        let length = u32::try_from(payload_len).map_err(|_| GrpcError::MessageTooLarge)?;

        dst.reserve(MESSAGE_HEADER_SIZE.saturating_add(payload_len));
        dst.put_u8(u8::from(item.compressed));
        dst.put_u32(length);
        dst.extend_from_slice(&item.data);
        Ok(())
    }
}
/// Message (de)serialization strategy used by [`FramedCodec`] to convert
/// between typed values and the raw payload bytes of a gRPC frame.
pub trait Codec: Send + 'static {
/// Type accepted by [`Codec::encode`].
type Encode: Send + 'static;
/// Type produced by [`Codec::decode`].
type Decode: Send + 'static;
/// Error reported by this codec's `encode` and `decode`.
type Error: std::error::Error + Send + Sync + 'static;
/// Serializes `item` into payload bytes.
fn encode(&mut self, item: &Self::Encode) -> Result<Bytes, Self::Error>;
/// Deserializes a value from the payload bytes in `buf`.
fn decode(&mut self, buf: &Bytes) -> Result<Self::Decode, Self::Error>;
/// Informs the codec of the outbound size limit. The default ignores it.
fn set_max_encode_message_size(&mut self, _max_size: usize) {}
/// Informs the codec of the inbound size limit. The default ignores it.
fn set_max_decode_message_size(&mut self, _max_size: usize) {}
/// Converts an encode failure into a [`GrpcError`]. The default wraps the
/// error's display text via `GrpcError::invalid_message`.
fn map_encode_error(error: Self::Error) -> GrpcError {
GrpcError::invalid_message(error.to_string())
}
/// Converts a decode failure into a [`GrpcError`]. The default wraps the
/// error's display text via `GrpcError::invalid_message`.
fn map_decode_error(error: Self::Error) -> GrpcError {
GrpcError::invalid_message(error.to_string())
}
}
/// Hook that transforms an encoded payload before it is framed with the
/// compression flag set.
pub type FrameCompressor = fn(&[u8]) -> Result<Bytes, GrpcError>;
/// Hook that restores the payload of a frame whose compression flag is set.
/// The second argument is the maximum output size the caller will accept.
pub type FrameDecompressor = fn(&[u8], usize) -> Result<Bytes, GrpcError>;
/// Pass-through compressor: returns a copy of `input` and never fails.
// The `Result` return type is kept so the function fits `FrameCompressor`.
#[allow(clippy::unnecessary_wraps)]
fn identity_frame_compress(input: &[u8]) -> Result<Bytes, GrpcError> {
    let passthrough = Bytes::copy_from_slice(input);
    Ok(passthrough)
}
/// Pass-through decompressor: returns a copy of `input`, or
/// `GrpcError::MessageTooLarge` when `input` is longer than `max_size`.
fn identity_frame_decompress(input: &[u8], max_size: usize) -> Result<Bytes, GrpcError> {
    if input.len() <= max_size {
        Ok(Bytes::copy_from_slice(input))
    } else {
        Err(GrpcError::MessageTooLarge)
    }
}
/// Compresses `input` as a gzip stream at the default compression level.
///
/// # Errors
///
/// Any I/O error reported by the encoder is returned as a
/// `GrpcError::compression` carrying the error's display text.
#[cfg(feature = "compression")]
pub fn gzip_frame_compress(input: &[u8]) -> Result<Bytes, GrpcError> {
    use flate2::Compression;
    use flate2::write::GzEncoder;
    use std::io::Write;

    let io_failure = |err: std::io::Error| GrpcError::compression(err.to_string());

    let mut gz = GzEncoder::new(Vec::new(), Compression::default());
    gz.write_all(input).map_err(io_failure)?;
    let deflated = gz.finish().map_err(io_failure)?;
    Ok(Bytes::from(deflated))
}
/// Decompresses a gzip stream, refusing to produce more than `max_size` bytes.
///
/// The input is inflated in 8 KiB chunks and the running total is checked
/// before each chunk is appended.
///
/// # Errors
///
/// Returns `GrpcError::MessageTooLarge` once the output would exceed
/// `max_size`, and a `GrpcError::compression` for any decoder I/O error.
#[cfg(feature = "compression")]
pub fn gzip_frame_decompress(input: &[u8], max_size: usize) -> Result<Bytes, GrpcError> {
    use flate2::read::GzDecoder;
    use std::io::Read;

    let mut gz = GzDecoder::new(input);
    let mut inflated: Vec<u8> = Vec::new();
    let mut chunk = [0u8; 8192];
    loop {
        let read = match gz.read(&mut chunk) {
            // A zero-length read marks the end of the stream.
            Ok(0) => return Ok(Bytes::from(inflated)),
            Ok(read) => read,
            Err(err) => return Err(GrpcError::compression(err.to_string())),
        };
        // `inflated.len()` is the number of bytes accepted so far.
        if inflated.len() + read > max_size {
            return Err(GrpcError::MessageTooLarge);
        }
        inflated.extend_from_slice(&chunk[..read]);
    }
}
/// Combines a message [`Codec`] with gRPC length-prefixed framing and
/// optional per-frame compression hooks.
pub struct FramedCodec<C> {
// Serializer/deserializer for the message payload.
inner: C,
// Length-prefixed framing layer; also holds the size limits.
framing: GrpcCodec,
// When true, `encode_message` runs the compressor and sets the frame flag.
use_compression: bool,
// Hook applied to outbound payloads when compression is enabled.
compressor: Option<FrameCompressor>,
// Hook applied to inbound frames whose compression flag is set.
decompressor: Option<FrameDecompressor>,
}
/// Debug output shows the hooks only as presence flags (`has_compressor`,
/// `has_decompressor`) rather than the function pointers themselves.
impl<C: fmt::Debug> fmt::Debug for FramedCodec<C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_compressor = self.compressor.is_some();
        let has_decompressor = self.decompressor.is_some();

        let mut out = f.debug_struct("FramedCodec");
        out.field("inner", &self.inner);
        out.field("framing", &self.framing);
        out.field("use_compression", &self.use_compression);
        out.field("has_compressor", &has_compressor);
        out.field("has_decompressor", &has_decompressor);
        out.finish()
    }
}
impl<C: Codec> FramedCodec<C> {
    /// Creates a framed codec around `inner` using `DEFAULT_MAX_MESSAGE_SIZE`
    /// for both the encode and decode limits.
    #[must_use]
    pub fn new(inner: C) -> Self {
        Self::with_message_size_limits(inner, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE)
    }

    /// Creates a framed codec that applies `max_size` in both directions.
    #[must_use]
    pub fn with_max_size(inner: C, max_size: usize) -> Self {
        Self::with_message_size_limits(inner, max_size, max_size)
    }

    /// Creates a framed codec with separate encode and decode limits.
    ///
    /// The limits are also forwarded to `inner` through
    /// `Codec::set_max_encode_message_size` and
    /// `Codec::set_max_decode_message_size`. Compression starts disabled and
    /// no frame hooks are installed.
    #[must_use]
    pub fn with_message_size_limits(
        mut inner: C,
        max_encode_message_size: usize,
        max_decode_message_size: usize,
    ) -> Self {
        inner.set_max_encode_message_size(max_encode_message_size);
        inner.set_max_decode_message_size(max_decode_message_size);
        Self {
            inner,
            framing: GrpcCodec::with_message_size_limits(
                max_encode_message_size,
                max_decode_message_size,
            ),
            use_compression: false,
            compressor: None,
            decompressor: None,
        }
    }

    /// Replaces both frame hooks.
    ///
    /// Outbound compression is switched on when at least one hook is `Some`;
    /// passing `None` for both leaves the current setting untouched.
    ///
    /// NOTE(review): supplying only a decompressor also enables outbound
    /// compression, so `encode_message` will then fail for lack of a
    /// compressor — confirm this is intended.
    #[must_use]
    pub fn with_frame_hooks(
        mut self,
        compressor: Option<FrameCompressor>,
        decompressor: Option<FrameDecompressor>,
    ) -> Self {
        if compressor.is_some() || decompressor.is_some() {
            self.use_compression = true;
        }
        self.compressor = compressor;
        self.decompressor = decompressor;
        self
    }

    /// Enables outbound compression without installing a compressor.
    ///
    /// `encode_message` returns a compression error until a compressor is
    /// configured.
    #[must_use]
    pub fn with_compression(mut self) -> Self {
        self.use_compression = true;
        self
    }

    /// Installs both hooks and enables outbound compression.
    #[must_use]
    pub fn with_frame_codec(
        self,
        compressor: FrameCompressor,
        decompressor: FrameDecompressor,
    ) -> Self {
        self.with_frame_hooks(Some(compressor), Some(decompressor))
    }

    /// Installs the gzip hooks and enables outbound compression.
    #[cfg(feature = "compression")]
    #[must_use]
    pub fn with_gzip_frame_codec(self) -> Self {
        self.with_frame_codec(gzip_frame_compress, gzip_frame_decompress)
    }

    /// Installs pass-through hooks: frames carry the compression flag but the
    /// payload bytes are unchanged.
    #[must_use]
    pub fn with_identity_frame_codec(self) -> Self {
        self.with_frame_codec(identity_frame_compress, identity_frame_decompress)
    }

    /// Borrows the wrapped message codec.
    pub fn inner(&self) -> &C {
        &self.inner
    }

    /// Mutably borrows the wrapped message codec.
    pub fn inner_mut(&mut self) -> &mut C {
        &mut self.inner
    }

    /// Returns the limit applied to outbound frame payloads.
    #[must_use]
    pub fn max_encode_message_size(&self) -> usize {
        self.framing.max_encode_message_size()
    }

    /// Returns the limit applied to inbound frame payloads and to
    /// decompressed output.
    #[must_use]
    pub fn max_decode_message_size(&self) -> usize {
        self.framing.max_decode_message_size()
    }

    /// Serializes `item` with the inner codec, optionally compresses it, and
    /// appends the resulting frame to `dst`.
    ///
    /// # Errors
    ///
    /// - the inner codec's error, converted by `Codec::map_encode_error`;
    /// - a compression error when compression is enabled without a compressor,
    ///   or when the compressor itself fails;
    /// - `GrpcError::MessageTooLarge` when the payload placed in the frame
    ///   exceeds the encode limit.
    pub fn encode_message(
        &mut self,
        item: &C::Encode,
        dst: &mut BytesMut,
    ) -> Result<(), GrpcError> {
        let data = self.inner.encode(item).map_err(C::map_encode_error)?;
        let message = if self.use_compression {
            let compressor = self.compressor.ok_or_else(|| {
                GrpcError::compression("compression requested but no frame compressor configured")
            })?;
            let compressed = compressor(data.as_ref())?;
            if compressed.len() > self.max_encode_message_size() {
                return Err(GrpcError::MessageTooLarge);
            }
            GrpcMessage::compressed(compressed)
        } else {
            GrpcMessage::new(data)
        };
        self.framing.encode(message, dst)
    }

    /// Extracts one frame from `src`, decompresses it if flagged, and
    /// deserializes the payload with the inner codec.
    ///
    /// Returns `Ok(None)` while `src` does not yet hold a complete frame. The
    /// frame has already been removed from `src` by the time a decompression
    /// or inner-codec error is reported.
    ///
    /// # Errors
    ///
    /// - any framing error from `GrpcCodec::decode`;
    /// - a compression error when a compressed frame arrives without a
    ///   decompressor, or when the decompressor fails;
    /// - `GrpcError::MessageTooLarge` when the decompressed payload exceeds
    ///   the decode limit;
    /// - the inner codec's error, converted by `Codec::map_decode_error`.
    pub fn decode_message(&mut self, src: &mut BytesMut) -> Result<Option<C::Decode>, GrpcError> {
        let Some(message) = self.framing.decode(src)? else {
            return Ok(None);
        };
        let data = if message.compressed {
            let decompressor = self.decompressor.ok_or_else(|| {
                GrpcError::compression(
                    "compressed frame received but no frame decompressor configured",
                )
            })?;
            let limit = self.max_decode_message_size();
            let inflated = decompressor(message.data.as_ref(), limit)?;
            // Do not rely on a custom hook to honour `limit`: an oversized
            // result must never reach the inner codec.
            if inflated.len() > limit {
                return Err(GrpcError::MessageTooLarge);
            }
            inflated
        } else {
            message.data
        };
        let decoded = self.inner.decode(&data).map_err(C::map_decode_error)?;
        Ok(Some(decoded))
    }
}
/// A [`Codec`] whose messages are raw `Bytes`; encoding and decoding return
/// the input unchanged.
#[derive(Debug, Clone, Copy, Default)]
pub struct IdentityCodec;
impl Codec for IdentityCodec {
type Encode = Bytes;
type Decode = Bytes;
type Error = std::convert::Infallible;
fn encode(&mut self, item: &Self::Encode) -> Result<Bytes, Self::Error> {
Ok(item.clone())
}
fn decode(&mut self, buf: &Bytes) -> Result<Self::Decode, Self::Error> {
Ok(buf.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fmt::Write;
// Shared test prologue: initializes test logging, then invokes the project's
// `test_phase!` macro with the test name.
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
// Renders `bytes` as two-digit lowercase hex values separated by single spaces.
fn format_hex(bytes: &[u8]) -> String {
    let mut pieces = Vec::with_capacity(bytes.len());
    for byte in bytes {
        pieces.push(format!("{byte:02x}"));
    }
    pieces.join(" ")
}
// Produces a line-oriented description of a single encoded frame for snapshot
// comparison: flag byte, raw and decoded length prefix, and the payload as
// lossy UTF-8 and as hex. Panics if `bytes` is shorter than the frame header.
fn render_grpc_frame_for_snapshot_test(bytes: &[u8]) -> String {
    let compressed_flag = bytes[0];
    let length_field = [bytes[1], bytes[2], bytes[3], bytes[4]];
    let payload_len = u32::from_be_bytes(length_field);
    let body = &bytes[MESSAGE_HEADER_SIZE..];

    let mut rendered = String::new();
    let _ = writeln!(rendered, "compressed_flag: {compressed_flag:02x}");
    let _ = writeln!(
        rendered,
        "message_length_be: {}",
        format_hex(&length_field)
    );
    let _ = writeln!(rendered, "message_length: {payload_len}");
    let _ = writeln!(
        rendered,
        "payload_utf8: {:?}",
        String::from_utf8_lossy(body)
    );
    let _ = writeln!(rendered, "payload_hex: {}", format_hex(body));
    rendered
}
// Error type for `LimitTrackingCodec`.
#[derive(Debug, thiserror::Error)]
enum LimitTrackingCodecError {
// The payload length exceeded the limit recorded on the codec.
#[error("message too large")]
MessageTooLarge,
}
// Test codec that records the limits pushed to it through the `Codec` setters
// and enforces them itself. Both limits default to 0.
#[derive(Debug, Default)]
struct LimitTrackingCodec {
// Last value received by `set_max_encode_message_size`.
max_encode_message_size: usize,
// Last value received by `set_max_decode_message_size`.
max_decode_message_size: usize,
}
impl Codec for LimitTrackingCodec {
type Encode = Bytes;
type Decode = Bytes;
type Error = LimitTrackingCodecError;
fn encode(&mut self, item: &Self::Encode) -> Result<Bytes, Self::Error> {
if item.len() > self.max_encode_message_size {
return Err(LimitTrackingCodecError::MessageTooLarge);
}
Ok(item.clone())
}
fn decode(&mut self, buf: &Bytes) -> Result<Self::Decode, Self::Error> {
if buf.len() > self.max_decode_message_size {
return Err(LimitTrackingCodecError::MessageTooLarge);
}
Ok(buf.clone())
}
fn set_max_encode_message_size(&mut self, max_size: usize) {
self.max_encode_message_size = max_size;
}
fn set_max_decode_message_size(&mut self, max_size: usize) {
self.max_decode_message_size = max_size;
}
fn map_encode_error(error: Self::Error) -> GrpcError {
match error {
LimitTrackingCodecError::MessageTooLarge => GrpcError::MessageTooLarge,
}
}
fn map_decode_error(error: Self::Error) -> GrpcError {
match error {
LimitTrackingCodecError::MessageTooLarge => GrpcError::MessageTooLarge,
}
}
}
// Encoding an uncompressed message and decoding it again yields the same
// payload with the compression flag cleared.
#[test]
fn test_grpc_codec_roundtrip() {
init_test("test_grpc_codec_roundtrip");
let mut codec = GrpcCodec::new();
let mut buf = BytesMut::new();
let original = GrpcMessage::new(Bytes::from_static(b"hello world"));
codec.encode(original.clone(), &mut buf).unwrap();
let decoded = codec.decode(&mut buf).unwrap().unwrap();
let compressed = decoded.compressed;
crate::assert_with_log!(!compressed, "not compressed", false, compressed);
crate::assert_with_log!(
decoded.data == original.data,
"data",
original.data,
decoded.data
);
crate::test_complete!("test_grpc_codec_roundtrip");
}
// Encoding a 100-byte payload with a 10-byte limit fails with `MessageTooLarge`.
#[test]
fn test_grpc_codec_message_too_large() {
init_test("test_grpc_codec_message_too_large");
let mut codec = GrpcCodec::with_max_size(10);
let mut buf = BytesMut::new();
let large_message = GrpcMessage::new(Bytes::from(vec![0u8; 100]));
let result = codec.encode(large_message, &mut buf);
let ok = matches!(result, Err(GrpcError::MessageTooLarge));
crate::assert_with_log!(ok, "message too large", true, ok);
crate::test_complete!("test_grpc_codec_message_too_large");
}
// Decoding a frame that declares a 4-byte payload with a 3-byte limit fails
// with `MessageTooLarge`.
#[test]
fn test_grpc_codec_decode_message_too_large() {
init_test("test_grpc_codec_decode_message_too_large");
let mut codec = GrpcCodec::with_max_size(3);
let mut buf = BytesMut::new();
// Hand-built frame: flag 0, length 4, payload "abcd".
buf.put_u8(0);
buf.put_u32(4);
buf.extend_from_slice(b"abcd");
let result = codec.decode(&mut buf);
let ok = matches!(result, Err(GrpcError::MessageTooLarge));
crate::assert_with_log!(ok, "decode rejects oversized frame", true, ok);
crate::test_complete!("test_grpc_codec_decode_message_too_large");
}
// With only 3 of the 5 header bytes buffered, decode reports "not ready"
// (`Ok(None)`) rather than an error.
#[test]
fn test_grpc_codec_partial_header() {
init_test("test_grpc_codec_partial_header");
let mut codec = GrpcCodec::new();
let mut buf = BytesMut::from(&[0u8, 0, 0][..]);
let result = codec.decode(&mut buf).unwrap();
let none = result.is_none();
crate::assert_with_log!(none, "none", true, none);
crate::test_complete!("test_grpc_codec_partial_header");
}
// A complete header declaring 10 payload bytes followed by only 5 of them
// decodes to `Ok(None)`.
#[test]
fn test_grpc_codec_partial_body() {
init_test("test_grpc_codec_partial_body");
let mut codec = GrpcCodec::new();
let mut buf = BytesMut::new();
// Flag 0, declared length 10, but only 5 payload bytes present.
buf.put_u8(0); buf.put_u32(10); buf.extend_from_slice(&[1, 2, 3, 4, 5]);
let result = codec.decode(&mut buf).unwrap();
let none = result.is_none();
crate::assert_with_log!(none, "none", true, none);
crate::test_complete!("test_grpc_codec_partial_body");
}
// A frame delivered in two pieces: the first decode returns `None`, and once
// the remaining bytes arrive the second decode yields the full payload and
// leaves the buffer empty.
#[test]
fn test_grpc_codec_partial_body_then_complete() {
init_test("test_grpc_codec_partial_body_then_complete");
let mut codec = GrpcCodec::new();
let mut buf = BytesMut::new();
// Header declares 5 bytes; only "ab" is available at first.
buf.put_u8(0);
buf.put_u32(5);
buf.extend_from_slice(b"ab");
let first = codec.decode(&mut buf).unwrap();
let first_none = first.is_none();
crate::assert_with_log!(first_none, "first decode pending", true, first_none);
// Supply the remaining three payload bytes.
buf.extend_from_slice(b"cde");
let second = codec.decode(&mut buf).unwrap();
let second_some = second.is_some();
crate::assert_with_log!(second_some, "second decode ready", true, second_some);
let decoded = second.unwrap();
crate::assert_with_log!(
decoded.data == Bytes::from_static(b"abcde"),
"decoded payload after completion",
Bytes::from_static(b"abcde"),
decoded.data
);
let drained = buf.is_empty();
crate::assert_with_log!(drained, "buffer fully consumed", true, drained);
crate::test_complete!("test_grpc_codec_partial_body_then_complete");
}
// A flag byte of 2 (neither 0 nor 1) is rejected with a protocol error.
#[test]
fn test_grpc_codec_rejects_invalid_compression_flag() {
init_test("test_grpc_codec_rejects_invalid_compression_flag");
let mut codec = GrpcCodec::new();
let mut buf = BytesMut::new();
buf.put_u8(2);
buf.put_u32(3);
buf.extend_from_slice(b"abc");
let result = codec.decode(&mut buf);
let ok = matches!(result, Err(GrpcError::Protocol(_)));
crate::assert_with_log!(ok, "invalid compression flag rejected", true, ok);
crate::test_complete!("test_grpc_codec_rejects_invalid_compression_flag");
}
// Rejecting an invalid flag byte must not consume anything: the buffer holds
// the same bytes after the failed decode as before it.
#[test]
fn test_grpc_codec_invalid_compression_flag_preserves_buffer() {
init_test("test_grpc_codec_invalid_compression_flag_preserves_buffer");
let mut codec = GrpcCodec::new();
let mut buf = BytesMut::new();
buf.put_u8(2);
buf.put_u32(3);
buf.extend_from_slice(b"abc");
// Snapshot the buffer contents to compare after the failed decode.
let before = buf.as_ref().to_vec();
let result = codec.decode(&mut buf);
let ok = matches!(result, Err(GrpcError::Protocol(_)));
crate::assert_with_log!(ok, "invalid compression flag rejected", true, ok);
crate::assert_with_log!(
buf.as_ref() == before.as_slice(),
"invalid frame leaves buffer untouched",
before,
buf.as_ref().to_vec()
);
crate::test_complete!("test_grpc_codec_invalid_compression_flag_preserves_buffer");
}
// `IdentityCodec` returns its input unchanged for both encode and decode.
#[test]
fn test_identity_codec() {
init_test("test_identity_codec");
let mut codec = IdentityCodec;
let data = Bytes::from_static(b"test data");
let encoded = codec.encode(&data).unwrap();
crate::assert_with_log!(encoded == data, "encoded", data, encoded);
let decoded = codec.decode(&encoded).unwrap();
crate::assert_with_log!(decoded == data, "decoded", data, decoded);
crate::test_complete!("test_identity_codec");
}
// A `FramedCodec` without compression round-trips a payload through
// `encode_message` / `decode_message`.
#[test]
fn test_framed_codec_roundtrip() {
init_test("test_framed_codec_roundtrip");
let mut codec = FramedCodec::new(IdentityCodec);
let mut buf = BytesMut::new();
let original = Bytes::from_static(b"hello gRPC");
codec.encode_message(&original, &mut buf).unwrap();
let decoded = codec.decode_message(&mut buf).unwrap().unwrap();
crate::assert_with_log!(decoded == original, "decoded", original, decoded);
crate::test_complete!("test_framed_codec_roundtrip");
}
// Enabling compression without installing a compressor makes
// `encode_message` fail with a compression error.
#[test]
fn test_framed_codec_with_compression_errors_on_encode() {
init_test("test_framed_codec_with_compression_errors_on_encode");
let mut codec = FramedCodec::new(IdentityCodec).with_compression();
let mut buf = BytesMut::new();
let original = Bytes::from_static(b"hello gRPC");
let result = codec.encode_message(&original, &mut buf);
let ok = matches!(result, Err(GrpcError::Compression(_)));
crate::assert_with_log!(ok, "compression unsupported", true, ok);
crate::test_complete!("test_framed_codec_with_compression_errors_on_encode");
}
// A frame flagged as compressed is rejected with a compression error when no
// decompressor is configured. The frame is still removed from the buffer,
// because framing extracts it before the decompressor lookup.
#[test]
fn test_framed_codec_decode_rejects_compressed_frame() {
init_test("test_framed_codec_decode_rejects_compressed_frame");
let mut codec = FramedCodec::new(IdentityCodec);
let mut buf = BytesMut::new();
// Hand-built frame: flag 1 (compressed), length 3, payload "xyz".
buf.put_u8(1);
buf.put_u32(3);
buf.extend_from_slice(b"xyz");
let result = codec.decode_message(&mut buf);
let ok = matches!(result, Err(GrpcError::Compression(_)));
crate::assert_with_log!(ok, "compressed frame rejected", true, ok);
let drained = buf.is_empty();
crate::assert_with_log!(drained, "compressed frame consumed", true, drained);
crate::test_complete!("test_framed_codec_decode_rejects_compressed_frame");
}
// With the identity frame hooks installed, the encoded frame carries the
// compressed flag (1), its wire layout matches the stored snapshot, and
// decoding returns the original payload.
#[test]
fn test_framed_codec_identity_frame_codec_roundtrip() {
init_test("test_framed_codec_identity_frame_codec_roundtrip");
let mut codec = FramedCodec::new(IdentityCodec).with_identity_frame_codec();
let mut buf = BytesMut::new();
let original = Bytes::from_static(b"compressed-passthrough");
codec
.encode_message(&original, &mut buf)
.expect("encode must succeed");
crate::assert_with_log!(
buf.first().copied() == Some(1),
"compressed flag set",
Some(1u8),
buf.first().copied()
);
// Snapshot the exact wire bytes before decoding consumes them.
insta::assert_snapshot!(
"grpc_identity_frame_wire_layout",
render_grpc_frame_for_snapshot_test(buf.as_ref())
);
let decoded = codec
.decode_message(&mut buf)
.expect("decode must succeed")
.expect("frame must decode");
crate::assert_with_log!(decoded == original, "decoded", original, decoded);
crate::test_complete!("test_framed_codec_identity_frame_codec_roundtrip");
}
// gzip compression changes the bytes, and decompression (with a 1024-byte
// output limit) restores the original input.
#[test]
#[cfg(feature = "compression")]
fn test_gzip_frame_compress_decompress_roundtrip() {
init_test("test_gzip_frame_compress_decompress_roundtrip");
let original = b"hello gzip compression roundtrip test";
let compressed = gzip_frame_compress(original).expect("compress must succeed");
crate::assert_with_log!(
compressed.as_ref() != original.as_slice(),
"compressed differs from original",
true,
compressed.as_ref() != original.as_slice()
);
let decompressed =
gzip_frame_decompress(&compressed, 1024).expect("decompress must succeed");
crate::assert_with_log!(
decompressed.as_ref() == original.as_slice(),
"decompressed matches original",
original.as_slice(),
decompressed.as_ref()
);
crate::test_complete!("test_gzip_frame_compress_decompress_roundtrip");
}
// Decompressing 4096 zero bytes with a 100-byte output limit is refused with
// `MessageTooLarge`.
#[test]
#[cfg(feature = "compression")]
fn test_gzip_frame_decompress_bomb_protection() {
init_test("test_gzip_frame_decompress_bomb_protection");
let large = vec![0u8; 4096];
let compressed = gzip_frame_compress(&large).expect("compress must succeed");
let result = gzip_frame_decompress(&compressed, 100);
let ok = matches!(result, Err(GrpcError::MessageTooLarge));
crate::assert_with_log!(ok, "decompression bomb rejected", true, ok);
crate::test_complete!("test_gzip_frame_decompress_bomb_protection");
}
// Empty input survives a gzip compress/decompress round trip as empty output.
#[test]
#[cfg(feature = "compression")]
fn test_gzip_frame_empty_input() {
init_test("test_gzip_frame_empty_input");
let compressed = gzip_frame_compress(b"").expect("compress empty must succeed");
let decompressed =
gzip_frame_decompress(&compressed, 1024).expect("decompress empty must succeed");
let empty = decompressed.is_empty();
crate::assert_with_log!(empty, "empty roundtrip", true, empty);
crate::test_complete!("test_gzip_frame_empty_input");
}
// With the gzip hooks installed, the encoded frame carries the compressed
// flag (1) and decoding returns the original payload.
#[test]
#[cfg(feature = "compression")]
fn test_framed_codec_gzip_roundtrip() {
init_test("test_framed_codec_gzip_roundtrip");
let mut codec = FramedCodec::new(IdentityCodec).with_gzip_frame_codec();
let mut buf = BytesMut::new();
let original = Bytes::from_static(b"gzip framed codec roundtrip");
codec
.encode_message(&original, &mut buf)
.expect("encode must succeed");
crate::assert_with_log!(
buf.first().copied() == Some(1),
"compressed flag set",
Some(1u8),
buf.first().copied()
);
let decoded = codec
.decode_message(&mut buf)
.expect("decode must succeed")
.expect("frame must decode");
crate::assert_with_log!(
decoded == original,
"decoded matches original",
original,
decoded
);
crate::test_complete!("test_framed_codec_gzip_roundtrip");
}
// Input that is not a gzip stream produces a compression error.
#[test]
#[cfg(feature = "compression")]
fn test_gzip_frame_decompress_invalid_input() {
init_test("test_gzip_frame_decompress_invalid_input");
let garbage = b"this is not gzip data";
let result = gzip_frame_decompress(garbage, 4096);
let ok = matches!(result, Err(GrpcError::Compression(_)));
crate::assert_with_log!(ok, "invalid gzip rejected", true, ok);
crate::test_complete!("test_gzip_frame_decompress_invalid_input");
}
// A custom decompressor that would expand its input beyond the limit it is
// given reports `MessageTooLarge`, and `decode_message` returns that error
// to the caller.
#[test]
#[allow(clippy::unnecessary_wraps)]
fn test_framed_codec_custom_decompressor_enforces_size() {
// Compressor hook that copies its input unchanged.
fn passthrough_compress(input: &[u8]) -> Result<Bytes, GrpcError> {
Ok(Bytes::copy_from_slice(input))
}
// Decompressor hook that always builds `max_size + 1` bytes and therefore
// always trips its own limit check.
fn expanding_decompress(_input: &[u8], max_size: usize) -> Result<Bytes, GrpcError> {
let expanded = vec![7u8; max_size.saturating_add(1)];
if expanded.len() > max_size {
return Err(GrpcError::MessageTooLarge);
}
Ok(Bytes::from(expanded))
}
init_test("test_framed_codec_custom_decompressor_enforces_size");
let mut codec = FramedCodec::with_max_size(IdentityCodec, 8)
.with_frame_codec(passthrough_compress, expanding_decompress);
let mut buf = BytesMut::new();
// Hand-built compressed frame: flag 1, length 3, payload "abc".
buf.put_u8(1);
buf.put_u32(3);
buf.extend_from_slice(b"abc");
let result = codec.decode_message(&mut buf);
let ok = matches!(result, Err(GrpcError::MessageTooLarge));
crate::assert_with_log!(ok, "decompress overflow rejected", true, ok);
crate::test_complete!("test_framed_codec_custom_decompressor_enforces_size");
}
// `with_message_size_limits(_, 17, 29)` stores the limits on the framing
// layer and also pushes them into the inner codec through the `Codec` setters.
#[test]
fn test_framed_codec_with_message_size_limits_updates_inner_codec() {
init_test("test_framed_codec_with_message_size_limits_updates_inner_codec");
let codec = FramedCodec::with_message_size_limits(LimitTrackingCodec::default(), 17, 29);
crate::assert_with_log!(
codec.max_encode_message_size() == 17,
"framed encode limit",
17,
codec.max_encode_message_size()
);
crate::assert_with_log!(
codec.max_decode_message_size() == 29,
"framed decode limit",
29,
codec.max_decode_message_size()
);
crate::assert_with_log!(
codec.inner().max_encode_message_size == 17,
"inner encode limit",
17,
codec.inner().max_encode_message_size
);
crate::assert_with_log!(
codec.inner().max_decode_message_size == 29,
"inner decode limit",
29,
codec.inner().max_decode_message_size
);
crate::test_complete!("test_framed_codec_with_message_size_limits_updates_inner_codec");
}
// Size errors raised by the inner codec surface as `GrpcError::MessageTooLarge`
// through the codec's `map_encode_error` / `map_decode_error` overrides.
#[test]
fn test_framed_codec_maps_inner_message_too_large_errors() {
init_test("test_framed_codec_maps_inner_message_too_large_errors");
let mut codec = FramedCodec::new(LimitTrackingCodec::default());
// Tighten only the inner codec's limits; the framing layer keeps the
// limits it was constructed with.
codec.inner_mut().max_encode_message_size = 8;
codec.inner_mut().max_decode_message_size = 8;
let large = Bytes::from_static(b"oversized inner payload");
let encode_err = codec
.encode_message(&large, &mut BytesMut::new())
.expect_err("oversized encode must fail");
crate::assert_with_log!(
matches!(encode_err, GrpcError::MessageTooLarge),
"encode preserves message too large",
true,
matches!(encode_err, GrpcError::MessageTooLarge)
);
// Build a frame with a 9-byte payload using a separate framing codec.
let mut encoded = BytesMut::new();
let mut producer = GrpcCodec::new();
producer
.encode(
GrpcMessage::new(Bytes::from_static(b"123456789")),
&mut encoded,
)
.expect("producer encode must succeed");
let decode_err = codec
.decode_message(&mut encoded)
.expect_err("oversized decode must fail");
crate::assert_with_log!(
matches!(decode_err, GrpcError::MessageTooLarge),
"decode preserves message too large",
true,
matches!(decode_err, GrpcError::MessageTooLarge)
);
crate::test_complete!("test_framed_codec_maps_inner_message_too_large_errors");
}
// With an encode limit of 3 and a decode limit of 5, a 4-byte outbound
// payload and a 6-byte inbound payload are each rejected with
// `MessageTooLarge`.
#[test]
fn test_framed_codec_enforces_asymmetric_framing_limits() {
init_test("test_framed_codec_enforces_asymmetric_framing_limits");
let mut codec = FramedCodec::with_message_size_limits(IdentityCodec, 3, 5);
let encode_err = codec
.encode_message(&Bytes::from_static(b"abcd"), &mut BytesMut::new())
.expect_err("encode should enforce outbound framing limit");
crate::assert_with_log!(
matches!(encode_err, GrpcError::MessageTooLarge),
"encode framing limit",
true,
matches!(encode_err, GrpcError::MessageTooLarge)
);
// Build a frame with a 6-byte payload using a separate framing codec.
let mut encoded = BytesMut::new();
let mut framing = GrpcCodec::new();
framing
.encode(
GrpcMessage::new(Bytes::from_static(b"123456")),
&mut encoded,
)
.expect("producer encode must succeed");
let decode_err = codec
.decode_message(&mut encoded)
.expect_err("decode should enforce inbound framing limit");
crate::assert_with_log!(
matches!(decode_err, GrpcError::MessageTooLarge),
"decode framing limit",
true,
matches!(decode_err, GrpcError::MessageTooLarge)
);
crate::test_complete!("test_framed_codec_enforces_asymmetric_framing_limits");
}
}