use std::collections::{BTreeMap, HashMap};
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamsMemberAssignmentState {
#[default]
Stable = 0,
UnrevokedActiveTasks = 1,
UnreleasedActiveTasks = 2,
}
impl StreamsMemberAssignmentState {
#[must_use]
pub fn as_i8(self) -> i8 {
self as i8
}
#[must_use]
pub fn from_i8(v: i8) -> Option<Self> {
match v {
0 => Some(Self::Stable),
1 => Some(Self::UnrevokedActiveTasks),
2 => Some(Self::UnreleasedActiveTasks),
_ => None,
}
}
}
#[derive(Debug, Clone)]
pub struct StreamsMemberState {
pub member_id: String,
pub instance_id: Option<String>,
pub rack_id: Option<String>,
pub client_id: String,
pub client_host: String,
pub process_id: String,
pub user_endpoint: Option<(String, u32)>,
pub client_tags: Vec<(String, String)>,
pub rebalance_timeout_ms: i32,
pub member_epoch: i32,
pub previous_member_epoch: i32,
pub topology_epoch: i32,
pub assignment_state: StreamsMemberAssignmentState,
pub active: BTreeMap<String, Vec<i32>>,
pub standby: BTreeMap<String, Vec<i32>>,
pub warmup: BTreeMap<String, Vec<i32>>,
pub active_pending_revocation: BTreeMap<String, Vec<i32>>,
pub task_offsets: BTreeMap<(String, i32), i64>,
pub task_end_offsets: BTreeMap<(String, i32), i64>,
pub last_seen: Instant,
}
impl StreamsMemberState {
pub fn joining(
member_id: impl Into<String>,
client_id: impl Into<String>,
client_host: impl Into<String>,
) -> Self {
Self {
member_id: member_id.into(),
instance_id: None,
rack_id: None,
client_id: client_id.into(),
client_host: client_host.into(),
process_id: uuid::Uuid::new_v4().to_string(),
user_endpoint: None,
client_tags: Vec::new(),
rebalance_timeout_ms: 0,
member_epoch: 0,
previous_member_epoch: 0,
topology_epoch: 0,
assignment_state: StreamsMemberAssignmentState::Stable,
active: BTreeMap::new(),
standby: BTreeMap::new(),
warmup: BTreeMap::new(),
active_pending_revocation: BTreeMap::new(),
task_offsets: BTreeMap::new(),
task_end_offsets: BTreeMap::new(),
last_seen: Instant::now(),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct StreamsTargetAssignment {
pub epoch: i32,
pub active: HashMap<String, BTreeMap<String, Vec<i32>>>,
pub standby: HashMap<String, BTreeMap<String, Vec<i32>>>,
pub warmup: HashMap<String, BTreeMap<String, Vec<i32>>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamsGroupStatePhase {
#[default]
Empty,
NotReady,
Assigning,
Reconciling,
Stable,
}
impl StreamsGroupStatePhase {
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Empty => "Empty",
Self::NotReady => "NotReady",
Self::Assigning => "Assigning",
Self::Reconciling => "Reconciling",
Self::Stable => "Stable",
}
}
}
#[derive(Debug, Clone, Default)]
pub struct StoredTopologyHandle {
pub epoch: i32,
}
#[derive(Debug)]
pub struct StreamsGroupState {
pub group_id: String,
pub group_epoch: i32,
pub assignment_epoch: i32,
pub members: HashMap<String, StreamsMemberState>,
pub topology_epoch: i32,
pub topology: Option<StoredTopologyHandle>,
pub target: StreamsTargetAssignment,
pub dirty: bool,
pub phase: StreamsGroupStatePhase,
pub status: Vec<(i8, String)>,
}
impl StreamsGroupState {
pub fn new(group_id: impl Into<String>) -> Self {
Self {
group_id: group_id.into(),
group_epoch: 0,
assignment_epoch: 0,
members: HashMap::new(),
topology_epoch: 0,
topology: None,
target: StreamsTargetAssignment::default(),
dirty: false,
phase: StreamsGroupStatePhase::Empty,
status: Vec::new(),
}
}
pub fn bump_epoch(&mut self) {
self.group_epoch += 1;
self.dirty = true;
}
pub fn add_or_update_member(&mut self, m: StreamsMemberState) {
let changed = match self.members.get(&m.member_id) {
None => true,
Some(prev) => prev.topology_epoch != m.topology_epoch,
};
self.members.insert(m.member_id.clone(), m);
if changed {
self.dirty = true;
}
}
pub fn remove_member(&mut self, member_id: &str) -> Option<StreamsMemberState> {
let m = self.members.remove(member_id);
if m.is_some() {
self.dirty = true;
}
m
}
pub fn evict_expired(&mut self, now: Instant, session_timeout: Duration) -> Vec<String> {
let evicted: Vec<String> = self
.members
.iter()
.filter(|(_, m)| now.duration_since(m.last_seen) > session_timeout)
.map(|(id, _)| id.clone())
.collect();
for id in &evicted {
self.members.remove(id);
}
if !evicted.is_empty() {
self.dirty = true;
}
evicted
}
pub fn install_target(&mut self, target: StreamsTargetAssignment) {
self.assignment_epoch = self.group_epoch;
self.target = target;
self.target.epoch = self.assignment_epoch;
for (mid, member) in &mut self.members {
let target_active = self.target.active.get(mid).cloned().unwrap_or_default();
let (keep, revoke) = compute_active_revoke_split(&member.active, &target_active);
member.active = keep;
member.active_pending_revocation = revoke;
member.assignment_state = if member.active_pending_revocation.is_empty() {
StreamsMemberAssignmentState::Stable
} else {
StreamsMemberAssignmentState::UnrevokedActiveTasks
};
}
}
pub fn advance_member_epoch(&mut self, member_id: &str) {
let active = self
.target
.active
.get(member_id)
.cloned()
.unwrap_or_default();
let standby = self
.target
.standby
.get(member_id)
.cloned()
.unwrap_or_default();
let warmup = self
.target
.warmup
.get(member_id)
.cloned()
.unwrap_or_default();
let epoch = self.assignment_epoch;
if let Some(m) = self.members.get_mut(member_id) {
m.previous_member_epoch = m.member_epoch;
m.member_epoch = epoch;
m.active = normalize_task_map(active);
m.standby = normalize_task_map(standby);
m.warmup = normalize_task_map(warmup);
m.active_pending_revocation.clear();
m.assignment_state = StreamsMemberAssignmentState::Stable;
}
}
}
fn normalize_task_map(mut map: BTreeMap<String, Vec<i32>>) -> BTreeMap<String, Vec<i32>> {
map.retain(|_, parts| {
parts.sort_unstable();
parts.dedup();
!parts.is_empty()
});
map
}
#[allow(dead_code)] fn merge_task_maps(dst: &mut BTreeMap<String, Vec<i32>>, src: &BTreeMap<String, Vec<i32>>) {
for (sub, parts) in src {
dst.entry(sub.clone()).or_default().extend_from_slice(parts);
}
for parts in dst.values_mut() {
parts.sort_unstable();
parts.dedup();
}
dst.retain(|_, parts| !parts.is_empty());
}
fn compute_active_revoke_split(
current: &BTreeMap<String, Vec<i32>>,
target: &BTreeMap<String, Vec<i32>>,
) -> (BTreeMap<String, Vec<i32>>, BTreeMap<String, Vec<i32>>) {
let mut revoke: BTreeMap<String, Vec<i32>> = BTreeMap::new();
let mut keep: BTreeMap<String, Vec<i32>> = BTreeMap::new();
for (sub, parts) in current {
let target_set: std::collections::HashSet<i32> =
target.get(sub).into_iter().flatten().copied().collect();
for &p in parts {
if target_set.contains(&p) {
keep.entry(sub.clone()).or_default().push(p);
} else {
revoke.entry(sub.clone()).or_default().push(p);
}
}
}
(normalize_task_map(keep), normalize_task_map(revoke))
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
fn task_map(entries: &[(&str, &[i32])]) -> BTreeMap<String, Vec<i32>> {
entries
.iter()
.map(|(sub, parts)| (sub.to_string(), parts.to_vec()))
.collect()
}
#[test]
fn add_member_marks_dirty_first_time() {
let mut g = StreamsGroupState::new("g");
assert!(!g.dirty);
g.add_or_update_member(StreamsMemberState::joining("m1", "c1", "h1"));
assert!(g.members.len() == 1);
assert!(g.dirty);
}
#[test]
fn re_add_identical_member_keeps_clean() {
let mut g = StreamsGroupState::new("g");
g.add_or_update_member(StreamsMemberState::joining("m1", "c1", "h1"));
g.dirty = false;
let mut m = StreamsMemberState::joining("m1", "c1", "h1");
m.topology_epoch = 0;
g.add_or_update_member(m);
assert!(!g.dirty);
}
#[test]
fn topology_epoch_change_marks_dirty() {
let mut g = StreamsGroupState::new("g");
g.add_or_update_member(StreamsMemberState::joining("m1", "c1", "h1"));
g.dirty = false;
let mut m = StreamsMemberState::joining("m1", "c1", "h1");
m.topology_epoch = 3;
g.add_or_update_member(m);
assert!(g.dirty);
}
#[test]
fn remove_member_marks_dirty() {
let mut g = StreamsGroupState::new("g");
g.add_or_update_member(StreamsMemberState::joining("m1", "c1", "h1"));
g.dirty = false;
let removed = g.remove_member("m1");
assert!(removed.is_some());
assert!(g.dirty);
g.dirty = false;
assert!(g.remove_member("m1").is_none());
assert!(!g.dirty);
}
#[test]
fn bump_epoch_increments_and_dirties() {
let mut g = StreamsGroupState::new("g");
g.dirty = false;
g.bump_epoch();
assert!(g.group_epoch == 1);
assert!(g.dirty);
}
#[test]
fn evict_expired_removes_and_returns_ids() {
let mut g = StreamsGroupState::new("g");
let mut m = StreamsMemberState::joining("m1", "c1", "h1");
m.last_seen = Instant::now();
g.add_or_update_member(m);
g.add_or_update_member(StreamsMemberState::joining("m2", "c1", "h1"));
g.dirty = false;
let recent = Instant::now() + Duration::from_millis(50);
let kept = g.evict_expired(recent, Duration::from_secs(45));
assert!(kept.is_empty());
assert!(g.members.len() == 2);
assert!(!g.dirty);
let later = Instant::now() + Duration::from_millis(50);
let mut evicted = g.evict_expired(later, Duration::from_millis(1));
evicted.sort();
assert!(evicted == vec!["m1".to_string(), "m2".to_string()]);
assert!(g.members.is_empty());
assert!(g.dirty);
}
#[test]
fn install_target_moves_vanished_active_to_pending_and_keeps_kept() {
let mut g = StreamsGroupState::new("g");
let mut m = StreamsMemberState::joining("m1", "c1", "h1");
m.active = task_map(&[("sub0", &[0, 1, 2])]);
g.add_or_update_member(m);
g.group_epoch = 7;
let mut target = StreamsTargetAssignment::default();
target
.active
.insert("m1".to_string(), task_map(&[("sub0", &[0, 1])]));
g.install_target(target);
let m = &g.members["m1"];
assert!(g.assignment_epoch == 7);
assert!(g.target.epoch == 7);
assert!(m.active == task_map(&[("sub0", &[0, 1])]));
assert!(m.active_pending_revocation == task_map(&[("sub0", &[2])]));
assert!(m.assignment_state == StreamsMemberAssignmentState::UnrevokedActiveTasks);
}
#[test]
fn install_target_with_no_revocation_stays_stable() {
let mut g = StreamsGroupState::new("g");
let mut m = StreamsMemberState::joining("m1", "c1", "h1");
m.active = task_map(&[("sub0", &[0, 1])]);
g.add_or_update_member(m);
g.group_epoch = 2;
let mut target = StreamsTargetAssignment::default();
target
.active
.insert("m1".to_string(), task_map(&[("sub0", &[0, 1, 2])]));
g.install_target(target);
let m = &g.members["m1"];
assert!(m.active == task_map(&[("sub0", &[0, 1])]));
assert!(m.active_pending_revocation.is_empty());
assert!(m.assignment_state == StreamsMemberAssignmentState::Stable);
}
#[test]
fn advance_member_epoch_installs_target_clears_pending_sets_stable() {
let mut g = StreamsGroupState::new("g");
let mut m = StreamsMemberState::joining("m1", "c1", "h1");
m.active = task_map(&[("sub0", &[0, 1, 2])]);
g.add_or_update_member(m);
g.group_epoch = 9;
let mut target = StreamsTargetAssignment::default();
target
.active
.insert("m1".to_string(), task_map(&[("sub0", &[0, 1])]));
target
.standby
.insert("m1".to_string(), task_map(&[("sub1", &[3])]));
target
.warmup
.insert("m1".to_string(), task_map(&[("sub2", &[4, 5])]));
g.install_target(target);
assert!(
g.members["m1"].assignment_state == StreamsMemberAssignmentState::UnrevokedActiveTasks
);
g.advance_member_epoch("m1");
let m = &g.members["m1"];
assert!(m.member_epoch == 9);
assert!(m.previous_member_epoch == 0);
assert!(m.active == task_map(&[("sub0", &[0, 1])]));
assert!(m.standby == task_map(&[("sub1", &[3])]));
assert!(m.warmup == task_map(&[("sub2", &[4, 5])]));
assert!(m.active_pending_revocation.is_empty());
assert!(m.assignment_state == StreamsMemberAssignmentState::Stable);
}
#[test]
fn group_state_phase_as_str_strings() {
assert!(StreamsGroupStatePhase::Empty.as_str() == "Empty");
assert!(StreamsGroupStatePhase::NotReady.as_str() == "NotReady");
assert!(StreamsGroupStatePhase::Assigning.as_str() == "Assigning");
assert!(StreamsGroupStatePhase::Reconciling.as_str() == "Reconciling");
assert!(StreamsGroupStatePhase::Stable.as_str() == "Stable");
assert!(StreamsGroupStatePhase::default() == StreamsGroupStatePhase::Empty);
}
#[test]
fn assignment_state_i8_roundtrips() {
for s in [
StreamsMemberAssignmentState::Stable,
StreamsMemberAssignmentState::UnrevokedActiveTasks,
StreamsMemberAssignmentState::UnreleasedActiveTasks,
] {
assert!(StreamsMemberAssignmentState::from_i8(s.as_i8()) == Some(s));
}
assert!(StreamsMemberAssignmentState::from_i8(99).is_none());
assert!(StreamsMemberAssignmentState::default() == StreamsMemberAssignmentState::Stable);
}
#[test]
fn normalize_sorts_dedups_and_drops_empty() {
let m = normalize_task_map(task_map(&[("sub0", &[2, 0, 1, 1]), ("sub1", &[])]));
assert!(m == task_map(&[("sub0", &[0, 1, 2])]));
}
#[test]
fn merge_task_maps_unions_and_normalizes() {
let mut dst = task_map(&[("sub0", &[0, 2])]);
merge_task_maps(&mut dst, &task_map(&[("sub0", &[1, 2]), ("sub1", &[3])]));
assert!(dst == task_map(&[("sub0", &[0, 1, 2]), ("sub1", &[3])]));
}
}