use bytes::{Buf, BufMut};
use super::primitives::{Decode, Encode, KafkaArray, KafkaString, TaggedFields, TryEncode};
use crate::error::Result;
use crate::util::varint;
const MAX_SUPPORTED_FEATURES: usize = 256;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
#[repr(i16)]
pub enum ApiKey {
Produce = 0,
Fetch = 1,
ListOffsets = 2,
Metadata = 3,
LeaderAndIsr = 4,
StopReplica = 5,
UpdateMetadata = 6,
ControlledShutdown = 7,
OffsetCommit = 8,
OffsetFetch = 9,
FindCoordinator = 10,
JoinGroup = 11,
Heartbeat = 12,
LeaveGroup = 13,
SyncGroup = 14,
DescribeGroups = 15,
ListGroups = 16,
SaslHandshake = 17,
ApiVersions = 18,
CreateTopics = 19,
DeleteTopics = 20,
DeleteRecords = 21,
InitProducerId = 22,
OffsetForLeaderEpoch = 23,
AddPartitionsToTxn = 24,
AddOffsetsToTxn = 25,
EndTxn = 26,
WriteTxnMarkers = 27,
TxnOffsetCommit = 28,
DescribeAcls = 29,
CreateAcls = 30,
DeleteAcls = 31,
DescribeConfigs = 32,
AlterConfigs = 33,
AlterReplicaLogDirs = 34,
DescribeLogDirs = 35,
SaslAuthenticate = 36,
CreatePartitions = 37,
CreateDelegationToken = 38,
RenewDelegationToken = 39,
ExpireDelegationToken = 40,
DescribeDelegationToken = 41,
DeleteGroups = 42,
ElectLeaders = 43,
IncrementalAlterConfigs = 44,
AlterPartitionReassignments = 45,
ListPartitionReassignments = 46,
OffsetDelete = 47,
DescribeClientQuotas = 48,
AlterClientQuotas = 49,
DescribeUserScramCredentials = 50,
AlterUserScramCredentials = 51,
Vote = 52,
BeginQuorumEpoch = 53,
EndQuorumEpoch = 54,
DescribeQuorum = 55,
AlterPartition = 56,
UpdateFeatures = 57,
Envelope = 58,
FetchSnapshot = 59,
DescribeCluster = 60,
DescribeProducers = 61,
BrokerRegistration = 62,
BrokerHeartbeat = 63,
UnregisterBroker = 64,
DescribeTransactions = 65,
ListTransactions = 66,
AllocateProducerIds = 67,
ConsumerGroupHeartbeat = 68,
ConsumerGroupDescribe = 69,
GetTelemetrySubscriptions = 71,
PushTelemetry = 72,
ListClientMetricsResources = 74,
DescribeTopicPartitions = 75,
ShareGroupHeartbeat = 76,
ShareGroupDescribe = 77,
ShareFetch = 78,
ShareAcknowledge = 79,
Unknown(i16),
}
impl ApiKey {
#[inline]
pub fn from_i16(key: i16) -> Self {
match key {
0 => Self::Produce,
1 => Self::Fetch,
2 => Self::ListOffsets,
3 => Self::Metadata,
4 => Self::LeaderAndIsr,
5 => Self::StopReplica,
6 => Self::UpdateMetadata,
7 => Self::ControlledShutdown,
8 => Self::OffsetCommit,
9 => Self::OffsetFetch,
10 => Self::FindCoordinator,
11 => Self::JoinGroup,
12 => Self::Heartbeat,
13 => Self::LeaveGroup,
14 => Self::SyncGroup,
15 => Self::DescribeGroups,
16 => Self::ListGroups,
17 => Self::SaslHandshake,
18 => Self::ApiVersions,
19 => Self::CreateTopics,
20 => Self::DeleteTopics,
21 => Self::DeleteRecords,
22 => Self::InitProducerId,
23 => Self::OffsetForLeaderEpoch,
24 => Self::AddPartitionsToTxn,
25 => Self::AddOffsetsToTxn,
26 => Self::EndTxn,
27 => Self::WriteTxnMarkers,
28 => Self::TxnOffsetCommit,
29 => Self::DescribeAcls,
30 => Self::CreateAcls,
31 => Self::DeleteAcls,
32 => Self::DescribeConfigs,
33 => Self::AlterConfigs,
34 => Self::AlterReplicaLogDirs,
35 => Self::DescribeLogDirs,
36 => Self::SaslAuthenticate,
37 => Self::CreatePartitions,
38 => Self::CreateDelegationToken,
39 => Self::RenewDelegationToken,
40 => Self::ExpireDelegationToken,
41 => Self::DescribeDelegationToken,
42 => Self::DeleteGroups,
43 => Self::ElectLeaders,
44 => Self::IncrementalAlterConfigs,
45 => Self::AlterPartitionReassignments,
46 => Self::ListPartitionReassignments,
47 => Self::OffsetDelete,
48 => Self::DescribeClientQuotas,
49 => Self::AlterClientQuotas,
50 => Self::DescribeUserScramCredentials,
51 => Self::AlterUserScramCredentials,
52 => Self::Vote,
53 => Self::BeginQuorumEpoch,
54 => Self::EndQuorumEpoch,
55 => Self::DescribeQuorum,
56 => Self::AlterPartition,
57 => Self::UpdateFeatures,
58 => Self::Envelope,
59 => Self::FetchSnapshot,
60 => Self::DescribeCluster,
61 => Self::DescribeProducers,
62 => Self::BrokerRegistration,
63 => Self::BrokerHeartbeat,
64 => Self::UnregisterBroker,
65 => Self::DescribeTransactions,
66 => Self::ListTransactions,
67 => Self::AllocateProducerIds,
68 => Self::ConsumerGroupHeartbeat,
69 => Self::ConsumerGroupDescribe,
71 => Self::GetTelemetrySubscriptions,
72 => Self::PushTelemetry,
74 => Self::ListClientMetricsResources,
75 => Self::DescribeTopicPartitions,
76 => Self::ShareGroupHeartbeat,
77 => Self::ShareGroupDescribe,
78 => Self::ShareFetch,
79 => Self::ShareAcknowledge,
other => Self::Unknown(other),
}
}
#[inline]
pub fn to_i16(self) -> i16 {
match self {
Self::Produce => 0,
Self::Fetch => 1,
Self::ListOffsets => 2,
Self::Metadata => 3,
Self::LeaderAndIsr => 4,
Self::StopReplica => 5,
Self::UpdateMetadata => 6,
Self::ControlledShutdown => 7,
Self::OffsetCommit => 8,
Self::OffsetFetch => 9,
Self::FindCoordinator => 10,
Self::JoinGroup => 11,
Self::Heartbeat => 12,
Self::LeaveGroup => 13,
Self::SyncGroup => 14,
Self::DescribeGroups => 15,
Self::ListGroups => 16,
Self::SaslHandshake => 17,
Self::ApiVersions => 18,
Self::CreateTopics => 19,
Self::DeleteTopics => 20,
Self::DeleteRecords => 21,
Self::InitProducerId => 22,
Self::OffsetForLeaderEpoch => 23,
Self::AddPartitionsToTxn => 24,
Self::AddOffsetsToTxn => 25,
Self::EndTxn => 26,
Self::WriteTxnMarkers => 27,
Self::TxnOffsetCommit => 28,
Self::DescribeAcls => 29,
Self::CreateAcls => 30,
Self::DeleteAcls => 31,
Self::DescribeConfigs => 32,
Self::AlterConfigs => 33,
Self::AlterReplicaLogDirs => 34,
Self::DescribeLogDirs => 35,
Self::SaslAuthenticate => 36,
Self::CreatePartitions => 37,
Self::CreateDelegationToken => 38,
Self::RenewDelegationToken => 39,
Self::ExpireDelegationToken => 40,
Self::DescribeDelegationToken => 41,
Self::DeleteGroups => 42,
Self::ElectLeaders => 43,
Self::IncrementalAlterConfigs => 44,
Self::AlterPartitionReassignments => 45,
Self::ListPartitionReassignments => 46,
Self::OffsetDelete => 47,
Self::DescribeClientQuotas => 48,
Self::AlterClientQuotas => 49,
Self::DescribeUserScramCredentials => 50,
Self::AlterUserScramCredentials => 51,
Self::Vote => 52,
Self::BeginQuorumEpoch => 53,
Self::EndQuorumEpoch => 54,
Self::DescribeQuorum => 55,
Self::AlterPartition => 56,
Self::UpdateFeatures => 57,
Self::Envelope => 58,
Self::FetchSnapshot => 59,
Self::DescribeCluster => 60,
Self::DescribeProducers => 61,
Self::BrokerRegistration => 62,
Self::BrokerHeartbeat => 63,
Self::UnregisterBroker => 64,
Self::DescribeTransactions => 65,
Self::ListTransactions => 66,
Self::AllocateProducerIds => 67,
Self::ConsumerGroupHeartbeat => 68,
Self::ConsumerGroupDescribe => 69,
Self::GetTelemetrySubscriptions => 71,
Self::PushTelemetry => 72,
Self::ListClientMetricsResources => 74,
Self::DescribeTopicPartitions => 75,
Self::ShareGroupHeartbeat => 76,
Self::ShareGroupDescribe => 77,
Self::ShareFetch => 78,
Self::ShareAcknowledge => 79,
Self::Unknown(key) => key,
}
}
pub fn flexible_version(self) -> i16 {
match self {
Self::Produce => 9,
Self::Fetch => 12,
Self::ListOffsets => 6,
Self::Metadata => 9,
Self::LeaderAndIsr => 4,
Self::StopReplica => 2,
Self::UpdateMetadata => 6,
Self::ControlledShutdown => 3,
Self::OffsetCommit => 8,
Self::OffsetFetch => 6,
Self::FindCoordinator => 3,
Self::JoinGroup => 6,
Self::Heartbeat => 4,
Self::LeaveGroup => 4,
Self::SyncGroup => 4,
Self::DescribeGroups => 5,
Self::ListGroups => 3,
Self::SaslHandshake => i16::MAX,
Self::ApiVersions => 3,
Self::CreateTopics => 5,
Self::DeleteTopics => 4,
Self::DeleteRecords => 2,
Self::InitProducerId => 2,
Self::OffsetForLeaderEpoch => 4,
Self::AddPartitionsToTxn => 3,
Self::AddOffsetsToTxn => 3,
Self::EndTxn => 3,
Self::WriteTxnMarkers => 1,
Self::TxnOffsetCommit => 3,
Self::DescribeAcls => 2,
Self::CreateAcls => 2,
Self::DeleteAcls => 2,
Self::DescribeConfigs => 4,
Self::AlterConfigs => 2,
Self::AlterReplicaLogDirs => 2,
Self::DescribeLogDirs => 2,
Self::SaslAuthenticate => 2,
Self::CreatePartitions => 2,
Self::CreateDelegationToken => 2,
Self::RenewDelegationToken => 2,
Self::ExpireDelegationToken => 2,
Self::DescribeDelegationToken => 2,
Self::DeleteGroups => 2,
Self::ElectLeaders => 2,
Self::IncrementalAlterConfigs => 1,
Self::AlterPartitionReassignments => 0,
Self::ListPartitionReassignments => 0,
Self::OffsetDelete => i16::MAX,
Self::DescribeClientQuotas => 1,
Self::AlterClientQuotas => 1,
Self::DescribeUserScramCredentials => 0,
Self::AlterUserScramCredentials => 0,
Self::Vote => 0,
Self::BeginQuorumEpoch => 0,
Self::EndQuorumEpoch => 0,
Self::DescribeQuorum => 0,
Self::AlterPartition => 0,
Self::UpdateFeatures => 0,
Self::Envelope => 0,
Self::FetchSnapshot => 0,
Self::DescribeCluster => 0,
Self::DescribeProducers => 0,
Self::BrokerRegistration => 0,
Self::BrokerHeartbeat => 0,
Self::UnregisterBroker => 0,
Self::DescribeTransactions => 0,
Self::ListTransactions => 0,
Self::AllocateProducerIds => 0,
Self::ConsumerGroupHeartbeat => 0,
Self::ConsumerGroupDescribe => 0,
Self::GetTelemetrySubscriptions => 0,
Self::PushTelemetry => 0,
Self::ListClientMetricsResources => 0,
Self::DescribeTopicPartitions => 0,
Self::ShareGroupHeartbeat => 0,
Self::ShareGroupDescribe => 0,
Self::ShareFetch => 0,
Self::ShareAcknowledge => 0,
Self::Unknown(_) => i16::MAX,
}
}
}
impl From<i16> for ApiKey {
fn from(key: i16) -> Self {
Self::from_i16(key)
}
}
impl std::fmt::Display for ApiKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Unknown(key) => write!(f, "Unknown({key})"),
other => std::fmt::Debug::fmt(other, f),
}
}
}
impl From<ApiKey> for i16 {
fn from(key: ApiKey) -> Self {
key.to_i16()
}
}
impl Encode for ApiKey {
fn encode(&self, buf: &mut impl BufMut) {
self.to_i16().encode(buf);
}
}
impl TryEncode for ApiKey {
#[inline]
fn try_encode(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode(buf);
Ok(())
}
}
impl Decode for ApiKey {
fn decode(buf: &mut impl Buf) -> Result<Self> {
Ok(Self::from_i16(i16::decode(buf)?))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ApiVersionRange {
pub api_key: ApiKey,
pub min_version: i16,
pub max_version: i16,
}
impl ApiVersionRange {
pub fn new(api_key: ApiKey, min_version: i16, max_version: i16) -> Self {
Self {
api_key,
min_version,
max_version,
}
}
pub fn supports(&self, version: i16) -> bool {
version >= self.min_version && version <= self.max_version
}
#[inline]
pub fn negotiate(&self, client_max: i16, client_min: i16) -> Option<i16> {
let max_supported = self.max_version.min(client_max);
let min_supported = self.min_version.max(client_min);
if max_supported >= min_supported {
Some(max_supported)
} else {
None
}
}
#[inline]
pub fn negotiate_max(&self, client_max: i16) -> Option<i16> {
self.negotiate(client_max, 0)
}
}
impl Encode for ApiVersionRange {
fn encode(&self, buf: &mut impl BufMut) {
self.api_key.encode(buf);
self.min_version.encode(buf);
self.max_version.encode(buf);
}
fn encode_compact(&self, buf: &mut impl BufMut) {
self.encode(buf);
varint::encode_unsigned_varint(0, buf);
}
}
impl TryEncode for ApiVersionRange {
fn try_encode(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode(buf);
Ok(())
}
fn try_encode_compact(&self, buf: &mut impl BufMut) -> Result<()> {
self.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
impl Decode for ApiVersionRange {
fn decode(buf: &mut impl Buf) -> Result<Self> {
Ok(Self {
api_key: ApiKey::decode(buf)?,
min_version: i16::decode(buf)?,
max_version: i16::decode(buf)?,
})
}
fn decode_compact(buf: &mut impl Buf) -> Result<Self> {
let result = Self {
api_key: ApiKey::decode(buf)?,
min_version: i16::decode(buf)?,
max_version: i16::decode(buf)?,
};
let _ = TaggedFields::decode(buf)?;
Ok(result)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SupportedFeature {
pub name: String,
pub min_version: i16,
pub max_version: i16,
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FinalizedFeature {
pub name: String,
pub max_version_level: i16,
pub min_version_level: i16,
}
#[derive(Debug, Clone)]
pub struct ApiVersionsRequest {
pub client_software_name: Option<KafkaString>,
pub client_software_version: Option<KafkaString>,
pub cluster_id: Option<KafkaString>,
pub node_id: i32,
}
impl Default for ApiVersionsRequest {
fn default() -> Self {
Self {
client_software_name: None,
client_software_version: None,
cluster_id: None,
node_id: -1,
}
}
}
impl ApiVersionsRequest {
pub fn new() -> Self {
Self::default()
}
pub fn with_client_software(mut self, name: &str, version: &str) -> Self {
self.client_software_name = Some(KafkaString::new(name));
self.client_software_version = Some(KafkaString::new(version));
self
}
pub fn api_key() -> ApiKey {
ApiKey::ApiVersions
}
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
let _ = buf;
Ok(())
}
pub fn encode_v3(&self, buf: &mut impl BufMut) -> Result<()> {
if let Some(ref name) = self.client_software_name {
name.try_encode_compact(buf)?;
} else {
KafkaString::null().try_encode_compact(buf)?;
}
if let Some(ref version) = self.client_software_version {
version.try_encode_compact(buf)?;
} else {
KafkaString::null().try_encode_compact(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v5(&self, buf: &mut impl BufMut) -> Result<()> {
if let Some(ref name) = self.client_software_name {
name.try_encode_compact(buf)?;
} else {
KafkaString::null().try_encode_compact(buf)?;
}
if let Some(ref version) = self.client_software_version {
version.try_encode_compact(buf)?;
} else {
KafkaString::null().try_encode_compact(buf)?;
}
if let Some(ref cluster_id) = self.cluster_id {
cluster_id.try_encode_compact(buf)?;
} else {
KafkaString::null().try_encode_compact(buf)?;
}
self.node_id.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct ApiVersionsResponse {
pub error_code: i16,
pub api_keys: Vec<ApiVersionRange>,
pub throttle_time_ms: i32,
pub supported_features: Vec<SupportedFeature>,
pub finalized_features_epoch: i64,
pub finalized_features: Vec<FinalizedFeature>,
}
impl ApiVersionsResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let error_code = i16::decode(buf)?;
let api_keys = KafkaArray::<ApiVersionRange>::decode(buf)?
.0
.unwrap_or_default();
Ok(Self {
error_code,
api_keys,
throttle_time_ms: 0,
supported_features: Vec::new(),
finalized_features_epoch: -1,
finalized_features: Vec::new(),
})
}
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let error_code = i16::decode(buf)?;
let api_keys = KafkaArray::<ApiVersionRange>::decode(buf)?
.0
.unwrap_or_default();
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
error_code,
api_keys,
throttle_time_ms,
supported_features: Vec::new(),
finalized_features_epoch: -1,
finalized_features: Vec::new(),
})
}
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
let error_code = i16::decode(buf)?;
let api_keys = KafkaArray::<ApiVersionRange>::decode_compact(buf)?
.0
.unwrap_or_default();
let throttle_time_ms = i32::decode(buf)?;
let tagged = TaggedFields::decode(buf)?;
let supported_features = Self::parse_supported_features(&tagged)?;
let finalized_features_epoch = Self::parse_finalized_features_epoch(&tagged)?;
let finalized_features = if finalized_features_epoch >= 0 {
Self::parse_finalized_features(&tagged)?
} else {
Vec::new()
};
Ok(Self {
error_code,
api_keys,
throttle_time_ms,
supported_features,
finalized_features_epoch,
finalized_features,
})
}
fn parse_supported_features(tagged: &TaggedFields) -> Result<Vec<SupportedFeature>> {
let Some(field) = tagged.0.iter().find(|f| f.tag == 0) else {
return Ok(Vec::new());
};
let mut buf = &field.data[..];
let raw_count = crate::util::varint::decode_unsigned_varint(&mut buf)?;
let items = super::check_compact_array_len(raw_count)?;
if items > MAX_SUPPORTED_FEATURES {
return Err(crate::error::KrafkaError::protocol_kind(
crate::error::ProtocolErrorKind::InvalidLength,
format!(
"SupportedFeatures array length {items} exceeds limit {MAX_SUPPORTED_FEATURES}"
),
));
}
let mut features = Vec::with_capacity(items);
for _ in 0..items {
let name = super::non_nullable_string(
"feature name",
KafkaString::decode_compact(&mut buf)?.0,
)?;
let min_version = i16::decode(&mut buf)?;
let max_version = i16::decode(&mut buf)?;
let _ = TaggedFields::decode(&mut buf)?;
features.push(SupportedFeature {
name,
min_version,
max_version,
});
}
if buf.has_remaining() {
return Err(crate::error::KrafkaError::protocol_kind(
crate::error::ProtocolErrorKind::Malformed,
format!(
"SupportedFeatures: {} trailing bytes after parsing {items} entries",
buf.remaining()
),
));
}
Ok(features)
}
fn parse_finalized_features_epoch(tagged: &TaggedFields) -> Result<i64> {
let Some(field) = tagged.0.iter().find(|f| f.tag == 1) else {
return Ok(-1);
};
let bytes: [u8; 8] = field.data[..].try_into().map_err(|_| {
crate::error::KrafkaError::protocol_kind(
crate::error::ProtocolErrorKind::InvalidLength,
format!(
"FinalizedFeaturesEpoch (tag 1) has invalid length {}, expected 8",
field.data.len()
),
)
})?;
Ok(i64::from_be_bytes(bytes))
}
fn parse_finalized_features(tagged: &TaggedFields) -> Result<Vec<FinalizedFeature>> {
let Some(field) = tagged.0.iter().find(|f| f.tag == 2) else {
return Ok(Vec::new());
};
let mut buf = &field.data[..];
let raw_count = crate::util::varint::decode_unsigned_varint(&mut buf)?;
let items = super::check_compact_array_len(raw_count)?;
if items > MAX_SUPPORTED_FEATURES {
return Err(crate::error::KrafkaError::protocol_kind(
crate::error::ProtocolErrorKind::InvalidLength,
format!(
"FinalizedFeatures array length {items} exceeds limit {MAX_SUPPORTED_FEATURES}"
),
));
}
let mut features = Vec::with_capacity(items);
for _ in 0..items {
let name = super::non_nullable_string(
"finalized feature name",
KafkaString::decode_compact(&mut buf)?.0,
)?;
let max_version_level = i16::decode(&mut buf)?;
let min_version_level = i16::decode(&mut buf)?;
let _ = TaggedFields::decode(&mut buf)?;
features.push(FinalizedFeature {
name,
max_version_level,
min_version_level,
});
}
if buf.has_remaining() {
return Err(crate::error::KrafkaError::protocol_kind(
crate::error::ProtocolErrorKind::Malformed,
format!(
"FinalizedFeatures: {} trailing bytes after parsing {items} entries",
buf.remaining()
),
));
}
Ok(features)
}
pub fn get_api_version(&self, api_key: ApiKey) -> Option<&ApiVersionRange> {
self.api_keys.iter().find(|v| v.api_key == api_key)
}
pub fn get_supported_feature(&self, name: &str) -> Option<&SupportedFeature> {
self.supported_features.iter().find(|f| f.name == name)
}
pub fn get_finalized_feature(&self, name: &str) -> Option<&FinalizedFeature> {
self.finalized_features.iter().find(|f| f.name == name)
}
pub fn supports(&self, api_key: ApiKey, version: i16) -> bool {
self.get_api_version(api_key)
.is_some_and(|v| v.supports(version))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use bytes::BytesMut;
use super::*;
use crate::protocol::primitives::TaggedField;
#[test]
fn test_api_key_roundtrip() {
let keys = [
ApiKey::Produce,
ApiKey::Fetch,
ApiKey::Metadata,
ApiKey::ApiVersions,
];
for key in keys {
let mut buf = BytesMut::new();
key.encode(&mut buf);
let decoded = ApiKey::decode(&mut buf.freeze()).unwrap();
assert_eq!(decoded, key);
}
}
#[test]
fn test_api_version_range() {
let range = ApiVersionRange::new(ApiKey::Produce, 0, 9);
assert!(range.supports(0));
assert!(range.supports(5));
assert!(range.supports(9));
assert!(!range.supports(-1));
assert!(!range.supports(10));
}
#[test]
fn test_api_version_range_encode_decode() {
let range = ApiVersionRange::new(ApiKey::Fetch, 0, 13);
let mut buf = BytesMut::new();
range.encode(&mut buf);
let decoded = ApiVersionRange::decode(&mut buf.freeze()).unwrap();
assert_eq!(decoded.api_key, ApiKey::Fetch);
assert_eq!(decoded.min_version, 0);
assert_eq!(decoded.max_version, 13);
}
#[test]
fn test_api_versions_request() {
let request =
ApiVersionsRequest::new().with_client_software("krafka", env!("CARGO_PKG_VERSION"));
assert_eq!(
request.client_software_name.as_ref().unwrap().as_str(),
Some("krafka")
);
assert_eq!(
request.client_software_version.as_ref().unwrap().as_str(),
Some(env!("CARGO_PKG_VERSION"))
);
}
#[test]
fn test_api_versions_response() {
let response = ApiVersionsResponse {
error_code: 0,
api_keys: vec![
ApiVersionRange::new(ApiKey::Produce, 0, 9),
ApiVersionRange::new(ApiKey::Fetch, 0, 13),
],
throttle_time_ms: 0,
supported_features: Vec::new(),
finalized_features_epoch: -1,
finalized_features: Vec::new(),
};
assert!(response.supports(ApiKey::Produce, 5));
assert!(!response.supports(ApiKey::Produce, 10));
assert!(response.supports(ApiKey::Fetch, 13));
assert!(!response.supports(ApiKey::Metadata, 0));
}
#[test]
fn test_negotiate_version() {
let range = ApiVersionRange::new(ApiKey::Fetch, 0, 12);
assert_eq!(range.negotiate(11, 4), Some(11));
let range = ApiVersionRange::new(ApiKey::Produce, 4, 8);
assert_eq!(range.negotiate(6, 0), Some(6));
let range = ApiVersionRange::new(ApiKey::Metadata, 0, 3);
assert_eq!(range.negotiate(10, 5), None);
let range = ApiVersionRange::new(ApiKey::Heartbeat, 2, 2);
assert_eq!(range.negotiate(2, 2), Some(2));
let range = ApiVersionRange::new(ApiKey::Fetch, 0, 12);
assert_eq!(range.negotiate_max(8), Some(8));
assert_eq!(range.negotiate_max(15), Some(12)); }
#[test]
fn test_api_key_display() {
assert_eq!(ApiKey::Produce.to_string(), "Produce");
assert_eq!(ApiKey::Fetch.to_string(), "Fetch");
assert_eq!(ApiKey::Unknown(999).to_string(), "Unknown(999)");
}
#[test]
fn test_api_versions_request_v3_round_trip() {
let request =
ApiVersionsRequest::new().with_client_software("krafka", env!("CARGO_PKG_VERSION"));
let mut buf = BytesMut::new();
request.encode_v3(&mut buf).unwrap();
let mut buf2 = BytesMut::new();
request.encode_v3(&mut buf2).unwrap();
assert_eq!(buf, buf2);
}
#[test]
fn test_api_versions_response_decode_v3_no_tagged_features() {
use crate::util::varint;
let mut buf = BytesMut::new();
buf.put_i16(0); varint::encode_unsigned_varint(1, &mut buf);
buf.put_i32(0); buf.put_u8(0); let mut data = buf.freeze();
let resp = ApiVersionsResponse::decode_v3(&mut data).unwrap();
assert_eq!(resp.error_code, 0);
assert!(resp.api_keys.is_empty());
assert!(resp.supported_features.is_empty());
}
#[test]
fn test_api_versions_response_decode_v3_with_supported_features() {
use crate::util::varint;
let mut buf = BytesMut::new();
buf.put_i16(0); varint::encode_unsigned_varint(2, &mut buf);
buf.put_i16(0); buf.put_i16(0); buf.put_i16(9); buf.put_u8(0);
buf.put_i32(0);
let mut tag_data = BytesMut::new();
varint::encode_unsigned_varint(3, &mut tag_data);
let name1 = b"metadata.version";
varint::encode_unsigned_varint(name1.len() as u32 + 1, &mut tag_data);
tag_data.put_slice(name1);
tag_data.put_i16(1); tag_data.put_i16(20); tag_data.put_u8(0); let name2 = b"kraft.version";
varint::encode_unsigned_varint(name2.len() as u32 + 1, &mut tag_data);
tag_data.put_slice(name2);
tag_data.put_i16(0); tag_data.put_i16(1); tag_data.put_u8(0);
varint::encode_unsigned_varint(1, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(tag_data.len() as u32, &mut buf);
buf.extend_from_slice(&tag_data);
let mut data = buf.freeze();
let resp = ApiVersionsResponse::decode_v3(&mut data).unwrap();
assert_eq!(resp.supported_features.len(), 2);
assert_eq!(resp.supported_features[0].name, "metadata.version");
assert_eq!(resp.supported_features[0].min_version, 1);
assert_eq!(resp.supported_features[0].max_version, 20);
assert_eq!(resp.supported_features[1].name, "kraft.version");
assert_eq!(resp.supported_features[1].min_version, 0);
assert_eq!(resp.supported_features[1].max_version, 1);
let feat = resp.get_supported_feature("kraft.version").unwrap();
assert_eq!(feat.min_version, 0);
assert_eq!(feat.max_version, 1);
assert!(resp.get_supported_feature("nonexistent").is_none());
}
#[test]
fn test_api_versions_response_decode_v0_no_features() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_i32(0); let mut data = buf.freeze();
let resp = ApiVersionsResponse::decode_v0(&mut data).unwrap();
assert!(resp.supported_features.is_empty());
assert_eq!(resp.throttle_time_ms, 0);
}
#[test]
fn test_parse_supported_features_null_rejected() {
use crate::util::varint;
let mut tag_data = BytesMut::new();
varint::encode_unsigned_varint(0, &mut tag_data);
let tagged = TaggedFields(vec![TaggedField {
tag: 0,
data: tag_data.freeze(),
}]);
let err = ApiVersionsResponse::parse_supported_features(&tagged).unwrap_err();
assert!(
err.to_string().contains("null"),
"expected null rejection, got: {err}"
);
}
#[test]
fn test_parse_supported_features_exceeds_max() {
use crate::util::varint;
let mut tag_data = BytesMut::new();
varint::encode_unsigned_varint(258, &mut tag_data); for i in 0..257u16 {
let name = format!("f{i}");
varint::encode_unsigned_varint(name.len() as u32 + 1, &mut tag_data);
tag_data.put_slice(name.as_bytes());
tag_data.put_i16(0); tag_data.put_i16(1); tag_data.put_u8(0); }
let tagged = TaggedFields(vec![TaggedField {
tag: 0,
data: tag_data.freeze(),
}]);
let err = ApiVersionsResponse::parse_supported_features(&tagged).unwrap_err();
assert!(
err.to_string().contains("exceeds limit"),
"expected limit error, got: {err}"
);
}
#[test]
fn test_parse_supported_features_trailing_bytes() {
use crate::util::varint;
let mut tag_data = BytesMut::new();
varint::encode_unsigned_varint(2, &mut tag_data);
let name = b"test.feature";
varint::encode_unsigned_varint(name.len() as u32 + 1, &mut tag_data);
tag_data.put_slice(name);
tag_data.put_i16(0); tag_data.put_i16(1); tag_data.put_u8(0); tag_data.put_u8(0xFF);
let tagged = TaggedFields(vec![TaggedField {
tag: 0,
data: tag_data.freeze(),
}]);
let err = ApiVersionsResponse::parse_supported_features(&tagged).unwrap_err();
assert!(
err.to_string().contains("trailing bytes"),
"expected trailing bytes error, got: {err}"
);
}
#[test]
fn test_parse_finalized_features_epoch_present() {
let mut epoch_bytes = BytesMut::new();
epoch_bytes.put_i64(42);
let tagged = TaggedFields(vec![TaggedField {
tag: 1,
data: epoch_bytes.freeze(),
}]);
assert_eq!(
ApiVersionsResponse::parse_finalized_features_epoch(&tagged).unwrap(),
42
);
}
#[test]
fn test_parse_finalized_features_epoch_absent() {
let tagged = TaggedFields(vec![]);
assert_eq!(
ApiVersionsResponse::parse_finalized_features_epoch(&tagged).unwrap(),
-1
);
}
#[test]
fn test_parse_finalized_features_epoch_short_data() {
let tagged = TaggedFields(vec![TaggedField {
tag: 1,
data: bytes::Bytes::from_static(&[0, 0, 0]),
}]);
let err = ApiVersionsResponse::parse_finalized_features_epoch(&tagged).unwrap_err();
assert!(
err.to_string().contains("invalid length 3"),
"expected error mentioning invalid length, got: {err}"
);
}
#[test]
fn test_parse_finalized_features_epoch_too_long() {
let tagged = TaggedFields(vec![TaggedField {
tag: 1,
data: bytes::Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 42, 0xFF]),
}]);
let err = ApiVersionsResponse::parse_finalized_features_epoch(&tagged).unwrap_err();
assert!(
err.to_string().contains("invalid length 9"),
"expected error mentioning invalid length, got: {err}"
);
}
#[test]
fn test_parse_finalized_features_present() {
use crate::util::varint;
let mut tag_data = BytesMut::new();
varint::encode_unsigned_varint(2, &mut tag_data);
let name = b"metadata.version";
varint::encode_unsigned_varint(name.len() as u32 + 1, &mut tag_data);
tag_data.put_slice(name);
tag_data.put_i16(17); tag_data.put_i16(1); tag_data.put_u8(0);
let tagged = TaggedFields(vec![TaggedField {
tag: 2,
data: tag_data.freeze(),
}]);
let features = ApiVersionsResponse::parse_finalized_features(&tagged).unwrap();
assert_eq!(features.len(), 1);
assert_eq!(features[0].name, "metadata.version");
assert_eq!(features[0].max_version_level, 17);
assert_eq!(features[0].min_version_level, 1);
}
#[test]
fn test_parse_finalized_features_absent() {
let tagged = TaggedFields(vec![]);
let features = ApiVersionsResponse::parse_finalized_features(&tagged).unwrap();
assert!(features.is_empty());
}
#[test]
fn test_api_versions_response_v3_all_feature_tags() {
use crate::util::varint;
let mut buf = BytesMut::new();
buf.put_i16(0); varint::encode_unsigned_varint(1, &mut buf); buf.put_i32(0);
let mut tag0 = BytesMut::new();
varint::encode_unsigned_varint(2, &mut tag0); let n0 = b"metadata.version";
varint::encode_unsigned_varint(n0.len() as u32 + 1, &mut tag0);
tag0.put_slice(n0);
tag0.put_i16(1);
tag0.put_i16(20);
tag0.put_u8(0);
let mut tag1 = BytesMut::new();
tag1.put_i64(99);
let mut tag2 = BytesMut::new();
varint::encode_unsigned_varint(2, &mut tag2); let n2 = b"metadata.version";
varint::encode_unsigned_varint(n2.len() as u32 + 1, &mut tag2);
tag2.put_slice(n2);
tag2.put_i16(17); tag2.put_i16(1); tag2.put_u8(0);
varint::encode_unsigned_varint(3, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(tag0.len() as u32, &mut buf);
buf.extend_from_slice(&tag0);
varint::encode_unsigned_varint(1, &mut buf);
varint::encode_unsigned_varint(tag1.len() as u32, &mut buf);
buf.extend_from_slice(&tag1);
varint::encode_unsigned_varint(2, &mut buf);
varint::encode_unsigned_varint(tag2.len() as u32, &mut buf);
buf.extend_from_slice(&tag2);
let mut data = buf.freeze();
let resp = ApiVersionsResponse::decode_v3(&mut data).unwrap();
assert_eq!(resp.supported_features.len(), 1);
assert_eq!(resp.supported_features[0].name, "metadata.version");
assert_eq!(resp.supported_features[0].max_version, 20);
assert_eq!(resp.finalized_features_epoch, 99);
assert_eq!(resp.finalized_features.len(), 1);
let ff = resp.get_finalized_feature("metadata.version").unwrap();
assert_eq!(ff.max_version_level, 17);
assert_eq!(ff.min_version_level, 1);
assert!(resp.get_finalized_feature("nonexistent").is_none());
}
#[test]
fn test_api_versions_response_v0_defaults_finalized() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_i32(0); let mut data = buf.freeze();
let resp = ApiVersionsResponse::decode_v0(&mut data).unwrap();
assert_eq!(resp.finalized_features_epoch, -1);
assert!(resp.finalized_features.is_empty());
}
#[test]
fn test_api_versions_response_v3_finalized_features_ignored_when_epoch_absent() {
use crate::util::varint;
let mut buf = BytesMut::new();
buf.put_i16(0); varint::encode_unsigned_varint(1, &mut buf); buf.put_i32(0);
let mut tag2 = BytesMut::new();
varint::encode_unsigned_varint(2, &mut tag2); let name = b"metadata.version";
varint::encode_unsigned_varint(name.len() as u32 + 1, &mut tag2);
tag2.put_slice(name);
tag2.put_i16(17); tag2.put_i16(1); tag2.put_u8(0);
varint::encode_unsigned_varint(1, &mut buf);
varint::encode_unsigned_varint(2, &mut buf); varint::encode_unsigned_varint(tag2.len() as u32, &mut buf);
buf.extend_from_slice(&tag2);
let mut data = buf.freeze();
let resp = ApiVersionsResponse::decode_v3(&mut data).unwrap();
assert_eq!(resp.finalized_features_epoch, -1);
assert!(resp.finalized_features.is_empty());
}
}