use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};
#[derive(Debug, Clone)]
pub struct DescribeTopicPartitionsCursor {
pub topic_name: String,
pub partition_index: i32,
}
#[derive(Debug, Clone)]
pub struct DescribeTopicPartitionsRequest {
pub topics: Vec<String>,
pub response_partition_limit: i32,
pub cursor: Option<DescribeTopicPartitionsCursor>,
}
impl DescribeTopicPartitionsRequest {
pub fn new(topics: Vec<String>) -> Self {
Self {
topics,
response_partition_limit: 2000,
cursor: None,
}
}
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.topics.len(), buf)?;
for topic in &self.topics {
KafkaString::new(topic).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
}
self.response_partition_limit.encode(buf);
match &self.cursor {
None => {
buf.put_u8(0xFF);
}
Some(c) => {
KafkaString::new(&c.topic_name).try_encode_compact(buf)?;
c.partition_index.encode(buf);
TaggedFields::default().try_encode(buf)?;
}
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct DescribeTopicPartitionsPartition {
pub error_code: ErrorCode,
pub partition_index: i32,
pub leader_id: i32,
pub leader_epoch: i32,
pub replica_nodes: Vec<i32>,
pub isr_nodes: Vec<i32>,
pub eligible_leader_replicas: Option<Vec<i32>>,
pub last_known_elr: Option<Vec<i32>>,
pub offline_replicas: Vec<i32>,
}
#[derive(Debug, Clone)]
pub struct DescribeTopicPartitionsTopic {
pub error_code: ErrorCode,
pub name: Option<String>,
pub topic_id: [u8; 16],
pub is_internal: bool,
pub partitions: Vec<DescribeTopicPartitionsPartition>,
pub topic_authorized_operations: i32,
}
#[derive(Debug, Clone)]
pub struct DescribeTopicPartitionsResponse {
pub throttle_time_ms: i32,
pub topics: Vec<DescribeTopicPartitionsTopic>,
pub next_cursor: Option<DescribeTopicPartitionsCursor>,
}
impl DescribeTopicPartitionsResponse {
fn decode_compact_nullable_i32_array(buf: &mut impl Buf) -> Result<Option<Vec<i32>>> {
let raw = crate::util::varint::decode_unsigned_varint(buf)?;
if raw == 0 {
return Ok(None);
}
let count = check_compact_array_len(raw)?;
let mut arr = Vec::with_capacity(count);
for _ in 0..count {
arr.push(i32::decode(buf)?);
}
Ok(Some(arr))
}
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let name = KafkaString::decode_compact(buf)?.0;
let mut topic_id = [0u8; 16];
if buf.remaining() < 16 {
return Err(KrafkaError::protocol("short buf for topic_id"));
}
buf.copy_to_slice(&mut topic_id);
let is_internal = i8::decode(buf)? != 0;
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(part_count);
for _ in 0..part_count {
let p_error_code = ErrorCode::from_i16(i16::decode(buf)?);
let partition_index = i32::decode(buf)?;
let leader_id = i32::decode(buf)?;
let leader_epoch = i32::decode(buf)?;
let replica_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut replica_nodes = Vec::with_capacity(replica_count);
for _ in 0..replica_count {
replica_nodes.push(i32::decode(buf)?);
}
let isr_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut isr_nodes = Vec::with_capacity(isr_count);
for _ in 0..isr_count {
isr_nodes.push(i32::decode(buf)?);
}
let eligible_leader_replicas = Self::decode_compact_nullable_i32_array(buf)?;
let last_known_elr = Self::decode_compact_nullable_i32_array(buf)?;
let offline_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut offline_replicas = Vec::with_capacity(offline_count);
for _ in 0..offline_count {
offline_replicas.push(i32::decode(buf)?);
}
let _ = TaggedFields::decode(buf)?;
partitions.push(DescribeTopicPartitionsPartition {
error_code: p_error_code,
partition_index,
leader_id,
leader_epoch,
replica_nodes,
isr_nodes,
eligible_leader_replicas,
last_known_elr,
offline_replicas,
});
}
let topic_authorized_operations = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
topics.push(DescribeTopicPartitionsTopic {
error_code,
name,
topic_id,
is_internal,
partitions,
topic_authorized_operations,
});
}
let next_cursor = if buf.remaining() > 0 {
let marker = buf.chunk()[0];
if marker == 0xFF {
buf.advance(1);
let _ = TaggedFields::decode(buf)?;
None
} else {
let topic_name =
non_nullable_string("cursor topic", KafkaString::decode_compact(buf)?.0)?;
let partition_index = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Some(DescribeTopicPartitionsCursor {
topic_name,
partition_index,
})
}
} else {
None
};
Ok(Self {
throttle_time_ms,
topics,
next_cursor,
})
}
}
impl VersionedEncode for DescribeTopicPartitionsRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
0 => self.encode_v0(buf)?,
_ => return unsupported_encode!("DescribeTopicPartitionsRequest", version),
}
Ok(())
}
}
impl VersionedDecode for DescribeTopicPartitionsResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
0 => Self::decode_v0(buf),
_ => unsupported_decode!("DescribeTopicPartitionsResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::util::varint;
use bytes::BytesMut;
fn put_compact_string(buf: &mut BytesMut, s: Option<&str>) {
match s {
Some(val) => {
buf.put_u8((val.len() + 1) as u8);
buf.put_slice(val.as_bytes());
}
None => buf.put_u8(0),
}
}
fn put_tagged_fields(buf: &mut BytesMut) {
buf.put_u8(0);
}
#[test]
fn test_describe_topic_partitions_request_encode_v0() {
let req = DescribeTopicPartitionsRequest {
topics: vec!["t1".to_string()],
response_partition_limit: 500,
cursor: None,
};
let mut buf = BytesMut::new();
req.encode_v0(&mut buf).unwrap();
let mut cur = &buf[..];
let arr = varint::decode_unsigned_varint(&mut cur).unwrap();
assert_eq!(arr, 2); let name_v = varint::decode_unsigned_varint(&mut cur).unwrap();
assert_eq!(name_v, 3); let mut name = vec![0u8; 2];
cur.copy_to_slice(&mut name);
assert_eq!(name, b"t1");
assert_eq!(cur.get_u8(), 0); assert_eq!(cur.get_i32(), 500); assert_eq!(cur.get_u8(), 0xFF); assert_eq!(cur.get_u8(), 0); assert!(cur.is_empty());
}
#[test]
fn test_describe_topic_partitions_request_encode_v0_with_cursor() {
let req = DescribeTopicPartitionsRequest {
topics: vec!["t1".to_string()],
response_partition_limit: 100,
cursor: Some(DescribeTopicPartitionsCursor {
topic_name: "t1".to_string(),
partition_index: 5,
}),
};
let mut buf = BytesMut::new();
req.encode_v0(&mut buf).unwrap();
let mut cur = &buf[..];
let _ = varint::decode_unsigned_varint(&mut cur).unwrap(); let _ = varint::decode_unsigned_varint(&mut cur).unwrap(); cur.advance(2); assert_eq!(cur.get_u8(), 0); assert_eq!(cur.get_i32(), 100); let cursor_name_v = varint::decode_unsigned_varint(&mut cur).unwrap();
assert_eq!(cursor_name_v, 3); cur.advance(2); assert_eq!(cur.get_i32(), 5); assert_eq!(cur.get_u8(), 0); assert_eq!(cur.get_u8(), 0); assert!(cur.is_empty());
}
#[test]
fn test_describe_topic_partitions_response_decode_v0_null_cursor() {
let mut buf = BytesMut::new();
buf.put_i32(15); varint::encode_unsigned_varint(2, &mut buf); buf.put_i16(0); put_compact_string(&mut buf, Some("tp")); buf.put_slice(&[0u8; 16]); buf.put_i8(0); varint::encode_unsigned_varint(2, &mut buf); buf.put_i16(0); buf.put_i32(0); buf.put_i32(0); buf.put_i32(1); varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0);
varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0);
varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(1, &mut buf); put_tagged_fields(&mut buf); buf.put_i32(0); put_tagged_fields(&mut buf); buf.put_u8(0xFF); put_tagged_fields(&mut buf);
let resp = DescribeTopicPartitionsResponse::decode_v0(&mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 15);
assert_eq!(resp.topics.len(), 1);
let t = &resp.topics[0];
assert!(t.error_code.is_ok());
assert_eq!(t.name.as_deref(), Some("tp"));
assert!(!t.is_internal);
assert_eq!(t.partitions.len(), 1);
let p = &t.partitions[0];
assert_eq!(p.partition_index, 0);
assert_eq!(p.leader_id, 0);
assert_eq!(p.leader_epoch, 1);
assert_eq!(p.replica_nodes, vec![0]);
assert_eq!(p.isr_nodes, vec![0]);
assert!(p.eligible_leader_replicas.is_none());
assert!(p.last_known_elr.is_none());
assert!(p.offline_replicas.is_empty());
assert!(resp.next_cursor.is_none());
}
#[test]
fn test_describe_topic_partitions_response_decode_v0_with_cursor() {
let mut buf = BytesMut::new();
buf.put_i32(0); varint::encode_unsigned_varint(1, &mut buf); put_compact_string(&mut buf, Some("next")); buf.put_i32(10); put_tagged_fields(&mut buf); put_tagged_fields(&mut buf);
let resp = DescribeTopicPartitionsResponse::decode_v0(&mut buf.freeze()).unwrap();
assert!(resp.topics.is_empty());
assert!(resp.next_cursor.is_some());
let cursor = resp.next_cursor.as_ref().unwrap();
assert_eq!(cursor.topic_name, "next");
assert_eq!(cursor.partition_index, 10);
}
}