/// A single record decoded from a record batch by `parse_record_batch`.
#[derive(Debug, Clone)]
pub struct DecodedRecord {
/// Batch base timestamp plus this record's timestamp delta (saturating add),
/// as computed by `parse_record_batch`.
pub timestamp_ms: i64,
/// Record key; `None` when the encoded key length is negative.
pub key: Option<Vec<u8>>,
/// Record value; empty when the encoded value length is negative, so a
/// negative-length value is indistinguishable from a zero-length one here.
pub value: Vec<u8>,
/// Header `(key, value)` pairs in wire order. A negative header value
/// length is decoded as an empty value.
pub headers: Vec<(String, Vec<u8>)>,
}
/// Decoded contents of one partition entry of a produce request, as built by
/// `parse_produce_v9`.
#[derive(Debug, Clone)]
pub struct PartitionProduceData {
/// Partition index exactly as read from the request.
pub partition_index: i32,
/// Records decoded from the partition's record batch; empty when the
/// request carried null record bytes for this partition.
pub records: Vec<DecodedRecord>,
/// Low three bits of the batch `attributes` field (`attributes & 0x7`);
/// 0 when the record bytes were null.
pub compression_codec: i8,
}
/// One topic entry of a produce request, as built by `parse_produce_v9`.
#[derive(Debug, Clone)]
pub struct TopicProduceData {
/// Topic name, read as a non-null compact string.
pub name: String,
/// Partition entries in the order they appeared in the request.
pub partitions: Vec<PartitionProduceData>,
}
/// Parsed body of a version-9 produce request, as returned by
/// `parse_produce_v9`.
#[derive(Debug, Clone)]
pub struct ProduceRequestV9 {
/// Transactional id, read as a compact nullable string (`None` when null).
pub transactional_id: Option<String>,
/// `acks` field exactly as read from the request (big-endian `i16`).
pub acks: i16,
/// Timeout in milliseconds exactly as read from the request (big-endian `i32`).
pub timeout_ms: i32,
/// Topic entries in the order they appeared in the request.
pub topics: Vec<TopicProduceData>,
}
/// Per-partition outcome written by `serialize_produce_v9_response`.
///
/// Every field is emitted as a big-endian integer, in declaration order.
#[derive(Debug, Clone, Copy)]
pub struct PartitionProduceResult {
/// Partition index this result refers to.
pub partition_index: i32,
/// Error code written verbatim into the response.
pub error_code: i16,
/// Base offset written verbatim into the response.
pub base_offset: i64,
/// Log append time in milliseconds, written verbatim into the response.
pub log_append_time_ms: i64,
/// Log start offset written verbatim into the response.
pub log_start_offset: i64,
}
/// Per-topic group of partition results written by
/// `serialize_produce_v9_response`.
#[derive(Debug, Clone)]
pub struct TopicProduceResult {
/// Topic name, serialized as a compact string.
pub name: String,
/// Partition results, serialized in slice order.
pub partitions: Vec<PartitionProduceResult>,
}
/// Splits the first `n` bytes off the front of `buf` and advances `buf` past them.
///
/// # Errors
///
/// Returns a "short read" message (and leaves `buf` untouched) when fewer
/// than `n` bytes remain.
pub(crate) fn take<'a>(buf: &mut &'a [u8], n: usize) -> Result<&'a [u8], String> {
    // Copy the inner slice reference out so the returned head keeps the full
    // `'a` lifetime rather than the shorter borrow of `buf` itself.
    let whole: &'a [u8] = *buf;
    match whole.get(..n) {
        Some(head) => {
            *buf = &whole[n..];
            Ok(head)
        }
        None => Err(format!("short read: wanted {n}, have {}", whole.len())),
    }
}
/// Reads one byte from the front of `buf` and reinterprets it as an `i8`.
///
/// # Errors
///
/// Propagates the "short read" error from `take` when `buf` is empty.
pub(crate) fn read_i8(buf: &mut &[u8]) -> Result<i8, String> {
    let raw = take(buf, 1)?;
    Ok(i8::from_be_bytes([raw[0]]))
}
/// Reads a big-endian `i16` from the front of `buf`, advancing it by two bytes.
///
/// # Errors
///
/// Propagates the "short read" error from `take` when fewer than two bytes remain.
pub(crate) fn read_i16(buf: &mut &[u8]) -> Result<i16, String> {
    let mut be = [0u8; 2];
    be.copy_from_slice(take(buf, 2)?);
    Ok(i16::from_be_bytes(be))
}
/// Reads a big-endian `i32` from the front of `buf`, advancing it by four bytes.
///
/// # Errors
///
/// Propagates the "short read" error from `take` when fewer than four bytes remain.
pub(crate) fn read_i32(buf: &mut &[u8]) -> Result<i32, String> {
    let mut be = [0u8; 4];
    be.copy_from_slice(take(buf, 4)?);
    Ok(i32::from_be_bytes(be))
}
/// Reads a big-endian `i64` from the front of `buf`, advancing it by eight bytes.
///
/// # Errors
///
/// Propagates the "short read" error from `take` when fewer than eight bytes remain.
pub(crate) fn read_i64(buf: &mut &[u8]) -> Result<i64, String> {
    let mut be = [0u8; 8];
    be.copy_from_slice(take(buf, 8)?);
    Ok(i64::from_be_bytes(be))
}
/// Decodes a little-endian base-128 unsigned varint from the front of `buf`.
///
/// Each byte contributes its low seven bits; a set high bit means another
/// byte follows. At most five bytes are consumed. Bits of the fifth byte that
/// do not fit in a `u32` are silently discarded.
///
/// # Errors
///
/// * `"truncated unsigned varint"` when `buf` runs out before a terminating byte.
/// * `"unsigned varint overflow"` when the fifth byte still has its
///   continuation bit set (all five bytes have been consumed at that point).
pub fn read_unsigned_varint(buf: &mut &[u8]) -> Result<u32, String> {
    let mut acc: u32 = 0;
    // Shift amounts 0, 7, 14, 21, 28: one per permitted byte.
    for shift in (0..32u32).step_by(7) {
        let current = *buf;
        let (byte, rest) = match current.split_first() {
            Some((first, rest)) => (*first, rest),
            None => return Err("truncated unsigned varint".into()),
        };
        *buf = rest;
        acc |= u32::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return Ok(acc);
        }
    }
    Err("unsigned varint overflow".into())
}
/// Decodes a zig-zag encoded signed varint (up to 64 bits) from the front of `buf`.
///
/// The raw value is accumulated seven bits per byte, least-significant group
/// first, over at most ten bytes, then zig-zag decoded so that raw values
/// 0, 1, 2, 3, ... map to 0, -1, 1, -2, ...
///
/// # Errors
///
/// * `"truncated signed varint"` when `buf` runs out before a terminating byte.
/// * `"signed varint overflow"` when the tenth byte still has its
///   continuation bit set (all ten bytes have been consumed at that point).
pub fn read_signed_varint(buf: &mut &[u8]) -> Result<i64, String> {
    let mut raw: u64 = 0;
    // Shift amounts 0, 7, ..., 63: one per permitted byte.
    for shift in (0..64u32).step_by(7) {
        let current = *buf;
        let (byte, rest) = match current.split_first() {
            Some((first, rest)) => (*first, rest),
            None => return Err("truncated signed varint".into()),
        };
        *buf = rest;
        raw |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            // Zig-zag: the low bit is the sign, the remaining bits the magnitude.
            let magnitude = (raw >> 1) as i64;
            let sign_mask = -((raw & 1) as i64);
            return Ok(magnitude ^ sign_mask);
        }
    }
    Err("signed varint overflow".into())
}
/// Reads a compact nullable string: an unsigned varint holding `length + 1`
/// (0 meaning null) followed by that many UTF-8 bytes.
///
/// # Errors
///
/// Propagates varint and short-read errors, and returns `"invalid utf8: ..."`
/// when the payload is not valid UTF-8.
pub(crate) fn read_compact_nullable_string(buf: &mut &[u8]) -> Result<Option<String>, String> {
    match read_unsigned_varint(buf)? {
        0 => Ok(None),
        len_plus_one => {
            let raw = take(buf, (len_plus_one - 1) as usize)?;
            match String::from_utf8(raw.to_vec()) {
                Ok(text) => Ok(Some(text)),
                Err(e) => Err(format!("invalid utf8: {e}")),
            }
        }
    }
}
/// Reads a compact string that must not be null.
///
/// # Errors
///
/// Propagates every error of `read_compact_nullable_string` and additionally
/// fails with `"expected non-null compact string"` when the encoded value is null.
pub(crate) fn read_compact_string(buf: &mut &[u8]) -> Result<String, String> {
    match read_compact_nullable_string(buf)? {
        Some(text) => Ok(text),
        None => Err("expected non-null compact string".into()),
    }
}
/// Reads compact nullable bytes: an unsigned varint holding `length + 1`
/// (0 meaning null) followed by that many raw bytes.
///
/// The returned slice borrows from the input; nothing is copied.
///
/// # Errors
///
/// Propagates varint and short-read errors.
pub(crate) fn read_compact_nullable_bytes<'a>(
    buf: &mut &'a [u8],
) -> Result<Option<&'a [u8]>, String> {
    match read_unsigned_varint(buf)? {
        0 => Ok(None),
        len_plus_one => take(buf, (len_plus_one - 1) as usize).map(Some),
    }
}
/// Skips over a tag buffer: a varint field count followed, per field, by a
/// varint tag id, a varint payload length and the payload itself.
///
/// All tagged fields are discarded.
///
/// # Errors
///
/// Propagates varint and short-read errors.
pub(crate) fn skip_tag_buffer(buf: &mut &[u8]) -> Result<(), String> {
    let mut remaining = read_unsigned_varint(buf)?;
    while remaining != 0 {
        // The tag id is read only to step past it.
        read_unsigned_varint(buf)?;
        let payload_len = read_unsigned_varint(buf)? as usize;
        take(buf, payload_len)?;
        remaining -= 1;
    }
    Ok(())
}
/// Appends `value` to `buf` as a little-endian base-128 unsigned varint.
///
/// Every byte except the last has its high bit set; zero is encoded as a
/// single `0x00` byte.
pub(crate) fn push_unsigned_varint(buf: &mut Vec<u8>, mut value: u32) {
    loop {
        let low_bits = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: no continuation bit.
            buf.push(low_bits);
            return;
        }
        buf.push(low_bits | 0x80);
    }
}
/// Appends `value` to `buf` as a zig-zag encoded signed varint.
///
/// Zig-zag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ... so that values of small
/// magnitude take few bytes; the mapped value is then written seven bits per
/// byte, least-significant group first.
pub(crate) fn push_signed_varint(buf: &mut Vec<u8>, value: i64) {
    // `value >> 63` is an arithmetic shift: all ones for negatives, zero otherwise.
    let mut pending = ((value << 1) ^ (value >> 63)) as u64;
    while pending >= 0x80 {
        buf.push((pending as u8) | 0x80);
        pending >>= 7;
    }
    buf.push(pending as u8);
}
/// Appends `s` to `buf` as a compact string: an unsigned varint holding the
/// byte length plus one, followed by the UTF-8 bytes.
pub(crate) fn push_compact_string(buf: &mut Vec<u8>, s: &str) {
    let len_plus_one = (s.len() as u32) + 1;
    push_unsigned_varint(buf, len_plus_one);
    buf.extend(s.as_bytes());
}
/// Appends an empty tag buffer, i.e. a single zero byte (a field count of 0).
pub(crate) fn push_empty_tag_buffer(buf: &mut Vec<u8>) {
    buf.extend_from_slice(&[0u8]);
}
/// Decodes a magic-2 record batch into its records.
///
/// The fixed 61-byte batch header is read field by field. Only `magic`,
/// `attributes`, `base_timestamp` and the record count are used; the batch
/// length and CRC are read but not validated. When the low three attribute
/// bits select a compression codec, the remainder of the batch is
/// decompressed before the records are parsed.
///
/// Returns the decoded records together with the raw `attributes` field.
///
/// A negative key length yields `key: None`. A negative value length (record
/// value or header value) yields an empty byte vector.
///
/// # Errors
///
/// Returns a message when the input is truncated, `magic` is not 2, the
/// codec bits are unknown, decompression fails, a count or length that must
/// be non-negative is negative, a record length overruns the batch, or a
/// header key is not valid UTF-8.
pub fn parse_record_batch(batch_bytes: &[u8]) -> Result<(Vec<DecodedRecord>, i16), String> {
    use crate::record_compression::{decompress, CompressionCodec};

    // Smallest possible wire encodings, used ONLY to bound pre-allocation.
    // A record is a length varint plus attributes, timestamp delta, offset
    // delta, key length, value length and header count (>= 1 byte each).
    // A header is a key-length varint plus a value-length varint.
    const MIN_RECORD_WIRE_LEN: usize = 7;
    const MIN_HEADER_WIRE_LEN: usize = 2;

    let mut cur = batch_bytes;
    let _base_offset = read_i64(&mut cur)?;
    let _batch_length = read_i32(&mut cur)?;
    let _partition_leader_epoch = read_i32(&mut cur)?;
    let magic = read_i8(&mut cur)?;
    if magic != 2 {
        return Err(format!("unsupported RecordBatch magic: {magic}"));
    }
    let _crc = read_i32(&mut cur)?;
    let attributes = read_i16(&mut cur)?;
    let codec = CompressionCodec::from_attributes_bits((attributes & 0x7) as i8)
        .ok_or_else(|| format!("unknown compression codec: {}", attributes & 0x7))?;
    let _last_offset_delta = read_i32(&mut cur)?;
    let base_timestamp = read_i64(&mut cur)?;
    let _max_timestamp = read_i64(&mut cur)?;
    let _producer_id = read_i64(&mut cur)?;
    let _producer_epoch = read_i16(&mut cur)?;
    let _base_sequence = read_i32(&mut cur)?;
    let records_count = read_i32(&mut cur)?;
    if records_count < 0 {
        return Err(format!("negative records count: {records_count}"));
    }

    // Keep the decompressed bytes alive for as long as `records_cur` borrows them.
    let decompressed_blob: Option<Vec<u8>> = if codec == CompressionCodec::None {
        None
    } else {
        Some(decompress(codec, cur)?)
    };
    let mut records_cur: &[u8] = match &decompressed_blob {
        Some(v) => v.as_slice(),
        None => cur,
    };

    // The count comes straight off the wire: never reserve more slots than the
    // remaining bytes could possibly encode, otherwise a tiny malformed batch
    // can request a multi-gigabyte allocation. The loop below still iterates
    // `records_count` times and fails on the first record that is missing.
    let record_capacity =
        (records_count as usize).min(records_cur.len() / MIN_RECORD_WIRE_LEN);
    let mut records = Vec::with_capacity(record_capacity);
    for _ in 0..records_count {
        let record_len = read_signed_varint(&mut records_cur)?;
        if record_len < 0 {
            return Err(format!("negative record length: {record_len}"));
        }
        // Compare in u64 so the check cannot be defeated by truncation on
        // targets where `usize` is narrower than 64 bits.
        if record_len as u64 > records_cur.len() as u64 {
            return Err("record length overruns batch".into());
        }
        let record_len = record_len as usize;
        let mut body = &records_cur[..record_len];
        records_cur = &records_cur[record_len..];

        let _attributes = read_i8(&mut body)?;
        let timestamp_delta = read_signed_varint(&mut body)?;
        let _offset_delta = read_signed_varint(&mut body)?;
        let key_len = read_signed_varint(&mut body)?;
        let key = if key_len < 0 {
            None
        } else {
            Some(take(&mut body, key_len as usize)?.to_vec())
        };
        let value_len = read_signed_varint(&mut body)?;
        let value = if value_len < 0 {
            Vec::new()
        } else {
            take(&mut body, value_len as usize)?.to_vec()
        };
        let headers_len = read_signed_varint(&mut body)?;
        if headers_len < 0 {
            return Err(format!("negative headers count: {headers_len}"));
        }
        // Same reasoning as for records: `headers_len` is an arbitrary i64, and
        // `Vec::with_capacity` panics ("capacity overflow") on absurd sizes.
        let header_capacity =
            (headers_len as u64).min((body.len() / MIN_HEADER_WIRE_LEN) as u64) as usize;
        let mut headers = Vec::with_capacity(header_capacity);
        for _ in 0..headers_len {
            let hk_len = read_signed_varint(&mut body)?;
            if hk_len < 0 {
                return Err("negative header key length".into());
            }
            let hk_bytes = take(&mut body, hk_len as usize)?;
            let hk = String::from_utf8(hk_bytes.to_vec())
                .map_err(|e| format!("invalid header key utf8: {e}"))?;
            let hv_len = read_signed_varint(&mut body)?;
            let hv = if hv_len < 0 {
                Vec::new()
            } else {
                take(&mut body, hv_len as usize)?.to_vec()
            };
            headers.push((hk, hv));
        }
        records.push(DecodedRecord {
            timestamp_ms: base_timestamp.saturating_add(timestamp_delta),
            key,
            value,
            headers,
        });
    }
    Ok((records, attributes))
}
/// Parses the body of a version-9 produce request.
///
/// Layout read, in order: compact nullable transactional id, `acks` (`i16`),
/// `timeout_ms` (`i32`), a compact array of topics, and a trailing tag
/// buffer. Each topic is a compact name, a compact array of partitions and a
/// tag buffer. Each partition is an `i32` index, compact nullable record
/// bytes (decoded with `parse_record_batch`) and a tag buffer. All tagged
/// fields are skipped. Bytes remaining after the final tag buffer are ignored.
///
/// # Errors
///
/// Returns a message when the input is truncated or malformed, when the
/// topic or partition array is null, or when a record batch fails to decode.
pub fn parse_produce_v9(body: &[u8]) -> Result<ProduceRequestV9, String> {
    // Smallest possible wire encodings, used ONLY to bound pre-allocation.
    // A topic is a name length varint, a partition-array length varint and a
    // tag buffer (>= 1 byte each). A partition is a 4-byte index, a
    // record-bytes length varint and a tag buffer.
    const MIN_TOPIC_WIRE_LEN: usize = 3;
    const MIN_PARTITION_WIRE_LEN: usize = 6;

    let mut cur = body;
    let transactional_id = read_compact_nullable_string(&mut cur)?;
    let acks = read_i16(&mut cur)?;
    let timeout_ms = read_i32(&mut cur)?;
    let topics_len_plus_one = read_unsigned_varint(&mut cur)?;
    if topics_len_plus_one == 0 {
        return Err("produce request topic array is null".into());
    }
    let topics_len = (topics_len_plus_one - 1) as usize;
    // The array length is attacker-controlled (up to u32::MAX - 1): reserve
    // no more slots than the remaining bytes could encode, so a few bytes of
    // input cannot demand a huge allocation. The loop still runs
    // `topics_len` times and fails on the first missing element.
    let mut topics = Vec::with_capacity(topics_len.min(cur.len() / MIN_TOPIC_WIRE_LEN));
    for _ in 0..topics_len {
        let name = read_compact_string(&mut cur)?;
        let parts_len_plus_one = read_unsigned_varint(&mut cur)?;
        if parts_len_plus_one == 0 {
            return Err(format!("topic {name} partition array is null"));
        }
        let parts_len = (parts_len_plus_one - 1) as usize;
        let mut parts = Vec::with_capacity(parts_len.min(cur.len() / MIN_PARTITION_WIRE_LEN));
        for _ in 0..parts_len {
            let partition_index = read_i32(&mut cur)?;
            let records_bytes = read_compact_nullable_bytes(&mut cur)?;
            // Null record bytes decode to "no records, no compression".
            let (records, attributes) = match records_bytes {
                None => (Vec::new(), 0i16),
                Some(bytes) => parse_record_batch(bytes)?,
            };
            skip_tag_buffer(&mut cur)?;
            parts.push(PartitionProduceData {
                partition_index,
                records,
                compression_codec: (attributes & 0x7) as i8,
            });
        }
        skip_tag_buffer(&mut cur)?;
        topics.push(TopicProduceData {
            name,
            partitions: parts,
        });
    }
    skip_tag_buffer(&mut cur)?;
    Ok(ProduceRequestV9 {
        transactional_id,
        acks,
        timeout_ms,
        topics,
    })
}
pub fn serialize_produce_v9_response(
correlation_id: i32,
results: &[TopicProduceResult],
) -> Vec<u8> {
let mut out = Vec::new();
out.extend_from_slice(&correlation_id.to_be_bytes());
push_empty_tag_buffer(&mut out);
push_unsigned_varint(&mut out, (results.len() as u32) + 1);
for topic in results {
push_compact_string(&mut out, &topic.name);
push_unsigned_varint(&mut out, (topic.partitions.len() as u32) + 1);
for p in &topic.partitions {
out.extend_from_slice(&p.partition_index.to_be_bytes());
out.extend_from_slice(&p.error_code.to_be_bytes());
out.extend_from_slice(&p.base_offset.to_be_bytes());
out.extend_from_slice(&p.log_append_time_ms.to_be_bytes());
out.extend_from_slice(&p.log_start_offset.to_be_bytes());
push_unsigned_varint(&mut out, 1);
push_unsigned_varint(&mut out, 0);
push_empty_tag_buffer(&mut out);
}
push_empty_tag_buffer(&mut out);
}
out.extend_from_slice(&0i32.to_be_bytes());
push_empty_tag_buffer(&mut out);
out
}
/// Builds an uncompressed magic-2 record batch containing exactly one record,
/// for use as a test fixture.
///
/// The record has zero attributes, zero timestamp/offset deltas, the given
/// `key` (`None` is encoded as length -1), the given `value`, and no headers.
/// The 61-byte header uses base timestamp 1000 and a record count of 1; the
/// batch length and CRC fields are left as zero.
#[cfg(test)]
pub(crate) fn one_record_batch_for_testing(key: Option<&[u8]>, value: &[u8]) -> Vec<u8> {
    // Zig-zag varint of a non-negative length. Deliberately local so the
    // fixture does not depend on the encoder that ships with the parser.
    fn put_len(dst: &mut Vec<u8>, len: usize) {
        let mut pending = ((len as i64) << 1) as u64;
        while pending >= 0x80 {
            dst.push((pending as u8) | 0x80);
            pending >>= 7;
        }
        dst.push(pending as u8);
    }

    // Record body: attributes, timestamp delta, offset delta (all zero).
    let mut record: Vec<u8> = vec![0, 0, 0];
    if let Some(k) = key {
        put_len(&mut record, k.len());
        record.extend_from_slice(k);
    } else {
        // Zig-zag encoding of -1, i.e. a null key.
        record.push(1);
    }
    put_len(&mut record, value.len());
    record.extend_from_slice(value);
    // Header count: zero.
    record.push(0);

    let mut batch = Vec::new();
    batch.extend_from_slice(&0i64.to_be_bytes()); // base offset
    batch.extend_from_slice(&0i32.to_be_bytes()); // batch length
    batch.extend_from_slice(&(-1i32).to_be_bytes()); // partition leader epoch
    batch.push(2); // magic
    batch.extend_from_slice(&0i32.to_be_bytes()); // crc
    batch.extend_from_slice(&0i16.to_be_bytes()); // attributes
    batch.extend_from_slice(&0i32.to_be_bytes()); // last offset delta
    batch.extend_from_slice(&1_000i64.to_be_bytes()); // base timestamp
    batch.extend_from_slice(&1_000i64.to_be_bytes()); // max timestamp
    batch.extend_from_slice(&(-1i64).to_be_bytes()); // producer id
    batch.extend_from_slice(&(-1i16).to_be_bytes()); // producer epoch
    batch.extend_from_slice(&(-1i32).to_be_bytes()); // base sequence
    batch.extend_from_slice(&1i32.to_be_bytes()); // record count

    // The record itself, prefixed by its length.
    put_len(&mut batch, record.len());
    batch.extend_from_slice(&record);
    batch
}
// Unit tests for the varint codecs, the record-batch decoder (with and
// without compression), the produce-request parser and the response
// serializer. Several tests address fields by absolute byte offset; those
// offsets follow the 61-byte batch header written by
// `one_record_batch_for_testing`.
#[cfg(test)]
mod tests {
use super::*;
// Encoding then decoding must return the original value and consume every byte.
#[test]
fn unsigned_varint_roundtrip() {
for v in [0u32, 1, 127, 128, 300, 16_383, 16_384, u32::MAX] {
let mut buf = Vec::new();
push_unsigned_varint(&mut buf, v);
let mut slice = buf.as_slice();
let decoded = read_unsigned_varint(&mut slice).unwrap();
assert_eq!(decoded, v, "mismatch on {v}");
assert!(slice.is_empty());
}
}
// Checks the zig-zag decoder against a local reference encoder.
#[test]
fn signed_varint_zigzag() {
// Local zig-zag varint encoder, written out here rather than calling `push_signed_varint`.
fn encode(n: i64) -> Vec<u8> {
let zz = ((n << 1) ^ (n >> 63)) as u64;
let mut buf = Vec::new();
let mut v = zz;
while (v & !0x7F) != 0 {
buf.push(((v & 0x7F) | 0x80) as u8);
v >>= 7;
}
buf.push(v as u8);
buf
}
for n in [-1i64, 0, 1, -2, 2, -63, 64, 2147483647, -2147483648] {
let enc = encode(n);
let mut slice = enc.as_slice();
assert_eq!(read_signed_varint(&mut slice).unwrap(), n);
assert!(slice.is_empty());
}
}
use super::one_record_batch_for_testing as one_record_batch;
// An uncompressed single-record batch decodes to the key, value and base timestamp it was built with.
#[test]
fn parses_single_record_batch() {
let batch = one_record_batch(Some(b"k"), b"hello");
let (records, attrs) = parse_record_batch(&batch).unwrap();
assert_eq!(attrs & 0x7, 0);
assert_eq!(records.len(), 1);
assert_eq!(records[0].key.as_deref(), Some(b"k".as_ref()));
assert_eq!(records[0].value, b"hello");
assert_eq!(records[0].timestamp_ms, 1_000);
assert!(records[0].headers.is_empty());
}
// Codec bits that the parser cannot map to a codec must be rejected.
#[test]
fn rejects_unknown_compression_codec() {
let mut batch = one_record_batch(None, b"x");
// Bytes 21..23 are the big-endian `attributes` field; set its codec bits to 5.
batch[21] = 0;
batch[22] = 5;
let err = parse_record_batch(&batch).unwrap_err();
assert!(err.contains("unknown compression codec"), "unexpected error: {err}");
}
// Rebuilds `batch` with everything after the 61-byte header compressed with
// `codec`, and with the attributes field (bytes 21..23) replaced by the
// codec's attribute bits.
fn swap_records_for_compressed(
batch: &[u8],
codec: crate::record_compression::CompressionCodec,
) -> Vec<u8> {
assert!(batch.len() >= 61);
let records_blob = &batch[61..];
let compressed = crate::record_compression::compress(codec, records_blob).unwrap();
let mut out = Vec::with_capacity(61 + compressed.len());
// Header bytes before the attributes field.
out.extend_from_slice(&batch[..21]);
let attributes = codec.attributes_bits();
out.extend_from_slice(&attributes.to_be_bytes());
// Remainder of the header, from last-offset-delta through the record count.
out.extend_from_slice(&batch[23..61]);
out.extend_from_slice(&compressed);
out
}
// Gzip-compressed records (codec bits 1) decode to the original record.
#[test]
fn decompresses_gzip_and_yields_records() {
let uncompressed = one_record_batch(Some(b"k"), b"gzipped-hello");
let batch = swap_records_for_compressed(
&uncompressed,
crate::record_compression::CompressionCodec::Gzip,
);
let (records, attrs) = parse_record_batch(&batch).unwrap();
assert_eq!(attrs & 0x7, 1);
assert_eq!(records.len(), 1);
assert_eq!(records[0].value, b"gzipped-hello");
assert_eq!(records[0].key.as_deref(), Some(b"k".as_ref()));
}
// Snappy-compressed records (codec bits 2) decode to the original record.
#[test]
fn decompresses_snappy_and_yields_records() {
let uncompressed = one_record_batch(None, b"snappy-hello");
let batch = swap_records_for_compressed(
&uncompressed,
crate::record_compression::CompressionCodec::Snappy,
);
let (records, attrs) = parse_record_batch(&batch).unwrap();
assert_eq!(attrs & 0x7, 2);
assert_eq!(records.len(), 1);
assert_eq!(records[0].value, b"snappy-hello");
}
// LZ4-compressed records (codec bits 3) decode to the original record.
#[test]
fn decompresses_lz4_and_yields_records() {
let uncompressed = one_record_batch(None, b"lz4-hello");
let batch = swap_records_for_compressed(
&uncompressed,
crate::record_compression::CompressionCodec::Lz4,
);
let (records, attrs) = parse_record_batch(&batch).unwrap();
assert_eq!(attrs & 0x7, 3);
assert_eq!(records[0].value, b"lz4-hello");
}
// Zstd-compressed records (codec bits 4) decode to the original record.
#[test]
fn decompresses_zstd_and_yields_records() {
let uncompressed = one_record_batch(None, b"zstd-hello");
let batch = swap_records_for_compressed(
&uncompressed,
crate::record_compression::CompressionCodec::Zstd,
);
let (records, attrs) = parse_record_batch(&batch).unwrap();
assert_eq!(attrs & 0x7, 4);
assert_eq!(records[0].value, b"zstd-hello");
}
// Hand-assembles a request body with one topic "t" holding one partition and parses it.
#[test]
fn parses_produce_v9_single_topic() {
let batch = one_record_batch(None, b"v");
let mut body = Vec::new();
// Null transactional id (varint 0), acks = -1, timeout = 30000 ms.
body.push(0); body.extend_from_slice(&(-1i16).to_be_bytes()); body.extend_from_slice(&30_000i32.to_be_bytes());
// Topic array with one entry (compact arrays carry length + 1).
push_unsigned_varint(&mut body, 2);
// Topic name "t" as a compact string (length + 1, then the bytes).
push_unsigned_varint(&mut body, 2);
body.push(b't');
// Partition array with one entry.
push_unsigned_varint(&mut body, 2);
// Partition index 0.
body.extend_from_slice(&0i32.to_be_bytes());
// Record bytes as compact bytes (length + 1, then the batch).
push_unsigned_varint(&mut body, (batch.len() as u32) + 1);
body.extend_from_slice(&batch);
// Three empty tag buffers: partition, topic, request.
body.push(0);
body.push(0);
body.push(0);
let req = parse_produce_v9(&body).unwrap();
assert_eq!(req.acks, -1);
assert_eq!(req.timeout_ms, 30_000);
assert_eq!(req.topics.len(), 1);
assert_eq!(req.topics[0].name, "t");
assert_eq!(req.topics[0].partitions.len(), 1);
assert_eq!(req.topics[0].partitions[0].partition_index, 0);
assert_eq!(req.topics[0].partitions[0].records.len(), 1);
assert_eq!(req.topics[0].partitions[0].records[0].value, b"v");
}
// Pins the exact byte layout of a one-topic, one-partition response.
#[test]
fn response_shape_matches_spec() {
let results = vec![TopicProduceResult {
name: "orders".to_string(),
partitions: vec![PartitionProduceResult {
partition_index: 2,
error_code: 0,
base_offset: 41,
log_append_time_ms: -1,
log_start_offset: 0,
}],
}];
let data = serialize_produce_v9_response(99, &results);
// Header: correlation id, then an empty tag buffer.
assert_eq!(&data[0..4], &99i32.to_be_bytes());
assert_eq!(data[4], 0);
// Topic array length + 1.
assert_eq!(data[5], 2);
// Topic name: length + 1 (6 + 1), then "orders".
assert_eq!(data[6], 7);
assert_eq!(&data[7..13], b"orders");
// Partition array length + 1.
assert_eq!(data[13], 2);
// Partition fields: index, error code, base offset, log append time, log start offset.
assert_eq!(&data[14..18], &2i32.to_be_bytes());
assert_eq!(&data[18..20], &0i16.to_be_bytes());
assert_eq!(&data[20..28], &41i64.to_be_bytes());
assert_eq!(&data[28..36], &(-1i64).to_be_bytes());
assert_eq!(&data[36..44], &0i64.to_be_bytes());
// Varint 1, varint 0, then the partition's empty tag buffer.
assert_eq!(data[44], 1);
assert_eq!(data[45], 0);
assert_eq!(data[46], 0);
// Topic's empty tag buffer.
assert_eq!(data[47], 0);
// Trailing zero i32, then the final empty tag buffer.
assert_eq!(&data[48..52], &0i32.to_be_bytes());
assert_eq!(data[52], 0);
assert_eq!(data.len(), 53);
}
}