1use std::collections::{BTreeMap, HashMap};
6
7use uuid::Uuid;
8
9use crabka_security::{KafkaPrincipal, SaslMechanism, ScramCredential};
10
11use crate::acl::{AclEntry, PatternType, ResourceType};
12use crate::error::MetadataError;
13use crate::records::{
14 BrokerConfigRecord, BrokerRegistrationRecord, ClientMetricsConfigRecord, ClientQuotaRecord,
15 DelegationTokenRecord, FeatureLevelRecord, FeaturesEpochRecord, KRaftVersionRecord,
16 MetadataRecord, NodeId, PartitionRecord, QuotaEntity, ScramCredentialRecord, TopicConfigRecord,
17 TopicRecord, VotersRecord,
18};
19
20pub type EntityKey = Vec<(String, Option<String>)>;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct DelegationToken {
27 pub token_id: String,
28 pub owner: KafkaPrincipal,
29 pub hmac: Vec<u8>,
30 pub issue_timestamp_ms: i64,
31 pub expiry_timestamp_ms: i64,
32 pub max_timestamp_ms: i64,
33 pub renewers: Vec<KafkaPrincipal>,
34}
35
36impl DelegationToken {
37 #[must_use]
38 pub fn from_record(rec: &DelegationTokenRecord) -> Self {
39 Self {
40 token_id: rec.token_id.clone(),
41 owner: rec.owner.clone(),
42 hmac: rec.hmac.clone(),
43 issue_timestamp_ms: rec.issue_timestamp_ms,
44 expiry_timestamp_ms: rec.expiry_timestamp_ms,
45 max_timestamp_ms: rec.max_timestamp_ms,
46 renewers: rec.renewers.clone(),
47 }
48 }
49}
50
51#[must_use]
52pub fn canonicalize_entity(mut tuple: Vec<(String, Option<String>)>) -> EntityKey {
53 tuple.sort_by(|a, b| a.0.cmp(&b.0));
54 tuple
55}
56
57#[derive(Debug, Clone, Default, PartialEq)]
58pub struct MetadataImage {
59 cluster_id: Uuid,
60 topics: HashMap<String, TopicRecord>,
61 topic_ids: HashMap<Uuid, String>,
65 partitions: HashMap<(String, i32), PartitionRecord>,
66 brokers: HashMap<NodeId, BrokerRegistrationRecord>,
67 topic_configs: HashMap<String, BTreeMap<String, String>>,
68 broker_configs: HashMap<NodeId, BTreeMap<String, String>>,
69 client_metrics_configs: HashMap<String, BTreeMap<String, String>>,
70 scram_credentials: HashMap<(String, SaslMechanism), ScramCredential>,
71 acls_literal: HashMap<(ResourceType, String), Vec<AclEntry>>,
72 acls_prefixed: HashMap<ResourceType, Vec<AclEntry>>,
73 client_quotas: HashMap<EntityKey, BTreeMap<String, f64>>,
74 delegation_tokens: HashMap<String, DelegationToken>,
75 kraft_version: u16,
76 voters: crate::voters::VoterSet,
77 feature_levels: BTreeMap<String, i16>,
78 features_epoch: i64,
83}
84
85#[derive(Debug, Clone, Copy)]
87pub enum ThrottleKind {
88 Leader,
89 Follower,
90}
91
92impl MetadataImage {
93 #[must_use]
94 pub fn new(cluster_id: Uuid) -> Self {
95 Self {
96 cluster_id,
97 topics: HashMap::new(),
98 topic_ids: HashMap::new(),
99 partitions: HashMap::new(),
100 brokers: HashMap::new(),
101 topic_configs: HashMap::new(),
102 broker_configs: HashMap::new(),
103 client_metrics_configs: HashMap::new(),
104 scram_credentials: HashMap::new(),
105 acls_literal: HashMap::new(),
106 acls_prefixed: HashMap::new(),
107 client_quotas: HashMap::new(),
108 delegation_tokens: HashMap::new(),
109 kraft_version: 0,
110 voters: crate::voters::VoterSet::default(),
111 feature_levels: BTreeMap::new(),
112 features_epoch: -1,
113 }
114 }
115
116 #[must_use]
117 pub fn cluster_id(&self) -> Uuid {
118 self.cluster_id
119 }
120
121 pub fn topics(&self) -> impl Iterator<Item = &TopicRecord> {
122 self.topics.values()
123 }
124
125 #[must_use]
126 pub fn topic(&self, name: &str) -> Option<&TopicRecord> {
127 self.topics.get(name)
128 }
129
130 #[must_use]
132 pub fn topic_by_id(&self, id: &Uuid) -> Option<&TopicRecord> {
133 let name = self.topic_ids.get(id)?;
134 self.topics.get(name)
135 }
136
137 #[must_use]
139 pub fn topic_name_by_id(&self, id: &Uuid) -> Option<&str> {
140 self.topic_ids.get(id).map(String::as_str)
141 }
142
143 #[must_use]
144 pub fn partition(&self, topic: &str, idx: i32) -> Option<&PartitionRecord> {
145 self.partitions.get(&(topic.to_string(), idx))
146 }
147
148 pub fn partitions_of(&self, topic: &str) -> impl Iterator<Item = &PartitionRecord> {
149 self.partitions
150 .iter()
151 .filter(move |((t, _), _)| t == topic)
152 .map(|(_, v)| v)
153 }
154
155 #[must_use]
162 pub fn topic_partition_count(&self, topic: &str) -> i32 {
163 i32::try_from(self.partitions_of(topic).count()).unwrap_or(i32::MAX)
164 }
165
166 pub fn all_partitions(&self) -> impl Iterator<Item = (&(String, i32), &PartitionRecord)> {
172 self.partitions.iter()
173 }
174
175 pub fn reassignments_in_flight(&self) -> impl Iterator<Item = &PartitionRecord> + '_ {
178 self.all_partitions()
179 .map(|(_, p)| p)
180 .filter(|p| !p.adding_replicas.is_empty() || !p.removing_replicas.is_empty())
181 }
182
183 #[must_use]
187 pub fn topic_config(&self, topic: &str) -> Option<&BTreeMap<String, String>> {
188 self.topic_configs.get(topic)
189 }
190
191 #[must_use]
194 pub fn broker_config(&self, node_id: NodeId) -> Option<&BTreeMap<String, String>> {
195 self.broker_configs.get(&node_id)
196 }
197
198 #[must_use]
202 pub fn broker_throttle_rate(&self, node_id: NodeId, kind: ThrottleKind) -> Option<u64> {
203 let key = match kind {
204 ThrottleKind::Leader => "leader.replication.throttled.rate",
205 ThrottleKind::Follower => "follower.replication.throttled.rate",
206 };
207 let raw = self.broker_config(node_id)?.get(key)?;
208 let v: i64 = raw.parse().ok()?;
209 #[allow(clippy::cast_sign_loss)]
210 if v < 0 { None } else { Some(v as u64) }
211 }
212
213 #[must_use]
215 pub fn client_metrics_config(&self, name: &str) -> Option<&BTreeMap<String, String>> {
216 self.client_metrics_configs.get(name)
217 }
218
219 pub fn client_metrics_subscriptions(
221 &self,
222 ) -> impl Iterator<Item = (&String, &BTreeMap<String, String>)> {
223 self.client_metrics_configs.iter()
224 }
225
226 #[must_use]
227 pub fn client_quotas(&self) -> &HashMap<EntityKey, BTreeMap<String, f64>> {
228 &self.client_quotas
229 }
230
231 #[must_use]
232 pub fn scram_credential(
233 &self,
234 user: &str,
235 mechanism: SaslMechanism,
236 ) -> Option<&ScramCredential> {
237 self.scram_credentials.get(&(user.to_string(), mechanism))
238 }
239
240 #[must_use]
243 pub fn scram_credentials_users(&self) -> Vec<String> {
244 self.scram_credentials
245 .keys()
246 .map(|(u, _)| u.clone())
247 .collect::<std::collections::HashSet<_>>()
248 .into_iter()
249 .collect()
250 }
251
252 #[must_use]
255 pub fn scram_credentials_for_user(&self, user: &str) -> Vec<(SaslMechanism, u32)> {
256 self.scram_credentials
257 .iter()
258 .filter(|((u, _), _)| u == user)
259 .map(|((_, mech), cred)| (*mech, cred.iterations))
260 .collect()
261 }
262
263 #[must_use]
264 pub fn broker(&self, node_id: NodeId) -> Option<&BrokerRegistrationRecord> {
265 self.brokers.get(&node_id)
266 }
267
268 #[must_use]
271 pub fn broker_epoch(&self, node_id: NodeId) -> Option<i64> {
272 self.brokers.get(&node_id).map(|b| b.broker_epoch)
273 }
274
275 pub fn brokers(&self) -> impl Iterator<Item = &BrokerRegistrationRecord> {
276 self.brokers.values()
277 }
278
279 #[must_use]
280 pub fn kraft_version(&self) -> u16 {
281 self.kraft_version
282 }
283
284 #[must_use]
285 pub fn voters(&self) -> &crate::voters::VoterSet {
286 &self.voters
287 }
288
289 pub fn matching_acls<'a>(
296 &'a self,
297 rt: ResourceType,
298 rn: &'a str,
299 ) -> impl Iterator<Item = &'a AclEntry> + 'a {
300 let literal_iter = self
301 .acls_literal
302 .get(&(rt, rn.to_string()))
303 .into_iter()
304 .flatten();
305 let wildcard_iter = (rn != "*")
309 .then(|| self.acls_literal.get(&(rt, "*".to_string())))
310 .into_iter()
311 .flatten()
312 .flatten();
313 let prefixed_iter = self
314 .acls_prefixed
315 .get(&rt)
316 .into_iter()
317 .flatten()
318 .filter(move |e| rn.starts_with(&e.resource_name));
319 literal_iter.chain(wildcard_iter).chain(prefixed_iter)
320 }
321
322 pub fn all_acls(&self) -> impl Iterator<Item = &AclEntry> {
325 self.acls_literal
326 .values()
327 .flatten()
328 .chain(self.acls_prefixed.values().flatten())
329 }
330
331 #[must_use]
333 pub fn delegation_token_by_id(&self, token_id: &str) -> Option<&DelegationToken> {
334 self.delegation_tokens.get(token_id)
335 }
336
337 #[must_use]
340 pub fn delegation_tokens_by_owner(&self, owner: &KafkaPrincipal) -> Vec<&DelegationToken> {
341 self.delegation_tokens
342 .values()
343 .filter(|t| &t.owner == owner)
344 .collect()
345 }
346
347 #[must_use]
352 pub fn delegation_tokens_visible_to(
353 &self,
354 principal: &KafkaPrincipal,
355 ) -> Vec<&DelegationToken> {
356 self.delegation_tokens
357 .values()
358 .filter(|t| &t.owner == principal || t.renewers.iter().any(|r| r == principal))
359 .collect()
360 }
361
362 pub fn all_delegation_tokens(&self) -> impl Iterator<Item = &DelegationToken> {
366 self.delegation_tokens.values()
367 }
368
369 #[must_use]
377 pub fn delegation_token_by_hmac(&self, hmac: &[u8]) -> Option<&DelegationToken> {
378 self.delegation_tokens.values().find(|t| t.hmac == hmac)
379 }
380
381 #[must_use]
384 pub fn finalized_features(&self) -> &BTreeMap<String, i16> {
385 &self.feature_levels
386 }
387
388 #[must_use]
391 pub fn finalized_features_epoch(&self) -> i64 {
392 self.features_epoch
393 }
394
395 #[must_use]
399 pub fn finalized_metadata_version(&self) -> Option<i16> {
400 self.feature_levels
401 .get(crate::metadata_version::METADATA_VERSION_FEATURE)
402 .copied()
403 }
404
405 #[must_use]
409 pub fn finalized_feature(&self, name: &str) -> Option<i16> {
410 self.feature_levels.get(name).copied()
411 }
412
413 #[must_use]
418 pub fn min_required_metadata_version(&self) -> i16 {
419 use crate::metadata_version::{
420 DELEGATION_TOKEN_MIN_LEVEL, METADATA_VERSION_MIN, SCRAM_MIN_LEVEL,
421 };
422 let mut floor = METADATA_VERSION_MIN;
423 if !self.scram_credentials.is_empty() {
424 floor = floor.max(SCRAM_MIN_LEVEL);
425 }
426 if !self.delegation_tokens.is_empty() {
427 floor = floor.max(DELEGATION_TOKEN_MIN_LEVEL);
428 }
429 floor
430 }
431
432 #[allow(clippy::too_many_lines)] pub fn apply(&mut self, rec: &MetadataRecord) {
439 match rec {
440 MetadataRecord::V1Topic(t) => {
441 if let Some(prev) = self.topics.get(&t.name)
444 && prev.topic_id != t.topic_id
445 {
446 self.topic_ids.remove(&prev.topic_id);
447 }
448 self.topic_ids.insert(t.topic_id, t.name.clone());
449 let mut rec = t.clone();
458 rec.partitions = self.topics.get(&t.name).map_or(0, |prev| prev.partitions);
459 self.topics.insert(t.name.clone(), rec);
460 }
461 MetadataRecord::V1Partition(p) => {
462 let is_new = self
469 .partitions
470 .insert((p.topic.clone(), p.partition), p.clone())
471 .is_none();
472 if let Some(t) = self.topics.get_mut(&p.topic) {
473 if is_new {
474 t.partitions = t.partitions.saturating_add(1);
475 }
476 t.replication_factor =
477 i16::try_from(p.replicas.len()).unwrap_or(t.replication_factor);
478 }
479 }
480 MetadataRecord::V1BrokerRegistration(b) => {
481 self.brokers.insert(b.node_id, b.clone());
482 }
483 MetadataRecord::V1DeleteTopic(d) => {
484 if let Some(prev) = self.topics.get(&d.name) {
485 self.topic_ids.remove(&prev.topic_id);
486 }
487 self.topics.remove(&d.name);
488 self.partitions.retain(|(t, _), _| t != &d.name);
489 self.topic_configs.remove(&d.name);
490 }
491 MetadataRecord::V1TopicConfig(c) => {
492 if c.overrides.is_empty() {
493 self.topic_configs.remove(&c.topic);
494 } else {
495 self.topic_configs
496 .insert(c.topic.clone(), c.overrides.clone());
497 }
498 }
499 MetadataRecord::V1ScramCredential(r) => {
500 self.scram_credentials.insert(
501 (r.user.clone(), r.mechanism),
502 ScramCredential {
503 mechanism: r.mechanism,
504 salt: r.salt.clone(),
505 stored_key: r.stored_key.clone(),
506 server_key: r.server_key.clone(),
507 iterations: r.iterations,
508 },
509 );
510 }
511 MetadataRecord::V1DeleteScramCredential(r) => {
512 self.scram_credentials
513 .remove(&(r.user.clone(), r.mechanism));
514 }
515 MetadataRecord::V1AccessControlEntry(entry) => {
516 let bucket = match entry.pattern_type {
517 PatternType::Literal => self
518 .acls_literal
519 .entry((entry.resource_type, entry.resource_name.clone()))
520 .or_default(),
521 PatternType::Prefixed => {
522 self.acls_prefixed.entry(entry.resource_type).or_default()
523 }
524 };
525 bucket.retain(|e| e != entry);
527 bucket.push(entry.clone());
528 }
529 MetadataRecord::V1DeleteAccessControlEntry(filter) => {
530 self.acls_literal.retain(|_, v| {
531 v.retain(|e| !filter.matches(e));
532 !v.is_empty()
533 });
534 self.acls_prefixed.retain(|_, v| {
535 v.retain(|e| !filter.matches(e));
536 !v.is_empty()
537 });
538 }
539 MetadataRecord::V1BrokerConfig(rec) => {
540 let entry = self.broker_configs.entry(rec.node_id).or_default();
541 match &rec.config_value {
542 Some(v) => {
543 entry.insert(rec.config_name.clone(), v.clone());
544 }
545 None => {
546 entry.remove(&rec.config_name);
547 }
548 }
549 }
550 MetadataRecord::V1ClientQuota(rec) => {
551 let key = canonicalize_entity(
552 rec.entity
553 .iter()
554 .map(|e| (e.entity_type.clone(), e.entity_name.clone()))
555 .collect(),
556 );
557 let configs = self.client_quotas.entry(key).or_default();
558 match rec.config_value {
559 Some(v) => {
560 configs.insert(rec.config_key.clone(), v);
561 }
562 None => {
563 configs.remove(&rec.config_key);
564 }
565 }
566 }
567 MetadataRecord::V1DelegationToken(rec) => {
571 self.delegation_tokens
572 .insert(rec.token_id.clone(), DelegationToken::from_record(rec));
573 }
574 MetadataRecord::V1DeleteDelegationToken(rec) => {
575 self.delegation_tokens.remove(&rec.token_id);
576 }
577 MetadataRecord::V1UnregisterBroker(rec) => {
578 self.brokers.remove(&rec.node_id);
581 }
582 MetadataRecord::V1KRaftVersion(r) => {
583 self.kraft_version = r.kraft_version;
584 }
585 MetadataRecord::V1Voters(r) => {
586 self.voters = r.voters.clone();
587 }
588 MetadataRecord::V1FeatureLevel(rec) => {
589 if rec.level == 0 {
590 self.feature_levels.remove(&rec.name);
591 } else {
592 self.feature_levels.insert(rec.name.clone(), rec.level);
593 }
594 self.features_epoch = self.features_epoch.saturating_add(1).max(0);
596 }
597 MetadataRecord::V1ClientMetricsConfig(c) => {
598 if c.configs.is_empty() {
599 self.client_metrics_configs.remove(&c.name);
600 } else {
601 self.client_metrics_configs
602 .insert(c.name.clone(), c.configs.clone());
603 }
604 }
605 MetadataRecord::V1FeaturesEpoch(rec) => {
610 self.features_epoch = rec.epoch;
611 }
612 MetadataRecord::V1PartitionDirAssignment(r) => {
617 if let Some(pr) = self.partitions.get_mut(&(r.topic.clone(), r.partition))
618 && let Some(slot) = pr.replicas.iter().position(|n| *n == r.replica)
619 {
620 if pr.directories.len() < pr.replicas.len() {
621 pr.directories.resize(pr.replicas.len(), uuid::Uuid::nil());
622 }
623 pr.directories[slot] = r.directory;
624 }
625 }
626 }
627 }
628
629 #[must_use]
646 pub fn to_records(&self) -> Vec<MetadataRecord> {
647 let mut out = Vec::new();
648
649 for b in self.brokers.values() {
651 out.push(MetadataRecord::V1BrokerRegistration(b.clone()));
652 }
653 for (node_id, configs) in &self.broker_configs {
654 for (config_name, config_value) in configs {
655 out.push(MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
656 node_id: *node_id,
657 config_name: config_name.clone(),
658 config_value: Some(config_value.clone()),
659 }));
660 }
661 }
662
663 for t in self.topics.values() {
667 out.push(MetadataRecord::V1Topic(t.clone()));
668 }
669 for p in self.partitions.values() {
670 out.push(MetadataRecord::V1Partition(p.clone()));
671 }
672 for (topic, overrides) in &self.topic_configs {
673 out.push(MetadataRecord::V1TopicConfig(TopicConfigRecord {
674 topic: topic.clone(),
675 overrides: overrides.clone(),
676 }));
677 }
678
679 for ((user, mechanism), cred) in &self.scram_credentials {
681 out.push(MetadataRecord::V1ScramCredential(ScramCredentialRecord {
682 user: user.clone(),
683 mechanism: *mechanism,
684 salt: cred.salt.clone(),
685 stored_key: cred.stored_key.clone(),
686 server_key: cred.server_key.clone(),
687 iterations: cred.iterations,
688 }));
689 }
690
691 for entry in self.all_acls() {
695 out.push(MetadataRecord::V1AccessControlEntry(entry.clone()));
696 }
697
698 for (entity_key, configs) in &self.client_quotas {
702 let entity: Vec<QuotaEntity> = entity_key
703 .iter()
704 .map(|(entity_type, entity_name)| QuotaEntity {
705 entity_type: entity_type.clone(),
706 entity_name: entity_name.clone(),
707 })
708 .collect();
709 for (config_key, config_value) in configs {
710 out.push(MetadataRecord::V1ClientQuota(ClientQuotaRecord {
711 entity: entity.clone(),
712 config_key: config_key.clone(),
713 config_value: Some(*config_value),
714 }));
715 }
716 }
717
718 for tok in self.delegation_tokens.values() {
720 out.push(MetadataRecord::V1DelegationToken(DelegationTokenRecord {
721 token_id: tok.token_id.clone(),
722 owner: tok.owner.clone(),
723 hmac: tok.hmac.clone(),
724 issue_timestamp_ms: tok.issue_timestamp_ms,
725 expiry_timestamp_ms: tok.expiry_timestamp_ms,
726 max_timestamp_ms: tok.max_timestamp_ms,
727 renewers: tok.renewers.clone(),
728 }));
729 }
730
731 for (name, configs) in &self.client_metrics_configs {
733 out.push(MetadataRecord::V1ClientMetricsConfig(
734 ClientMetricsConfigRecord {
735 name: name.clone(),
736 configs: configs.clone(),
737 },
738 ));
739 }
740
741 for (name, level) in &self.feature_levels {
751 out.push(MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
752 name: name.clone(),
753 level: *level,
754 }));
755 }
756 if self.features_epoch >= 0 {
757 out.push(MetadataRecord::V1FeaturesEpoch(FeaturesEpochRecord {
758 epoch: self.features_epoch,
759 }));
760 }
761
762 if self.kraft_version != 0 {
777 out.push(MetadataRecord::V1KRaftVersion(KRaftVersionRecord {
778 kraft_version: self.kraft_version,
779 }));
780 }
781 if !self.voters.is_empty() {
782 out.push(MetadataRecord::V1Voters(VotersRecord {
783 voters: self.voters.clone(),
784 }));
785 }
786
787 out
788 }
789
790 #[must_use]
794 pub fn from_records(cluster_id: Uuid, records: &[MetadataRecord]) -> Self {
795 let mut image = Self::new(cluster_id);
796 for rec in records {
797 image.apply(rec);
798 }
799 image
800 }
801
802 pub fn validate(&self, rec: &MetadataRecord) -> Result<(), MetadataError> {
806 match rec {
807 MetadataRecord::V1Topic(t) => {
808 if let Some(existing) = self.topics.get(&t.name) {
809 if existing.topic_id != t.topic_id
820 || existing.replication_factor != t.replication_factor
821 || t.partitions <= self.topic_partition_count(&t.name)
822 {
823 return Err(MetadataError::TopicExists(t.name.clone()));
824 }
825 return Ok(());
826 }
827 Ok(())
834 }
835 MetadataRecord::V1Partition(p) => {
836 if !self.topics.contains_key(&p.topic) {
837 return Err(MetadataError::UnknownTopic(p.topic.clone()));
838 }
839 Ok(())
840 }
841 MetadataRecord::V1DeleteTopic(d) => {
842 if !self.topics.contains_key(&d.name) {
843 return Err(MetadataError::UnknownTopic(d.name.clone()));
844 }
845 Ok(())
846 }
847 MetadataRecord::V1TopicConfig(c) => {
848 if !self.topics.contains_key(&c.topic) {
849 return Err(MetadataError::UnknownTopic(c.topic.clone()));
850 }
851 Ok(())
852 }
853 MetadataRecord::V1BrokerRegistration(_)
854 | MetadataRecord::V1ScramCredential(_)
855 | MetadataRecord::V1DeleteScramCredential(_)
856 | MetadataRecord::V1AccessControlEntry(_)
857 | MetadataRecord::V1DeleteAccessControlEntry(_)
858 | MetadataRecord::V1BrokerConfig(_)
859 | MetadataRecord::V1ClientQuota(_)
860 | MetadataRecord::V1DelegationToken(_)
865 | MetadataRecord::V1DeleteDelegationToken(_)
866 | MetadataRecord::V1UnregisterBroker(_)
870 | MetadataRecord::V1KRaftVersion(_)
874 | MetadataRecord::V1Voters(_)
875 | MetadataRecord::V1FeatureLevel(_)
879 | MetadataRecord::V1ClientMetricsConfig(_)
884 | MetadataRecord::V1FeaturesEpoch(_)
888 | MetadataRecord::V1PartitionDirAssignment(_) => Ok(()),
893 }
894 }
895}
896
897#[cfg(test)]
898mod tests {
899 use super::*;
900 use crate::acl::{AclEntryFilter, AclOperation, PermissionType};
901 use crate::records::{
902 BrokerConfigRecord, ClientQuotaRecord, DeleteDelegationTokenRecord,
903 DeleteScramCredentialRecord, DeleteTopicRecord, FeatureLevelRecord, QuotaEntity,
904 ScramCredentialRecord,
905 };
906 use assert2::assert;
907
908 fn img() -> MetadataImage {
909 MetadataImage::new(Uuid::nil())
910 }
911
912 #[test]
913 fn fresh_image_has_no_features_and_unknown_epoch() {
914 let m = img();
915 assert!(m.finalized_features().is_empty());
916 assert!(m.finalized_features_epoch() == -1);
917 }
918
919 #[test]
920 fn finalized_feature_reads_arbitrary_name() {
921 let mut m = MetadataImage::new(uuid::Uuid::nil());
922 assert!(m.finalized_feature("transaction.version").is_none());
923 m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
924 name: "transaction.version".into(),
925 level: 2,
926 }));
927 assert!(m.finalized_feature("transaction.version") == Some(2));
928 }
929
930 #[test]
931 fn apply_feature_level_sets_level_and_bumps_epoch() {
932 let mut m = img();
933 m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
934 name: "metadata.version".into(),
935 level: 1,
936 }));
937 assert!(m.finalized_features().get("metadata.version") == Some(&1));
938 assert!(m.finalized_features_epoch() == 0);
939
940 m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
942 name: "metadata.version".into(),
943 level: 1,
944 }));
945 assert!(m.finalized_features_epoch() == 1);
946 }
947
948 #[test]
949 fn apply_feature_level_zero_deletes_entry() {
950 let mut m = img();
951 m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
952 name: "metadata.version".into(),
953 level: 1,
954 }));
955 m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
956 name: "metadata.version".into(),
957 level: 0,
958 }));
959 assert!(m.finalized_features().get("metadata.version").is_none());
960 assert!(m.finalized_features_epoch() == 1);
962 }
963
964 #[test]
965 fn to_records_from_records_round_trips() {
966 let cid = Uuid::new_v4();
967 let mut image = MetadataImage::new(cid);
968 image.apply(&MetadataRecord::V1Topic(TopicRecord {
969 name: "orders".into(),
970 topic_id: Uuid::new_v4(),
971 partitions: 3,
972 replication_factor: 2,
973 }));
974 assert!(MetadataImage::from_records(cid, &image.to_records()) == image);
975 }
976
977 #[test]
982 #[allow(clippy::too_many_lines)] fn to_records_round_trips_all_variants() {
984 use crabka_security::{KafkaPrincipal, SaslMechanism};
985
986 let cid = Uuid::new_v4();
987 let mut image = MetadataImage::new(cid);
988
989 image.apply(&MetadataRecord::V1BrokerRegistration(
991 BrokerRegistrationRecord {
992 node_id: 1,
993 broker_epoch: 0,
994 incarnation_id: Uuid::nil(),
995 host: "h1".into(),
996 port: 9092,
997 rack: Some("r1".into()),
998 endpoints: vec![crate::records::BrokerEndpoint {
999 name: "EXTERNAL".into(),
1000 host: "ext".into(),
1001 port: 9093,
1002 protocol: crabka_security::ListenerProtocol::SaslSsl,
1003 }],
1004 },
1005 ));
1006 image.apply(&MetadataRecord::V1BrokerRegistration(
1007 BrokerRegistrationRecord {
1008 node_id: 2,
1009 broker_epoch: 0,
1010 incarnation_id: Uuid::nil(),
1011 host: "h2".into(),
1012 port: 9092,
1013 rack: None,
1014 endpoints: vec![],
1015 },
1016 ));
1017 image.apply(&MetadataRecord::V1UnregisterBroker(
1018 crate::records::UnregisterBrokerRecord { node_id: 2 },
1019 ));
1020
1021 image.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1023 node_id: 1,
1024 config_name: "leader.replication.throttled.rate".into(),
1025 config_value: Some("2048".into()),
1026 }));
1027 image.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1028 node_id: 1,
1029 config_name: "follower.replication.throttled.rate".into(),
1030 config_value: Some("4096".into()),
1031 }));
1032 image.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1033 node_id: 1,
1034 config_name: "follower.replication.throttled.rate".into(),
1035 config_value: None,
1036 }));
1037
1038 image.apply(&MetadataRecord::V1Topic(TopicRecord {
1041 name: "orders".into(),
1042 topic_id: Uuid::new_v4(),
1043 partitions: 2,
1044 replication_factor: 2,
1045 }));
1046 image.apply(&MetadataRecord::V1Topic(TopicRecord {
1047 name: "doomed".into(),
1048 topic_id: Uuid::new_v4(),
1049 partitions: 1,
1050 replication_factor: 1,
1051 }));
1052 image.apply(&MetadataRecord::V1Partition(PartitionRecord {
1053 topic: "orders".into(),
1054 partition: 0,
1055 leader: 1,
1056 replicas: vec![1, 2],
1057 isr: vec![1, 2],
1058 leader_epoch: 4,
1059 adding_replicas: vec![2],
1060 removing_replicas: vec![],
1061 directories: vec![],
1062 partition_epoch: 0,
1063 }));
1064 image.apply(&MetadataRecord::V1Partition(PartitionRecord {
1065 topic: "orders".into(),
1066 partition: 1,
1067 leader: 2,
1068 replicas: vec![2, 1],
1069 isr: vec![2],
1070 leader_epoch: 7,
1071 adding_replicas: vec![],
1072 removing_replicas: vec![1],
1073 directories: vec![],
1074 partition_epoch: 0,
1075 }));
1076 image.apply(&MetadataRecord::V1Partition(PartitionRecord {
1077 topic: "doomed".into(),
1078 partition: 0,
1079 leader: 1,
1080 replicas: vec![1],
1081 isr: vec![1],
1082 leader_epoch: 0,
1083 adding_replicas: vec![],
1084 removing_replicas: vec![],
1085 directories: vec![],
1086 partition_epoch: 0,
1087 }));
1088
1089 let mut overrides = std::collections::BTreeMap::new();
1091 overrides.insert("retention.ms".to_string(), "60000".to_string());
1092 overrides.insert("segment.bytes".to_string(), "1048576".to_string());
1093 image.apply(&MetadataRecord::V1TopicConfig(TopicConfigRecord {
1094 topic: "orders".into(),
1095 overrides,
1096 }));
1097
1098 image.apply(&MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
1100 name: "doomed".into(),
1101 }));
1102
1103 image.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1105 user: "alice".into(),
1106 mechanism: SaslMechanism::ScramSha512,
1107 salt: vec![1; 16],
1108 stored_key: vec![2; 64],
1109 server_key: vec![3; 64],
1110 iterations: 8192,
1111 }));
1112 image.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1113 user: "bob".into(),
1114 mechanism: SaslMechanism::ScramSha256,
1115 salt: vec![4; 16],
1116 stored_key: vec![5; 32],
1117 server_key: vec![6; 32],
1118 iterations: 4096,
1119 }));
1120 image.apply(&MetadataRecord::V1DeleteScramCredential(
1121 DeleteScramCredentialRecord {
1122 user: "bob".into(),
1123 mechanism: SaslMechanism::ScramSha256,
1124 },
1125 ));
1126
1127 let literal = AclEntry {
1129 resource_type: ResourceType::Topic,
1130 resource_name: "orders".into(),
1131 pattern_type: PatternType::Literal,
1132 principal: "User:alice".into(),
1133 host: "*".into(),
1134 operation: AclOperation::Read,
1135 permission_type: PermissionType::Allow,
1136 };
1137 let prefixed = AclEntry {
1138 resource_type: ResourceType::Group,
1139 resource_name: "cg-".into(),
1140 pattern_type: PatternType::Prefixed,
1141 principal: "User:alice".into(),
1142 host: "*".into(),
1143 operation: AclOperation::Read,
1144 permission_type: PermissionType::Allow,
1145 };
1146 let doomed_acl = AclEntry {
1147 resource_type: ResourceType::Cluster,
1148 resource_name: "kafka-cluster".into(),
1149 pattern_type: PatternType::Literal,
1150 principal: "User:eve".into(),
1151 host: "*".into(),
1152 operation: AclOperation::Alter,
1153 permission_type: PermissionType::Deny,
1154 };
1155 image.apply(&MetadataRecord::V1AccessControlEntry(literal));
1156 image.apply(&MetadataRecord::V1AccessControlEntry(prefixed));
1157 image.apply(&MetadataRecord::V1AccessControlEntry(doomed_acl));
1158 image.apply(&MetadataRecord::V1DeleteAccessControlEntry(
1159 AclEntryFilter {
1160 resource_type: Some(ResourceType::Cluster),
1161 ..AclEntryFilter::default()
1162 },
1163 ));
1164
1165 image.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
1167 entity: vec![
1168 QuotaEntity {
1169 entity_type: "user".into(),
1170 entity_name: Some("alice".into()),
1171 },
1172 QuotaEntity {
1173 entity_type: "client-id".into(),
1174 entity_name: Some("app1".into()),
1175 },
1176 ],
1177 config_key: "producer_byte_rate".into(),
1178 config_value: Some(1024.0),
1179 }));
1180 image.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
1181 entity: vec![QuotaEntity {
1182 entity_type: "user".into(),
1183 entity_name: None,
1184 }],
1185 config_key: "consumer_byte_rate".into(),
1186 config_value: Some(2048.0),
1187 }));
1188
1189 let alice = KafkaPrincipal {
1191 principal_type: "User".into(),
1192 name: "alice".into(),
1193 };
1194 let bob = KafkaPrincipal {
1195 principal_type: "User".into(),
1196 name: "bob".into(),
1197 };
1198 image.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
1199 token_id: "tok-1".into(),
1200 owner: alice,
1201 hmac: vec![0x42; 32],
1202 issue_timestamp_ms: 1_000,
1203 expiry_timestamp_ms: 5_000,
1204 max_timestamp_ms: 10_000,
1205 renewers: vec![bob.clone()],
1206 }));
1207 image.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
1208 token_id: "tok-2".into(),
1209 owner: bob,
1210 hmac: vec![0x43; 32],
1211 issue_timestamp_ms: 1_000,
1212 expiry_timestamp_ms: 5_000,
1213 max_timestamp_ms: 10_000,
1214 renewers: vec![],
1215 }));
1216 image.apply(&MetadataRecord::V1DeleteDelegationToken(
1217 DeleteDelegationTokenRecord {
1218 token_id: "tok-2".into(),
1219 },
1220 ));
1221
1222 image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1226 name: "metadata.version".into(),
1227 level: 24,
1228 }));
1229 image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1230 name: "metadata.version".into(),
1231 level: 25,
1232 }));
1233 image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1234 name: "group.version".into(),
1235 level: 1,
1236 }));
1237 assert!(image.finalized_features_epoch() == 2);
1238
1239 let rebuilt = MetadataImage::from_records(cid, &image.to_records());
1240 assert!(rebuilt == image);
1241 }
1242
1243 #[test]
1251 fn to_records_round_trips_features_and_epoch() {
1252 let cid = Uuid::new_v4();
1253 let mut image = MetadataImage::new(cid);
1254 image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1257 name: "metadata.version".into(),
1258 level: 24,
1259 }));
1260 image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1261 name: "metadata.version".into(),
1262 level: 25,
1263 }));
1264 image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1265 name: "group.version".into(),
1266 level: 1,
1267 }));
1268 image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1269 name: "metadata.version".into(),
1270 level: 25,
1271 }));
1272 assert!(image.finalized_features_epoch() == 3);
1273
1274 let rebuilt = MetadataImage::from_records(cid, &image.to_records());
1275 assert!(rebuilt == image);
1276 assert!(rebuilt.finalized_features().get("metadata.version") == Some(&25));
1279 assert!(rebuilt.finalized_features().get("group.version") == Some(&1));
1280 assert!(rebuilt.finalized_features_epoch() == 3);
1281 }
1282
1283 #[test]
1291 fn to_records_preserves_voters_and_kraft_version() {
1292 use crate::records::{KRaftVersionRecord, VotersRecord};
1293 use crate::voters::{KRaftVersionRange, Voter, VoterEndpoint, VoterSet};
1294
1295 let cid = Uuid::new_v4();
1296 let mut image = MetadataImage::new(cid);
1297 image.apply(&MetadataRecord::V1KRaftVersion(KRaftVersionRecord {
1298 kraft_version: 1,
1299 }));
1300 let voters = VoterSet::from_voters([
1301 Voter {
1302 id: 1,
1303 directory_id: Uuid::from_u128(1),
1304 endpoints: vec![VoterEndpoint {
1305 name: "CONTROLLER".into(),
1306 host: "127.0.0.1".into(),
1307 port: 9093,
1308 }],
1309 kraft_version: KRaftVersionRange::default(),
1310 },
1311 Voter {
1312 id: 2,
1313 directory_id: Uuid::from_u128(2),
1314 endpoints: vec![VoterEndpoint {
1315 name: "CONTROLLER".into(),
1316 host: "127.0.0.1".into(),
1317 port: 9094,
1318 }],
1319 kraft_version: KRaftVersionRange { min: 0, max: 1 },
1320 },
1321 ]);
1322 image.apply(&MetadataRecord::V1Voters(VotersRecord {
1323 voters: voters.clone(),
1324 }));
1325
1326 let rebuilt = MetadataImage::from_records(cid, &image.to_records());
1327
1328 assert_eq!(rebuilt.kraft_version(), 1);
1329 assert_eq!(rebuilt.voters(), &voters);
1330 assert_eq!(rebuilt, image);
1331 }
1332
1333 fn topic(name: &str, partitions: i32) -> MetadataRecord {
1334 MetadataRecord::V1Topic(TopicRecord {
1335 name: name.into(),
1336 topic_id: Uuid::new_v4(),
1337 partitions,
1338 replication_factor: 1,
1339 })
1340 }
1341
1342 #[test]
1343 fn apply_topic_inserts() {
1344 let mut m = img();
1345 m.apply(&topic("t", 3));
1346 assert!(m.topic("t").is_some());
1347 }
1348
1349 #[test]
1350 fn apply_dir_assignment_merges_one_slot_without_clobbering_reassignment() {
1351 let mut m = img();
1355 m.apply(&topic("t", 1));
1356 m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1357 topic: "t".into(),
1358 partition: 0,
1359 leader: 1,
1360 replicas: vec![1, 2, 3],
1361 isr: vec![1, 2],
1362 leader_epoch: 0,
1363 adding_replicas: vec![3],
1364 removing_replicas: vec![],
1365 directories: vec![uuid::Uuid::nil(), uuid::Uuid::nil(), uuid::Uuid::nil()],
1366 partition_epoch: 0,
1367 }));
1368 let dir = uuid::Uuid::from_u128(0xABCD);
1369 m.apply(&MetadataRecord::V1PartitionDirAssignment(
1370 crate::records::PartitionDirAssignmentRecord {
1371 topic: "t".into(),
1372 partition: 0,
1373 replica: 2,
1374 directory: dir,
1375 },
1376 ));
1377 let pr = m.partition("t", 0).unwrap();
1378 assert!(pr.directories == vec![uuid::Uuid::nil(), dir, uuid::Uuid::nil()]);
1380 assert!(pr.adding_replicas == vec![3]);
1382 assert!(pr.replicas == vec![1, 2, 3]);
1383 assert!(pr.isr == vec![1, 2]);
1384 }
1385
1386 #[test]
1387 fn apply_delete_clears_partitions() {
1388 let mut m = img();
1389 m.apply(&topic("t", 2));
1390 m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1391 topic: "t".into(),
1392 partition: 0,
1393 leader: 1,
1394 replicas: vec![1],
1395 isr: vec![1],
1396 leader_epoch: 0,
1397 adding_replicas: vec![],
1398 removing_replicas: vec![],
1399 directories: vec![],
1400 partition_epoch: 0,
1401 }));
1402 m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1403 topic: "t".into(),
1404 partition: 1,
1405 leader: 1,
1406 replicas: vec![1],
1407 isr: vec![1],
1408 leader_epoch: 0,
1409 adding_replicas: vec![],
1410 removing_replicas: vec![],
1411 directories: vec![],
1412 partition_epoch: 0,
1413 }));
1414 assert!(m.partitions_of("t").count() == 2);
1415 m.apply(&MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
1416 name: "t".into(),
1417 }));
1418 assert!(m.topic("t").is_none());
1419 assert!(m.partitions_of("t").count() == 0);
1420 }
1421
1422 #[test]
1423 fn validate_topic_exists_rejected() {
1424 let mut m = img();
1425 m.apply(&topic("t", 1));
1426 let err = m.validate(&topic("t", 1)).unwrap_err();
1427 assert!(matches!(err, MetadataError::TopicExists(_)));
1428 }
1429
1430 fn apply_partitions(m: &mut MetadataImage, topic: &str, count: i32) {
1435 for p in 0..count {
1436 m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1437 topic: topic.into(),
1438 partition: p,
1439 leader: 1,
1440 replicas: vec![1],
1441 isr: vec![1],
1442 leader_epoch: 0,
1443 adding_replicas: vec![],
1444 removing_replicas: vec![],
1445 directories: vec![],
1446 partition_epoch: 0,
1447 }));
1448 }
1449 }
1450
1451 #[test]
1452 fn validate_topic_partition_count_increase_allowed() {
1453 let mut m = img();
1454 m.apply(&topic("t", 1));
1455 apply_partitions(&mut m, "t", 1);
1456 let existing = m.topic("t").unwrap().clone();
1457 let updated = MetadataRecord::V1Topic(TopicRecord {
1458 name: "t".into(),
1459 topic_id: existing.topic_id,
1460 partitions: 3,
1461 replication_factor: existing.replication_factor,
1462 });
1463 assert!(m.validate(&updated).is_ok());
1464 }
1465
1466 #[test]
1467 fn validate_topic_partition_count_decrease_rejected() {
1468 let mut m = img();
1469 m.apply(&topic("t", 3));
1470 apply_partitions(&mut m, "t", 3);
1471 let existing = m.topic("t").unwrap().clone();
1472 let updated = MetadataRecord::V1Topic(TopicRecord {
1473 name: "t".into(),
1474 topic_id: existing.topic_id,
1475 partitions: 1,
1476 replication_factor: existing.replication_factor,
1477 });
1478 let err = m.validate(&updated).unwrap_err();
1479 assert!(matches!(err, MetadataError::TopicExists(_)));
1480 }
1481
1482 #[test]
1483 fn validate_delete_unknown_topic_rejected() {
1484 let m = img();
1485 let err = m
1486 .validate(&MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
1487 name: "ghost".into(),
1488 }))
1489 .unwrap_err();
1490 assert!(matches!(err, MetadataError::UnknownTopic(_)));
1491 }
1492
1493 #[test]
1494 fn validate_partition_for_unknown_topic_rejected() {
1495 let m = img();
1496 let p = MetadataRecord::V1Partition(PartitionRecord {
1497 topic: "ghost".into(),
1498 partition: 0,
1499 leader: 1,
1500 replicas: vec![1],
1501 isr: vec![1],
1502 leader_epoch: 0,
1503 adding_replicas: vec![],
1504 removing_replicas: vec![],
1505 directories: vec![],
1506 partition_epoch: 0,
1507 });
1508 let err = m.validate(&p).unwrap_err();
1509 assert!(matches!(err, MetadataError::UnknownTopic(_)));
1510 }
1511
1512 #[test]
1513 fn broker_registration_is_idempotent() {
1514 let mut m = img();
1515 let b = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
1516 node_id: 1,
1517 broker_epoch: 0,
1518 incarnation_id: Uuid::nil(),
1519 host: "h".into(),
1520 port: 9092,
1521 rack: None,
1522 endpoints: vec![],
1523 });
1524 m.apply(&b);
1525 m.apply(&b);
1526 assert!(m.brokers().count() == 1);
1527 }
1528
1529 #[test]
1530 fn apply_topic_config_inserts() {
1531 let mut m = img();
1532 m.apply(&topic("t", 1));
1533 let mut overrides = std::collections::BTreeMap::new();
1534 overrides.insert("retention.ms".to_string(), "60000".to_string());
1535 m.apply(&MetadataRecord::V1TopicConfig(
1536 crate::records::TopicConfigRecord {
1537 topic: "t".into(),
1538 overrides: overrides.clone(),
1539 },
1540 ));
1541 assert!(m.topic_config("t") == Some(&overrides));
1542 }
1543
1544 #[test]
1545 fn apply_topic_config_replaces_previous() {
1546 let mut m = img();
1547 m.apply(&topic("t", 1));
1548
1549 let mut first = std::collections::BTreeMap::new();
1550 first.insert("retention.ms".to_string(), "60000".to_string());
1551 first.insert("segment.bytes".to_string(), "1024".to_string());
1552 m.apply(&MetadataRecord::V1TopicConfig(
1553 crate::records::TopicConfigRecord {
1554 topic: "t".into(),
1555 overrides: first,
1556 },
1557 ));
1558
1559 let mut second = std::collections::BTreeMap::new();
1560 second.insert("retention.ms".to_string(), "120000".to_string());
1561 m.apply(&MetadataRecord::V1TopicConfig(
1562 crate::records::TopicConfigRecord {
1563 topic: "t".into(),
1564 overrides: second.clone(),
1565 },
1566 ));
1567
1568 assert!(m.topic_config("t") == Some(&second));
1570 }
1571
1572 #[test]
1573 fn delete_topic_clears_configs() {
1574 let mut m = img();
1575 m.apply(&topic("t", 1));
1576 let mut overrides = std::collections::BTreeMap::new();
1577 overrides.insert("retention.ms".to_string(), "60000".to_string());
1578 m.apply(&MetadataRecord::V1TopicConfig(
1579 crate::records::TopicConfigRecord {
1580 topic: "t".into(),
1581 overrides,
1582 },
1583 ));
1584 m.apply(&MetadataRecord::V1DeleteTopic(
1585 crate::records::DeleteTopicRecord { name: "t".into() },
1586 ));
1587 assert!(m.topic_config("t").is_none());
1588 }
1589
1590 #[test]
1591 fn validate_topic_config_for_unknown_topic_rejected() {
1592 let m = img();
1593 let r = MetadataRecord::V1TopicConfig(crate::records::TopicConfigRecord {
1594 topic: "ghost".into(),
1595 overrides: std::collections::BTreeMap::new(),
1596 });
1597 let err = m.validate(&r).unwrap_err();
1598 assert!(matches!(err, MetadataError::UnknownTopic(_)));
1599 }
1600
1601 #[test]
1602 fn apply_scram_credential_stores() {
1603 let mut m = img();
1604 m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1605 user: "alice".into(),
1606 mechanism: crabka_security::SaslMechanism::ScramSha512,
1607 salt: vec![1; 16],
1608 stored_key: vec![2; 64],
1609 server_key: vec![3; 64],
1610 iterations: 4096,
1611 }));
1612 let got = m.scram_credential("alice", crabka_security::SaslMechanism::ScramSha512);
1613 assert!(got.is_some());
1614 assert!(got.unwrap().iterations == 4096);
1615 }
1616
1617 #[test]
1618 fn apply_scram_credential_last_write_wins() {
1619 let mut m = img();
1620 let mech = crabka_security::SaslMechanism::ScramSha512;
1621 m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1622 user: "alice".into(),
1623 mechanism: mech,
1624 salt: vec![1; 16],
1625 stored_key: vec![2; 64],
1626 server_key: vec![3; 64],
1627 iterations: 4096,
1628 }));
1629 m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1630 user: "alice".into(),
1631 mechanism: mech,
1632 salt: vec![9; 16],
1633 stored_key: vec![9; 64],
1634 server_key: vec![9; 64],
1635 iterations: 8192,
1636 }));
1637 let got = m.scram_credential("alice", mech).unwrap();
1638 assert!(got.iterations == 8192);
1639 assert!(got.salt == vec![9; 16]);
1640 }
1641
1642 #[test]
1643 fn delete_scram_credential_removes() {
1644 let mut m = img();
1645 let mech = crabka_security::SaslMechanism::ScramSha512;
1646 m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1647 user: "alice".into(),
1648 mechanism: mech,
1649 salt: vec![1; 16],
1650 stored_key: vec![2; 64],
1651 server_key: vec![3; 64],
1652 iterations: 4096,
1653 }));
1654 m.apply(&MetadataRecord::V1DeleteScramCredential(
1655 DeleteScramCredentialRecord {
1656 user: "alice".into(),
1657 mechanism: mech,
1658 },
1659 ));
1660 assert!(m.scram_credential("alice", mech).is_none());
1661 }
1662
1663 fn topic_read_for_alice() -> AclEntry {
1664 AclEntry {
1665 resource_type: ResourceType::Topic,
1666 resource_name: "foo".into(),
1667 pattern_type: PatternType::Literal,
1668 principal: "User:alice".into(),
1669 host: "*".into(),
1670 operation: AclOperation::Read,
1671 permission_type: PermissionType::Allow,
1672 }
1673 }
1674
1675 fn topic_prefixed_team() -> AclEntry {
1676 AclEntry {
1677 resource_type: ResourceType::Topic,
1678 resource_name: "team-".into(),
1679 pattern_type: PatternType::Prefixed,
1680 principal: "User:alice".into(),
1681 host: "*".into(),
1682 operation: AclOperation::Read,
1683 permission_type: PermissionType::Allow,
1684 }
1685 }
1686
1687 #[test]
1688 fn apply_v1_access_control_entry_literal_stores_in_literal_map() {
1689 let mut m = img();
1690 m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1691 let mut hits: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1692 assert!(hits.len() == 1);
1693 assert!(hits.pop().unwrap().resource_name == "foo");
1694 }
1695
1696 #[test]
1697 fn apply_v1_access_control_entry_prefixed_stores_in_prefixed_vec() {
1698 let mut m = img();
1699 m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1700 let hits: Vec<_> = m.matching_acls(ResourceType::Topic, "team-foo").collect();
1701 assert!(hits.len() == 1);
1702 assert!(hits[0].resource_name == "team-");
1703 let none: Vec<_> = m.matching_acls(ResourceType::Topic, "other").collect();
1705 assert!(none.is_empty());
1706 }
1707
1708 #[test]
1709 fn matching_acls_combines_literal_and_prefixed() {
1710 let mut m = img();
1711 m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1712 m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1713 let hits_foo: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1714 let hits_team: Vec<_> = m.matching_acls(ResourceType::Topic, "team-x").collect();
1715 assert!(hits_foo.len() == 1);
1716 assert!(hits_team.len() == 1);
1717 }
1718
1719 fn topic_wildcard_allow() -> AclEntry {
1720 AclEntry {
1721 resource_type: ResourceType::Topic,
1722 resource_name: "*".into(),
1723 pattern_type: PatternType::Literal,
1724 principal: "User:alice".into(),
1725 host: "*".into(),
1726 operation: AclOperation::Read,
1727 permission_type: PermissionType::Allow,
1728 }
1729 }
1730
1731 fn topic_wildcard_deny() -> AclEntry {
1732 AclEntry {
1733 resource_type: ResourceType::Topic,
1734 resource_name: "*".into(),
1735 pattern_type: PatternType::Literal,
1736 principal: "User:alice".into(),
1737 host: "*".into(),
1738 operation: AclOperation::Read,
1739 permission_type: PermissionType::Deny,
1740 }
1741 }
1742
1743 #[test]
1744 fn matching_acls_literal_wildcard_allow_matches_arbitrary_topic() {
1745 let mut m = img();
1746 m.apply(&MetadataRecord::V1AccessControlEntry(topic_wildcard_allow()));
1747 let hits: Vec<_> = m
1749 .matching_acls(ResourceType::Topic, "some-random-topic")
1750 .collect();
1751 assert!(hits.len() == 1);
1752 assert!(hits[0].resource_name == "*");
1753 assert!(hits[0].permission_type == PermissionType::Allow);
1754 let star: Vec<_> = m.matching_acls(ResourceType::Topic, "*").collect();
1756 assert!(star.len() == 1);
1757 }
1758
1759 #[test]
1760 fn matching_acls_literal_wildcard_deny_matches_arbitrary_topic() {
1761 let mut m = img();
1762 m.apply(&MetadataRecord::V1AccessControlEntry(topic_wildcard_deny()));
1763 let hits: Vec<_> = m
1764 .matching_acls(ResourceType::Topic, "another-random-topic")
1765 .collect();
1766 assert!(hits.len() == 1);
1767 assert!(hits[0].resource_name == "*");
1768 assert!(hits[0].permission_type == PermissionType::Deny);
1769 }
1770
1771 #[test]
1772 fn apply_v1_delete_access_control_entry_removes_matching() {
1773 let mut m = img();
1774 m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1775 m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1776 let filter = AclEntryFilter {
1777 resource_type: Some(ResourceType::Topic),
1778 pattern_type: Some(PatternType::Literal),
1779 ..AclEntryFilter::default()
1780 };
1781 m.apply(&MetadataRecord::V1DeleteAccessControlEntry(filter));
1782 let hits_foo: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1783 let hits_team: Vec<_> = m.matching_acls(ResourceType::Topic, "team-x").collect();
1784 assert!(hits_foo.len() == 0); assert!(hits_team.len() == 1); }
1787
1788 #[test]
1789 fn apply_v1_delete_access_control_entry_no_match_is_noop() {
1790 let mut m = img();
1791 m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1792 let filter = AclEntryFilter {
1793 resource_type: Some(ResourceType::Group),
1794 ..AclEntryFilter::default()
1795 };
1796 m.apply(&MetadataRecord::V1DeleteAccessControlEntry(filter));
1797 let hits: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1798 assert!(hits.len() == 1);
1799 }
1800
1801 #[test]
1802 fn all_acls_returns_every_entry() {
1803 let mut m = img();
1804 m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1805 m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1806 assert!(m.all_acls().count() == 2);
1807 }
1808
1809 #[test]
1810 fn all_partitions_count_matches_map_size() {
1811 let mut m = img();
1812 m.apply(&topic("t", 3));
1813 for p in 0..3 {
1814 m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1815 topic: "t".into(),
1816 partition: p,
1817 leader: 1,
1818 replicas: vec![1],
1819 isr: vec![1],
1820 leader_epoch: 0,
1821 adding_replicas: vec![],
1822 removing_replicas: vec![],
1823 directories: vec![],
1824 partition_epoch: 0,
1825 }));
1826 }
1827 m.apply(&topic("u", 1));
1828 m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1829 topic: "u".into(),
1830 partition: 0,
1831 leader: 1,
1832 replicas: vec![1],
1833 isr: vec![1],
1834 leader_epoch: 0,
1835 adding_replicas: vec![],
1836 removing_replicas: vec![],
1837 directories: vec![],
1838 partition_epoch: 0,
1839 }));
1840 assert!(m.all_partitions().count() == 4);
1842 assert!(
1843 m.all_partitions().count()
1844 == m.partitions_of("t").count() + m.partitions_of("u").count()
1845 );
1846 }
1847
1848 #[test]
1849 fn reassignments_in_flight_excludes_idle_partitions() {
1850 let mut img = MetadataImage::new(uuid::Uuid::nil());
1851 img.apply(&MetadataRecord::V1Topic(TopicRecord {
1852 name: "foo".into(),
1853 topic_id: uuid::Uuid::nil(),
1854 partitions: 1,
1855 replication_factor: 3,
1856 }));
1857 img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1858 topic: "foo".into(),
1859 partition: 0,
1860 leader: 1,
1861 replicas: vec![1, 2, 3],
1862 isr: vec![1, 2, 3],
1863 leader_epoch: 0,
1864 adding_replicas: vec![],
1865 removing_replicas: vec![],
1866 directories: vec![],
1867 partition_epoch: 0,
1868 }));
1869 assert!(img.reassignments_in_flight().count() == 0);
1870 }
1871
1872 #[test]
1873 fn reassignments_in_flight_returns_partitions_with_adding() {
1874 let mut img = MetadataImage::new(uuid::Uuid::nil());
1875 img.apply(&MetadataRecord::V1Topic(TopicRecord {
1876 name: "foo".into(),
1877 topic_id: uuid::Uuid::nil(),
1878 partitions: 1,
1879 replication_factor: 3,
1880 }));
1881 img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1882 topic: "foo".into(),
1883 partition: 0,
1884 leader: 1,
1885 replicas: vec![1, 2, 3, 4],
1886 isr: vec![1, 2, 3],
1887 leader_epoch: 0,
1888 adding_replicas: vec![4],
1889 removing_replicas: vec![],
1890 directories: vec![],
1891 partition_epoch: 0,
1892 }));
1893 let rows: Vec<_> = img.reassignments_in_flight().collect();
1894 assert!(rows.len() == 1);
1895 assert!(rows[0].adding_replicas == vec![4]);
1896 }
1897
1898 #[test]
1899 fn reassignments_in_flight_returns_partitions_with_removing() {
1900 let mut img = MetadataImage::new(uuid::Uuid::nil());
1901 img.apply(&MetadataRecord::V1Topic(TopicRecord {
1902 name: "foo".into(),
1903 topic_id: uuid::Uuid::nil(),
1904 partitions: 1,
1905 replication_factor: 3,
1906 }));
1907 img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1908 topic: "foo".into(),
1909 partition: 0,
1910 leader: 1,
1911 replicas: vec![1, 2, 3],
1912 isr: vec![1, 2, 3],
1913 leader_epoch: 0,
1914 adding_replicas: vec![],
1915 removing_replicas: vec![3],
1916 directories: vec![],
1917 partition_epoch: 0,
1918 }));
1919 let rows: Vec<_> = img.reassignments_in_flight().collect();
1920 assert!(rows.len() == 1);
1921 assert!(rows[0].removing_replicas == vec![3]);
1922 }
1923
1924 #[test]
1925 fn reassignments_in_flight_covers_multiple_topics() {
1926 let mut img = MetadataImage::new(uuid::Uuid::nil());
1927 for name in ["foo", "bar"] {
1928 img.apply(&MetadataRecord::V1Topic(TopicRecord {
1929 name: name.into(),
1930 topic_id: uuid::Uuid::nil(),
1931 partitions: 1,
1932 replication_factor: 3,
1933 }));
1934 img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1935 topic: name.into(),
1936 partition: 0,
1937 leader: 1,
1938 replicas: vec![1, 2, 3, 4],
1939 isr: vec![1, 2, 3],
1940 leader_epoch: 0,
1941 adding_replicas: vec![4],
1942 removing_replicas: vec![],
1943 directories: vec![],
1944 partition_epoch: 0,
1945 }));
1946 }
1947 assert!(img.reassignments_in_flight().count() == 2);
1948 }
1949
1950 #[test]
1951 fn broker_config_set_inserts_into_image() {
1952 let mut img = MetadataImage::new(uuid::Uuid::nil());
1953 img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1954 node_id: 1,
1955 config_name: "leader.replication.throttled.rate".into(),
1956 config_value: Some("2048".into()),
1957 }));
1958 let bc = img.broker_config(1).expect("broker config");
1959 assert!(bc.get("leader.replication.throttled.rate") == Some(&"2048".to_string()));
1960 }
1961
1962 #[test]
1963 fn broker_config_delete_removes_from_image() {
1964 let mut img = MetadataImage::new(uuid::Uuid::nil());
1965 img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1966 node_id: 1,
1967 config_name: "leader.replication.throttled.rate".into(),
1968 config_value: Some("2048".into()),
1969 }));
1970 img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1971 node_id: 1,
1972 config_name: "leader.replication.throttled.rate".into(),
1973 config_value: None,
1974 }));
1975 let bc = img.broker_config(1).expect("broker_configs entry retained");
1976 assert!(bc.get("leader.replication.throttled.rate").is_none());
1977 }
1978
1979 #[test]
1980 fn broker_throttle_rate_parses_positive_value() {
1981 let mut img = MetadataImage::new(uuid::Uuid::nil());
1982 img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1983 node_id: 1,
1984 config_name: "leader.replication.throttled.rate".into(),
1985 config_value: Some("2048".into()),
1986 }));
1987 assert!(img.broker_throttle_rate(1, ThrottleKind::Leader) == Some(2048));
1988 }
1989
1990 #[test]
1991 fn broker_throttle_rate_returns_none_for_negative_one() {
1992 let mut img = MetadataImage::new(uuid::Uuid::nil());
1993 img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1994 node_id: 1,
1995 config_name: "leader.replication.throttled.rate".into(),
1996 config_value: Some("-1".into()),
1997 }));
1998 assert!(img.broker_throttle_rate(1, ThrottleKind::Leader).is_none());
1999 }
2000
2001 #[test]
2002 fn client_quota_apply_inserts_canonicalized() {
2003 let mut img = MetadataImage::new(uuid::Uuid::nil());
2004 img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2006 entity: vec![
2007 QuotaEntity {
2008 entity_type: "user".into(),
2009 entity_name: Some("alice".into()),
2010 },
2011 QuotaEntity {
2012 entity_type: "client-id".into(),
2013 entity_name: Some("app1".into()),
2014 },
2015 ],
2016 config_key: "producer_byte_rate".into(),
2017 config_value: Some(1024.0),
2018 }));
2019 let key: EntityKey = vec![
2020 ("client-id".into(), Some("app1".into())),
2021 ("user".into(), Some("alice".into())),
2022 ];
2023 let configs = img
2024 .client_quotas()
2025 .get(&key)
2026 .expect("entry under canonical key");
2027 assert!(configs.get("producer_byte_rate") == Some(&1024.0));
2028 }
2029
2030 #[test]
2031 fn client_quota_apply_delete_removes_key() {
2032 let mut img = MetadataImage::new(uuid::Uuid::nil());
2033 img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2034 entity: vec![QuotaEntity {
2035 entity_type: "user".into(),
2036 entity_name: Some("alice".into()),
2037 }],
2038 config_key: "producer_byte_rate".into(),
2039 config_value: Some(1024.0),
2040 }));
2041 img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2042 entity: vec![QuotaEntity {
2043 entity_type: "user".into(),
2044 entity_name: Some("alice".into()),
2045 }],
2046 config_key: "producer_byte_rate".into(),
2047 config_value: None,
2048 }));
2049 let key: EntityKey = vec![("user".into(), Some("alice".into()))];
2050 let configs = img.client_quotas().get(&key).expect("entry retained");
2051 assert!(configs.get("producer_byte_rate").is_none());
2052 }
2053
2054 #[test]
2055 fn client_quota_default_entity_uses_none_name() {
2056 let mut img = MetadataImage::new(uuid::Uuid::nil());
2057 img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2058 entity: vec![QuotaEntity {
2059 entity_type: "user".into(),
2060 entity_name: None,
2061 }],
2062 config_key: "producer_byte_rate".into(),
2063 config_value: Some(512.0),
2064 }));
2065 let key: EntityKey = vec![("user".into(), None)];
2066 assert!(img.client_quotas().contains_key(&key));
2067 }
2068
2069 #[test]
2070 fn canonicalize_sorts_alphabetically_by_entity_type() {
2071 let input = vec![
2072 ("user".to_string(), Some("alice".to_string())),
2073 ("client-id".to_string(), Some("app1".to_string())),
2074 ];
2075 let canon = canonicalize_entity(input);
2076 assert!(canon[0].0 == "client-id");
2077 assert!(canon[1].0 == "user");
2078 }
2079
2080 #[test]
2081 fn scram_credentials_users_returns_distinct_users() {
2082 let mut img = MetadataImage::new(uuid::Uuid::nil());
2083 img.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
2084 user: "alice".into(),
2085 mechanism: SaslMechanism::ScramSha512,
2086 salt: vec![1, 2, 3],
2087 stored_key: vec![4, 5, 6],
2088 server_key: vec![7, 8, 9],
2089 iterations: 4096,
2090 }));
2091 img.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
2092 user: "bob".into(),
2093 mechanism: SaslMechanism::ScramSha512,
2094 salt: vec![1, 2, 3],
2095 stored_key: vec![4, 5, 6],
2096 server_key: vec![7, 8, 9],
2097 iterations: 4096,
2098 }));
2099 let mut users = img.scram_credentials_users();
2100 users.sort();
2101 assert!(users == vec!["alice".to_string(), "bob".to_string()]);
2102 }
2103
2104 fn principal(pt: &str, name: &str) -> KafkaPrincipal {
2105 KafkaPrincipal {
2106 principal_type: pt.into(),
2107 name: name.into(),
2108 }
2109 }
2110
2111 fn dt_record(
2112 token_id: &str,
2113 owner: KafkaPrincipal,
2114 expiry_timestamp_ms: i64,
2115 renewers: Vec<KafkaPrincipal>,
2116 ) -> MetadataRecord {
2117 MetadataRecord::V1DelegationToken(DelegationTokenRecord {
2118 token_id: token_id.into(),
2119 owner,
2120 hmac: vec![0x42; 32],
2121 issue_timestamp_ms: 1_000,
2122 expiry_timestamp_ms,
2123 max_timestamp_ms: 10_000,
2124 renewers,
2125 })
2126 }
2127
2128 #[test]
2129 fn apply_delegation_token_insert_and_replace() {
2130 let mut img = MetadataImage::new(uuid::Uuid::nil());
2131 let alice = principal("User", "alice");
2132
2133 img.apply(&dt_record("tok-1", alice.clone(), 5_000, vec![]));
2134 let got = img.delegation_token_by_id("tok-1").expect("token present");
2135 assert!(got.expiry_timestamp_ms == 5_000);
2136 assert!(got.owner == alice);
2137
2138 img.apply(&dt_record("tok-1", alice.clone(), 7_500, vec![]));
2140 let got = img.delegation_token_by_id("tok-1").expect("token present");
2141 assert!(got.expiry_timestamp_ms == 7_500);
2142 assert!(img.all_delegation_tokens().count() == 1);
2143 }
2144
2145 #[test]
2146 fn apply_delete_delegation_token_removes_from_image() {
2147 let mut img = MetadataImage::new(uuid::Uuid::nil());
2148 let alice = principal("User", "alice");
2149
2150 img.apply(&dt_record("tok-1", alice, 5_000, vec![]));
2151 assert!(img.delegation_token_by_id("tok-1").is_some());
2152
2153 img.apply(&MetadataRecord::V1DeleteDelegationToken(
2154 DeleteDelegationTokenRecord {
2155 token_id: "tok-1".into(),
2156 },
2157 ));
2158 assert!(img.delegation_token_by_id("tok-1").is_none());
2159 assert!(img.all_delegation_tokens().count() == 0);
2160 }
2161
2162 #[test]
2163 fn delegation_token_by_hmac_finds_token_by_hmac_bytes() {
2164 let mut img = MetadataImage::new(uuid::Uuid::nil());
2165 let alice = principal("User", "alice");
2166 let bob = principal("User", "bob");
2167
2168 let hmac_a = vec![0xAA; 32];
2169 let hmac_b = vec![0xBB; 32];
2170 img.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
2171 token_id: "tok-a".into(),
2172 owner: alice,
2173 hmac: hmac_a.clone(),
2174 issue_timestamp_ms: 1_000,
2175 expiry_timestamp_ms: 5_000,
2176 max_timestamp_ms: 10_000,
2177 renewers: vec![],
2178 }));
2179 img.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
2180 token_id: "tok-b".into(),
2181 owner: bob,
2182 hmac: hmac_b.clone(),
2183 issue_timestamp_ms: 1_000,
2184 expiry_timestamp_ms: 5_000,
2185 max_timestamp_ms: 10_000,
2186 renewers: vec![],
2187 }));
2188
2189 let found_a = img
2190 .delegation_token_by_hmac(&hmac_a)
2191 .expect("hmac_a present");
2192 assert!(found_a.token_id == "tok-a");
2193 let found_b = img
2194 .delegation_token_by_hmac(&hmac_b)
2195 .expect("hmac_b present");
2196 assert!(found_b.token_id == "tok-b");
2197 assert!(img.delegation_token_by_hmac(&[0xCC; 32]).is_none());
2198 }
2199
2200 #[test]
2201 fn delegation_tokens_by_owner_filters_correctly() {
2202 let mut img = MetadataImage::new(uuid::Uuid::nil());
2203 let alice = principal("User", "alice");
2204 let bob = principal("User", "bob");
2205
2206 img.apply(&dt_record("a-1", alice.clone(), 5_000, vec![]));
2207 img.apply(&dt_record("a-2", alice.clone(), 6_000, vec![bob.clone()]));
2208 img.apply(&dt_record("b-1", bob.clone(), 7_000, vec![]));
2209
2210 let alice_tokens = img.delegation_tokens_by_owner(&alice);
2211 assert!(alice_tokens.len() == 2);
2212 assert!(alice_tokens.iter().all(|t| t.owner == alice));
2213
2214 let bob_tokens = img.delegation_tokens_by_owner(&bob);
2215 assert!(bob_tokens.len() == 1);
2216 assert!(bob_tokens[0].token_id == "b-1");
2217
2218 let bob_visible = img.delegation_tokens_visible_to(&bob);
2220 assert!(bob_visible.len() == 2);
2221 let mut ids: Vec<&str> = bob_visible.iter().map(|t| t.token_id.as_str()).collect();
2222 ids.sort_unstable();
2223 assert!(ids == vec!["a-2", "b-1"]);
2224 }
2225
2226 #[test]
2227 fn applies_voters_and_version() {
2228 let mut image = MetadataImage::default();
2229 image.apply(&MetadataRecord::V1KRaftVersion(
2230 crate::records::KRaftVersionRecord { kraft_version: 1 },
2231 ));
2232 image.apply(&MetadataRecord::V1Voters(crate::records::VotersRecord {
2233 voters: crate::voters::VoterSet::from_voters([crate::voters::Voter {
2234 id: 1,
2235 directory_id: uuid::Uuid::nil(),
2236 endpoints: vec![],
2237 kraft_version: crate::voters::KRaftVersionRange::default(),
2238 }]),
2239 }));
2240 assert!(image.kraft_version() == 1);
2241 assert!(image.voters().contains(1));
2242 }
2243
2244 #[test]
2245 fn scram_credentials_for_user_returns_pairs() {
2246 let mut img = MetadataImage::new(uuid::Uuid::nil());
2247 img.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
2248 user: "alice".into(),
2249 mechanism: SaslMechanism::ScramSha512,
2250 salt: vec![1, 2, 3],
2251 stored_key: vec![4, 5, 6],
2252 server_key: vec![7, 8, 9],
2253 iterations: 8192,
2254 }));
2255 let pairs = img.scram_credentials_for_user("alice");
2256 assert!(pairs.len() == 1);
2257 assert!(pairs[0].0 == SaslMechanism::ScramSha512);
2258 assert!(pairs[0].1 == 8192);
2259 assert!(img.scram_credentials_for_user("ghost").is_empty());
2260 }
2261
2262 #[test]
2263 fn finalized_metadata_version_reads_feature_map() {
2264 use crate::records::FeatureLevelRecord;
2265 let mut m = img();
2266 assert!(m.finalized_metadata_version() == None);
2267 m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
2268 name: "metadata.version".into(),
2269 level: 19,
2270 }));
2271 assert!(m.finalized_metadata_version() == Some(19));
2272 }
2273
2274 #[test]
2275 fn min_required_metadata_version_baseline_is_min() {
2276 use crate::metadata_version::METADATA_VERSION_MIN;
2277 let m = img();
2278 assert!(m.min_required_metadata_version() == METADATA_VERSION_MIN);
2279 }
2280
2281 #[test]
2282 fn client_metrics_config_apply_and_clear() {
2283 use crate::records::ClientMetricsConfigRecord;
2284 let mut img = MetadataImage::new(uuid::Uuid::nil());
2285 let mut cfgs = std::collections::BTreeMap::new();
2286 cfgs.insert("interval.ms".to_string(), "60000".to_string());
2287 img.apply(&MetadataRecord::V1ClientMetricsConfig(
2288 ClientMetricsConfigRecord {
2289 name: "sub-a".into(),
2290 configs: cfgs,
2291 },
2292 ));
2293 assert_eq!(
2294 img.client_metrics_config("sub-a")
2295 .and_then(|m| m.get("interval.ms"))
2296 .map(String::as_str),
2297 Some("60000")
2298 );
2299 assert_eq!(img.client_metrics_subscriptions().count(), 1);
2300 img.apply(&MetadataRecord::V1ClientMetricsConfig(
2301 ClientMetricsConfigRecord {
2302 name: "sub-a".into(),
2303 configs: std::collections::BTreeMap::new(),
2304 },
2305 ));
2306 assert!(img.client_metrics_config("sub-a").is_none());
2307 assert_eq!(img.client_metrics_subscriptions().count(), 0);
2308 }
2309
2310 #[test]
2311 fn min_required_metadata_version_rises_with_scram_and_tokens() {
2312 use crate::metadata_version::{DELEGATION_TOKEN_MIN_LEVEL, SCRAM_MIN_LEVEL};
2313 use crabka_security::{KafkaPrincipal, SaslMechanism};
2314 let mut m = img();
2315 m.apply(&MetadataRecord::V1ScramCredential(
2316 crate::records::ScramCredentialRecord {
2317 user: "alice".into(),
2318 mechanism: SaslMechanism::ScramSha512,
2319 salt: vec![1; 16],
2320 stored_key: vec![2; 64],
2321 server_key: vec![3; 64],
2322 iterations: 4096,
2323 },
2324 ));
2325 assert!(m.min_required_metadata_version() == SCRAM_MIN_LEVEL);
2326 m.apply(&MetadataRecord::V1DelegationToken(
2327 crate::records::DelegationTokenRecord {
2328 token_id: "t1".into(),
2329 owner: KafkaPrincipal {
2330 principal_type: "User".into(),
2331 name: "alice".into(),
2332 },
2333 hmac: vec![0x42; 32],
2334 issue_timestamp_ms: 1,
2335 expiry_timestamp_ms: 5,
2336 max_timestamp_ms: 10,
2337 renewers: vec![],
2338 },
2339 ));
2340 assert!(m.min_required_metadata_version() == DELEGATION_TOKEN_MIN_LEVEL);
2341 }
2342
2343 #[test]
2344 fn topic_by_id_resolves_and_drops_on_delete() {
2345 use crate::records::{MetadataRecord, TopicRecord};
2346 let mut img = MetadataImage::new(Uuid::nil());
2347 let id = Uuid::from_u128(0x1234_5678_9abc_def0_1122_3344_5566_7788);
2348 img.apply(&MetadataRecord::V1Topic(TopicRecord {
2349 name: "orders".into(),
2350 topic_id: id,
2351 partitions: 1,
2352 replication_factor: 1,
2353 }));
2354
2355 assert!(img.topic_by_id(&id).map(|t| t.name.as_str()) == Some("orders"));
2357 assert!(img.topic_name_by_id(&id) == Some("orders"));
2358
2359 img.apply(&MetadataRecord::V1DeleteTopic(
2361 crate::records::DeleteTopicRecord {
2362 name: "orders".into(),
2363 },
2364 ));
2365 assert!(img.topic_by_id(&id).is_none());
2366 assert!(img.topic_name_by_id(&id).is_none());
2367 }
2368
2369 #[test]
2370 fn broker_epoch_reads_back_registered_epoch() {
2371 let mut image = MetadataImage::new(Uuid::nil());
2372 image.apply(&MetadataRecord::V1BrokerRegistration(
2373 BrokerRegistrationRecord {
2374 node_id: 5,
2375 broker_epoch: 99,
2376 incarnation_id: Uuid::nil(),
2377 host: "h".into(),
2378 port: 9092,
2379 rack: None,
2380 endpoints: vec![],
2381 },
2382 ));
2383 assert!(image.broker_epoch(5) == Some(99));
2384 assert!(image.broker_epoch(404) == None);
2385 }
2386}