use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode};
use crate::error::{ErrorCode, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};
use crate::util::varint::decode_unsigned_varint;
#[derive(Debug, Clone)]
pub struct DescribeQuorumPartitionRequest {
pub partition_index: i32,
}
#[derive(Debug, Clone)]
pub struct DescribeQuorumTopicRequest {
pub topic_name: String,
pub partitions: Vec<DescribeQuorumPartitionRequest>,
}
#[derive(Debug, Clone)]
pub struct DescribeQuorumRequest {
pub topics: Vec<DescribeQuorumTopicRequest>,
}
impl DescribeQuorumRequest {
pub fn api_key() -> ApiKey {
ApiKey::DescribeQuorum
}
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.topics.len(), buf)?;
for topic in &self.topics {
KafkaString::new(&topic.topic_name).try_encode_compact(buf)?;
encode_compact_array_len(topic.partitions.len(), buf)?;
for partition in &topic.partitions {
partition.partition_index.encode(buf);
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
impl VersionedEncode for DescribeQuorumRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
0 => self.encode_v0(buf),
_ => unsupported_encode!("DescribeQuorumRequest", version),
}
}
}
#[derive(Debug, Clone)]
pub struct QuorumReplicaState {
pub replica_id: i32,
pub log_end_offset: i64,
}
#[derive(Debug, Clone)]
pub struct DescribeQuorumPartitionResponse {
pub partition_index: i32,
pub error_code: ErrorCode,
pub leader_id: i32,
pub leader_epoch: i32,
pub high_watermark: i64,
pub current_voters: Vec<QuorumReplicaState>,
pub observers: Vec<QuorumReplicaState>,
}
#[derive(Debug, Clone)]
pub struct DescribeQuorumTopicResponse {
pub topic_name: String,
pub partitions: Vec<DescribeQuorumPartitionResponse>,
}
#[derive(Debug, Clone)]
pub struct DescribeQuorumResponse {
pub error_code: ErrorCode,
pub topics: Vec<DescribeQuorumTopicResponse>,
}
impl DescribeQuorumResponse {
fn decode_replica_states_v0(buf: &mut impl Buf) -> Result<Vec<QuorumReplicaState>> {
let count = check_compact_array_len(decode_unsigned_varint(buf)?)? as usize;
let mut states = Vec::with_capacity(count);
for _ in 0..count {
let replica_id = i32::decode(buf)?;
let log_end_offset = i64::decode(buf)?;
TaggedFields::decode(buf)?;
states.push(QuorumReplicaState {
replica_id,
log_end_offset,
});
}
Ok(states)
}
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from(i16::decode(buf)?);
let topic_count = check_compact_array_len(decode_unsigned_varint(buf)?)? as usize;
let mut topics = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic_name = {
let len = decode_unsigned_varint(buf)? as usize;
if len < 1 {
return Err(crate::error::KrafkaError::protocol(
"compact string length 0 is null but field is non-nullable",
));
}
let str_len = len - 1;
if buf.remaining() < str_len {
return Err(crate::error::KrafkaError::protocol(
"not enough bytes for compact string",
));
}
let bytes = buf.copy_to_bytes(str_len);
String::from_utf8(bytes.to_vec()).map_err(|e| {
crate::error::KrafkaError::protocol(format!("invalid UTF-8: {e}"))
})?
};
let partition_count = check_compact_array_len(decode_unsigned_varint(buf)?)? as usize;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition_index = i32::decode(buf)?;
let partition_error_code = ErrorCode::from(i16::decode(buf)?);
let leader_id = i32::decode(buf)?;
let leader_epoch = i32::decode(buf)?;
let high_watermark = i64::decode(buf)?;
let current_voters = Self::decode_replica_states_v0(buf)?;
let observers = Self::decode_replica_states_v0(buf)?;
TaggedFields::decode(buf)?;
partitions.push(DescribeQuorumPartitionResponse {
partition_index,
error_code: partition_error_code,
leader_id,
leader_epoch,
high_watermark,
current_voters,
observers,
});
}
TaggedFields::decode(buf)?;
topics.push(DescribeQuorumTopicResponse {
topic_name,
partitions,
});
}
TaggedFields::decode(buf)?;
Ok(Self { error_code, topics })
}
}
impl VersionedDecode for DescribeQuorumResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
0 => Self::decode_v0(buf),
_ => unsupported_decode!("DescribeQuorumResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use bytes::BytesMut;
#[test]
fn describe_quorum_request_encode_v0() {
let request = DescribeQuorumRequest {
topics: vec![DescribeQuorumTopicRequest {
topic_name: "__cluster_metadata".to_string(),
partitions: vec![DescribeQuorumPartitionRequest { partition_index: 0 }],
}],
};
let mut buf = BytesMut::new();
request.encode_v0(&mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn describe_quorum_response_decode_v0() {
let mut buf = BytesMut::new();
buf.put_i16(0);
buf.put_u8(2);
let name = b"__cluster_metadata";
buf.put_u8((name.len() + 1) as u8);
buf.put_slice(name);
buf.put_u8(2);
buf.put_i32(0);
buf.put_i16(0);
buf.put_i32(1);
buf.put_i32(5);
buf.put_i64(100);
buf.put_u8(3);
buf.put_i32(1);
buf.put_i64(100);
buf.put_u8(0); buf.put_i32(2);
buf.put_i64(98);
buf.put_u8(0); buf.put_u8(2);
buf.put_i32(3);
buf.put_i64(95);
buf.put_u8(0); buf.put_u8(0);
buf.put_u8(0);
buf.put_u8(0);
let mut read_buf = buf.freeze();
let response = DescribeQuorumResponse::decode_v0(&mut read_buf).unwrap();
assert!(response.error_code.is_ok());
assert_eq!(response.topics.len(), 1);
assert_eq!(response.topics[0].topic_name, "__cluster_metadata");
let partition = &response.topics[0].partitions[0];
assert_eq!(partition.partition_index, 0);
assert!(partition.error_code.is_ok());
assert_eq!(partition.leader_id, 1);
assert_eq!(partition.leader_epoch, 5);
assert_eq!(partition.high_watermark, 100);
assert_eq!(partition.current_voters.len(), 2);
assert_eq!(partition.current_voters[0].replica_id, 1);
assert_eq!(partition.current_voters[0].log_end_offset, 100);
assert_eq!(partition.current_voters[1].replica_id, 2);
assert_eq!(partition.current_voters[1].log_end_offset, 98);
assert_eq!(partition.observers.len(), 1);
assert_eq!(partition.observers[0].replica_id, 3);
assert_eq!(partition.observers[0].log_end_offset, 95);
}
#[test]
fn describe_quorum_request_roundtrip_v0() {
let request = DescribeQuorumRequest {
topics: vec![DescribeQuorumTopicRequest {
topic_name: "__cluster_metadata".to_string(),
partitions: vec![
DescribeQuorumPartitionRequest { partition_index: 0 },
DescribeQuorumPartitionRequest { partition_index: 1 },
],
}],
};
let mut buf = BytesMut::new();
request.encode_versioned(0, &mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn describe_quorum_response_empty_topics() {
let mut buf = BytesMut::new();
buf.put_i16(0);
buf.put_u8(1);
buf.put_u8(0);
let mut read_buf = buf.freeze();
let response = DescribeQuorumResponse::decode_v0(&mut read_buf).unwrap();
assert!(response.topics.is_empty());
}
#[test]
fn describe_quorum_versioned_encode_dispatch() {
let request = DescribeQuorumRequest { topics: vec![] };
let mut buf = BytesMut::new();
request.encode_versioned(0, &mut buf).unwrap();
let mut buf2 = BytesMut::new();
assert!(request.encode_versioned(99, &mut buf2).is_err());
}
#[test]
fn describe_quorum_versioned_decode_dispatch() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_u8(1); buf.put_u8(0);
let mut read_buf = buf.freeze();
DescribeQuorumResponse::decode_versioned(0, &mut read_buf).unwrap();
let mut empty = BytesMut::new().freeze();
assert!(DescribeQuorumResponse::decode_versioned(99, &mut empty).is_err());
}
#[test]
fn describe_quorum_response_with_error() {
let mut buf = BytesMut::new();
buf.put_i16(-1);
buf.put_u8(1);
buf.put_u8(0);
let mut read_buf = buf.freeze();
let response = DescribeQuorumResponse::decode_v0(&mut read_buf).unwrap();
assert!(!response.error_code.is_ok());
}
#[test]
fn describe_quorum_response_empty_voters_and_observers() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_u8(2); let name = b"t";
buf.put_u8((name.len() + 1) as u8);
buf.put_slice(name);
buf.put_u8(2); buf.put_i32(0); buf.put_i16(0); buf.put_i32(1); buf.put_i32(0); buf.put_i64(0); buf.put_u8(1); buf.put_u8(1); buf.put_u8(0); buf.put_u8(0); buf.put_u8(0);
let mut read_buf = buf.freeze();
let response = DescribeQuorumResponse::decode_v0(&mut read_buf).unwrap();
let partition = &response.topics[0].partitions[0];
assert!(partition.current_voters.is_empty());
assert!(partition.observers.is_empty());
}
}