// mockforge_kafka/consumer_groups.rs

1use std::collections::HashMap;
2
/// Manages consumer groups for the Kafka broker
#[derive(Debug)]
pub struct ConsumerGroupManager {
    // All known groups, keyed by group id. Groups are created lazily on
    // first `join_group` and never removed by this manager.
    groups: HashMap<String, ConsumerGroup>,
}
8
/// State of a single consumer group: its members, the coordinator that
/// owns it, and the offsets committed per (topic, partition).
#[derive(Debug)]
pub struct ConsumerGroup {
    /// Group identifier (the Kafka `group.id`).
    pub group_id: String,
    /// Currently joined members, keyed by member id.
    pub members: HashMap<String, GroupMember>,
    /// Broker acting as group coordinator (a fixed placeholder in this mock).
    pub coordinator: GroupCoordinator,
    pub offsets: HashMap<(String, i32), i64>, // (topic, partition) -> offset
}
16
/// A single member of a consumer group.
#[derive(Debug)]
pub struct GroupMember {
    /// Identifier assigned to this member within the group.
    pub member_id: String,
    /// Client id reported by the consumer that joined.
    pub client_id: String,
    /// Partition assignments this member currently owns; cleared on rebalance.
    pub assignment: Vec<PartitionAssignment>,
}
23
/// A set of partitions of one topic assigned to a group member.
#[derive(Debug, Clone)]
pub struct PartitionAssignment {
    /// Topic the partitions belong to.
    pub topic: String,
    /// Partition ids of `topic` assigned to the member.
    pub partitions: Vec<i32>,
}
29
/// Identity of the broker coordinating a group.
///
/// In this mock a fixed localhost:9092 coordinator is used (see `join_group`).
#[derive(Debug)]
pub struct GroupCoordinator {
    /// Broker/node id of the coordinator.
    pub coordinator_id: i32,
    /// Coordinator hostname.
    pub host: String,
    // i32 matches the Kafka wire protocol's signed port field.
    pub port: i32,
}
36
37impl ConsumerGroupManager {
38    /// Create a new consumer group manager
39    pub fn new() -> Self {
40        Self {
41            groups: HashMap::new(),
42        }
43    }
44}
45
impl Default for ConsumerGroupManager {
    /// Equivalent to [`ConsumerGroupManager::new`]: an empty manager.
    fn default() -> Self {
        Self::new()
    }
}
51
52impl ConsumerGroupManager {
53    /// Get a reference to all groups (for internal use)
54    pub fn groups(&self) -> &HashMap<String, ConsumerGroup> {
55        &self.groups
56    }
57
58    /// Join a consumer group
59    pub async fn join_group(
60        &mut self,
61        group_id: &str,
62        member_id: &str,
63        client_id: &str,
64    ) -> Result<JoinGroupResponse> {
65        let group = self.groups.entry(group_id.to_string()).or_insert_with(|| ConsumerGroup {
66            group_id: group_id.to_string(),
67            members: HashMap::new(),
68            coordinator: GroupCoordinator {
69                coordinator_id: 1,
70                host: "localhost".to_string(),
71                port: 9092,
72            },
73            offsets: HashMap::new(),
74        });
75
76        group.members.insert(
77            member_id.to_string(),
78            GroupMember {
79                member_id: member_id.to_string(),
80                client_id: client_id.to_string(),
81                assignment: vec![],
82            },
83        );
84
85        Ok(JoinGroupResponse {
86            generation_id: 1,
87            protocol_name: "consumer".to_string(),
88            leader: member_id.to_string(),
89            member_id: member_id.to_string(),
90            members: group.members.keys().cloned().collect(),
91        })
92    }
93
94    /// Sync group assignment
95    pub async fn sync_group(
96        &mut self,
97        group_id: &str,
98        assignments: Vec<PartitionAssignment>,
99        topics: &std::collections::HashMap<String, crate::topics::Topic>,
100    ) -> Result<()> {
101        if let Some(group) = self.groups.get_mut(group_id) {
102            // If assignments are provided, use them (leader assignment)
103            if !assignments.is_empty() {
104                // Distribute assignments to members
105                for assignment in assignments {
106                    // For simplicity, assign to all members (in real Kafka, leader assigns specific partitions to specific members)
107                    for member in group.members.values_mut() {
108                        member.assignment.push(assignment.clone());
109                    }
110                }
111            } else {
112                // Simple round-robin assignment if no assignments provided
113                Self::assign_partitions_round_robin(group, topics);
114            }
115            Ok(())
116        } else {
117            Err(anyhow::anyhow!("Group {} does not exist", group_id))
118        }
119    }
120
121    /// Assign partitions to group members using round-robin strategy
122    fn assign_partitions_round_robin(
123        group: &mut ConsumerGroup,
124        topics: &std::collections::HashMap<String, crate::topics::Topic>,
125    ) {
126        // Clear existing assignments for rebalance
127        for member in group.members.values_mut() {
128            member.assignment.clear();
129        }
130
131        let mut member_ids: Vec<String> = group.members.keys().cloned().collect();
132        member_ids.sort(); // Sort for deterministic assignment
133
134        let mut member_idx = 0;
135        for (topic_name, topic) in topics {
136            let num_partitions = topic.config.num_partitions as usize;
137            for partition_id in 0..num_partitions {
138                let member_id = &member_ids[member_idx % member_ids.len()];
139                if let Some(member) = group.members.get_mut(member_id.as_str()) {
140                    // Find or create assignment for this topic
141                    let assignment = member.assignment.iter_mut().find(|a| a.topic == *topic_name);
142                    if let Some(assignment) = assignment {
143                        assignment.partitions.push(partition_id as i32);
144                    } else {
145                        member.assignment.push(PartitionAssignment {
146                            topic: topic_name.clone(),
147                            partitions: vec![partition_id as i32],
148                        });
149                    }
150                }
151                member_idx += 1;
152            }
153        }
154    }
155
156    /// Commit consumer offsets
157    pub async fn commit_offsets(
158        &mut self,
159        group_id: &str,
160        offsets: HashMap<(String, i32), i64>,
161    ) -> Result<()> {
162        if let Some(group) = self.groups.get_mut(group_id) {
163            group.offsets.extend(offsets);
164            Ok(())
165        } else {
166            Err(anyhow::anyhow!("Group {} does not exist", group_id))
167        }
168    }
169
170    /// Get committed offsets for a group
171    pub fn get_committed_offsets(&self, group_id: &str) -> HashMap<(String, i32), i64> {
172        self.groups.get(group_id).map(|g| g.offsets.clone()).unwrap_or_default()
173    }
174
175    /// Simulate consumer lag
176    pub async fn simulate_lag(
177        &mut self,
178        group_id: &str,
179        topic: &str,
180        lag: i64,
181        topics: &std::collections::HashMap<String, crate::topics::Topic>,
182    ) {
183        if let Some(group) = self.groups.get_mut(group_id) {
184            // Get actual partition count from topics
185            let num_partitions =
186                topics.get(topic).map(|t| t.config.num_partitions).unwrap_or(1) as usize;
187            // Simulate lag by setting committed offsets behind
188            for partition in 0..num_partitions {
189                let key = (topic.to_string(), partition as i32);
190                let current_offset = group.offsets.get(&key).copied().unwrap_or(0);
191                group.offsets.insert(key, current_offset.saturating_sub(lag));
192            }
193            tracing::info!(
194                "Simulated lag of {} messages for group {} on topic {}",
195                lag,
196                group_id,
197                topic
198            );
199        }
200    }
201
202    /// Trigger rebalance for a group
203    pub async fn trigger_rebalance(&mut self, group_id: &str) {
204        if let Some(group) = self.groups.get_mut(group_id) {
205            // Simulate rebalance by clearing assignments and forcing rejoin
206            for member in group.members.values_mut() {
207                member.assignment.clear();
208            }
209            tracing::info!("Triggered rebalance for group {}", group_id);
210        }
211    }
212
213    /// Reset consumer offsets
214    pub async fn reset_offsets(
215        &mut self,
216        group_id: &str,
217        topic: &str,
218        to: &str,
219        topics: &std::collections::HashMap<String, crate::topics::Topic>,
220    ) {
221        if let Some(group) = self.groups.get_mut(group_id) {
222            if let Some(topic_data) = topics.get(topic) {
223                let num_partitions = topic_data.config.num_partitions as usize;
224                for partition in 0..num_partitions {
225                    let key = (topic.to_string(), partition as i32);
226                    let target_offset = match to {
227                        "earliest" => 0,
228                        "latest" => topic_data.partitions[partition].high_watermark,
229                        _ => return, // Invalid reset target
230                    };
231                    group.offsets.insert(key, target_offset);
232                }
233                tracing::info!("Reset offsets for group {} on topic {} to {}", group_id, topic, to);
234            }
235        }
236    }
237}
238
/// Response for join group request
#[derive(Debug)]
pub struct JoinGroupResponse {
    /// Group generation; this mock always reports 1.
    pub generation_id: i32,
    /// Selected partition-assignment protocol name.
    pub protocol_name: String,
    /// Member id of the elected group leader.
    pub leader: String,
    /// Member id assigned to the joining consumer.
    pub member_id: String,
    /// Ids of all members currently in the group.
    pub members: Vec<String>,
}
248
// Module-local alias: fallible operations here report `anyhow::Error`.
type Result<T> = std::result::Result<T, anyhow::Error>;