use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::check_compact_array_len;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
#[derive(Debug, Clone)]
pub struct DescribeClusterRequest {
pub include_cluster_authorized_operations: bool,
pub endpoint_type: i8,
pub include_fenced_brokers: bool,
}
impl Default for DescribeClusterRequest {
fn default() -> Self {
Self {
include_cluster_authorized_operations: false,
endpoint_type: 1,
include_fenced_brokers: false,
}
}
}
impl DescribeClusterRequest {
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_u8(u8::from(self.include_cluster_authorized_operations));
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_u8(u8::from(self.include_cluster_authorized_operations));
self.endpoint_type.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_u8(u8::from(self.include_cluster_authorized_operations));
self.endpoint_type.encode(buf);
buf.put_u8(u8::from(self.include_fenced_brokers));
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct DescribeClusterResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub endpoint_type: i8,
pub cluster_id: String,
pub controller_id: i32,
pub brokers: Vec<DescribeClusterBroker>,
pub cluster_authorized_operations: i32,
}
#[derive(Debug, Clone)]
pub struct DescribeClusterBroker {
pub broker_id: i32,
pub host: String,
pub port: i32,
pub rack: Option<String>,
pub is_fenced: bool,
}
impl DescribeClusterResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let cluster_id = non_nullable_string("cluster_id", KafkaString::decode_compact(buf)?.0)?;
let controller_id = i32::decode(buf)?;
let broker_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut brokers = Vec::with_capacity(broker_count);
for _ in 0..broker_count {
let broker_id = i32::decode(buf)?;
let host = non_nullable_string("host", KafkaString::decode_compact(buf)?.0)?;
let port = i32::decode(buf)?;
let rack = KafkaString::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
brokers.push(DescribeClusterBroker {
broker_id,
host,
port,
rack,
is_fenced: false,
});
}
let cluster_authorized_operations = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
endpoint_type: 1,
cluster_id,
controller_id,
brokers,
cluster_authorized_operations,
})
}
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let endpoint_type = i8::decode(buf)?;
let cluster_id = non_nullable_string("cluster_id", KafkaString::decode_compact(buf)?.0)?;
let controller_id = i32::decode(buf)?;
let broker_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut brokers = Vec::with_capacity(broker_count);
for _ in 0..broker_count {
let broker_id = i32::decode(buf)?;
let host = non_nullable_string("host", KafkaString::decode_compact(buf)?.0)?;
let port = i32::decode(buf)?;
let rack = KafkaString::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
brokers.push(DescribeClusterBroker {
broker_id,
host,
port,
rack,
is_fenced: false,
});
}
let cluster_authorized_operations = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
endpoint_type,
cluster_id,
controller_id,
brokers,
cluster_authorized_operations,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let endpoint_type = i8::decode(buf)?;
let cluster_id = non_nullable_string("cluster_id", KafkaString::decode_compact(buf)?.0)?;
let controller_id = i32::decode(buf)?;
let broker_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut brokers = Vec::with_capacity(broker_count);
for _ in 0..broker_count {
let broker_id = i32::decode(buf)?;
let host = non_nullable_string("host", KafkaString::decode_compact(buf)?.0)?;
let port = i32::decode(buf)?;
let rack = KafkaString::decode_compact(buf)?.0;
let is_fenced = i8::decode(buf)? != 0;
let _ = TaggedFields::decode(buf)?;
brokers.push(DescribeClusterBroker {
broker_id,
host,
port,
rack,
is_fenced,
});
}
let cluster_authorized_operations = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
endpoint_type,
cluster_id,
controller_id,
brokers,
cluster_authorized_operations,
})
}
}
impl VersionedEncode for DescribeClusterRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
0 => self.encode_v0(buf)?,
1 => self.encode_v1(buf)?,
2 => self.encode_v2(buf)?,
_ => return unsupported_encode!("DescribeClusterRequest", version),
}
Ok(())
}
}
impl VersionedDecode for DescribeClusterResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
0 => Self::decode_v0(buf),
1 => Self::decode_v1(buf),
2 => Self::decode_v2(buf),
_ => unsupported_decode!("DescribeClusterResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::util::varint;
use bytes::BytesMut;
fn put_compact_string(buf: &mut BytesMut, s: Option<&str>) {
match s {
Some(val) => {
buf.put_u8((val.len() + 1) as u8);
buf.put_slice(val.as_bytes());
}
None => buf.put_u8(0),
}
}
fn put_tagged_fields(buf: &mut BytesMut) {
buf.put_u8(0);
}
#[test]
fn test_describe_cluster_request_encode_v0() {
let req = DescribeClusterRequest {
include_cluster_authorized_operations: true,
endpoint_type: 1,
include_fenced_brokers: false,
};
let mut buf = BytesMut::new();
req.encode_v0(&mut buf).unwrap();
let mut cur = &buf[..];
assert_eq!(cur.get_u8(), 1); assert_eq!(cur.get_u8(), 0); assert!(cur.is_empty());
}
#[test]
fn test_describe_cluster_request_encode_v1_endpoint_type() {
let req = DescribeClusterRequest {
include_cluster_authorized_operations: false,
endpoint_type: 2,
include_fenced_brokers: false,
};
let mut buf = BytesMut::new();
req.encode_v1(&mut buf).unwrap();
let mut cur = &buf[..];
assert_eq!(cur.get_u8(), 0); assert_eq!(cur.get_i8(), 2); assert_eq!(cur.get_u8(), 0); assert!(cur.is_empty());
}
#[test]
fn test_describe_cluster_request_encode_v2_fenced() {
let req = DescribeClusterRequest {
include_cluster_authorized_operations: true,
endpoint_type: 1,
include_fenced_brokers: true,
};
let mut buf = BytesMut::new();
req.encode_v2(&mut buf).unwrap();
let mut cur = &buf[..];
assert_eq!(cur.get_u8(), 1);
assert_eq!(cur.get_i8(), 1);
assert_eq!(cur.get_u8(), 1); assert_eq!(cur.get_u8(), 0); assert!(cur.is_empty());
}
#[test]
fn test_describe_cluster_response_decode_v0() {
let mut buf = BytesMut::new();
buf.put_i32(10); buf.put_i16(0); put_compact_string(&mut buf, None); put_compact_string(&mut buf, Some("cluster-1")); buf.put_i32(0); varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0); put_compact_string(&mut buf, Some("host-0")); buf.put_i32(9092); put_compact_string(&mut buf, Some("rack-a")); put_tagged_fields(&mut buf); buf.put_i32(-2_147_483_648); put_tagged_fields(&mut buf);
let resp = DescribeClusterResponse::decode_v0(&mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 10);
assert!(resp.error_code.is_ok());
assert_eq!(resp.cluster_id, "cluster-1");
assert_eq!(resp.controller_id, 0);
assert_eq!(resp.brokers.len(), 1);
assert_eq!(resp.brokers[0].broker_id, 0);
assert_eq!(resp.brokers[0].host, "host-0");
assert_eq!(resp.brokers[0].port, 9092);
assert_eq!(resp.brokers[0].rack.as_deref(), Some("rack-a"));
assert!(!resp.brokers[0].is_fenced); assert_eq!(resp.endpoint_type, 1); }
#[test]
fn test_describe_cluster_response_decode_v1_endpoint_type() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); put_compact_string(&mut buf, None); buf.put_i8(2); put_compact_string(&mut buf, Some("c")); buf.put_i32(1); varint::encode_unsigned_varint(1, &mut buf); buf.put_i32(0); put_tagged_fields(&mut buf);
let resp = DescribeClusterResponse::decode_v1(&mut buf.freeze()).unwrap();
assert_eq!(resp.endpoint_type, 2);
assert_eq!(resp.cluster_id, "c");
assert!(resp.brokers.is_empty());
}
#[test]
fn test_describe_cluster_response_decode_v2_is_fenced() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); put_compact_string(&mut buf, None); buf.put_i8(1); put_compact_string(&mut buf, Some("c")); buf.put_i32(0); varint::encode_unsigned_varint(3, &mut buf); buf.put_i32(0);
put_compact_string(&mut buf, Some("h0"));
buf.put_i32(9092);
put_compact_string(&mut buf, None); buf.put_i8(0); put_tagged_fields(&mut buf);
buf.put_i32(1);
put_compact_string(&mut buf, Some("h1"));
buf.put_i32(9093);
put_compact_string(&mut buf, None);
buf.put_i8(1); put_tagged_fields(&mut buf);
buf.put_i32(0); put_tagged_fields(&mut buf);
let resp = DescribeClusterResponse::decode_v2(&mut buf.freeze()).unwrap();
assert_eq!(resp.brokers.len(), 2);
assert!(!resp.brokers[0].is_fenced);
assert!(resp.brokers[1].is_fenced);
}
}