Skip to main content

crabka_metadata/
image.rs

1//! Immutable snapshot of the cluster's metadata state. Mutated only by
2//! [`MetadataImage::apply`] (called from the Raft state machine), and
3//! read everywhere else via shared references / `Arc` clones.
4
5use std::collections::{BTreeMap, HashMap};
6
7use uuid::Uuid;
8
9use crabka_security::{KafkaPrincipal, SaslMechanism, ScramCredential};
10
11use crate::acl::{AclEntry, PatternType, ResourceType};
12use crate::error::MetadataError;
13use crate::records::{
14    BrokerConfigRecord, BrokerRegistrationRecord, ClientMetricsConfigRecord, ClientQuotaRecord,
15    DelegationTokenRecord, FeatureLevelRecord, FeaturesEpochRecord, KRaftVersionRecord,
16    MetadataRecord, NodeId, PartitionRecord, QuotaEntity, ScramCredentialRecord, TopicConfigRecord,
17    TopicRecord, VotersRecord,
18};
19
/// Canonical identity of a client-quota entity (`V1ClientQuota`): a list of
/// `(entity_type, entity_name)` pairs normalized by [`canonicalize_entity`],
/// so the same entity yields the same key regardless of the order in which
/// its parts were listed. A `None` name presumably denotes Kafka's "default"
/// entity of that type — TODO confirm against `QuotaEntity`.
pub type EntityKey = Vec<(String, Option<String>)>;
21
22/// In-memory image type for a single delegation
23/// token (KIP-48). Mirrors [`DelegationTokenRecord`] minus any tombstone
24/// concerns — tombstones are handled as removals on the apply path.
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct DelegationToken {
27    pub token_id: String,
28    pub owner: KafkaPrincipal,
29    pub hmac: Vec<u8>,
30    pub issue_timestamp_ms: i64,
31    pub expiry_timestamp_ms: i64,
32    pub max_timestamp_ms: i64,
33    pub renewers: Vec<KafkaPrincipal>,
34}
35
36impl DelegationToken {
37    #[must_use]
38    pub fn from_record(rec: &DelegationTokenRecord) -> Self {
39        Self {
40            token_id: rec.token_id.clone(),
41            owner: rec.owner.clone(),
42            hmac: rec.hmac.clone(),
43            issue_timestamp_ms: rec.issue_timestamp_ms,
44            expiry_timestamp_ms: rec.expiry_timestamp_ms,
45            max_timestamp_ms: rec.max_timestamp_ms,
46            renewers: rec.renewers.clone(),
47        }
48    }
49}
50
51#[must_use]
52pub fn canonicalize_entity(mut tuple: Vec<(String, Option<String>)>) -> EntityKey {
53    tuple.sort_by(|a, b| a.0.cmp(&b.0));
54    tuple
55}
56
57#[derive(Debug, Clone, Default, PartialEq)]
58pub struct MetadataImage {
59    cluster_id: Uuid,
60    topics: HashMap<String, TopicRecord>,
61    /// KIP-516 reverse index: topic UUID -> topic name. Maintained in
62    /// `apply()` alongside `topics`; rebuilt on snapshot replay because
63    /// every record (including snapshot installs) flows through `apply()`.
64    topic_ids: HashMap<Uuid, String>,
65    partitions: HashMap<(String, i32), PartitionRecord>,
66    brokers: HashMap<NodeId, BrokerRegistrationRecord>,
67    topic_configs: HashMap<String, BTreeMap<String, String>>,
68    broker_configs: HashMap<NodeId, BTreeMap<String, String>>,
69    client_metrics_configs: HashMap<String, BTreeMap<String, String>>,
70    scram_credentials: HashMap<(String, SaslMechanism), ScramCredential>,
71    acls_literal: HashMap<(ResourceType, String), Vec<AclEntry>>,
72    acls_prefixed: HashMap<ResourceType, Vec<AclEntry>>,
73    client_quotas: HashMap<EntityKey, BTreeMap<String, f64>>,
74    delegation_tokens: HashMap<String, DelegationToken>,
75    kraft_version: u16,
76    voters: crate::voters::VoterSet,
77    feature_levels: BTreeMap<String, i16>,
78    /// KIP-584 finalized-features epoch. `-1` until the first
79    /// `V1FeatureLevel` record applies, then monotonically increasing
80    /// (one bump per applied record). Deterministic across replicas
81    /// because records apply in committed-log order on every node.
82    features_epoch: i64,
83}
84
/// Selects which KIP-73 replication-throttle rate to read via
/// [`MetadataImage::broker_throttle_rate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ThrottleKind {
    /// Leader-side throttle: the `leader.replication.throttled.rate` broker config.
    Leader,
    /// Follower-side throttle: the `follower.replication.throttled.rate` broker config.
    Follower,
}
91
92impl MetadataImage {
93    #[must_use]
94    pub fn new(cluster_id: Uuid) -> Self {
95        Self {
96            cluster_id,
97            topics: HashMap::new(),
98            topic_ids: HashMap::new(),
99            partitions: HashMap::new(),
100            brokers: HashMap::new(),
101            topic_configs: HashMap::new(),
102            broker_configs: HashMap::new(),
103            client_metrics_configs: HashMap::new(),
104            scram_credentials: HashMap::new(),
105            acls_literal: HashMap::new(),
106            acls_prefixed: HashMap::new(),
107            client_quotas: HashMap::new(),
108            delegation_tokens: HashMap::new(),
109            kraft_version: 0,
110            voters: crate::voters::VoterSet::default(),
111            feature_levels: BTreeMap::new(),
112            features_epoch: -1,
113        }
114    }
115
116    #[must_use]
117    pub fn cluster_id(&self) -> Uuid {
118        self.cluster_id
119    }
120
121    pub fn topics(&self) -> impl Iterator<Item = &TopicRecord> {
122        self.topics.values()
123    }
124
125    #[must_use]
126    pub fn topic(&self, name: &str) -> Option<&TopicRecord> {
127        self.topics.get(name)
128    }
129
130    /// KIP-516: resolve a topic by its UUID. O(1) via the `topic_ids` index.
131    #[must_use]
132    pub fn topic_by_id(&self, id: &Uuid) -> Option<&TopicRecord> {
133        let name = self.topic_ids.get(id)?;
134        self.topics.get(name)
135    }
136
137    /// KIP-516: resolve a topic name by its UUID.
138    #[must_use]
139    pub fn topic_name_by_id(&self, id: &Uuid) -> Option<&str> {
140        self.topic_ids.get(id).map(String::as_str)
141    }
142
143    #[must_use]
144    pub fn partition(&self, topic: &str, idx: i32) -> Option<&PartitionRecord> {
145        self.partitions.get(&(topic.to_string(), idx))
146    }
147
148    pub fn partitions_of(&self, topic: &str) -> impl Iterator<Item = &PartitionRecord> {
149        self.partitions
150            .iter()
151            .filter(move |((t, _), _)| t == topic)
152            .map(|(_, v)| v)
153    }
154
155    /// The live partition count for `topic`, derived from the partitions
156    /// map rather than the stored `TopicRecord.partitions` field. This is
157    /// the authoritative count for the KIP-631 round-trip (the KIP-631
158    /// `TopicRecord` carries no partition count) and for the `validate`
159    /// partition-count-grew check. Returns 0 for an unknown topic or one
160    /// with no partition records yet applied.
161    #[must_use]
162    pub fn topic_partition_count(&self, topic: &str) -> i32 {
163        i32::try_from(self.partitions_of(topic).count()).unwrap_or(i32::MAX)
164    }
165
166    /// Single-pass iterator over every partition in the image, yielding
167    /// the flat `(topic_name, partition_index)` key alongside the record.
168    /// O(P) in total partition count — the cluster-wide maintenance loops
169    /// (failover, rebalance, reassignment, metrics) use this instead of
170    /// `topics().flat_map(partitions_of)`, which is O(topics × P).
171    pub fn all_partitions(&self) -> impl Iterator<Item = (&(String, i32), &PartitionRecord)> {
172        self.partitions.iter()
173    }
174
175    /// All partitions where a reassignment is currently in flight
176    /// (`adding_replicas` or `removing_replicas` non-empty).
177    pub fn reassignments_in_flight(&self) -> impl Iterator<Item = &PartitionRecord> + '_ {
178        self.all_partitions()
179            .map(|(_, p)| p)
180            .filter(|p| !p.adding_replicas.is_empty() || !p.removing_replicas.is_empty())
181    }
182
183    /// Currently-effective config overrides for `topic`, or `None` if no
184    /// `V1TopicConfig` record has been applied for this topic since the last
185    /// `V1DeleteTopic` (or since image creation).
186    #[must_use]
187    pub fn topic_config(&self, topic: &str) -> Option<&BTreeMap<String, String>> {
188        self.topic_configs.get(topic)
189    }
190
191    /// Per-broker config overrides for `node_id`, or `None` if no
192    /// `V1BrokerConfig` record has been applied for this broker.
193    #[must_use]
194    pub fn broker_config(&self, node_id: NodeId) -> Option<&BTreeMap<String, String>> {
195        self.broker_configs.get(&node_id)
196    }
197
198    /// Returns the throttle rate in bytes/sec for `node_id` and `kind`.
199    /// Returns `None` if the config key is absent, unparseable, or is `-1`
200    /// (Kafka convention for "disabled / unlimited").
201    #[must_use]
202    pub fn broker_throttle_rate(&self, node_id: NodeId, kind: ThrottleKind) -> Option<u64> {
203        let key = match kind {
204            ThrottleKind::Leader => "leader.replication.throttled.rate",
205            ThrottleKind::Follower => "follower.replication.throttled.rate",
206        };
207        let raw = self.broker_config(node_id)?.get(key)?;
208        let v: i64 = raw.parse().ok()?;
209        #[allow(clippy::cast_sign_loss)]
210        if v < 0 { None } else { Some(v as u64) }
211    }
212
213    /// Override map for a single KIP-714 client-metrics subscription.
214    #[must_use]
215    pub fn client_metrics_config(&self, name: &str) -> Option<&BTreeMap<String, String>> {
216        self.client_metrics_configs.get(name)
217    }
218
219    /// All configured client-metrics subscriptions, `(name, overrides)`.
220    pub fn client_metrics_subscriptions(
221        &self,
222    ) -> impl Iterator<Item = (&String, &BTreeMap<String, String>)> {
223        self.client_metrics_configs.iter()
224    }
225
226    #[must_use]
227    pub fn client_quotas(&self) -> &HashMap<EntityKey, BTreeMap<String, f64>> {
228        &self.client_quotas
229    }
230
231    #[must_use]
232    pub fn scram_credential(
233        &self,
234        user: &str,
235        mechanism: SaslMechanism,
236    ) -> Option<&ScramCredential> {
237        self.scram_credentials.get(&(user.to_string(), mechanism))
238    }
239
240    /// All distinct users with at least one SCRAM credential. Order is
241    /// unspecified.
242    #[must_use]
243    pub fn scram_credentials_users(&self) -> Vec<String> {
244        self.scram_credentials
245            .keys()
246            .map(|(u, _)| u.clone())
247            .collect::<std::collections::HashSet<_>>()
248            .into_iter()
249            .collect()
250    }
251
252    /// All `(mechanism, iterations)` pairs for `user`. Empty if user has
253    /// no SCRAM credentials. Order is unspecified.
254    #[must_use]
255    pub fn scram_credentials_for_user(&self, user: &str) -> Vec<(SaslMechanism, u32)> {
256        self.scram_credentials
257            .iter()
258            .filter(|((u, _), _)| u == user)
259            .map(|((_, mech), cred)| (*mech, cred.iterations))
260            .collect()
261    }
262
263    #[must_use]
264    pub fn broker(&self, node_id: NodeId) -> Option<&BrokerRegistrationRecord> {
265        self.brokers.get(&node_id)
266    }
267
268    /// KIP-903: the broker epoch (registration commit offset) for `node_id`,
269    /// or `None` if the broker is not registered in this image.
270    #[must_use]
271    pub fn broker_epoch(&self, node_id: NodeId) -> Option<i64> {
272        self.brokers.get(&node_id).map(|b| b.broker_epoch)
273    }
274
275    pub fn brokers(&self) -> impl Iterator<Item = &BrokerRegistrationRecord> {
276        self.brokers.values()
277    }
278
279    #[must_use]
280    pub fn kraft_version(&self) -> u16 {
281        self.kraft_version
282    }
283
284    #[must_use]
285    pub fn voters(&self) -> &crate::voters::VoterSet {
286        &self.voters
287    }
288
289    /// Iterate every ACL that could possibly match `(rt, rn)`:
290    /// - all literal entries at `(rt, rn)`
291    /// - all literal entries at `(rt, "*")` — the `WILDCARD_RESOURCE`, which
292    ///   matches every resource of that type (see Kafka's `AclAuthorizer` /
293    ///   `StandardAuthorizer`; this is what `kafka-acls --topic '*'` produces)
294    /// - all prefixed entries whose `resource_name` is a prefix of `rn`
295    pub fn matching_acls<'a>(
296        &'a self,
297        rt: ResourceType,
298        rn: &'a str,
299    ) -> impl Iterator<Item = &'a AclEntry> + 'a {
300        let literal_iter = self
301            .acls_literal
302            .get(&(rt, rn.to_string()))
303            .into_iter()
304            .flatten();
305        // Literal "*" is the WILDCARD_RESOURCE and matches every resource of
306        // this type. Skip it when `rn` is already "*" to avoid duplicating the
307        // entries already produced by `literal_iter`.
308        let wildcard_iter = (rn != "*")
309            .then(|| self.acls_literal.get(&(rt, "*".to_string())))
310            .into_iter()
311            .flatten()
312            .flatten();
313        let prefixed_iter = self
314            .acls_prefixed
315            .get(&rt)
316            .into_iter()
317            .flatten()
318            .filter(move |e| rn.starts_with(&e.resource_name));
319        literal_iter.chain(wildcard_iter).chain(prefixed_iter)
320    }
321
322    /// All ACL entries (literal + prefixed across all resource types).
323    /// Used by `DescribeAcls`.
324    pub fn all_acls(&self) -> impl Iterator<Item = &AclEntry> {
325        self.acls_literal
326            .values()
327            .flatten()
328            .chain(self.acls_prefixed.values().flatten())
329    }
330
331    /// Look up a delegation token by its `token_id` (KIP-48).
332    #[must_use]
333    pub fn delegation_token_by_id(&self, token_id: &str) -> Option<&DelegationToken> {
334        self.delegation_tokens.get(token_id)
335    }
336
337    /// All tokens owned by `owner` (KIP-48; exact match on
338    /// the owning [`KafkaPrincipal`]). Order is unspecified.
339    #[must_use]
340    pub fn delegation_tokens_by_owner(&self, owner: &KafkaPrincipal) -> Vec<&DelegationToken> {
341        self.delegation_tokens
342            .values()
343            .filter(|t| &t.owner == owner)
344            .collect()
345    }
346
347    /// Tokens that `principal` is allowed to see via
348    /// `DescribeDelegationToken` without `DescribeToken` permission —
349    /// either as the owner or as a listed renewer (KIP-48). Order is
350    /// unspecified.
351    #[must_use]
352    pub fn delegation_tokens_visible_to(
353        &self,
354        principal: &KafkaPrincipal,
355    ) -> Vec<&DelegationToken> {
356        self.delegation_tokens
357            .values()
358            .filter(|t| &t.owner == principal || t.renewers.iter().any(|r| r == principal))
359            .collect()
360    }
361
362    /// Every delegation token currently in the
363    /// image (KIP-48). Used by `DescribeDelegationToken` for callers with
364    /// `DescribeToken` permission on the cluster.
365    pub fn all_delegation_tokens(&self) -> impl Iterator<Item = &DelegationToken> {
366        self.delegation_tokens.values()
367    }
368
369    /// KIP-48: lookup a delegation token by its HMAC bytes.
370    /// `RenewDelegationToken` / `ExpireDelegationToken` identify a token
371    /// by HMAC on the wire (not by `token_id`), and the SCRAM
372    /// delegation-token fallback needs the same lookup at
373    /// the auth path. Implementation is a linear scan over the small
374    /// (per-broker, in-memory) token map — clarity over an explicit
375    /// `HMAC→token_id` index until cardinality justifies it.
376    #[must_use]
377    pub fn delegation_token_by_hmac(&self, hmac: &[u8]) -> Option<&DelegationToken> {
378        self.delegation_tokens.values().find(|t| t.hmac == hmac)
379    }
380
381    /// KIP-584: finalized feature levels, keyed by feature name. Empty
382    /// until an `UpdateFeatures` call lands a `V1FeatureLevel` record.
383    #[must_use]
384    pub fn finalized_features(&self) -> &BTreeMap<String, i16> {
385        &self.feature_levels
386    }
387
388    /// KIP-584 finalized-features epoch. `-1` ("unknown") until the first
389    /// feature is finalized.
390    #[must_use]
391    pub fn finalized_features_epoch(&self) -> i64 {
392        self.features_epoch
393    }
394
395    /// The finalized `metadata.version` level, or `None` if no
396    /// `V1FeatureLevel` for `metadata.version` has been applied
397    /// (a pre-bootstrap / legacy image — `MetadataVersion.UNKNOWN`).
398    #[must_use]
399    pub fn finalized_metadata_version(&self) -> Option<i16> {
400        self.feature_levels
401            .get(crate::metadata_version::METADATA_VERSION_FEATURE)
402            .copied()
403    }
404
405    /// KIP-584: the finalized level for an arbitrary feature, or `None` if it
406    /// has not been finalized. Generic counterpart to
407    /// [`Self::finalized_metadata_version`].
408    #[must_use]
409    pub fn finalized_feature(&self, name: &str) -> Option<i16> {
410        self.feature_levels.get(name).copied()
411    }
412
413    /// The minimum `metadata.version` level the live image requires: the
414    /// floor a downgrade must not drop below. Rises with feature-gated
415    /// state present in the image (`KRaft` SCRAM creds, delegation tokens).
416    /// Baseline is `METADATA_VERSION_MIN`.
417    #[must_use]
418    pub fn min_required_metadata_version(&self) -> i16 {
419        use crate::metadata_version::{
420            DELEGATION_TOKEN_MIN_LEVEL, METADATA_VERSION_MIN, SCRAM_MIN_LEVEL,
421        };
422        let mut floor = METADATA_VERSION_MIN;
423        if !self.scram_credentials.is_empty() {
424            floor = floor.max(SCRAM_MIN_LEVEL);
425        }
426        if !self.delegation_tokens.is_empty() {
427            floor = floor.max(DELEGATION_TOKEN_MIN_LEVEL);
428        }
429        floor
430    }
431
432    /// Apply one record. Returns the previous record (for `V1Topic` /
433    /// `V1BrokerRegistration`) so the caller can observe overwrite cases.
434    /// Infallible — pre-validation against the current image happens
435    /// in the controller before submitting to Raft. Apply must never
436    /// fail on a committed entry.
437    #[allow(clippy::too_many_lines)] // exhaustive match over MetadataRecord
438    pub fn apply(&mut self, rec: &MetadataRecord) {
439        match rec {
440            MetadataRecord::V1Topic(t) => {
441                // If a topic with this name already exists under a different
442                // id, drop the stale id entry before re-indexing.
443                if let Some(prev) = self.topics.get(&t.name)
444                    && prev.topic_id != t.topic_id
445                {
446                    self.topic_ids.remove(&prev.topic_id);
447                }
448                self.topic_ids.insert(t.topic_id, t.name.clone());
449                // The denormalized `partitions` count is a derived cache of the
450                // partitions map (KIP-631 `TopicRecord` carries no partition
451                // count), maintained incrementally as `V1Partition` records
452                // apply. A brand-new topic starts at 0 and the partition records
453                // that follow it in log order restore the real count; a
454                // re-registration of an existing topic keeps its partitions in
455                // the map, so preserve the running count rather than trusting
456                // the (KIP-631-zeroed) record field.
457                let mut rec = t.clone();
458                rec.partitions = self.topics.get(&t.name).map_or(0, |prev| prev.partitions);
459                self.topics.insert(t.name.clone(), rec);
460            }
461            MetadataRecord::V1Partition(p) => {
462                // Maintain the owning topic's denormalized partition count / RF
463                // incrementally. Re-scanning the whole partitions map on every
464                // record (the previous approach) made log/snapshot replay
465                // O(P²); bumping the cached count only when a *new* partition
466                // key lands keeps apply O(1) while leaving the count a true
467                // derived view of the partitions map.
468                let is_new = self
469                    .partitions
470                    .insert((p.topic.clone(), p.partition), p.clone())
471                    .is_none();
472                if let Some(t) = self.topics.get_mut(&p.topic) {
473                    if is_new {
474                        t.partitions = t.partitions.saturating_add(1);
475                    }
476                    t.replication_factor =
477                        i16::try_from(p.replicas.len()).unwrap_or(t.replication_factor);
478                }
479            }
480            MetadataRecord::V1BrokerRegistration(b) => {
481                self.brokers.insert(b.node_id, b.clone());
482            }
483            MetadataRecord::V1DeleteTopic(d) => {
484                if let Some(prev) = self.topics.get(&d.name) {
485                    self.topic_ids.remove(&prev.topic_id);
486                }
487                self.topics.remove(&d.name);
488                self.partitions.retain(|(t, _), _| t != &d.name);
489                self.topic_configs.remove(&d.name);
490            }
491            MetadataRecord::V1TopicConfig(c) => {
492                if c.overrides.is_empty() {
493                    self.topic_configs.remove(&c.topic);
494                } else {
495                    self.topic_configs
496                        .insert(c.topic.clone(), c.overrides.clone());
497                }
498            }
499            MetadataRecord::V1ScramCredential(r) => {
500                self.scram_credentials.insert(
501                    (r.user.clone(), r.mechanism),
502                    ScramCredential {
503                        mechanism: r.mechanism,
504                        salt: r.salt.clone(),
505                        stored_key: r.stored_key.clone(),
506                        server_key: r.server_key.clone(),
507                        iterations: r.iterations,
508                    },
509                );
510            }
511            MetadataRecord::V1DeleteScramCredential(r) => {
512                self.scram_credentials
513                    .remove(&(r.user.clone(), r.mechanism));
514            }
515            MetadataRecord::V1AccessControlEntry(entry) => {
516                let bucket = match entry.pattern_type {
517                    PatternType::Literal => self
518                        .acls_literal
519                        .entry((entry.resource_type, entry.resource_name.clone()))
520                        .or_default(),
521                    PatternType::Prefixed => {
522                        self.acls_prefixed.entry(entry.resource_type).or_default()
523                    }
524                };
525                // Last-write-wins on full-tuple equality.
526                bucket.retain(|e| e != entry);
527                bucket.push(entry.clone());
528            }
529            MetadataRecord::V1DeleteAccessControlEntry(filter) => {
530                self.acls_literal.retain(|_, v| {
531                    v.retain(|e| !filter.matches(e));
532                    !v.is_empty()
533                });
534                self.acls_prefixed.retain(|_, v| {
535                    v.retain(|e| !filter.matches(e));
536                    !v.is_empty()
537                });
538            }
539            MetadataRecord::V1BrokerConfig(rec) => {
540                let entry = self.broker_configs.entry(rec.node_id).or_default();
541                match &rec.config_value {
542                    Some(v) => {
543                        entry.insert(rec.config_name.clone(), v.clone());
544                    }
545                    None => {
546                        entry.remove(&rec.config_name);
547                    }
548                }
549            }
550            MetadataRecord::V1ClientQuota(rec) => {
551                let key = canonicalize_entity(
552                    rec.entity
553                        .iter()
554                        .map(|e| (e.entity_type.clone(), e.entity_name.clone()))
555                        .collect(),
556                );
557                let configs = self.client_quotas.entry(key).or_default();
558                match rec.config_value {
559                    Some(v) => {
560                        configs.insert(rec.config_key.clone(), v);
561                    }
562                    None => {
563                        configs.remove(&rec.config_key);
564                    }
565                }
566            }
567            // Replacement semantics (KIP-48) — same
568            // `token_id` overwrites the prior entry (used by Create
569            // and Renew). Tombstone removes by `token_id`.
570            MetadataRecord::V1DelegationToken(rec) => {
571                self.delegation_tokens
572                    .insert(rec.token_id.clone(), DelegationToken::from_record(rec));
573            }
574            MetadataRecord::V1DeleteDelegationToken(rec) => {
575                self.delegation_tokens.remove(&rec.token_id);
576            }
577            MetadataRecord::V1UnregisterBroker(rec) => {
578                // Idempotent: applying against an unknown `node_id` is
579                // a no-op.
580                self.brokers.remove(&rec.node_id);
581            }
582            MetadataRecord::V1KRaftVersion(r) => {
583                self.kraft_version = r.kraft_version;
584            }
585            MetadataRecord::V1Voters(r) => {
586                self.voters = r.voters.clone();
587            }
588            MetadataRecord::V1FeatureLevel(rec) => {
589                if rec.level == 0 {
590                    self.feature_levels.remove(&rec.name);
591                } else {
592                    self.feature_levels.insert(rec.name.clone(), rec.level);
593                }
594                // Monotonic epoch: -1 -> 0 on the first record, then +1.
595                self.features_epoch = self.features_epoch.saturating_add(1).max(0);
596            }
597            MetadataRecord::V1ClientMetricsConfig(c) => {
598                if c.configs.is_empty() {
599                    self.client_metrics_configs.remove(&c.name);
600                } else {
601                    self.client_metrics_configs
602                        .insert(c.name.clone(), c.configs.clone());
603                }
604            }
605            // Snapshot-only: restore the epoch verbatim rather than bumping.
606            // Emitted last by `to_records` so it overrides the value the
607            // preceding `V1FeatureLevel` replays accumulated. Never reaches
608            // the live log (not a controller-submittable change).
609            MetadataRecord::V1FeaturesEpoch(rec) => {
610                self.features_epoch = rec.epoch;
611            }
612            // KIP-858 directory-assignment delta: merge ONLY the reporting
613            // replica's `directories` slot. Never touches leader/isr/replicas/
614            // adding/removing, so a concurrent reassignment or ISR change is
615            // preserved regardless of commit order.
616            MetadataRecord::V1PartitionDirAssignment(r) => {
617                if let Some(pr) = self.partitions.get_mut(&(r.topic.clone(), r.partition))
618                    && let Some(slot) = pr.replicas.iter().position(|n| *n == r.replica)
619                {
620                    if pr.directories.len() < pr.replicas.len() {
621                        pr.directories.resize(pr.replicas.len(), uuid::Uuid::nil());
622                    }
623                    pr.directories[slot] = r.directory;
624                }
625            }
626        }
627    }
628
629    /// Serialize the image into the minimal sequence of [`MetadataRecord`]s
630    /// whose in-order [`apply`](Self::apply) reproduces this image exactly.
631    /// This is the inverse of `apply` over a sequence of non-tombstone
632    /// records: each stored entry maps to the record that would create it.
633    ///
634    /// Tombstone / removal records (`V1DeleteTopic`,
635    /// `V1DeleteScramCredential`, `V1DeleteAccessControlEntry`,
636    /// `V1DeleteDelegationToken`, `V1UnregisterBroker`) are intentionally
637    /// never emitted: a snapshot captures resulting *state*, not deletion
638    /// history. `V1BrokerConfig` deletes likewise vanish — only the
639    /// surviving key/value pairs are emitted as `Some(value)` sets.
640    ///
641    /// Records are emitted in dependency order so that a fresh image
642    /// `apply`ing them never sees a dangling reference: brokers,
643    /// broker-configs, topics, partitions, topic-configs, SCRAM creds,
644    /// ACLs, client quotas, delegation tokens.
645    #[must_use]
646    pub fn to_records(&self) -> Vec<MetadataRecord> {
647        let mut out = Vec::new();
648
649        // Brokers before their configs.
650        for b in self.brokers.values() {
651            out.push(MetadataRecord::V1BrokerRegistration(b.clone()));
652        }
653        for (node_id, configs) in &self.broker_configs {
654            for (config_name, config_value) in configs {
655                out.push(MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
656                    node_id: *node_id,
657                    config_name: config_name.clone(),
658                    config_value: Some(config_value.clone()),
659                }));
660            }
661        }
662
663        // Topics before partitions and topic-configs (apply-side validate
664        // and the snapshot's own DeleteTopic-clears-configs invariant both
665        // assume the topic exists first).
666        for t in self.topics.values() {
667            out.push(MetadataRecord::V1Topic(t.clone()));
668        }
669        for p in self.partitions.values() {
670            out.push(MetadataRecord::V1Partition(p.clone()));
671        }
672        for (topic, overrides) in &self.topic_configs {
673            out.push(MetadataRecord::V1TopicConfig(TopicConfigRecord {
674                topic: topic.clone(),
675                overrides: overrides.clone(),
676            }));
677        }
678
679        // SCRAM credentials.
680        for ((user, mechanism), cred) in &self.scram_credentials {
681            out.push(MetadataRecord::V1ScramCredential(ScramCredentialRecord {
682                user: user.clone(),
683                mechanism: *mechanism,
684                salt: cred.salt.clone(),
685                stored_key: cred.stored_key.clone(),
686                server_key: cred.server_key.clone(),
687                iterations: cred.iterations,
688            }));
689        }
690
691        // ACLs (literal + prefixed). Each stored entry already carries its
692        // own pattern_type, so re-applying routes it back to the right
693        // bucket.
694        for entry in self.all_acls() {
695            out.push(MetadataRecord::V1AccessControlEntry(entry.clone()));
696        }
697
698        // Client quotas. The key is the canonicalized entity tuple; rebuild
699        // a QuotaEntity list from it and emit one record per config pair.
700        // apply re-canonicalizes, so the round-trip is stable.
701        for (entity_key, configs) in &self.client_quotas {
702            let entity: Vec<QuotaEntity> = entity_key
703                .iter()
704                .map(|(entity_type, entity_name)| QuotaEntity {
705                    entity_type: entity_type.clone(),
706                    entity_name: entity_name.clone(),
707                })
708                .collect();
709            for (config_key, config_value) in configs {
710                out.push(MetadataRecord::V1ClientQuota(ClientQuotaRecord {
711                    entity: entity.clone(),
712                    config_key: config_key.clone(),
713                    config_value: Some(*config_value),
714                }));
715            }
716        }
717
718        // Delegation tokens.
719        for tok in self.delegation_tokens.values() {
720            out.push(MetadataRecord::V1DelegationToken(DelegationTokenRecord {
721                token_id: tok.token_id.clone(),
722                owner: tok.owner.clone(),
723                hmac: tok.hmac.clone(),
724                issue_timestamp_ms: tok.issue_timestamp_ms,
725                expiry_timestamp_ms: tok.expiry_timestamp_ms,
726                max_timestamp_ms: tok.max_timestamp_ms,
727                renewers: tok.renewers.clone(),
728            }));
729        }
730
731        // Client-metrics subscriptions (KIP-714).
732        for (name, configs) in &self.client_metrics_configs {
733            out.push(MetadataRecord::V1ClientMetricsConfig(
734                ClientMetricsConfigRecord {
735                    name: name.clone(),
736                    configs: configs.clone(),
737                },
738            ));
739        }
740
741        // KIP-584 finalized features: one record per live feature, in the
742        // BTreeMap's deterministic key order. (level == 0 is the delete
743        // sentinel and is never stored, so every emitted level is >= 1 and
744        // re-applies as a set.) The trailing V1FeaturesEpoch then pins the
745        // epoch verbatim — the preceding feature records each bump it, but the
746        // bumped value reflects the live feature count, not the original apply
747        // history, so it must be overwritten. Emitted whenever any feature was
748        // ever finalized (epoch left -1 only on a pristine image), so an image
749        // that finalized then removed every feature still restores its epoch.
750        for (name, level) in &self.feature_levels {
751            out.push(MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
752                name: name.clone(),
753                level: *level,
754            }));
755        }
756        if self.features_epoch >= 0 {
757            out.push(MetadataRecord::V1FeaturesEpoch(FeaturesEpochRecord {
758                epoch: self.features_epoch,
759            }));
760        }
761
762        // KIP-853 controller-quorum state. Both MUST be emitted or a snapshot
763        // recovery / learner install silently drops them: the voter set goes
764        // empty (breaking `DescribeQuorum` and KIP-853 auto-join) and the
765        // cluster `kraft.version` reverts to 0. The cluster `kraft_version`
766        // in particular has no other persistence path — unlike the openraft
767        // membership (restored from `SnapshotMeta.last_membership`), it is not
768        // carried anywhere outside the metadata records, so the image is its
769        // only source of truth. Independent of every other record, so order
770        // here is irrelevant. The reconfig coordinator commits the openraft
771        // membership change and the authoritative `V1Voters` record under one
772        // lock, so the emitted voter set mirrors `last_membership` for the
773        // same committed prefix (any transient one-reconfig skew self-heals as
774        // the trailing entry replays from the retained log — the same
775        // eventual lockstep that holds in live operation).
776        if self.kraft_version != 0 {
777            out.push(MetadataRecord::V1KRaftVersion(KRaftVersionRecord {
778                kraft_version: self.kraft_version,
779            }));
780        }
781        if !self.voters.is_empty() {
782            out.push(MetadataRecord::V1Voters(VotersRecord {
783                voters: self.voters.clone(),
784            }));
785        }
786
787        out
788    }
789
790    /// Reconstruct an image from a `cluster_id` and a record sequence
791    /// (typically [`Self::to_records`] output read back from a snapshot):
792    /// `new` an empty image and `apply` each record in order.
793    #[must_use]
794    pub fn from_records(cluster_id: Uuid, records: &[MetadataRecord]) -> Self {
795        let mut image = Self::new(cluster_id);
796        for rec in records {
797            image.apply(rec);
798        }
799        image
800    }
801
802    /// Synchronous pre-validation: returns `Ok` if the record would be a
803    /// no-conflict apply, otherwise the appropriate error. Used by
804    /// `Controller::submit_change` before forwarding to openraft.
805    pub fn validate(&self, rec: &MetadataRecord) -> Result<(), MetadataError> {
806        match rec {
807            MetadataRecord::V1Topic(t) => {
808                if let Some(existing) = self.topics.get(&t.name) {
809                    // Updating an existing topic is allowed only if it's a
810                    // strict partition-count expansion that preserves
811                    // identity: same topic_id, same replication_factor,
812                    // partitions strictly growing. CreatePartitions emits
813                    // exactly this. Identical re-submits stay rejected so
814                    // CreateTopics' idempotency contract still holds.
815                    // Partition count derives from the partitions map (the
816                    // KIP-631 round-trip does not carry the stored
817                    // `TopicRecord.partitions`), so the expansion check reads
818                    // the live count rather than `existing.partitions`.
819                    if existing.topic_id != t.topic_id
820                        || existing.replication_factor != t.replication_factor
821                        || t.partitions <= self.topic_partition_count(&t.name)
822                    {
823                        return Err(MetadataError::TopicExists(t.name.clone()));
824                    }
825                    return Ok(());
826                }
827                // A new topic is valid regardless of the `TopicRecord.partitions`
828                // count: KIP-631 framing does not carry that field, so a
829                // freshly-decoded `V1Topic` round-trips back as partitions=0 with
830                // the real count arriving via the following `V1Partition` records.
831                // Partition-count validity (`> 0`) is enforced at the CreateTopics
832                // request boundary, not on the metadata-record apply path.
833                Ok(())
834            }
835            MetadataRecord::V1Partition(p) => {
836                if !self.topics.contains_key(&p.topic) {
837                    return Err(MetadataError::UnknownTopic(p.topic.clone()));
838                }
839                Ok(())
840            }
841            MetadataRecord::V1DeleteTopic(d) => {
842                if !self.topics.contains_key(&d.name) {
843                    return Err(MetadataError::UnknownTopic(d.name.clone()));
844                }
845                Ok(())
846            }
847            MetadataRecord::V1TopicConfig(c) => {
848                if !self.topics.contains_key(&c.topic) {
849                    return Err(MetadataError::UnknownTopic(c.topic.clone()));
850                }
851                Ok(())
852            }
853            MetadataRecord::V1BrokerRegistration(_)
854            | MetadataRecord::V1ScramCredential(_)
855            | MetadataRecord::V1DeleteScramCredential(_)
856            | MetadataRecord::V1AccessControlEntry(_)
857            | MetadataRecord::V1DeleteAccessControlEntry(_)
858            | MetadataRecord::V1BrokerConfig(_)
859            | MetadataRecord::V1ClientQuota(_)
860            // Delegation-token records have no
861            // topic-store concerns and admission is gated by the
862            // handler-side checks (KIP-48 §protocol errors), so the
863            // image-level validate is unconditional Ok.
864            | MetadataRecord::V1DelegationToken(_)
865            | MetadataRecord::V1DeleteDelegationToken(_)
866            // UnregisterBroker (KIP-185 / api_key 64). The handler-side
867            // existence check + Cluster:Alter ACL gate provide all the
868            // pre-validation we need; image-level apply is idempotent.
869            | MetadataRecord::V1UnregisterBroker(_)
870            // KIP-853: voter-set / kraft.version records are validated by
871            // the reconfiguration coordinator before submission; the
872            // image-level apply is an unconditional replacement.
873            | MetadataRecord::V1KRaftVersion(_)
874            | MetadataRecord::V1Voters(_)
875            // KIP-584: feature-level admission is fully gated by the
876            // UpdateFeatures handler (supported-range + downgrade checks);
877            // image-level apply is an idempotent map upsert.
878            | MetadataRecord::V1FeatureLevel(_)
879            // KIP-714: client-metrics subscription config. The
880            // IncrementalAlterConfigs handler validates the subscription
881            // name and config keys before submission; image-level apply
882            // is an idempotent map upsert (or removal on empty map).
883            | MetadataRecord::V1ClientMetricsConfig(_)
884            // Snapshot-only epoch carrier: only ever produced by `to_records`
885            // and replayed on snapshot install, never submitted as a change.
886            // Validated permissively for match-exhaustiveness.
887            | MetadataRecord::V1FeaturesEpoch(_)
888            // KIP-858 directory-assignment delta: a merge into one replica's
889            // `directories` slot. The handler already resolved topic/partition/
890            // replica against the image; apply is an idempotent slot upsert
891            // (a no-op if the partition or replica is unknown).
892            | MetadataRecord::V1PartitionDirAssignment(_) => Ok(()),
893        }
894    }
895}
896
897#[cfg(test)]
898mod tests {
899    use super::*;
900    use crate::acl::{AclEntryFilter, AclOperation, PermissionType};
901    use crate::records::{
902        BrokerConfigRecord, ClientQuotaRecord, DeleteDelegationTokenRecord,
903        DeleteScramCredentialRecord, DeleteTopicRecord, FeatureLevelRecord, QuotaEntity,
904        ScramCredentialRecord,
905    };
906    use assert2::assert;
907
908    fn img() -> MetadataImage {
909        MetadataImage::new(Uuid::nil())
910    }
911
912    #[test]
913    fn fresh_image_has_no_features_and_unknown_epoch() {
914        let m = img();
915        assert!(m.finalized_features().is_empty());
916        assert!(m.finalized_features_epoch() == -1);
917    }
918
919    #[test]
920    fn finalized_feature_reads_arbitrary_name() {
921        let mut m = MetadataImage::new(uuid::Uuid::nil());
922        assert!(m.finalized_feature("transaction.version").is_none());
923        m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
924            name: "transaction.version".into(),
925            level: 2,
926        }));
927        assert!(m.finalized_feature("transaction.version") == Some(2));
928    }
929
930    #[test]
931    fn apply_feature_level_sets_level_and_bumps_epoch() {
932        let mut m = img();
933        m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
934            name: "metadata.version".into(),
935            level: 1,
936        }));
937        assert!(m.finalized_features().get("metadata.version") == Some(&1));
938        assert!(m.finalized_features_epoch() == 0);
939
940        // A second apply bumps the epoch again (monotonic).
941        m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
942            name: "metadata.version".into(),
943            level: 1,
944        }));
945        assert!(m.finalized_features_epoch() == 1);
946    }
947
948    #[test]
949    fn apply_feature_level_zero_deletes_entry() {
950        let mut m = img();
951        m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
952            name: "metadata.version".into(),
953            level: 1,
954        }));
955        m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
956            name: "metadata.version".into(),
957            level: 0,
958        }));
959        assert!(m.finalized_features().get("metadata.version").is_none());
960        // Epoch still advanced — it is monotonic, not a count of live features.
961        assert!(m.finalized_features_epoch() == 1);
962    }
963
964    #[test]
965    fn to_records_from_records_round_trips() {
966        let cid = Uuid::new_v4();
967        let mut image = MetadataImage::new(cid);
968        image.apply(&MetadataRecord::V1Topic(TopicRecord {
969            name: "orders".into(),
970            topic_id: Uuid::new_v4(),
971            partitions: 3,
972            replication_factor: 2,
973        }));
974        assert!(MetadataImage::from_records(cid, &image.to_records()) == image);
975    }
976
977    /// Exercises every stored variant the image can hold (the 9
978    /// state-producing records), plus tombstones whose effects must NOT
979    /// leak into the snapshot. The round-trip must reproduce the image
980    /// exactly.
981    #[test]
982    #[allow(clippy::too_many_lines)] // exhaustive fixture over every stored variant
983    fn to_records_round_trips_all_variants() {
984        use crabka_security::{KafkaPrincipal, SaslMechanism};
985
986        let cid = Uuid::new_v4();
987        let mut image = MetadataImage::new(cid);
988
989        // Brokers (one survives, one is unregistered → must not reappear).
990        image.apply(&MetadataRecord::V1BrokerRegistration(
991            BrokerRegistrationRecord {
992                node_id: 1,
993                broker_epoch: 0,
994                incarnation_id: Uuid::nil(),
995                host: "h1".into(),
996                port: 9092,
997                rack: Some("r1".into()),
998                endpoints: vec![crate::records::BrokerEndpoint {
999                    name: "EXTERNAL".into(),
1000                    host: "ext".into(),
1001                    port: 9093,
1002                    protocol: crabka_security::ListenerProtocol::SaslSsl,
1003                }],
1004            },
1005        ));
1006        image.apply(&MetadataRecord::V1BrokerRegistration(
1007            BrokerRegistrationRecord {
1008                node_id: 2,
1009                broker_epoch: 0,
1010                incarnation_id: Uuid::nil(),
1011                host: "h2".into(),
1012                port: 9092,
1013                rack: None,
1014                endpoints: vec![],
1015            },
1016        ));
1017        image.apply(&MetadataRecord::V1UnregisterBroker(
1018            crate::records::UnregisterBrokerRecord { node_id: 2 },
1019        ));
1020
1021        // Broker configs: one set survives, one deleted-back-to-empty.
1022        image.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1023            node_id: 1,
1024            config_name: "leader.replication.throttled.rate".into(),
1025            config_value: Some("2048".into()),
1026        }));
1027        image.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1028            node_id: 1,
1029            config_name: "follower.replication.throttled.rate".into(),
1030            config_value: Some("4096".into()),
1031        }));
1032        image.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1033            node_id: 1,
1034            config_name: "follower.replication.throttled.rate".into(),
1035            config_value: None,
1036        }));
1037
1038        // Topics + partitions (one topic deleted → topic, partitions and
1039        // its config must vanish).
1040        image.apply(&MetadataRecord::V1Topic(TopicRecord {
1041            name: "orders".into(),
1042            topic_id: Uuid::new_v4(),
1043            partitions: 2,
1044            replication_factor: 2,
1045        }));
1046        image.apply(&MetadataRecord::V1Topic(TopicRecord {
1047            name: "doomed".into(),
1048            topic_id: Uuid::new_v4(),
1049            partitions: 1,
1050            replication_factor: 1,
1051        }));
1052        image.apply(&MetadataRecord::V1Partition(PartitionRecord {
1053            topic: "orders".into(),
1054            partition: 0,
1055            leader: 1,
1056            replicas: vec![1, 2],
1057            isr: vec![1, 2],
1058            leader_epoch: 4,
1059            adding_replicas: vec![2],
1060            removing_replicas: vec![],
1061            directories: vec![],
1062            partition_epoch: 0,
1063        }));
1064        image.apply(&MetadataRecord::V1Partition(PartitionRecord {
1065            topic: "orders".into(),
1066            partition: 1,
1067            leader: 2,
1068            replicas: vec![2, 1],
1069            isr: vec![2],
1070            leader_epoch: 7,
1071            adding_replicas: vec![],
1072            removing_replicas: vec![1],
1073            directories: vec![],
1074            partition_epoch: 0,
1075        }));
1076        image.apply(&MetadataRecord::V1Partition(PartitionRecord {
1077            topic: "doomed".into(),
1078            partition: 0,
1079            leader: 1,
1080            replicas: vec![1],
1081            isr: vec![1],
1082            leader_epoch: 0,
1083            adding_replicas: vec![],
1084            removing_replicas: vec![],
1085            directories: vec![],
1086            partition_epoch: 0,
1087        }));
1088
1089        // Topic config.
1090        let mut overrides = std::collections::BTreeMap::new();
1091        overrides.insert("retention.ms".to_string(), "60000".to_string());
1092        overrides.insert("segment.bytes".to_string(), "1048576".to_string());
1093        image.apply(&MetadataRecord::V1TopicConfig(TopicConfigRecord {
1094            topic: "orders".into(),
1095            overrides,
1096        }));
1097
1098        // Delete the doomed topic (clears its partition + config too).
1099        image.apply(&MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
1100            name: "doomed".into(),
1101        }));
1102
1103        // SCRAM creds (one survives, one deleted).
1104        image.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1105            user: "alice".into(),
1106            mechanism: SaslMechanism::ScramSha512,
1107            salt: vec![1; 16],
1108            stored_key: vec![2; 64],
1109            server_key: vec![3; 64],
1110            iterations: 8192,
1111        }));
1112        image.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1113            user: "bob".into(),
1114            mechanism: SaslMechanism::ScramSha256,
1115            salt: vec![4; 16],
1116            stored_key: vec![5; 32],
1117            server_key: vec![6; 32],
1118            iterations: 4096,
1119        }));
1120        image.apply(&MetadataRecord::V1DeleteScramCredential(
1121            DeleteScramCredentialRecord {
1122                user: "bob".into(),
1123                mechanism: SaslMechanism::ScramSha256,
1124            },
1125        ));
1126
1127        // ACLs: literal + prefixed survive; a deleted one must not.
1128        let literal = AclEntry {
1129            resource_type: ResourceType::Topic,
1130            resource_name: "orders".into(),
1131            pattern_type: PatternType::Literal,
1132            principal: "User:alice".into(),
1133            host: "*".into(),
1134            operation: AclOperation::Read,
1135            permission_type: PermissionType::Allow,
1136        };
1137        let prefixed = AclEntry {
1138            resource_type: ResourceType::Group,
1139            resource_name: "cg-".into(),
1140            pattern_type: PatternType::Prefixed,
1141            principal: "User:alice".into(),
1142            host: "*".into(),
1143            operation: AclOperation::Read,
1144            permission_type: PermissionType::Allow,
1145        };
1146        let doomed_acl = AclEntry {
1147            resource_type: ResourceType::Cluster,
1148            resource_name: "kafka-cluster".into(),
1149            pattern_type: PatternType::Literal,
1150            principal: "User:eve".into(),
1151            host: "*".into(),
1152            operation: AclOperation::Alter,
1153            permission_type: PermissionType::Deny,
1154        };
1155        image.apply(&MetadataRecord::V1AccessControlEntry(literal));
1156        image.apply(&MetadataRecord::V1AccessControlEntry(prefixed));
1157        image.apply(&MetadataRecord::V1AccessControlEntry(doomed_acl));
1158        image.apply(&MetadataRecord::V1DeleteAccessControlEntry(
1159            AclEntryFilter {
1160                resource_type: Some(ResourceType::Cluster),
1161                ..AclEntryFilter::default()
1162            },
1163        ));
1164
1165        // Client quotas.
1166        image.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
1167            entity: vec![
1168                QuotaEntity {
1169                    entity_type: "user".into(),
1170                    entity_name: Some("alice".into()),
1171                },
1172                QuotaEntity {
1173                    entity_type: "client-id".into(),
1174                    entity_name: Some("app1".into()),
1175                },
1176            ],
1177            config_key: "producer_byte_rate".into(),
1178            config_value: Some(1024.0),
1179        }));
1180        image.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
1181            entity: vec![QuotaEntity {
1182                entity_type: "user".into(),
1183                entity_name: None,
1184            }],
1185            config_key: "consumer_byte_rate".into(),
1186            config_value: Some(2048.0),
1187        }));
1188
1189        // Delegation tokens (one survives, one deleted).
1190        let alice = KafkaPrincipal {
1191            principal_type: "User".into(),
1192            name: "alice".into(),
1193        };
1194        let bob = KafkaPrincipal {
1195            principal_type: "User".into(),
1196            name: "bob".into(),
1197        };
1198        image.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
1199            token_id: "tok-1".into(),
1200            owner: alice,
1201            hmac: vec![0x42; 32],
1202            issue_timestamp_ms: 1_000,
1203            expiry_timestamp_ms: 5_000,
1204            max_timestamp_ms: 10_000,
1205            renewers: vec![bob.clone()],
1206        }));
1207        image.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
1208            token_id: "tok-2".into(),
1209            owner: bob,
1210            hmac: vec![0x43; 32],
1211            issue_timestamp_ms: 1_000,
1212            expiry_timestamp_ms: 5_000,
1213            max_timestamp_ms: 10_000,
1214            renewers: vec![],
1215        }));
1216        image.apply(&MetadataRecord::V1DeleteDelegationToken(
1217            DeleteDelegationTokenRecord {
1218                token_id: "tok-2".into(),
1219            },
1220        ));
1221
1222        // KIP-584 finalized features: two live features at a non-trivial epoch
1223        // (the metadata.version re-finalize bumps the epoch past the live
1224        // count), so the trailing V1FeaturesEpoch carrier is exercised too.
1225        image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1226            name: "metadata.version".into(),
1227            level: 24,
1228        }));
1229        image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1230            name: "metadata.version".into(),
1231            level: 25,
1232        }));
1233        image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1234            name: "group.version".into(),
1235            level: 1,
1236        }));
1237        assert!(image.finalized_features_epoch() == 2);
1238
1239        let rebuilt = MetadataImage::from_records(cid, &image.to_records());
1240        assert!(rebuilt == image);
1241    }
1242
1243    /// Finalized features and their epoch must survive a `to_records` /
1244    /// `from_records` round-trip exactly. The epoch is carried verbatim by a
1245    /// trailing snapshot-only record, NOT recomputed by apply-bumping — so an
1246    /// image whose epoch (history of `UpdateFeatures` applies) exceeds its live
1247    /// feature count still reproduces exactly. Regression guard for the bug
1248    /// where `to_records` emitted no feature records at all and the snapshot
1249    /// path silently dropped every finalized feature.
1250    #[test]
1251    fn to_records_round_trips_features_and_epoch() {
1252        let cid = Uuid::new_v4();
1253        let mut image = MetadataImage::new(cid);
1254        // Four applies → epoch climbs -1 → 0 → 1 → 2 → 3, while only two
1255        // features remain live (metadata.version re-finalized once).
1256        image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1257            name: "metadata.version".into(),
1258            level: 24,
1259        }));
1260        image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1261            name: "metadata.version".into(),
1262            level: 25,
1263        }));
1264        image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1265            name: "group.version".into(),
1266            level: 1,
1267        }));
1268        image.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
1269            name: "metadata.version".into(),
1270            level: 25,
1271        }));
1272        assert!(image.finalized_features_epoch() == 3);
1273
1274        let rebuilt = MetadataImage::from_records(cid, &image.to_records());
1275        assert!(rebuilt == image);
1276        // Belt-and-suspenders beyond derived PartialEq: the two fields the
1277        // snapshot path used to lose.
1278        assert!(rebuilt.finalized_features().get("metadata.version") == Some(&25));
1279        assert!(rebuilt.finalized_features().get("group.version") == Some(&1));
1280        assert!(rebuilt.finalized_features_epoch() == 3);
1281    }
1282
1283    /// KIP-853 voter set + finalized cluster `kraft.version` must survive
1284    /// `to_records` / `from_records`. Before the fix `to_records` dropped
1285    /// both, so a snapshot recovery / learner install rebuilt the image with
1286    /// an EMPTY voter set and `kraft_version = 0` — breaking `DescribeQuorum`
1287    /// and KIP-853 auto-join. The cluster `kraft_version` has no other
1288    /// persistence source (it is not part of openraft's membership), so it
1289    /// can only round-trip through a `V1KRaftVersion` record.
1290    #[test]
1291    fn to_records_preserves_voters_and_kraft_version() {
1292        use crate::records::{KRaftVersionRecord, VotersRecord};
1293        use crate::voters::{KRaftVersionRange, Voter, VoterEndpoint, VoterSet};
1294
1295        let cid = Uuid::new_v4();
1296        let mut image = MetadataImage::new(cid);
1297        image.apply(&MetadataRecord::V1KRaftVersion(KRaftVersionRecord {
1298            kraft_version: 1,
1299        }));
1300        let voters = VoterSet::from_voters([
1301            Voter {
1302                id: 1,
1303                directory_id: Uuid::from_u128(1),
1304                endpoints: vec![VoterEndpoint {
1305                    name: "CONTROLLER".into(),
1306                    host: "127.0.0.1".into(),
1307                    port: 9093,
1308                }],
1309                kraft_version: KRaftVersionRange::default(),
1310            },
1311            Voter {
1312                id: 2,
1313                directory_id: Uuid::from_u128(2),
1314                endpoints: vec![VoterEndpoint {
1315                    name: "CONTROLLER".into(),
1316                    host: "127.0.0.1".into(),
1317                    port: 9094,
1318                }],
1319                kraft_version: KRaftVersionRange { min: 0, max: 1 },
1320            },
1321        ]);
1322        image.apply(&MetadataRecord::V1Voters(VotersRecord {
1323            voters: voters.clone(),
1324        }));
1325
1326        let rebuilt = MetadataImage::from_records(cid, &image.to_records());
1327
1328        assert_eq!(rebuilt.kraft_version(), 1);
1329        assert_eq!(rebuilt.voters(), &voters);
1330        assert_eq!(rebuilt, image);
1331    }
1332
1333    fn topic(name: &str, partitions: i32) -> MetadataRecord {
1334        MetadataRecord::V1Topic(TopicRecord {
1335            name: name.into(),
1336            topic_id: Uuid::new_v4(),
1337            partitions,
1338            replication_factor: 1,
1339        })
1340    }
1341
1342    #[test]
1343    fn apply_topic_inserts() {
1344        let mut m = img();
1345        m.apply(&topic("t", 3));
1346        assert!(m.topic("t").is_some());
1347    }
1348
1349    #[test]
1350    fn apply_dir_assignment_merges_one_slot_without_clobbering_reassignment() {
1351        // Regression guard for the KIP-858 dir-assignment clobber: a directory
1352        // report applied as a delta must set only the reporting replica's slot
1353        // and leave a concurrent reassignment's `adding_replicas` intact.
1354        let mut m = img();
1355        m.apply(&topic("t", 1));
1356        m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1357            topic: "t".into(),
1358            partition: 0,
1359            leader: 1,
1360            replicas: vec![1, 2, 3],
1361            isr: vec![1, 2],
1362            leader_epoch: 0,
1363            adding_replicas: vec![3],
1364            removing_replicas: vec![],
1365            directories: vec![uuid::Uuid::nil(), uuid::Uuid::nil(), uuid::Uuid::nil()],
1366            partition_epoch: 0,
1367        }));
1368        let dir = uuid::Uuid::from_u128(0xABCD);
1369        m.apply(&MetadataRecord::V1PartitionDirAssignment(
1370            crate::records::PartitionDirAssignmentRecord {
1371                topic: "t".into(),
1372                partition: 0,
1373                replica: 2,
1374                directory: dir,
1375            },
1376        ));
1377        let pr = m.partition("t", 0).unwrap();
1378        // Slot 1 (replica 2) set; slots 0 and 2 untouched.
1379        assert!(pr.directories == vec![uuid::Uuid::nil(), dir, uuid::Uuid::nil()]);
1380        // The delta did NOT clobber the in-flight reassignment.
1381        assert!(pr.adding_replicas == vec![3]);
1382        assert!(pr.replicas == vec![1, 2, 3]);
1383        assert!(pr.isr == vec![1, 2]);
1384    }
1385
1386    #[test]
1387    fn apply_delete_clears_partitions() {
1388        let mut m = img();
1389        m.apply(&topic("t", 2));
1390        m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1391            topic: "t".into(),
1392            partition: 0,
1393            leader: 1,
1394            replicas: vec![1],
1395            isr: vec![1],
1396            leader_epoch: 0,
1397            adding_replicas: vec![],
1398            removing_replicas: vec![],
1399            directories: vec![],
1400            partition_epoch: 0,
1401        }));
1402        m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1403            topic: "t".into(),
1404            partition: 1,
1405            leader: 1,
1406            replicas: vec![1],
1407            isr: vec![1],
1408            leader_epoch: 0,
1409            adding_replicas: vec![],
1410            removing_replicas: vec![],
1411            directories: vec![],
1412            partition_epoch: 0,
1413        }));
1414        assert!(m.partitions_of("t").count() == 2);
1415        m.apply(&MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
1416            name: "t".into(),
1417        }));
1418        assert!(m.topic("t").is_none());
1419        assert!(m.partitions_of("t").count() == 0);
1420    }
1421
1422    #[test]
1423    fn validate_topic_exists_rejected() {
1424        let mut m = img();
1425        m.apply(&topic("t", 1));
1426        let err = m.validate(&topic("t", 1)).unwrap_err();
1427        assert!(matches!(err, MetadataError::TopicExists(_)));
1428    }
1429
1430    /// Apply `count` partition records (indices `0..count`) for `topic` so
1431    /// the image's derived partition count reflects a realistic topic. The
1432    /// `validate` partition-count check reads this derived count, not the
1433    /// stored `TopicRecord.partitions`.
1434    fn apply_partitions(m: &mut MetadataImage, topic: &str, count: i32) {
1435        for p in 0..count {
1436            m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1437                topic: topic.into(),
1438                partition: p,
1439                leader: 1,
1440                replicas: vec![1],
1441                isr: vec![1],
1442                leader_epoch: 0,
1443                adding_replicas: vec![],
1444                removing_replicas: vec![],
1445                directories: vec![],
1446                partition_epoch: 0,
1447            }));
1448        }
1449    }
1450
1451    #[test]
1452    fn validate_topic_partition_count_increase_allowed() {
1453        let mut m = img();
1454        m.apply(&topic("t", 1));
1455        apply_partitions(&mut m, "t", 1);
1456        let existing = m.topic("t").unwrap().clone();
1457        let updated = MetadataRecord::V1Topic(TopicRecord {
1458            name: "t".into(),
1459            topic_id: existing.topic_id,
1460            partitions: 3,
1461            replication_factor: existing.replication_factor,
1462        });
1463        assert!(m.validate(&updated).is_ok());
1464    }
1465
1466    #[test]
1467    fn validate_topic_partition_count_decrease_rejected() {
1468        let mut m = img();
1469        m.apply(&topic("t", 3));
1470        apply_partitions(&mut m, "t", 3);
1471        let existing = m.topic("t").unwrap().clone();
1472        let updated = MetadataRecord::V1Topic(TopicRecord {
1473            name: "t".into(),
1474            topic_id: existing.topic_id,
1475            partitions: 1,
1476            replication_factor: existing.replication_factor,
1477        });
1478        let err = m.validate(&updated).unwrap_err();
1479        assert!(matches!(err, MetadataError::TopicExists(_)));
1480    }
1481
1482    #[test]
1483    fn validate_delete_unknown_topic_rejected() {
1484        let m = img();
1485        let err = m
1486            .validate(&MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
1487                name: "ghost".into(),
1488            }))
1489            .unwrap_err();
1490        assert!(matches!(err, MetadataError::UnknownTopic(_)));
1491    }
1492
1493    #[test]
1494    fn validate_partition_for_unknown_topic_rejected() {
1495        let m = img();
1496        let p = MetadataRecord::V1Partition(PartitionRecord {
1497            topic: "ghost".into(),
1498            partition: 0,
1499            leader: 1,
1500            replicas: vec![1],
1501            isr: vec![1],
1502            leader_epoch: 0,
1503            adding_replicas: vec![],
1504            removing_replicas: vec![],
1505            directories: vec![],
1506            partition_epoch: 0,
1507        });
1508        let err = m.validate(&p).unwrap_err();
1509        assert!(matches!(err, MetadataError::UnknownTopic(_)));
1510    }
1511
1512    #[test]
1513    fn broker_registration_is_idempotent() {
1514        let mut m = img();
1515        let b = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
1516            node_id: 1,
1517            broker_epoch: 0,
1518            incarnation_id: Uuid::nil(),
1519            host: "h".into(),
1520            port: 9092,
1521            rack: None,
1522            endpoints: vec![],
1523        });
1524        m.apply(&b);
1525        m.apply(&b);
1526        assert!(m.brokers().count() == 1);
1527    }
1528
1529    #[test]
1530    fn apply_topic_config_inserts() {
1531        let mut m = img();
1532        m.apply(&topic("t", 1));
1533        let mut overrides = std::collections::BTreeMap::new();
1534        overrides.insert("retention.ms".to_string(), "60000".to_string());
1535        m.apply(&MetadataRecord::V1TopicConfig(
1536            crate::records::TopicConfigRecord {
1537                topic: "t".into(),
1538                overrides: overrides.clone(),
1539            },
1540        ));
1541        assert!(m.topic_config("t") == Some(&overrides));
1542    }
1543
1544    #[test]
1545    fn apply_topic_config_replaces_previous() {
1546        let mut m = img();
1547        m.apply(&topic("t", 1));
1548
1549        let mut first = std::collections::BTreeMap::new();
1550        first.insert("retention.ms".to_string(), "60000".to_string());
1551        first.insert("segment.bytes".to_string(), "1024".to_string());
1552        m.apply(&MetadataRecord::V1TopicConfig(
1553            crate::records::TopicConfigRecord {
1554                topic: "t".into(),
1555                overrides: first,
1556            },
1557        ));
1558
1559        let mut second = std::collections::BTreeMap::new();
1560        second.insert("retention.ms".to_string(), "120000".to_string());
1561        m.apply(&MetadataRecord::V1TopicConfig(
1562            crate::records::TopicConfigRecord {
1563                topic: "t".into(),
1564                overrides: second.clone(),
1565            },
1566        ));
1567
1568        // segment.bytes is GONE — last-write-wins is authoritative.
1569        assert!(m.topic_config("t") == Some(&second));
1570    }
1571
1572    #[test]
1573    fn delete_topic_clears_configs() {
1574        let mut m = img();
1575        m.apply(&topic("t", 1));
1576        let mut overrides = std::collections::BTreeMap::new();
1577        overrides.insert("retention.ms".to_string(), "60000".to_string());
1578        m.apply(&MetadataRecord::V1TopicConfig(
1579            crate::records::TopicConfigRecord {
1580                topic: "t".into(),
1581                overrides,
1582            },
1583        ));
1584        m.apply(&MetadataRecord::V1DeleteTopic(
1585            crate::records::DeleteTopicRecord { name: "t".into() },
1586        ));
1587        assert!(m.topic_config("t").is_none());
1588    }
1589
1590    #[test]
1591    fn validate_topic_config_for_unknown_topic_rejected() {
1592        let m = img();
1593        let r = MetadataRecord::V1TopicConfig(crate::records::TopicConfigRecord {
1594            topic: "ghost".into(),
1595            overrides: std::collections::BTreeMap::new(),
1596        });
1597        let err = m.validate(&r).unwrap_err();
1598        assert!(matches!(err, MetadataError::UnknownTopic(_)));
1599    }
1600
1601    #[test]
1602    fn apply_scram_credential_stores() {
1603        let mut m = img();
1604        m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1605            user: "alice".into(),
1606            mechanism: crabka_security::SaslMechanism::ScramSha512,
1607            salt: vec![1; 16],
1608            stored_key: vec![2; 64],
1609            server_key: vec![3; 64],
1610            iterations: 4096,
1611        }));
1612        let got = m.scram_credential("alice", crabka_security::SaslMechanism::ScramSha512);
1613        assert!(got.is_some());
1614        assert!(got.unwrap().iterations == 4096);
1615    }
1616
1617    #[test]
1618    fn apply_scram_credential_last_write_wins() {
1619        let mut m = img();
1620        let mech = crabka_security::SaslMechanism::ScramSha512;
1621        m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1622            user: "alice".into(),
1623            mechanism: mech,
1624            salt: vec![1; 16],
1625            stored_key: vec![2; 64],
1626            server_key: vec![3; 64],
1627            iterations: 4096,
1628        }));
1629        m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1630            user: "alice".into(),
1631            mechanism: mech,
1632            salt: vec![9; 16],
1633            stored_key: vec![9; 64],
1634            server_key: vec![9; 64],
1635            iterations: 8192,
1636        }));
1637        let got = m.scram_credential("alice", mech).unwrap();
1638        assert!(got.iterations == 8192);
1639        assert!(got.salt == vec![9; 16]);
1640    }
1641
1642    #[test]
1643    fn delete_scram_credential_removes() {
1644        let mut m = img();
1645        let mech = crabka_security::SaslMechanism::ScramSha512;
1646        m.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
1647            user: "alice".into(),
1648            mechanism: mech,
1649            salt: vec![1; 16],
1650            stored_key: vec![2; 64],
1651            server_key: vec![3; 64],
1652            iterations: 4096,
1653        }));
1654        m.apply(&MetadataRecord::V1DeleteScramCredential(
1655            DeleteScramCredentialRecord {
1656                user: "alice".into(),
1657                mechanism: mech,
1658            },
1659        ));
1660        assert!(m.scram_credential("alice", mech).is_none());
1661    }
1662
1663    fn topic_read_for_alice() -> AclEntry {
1664        AclEntry {
1665            resource_type: ResourceType::Topic,
1666            resource_name: "foo".into(),
1667            pattern_type: PatternType::Literal,
1668            principal: "User:alice".into(),
1669            host: "*".into(),
1670            operation: AclOperation::Read,
1671            permission_type: PermissionType::Allow,
1672        }
1673    }
1674
1675    fn topic_prefixed_team() -> AclEntry {
1676        AclEntry {
1677            resource_type: ResourceType::Topic,
1678            resource_name: "team-".into(),
1679            pattern_type: PatternType::Prefixed,
1680            principal: "User:alice".into(),
1681            host: "*".into(),
1682            operation: AclOperation::Read,
1683            permission_type: PermissionType::Allow,
1684        }
1685    }
1686
1687    #[test]
1688    fn apply_v1_access_control_entry_literal_stores_in_literal_map() {
1689        let mut m = img();
1690        m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1691        let mut hits: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1692        assert!(hits.len() == 1);
1693        assert!(hits.pop().unwrap().resource_name == "foo");
1694    }
1695
1696    #[test]
1697    fn apply_v1_access_control_entry_prefixed_stores_in_prefixed_vec() {
1698        let mut m = img();
1699        m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1700        let hits: Vec<_> = m.matching_acls(ResourceType::Topic, "team-foo").collect();
1701        assert!(hits.len() == 1);
1702        assert!(hits[0].resource_name == "team-");
1703        // Non-matching resource: empty.
1704        let none: Vec<_> = m.matching_acls(ResourceType::Topic, "other").collect();
1705        assert!(none.is_empty());
1706    }
1707
1708    #[test]
1709    fn matching_acls_combines_literal_and_prefixed() {
1710        let mut m = img();
1711        m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1712        m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1713        let hits_foo: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1714        let hits_team: Vec<_> = m.matching_acls(ResourceType::Topic, "team-x").collect();
1715        assert!(hits_foo.len() == 1);
1716        assert!(hits_team.len() == 1);
1717    }
1718
1719    fn topic_wildcard_allow() -> AclEntry {
1720        AclEntry {
1721            resource_type: ResourceType::Topic,
1722            resource_name: "*".into(),
1723            pattern_type: PatternType::Literal,
1724            principal: "User:alice".into(),
1725            host: "*".into(),
1726            operation: AclOperation::Read,
1727            permission_type: PermissionType::Allow,
1728        }
1729    }
1730
1731    fn topic_wildcard_deny() -> AclEntry {
1732        AclEntry {
1733            resource_type: ResourceType::Topic,
1734            resource_name: "*".into(),
1735            pattern_type: PatternType::Literal,
1736            principal: "User:alice".into(),
1737            host: "*".into(),
1738            operation: AclOperation::Read,
1739            permission_type: PermissionType::Deny,
1740        }
1741    }
1742
1743    #[test]
1744    fn matching_acls_literal_wildcard_allow_matches_arbitrary_topic() {
1745        let mut m = img();
1746        m.apply(&MetadataRecord::V1AccessControlEntry(topic_wildcard_allow()));
1747        // An arbitrarily-named topic the grant never mentions by name.
1748        let hits: Vec<_> = m
1749            .matching_acls(ResourceType::Topic, "some-random-topic")
1750            .collect();
1751        assert!(hits.len() == 1);
1752        assert!(hits[0].resource_name == "*");
1753        assert!(hits[0].permission_type == PermissionType::Allow);
1754        // Requesting "*" itself must not double-count the wildcard entry.
1755        let star: Vec<_> = m.matching_acls(ResourceType::Topic, "*").collect();
1756        assert!(star.len() == 1);
1757    }
1758
1759    #[test]
1760    fn matching_acls_literal_wildcard_deny_matches_arbitrary_topic() {
1761        let mut m = img();
1762        m.apply(&MetadataRecord::V1AccessControlEntry(topic_wildcard_deny()));
1763        let hits: Vec<_> = m
1764            .matching_acls(ResourceType::Topic, "another-random-topic")
1765            .collect();
1766        assert!(hits.len() == 1);
1767        assert!(hits[0].resource_name == "*");
1768        assert!(hits[0].permission_type == PermissionType::Deny);
1769    }
1770
1771    #[test]
1772    fn apply_v1_delete_access_control_entry_removes_matching() {
1773        let mut m = img();
1774        m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1775        m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1776        let filter = AclEntryFilter {
1777            resource_type: Some(ResourceType::Topic),
1778            pattern_type: Some(PatternType::Literal),
1779            ..AclEntryFilter::default()
1780        };
1781        m.apply(&MetadataRecord::V1DeleteAccessControlEntry(filter));
1782        let hits_foo: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1783        let hits_team: Vec<_> = m.matching_acls(ResourceType::Topic, "team-x").collect();
1784        assert!(hits_foo.len() == 0); // literal removed
1785        assert!(hits_team.len() == 1); // prefixed survives
1786    }
1787
1788    #[test]
1789    fn apply_v1_delete_access_control_entry_no_match_is_noop() {
1790        let mut m = img();
1791        m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1792        let filter = AclEntryFilter {
1793            resource_type: Some(ResourceType::Group),
1794            ..AclEntryFilter::default()
1795        };
1796        m.apply(&MetadataRecord::V1DeleteAccessControlEntry(filter));
1797        let hits: Vec<_> = m.matching_acls(ResourceType::Topic, "foo").collect();
1798        assert!(hits.len() == 1);
1799    }
1800
1801    #[test]
1802    fn all_acls_returns_every_entry() {
1803        let mut m = img();
1804        m.apply(&MetadataRecord::V1AccessControlEntry(topic_read_for_alice()));
1805        m.apply(&MetadataRecord::V1AccessControlEntry(topic_prefixed_team()));
1806        assert!(m.all_acls().count() == 2);
1807    }
1808
1809    #[test]
1810    fn all_partitions_count_matches_map_size() {
1811        let mut m = img();
1812        m.apply(&topic("t", 3));
1813        for p in 0..3 {
1814            m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1815                topic: "t".into(),
1816                partition: p,
1817                leader: 1,
1818                replicas: vec![1],
1819                isr: vec![1],
1820                leader_epoch: 0,
1821                adding_replicas: vec![],
1822                removing_replicas: vec![],
1823                directories: vec![],
1824                partition_epoch: 0,
1825            }));
1826        }
1827        m.apply(&topic("u", 1));
1828        m.apply(&MetadataRecord::V1Partition(PartitionRecord {
1829            topic: "u".into(),
1830            partition: 0,
1831            leader: 1,
1832            replicas: vec![1],
1833            isr: vec![1],
1834            leader_epoch: 0,
1835            adding_replicas: vec![],
1836            removing_replicas: vec![],
1837            directories: vec![],
1838            partition_epoch: 0,
1839        }));
1840        // 3 partitions for "t" + 1 for "u" = 4, one walk, no per-topic filter.
1841        assert!(m.all_partitions().count() == 4);
1842        assert!(
1843            m.all_partitions().count()
1844                == m.partitions_of("t").count() + m.partitions_of("u").count()
1845        );
1846    }
1847
1848    #[test]
1849    fn reassignments_in_flight_excludes_idle_partitions() {
1850        let mut img = MetadataImage::new(uuid::Uuid::nil());
1851        img.apply(&MetadataRecord::V1Topic(TopicRecord {
1852            name: "foo".into(),
1853            topic_id: uuid::Uuid::nil(),
1854            partitions: 1,
1855            replication_factor: 3,
1856        }));
1857        img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1858            topic: "foo".into(),
1859            partition: 0,
1860            leader: 1,
1861            replicas: vec![1, 2, 3],
1862            isr: vec![1, 2, 3],
1863            leader_epoch: 0,
1864            adding_replicas: vec![],
1865            removing_replicas: vec![],
1866            directories: vec![],
1867            partition_epoch: 0,
1868        }));
1869        assert!(img.reassignments_in_flight().count() == 0);
1870    }
1871
1872    #[test]
1873    fn reassignments_in_flight_returns_partitions_with_adding() {
1874        let mut img = MetadataImage::new(uuid::Uuid::nil());
1875        img.apply(&MetadataRecord::V1Topic(TopicRecord {
1876            name: "foo".into(),
1877            topic_id: uuid::Uuid::nil(),
1878            partitions: 1,
1879            replication_factor: 3,
1880        }));
1881        img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1882            topic: "foo".into(),
1883            partition: 0,
1884            leader: 1,
1885            replicas: vec![1, 2, 3, 4],
1886            isr: vec![1, 2, 3],
1887            leader_epoch: 0,
1888            adding_replicas: vec![4],
1889            removing_replicas: vec![],
1890            directories: vec![],
1891            partition_epoch: 0,
1892        }));
1893        let rows: Vec<_> = img.reassignments_in_flight().collect();
1894        assert!(rows.len() == 1);
1895        assert!(rows[0].adding_replicas == vec![4]);
1896    }
1897
1898    #[test]
1899    fn reassignments_in_flight_returns_partitions_with_removing() {
1900        let mut img = MetadataImage::new(uuid::Uuid::nil());
1901        img.apply(&MetadataRecord::V1Topic(TopicRecord {
1902            name: "foo".into(),
1903            topic_id: uuid::Uuid::nil(),
1904            partitions: 1,
1905            replication_factor: 3,
1906        }));
1907        img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1908            topic: "foo".into(),
1909            partition: 0,
1910            leader: 1,
1911            replicas: vec![1, 2, 3],
1912            isr: vec![1, 2, 3],
1913            leader_epoch: 0,
1914            adding_replicas: vec![],
1915            removing_replicas: vec![3],
1916            directories: vec![],
1917            partition_epoch: 0,
1918        }));
1919        let rows: Vec<_> = img.reassignments_in_flight().collect();
1920        assert!(rows.len() == 1);
1921        assert!(rows[0].removing_replicas == vec![3]);
1922    }
1923
1924    #[test]
1925    fn reassignments_in_flight_covers_multiple_topics() {
1926        let mut img = MetadataImage::new(uuid::Uuid::nil());
1927        for name in ["foo", "bar"] {
1928            img.apply(&MetadataRecord::V1Topic(TopicRecord {
1929                name: name.into(),
1930                topic_id: uuid::Uuid::nil(),
1931                partitions: 1,
1932                replication_factor: 3,
1933            }));
1934            img.apply(&MetadataRecord::V1Partition(PartitionRecord {
1935                topic: name.into(),
1936                partition: 0,
1937                leader: 1,
1938                replicas: vec![1, 2, 3, 4],
1939                isr: vec![1, 2, 3],
1940                leader_epoch: 0,
1941                adding_replicas: vec![4],
1942                removing_replicas: vec![],
1943                directories: vec![],
1944                partition_epoch: 0,
1945            }));
1946        }
1947        assert!(img.reassignments_in_flight().count() == 2);
1948    }
1949
1950    #[test]
1951    fn broker_config_set_inserts_into_image() {
1952        let mut img = MetadataImage::new(uuid::Uuid::nil());
1953        img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1954            node_id: 1,
1955            config_name: "leader.replication.throttled.rate".into(),
1956            config_value: Some("2048".into()),
1957        }));
1958        let bc = img.broker_config(1).expect("broker config");
1959        assert!(bc.get("leader.replication.throttled.rate") == Some(&"2048".to_string()));
1960    }
1961
1962    #[test]
1963    fn broker_config_delete_removes_from_image() {
1964        let mut img = MetadataImage::new(uuid::Uuid::nil());
1965        img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1966            node_id: 1,
1967            config_name: "leader.replication.throttled.rate".into(),
1968            config_value: Some("2048".into()),
1969        }));
1970        img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1971            node_id: 1,
1972            config_name: "leader.replication.throttled.rate".into(),
1973            config_value: None,
1974        }));
1975        let bc = img.broker_config(1).expect("broker_configs entry retained");
1976        assert!(bc.get("leader.replication.throttled.rate").is_none());
1977    }
1978
1979    #[test]
1980    fn broker_throttle_rate_parses_positive_value() {
1981        let mut img = MetadataImage::new(uuid::Uuid::nil());
1982        img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1983            node_id: 1,
1984            config_name: "leader.replication.throttled.rate".into(),
1985            config_value: Some("2048".into()),
1986        }));
1987        assert!(img.broker_throttle_rate(1, ThrottleKind::Leader) == Some(2048));
1988    }
1989
1990    #[test]
1991    fn broker_throttle_rate_returns_none_for_negative_one() {
1992        let mut img = MetadataImage::new(uuid::Uuid::nil());
1993        img.apply(&MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
1994            node_id: 1,
1995            config_name: "leader.replication.throttled.rate".into(),
1996            config_value: Some("-1".into()),
1997        }));
1998        assert!(img.broker_throttle_rate(1, ThrottleKind::Leader).is_none());
1999    }
2000
2001    #[test]
2002    fn client_quota_apply_inserts_canonicalized() {
2003        let mut img = MetadataImage::new(uuid::Uuid::nil());
2004        // Input order: (user, client-id) — should canonicalize to (client-id, user).
2005        img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2006            entity: vec![
2007                QuotaEntity {
2008                    entity_type: "user".into(),
2009                    entity_name: Some("alice".into()),
2010                },
2011                QuotaEntity {
2012                    entity_type: "client-id".into(),
2013                    entity_name: Some("app1".into()),
2014                },
2015            ],
2016            config_key: "producer_byte_rate".into(),
2017            config_value: Some(1024.0),
2018        }));
2019        let key: EntityKey = vec![
2020            ("client-id".into(), Some("app1".into())),
2021            ("user".into(), Some("alice".into())),
2022        ];
2023        let configs = img
2024            .client_quotas()
2025            .get(&key)
2026            .expect("entry under canonical key");
2027        assert!(configs.get("producer_byte_rate") == Some(&1024.0));
2028    }
2029
2030    #[test]
2031    fn client_quota_apply_delete_removes_key() {
2032        let mut img = MetadataImage::new(uuid::Uuid::nil());
2033        img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2034            entity: vec![QuotaEntity {
2035                entity_type: "user".into(),
2036                entity_name: Some("alice".into()),
2037            }],
2038            config_key: "producer_byte_rate".into(),
2039            config_value: Some(1024.0),
2040        }));
2041        img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2042            entity: vec![QuotaEntity {
2043                entity_type: "user".into(),
2044                entity_name: Some("alice".into()),
2045            }],
2046            config_key: "producer_byte_rate".into(),
2047            config_value: None,
2048        }));
2049        let key: EntityKey = vec![("user".into(), Some("alice".into()))];
2050        let configs = img.client_quotas().get(&key).expect("entry retained");
2051        assert!(configs.get("producer_byte_rate").is_none());
2052    }
2053
2054    #[test]
2055    fn client_quota_default_entity_uses_none_name() {
2056        let mut img = MetadataImage::new(uuid::Uuid::nil());
2057        img.apply(&MetadataRecord::V1ClientQuota(ClientQuotaRecord {
2058            entity: vec![QuotaEntity {
2059                entity_type: "user".into(),
2060                entity_name: None,
2061            }],
2062            config_key: "producer_byte_rate".into(),
2063            config_value: Some(512.0),
2064        }));
2065        let key: EntityKey = vec![("user".into(), None)];
2066        assert!(img.client_quotas().contains_key(&key));
2067    }
2068
2069    #[test]
2070    fn canonicalize_sorts_alphabetically_by_entity_type() {
2071        let input = vec![
2072            ("user".to_string(), Some("alice".to_string())),
2073            ("client-id".to_string(), Some("app1".to_string())),
2074        ];
2075        let canon = canonicalize_entity(input);
2076        assert!(canon[0].0 == "client-id");
2077        assert!(canon[1].0 == "user");
2078    }
2079
2080    #[test]
2081    fn scram_credentials_users_returns_distinct_users() {
2082        let mut img = MetadataImage::new(uuid::Uuid::nil());
2083        img.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
2084            user: "alice".into(),
2085            mechanism: SaslMechanism::ScramSha512,
2086            salt: vec![1, 2, 3],
2087            stored_key: vec![4, 5, 6],
2088            server_key: vec![7, 8, 9],
2089            iterations: 4096,
2090        }));
2091        img.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
2092            user: "bob".into(),
2093            mechanism: SaslMechanism::ScramSha512,
2094            salt: vec![1, 2, 3],
2095            stored_key: vec![4, 5, 6],
2096            server_key: vec![7, 8, 9],
2097            iterations: 4096,
2098        }));
2099        let mut users = img.scram_credentials_users();
2100        users.sort();
2101        assert!(users == vec!["alice".to_string(), "bob".to_string()]);
2102    }
2103
2104    fn principal(pt: &str, name: &str) -> KafkaPrincipal {
2105        KafkaPrincipal {
2106            principal_type: pt.into(),
2107            name: name.into(),
2108        }
2109    }
2110
2111    fn dt_record(
2112        token_id: &str,
2113        owner: KafkaPrincipal,
2114        expiry_timestamp_ms: i64,
2115        renewers: Vec<KafkaPrincipal>,
2116    ) -> MetadataRecord {
2117        MetadataRecord::V1DelegationToken(DelegationTokenRecord {
2118            token_id: token_id.into(),
2119            owner,
2120            hmac: vec![0x42; 32],
2121            issue_timestamp_ms: 1_000,
2122            expiry_timestamp_ms,
2123            max_timestamp_ms: 10_000,
2124            renewers,
2125        })
2126    }
2127
2128    #[test]
2129    fn apply_delegation_token_insert_and_replace() {
2130        let mut img = MetadataImage::new(uuid::Uuid::nil());
2131        let alice = principal("User", "alice");
2132
2133        img.apply(&dt_record("tok-1", alice.clone(), 5_000, vec![]));
2134        let got = img.delegation_token_by_id("tok-1").expect("token present");
2135        assert!(got.expiry_timestamp_ms == 5_000);
2136        assert!(got.owner == alice);
2137
2138        // Same token_id, different expiry — replace, not duplicate.
2139        img.apply(&dt_record("tok-1", alice.clone(), 7_500, vec![]));
2140        let got = img.delegation_token_by_id("tok-1").expect("token present");
2141        assert!(got.expiry_timestamp_ms == 7_500);
2142        assert!(img.all_delegation_tokens().count() == 1);
2143    }
2144
2145    #[test]
2146    fn apply_delete_delegation_token_removes_from_image() {
2147        let mut img = MetadataImage::new(uuid::Uuid::nil());
2148        let alice = principal("User", "alice");
2149
2150        img.apply(&dt_record("tok-1", alice, 5_000, vec![]));
2151        assert!(img.delegation_token_by_id("tok-1").is_some());
2152
2153        img.apply(&MetadataRecord::V1DeleteDelegationToken(
2154            DeleteDelegationTokenRecord {
2155                token_id: "tok-1".into(),
2156            },
2157        ));
2158        assert!(img.delegation_token_by_id("tok-1").is_none());
2159        assert!(img.all_delegation_tokens().count() == 0);
2160    }
2161
2162    #[test]
2163    fn delegation_token_by_hmac_finds_token_by_hmac_bytes() {
2164        let mut img = MetadataImage::new(uuid::Uuid::nil());
2165        let alice = principal("User", "alice");
2166        let bob = principal("User", "bob");
2167
2168        let hmac_a = vec![0xAA; 32];
2169        let hmac_b = vec![0xBB; 32];
2170        img.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
2171            token_id: "tok-a".into(),
2172            owner: alice,
2173            hmac: hmac_a.clone(),
2174            issue_timestamp_ms: 1_000,
2175            expiry_timestamp_ms: 5_000,
2176            max_timestamp_ms: 10_000,
2177            renewers: vec![],
2178        }));
2179        img.apply(&MetadataRecord::V1DelegationToken(DelegationTokenRecord {
2180            token_id: "tok-b".into(),
2181            owner: bob,
2182            hmac: hmac_b.clone(),
2183            issue_timestamp_ms: 1_000,
2184            expiry_timestamp_ms: 5_000,
2185            max_timestamp_ms: 10_000,
2186            renewers: vec![],
2187        }));
2188
2189        let found_a = img
2190            .delegation_token_by_hmac(&hmac_a)
2191            .expect("hmac_a present");
2192        assert!(found_a.token_id == "tok-a");
2193        let found_b = img
2194            .delegation_token_by_hmac(&hmac_b)
2195            .expect("hmac_b present");
2196        assert!(found_b.token_id == "tok-b");
2197        assert!(img.delegation_token_by_hmac(&[0xCC; 32]).is_none());
2198    }
2199
2200    #[test]
2201    fn delegation_tokens_by_owner_filters_correctly() {
2202        let mut img = MetadataImage::new(uuid::Uuid::nil());
2203        let alice = principal("User", "alice");
2204        let bob = principal("User", "bob");
2205
2206        img.apply(&dt_record("a-1", alice.clone(), 5_000, vec![]));
2207        img.apply(&dt_record("a-2", alice.clone(), 6_000, vec![bob.clone()]));
2208        img.apply(&dt_record("b-1", bob.clone(), 7_000, vec![]));
2209
2210        let alice_tokens = img.delegation_tokens_by_owner(&alice);
2211        assert!(alice_tokens.len() == 2);
2212        assert!(alice_tokens.iter().all(|t| t.owner == alice));
2213
2214        let bob_tokens = img.delegation_tokens_by_owner(&bob);
2215        assert!(bob_tokens.len() == 1);
2216        assert!(bob_tokens[0].token_id == "b-1");
2217
2218        // visible_to: bob owns b-1 and is renewer on a-2 → 2 tokens.
2219        let bob_visible = img.delegation_tokens_visible_to(&bob);
2220        assert!(bob_visible.len() == 2);
2221        let mut ids: Vec<&str> = bob_visible.iter().map(|t| t.token_id.as_str()).collect();
2222        ids.sort_unstable();
2223        assert!(ids == vec!["a-2", "b-1"]);
2224    }
2225
2226    #[test]
2227    fn applies_voters_and_version() {
2228        let mut image = MetadataImage::default();
2229        image.apply(&MetadataRecord::V1KRaftVersion(
2230            crate::records::KRaftVersionRecord { kraft_version: 1 },
2231        ));
2232        image.apply(&MetadataRecord::V1Voters(crate::records::VotersRecord {
2233            voters: crate::voters::VoterSet::from_voters([crate::voters::Voter {
2234                id: 1,
2235                directory_id: uuid::Uuid::nil(),
2236                endpoints: vec![],
2237                kraft_version: crate::voters::KRaftVersionRange::default(),
2238            }]),
2239        }));
2240        assert!(image.kraft_version() == 1);
2241        assert!(image.voters().contains(1));
2242    }
2243
2244    #[test]
2245    fn scram_credentials_for_user_returns_pairs() {
2246        let mut img = MetadataImage::new(uuid::Uuid::nil());
2247        img.apply(&MetadataRecord::V1ScramCredential(ScramCredentialRecord {
2248            user: "alice".into(),
2249            mechanism: SaslMechanism::ScramSha512,
2250            salt: vec![1, 2, 3],
2251            stored_key: vec![4, 5, 6],
2252            server_key: vec![7, 8, 9],
2253            iterations: 8192,
2254        }));
2255        let pairs = img.scram_credentials_for_user("alice");
2256        assert!(pairs.len() == 1);
2257        assert!(pairs[0].0 == SaslMechanism::ScramSha512);
2258        assert!(pairs[0].1 == 8192);
2259        assert!(img.scram_credentials_for_user("ghost").is_empty());
2260    }
2261
2262    #[test]
2263    fn finalized_metadata_version_reads_feature_map() {
2264        use crate::records::FeatureLevelRecord;
2265        let mut m = img();
2266        assert!(m.finalized_metadata_version() == None);
2267        m.apply(&MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
2268            name: "metadata.version".into(),
2269            level: 19,
2270        }));
2271        assert!(m.finalized_metadata_version() == Some(19));
2272    }
2273
2274    #[test]
2275    fn min_required_metadata_version_baseline_is_min() {
2276        use crate::metadata_version::METADATA_VERSION_MIN;
2277        let m = img();
2278        assert!(m.min_required_metadata_version() == METADATA_VERSION_MIN);
2279    }
2280
2281    #[test]
2282    fn client_metrics_config_apply_and_clear() {
2283        use crate::records::ClientMetricsConfigRecord;
2284        let mut img = MetadataImage::new(uuid::Uuid::nil());
2285        let mut cfgs = std::collections::BTreeMap::new();
2286        cfgs.insert("interval.ms".to_string(), "60000".to_string());
2287        img.apply(&MetadataRecord::V1ClientMetricsConfig(
2288            ClientMetricsConfigRecord {
2289                name: "sub-a".into(),
2290                configs: cfgs,
2291            },
2292        ));
2293        assert_eq!(
2294            img.client_metrics_config("sub-a")
2295                .and_then(|m| m.get("interval.ms"))
2296                .map(String::as_str),
2297            Some("60000")
2298        );
2299        assert_eq!(img.client_metrics_subscriptions().count(), 1);
2300        img.apply(&MetadataRecord::V1ClientMetricsConfig(
2301            ClientMetricsConfigRecord {
2302                name: "sub-a".into(),
2303                configs: std::collections::BTreeMap::new(),
2304            },
2305        ));
2306        assert!(img.client_metrics_config("sub-a").is_none());
2307        assert_eq!(img.client_metrics_subscriptions().count(), 0);
2308    }
2309
2310    #[test]
2311    fn min_required_metadata_version_rises_with_scram_and_tokens() {
2312        use crate::metadata_version::{DELEGATION_TOKEN_MIN_LEVEL, SCRAM_MIN_LEVEL};
2313        use crabka_security::{KafkaPrincipal, SaslMechanism};
2314        let mut m = img();
2315        m.apply(&MetadataRecord::V1ScramCredential(
2316            crate::records::ScramCredentialRecord {
2317                user: "alice".into(),
2318                mechanism: SaslMechanism::ScramSha512,
2319                salt: vec![1; 16],
2320                stored_key: vec![2; 64],
2321                server_key: vec![3; 64],
2322                iterations: 4096,
2323            },
2324        ));
2325        assert!(m.min_required_metadata_version() == SCRAM_MIN_LEVEL);
2326        m.apply(&MetadataRecord::V1DelegationToken(
2327            crate::records::DelegationTokenRecord {
2328                token_id: "t1".into(),
2329                owner: KafkaPrincipal {
2330                    principal_type: "User".into(),
2331                    name: "alice".into(),
2332                },
2333                hmac: vec![0x42; 32],
2334                issue_timestamp_ms: 1,
2335                expiry_timestamp_ms: 5,
2336                max_timestamp_ms: 10,
2337                renewers: vec![],
2338            },
2339        ));
2340        assert!(m.min_required_metadata_version() == DELEGATION_TOKEN_MIN_LEVEL);
2341    }
2342
2343    #[test]
2344    fn topic_by_id_resolves_and_drops_on_delete() {
2345        use crate::records::{MetadataRecord, TopicRecord};
2346        let mut img = MetadataImage::new(Uuid::nil());
2347        let id = Uuid::from_u128(0x1234_5678_9abc_def0_1122_3344_5566_7788);
2348        img.apply(&MetadataRecord::V1Topic(TopicRecord {
2349            name: "orders".into(),
2350            topic_id: id,
2351            partitions: 1,
2352            replication_factor: 1,
2353        }));
2354
2355        // Resolves by id to the same record `topic(name)` returns.
2356        assert!(img.topic_by_id(&id).map(|t| t.name.as_str()) == Some("orders"));
2357        assert!(img.topic_name_by_id(&id) == Some("orders"));
2358
2359        // After delete the id no longer resolves and the name index is gone.
2360        img.apply(&MetadataRecord::V1DeleteTopic(
2361            crate::records::DeleteTopicRecord {
2362                name: "orders".into(),
2363            },
2364        ));
2365        assert!(img.topic_by_id(&id).is_none());
2366        assert!(img.topic_name_by_id(&id).is_none());
2367    }
2368
2369    #[test]
2370    fn broker_epoch_reads_back_registered_epoch() {
2371        let mut image = MetadataImage::new(Uuid::nil());
2372        image.apply(&MetadataRecord::V1BrokerRegistration(
2373            BrokerRegistrationRecord {
2374                node_id: 5,
2375                broker_epoch: 99,
2376                incarnation_id: Uuid::nil(),
2377                host: "h".into(),
2378                port: 9092,
2379                rack: None,
2380                endpoints: vec![],
2381            },
2382        ));
2383        assert!(image.broker_epoch(5) == Some(99));
2384        assert!(image.broker_epoch(404) == None);
2385    }
2386}