use std::io::{Cursor, Read, Write};
#[cfg(test)]
use proptest::prelude::*;
use super::{
primitives::{Int16, Int32, Int64, Int8, Varint, Varlong},
traits::{ReadError, ReadType, WriteError, WriteType},
vec_builder::VecBuilder,
};
/// Header of an individual [`Record`].
///
/// Wire format (see the `ReadType`/`WriteType` impls below): varint key
/// length, UTF-8 key bytes, varint value length, raw value bytes.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct RecordHeader {
    // Header key; must be valid UTF-8 on the wire.
    pub key: String,
    // Header value; arbitrary bytes.
    pub value: Vec<u8>,
}
impl<R> ReadType<R> for RecordHeader
where
    R: Read,
{
    /// Parses a header: varint-length-prefixed UTF-8 key followed by a
    /// varint-length-prefixed byte value.
    fn read(reader: &mut R) -> Result<Self, ReadError> {
        // Key: length prefix, then UTF-8 bytes.
        let key_len = usize::try_from(Varint::read(reader)?.0)
            .map_err(|e| ReadError::Malformed(Box::new(e)))?;
        let key_buf = VecBuilder::new(key_len).read_exact(reader)?;
        let key =
            String::from_utf8(key_buf.into()).map_err(|e| ReadError::Malformed(Box::new(e)))?;

        // Value: length prefix, then raw bytes.
        let value_len = usize::try_from(Varint::read(reader)?.0)
            .map_err(|e| ReadError::Malformed(Box::new(e)))?;
        let value_buf = VecBuilder::new(value_len).read_exact(reader)?;

        Ok(Self {
            key,
            value: value_buf.into(),
        })
    }
}
impl<W> WriteType<W> for RecordHeader
where
    W: Write,
{
    /// Serializes the header as varint-length-prefixed key and value.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        // Key: varint length, then UTF-8 bytes.
        let key_len =
            i32::try_from(self.key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
        Varint(key_len).write(writer)?;
        writer.write_all(self.key.as_bytes())?;

        // Value: varint length, then raw bytes.
        let value_len =
            i32::try_from(self.value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
        Varint(value_len).write(writer)?;
        writer.write_all(&self.value)?;

        Ok(())
    }
}
/// A single data record inside a record batch.
///
/// Timestamps and offsets are stored as deltas (varlong/varint on the wire);
/// `key` and `value` are nullable byte payloads (length -1 encodes null).
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct Record {
    // Varlong on the wire; presumably relative to the batch timestamps — see
    // the Kafka record format spec.
    pub timestamp_delta: i64,
    // Varint on the wire; presumably relative to the batch base offset.
    pub offset_delta: i32,
    // `None` is encoded as length -1.
    pub key: Option<Vec<u8>>,
    // `None` is encoded as length -1.
    pub value: Option<Vec<u8>>,
    pub headers: Vec<RecordHeader>,
}
impl<R> ReadType<R> for Record
where
    R: Read,
{
    /// Parses a length-prefixed record; fails if the record does not consume
    /// exactly its declared length.
    fn read(reader: &mut R) -> Result<Self, ReadError> {
        // Total record length (varint). Limit the reader so this record
        // cannot consume bytes that belong to the next one.
        let len = Varint::read(reader)?;
        let len = u64::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
        let reader = &mut reader.take(len);

        // Leading Int8 is skipped; the writer emits 0 here.
        Int8::read(reader)?;

        let timestamp_delta = Varlong::read(reader)?.0;
        let offset_delta = Varint::read(reader)?.0;

        // Key: varint length, -1 encodes null.
        let len = Varint::read(reader)?.0;
        let key = if len == -1 {
            None
        } else {
            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
            let mut key = VecBuilder::new(len);
            key = key.read_exact(reader)?;
            Some(key.into())
        };

        // Value: varint length, -1 encodes null.
        let len = Varint::read(reader)?.0;
        let value = if len == -1 {
            None
        } else {
            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
            let mut value = VecBuilder::new(len);
            value = value.read_exact(reader)?;
            Some(value.into())
        };

        // Headers: varint count, then that many headers.
        let n_headers = Varint::read(reader)?;
        let n_headers =
            usize::try_from(n_headers.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
        let mut headers = VecBuilder::new(n_headers);
        for _ in 0..n_headers {
            headers.push(RecordHeader::read(reader)?);
        }

        // The declared length must be consumed exactly.
        if reader.limit() != 0 {
            return Err(ReadError::Malformed(
                format!("Found {} trailing bytes after Record", reader.limit()).into(),
            ));
        }

        Ok(Self {
            timestamp_delta,
            offset_delta,
            key,
            value,
            headers: headers.into(),
        })
    }
}
impl<W> WriteType<W> for Record
where
    W: Write,
{
    /// Serializes the record as a varint length prefix followed by the
    /// record body.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        // Encodes an optional byte payload: varint length (-1 for null)
        // followed by the raw bytes.
        fn write_nullable_bytes<O>(out: &mut O, bytes: Option<&Vec<u8>>) -> Result<(), WriteError>
        where
            O: Write,
        {
            match bytes {
                Some(b) => {
                    let len =
                        i32::try_from(b.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
                    Varint(len).write(out)?;
                    out.write_all(b)?;
                }
                None => Varint(-1).write(out)?,
            }
            Ok(())
        }

        // The body is buffered first because the wire format length-prefixes
        // it with the serialized size.
        let mut body = vec![];

        // Leading Int8, always written as 0 (skipped on read).
        Int8(0).write(&mut body)?;
        Varlong(self.timestamp_delta).write(&mut body)?;
        Varint(self.offset_delta).write(&mut body)?;

        write_nullable_bytes(&mut body, self.key.as_ref())?;
        write_nullable_bytes(&mut body, self.value.as_ref())?;

        // Headers: varint count, then the headers themselves.
        let n_headers =
            i32::try_from(self.headers.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
        Varint(n_headers).write(&mut body)?;
        for header in &self.headers {
            header.write(&mut body)?;
        }

        // Length prefix, then the buffered body.
        let body_len =
            i32::try_from(body.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
        Varint(body_len).write(writer)?;
        writer.write_all(&body)?;

        Ok(())
    }
}
/// Payload of a control batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum ControlBatchRecord {
    // Encoded as type 0 on the wire.
    Abort,
    // Encoded as type 1 on the wire.
    Commit,
}
impl<R> ReadType<R> for ControlBatchRecord
where
    R: Read,
{
    /// Parses a control record: an Int16 version (only 0 is known) followed
    /// by an Int16 type code.
    fn read(reader: &mut R) -> Result<Self, ReadError> {
        // Only version 0 is understood.
        match Int16::read(reader)?.0 {
            0 => {}
            version => {
                return Err(ReadError::Malformed(
                    format!("Unknown control batch record version: {}", version).into(),
                ));
            }
        }

        // Type code selects the variant.
        match Int16::read(reader)?.0 {
            0 => Ok(Self::Abort),
            1 => Ok(Self::Commit),
            t => Err(ReadError::Malformed(
                format!("Unknown control batch record type: {}", t).into(),
            )),
        }
    }
}
impl<W> WriteType<W> for ControlBatchRecord
where
    W: Write,
{
    /// Serializes the control record as version 0 plus its type code.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        // Version word; always 0.
        Int16(0).write(writer)?;

        // Type code: 0 = abort, 1 = commit.
        let type_code = if matches!(self, Self::Commit) { 1 } else { 0 };
        Int16(type_code).write(writer)?;

        Ok(())
    }
}
/// Payload of a record batch: either a single control record or a list of
/// data records.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum ControlBatchOrRecords {
    ControlBatch(ControlBatchRecord),
    // Custom proptest strategy caps generated vectors at 2 records to keep
    // property-test cases small.
    #[cfg_attr(
        test,
        proptest(
            strategy = "prop::collection::vec(any::<Record>(), 0..2).prop_map(ControlBatchOrRecords::Records)"
        )
    )]
    Records(Vec<Record>),
}
/// Compression codec of a record batch; stored in the low 3 bits of the
/// batch attributes word.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum RecordBatchCompression {
    NoCompression,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
/// Timestamp semantics of a record batch; stored in bit 3 of the batch
/// attributes word (0 = create time, 1 = log-append time).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum RecordBatchTimestampType {
    CreateTime,
    LogAppendTime,
}
/// A record batch (message format version 2; magic byte 2 on the wire).
///
/// The fields from `last_offset_delta` onward live inside the CRC-protected
/// body — see [`RecordBatchBody`] and the `ReadType`/`WriteType` impls.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct RecordBatch {
    pub base_offset: i64,
    pub partition_leader_epoch: i32,
    pub last_offset_delta: i32,
    pub first_timestamp: i64,
    pub max_timestamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub base_sequence: i32,
    // Either one control record or the data records.
    pub records: ControlBatchOrRecords,
    // Codec applied to the serialized records section.
    pub compression: RecordBatchCompression,
    // Bit 4 of the attributes word.
    pub is_transactional: bool,
    // Bit 3 of the attributes word.
    pub timestamp_type: RecordBatchTimestampType,
}
impl<R> ReadType<R> for RecordBatch
where
    R: Read,
{
    /// Parses a record batch, verifying the magic byte and the CRC32C of the
    /// batch body before decoding it.
    fn read(reader: &mut R) -> Result<Self, ReadError> {
        let base_offset = Int64::read(reader)?.0;

        // `len` counts everything after the length field. Subtract the fields
        // that precede the CRC-covered body: partition leader epoch (4),
        // magic (1), CRC (4).
        let len = Int32::read(reader)?;
        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
        let len = len
            .checked_sub(4 + 1 + 4)
            .ok_or_else(|| {
                ReadError::Malformed(format!("Record batch len too small: {}", len).into())
            })?;

        let partition_leader_epoch = Int32::read(reader)?.0;

        // Only message format version 2 is supported.
        let magic = Int8::read(reader)?.0;
        if magic != 2 {
            return Err(ReadError::Malformed(
                format!("Invalid magic number in record batch: {}", magic).into(),
            ));
        }

        // CRC is transmitted as a signed Int32; reinterpret the bits (not the
        // numeric value) as u32 for comparison.
        let crc = Int32::read(reader)?.0;
        let crc = u32::from_be_bytes(crc.to_be_bytes());

        // Read the CRC-covered remainder into a buffer and verify it before
        // attempting to parse.
        let mut data = VecBuilder::new(len);
        data = data.read_exact(reader)?;
        let data: Vec<u8> = data.into();
        let actual_crc = crc32c::crc32c(&data);
        if crc != actual_crc {
            return Err(ReadError::Malformed(
                format!("CRC error, got 0x{:x}, expected 0x{:x}", actual_crc, crc).into(),
            ));
        }

        // Parse the body from the verified buffer; it must consume all bytes.
        let mut data = Cursor::new(data);
        let body = RecordBatchBody::read(&mut data)?;
        let bytes_read = data.position();
        let bytes_total = u64::try_from(data.into_inner().len()).map_err(ReadError::Overflow)?;
        let bytes_left = bytes_total - bytes_read;
        if bytes_left != 0 {
            return Err(ReadError::Malformed(
                format!("Found {} trailing bytes after RecordBatch", bytes_left).into(),
            ));
        }

        Ok(Self {
            base_offset,
            partition_leader_epoch,
            last_offset_delta: body.last_offset_delta,
            first_timestamp: body.first_timestamp,
            max_timestamp: body.max_timestamp,
            producer_id: body.producer_id,
            producer_epoch: body.producer_epoch,
            base_sequence: body.base_sequence,
            compression: body.compression,
            timestamp_type: body.timestamp_type,
            is_transactional: body.is_transactional,
            records: body.records,
        })
    }
}
impl<W> WriteType<W> for RecordBatch
where
    W: Write,
{
    /// Serializes the batch: base offset, length, partition leader epoch,
    /// magic, CRC32C of the body, then the body itself.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        // Buffer the CRC-covered body first; the batch length and CRC can
        // only be computed once the body is fully serialized.
        let mut data = vec![];
        let body_ref = RecordBatchBodyRef {
            last_offset_delta: self.last_offset_delta,
            first_timestamp: self.first_timestamp,
            max_timestamp: self.max_timestamp,
            producer_id: self.producer_id,
            producer_epoch: self.producer_epoch,
            base_sequence: self.base_sequence,
            records: &self.records,
            compression: self.compression,
            is_transactional: self.is_transactional,
            timestamp_type: self.timestamp_type,
        };
        body_ref.write(&mut data)?;

        Int64(self.base_offset).write(writer)?;

        // Batch length: body plus partition leader epoch (4), magic (1) and
        // CRC (4) — mirrors the subtraction done in `read`.
        let l = i32::try_from(data.len() + 4 + 1 + 4)
            .map_err(|e| WriteError::Malformed(Box::new(e)))?;
        Int32(l).write(writer)?;

        Int32(self.partition_leader_epoch).write(writer)?;

        // Magic byte: message format version 2.
        Int8(2).write(writer)?;

        // CRC of the body; the unsigned value is stored bit-for-bit in a
        // signed Int32.
        let crc = crc32c::crc32c(&data);
        let crc = i32::from_be_bytes(crc.to_be_bytes());
        Int32(crc).write(writer)?;

        writer.write_all(&data)?;
        Ok(())
    }
}
/// The CRC-covered part of a record batch, i.e. everything after the CRC
/// field on the wire (see `RecordBatch::read`/`write`).
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct RecordBatchBody {
    pub last_offset_delta: i32,
    pub first_timestamp: i64,
    pub max_timestamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub base_sequence: i32,
    pub records: ControlBatchOrRecords,
    pub compression: RecordBatchCompression,
    pub is_transactional: bool,
    pub timestamp_type: RecordBatchTimestampType,
}
impl RecordBatchBody {
    /// Reads the batch payload: either exactly one control record (for
    /// control batches) or `n_records` data records.
    fn read_records<R>(
        reader: &mut R,
        is_control: bool,
        n_records: usize,
    ) -> Result<ControlBatchOrRecords, ReadError>
    where
        R: Read,
    {
        if !is_control {
            let mut records = VecBuilder::new(n_records);
            for _ in 0..n_records {
                records.push(Record::read(reader)?);
            }
            return Ok(ControlBatchOrRecords::Records(records.into()));
        }

        // Control batches must carry exactly one record.
        if n_records != 1 {
            return Err(ReadError::Malformed(
                format!("Expected 1 control record but got {}", n_records).into(),
            ));
        }
        Ok(ControlBatchOrRecords::ControlBatch(ControlBatchRecord::read(
            reader,
        )?))
    }
}
impl<R> ReadType<R> for RecordBatchBody
where
    R: Read,
{
    /// Parses the CRC-covered batch body, decompressing the records section
    /// with whichever codec the attributes word selects.
    fn read(reader: &mut R) -> Result<Self, ReadError> {
        // Attributes bit layout (as decoded below): bits 0-2 compression,
        // bit 3 timestamp type, bit 4 transactional, bit 5 control batch.
        let attributes = Int16::read(reader)?.0;
        let compression = match attributes & 0x7 {
            0 => RecordBatchCompression::NoCompression,
            1 => RecordBatchCompression::Gzip,
            2 => RecordBatchCompression::Snappy,
            3 => RecordBatchCompression::Lz4,
            4 => RecordBatchCompression::Zstd,
            other => {
                return Err(ReadError::Malformed(
                    format!("Invalid compression type: {}", other).into(),
                ));
            }
        };
        let timestamp_type = if ((attributes >> 3) & 0x1) == 0 {
            RecordBatchTimestampType::CreateTime
        } else {
            RecordBatchTimestampType::LogAppendTime
        };
        let is_transactional = ((attributes >> 4) & 0x1) == 1;
        let is_control = ((attributes >> 5) & 0x1) == 1;

        let last_offset_delta = Int32::read(reader)?.0;
        let first_timestamp = Int64::read(reader)?.0;
        let max_timestamp = Int64::read(reader)?.0;
        let producer_id = Int64::read(reader)?.0;
        let producer_epoch = Int16::read(reader)?.0;
        let base_sequence = Int32::read(reader)?.0;

        // A record count of -1 is treated as an empty batch.
        let n_records = match Int32::read(reader)?.0 {
            -1 => 0,
            n => usize::try_from(n)?,
        };

        // Decode the records section through the selected codec. Each codec
        // is only compiled in behind its cargo feature; every path checks
        // that the compressed stream is fully consumed afterwards.
        let records = match compression {
            RecordBatchCompression::NoCompression => {
                Self::read_records(reader, is_control, n_records)?
            }
            #[cfg(feature = "compression-gzip")]
            RecordBatchCompression::Gzip => {
                use flate2::read::GzDecoder;
                let mut decoder = GzDecoder::new(reader);
                let records = Self::read_records(&mut decoder, is_control, n_records)?;
                ensure_eof(&mut decoder, "Data left in gzip block")?;
                records
            }
            #[cfg(feature = "compression-lz4")]
            RecordBatchCompression::Lz4 => {
                use lz4::Decoder;
                let mut decoder = Decoder::new(reader)?;
                let records = Self::read_records(&mut decoder, is_control, n_records)?;
                ensure_eof(&mut decoder, "Data left in LZ4 block")?;
                // Surface any error the decoder only reports on finish.
                let (_reader, res) = decoder.finish();
                res?;
                records
            }
            #[cfg(feature = "compression-snappy")]
            RecordBatchCompression::Snappy => {
                use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;

                let mut input = vec![];
                reader.read_to_end(&mut input)?;

                // This magic header marks a framed, Java-client-specific
                // Snappy variant: version word, compat word, then
                // length-prefixed raw-Snappy chunks.
                const JAVA_MAGIC: &[u8] = &[0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0];
                let output = if input.starts_with(JAVA_MAGIC) {
                    let cursor_content = &input[JAVA_MAGIC.len()..];
                    let mut cursor = Cursor::new(cursor_content);

                    let mut buf_version = [0u8; 4];
                    cursor.read_exact(&mut buf_version)?;
                    if buf_version != [0, 0, 0, 1] {
                        return Err(ReadError::Malformed(
                            format!("Detected Java-specific Snappy compression, but got unknown version: {buf_version:?}").into(),
                        ));
                    }
                    let mut buf_compatible = [0u8; 4];
                    cursor.read_exact(&mut buf_compatible)?;
                    if buf_compatible != [0, 0, 0, 1] {
                        return Err(ReadError::Malformed(
                            format!("Detected Java-specific Snappy compression, but got unknown compat flags: {buf_compatible:?}").into(),
                        ));
                    }

                    // Decompress chunk by chunk, validating each declared
                    // chunk length against the bytes actually remaining so a
                    // bogus length cannot trigger a huge allocation.
                    let mut output = vec![];
                    while cursor.position() < cursor.get_ref().len() as u64 {
                        let mut buf_chunk_length = [0u8; 4];
                        cursor.read_exact(&mut buf_chunk_length)?;
                        let chunk_length = u32::from_be_bytes(buf_chunk_length) as usize;

                        let bytes_left = cursor_content.len() - (cursor.position() as usize);
                        if chunk_length > bytes_left {
                            return Err(ReadError::Malformed(format!("Java-specific Snappy-compressed data has illegal chunk length, got {chunk_length} bytes but only {bytes_left} bytes are left.").into()));
                        }

                        let mut chunk_data = vec![0u8; chunk_length];
                        cursor.read_exact(&mut chunk_data)?;

                        let mut buf = carefully_decompress_snappy(&chunk_data, DEFAULT_BLOCK_SIZE)?;
                        output.append(&mut buf);
                    }
                    output
                } else {
                    // Plain raw Snappy.
                    carefully_decompress_snappy(&input, DEFAULT_BLOCK_SIZE)?
                };

                let mut decoder = Cursor::new(output);
                let records = Self::read_records(&mut decoder, is_control, n_records)?;
                ensure_eof(&mut decoder, "Data left in Snappy block")?;
                records
            }
            #[cfg(feature = "compression-zstd")]
            RecordBatchCompression::Zstd => {
                use zstd::Decoder;
                let mut decoder = Decoder::new(reader)?;
                let records = Self::read_records(&mut decoder, is_control, n_records)?;
                ensure_eof(&mut decoder, "Data left in zstd block")?;
                records
            }
            // Reachable when a codec's cargo feature is disabled.
            #[allow(unreachable_patterns)]
            _ => {
                return Err(ReadError::Malformed(
                    format!("Unimplemented compression: {:?}", compression).into(),
                ));
            }
        };

        Ok(Self {
            last_offset_delta,
            first_timestamp,
            max_timestamp,
            producer_id,
            producer_epoch,
            base_sequence,
            compression,
            timestamp_type,
            is_transactional,
            records,
        })
    }
}
impl<W> WriteType<W> for RecordBatchBody
where
    W: Write,
{
    /// Serializes the body by delegating to the borrowing variant, avoiding
    /// a duplicate encoder implementation.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        RecordBatchBodyRef {
            last_offset_delta: self.last_offset_delta,
            first_timestamp: self.first_timestamp,
            max_timestamp: self.max_timestamp,
            producer_id: self.producer_id,
            producer_epoch: self.producer_epoch,
            base_sequence: self.base_sequence,
            records: &self.records,
            compression: self.compression,
            is_transactional: self.is_transactional,
            timestamp_type: self.timestamp_type,
        }
        .write(writer)
    }
}
/// Borrowing counterpart of [`RecordBatchBody`], used on the write path so a
/// [`RecordBatch`] can serialize its body without cloning the records.
#[derive(Debug)]
struct RecordBatchBodyRef<'a> {
    pub last_offset_delta: i32,
    pub first_timestamp: i64,
    pub max_timestamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub base_sequence: i32,
    pub records: &'a ControlBatchOrRecords,
    pub compression: RecordBatchCompression,
    pub is_transactional: bool,
    pub timestamp_type: RecordBatchTimestampType,
}
impl<'a> RecordBatchBodyRef<'a> {
    /// Serializes the batch payload: the single control record or all data
    /// records in order.
    fn write_records<W>(writer: &mut W, records: &ControlBatchOrRecords) -> Result<(), WriteError>
    where
        W: Write,
    {
        match records {
            ControlBatchOrRecords::ControlBatch(control) => control.write(writer)?,
            ControlBatchOrRecords::Records(records) => {
                for record in records {
                    record.write(writer)?;
                }
            }
        }
        Ok(())
    }
}
impl<'a, W> WriteType<W> for RecordBatchBodyRef<'a>
where
    W: Write,
{
    /// Serializes the batch body, compressing the records section with the
    /// configured codec.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        // Attributes bit layout: bits 0-2 compression, bit 3 timestamp type,
        // bit 4 transactional, bit 5 control batch.
        let mut attributes: i16 = match self.compression {
            RecordBatchCompression::NoCompression => 0,
            RecordBatchCompression::Gzip => 1,
            RecordBatchCompression::Snappy => 2,
            RecordBatchCompression::Lz4 => 3,
            RecordBatchCompression::Zstd => 4,
        };
        match self.timestamp_type {
            RecordBatchTimestampType::CreateTime => (),
            RecordBatchTimestampType::LogAppendTime => {
                attributes |= 1 << 3;
            }
        }
        if self.is_transactional {
            attributes |= 1 << 4;
        }
        if matches!(self.records, ControlBatchOrRecords::ControlBatch(_)) {
            attributes |= 1 << 5;
        }
        Int16(attributes).write(writer)?;

        Int32(self.last_offset_delta).write(writer)?;
        Int64(self.first_timestamp).write(writer)?;
        Int64(self.max_timestamp).write(writer)?;
        Int64(self.producer_id).write(writer)?;
        Int16(self.producer_epoch).write(writer)?;
        Int32(self.base_sequence).write(writer)?;

        // A control batch always carries exactly one record.
        let n_records = match &self.records {
            ControlBatchOrRecords::ControlBatch(_) => 1,
            ControlBatchOrRecords::Records(records) => records.len(),
        };
        Int32(i32::try_from(n_records)?).write(writer)?;

        // Serialize the records through the configured codec; each codec is
        // only compiled in behind its cargo feature.
        match self.compression {
            RecordBatchCompression::NoCompression => {
                Self::write_records(writer, self.records)?;
            }
            #[cfg(feature = "compression-gzip")]
            RecordBatchCompression::Gzip => {
                use flate2::{write::GzEncoder, Compression};
                let mut encoder = GzEncoder::new(writer, Compression::default());
                Self::write_records(&mut encoder, self.records)?;
                encoder.finish()?;
            }
            #[cfg(feature = "compression-lz4")]
            RecordBatchCompression::Lz4 => {
                use lz4::{liblz4::BlockMode, EncoderBuilder};
                let mut encoder = EncoderBuilder::new()
                    .block_mode(BlockMode::Independent)
                    .build(writer)?;
                Self::write_records(&mut encoder, self.records)?;
                // Surface any error the encoder only reports on finish.
                let (_writer, res) = encoder.finish();
                res?;
            }
            #[cfg(feature = "compression-snappy")]
            RecordBatchCompression::Snappy => {
                use snap::raw::{max_compress_len, Encoder};
                // Raw (unframed) Snappy: buffer all records, compress in one
                // shot.
                let mut input = vec![];
                Self::write_records(&mut input, self.records)?;
                let mut encoder = Encoder::new();
                let mut output = vec![0; max_compress_len(input.len())];
                let len = encoder
                    .compress(&input, &mut output)
                    .map_err(|e| WriteError::Malformed(Box::new(e)))?;
                writer.write_all(&output[..len])?;
            }
            #[cfg(feature = "compression-zstd")]
            RecordBatchCompression::Zstd => {
                use zstd::Encoder;
                let mut encoder = Encoder::new(writer, 0)?;
                Self::write_records(&mut encoder, self.records)?;
                encoder.finish()?;
            }
            // Reachable when a codec's cargo feature is disabled.
            #[allow(unreachable_patterns)]
            _ => {
                return Err(WriteError::Malformed(
                    format!("Unimplemented compression: {:?}", self.compression).into(),
                ));
            }
        }
        Ok(())
    }
}
/// Verifies that `reader` is exhausted, returning a `Malformed` error built
/// from `msg` if any byte can still be read.
///
/// Only called from the cfg-gated compression paths, hence `dead_code` when
/// no compression feature is enabled.
#[allow(dead_code)]
fn ensure_eof<R>(reader: &mut R, msg: &str) -> Result<(), ReadError>
where
    R: Read,
{
    // Probe with a single-byte read: 0 bytes (or UnexpectedEof) means clean
    // EOF, any byte means trailing data.
    let mut probe = [0u8; 1];
    match reader.read(&mut probe) {
        Ok(0) => Ok(()),
        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(()),
        Ok(_) => Err(ReadError::Malformed(msg.to_string().into())),
        Err(e) => Err(ReadError::IO(e)),
    }
}
/// Decompresses raw Snappy data without trusting the uncompressed size
/// claimed in the Snappy preamble.
///
/// The claimed size is attacker-controlled, so instead of allocating it up
/// front this starts with `start_block_size` and doubles the output buffer
/// only while decompression keeps failing in a way that looks like
/// "destination buffer too small". Each attempt rewrites the preamble varint
/// to the clamped size so the decoder accepts the smaller buffer.
#[cfg(feature = "compression-snappy")]
fn carefully_decompress_snappy(
    input: &[u8],
    start_block_size: usize,
) -> Result<Vec<u8>, ReadError> {
    use crate::protocol::primitives::UnsignedVarint;
    use snap::raw::{decompress_len, Decoder};

    if input.is_empty() {
        return Err(ReadError::Malformed(Box::new(snap::Error::Empty)));
    }

    // Uncompressed size claimed by the preamble (untrusted).
    let uncompressed_size = decompress_len(input).map_err(|e| ReadError::Malformed(Box::new(e)))?;

    // Number of bytes the original preamble varint occupies, so it can be
    // stripped and replaced below.
    let uncompressed_size_encoded_length = {
        let mut buf = Vec::with_capacity(100);
        UnsignedVarint(uncompressed_size as u64)
            .write(&mut buf)
            .expect("this write should never fail");
        buf.len()
    };

    let mut max_uncompressed_size = start_block_size;
    loop {
        let try_uncompressed_size = uncompressed_size.min(max_uncompressed_size);

        // Re-encode the payload with the clamped size in the preamble.
        let try_input = {
            let mut buf = Cursor::new(Vec::with_capacity(input.len()));
            UnsignedVarint(try_uncompressed_size as u64)
                .write(&mut buf)
                .expect("this write should never fail");
            buf.write_all(&input[uncompressed_size_encoded_length..])
                .expect("this write should never fail");
            buf.into_inner()
        };

        let mut decoder = Decoder::new();
        let mut output = vec![0; try_uncompressed_size];
        let actual_uncompressed_size = match decoder.decompress(&try_input, &mut output) {
            Ok(size) => size,
            Err(e) => {
                // Heuristic: did decompression fail because our clamped
                // output buffer is too small (retryable) or because the data
                // is actually broken (fatal)?
                let looks_like_dst_too_small = match e {
                    snap::Error::CopyWrite { .. } => true,
                    snap::Error::Literal {
                        len,
                        dst_len,
                        src_len,
                    } => (dst_len < len) && (src_len >= len),
                    snap::Error::HeaderMismatch {
                        expected_len,
                        got_len,
                    } => expected_len < got_len,
                    snap::Error::BufferTooSmall { .. } => {
                        unreachable!("Just allocated a correctly-sized output buffer.")
                    }
                    snap::Error::Offset { .. } => false,
                    // Unknown error kinds are treated as fatal.
                    _ => false,
                };
                let used_smaller_dst = max_uncompressed_size < uncompressed_size;
                if looks_like_dst_too_small && used_smaller_dst {
                    // Grow geometrically and retry.
                    max_uncompressed_size *= 2;
                    continue;
                } else {
                    return Err(ReadError::Malformed(Box::new(e)));
                }
            }
        };

        // A successful decode must reproduce exactly the claimed size.
        if actual_uncompressed_size != uncompressed_size {
            return Err(ReadError::Malformed(
                "broken snappy data".to_string().into(),
            ));
        }

        break Ok(output);
    }
}
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::protocol::test_utils::test_roundtrip;

    use super::*;

    use assert_matches::assert_matches;

    // Property-based write/read roundtrips for the basic building blocks.
    test_roundtrip!(RecordHeader, test_record_header_roundtrip);
    test_roundtrip!(Record, test_record_roundtrip);
    test_roundtrip!(ControlBatchRecord, test_control_batch_record_roundtrip);

    // Version words other than 0 must be rejected with a Malformed error.
    #[test]
    fn test_control_batch_record_unknown_version() {
        let mut buf = Cursor::new(Vec::<u8>::new());
        Int16(1).write(&mut buf).unwrap();
        Int16(0).write(&mut buf).unwrap();
        buf.set_position(0);

        let err = ControlBatchRecord::read(&mut buf).unwrap_err();
        assert_matches!(err, ReadError::Malformed(_));
        assert_eq!(
            err.to_string(),
            "Malformed data: Unknown control batch record version: 1",
        );
    }

    // Type codes other than 0/1 must be rejected with a Malformed error.
    #[test]
    fn test_control_batch_record_unknown_type() {
        let mut buf = Cursor::new(Vec::<u8>::new());
        Int16(0).write(&mut buf).unwrap();
        Int16(2).write(&mut buf).unwrap();
        buf.set_position(0);

        let err = ControlBatchRecord::read(&mut buf).unwrap_err();
        assert_matches!(err, ReadError::Malformed(_));
        assert_eq!(
            err.to_string(),
            "Malformed data: Unknown control batch record type: 2",
        );
    }

    test_roundtrip!(RecordBatchBody, test_record_batch_body_roundtrip);
    test_roundtrip!(RecordBatch, test_record_batch_roundtrip);

    // Uncompressed batches must roundtrip byte-for-byte against the captured
    // wire fixture.
    #[test]
    fn test_decode_fixture_nocompression() {
        let data = [
            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x4b\x00\x00\x00\x00".to_vec(),
            b"\x02\x27\x24\xfe\xcd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x61".to_vec(),
            b"\xd5\x9b\x77\x00\x00\x00\x00\x61\xd5\x9b\x77\xff\xff\xff\xff\xff".to_vec(),
            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x32\x00\x00".to_vec(),
            b"\x00\x00\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06".to_vec(),
            b"\x66\x6f\x6f\x06\x62\x61\x72".to_vec(),
        ]
        .concat();

        let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
        let expected = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: 0,
            first_timestamp: 1641388919,
            max_timestamp: 1641388919,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: ControlBatchOrRecords::Records(vec![Record {
                timestamp_delta: 0,
                offset_delta: 0,
                key: Some(vec![]),
                value: Some(b"hello kafka".to_vec()),
                headers: vec![RecordHeader {
                    key: "foo".to_owned(),
                    value: b"bar".to_vec(),
                }],
            }]),
            compression: RecordBatchCompression::NoCompression,
            is_transactional: false,
            timestamp_type: RecordBatchTimestampType::CreateTime,
        };
        assert_eq!(actual, expected);

        // No compression involved => the re-encoded bytes must match exactly.
        let mut data2 = vec![];
        actual.write(&mut data2).unwrap();
        assert_eq!(data, data2);
    }

    // Compressed output is not byte-stable across encoder versions, so the
    // compressed fixtures below re-encode and re-decode instead of comparing
    // the produced bytes.
    #[cfg(feature = "compression-gzip")]
    #[test]
    fn test_decode_fixture_gzip() {
        let data = [
            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x64\x00\x00\x00\x00".to_vec(),
            b"\x02\xba\x41\x46\x65\x00\x01\x00\x00\x00\x00\x00\x00\x01\x7e\x90".to_vec(),
            b"\xb3\x34\x67\x00\x00\x01\x7e\x90\xb3\x34\x67\xff\xff\xff\xff\xff".to_vec(),
            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x1f\x8b\x08".to_vec(),
            b"\x00\x00\x00\x00\x00\x00\x03\xfb\xc3\xc8\xc0\xc0\x70\x82\xb1\x82".to_vec(),
            b"\x0e\x40\x2c\x23\x35\x27\x27\x5f\x21\x3b\x31\x2d\x3b\x91\x89\x2d".to_vec(),
            b"\x2d\x3f\x9f\x2d\x29\xb1\x08\x00\xe4\xcd\xba\x1f\x80\x00\x00\x00".to_vec(),
        ]
        .concat();

        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
        let expected = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: 0,
            first_timestamp: 1643105170535,
            max_timestamp: 1643105170535,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: ControlBatchOrRecords::Records(vec![Record {
                timestamp_delta: 0,
                offset_delta: 0,
                key: Some(vec![b'x'; 100]),
                value: Some(b"hello kafka".to_vec()),
                headers: vec![RecordHeader {
                    key: "foo".to_owned(),
                    value: b"bar".to_vec(),
                }],
            }]),
            compression: RecordBatchCompression::Gzip,
            is_transactional: false,
            timestamp_type: RecordBatchTimestampType::CreateTime,
        };
        assert_eq!(actual, expected);

        let mut data2 = vec![];
        actual.write(&mut data2).unwrap();
        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
        assert_eq!(actual2, expected);
    }

    #[cfg(feature = "compression-lz4")]
    #[test]
    fn test_decode_fixture_lz4() {
        let data = [
            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x63\x00\x00\x00\x00".to_vec(),
            b"\x02\x1b\xa5\x92\x35\x00\x03\x00\x00\x00\x00\x00\x00\x01\x7e\xb1".to_vec(),
            b"\x1f\xc7\x24\x00\x00\x01\x7e\xb1\x1f\xc7\x24\xff\xff\xff\xff\xff".to_vec(),
            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x04\x22\x4d".to_vec(),
            b"\x18\x60\x40\x82\x23\x00\x00\x00\x8f\xfc\x01\x00\x00\x00\xc8\x01".to_vec(),
            b"\x78\x01\x00\x50\xf0\x06\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66".to_vec(),
            b"\x6b\x61\x02\x06\x66\x6f\x6f\x06\x62\x61\x72\x00\x00\x00\x00".to_vec(),
        ]
        .concat();

        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
        let expected = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: 0,
            first_timestamp: 1643649156900,
            max_timestamp: 1643649156900,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: ControlBatchOrRecords::Records(vec![Record {
                timestamp_delta: 0,
                offset_delta: 0,
                key: Some(vec![b'x'; 100]),
                value: Some(b"hello kafka".to_vec()),
                headers: vec![RecordHeader {
                    key: "foo".to_owned(),
                    value: b"bar".to_vec(),
                }],
            }]),
            compression: RecordBatchCompression::Lz4,
            is_transactional: false,
            timestamp_type: RecordBatchTimestampType::CreateTime,
        };
        assert_eq!(actual, expected);

        let mut data2 = vec![];
        actual.write(&mut data2).unwrap();
        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
        assert_eq!(actual2, expected);
    }

    #[cfg(feature = "compression-snappy")]
    mod snappy {
        use super::*;

        // Raw (unframed) Snappy fixture.
        #[test]
        fn test_decode_fixture_snappy() {
            let data = [
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x58\x00\x00\x00\x00".to_vec(),
                b"\x02\xad\x86\xf4\xf4\x00\x02\x00\x00\x00\x00\x00\x00\x01\x7e\xb6".to_vec(),
                b"\x45\x0e\x52\x00\x00\x01\x7e\xb6\x45\x0e\x52\xff\xff\xff\xff\xff".to_vec(),
                b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x80\x01\x1c".to_vec(),
                b"\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a\x01\x00\x50\x16".to_vec(),
                b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
                b"\x06\x62\x61\x72".to_vec(),
            ]
            .concat();

            let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
            let expected = RecordBatch {
                base_offset: 0,
                partition_leader_epoch: 0,
                last_offset_delta: 0,
                first_timestamp: 1643735486034,
                max_timestamp: 1643735486034,
                producer_id: -1,
                producer_epoch: -1,
                base_sequence: -1,
                records: ControlBatchOrRecords::Records(vec![Record {
                    timestamp_delta: 0,
                    offset_delta: 0,
                    key: Some(vec![b'x'; 100]),
                    value: Some(b"hello kafka".to_vec()),
                    headers: vec![RecordHeader {
                        key: "foo".to_owned(),
                        value: b"bar".to_vec(),
                    }],
                }]),
                compression: RecordBatchCompression::Snappy,
                is_transactional: false,
                timestamp_type: RecordBatchTimestampType::CreateTime,
            };
            assert_eq!(actual, expected);

            let mut data2 = vec![];
            actual.write(&mut data2).unwrap();
            let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
            assert_eq!(actual2, expected);
        }

        // Framed, Java-client-specific Snappy fixture (0x82SNAPPY\0 magic).
        #[test]
        fn test_decode_fixture_snappy_java() {
            let data = [
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8c\x00\x00\x00\x00".to_vec(),
                b"\x02\x79\x1e\x2d\xce\x00\x02\x00\x00\x00\x01\x00\x00\x01\x7f\x07".to_vec(),
                b"\x25\x7a\xb1\x00\x00\x01\x7f\x07\x25\x7a\xb1\xff\xff\xff\xff\xff".to_vec(),
                b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x02\x82\x53\x4e".to_vec(),
                b"\x41\x50\x50\x59\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00".to_vec(),
                b"\x47\xff\x01\x1c\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a".to_vec(),
                b"\x01\x00\x64\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02".to_vec(),
                b"\x06\x66\x6f\x6f\x06\x62\x61\x72\xfa\x01\x00\x00\x02\xfe\x80\x00".to_vec(),
                b"\x96\x80\x00\x4c\x14\x73\x6f\x6d\x65\x20\x76\x61\x6c\x75\x65\x02".to_vec(),
                b"\x06\x66\x6f\x6f\x06\x62\x61\x72".to_vec(),
            ]
            .concat();

            let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
            let expected = RecordBatch {
                base_offset: 0,
                partition_leader_epoch: 0,
                last_offset_delta: 1,
                first_timestamp: 1645092371121,
                max_timestamp: 1645092371121,
                producer_id: -1,
                producer_epoch: -1,
                base_sequence: -1,
                records: ControlBatchOrRecords::Records(vec![
                    Record {
                        timestamp_delta: 0,
                        offset_delta: 0,
                        key: Some(vec![b'x'; 100]),
                        value: Some(b"hello kafka".to_vec()),
                        headers: vec![RecordHeader {
                            key: "foo".to_owned(),
                            value: b"bar".to_vec(),
                        }],
                    },
                    Record {
                        timestamp_delta: 0,
                        offset_delta: 1,
                        key: Some(vec![b'x'; 100]),
                        value: Some(b"some value".to_vec()),
                        headers: vec![RecordHeader {
                            key: "foo".to_owned(),
                            value: b"bar".to_vec(),
                        }],
                    },
                ]),
                compression: RecordBatchCompression::Snappy,
                is_transactional: false,
                timestamp_type: RecordBatchTimestampType::CreateTime,
            };
            assert_eq!(actual, expected);

            let mut data2 = vec![];
            actual.write(&mut data2).unwrap();
            let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
            assert_eq!(actual2, expected);
        }

        // Regression test: a bogus chunk length in the Java framing must fail
        // with a Malformed error instead of a huge allocation (OOM).
        #[test]
        fn test_decode_java_specific_oom() {
            let data = [
                0x0a, 0x0a, 0x83, 0x00, 0xd4, 0x00, 0x00, 0x22, 0x00, 0x4b, 0x08, 0xd2, 0x22, 0xfb,
                0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x9b, 0x00, 0x9b, 0x0a, 0x40,
                0x00, 0x00, 0x4b, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0xd3, 0x82, 0x53,
                0x4e, 0x41, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01,
                0x00, 0x00, 0x00, 0x03, 0x01, 0x00, 0x00, 0xfc, 0x00, 0x09, 0x09, 0x09, 0x09, 0x09,
                0x09, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0x00, 0x80,
                0x00, 0x00, 0x00, 0x00, 0xb0, 0x9b, 0x00,
            ]
            .to_vec();

            let err = RecordBatchBody::read(&mut Cursor::new(data)).unwrap_err();
            assert_matches!(err, ReadError::Malformed(_));
            assert_eq!(err.to_string(), "Malformed data: Java-specific Snappy-compressed data has illegal chunk length, got 4227860745 bytes but only 38 bytes are left.");
        }

        #[test]
        fn test_carefully_decompress_snappy_empty_input() {
            let err = carefully_decompress_snappy(&[], 1).unwrap_err();
            assert_matches!(err, ReadError::Malformed(_));
        }

        #[test]
        fn test_carefully_decompress_snappy_empty_payload() {
            let compressed = compress(&[]);
            let data = carefully_decompress_snappy(&compressed, 1).unwrap();
            assert!(data.is_empty());
        }

        // Property test: decompressing a compressed payload must reproduce it
        // even when the initial block size (1) forces the grow-and-retry
        // path.
        proptest! {
            #![proptest_config(ProptestConfig{cases: 200, ..Default::default()})]
            #[test]
            fn test_carefully_decompress_snappy(input in prop::collection::vec(any::<u8>(), 0..10_000)) {
                let compressed = compress(&input);
                let input2 = carefully_decompress_snappy(&compressed, 1).unwrap();
                assert_eq!(input, input2);
            }
        }

        // Raw Snappy compression helper for the tests above.
        fn compress(data: &[u8]) -> Vec<u8> {
            use snap::raw::{max_compress_len, Encoder};

            let mut encoder = Encoder::new();
            let mut output = vec![0; max_compress_len(data.len())];
            let l = encoder.compress(data, &mut output).unwrap();
            output[..l].to_vec()
        }
    }

    #[cfg(feature = "compression-zstd")]
    #[test]
    fn test_decode_fixture_zstd() {
        let data = [
            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5d\x00\x00\x00\x00".to_vec(),
            b"\x02\xa1\x6e\x4e\x95\x00\x04\x00\x00\x00\x00\x00\x00\x01\x7e\xbf".to_vec(),
            b"\x78\xf3\xad\x00\x00\x01\x7e\xbf\x78\xf3\xad\xff\xff\xff\xff\xff".to_vec(),
            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28\xb5\x2f".to_vec(),
            b"\xfd\x00\x58\x1d\x01\x00\xe8\xfc\x01\x00\x00\x00\xc8\x01\x78\x16".to_vec(),
            b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
            b"\x06\x62\x61\x72\x01\x00\x20\x05\x5c".to_vec(),
        ]
        .concat();

        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
        let expected = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: 0,
            first_timestamp: 1643889882029,
            max_timestamp: 1643889882029,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: ControlBatchOrRecords::Records(vec![Record {
                timestamp_delta: 0,
                offset_delta: 0,
                key: Some(vec![b'x'; 100]),
                value: Some(b"hello kafka".to_vec()),
                headers: vec![RecordHeader {
                    key: "foo".to_owned(),
                    value: b"bar".to_vec(),
                }],
            }]),
            compression: RecordBatchCompression::Zstd,
            is_transactional: false,
            timestamp_type: RecordBatchTimestampType::CreateTime,
        };
        assert_eq!(actual, expected);

        let mut data2 = vec![];
        actual.write(&mut data2).unwrap();
        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
        assert_eq!(actual2, expected);
    }

    // Null keys (wire length -1) must roundtrip byte-for-byte.
    #[test]
    fn test_decode_fixture_null_key() {
        let data = [
            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x1a\x00\x00\x00\x00".to_vec(),
            b"\x02\x67\x98\xb9\x54\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7e\xbe".to_vec(),
            b"\xdc\x91\xf6\x00\x00\x01\x7e\xbe\xdc\x91\xf6\xff\xff\xff\xff\xff".to_vec(),
            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\xce\x03\x00".to_vec(),
            b"\x00\x00\x01\xce\x01\x0a\x65\x0a\x2f\x74\x65\x73\x74\x5f\x74\x6f".to_vec(),
            b"\x70\x69\x63\x5f\x33\x37\x33\x39\x38\x66\x38\x64\x2d\x39\x35\x66".to_vec(),
            b"\x38\x2d\x34\x34\x65\x65\x2d\x38\x33\x61\x34\x2d\x34\x64\x30\x63".to_vec(),
            b"\x35\x39\x32\x62\x34\x34\x36\x64\x12\x32\x0a\x03\x75\x70\x63\x12".to_vec(),
            b"\x17\x0a\x04\x75\x73\x65\x72\x10\x03\x1a\x0a\x12\x08\x00\x00\x00".to_vec(),
            b"\x00\x00\x00\xf0\x3f\x22\x01\x00\x12\x10\x0a\x04\x74\x69\x6d\x65".to_vec(),
            b"\x10\x04\x1a\x03\x0a\x01\x64\x22\x01\x00\x18\x01\x04\x18\x63\x6f".to_vec(),
            b"\x6e\x74\x65\x6e\x74\x2d\x74\x79\x70\x65\xa4\x01\x61\x70\x70\x6c".to_vec(),
            b"\x69\x63\x61\x74\x69\x6f\x6e\x2f\x78\x2d\x70\x72\x6f\x74\x6f\x62".to_vec(),
            b"\x75\x66\x3b\x20\x73\x63\x68\x65\x6d\x61\x3d\x22\x69\x6e\x66\x6c".to_vec(),
            b"\x75\x78\x64\x61\x74\x61\x2e\x69\x6f\x78\x2e\x77\x72\x69\x74\x65".to_vec(),
            b"\x5f\x62\x75\x66\x66\x65\x72\x2e\x76\x31\x2e\x57\x72\x69\x74\x65".to_vec(),
            b"\x42\x75\x66\x66\x65\x72\x50\x61\x79\x6c\x6f\x61\x64\x22\x1a\x69".to_vec(),
            b"\x6f\x78\x2d\x6e\x61\x6d\x65\x73\x70\x61\x63\x65\x12\x6e\x61\x6d".to_vec(),
            b"\x65\x73\x70\x61\x63\x65".to_vec(),
        ]
        .concat();

        let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
        let expected = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: 0,
            first_timestamp: 1643879633398,
            max_timestamp: 1643879633398,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: ControlBatchOrRecords::Records(vec![Record {
                timestamp_delta: 0,
                offset_delta: 0,
                key: None,
                value: Some(vec![
                    10, 101, 10, 47, 116, 101, 115, 116, 95, 116, 111, 112, 105, 99, 95, 51, 55,
                    51, 57, 56, 102, 56, 100, 45, 57, 53, 102, 56, 45, 52, 52, 101, 101, 45, 56,
                    51, 97, 52, 45, 52, 100, 48, 99, 53, 57, 50, 98, 52, 52, 54, 100, 18, 50, 10,
                    3, 117, 112, 99, 18, 23, 10, 4, 117, 115, 101, 114, 16, 3, 26, 10, 18, 8, 0, 0,
                    0, 0, 0, 0, 240, 63, 34, 1, 0, 18, 16, 10, 4, 116, 105, 109, 101, 16, 4, 26, 3,
                    10, 1, 100, 34, 1, 0, 24, 1,
                ]),
                headers: vec![
                    RecordHeader {
                        key: "content-type".to_owned(),
                        value: br#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#.to_vec(),
                    },
                    RecordHeader {
                        key: "iox-namespace".to_owned(),
                        value: b"namespace".to_vec(),
                    },
                ],
            }]),
            compression: RecordBatchCompression::NoCompression,
            is_transactional: false,
            timestamp_type: RecordBatchTimestampType::CreateTime,
        };
        assert_eq!(actual, expected);

        let mut data2 = vec![];
        actual.write(&mut data2).unwrap();
        assert_eq!(data, data2);
    }
}