use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
use bytes::Bytes;
use regex::Regex;
use crabka_protocol::primitives::uuid::Uuid;
use super::persistence_next_gen::MemberAssignmentState;
#[derive(Debug, Clone)]
pub struct ClassicMemberFacade {
pub generation_id: i32,
pub supported_protocols: Vec<(String, Bytes)>,
pub session_timeout: Duration,
pub last_synced_assignment: Bytes,
pub awaiting_sync: bool,
}
#[derive(Debug, Clone)]
pub struct MemberState {
pub member_id: String,
pub instance_id: Option<String>,
pub rack_id: Option<String>,
pub client_id: String,
pub client_host: String,
pub subscribed_topic_names: HashSet<String>,
pub subscribed_topic_regex: Option<String>,
#[allow(clippy::option_option)]
pub compiled_regex: Option<Option<Regex>>,
pub server_assignor: Option<String>,
pub rebalance_timeout: Duration,
pub member_epoch: i32,
pub previous_member_epoch: i32,
pub assignment_state: MemberAssignmentState,
pub assigned_partitions: HashMap<Uuid, Vec<i32>>,
pub partitions_pending_revocation: HashMap<Uuid, Vec<i32>>,
pub last_seen: Instant,
pub classic: Option<ClassicMemberFacade>,
}
impl MemberState {
#[must_use]
pub fn is_classic(&self) -> bool {
self.classic.is_some()
}
pub fn set_regex(&mut self, pattern: Option<String>) {
self.compiled_regex = pattern.as_deref().map(|pat| match Regex::new(pat) {
Ok(re) => Some(re),
Err(e) => {
tracing::warn!(
pattern = %pat, error = %e,
"consumer-group: subscribed_topic_regex failed to compile; ignored"
);
None
}
});
self.subscribed_topic_regex = pattern;
}
pub fn sync_regex_cache(&mut self) {
let pattern = self.subscribed_topic_regex.take();
self.set_regex(pattern);
}
#[must_use]
pub fn compiled_regex(&self) -> Option<&Regex> {
self.compiled_regex.as_ref().and_then(Option::as_ref)
}
}
#[derive(Debug, Clone, Default)]
pub struct TargetAssignment {
pub epoch: i32,
pub per_member: HashMap<String, HashMap<Uuid, Vec<i32>>>,
}
#[derive(Debug)]
pub struct GroupState {
pub group_id: String,
pub group_epoch: i32,
pub members: HashMap<String, MemberState>,
pub instance_to_member: HashMap<String, String>,
pub target: TargetAssignment,
pub dirty: bool,
}
impl GroupState {
pub fn new(group_id: impl Into<String>) -> Self {
Self {
group_id: group_id.into(),
group_epoch: 0,
members: HashMap::new(),
instance_to_member: HashMap::new(),
target: TargetAssignment::default(),
dirty: false,
}
}
pub fn bump_epoch(&mut self) {
self.group_epoch += 1;
self.dirty = true;
}
pub fn add_or_update_member(&mut self, mut m: MemberState) {
m.sync_regex_cache();
if let Some(iid) = m.instance_id.clone() {
self.instance_to_member.insert(iid, m.member_id.clone());
}
let cached: Option<(HashSet<String>, Option<String>)> =
self.members.get(&m.member_id).map(|prev| {
(
prev.subscribed_topic_names.clone(),
prev.subscribed_topic_regex.clone(),
)
});
let subscription_changed = cached.as_ref().is_none_or(|(names, regex)| {
names != &m.subscribed_topic_names || regex != &m.subscribed_topic_regex
});
self.members.insert(m.member_id.clone(), m);
if subscription_changed {
self.dirty = true;
}
}
pub fn remove_member(&mut self, member_id: &str) -> Option<MemberState> {
let m = self.members.remove(member_id)?;
if let Some(ref iid) = m.instance_id
&& self.instance_to_member.get(iid).map(String::as_str) == Some(member_id)
{
self.instance_to_member.remove(iid);
}
self.dirty = true;
Some(m)
}
pub fn evict_expired(&mut self, now: Instant, session_timeout: Duration) -> Vec<String> {
let evicted: Vec<String> = self
.members
.iter()
.filter(|(_, m)| now.duration_since(m.last_seen) > session_timeout)
.map(|(id, _)| id.clone())
.collect();
for id in &evicted {
self.remove_member(id);
}
evicted
}
pub fn install_target(&mut self, per_member: HashMap<String, HashMap<Uuid, Vec<i32>>>) {
self.target.epoch = self.group_epoch;
self.target.per_member = per_member;
for (mid, member) in &mut self.members {
let target = self.target.per_member.get(mid).cloned().unwrap_or_default();
let (revoke, assigned) = compute_revoke_split(&member.assigned_partitions, &target);
member.partitions_pending_revocation = revoke;
member.assigned_partitions = assigned;
member.assignment_state = if member.partitions_pending_revocation.is_empty() {
MemberAssignmentState::Stable
} else {
MemberAssignmentState::UnrevokedPartitions
};
}
}
pub fn advance_member_epoch(&mut self, member_id: &str) {
if let Some(m) = self.members.get_mut(member_id) {
m.previous_member_epoch = m.member_epoch;
m.member_epoch = self.group_epoch;
}
}
pub fn current_member_for_instance(&self, instance_id: &str) -> Option<&str> {
self.instance_to_member.get(instance_id).map(String::as_str)
}
}
fn compute_revoke_split(
current: &HashMap<Uuid, Vec<i32>>,
target: &HashMap<Uuid, Vec<i32>>,
) -> (HashMap<Uuid, Vec<i32>>, HashMap<Uuid, Vec<i32>>) {
let mut revoke: HashMap<Uuid, Vec<i32>> = HashMap::new();
let mut keep: HashMap<Uuid, Vec<i32>> = HashMap::new();
for (tid, parts) in current {
let target_parts = target.get(tid).cloned().unwrap_or_default();
let target_set: HashSet<i32> = target_parts.into_iter().collect();
for p in parts {
if target_set.contains(p) {
keep.entry(*tid).or_default().push(*p);
} else {
revoke.entry(*tid).or_default().push(*p);
}
}
}
(revoke, keep)
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
fn member(id: &str) -> MemberState {
MemberState {
member_id: id.into(),
instance_id: None,
rack_id: None,
client_id: "c".into(),
client_host: "/127.0.0.1".into(),
subscribed_topic_names: HashSet::new(),
subscribed_topic_regex: None,
compiled_regex: None,
server_assignor: None,
rebalance_timeout: Duration::from_mins(1),
member_epoch: 0,
previous_member_epoch: 0,
assignment_state: MemberAssignmentState::Stable,
assigned_partitions: HashMap::new(),
partitions_pending_revocation: HashMap::new(),
last_seen: Instant::now(),
classic: None,
}
}
#[test]
fn add_member_marks_dirty_first_time() {
let mut g = GroupState::new("g");
g.add_or_update_member(member("m1"));
assert!(g.dirty);
}
#[test]
fn re_add_same_subscription_keeps_clean_after_reset() {
let mut g = GroupState::new("g");
g.add_or_update_member(member("m1"));
g.dirty = false;
g.add_or_update_member(member("m1"));
assert!(!g.dirty);
}
#[test]
fn subscription_change_marks_dirty() {
let mut g = GroupState::new("g");
g.add_or_update_member(member("m1"));
g.dirty = false;
let mut m = member("m1");
m.subscribed_topic_names.insert("t".into());
g.add_or_update_member(m);
assert!(g.dirty);
}
#[test]
fn remove_member_marks_dirty() {
let mut g = GroupState::new("g");
g.add_or_update_member(member("m1"));
g.dirty = false;
g.remove_member("m1");
assert!(g.dirty);
}
#[test]
fn evict_expired_drops_old_members() {
let mut g = GroupState::new("g");
let mut m = member("m1");
m.last_seen = Instant::now().checked_sub(Duration::from_mins(2)).unwrap();
g.add_or_update_member(m);
g.add_or_update_member(member("m2"));
let evicted = g.evict_expired(Instant::now(), Duration::from_mins(1));
assert!(evicted == vec!["m1".to_string()]);
assert!(g.members.contains_key("m2"));
}
#[test]
fn install_target_computes_revoke_split() {
let mut g = GroupState::new("g");
let t = Uuid([1; 16]);
let mut m = member("m1");
m.assigned_partitions.insert(t, vec![0, 1, 2]);
g.add_or_update_member(m);
let mut target_for_m1 = HashMap::new();
target_for_m1.insert(t, vec![0, 1]);
g.install_target([("m1".to_string(), target_for_m1)].into());
let m = &g.members["m1"];
assert!(m.partitions_pending_revocation[&t] == vec![2]);
assert!(m.assigned_partitions[&t] == vec![0, 1]);
assert!(m.assignment_state == MemberAssignmentState::UnrevokedPartitions);
}
#[test]
fn instance_binding_tracked() {
let mut g = GroupState::new("g");
let mut m = member("m1");
m.instance_id = Some("inst1".into());
g.add_or_update_member(m);
assert!(g.current_member_for_instance("inst1") == Some("m1"));
}
#[test]
fn bump_epoch_increments_and_dirties() {
let mut g = GroupState::new("g");
g.dirty = false;
g.bump_epoch();
assert!(g.group_epoch == 1);
assert!(g.dirty);
}
#[test]
fn set_regex_compiles_and_caches() {
let mut m = member("m1");
m.set_regex(Some("^orders-.*".into()));
assert!(m.subscribed_topic_regex.as_deref() == Some("^orders-.*"));
let re = m.compiled_regex().expect("valid regex must compile");
assert!(re.is_match("orders-eu"));
assert!(!re.is_match("shipments"));
}
#[test]
fn set_regex_caches_invalid_as_none() {
let mut m = member("m1");
m.set_regex(Some("*invalid".into()));
assert!(m.subscribed_topic_regex.as_deref() == Some("*invalid"));
assert!(m.compiled_regex().is_none());
}
#[test]
fn set_regex_none_clears_cache() {
let mut m = member("m1");
m.set_regex(Some("^a".into()));
assert!(m.compiled_regex().is_some());
m.set_regex(None);
assert!(m.subscribed_topic_regex.is_none());
assert!(m.compiled_regex().is_none());
}
#[test]
fn sync_regex_cache_populates_from_literal_field() {
let mut m = member("m1");
m.subscribed_topic_regex = Some("^a".into());
m.compiled_regex = None;
m.sync_regex_cache();
assert!(m.subscribed_topic_regex.as_deref() == Some("^a"));
assert!(m.compiled_regex().expect("synced").is_match("apple"));
}
#[test]
fn advance_member_epoch_records_previous() {
let mut g = GroupState::new("g");
g.add_or_update_member(member("m1"));
g.group_epoch = 5;
g.advance_member_epoch("m1");
let m = &g.members["m1"];
assert!(m.member_epoch == 5);
assert!(m.previous_member_epoch == 0);
}
}