use bytes::{Bytes, BytesMut};
use crate::owned::access_control_entry_record::AccessControlEntryRecord;
use crate::owned::begin_transaction_record::BeginTransactionRecord;
use crate::owned::broker_registration_change_record::BrokerRegistrationChangeRecord;
use crate::owned::client_quota_record::ClientQuotaRecord;
use crate::owned::config_record::ConfigRecord;
use crate::owned::delegation_token_record::DelegationTokenRecord;
use crate::owned::end_transaction_record::EndTransactionRecord;
use crate::owned::feature_level_record::FeatureLevelRecord;
use crate::owned::no_op_record::NoOpRecord;
use crate::owned::partition_record::PartitionRecord;
use crate::owned::register_broker_record::RegisterBrokerRecord;
use crate::owned::register_controller_record::RegisterControllerRecord;
use crate::owned::remove_access_control_entry_record::RemoveAccessControlEntryRecord;
use crate::owned::remove_delegation_token_record::RemoveDelegationTokenRecord;
use crate::owned::remove_topic_record::RemoveTopicRecord;
use crate::owned::remove_user_scram_credential_record::RemoveUserScramCredentialRecord;
use crate::owned::topic_record::TopicRecord;
use crate::owned::unregister_broker_record::UnregisterBrokerRecord;
use crate::owned::user_scram_credential_record::UserScramCredentialRecord;
use crate::records::metadata::envelope::{decode_value_header, encode_value};
use crate::{Decode, Encode, ProtocolError};
/// A KRaft metadata record value, with one variant per record type this module dispatches.
///
/// The variant is selected by the API key carried in the value envelope (see `api_key`).
/// Keys without a dedicated variant are preserved as [`KraftMetadataRecord::Unknown`] so the
/// raw bytes are not lost.
#[derive(Debug, Clone, PartialEq)]
pub enum KraftMetadataRecord {
    /// `RegisterBrokerRecord` (API key 0).
    RegisterBroker(RegisterBrokerRecord),
    /// `UnregisterBrokerRecord` (API key 1).
    UnregisterBroker(UnregisterBrokerRecord),
    /// `TopicRecord` (API key 2).
    Topic(TopicRecord),
    /// `PartitionRecord` (API key 3).
    Partition(PartitionRecord),
    /// `ConfigRecord` (API key 4).
    Config(ConfigRecord),
    /// `RemoveTopicRecord` (API key 9).
    RemoveTopic(RemoveTopicRecord),
    /// `DelegationTokenRecord` (API key 10).
    DelegationToken(DelegationTokenRecord),
    /// `UserScramCredentialRecord` (API key 11).
    UserScramCredential(UserScramCredentialRecord),
    /// `FeatureLevelRecord` (API key 12).
    FeatureLevel(FeatureLevelRecord),
    /// `ClientQuotaRecord` (API key 14).
    ClientQuota(ClientQuotaRecord),
    /// `BrokerRegistrationChangeRecord` (API key 17).
    BrokerRegistrationChange(BrokerRegistrationChangeRecord),
    /// `AccessControlEntryRecord` (API key 18).
    AccessControlEntry(AccessControlEntryRecord),
    /// `RemoveAccessControlEntryRecord` (API key 19).
    RemoveAccessControlEntry(RemoveAccessControlEntryRecord),
    /// `NoOpRecord` (API key 20).
    NoOp(NoOpRecord),
    /// `RemoveUserScramCredentialRecord` (API key 22).
    RemoveUserScramCredential(RemoveUserScramCredentialRecord),
    /// `BeginTransactionRecord` (API key 23).
    BeginTransaction(BeginTransactionRecord),
    /// `EndTransactionRecord` (API key 24).
    EndTransaction(EndTransactionRecord),
    /// `RemoveDelegationTokenRecord` (API key 26).
    RemoveDelegationToken(RemoveDelegationTokenRecord),
    /// `RegisterControllerRecord` (API key 27).
    RegisterController(RegisterControllerRecord),
    /// A record whose API key has no dedicated variant; its body is kept undecoded.
    Unknown {
        /// API key read from the envelope header.
        api_key: u32,
        /// API version read from the envelope header.
        api_version: u32,
        /// Raw bytes that followed the envelope header.
        body: Bytes,
    },
}
/// Widens a signed API version to the unsigned form used by the value envelope.
///
/// Non-negative versions are returned unchanged; every negative version becomes 0.
fn api_version_to_u32(version: i16) -> u32 {
    // Clamp first so the unsigned magnitude below is exactly the (non-negative) value.
    let non_negative = version.max(0);
    u32::from(non_negative.unsigned_abs())
}
/// Narrows an envelope API version to the signed 16-bit form the record codecs take.
///
/// # Errors
///
/// Returns `ProtocolError::SchemaMismatch("metadata record apiVersion")` when `version`
/// does not fit in an `i16`.
fn api_version_to_i16(version: u32) -> Result<i16, ProtocolError> {
    match i16::try_from(version) {
        Ok(narrowed) => Ok(narrowed),
        Err(_) => Err(ProtocolError::SchemaMismatch("metadata record apiVersion")),
    }
}
impl KraftMetadataRecord {
#[must_use]
pub fn api_key(&self) -> u32 {
match self {
Self::RegisterBroker(_) => 0,
Self::Topic(_) => 2,
Self::Partition(_) => 3,
Self::RemoveTopic(_) => 9,
Self::FeatureLevel(_) => 12,
Self::BrokerRegistrationChange(_) => 17,
Self::NoOp(_) => 20,
Self::BeginTransaction(_) => 23,
Self::EndTransaction(_) => 24,
Self::RegisterController(_) => 27,
Self::UnregisterBroker(_) => 1,
Self::Config(_) => 4,
Self::DelegationToken(_) => 10,
Self::UserScramCredential(_) => 11,
Self::ClientQuota(_) => 14,
Self::AccessControlEntry(_) => 18,
Self::RemoveAccessControlEntry(_) => 19,
Self::RemoveUserScramCredential(_) => 22,
Self::RemoveDelegationToken(_) => 26,
Self::Unknown { api_key, .. } => *api_key,
}
}
pub fn encode_value(&self, api_version: i16) -> Result<Bytes, ProtocolError> {
let api_key = self.api_key();
let v = api_version;
let mut body = BytesMut::new();
match self {
Self::RegisterBroker(r) => r.encode(&mut body, v)?,
Self::Topic(r) => r.encode(&mut body, v)?,
Self::Partition(r) => r.encode(&mut body, v)?,
Self::RemoveTopic(r) => r.encode(&mut body, v)?,
Self::BeginTransaction(r) => r.encode(&mut body, v)?,
Self::EndTransaction(r) => r.encode(&mut body, v)?,
Self::NoOp(r) => r.encode(&mut body, v)?,
Self::RegisterController(r) => r.encode(&mut body, v)?,
Self::BrokerRegistrationChange(r) => r.encode(&mut body, v)?,
Self::FeatureLevel(r) => r.encode(&mut body, v)?,
Self::UnregisterBroker(r) => r.encode(&mut body, v)?,
Self::Config(r) => r.encode(&mut body, v)?,
Self::DelegationToken(r) => r.encode(&mut body, v)?,
Self::UserScramCredential(r) => r.encode(&mut body, v)?,
Self::ClientQuota(r) => r.encode(&mut body, v)?,
Self::AccessControlEntry(r) => r.encode(&mut body, v)?,
Self::RemoveAccessControlEntry(r) => r.encode(&mut body, v)?,
Self::RemoveUserScramCredential(r) => r.encode(&mut body, v)?,
Self::RemoveDelegationToken(r) => r.encode(&mut body, v)?,
Self::Unknown { body: raw, .. } => {
return Ok(encode_value(api_key, api_version_to_u32(api_version), raw));
}
}
Ok(encode_value(
api_key,
api_version_to_u32(api_version),
&body,
))
}
pub fn decode_value(value: &[u8]) -> Result<(Self, i16), ProtocolError> {
let mut cur: &[u8] = value;
let hdr = decode_value_header(&mut cur)
.map_err(|_| ProtocolError::SchemaMismatch("metadata record envelope"))?;
let v = api_version_to_i16(hdr.api_version)?;
let rec = match hdr.api_key {
0 => Self::RegisterBroker(RegisterBrokerRecord::decode(&mut cur, v)?),
2 => Self::Topic(TopicRecord::decode(&mut cur, v)?),
3 => Self::Partition(PartitionRecord::decode(&mut cur, v)?),
9 => Self::RemoveTopic(RemoveTopicRecord::decode(&mut cur, v)?),
12 => Self::FeatureLevel(FeatureLevelRecord::decode(&mut cur, v)?),
17 => {
Self::BrokerRegistrationChange(BrokerRegistrationChangeRecord::decode(&mut cur, v)?)
}
20 => Self::NoOp(NoOpRecord::decode(&mut cur, v)?),
23 => Self::BeginTransaction(BeginTransactionRecord::decode(&mut cur, v)?),
24 => Self::EndTransaction(EndTransactionRecord::decode(&mut cur, v)?),
27 => Self::RegisterController(RegisterControllerRecord::decode(&mut cur, v)?),
1 => Self::UnregisterBroker(UnregisterBrokerRecord::decode(&mut cur, v)?),
4 => Self::Config(ConfigRecord::decode(&mut cur, v)?),
10 => Self::DelegationToken(DelegationTokenRecord::decode(&mut cur, v)?),
11 => Self::UserScramCredential(UserScramCredentialRecord::decode(&mut cur, v)?),
14 => Self::ClientQuota(ClientQuotaRecord::decode(&mut cur, v)?),
18 => Self::AccessControlEntry(AccessControlEntryRecord::decode(&mut cur, v)?),
19 => {
Self::RemoveAccessControlEntry(RemoveAccessControlEntryRecord::decode(&mut cur, v)?)
}
22 => Self::RemoveUserScramCredential(RemoveUserScramCredentialRecord::decode(
&mut cur, v,
)?),
26 => Self::RemoveDelegationToken(RemoveDelegationTokenRecord::decode(&mut cur, v)?),
other => {
return Ok((
Self::Unknown {
api_key: other,
api_version: hdr.api_version,
body: Bytes::copy_from_slice(cur),
},
v,
));
}
};
if !cur.is_empty() {
return Err(ProtocolError::SchemaMismatch(
"trailing bytes after metadata record body",
));
}
Ok((rec, v))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// A known record survives encode -> decode -> encode unchanged.
    #[test]
    fn feature_level_record_value_roundtrips_through_dispatch() {
        let original = KraftMetadataRecord::FeatureLevel(FeatureLevelRecord {
            name: String::from("metadata.version"),
            feature_level: 25,
            ..Default::default()
        });
        let encoded = original.encode_value(0).expect("encode");
        let (roundtripped, version) =
            KraftMetadataRecord::decode_value(&encoded).expect("decode");
        assert!(roundtripped == original);
        assert!(version == 0);
        let reencoded = roundtripped.encode_value(version).expect("re-encode");
        assert!(reencoded == encoded);
    }

    /// An API key without a dedicated variant is kept as `Unknown` and re-encodes verbatim.
    #[test]
    fn unknown_api_key_decodes_to_unknown_arm() {
        let payload: [u8; 2] = [0xAB, 0xCD];
        let encoded = encode_value(99, 0, &payload);
        let (decoded, version) = KraftMetadataRecord::decode_value(&encoded).expect("decode");
        assert!(version == 0);
        if let KraftMetadataRecord::Unknown {
            api_key,
            api_version,
            body,
        } = &decoded
        {
            assert!(*api_key == 99);
            assert!(*api_version == 0);
            assert!(body.as_ref() == &[0xAB, 0xCD]);
        } else {
            panic!("expected Unknown, got {decoded:?}");
        }
        let reencoded = decoded.encode_value(version).expect("re-encode");
        assert!(reencoded == encoded);
    }

    /// A default `ConfigRecord` round-trips and reports API key 4.
    #[test]
    fn config_record_value_roundtrips() {
        let original = KraftMetadataRecord::Config(ConfigRecord::default());
        let encoded = original.encode_value(0).unwrap();
        let (decoded, version) = KraftMetadataRecord::decode_value(&encoded).unwrap();
        assert!(decoded.encode_value(version).unwrap() == original.encode_value(0).unwrap());
        assert!(decoded.api_key() == 4);
    }

    /// Each listed variant reports its expected API key.
    #[test]
    fn all_new_records_have_correct_api_keys() {
        let expectations = [
            (
                KraftMetadataRecord::UnregisterBroker(UnregisterBrokerRecord::default()),
                1,
            ),
            (KraftMetadataRecord::Config(ConfigRecord::default()), 4),
            (
                KraftMetadataRecord::DelegationToken(DelegationTokenRecord::default()),
                10,
            ),
            (
                KraftMetadataRecord::UserScramCredential(UserScramCredentialRecord::default()),
                11,
            ),
            (
                KraftMetadataRecord::ClientQuota(ClientQuotaRecord::default()),
                14,
            ),
            (
                KraftMetadataRecord::AccessControlEntry(AccessControlEntryRecord::default()),
                18,
            ),
            (
                KraftMetadataRecord::RemoveAccessControlEntry(
                    RemoveAccessControlEntryRecord::default(),
                ),
                19,
            ),
            (
                KraftMetadataRecord::RemoveUserScramCredential(
                    RemoveUserScramCredentialRecord::default(),
                ),
                22,
            ),
            (
                KraftMetadataRecord::RemoveDelegationToken(RemoveDelegationTokenRecord::default()),
                26,
            ),
        ];
        for (record, expected_key) in &expectations {
            assert!(record.api_key() == *expected_key);
        }
    }
}