1use std::collections::HashMap;
2
3#[derive(Debug)]
5pub struct ConsumerGroupManager {
6 groups: HashMap<String, ConsumerGroup>,
7}
8
9#[derive(Debug)]
10pub struct ConsumerGroup {
11 pub group_id: String,
12 pub members: HashMap<String, GroupMember>,
13 pub coordinator: GroupCoordinator,
14 pub offsets: HashMap<(String, i32), i64>, }
16
17#[derive(Debug)]
18pub struct GroupMember {
19 pub member_id: String,
20 pub client_id: String,
21 pub assignment: Vec<PartitionAssignment>,
22}
23
24#[derive(Debug, Clone)]
25pub struct PartitionAssignment {
26 pub topic: String,
27 pub partitions: Vec<i32>,
28}
29
30#[derive(Debug)]
31pub struct GroupCoordinator {
32 pub coordinator_id: i32,
33 pub host: String,
34 pub port: i32,
35}
36
37impl ConsumerGroupManager {
38 pub fn new() -> Self {
40 Self {
41 groups: HashMap::new(),
42 }
43 }
44}
45
46impl Default for ConsumerGroupManager {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl ConsumerGroupManager {
53 pub fn groups(&self) -> &HashMap<String, ConsumerGroup> {
55 &self.groups
56 }
57
58 pub async fn join_group(
60 &mut self,
61 group_id: &str,
62 member_id: &str,
63 client_id: &str,
64 ) -> Result<JoinGroupResponse> {
65 let group = self.groups.entry(group_id.to_string()).or_insert_with(|| ConsumerGroup {
66 group_id: group_id.to_string(),
67 members: HashMap::new(),
68 coordinator: GroupCoordinator {
69 coordinator_id: 1,
70 host: "localhost".to_string(),
71 port: 9092,
72 },
73 offsets: HashMap::new(),
74 });
75
76 group.members.insert(
77 member_id.to_string(),
78 GroupMember {
79 member_id: member_id.to_string(),
80 client_id: client_id.to_string(),
81 assignment: vec![],
82 },
83 );
84
85 Ok(JoinGroupResponse {
86 generation_id: 1,
87 protocol_name: "consumer".to_string(),
88 leader: member_id.to_string(),
89 member_id: member_id.to_string(),
90 members: group.members.keys().cloned().collect(),
91 })
92 }
93
94 pub async fn sync_group(
96 &mut self,
97 group_id: &str,
98 assignments: Vec<PartitionAssignment>,
99 topics: &std::collections::HashMap<String, crate::topics::Topic>,
100 ) -> Result<()> {
101 if let Some(group) = self.groups.get_mut(group_id) {
102 if !assignments.is_empty() {
104 for assignment in assignments {
106 for member in group.members.values_mut() {
108 member.assignment.push(assignment.clone());
109 }
110 }
111 } else {
112 Self::assign_partitions_round_robin(group, topics);
114 }
115 Ok(())
116 } else {
117 Err(anyhow::anyhow!("Group {} does not exist", group_id))
118 }
119 }
120
121 fn assign_partitions_round_robin(
123 group: &mut ConsumerGroup,
124 topics: &std::collections::HashMap<String, crate::topics::Topic>,
125 ) {
126 for member in group.members.values_mut() {
128 member.assignment.clear();
129 }
130
131 let mut member_ids: Vec<String> = group.members.keys().cloned().collect();
132 member_ids.sort(); let mut member_idx = 0;
135 for (topic_name, topic) in topics {
136 let num_partitions = topic.config.num_partitions as usize;
137 for partition_id in 0..num_partitions {
138 let member_id = &member_ids[member_idx % member_ids.len()];
139 if let Some(member) = group.members.get_mut(member_id.as_str()) {
140 let assignment = member.assignment.iter_mut().find(|a| a.topic == *topic_name);
142 if let Some(assignment) = assignment {
143 assignment.partitions.push(partition_id as i32);
144 } else {
145 member.assignment.push(PartitionAssignment {
146 topic: topic_name.clone(),
147 partitions: vec![partition_id as i32],
148 });
149 }
150 }
151 member_idx += 1;
152 }
153 }
154 }
155
156 pub async fn commit_offsets(
158 &mut self,
159 group_id: &str,
160 offsets: HashMap<(String, i32), i64>,
161 ) -> Result<()> {
162 if let Some(group) = self.groups.get_mut(group_id) {
163 group.offsets.extend(offsets);
164 Ok(())
165 } else {
166 Err(anyhow::anyhow!("Group {} does not exist", group_id))
167 }
168 }
169
170 pub fn get_committed_offsets(&self, group_id: &str) -> HashMap<(String, i32), i64> {
172 self.groups.get(group_id).map(|g| g.offsets.clone()).unwrap_or_default()
173 }
174
175 pub async fn simulate_lag(
177 &mut self,
178 group_id: &str,
179 topic: &str,
180 lag: i64,
181 topics: &std::collections::HashMap<String, crate::topics::Topic>,
182 ) {
183 if let Some(group) = self.groups.get_mut(group_id) {
184 let num_partitions =
186 topics.get(topic).map(|t| t.config.num_partitions).unwrap_or(1) as usize;
187 for partition in 0..num_partitions {
189 let key = (topic.to_string(), partition as i32);
190 let current_offset = group.offsets.get(&key).copied().unwrap_or(0);
191 group.offsets.insert(key, current_offset.saturating_sub(lag));
192 }
193 tracing::info!(
194 "Simulated lag of {} messages for group {} on topic {}",
195 lag,
196 group_id,
197 topic
198 );
199 }
200 }
201
202 pub async fn trigger_rebalance(&mut self, group_id: &str) {
204 if let Some(group) = self.groups.get_mut(group_id) {
205 for member in group.members.values_mut() {
207 member.assignment.clear();
208 }
209 tracing::info!("Triggered rebalance for group {}", group_id);
210 }
211 }
212
213 pub async fn reset_offsets(
215 &mut self,
216 group_id: &str,
217 topic: &str,
218 to: &str,
219 topics: &std::collections::HashMap<String, crate::topics::Topic>,
220 ) {
221 if let Some(group) = self.groups.get_mut(group_id) {
222 if let Some(topic_data) = topics.get(topic) {
223 let num_partitions = topic_data.config.num_partitions as usize;
224 for partition in 0..num_partitions {
225 let key = (topic.to_string(), partition as i32);
226 let target_offset = match to {
227 "earliest" => 0,
228 "latest" => topic_data.partitions[partition].high_watermark,
229 _ => return, };
231 group.offsets.insert(key, target_offset);
232 }
233 tracing::info!("Reset offsets for group {} on topic {} to {}", group_id, topic, to);
234 }
235 }
236 }
237}
238
239#[derive(Debug)]
241pub struct JoinGroupResponse {
242 pub generation_id: i32,
243 pub protocol_name: String,
244 pub leader: String,
245 pub member_id: String,
246 pub members: Vec<String>,
247}
248
249type Result<T> = std::result::Result<T, anyhow::Error>;