Skip to main content

crabka_metadata/
records.rs

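//! Versioned metadata records. Future versions add variants to
//! [`MetadataRecord`].
//!
//! Older readers are intended to skip unknown variants because each variant
//! is encoded length-prefixed inside the serialized payload.
//! NOTE(review): this file's tests encode records via `wincode`
//! (`serde_wincode::SerdeCompat`), not `bincode`, and `MetadataRecord` uses a
//! plain serde derive with no per-variant length prefix visible here — confirm
//! the skip-unknown guarantee holds for the codec actually in use.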
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
/// Numeric identifier of a cluster node, as used for partition leaders,
/// replica / ISR membership and broker registrations in this module.
pub type NodeId = u64;
9
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub struct TopicRecord {
12    pub name: String,
13    pub topic_id: Uuid,
14    pub partitions: i32,
15    pub replication_factor: i16,
16}
17
/// Serde fallback for [`PartitionRecord::partition_epoch`] when the field is
/// absent from the decoded input: the KIP-631 schema default of `-1`.
const fn default_partition_epoch() -> i32 {
    const KIP_631_SCHEMA_DEFAULT: i32 = -1;
    KIP_631_SCHEMA_DEFAULT
}
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
23pub struct PartitionRecord {
24    pub topic: String,
25    pub partition: i32,
26    pub leader: NodeId,
27    pub replicas: Vec<NodeId>,
28    pub isr: Vec<NodeId>,
29    /// Per-partition leader epoch. Bumped on every leader change.
30    /// Older on-disk metadata is not migrated.
31    pub leader_epoch: i32,
32    /// Replicas being added in an in-flight reassignment. Empty when no
33    /// reassignment in flight. KIP-455.
34    pub adding_replicas: Vec<NodeId>,
35    /// Replicas being removed in an in-flight reassignment. Empty when
36    /// no reassignment in flight. KIP-455.
37    pub removing_replicas: Vec<NodeId>,
38    /// KIP-858: the log-directory UUID hosting each replica, parallel to
39    /// [`Self::replicas`] (same index order). `Uuid::nil()` is
40    /// `DirectoryId.UNASSIGNED` — the owning broker has not yet reported
41    /// its `AssignReplicasToDirs` for this replica. The controller maps a
42    /// broker's failed-dir UUID to the partitions it must fail over by
43    /// matching this against the broker's replica slot.
44    pub directories: Vec<Uuid>,
45    /// KIP-631: per-partition state epoch. Increments on every state change
46    /// (leader election, ISR change, reassignment). Set to 0 on creation.
47    /// Default of -1 matches the KIP-631 schema default for compatibility
48    /// with records written before this field was added.
49    #[serde(default = "default_partition_epoch")]
50    pub partition_epoch: i32,
51}
52
53/// KIP-858 directory-assignment delta. A broker reports which log-dir UUID
54/// hosts its replica of `(topic, partition)`. Applied as a DELTA: sets ONLY
55/// the reporting replica's slot in `PartitionRecord.directories`, never
56/// touching leader/isr/replicas/adding/removing — so it cannot clobber a
57/// concurrent reassignment or ISR change. On the `KRaft` log it rides a
58/// Crabka-private carrier (via `to_kraft`) so it decodes back to this same
59/// delta and applies as a one-slot merge — never a full-record replace.
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61pub struct PartitionDirAssignmentRecord {
62    pub topic: String,
63    pub partition: i32,
64    /// The reporting broker (must be a replica of the partition).
65    pub replica: NodeId,
66    /// The log-directory UUID hosting this broker's replica.
67    pub directory: Uuid,
68}
69
70/// A single named listener endpoint advertised by a broker. Stored as a
71/// list on [`BrokerRegistrationRecord::endpoints`] so KRaft-style metadata
72/// can advertise per-listener `host:port`/protocol triples to clients on
73/// `Metadata` v9+. Legacy single-listener brokers leave the list empty
74/// and rely on the top-level `host`+`port` fields.
75#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76pub struct BrokerEndpoint {
77    /// Listener name (e.g. `"PLAINTEXT"`, `"SSL"`, `"SASL_SSL"`).
78    pub name: String,
79    pub host: String,
80    pub port: u16,
81    pub protocol: crabka_security::ListenerProtocol,
82}
83
84#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
85pub struct BrokerRegistrationRecord {
86    pub node_id: NodeId,
87    /// KIP-903 broker epoch: the raft log offset at which this registration
88    /// record committed. The controller leader assigns it at append time
89    /// (`on_submit_change`); a freshly-built literal carries `0` until the
90    /// leader stamps it. Used to fence stale replicas from the ISR on
91    /// `AlterPartition`.
92    pub broker_epoch: i64,
93    /// KIP-631: UUID that identifies this specific process invocation of the
94    /// broker. Generated once at first boot and persisted in
95    /// `{log_dir}/incarnation_id`. A JVM controller uses it to detect
96    /// broker restarts and fence stale replica memberships.
97    #[serde(default)]
98    pub incarnation_id: uuid::Uuid,
99    /// Legacy single-listener host, used as inter-broker default and by
100    /// pre-v9 `Metadata` responses. v9+ projects [`Self::endpoints`].
101    pub host: String,
102    pub port: u16,
103    pub rack: Option<String>,
104    /// Per-listener endpoints. Empty on records written before this
105    /// field was added; populated from
106    /// `BrokerConfig::effective_listeners()` for self-registration.
107    pub endpoints: Vec<BrokerEndpoint>,
108}
109
110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub struct DeleteTopicRecord {
112    pub name: String,
113}
114
115/// KIP-185 / `UnregisterBroker` (`api_key` 64). Marks a broker as
116/// permanently unregistered: the admin operator confirms the broker is
117/// gone for good and asks the cluster to drop its registration entry
118/// from the metadata image. Subsequent `Metadata` responses no longer
119/// advertise the broker's endpoints; clients stop routing to it.
120///
121/// Idempotent — applying twice (or against an unknown `node_id`) is a
122/// no-op.
123#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
124pub struct UnregisterBrokerRecord {
125    pub node_id: NodeId,
126}
127
128/// Mutable topic configuration overrides. Authoritative target state:
129/// each `V1TopicConfig` record fully replaces the previous override map
130/// for `topic`. Empty map = clear all overrides. Merging happens at the
131/// `AlterConfigs` handler before the record is submitted.
132#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
133pub struct TopicConfigRecord {
134    pub topic: String,
135    pub overrides: std::collections::BTreeMap<String, String>,
136}
137
138/// Per-broker configuration key/value pair. `Some(value)` = set; `None` = delete.
139#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140pub struct BrokerConfigRecord {
141    pub node_id: NodeId,
142    pub config_name: String,
143    /// `Some(value)` = set; `None` = delete.
144    pub config_value: Option<String>,
145}
146
147/// KIP-714 client-metrics subscription config. Authoritative target
148/// state: each `V1ClientMetricsConfig` fully replaces the previous
149/// override map for `name` (the subscription name). Empty map = delete
150/// the subscription. Merging happens at the `IncrementalAlterConfigs`
151/// handler before the record is submitted (same pattern as
152/// [`TopicConfigRecord`]).
153#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
154pub struct ClientMetricsConfigRecord {
155    pub name: String,
156    pub configs: std::collections::BTreeMap<String, String>,
157}
158
159#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
160pub struct QuotaEntity {
161    pub entity_type: String,
162    pub entity_name: Option<String>,
163}
164
165#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
166pub struct ClientQuotaRecord {
167    /// Canonicalized entity tuple — sorted by `entity_type` alphabetically.
168    pub entity: Vec<QuotaEntity>,
169    pub config_key: String,
170    pub config_value: Option<f64>,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
174pub struct ScramCredentialRecord {
175    pub user: String,
176    pub mechanism: crabka_security::SaslMechanism,
177    pub salt: Vec<u8>,
178    pub stored_key: Vec<u8>,
179    pub server_key: Vec<u8>,
180    pub iterations: u32,
181}
182
183#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
184pub struct DeleteScramCredentialRecord {
185    pub user: String,
186    pub mechanism: crabka_security::SaslMechanism,
187}
188
189/// A single delegation token's authoritative state (KIP-48).
190/// Replacement semantics — appending a new record with the same
191/// `token_id` overwrites the prior one in the image (used by both
192/// Create and Renew). Removal goes through
193/// [`DeleteDelegationTokenRecord`]. `hmac` is the 32-byte HMAC-SHA-256
194/// over `token_id` keyed by the broker's master secret key; clients
195/// authenticate via SCRAM-SHA-256 using the hex-encoded HMAC as the
196/// password.
197#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
198pub struct DelegationTokenRecord {
199    pub token_id: String,
200    pub owner: crabka_security::KafkaPrincipal,
201    pub hmac: Vec<u8>,
202    pub issue_timestamp_ms: i64,
203    pub expiry_timestamp_ms: i64,
204    /// Issue + max-lifetime; renewals cannot push `expiry_timestamp_ms`
205    /// past this ceiling.
206    pub max_timestamp_ms: i64,
207    pub renewers: Vec<crabka_security::KafkaPrincipal>,
208}
209
210/// Tombstone record removing a delegation token (KIP-48)
211/// from the image. Emitted by `ExpireDelegationToken` handlers and the
212/// background expiry sweep.
213#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
214pub struct DeleteDelegationTokenRecord {
215    pub token_id: String,
216}
217
218/// KIP-853: finalizes the cluster-wide kraft.version feature level.
219#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
220pub struct KRaftVersionRecord {
221    pub kraft_version: u16,
222}
223
224/// KIP-853: full snapshot of the controller voter set.
225#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
226pub struct VotersRecord {
227    pub voters: crate::voters::VoterSet,
228}
229
230/// KIP-584 finalized feature level. `level` is the finalized
231/// `max_version_level` for `name`. `level == 0` is the KIP-584 sentinel
232/// for "delete this finalized feature" — `MetadataImage::apply` removes the
233/// entry rather than storing a zero. Replacement semantics: a later record
234/// with the same `name` overwrites the previous level.
235#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
236pub struct FeatureLevelRecord {
237    pub name: String,
238    pub level: i16,
239}
240
241/// Snapshot-only carrier for the KIP-584 finalized-features epoch.
242///
243/// The epoch is normally apply-derived (one bump per `V1FeatureLevel`
244/// applied, so it tracks the history of `UpdateFeatures` calls, not the live
245/// feature count). That derivation can't survive a snapshot: a snapshot
246/// stores resulting *state*, so it emits at most one `V1FeatureLevel` per
247/// live feature — fewer records than the original apply history. Replaying
248/// those alone would reconstruct a smaller epoch and diverge from a replica
249/// that replayed the full log.
250///
251/// So [`MetadataImage::to_records`](crate::MetadataImage::to_records) emits
252/// this record last, and [`MetadataImage::apply`](crate::MetadataImage::apply)
253/// SETS the epoch from it verbatim (rather than bumping), pinning the
254/// reconstructed epoch to the original. It is produced only by `to_records`
255/// and consumed only on snapshot replay — it is never submitted as a
256/// controller change, so it never appears in the live Raft log.
257#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
258pub struct FeaturesEpochRecord {
259    pub epoch: i64,
260}
261
262#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
263#[non_exhaustive]
264pub enum MetadataRecord {
265    V1Topic(TopicRecord),
266    V1Partition(PartitionRecord),
267    V1BrokerRegistration(BrokerRegistrationRecord),
268    V1DeleteTopic(DeleteTopicRecord),
269    V1TopicConfig(TopicConfigRecord),
270    V1ScramCredential(ScramCredentialRecord),
271    V1DeleteScramCredential(DeleteScramCredentialRecord),
272    V1AccessControlEntry(crate::AclEntry),
273    V1DeleteAccessControlEntry(crate::AclEntryFilter),
274    V1BrokerConfig(BrokerConfigRecord),
275    V1ClientQuota(ClientQuotaRecord),
276    V1DelegationToken(DelegationTokenRecord),
277    V1DeleteDelegationToken(DeleteDelegationTokenRecord),
278    V1UnregisterBroker(UnregisterBrokerRecord),
279    V1KRaftVersion(KRaftVersionRecord),
280    V1Voters(VotersRecord),
281    V1FeatureLevel(FeatureLevelRecord),
282    V1ClientMetricsConfig(ClientMetricsConfigRecord),
283    /// Snapshot-only: pins the finalized-features epoch on reconstruction.
284    /// Never submitted via the controller; see [`FeaturesEpochRecord`].
285    V1FeaturesEpoch(FeaturesEpochRecord),
286    /// KIP-858 directory-assignment delta (see [`PartitionDirAssignmentRecord`]).
287    /// Applied as a merge into one replica's `directories` slot; on the `KRaft`
288    /// log it rides a Crabka-private carrier so it stays a delta end-to-end.
289    V1PartitionDirAssignment(PartitionDirAssignmentRecord),
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use serde_wincode::SerdeCompat;
    use wincode::{Deserialize as _, Serialize as _};

    /// Encodes `r` with the serde-compat `wincode` codec and decodes it back.
    fn round_trip(r: &MetadataRecord) -> MetadataRecord {
        let bytes = <SerdeCompat<MetadataRecord>>::serialize(r).unwrap();
        <SerdeCompat<MetadataRecord>>::deserialize(&bytes).unwrap()
    }

    #[test]
    fn feature_level_round_trip() {
        let r = MetadataRecord::V1FeatureLevel(FeatureLevelRecord {
            name: "metadata.version".into(),
            level: 1,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn features_epoch_round_trip() {
        let r = MetadataRecord::V1FeaturesEpoch(FeaturesEpochRecord { epoch: 7 });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn topic_record_round_trip() {
        let r = MetadataRecord::V1Topic(TopicRecord {
            name: "t".into(),
            topic_id: Uuid::new_v4(),
            partitions: 3,
            replication_factor: 1,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn partition_record_round_trip() {
        let r = MetadataRecord::V1Partition(PartitionRecord {
            topic: "t".into(),
            partition: 0,
            leader: 1,
            replicas: vec![1, 2, 3],
            isr: vec![1, 2],
            leader_epoch: 0,
            adding_replicas: vec![],
            removing_replicas: vec![],
            directories: vec![Uuid::from_u128(1), Uuid::from_u128(2), Uuid::nil()],
            partition_epoch: 0,
        });
        assert!(round_trip(&r) == r);
    }

    /// Non-empty KIP-455 reassignment sets must survive the codec too.
    #[test]
    fn partition_record_in_flight_reassignment_round_trip() {
        let r = MetadataRecord::V1Partition(PartitionRecord {
            topic: "t".into(),
            partition: 5,
            leader: 2,
            replicas: vec![1, 2, 3, 4],
            isr: vec![1, 2, 3],
            leader_epoch: 9,
            adding_replicas: vec![4],
            removing_replicas: vec![1],
            directories: vec![Uuid::nil(); 4],
            partition_epoch: 12,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn partition_dir_assignment_round_trip() {
        let r = MetadataRecord::V1PartitionDirAssignment(PartitionDirAssignmentRecord {
            topic: "t".into(),
            partition: 2,
            replica: 3,
            directory: Uuid::from_u128(0xAB),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn broker_registration_round_trip() {
        let r = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
            node_id: 7,
            broker_epoch: 0,
            incarnation_id: Uuid::from_u128(0xdeadbeef_cafe_babe_0123_456789abcdef),
            host: "192.168.1.10".into(),
            port: 9092,
            rack: Some("us-east-1a".into()),
            endpoints: vec![],
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn broker_registration_with_endpoints_round_trip() {
        let r = MetadataRecord::V1BrokerRegistration(BrokerRegistrationRecord {
            node_id: 1,
            broker_epoch: 0,
            incarnation_id: Uuid::from_u128(0xfeedface_0000_0000_0000_000000000001),
            host: "h".into(),
            port: 9092,
            rack: None,
            endpoints: vec![BrokerEndpoint {
                name: "EXTERNAL".into(),
                host: "ext.example.com".into(),
                port: 9092,
                protocol: crabka_security::ListenerProtocol::SaslSsl,
            }],
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delete_topic_round_trip() {
        let r = MetadataRecord::V1DeleteTopic(DeleteTopicRecord {
            name: "doomed".into(),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn unregister_broker_round_trip() {
        let r = MetadataRecord::V1UnregisterBroker(UnregisterBrokerRecord { node_id: 42 });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn topic_config_record_round_trip() {
        let mut overrides = std::collections::BTreeMap::new();
        overrides.insert("retention.ms".to_string(), "60000".to_string());
        overrides.insert("segment.bytes".to_string(), "1048576".to_string());
        let r = MetadataRecord::V1TopicConfig(TopicConfigRecord {
            topic: "t".into(),
            overrides,
        });
        assert!(round_trip(&r) == r);
    }

    /// An empty override map is the "clear all overrides" encoding.
    #[test]
    fn topic_config_clear_all_round_trip() {
        let r = MetadataRecord::V1TopicConfig(TopicConfigRecord {
            topic: "t".into(),
            overrides: std::collections::BTreeMap::new(),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn scram_credential_round_trip() {
        let r = MetadataRecord::V1ScramCredential(ScramCredentialRecord {
            user: "alice".into(),
            mechanism: crabka_security::SaslMechanism::ScramSha512,
            salt: vec![1u8; 16],
            stored_key: vec![2u8; 64],
            server_key: vec![3u8; 64],
            iterations: 4096,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delete_scram_credential_round_trip() {
        let r = MetadataRecord::V1DeleteScramCredential(DeleteScramCredentialRecord {
            user: "alice".into(),
            mechanism: crabka_security::SaslMechanism::ScramSha512,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn v1_access_control_entry_round_trip() {
        let entry = crate::AclEntry {
            resource_type: crate::ResourceType::Topic,
            resource_name: "foo".into(),
            pattern_type: crate::PatternType::Literal,
            principal: "User:alice".into(),
            host: "*".into(),
            operation: crate::AclOperation::Read,
            permission_type: crate::PermissionType::Allow,
        };
        let r = MetadataRecord::V1AccessControlEntry(entry);
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn v1_delete_access_control_entry_round_trip() {
        let filter = crate::AclEntryFilter {
            resource_type: Some(crate::ResourceType::Group),
            resource_name: Some("cg-foo".into()),
            pattern_type: Some(crate::PatternType::Literal),
            principal: None,
            host: None,
            operation: None,
            permission_type: None,
        };
        let r = MetadataRecord::V1DeleteAccessControlEntry(filter);
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn broker_config_record_round_trip() {
        let r = MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
            node_id: 7,
            config_name: "leader.replication.throttled.rate".into(),
            config_value: Some("2048".into()),
        });
        assert!(round_trip(&r) == r);
    }

    /// `config_value: None` is the delete encoding and must round-trip.
    #[test]
    fn broker_config_delete_round_trip() {
        let r = MetadataRecord::V1BrokerConfig(BrokerConfigRecord {
            node_id: 7,
            config_name: "leader.replication.throttled.rate".into(),
            config_value: None,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn client_quota_record_round_trip() {
        let r = MetadataRecord::V1ClientQuota(ClientQuotaRecord {
            entity: vec![
                QuotaEntity {
                    entity_type: "client-id".into(),
                    entity_name: Some("app1".into()),
                },
                QuotaEntity {
                    entity_type: "user".into(),
                    entity_name: Some("alice".into()),
                },
            ],
            config_key: "producer_byte_rate".into(),
            config_value: Some(1024.0),
        });
        assert!(round_trip(&r) == r);
    }

    /// Unnamed entity plus `None` value must survive the codec.
    #[test]
    fn client_quota_unnamed_entity_without_value_round_trip() {
        let r = MetadataRecord::V1ClientQuota(ClientQuotaRecord {
            entity: vec![QuotaEntity {
                entity_type: "user".into(),
                entity_name: None,
            }],
            config_key: "consumer_byte_rate".into(),
            config_value: None,
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delegation_token_record_round_trip() {
        let r = MetadataRecord::V1DelegationToken(DelegationTokenRecord {
            token_id: "tok-abc".into(),
            owner: crabka_security::KafkaPrincipal {
                principal_type: "User".into(),
                name: "alice".into(),
            },
            hmac: vec![0xAB; 32],
            issue_timestamp_ms: 1_700_000_000_000,
            expiry_timestamp_ms: 1_700_000_600_000,
            max_timestamp_ms: 1_700_604_800_000,
            renewers: vec![crabka_security::KafkaPrincipal {
                principal_type: "User".into(),
                name: "bob".into(),
            }],
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn delete_delegation_token_record_round_trip() {
        let r = MetadataRecord::V1DeleteDelegationToken(DeleteDelegationTokenRecord {
            token_id: "tok-abc".into(),
        });
        assert!(round_trip(&r) == r);
    }

    #[test]
    fn voters_record_round_trips() {
        let rec = MetadataRecord::V1Voters(VotersRecord {
            voters: crate::voters::VoterSet::from_voters([crate::voters::Voter {
                id: 7,
                directory_id: uuid::Uuid::from_u128(7),
                endpoints: vec![crate::voters::VoterEndpoint {
                    name: "CONTROLLER".into(),
                    host: "h".into(),
                    port: 1,
                }],
                kraft_version: crate::voters::KRaftVersionRange::default(),
            }]),
        });
        assert!(round_trip(&rec) == rec);
    }

    #[test]
    fn kraft_version_record_round_trips() {
        let rec = MetadataRecord::V1KRaftVersion(KRaftVersionRecord { kraft_version: 1 });
        assert!(round_trip(&rec) == rec);
    }

    #[test]
    fn client_metrics_config_round_trip() {
        let mut configs = std::collections::BTreeMap::new();
        configs.insert("interval.ms".to_string(), "60000".to_string());
        configs.insert(
            "metrics".to_string(),
            "org.apache.kafka.consumer.".to_string(),
        );
        let r = MetadataRecord::V1ClientMetricsConfig(ClientMetricsConfigRecord {
            name: "sub-a".into(),
            configs,
        });
        // Use the module's `assert2::assert!` like every other test here.
        assert!(round_trip(&r) == r);
    }

    /// An empty config map is the "delete subscription" encoding.
    #[test]
    fn client_metrics_config_delete_round_trip() {
        let r = MetadataRecord::V1ClientMetricsConfig(ClientMetricsConfigRecord {
            name: "sub-a".into(),
            configs: std::collections::BTreeMap::new(),
        });
        assert!(round_trip(&r) == r);
    }
}