mockforge_kafka/
consumer_groups.rs

1use std::collections::HashMap;
2
3/// Manages consumer groups for the Kafka broker
4#[derive(Debug)]
5pub struct ConsumerGroupManager {
6    groups: HashMap<String, ConsumerGroup>,
7}
8
9#[derive(Debug)]
10pub struct ConsumerGroup {
11    pub group_id: String,
12    pub members: HashMap<String, GroupMember>,
13    pub coordinator: GroupCoordinator,
14    pub offsets: HashMap<(String, i32), i64>, // (topic, partition) -> offset
15}
16
17#[derive(Debug)]
18pub struct GroupMember {
19    pub member_id: String,
20    pub client_id: String,
21    pub assignment: Vec<PartitionAssignment>,
22}
23
24#[derive(Debug, Clone)]
25pub struct PartitionAssignment {
26    pub topic: String,
27    pub partitions: Vec<i32>,
28}
29
30#[derive(Debug)]
31pub struct GroupCoordinator {
32    pub coordinator_id: i32,
33    pub host: String,
34    pub port: i32,
35}
36
37impl ConsumerGroupManager {
38    /// Create a new consumer group manager
39    pub fn new() -> Self {
40        Self {
41            groups: HashMap::new(),
42        }
43    }
44}
45
46impl Default for ConsumerGroupManager {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl ConsumerGroupManager {
53    /// Get a reference to all groups (for internal use)
54    pub fn groups(&self) -> &HashMap<String, ConsumerGroup> {
55        &self.groups
56    }
57
58    /// Join a consumer group
59    pub async fn join_group(
60        &mut self,
61        group_id: &str,
62        member_id: &str,
63        client_id: &str,
64    ) -> Result<JoinGroupResponse> {
65        let group = self.groups.entry(group_id.to_string()).or_insert_with(|| ConsumerGroup {
66            group_id: group_id.to_string(),
67            members: HashMap::new(),
68            coordinator: GroupCoordinator {
69                coordinator_id: 1,
70                host: "localhost".to_string(),
71                port: 9092,
72            },
73            offsets: HashMap::new(),
74        });
75
76        group.members.insert(
77            member_id.to_string(),
78            GroupMember {
79                member_id: member_id.to_string(),
80                client_id: client_id.to_string(),
81                assignment: vec![],
82            },
83        );
84
85        Ok(JoinGroupResponse {
86            generation_id: 1,
87            protocol_name: "consumer".to_string(),
88            leader: member_id.to_string(),
89            member_id: member_id.to_string(),
90            members: group.members.keys().cloned().collect(),
91        })
92    }
93
94    /// Sync group assignment
95    pub async fn sync_group(
96        &mut self,
97        group_id: &str,
98        assignments: Vec<PartitionAssignment>,
99        topics: &std::collections::HashMap<String, crate::topics::Topic>,
100    ) -> Result<()> {
101        if let Some(group) = self.groups.get_mut(group_id) {
102            // If assignments are provided, use them (leader assignment)
103            if !assignments.is_empty() {
104                // Distribute assignments to members
105                for assignment in assignments {
106                    // For simplicity, assign to all members (in real Kafka, leader assigns specific partitions to specific members)
107                    for member in group.members.values_mut() {
108                        member.assignment.push(assignment.clone());
109                    }
110                }
111            } else {
112                // Simple round-robin assignment if no assignments provided
113                Self::assign_partitions_round_robin(group, topics);
114            }
115            Ok(())
116        } else {
117            Err(anyhow::anyhow!("Group {} does not exist", group_id))
118        }
119    }
120
121    /// Assign partitions to group members using round-robin strategy
122    fn assign_partitions_round_robin(
123        group: &mut ConsumerGroup,
124        topics: &std::collections::HashMap<String, crate::topics::Topic>,
125    ) {
126        // Clear existing assignments for rebalance
127        for member in group.members.values_mut() {
128            member.assignment.clear();
129        }
130
131        let mut member_ids: Vec<String> = group.members.keys().cloned().collect();
132        member_ids.sort(); // Sort for deterministic assignment
133
134        let mut member_idx = 0;
135        for (topic_name, topic) in topics {
136            let num_partitions = topic.config.num_partitions as usize;
137            for partition_id in 0..num_partitions {
138                let member_id = &member_ids[member_idx % member_ids.len()];
139                if let Some(member) = group.members.get_mut(member_id.as_str()) {
140                    // Find or create assignment for this topic
141                    let assignment = member.assignment.iter_mut().find(|a| a.topic == *topic_name);
142                    if let Some(assignment) = assignment {
143                        assignment.partitions.push(partition_id as i32);
144                    } else {
145                        member.assignment.push(PartitionAssignment {
146                            topic: topic_name.clone(),
147                            partitions: vec![partition_id as i32],
148                        });
149                    }
150                }
151                member_idx += 1;
152            }
153        }
154    }
155
156    /// Commit consumer offsets
157    pub async fn commit_offsets(
158        &mut self,
159        group_id: &str,
160        offsets: HashMap<(String, i32), i64>,
161    ) -> Result<()> {
162        if let Some(group) = self.groups.get_mut(group_id) {
163            group.offsets.extend(offsets);
164            Ok(())
165        } else {
166            Err(anyhow::anyhow!("Group {} does not exist", group_id))
167        }
168    }
169
170    /// Get committed offsets for a group
171    pub fn get_committed_offsets(&self, group_id: &str) -> HashMap<(String, i32), i64> {
172        self.groups.get(group_id).map(|g| g.offsets.clone()).unwrap_or_default()
173    }
174
175    /// Simulate consumer lag
176    pub async fn simulate_lag(
177        &mut self,
178        group_id: &str,
179        topic: &str,
180        lag: i64,
181        topics: &std::collections::HashMap<String, crate::topics::Topic>,
182    ) {
183        if let Some(group) = self.groups.get_mut(group_id) {
184            // Get actual partition count from topics
185            let num_partitions =
186                topics.get(topic).map(|t| t.config.num_partitions).unwrap_or(1) as usize;
187            // Simulate lag by setting committed offsets behind
188            for partition in 0..num_partitions {
189                let key = (topic.to_string(), partition as i32);
190                let current_offset = group.offsets.get(&key).copied().unwrap_or(0);
191                group.offsets.insert(key, current_offset.saturating_sub(lag));
192            }
193            tracing::info!(
194                "Simulated lag of {} messages for group {} on topic {}",
195                lag,
196                group_id,
197                topic
198            );
199        }
200    }
201
202    /// Trigger rebalance for a group
203    pub async fn trigger_rebalance(&mut self, group_id: &str) {
204        if let Some(group) = self.groups.get_mut(group_id) {
205            // Simulate rebalance by clearing assignments and forcing rejoin
206            for member in group.members.values_mut() {
207                member.assignment.clear();
208            }
209            tracing::info!("Triggered rebalance for group {}", group_id);
210        }
211    }
212
213    /// Reset consumer offsets
214    pub async fn reset_offsets(
215        &mut self,
216        group_id: &str,
217        topic: &str,
218        to: &str,
219        topics: &std::collections::HashMap<String, crate::topics::Topic>,
220    ) {
221        if let Some(group) = self.groups.get_mut(group_id) {
222            if let Some(topic_data) = topics.get(topic) {
223                let num_partitions = topic_data.config.num_partitions as usize;
224                for partition in 0..num_partitions {
225                    let key = (topic.to_string(), partition as i32);
226                    let target_offset = match to {
227                        "earliest" => 0,
228                        "latest" => topic_data.partitions[partition].high_watermark,
229                        _ => return, // Invalid reset target
230                    };
231                    group.offsets.insert(key, target_offset);
232                }
233                tracing::info!("Reset offsets for group {} on topic {} to {}", group_id, topic, to);
234            }
235        }
236    }
237}
238
239/// Response for join group request
240#[derive(Debug)]
241pub struct JoinGroupResponse {
242    pub generation_id: i32,
243    pub protocol_name: String,
244    pub leader: String,
245    pub member_id: String,
246    pub members: Vec<String>,
247}
248
249type Result<T> = std::result::Result<T, anyhow::Error>;
250
251#[cfg(test)]
252mod tests {
253    use super::*;
254
255    #[test]
256    fn test_consumer_group_manager_new() {
257        let manager = ConsumerGroupManager::new();
258        assert!(manager.groups().is_empty());
259    }
260
261    #[test]
262    fn test_consumer_group_manager_default() {
263        let manager = ConsumerGroupManager::default();
264        assert!(manager.groups().is_empty());
265    }
266
267    #[test]
268    fn test_partition_assignment_clone() {
269        let assignment = PartitionAssignment {
270            topic: "test-topic".to_string(),
271            partitions: vec![0, 1, 2],
272        };
273
274        let cloned = assignment.clone();
275        assert_eq!(assignment.topic, cloned.topic);
276        assert_eq!(assignment.partitions, cloned.partitions);
277    }
278
279    #[test]
280    fn test_partition_assignment_debug() {
281        let assignment = PartitionAssignment {
282            topic: "test".to_string(),
283            partitions: vec![0],
284        };
285        let debug = format!("{:?}", assignment);
286        assert!(debug.contains("PartitionAssignment"));
287        assert!(debug.contains("test"));
288    }
289
290    #[test]
291    fn test_group_member_debug() {
292        let member = GroupMember {
293            member_id: "member-1".to_string(),
294            client_id: "client-1".to_string(),
295            assignment: vec![],
296        };
297        let debug = format!("{:?}", member);
298        assert!(debug.contains("GroupMember"));
299        assert!(debug.contains("member-1"));
300    }
301
302    #[test]
303    fn test_group_coordinator_debug() {
304        let coordinator = GroupCoordinator {
305            coordinator_id: 1,
306            host: "localhost".to_string(),
307            port: 9092,
308        };
309        let debug = format!("{:?}", coordinator);
310        assert!(debug.contains("GroupCoordinator"));
311        assert!(debug.contains("localhost"));
312    }
313
314    #[test]
315    fn test_consumer_group_debug() {
316        let group = ConsumerGroup {
317            group_id: "test-group".to_string(),
318            members: HashMap::new(),
319            coordinator: GroupCoordinator {
320                coordinator_id: 1,
321                host: "localhost".to_string(),
322                port: 9092,
323            },
324            offsets: HashMap::new(),
325        };
326        let debug = format!("{:?}", group);
327        assert!(debug.contains("ConsumerGroup"));
328        assert!(debug.contains("test-group"));
329    }
330
331    #[test]
332    fn test_join_group_response_debug() {
333        let response = JoinGroupResponse {
334            generation_id: 1,
335            protocol_name: "consumer".to_string(),
336            leader: "member-1".to_string(),
337            member_id: "member-1".to_string(),
338            members: vec!["member-1".to_string()],
339        };
340        let debug = format!("{:?}", response);
341        assert!(debug.contains("JoinGroupResponse"));
342    }
343
344    #[test]
345    fn test_consumer_group_manager_debug() {
346        let manager = ConsumerGroupManager::new();
347        let debug = format!("{:?}", manager);
348        assert!(debug.contains("ConsumerGroupManager"));
349    }
350
351    #[tokio::test]
352    async fn test_join_group() {
353        let mut manager = ConsumerGroupManager::new();
354        let response = manager.join_group("group-1", "member-1", "client-1").await.unwrap();
355
356        assert_eq!(response.generation_id, 1);
357        assert_eq!(response.protocol_name, "consumer");
358        assert_eq!(response.member_id, "member-1");
359        assert!(response.members.contains(&"member-1".to_string()));
360    }
361
362    #[tokio::test]
363    async fn test_join_group_multiple_members() {
364        let mut manager = ConsumerGroupManager::new();
365
366        manager.join_group("group-1", "member-1", "client-1").await.unwrap();
367        let response2 = manager.join_group("group-1", "member-2", "client-2").await.unwrap();
368
369        assert_eq!(response2.members.len(), 2);
370    }
371
372    #[tokio::test]
373    async fn test_commit_offsets() {
374        let mut manager = ConsumerGroupManager::new();
375        manager.join_group("group-1", "member-1", "client-1").await.unwrap();
376
377        let mut offsets = HashMap::new();
378        offsets.insert(("topic-1".to_string(), 0), 100);
379        offsets.insert(("topic-1".to_string(), 1), 200);
380
381        manager.commit_offsets("group-1", offsets).await.unwrap();
382
383        let committed = manager.get_committed_offsets("group-1");
384        assert_eq!(committed.get(&("topic-1".to_string(), 0)), Some(&100));
385        assert_eq!(committed.get(&("topic-1".to_string(), 1)), Some(&200));
386    }
387
388    #[tokio::test]
389    async fn test_commit_offsets_nonexistent_group() {
390        let mut manager = ConsumerGroupManager::new();
391
392        let mut offsets = HashMap::new();
393        offsets.insert(("topic-1".to_string(), 0), 100);
394
395        let result = manager.commit_offsets("nonexistent", offsets).await;
396        assert!(result.is_err());
397    }
398
399    #[test]
400    fn test_get_committed_offsets_nonexistent_group() {
401        let manager = ConsumerGroupManager::new();
402        let offsets = manager.get_committed_offsets("nonexistent");
403        assert!(offsets.is_empty());
404    }
405
406    #[tokio::test]
407    async fn test_trigger_rebalance() {
408        let mut manager = ConsumerGroupManager::new();
409        manager.join_group("group-1", "member-1", "client-1").await.unwrap();
410
411        // Add assignment
412        if let Some(group) = manager.groups.get_mut("group-1") {
413            if let Some(member) = group.members.get_mut("member-1") {
414                member.assignment.push(PartitionAssignment {
415                    topic: "test".to_string(),
416                    partitions: vec![0, 1],
417                });
418            }
419        }
420
421        // Trigger rebalance
422        manager.trigger_rebalance("group-1").await;
423
424        // Assignments should be cleared
425        if let Some(group) = manager.groups.get("group-1") {
426            if let Some(member) = group.members.get("member-1") {
427                assert!(member.assignment.is_empty());
428            }
429        }
430    }
431}