use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Numeric identifier of a node; the records in this file use it for broker
/// ids (`node_id`), partition leaders and replica lists.
pub type NodeId = u64;
/// Record describing a topic: its name, its id, and the partition count and
/// replication factor recorded for it.
///
/// Field order is kept as declared; positional serde formats encode fields in
/// declaration order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopicRecord {
    /// Topic name.
    pub name: String,
    /// Unique id of the topic.
    pub topic_id: Uuid,
    /// Number of partitions.
    pub partitions: i32,
    /// Replication factor.
    pub replication_factor: i16,
}
/// Fallback value for `PartitionRecord::partition_epoch`, referenced by that
/// field's `#[serde(default = "...")]` attribute.
///
/// Always returns `-1`.
fn default_partition_epoch() -> i32 {
    const FALLBACK_PARTITION_EPOCH: i32 = -1;
    FALLBACK_PARTITION_EPOCH
}
/// Record describing one partition of a topic: its leader, replica sets,
/// epochs and per-replica directory ids.
///
/// NOTE(review): the derived `Default` sets `partition_epoch` to `0`, whereas
/// the serde fallback for a missing `partition_epoch` is `-1` (see
/// `default_partition_epoch`). Confirm that this difference is intended.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct PartitionRecord {
    /// Name of the topic the partition belongs to.
    pub topic: String,
    /// Partition number within the topic.
    pub partition: i32,
    /// Node currently recorded as leader.
    pub leader: NodeId,
    /// Nodes holding a replica of the partition.
    pub replicas: Vec<NodeId>,
    /// Presumably the in-sync replica set (going by the name) — confirm.
    pub isr: Vec<NodeId>,
    /// Epoch of the current leader.
    pub leader_epoch: i32,
    /// Replicas in the process of being added.
    pub adding_replicas: Vec<NodeId>,
    /// Replicas in the process of being removed.
    pub removing_replicas: Vec<NodeId>,
    /// Directory ids; presumably one entry per element of `replicas`, in the
    /// same order — TODO confirm against the code that builds this record.
    pub directories: Vec<Uuid>,
    /// Partition epoch; serde substitutes `-1` when the field is missing from
    /// the input.
    #[serde(default = "default_partition_epoch")]
    pub partition_epoch: i32,
}
/// Record associating a single replica of a partition with a directory id.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PartitionDirAssignmentRecord {
    /// Name of the topic the partition belongs to.
    pub topic: String,
    /// Partition number within the topic.
    pub partition: i32,
    /// Node hosting the replica.
    pub replica: NodeId,
    /// Id of the directory assigned to that replica.
    pub directory: Uuid,
}
/// One named network endpoint of a broker, as carried in
/// `BrokerRegistrationRecord::endpoints`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BrokerEndpoint {
    /// Endpoint (listener) name, e.g. `EXTERNAL`.
    pub name: String,
    /// Host name or address of the endpoint.
    pub host: String,
    /// TCP port of the endpoint.
    pub port: u16,
    /// Protocol spoken on this endpoint.
    pub protocol: crabka_security::ListenerProtocol,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BrokerRegistrationRecord {
pub node_id: NodeId,
pub broker_epoch: i64,
#[serde(default)]
pub incarnation_id: uuid::Uuid,
pub host: String,
pub port: u16,
pub rack: Option<String>,
pub endpoints: Vec<BrokerEndpoint>,
}
/// Record requesting deletion of a topic, identified by name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteTopicRecord {
    /// Name of the topic to delete.
    pub name: String,
}
/// Record removing a broker registration, identified by node id.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UnregisterBrokerRecord {
    /// Id of the broker to unregister.
    pub node_id: NodeId,
}
/// Record carrying configuration overrides for a single topic.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopicConfigRecord {
    /// Name of the topic the overrides apply to.
    pub topic: String,
    /// Config key → value (e.g. `retention.ms` → `60000`), held in an ordered
    /// map so entries iterate in key order.
    pub overrides: std::collections::BTreeMap<String, String>,
}
/// Record carrying one configuration entry for one broker.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BrokerConfigRecord {
    /// Id of the broker the entry applies to.
    pub node_id: NodeId,
    /// Name of the configuration key.
    pub config_name: String,
    /// Value for the key, if any.
    /// NOTE(review): `None` presumably means "remove the entry" — confirm
    /// against the code that applies this record.
    pub config_value: Option<String>,
}
/// Record carrying a named set of client-metrics configuration entries.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClientMetricsConfigRecord {
    /// Name identifying this set of configs.
    pub name: String,
    /// Config key → value (e.g. `interval.ms` → `60000`), held in an ordered
    /// map so entries iterate in key order.
    pub configs: std::collections::BTreeMap<String, String>,
}
/// One component of the entity a client quota applies to, e.g. type
/// `client-id` with name `app1`, or type `user` with name `alice`.
///
/// Both fields have total equality, so `Eq` is derived alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuotaEntity {
    /// Kind of entity, e.g. `client-id` or `user`.
    pub entity_type: String,
    /// Name of the entity, if any.
    /// NOTE(review): `None` presumably selects the default entity of that
    /// type — confirm against the quota-handling code.
    pub entity_name: Option<String>,
}
/// Record carrying one quota setting for a (possibly composite) entity.
///
/// Only `PartialEq` is derived: `config_value` holds an `f64`, which has no
/// `Eq` implementation.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ClientQuotaRecord {
    /// Entity components the quota applies to.
    pub entity: Vec<QuotaEntity>,
    /// Quota key, e.g. `producer_byte_rate`.
    pub config_key: String,
    /// Quota value, if any.
    /// NOTE(review): `None` presumably means "remove the quota" — confirm.
    pub config_value: Option<f64>,
}
/// Record carrying the SCRAM credential material for one user and mechanism.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ScramCredentialRecord {
    /// User the credential belongs to.
    pub user: String,
    /// SASL mechanism the credential is for.
    pub mechanism: crabka_security::SaslMechanism,
    /// Salt bytes.
    pub salt: Vec<u8>,
    /// Stored-key bytes.
    pub stored_key: Vec<u8>,
    /// Server-key bytes.
    pub server_key: Vec<u8>,
    /// Iteration count.
    pub iterations: u32,
}
/// Record removing the SCRAM credential of one user for one mechanism.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteScramCredentialRecord {
    /// User whose credential is removed.
    pub user: String,
    /// SASL mechanism whose credential is removed.
    pub mechanism: crabka_security::SaslMechanism,
}
/// Record describing a delegation token: id, owner, HMAC, lifetime
/// timestamps and the principals allowed to renew it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DelegationTokenRecord {
    /// Identifier of the token.
    pub token_id: String,
    /// Principal that owns the token.
    pub owner: crabka_security::KafkaPrincipal,
    /// HMAC bytes of the token.
    pub hmac: Vec<u8>,
    /// Issue time, in milliseconds (per the `_ms` suffix).
    pub issue_timestamp_ms: i64,
    /// Current expiry time, in milliseconds.
    pub expiry_timestamp_ms: i64,
    /// Maximum lifetime timestamp, in milliseconds.
    pub max_timestamp_ms: i64,
    /// Principals permitted to renew the token.
    pub renewers: Vec<crabka_security::KafkaPrincipal>,
}
/// Record removing a delegation token, identified by its token id.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteDelegationTokenRecord {
    /// Identifier of the token to remove.
    pub token_id: String,
}
/// Record carrying a KRaft version number. Small enough to be `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct KRaftVersionRecord {
    /// The KRaft version.
    pub kraft_version: u16,
}
/// Record carrying a complete voter set.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct VotersRecord {
    /// The voter set.
    pub voters: crate::voters::VoterSet,
}
/// Record setting the level of a named feature.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FeatureLevelRecord {
    /// Feature name, e.g. `metadata.version`.
    pub name: String,
    /// Level recorded for the feature.
    pub level: i16,
}
/// Record carrying a features epoch. Small enough to be `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct FeaturesEpochRecord {
    /// The epoch value.
    pub epoch: i64,
}
/// Envelope over every record type defined in this file; each variant wraps
/// exactly one payload.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm. Only `PartialEq` is derived (not `Eq`) because
/// `ClientQuotaRecord` contains an `f64`.
///
/// NOTE(review): variant order is preserved exactly as declared. If the
/// on-disk/wire encoding identifies variants by position (as positional serde
/// formats do), reordering or inserting in the middle would change the
/// encoding — presumably new variants must be appended at the end; confirm.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MetadataRecord {
    /// Topic description.
    V1Topic(TopicRecord),
    /// Partition description.
    V1Partition(PartitionRecord),
    /// Broker registration.
    V1BrokerRegistration(BrokerRegistrationRecord),
    /// Topic deletion.
    V1DeleteTopic(DeleteTopicRecord),
    /// Topic configuration overrides.
    V1TopicConfig(TopicConfigRecord),
    /// SCRAM credential.
    V1ScramCredential(ScramCredentialRecord),
    /// SCRAM credential removal.
    V1DeleteScramCredential(DeleteScramCredentialRecord),
    /// ACL entry.
    V1AccessControlEntry(crate::AclEntry),
    /// ACL entry removal, expressed as a filter.
    V1DeleteAccessControlEntry(crate::AclEntryFilter),
    /// Broker configuration entry.
    V1BrokerConfig(BrokerConfigRecord),
    /// Client quota setting.
    V1ClientQuota(ClientQuotaRecord),
    /// Delegation token.
    V1DelegationToken(DelegationTokenRecord),
    /// Delegation token removal.
    V1DeleteDelegationToken(DeleteDelegationTokenRecord),
    /// Broker unregistration.
    V1UnregisterBroker(UnregisterBrokerRecord),
    /// KRaft version.
    V1KRaftVersion(KRaftVersionRecord),
    /// Voter set.
    V1Voters(VotersRecord),
    /// Feature level.
    V1FeatureLevel(FeatureLevelRecord),
    /// Client-metrics configuration.
    V1ClientMetricsConfig(ClientMetricsConfigRecord),
    /// Features epoch.
    V1FeaturesEpoch(FeaturesEpochRecord),
    /// Partition directory assignment.
    V1PartitionDirAssignment(PartitionDirAssignmentRecord),
}
// Round-trip tests: each `MetadataRecord` variant is serialized through the
// `SerdeCompat` wrapper and must deserialize back to an equal value.
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use serde_wincode::SerdeCompat;
    use wincode::{Deserialize as _, Serialize as _};

    /// Serializes `r` to bytes and deserializes it again.
    ///
    /// Panics (via `unwrap`) if either direction fails, which fails the
    /// calling test.
    fn round_trip(r: &MetadataRecord) -> MetadataRecord {
        let bytes = <SerdeCompat<MetadataRecord>>::serialize(r).unwrap();
        <SerdeCompat<MetadataRecord>>::deserialize(&bytes).unwrap()
    }

    #[test]
    fn feature_level_round_trip() {
        let r = MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
            name: "metadata.version".into(),
            level: 1,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn features_epoch_round_trip() {
        let r = MetadataRecord::V1FeaturesEpoch(FeaturesEpochRecord { epoch: 7 });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn topic_record_round_trip() {
        let r = MetadataRecord::V1Topic(TopicRecord {
            name: "t".into(),
            topic_id: Uuid::new_v4(),
            partitions: 3,
            replication_factor: 1,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn partition_record_round_trip() {
        let r = MetadataRecord::V1Partition(PartitionRecord {
            topic: "t".into(),
            partition: 0,
            leader: 1,
            replicas: vec![1, 2, 3],
            isr: vec![1, 2],
            leader_epoch: 0,
            adding_replicas: vec![],
            removing_replicas: vec![],
            directories: vec![Uuid::from_u128(1), Uuid::from_u128(2), Uuid::nil()],
            partition_epoch: 0,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn partition_dir_assignment_round_trip() {
        let r = MetadataRecord::V1PartitionDirAssignment(PartitionDirAssignmentRecord {
            topic: "t".into(),
            partition: 2,
            replica: 3,
            directory: Uuid::from_u128(0xAB),
        });
        assert!(round_trip(&r) == r);
    }

    // Registration without any extra endpoints, with a rack set.
    #[test]
    fn broker_registration_round_trip() {
        let r = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
            node_id: 7,
            broker_epoch: 0,
            incarnation_id: Uuid::from_u128(0xdeadbeef_cafe_babe_0123_456789abcdef),
            host: "192.168.1.10".into(),
            port: 9092,
            rack: Some("us-east-1a".into()),
            endpoints: vec![],
        });
        assert!(round_trip(&r) == r);
    }

    // Registration with one named endpoint and no rack.
    #[test]
    fn broker_registration_with_endpoints_round_trip() {
        let r = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
            node_id: 1,
            broker_epoch: 0,
            incarnation_id: Uuid::from_u128(0xfeedface_0000_0000_0000_000000000001),
            host: "h".into(),
            port: 9092,
            rack: None,
            endpoints: vec![BrokerEndpoint {
                name: "EXTERNAL".into(),
                host: "ext.example.com".into(),
                port: 9092,
                protocol: crabka_security::ListenerProtocol::SaslSsl,
            }],
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delete_topic_round_trip() {
        let r = MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
            name: "doomed".into(),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn unregister_broker_round_trip() {
        let r = MetadataRecord::V1UnregisterBroker(UnregisterBrokerRecord { node_id: 42 });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn topic_config_record_round_trip() {
        let mut overrides = std::collections::BTreeMap::new();
        overrides.insert("retention.ms".to_string(), "60000".to_string());
        overrides.insert("segment.bytes".to_string(), "1048576".to_string());
        let r = MetadataRecord::V1TopicConfig(TopicConfigRecord {
            topic: "t".into(),
            overrides,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn scram_credential_round_trip() {
        let r = MetadataRecord::V1ScramCredential(ScramCredentialRecord {
            user: "alice".into(),
            mechanism: crabka_security::SaslMechanism::ScramSha512,
            salt: vec![1u8; 16],
            stored_key: vec![2u8; 64],
            server_key: vec![3u8; 64],
            iterations: 4096,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delete_scram_credential_round_trip() {
        let r = MetadataRecord::V1DeleteScramCredential(DeleteScramCredentialRecord {
            user: "alice".into(),
            mechanism: crabka_security::SaslMechanism::ScramSha512,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn v1_access_control_entry_round_trip() {
        let entry = crate::AclEntry {
            resource_type: crate::ResourceType::Topic,
            resource_name: "foo".into(),
            pattern_type: crate::PatternType::Literal,
            principal: "User:alice".into(),
            host: "*".into(),
            operation: crate::AclOperation::Read,
            permission_type: crate::PermissionType::Allow,
        };
        let r = MetadataRecord::V1AccessControlEntry(entry);
        assert!(round_trip(&r) == r);
    }

    // Filter with a mix of `Some` and `None` fields.
    #[test]
    fn v1_delete_access_control_entry_round_trip() {
        let filter = crate::AclEntryFilter {
            resource_type: Some(crate::ResourceType::Group),
            resource_name: Some("cg-foo".into()),
            pattern_type: Some(crate::PatternType::Literal),
            principal: None,
            host: None,
            operation: None,
            permission_type: None,
        };
        let r = MetadataRecord::V1DeleteAccessControlEntry(filter);
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn broker_config_record_round_trip() {
        let r = MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
            node_id: 7,
            config_name: "leader.replication.throttled.rate".into(),
            config_value: Some("2048".into()),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn client_quota_record_round_trip() {
        let r = MetadataRecord::V1ClientQuota(ClientQuotaRecord {
            entity: vec![
                QuotaEntity {
                    entity_type: "client-id".into(),
                    entity_name: Some("app1".into()),
                },
                QuotaEntity {
                    entity_type: "user".into(),
                    entity_name: Some("alice".into()),
                },
            ],
            config_key: "producer_byte_rate".into(),
            config_value: Some(1024.0),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delegation_token_record_round_trip() {
        let r = MetadataRecord::V1DelegationToken(DelegationTokenRecord {
            token_id: "tok-abc".into(),
            owner: crabka_security::KafkaPrincipal {
                principal_type: "User".into(),
                name: "alice".into(),
            },
            hmac: vec![0xAB; 32],
            issue_timestamp_ms: 1_700_000_000_000,
            expiry_timestamp_ms: 1_700_000_600_000,
            max_timestamp_ms: 1_700_604_800_000,
            renewers: vec![crabka_security::KafkaPrincipal {
                principal_type: "User".into(),
                name: "bob".into(),
            }],
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delete_delegation_token_record_round_trip() {
        let r = MetadataRecord::V1DeleteDelegationToken(DeleteDelegationTokenRecord {
            token_id: "tok-abc".into(),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn voters_record_round_trips() {
        let rec = MetadataRecord::V1Voters(VotersRecord {
            voters: crate::voters::VoterSet::from_voters([crate::voters::Voter {
                id: 7,
                // `Uuid` is already in scope through `use super::*`.
                directory_id: Uuid::from_u128(7),
                endpoints: vec![crate::voters::VoterEndpoint {
                    name: "CONTROLLER".into(),
                    host: "h".into(),
                    port: 1,
                }],
                kraft_version: crate::voters::KRaftVersionRange::default(),
            }]),
        });
        assert!(round_trip(&rec) == rec);
    }

    #[test]
    fn kraft_version_record_round_trips() {
        let rec = MetadataRecord::V1KRaftVersion(KRaftVersionRecord { kraft_version: 1 });
        assert!(round_trip(&rec) == rec);
    }

    #[test]
    fn client_metrics_config_round_trip() {
        let mut configs = std::collections::BTreeMap::new();
        configs.insert("interval.ms".to_string(), "60000".to_string());
        configs.insert(
            "metrics".to_string(),
            "org.apache.kafka.consumer.".to_string(),
        );
        let r = MetadataRecord::V1ClientMetricsConfig(ClientMetricsConfigRecord {
            name: "sub-a".into(),
            configs,
        });
        // Same assertion style as every other test in this module.
        assert!(round_trip(&r) == r);
    }
}