1use std::collections::HashMap;
2
/// Registry of consumer groups, keyed by group id.
#[derive(Debug)]
pub struct ConsumerGroupManager {
    /// Every known group; the key is the group's id.
    groups: HashMap<String, ConsumerGroup>,
}
8
/// State tracked for a single consumer group.
#[derive(Debug)]
pub struct ConsumerGroup {
    /// Identifier of this group.
    pub group_id: String,
    /// Current members, keyed by member id.
    pub members: HashMap<String, GroupMember>,
    /// Coordinator recorded for this group.
    pub coordinator: GroupCoordinator,
    /// Committed offsets, keyed by `(topic, partition)`.
    pub offsets: HashMap<(String, i32), i64>,
}
16
/// A single member of a consumer group.
#[derive(Debug)]
pub struct GroupMember {
    /// Identifier of the member within its group.
    pub member_id: String,
    /// Identifier of the client that joined as this member.
    pub client_id: String,
    /// Topic partitions currently assigned to this member.
    pub assignment: Vec<PartitionAssignment>,
}
23
/// The partitions of one topic that are assigned to a group member.
///
/// Implements `PartialEq`/`Eq` so assignments can be compared directly
/// (e.g. with `assert_eq!`) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartitionAssignment {
    /// Name of the topic the partitions belong to.
    pub topic: String,
    /// Partition ids of `topic` covered by this assignment.
    pub partitions: Vec<i32>,
}
29
/// Address and id of the coordinator recorded for a consumer group.
///
/// Plain data, so it is cheaply cloneable and comparable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupCoordinator {
    /// Numeric id of the coordinator.
    pub coordinator_id: i32,
    /// Host name of the coordinator.
    pub host: String,
    /// Port of the coordinator.
    pub port: i32,
}
36
37impl ConsumerGroupManager {
38 pub fn new() -> Self {
40 Self {
41 groups: HashMap::new(),
42 }
43 }
44}
45
46impl Default for ConsumerGroupManager {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl ConsumerGroupManager {
    /// Returns a read-only view of every tracked group, keyed by group id.
    pub fn groups(&self) -> &HashMap<String, ConsumerGroup> {
        &self.groups
    }
57
58 pub async fn join_group(
60 &mut self,
61 group_id: &str,
62 member_id: &str,
63 client_id: &str,
64 ) -> Result<JoinGroupResponse> {
65 let group = self.groups.entry(group_id.to_string()).or_insert_with(|| ConsumerGroup {
66 group_id: group_id.to_string(),
67 members: HashMap::new(),
68 coordinator: GroupCoordinator {
69 coordinator_id: 1,
70 host: "localhost".to_string(),
71 port: 9092,
72 },
73 offsets: HashMap::new(),
74 });
75
76 group.members.insert(
77 member_id.to_string(),
78 GroupMember {
79 member_id: member_id.to_string(),
80 client_id: client_id.to_string(),
81 assignment: vec![],
82 },
83 );
84
85 Ok(JoinGroupResponse {
86 generation_id: 1,
87 protocol_name: "consumer".to_string(),
88 leader: member_id.to_string(),
89 member_id: member_id.to_string(),
90 members: group.members.keys().cloned().collect(),
91 })
92 }
93
94 pub async fn sync_group(
96 &mut self,
97 group_id: &str,
98 assignments: Vec<PartitionAssignment>,
99 topics: &std::collections::HashMap<String, crate::topics::Topic>,
100 ) -> Result<()> {
101 if let Some(group) = self.groups.get_mut(group_id) {
102 if !assignments.is_empty() {
104 for assignment in assignments {
106 for member in group.members.values_mut() {
108 member.assignment.push(assignment.clone());
109 }
110 }
111 } else {
112 Self::assign_partitions_round_robin(group, topics);
114 }
115 Ok(())
116 } else {
117 Err(anyhow::anyhow!("Group {} does not exist", group_id))
118 }
119 }
120
121 fn assign_partitions_round_robin(
123 group: &mut ConsumerGroup,
124 topics: &std::collections::HashMap<String, crate::topics::Topic>,
125 ) {
126 for member in group.members.values_mut() {
128 member.assignment.clear();
129 }
130
131 let mut member_ids: Vec<String> = group.members.keys().cloned().collect();
132 member_ids.sort(); let mut member_idx = 0;
135 for (topic_name, topic) in topics {
136 let num_partitions = topic.config.num_partitions as usize;
137 for partition_id in 0..num_partitions {
138 let member_id = &member_ids[member_idx % member_ids.len()];
139 if let Some(member) = group.members.get_mut(member_id.as_str()) {
140 let assignment = member.assignment.iter_mut().find(|a| a.topic == *topic_name);
142 if let Some(assignment) = assignment {
143 assignment.partitions.push(partition_id as i32);
144 } else {
145 member.assignment.push(PartitionAssignment {
146 topic: topic_name.clone(),
147 partitions: vec![partition_id as i32],
148 });
149 }
150 }
151 member_idx += 1;
152 }
153 }
154 }
155
156 pub async fn commit_offsets(
158 &mut self,
159 group_id: &str,
160 offsets: HashMap<(String, i32), i64>,
161 ) -> Result<()> {
162 if let Some(group) = self.groups.get_mut(group_id) {
163 group.offsets.extend(offsets);
164 Ok(())
165 } else {
166 Err(anyhow::anyhow!("Group {} does not exist", group_id))
167 }
168 }
169
170 pub fn get_committed_offsets(&self, group_id: &str) -> HashMap<(String, i32), i64> {
172 self.groups.get(group_id).map(|g| g.offsets.clone()).unwrap_or_default()
173 }
174
175 pub async fn simulate_lag(
177 &mut self,
178 group_id: &str,
179 topic: &str,
180 lag: i64,
181 topics: &std::collections::HashMap<String, crate::topics::Topic>,
182 ) {
183 if let Some(group) = self.groups.get_mut(group_id) {
184 let num_partitions =
186 topics.get(topic).map(|t| t.config.num_partitions).unwrap_or(1) as usize;
187 for partition in 0..num_partitions {
189 let key = (topic.to_string(), partition as i32);
190 let current_offset = group.offsets.get(&key).copied().unwrap_or(0);
191 group.offsets.insert(key, current_offset.saturating_sub(lag));
192 }
193 tracing::info!(
194 "Simulated lag of {} messages for group {} on topic {}",
195 lag,
196 group_id,
197 topic
198 );
199 }
200 }
201
202 pub async fn trigger_rebalance(&mut self, group_id: &str) {
204 if let Some(group) = self.groups.get_mut(group_id) {
205 for member in group.members.values_mut() {
207 member.assignment.clear();
208 }
209 tracing::info!("Triggered rebalance for group {}", group_id);
210 }
211 }
212
213 pub async fn reset_offsets(
215 &mut self,
216 group_id: &str,
217 topic: &str,
218 to: &str,
219 topics: &std::collections::HashMap<String, crate::topics::Topic>,
220 ) {
221 if let Some(group) = self.groups.get_mut(group_id) {
222 if let Some(topic_data) = topics.get(topic) {
223 let num_partitions = topic_data.config.num_partitions as usize;
224 for partition in 0..num_partitions {
225 let key = (topic.to_string(), partition as i32);
226 let target_offset = match to {
227 "earliest" => 0,
228 "latest" => topic_data.partitions[partition].high_watermark,
229 _ => return, };
231 group.offsets.insert(key, target_offset);
232 }
233 tracing::info!("Reset offsets for group {} on topic {} to {}", group_id, topic, to);
234 }
235 }
236 }
237}
238
/// Result of joining a consumer group.
///
/// Plain data, so it is cloneable and comparable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinGroupResponse {
    /// Generation of the group the member joined.
    pub generation_id: i32,
    /// Name of the group protocol in use.
    pub protocol_name: String,
    /// Member id of the group leader.
    pub leader: String,
    /// Member id of the member that joined.
    pub member_id: String,
    /// Member ids of all members of the group.
    pub members: Vec<String>,
}
248
249type Result<T> = std::result::Result<T, anyhow::Error>;
250
/// Unit tests for the consumer-group manager and its data types.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_consumer_group_manager_new() {
        let manager = ConsumerGroupManager::new();
        assert!(manager.groups().is_empty());
    }

    #[test]
    fn test_consumer_group_manager_default() {
        let manager = ConsumerGroupManager::default();
        assert!(manager.groups().is_empty());
    }

    #[test]
    fn test_partition_assignment_clone() {
        let assignment = PartitionAssignment {
            topic: "test-topic".to_string(),
            partitions: vec![0, 1, 2],
        };

        let cloned = assignment.clone();
        assert_eq!(assignment.topic, cloned.topic);
        assert_eq!(assignment.partitions, cloned.partitions);
    }

    #[test]
    fn test_partition_assignment_debug() {
        let assignment = PartitionAssignment {
            topic: "test".to_string(),
            partitions: vec![0],
        };
        let debug = format!("{:?}", assignment);
        assert!(debug.contains("PartitionAssignment"));
        assert!(debug.contains("test"));
    }

    #[test]
    fn test_group_member_debug() {
        let member = GroupMember {
            member_id: "member-1".to_string(),
            client_id: "client-1".to_string(),
            assignment: vec![],
        };
        let debug = format!("{:?}", member);
        assert!(debug.contains("GroupMember"));
        assert!(debug.contains("member-1"));
    }

    #[test]
    fn test_group_coordinator_debug() {
        let coordinator = GroupCoordinator {
            coordinator_id: 1,
            host: "localhost".to_string(),
            port: 9092,
        };
        let debug = format!("{:?}", coordinator);
        assert!(debug.contains("GroupCoordinator"));
        assert!(debug.contains("localhost"));
    }

    #[test]
    fn test_consumer_group_debug() {
        let group = ConsumerGroup {
            group_id: "test-group".to_string(),
            members: HashMap::new(),
            coordinator: GroupCoordinator {
                coordinator_id: 1,
                host: "localhost".to_string(),
                port: 9092,
            },
            offsets: HashMap::new(),
        };
        let debug = format!("{:?}", group);
        assert!(debug.contains("ConsumerGroup"));
        assert!(debug.contains("test-group"));
    }

    #[test]
    fn test_join_group_response_debug() {
        let response = JoinGroupResponse {
            generation_id: 1,
            protocol_name: "consumer".to_string(),
            leader: "member-1".to_string(),
            member_id: "member-1".to_string(),
            members: vec!["member-1".to_string()],
        };
        let debug = format!("{:?}", response);
        assert!(debug.contains("JoinGroupResponse"));
    }

    #[test]
    fn test_consumer_group_manager_debug() {
        let manager = ConsumerGroupManager::new();
        let debug = format!("{:?}", manager);
        assert!(debug.contains("ConsumerGroupManager"));
    }

    #[tokio::test]
    async fn test_join_group() {
        let mut manager = ConsumerGroupManager::new();
        let response = manager.join_group("group-1", "member-1", "client-1").await.unwrap();

        assert_eq!(response.generation_id, 1);
        assert_eq!(response.protocol_name, "consumer");
        assert_eq!(response.member_id, "member-1");
        assert!(response.members.contains(&"member-1".to_string()));
    }

    #[tokio::test]
    async fn test_join_group_multiple_members() {
        let mut manager = ConsumerGroupManager::new();

        manager.join_group("group-1", "member-1", "client-1").await.unwrap();
        let response2 = manager.join_group("group-1", "member-2", "client-2").await.unwrap();

        assert_eq!(response2.members.len(), 2);
    }

    #[tokio::test]
    async fn test_commit_offsets() {
        let mut manager = ConsumerGroupManager::new();
        manager.join_group("group-1", "member-1", "client-1").await.unwrap();

        let mut offsets = HashMap::new();
        offsets.insert(("topic-1".to_string(), 0), 100);
        offsets.insert(("topic-1".to_string(), 1), 200);

        manager.commit_offsets("group-1", offsets).await.unwrap();

        let committed = manager.get_committed_offsets("group-1");
        assert_eq!(committed.get(&("topic-1".to_string(), 0)), Some(&100));
        assert_eq!(committed.get(&("topic-1".to_string(), 1)), Some(&200));
    }

    #[tokio::test]
    async fn test_commit_offsets_nonexistent_group() {
        let mut manager = ConsumerGroupManager::new();

        let mut offsets = HashMap::new();
        offsets.insert(("topic-1".to_string(), 0), 100);

        let result = manager.commit_offsets("nonexistent", offsets).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_get_committed_offsets_nonexistent_group() {
        let manager = ConsumerGroupManager::new();
        let offsets = manager.get_committed_offsets("nonexistent");
        assert!(offsets.is_empty());
    }

    #[tokio::test]
    async fn test_trigger_rebalance() {
        let mut manager = ConsumerGroupManager::new();
        manager.join_group("group-1", "member-1", "client-1").await.unwrap();

        // Fail loudly if the fixture is missing instead of silently skipping
        // the setup and the assertion below.
        let member = manager
            .groups
            .get_mut("group-1")
            .and_then(|group| group.members.get_mut("member-1"))
            .expect("member-1 must exist in group-1 after join_group");
        member.assignment.push(PartitionAssignment {
            topic: "test".to_string(),
            partitions: vec![0, 1],
        });

        manager.trigger_rebalance("group-1").await;

        let member = manager
            .groups
            .get("group-1")
            .and_then(|group| group.members.get("member-1"))
            .expect("member-1 must still exist in group-1 after the rebalance");
        assert!(member.assignment.is_empty());
    }
}