1use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8pub type NodeId = u64;
9
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub struct TopicRecord {
12 pub name: String,
13 pub topic_id: Uuid,
14 pub partitions: i32,
15 pub replication_factor: i16,
16}
17
18fn default_partition_epoch() -> i32 {
19 -1
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
23pub struct PartitionRecord {
24 pub topic: String,
25 pub partition: i32,
26 pub leader: NodeId,
27 pub replicas: Vec<NodeId>,
28 pub isr: Vec<NodeId>,
29 pub leader_epoch: i32,
32 pub adding_replicas: Vec<NodeId>,
35 pub removing_replicas: Vec<NodeId>,
38 pub directories: Vec<Uuid>,
45 #[serde(default = "default_partition_epoch")]
50 pub partition_epoch: i32,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61pub struct PartitionDirAssignmentRecord {
62 pub topic: String,
63 pub partition: i32,
64 pub replica: NodeId,
66 pub directory: Uuid,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76pub struct BrokerEndpoint {
77 pub name: String,
79 pub host: String,
80 pub port: u16,
81 pub protocol: crabka_security::ListenerProtocol,
82}
83
84#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
85pub struct BrokerRegistrationRecord {
86 pub node_id: NodeId,
87 pub broker_epoch: i64,
93 #[serde(default)]
98 pub incarnation_id: uuid::Uuid,
99 pub host: String,
102 pub port: u16,
103 pub rack: Option<String>,
104 pub endpoints: Vec<BrokerEndpoint>,
108}
109
110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub struct DeleteTopicRecord {
112 pub name: String,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
124pub struct UnregisterBrokerRecord {
125 pub node_id: NodeId,
126}
127
128#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
133pub struct TopicConfigRecord {
134 pub topic: String,
135 pub overrides: std::collections::BTreeMap<String, String>,
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140pub struct BrokerConfigRecord {
141 pub node_id: NodeId,
142 pub config_name: String,
143 pub config_value: Option<String>,
145}
146
147#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
154pub struct ClientMetricsConfigRecord {
155 pub name: String,
156 pub configs: std::collections::BTreeMap<String, String>,
157}
158
159#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
160pub struct QuotaEntity {
161 pub entity_type: String,
162 pub entity_name: Option<String>,
163}
164
165#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
166pub struct ClientQuotaRecord {
167 pub entity: Vec<QuotaEntity>,
169 pub config_key: String,
170 pub config_value: Option<f64>,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
174pub struct ScramCredentialRecord {
175 pub user: String,
176 pub mechanism: crabka_security::SaslMechanism,
177 pub salt: Vec<u8>,
178 pub stored_key: Vec<u8>,
179 pub server_key: Vec<u8>,
180 pub iterations: u32,
181}
182
183#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
184pub struct DeleteScramCredentialRecord {
185 pub user: String,
186 pub mechanism: crabka_security::SaslMechanism,
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
198pub struct DelegationTokenRecord {
199 pub token_id: String,
200 pub owner: crabka_security::KafkaPrincipal,
201 pub hmac: Vec<u8>,
202 pub issue_timestamp_ms: i64,
203 pub expiry_timestamp_ms: i64,
204 pub max_timestamp_ms: i64,
207 pub renewers: Vec<crabka_security::KafkaPrincipal>,
208}
209
210#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
214pub struct DeleteDelegationTokenRecord {
215 pub token_id: String,
216}
217
218#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
220pub struct KRaftVersionRecord {
221 pub kraft_version: u16,
222}
223
224#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
226pub struct VotersRecord {
227 pub voters: crate::voters::VoterSet,
228}
229
230#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
236pub struct FeatureLevelRecord {
237 pub name: String,
238 pub level: i16,
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
258pub struct FeaturesEpochRecord {
259 pub epoch: i64,
260}
261
262#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
263#[non_exhaustive]
264pub enum MetadataRecord {
265 V1Topic(TopicRecord),
266 V1Partition(PartitionRecord),
267 V1BrokerRegistration(BrokerRegistrationRecord),
268 V1DeleteTopic(DeleteTopicRecord),
269 V1TopicConfig(TopicConfigRecord),
270 V1ScramCredential(ScramCredentialRecord),
271 V1DeleteScramCredential(DeleteScramCredentialRecord),
272 V1AccessControlEntry(crate::AclEntry),
273 V1DeleteAccessControlEntry(crate::AclEntryFilter),
274 V1BrokerConfig(BrokerConfigRecord),
275 V1ClientQuota(ClientQuotaRecord),
276 V1DelegationToken(DelegationTokenRecord),
277 V1DeleteDelegationToken(DeleteDelegationTokenRecord),
278 V1UnregisterBroker(UnregisterBrokerRecord),
279 V1KRaftVersion(KRaftVersionRecord),
280 V1Voters(VotersRecord),
281 V1FeatureLevel(FeatureLevelRecord),
282 V1ClientMetricsConfig(ClientMetricsConfigRecord),
283 V1FeaturesEpoch(FeaturesEpochRecord),
286 V1PartitionDirAssignment(PartitionDirAssignmentRecord),
290}
291
292#[cfg(test)]
293mod tests {
294 use super::*;
295 use assert2::assert;
296 use serde_wincode::SerdeCompat;
297 use wincode::{Deserialize as _, Serialize as _};
298
299 fn round_trip(r: &MetadataRecord) -> MetadataRecord {
300 let bytes = <SerdeCompat<MetadataRecord>>::serialize(r).unwrap();
301 <SerdeCompat<MetadataRecord>>::deserialize(&bytes).unwrap()
302 }
303
304 #[test]
305 fn feature_level_round_trip() {
306 let r = MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
307 name: "metadata.version".into(),
308 level: 1,
309 });
310 assert!(round_trip(&r) == r);
311 }
312
313 #[test]
314 fn features_epoch_round_trip() {
315 let r = MetadataRecord::V1FeaturesEpoch(FeaturesEpochRecord { epoch: 7 });
316 assert!(round_trip(&r) == r);
317 }
318
319 #[test]
320 fn topic_record_round_trip() {
321 let r = MetadataRecord::V1Topic(TopicRecord {
322 name: "t".into(),
323 topic_id: Uuid::new_v4(),
324 partitions: 3,
325 replication_factor: 1,
326 });
327 assert!(round_trip(&r) == r);
328 }
329
330 #[test]
331 fn partition_record_round_trip() {
332 let r = MetadataRecord::V1Partition(PartitionRecord {
333 topic: "t".into(),
334 partition: 0,
335 leader: 1,
336 replicas: vec![1, 2, 3],
337 isr: vec![1, 2],
338 leader_epoch: 0,
339 adding_replicas: vec![],
340 removing_replicas: vec![],
341 directories: vec![Uuid::from_u128(1), Uuid::from_u128(2), Uuid::nil()],
342 partition_epoch: 0,
343 });
344 assert!(round_trip(&r) == r);
345 }
346
347 #[test]
348 fn partition_dir_assignment_round_trip() {
349 let r = MetadataRecord::V1PartitionDirAssignment(PartitionDirAssignmentRecord {
350 topic: "t".into(),
351 partition: 2,
352 replica: 3,
353 directory: Uuid::from_u128(0xAB),
354 });
355 assert!(round_trip(&r) == r);
356 }
357
358 #[test]
359 fn broker_registration_round_trip() {
360 let r = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
361 node_id: 7,
362 broker_epoch: 0,
363 incarnation_id: Uuid::from_u128(0xdeadbeef_cafe_babe_0123_456789abcdef),
364 host: "192.168.1.10".into(),
365 port: 9092,
366 rack: Some("us-east-1a".into()),
367 endpoints: vec![],
368 });
369 assert!(round_trip(&r) == r);
370 }
371
372 #[test]
373 fn broker_registration_with_endpoints_round_trip() {
374 let r = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
375 node_id: 1,
376 broker_epoch: 0,
377 incarnation_id: Uuid::from_u128(0xfeedface_0000_0000_0000_000000000001),
378 host: "h".into(),
379 port: 9092,
380 rack: None,
381 endpoints: vec![BrokerEndpoint {
382 name: "EXTERNAL".into(),
383 host: "ext.example.com".into(),
384 port: 9092,
385 protocol: crabka_security::ListenerProtocol::SaslSsl,
386 }],
387 });
388 assert!(round_trip(&r) == r);
389 }
390
391 #[test]
392 fn delete_topic_round_trip() {
393 let r = MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
394 name: "doomed".into(),
395 });
396 assert!(round_trip(&r) == r);
397 }
398
399 #[test]
400 fn unregister_broker_round_trip() {
401 let r = MetadataRecord::V1UnregisterBroker(UnregisterBrokerRecord { node_id: 42 });
402 assert!(round_trip(&r) == r);
403 }
404
405 #[test]
406 fn topic_config_record_round_trip() {
407 let mut overrides = std::collections::BTreeMap::new();
408 overrides.insert("retention.ms".to_string(), "60000".to_string());
409 overrides.insert("segment.bytes".to_string(), "1048576".to_string());
410 let r = MetadataRecord::V1TopicConfig(TopicConfigRecord {
411 topic: "t".into(),
412 overrides,
413 });
414 assert!(round_trip(&r) == r);
415 }
416
417 #[test]
418 fn scram_credential_round_trip() {
419 let r = MetadataRecord::V1ScramCredential(ScramCredentialRecord {
420 user: "alice".into(),
421 mechanism: crabka_security::SaslMechanism::ScramSha512,
422 salt: vec![1u8; 16],
423 stored_key: vec![2u8; 64],
424 server_key: vec![3u8; 64],
425 iterations: 4096,
426 });
427 assert!(round_trip(&r) == r);
428 }
429
430 #[test]
431 fn delete_scram_credential_round_trip() {
432 let r = MetadataRecord::V1DeleteScramCredential(DeleteScramCredentialRecord {
433 user: "alice".into(),
434 mechanism: crabka_security::SaslMechanism::ScramSha512,
435 });
436 assert!(round_trip(&r) == r);
437 }
438
439 #[test]
440 fn v1_access_control_entry_round_trip() {
441 let entry = crate::AclEntry {
442 resource_type: crate::ResourceType::Topic,
443 resource_name: "foo".into(),
444 pattern_type: crate::PatternType::Literal,
445 principal: "User:alice".into(),
446 host: "*".into(),
447 operation: crate::AclOperation::Read,
448 permission_type: crate::PermissionType::Allow,
449 };
450 let r = MetadataRecord::V1AccessControlEntry(entry);
451 assert!(round_trip(&r) == r);
452 }
453
454 #[test]
455 fn v1_delete_access_control_entry_round_trip() {
456 let filter = crate::AclEntryFilter {
457 resource_type: Some(crate::ResourceType::Group),
458 resource_name: Some("cg-foo".into()),
459 pattern_type: Some(crate::PatternType::Literal),
460 principal: None,
461 host: None,
462 operation: None,
463 permission_type: None,
464 };
465 let r = MetadataRecord::V1DeleteAccessControlEntry(filter);
466 assert!(round_trip(&r) == r);
467 }
468
469 #[test]
470 fn broker_config_record_round_trip() {
471 let r = MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
472 node_id: 7,
473 config_name: "leader.replication.throttled.rate".into(),
474 config_value: Some("2048".into()),
475 });
476 assert!(round_trip(&r) == r);
477 }
478
479 #[test]
480 fn client_quota_record_round_trip() {
481 let r = MetadataRecord::V1ClientQuota(ClientQuotaRecord {
482 entity: vec![
483 QuotaEntity {
484 entity_type: "client-id".into(),
485 entity_name: Some("app1".into()),
486 },
487 QuotaEntity {
488 entity_type: "user".into(),
489 entity_name: Some("alice".into()),
490 },
491 ],
492 config_key: "producer_byte_rate".into(),
493 config_value: Some(1024.0),
494 });
495 assert!(round_trip(&r) == r);
496 }
497
498 #[test]
499 fn delegation_token_record_round_trip() {
500 let r = MetadataRecord::V1DelegationToken(DelegationTokenRecord {
501 token_id: "tok-abc".into(),
502 owner: crabka_security::KafkaPrincipal {
503 principal_type: "User".into(),
504 name: "alice".into(),
505 },
506 hmac: vec![0xAB; 32],
507 issue_timestamp_ms: 1_700_000_000_000,
508 expiry_timestamp_ms: 1_700_000_600_000,
509 max_timestamp_ms: 1_700_604_800_000,
510 renewers: vec![crabka_security::KafkaPrincipal {
511 principal_type: "User".into(),
512 name: "bob".into(),
513 }],
514 });
515 assert!(round_trip(&r) == r);
516 }
517
518 #[test]
519 fn delete_delegation_token_record_round_trip() {
520 let r = MetadataRecord::V1DeleteDelegationToken(DeleteDelegationTokenRecord {
521 token_id: "tok-abc".into(),
522 });
523 assert!(round_trip(&r) == r);
524 }
525
526 #[test]
527 fn voters_record_round_trips() {
528 let rec = MetadataRecord::V1Voters(VotersRecord {
529 voters: crate::voters::VoterSet::from_voters([crate::voters::Voter {
530 id: 7,
531 directory_id: uuid::Uuid::from_u128(7),
532 endpoints: vec![crate::voters::VoterEndpoint {
533 name: "CONTROLLER".into(),
534 host: "h".into(),
535 port: 1,
536 }],
537 kraft_version: crate::voters::KRaftVersionRange::default(),
538 }]),
539 });
540 assert!(round_trip(&rec) == rec);
541 }
542
543 #[test]
544 fn kraft_version_record_round_trips() {
545 let rec = MetadataRecord::V1KRaftVersion(KRaftVersionRecord { kraft_version: 1 });
546 assert!(round_trip(&rec) == rec);
547 }
548
549 #[test]
550 fn client_metrics_config_round_trip() {
551 let mut overrides = std::collections::BTreeMap::new();
552 overrides.insert("interval.ms".to_string(), "60000".to_string());
553 overrides.insert(
554 "metrics".to_string(),
555 "org.apache.kafka.consumer.".to_string(),
556 );
557 let r = MetadataRecord::V1ClientMetricsConfig(ClientMetricsConfigRecord {
558 name: "sub-a".into(),
559 configs: overrides,
560 });
561 assert_eq!(round_trip(&r), r);
562 }
563}