// crabka_client_consumer/group_metadata.rs

//! KIP-447 consumer group metadata, handed to a transactional producer's
//! `send_offsets_to_transaction` so the group coordinator can fence zombie
//! producers via the consumer group's generation (classic) or member epoch
//! (KIP-848 next-gen), instead of requiring one producer per input partition.
5
/// The identity a consumer presents to a transactional producer for KIP-447
/// offset-commit fencing. Mirrors the JVM's
/// `org.apache.kafka.clients.consumer.ConsumerGroupMetadata`.
///
/// `Hash` is derived alongside `Eq` so a value can serve as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConsumerGroupMetadata {
    /// The consumer group id.
    pub group_id: String,
    /// Classic-group generation id, or — for a KIP-848 next-gen group — the
    /// member epoch. Sent verbatim in `TxnOffsetCommitRequest.generation_id`
    /// (matches the JVM wire convention); the coordinator interprets it per
    /// group kind.
    pub generation_id: i32,
    /// The member id assigned by the coordinator at join time. Empty for a
    /// simple consumer (manual assignment, no group membership).
    pub member_id: String,
    /// `group.instance.id` for static members; `None` for dynamic members.
    pub group_instance_id: Option<String>,
}
24
25impl ConsumerGroupMetadata {
26    /// Metadata for a producer committing offsets to a group it is not a
27    /// member of (manual partition assignment / simple consumer). The group
28    /// coordinator applies no generation/member fencing to this shape.
29    #[must_use]
30    pub fn for_group(group_id: impl Into<String>) -> Self {
31        Self {
32            group_id: group_id.into(),
33            generation_id: -1,
34            member_id: String::new(),
35            group_instance_id: None,
36        }
37    }
38}
39
#[cfg(test)]
mod tests {
    use super::*;

    /// `for_group` must yield the simple-consumer shape: only the group id is
    /// populated; the generation is `-1` and the member / instance ids are
    /// absent.
    #[test]
    fn for_group_is_simple_consumer_shape() {
        let m = ConsumerGroupMetadata::for_group("g");
        // `assert_eq!` reports both operands on failure, unlike `assert!(a == b)`.
        assert_eq!(m.group_id, "g");
        assert_eq!(m.generation_id, -1);
        assert!(m.member_id.is_empty());
        assert_eq!(m.group_instance_id, None);
    }
}