use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_decode_array_len, encode_compact_array_len,
};
/// Kind of resource a DescribeConfigs request or response refers to.
///
/// The discriminants are the `i8` codes written to / read from the wire.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigResourceType {
    /// Fallback for every wire code that has no dedicated variant.
    Unknown = 0,
    /// Wire code `2`.
    Topic = 2,
    /// Wire code `4`.
    Broker = 4,
    /// Wire code `8`.
    BrokerLogger = 8,
}

impl ConfigResourceType {
    /// Maps a wire code to its variant.
    ///
    /// Any code without a dedicated variant (including `0`) yields
    /// [`ConfigResourceType::Unknown`]; the conversion never fails.
    #[inline]
    pub fn from_i8(value: i8) -> Self {
        let known = [Self::Topic, Self::Broker, Self::BrokerLogger];
        known
            .iter()
            .copied()
            .find(|kind| kind.to_i8() == value)
            .unwrap_or(Self::Unknown)
    }

    /// Returns the wire code of this variant.
    #[inline]
    pub fn to_i8(self) -> i8 {
        match self {
            Self::Unknown => 0,
            Self::Topic => 2,
            Self::Broker => 4,
            Self::BrokerLogger => 8,
        }
    }
}
/// DescribeConfigs request body.
///
/// Which fields reach the wire depends on the `encode_v*` method used:
/// `encode_v0` writes only `resources`, `encode_v1` adds `include_synonyms`,
/// and `encode_v3` / `encode_v4` additionally write `include_documentation`.
#[derive(Debug, Clone)]
pub struct DescribeConfigsRequest {
/// Resources whose configuration should be described.
pub resources: Vec<DescribeConfigsResource>,
/// Written as a single byte (`0` / `1`) by `encode_v1`, `encode_v3` and `encode_v4`.
pub include_synonyms: bool,
/// Written as a single byte (`0` / `1`) by `encode_v3` and `encode_v4` only.
pub include_documentation: bool,
}
/// One resource entry inside a [`DescribeConfigsRequest`].
#[derive(Debug, Clone)]
pub struct DescribeConfigsResource {
/// Kind of resource; encoded as its `i8` wire code.
pub resource_type: ConfigResourceType,
/// Name of the resource (`for_broker` stores the broker id rendered as a decimal string).
pub resource_name: String,
/// Config keys to request. `None` is encoded as a null array
/// (`-1` length in v0-v3, varint `0` in v4).
/// NOTE(review): per the Kafka protocol a null list presumably asks for all keys - confirm.
pub config_names: Option<Vec<String>>,
}
impl DescribeConfigsRequest {
pub fn for_topic(topic: impl Into<String>) -> Self {
Self {
resources: vec![DescribeConfigsResource {
resource_type: ConfigResourceType::Topic,
resource_name: topic.into(),
config_names: None,
}],
include_synonyms: false,
include_documentation: false,
}
}
pub fn for_broker(broker_id: i32) -> Self {
Self {
resources: vec![DescribeConfigsResource {
resource_type: ConfigResourceType::Broker,
resource_name: broker_id.to_string(),
config_names: None,
}],
include_synonyms: false,
include_documentation: false,
}
}
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
array_len_i32(self.resources.len())?.encode(buf);
for resource in &self.resources {
resource.resource_type.to_i8().encode(buf);
KafkaString::new(&resource.resource_name).try_encode(buf)?;
match &resource.config_names {
None => (-1i32).encode(buf),
Some(names) => {
array_len_i32(names.len())?.encode(buf);
for name in names {
KafkaString::new(name).try_encode(buf)?;
}
}
}
}
Ok(())
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
array_len_i32(self.resources.len())?.encode(buf);
for resource in &self.resources {
resource.resource_type.to_i8().encode(buf);
KafkaString::new(&resource.resource_name).try_encode(buf)?;
match &resource.config_names {
None => (-1i32).encode(buf),
Some(names) => {
array_len_i32(names.len())?.encode(buf);
for name in names {
KafkaString::new(name).try_encode(buf)?;
}
}
}
}
buf.put_u8(u8::from(self.include_synonyms));
Ok(())
}
pub fn encode_v3(&self, buf: &mut impl BufMut) -> Result<()> {
array_len_i32(self.resources.len())?.encode(buf);
for resource in &self.resources {
resource.resource_type.to_i8().encode(buf);
KafkaString::new(&resource.resource_name).try_encode(buf)?;
match &resource.config_names {
None => (-1i32).encode(buf),
Some(names) => {
array_len_i32(names.len())?.encode(buf);
for name in names {
KafkaString::new(name).try_encode(buf)?;
}
}
}
}
buf.put_u8(u8::from(self.include_synonyms));
buf.put_u8(u8::from(self.include_documentation));
Ok(())
}
pub fn encode_v4(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.resources.len(), buf)?;
for resource in &self.resources {
resource.resource_type.to_i8().encode(buf);
KafkaString::new(&resource.resource_name).try_encode_compact(buf)?;
match &resource.config_names {
None => {
crate::util::varint::encode_unsigned_varint(0, buf);
}
Some(names) => {
encode_compact_array_len(names.len(), buf)?;
for name in names {
KafkaString::new(name).try_encode_compact(buf)?;
}
}
}
TaggedFields::default().try_encode(buf)?;
}
buf.put_u8(u8::from(self.include_synonyms));
buf.put_u8(u8::from(self.include_documentation));
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
/// Decoded DescribeConfigs response body.
#[derive(Debug, Clone)]
pub struct DescribeConfigsResponse {
/// First field of the response body, read as an `i32` by every `decode_v*` method.
pub throttle_time_ms: i32,
/// One entry per resource contained in the response.
pub results: Vec<DescribeConfigsResult>,
}
/// Per-resource section of a [`DescribeConfigsResponse`].
#[derive(Debug, Clone)]
pub struct DescribeConfigsResult {
/// Error code for this resource, converted from the wire `i16`.
pub error_code: ErrorCode,
/// Optional error text; the wire string is nullable.
pub error_message: Option<String>,
/// Resource kind, mapped from the wire `i8` (unrecognised codes become `Unknown`).
pub resource_type: ConfigResourceType,
/// Resource name; the decoders pass it through `non_nullable_string`.
pub resource_name: String,
/// Configuration entries returned for this resource.
pub configs: Vec<DescribeConfigsEntry>,
}
/// A single configuration entry of a resource.
///
/// Not every field exists in every protocol version; fields that are absent
/// from the decoded version are filled with the placeholder noted on each field.
#[derive(Debug, Clone)]
pub struct DescribeConfigsEntry {
/// Config key; the decoders pass it through `non_nullable_string`.
pub name: String,
/// Config value; the wire string is nullable.
pub value: Option<String>,
/// Decoded from one byte (non-zero means `true`).
pub read_only: bool,
/// Only carried on the wire by v0 responses, where it is decoded from one byte.
pub is_default: bool,
/// Decoded from one byte (non-zero means `true`).
pub is_sensitive: bool,
/// Raw source code read in v1+; `decode_v0` fills in `-1`.
pub config_source: i8,
/// Synonyms read in v1+; `decode_v0` leaves this empty.
pub synonyms: Vec<ConfigSynonym>,
/// Raw type code read in v3+; earlier decoders fill in `0`.
pub config_type: i8,
/// Nullable documentation string read in v3+; earlier decoders fill in `None`.
pub documentation: Option<String>,
}
/// One synonym of a configuration entry (decoded for v1+ responses).
#[derive(Debug, Clone)]
pub struct ConfigSynonym {
/// Synonym key; the decoders pass it through `non_nullable_string`.
pub name: String,
/// Synonym value; the wire string is nullable.
pub value: Option<String>,
/// Raw source code of the synonym as read from the wire.
pub source: i8,
}
impl DescribeConfigsResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let result_count = check_decode_array_len(i32::decode(buf)?)?;
let mut results = Vec::with_capacity(result_count);
for _ in 0..result_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode(buf)?.0;
let resource_type = ConfigResourceType::from_i8(i8::decode(buf)?);
let resource_name = non_nullable_string("resource name", KafkaString::decode(buf)?.0)?;
let config_count = check_decode_array_len(i32::decode(buf)?)?;
let mut configs = Vec::with_capacity(config_count);
for _ in 0..config_count {
let name = non_nullable_string("config entry name", KafkaString::decode(buf)?.0)?;
let value = KafkaString::decode(buf)?.0;
let read_only = i8::decode(buf)? != 0;
let is_default = i8::decode(buf)? != 0;
let is_sensitive = i8::decode(buf)? != 0;
configs.push(DescribeConfigsEntry {
name,
value,
read_only,
is_default,
is_sensitive,
config_source: -1,
synonyms: Vec::new(),
config_type: 0,
documentation: None,
});
}
results.push(DescribeConfigsResult {
error_code,
error_message,
resource_type,
resource_name,
configs,
});
}
Ok(Self {
throttle_time_ms,
results,
})
}
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let result_count = check_decode_array_len(i32::decode(buf)?)?;
let mut results = Vec::with_capacity(result_count);
for _ in 0..result_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode(buf)?.0;
let resource_type = ConfigResourceType::from_i8(i8::decode(buf)?);
let resource_name = non_nullable_string("resource name", KafkaString::decode(buf)?.0)?;
let config_count = check_decode_array_len(i32::decode(buf)?)?;
let mut configs = Vec::with_capacity(config_count);
for _ in 0..config_count {
let name = non_nullable_string("config entry name", KafkaString::decode(buf)?.0)?;
let value = KafkaString::decode(buf)?.0;
let read_only = i8::decode(buf)? != 0;
let config_source = i8::decode(buf)?;
let is_sensitive = i8::decode(buf)? != 0;
let synonym_count = check_decode_array_len(i32::decode(buf)?)?;
let mut synonyms = Vec::with_capacity(synonym_count);
for _ in 0..synonym_count {
let syn_name =
non_nullable_string("synonym name", KafkaString::decode(buf)?.0)?;
let syn_value = KafkaString::decode(buf)?.0;
let syn_source = i8::decode(buf)?;
synonyms.push(ConfigSynonym {
name: syn_name,
value: syn_value,
source: syn_source,
});
}
configs.push(DescribeConfigsEntry {
name,
value,
read_only,
is_default: false,
is_sensitive,
config_source,
synonyms,
config_type: 0,
documentation: None,
});
}
results.push(DescribeConfigsResult {
error_code,
error_message,
resource_type,
resource_name,
configs,
});
}
Ok(Self {
throttle_time_ms,
results,
})
}
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let result_count = check_decode_array_len(i32::decode(buf)?)?;
let mut results = Vec::with_capacity(result_count);
for _ in 0..result_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode(buf)?.0;
let resource_type = ConfigResourceType::from_i8(i8::decode(buf)?);
let resource_name = non_nullable_string("resource name", KafkaString::decode(buf)?.0)?;
let config_count = check_decode_array_len(i32::decode(buf)?)?;
let mut configs = Vec::with_capacity(config_count);
for _ in 0..config_count {
let name = non_nullable_string("config entry name", KafkaString::decode(buf)?.0)?;
let value = KafkaString::decode(buf)?.0;
let read_only = i8::decode(buf)? != 0;
let config_source = i8::decode(buf)?;
let is_sensitive = i8::decode(buf)? != 0;
let synonym_count = check_decode_array_len(i32::decode(buf)?)?;
let mut synonyms = Vec::with_capacity(synonym_count);
for _ in 0..synonym_count {
let syn_name =
non_nullable_string("synonym name", KafkaString::decode(buf)?.0)?;
let syn_value = KafkaString::decode(buf)?.0;
let syn_source = i8::decode(buf)?;
synonyms.push(ConfigSynonym {
name: syn_name,
value: syn_value,
source: syn_source,
});
}
let config_type = i8::decode(buf)?;
let documentation = KafkaString::decode(buf)?.0;
configs.push(DescribeConfigsEntry {
name,
value,
read_only,
is_default: false,
is_sensitive,
config_source,
synonyms,
config_type,
documentation,
});
}
results.push(DescribeConfigsResult {
error_code,
error_message,
resource_type,
resource_name,
configs,
});
}
Ok(Self {
throttle_time_ms,
results,
})
}
pub fn decode_v4(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let result_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut results = Vec::with_capacity(result_count);
for _ in 0..result_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let resource_type = ConfigResourceType::from_i8(i8::decode(buf)?);
let resource_name =
non_nullable_string("resource name", KafkaString::decode_compact(buf)?.0)?;
let config_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut configs = Vec::with_capacity(config_count);
for _ in 0..config_count {
let name =
non_nullable_string("config entry name", KafkaString::decode_compact(buf)?.0)?;
let value = KafkaString::decode_compact(buf)?.0;
let read_only = i8::decode(buf)? != 0;
let config_source = i8::decode(buf)?;
let is_sensitive = i8::decode(buf)? != 0;
let synonym_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut synonyms = Vec::with_capacity(synonym_count);
for _ in 0..synonym_count {
let syn_name =
non_nullable_string("synonym name", KafkaString::decode_compact(buf)?.0)?;
let syn_value = KafkaString::decode_compact(buf)?.0;
let syn_source = i8::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
synonyms.push(ConfigSynonym {
name: syn_name,
value: syn_value,
source: syn_source,
});
}
let config_type = i8::decode(buf)?;
let documentation = KafkaString::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
configs.push(DescribeConfigsEntry {
name,
value,
read_only,
is_default: false,
is_sensitive,
config_source,
synonyms,
config_type,
documentation,
});
}
let _ = TaggedFields::decode(buf)?;
results.push(DescribeConfigsResult {
error_code,
error_message,
resource_type,
resource_name,
configs,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
results,
})
}
}
impl VersionedEncode for DescribeConfigsRequest {
    /// Dispatches to the `encode_v*` method matching `version`.
    ///
    /// Versions 1 and 2 share the v1 layout; anything outside `0..=4` yields
    /// the error produced by `unsupported_encode!`.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            0 => self.encode_v0(buf),
            1..=2 => self.encode_v1(buf),
            3 => self.encode_v3(buf),
            4 => self.encode_v4(buf),
            _ => unsupported_encode!("DescribeConfigsRequest", version),
        }
    }
}
impl VersionedDecode for DescribeConfigsResponse {
    /// Dispatches to the `decode_v*` method matching `version`.
    ///
    /// Versions 1 and 2 share the v1 layout; anything outside `0..=4` yields
    /// the error produced by `unsupported_decode!`.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        let decoded = match version {
            0 => Self::decode_v0(buf)?,
            1..=2 => Self::decode_v1(buf)?,
            3 => Self::decode_v3(buf)?,
            4 => Self::decode_v4(buf)?,
            _ => return unsupported_decode!("DescribeConfigsResponse", version),
        };
        Ok(decoded)
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::util::varint;
    use bytes::BytesMut;

    /// Writes a compact (varint length + 1 prefixed) nullable string.
    ///
    /// The length prefix is emitted as one raw byte, which is only a valid
    /// varint below 128, so the helper refuses longer strings instead of
    /// silently producing a corrupt fixture.
    fn put_compact_string(buf: &mut BytesMut, s: Option<&str>) {
        match s {
            Some(val) => {
                assert!(
                    val.len() < 0x7f,
                    "put_compact_string only supports single-byte length prefixes"
                );
                buf.put_u8((val.len() + 1) as u8);
                buf.put_slice(val.as_bytes());
            }
            // A length byte of 0 marks a null compact string.
            None => buf.put_u8(0),
        }
    }

    /// Writes an empty tagged-field section (a single zero count byte).
    fn put_tagged_fields(buf: &mut BytesMut) {
        buf.put_u8(0);
    }

    #[test]
    fn test_describe_configs_request_encode_v1_round_trip() {
        let req = DescribeConfigsRequest {
            resources: vec![DescribeConfigsResource {
                resource_type: ConfigResourceType::Topic,
                resource_name: "test-topic".to_string(),
                config_names: Some(vec!["retention.ms".to_string()]),
            }],
            include_synonyms: true,
            include_documentation: false,
        };
        let mut buf = BytesMut::new();
        req.encode_v1(&mut buf).unwrap();

        let mut cur = &buf[..];
        // Resource array length.
        assert_eq!(cur.get_i32(), 1);
        // Resource type: Topic.
        assert_eq!(cur.get_i8(), 2);
        // Resource name.
        let name_len = cur.get_i16() as usize;
        let mut name_bytes = vec![0u8; name_len];
        cur.copy_to_slice(&mut name_bytes);
        assert_eq!(name_bytes, b"test-topic");
        // Config name array length.
        assert_eq!(cur.get_i32(), 1);
        let cfg_len = cur.get_i16() as usize;
        let mut cfg_bytes = vec![0u8; cfg_len];
        cur.copy_to_slice(&mut cfg_bytes);
        assert_eq!(cfg_bytes, b"retention.ms");
        // include_synonyms.
        assert_eq!(cur.get_u8(), 1);
        assert!(cur.is_empty());
    }

    #[test]
    fn test_describe_configs_response_decode_v1_with_synonyms() {
        let mut buf = BytesMut::new();
        // Envelope and result header.
        buf.put_i32(10); // throttle_time_ms
        buf.put_i32(1); // result count
        buf.put_i16(0); // error code
        buf.put_i16(-1); // null error message
        buf.put_i8(2); // resource type: Topic
        buf.put_i16(5);
        buf.put_slice(b"topic");
        // Single config entry.
        buf.put_i32(1); // config count
        buf.put_i16(12);
        buf.put_slice(b"retention.ms");
        buf.put_i16(6);
        buf.put_slice(b"604800");
        buf.put_i8(0); // read_only
        buf.put_i8(5); // config_source
        buf.put_i8(0); // is_sensitive
        // Single synonym.
        buf.put_i32(1); // synonym count
        buf.put_i16(12);
        buf.put_slice(b"retention.ms");
        buf.put_i16(6);
        buf.put_slice(b"604800");
        buf.put_i8(5); // synonym source

        let resp = DescribeConfigsResponse::decode_v1(&mut buf.freeze()).unwrap();
        assert_eq!(resp.throttle_time_ms, 10);
        assert_eq!(resp.results.len(), 1);
        let r = &resp.results[0];
        assert!(r.error_code.is_ok());
        assert_eq!(r.resource_name, "topic");
        assert_eq!(r.configs.len(), 1);
        let c = &r.configs[0];
        assert_eq!(c.name, "retention.ms");
        assert_eq!(c.value.as_deref(), Some("604800"));
        assert_eq!(c.config_source, 5);
        assert_eq!(c.synonyms.len(), 1);
        assert_eq!(c.synonyms[0].name, "retention.ms");
        assert_eq!(c.synonyms[0].source, 5);
        assert_eq!(c.config_type, 0);
        assert!(c.documentation.is_none());
    }

    #[test]
    fn test_describe_configs_response_decode_v3_with_type_and_docs() {
        let mut buf = BytesMut::new();
        // Envelope and result header.
        buf.put_i32(5); // throttle_time_ms
        buf.put_i32(1); // result count
        buf.put_i16(0); // error code
        buf.put_i16(-1); // null error message
        buf.put_i8(2); // resource type: Topic
        buf.put_i16(1);
        buf.put_slice(b"t");
        // Single config entry.
        buf.put_i32(1); // config count
        buf.put_i16(3);
        buf.put_slice(b"key");
        buf.put_i16(3);
        buf.put_slice(b"val");
        buf.put_i8(1); // read_only
        buf.put_i8(1); // config_source
        buf.put_i8(0); // is_sensitive
        buf.put_i32(0); // synonym count
        buf.put_i8(3); // config_type
        buf.put_i16(4);
        buf.put_slice(b"docs");

        let resp = DescribeConfigsResponse::decode_v3(&mut buf.freeze()).unwrap();
        let c = &resp.results[0].configs[0];
        assert_eq!(c.name, "key");
        assert!(c.read_only);
        assert_eq!(c.config_source, 1);
        assert_eq!(c.config_type, 3);
        assert_eq!(c.documentation.as_deref(), Some("docs"));
    }

    #[test]
    fn test_describe_configs_request_encode_v4_flexible() {
        let req = DescribeConfigsRequest {
            resources: vec![DescribeConfigsResource {
                resource_type: ConfigResourceType::Broker,
                resource_name: "0".to_string(),
                config_names: None,
            }],
            include_synonyms: true,
            include_documentation: true,
        };
        let mut buf = BytesMut::new();
        req.encode_v4(&mut buf).unwrap();

        let mut cur = &buf[..];
        // Compact array length: element count + 1.
        let arr_varint = varint::decode_unsigned_varint(&mut cur).unwrap();
        assert_eq!(arr_varint, 2);
        // Resource type: Broker.
        assert_eq!(cur.get_i8(), 4);
        // Compact string length: byte count + 1.
        let name_varint = varint::decode_unsigned_varint(&mut cur).unwrap();
        assert_eq!(name_varint, 2);
        assert_eq!(cur.get_u8(), b'0');
        // Null config_names array.
        let null_varint = varint::decode_unsigned_varint(&mut cur).unwrap();
        assert_eq!(null_varint, 0);
        // Per-resource tagged fields.
        assert_eq!(cur.get_u8(), 0);
        // include_synonyms, include_documentation.
        assert_eq!(cur.get_u8(), 1);
        assert_eq!(cur.get_u8(), 1);
        // Trailing tagged fields.
        assert_eq!(cur.get_u8(), 0);
        assert!(cur.is_empty());
    }

    #[test]
    fn test_describe_configs_response_decode_v4_flexible() {
        let mut buf = BytesMut::new();
        // Envelope and result header.
        buf.put_i32(0); // throttle_time_ms
        varint::encode_unsigned_varint(2, &mut buf); // one result
        buf.put_i16(0); // error code
        put_compact_string(&mut buf, None); // null error message
        buf.put_i8(2); // resource type: Topic
        put_compact_string(&mut buf, Some("tp"));
        // Single config entry.
        varint::encode_unsigned_varint(2, &mut buf); // one config
        put_compact_string(&mut buf, Some("k"));
        put_compact_string(&mut buf, Some("v"));
        buf.put_i8(0); // read_only
        buf.put_i8(2); // config_source
        buf.put_i8(0); // is_sensitive
        // Single synonym.
        varint::encode_unsigned_varint(2, &mut buf); // one synonym
        put_compact_string(&mut buf, Some("k"));
        put_compact_string(&mut buf, Some("v"));
        buf.put_i8(2); // synonym source
        put_tagged_fields(&mut buf); // synonym tagged fields
        buf.put_i8(1); // config_type
        put_compact_string(&mut buf, Some("doc"));
        put_tagged_fields(&mut buf); // entry tagged fields
        put_tagged_fields(&mut buf); // result tagged fields
        put_tagged_fields(&mut buf); // response tagged fields

        let resp = DescribeConfigsResponse::decode_v4(&mut buf.freeze()).unwrap();
        assert_eq!(resp.results.len(), 1);
        let c = &resp.results[0].configs[0];
        assert_eq!(c.name, "k");
        assert_eq!(c.value.as_deref(), Some("v"));
        assert_eq!(c.config_source, 2);
        assert_eq!(c.synonyms.len(), 1);
        assert_eq!(c.config_type, 1);
        assert_eq!(c.documentation.as_deref(), Some("doc"));
    }
}