use std::collections::HashMap;
use crate::coordinator::unified::assignor::{Assignment, MemberSubscription, TopicMetadata};
#[derive(Debug, Default, Clone, Copy)]
pub struct ShareGroupAssignor;
impl ShareGroupAssignor {
#[must_use]
pub fn name(&self) -> &'static str {
"simple"
}
#[must_use]
pub fn assign(&self, members: &[MemberSubscription], topics: &TopicMetadata) -> Assignment {
let mut out: Assignment = members
.iter()
.map(|m| (m.member_id.clone(), HashMap::new()))
.collect();
if members.is_empty() {
return out;
}
for (topic_id, &num_parts) in &topics.partitions_per_topic {
let mut subs: Vec<&str> = members
.iter()
.filter(|m| m.subscribed_topic_ids.contains(topic_id))
.map(|m| m.member_id.as_str())
.collect();
subs.sort_unstable();
if subs.is_empty() || num_parts <= 0 {
continue;
}
let sub_count = i32::try_from(subs.len()).expect("member count fits i32");
if num_parts >= sub_count {
for p in 0..num_parts {
let idx = usize::try_from(p % sub_count).expect("non-negative");
let m = subs[idx];
out.get_mut(m)
.unwrap()
.entry(*topic_id)
.or_default()
.push(p);
}
} else {
for (i, m) in subs.iter().enumerate() {
let p = i32::try_from(i).expect("index fits i32") % num_parts;
out.get_mut(*m)
.unwrap()
.entry(*topic_id)
.or_default()
.push(p);
}
}
}
out
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coordinator::unified::assignor::{MemberSubscription, TopicMetadata};
use assert2::assert;
use crabka_protocol::primitives::uuid::Uuid;
fn topic(p: i32) -> (Uuid, TopicMetadata) {
let id = Uuid([1; 16]);
let mut t = TopicMetadata::default();
t.partitions_per_topic.insert(id, p);
(id, t)
}
#[test]
fn distributes_partitions_across_members() {
let (id, topics) = topic(4);
let members = vec![
MemberSubscription {
member_id: "m1".into(),
rack_id: None,
subscribed_topic_ids: vec![id],
},
MemberSubscription {
member_id: "m2".into(),
rack_id: None,
subscribed_topic_ids: vec![id],
},
];
let a = ShareGroupAssignor.assign(&members, &topics);
let total: usize = a.values().flat_map(|m| m.values()).map(Vec::len).sum();
assert!(total == 4); assert!(a["m1"][&id].len() == 2 && a["m2"][&id].len() == 2);
}
#[test]
fn more_members_than_partitions_overlap() {
let (id, topics) = topic(1);
let members: Vec<_> = (0..3)
.map(|i| MemberSubscription {
member_id: format!("m{i}"),
rack_id: None,
subscribed_topic_ids: vec![id],
})
.collect();
let a = ShareGroupAssignor.assign(&members, &topics);
for i in 0..3 {
assert!(a[&format!("m{i}")][&id] == vec![0]);
}
}
}