use crate::partitions::KafkaMessage;
use crate::produce_codec::{
push_compact_string, push_empty_tag_buffer, push_signed_varint, push_unsigned_varint,
read_compact_string, read_i32, read_i64, read_i8, read_unsigned_varint, skip_tag_buffer, take,
};
/// One partition entry of a Fetch v12 request, as produced by [`parse_fetch_v12`].
///
/// Only the per-partition values the parser keeps are stored here; the other
/// per-partition wire fields (leader epoch, last fetched epoch, log start
/// offset, tagged fields) are read and discarded during parsing.
#[derive(Debug, Clone)]
pub struct FetchPartitionRequest {
/// Partition number taken from the request entry.
pub partition_index: i32,
/// Offset requested by the client (presumably the first offset to return —
/// confirm against the fetch handler).
pub fetch_offset: i64,
/// Per-partition byte limit sent by the client; how it is enforced is not
/// visible in this file.
pub partition_max_bytes: i32,
}
/// One topic entry of a Fetch v12 request: the topic name together with the
/// partitions requested for it, in the order they appeared on the wire.
#[derive(Debug, Clone)]
pub struct FetchTopicRequest {
/// Topic name, decoded as a compact string by [`parse_fetch_v12`].
pub topic: String,
/// Partition entries listed under this topic.
pub partitions: Vec<FetchPartitionRequest>,
}
/// The subset of a Fetch v12 request body retained by [`parse_fetch_v12`].
///
/// The replica id, isolation level, session epoch, forgotten topics, rack id
/// and all tagged fields are decoded by the parser but not stored.
#[derive(Debug, Clone)]
pub struct FetchRequestV12 {
/// `max_wait_ms` as sent by the client (second i32 of the body).
pub max_wait_ms: i32,
/// `min_bytes` as sent by the client.
pub min_bytes: i32,
/// `max_bytes` as sent by the client.
pub max_bytes: i32,
/// Fetch session id as sent by the client.
pub session_id: i32,
/// Requested topics, in wire order.
pub topics: Vec<FetchTopicRequest>,
}
/// Per-partition payload consumed by [`serialize_fetch_v12_response`].
#[derive(Debug, Clone)]
pub struct FetchPartitionResponse {
/// Partition number written at the start of the partition entry.
pub partition_index: i32,
/// Error code written for this partition (presumably a Kafka protocol error
/// code, 0 meaning success — confirm with the caller).
pub error_code: i16,
/// High watermark. The serializer writes this value twice in a row: once as
/// the high watermark and once more in the i64 slot that follows it.
pub high_watermark: i64,
/// Log start offset written after the two watermark slots.
pub log_start_offset: i64,
/// Raw bytes emitted verbatim as the partition's records field (presumably
/// one or more already-serialized record batches — confirm with the caller).
pub records: Vec<u8>,
}
/// One topic entry of a Fetch v12 response: the topic name and the partition
/// payloads that [`serialize_fetch_v12_response`] writes under it.
#[derive(Debug, Clone)]
pub struct FetchTopicResponse {
/// Topic name, written as a compact string.
pub topic: String,
/// Partition payloads, written in slice order.
pub partitions: Vec<FetchPartitionResponse>,
}
/// Parses the body of a Kafka Fetch request in its version-12 (flexible) encoding.
///
/// `body` must start at the first request field (the replica id), i.e. after
/// the request header. Only the values stored in [`FetchRequestV12`] are kept.
/// The replica id, isolation level, session epoch, per-partition epochs and log
/// start offset, the forgotten-topics array, the rack id and every tag buffer
/// are decoded solely to keep the cursor aligned and are then dropped.
/// Bytes remaining after the final tag buffer are ignored.
///
/// Compact arrays are encoded as `length + 1`, with `0` meaning "null".
///
/// # Errors
///
/// Returns `Err` when the topics array or any partitions array is null, or
/// when any of the `produce_codec` read helpers reports an error.
pub fn parse_fetch_v12(body: &[u8]) -> Result<FetchRequestV12, String> {
    // Sum of the fixed-width fields of one partition entry:
    // i32 + i32 + i64 + i32 + i64 + i32 (the trailing tag buffer comes on top).
    const PARTITION_FIXED_BYTES: usize = 4 + 4 + 8 + 4 + 8 + 4;

    let mut cur = body;

    let _replica_id = read_i32(&mut cur)?;
    let max_wait_ms = read_i32(&mut cur)?;
    let min_bytes = read_i32(&mut cur)?;
    let max_bytes = read_i32(&mut cur)?;
    let _isolation_level = read_i8(&mut cur)?;
    let session_id = read_i32(&mut cur)?;
    let _session_epoch = read_i32(&mut cur)?;

    let topics_len_plus_one = read_unsigned_varint(&mut cur)?;
    if topics_len_plus_one == 0 {
        return Err("fetch topics array is null".into());
    }
    let topics_len = (topics_len_plus_one - 1) as usize;
    // The declared length comes from the client and must not drive the
    // allocation size directly: a few bytes could otherwise request gigabytes.
    // The remaining input length is a safe upper bound for a capacity hint;
    // the vector still grows on demand if the hint is too small.
    let mut topics = Vec::with_capacity(topics_len.min(cur.len()));
    for _ in 0..topics_len {
        let topic = read_compact_string(&mut cur)?;

        let parts_len_plus_one = read_unsigned_varint(&mut cur)?;
        if parts_len_plus_one == 0 {
            return Err(format!("fetch partitions array for {topic} is null"));
        }
        let parts_len = (parts_len_plus_one - 1) as usize;
        // Same reasoning as above: no more partitions can be decoded than the
        // remaining bytes can hold.
        let mut partitions =
            Vec::with_capacity(parts_len.min(cur.len() / PARTITION_FIXED_BYTES));
        for _ in 0..parts_len {
            let partition_index = read_i32(&mut cur)?;
            let _current_leader_epoch = read_i32(&mut cur)?;
            let fetch_offset = read_i64(&mut cur)?;
            let _last_fetched_epoch = read_i32(&mut cur)?;
            let _log_start_offset = read_i64(&mut cur)?;
            let partition_max_bytes = read_i32(&mut cur)?;
            skip_tag_buffer(&mut cur)?;
            partitions.push(FetchPartitionRequest {
                partition_index,
                fetch_offset,
                partition_max_bytes,
            });
        }
        skip_tag_buffer(&mut cur)?;
        topics.push(FetchTopicRequest { topic, partitions });
    }

    // Forgotten topics: decoded and discarded. A null array (0) is tolerated.
    let forgotten_len_plus_one = read_unsigned_varint(&mut cur)?;
    if forgotten_len_plus_one > 0 {
        for _ in 0..(forgotten_len_plus_one - 1) {
            let _forgotten_topic = read_compact_string(&mut cur)?;
            let plen_plus_one = read_unsigned_varint(&mut cur)?;
            if plen_plus_one > 0 {
                for _ in 0..(plen_plus_one - 1) {
                    let _ = read_i32(&mut cur)?;
                }
            }
            skip_tag_buffer(&mut cur)?;
        }
    }

    // Rack id: a compact string whose bytes are skipped without decoding.
    let rack_len_plus_one = read_unsigned_varint(&mut cur)?;
    if rack_len_plus_one > 0 {
        let rack_len = (rack_len_plus_one - 1) as usize;
        let _ = take(&mut cur, rack_len)?;
    }
    skip_tag_buffer(&mut cur)?;

    Ok(FetchRequestV12 {
        max_wait_ms,
        min_bytes,
        max_bytes,
        session_id,
        topics,
    })
}
/// Serializes `records` into a single uncompressed record batch.
///
/// Convenience wrapper around [`serialize_record_batch_v2_with_compression`]
/// using `CompressionCodec::None`; see that function for the layout and for
/// the behaviour on empty input.
pub fn serialize_record_batch_v2(records: &[&KafkaMessage]) -> Vec<u8> {
    let no_compression = crate::record_compression::CompressionCodec::None;
    serialize_record_batch_v2_with_compression(records, no_compression)
}
/// Serializes `records` into a single record batch with magic byte 2, passing
/// the encoded records section through `codec`.
///
/// Returns an empty vector when `records` is empty. Otherwise the first
/// record supplies the base offset and base timestamp, every record stores its
/// offset and timestamp as deltas against those, and the last record supplies
/// the last-offset delta.
///
/// Output layout, in order: base offset, batch length, an i32 `-1`, the magic
/// byte, a CRC-32C, and then the checksummed region (attributes, last offset
/// delta, base and max timestamp, an i64/i16/i32 triple of `-1`, the record
/// count and the compressed records). Field names in the comments below follow
/// the Kafka record batch format.
///
/// # Panics
///
/// Panics if `crate::record_compression::compress` returns an error.
pub fn serialize_record_batch_v2_with_compression(
    records: &[&KafkaMessage],
    codec: crate::record_compression::CompressionCodec,
) -> Vec<u8> {
    /// Appends `bytes` preceded by its length encoded as a signed varint.
    fn put_len_prefixed(buf: &mut Vec<u8>, bytes: &[u8]) {
        push_signed_varint(buf, bytes.len() as i64);
        buf.extend_from_slice(bytes);
    }

    let (first, last) = match (records.first(), records.last()) {
        (Some(first), Some(last)) => (first, last),
        _ => return Vec::new(),
    };
    let base_offset = first.offset;
    let base_timestamp = first.timestamp;
    let last_offset_delta = (last.offset - base_offset) as i32;
    let max_timestamp = records
        .iter()
        .map(|msg| msg.timestamp)
        .max()
        .unwrap_or(base_timestamp);

    // Encode every record into `scratch`, then copy it into `payload` behind
    // its varint length prefix.
    let mut payload: Vec<u8> = Vec::new();
    let mut scratch: Vec<u8> = Vec::new();
    for msg in records {
        scratch.clear();
        scratch.push(0u8); // per-record attributes byte, always zero
        push_signed_varint(&mut scratch, msg.timestamp - base_timestamp);
        push_signed_varint(&mut scratch, msg.offset - base_offset);
        match &msg.key {
            Some(key) => put_len_prefixed(&mut scratch, key),
            // A missing key is encoded as length -1 with no bytes.
            None => push_signed_varint(&mut scratch, -1),
        }
        put_len_prefixed(&mut scratch, &msg.value);
        push_signed_varint(&mut scratch, msg.headers.len() as i64);
        for (name, value) in &msg.headers {
            put_len_prefixed(&mut scratch, name.as_bytes());
            put_len_prefixed(&mut scratch, value);
        }
        push_signed_varint(&mut payload, scratch.len() as i64);
        payload.extend_from_slice(&scratch);
    }

    let compressed = crate::record_compression::compress(codec, &payload)
        .expect("compression of in-memory records blob must succeed");

    // Everything from the attributes onward is covered by the CRC.
    let mut crc_region = Vec::new();
    crc_region.extend_from_slice(&codec.attributes_bits().to_be_bytes());
    crc_region.extend_from_slice(&last_offset_delta.to_be_bytes());
    crc_region.extend_from_slice(&base_timestamp.to_be_bytes());
    crc_region.extend_from_slice(&max_timestamp.to_be_bytes());
    crc_region.extend_from_slice(&(-1i64).to_be_bytes()); // producer id: none
    crc_region.extend_from_slice(&(-1i16).to_be_bytes()); // producer epoch: none
    crc_region.extend_from_slice(&(-1i32).to_be_bytes()); // base sequence: none
    crc_region.extend_from_slice(&(records.len() as i32).to_be_bytes());
    crc_region.extend_from_slice(&compressed);
    let checksum = crc32c::crc32c(&crc_region);

    // The batch length counts the bytes after the length field itself:
    // the 4-byte -1 field, the magic byte, the 4-byte CRC and the CRC region.
    let batch_length = 4 + 1 + 4 + crc_region.len() as i32;

    let mut out = Vec::new();
    out.extend_from_slice(&base_offset.to_be_bytes());
    out.extend_from_slice(&batch_length.to_be_bytes());
    out.extend_from_slice(&(-1i32).to_be_bytes()); // partition leader epoch: unknown
    out.push(2); // magic
    out.extend_from_slice(&checksum.to_be_bytes());
    out.extend_from_slice(&crc_region);
    out
}
/// Serializes a Fetch v12 response, including the response header.
///
/// The output starts with `correlation_id` and an empty tag buffer, followed
/// by a zero i32, a zero i16 and `session_id` (throttle time, top-level error
/// code and session id in the Kafka Fetch response schema), and then the
/// topics as a compact array. Every partition, topic and the response itself
/// ends with an empty tag buffer.
///
/// Per partition, `high_watermark` is written twice in a row; the second copy
/// occupies the slot the Kafka schema names `last_stable_offset`.
pub fn serialize_fetch_v12_response(
    correlation_id: i32,
    session_id: i32,
    topics: &[FetchTopicResponse],
) -> Vec<u8> {
    let mut buf = Vec::new();

    // Response header.
    buf.extend_from_slice(&correlation_id.to_be_bytes());
    push_empty_tag_buffer(&mut buf);

    // Top-level fields.
    buf.extend_from_slice(&0i32.to_be_bytes()); // throttle time
    buf.extend_from_slice(&0i16.to_be_bytes()); // error code
    buf.extend_from_slice(&session_id.to_be_bytes());

    // Compact arrays carry `len + 1` as their length prefix.
    let topic_count = topics.len() as u32;
    push_unsigned_varint(&mut buf, topic_count + 1);
    for entry in topics {
        push_compact_string(&mut buf, &entry.topic);

        let partition_count = entry.partitions.len() as u32;
        push_unsigned_varint(&mut buf, partition_count + 1);
        for part in &entry.partitions {
            let watermark = part.high_watermark.to_be_bytes();

            buf.extend_from_slice(&part.partition_index.to_be_bytes());
            buf.extend_from_slice(&part.error_code.to_be_bytes());
            buf.extend_from_slice(&watermark);
            buf.extend_from_slice(&watermark); // last_stable_offset slot
            buf.extend_from_slice(&part.log_start_offset.to_be_bytes());
            push_unsigned_varint(&mut buf, 1); // empty aborted-transactions array
            buf.extend_from_slice(&(-1i32).to_be_bytes()); // no preferred read replica

            // Records as compact bytes: `len + 1`, then the raw bytes.
            let records_len = part.records.len() as u32;
            push_unsigned_varint(&mut buf, records_len + 1);
            buf.extend_from_slice(&part.records);
            push_empty_tag_buffer(&mut buf);
        }
        push_empty_tag_buffer(&mut buf);
    }
    push_empty_tag_buffer(&mut buf);
    buf
}
// Unit tests for the record-batch serializers and the Fetch v12 request parser.
#[cfg(test)]
mod tests {
use super::*;
use crate::partitions::KafkaMessage;
use crate::produce_codec::parse_record_batch;
// Builds a `KafkaMessage` with the given offset, timestamp, optional key and
// value, and no headers.
fn stored_msg(offset: i64, ts: i64, key: Option<&[u8]>, value: &[u8]) -> KafkaMessage {
KafkaMessage {
offset,
timestamp: ts,
key: key.map(|k| k.to_vec()),
value: value.to_vec(),
headers: vec![],
}
}
// An uncompressed batch must decode back to the same keys, values and
// timestamps, and must report zero in the low three attribute bits.
#[test]
fn record_batch_roundtrips_through_parser() {
let msgs = [
stored_msg(10, 1_000, Some(b"k1"), b"v1"),
stored_msg(11, 1_500, None, b"v2"),
];
let refs: Vec<&KafkaMessage> = msgs.iter().collect();
let batch = serialize_record_batch_v2(&refs);
let (decoded, attrs) = parse_record_batch(&batch).expect("parse");
assert_eq!(attrs & 0x7, 0);
assert_eq!(decoded.len(), 2);
assert_eq!(decoded[0].value, b"v1");
assert_eq!(decoded[0].key.as_deref(), Some(b"k1".as_ref()));
assert_eq!(decoded[0].timestamp_ms, 1_000);
assert_eq!(decoded[1].value, b"v2");
assert!(decoded[1].key.is_none());
assert_eq!(decoded[1].timestamp_ms, 1_500);
}
// The CRC sits at bytes 17..21: it follows the 8-byte base offset, the 4-byte
// batch length, the 4-byte -1 field and the 1-byte magic. It must equal the
// CRC-32C of everything after it.
#[test]
fn record_batch_crc_matches_spec() {
let msgs = [stored_msg(0, 0, None, b"x")];
let refs: Vec<&KafkaMessage> = msgs.iter().collect();
let batch = serialize_record_batch_v2(&refs);
let crc_in_batch = u32::from_be_bytes([batch[17], batch[18], batch[19], batch[20]]);
let expected = crc32c::crc32c(&batch[21..]);
assert_eq!(crc_in_batch, expected, "CRC in serialized batch must match crc32c of body");
}
// An empty record slice produces no bytes at all (not an empty batch header).
#[test]
fn empty_records_serialize_to_empty_blob() {
let v: Vec<&KafkaMessage> = Vec::new();
assert!(serialize_record_batch_v2(&v).is_empty());
}
// Every compression codec must round-trip through the parser and report its
// own attribute bits.
#[test]
fn compressed_record_batch_roundtrips_through_parser() {
use crate::record_compression::CompressionCodec;
let msgs = [
stored_msg(5, 100, Some(b"k"), b"value-one"),
stored_msg(6, 200, None, b"value-two-slightly-longer-to-exercise-compression"),
];
let refs: Vec<&KafkaMessage> = msgs.iter().collect();
for codec in [
CompressionCodec::Gzip,
CompressionCodec::Snappy,
CompressionCodec::Lz4,
CompressionCodec::Zstd,
] {
let batch = serialize_record_batch_v2_with_compression(&refs, codec);
let (decoded, attrs) =
parse_record_batch(&batch).unwrap_or_else(|e| panic!("parse {codec:?}: {e}"));
assert_eq!(attrs & 0x7, codec.attributes_bits(), "{codec:?}: attributes bits mismatch");
assert_eq!(decoded.len(), 2, "{codec:?}: wrong record count");
assert_eq!(decoded[0].value, b"value-one", "{codec:?}: v1 mismatch");
assert_eq!(decoded[0].key.as_deref(), Some(b"k".as_ref()), "{codec:?}: k1 mismatch");
assert_eq!(
decoded[1].value, b"value-two-slightly-longer-to-exercise-compression",
"{codec:?}: v2 mismatch"
);
}
}
// The CRC of a compressed batch covers the compressed bytes as written, so the
// same offset-17 check applies.
#[test]
fn compressed_batch_crc_validates() {
use crate::record_compression::CompressionCodec;
let msgs = [stored_msg(0, 0, None, b"hello")];
let refs: Vec<&KafkaMessage> = msgs.iter().collect();
let batch = serialize_record_batch_v2_with_compression(&refs, CompressionCodec::Snappy);
let crc_in_batch = u32::from_be_bytes([batch[17], batch[18], batch[19], batch[20]]);
let expected = crc32c::crc32c(&batch[21..]);
assert_eq!(crc_in_batch, expected);
}
// Hand-builds a minimal Fetch v12 body with one topic and one partition and
// checks the fields the parser keeps.
#[test]
fn fetch_v12_request_parses_single_topic() {
let mut body = Vec::new();
// Fixed prefix, in the order the parser reads it: replica id (-1), max wait
// (500), min bytes (1), max bytes (1 MiB), isolation level (0), session id (0),
// session epoch (-1).
body.extend_from_slice(&(-1i32).to_be_bytes()); body.extend_from_slice(&500i32.to_be_bytes()); body.extend_from_slice(&1i32.to_be_bytes()); body.extend_from_slice(&1_048_576i32.to_be_bytes()); body.push(0); body.extend_from_slice(&0i32.to_be_bytes()); body.extend_from_slice(&(-1i32).to_be_bytes());
// Topics array with one entry (length prefix 2 = 1 + 1), named "orders".
push_unsigned_varint(&mut body, 2); push_compact_string(&mut body, "orders");
// Partitions array with one entry: index 3, leader epoch -1, fetch offset 42,
// last fetched epoch -1, log start offset -1, max bytes 65536, then the
// partition's and the topic's empty tag buffers.
push_unsigned_varint(&mut body, 2); body.extend_from_slice(&3i32.to_be_bytes()); body.extend_from_slice(&(-1i32).to_be_bytes()); body.extend_from_slice(&42i64.to_be_bytes()); body.extend_from_slice(&(-1i32).to_be_bytes()); body.extend_from_slice(&(-1i64).to_be_bytes()); body.extend_from_slice(&65_536i32.to_be_bytes()); push_empty_tag_buffer(&mut body); push_empty_tag_buffer(&mut body);
// Empty forgotten-topics array, empty rack id, final empty tag buffer.
push_unsigned_varint(&mut body, 1); push_unsigned_varint(&mut body, 1); push_empty_tag_buffer(&mut body);
let parsed = parse_fetch_v12(&body).unwrap();
assert_eq!(parsed.max_wait_ms, 500);
assert_eq!(parsed.max_bytes, 1_048_576);
assert_eq!(parsed.topics.len(), 1);
assert_eq!(parsed.topics[0].topic, "orders");
assert_eq!(parsed.topics[0].partitions[0].partition_index, 3);
assert_eq!(parsed.topics[0].partitions[0].fetch_offset, 42);
assert_eq!(parsed.topics[0].partitions[0].partition_max_bytes, 65_536);
}
}