use crate::produce_codec::{
parse_record_batch, PartitionProduceData, ProduceRequestV9, TopicProduceData,
TopicProduceResult,
};
fn take<'a>(buf: &mut &'a [u8], n: usize) -> Result<&'a [u8], String> {
if buf.len() < n {
return Err(format!("short read: wanted {n}, have {}", buf.len()));
}
let (head, tail) = buf.split_at(n);
*buf = tail;
Ok(head)
}
fn read_i16(buf: &mut &[u8]) -> Result<i16, String> {
let b = take(buf, 2)?;
Ok(i16::from_be_bytes([b[0], b[1]]))
}
fn read_i32(buf: &mut &[u8]) -> Result<i32, String> {
let b = take(buf, 4)?;
Ok(i32::from_be_bytes([b[0], b[1], b[2], b[3]]))
}
fn read_nullable_string(buf: &mut &[u8]) -> Result<Option<String>, String> {
let len = read_i16(buf)?;
if len < 0 {
return Ok(None);
}
let bytes = take(buf, len as usize)?;
String::from_utf8(bytes.to_vec())
.map(Some)
.map_err(|e| format!("invalid utf8: {e}"))
}
fn read_string(buf: &mut &[u8]) -> Result<String, String> {
read_nullable_string(buf)?.ok_or_else(|| "expected non-null STRING, got null".into())
}
fn read_nullable_bytes<'a>(buf: &mut &'a [u8]) -> Result<Option<&'a [u8]>, String> {
let len = read_i32(buf)?;
if len < 0 {
return Ok(None);
}
Ok(Some(take(buf, len as usize)?))
}
fn push_string(out: &mut Vec<u8>, s: &str) {
out.extend_from_slice(&(s.len() as i16).to_be_bytes());
out.extend_from_slice(s.as_bytes());
}
pub fn parse_produce_v3_v8(body: &[u8]) -> Result<ProduceRequestV9, String> {
let mut cur = body;
let transactional_id = read_nullable_string(&mut cur)?;
let acks = read_i16(&mut cur)?;
let timeout_ms = read_i32(&mut cur)?;
let topics_count = read_i32(&mut cur)?;
if topics_count < 0 {
return Err(format!("produce topics count is negative: {topics_count}"));
}
let mut topics = Vec::with_capacity(topics_count as usize);
for _ in 0..topics_count {
let name = read_string(&mut cur)?;
let parts_count = read_i32(&mut cur)?;
if parts_count < 0 {
return Err(format!("produce partitions count for {name} is negative"));
}
let mut parts = Vec::with_capacity(parts_count as usize);
for _ in 0..parts_count {
let partition_index = read_i32(&mut cur)?;
let records_bytes = read_nullable_bytes(&mut cur)?;
let (records, attributes) = match records_bytes {
None => (Vec::new(), 0i16),
Some(bytes) => parse_record_batch(bytes)?,
};
parts.push(PartitionProduceData {
partition_index,
records,
compression_codec: (attributes & 0x7) as i8,
});
}
topics.push(TopicProduceData {
name,
partitions: parts,
});
}
Ok(ProduceRequestV9 {
transactional_id,
acks,
timeout_ms,
topics,
})
}
pub fn serialize_produce_v3_v8_response(
correlation_id: i32,
api_version: i16,
results: &[TopicProduceResult],
) -> Vec<u8> {
debug_assert!(
(3..=8).contains(&api_version),
"serialize_produce_v3_v8_response called with api_version {api_version}"
);
let has_log_start_offset = api_version >= 5;
let has_record_errors = api_version >= 8;
let mut out = Vec::new();
out.extend_from_slice(&correlation_id.to_be_bytes());
out.extend_from_slice(&(results.len() as i32).to_be_bytes());
for topic in results {
push_string(&mut out, &topic.name);
out.extend_from_slice(&(topic.partitions.len() as i32).to_be_bytes());
for p in &topic.partitions {
out.extend_from_slice(&p.partition_index.to_be_bytes());
out.extend_from_slice(&p.error_code.to_be_bytes());
out.extend_from_slice(&p.base_offset.to_be_bytes());
out.extend_from_slice(&p.log_append_time_ms.to_be_bytes());
if has_log_start_offset {
out.extend_from_slice(&p.log_start_offset.to_be_bytes());
}
if has_record_errors {
out.extend_from_slice(&0i32.to_be_bytes());
out.extend_from_slice(&(-1i16).to_be_bytes());
}
}
}
out.extend_from_slice(&0i32.to_be_bytes());
out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::produce_codec::{one_record_batch_for_testing, PartitionProduceResult};
fn build_request_body(topic: &str, partition: i32, batch: &[u8]) -> Vec<u8> {
let mut body = Vec::new();
body.extend_from_slice(&(-1i16).to_be_bytes());
body.extend_from_slice(&(-1i16).to_be_bytes());
body.extend_from_slice(&30_000i32.to_be_bytes());
body.extend_from_slice(&1i32.to_be_bytes());
push_string(&mut body, topic);
body.extend_from_slice(&1i32.to_be_bytes());
body.extend_from_slice(&partition.to_be_bytes());
body.extend_from_slice(&(batch.len() as i32).to_be_bytes());
body.extend_from_slice(batch);
body
}
#[test]
fn parses_minimal_produce_v7_request() {
let batch = one_record_batch_for_testing(Some(b"k"), b"hello");
let body = build_request_body("orders", 2, &batch);
let parsed = parse_produce_v3_v8(&body).unwrap();
assert_eq!(parsed.transactional_id, None);
assert_eq!(parsed.acks, -1);
assert_eq!(parsed.timeout_ms, 30_000);
assert_eq!(parsed.topics.len(), 1);
assert_eq!(parsed.topics[0].name, "orders");
assert_eq!(parsed.topics[0].partitions.len(), 1);
assert_eq!(parsed.topics[0].partitions[0].partition_index, 2);
assert_eq!(parsed.topics[0].partitions[0].records.len(), 1);
assert_eq!(parsed.topics[0].partitions[0].records[0].value, b"hello");
assert_eq!(parsed.topics[0].partitions[0].compression_codec, 0);
}
#[test]
fn parses_produce_v3_v8_with_null_records() {
let mut body = Vec::new();
body.extend_from_slice(&(-1i16).to_be_bytes()); body.extend_from_slice(&1i16.to_be_bytes()); body.extend_from_slice(&5_000i32.to_be_bytes()); body.extend_from_slice(&1i32.to_be_bytes()); push_string(&mut body, "t");
body.extend_from_slice(&1i32.to_be_bytes()); body.extend_from_slice(&0i32.to_be_bytes()); body.extend_from_slice(&(-1i32).to_be_bytes());
let parsed = parse_produce_v3_v8(&body).unwrap();
assert_eq!(parsed.acks, 1);
assert!(parsed.topics[0].partitions[0].records.is_empty());
}
fn one_result(log_start_offset: i64) -> Vec<TopicProduceResult> {
vec![TopicProduceResult {
name: "orders".to_string(),
partitions: vec![PartitionProduceResult {
partition_index: 2,
error_code: 0,
base_offset: 41,
log_append_time_ms: -1,
log_start_offset,
}],
}]
}
#[test]
fn response_v3_shape() {
let data = serialize_produce_v3_v8_response(99, 3, &one_result(0));
assert_eq!(&data[0..4], &99i32.to_be_bytes());
assert_eq!(&data[4..8], &1i32.to_be_bytes());
assert_eq!(&data[8..10], &6i16.to_be_bytes());
assert_eq!(&data[10..16], b"orders");
assert_eq!(&data[16..20], &1i32.to_be_bytes());
assert_eq!(&data[20..24], &2i32.to_be_bytes());
assert_eq!(&data[24..26], &0i16.to_be_bytes());
assert_eq!(&data[26..34], &41i64.to_be_bytes());
assert_eq!(&data[34..42], &(-1i64).to_be_bytes());
assert_eq!(&data[42..46], &0i32.to_be_bytes());
assert_eq!(data.len(), 46);
}
#[test]
fn response_v7_adds_log_start_offset() {
let data = serialize_produce_v3_v8_response(7, 7, &one_result(0));
assert_eq!(data.len(), 54);
assert_eq!(&data[42..50], &0i64.to_be_bytes());
}
#[test]
fn response_v8_adds_record_errors_and_error_message() {
let data = serialize_produce_v3_v8_response(123, 8, &one_result(0));
assert_eq!(data.len(), 60);
assert_eq!(&data[50..54], &0i32.to_be_bytes());
assert_eq!(&data[54..56], &(-1i16).to_be_bytes());
}
#[test]
fn request_parser_roundtrips_through_serializer() {
let batch = one_record_batch_for_testing(None, b"v");
let body = build_request_body("t", 0, &batch);
let parsed = parse_produce_v3_v8(&body).unwrap();
let results = vec![TopicProduceResult {
name: parsed.topics[0].name.clone(),
partitions: vec![PartitionProduceResult {
partition_index: parsed.topics[0].partitions[0].partition_index,
error_code: 0,
base_offset: 0,
log_append_time_ms: 1_700_000_000_000,
log_start_offset: 0,
}],
}];
let resp = serialize_produce_v3_v8_response(42, 7, &results);
assert_eq!(&resp[0..4], &42i32.to_be_bytes());
}
}