use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
use crabka_protocol::primitives::uuid::Uuid;
#[derive(Debug, Clone)]
pub struct ShareMemberState {
pub member_id: String,
pub rack_id: Option<String>,
pub client_id: String,
pub client_host: String,
pub subscribed_topic_names: HashSet<String>,
pub member_epoch: i32,
pub assigned_partitions: HashMap<Uuid, Vec<i32>>,
pub last_seen: Instant,
}
impl ShareMemberState {
pub fn joining(
member_id: impl Into<String>,
client_id: impl Into<String>,
client_host: impl Into<String>,
subs: HashSet<String>,
) -> Self {
Self {
member_id: member_id.into(),
rack_id: None,
client_id: client_id.into(),
client_host: client_host.into(),
subscribed_topic_names: subs,
member_epoch: 0,
assigned_partitions: HashMap::new(),
last_seen: Instant::now(),
}
}
}
#[derive(Debug)]
pub struct ShareTargetAssignment {
pub epoch: i32,
pub per_member: HashMap<String, HashMap<Uuid, Vec<i32>>>,
}
#[derive(Debug)]
pub struct ShareGroupState {
pub group_id: String,
pub group_epoch: i32,
pub members: HashMap<String, ShareMemberState>,
pub target: ShareTargetAssignment,
pub dirty: bool,
pub initialized: HashSet<(Uuid, i32)>,
}
impl ShareGroupState {
pub fn new(group_id: impl Into<String>) -> Self {
Self {
group_id: group_id.into(),
group_epoch: 0,
members: HashMap::new(),
target: ShareTargetAssignment {
epoch: 0,
per_member: HashMap::new(),
},
dirty: false,
initialized: HashSet::new(),
}
}
pub fn bump_epoch(&mut self) {
self.group_epoch += 1;
}
pub fn add_or_update_member(&mut self, m: ShareMemberState) {
self.members.insert(m.member_id.clone(), m);
self.dirty = true;
}
pub fn remove_member(&mut self, member_id: &str) -> Option<ShareMemberState> {
let r = self.members.remove(member_id);
if r.is_some() {
self.dirty = true;
}
r
}
pub fn evict_expired(&mut self, now: Instant, session_timeout: Duration) -> Vec<String> {
let expired: Vec<String> = self
.members
.iter()
.filter(|(_, m)| now.duration_since(m.last_seen) > session_timeout)
.map(|(id, _)| id.clone())
.collect();
for id in &expired {
self.members.remove(id);
}
if !expired.is_empty() {
self.dirty = true;
}
expired
}
pub fn install_target(&mut self, per_member: HashMap<String, HashMap<Uuid, Vec<i32>>>) {
self.target = ShareTargetAssignment {
epoch: self.group_epoch,
per_member,
};
}
pub fn advance_member_epoch(&mut self, member_id: &str) {
if let Some(m) = self.members.get_mut(member_id) {
m.member_epoch = self.group_epoch;
if let Some(a) = self.target.per_member.get(member_id) {
m.assigned_partitions.clone_from(a);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use std::time::{Duration, Instant};
#[test]
fn add_member_bumps_nothing_until_reconcile() {
let mut g = ShareGroupState::new("g1");
assert!(g.group_epoch == 0);
g.add_or_update_member(ShareMemberState::joining(
"m1",
"c1",
"h1",
["t1".to_string()].into_iter().collect(),
));
assert!(g.members.len() == 1);
assert!(g.dirty);
}
#[test]
fn evict_expired_removes_silent_members() {
let mut g = ShareGroupState::new("g1");
let mut m = ShareMemberState::joining("m1", "c1", "h1", HashSet::default());
m.last_seen = Instant::now();
g.add_or_update_member(m);
let recent = Instant::now() + Duration::from_millis(50);
let kept = g.evict_expired(recent, Duration::from_secs(45));
assert!(kept.is_empty());
assert!(g.members.len() == 1);
let later = Instant::now() + Duration::from_millis(50);
let evicted = g.evict_expired(later, Duration::from_millis(1));
assert!(evicted == vec!["m1".to_string()]);
assert!(g.members.is_empty());
}
}