use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::check_compact_array_len;
use crate::protocol::primitives::{Decode, KafkaArray, KafkaString, TaggedFields, TryEncode};
/// Body of a Kafka `Metadata` request.
///
/// The same value can be serialized for several protocol versions through the
/// `encode_v*` methods or the `VersionedEncode` implementation.
#[derive(Debug, Clone, Default)]
pub struct MetadataRequest {
    /// Topics to describe. `None` is written as a null array, which is what
    /// [`MetadataRequest::all_topics`] produces.
    pub topics: Option<Vec<MetadataRequestTopic>>,
    /// Written as a single `0`/`1` byte by the v4 and later encoders.
    pub allow_auto_topic_creation: bool,
}
/// One topic entry inside a [`MetadataRequest`].
#[derive(Debug, Clone)]
pub struct MetadataRequestTopic {
    /// 16-byte topic UUID; only written verbatim by the v12+ encoder.
    pub topic_id: Option<[u8; 16]>,
    /// Topic name; required by every encoder that does not send a real topic id.
    pub name: Option<String>,
}
impl MetadataRequest {
pub fn all_topics() -> Self {
Self {
topics: None,
..Default::default()
}
}
pub fn for_topics(topics: Vec<&str>) -> Self {
Self {
topics: Some(
topics
.into_iter()
.map(|name| MetadataRequestTopic {
topic_id: None,
name: Some(name.to_string()),
})
.collect(),
),
..Default::default()
}
}
pub fn api_key() -> ApiKey {
ApiKey::Metadata
}
fn topic_names(topics: &[MetadataRequestTopic]) -> Result<Vec<KafkaString>> {
topics
.iter()
.map(|t| {
t.name.as_ref().map(KafkaString::new).ok_or_else(|| {
crate::error::KrafkaError::protocol(
"MetadataRequestTopic.name is required for v0-v8",
)
})
})
.collect()
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
match &self.topics {
None => KafkaArray::<KafkaString>::null().try_encode(buf)?,
Some(topics) => KafkaArray::new(Self::topic_names(topics)?).try_encode(buf)?,
}
Ok(())
}
pub fn encode_v4(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_v1(buf)?;
buf.put_u8(if self.allow_auto_topic_creation { 1 } else { 0 });
Ok(())
}
pub fn encode_v8(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_v4(buf)?;
buf.put_u8(0); buf.put_u8(0); Ok(())
}
pub fn encode_v9(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_topic_entries_flexible(buf, TopicIdMode::Omit)?;
buf.put_u8(if self.allow_auto_topic_creation { 1 } else { 0 });
buf.put_u8(0); buf.put_u8(0); TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v10(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_topic_entries_flexible(buf, TopicIdMode::ForceZero)?;
buf.put_u8(if self.allow_auto_topic_creation { 1 } else { 0 });
buf.put_u8(0); buf.put_u8(0); TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v11(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_topic_entries_flexible(buf, TopicIdMode::ForceZero)?;
buf.put_u8(if self.allow_auto_topic_creation { 1 } else { 0 });
buf.put_u8(0); TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v12(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode_topic_entries_flexible(buf, TopicIdMode::UseField)?;
buf.put_u8(if self.allow_auto_topic_creation { 1 } else { 0 });
buf.put_u8(0); TaggedFields::default().try_encode(buf)?;
Ok(())
}
fn encode_topic_entries_flexible(
&self,
buf: &mut impl BufMut,
topic_id_mode: TopicIdMode,
) -> Result<()> {
match &self.topics {
None => {
KafkaArray::<KafkaString>::null().try_encode_compact(buf)?;
}
Some(topics) => {
let len_plus_one = u32::try_from(topics.len().saturating_add(1)).map_err(|_| {
crate::error::KrafkaError::protocol(format!(
"topics array length {} exceeds u32 limit",
topics.len()
))
})?;
crate::util::varint::encode_unsigned_varint(len_plus_one, buf);
for t in topics {
match topic_id_mode {
TopicIdMode::Omit | TopicIdMode::ForceZero => {
if t.name.is_none() {
return Err(crate::error::KrafkaError::protocol(
"MetadataRequest topic name must be non-null \
when TopicId is absent or zero",
));
}
if matches!(topic_id_mode, TopicIdMode::ForceZero) {
buf.put_slice(&[0u8; 16]);
}
}
TopicIdMode::UseField => {
if t.topic_id.is_none() && t.name.is_none() {
return Err(crate::error::KrafkaError::protocol(
"MetadataRequest topic must have at least one \
of topic_id or name set",
));
}
buf.put_slice(&t.topic_id.unwrap_or([0u8; 16]));
}
}
match &t.name {
Some(name) => KafkaString::new(name).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
TaggedFields::default().try_encode(buf)?;
}
}
}
Ok(())
}
}
/// How the flexible request encoder treats the 16-byte topic id of each entry.
enum TopicIdMode {
    /// Write no topic id bytes at all.
    Omit,
    /// Always write sixteen zero bytes, ignoring the entry's `topic_id`.
    ForceZero,
    /// Write the entry's `topic_id`, or sixteen zero bytes when it is `None`.
    UseField,
}
/// Decoded Kafka `Metadata` response, normalized across protocol versions.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct MetadataResponse {
    /// Throttle time in milliseconds; `0` for versions that do not carry it (v1, v2).
    pub throttle_time_ms: i32,
    /// Brokers listed in the response.
    pub brokers: Vec<MetadataBroker>,
    /// Cluster id; `None` when null on the wire or absent from the version (v1).
    pub cluster_id: Option<String>,
    /// Node id of the controller as sent by the broker.
    pub controller_id: i32,
    /// Per-topic metadata.
    pub topics: Vec<MetadataTopicResponse>,
    /// Top-level error code; only read from the wire by the v13 decoder,
    /// otherwise `ErrorCode::None`.
    pub error_code: ErrorCode,
}
/// A broker entry from a `Metadata` response.
#[derive(Debug, Clone)]
pub struct MetadataBroker {
    /// Broker node id.
    pub node_id: i32,
    /// Broker host name; decoders reject a null host.
    pub host: String,
    /// Broker port.
    pub port: i32,
    /// Rack, if the broker reported one.
    pub rack: Option<String>,
}
/// Per-topic section of a `Metadata` response.
#[derive(Debug, Clone)]
pub struct MetadataTopicResponse {
    /// Topic-level error code.
    pub error_code: ErrorCode,
    /// Topic name; `None` when the wire value is null.
    pub name: Option<String>,
    /// Topic UUID; `None` for versions without topic ids or when the wire value is all zeros.
    pub topic_id: Option<[u8; 16]>,
    /// Internal-topic flag; `false` for the v0 layout, which does not carry it.
    pub is_internal: bool,
    /// Partition metadata for this topic.
    pub partitions: Vec<MetadataPartitionResponse>,
}
/// Per-partition section of a `Metadata` response.
#[derive(Debug, Clone)]
pub struct MetadataPartitionResponse {
    /// Partition-level error code.
    pub error_code: ErrorCode,
    /// Partition index within the topic.
    pub partition_index: i32,
    /// Node id of the partition leader.
    pub leader_id: i32,
    /// Leader epoch; `-1` for layouts that do not carry it (before v7).
    pub leader_epoch: i32,
    /// Node ids of all replicas.
    pub replica_nodes: Vec<i32>,
    /// Node ids of in-sync replicas.
    pub isr_nodes: Vec<i32>,
    /// Node ids of offline replicas; empty for layouts that do not carry it (before v5).
    pub offline_replicas: Vec<i32>,
}
impl MetadataResponse {
    /// Decodes the v1 layout: brokers, controller id, topics.
    pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
        // Struct fields are evaluated top to bottom, which is the wire order.
        Ok(Self {
            throttle_time_ms: 0,
            brokers: decode_array::<MetadataBrokerV1, _>(buf)?,
            cluster_id: None,
            controller_id: i32::decode(buf)?,
            topics: decode_array::<MetadataTopicResponseV1, _>(buf)?,
            error_code: ErrorCode::None,
        })
    }

    /// Decodes the v2 layout: v1 plus a nullable cluster id after the brokers.
    pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
        Ok(Self {
            throttle_time_ms: 0,
            brokers: decode_array::<MetadataBrokerV1, _>(buf)?,
            cluster_id: KafkaString::decode(buf)?.0,
            controller_id: i32::decode(buf)?,
            topics: decode_array::<MetadataTopicResponseV1, _>(buf)?,
            error_code: ErrorCode::None,
        })
    }

    /// Decodes the v3 layout (also used for v4 by `decode_versioned`).
    pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
        Self::decode_v3_plus::<MetadataTopicResponseV1>(buf)
    }

    /// Decodes the v5 layout (also used for v6); partitions carry offline replicas.
    pub fn decode_v5(buf: &mut impl Buf) -> Result<Self> {
        Self::decode_v3_plus::<MetadataTopicResponseV5>(buf)
    }

    /// Decodes the v7 layout; partitions carry a leader epoch.
    pub fn decode_v7(buf: &mut impl Buf) -> Result<Self> {
        Self::decode_v3_plus::<MetadataTopicResponseV7>(buf)
    }

    /// Decodes the v8 layout; the trailing cluster authorized-operations
    /// value is read and discarded.
    pub fn decode_v8(buf: &mut impl Buf) -> Result<Self> {
        let decoded = Self::decode_v3_plus::<MetadataTopicResponseV8>(buf)?;
        i32::decode(buf).map(|_cluster_authorized_operations| decoded)
    }

    /// Decodes the flexible v9 layout.
    pub fn decode_v9(buf: &mut impl Buf) -> Result<Self> {
        Self::decode_v9_plus::<MetadataTopicResponseV9>(buf, true)
    }

    /// Decodes the v10 layout; topics carry a topic id.
    pub fn decode_v10(buf: &mut impl Buf) -> Result<Self> {
        Self::decode_v9_plus::<MetadataTopicResponseV10>(buf, true)
    }

    /// Decodes the v11 layout (also used for v12); no cluster
    /// authorized-operations value is read.
    pub fn decode_v11(buf: &mut impl Buf) -> Result<Self> {
        Self::decode_v9_plus::<MetadataTopicResponseV10>(buf, false)
    }

    /// Shared non-flexible decoder for v3..v8, parameterized by the topic wire layout `T`.
    fn decode_v3_plus<T: Decode + Into<MetadataTopicResponse>>(buf: &mut impl Buf) -> Result<Self> {
        Ok(Self {
            throttle_time_ms: i32::decode(buf)?,
            brokers: decode_array::<MetadataBrokerV1, _>(buf)?,
            cluster_id: KafkaString::decode(buf)?.0,
            controller_id: i32::decode(buf)?,
            topics: decode_array::<T, _>(buf)?,
            error_code: ErrorCode::None,
        })
    }

    /// Shared flexible decoder for v9..v12.
    ///
    /// When `include_cluster_auth_ops` is set, one `i32` is read and dropped
    /// after the topics; the top-level tagged fields are always consumed.
    fn decode_v9_plus<T: Decode + Into<MetadataTopicResponse>>(
        buf: &mut impl Buf,
        include_cluster_auth_ops: bool,
    ) -> Result<Self> {
        let mut response = Self {
            throttle_time_ms: i32::decode(buf)?,
            brokers: decode_compact_array::<MetadataBrokerV9, _>(buf)?,
            cluster_id: KafkaString::decode_compact(buf)?.0,
            controller_id: i32::decode(buf)?,
            topics: decode_compact_array::<T, _>(buf)?,
            error_code: ErrorCode::None,
        };
        if include_cluster_auth_ops {
            let _cluster_authorized_operations = i32::decode(buf)?;
        }
        let _ = TaggedFields::decode(buf)?;
        response.error_code = ErrorCode::None;
        Ok(response)
    }

    /// Decodes the v13 layout: like v11/v12 plus a top-level error code after the topics.
    pub fn decode_v13(buf: &mut impl Buf) -> Result<Self> {
        let response = Self {
            throttle_time_ms: i32::decode(buf)?,
            brokers: decode_compact_array::<MetadataBrokerV9, _>(buf)?,
            cluster_id: KafkaString::decode_compact(buf)?.0,
            controller_id: i32::decode(buf)?,
            topics: decode_compact_array::<MetadataTopicResponseV10, _>(buf)?,
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
        };
        let _ = TaggedFields::decode(buf)?;
        Ok(response)
    }

    /// Returns the first broker whose `node_id` matches, if any.
    pub fn find_broker(&self, node_id: i32) -> Option<&MetadataBroker> {
        self.brokers.iter().find(|broker| broker.node_id == node_id)
    }

    /// Returns the first topic whose name equals `name`; unnamed topics never match.
    pub fn find_topic(&self, name: &str) -> Option<&MetadataTopicResponse> {
        self.topics
            .iter()
            .find(|topic| matches!(topic.name.as_deref(), Some(found) if found == name))
    }
}
/// Decodes a non-nullable, non-compact array of wire wrappers `W` and
/// converts every element into the public type `T`.
///
/// A null array is rejected with a protocol error.
fn decode_array<W: Decode + Into<T>, T>(buf: &mut impl Buf) -> Result<Vec<T>> {
    let wire_items = KafkaArray::<W>::decode(buf)?.0;
    non_nullable_array(wire_items)
        .map(|items| items.into_iter().map(|item| item.into()).collect())
}
fn decode_compact_array<W: Decode + Into<T>, T>(buf: &mut impl Buf) -> Result<Vec<T>> {
let items = KafkaArray::<W>::decode_compact(buf)?.0.ok_or_else(|| {
crate::error::KrafkaError::protocol(
"compact array raw value 0 (null) is invalid for a non-nullable field",
)
})?;
Ok(items.into_iter().map(Into::into).collect())
}
fn non_nullable_array<T>(opt: Option<Vec<T>>) -> Result<Vec<T>> {
opt.ok_or_else(|| {
crate::error::KrafkaError::protocol(
"array length -1 (null) is invalid for a non-nullable field",
)
})
}
/// Wire wrapper for the rack-less broker layout: node id, host, port.
struct MetadataBrokerV0(MetadataBroker);

impl Decode for MetadataBrokerV0 {
    /// Reads the fields in wire order; a null host is a protocol error.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataBroker {
            node_id: i32::decode(buf)?,
            host: KafkaString::decode(buf)?.0.ok_or_else(|| {
                KrafkaError::protocol("metadata broker host must be a non-null string")
            })?,
            port: i32::decode(buf)?,
            rack: None,
        }))
    }
}

impl From<MetadataBrokerV0> for MetadataBroker {
    fn from(MetadataBrokerV0(broker): MetadataBrokerV0) -> Self {
        broker
    }
}
/// Wire wrapper for the non-flexible broker layout with a nullable rack
/// (used by the v1..v8 response decoders).
struct MetadataBrokerV1(MetadataBroker);

impl Decode for MetadataBrokerV1 {
    /// Reads node id, host, port and rack; a null host is a protocol error.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataBroker {
            node_id: i32::decode(buf)?,
            host: KafkaString::decode(buf)?.0.ok_or_else(|| {
                KrafkaError::protocol("metadata broker host must be a non-null string")
            })?,
            port: i32::decode(buf)?,
            rack: KafkaString::decode(buf)?.0,
        }))
    }
}

impl From<MetadataBrokerV1> for MetadataBroker {
    fn from(MetadataBrokerV1(broker): MetadataBrokerV1) -> Self {
        broker
    }
}
/// Wire wrapper for the flexible broker layout: compact strings plus a
/// trailing tagged-field section.
struct MetadataBrokerV9(MetadataBroker);

impl Decode for MetadataBrokerV9 {
    /// Reads node id, compact host, port, compact rack, then consumes the
    /// tagged fields. A null host is a protocol error.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        let broker = MetadataBroker {
            node_id: i32::decode(buf)?,
            host: KafkaString::decode_compact(buf)?.0.ok_or_else(|| {
                KrafkaError::protocol("metadata broker host must be a non-null compact string")
            })?,
            port: i32::decode(buf)?,
            rack: KafkaString::decode_compact(buf)?.0,
        };
        let _ = TaggedFields::decode(buf)?;
        Ok(Self(broker))
    }

    /// The layout is already compact, so this is the same as [`Decode::decode`].
    fn decode_compact(buf: &mut impl Buf) -> Result<Self> {
        Self::decode(buf)
    }
}

impl From<MetadataBrokerV9> for MetadataBroker {
    fn from(MetadataBrokerV9(broker): MetadataBrokerV9) -> Self {
        broker
    }
}
/// Wire wrapper for the earliest partition layout: no leader epoch and no
/// offline replicas.
struct MetadataPartitionResponseV0(MetadataPartitionResponse);

impl Decode for MetadataPartitionResponseV0 {
    /// Reads error code, partition index, leader id, replicas and ISR.
    /// `leader_epoch` defaults to `-1` and `offline_replicas` to empty.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataPartitionResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            partition_index: i32::decode(buf)?,
            leader_id: i32::decode(buf)?,
            leader_epoch: -1,
            replica_nodes: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
            isr_nodes: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
            offline_replicas: Vec::new(),
        }))
    }
}

impl From<MetadataPartitionResponseV0> for MetadataPartitionResponse {
    fn from(MetadataPartitionResponseV0(partition): MetadataPartitionResponseV0) -> Self {
        partition
    }
}
/// Wire wrapper for the partition layout that adds offline replicas but
/// still has no leader epoch.
struct MetadataPartitionResponseV5(MetadataPartitionResponse);

impl Decode for MetadataPartitionResponseV5 {
    /// Reads error code, partition index, leader id, replicas, ISR and
    /// offline replicas. `leader_epoch` defaults to `-1`.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataPartitionResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            partition_index: i32::decode(buf)?,
            leader_id: i32::decode(buf)?,
            leader_epoch: -1,
            replica_nodes: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
            isr_nodes: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
            offline_replicas: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
        }))
    }
}

impl From<MetadataPartitionResponseV5> for MetadataPartitionResponse {
    fn from(MetadataPartitionResponseV5(partition): MetadataPartitionResponseV5) -> Self {
        partition
    }
}
/// Wire wrapper for the non-flexible partition layout that carries a leader epoch.
struct MetadataPartitionResponseV7(MetadataPartitionResponse);

impl Decode for MetadataPartitionResponseV7 {
    /// Reads error code, partition index, leader id, leader epoch, replicas,
    /// ISR and offline replicas.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataPartitionResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            partition_index: i32::decode(buf)?,
            leader_id: i32::decode(buf)?,
            leader_epoch: i32::decode(buf)?,
            replica_nodes: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
            isr_nodes: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
            offline_replicas: non_nullable_array(KafkaArray::<i32>::decode(buf)?.0)?,
        }))
    }
}

impl From<MetadataPartitionResponseV7> for MetadataPartitionResponse {
    fn from(MetadataPartitionResponseV7(partition): MetadataPartitionResponseV7) -> Self {
        partition
    }
}
/// Wire wrapper for the flexible partition layout: compact `i32` arrays plus
/// a trailing tagged-field section.
struct MetadataPartitionResponseV9(MetadataPartitionResponse);

impl Decode for MetadataPartitionResponseV9 {
    /// Reads error code, partition index, leader id, leader epoch, the three
    /// compact node-id arrays, then consumes the tagged fields.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        /// Reads one compact array of `i32`: an unsigned varint length
        /// (validated by `check_compact_array_len`) followed by the elements.
        fn read_node_ids(buf: &mut impl Buf) -> Result<Vec<i32>> {
            let count =
                check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
            let mut ids = Vec::with_capacity(count);
            for _ in 0..count {
                ids.push(i32::decode(buf)?);
            }
            Ok(ids)
        }

        // Field initializers run top to bottom, matching the wire order.
        let partition = MetadataPartitionResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            partition_index: i32::decode(buf)?,
            leader_id: i32::decode(buf)?,
            leader_epoch: i32::decode(buf)?,
            replica_nodes: read_node_ids(buf)?,
            isr_nodes: read_node_ids(buf)?,
            offline_replicas: read_node_ids(buf)?,
        };
        let _ = TaggedFields::decode(buf)?;
        Ok(Self(partition))
    }

    /// The layout is already compact, so this is the same as [`Decode::decode`].
    fn decode_compact(buf: &mut impl Buf) -> Result<Self> {
        Self::decode(buf)
    }
}

impl From<MetadataPartitionResponseV9> for MetadataPartitionResponse {
    fn from(MetadataPartitionResponseV9(partition): MetadataPartitionResponseV9) -> Self {
        partition
    }
}
/// Wire wrapper for the earliest topic layout: no `is_internal` flag.
struct MetadataTopicResponseV0(MetadataTopicResponse);

impl Decode for MetadataTopicResponseV0 {
    /// Reads error code, name and partitions. `is_internal` defaults to
    /// `false` and `topic_id` to `None`.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataTopicResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            name: KafkaString::decode(buf)?.0,
            topic_id: None,
            is_internal: false,
            partitions: decode_array::<MetadataPartitionResponseV0, _>(buf)?,
        }))
    }
}

impl From<MetadataTopicResponseV0> for MetadataTopicResponse {
    fn from(MetadataTopicResponseV0(topic): MetadataTopicResponseV0) -> Self {
        topic
    }
}
/// Wire wrapper for the topic layout with an `is_internal` flag and the
/// earliest partition layout (used for v1..v4 responses).
struct MetadataTopicResponseV1(MetadataTopicResponse);

impl Decode for MetadataTopicResponseV1 {
    /// Reads error code, name, `is_internal` and partitions.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataTopicResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            name: KafkaString::decode(buf)?.0,
            topic_id: None,
            is_internal: bool::decode(buf)?,
            partitions: decode_array::<MetadataPartitionResponseV0, _>(buf)?,
        }))
    }
}

impl From<MetadataTopicResponseV1> for MetadataTopicResponse {
    fn from(MetadataTopicResponseV1(topic): MetadataTopicResponseV1) -> Self {
        topic
    }
}
/// Wire wrapper for the topic layout whose partitions carry offline
/// replicas (used for v5..v6 responses).
struct MetadataTopicResponseV5(MetadataTopicResponse);

impl Decode for MetadataTopicResponseV5 {
    /// Reads error code, name, `is_internal` and V5 partitions.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataTopicResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            name: KafkaString::decode(buf)?.0,
            topic_id: None,
            is_internal: bool::decode(buf)?,
            partitions: decode_array::<MetadataPartitionResponseV5, _>(buf)?,
        }))
    }
}

impl From<MetadataTopicResponseV5> for MetadataTopicResponse {
    fn from(MetadataTopicResponseV5(topic): MetadataTopicResponseV5) -> Self {
        topic
    }
}
/// Wire wrapper for the topic layout whose partitions carry a leader epoch
/// (used for v7 responses).
struct MetadataTopicResponseV7(MetadataTopicResponse);

impl Decode for MetadataTopicResponseV7 {
    /// Reads error code, name, `is_internal` and V7 partitions.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        Ok(Self(MetadataTopicResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            name: KafkaString::decode(buf)?.0,
            topic_id: None,
            is_internal: bool::decode(buf)?,
            partitions: decode_array::<MetadataPartitionResponseV7, _>(buf)?,
        }))
    }
}

impl From<MetadataTopicResponseV7> for MetadataTopicResponse {
    fn from(MetadataTopicResponseV7(topic): MetadataTopicResponseV7) -> Self {
        topic
    }
}
/// Wire wrapper for the v8 topic layout: the V7 layout followed by a
/// per-topic authorized-operations value.
struct MetadataTopicResponseV8(MetadataTopicResponse);

impl Decode for MetadataTopicResponseV8 {
    /// Reads error code, name, `is_internal`, V7 partitions, then reads and
    /// discards the topic authorized-operations `i32`.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        let topic = MetadataTopicResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            name: KafkaString::decode(buf)?.0,
            topic_id: None,
            is_internal: bool::decode(buf)?,
            partitions: decode_array::<MetadataPartitionResponseV7, _>(buf)?,
        };
        let _topic_authorized_operations = i32::decode(buf)?;
        Ok(Self(topic))
    }
}

impl From<MetadataTopicResponseV8> for MetadataTopicResponse {
    fn from(MetadataTopicResponseV8(topic): MetadataTopicResponseV8) -> Self {
        topic
    }
}
/// Wire wrapper for the flexible v9 topic layout (no topic id).
struct MetadataTopicResponseV9(MetadataTopicResponse);

impl Decode for MetadataTopicResponseV9 {
    /// Reads error code, compact name, `is_internal`, compact partitions,
    /// then discards the topic authorized-operations `i32` and the tagged fields.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers run top to bottom, matching the wire order.
        let topic = MetadataTopicResponse {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            name: KafkaString::decode_compact(buf)?.0,
            topic_id: None,
            is_internal: bool::decode(buf)?,
            partitions: decode_compact_array::<MetadataPartitionResponseV9, _>(buf)?,
        };
        let _topic_authorized_operations = i32::decode(buf)?;
        let _ = TaggedFields::decode(buf)?;
        Ok(Self(topic))
    }

    /// The layout is already compact, so this is the same as [`Decode::decode`].
    fn decode_compact(buf: &mut impl Buf) -> Result<Self> {
        Self::decode(buf)
    }
}

impl From<MetadataTopicResponseV9> for MetadataTopicResponse {
    fn from(MetadataTopicResponseV9(topic): MetadataTopicResponseV9) -> Self {
        topic
    }
}
/// Wire wrapper for the flexible topic layout that carries a 16-byte topic
/// id (used for v10..v13 responses).
struct MetadataTopicResponseV10(MetadataTopicResponse);

impl Decode for MetadataTopicResponseV10 {
    /// Reads error code, compact name, topic id, `is_internal`, compact
    /// partitions, then discards the topic authorized-operations `i32` and
    /// the tagged fields.
    ///
    /// An all-zero topic id is reported as `None`. Fewer than 16 remaining
    /// bytes at the topic id is a protocol error.
    fn decode(buf: &mut impl Buf) -> Result<Self> {
        let error_code = ErrorCode::from_i16(i16::decode(buf)?);
        let name = KafkaString::decode_compact(buf)?.0;

        // Check the length first: `copy_to_slice` would panic on a short buffer.
        if buf.remaining() < 16 {
            return Err(crate::error::KrafkaError::protocol(
                "not enough bytes for topic_id UUID",
            ));
        }
        let mut raw_id = [0u8; 16];
        buf.copy_to_slice(&mut raw_id);

        let is_internal = bool::decode(buf)?;
        let partitions = decode_compact_array::<MetadataPartitionResponseV9, _>(buf)?;
        let _topic_authorized_operations = i32::decode(buf)?;
        let _ = TaggedFields::decode(buf)?;

        Ok(Self(MetadataTopicResponse {
            error_code,
            name,
            topic_id: (raw_id != [0u8; 16]).then_some(raw_id),
            is_internal,
            partitions,
        }))
    }

    /// The layout is already compact, so this is the same as [`Decode::decode`].
    fn decode_compact(buf: &mut impl Buf) -> Result<Self> {
        Self::decode(buf)
    }
}

impl From<MetadataTopicResponseV10> for MetadataTopicResponse {
    fn from(MetadataTopicResponseV10(topic): MetadataTopicResponseV10) -> Self {
        topic
    }
}
impl VersionedEncode for MetadataRequest {
    /// Dispatches to the encoder for `version`.
    ///
    /// Versions 1 through 13 are supported; anything else (including 0)
    /// yields the `unsupported_encode!` error.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            // Nullable array of topic names only.
            1 | 2 | 3 => self.encode_v1(buf)?,
            // Adds the auto-create byte.
            4 | 5 | 6 | 7 => self.encode_v4(buf)?,
            8 => self.encode_v8(buf)?,
            // Flexible versions.
            9 => self.encode_v9(buf)?,
            10 => self.encode_v10(buf)?,
            11 => self.encode_v11(buf)?,
            12 | 13 => self.encode_v12(buf)?,
            _ => return unsupported_encode!("MetadataRequest", version),
        }
        Ok(())
    }
}
impl VersionedDecode for MetadataResponse {
    /// Dispatches to the decoder for `version`.
    ///
    /// Versions 1 through 13 are supported; anything else (including 0)
    /// yields the `unsupported_decode!` error.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        match version {
            1 => Self::decode_v1(buf),
            2 => Self::decode_v2(buf),
            3 | 4 => Self::decode_v3(buf),
            5 | 6 => Self::decode_v5(buf),
            7 => Self::decode_v7(buf),
            8 => Self::decode_v8(buf),
            // Flexible versions.
            9 => Self::decode_v9(buf),
            10 => Self::decode_v10(buf),
            11 | 12 => Self::decode_v11(buf),
            13 => Self::decode_v13(buf),
            _ => unsupported_decode!("MetadataResponse", version),
        }
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::protocol::*;
use crate::util::varint;
use bytes::BytesMut;
use rstest::rstest;
// Hand-encodes a flexible (v9) MetadataResponse body with one broker and one
// topic that has a single partition. The field order follows `decode_v9_plus`.
// Compact lengths are written as `len + 1`: raw 2 = one element, raw 1 = empty,
// raw 0 = null.
fn build_metadata_response_v9_bytes() -> BytesMut {
let mut buf = BytesMut::new();
// throttle_time_ms = 10; brokers: compact array with one entry
buf.put_i32(10); varint::encode_unsigned_varint(2, &mut buf);
// broker node_id = 1, then host as a compact string
buf.put_i32(1); let host = b"broker-1";
varint::encode_unsigned_varint(host.len() as u32 + 1, &mut buf);
buf.put_slice(host);
// broker port = 9092; rack = null (raw 0)
buf.put_i32(9092); varint::encode_unsigned_varint(0, &mut buf);
// broker tagged fields (none), then cluster_id as a compact string
varint::encode_unsigned_varint(0, &mut buf); let cluster = b"cluster-abc";
varint::encode_unsigned_varint(cluster.len() as u32 + 1, &mut buf);
buf.put_slice(cluster);
// controller_id = 1; topics: compact array with one entry
buf.put_i32(1); varint::encode_unsigned_varint(2, &mut buf);
// topic error_code = 0, then name as a compact string
buf.put_i16(0); let topic = b"test-topic";
varint::encode_unsigned_varint(topic.len() as u32 + 1, &mut buf);
buf.put_slice(topic);
// is_internal = false; partitions: compact array with one entry
buf.put_u8(0); varint::encode_unsigned_varint(2, &mut buf);
// partition: error_code = 0, index = 0, leader_id = 1, leader_epoch = 5; replicas: one entry
buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); buf.put_i32(5); varint::encode_unsigned_varint(2, &mut buf);
// replica node 1
buf.put_i32(1);
// isr: one entry, node 1
varint::encode_unsigned_varint(2, &mut buf);
buf.put_i32(1);
// offline replicas: empty (raw 1)
varint::encode_unsigned_varint(1, &mut buf);
// partition tagged fields; topic authorized operations; topic tagged fields;
// cluster authorized operations; top-level tagged fields; then return the buffer
varint::encode_unsigned_varint(0, &mut buf); buf.put_i32(0x7FFF_FFFF); varint::encode_unsigned_varint(0, &mut buf); buf.put_i32(0x7FFF_FFFF); varint::encode_unsigned_varint(0, &mut buf); buf
}
#[test]
fn test_metadata_request_all_topics() {
    // `all_topics()` must leave the topic list unset.
    assert!(MetadataRequest::all_topics().topics.is_none());
}
#[test]
fn test_metadata_request_all_topics_encode_v1() {
    // An unset topic list is written as a null array: big-endian length -1.
    let mut encoded = BytesMut::new();
    MetadataRequest::all_topics().encode_v1(&mut encoded).unwrap();
    let array_len = i32::from_be_bytes([encoded[0], encoded[1], encoded[2], encoded[3]]);
    assert_eq!(array_len, -1);
}
#[test]
fn test_metadata_request_specific_topics() {
    // `for_topics` keeps one entry per requested name.
    let request = MetadataRequest::for_topics(vec!["topic1", "topic2"]);
    let entries = request.topics.as_ref().unwrap();
    assert_eq!(entries.len(), 2);
}
#[test]
fn test_versioned_encode_metadata_request_v0_unsupported() {
    // Version 0 has no encoder and must be rejected.
    let mut sink = BytesMut::new();
    let outcome = MetadataRequest::all_topics().encode_versioned(0, &mut sink);
    assert!(outcome.is_err());
}
#[test]
fn test_versioned_encode_metadata_request_v1() {
    // Version 1 must dispatch to `encode_v1`.
    let request = MetadataRequest::all_topics();

    let mut via_dispatch = BytesMut::new();
    request.encode_versioned(1, &mut via_dispatch).unwrap();

    let mut direct = BytesMut::new();
    request.encode_v1(&mut direct).unwrap();

    assert_eq!(via_dispatch, direct);
}
#[test]
fn test_versioned_encode_metadata_v1_vs_v4_all_topics() {
    // v4 appends the auto-create byte, so its output must differ from v1.
    let request = MetadataRequest::all_topics();

    let mut v1_bytes = BytesMut::new();
    request.encode_versioned(1, &mut v1_bytes).unwrap();

    let mut v4_bytes = BytesMut::new();
    request.encode_versioned(4, &mut v4_bytes).unwrap();

    assert_ne!(v1_bytes, v4_bytes);
}
#[test]
fn test_versioned_encode_metadata_request_v4() {
    // Version 4 must dispatch to `encode_v4`.
    let request = MetadataRequest::all_topics();

    let mut via_dispatch = BytesMut::new();
    request.encode_versioned(4, &mut via_dispatch).unwrap();

    let mut direct = BytesMut::new();
    request.encode_v4(&mut direct).unwrap();

    assert_eq!(via_dispatch, direct);
}
#[test]
fn test_versioned_encode_metadata_request_v8() {
    // Version 8 must dispatch to `encode_v8`.
    let request = MetadataRequest::all_topics();

    let mut via_dispatch = BytesMut::new();
    request.encode_versioned(8, &mut via_dispatch).unwrap();

    let mut direct = BytesMut::new();
    request.encode_v8(&mut direct).unwrap();

    assert_eq!(via_dispatch, direct);
}
#[test]
fn test_versioned_encode_metadata_request_rejects_v14() {
    // Version 14 is past the supported range; the error must say "unsupported".
    let mut sink = BytesMut::new();
    let outcome = MetadataRequest::all_topics().encode_versioned(14, &mut sink);
    assert!(outcome.is_err());

    let msg = outcome.unwrap_err().to_string();
    assert!(msg.contains("unsupported"), "got: {msg}");
}
#[test]
fn test_versioned_decode_metadata_rejects_v14() {
    // Version 14 is past the supported range; the error must say "unsupported".
    let mut empty = bytes::Bytes::new();
    let outcome = MetadataResponse::decode_versioned(14, &mut empty);
    assert!(outcome.is_err());

    let msg = outcome.unwrap_err().to_string();
    assert!(msg.contains("unsupported"), "got: {msg}");
}
#[test]
fn test_metadata_response_decode_v0_unsupported() {
    // Two zero i32 values; version 0 must be rejected whatever the bytes are.
    let mut raw = BytesMut::new();
    raw.put_i32(0);
    raw.put_i32(0);

    let outcome = MetadataResponse::decode_versioned(0, &mut raw.freeze());
    assert!(outcome.is_err());
}
// v1 response: one broker (with rack), controller id, one topic with one
// partition. Verifies the defaults for fields v1 does not carry.
#[test]
fn test_metadata_response_decode_v1() {
let mut buf = BytesMut::new();
// brokers: array length 1
buf.put_i32(1);
// broker node_id = 1, then host as an i16-length string
buf.put_i32(1); let host = b"broker1";
buf.put_i16(host.len() as i16);
buf.put_slice(host);
// broker port = 9092, then rack as an i16-length string
buf.put_i32(9092); let rack = b"us-east-1a";
buf.put_i16(rack.len() as i16);
// rack bytes; controller_id = 1
buf.put_slice(rack); buf.put_i32(1);
// topics: array length 1
buf.put_i32(1);
// topic error_code = 0, then name
buf.put_i16(0); let topic = b"test";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
// is_internal = false; partitions: array length 1
buf.put_u8(0); buf.put_i32(1);
// partition: error_code = 0, index = 0, leader_id = 1, replicas = [1], isr = [1]
buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); buf.put_i32(1); buf.put_i32(1); buf.put_i32(1); buf.put_i32(1);
let resp = MetadataResponse::decode_versioned(1, &mut buf.freeze()).unwrap();
assert_eq!(resp.brokers.len(), 1);
assert_eq!(resp.brokers[0].rack.as_deref(), Some("us-east-1a"));
assert_eq!(resp.controller_id, 1);
// v1 carries neither cluster id, throttle time, nor leader epoch.
assert_eq!(resp.cluster_id, None);
assert_eq!(resp.throttle_time_ms, 0);
assert_eq!(resp.topics[0].partitions[0].leader_epoch, -1);
}
// v2 response: one broker, a cluster id, and no topics.
#[test]
fn test_metadata_response_decode_v2() {
let mut buf = BytesMut::new();
// brokers: array length 1
buf.put_i32(1);
// broker node_id = 1
buf.put_i32(1);
// broker host
let host = b"broker1";
buf.put_i16(host.len() as i16);
buf.put_slice(host);
// broker port
buf.put_i32(9092);
// broker rack
let rack = b"rack-a";
buf.put_i16(rack.len() as i16);
buf.put_slice(rack);
// cluster_id (added in the v2 layout, after the brokers)
let cid = b"abc-cluster";
buf.put_i16(cid.len() as i16);
buf.put_slice(cid);
// controller_id = 1
buf.put_i32(1);
// topics: array length 0
buf.put_i32(0);
let resp = MetadataResponse::decode_versioned(2, &mut buf.freeze()).unwrap();
assert_eq!(resp.cluster_id.as_deref(), Some("abc-cluster"));
assert_eq!(resp.brokers[0].rack.as_deref(), Some("rack-a"));
}
// v3 response: throttle time is read first; cluster id is null.
#[test]
fn test_metadata_response_decode_v3() {
let mut buf = BytesMut::new();
// throttle_time_ms = 50; brokers: array length 0
buf.put_i32(50); buf.put_i32(0);
// cluster_id: string length -1 (null)
buf.put_i16(-1);
// controller_id = -1
buf.put_i32(-1);
// topics: array length 0
buf.put_i32(0);
let resp = MetadataResponse::decode_versioned(3, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 50);
assert_eq!(resp.cluster_id, None);
}
// v4 is dispatched to the v3 decoder; checks cluster id and controller id.
#[test]
fn test_metadata_response_decode_v4() {
let mut buf = BytesMut::new();
// throttle_time_ms = 0; brokers: array length 0; then cluster_id
buf.put_i32(0); buf.put_i32(0); let cid = b"kraft-cluster-1";
buf.put_i16(cid.len() as i16);
buf.put_slice(cid);
// controller_id = 2; topics: array length 0
buf.put_i32(2); buf.put_i32(0);
let resp = MetadataResponse::decode_versioned(4, &mut buf.freeze()).unwrap();
assert_eq!(resp.cluster_id.as_deref(), Some("kraft-cluster-1"));
assert_eq!(resp.controller_id, 2);
}
// v5 response: partitions carry an offline-replicas array but no leader epoch.
#[test]
fn test_metadata_response_decode_v5() {
let mut buf = BytesMut::new();
// throttle_time_ms = 0; brokers: length 0; cluster_id: null; controller_id = -1; topics: length 1
buf.put_i32(0); buf.put_i32(0); buf.put_i16(-1); buf.put_i32(-1); buf.put_i32(1);
// topic error_code = 0, then name
buf.put_i16(0); let topic = b"t1";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
// is_internal = false; partitions: array length 1
buf.put_u8(0); buf.put_i32(1);
// partition: error_code = 0, index = 0, leader_id = 1; replicas: length 2, first node 1
buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); buf.put_i32(2); buf.put_i32(1);
// second replica node 2
buf.put_i32(2);
// isr: length 2, first node 1
buf.put_i32(2); buf.put_i32(1);
// second isr node 2
buf.put_i32(2);
// offline replicas: length 1, node 2
buf.put_i32(1); buf.put_i32(2);
let resp = MetadataResponse::decode_versioned(5, &mut buf.freeze()).unwrap();
assert_eq!(resp.topics[0].partitions[0].offline_replicas, vec![2]);
// No leader epoch on the wire before v7, so the decoder reports -1.
assert_eq!(resp.topics[0].partitions[0].leader_epoch, -1); }
// v6 is dispatched to the v5 decoder; checks an internal topic.
#[test]
fn test_metadata_response_decode_v6() {
let mut buf = BytesMut::new();
// throttle_time_ms = 0; brokers: length 0; cluster_id: null; controller_id = -1; topics: length 1
buf.put_i32(0); buf.put_i32(0); buf.put_i16(-1); buf.put_i32(-1); buf.put_i32(1);
// topic error_code = 0, then name
buf.put_i16(0); let topic = b"t2";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
// is_internal = true; partitions: array length 1
buf.put_u8(1); buf.put_i32(1);
// partition: error_code = 0, index = 0, leader_id = 1; replicas = [1]
buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); buf.put_i32(1); buf.put_i32(1);
// isr = [1]
buf.put_i32(1); buf.put_i32(1);
// offline replicas: length 0
buf.put_i32(0);
let resp = MetadataResponse::decode_versioned(6, &mut buf.freeze()).unwrap();
assert_eq!(resp.topics[0].name.as_deref(), Some("t2"));
assert!(resp.topics[0].is_internal);
// No leader epoch on the wire before v7, so the decoder reports -1.
assert_eq!(resp.topics[0].partitions[0].leader_epoch, -1); }
// v7 response: partitions carry a leader epoch between leader id and replicas.
#[test]
fn test_metadata_response_decode_v7() {
let mut buf = BytesMut::new();
// throttle_time_ms = 0; brokers: array length 1
buf.put_i32(0); buf.put_i32(1);
// broker node_id = 1
buf.put_i32(1);
// broker host
let host = b"broker1";
buf.put_i16(host.len() as i16);
buf.put_slice(host);
// broker port
buf.put_i32(9092);
// broker rack
let rack = b"az-1";
buf.put_i16(rack.len() as i16);
buf.put_slice(rack);
// cluster_id
let cid = b"kraft-id";
buf.put_i16(cid.len() as i16);
buf.put_slice(cid);
// controller_id = 1; topics: array length 1
buf.put_i32(1); buf.put_i32(1);
// topic error_code = 0, then name
buf.put_i16(0); let topic = b"events";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
// is_internal = false; partitions: array length 1
buf.put_u8(0); buf.put_i32(1);
// partition: error_code = 0, index = 0, leader_id = 1, leader_epoch = 42; replicas = [1]
buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); buf.put_i32(42); buf.put_i32(1); buf.put_i32(1);
// isr = [1]
buf.put_i32(1); buf.put_i32(1);
// offline replicas: length 0
buf.put_i32(0);
let resp = MetadataResponse::decode_versioned(7, &mut buf.freeze()).unwrap();
assert_eq!(resp.cluster_id.as_deref(), Some("kraft-id"));
assert_eq!(resp.brokers[0].rack.as_deref(), Some("az-1"));
assert_eq!(resp.topics[0].partitions[0].leader_epoch, 42);
assert!(resp.topics[0].partitions[0].offline_replicas.is_empty());
}
// v8 response: adds per-topic and cluster authorized-operations values,
// which the decoder reads and discards.
#[test]
fn test_metadata_response_decode_v8() {
let mut buf = BytesMut::new();
// throttle_time_ms = 10; brokers: array length 0; then cluster_id
buf.put_i32(10); buf.put_i32(0); let cid = b"kraft-8";
buf.put_i16(cid.len() as i16);
buf.put_slice(cid);
// controller_id = 0; topics: array length 1
buf.put_i32(0); buf.put_i32(1);
// topic error_code = 0, then name
buf.put_i16(0); let topic = b"orders";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
// is_internal = false; partitions: array length 0
buf.put_u8(0); buf.put_i32(0);
// topic authorized operations, then cluster authorized operations (both i32::MIN)
buf.put_i32(-2147483648_i32); buf.put_i32(-2147483648_i32);
let resp = MetadataResponse::decode_versioned(8, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 10);
assert_eq!(resp.cluster_id.as_deref(), Some("kraft-8"));
assert_eq!(resp.topics[0].name.as_deref(), Some("orders"));
}
// v9 (flexible) response: one broker and one topic without partitions.
// Compact lengths are `len + 1`; each struct ends with a tagged-field count.
#[test]
fn test_metadata_response_decode_v9() {
let mut buf = BytesMut::new();
// throttle_time_ms = 5
buf.put_i32(5);
// brokers: one entry (raw 2); node_id = 1; host: 2 bytes (raw 3) "b1"
buf.put_u8(2); buf.put_i32(1); buf.put_u8(3); buf.put_slice(b"b1");
// port = 9092; rack: raw compact length 1 (presumably an empty, non-null
// string; confirm against `KafkaString::decode_compact`); broker tagged fields
buf.put_i32(9092); buf.put_u8(1); buf.put_u8(0);
// cluster_id: 5 bytes (raw 6)
buf.put_u8(6); buf.put_slice(b"cls-9");
// controller_id = 1
buf.put_i32(1);
// topics: one entry (raw 2); error_code = 0; name: 4 bytes (raw 5)
buf.put_u8(2); buf.put_i16(0); buf.put_u8(5); buf.put_slice(b"my-t");
// is_internal = false; partitions: empty (raw 1); topic authorized operations; topic tagged fields
buf.put_u8(0); buf.put_u8(1); buf.put_i32(-2147483648_i32); buf.put_u8(0);
// cluster authorized operations
buf.put_i32(-2147483648_i32);
// top-level tagged fields
buf.put_u8(0);
let resp = MetadataResponse::decode_versioned(9, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 5);
assert_eq!(resp.cluster_id.as_deref(), Some("cls-9"));
assert_eq!(resp.brokers.len(), 1);
assert_eq!(resp.brokers[0].host, "b1");
assert_eq!(resp.topics.len(), 1);
assert_eq!(resp.topics[0].name.as_deref(), Some("my-t"));
// The v9 topic layout has no topic id.
assert!(resp.topics[0].topic_id.is_none());
}
// v9 (flexible) response with one fully populated partition, exercising the
// compact i32 arrays for replicas, ISR and offline replicas.
#[test]
fn test_metadata_response_decode_v9_with_partitions() {
let mut buf = BytesMut::new();
// throttle_time_ms = 0
buf.put_i32(0);
// brokers: empty compact array (raw 1)
buf.put_u8(1);
// cluster_id: null compact string (raw 0)
buf.put_u8(0);
// controller_id = -1
buf.put_i32(-1);
// topics: one entry (raw 2); error_code = 0; name: 2 bytes (raw 3)
buf.put_u8(2); buf.put_i16(0); buf.put_u8(3); buf.put_slice(b"t1");
// is_internal = false
buf.put_u8(0);
// partitions: one entry (raw 2); error_code = 0; index = 0; leader_id = 1; leader_epoch = 5
buf.put_u8(2); buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); buf.put_i32(5);
// replicas: three entries (raw 4): 1, 2, 3
buf.put_u8(4); buf.put_i32(1);
buf.put_i32(2);
buf.put_i32(3);
// isr: two entries (raw 3): 1, 2
buf.put_u8(3); buf.put_i32(1);
buf.put_i32(2);
// offline replicas: empty (raw 1); partition tagged fields
buf.put_u8(1); buf.put_u8(0);
// topic authorized operations; topic tagged fields
buf.put_i32(-2147483648_i32); buf.put_u8(0);
// cluster authorized operations; top-level tagged fields
buf.put_i32(-2147483648_i32); buf.put_u8(0);
let resp = MetadataResponse::decode_versioned(9, &mut buf.freeze()).unwrap();
assert_eq!(resp.topics.len(), 1);
let topic = &resp.topics[0];
assert_eq!(topic.name.as_deref(), Some("t1"));
assert_eq!(topic.partitions.len(), 1);
let part = &topic.partitions[0];
assert_eq!(part.partition_index, 0);
assert_eq!(part.leader_id, 1);
assert_eq!(part.leader_epoch, 5);
assert_eq!(part.replica_nodes, vec![1, 2, 3]);
assert_eq!(part.isr_nodes, vec![1, 2]);
assert!(part.offline_replicas.is_empty());
}
// A v9 partition whose replicas array has raw compact length 0 (null) must
// fail to decode. The buffer stops right after that length byte.
#[test]
fn test_metadata_response_decode_v9_null_replica_array_rejected() {
let mut buf = BytesMut::new();
// throttle_time_ms = 0; brokers: empty (raw 1); cluster_id: null (raw 0); controller_id = -1
buf.put_i32(0); buf.put_u8(1); buf.put_u8(0); buf.put_i32(-1);
// topics: one entry (raw 2); error_code = 0; name: 2 bytes (raw 3)
buf.put_u8(2); buf.put_i16(0); buf.put_u8(3); buf.put_slice(b"t1");
// is_internal = false; partitions: one entry (raw 2); error_code = 0; index = 0;
// leader_id = 1; leader_epoch = 0; replicas: raw 0 (null)
buf.put_u8(0); buf.put_u8(2); buf.put_i16(0); buf.put_i32(0); buf.put_i32(1); buf.put_i32(0); buf.put_u8(0);
let err = MetadataResponse::decode_versioned(9, &mut buf.freeze()).unwrap_err();
let msg = err.to_string();
// Loose match: any message mentioning "null" or containing a "0" passes.
assert!(
msg.contains("null") || msg.contains("0"),
"expected null/0 rejection error, got: {msg}"
);
}
// v10 response: the topic carries a 16-byte topic id right after its name.
#[test]
fn test_metadata_response_decode_v10() {
let mut buf = BytesMut::new();
// throttle_time_ms = 0
buf.put_i32(0);
// brokers: empty compact array (raw 1)
buf.put_u8(1);
// cluster_id: null compact string (raw 0)
buf.put_u8(0);
// controller_id = -1
buf.put_i32(-1);
// topics: one entry (raw 2); error_code = 0; name: 6 bytes (raw 7)
buf.put_u8(2); buf.put_i16(0); buf.put_u8(7); buf.put_slice(b"events");
// topic id: 16 raw bytes, non-zero
let topic_uuid: [u8; 16] = [
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
0x0f, 0x10,
];
buf.put_slice(&topic_uuid);
// is_internal = false; partitions: empty (raw 1); topic authorized operations; topic tagged fields
buf.put_u8(0); buf.put_u8(1); buf.put_i32(-2147483648_i32); buf.put_u8(0);
// cluster authorized operations; top-level tagged fields
buf.put_i32(-2147483648_i32); buf.put_u8(0);
let resp = MetadataResponse::decode_versioned(10, &mut buf.freeze()).unwrap();
assert_eq!(resp.topics.len(), 1);
assert_eq!(resp.topics[0].name.as_deref(), Some("events"));
assert_eq!(resp.topics[0].topic_id, Some(topic_uuid));
}
/// An all-zero topic id in a v10 response must surface as `topic_id == None`.
///
/// Field labels follow the public Kafka Metadata v10 response schema.
#[test]
fn test_metadata_response_decode_v10_zero_uuid() {
    let mut wire = BytesMut::new();

    // Top-level response fields.
    wire.put_i32(0); // throttle_time_ms
    wire.put_u8(1); // brokers: empty compact array
    wire.put_u8(0); // cluster_id: null compact string
    wire.put_i32(-1); // controller_id

    // One topic named "foo" with a zeroed topic id.
    wire.put_u8(2); // topics: compact array with one entry
    wire.put_i16(0); // topic error_code
    wire.put_u8(4); // name length + 1
    wire.put_slice(b"foo");
    wire.put_slice(&[0u8; 16]); // topic_id: all zeroes
    wire.put_u8(0); // is_internal
    wire.put_u8(1); // partitions: empty compact array
    wire.put_i32(i32::MIN); // topic_authorized_operations
    wire.put_u8(0); // topic tagged fields

    // Response trailer.
    wire.put_i32(i32::MIN); // cluster_authorized_operations
    wire.put_u8(0); // tagged fields

    let mut frozen = wire.freeze();
    let resp = MetadataResponse::decode_versioned(10, &mut frozen).unwrap();
    assert!(resp.topics[0].topic_id.is_none());
}
/// For an all-topics request, `encode_v9` must write something and
/// `encode_versioned(9, ..)` must write exactly the same bytes.
#[test]
fn test_metadata_request_encode_v9() {
    let request = MetadataRequest::all_topics();

    let mut direct = BytesMut::new();
    request.encode_v9(&mut direct).unwrap();
    assert!(!direct.is_empty());

    let mut dispatched = BytesMut::new();
    request.encode_versioned(9, &mut dispatched).unwrap();

    assert_eq!(direct, dispatched);
}
/// For an all-topics request, `encode_versioned(12, ..)` must write the same
/// bytes as a direct `encode_v11` call.
#[test]
fn test_metadata_versioned_v12_dispatches() {
    let request = MetadataRequest::all_topics();

    let mut direct_v11 = BytesMut::new();
    request.encode_v11(&mut direct_v11).unwrap();

    let mut dispatched_v12 = BytesMut::new();
    request.encode_versioned(12, &mut dispatched_v12).unwrap();

    assert_eq!(direct_v11, dispatched_v12);
}
/// `encode_v4` writes the v1 body followed by a single
/// `allow_auto_topic_creation` byte, so it must be exactly one byte longer.
#[test]
fn test_metadata_request_encode_v4_adds_auto_create() {
    let request = MetadataRequest::all_topics();

    let mut v1_bytes = BytesMut::new();
    request.encode_v1(&mut v1_bytes).unwrap();

    let mut v4_bytes = BytesMut::new();
    request.encode_v4(&mut v4_bytes).unwrap();

    let expected_len = v1_bytes.len() + 1;
    assert_eq!(v4_bytes.len(), expected_len);
}
/// For a single named topic, the v10 encoding must be exactly 16 bytes longer
/// than the v9 encoding (the size of one topic id, per the test name).
#[test]
fn test_metadata_request_encode_v10_adds_topic_id() {
    let request = MetadataRequest::for_topics(vec!["my-test"]);

    let mut v9_bytes = BytesMut::new();
    request.encode_v9(&mut v9_bytes).unwrap();

    let mut v10_bytes = BytesMut::new();
    request.encode_v10(&mut v10_bytes).unwrap();

    let expected_len = v9_bytes.len() + 16;
    assert_eq!(v10_bytes.len(), expected_len);
}
/// For an all-topics request, the v11 encoding must be exactly one byte
/// shorter than the v10 encoding (presumably the dropped cluster
/// authorized-operations flag, per the test name).
#[test]
fn test_metadata_request_encode_v11_no_cluster_auth_ops() {
    let request = MetadataRequest::all_topics();

    let mut v10_bytes = BytesMut::new();
    request.encode_v10(&mut v10_bytes).unwrap();

    let mut v11_bytes = BytesMut::new();
    request.encode_v11(&mut v11_bytes).unwrap();

    let expected_len = v10_bytes.len() - 1;
    assert_eq!(v11_bytes.len(), expected_len);
}
/// Decodes a v11 response (no cluster authorized-operations field in the
/// payload) and checks the single topic's name and topic id.
///
/// Field labels follow the public Kafka Metadata v11 response schema.
#[test]
fn test_metadata_response_decode_v11() {
    let mut wire = BytesMut::new();

    // Top-level response fields.
    wire.put_i32(0); // throttle_time_ms
    wire.put_u8(1); // brokers: empty compact array
    wire.put_u8(0); // cluster_id: null compact string
    wire.put_i32(-1); // controller_id

    // One topic named "foo".
    wire.put_u8(2); // topics: compact array with one entry
    wire.put_i16(0); // topic error_code
    wire.put_u8(4); // name length + 1
    wire.put_slice(b"foo");
    wire.put_slice(&[0xAB; 16]); // topic_id
    wire.put_u8(0); // is_internal
    wire.put_u8(1); // partitions: empty compact array
    wire.put_i32(i32::MIN); // topic_authorized_operations
    wire.put_u8(0); // topic tagged fields

    // Response trailer: tagged fields only.
    wire.put_u8(0);

    let mut frozen = wire.freeze();
    let resp = MetadataResponse::decode_versioned(11, &mut frozen).unwrap();
    assert_eq!(resp.topics.len(), 1);
    let decoded = &resp.topics[0];
    assert_eq!(decoded.name.as_deref(), Some("foo"));
    assert_eq!(decoded.topic_id, Some([0xAB; 16]));
}
/// For a request naming one topic, `encode_v9` must write something and
/// `encode_versioned(9, ..)` must write exactly the same bytes.
#[test]
fn test_metadata_request_encode_v9_with_topics() {
    let request = MetadataRequest::for_topics(vec!["test-topic"]);

    let mut direct = BytesMut::new();
    request.encode_v9(&mut direct).unwrap();
    assert!(!direct.is_empty());

    let mut dispatched = BytesMut::new();
    request.encode_versioned(9, &mut dispatched).unwrap();

    assert_eq!(direct, dispatched);
}
/// `encode_v8` writes the v4 body followed by two single-byte fields, so it
/// must be exactly two bytes longer than the v4 encoding.
#[test]
fn test_metadata_request_encode_v8_adds_authorized_ops() {
    let request = MetadataRequest::all_topics();

    let mut v4_bytes = BytesMut::new();
    request.encode_v4(&mut v4_bytes).unwrap();

    let mut v8_bytes = BytesMut::new();
    request.encode_v8(&mut v8_bytes).unwrap();

    let expected_len = v4_bytes.len() + 2;
    assert_eq!(v8_bytes.len(), expected_len);
}
/// Encoding a `FindCoordinatorRequest` whose key is one byte longer than
/// `i16::MAX` must return an error mentioning "exceeds" rather than panic.
#[test]
fn test_encode_oversized_string_returns_error_not_panic() {
    let request = FindCoordinatorRequest {
        key: "x".repeat(i16::MAX as usize + 1),
        key_type: 0,
    };

    let mut sink = BytesMut::new();
    let outcome = request.encode_v1(&mut sink);
    assert!(outcome.is_err(), "expected Err for oversized string");

    let msg = outcome.unwrap_err().to_string();
    assert!(
        msg.contains("exceeds"),
        "error should mention size limit: {msg}"
    );
}
/// Encoding a `ProduceRequest` whose topic name is one byte longer than
/// `i16::MAX` must return an error rather than panic.
#[test]
fn test_encode_oversized_topic_name_returns_error_not_panic() {
    let too_long_topic = ProduceTopicData {
        name: "x".repeat(i16::MAX as usize + 1),
        topic_id: None,
        partition_data: vec![],
    };
    let request = ProduceRequest {
        transactional_id: None,
        acks: -1,
        timeout_ms: 30000,
        topic_data: vec![too_long_topic],
    };

    let mut sink = BytesMut::new();
    let outcome = request.encode_v3(&mut sink);
    assert!(outcome.is_err(), "expected Err for oversized topic name");
}
/// The `encode_versioned` entry point must surface the error produced when a
/// `FindCoordinatorRequest` key is one byte longer than `i16::MAX`.
#[test]
fn test_encode_versioned_oversized_returns_error() {
    let request = FindCoordinatorRequest {
        key: "x".repeat(i16::MAX as usize + 1),
        key_type: 0,
    };

    let mut sink = BytesMut::new();
    let outcome = request.encode_versioned(1, &mut sink);

    assert!(
        outcome.is_err(),
        "VersionedEncode must propagate encoding errors"
    );
}
/// An all-topics request encoded at version 1 must start with the `i32`
/// value `-1`, i.e. the null topics array written by `encode_v1`.
#[test]
fn test_metadata_request_v1_encodes_null_topics() {
    let request = MetadataRequest::all_topics();

    let mut encoded = BytesMut::new();
    request.encode_versioned(1, &mut encoded).unwrap();

    let mut reader = encoded.freeze();
    let array_len = i32::decode(&mut reader).unwrap();
    assert_eq!(array_len, -1);
}
/// Smoke test: a v9 request for one named topic with auto-creation enabled
/// encodes without error and produces a non-empty buffer.
#[test]
fn test_metadata_request_v9_flexible_encoding() {
    let only_topic = MetadataRequestTopic {
        topic_id: None,
        name: Some("my-topic".to_string()),
    };
    let request = MetadataRequest {
        topics: Some(vec![only_topic]),
        allow_auto_topic_creation: true,
    };

    let mut encoded = BytesMut::new();
    request.encode_versioned(9, &mut encoded).unwrap();

    assert!(!encoded.is_empty());
}
/// Versions listed in the cases below must be rejected by
/// `encode_versioned` with an error.
#[rstest]
#[case::metadata_v0(0)]
fn test_metadata_encode_below_min(#[case] version: i16) {
    let request = MetadataRequest::all_topics();
    let mut sink = BytesMut::new();

    let outcome = request.encode_versioned(version, &mut sink);

    assert!(outcome.is_err());
}
/// The same single-topic request must encode to different bytes at version 8
/// and at version 9.
#[test]
fn test_metadata_request_encode_v9_flexible() {
    let only_topic = MetadataRequestTopic {
        topic_id: None,
        name: Some("test-topic".to_string()),
    };
    let request = MetadataRequest {
        topics: Some(vec![only_topic]),
        allow_auto_topic_creation: false,
    };

    let mut v8_bytes = BytesMut::new();
    request.encode_versioned(8, &mut v8_bytes).unwrap();

    let mut v9_bytes = BytesMut::new();
    request.encode_versioned(9, &mut v9_bytes).unwrap();

    assert_ne!(
        v8_bytes.as_ref(),
        v9_bytes.as_ref(),
        "v9 flexible should differ from v8"
    );
}
/// Smoke test: a request carrying both a topic id and a topic name encodes
/// without error, and non-empty, at each listed version.
#[rstest]
#[case::v10(10)]
#[case::v11(11)]
fn test_metadata_request_encode_v10_v11(#[case] version: i16) {
    let only_topic = MetadataRequestTopic {
        topic_id: Some([0xAB; 16]),
        name: Some("t".to_string()),
    };
    let request = MetadataRequest {
        topics: Some(vec![only_topic]),
        allow_auto_topic_creation: true,
    };

    let mut encoded = BytesMut::new();
    request.encode_versioned(version, &mut encoded).unwrap();

    assert!(!encoded.is_empty());
}
/// At each listed version, `encode_versioned` must write the same bytes as a
/// direct `encode_v12` call for a request carrying a topic id and a name.
#[rstest]
#[case::v12(12)]
#[case::v13(13)]
fn test_metadata_request_encode_v12_v13_topic_id(#[case] version: i16) {
    let only_topic = MetadataRequestTopic {
        topic_id: Some([0xAB; 16]),
        name: Some("t".to_string()),
    };
    let request = MetadataRequest {
        topics: Some(vec![only_topic]),
        allow_auto_topic_creation: false,
    };

    let mut dispatched = BytesMut::new();
    request.encode_versioned(version, &mut dispatched).unwrap();

    let mut direct_v12 = BytesMut::new();
    request.encode_v12(&mut direct_v12).unwrap();

    assert_eq!(dispatched, direct_v12);
}
/// Decodes the shared v9 fixture from `build_metadata_response_v9_bytes` and
/// checks every top-level, broker, topic, and partition field asserted below.
#[test]
fn test_metadata_response_decode_v9_flexible() {
    let mut payload = build_metadata_response_v9_bytes().freeze();
    let resp = MetadataResponse::decode_versioned(9, &mut payload).unwrap();

    // Top-level and broker fields.
    assert_eq!(resp.throttle_time_ms, 10);
    assert_eq!(resp.brokers.len(), 1);
    let broker = &resp.brokers[0];
    assert_eq!(broker.host, "broker-1");
    assert_eq!(broker.port, 9092);
    assert_eq!(resp.cluster_id.as_deref(), Some("cluster-abc"));
    assert_eq!(resp.controller_id, 1);

    // Topic and partition fields.
    assert_eq!(resp.topics.len(), 1);
    let topic = &resp.topics[0];
    assert_eq!(topic.name.as_deref(), Some("test-topic"));
    assert_eq!(topic.partitions.len(), 1);
    let partition = &topic.partitions[0];
    assert_eq!(partition.leader_id, 1);
    assert_eq!(partition.leader_epoch, 5);
}
/// Decodes a varint-framed v10 response and checks that the topic name and
/// the non-zero topic id come back unchanged.
///
/// Field labels follow the public Kafka Metadata v10 response schema.
#[test]
fn test_metadata_response_decode_v10_topic_id() {
    let topic_name = b"t1";
    let uuid = [0xAB_u8; 16];

    let mut wire = BytesMut::new();

    // Top-level response fields.
    wire.put_i32(0); // throttle_time_ms
    varint::encode_unsigned_varint(1, &mut wire); // brokers: empty compact array
    varint::encode_unsigned_varint(0, &mut wire); // cluster_id: null compact string
    wire.put_i32(-1); // controller_id

    // One topic.
    varint::encode_unsigned_varint(2, &mut wire); // topics: one entry
    wire.put_i16(0); // topic error_code
    varint::encode_unsigned_varint(topic_name.len() as u32 + 1, &mut wire);
    wire.put_slice(topic_name);
    wire.put_slice(&uuid); // topic_id
    wire.put_u8(0); // is_internal
    varint::encode_unsigned_varint(1, &mut wire); // partitions: empty compact array
    wire.put_i32(0); // topic_authorized_operations
    varint::encode_unsigned_varint(0, &mut wire); // topic tagged fields

    // Response trailer.
    wire.put_i32(0); // cluster_authorized_operations
    varint::encode_unsigned_varint(0, &mut wire); // tagged fields

    let mut frozen = wire.freeze();
    let resp = MetadataResponse::decode_versioned(10, &mut frozen).unwrap();
    let decoded = &resp.topics[0];
    assert_eq!(decoded.name.as_deref(), Some("t1"));
    assert_eq!(decoded.topic_id, Some([0xAB; 16]));
}
/// Decodes a varint-framed v11 response whose payload has no cluster
/// authorized-operations field and checks the topic name.
///
/// Field labels follow the public Kafka Metadata v11 response schema.
#[test]
fn test_metadata_response_decode_v11_no_cluster_auth_ops() {
    let topic_name = b"t";

    let mut wire = BytesMut::new();

    // Top-level response fields.
    wire.put_i32(0); // throttle_time_ms
    varint::encode_unsigned_varint(1, &mut wire); // brokers: empty compact array
    varint::encode_unsigned_varint(0, &mut wire); // cluster_id: null compact string
    wire.put_i32(-1); // controller_id

    // One topic.
    varint::encode_unsigned_varint(2, &mut wire); // topics: one entry
    wire.put_i16(0); // topic error_code
    varint::encode_unsigned_varint(topic_name.len() as u32 + 1, &mut wire);
    wire.put_slice(topic_name);
    wire.put_slice(&[0u8; 16]); // topic_id: all zeroes
    wire.put_u8(0); // is_internal
    varint::encode_unsigned_varint(1, &mut wire); // partitions: empty compact array
    wire.put_i32(0); // topic_authorized_operations
    varint::encode_unsigned_varint(0, &mut wire); // topic tagged fields

    // Response trailer: tagged fields only.
    varint::encode_unsigned_varint(0, &mut wire);

    let mut frozen = wire.freeze();
    let resp = MetadataResponse::decode_versioned(11, &mut frozen).unwrap();
    assert_eq!(resp.topics[0].name.as_deref(), Some("t"));
}
/// Decodes a topic-less v13 response carrying a trailing `i16` error code of
/// zero and checks that `error_code.is_ok()` holds.
///
/// Field labels follow the public Kafka Metadata v13 response schema.
#[test]
fn test_metadata_response_decode_v13_top_level_error_code() {
    let mut wire = BytesMut::new();
    wire.put_i32(0); // throttle_time_ms
    varint::encode_unsigned_varint(1, &mut wire); // brokers: empty compact array
    varint::encode_unsigned_varint(0, &mut wire); // cluster_id: null compact string
    wire.put_i32(-1); // controller_id
    varint::encode_unsigned_varint(1, &mut wire); // topics: empty compact array
    wire.put_i16(0); // top-level error_code
    varint::encode_unsigned_varint(0, &mut wire); // tagged fields

    let mut frozen = wire.freeze();
    let resp = MetadataResponse::decode_versioned(13, &mut frozen).unwrap();
    assert!(resp.error_code.is_ok());
}
/// A minimal payload with no brokers, no topics, and no trailing error code
/// must decode successfully when version 12 is requested.
///
/// Field labels follow the public Kafka Metadata v11/v12 response schema.
#[test]
fn test_metadata_response_decode_v12_uses_v11_decoder() {
    let mut wire = BytesMut::new();
    wire.put_i32(0); // throttle_time_ms
    varint::encode_unsigned_varint(1, &mut wire); // brokers: empty compact array
    varint::encode_unsigned_varint(0, &mut wire); // cluster_id: null compact string
    wire.put_i32(-1); // controller_id
    varint::encode_unsigned_varint(1, &mut wire); // topics: empty compact array
    varint::encode_unsigned_varint(0, &mut wire); // tagged fields

    let mut frozen = wire.freeze();
    let resp = MetadataResponse::decode_versioned(12, &mut frozen).unwrap();

    assert_eq!(resp.throttle_time_ms, 0);
    assert!(resp.topics.is_empty());
}
/// With every field populated, the v5 encoding of an `ApiVersionsRequest`
/// must be non-empty and strictly longer than its v3 encoding.
#[test]
fn test_api_versions_request_v5_fields() {
    use crate::protocol::api::ApiVersionsRequest;
    use crate::protocol::primitives::KafkaString;

    let request = ApiVersionsRequest {
        client_software_name: Some(KafkaString::new("krafka")),
        client_software_version: Some(KafkaString::new("0.4.0")),
        cluster_id: Some(KafkaString::new("my-cluster")),
        node_id: 42,
    };

    let mut v5_bytes = BytesMut::new();
    request.encode_v5(&mut v5_bytes).unwrap();
    assert!(!v5_bytes.is_empty());

    let mut v3_bytes = BytesMut::new();
    request.encode_v3(&mut v3_bytes).unwrap();

    assert!(
        v5_bytes.len() > v3_bytes.len(),
        "v5 should be longer than v3 (cluster_id + node_id)"
    );
}
}