use std::collections::{BTreeMap, BTreeSet, HashMap};
use super::config::StreamsAssignorKind;
/// A group member as seen by [`assign`]: identity, placement hints, current tasks and lag.
#[derive(Debug, Clone)]
pub struct AssignorMember {
/// Member identifier. `assign` sorts members by this id, uses it to break load ties and
/// keys every map of the resulting [`StreamsAssignment`] by it.
pub member_id: String,
/// Process hosting this member. A task's standbys are never placed on the process of its
/// active owner, and each process receives at most one standby per task.
pub process_id: String,
/// Optional rack. When both the active owner and a standby candidate report a rack, a
/// candidate on a different rack is preferred.
pub rack_id: Option<String>,
/// Active tasks the member currently owns (`subtopology -> partitions`); used to keep
/// active assignments sticky.
pub current_active: BTreeMap<String, Vec<i32>>,
/// Standby tasks the member currently holds.
/// NOTE(review): not read by the assignment logic in this module.
pub current_standby: BTreeMap<String, Vec<i32>>,
/// Warmup tasks the member currently holds.
/// NOTE(review): not read by the assignment logic in this module.
pub current_warmup: BTreeMap<String, Vec<i32>>,
/// Reported lag per `(subtopology, partition)`. In highly-available mode, a stateful task
/// currently owned by another member is moved onto this member only if its entry is at
/// most [`AssignorInput::acceptable_recovery_lag`]; a missing entry counts as not caught up.
pub task_lag: BTreeMap<(String, i32), i64>,
}
/// Task set and replica configuration for one [`assign`] call.
#[derive(Debug, Clone)]
pub struct AssignorInput {
/// Tasks to assign as `subtopology -> partitions`; duplicate partitions are ignored.
pub tasks: BTreeMap<String, Vec<i32>>,
/// Subtopologies whose tasks are stateful. Only these receive warmups and standbys, and an
/// empty set makes `StreamsAssignorKind::Auto` resolve to sticky assignment.
pub stateful: BTreeSet<String>,
/// Standby replicas per stateful task in highly-available mode; `<= 0` disables standbys.
/// Fewer are placed when there are not enough eligible members on distinct processes.
pub num_standby_replicas: i32,
/// Maximum number of warmup replicas created by one `assign` call, counted across all
/// tasks (not per task).
pub num_warmup_replicas: i32,
/// Largest lag (inclusive) at which a member counts as caught up on a task.
pub acceptable_recovery_lag: i64,
/// Requested assignor kind; `Auto` is resolved from `stateful`.
pub kind: StreamsAssignorKind,
}
/// Result of [`assign`]: for each role, `member_id -> subtopology -> partitions`.
///
/// Partition lists are sorted ascending without duplicates, and a member with no tasks in a
/// role is absent from that role's map.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StreamsAssignment {
/// Active tasks per member.
pub active: HashMap<String, BTreeMap<String, Vec<i32>>>,
/// Standby replicas per member; only populated in highly-available mode.
pub standby: HashMap<String, BTreeMap<String, Vec<i32>>>,
/// Warmup replicas per member; only populated in highly-available mode.
pub warmup: HashMap<String, BTreeMap<String, Vec<i32>>>,
}
/// A single task as `(subtopology, partition)`; tuple ordering sorts by subtopology first,
/// then by partition.
type Task = (String, i32);
/// Computes the active, standby and warmup assignment for `members`.
///
/// Members are handled in ascending `member_id` order (stable sort), so the result does not
/// depend on the order of `members`. Active tasks are always assigned: sticky first, then
/// least-loaded, then balanced. Only when the resolved kind is
/// [`StreamsAssignorKind::HighlyAvailable`] are moves of stateful tasks onto lagging members
/// deferred behind warmups and standby replicas placed.
///
/// Returns an empty assignment when `members` is empty.
#[must_use]
pub fn assign(members: &[AssignorMember], input: &AssignorInput) -> StreamsAssignment {
    if members.is_empty() {
        return StreamsAssignment::default();
    }

    let mut sorted_members: Vec<&AssignorMember> = members.iter().collect();
    sorted_members.sort_by(|left, right| Ord::cmp(&left.member_id, &right.member_id));

    let all_tasks = flatten_tasks(&input.tasks);
    let mut active_by_member = assign_active(&sorted_members, &all_tasks);
    let mut warmup_by_member: HashMap<String, Vec<Task>> = HashMap::new();
    let mut standby_by_member: HashMap<String, Vec<Task>> = HashMap::new();

    // Warmups and standbys only exist in highly-available mode; warmups must be decided
    // first because standby placement skips members holding a warmup for the task.
    if matches!(resolve_kind(input), StreamsAssignorKind::HighlyAvailable) {
        defer_warmups(
            &sorted_members,
            input,
            &all_tasks,
            &mut active_by_member,
            &mut warmup_by_member,
        );
        assign_standby(
            &sorted_members,
            input,
            &all_tasks,
            &active_by_member,
            &warmup_by_member,
            &mut standby_by_member,
        );
    }

    StreamsAssignment {
        active: to_role_maps(&active_by_member),
        standby: to_role_maps(&standby_by_member),
        warmup: to_role_maps(&warmup_by_member),
    }
}
/// Resolves [`StreamsAssignorKind::Auto`] into a concrete assignor kind.
///
/// `Auto` becomes `HighlyAvailable` when at least one subtopology is stateful and `Sticky`
/// otherwise. Any explicitly requested kind is returned unchanged.
fn resolve_kind(input: &AssignorInput) -> StreamsAssignorKind {
    if !matches!(input.kind, StreamsAssignorKind::Auto) {
        return input.kind;
    }
    let has_state = !input.stateful.is_empty();
    if has_state {
        StreamsAssignorKind::HighlyAvailable
    } else {
        StreamsAssignorKind::Sticky
    }
}
fn flatten_tasks(tasks: &BTreeMap<String, Vec<i32>>) -> Vec<Task> {
let mut out: Vec<Task> = Vec::new();
for (sub, parts) in tasks {
for &p in parts {
out.push((sub.clone(), p));
}
}
out.sort();
out.dedup();
out
}
/// Returns the id of the first member, in slice order, whose `current_active` contains
/// `task`, or `None` if no member currently runs it.
fn current_active_owner<'a>(members: &[&'a AssignorMember], task: &Task) -> Option<&'a str> {
    for candidate in members {
        if owns(&candidate.current_active, task) {
            return Some(candidate.member_id.as_str());
        }
    }
    None
}
/// Reports whether `role` lists the task's partition under the task's subtopology.
fn owns(role: &BTreeMap<String, Vec<i32>>, task: &Task) -> bool {
    let (subtopology, partition) = task;
    matches!(role.get(subtopology), Some(parts) if parts.contains(partition))
}
/// Proposes an active-task assignment in three phases.
///
/// 1. Stickiness: each task stays with the first member (in `members` order) whose
///    `current_active` contains it.
/// 2. Placement: remaining tasks go, one at a time in `tasks` order, to the currently
///    least-loaded member (ties broken by smallest `member_id`).
/// 3. Balancing: while the most- and least-loaded members differ by more than one task,
///    the largest task (tuple order) of the most-loaded member moves to the least-loaded.
///
/// Every member gets an entry in the returned map, possibly empty.
fn assign_active(members: &[&AssignorMember], tasks: &[Task]) -> HashMap<String, Vec<Task>> {
    // Seed every member up front so lookups by member id below never miss.
    let mut active: HashMap<String, Vec<Task>> = members
        .iter()
        .map(|m| (m.member_id.clone(), Vec::new()))
        .collect();

    let mut unowned: Vec<Task> = Vec::new();
    for task in tasks {
        match current_active_owner(members, task) {
            Some(owner) => active.get_mut(owner).expect("owner present").push(task.clone()),
            None => unowned.push(task.clone()),
        }
    }

    for task in unowned {
        let lightest = least_loaded(members, &active);
        active.get_mut(&lightest).expect("member present").push(task);
    }

    // Each move lowers the heaviest load and raises the lightest, so this terminates.
    while let Some((heaviest, lightest)) = load_extremes(members, &active) {
        if active[&heaviest].len() <= active[&lightest].len() + 1 {
            break;
        }
        let donor = active.get_mut(&heaviest).expect("member present");
        let largest = donor
            .iter()
            .enumerate()
            .max_by_key(|&(_, task)| task)
            .map(|(idx, _)| idx)
            .expect("most-loaded member is non-empty");
        let task = donor.swap_remove(largest);
        active.get_mut(&lightest).expect("member present").push(task);
    }
    active
}
/// Returns the id of the member with the fewest active tasks, preferring the smallest
/// `member_id` on ties.
///
/// # Panics
/// Panics if `members` is empty, or if a compared member has no entry in `active`.
fn least_loaded(members: &[&AssignorMember], active: &HashMap<String, Vec<Task>>) -> String {
    let load_of = |id: &String| active[id].len();
    let lightest = members
        .iter()
        .min_by(|x, y| {
            load_of(&x.member_id)
                .cmp(&load_of(&y.member_id))
                .then(x.member_id.cmp(&y.member_id))
        })
        .expect("members non-empty");
    lightest.member_id.clone()
}
/// Returns `(most_loaded, least_loaded)` member ids by active-task count, or `None` when
/// `members` is empty.
///
/// The most-loaded slot breaks ties toward the largest `member_id`; the least-loaded slot
/// (from [`least_loaded`]) breaks ties toward the smallest.
fn load_extremes(
    members: &[&AssignorMember],
    active: &HashMap<String, Vec<Task>>,
) -> Option<(String, String)> {
    let heaviest = members.iter().max_by(|x, y| {
        let (load_x, load_y) = (active[&x.member_id].len(), active[&y.member_id].len());
        load_x
            .cmp(&load_y)
            .then_with(|| x.member_id.cmp(&y.member_id))
    })?;
    Some((heaviest.member_id.clone(), least_loaded(members, active)))
}
/// Holds back moves of stateful active tasks onto members that have not caught up yet.
///
/// `active` is the proposed assignment from [`assign_active`]. For every stateful task in
/// `tasks` that had a current owner and whose proposed owner differs from it, the proposed
/// owner's reported lag (`task_lag`) is compared with `input.acceptable_recovery_lag`:
///
/// * lag at or below the threshold: the proposed owner keeps the task;
/// * otherwise (including a missing lag entry): the active task is moved back to its
///   current owner, and while fewer than `input.num_warmup_replicas` warmups have been
///   created in this call, the proposed owner receives a warmup replica instead.
///
/// Tasks beyond the warmup cap still stay with their current owner, just without a warmup.
fn defer_warmups(
    members: &[&AssignorMember],
    input: &AssignorInput,
    tasks: &[Task],
    active: &mut HashMap<String, Vec<Task>>,
    warmup: &mut HashMap<String, Vec<Task>>,
) {
    // Snapshot the proposed owners before the move-backs below start mutating `active`.
    let target_owner: HashMap<Task, String> = active
        .iter()
        .flat_map(|(member, ts)| ts.iter().map(move |t| (t.clone(), member.clone())))
        .collect();
    let mut warmups_created: i32 = 0;
    for task in tasks {
        if !input.stateful.contains(&task.0) {
            continue;
        }
        // Previously unowned tasks have no state to stay sticky to.
        let Some(current) = current_active_owner(members, task) else {
            continue;
        };
        let Some(target) = target_owner.get(task) else {
            continue;
        };
        if target == current {
            continue;
        }
        // `task` already has the `(subtopology, partition)` key type of `task_lag`, so look
        // it up directly instead of allocating a cloned key for every check.
        let caught_up = members
            .iter()
            .find(|m| m.member_id == *target)
            .and_then(|m| m.task_lag.get(task))
            .is_some_and(|&lag| lag <= input.acceptable_recovery_lag);
        if caught_up {
            continue;
        }
        move_active(active, target, current, task);
        if warmups_created < input.num_warmup_replicas {
            warmup.entry(target.clone()).or_default().push(task.clone());
            warmups_created += 1;
        }
    }
}
fn move_active(active: &mut HashMap<String, Vec<Task>>, from: &str, to: &str, task: &Task) {
if let Some(list) = active.get_mut(from)
&& let Some(idx) = list.iter().position(|t| t == task)
{
list.swap_remove(idx);
}
active.entry(to.to_owned()).or_default().push(task.clone());
}
/// Places up to `input.num_standby_replicas` standby replicas for every stateful task.
///
/// For a task, candidates exclude:
/// * members on the active owner's process;
/// * members on a process that already received a standby for this task;
/// * members holding a warmup for the task.
///
/// Among the remaining members, the smallest [`standby_rank`] wins. That means a rack
/// different from the active owner's first, then fewer standbys so far, then the smallest
/// `member_id`. Placement for a task stops early once no candidate is left. Tasks without
/// an active owner in `active` get no standbys. Nothing happens when the replica count is
/// `<= 0`.
fn assign_standby(
    members: &[&AssignorMember],
    input: &AssignorInput,
    tasks: &[Task],
    active: &HashMap<String, Vec<Task>>,
    warmup: &HashMap<String, Vec<Task>>,
    standby: &mut HashMap<String, Vec<Task>>,
) {
    let replicas = input.num_standby_replicas;
    if replicas <= 0 {
        return;
    }

    // Reverse index: task -> id of the member running it actively.
    let mut owner_of: HashMap<&Task, &str> = HashMap::new();
    for (member_id, owned) in active {
        for task in owned {
            owner_of.insert(task, member_id.as_str());
        }
    }
    let member_by_id: HashMap<&str, &AssignorMember> = members
        .iter()
        .map(|m| (m.member_id.as_str(), *m))
        .collect();

    for task in tasks.iter().filter(|t| input.stateful.contains(&t.0)) {
        let Some(owner) = owner_of
            .get(task)
            .and_then(|owner_id| member_by_id.get(owner_id))
        else {
            continue;
        };
        let active_rack = owner.rack_id.as_deref();

        // Processes that must not receive another copy of this task.
        let mut blocked: BTreeSet<&str> = BTreeSet::new();
        blocked.insert(owner.process_id.as_str());

        for _ in 0..replicas {
            let pick = members
                .iter()
                .filter(|m| {
                    !blocked.contains(m.process_id.as_str())
                        && !owns_role(warmup, &m.member_id, task)
                })
                .min_by_key(|m| standby_rank(m, active_rack, standby));
            let Some(pick) = pick else {
                break;
            };
            blocked.insert(pick.process_id.as_str());
            standby
                .entry(pick.member_id.clone())
                .or_default()
                .push(task.clone());
        }
    }
}
/// Ranking key for standby candidates; smaller is better.
///
/// The key is `(rack_penalty, standby_count, member_id)`:
/// * `rack_penalty` is `1` only when both the active owner and `m` report the same rack;
/// * `standby_count` is the number of standbys already assigned to `m` in `standby`.
fn standby_rank(
    m: &AssignorMember,
    active_rack: Option<&str>,
    standby: &HashMap<String, Vec<Task>>,
) -> (u8, usize, String) {
    // Unknown racks on either side are never penalised.
    let shares_rack = active_rack.is_some() && active_rack == m.rack_id.as_deref();
    let standbys_held = standby.get(m.member_id.as_str()).map_or(0, Vec::len);
    (u8::from(shares_rack), standbys_held, m.member_id.clone())
}
/// Reports whether `member`'s task list in `role` contains `task`.
fn owns_role(role: &HashMap<String, Vec<Task>>, member: &str, task: &Task) -> bool {
    matches!(role.get(member), Some(held) if held.contains(task))
}
/// Converts a `member -> [task]` map into the public `member -> subtopology -> partitions`
/// shape used by [`StreamsAssignment`].
///
/// Partition lists are sorted ascending and de-duplicated. Members whose task list is empty
/// are omitted from the result.
fn to_role_maps(
    by_member: &HashMap<String, Vec<Task>>,
) -> HashMap<String, BTreeMap<String, Vec<i32>>> {
    by_member
        .iter()
        .filter(|(_, ts)| !ts.is_empty())
        .map(|(member, ts)| {
            let mut role: BTreeMap<String, Vec<i32>> = BTreeMap::new();
            for (sub, part) in ts {
                role.entry(sub.clone()).or_default().push(*part);
            }
            // Every entry was created by a push and `dedup` never empties a list, so no
            // pruning of empty subtopologies (or empty roles) is needed afterwards.
            for parts in role.values_mut() {
                parts.sort_unstable();
                parts.dedup();
            }
            (member.clone(), role)
        })
        .collect()
}
// Unit tests for `assign`. `use assert2::assert` replaces the built-in `assert!` macro
// inside this module.
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
/// Builds a member on `process` with no rack, no current tasks and no reported lag.
fn member(id: &str, process: &str) -> AssignorMember {
AssignorMember {
member_id: id.to_owned(),
process_id: process.to_owned(),
rack_id: None,
current_active: BTreeMap::new(),
current_standby: BTreeMap::new(),
current_warmup: BTreeMap::new(),
task_lag: BTreeMap::new(),
}
}
/// Builds an input with zero standby and warmup replicas and an acceptable recovery lag of
/// 10_000; individual tests override fields as needed.
fn input(
tasks: &[(&str, &[i32])],
stateful: &[&str],
kind: StreamsAssignorKind,
) -> AssignorInput {
AssignorInput {
tasks: tasks
.iter()
.map(|(s, p)| ((*s).to_owned(), p.to_vec()))
.collect(),
stateful: stateful.iter().map(|s| (*s).to_owned()).collect(),
num_standby_replicas: 0,
num_warmup_replicas: 0,
acceptable_recovery_lag: 10_000,
kind,
}
}
/// Total number of partitions across every member and subtopology of one role map.
fn count(role: &HashMap<String, BTreeMap<String, Vec<i32>>>) -> usize {
role.values().flat_map(BTreeMap::values).map(Vec::len).sum()
}
/// With no members, every role is empty regardless of the configured tasks.
#[test]
fn empty_members_empty_assignment() {
let inp = input(&[("a", &[0, 1])], &[], StreamsAssignorKind::Sticky);
let out = assign(&[], &inp);
assert!(out.active.is_empty());
assert!(out.standby.is_empty());
assert!(out.warmup.is_empty());
}
/// A single member receives every task, with partitions listed in ascending order.
#[test]
fn single_member_single_stateless_subtopology() {
let members = [member("A", "p1")];
let inp = input(&[("sub-0", &[0, 1, 2])], &[], StreamsAssignorKind::Sticky);
let out = assign(&members, &inp);
assert!(out.active.len() == 1);
assert!(out.active["A"]["sub-0"] == vec![0, 1, 2]);
assert!(out.standby.is_empty());
assert!(out.warmup.is_empty());
}
/// Unowned tasks alternate between members (load ties go to the smaller id), and repeated
/// calls give the same result.
#[test]
fn two_members_four_stateless_tasks_balanced() {
let members = [member("A", "p1"), member("B", "p2")];
let inp = input(
&[("sub-0", &[0, 1, 2, 3])],
&[],
StreamsAssignorKind::Sticky,
);
let out = assign(&members, &inp);
assert!(count(&out.active) == 4);
assert!(out.active["A"]["sub-0"].len() == 2);
assert!(out.active["B"]["sub-0"].len() == 2);
assert!(out.active["A"]["sub-0"] == vec![0, 2]);
assert!(out.active["B"]["sub-0"] == vec![1, 3]);
let out2 = assign(&members, &inp);
assert!(out.active == out2.active);
}
/// A keeps the tasks it already owns; the unowned ones go to B.
#[test]
fn stickiness_keeps_owned_tasks() {
let mut a = member("A", "p1");
a.current_active = BTreeMap::from([("sub-0".to_owned(), vec![0, 1])]);
let b = member("B", "p2");
let members = [a, b];
let inp = input(
&[("sub-0", &[0, 1, 2, 3])],
&[],
StreamsAssignorKind::Sticky,
);
let out = assign(&members, &inp);
assert!(out.active["A"]["sub-0"] == vec![0, 1]);
assert!(out.active["B"]["sub-0"] == vec![2, 3]);
}
/// A currently owns all four tasks; balancing moves two of them to B.
#[test]
fn stickiness_rebalances_when_skewed() {
let mut a = member("A", "p1");
a.current_active = BTreeMap::from([("sub-0".to_owned(), vec![0, 1, 2, 3])]);
let b = member("B", "p2");
let members = [a, b];
let inp = input(
&[("sub-0", &[0, 1, 2, 3])],
&[],
StreamsAssignorKind::Sticky,
);
let out = assign(&members, &inp);
assert!(out.active["A"]["sub-0"].len() == 2);
assert!(out.active["B"]["sub-0"].len() == 2);
}
/// Each stateful task gets one standby, on a different member than its active owner.
#[test]
fn highly_available_standby_on_other_process() {
let members = [member("A", "p1"), member("B", "p2")];
let mut inp = input(
&[("sub-0", &[0, 1])],
&["sub-0"],
StreamsAssignorKind::HighlyAvailable,
);
inp.num_standby_replicas = 1;
let out = assign(&members, &inp);
assert!(count(&out.active) == 2);
assert!(count(&out.standby) == 2);
// NOTE(review): despite its name, `parts` is bound to a single partition number here.
for (sub, parts) in [("sub-0", vec![0, 1])]
.iter()
.flat_map(|(s, ps)| ps.iter().map(move |p| ((*s).to_owned(), *p)))
{
let active_owner = out
.active
.iter()
.find(|(_, m)| m.get(&sub).is_some_and(|v| v.contains(&parts)))
.map(|(id, _)| id.clone())
.expect("active owner exists");
let standby_owner = out
.standby
.iter()
.find(|(_, m)| m.get(&sub).is_some_and(|v| v.contains(&parts)))
.map(|(id, _)| id.clone())
.expect("standby owner exists");
assert!(active_owner != standby_owner);
}
}
/// Both members share process p1, so no standby can be placed off the active owner's
/// process.
#[test]
fn highly_available_same_process_no_standby() {
let members = [member("A", "p1"), member("B", "p1")];
let mut inp = input(
&[("sub-0", &[0, 1])],
&["sub-0"],
StreamsAssignorKind::HighlyAvailable,
);
inp.num_standby_replicas = 1;
let out = assign(&members, &inp);
assert!(count(&out.active) == 2);
assert!(count(&out.standby) == 0);
}
/// Balancing would move one task to B, but B reports no lag for it. A keeps the task and B
/// gets a warmup instead.
#[test]
fn warmup_deferral_when_target_not_caught_up() {
let mut a = member("A", "p1");
a.current_active = BTreeMap::from([("sub-0".to_owned(), vec![0, 1])]);
let b = member("B", "p2");
let members = [a, b];
let mut inp = input(
&[("sub-0", &[0, 1])],
&["sub-0"],
StreamsAssignorKind::HighlyAvailable,
);
inp.num_warmup_replicas = 2;
inp.num_standby_replicas = 0;
let out = assign(&members, &inp);
assert!(count(&out.active) == 2);
assert!(out.active["A"]["sub-0"].len() == 2);
assert!(count(&out.warmup) == 1);
assert!(out.warmup.contains_key("B"));
}
/// B's lag for sub-0/1 (5) is within the acceptable lag (10), so the task moves to B
/// without a warmup.
#[test]
fn warmup_promotes_when_caught_up() {
let mut a = member("A", "p1");
a.current_active = BTreeMap::from([("sub-0".to_owned(), vec![0, 1])]);
let mut b = member("B", "p2");
b.task_lag = BTreeMap::from([(("sub-0".to_owned(), 1), 5_i64)]);
let members = [a, b];
let mut inp = input(
&[("sub-0", &[0, 1])],
&["sub-0"],
StreamsAssignorKind::HighlyAvailable,
);
inp.num_warmup_replicas = 2;
inp.acceptable_recovery_lag = 10;
let out = assign(&members, &inp);
assert!(out.active["A"]["sub-0"] == vec![0]);
assert!(out.active["B"]["sub-0"] == vec![1]);
assert!(out.warmup.is_empty());
}
/// Two tasks would move to B and neither is caught up. Both stay active on A, and only one
/// warmup is created because of the cap.
#[test]
fn warmup_cap_respected() {
let mut a = member("A", "p1");
a.current_active = BTreeMap::from([("sub-0".to_owned(), vec![0, 1, 2, 3])]);
let b = member("B", "p2");
let members = [a, b];
let mut inp = input(
&[("sub-0", &[0, 1, 2, 3])],
&["sub-0"],
StreamsAssignorKind::HighlyAvailable,
);
inp.num_warmup_replicas = 1;
inp.num_standby_replicas = 0;
let out = assign(&members, &inp);
assert!(out.active["A"]["sub-0"].len() == 4);
assert!(count(&out.warmup) == 1);
}
/// With no stateful subtopologies, `Auto` resolves to sticky, so configured standbys and
/// warmups are ignored.
#[test]
fn auto_resolves_to_sticky_when_stateless() {
let members = [member("A", "p1"), member("B", "p2")];
let mut inp = input(&[("sub-0", &[0, 1])], &[], StreamsAssignorKind::Auto);
inp.num_standby_replicas = 1;
inp.num_warmup_replicas = 2;
let out = assign(&members, &inp);
assert!(count(&out.active) == 2);
assert!(out.standby.is_empty());
assert!(out.warmup.is_empty());
}
/// A stateful subtopology makes `Auto` resolve to highly-available, so standbys are placed.
#[test]
fn auto_resolves_to_highly_available_when_stateful() {
let members = [member("A", "p1"), member("B", "p2")];
let mut inp = input(&[("sub-0", &[0, 1])], &["sub-0"], StreamsAssignorKind::Auto);
inp.num_standby_replicas = 1;
let out = assign(&members, &inp);
assert!(count(&out.active) == 2);
assert!(count(&out.standby) == 2);
}
}