crabka_client_consumer/group_metadata.rs
1//! KIP-447 consumer group metadata, handed to a transactional producer's
2//! `send_offsets_to_transaction` so the group coordinator can fence zombie
3//! producers via the consumer group's generation (classic) or member epoch
4//! (KIP-848 next-gen), instead of requiring one producer per input partition.
5
/// Consumer-side identity given to a transactional producer so that its
/// KIP-447 offset commits can be fenced by the group coordinator.
///
/// Rust counterpart of the JVM's
/// `org.apache.kafka.clients.consumer.ConsumerGroupMetadata`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConsumerGroupMetadata {
    /// Id of the consumer group.
    pub group_id: String,
    /// Generation id of a classic group; for a KIP-848 next-gen group this
    /// slot carries the member epoch instead. Either way the value is placed
    /// unchanged into `TxnOffsetCommitRequest.generation_id` (the JVM wire
    /// convention) and the coordinator interprets it per group kind.
    pub generation_id: i32,
    /// Member id handed out by the coordinator at join time. Left empty by a
    /// simple consumer (manual assignment, no group membership).
    pub member_id: String,
    /// The `group.instance.id` of a static member; `None` for a dynamic one.
    pub group_instance_id: Option<String>,
}
24
25impl ConsumerGroupMetadata {
26 /// Metadata for a producer committing offsets to a group it is not a
27 /// member of (manual partition assignment / simple consumer). The group
28 /// coordinator applies no generation/member fencing to this shape.
29 #[must_use]
30 pub fn for_group(group_id: impl Into<String>) -> Self {
31 Self {
32 group_id: group_id.into(),
33 generation_id: -1,
34 member_id: String::new(),
35 group_instance_id: None,
36 }
37 }
38}
39
#[cfg(test)]
mod tests {
    use super::*;

    /// `for_group` must yield the non-member shape: the given group id, a
    /// `-1` generation, an empty member id and no static instance id.
    #[test]
    fn for_group_is_simple_consumer_shape() {
        let expected = ConsumerGroupMetadata {
            group_id: "g".to_owned(),
            generation_id: -1,
            member_id: String::new(),
            group_instance_id: None,
        };

        // Whole-value `assert_eq!`: a mismatch prints both structs, and a
        // newly added field fails to compile here until the test is updated.
        assert_eq!(ConsumerGroupMetadata::for_group("g"), expected);
    }

    /// Owned and borrowed group ids pass through `impl Into<String>` alike.
    #[test]
    fn for_group_accepts_owned_and_borrowed_ids() {
        assert_eq!(
            ConsumerGroupMetadata::for_group(String::from("g")),
            ConsumerGroupMetadata::for_group("g"),
        );
    }
}