use crate::subscriber::types::Subscriber;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
pub struct ConsumerGroup {
pub group_id: String,
pub members: Arc<Mutex<HashMap<String, GroupMember>>>,
pub assignments: Arc<Mutex<HashMap<String, Vec<usize>>>>,
}
pub struct GroupMember {
pub subscriber: Subscriber,
}
impl ConsumerGroup {
pub fn new(group_id: &str) -> Self {
ConsumerGroup {
group_id: group_id.to_string(),
members: Arc::new(Mutex::new(HashMap::new())),
assignments: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn add_member(&self, consumer_id: &str, subscriber: Subscriber) {
let mut members = self.members.lock().unwrap();
members.insert(consumer_id.to_string(), GroupMember { subscriber });
drop(members); self.rebalance_partitions();
}
fn rebalance_partitions(&self) {
let members = self.members.lock().unwrap();
let member_ids: Vec<_> = members.keys().cloned().collect();
drop(members);
let mut assignments = self.assignments.lock().unwrap();
assignments.clear();
if member_ids.is_empty() {
return;
}
let total_partitions = 10;
let num_members = member_ids.len();
for partition_id in 0..total_partitions {
let idx = partition_id % num_members;
let member_id = &member_ids[idx];
assignments
.entry(member_id.clone())
.or_default()
.push(partition_id);
}
}
}