use crate::produce_codec::{
push_compact_string, push_empty_tag_buffer, push_unsigned_varint, read_compact_string,
read_i32, read_i64, read_i8, read_unsigned_varint, skip_tag_buffer,
};
#[derive(Debug, Clone)]
pub struct ListOffsetsPartitionRequest {
pub partition_index: i32,
pub timestamp: i64,
}
#[derive(Debug, Clone)]
pub struct ListOffsetsTopicRequest {
pub topic: String,
pub partitions: Vec<ListOffsetsPartitionRequest>,
}
#[derive(Debug, Clone)]
pub struct ListOffsetsRequestV7 {
pub topics: Vec<ListOffsetsTopicRequest>,
}
#[derive(Debug, Clone, Copy)]
pub struct ListOffsetsPartitionResponse {
pub partition_index: i32,
pub error_code: i16,
pub timestamp: i64,
pub offset: i64,
}
#[derive(Debug, Clone)]
pub struct ListOffsetsTopicResponse {
pub topic: String,
pub partitions: Vec<ListOffsetsPartitionResponse>,
}
pub fn parse_listoffsets_v7(body: &[u8]) -> Result<ListOffsetsRequestV7, String> {
let mut cur = body;
let _replica_id = read_i32(&mut cur)?;
let _isolation_level = read_i8(&mut cur)?;
let topics_len_plus_one = read_unsigned_varint(&mut cur)?;
if topics_len_plus_one == 0 {
return Err("listoffsets topics array is null".into());
}
let topics_len = (topics_len_plus_one - 1) as usize;
let mut topics = Vec::with_capacity(topics_len);
for _ in 0..topics_len {
let name = read_compact_string(&mut cur)?;
let parts_len_plus_one = read_unsigned_varint(&mut cur)?;
if parts_len_plus_one == 0 {
return Err(format!("listoffsets partitions array for {name} is null"));
}
let parts_len = (parts_len_plus_one - 1) as usize;
let mut partitions = Vec::with_capacity(parts_len);
for _ in 0..parts_len {
let partition_index = read_i32(&mut cur)?;
let _current_leader_epoch = read_i32(&mut cur)?;
let timestamp = read_i64(&mut cur)?;
skip_tag_buffer(&mut cur)?;
partitions.push(ListOffsetsPartitionRequest {
partition_index,
timestamp,
});
}
skip_tag_buffer(&mut cur)?;
topics.push(ListOffsetsTopicRequest {
topic: name,
partitions,
});
}
skip_tag_buffer(&mut cur)?;
Ok(ListOffsetsRequestV7 { topics })
}
pub fn serialize_listoffsets_v7_response(
correlation_id: i32,
topics: &[ListOffsetsTopicResponse],
) -> Vec<u8> {
let mut out = Vec::new();
out.extend_from_slice(&correlation_id.to_be_bytes());
push_empty_tag_buffer(&mut out);
out.extend_from_slice(&0i32.to_be_bytes());
push_unsigned_varint(&mut out, (topics.len() as u32) + 1);
for t in topics {
push_compact_string(&mut out, &t.topic);
push_unsigned_varint(&mut out, (t.partitions.len() as u32) + 1);
for p in &t.partitions {
out.extend_from_slice(&p.partition_index.to_be_bytes());
out.extend_from_slice(&p.error_code.to_be_bytes());
out.extend_from_slice(&p.timestamp.to_be_bytes());
out.extend_from_slice(&p.offset.to_be_bytes());
out.extend_from_slice(&(-1i32).to_be_bytes());
push_empty_tag_buffer(&mut out);
}
push_empty_tag_buffer(&mut out);
}
push_empty_tag_buffer(&mut out);
out
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_v7_single_topic() {
let mut body = Vec::new();
body.extend_from_slice(&(-1i32).to_be_bytes()); body.push(0); push_unsigned_varint(&mut body, 2); push_compact_string(&mut body, "t");
push_unsigned_varint(&mut body, 2); body.extend_from_slice(&0i32.to_be_bytes()); body.extend_from_slice(&(-1i32).to_be_bytes()); body.extend_from_slice(&(-2i64).to_be_bytes()); push_empty_tag_buffer(&mut body);
push_empty_tag_buffer(&mut body);
push_empty_tag_buffer(&mut body);
let parsed = parse_listoffsets_v7(&body).unwrap();
assert_eq!(parsed.topics.len(), 1);
assert_eq!(parsed.topics[0].topic, "t");
assert_eq!(parsed.topics[0].partitions[0].partition_index, 0);
assert_eq!(parsed.topics[0].partitions[0].timestamp, -2);
}
#[test]
fn response_v7_layout_is_sane() {
let resp = serialize_listoffsets_v7_response(
7,
&[ListOffsetsTopicResponse {
topic: "t".into(),
partitions: vec![ListOffsetsPartitionResponse {
partition_index: 0,
error_code: 0,
timestamp: -1,
offset: 42,
}],
}],
);
assert_eq!(&resp[0..4], &7i32.to_be_bytes());
assert_eq!(resp[4], 0);
assert_eq!(&resp[5..9], &0i32.to_be_bytes());
assert_eq!(resp[9], 2);
assert_eq!(resp[10], 2);
assert_eq!(resp[11], b't');
assert_eq!(resp[12], 2);
assert_eq!(&resp[13..17], &0i32.to_be_bytes());
assert_eq!(&resp[17..19], &0i16.to_be_bytes());
assert_eq!(&resp[19..27], &(-1i64).to_be_bytes());
assert_eq!(&resp[27..35], &42i64.to_be_bytes());
assert_eq!(&resp[35..39], &(-1i32).to_be_bytes());
assert_eq!(resp[39], 0);
assert_eq!(resp[40], 0);
assert_eq!(resp[41], 0);
assert_eq!(resp.len(), 42);
}
}