use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use crabka_protocol::owned::consumer_group_heartbeat_request::ConsumerGroupHeartbeatRequest;
use crabka_protocol::owned::consumer_group_heartbeat_response::{
Assignment as RespAssignment, ConsumerGroupHeartbeatResponse,
};
use crabka_protocol::owned::heartbeat_request::HeartbeatRequest;
use crabka_protocol::owned::join_group_request::JoinGroupRequest;
use crabka_protocol::owned::leave_group_request::LeaveGroupRequest;
use crabka_protocol::owned::leave_group_response::MemberResponse;
use crabka_protocol::owned::sync_group_request::SyncGroupRequest;
use crabka_protocol::primitives::uuid::Uuid;
use crate::codes;
use crate::coordinator::{GroupSnapshot, MemberSnapshot};
use super::classic_ops;
use super::classic_state::{Group as ClassicState, GroupState as ClassicGroupState, OffsetEntry};
use super::config::NextGenConfig;
use super::consumer_state::{GroupState, MemberState};
use super::group::{Group, GroupKind};
use super::migration;
use super::offsets_log::OffsetsLog;
use super::persistence_next_gen::MemberAssignmentState;
use super::reconciler::{self, ReconcileInput};
/// Protocol flavor a group actor is created with.
///
/// `actor_loop` uses the tag to choose between `Group::new_classic` and
/// `Group::new_consumer` when it builds its initial state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupKindTag {
/// Start as a classic (JoinGroup/SyncGroup) group.
Classic,
/// Start as a next-gen consumer (ConsumerGroupHeartbeat) group.
Consumer,
}
/// Messages understood by the per-group actor (`actor_loop`).
///
/// Most variants carry a `oneshot` sender on which the actor delivers its answer.
#[derive(Debug)]
pub enum GroupActorMessage {
/// Next-gen consumer-group heartbeat. A classic group is converted first when the
/// migration policy allows it and the group is convertible; otherwise the reply
/// carries `GROUP_ID_NOT_FOUND`.
Heartbeat {
request: ConsumerGroupHeartbeatRequest,
client_host: String,
reply: oneshot::Sender<ConsumerGroupHeartbeatResponse>,
},
/// Validates a committing member's id and generation/epoch against the group.
/// The reply is `Ok(())` or the error code to return to the client.
ValidateCommit {
member_id: String,
group_instance_id: Option<String>,
generation_or_epoch: i32,
reply: oneshot::Sender<Result<(), i16>>,
},
/// Requests a [`DescribeView`]; the actor only answers while the group is consumer-kind.
Describe {
reply: oneshot::Sender<DescribeView>,
},
/// Classic JoinGroup. The reply may be parked until the rebalance completes.
ClassicJoin {
req: JoinGroupRequest,
client_host: String,
reply: oneshot::Sender<JoinResult>,
},
/// Classic SyncGroup. A follower's reply may be parked until the leader's sync arrives.
ClassicSync {
req: SyncGroupRequest,
reply: oneshot::Sender<SyncResult>,
},
/// Classic Heartbeat; the reply is the resulting error code.
ClassicHeartbeat {
req: HeartbeatRequest,
reply: oneshot::Sender<i16>,
},
/// Classic LeaveGroup; `version` is forwarded to `classic_ops::handle_leave`.
ClassicLeave {
req: LeaveGroupRequest,
version: i16,
reply: oneshot::Sender<Vec<MemberResponse>>,
},
/// Requests a [`ClassicView`]; the actor only answers while the group is classic-kind.
ClassicInspect {
reply: oneshot::Sender<ClassicView>,
},
/// Requests a [`GroupSnapshot`] regardless of whether the group is classic or consumer.
InspectAny {
reply: oneshot::Sender<GroupSnapshot>,
},
/// Inserts (or overwrites) committed-offset entries in the group's in-memory map.
// NOTE(review): the `(String, i32)` key is presumably (topic, partition) — confirm.
UpdateCommitted {
entries: Vec<((String, i32), OffsetEntry)>,
reply: oneshot::Sender<()>,
},
/// Returns a clone of the group's committed-offset map.
FetchCommitted {
reply: oneshot::Sender<HashMap<(String, i32), OffsetEntry>>,
},
/// Removes the listed keys from the group's committed-offset map.
RemoveCommitted {
keys: Vec<(String, i32)>,
reply: oneshot::Sender<()>,
},
/// Seeds consumer-kind state via `apply_seed`; ignored when the group is not consumer-kind.
Seed(super::GroupSeed),
/// Replaces the actor's entire `Group` with the boxed one.
ClassicSeed(Box<Group>),
/// Acknowledges on the sender and then stops the actor loop.
Shutdown(oneshot::Sender<()>),
/// Test-only: replaces the group with a fresh consumer-kind group of the same id.
#[cfg(test)]
TestForceConsumerKind,
}
/// Reply payload for [`GroupActorMessage::ClassicJoin`].
#[derive(Debug, Default, Clone)]
pub struct JoinResult {
/// Error code for the join; the `Default` value is `0`.
pub error_code: i16,
/// Group generation reported to the joiner.
pub generation_id: i32,
/// Protocol type of the group, when one is set.
pub protocol_type: Option<String>,
/// Protocol name reported to the joiner, when one is set.
pub protocol_name: Option<String>,
/// Member id of the group leader.
pub leader: String,
/// Member id the reply is addressed to.
pub member_id: String,
/// Member list included in the reply.
// NOTE(review): presumably only populated for the leader, as in Kafka — confirm in `classic_ops`.
pub members: Vec<JoinResultMember>,
}
/// One member entry inside a [`JoinResult`].
#[derive(Debug, Clone)]
pub struct JoinResultMember {
/// Id of the member.
pub member_id: String,
/// Static-membership instance id, if the member supplied one.
pub group_instance_id: Option<String>,
/// Opaque protocol metadata bytes for the member.
pub metadata: Bytes,
}
/// Reply payload for [`GroupActorMessage::ClassicSync`].
#[derive(Debug, Default, Clone)]
pub struct SyncResult {
/// Error code for the sync; the `Default` value is `0`.
pub error_code: i16,
/// Opaque assignment bytes handed to the member.
pub assignment: Bytes,
/// Protocol type of the group, when one is set.
pub protocol_type: Option<String>,
/// Protocol name of the group, when one is set.
pub protocol_name: Option<String>,
}
/// Read-only copy of a classic group's state, produced by `build_classic_view`.
#[derive(Debug, Clone)]
pub struct ClassicView {
/// Id of the group.
pub group_id: String,
/// Classic state-machine state at the time the view was taken.
pub state: ClassicGroupState,
/// Protocol type of the group, when one is set.
pub protocol_type: Option<String>,
/// Protocol name of the group, when one is set.
pub protocol_name: Option<String>,
/// Generation id at the time the view was taken.
pub generation_id: i32,
/// One entry per member of the group.
pub members: Vec<ClassicMemberView>,
}
/// Read-only copy of one classic member, as stored in a [`ClassicView`].
#[derive(Debug, Clone)]
pub struct ClassicMemberView {
/// Id of the member.
pub member_id: String,
/// Client id recorded for the member.
pub client_id: String,
/// Host recorded for the member.
pub host: String,
/// Static-membership instance id, if any.
pub group_instance_id: Option<String>,
/// Opaque protocol metadata bytes recorded for the member.
pub protocol_metadata: Bytes,
/// Assignment bytes recorded for the member, if it has any.
pub assignment: Option<Bytes>,
}
impl ClassicView {
    /// Converts this classic view into the protocol-agnostic [`GroupSnapshot`].
    ///
    /// Byte buffers are copied into owned `Vec<u8>`s; a member that has no
    /// assignment is reported with an empty one.
    #[must_use]
    pub fn snapshot(&self) -> GroupSnapshot {
        let members = self
            .members
            .iter()
            .map(|member| {
                let assignment = match &member.assignment {
                    Some(bytes) => bytes.to_vec(),
                    None => Vec::new(),
                };
                MemberSnapshot {
                    member_id: member.member_id.clone(),
                    client_id: member.client_id.clone(),
                    client_host: member.host.clone(),
                    assignment,
                    protocol_metadata: member.protocol_metadata.to_vec(),
                }
            })
            .collect();

        GroupSnapshot {
            group_id: self.group_id.clone(),
            state: self.state,
            protocol_type: self.protocol_type.clone(),
            protocol_name: self.protocol_name.clone(),
            generation_id: self.generation_id,
            members,
        }
    }
}
/// Builds a [`GroupSnapshot`] for a consumer-kind group.
///
/// The snapshot always reports the `Stable` state, the `"consumer"` protocol type,
/// no protocol name, and the group epoch as the generation id. Each member's
/// assignment is its target assignment (empty when it has none) encoded by
/// `migration::target_to_consumer_assignment` against `image`; protocol metadata
/// is left empty.
fn build_consumer_snapshot(state: &GroupState, image: &ReconcileInput) -> GroupSnapshot {
    let members = state
        .members
        .values()
        .map(|member| {
            let target = state
                .target
                .per_member
                .get(&member.member_id)
                .cloned()
                .unwrap_or_default();
            MemberSnapshot {
                member_id: member.member_id.clone(),
                client_id: member.client_id.clone(),
                client_host: member.client_host.clone(),
                assignment: migration::target_to_consumer_assignment(&target, image).to_vec(),
                protocol_metadata: Vec::new(),
            }
        })
        .collect();

    GroupSnapshot {
        group_id: state.group_id.clone(),
        state: ClassicGroupState::Stable,
        protocol_type: Some("consumer".into()),
        protocol_name: None,
        generation_id: state.group_epoch,
        members,
    }
}
/// Read-only description of a consumer-kind group, produced by `build_describe`.
#[derive(Debug, Clone)]
pub struct DescribeView {
/// Id of the group.
pub group_id: String,
/// Group epoch at the time the view was taken.
pub group_epoch: i32,
/// Epoch of the target assignment at the time the view was taken.
pub assignment_epoch: i32,
/// One entry per member of the group.
pub members: Vec<DescribeMember>,
}
/// One member entry inside a [`DescribeView`].
#[derive(Debug, Clone)]
pub struct DescribeMember {
/// Id of the member.
pub member_id: String,
/// Static-membership instance id, if any.
pub instance_id: Option<String>,
/// Member epoch at the time the view was taken.
pub member_epoch: i32,
/// Client id recorded for the member.
pub client_id: String,
/// Client host recorded for the member.
pub client_host: String,
/// Topic names the member is subscribed to (order is unspecified; copied from a set).
pub subscribed_topic_names: Vec<String>,
/// Partitions currently assigned to the member, keyed by topic id.
pub assigned_partitions: HashMap<Uuid, Vec<i32>>,
/// Value of the member's `is_classic()` at the time the view was taken.
pub is_classic: bool,
}
/// Handle to a spawned group actor.
#[derive(Debug)]
pub struct GroupActorHandle {
/// Sending half of the actor's mailbox.
pub tx: mpsc::Sender<GroupActorMessage>,
/// Kind the actor was spawned with. It is not updated if the actor later converts the group.
pub kind: GroupKindTag,
// Join handle of the tokio task running `actor_loop`; stored but never read in this chunk.
_task: JoinHandle<()>,
}
impl GroupActorHandle {
    /// Spawns the actor task for `group_id` and returns a handle to it.
    ///
    /// The actor's mailbox is a bounded channel with capacity 64. The task runs
    /// `actor_loop` on the current tokio runtime and starts with an empty group
    /// of the requested `kind`.
    pub fn spawn(
        group_id: String,
        kind: GroupKindTag,
        config: Arc<NextGenConfig>,
        metadata_provider: Arc<dyn MetadataProvider>,
        offsets_log: Arc<dyn OffsetsLog>,
        coordinator: Arc<super::GroupCoordinator>,
    ) -> Self {
        let (tx, rx) = mpsc::channel(64);
        let actor = actor_loop(
            group_id,
            kind,
            config,
            metadata_provider,
            offsets_log,
            coordinator,
            rx,
        );
        let _task = tokio::spawn(actor);
        Self { tx, kind, _task }
    }
}
/// Source of the metadata image the reconciler and the migration helpers work from.
pub trait MetadataProvider: Send + Sync + std::fmt::Debug {
/// Returns the [`ReconcileInput`] to use for the current operation.
fn snapshot(&self) -> ReconcileInput;
}
/// Reply channels of classic requests that could not be answered immediately.
#[derive(Default)]
struct ParkedWaiters {
// JoinGroup replies keyed by the request's member id; drained by `complete_classic_rebalance`.
joiners: HashMap<String, oneshot::Sender<JoinResult>>,
// SyncGroup replies keyed by the request's member id; drained by `drain_parked_followers`.
followers: HashMap<String, oneshot::Sender<SyncResult>>,
}
/// Body of the per-group actor task.
///
/// Owns the group's state and serves three event sources in one `tokio::select!`:
/// mailbox messages from `rx`, a one-second housekeeping tick, and the classic
/// rebalance deadline (when one is set). The loop ends when the mailbox closes,
/// on `Shutdown`, or after a log write fails.
#[allow(clippy::too_many_lines)] async fn actor_loop(
group_id: String,
kind: GroupKindTag,
config: Arc<NextGenConfig>,
metadata: Arc<dyn MetadataProvider>,
offsets_log: Arc<dyn OffsetsLog>,
coordinator: Arc<super::GroupCoordinator>,
mut rx: mpsc::Receiver<GroupActorMessage>,
) {
let mut group = match kind {
GroupKindTag::Classic => Group::new_classic(group_id),
GroupKindTag::Consumer => Group::new_consumer(group_id),
};
let mut parked = ParkedWaiters::default();
let mut tick = tokio::time::interval(Duration::from_secs(1));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
// Re-read the classic rebalance deadline on every iteration; `None` disables that select arm.
let deadline = classic_deadline(&group);
tokio::select! {
msg = rx.recv() => {
// All senders dropped: nothing can reach this actor any more.
let Some(msg) = msg else { break };
match msg {
GroupActorMessage::Heartbeat { request, client_host, reply } => {
// A next-gen heartbeat on a classic group either upgrades the group or is rejected.
if group.is_classic() {
let convertible = group
.as_classic()
.is_some_and(migration::classic_is_convertible);
if config.migration_policy.allows_upgrade() && convertible {
let classic = group.as_classic().expect("classic kind");
let new_state = migration::convert_classic_to_consumer(classic);
let pending = migration::upgrade_pending_records(&new_state);
// Persist the upgrade before switching the in-memory kind.
// NOTE(review): unlike the other exit paths, this one breaks without logging.
if flush_pending(
&new_state, pending, &*offsets_log, &coordinator,
chrono_now_ms(),
)
.await
.is_err()
{
let _ = reply.send(ConsumerGroupHeartbeatResponse {
error_code: codes::COORDINATOR_LOAD_IN_PROGRESS,
..Default::default()
});
break;
}
*group.kind_mut() = GroupKind::Consumer(new_state);
} else {
let _ = reply.send(ConsumerGroupHeartbeatResponse {
error_code: codes::GROUP_ID_NOT_FOUND,
..Default::default()
});
continue;
}
}
let Some(state) = group.as_consumer_mut() else {
let _ = reply.send(ConsumerGroupHeartbeatResponse {
error_code: codes::GROUP_ID_NOT_FOUND,
..Default::default()
});
continue;
};
match handle_heartbeat(
state, &config, &*metadata, &*offsets_log, &coordinator,
&request, &client_host,
).await {
Ok(resp) => { let _ = reply.send(resp); }
Err(e) => {
tracing::warn!(
group_id = %group.group_id, error = %e,
"next-gen actor exiting after log-write failure",
);
let _ = reply.send(ConsumerGroupHeartbeatResponse {
error_code: codes::COORDINATOR_LOAD_IN_PROGRESS,
..Default::default()
});
break;
}
}
// After every served heartbeat, check whether the group should fall back to classic.
if let Err(e) = maybe_downgrade(
&mut group, &config, &*metadata, &*offsets_log, &coordinator,
).await {
tracing::warn!(
group_id = %group.group_id, error = %e,
"next-gen actor exiting after downgrade log-write failure",
);
break;
}
}
GroupActorMessage::ValidateCommit { member_id, group_instance_id, generation_or_epoch, reply } => {
// Consumer groups compare against the member epoch; classic groups defer to `classic_ops`.
let result: Result<(), i16> = if let Some(s) = group.as_consumer() {
match s.members.get(&member_id) {
None => Err(codes::UNKNOWN_MEMBER_ID),
Some(m) if generation_or_epoch < m.member_epoch => Err(codes::STALE_MEMBER_EPOCH),
Some(m) if generation_or_epoch > m.member_epoch => Err(codes::FENCED_MEMBER_EPOCH),
Some(_) => Ok(()),
}
} else if let Some(s) = group.as_classic() {
match classic_ops::validate_commit(s, &member_id, group_instance_id.as_deref(), generation_or_epoch) {
None => Ok(()),
Some(code) => Err(code),
}
} else {
Ok(())
};
let _ = reply.send(result);
}
GroupActorMessage::Describe { reply } => {
// NOTE(review): for a non-consumer group `reply` is dropped unanswered, so the
// requester sees a closed channel rather than a value.
if let Some(state) = group.as_consumer() {
let _ = reply.send(build_describe(state));
}
}
GroupActorMessage::ClassicJoin { req, client_host, reply } => {
if let Some(state) = group.as_classic_mut() {
match classic_ops::handle_join(state, &req, &client_host) {
classic_ops::JoinAction::Immediate(result) => {
let _ = reply.send(result);
}
classic_ops::JoinAction::Park => {
// Answered later, when the rebalance completes.
parked.joiners.insert(req.member_id, reply);
}
classic_ops::JoinAction::CompleteNow => {
// Park this joiner too so it is answered together with the others.
parked.joiners.insert(req.member_id, reply);
complete_classic_rebalance(state, &mut parked.joiners);
}
}
} else if group.as_consumer().is_some() {
// Classic client joining a consumer-kind group: hosted by the consumer state.
if classic_join_hosted(
&mut group, &config, &*metadata, &*offsets_log,
&coordinator, &req, &client_host, reply, chrono_now_ms(),
)
.await
.is_err()
{
break;
}
} else {
let _ = reply.send(JoinResult {
error_code: codes::INCONSISTENT_GROUP_PROTOCOL,
member_id: req.member_id,
..JoinResult::default()
});
}
}
GroupActorMessage::ClassicSync { req, reply } => {
let Some(state) = group.as_classic_mut() else {
// Not a classic group: serve the sync from consumer state if possible.
if let Some(state) = group.as_consumer_mut() {
let result = migration::serve_classic_sync(
state, &req.member_id, &metadata.snapshot(),
);
let _ = reply.send(result);
} else {
let _ = reply.send(SyncResult {
error_code: codes::UNKNOWN_MEMBER_ID,
..SyncResult::default()
});
}
continue;
};
match classic_ops::handle_sync(state, &req) {
classic_ops::SyncAction::Immediate(result) => {
let _ = reply.send(result);
}
classic_ops::SyncAction::Park => {
parked.followers.insert(req.member_id, reply);
}
classic_ops::SyncAction::LeaderInstalled(result) => {
// Answer the leader first, then release every parked follower.
let _ = reply.send(result);
drain_parked_followers(state, &mut parked.followers);
}
}
}
GroupActorMessage::ClassicHeartbeat { req, reply } => {
if let Some(state) = group.as_classic_mut() {
let code = classic_ops::handle_heartbeat(state, &req);
let _ = reply.send(code);
} else if let Some(state) = group.as_consumer_mut() {
let code = migration::serve_classic_heartbeat(
state, &req.member_id, &metadata.snapshot(),
);
let _ = reply.send(code);
} else {
let _ = reply.send(codes::UNKNOWN_MEMBER_ID);
}
}
GroupActorMessage::ClassicLeave { req, version, reply } => {
if let Some(state) = group.as_classic_mut() {
let responses = classic_ops::handle_leave(state, &req, version);
let _ = reply.send(responses);
// A departure may leave only members that have already joined this round.
maybe_complete_classic(state, &mut parked.joiners);
} else {
// NOTE(review): a classic leave on a consumer-kind group gets an empty response
// and removes nothing — confirm this is intended for hosted classic members.
let _ = reply.send(Vec::new());
}
}
GroupActorMessage::ClassicInspect { reply } => {
// For a non-classic group `reply` is dropped unanswered.
if let Some(state) = group.as_classic() {
let _ = reply.send(build_classic_view(state));
}
}
GroupActorMessage::InspectAny { reply } => {
if let Some(state) = group.as_classic() {
let _ = reply.send(build_classic_view(state).snapshot());
} else if let Some(state) = group.as_consumer() {
let _ = reply.send(build_consumer_snapshot(state, &metadata.snapshot()));
}
}
GroupActorMessage::UpdateCommitted { entries, reply } => {
for (k, v) in entries {
group.committed_offsets.insert(k, v);
}
let _ = reply.send(());
}
GroupActorMessage::FetchCommitted { reply } => {
let _ = reply.send(group.committed_offsets.clone());
}
GroupActorMessage::RemoveCommitted { keys, reply } => {
for k in &keys {
group.committed_offsets.remove(k);
}
let _ = reply.send(());
}
GroupActorMessage::Seed(seed) => {
// Silently ignored when the group is not consumer-kind.
if let Some(state) = group.as_consumer_mut() {
apply_seed(state, seed);
}
}
GroupActorMessage::ClassicSeed(seeded) => {
// Replaces the whole group; parked waiters are left untouched.
group = *seeded;
}
GroupActorMessage::Shutdown(reply) => {
let _ = reply.send(());
break;
}
#[cfg(test)]
GroupActorMessage::TestForceConsumerKind => {
group = Group::new_consumer(group.group_id.clone());
}
}
}
_ = tick.tick() => {
// Housekeeping: expire silent members, then re-evaluate downgrade / rebalance completion.
let gid = group.group_id.clone();
if let Some(state) = group.as_consumer_mut() {
if handle_session_tick(state, &config, &*metadata, &*offsets_log, &coordinator)
.await
.is_err()
{
break;
}
if let Err(e) = maybe_downgrade(
&mut group, &config, &*metadata, &*offsets_log, &coordinator,
).await {
tracing::warn!(
group_id = %gid, error = %e,
"next-gen actor exiting after tick downgrade log-write failure",
);
break;
}
} else if let Some(state) = group.as_classic_mut() {
let dropped = state.expire_dead_members(Instant::now());
if !dropped.is_empty() {
tracing::info!(
group = %gid, dropped = ?dropped,
"expired members; waking joiners",
);
maybe_complete_classic(state, &mut parked.joiners);
}
}
}
() = opt_sleep(deadline) => {
// Rebalance deadline reached: complete with whoever has joined so far.
if let Some(state) = group.as_classic_mut() {
complete_classic_rebalance(state, &mut parked.joiners);
}
}
}
}
}
/// Returns the pending rebalance deadline of a classic group.
///
/// Yields `None` when the group is not classic or has no deadline set.
fn classic_deadline(group: &Group) -> Option<Instant> {
    let classic = group.as_classic()?;
    classic.rebalance_deadline
}
/// Sleeps until `deadline`, or never completes when there is no deadline.
///
/// The never-completing case lets the caller keep this future as a `select!` arm
/// that simply never fires.
async fn opt_sleep(deadline: Option<Instant>) {
    if let Some(wake_at) = deadline {
        tokio::time::sleep_until(wake_at.into()).await;
    } else {
        std::future::pending::<()>().await;
    }
}
/// Finishes the current classic rebalance and answers every parked joiner.
///
/// When `classic_ops::try_complete` fails, each joiner receives
/// `INCONSISTENT_GROUP_PROTOCOL` together with the group's protocol type;
/// otherwise each receives the result built by `classic_ops::build_join_result`.
/// `joiners` is empty afterwards. Send failures are ignored because the
/// requester may already have gone away.
fn complete_classic_rebalance(
    state: &mut ClassicState,
    joiners: &mut HashMap<String, oneshot::Sender<JoinResult>>,
) {
    if classic_ops::try_complete(state).is_err() {
        for (member_id, reply) in joiners.drain() {
            let _ = reply.send(JoinResult {
                error_code: codes::INCONSISTENT_GROUP_PROTOCOL,
                member_id,
                protocol_type: state.protocol_type.clone(),
                ..JoinResult::default()
            });
        }
        return;
    }

    for (member_id, reply) in joiners.drain() {
        let _ = reply.send(classic_ops::build_join_result(state, &member_id));
    }
}
/// Completes the classic rebalance early when nothing is left to wait for.
///
/// Completion happens only if the group already has a positive generation, is in
/// `PreparingRebalance`, and every member has joined the current round.
fn maybe_complete_classic(
    state: &mut ClassicState,
    joiners: &mut HashMap<String, oneshot::Sender<JoinResult>>,
) {
    if state.generation_id <= 0 {
        return;
    }
    if !matches!(state.state, ClassicGroupState::PreparingRebalance) {
        return;
    }
    if !state.all_members_joined_this_round() {
        return;
    }
    complete_classic_rebalance(state, joiners);
}
/// Answers every parked SyncGroup follower from the current classic state.
///
/// Each reply is built by `classic_ops::read_sync_result` with the group's
/// protocol type and name. `followers` is empty afterwards; send failures are
/// ignored.
fn drain_parked_followers(
    state: &ClassicState,
    followers: &mut HashMap<String, oneshot::Sender<SyncResult>>,
) {
    for (member_id, reply) in followers.drain() {
        let outcome = classic_ops::read_sync_result(
            state,
            &member_id,
            state.protocol_type.clone(),
            state.protocol_name.clone(),
        );
        let _ = reply.send(outcome);
    }
}
/// Copies a classic group's state into an owned [`ClassicView`].
fn build_classic_view(state: &ClassicState) -> ClassicView {
    let mut members = Vec::with_capacity(state.members.len());
    for member in state.members.values() {
        members.push(ClassicMemberView {
            member_id: member.member_id.clone(),
            client_id: member.client_id.clone(),
            host: member.host.clone(),
            group_instance_id: member.group_instance_id.clone(),
            protocol_metadata: member.protocol_metadata.clone(),
            assignment: member.assignment.clone(),
        });
    }

    ClassicView {
        group_id: state.group_id.clone(),
        state: state.state,
        protocol_type: state.protocol_type.clone(),
        protocol_name: state.protocol_name.clone(),
        generation_id: state.generation_id,
        members,
    }
}
/// Evicts members whose session has expired and persists the result.
///
/// Does nothing when no member expired. Otherwise the group is reconciled and a
/// batch is flushed with the new group epoch, the target-assignment epoch
/// (only when it is positive) and tombstones for every evicted member's
/// metadata, target assignment and current assignment.
///
/// # Errors
/// Returns the error from `flush_pending` after logging it; the caller stops
/// the actor in that case.
async fn handle_session_tick(
    state: &mut GroupState,
    config: &NextGenConfig,
    metadata: &dyn MetadataProvider,
    offsets_log: &dyn OffsetsLog,
    coordinator: &super::GroupCoordinator,
) -> Result<(), crate::error::BrokerError> {
    let evicted = state.evict_expired(Instant::now(), config.session_timeout);
    if evicted.is_empty() {
        return Ok(());
    }

    run_reconcile(state, config, metadata);

    let target_metadata = (state.target.epoch > 0).then(|| TargetAssignmentMetadataValue {
        assignment_epoch: state.target.epoch,
    });
    let mut pending = PendingRecords {
        group_metadata: Some(GroupMetadataValue {
            epoch: state.group_epoch,
        }),
        target_metadata,
        ..Default::default()
    };
    // One tombstone per record family for each evicted member.
    for mid in &evicted {
        pending.member_metadata.push((mid.clone(), None));
        pending.target_per_member.push((mid.clone(), None));
        pending.current_per_member.push((mid.clone(), None));
    }

    let now_ms = chrono_now_ms();
    match flush_pending(state, pending, offsets_log, coordinator, now_ms).await {
        Ok(_) => Ok(()),
        Err(e) => {
            tracing::warn!(
                group_id = %state.group_id,
                error = %e,
                "next-gen actor exiting after tick log-write failure",
            );
            Err(e)
        }
    }
}
/// Converts a consumer-kind group back to classic when every member is classic.
///
/// The downgrade happens only if the group is consumer-kind, the migration
/// policy allows downgrades, the group is non-empty, every member carries a
/// classic facade, and `migration::consumer_is_convertible()` agrees. The group
/// is force-reconciled first, the downgrade records are appended to the offsets
/// log, the coordinator is told the group is classic, and only then is the
/// in-memory kind swapped.
///
/// Returns `Ok(true)` when the group was downgraded and `Ok(false)` when it was
/// left alone.
///
/// # Errors
/// Propagates the error from `offsets_log.append`; the in-memory kind is left
/// unchanged in that case.
async fn maybe_downgrade(
    group: &mut Group,
    config: &NextGenConfig,
    metadata: &dyn MetadataProvider,
    offsets_log: &dyn OffsetsLog,
    coordinator: &super::GroupCoordinator,
) -> Result<bool, crate::error::BrokerError> {
    let Some(state) = group.as_consumer() else {
        return Ok(false);
    };
    let eligible = config.migration_policy.allows_downgrade()
        && !state.members.is_empty()
        && state.members.values().all(|m| m.classic.is_some())
        && migration::consumer_is_convertible();
    if !eligible {
        return Ok(false);
    }

    // Take the image before reconciling, then force a reconcile pass.
    let image = metadata.snapshot();
    let state = group
        .as_consumer_mut()
        .expect("consumer-kind verified above");
    state.dirty = true;
    run_reconcile(state, config, metadata);

    let classic = migration::convert_consumer_to_classic(state, &image);
    let pending = migration::downgrade_pending_records(state, &classic);

    let group_id = group.group_id.clone();
    let batch = pending.into_batch(&group_id, chrono_now_ms());
    // Persist first; the in-memory switch happens only after the append succeeds.
    offsets_log.append(batch).await?;
    coordinator.mark_classic_after_downgrade(&group_id);
    *group.kind_mut() = GroupKind::Classic(classic);
    Ok(true)
}
/// Loads a [`super::GroupSeed`] into consumer-kind state.
///
/// Sets the group and target epochs, inserts every seeded member (epoch 0,
/// `Stable`, no partitions, `last_seen` = now), then overlays the seeded
/// per-member current assignment on members that exist. Members seeded with a
/// classic facade get the group epoch as generation id and are marked as
/// awaiting sync. The state is left clean (`dirty = false`).
fn apply_seed(state: &mut GroupState, seed: super::GroupSeed) {
    use super::consumer_state::ClassicMemberFacade;

    // Negative timeouts are clamped to zero before conversion.
    let clamp_millis = |ms: i32, fallback: u64| {
        Duration::from_millis(u64::try_from(ms.max(0)).unwrap_or(fallback))
    };

    state.group_epoch = seed.group_epoch;
    state.target.epoch = seed.target_epoch;
    let group_generation = seed.group_epoch;

    for (mid, meta) in seed.members {
        let classic = meta.classic.as_ref().map(|facade| ClassicMemberFacade {
            generation_id: group_generation,
            supported_protocols: facade.supported_protocols.clone(),
            session_timeout: clamp_millis(facade.session_timeout_ms, 30_000),
            last_synced_assignment: facade.last_synced_assignment.clone(),
            awaiting_sync: true,
        });
        let rebalance_timeout = clamp_millis(meta.rebalance_timeout_ms, 60_000);
        let subscribed_topic_names: std::collections::HashSet<String> =
            meta.subscribed_topic_names.into_iter().collect();

        state.add_or_update_member(MemberState {
            member_id: mid,
            instance_id: meta.instance_id,
            rack_id: meta.rack_id,
            client_id: meta.client_id,
            client_host: meta.client_host,
            subscribed_topic_names,
            subscribed_topic_regex: meta.subscribed_topic_regex,
            compiled_regex: None,
            server_assignor: meta.server_assignor,
            rebalance_timeout,
            member_epoch: 0,
            previous_member_epoch: 0,
            assignment_state: MemberAssignmentState::Stable,
            assigned_partitions: HashMap::new(),
            partitions_pending_revocation: HashMap::new(),
            last_seen: Instant::now(),
            classic,
        });
    }

    for (mid, cur) in seed.current_per_member {
        // Current-assignment records for unknown members are ignored.
        let Some(member) = state.members.get_mut(&mid) else {
            continue;
        };
        member.member_epoch = cur.member_epoch;
        member.previous_member_epoch = cur.previous_member_epoch;
        member.assignment_state = cur.state;
        member.assigned_partitions.extend(
            cur.assigned_partitions
                .into_iter()
                .map(|tp| (tp.topic_id, tp.partitions)),
        );
        member.partitions_pending_revocation.extend(
            cur.partitions_pending_revocation
                .into_iter()
                .map(|tp| (tp.topic_id, tp.partitions)),
        );
    }

    state.dirty = false;
}
/// Serves a classic JoinGroup against a consumer-kind group.
///
/// The first protocol whose metadata decodes as a consumer subscription
/// supplies the protocol name and topic set; if none decodes, the first
/// protocol's name is used with an empty topic set. The member is upserted as a
/// classic member. If that dirtied the group, it is reconciled, the member
/// epoch is advanced and the change is flushed before replying.
///
/// # Errors
/// On a flush failure the joiner is answered with
/// `COORDINATOR_LOAD_IN_PROGRESS` and the error is returned so the caller can
/// stop the actor.
///
/// # Panics
/// Panics if `group` is not consumer-kind; the caller checks this.
#[allow(clippy::too_many_arguments)]
async fn classic_join_hosted(
    group: &mut Group,
    config: &NextGenConfig,
    metadata: &dyn MetadataProvider,
    offsets_log: &dyn OffsetsLog,
    coordinator: &super::GroupCoordinator,
    req: &JoinGroupRequest,
    client_host: &str,
    reply: oneshot::Sender<JoinResult>,
    now_ms: i64,
) -> Result<(), crate::error::BrokerError> {
    let decoded = req.protocols.iter().find_map(|p| {
        migration::decode_consumer_subscription(&p.metadata).map(|sub| (p.name.clone(), sub.topics))
    });
    let (protocol_name, topics) = if let Some((name, decoded_topics)) = decoded {
        (Some(name), decoded_topics.into_iter().collect())
    } else {
        (
            req.protocols.first().map(|p| p.name.clone()),
            std::collections::HashSet::new(),
        )
    };

    let mut protocols: Vec<(String, Bytes)> = Vec::with_capacity(req.protocols.len());
    for p in &req.protocols {
        protocols.push((p.name.clone(), p.metadata.clone()));
    }

    let session_timeout =
        Duration::from_millis(u64::try_from(req.session_timeout_ms.max(0)).unwrap_or(30_000));
    let rebalance_timeout =
        Duration::from_millis(u64::try_from(req.rebalance_timeout_ms.max(0)).unwrap_or(60_000));

    let state = group
        .as_consumer_mut()
        .expect("caller verified consumer kind");
    migration::upsert_classic_member(
        state,
        &req.member_id,
        topics,
        protocols,
        // The JoinGroup request carries no client id here, so an empty one is recorded.
        String::new(),
        client_host.to_string(),
        session_timeout,
        rebalance_timeout,
        req.group_instance_id.clone(),
    );

    if state.dirty {
        run_reconcile(state, config, metadata);
        state.advance_member_epoch(&req.member_id);
        let pending = snapshot_pending_after_change(state, std::slice::from_ref(&req.member_id));
        if let Err(e) = flush_pending(state, pending, offsets_log, coordinator, now_ms).await {
            tracing::warn!(
                group_id = %state.group_id, error = %e,
                "next-gen actor exiting after hosted classic-join log-write failure",
            );
            let failure = JoinResult {
                error_code: codes::COORDINATOR_LOAD_IN_PROGRESS,
                member_id: req.member_id.clone(),
                ..JoinResult::default()
            };
            let _ = reply.send(failure);
            return Err(e);
        }
    }

    let _ = reply.send(migration::build_hosted_classic_join_result(
        state,
        &req.member_id,
        protocol_name,
    ));
    Ok(())
}
/// Serves one next-gen ConsumerGroupHeartbeat against consumer-kind state.
///
/// In order:
/// 1. `member_epoch == -1` is a leave and is delegated to `handle_leave`.
/// 2. A requested server assignor that is not enabled yields `UNSUPPORTED_ASSIGNOR`.
/// 3. `member_epoch == 0` from an unknown member id is a join: a member id is
///    generated when the request has none, a static instance id still held by a
///    member with a non-zero epoch yields `UNRELEASED_INSTANCE_ID`, otherwise
///    the member is added, the group reconciled and the change flushed.
/// 4. Anything else must come from a known member with a matching epoch
///    (`UNKNOWN_MEMBER_ID`, `STALE_MEMBER_EPOCH` or `FENCED_MEMBER_EPOCH`
///    otherwise); the member is updated and flushed when something changed.
///
/// # Errors
/// Propagates log-write failures from `flush_pending` / `handle_leave`.
async fn handle_heartbeat(
    state: &mut super::consumer_state::GroupState,
    config: &NextGenConfig,
    metadata: &dyn MetadataProvider,
    offsets_log: &dyn OffsetsLog,
    coordinator: &super::GroupCoordinator,
    req: &ConsumerGroupHeartbeatRequest,
    client_host: &str,
) -> Result<ConsumerGroupHeartbeatResponse, crate::error::BrokerError> {
    let now = Instant::now();
    let now_ms = chrono_now_ms();

    if req.member_epoch == -1 {
        return handle_leave(state, config, offsets_log, coordinator, req, now_ms).await;
    }

    let assignor_rejected = match req.server_assignor.as_deref() {
        Some(name) => !config.assignor_enabled(name),
        None => false,
    };
    if assignor_rejected {
        return Ok(error_resp(codes::UNSUPPORTED_ASSIGNOR, config));
    }

    let is_first_join = req.member_epoch == 0 && !state.members.contains_key(&req.member_id);
    if is_first_join {
        let joined_id = if req.member_id.is_empty() {
            uuid::Uuid::new_v4().to_string()
        } else {
            req.member_id.clone()
        };

        // A static instance id may only be reused once its previous holder is back at epoch 0.
        let instance_still_held = req.instance_id.as_deref().is_some_and(|iid| {
            state
                .current_member_for_instance(iid)
                .and_then(|existing| state.members.get(existing))
                .is_some_and(|m| m.member_epoch != 0)
        });
        if instance_still_held {
            return Ok(error_resp(codes::UNRELEASED_INSTANCE_ID, config));
        }

        state.add_or_update_member(build_member(&joined_id, req, client_host, now));
        run_reconcile(state, config, metadata);
        state.advance_member_epoch(&joined_id);
        let pending = snapshot_pending_after_change(state, std::slice::from_ref(&joined_id));
        flush_pending(state, pending, offsets_log, coordinator, now_ms).await?;
        return Ok(build_assignment_resp(state, &joined_id, config));
    }

    // -2 marks "member not found".
    let cur_epoch = match state.members.get(&req.member_id) {
        Some(member) => member.member_epoch,
        None => -2,
    };
    if cur_epoch == -2 {
        return Ok(error_resp(codes::UNKNOWN_MEMBER_ID, config));
    }
    match req.member_epoch.cmp(&cur_epoch) {
        std::cmp::Ordering::Less => return Ok(error_resp(codes::STALE_MEMBER_EPOCH, config)),
        std::cmp::Ordering::Greater => return Ok(error_resp(codes::FENCED_MEMBER_EPOCH, config)),
        std::cmp::Ordering::Equal => {}
    }

    if update_member_state(state, config, metadata, req, now, cur_epoch) {
        let pending = snapshot_pending_after_change(state, std::slice::from_ref(&req.member_id));
        flush_pending(state, pending, offsets_log, coordinator, now_ms).await?;
    }
    Ok(build_assignment_resp(state, &req.member_id, config))
}
/// Applies a steady-state heartbeat from an already-known member.
///
/// Refreshes the member's `last_seen`, applies subscription changes, records
/// the partitions the member reports owning, reconciles the group if it is
/// dirty and advances the member's epoch when the target epoch moved past
/// `cur_epoch`.
///
/// A `None` `subscribed_topic_names` or `subscribed_topic_regex` means "no
/// change since the last heartbeat" and leaves the stored value alone.
/// Previously a `None` regex cleared the member's regex subscription and
/// dirtied the group on every heartbeat that omitted the field. An explicit
/// regex value (including an empty string) is forwarded to `set_regex`
/// unchanged.
///
/// Returns `true` when something changed that the caller must persist: the
/// subscription changed, the group was dirty going into the reconcile, or the
/// member's epoch advanced.
fn update_member_state(
    state: &mut super::consumer_state::GroupState,
    config: &NextGenConfig,
    metadata: &dyn MetadataProvider,
    req: &ConsumerGroupHeartbeatRequest,
    now: Instant,
    cur_epoch: i32,
) -> bool {
    let mut subscription_changed = false;
    let mut regex_changed = false;

    if let Some(m) = state.members.get_mut(&req.member_id) {
        m.last_seen = now;

        if let Some(names) = req.subscribed_topic_names.as_ref() {
            let wanted: std::collections::HashSet<String> = names.iter().cloned().collect();
            if wanted != m.subscribed_topic_names {
                m.subscribed_topic_names = wanted;
                subscription_changed = true;
            }
        }

        // Only an explicitly supplied regex can change the stored one.
        if req.subscribed_topic_regex.is_some()
            && req.subscribed_topic_regex != m.subscribed_topic_regex
        {
            m.set_regex(req.subscribed_topic_regex.clone());
            regex_changed = true;
        }

        if let Some(tp) = req.topic_partitions.as_ref() {
            let owned: HashMap<Uuid, Vec<i32>> = tp
                .iter()
                .map(|t| (t.topic_id, t.partitions.clone()))
                .collect();
            m.assigned_partitions = owned;
            // With nothing left to revoke, the member is considered settled.
            if m.partitions_pending_revocation.is_empty() {
                m.assignment_state = MemberAssignmentState::Stable;
            }
        }
    }

    if subscription_changed || regex_changed {
        state.dirty = true;
    }

    // Capture dirtiness before reconciling, since the reconcile may clear it.
    let was_dirty = state.dirty;
    run_reconcile(state, config, metadata);

    let epoch_advanced = state.target.epoch > cur_epoch;
    if epoch_advanced {
        state.advance_member_epoch(&req.member_id);
    }

    subscription_changed || was_dirty || epoch_advanced
}
async fn handle_leave(
state: &mut super::consumer_state::GroupState,
config: &NextGenConfig,
offsets_log: &dyn OffsetsLog,
coordinator: &super::GroupCoordinator,
req: &ConsumerGroupHeartbeatRequest,
now_ms: i64,
) -> Result<ConsumerGroupHeartbeatResponse, crate::error::BrokerError> {
let mut pending = PendingRecords::default();
if state.members.contains_key(&req.member_id) {
pending.member_metadata.push((req.member_id.clone(), None));
pending
.target_per_member
.push((req.member_id.clone(), None));
pending
.current_per_member
.push((req.member_id.clone(), None));
}
state.remove_member(&req.member_id);
state.bump_epoch();
pending.group_metadata = Some(GroupMetadataValue {
epoch: state.group_epoch,
});
flush_pending(state, pending, offsets_log, coordinator, now_ms).await?;
Ok(base_resp(0, req.member_epoch, config))
}
/// Reconciles the group against a fresh metadata snapshot, but only when dirty.
///
/// A clean group returns immediately without taking a snapshot or picking an
/// assignor.
fn run_reconcile(state: &mut GroupState, config: &NextGenConfig, metadata: &dyn MetadataProvider) {
    if state.dirty {
        let image = metadata.snapshot();
        let chosen = pick_assignor(state, config);
        reconciler::reconcile_if_dirty(state, &image, &*chosen);
    }
}
/// Chooses the assignor used to reconcile this group.
///
/// The first member (in map iteration order) whose requested server assignor is
/// known to `config` wins; otherwise the first registered assignor is used.
///
/// # Panics
/// Panics if `config` has no registered assignor at all.
fn pick_assignor(
    state: &GroupState,
    config: &NextGenConfig,
) -> std::sync::Arc<dyn super::assignor::Assignor> {
    let requested = state
        .members
        .values()
        .filter_map(|m| m.server_assignor.as_deref())
        .find_map(|name| config.find_assignor(name));
    if let Some(assignor) = requested {
        return assignor;
    }

    config
        .assignors
        .first()
        .cloned()
        .expect("NextGenConfig must have at least one registered assignor")
}
/// Builds the initial [`MemberState`] for a member joining via heartbeat.
///
/// The member starts at epoch 0 in the `Stable` state with no partitions, an
/// empty client id, no classic facade and `last_seen` = `now`. A missing topic
/// name list becomes an empty subscription; a negative rebalance timeout is
/// clamped to zero.
// NOTE(review): `compiled_regex` starts as `None` even when a regex is supplied;
// confirm that `add_or_update_member` (or the reconciler) compiles it.
fn build_member(
    member_id: &str,
    req: &ConsumerGroupHeartbeatRequest,
    host: &str,
    now: Instant,
) -> MemberState {
    let subscribed_topic_names: std::collections::HashSet<String> = req
        .subscribed_topic_names
        .iter()
        .flatten()
        .cloned()
        .collect();
    let rebalance_timeout =
        Duration::from_millis(u64::try_from(req.rebalance_timeout_ms.max(0)).unwrap_or(60_000));

    MemberState {
        member_id: member_id.into(),
        instance_id: req.instance_id.clone(),
        rack_id: req.rack_id.clone(),
        client_id: String::new(),
        client_host: host.into(),
        subscribed_topic_names,
        subscribed_topic_regex: req.subscribed_topic_regex.clone(),
        compiled_regex: None,
        server_assignor: req.server_assignor.clone(),
        rebalance_timeout,
        member_epoch: 0,
        previous_member_epoch: 0,
        assignment_state: MemberAssignmentState::Stable,
        assigned_partitions: HashMap::new(),
        partitions_pending_revocation: HashMap::new(),
        last_seen: now,
        classic: None,
    }
}
/// Builds a heartbeat response carrying only an error code, a member epoch and
/// the configured heartbeat interval.
///
/// The interval falls back to 5000 ms if the configured value does not fit in
/// an `i32` number of milliseconds. All other fields keep their defaults.
fn base_resp(
    error_code: i16,
    member_epoch: i32,
    config: &NextGenConfig,
) -> ConsumerGroupHeartbeatResponse {
    let heartbeat_interval_ms =
        i32::try_from(config.heartbeat_interval.as_millis()).unwrap_or(5_000);
    ConsumerGroupHeartbeatResponse {
        error_code,
        member_epoch,
        heartbeat_interval_ms,
        ..Default::default()
    }
}
/// Builds an error heartbeat response; error responses always report member epoch 0.
fn error_resp(error_code: i16, config: &NextGenConfig) -> ConsumerGroupHeartbeatResponse {
    let member_epoch = 0;
    base_resp(error_code, member_epoch, config)
}
fn build_assignment_resp(
state: &GroupState,
member_id: &str,
config: &NextGenConfig,
) -> ConsumerGroupHeartbeatResponse {
let m = state
.members
.get(member_id)
.expect("member exists at build_assignment_resp");
let target_partitions = state
.target
.per_member
.get(member_id)
.cloned()
.unwrap_or_default();
let assignment = Some(RespAssignment {
topic_partitions: target_partitions
.iter()
.map(
|(tid, parts)| crabka_protocol::owned::common::consumer_group_heartbeat_response::topic_partitions::TopicPartitions {
topic_id: *tid,
partitions: parts.clone(),
..Default::default()
},
)
.collect(),
..Default::default()
});
ConsumerGroupHeartbeatResponse {
error_code: 0,
member_id: Some(member_id.into()),
member_epoch: m.member_epoch,
heartbeat_interval_ms: i32::try_from(config.heartbeat_interval.as_millis())
.unwrap_or(5_000),
assignment,
..Default::default()
}
}
/// Copies consumer-kind group state into an owned [`DescribeView`].
fn build_describe(state: &GroupState) -> DescribeView {
    let mut members = Vec::with_capacity(state.members.len());
    for member in state.members.values() {
        members.push(DescribeMember {
            member_id: member.member_id.clone(),
            instance_id: member.instance_id.clone(),
            member_epoch: member.member_epoch,
            client_id: member.client_id.clone(),
            client_host: member.client_host.clone(),
            subscribed_topic_names: member.subscribed_topic_names.iter().cloned().collect(),
            assigned_partitions: member.assigned_partitions.clone(),
            is_classic: member.is_classic(),
        });
    }

    DescribeView {
        group_id: state.group_id.clone(),
        group_epoch: state.group_epoch,
        assignment_epoch: state.target.epoch,
        members,
    }
}
use crabka_protocol::records::{Record, RecordBatch};
use super::persistence_next_gen::{
CurrentMemberAssignmentValue, GroupMetadataValue, MemberMetadataValue, NextGenKey,
TargetAssignmentMemberValue, TargetAssignmentMetadataValue, encode_key,
};
/// Set of coordinator records to be written as one batch for a single group.
///
/// In the `Vec` fields each entry is `(member_id, value)`; a `None` value is
/// written as a tombstone (a record with a key and no value).
#[derive(Debug, Default)]
pub(crate) struct PendingRecords {
/// Next-gen group metadata record, if one should be written.
pub group_metadata: Option<GroupMetadataValue>,
/// Per-member metadata records or tombstones.
pub member_metadata: Vec<(String, Option<MemberMetadataValue>)>,
/// Target-assignment metadata record, if one should be written.
pub target_metadata: Option<TargetAssignmentMetadataValue>,
/// Per-member target-assignment records or tombstones.
pub target_per_member: Vec<(String, Option<TargetAssignmentMemberValue>)>,
/// Per-member current-assignment records or tombstones.
pub current_per_member: Vec<(String, Option<CurrentMemberAssignmentValue>)>,
/// When set, a tombstone for the classic group-metadata key is written.
pub classic_group_metadata_tombstone: bool,
/// When set, a tombstone for the next-gen group-metadata key is written.
pub next_gen_group_metadata_tombstone: bool,
/// When set, a tombstone for the next-gen target-assignment-metadata key is written.
pub next_gen_target_metadata_tombstone: bool,
/// Classic group metadata record, if one should be written.
pub classic_group_metadata:
Option<crate::coordinator::unified::persistence::GroupMetadataValue>,
}
impl PendingRecords {
    /// Returns `true` when no record and no tombstone is pending.
    pub fn is_empty(&self) -> bool {
        let has_next_gen_values = self.group_metadata.is_some() || self.target_metadata.is_some();
        let has_member_entries = !self.member_metadata.is_empty()
            || !self.target_per_member.is_empty()
            || !self.current_per_member.is_empty();
        let has_tombstone_flags = self.classic_group_metadata_tombstone
            || self.next_gen_group_metadata_tombstone
            || self.next_gen_target_metadata_tombstone;
        let has_classic_value = self.classic_group_metadata.is_some();

        !(has_next_gen_values || has_member_entries || has_tombstone_flags || has_classic_value)
    }

    /// Encodes the pending records for `group_id` into one [`RecordBatch`].
    ///
    /// Records are emitted in a fixed order: group metadata, member metadata,
    /// target metadata, per-member targets, per-member current assignments, the
    /// classic group-metadata tombstone, the next-gen group-metadata tombstone,
    /// the next-gen target-metadata tombstone and finally the classic group
    /// metadata value. Each record's `offset_delta` is its position in the
    /// batch, every `timestamp_delta` is 0 and the batch's `max_timestamp` is
    /// `now_ms`. An empty batch reports `last_offset_delta` 0.
    ///
    /// # Panics
    /// Panics if the number of records does not fit in an `i32`.
    pub fn into_batch(self, group_id: &str, now_ms: i64) -> RecordBatch {
        let Self {
            group_metadata,
            member_metadata,
            target_metadata,
            target_per_member,
            current_per_member,
            classic_group_metadata_tombstone,
            next_gen_group_metadata_tombstone,
            next_gen_target_metadata_tombstone,
            classic_group_metadata,
        } = self;

        // Collect (key, value) pairs first; offsets are assigned in a single pass below.
        let mut entries: Vec<(Bytes, Option<Bytes>)> = Vec::new();

        if let Some(v) = group_metadata {
            let key = encode_key(&NextGenKey::GroupMetadata {
                group_id: group_id.into(),
            });
            entries.push((key, Some(v.encode())));
        }
        entries.extend(member_metadata.into_iter().map(|(member_id, v)| {
            let key = encode_key(&NextGenKey::MemberMetadata {
                group_id: group_id.into(),
                member_id,
            });
            (key, v.map(|x| x.encode()))
        }));
        if let Some(v) = target_metadata {
            let key = encode_key(&NextGenKey::TargetAssignmentMetadata {
                group_id: group_id.into(),
            });
            entries.push((key, Some(v.encode())));
        }
        entries.extend(target_per_member.into_iter().map(|(member_id, v)| {
            let key = encode_key(&NextGenKey::TargetAssignmentMember {
                group_id: group_id.into(),
                member_id,
            });
            (key, v.map(|x| x.encode()))
        }));
        entries.extend(current_per_member.into_iter().map(|(member_id, v)| {
            let key = encode_key(&NextGenKey::CurrentMemberAssignment {
                group_id: group_id.into(),
                member_id,
            });
            (key, v.map(|x| x.encode()))
        }));
        if classic_group_metadata_tombstone {
            let key = crate::coordinator::unified::persistence::encode_key(
                &crate::coordinator::unified::persistence::Key::GroupMetadata {
                    group_id: group_id.into(),
                },
            );
            entries.push((key, None));
        }
        if next_gen_group_metadata_tombstone {
            let key = encode_key(&NextGenKey::GroupMetadata {
                group_id: group_id.into(),
            });
            entries.push((key, None));
        }
        if next_gen_target_metadata_tombstone {
            let key = encode_key(&NextGenKey::TargetAssignmentMetadata {
                group_id: group_id.into(),
            });
            entries.push((key, None));
        }
        if let Some(v) = classic_group_metadata {
            let key = crate::coordinator::unified::persistence::encode_key(
                &crate::coordinator::unified::persistence::Key::GroupMetadata {
                    group_id: group_id.into(),
                },
            );
            entries.push((key, Some(v.encode_value())));
        }

        let records: Vec<Record> = entries
            .into_iter()
            .enumerate()
            .map(|(position, (key, value))| Record {
                offset_delta: i32::try_from(position).expect("batch size fits i32"),
                timestamp_delta: 0,
                key: Some(key),
                value,
                ..Default::default()
            })
            .collect();

        let last_offset_delta = i32::try_from(records.len().saturating_sub(1)).unwrap_or(0);
        RecordBatch {
            max_timestamp: now_ms,
            records,
            last_offset_delta,
            ..RecordBatch::default()
        }
    }
}
/// Builds the persisted classic-protocol metadata for `m`.
///
/// Returns `None` when the member has no `classic` state. A session timeout
/// whose millisecond count does not fit in an `i32` is stored as 30 000 ms.
fn classic_member_metadata(
    m: &super::consumer_state::MemberState,
) -> Option<super::persistence_next_gen::ClassicMemberMetadata> {
    let classic = m.classic.as_ref()?;
    let session_timeout_ms = i32::try_from(classic.session_timeout.as_millis()).unwrap_or(30_000);
    Some(super::persistence_next_gen::ClassicMemberMetadata {
        session_timeout_ms,
        supported_protocols: classic.supported_protocols.clone(),
        last_synced_assignment: classic.last_synced_assignment.clone(),
    })
}
pub(crate) fn snapshot_seed(state: &super::consumer_state::GroupState) -> super::GroupSeed {
use crate::coordinator::unified::persistence_next_gen as p;
let mut members = std::collections::HashMap::new();
let mut target_per_member = std::collections::HashMap::new();
let mut current_per_member = std::collections::HashMap::new();
for (mid, m) in &state.members {
let mm = p::MemberMetadataValue {
instance_id: m.instance_id.clone(),
rack_id: m.rack_id.clone(),
client_id: m.client_id.clone(),
client_host: m.client_host.clone(),
subscribed_topic_names: m.subscribed_topic_names.iter().cloned().collect(),
subscribed_topic_regex: m.subscribed_topic_regex.clone(),
server_assignor: m.server_assignor.clone(),
rebalance_timeout_ms: i32::try_from(m.rebalance_timeout.as_millis()).unwrap_or(60_000),
classic: classic_member_metadata(m),
};
members.insert(mid.clone(), mm);
let cur = p::CurrentMemberAssignmentValue {
member_epoch: m.member_epoch,
previous_member_epoch: m.previous_member_epoch,
state: m.assignment_state,
assigned_partitions: m
.assigned_partitions
.iter()
.map(|(tid, parts)| p::AssignedTopicPartitions {
topic_id: *tid,
partitions: parts.clone(),
})
.collect(),
partitions_pending_revocation: m
.partitions_pending_revocation
.iter()
.map(|(tid, parts)| p::AssignedTopicPartitions {
topic_id: *tid,
partitions: parts.clone(),
})
.collect(),
};
current_per_member.insert(mid.clone(), cur);
if let Some(target) = state.target.per_member.get(mid) {
let tv = p::TargetAssignmentMemberValue {
topic_partitions: target
.iter()
.map(|(tid, parts)| p::AssignedTopicPartitions {
topic_id: *tid,
partitions: parts.clone(),
})
.collect(),
};
target_per_member.insert(mid.clone(), tv);
}
}
super::GroupSeed {
group_epoch: state.group_epoch,
target_epoch: state.target.epoch,
members,
target_per_member,
current_per_member,
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch or if the
/// millisecond count does not fit in an `i64`.
fn chrono_now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
/// Builds the records to persist after a change touching `affected_members`.
///
/// The result always carries the group metadata (current group epoch). The
/// target-assignment metadata is included only when the target epoch is
/// greater than zero. For each id in `affected_members` that is still present
/// in `state.members`, the member's metadata and current assignment are
/// added, plus its target assignment when `state.target.per_member` has one.
/// Ids that are not present in `state.members` are skipped.
fn snapshot_pending_after_change(
    state: &super::consumer_state::GroupState,
    affected_members: &[String],
) -> PendingRecords {
    use crate::coordinator::unified::persistence_next_gen as p;

    let target_epoch = state.target.epoch;
    let mut pending = PendingRecords {
        group_metadata: Some(p::GroupMetadataValue {
            epoch: state.group_epoch,
        }),
        target_metadata: (target_epoch > 0).then(|| p::TargetAssignmentMetadataValue {
            assignment_epoch: target_epoch,
        }),
        ..Default::default()
    };

    for member_id in affected_members {
        let Some(member) = state.members.get(member_id) else {
            continue;
        };

        let rebalance_timeout_ms =
            i32::try_from(member.rebalance_timeout.as_millis()).unwrap_or(60_000);
        let metadata = p::MemberMetadataValue {
            instance_id: member.instance_id.clone(),
            rack_id: member.rack_id.clone(),
            client_id: member.client_id.clone(),
            client_host: member.client_host.clone(),
            subscribed_topic_names: member.subscribed_topic_names.iter().cloned().collect(),
            subscribed_topic_regex: member.subscribed_topic_regex.clone(),
            server_assignor: member.server_assignor.clone(),
            rebalance_timeout_ms,
            classic: classic_member_metadata(member),
        };
        pending
            .member_metadata
            .push((member_id.clone(), Some(metadata)));

        let assigned_partitions = member
            .assigned_partitions
            .iter()
            .map(|(topic_id, partitions)| p::AssignedTopicPartitions {
                topic_id: *topic_id,
                partitions: partitions.clone(),
            })
            .collect();
        let partitions_pending_revocation = member
            .partitions_pending_revocation
            .iter()
            .map(|(topic_id, partitions)| p::AssignedTopicPartitions {
                topic_id: *topic_id,
                partitions: partitions.clone(),
            })
            .collect();
        let current = p::CurrentMemberAssignmentValue {
            member_epoch: member.member_epoch,
            previous_member_epoch: member.previous_member_epoch,
            state: member.assignment_state,
            assigned_partitions,
            partitions_pending_revocation,
        };
        pending
            .current_per_member
            .push((member_id.clone(), Some(current)));

        if let Some(target) = state.target.per_member.get(member_id) {
            let topic_partitions = target
                .iter()
                .map(|(topic_id, partitions)| p::AssignedTopicPartitions {
                    topic_id: *topic_id,
                    partitions: partitions.clone(),
                })
                .collect();
            pending.target_per_member.push((
                member_id.clone(),
                Some(p::TargetAssignmentMemberValue { topic_partitions }),
            ));
        }
    }

    pending
}
/// Builds the pending records for the whole group by treating every current
/// member as affected.
pub(crate) fn full_pending_records(state: &super::consumer_state::GroupState) -> PendingRecords {
    let every_member = state.members.keys().cloned().collect::<Vec<String>>();
    snapshot_pending_after_change(state, &every_member)
}
/// Converts a classic group's in-memory state into its persisted
/// group-metadata value.
///
/// The protocol type defaults to `"consumer"` when the group has none, a
/// member without an assignment is stored with the default (empty)
/// assignment, and the state timestamp is the current wall-clock time.
/// Timeouts whose millisecond count does not fit in an `i32` are stored as
/// 60 000 ms (rebalance) and 30 000 ms (session).
pub(crate) fn classic_group_metadata_record(
    state: &super::classic_state::Group,
) -> crate::coordinator::unified::persistence::GroupMetadataValue {
    use crate::coordinator::unified::persistence::{GroupMetadataValue, MemberMetadata};

    let protocol_type = match &state.protocol_type {
        Some(protocol_type) => protocol_type.clone(),
        None => "consumer".into(),
    };

    GroupMetadataValue {
        protocol_type,
        generation: state.generation_id,
        protocol_name: state.protocol_name.clone(),
        leader: state.leader_id.clone(),
        current_state_timestamp_ms: chrono_now_ms(),
        members: state
            .members
            .values()
            .map(|member| {
                let rebalance_timeout_ms =
                    i32::try_from(member.rebalance_timeout.as_millis()).unwrap_or(60_000);
                let session_timeout_ms =
                    i32::try_from(member.session_timeout.as_millis()).unwrap_or(30_000);
                MemberMetadata {
                    member_id: member.member_id.clone(),
                    group_instance_id: member.group_instance_id.clone(),
                    client_id: member.client_id.clone(),
                    client_host: member.host.clone(),
                    rebalance_timeout_ms,
                    session_timeout_ms,
                    subscription: member.protocol_metadata.clone(),
                    assignment: member.assignment.clone().unwrap_or_default(),
                }
            })
            .collect(),
    }
}
/// Persists `pending` as a single batch and then refreshes the coordinator's
/// cached seed for the group.
///
/// Does nothing and returns `Ok(())` when `pending` is empty.
///
/// # Errors
///
/// Returns the error from `offsets_log.append`; in that case the cache is not
/// updated.
async fn flush_pending(
    state: &super::consumer_state::GroupState,
    pending: PendingRecords,
    offsets_log: &dyn OffsetsLog,
    coordinator: &super::GroupCoordinator,
    now_ms: i64,
) -> Result<(), crate::error::BrokerError> {
    if !pending.is_empty() {
        let group_id = &state.group_id;
        // Append first: the cache must only reflect state that reached the log.
        offsets_log
            .append(pending.into_batch(group_id, now_ms))
            .await?;
        coordinator.update_cache(group_id, snapshot_seed(state));
    }
    Ok(())
}
/// Asks the group actor whether an offset commit from this member is valid.
///
/// Returns `None` when the actor accepts the commit, or `Some(code)` with the
/// error code it rejected the commit with. If the request cannot be delivered
/// to the actor, or the actor drops the reply channel without answering, the
/// result is `Some(codes::UNKNOWN_SERVER_ERROR)`.
pub(crate) async fn validate_group_commit(
    handle: &GroupActorHandle,
    member_id: &str,
    generation_or_epoch: i32,
    group_instance_id: Option<&str>,
) -> Option<i16> {
    let (reply, verdict) = oneshot::channel();
    let request = GroupActorMessage::ValidateCommit {
        member_id: member_id.to_owned(),
        group_instance_id: group_instance_id.map(str::to_owned),
        generation_or_epoch,
        reply,
    };

    if handle.tx.send(request).await.is_err() {
        return Some(codes::UNKNOWN_SERVER_ERROR);
    }

    match verdict.await {
        // `Ok(())` maps to `None`, `Err(code)` to `Some(code)`.
        Ok(outcome) => outcome.err(),
        Err(_) => Some(codes::UNKNOWN_SERVER_ERROR),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use crate::coordinator::unified::GroupCoordinator;
use crate::coordinator::unified::config::NextGenConfig;
use crate::coordinator::unified::offsets_log::fake::InMemoryOffsetsLog;
use crate::coordinator::unified::reconciler::ReconcileInput;
use std::sync::Arc;
/// Metadata provider for tests that always reports the same fixed input.
#[derive(Debug)]
struct StaticMetadata {
    /// The input handed back (cloned) by every `snapshot` call.
    input: ReconcileInput,
}

impl MetadataProvider for StaticMetadata {
    /// Returns a clone of the stored input.
    fn snapshot(&self) -> ReconcileInput {
        let Self { input } = self;
        input.clone()
    }
}
/// Returns a metadata provider whose snapshot is the default `ReconcileInput`.
fn empty_metadata() -> Arc<dyn MetadataProvider> {
    let provider = StaticMetadata {
        input: ReconcileInput::default(),
    };
    Arc::new(provider)
}
/// Builds a coordinator with default configs, empty metadata and an in-memory
/// offsets log, returning both the coordinator and the log.
fn make_coordinator() -> (Arc<GroupCoordinator>, Arc<InMemoryOffsetsLog>) {
    use crate::coordinator::unified::share::config::ShareGroupConfig;
    use crate::coordinator::unified::streams::config::StreamsGroupConfig;

    let offsets_log = Arc::new(InMemoryOffsetsLog::default());
    let coordinator = GroupCoordinator::new(
        NextGenConfig::default(),
        ShareGroupConfig::default(),
        empty_metadata(),
        offsets_log.clone(),
        StreamsGroupConfig::default(),
    );
    (Arc::new(coordinator), offsets_log)
}
/// Like `make_coordinator_with_topic_policy`, using the default migration
/// policy.
fn make_coordinator_with_topic(
    topic: &str,
    partitions: i32,
) -> (Arc<GroupCoordinator>, Arc<InMemoryOffsetsLog>) {
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let default_policy = ConsumerGroupMigrationPolicy::default();
    make_coordinator_with_topic_policy(topic, partitions, default_policy)
}
/// Builds a coordinator whose metadata knows exactly one topic (`topic`, with
/// a fixed topic id and `partitions` partitions) and whose consumer-group
/// config uses `policy` as the migration policy. Returns the coordinator and
/// its in-memory offsets log.
fn make_coordinator_with_topic_policy(
    topic: &str,
    partitions: i32,
    policy: crate::coordinator::unified::config::ConsumerGroupMigrationPolicy,
) -> (Arc<GroupCoordinator>, Arc<InMemoryOffsetsLog>) {
    use crate::coordinator::unified::share::config::ShareGroupConfig;
    use crate::coordinator::unified::streams::config::StreamsGroupConfig;

    let topic_id = Uuid([7; 16]);
    let metadata: Arc<dyn MetadataProvider> = Arc::new(StaticMetadata {
        input: ReconcileInput {
            topic_id_by_name: [(topic.to_string(), topic_id)].into(),
            partitions_per_topic: [(topic_id, partitions)].into(),
            ..Default::default()
        },
    });
    let config = NextGenConfig {
        migration_policy: policy,
        ..NextGenConfig::default()
    };

    let offsets_log = Arc::new(InMemoryOffsetsLog::default());
    let coordinator = GroupCoordinator::new(
        config,
        ShareGroupConfig::default(),
        metadata,
        offsets_log.clone(),
        StreamsGroupConfig::default(),
    );
    (Arc::new(coordinator), offsets_log)
}
/// A first heartbeat (empty member id, epoch 0) must succeed and be persisted
/// as exactly one batch containing at least three records.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn first_join_emits_one_batch() {
    let (coordinator, offsets_log) = make_coordinator();
    let actor = coordinator.get_or_create_consumer("g");

    let join = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id: String::new(),
        member_epoch: 0,
        subscribed_topic_names: Some(vec!["t".into()]),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };
    let (reply, pending_response) = tokio::sync::oneshot::channel();
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: join,
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();

    let response = pending_response.await.unwrap();
    assert!(response.error_code == 0);

    let written = offsets_log.batches().await;
    assert!(
        written.len() == 1,
        "first join should write exactly one batch"
    );
    assert!(written[0].records.len() >= 3);
}
/// A first heartbeat that already carries a client-chosen member id must be
/// accepted, echo that id back, move the member epoch off 0 and write exactly
/// one batch.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn first_join_adopts_client_member_id() {
    let (coordinator, offsets_log) = make_coordinator();
    let actor = coordinator.get_or_create_consumer("g");

    let join = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id: "client-uuid-1".into(),
        member_epoch: 0,
        subscribed_topic_names: Some(vec!["t".into()]),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };
    let (reply, pending_response) = tokio::sync::oneshot::channel();
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: join,
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();

    let response = pending_response.await.unwrap();
    assert!(response.error_code == 0, "client-id first-join must succeed");
    assert!(
        response.member_id.as_deref() == Some("client-uuid-1"),
        "response must echo the client-supplied member id"
    );
    assert!(response.member_epoch >= 1, "epoch must advance off 0 on join");
    assert!(
        offsets_log.batches().await.len() == 1,
        "client-id first join writes exactly one batch"
    );
}
/// Sending the same epoch-0 join twice with the same member id: the first is
/// accepted, the second is rejected with `STALE_MEMBER_EPOCH`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn known_member_id_epoch_zero_is_stale() {
    let (coordinator, _offsets_log) = make_coordinator();
    let actor = coordinator.get_or_create_consumer("g");

    // Both heartbeats are byte-for-byte the same request.
    let epoch_zero_join = || ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id: "client-uuid-2".into(),
        member_epoch: 0,
        subscribed_topic_names: Some(vec!["t".into()]),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };

    let (reply, first) = tokio::sync::oneshot::channel();
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: epoch_zero_join(),
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();
    assert!(first.await.unwrap().error_code == 0);

    let (reply, second) = tokio::sync::oneshot::channel();
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: epoch_zero_join(),
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();
    assert!(second.await.unwrap().error_code == codes::STALE_MEMBER_EPOCH);
}
/// After joining, a heartbeat that repeats the member's id, epoch and
/// subscription must not append anything to the offsets log.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unchanged_heartbeat_emits_no_batch() {
    let (coordinator, offsets_log) = make_coordinator();
    let actor = coordinator.get_or_create_consumer("g");

    // Join.
    let (reply, pending_join) = tokio::sync::oneshot::channel();
    let join = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id: String::new(),
        member_epoch: 0,
        subscribed_topic_names: Some(vec!["t".into()]),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: join,
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();
    let joined = pending_join.await.unwrap();
    let member_id = joined.member_id.clone().unwrap();
    let batches_after_join = offsets_log.batches().await.len();

    // Steady-state heartbeat with the values the join returned.
    let (reply, pending_steady) = tokio::sync::oneshot::channel();
    let steady = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id,
        member_epoch: joined.member_epoch,
        subscribed_topic_names: Some(vec!["t".into()]),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: steady,
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();
    let _ = pending_steady.await.unwrap();

    let batches_after_steady = offsets_log.batches().await.len();
    assert!(
        batches_after_steady == batches_after_join,
        "steady-state heartbeat should not write"
    );
}
/// A leave heartbeat (member epoch -1) must append exactly one more batch, and
/// that batch must contain at least one record without a value.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn leave_emits_tombstone_batch() {
    let (coordinator, offsets_log) = make_coordinator();
    let actor = coordinator.get_or_create_consumer("g");

    // Join to obtain a member id.
    let (reply, pending_join) = tokio::sync::oneshot::channel();
    let join = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id: String::new(),
        member_epoch: 0,
        subscribed_topic_names: Some(vec!["t".into()]),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: join,
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();
    let member_id = pending_join.await.unwrap().member_id.unwrap();
    let batches_before_leave = offsets_log.batches().await.len();

    // Leave.
    let (reply, pending_leave) = tokio::sync::oneshot::channel();
    let leave = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id,
        member_epoch: -1,
        ..Default::default()
    };
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: leave,
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();
    let _ = pending_leave.await.unwrap();

    let written = offsets_log.batches().await;
    assert!(written.len() == batches_before_leave + 1);
    // Non-empty is guaranteed by the length assertion above.
    let newest = written.last().unwrap();
    assert!(
        newest.records.iter().any(|r| r.value.is_none()),
        "leave batch must contain at least one tombstone"
    );
}
/// When the offsets-log append fails during a join, the heartbeat is answered
/// with `COORDINATOR_LOAD_IN_PROGRESS`, the actor's channel ends up closed,
/// and asking the coordinator for the group again yields a live handle.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn actor_exits_on_append_error() {
let (coord, log) = make_coordinator();
let handle = coord.get_or_create_consumer("g");
// Presumably arms the in-memory log so the next append fails — confirm
// against `InMemoryOffsetsLog`.
log.fail_next
.store(true, std::sync::atomic::Ordering::SeqCst);
let (tx, rx) = tokio::sync::oneshot::channel();
// The send result is deliberately ignored; only the reply is inspected.
let _ = handle
.tx
.send(GroupActorMessage::Heartbeat {
request: ConsumerGroupHeartbeatRequest {
group_id: "g".into(),
member_id: String::new(),
member_epoch: 0,
subscribed_topic_names: Some(vec!["t".into()]),
rebalance_timeout_ms: 60_000,
..Default::default()
},
client_host: String::new(),
reply: tx,
})
.await;
let resp = rx.await.unwrap();
assert!(resp.error_code == codes::COORDINATOR_LOAD_IN_PROGRESS);
// NOTE(review): fixed 50 ms grace period for the actor task to wind down;
// this is timing-dependent and could flake on a heavily loaded machine.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert!(
handle.tx.is_closed(),
"actor mpsc should be closed after exit"
);
// A fresh lookup must hand back a handle whose channel is open.
let fresh = coord.get_or_create_consumer("g");
assert!(!fresh.tx.is_closed());
}
/// A default `PendingRecords` serializes to a batch with no records.
#[test]
fn pending_records_empty_yields_empty_batch() {
    let batch = PendingRecords::default().into_batch("g", 0);
    assert!(batch.records.is_empty());
}
/// Three pending entries produce three records whose offset deltas are
/// 0, 1, 2, with the batch's last offset delta equal to 2.
#[test]
fn pending_records_offset_deltas_are_sequential() {
    let member = MemberMetadataValue {
        instance_id: None,
        rack_id: None,
        client_id: "c".into(),
        client_host: "h".into(),
        subscribed_topic_names: vec!["t".into()],
        subscribed_topic_regex: None,
        server_assignor: None,
        rebalance_timeout_ms: 60_000,
        classic: None,
    };
    let pending = PendingRecords {
        group_metadata: Some(GroupMetadataValue { epoch: 1 }),
        member_metadata: vec![("m1".into(), Some(member))],
        target_metadata: Some(TargetAssignmentMetadataValue {
            assignment_epoch: 1,
        }),
        ..Default::default()
    };

    let batch = pending.into_batch("g", 0);
    assert!(batch.records.len() == 3);

    let deltas = batch
        .records
        .iter()
        .map(|record| record.offset_delta)
        .collect::<Vec<i32>>();
    assert!(deltas == [0, 1, 2]);
    assert!(batch.last_offset_delta == 2);
}
/// A member-metadata entry with a `None` value becomes a single record that
/// has no value.
#[test]
fn pending_records_tombstone_omits_value() {
    let pending = PendingRecords {
        member_metadata: vec![("m1".into(), None)],
        ..Default::default()
    };

    let batch = pending.into_batch("g", 0);
    assert!(batch.records.len() == 1);
    assert!(batch.records[0].value.is_none());
}
/// Evicting one expired member through a session tick must leave the group
/// without members and advance the group epoch by exactly one.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn single_eviction_advances_epoch_by_one() {
use crate::coordinator::unified::consumer_state::GroupState;
let (coord, log) = make_coordinator();
// 1 ms session timeout, so a member last seen 50 ms ago counts as expired.
let config = NextGenConfig {
session_timeout: Duration::from_millis(1),
..NextGenConfig::default()
};
let metadata = empty_metadata();
let mut state = GroupState::new("g");
let mut m = build_member(
"m1",
&ConsumerGroupHeartbeatRequest {
subscribed_topic_names: Some(vec!["t".into()]),
rebalance_timeout_ms: 60_000,
..Default::default()
},
"h",
Instant::now(),
);
// Back-date the member's last heartbeat by 50 ms.
m.last_seen = Instant::now()
.checked_sub(Duration::from_millis(50))
.expect("50ms is always within Instant range");
state.add_or_update_member(m);
// Reconcile once so the epoch bump measured below is caused by the eviction
// alone, not by the initial add.
run_reconcile(&mut state, &config, &*metadata);
assert!(!state.dirty, "baseline must be clean before eviction");
let epoch_before = state.group_epoch;
handle_session_tick(&mut state, &config, &*metadata, &*log, &coord)
.await
.expect("tick should succeed");
assert!(
state.members.is_empty(),
"expired member must have been evicted"
);
assert!(
state.group_epoch == epoch_before + 1,
"a single eviction must advance the group epoch by exactly 1"
);
}
use crate::coordinator::unified::assignor::{
Assignment, Assignor, MemberSubscription, TopicMetadata,
};
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
struct CountingAssignor {
calls: Arc<AtomicUsize>,
}
impl Assignor for CountingAssignor {
fn name(&self) -> &'static str {
"counting"
}
fn assign(&self, _members: &[MemberSubscription], _topics: &TopicMetadata) -> Assignment {
self.calls.fetch_add(1, Ordering::SeqCst);
std::collections::HashMap::new()
}
}
/// A member asking for an assignor name that the config does not know
/// (`"ghost"`) must not be honoured; the `"uniform"` assignor is picked.
#[test]
fn pick_assignor_skips_unregistered_member_preference() {
    use crate::coordinator::unified::consumer_state::GroupState;

    let config = NextGenConfig::default();
    let mut state = GroupState::new("g");

    let mut member = build_member(
        "m1",
        &ConsumerGroupHeartbeatRequest::default(),
        "h",
        Instant::now(),
    );
    member.server_assignor = Some("ghost".into());
    state.members.insert("m1".into(), member);

    let chosen = pick_assignor(&state, &config);
    assert!(chosen.name() == "uniform");
}
/// When a registered custom assignor is requested by name in the join
/// heartbeat, the join succeeds and that assignor is called at least once.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn custom_assignor_invoked_when_requested() {
    use crate::coordinator::unified::share::config::ShareGroupConfig;
    use crate::coordinator::unified::streams::config::StreamsGroupConfig;

    let calls = Arc::new(AtomicUsize::new(0));
    let counting = Arc::new(CountingAssignor {
        calls: calls.clone(),
    });
    let mut config = NextGenConfig::default();
    config.register_assignor(counting).unwrap();

    let offsets_log = Arc::new(InMemoryOffsetsLog::default());
    let coordinator = Arc::new(GroupCoordinator::new(
        config,
        ShareGroupConfig::default(),
        empty_metadata(),
        offsets_log,
        StreamsGroupConfig::default(),
    ));
    let actor = coordinator.get_or_create_consumer("g");

    let join = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id: String::new(),
        member_epoch: 0,
        subscribed_topic_names: Some(vec!["t".into()]),
        server_assignor: Some("counting".into()),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };
    let (reply, pending_response) = tokio::sync::oneshot::channel();
    actor
        .tx
        .send(GroupActorMessage::Heartbeat {
            request: join,
            client_host: String::new(),
            reply,
        })
        .await
        .unwrap();

    let response = pending_response.await.unwrap();
    assert!(response.error_code == 0);
    assert!(
        calls.load(Ordering::SeqCst) >= 1,
        "custom assignor must be invoked at least once",
    );
}
/// Exercises a classic group end to end through the admin surface: a join
/// without a member id is answered with `MEMBER_ID_REQUIRED`, the group is
/// still inspectable, listable and describable with no members, and it can be
/// deleted, after which describing it yields nothing.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn classic_admin_surface_and_immediate_join() {
use crabka_protocol::owned::join_group_request::JoinGroupRequest;
let (coord, _log) = make_coordinator();
let handle = coord.get_or_create_classic("g");
let (tx, rx) = tokio::sync::oneshot::channel();
// Join with an empty member id.
handle
.tx
.send(GroupActorMessage::ClassicJoin {
req: JoinGroupRequest {
group_id: "g".into(),
member_id: String::new(),
protocol_type: "consumer".into(),
..Default::default()
},
client_host: String::new(),
reply: tx,
})
.await
.unwrap();
let r = rx.await.unwrap();
assert!(r.error_code == codes::MEMBER_ID_REQUIRED);
// The rejected join must not have left a member behind.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::ClassicInspect { reply: tx })
.await
.unwrap();
let view = rx.await.unwrap();
assert!(view.group_id == "g" && view.members.is_empty());
// Coordinator-level admin calls: list, describe, delete, describe again.
let listed = coord.list_groups().await;
assert!(listed.iter().any(|s| s.group_id == "g"));
assert!(coord.describe_group("g").await.is_some());
assert!(coord.delete_group("g").await.is_ok());
assert!(coord.describe_group("g").await.is_none());
}
/// Walks a classic group actor through its offset and validation message
/// arms in order: store an offset, read it back, validate a commit with an
/// empty member id and generation -1, heartbeat as an unknown member, remove
/// the offset, and confirm the committed set is empty.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn classic_offset_validate_heartbeat_arms() {
use crabka_protocol::owned::heartbeat_request::HeartbeatRequest;
use super::super::classic_state::OffsetEntry;
let (coord, _log) = make_coordinator();
let handle = coord.get_or_create_classic("g");
// 1. Store offset 42 for ("t", 0).
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::UpdateCommitted {
entries: vec![(
("t".to_string(), 0),
OffsetEntry {
offset: 42,
leader_epoch: 1,
metadata: String::new(),
commit_timestamp_ms: 0,
},
)],
reply: tx,
})
.await
.unwrap();
rx.await.unwrap();
// 2. The stored offset must be readable.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::FetchCommitted { reply: tx })
.await
.unwrap();
let committed = rx.await.unwrap();
assert!(committed.get(&("t".to_string(), 0)).unwrap().offset == 42);
// 3. A commit with no member id and generation -1 is accepted.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::ValidateCommit {
member_id: String::new(),
group_instance_id: None,
generation_or_epoch: -1,
reply: tx,
})
.await
.unwrap();
assert!(rx.await.unwrap() == Ok(()));
// 4. A heartbeat from a member that never joined is rejected.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::ClassicHeartbeat {
req: HeartbeatRequest {
group_id: "g".into(),
member_id: "ghost".into(),
generation_id: 0,
..Default::default()
},
reply: tx,
})
.await
.unwrap();
assert!(rx.await.unwrap() == codes::UNKNOWN_MEMBER_ID);
// 5. Remove the offset again.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::RemoveCommitted {
keys: vec![("t".to_string(), 0)],
reply: tx,
})
.await
.unwrap();
rx.await.unwrap();
// 6. Nothing must remain committed.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::FetchCommitted { reply: tx })
.await
.unwrap();
assert!(rx.await.unwrap().is_empty());
}
/// Seeding a classic group with one member and one committed offset must make
/// the offset readable through the actor, and deleting the group must fail
/// with `NonEmpty`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn classic_seed_hydrates_group_and_blocks_delete_when_nonempty() {
    use std::time::Duration;
    use super::super::classic_state::{Group as ClassicState, Member, OffsetEntry};
    use super::super::group::{Group, GroupKind};

    let (coordinator, _offsets_log) = make_coordinator();

    // One member ...
    let mut classic = ClassicState::new("g");
    let member = Member::new(
        "m1",
        "client",
        "127.0.0.1",
        Duration::from_secs(30),
        Duration::from_mins(1),
        vec![("range".into(), bytes::Bytes::new())],
    );
    classic.add_member(member);

    // ... and one committed offset.
    let seeded_offset = OffsetEntry {
        offset: 7,
        leader_epoch: 0,
        metadata: String::new(),
        commit_timestamp_ms: 0,
    };
    let seed = Box::new(Group {
        group_id: "g".into(),
        kind: GroupKind::Classic(classic),
        committed_offsets: [(("t".to_string(), 0), seeded_offset)].into(),
    });
    coordinator.seed_classic("g", seed);

    let actor = coordinator.find("g").expect("seeded actor");
    let (reply, pending_offsets) = tokio::sync::oneshot::channel();
    actor
        .tx
        .send(GroupActorMessage::FetchCommitted { reply })
        .await
        .unwrap();
    let committed = pending_offsets.await.unwrap();
    assert!(committed.get(&("t".to_string(), 0)).unwrap().offset == 7);

    assert!(
        coordinator.delete_group("g").await == Err(crate::coordinator::DeleteGroupError::NonEmpty)
    );
}
/// Whichever protocol creates a group first, a later get-or-create through
/// the other protocol must return the very same actor handle.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cross_protocol_get_or_create_returns_the_one_actor() {
    let (coordinator, _offsets_log) = make_coordinator();

    // Consumer first, then classic.
    let via_consumer = coordinator.get_or_create_consumer("c");
    let via_classic = coordinator.get_or_create_classic("c");
    assert!(Arc::ptr_eq(&via_consumer, &via_classic));

    // Classic first, then consumer.
    let via_classic = coordinator.get_or_create_classic("k");
    let via_consumer = coordinator.get_or_create_consumer("k");
    assert!(Arc::ptr_eq(&via_classic, &via_consumer));
}
/// After a classic-created actor is forced to the consumer kind in place, it
/// must still be alive (channel open) 1.2 s later.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn actor_tick_does_not_panic_after_in_place_flip() {
let (coord, _log) = make_coordinator();
let handle = coord.get_or_create_group("g", GroupKindTag::Classic);
handle
.tx
.send(GroupActorMessage::TestForceConsumerKind)
.await
.unwrap();
// Presumably long enough for at least one periodic actor tick to run —
// TODO confirm against the actor's tick interval.
tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
// A panicked actor would have dropped its receiver, closing the channel.
assert!(!handle.tx.is_closed());
}
/// A consumer-protocol heartbeat sent to a seeded classic group must succeed,
/// leave the group with two members (the original classic one and the new
/// native one), and write a tombstone for the classic group metadata.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn consumer_heartbeat_upgrades_a_classic_group() {
use super::super::classic_state::{Group as ClassicState, Member};
use super::super::group::{Group, GroupKind};
let (coord, log) = make_coordinator_with_topic("t", 2);
// Seed a classic "consumer" group at generation 1 with one member
// subscribed to "t".
let mut cs = ClassicState::new("g");
cs.protocol_type = Some("consumer".into());
cs.generation_id = 1;
cs.add_member(Member::new(
"m-classic",
"client",
"127.0.0.1",
std::time::Duration::from_secs(30),
std::time::Duration::from_mins(1),
vec![("range".into(), subscription_blob(&["t"]))],
));
let group = Box::new(Group {
group_id: "g".into(),
kind: GroupKind::Classic(cs),
committed_offsets: HashMap::new(),
});
coord.seed_classic("g", group);
let handle = coord.find("g").expect("seeded classic actor");
// First consumer-protocol heartbeat (empty member id, epoch 0).
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Heartbeat {
request: ConsumerGroupHeartbeatRequest {
group_id: "g".into(),
member_id: String::new(),
member_epoch: 0,
subscribed_topic_names: Some(vec!["t".into()]),
rebalance_timeout_ms: 60_000,
..Default::default()
},
client_host: String::new(),
reply: tx,
})
.await
.unwrap();
let resp = rx.await.unwrap();
assert!(resp.error_code == codes::NONE);
// Both members must now be visible through Describe.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Describe { reply: tx })
.await
.unwrap();
let describe = rx.await.unwrap();
assert!(describe.members.len() == 2);
assert!(
describe.members.iter().any(|m| m.is_classic),
"the hosted classic member must survive the upgrade"
);
assert!(
describe.members.iter().any(|m| !m.is_classic),
"the new native consumer member must be present"
);
assert!(log.has_classic_group_metadata_tombstone("g").await);
}
/// Encodes a classic-protocol subscription for `topics`: a big-endian `i16`
/// version `0` followed by the version-0 encoding of the subscription.
fn subscription_blob(topics: &[&str]) -> Bytes {
    use bytes::{BufMut, BytesMut};
    use crabka_protocol::Encode;
    use crabka_protocol::owned::consumer_protocol_subscription::ConsumerProtocolSubscription;

    let subscription = ConsumerProtocolSubscription {
        topics: topics.iter().map(|&name| name.to_owned()).collect(),
        ..Default::default()
    };

    let mut buf = BytesMut::new();
    buf.put_i16(0);
    subscription.encode(&mut buf, 0).unwrap();
    buf.freeze()
}
/// Decodes a classic-protocol assignment blob: reads the leading `i16`
/// version, then decodes the rest at that version.
///
/// Panics with "assignment decodes" if decoding fails.
fn decode_assignment(
    blob: &Bytes,
) -> crabka_protocol::owned::consumer_protocol_assignment::ConsumerProtocolAssignment {
    use bytes::Buf;
    use crabka_protocol::Decode;
    use crabka_protocol::owned::consumer_protocol_assignment::ConsumerProtocolAssignment;

    let mut remaining: &[u8] = blob;
    let version = remaining.get_i16();
    ConsumerProtocolAssignment::decode(&mut remaining, version).expect("assignment decodes")
}
/// Test fixture: seeds classic group "g" with one member ("m-classic",
/// subscribed to `topic`), has a native consumer member join and then leave
/// again, and returns the group's actor handle.
///
/// Both heartbeats are asserted to succeed with `codes::NONE`.
async fn seed_and_upgrade(coord: &Arc<GroupCoordinator>, topic: &str) -> Arc<GroupActorHandle> {
use super::super::classic_state::{Group as ClassicState, Member};
use super::super::group::{Group, GroupKind};
// Classic "consumer" group at generation 1 with a single member.
let mut cs = ClassicState::new("g");
cs.protocol_type = Some("consumer".into());
cs.generation_id = 1;
cs.add_member(Member::new(
"m-classic",
"client",
"127.0.0.1",
std::time::Duration::from_secs(30),
std::time::Duration::from_mins(1),
vec![("range".into(), subscription_blob(&[topic]))],
));
let group = Box::new(Group {
group_id: "g".into(),
kind: GroupKind::Classic(cs),
committed_offsets: HashMap::new(),
});
coord.seed_classic("g", group);
let handle = coord.find("g").expect("seeded classic actor");
// A native consumer member joins (empty member id, epoch 0).
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Heartbeat {
request: ConsumerGroupHeartbeatRequest {
group_id: "g".into(),
member_id: String::new(),
member_epoch: 0,
subscribed_topic_names: Some(vec![topic.into()]),
rebalance_timeout_ms: 60_000,
..Default::default()
},
client_host: String::new(),
reply: tx,
})
.await
.unwrap();
let resp = rx.await.unwrap();
assert!(resp.error_code == codes::NONE);
let native_id = resp.member_id.expect("native member id");
// The native member leaves again (member epoch -1).
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Heartbeat {
request: ConsumerGroupHeartbeatRequest {
group_id: "g".into(),
member_id: native_id,
member_epoch: -1,
..Default::default()
},
client_host: String::new(),
reply: tx,
})
.await
.unwrap();
assert!(rx.await.unwrap().error_code == codes::NONE);
handle
}
/// Sends a classic JoinGroup for group "g" as `member_id`, offering the
/// "range" protocol with a subscription to `topic`, and returns the actor's
/// reply.
async fn classic_join(handle: &GroupActorHandle, member_id: &str, topic: &str) -> JoinResult {
    use crabka_protocol::owned::join_group_request::JoinGroupRequestProtocol;

    let range_protocol = JoinGroupRequestProtocol {
        name: "range".into(),
        metadata: subscription_blob(&[topic]),
        ..Default::default()
    };
    let req = JoinGroupRequest {
        group_id: "g".into(),
        member_id: member_id.into(),
        protocol_type: "consumer".into(),
        protocols: vec![range_protocol],
        session_timeout_ms: 30_000,
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };

    let (reply, pending_result) = tokio::sync::oneshot::channel();
    handle
        .tx
        .send(GroupActorMessage::ClassicJoin {
            req,
            client_host: "127.0.0.1".into(),
            reply,
        })
        .await
        .unwrap();
    pending_result.await.unwrap()
}
/// Sends a classic SyncGroup for group "g" as `member_id` at `generation` and
/// returns the actor's reply.
async fn classic_sync(
    handle: &GroupActorHandle,
    member_id: &str,
    generation: i32,
) -> SyncResult {
    let req = SyncGroupRequest {
        group_id: "g".into(),
        member_id: member_id.into(),
        generation_id: generation,
        ..Default::default()
    };

    let (reply, pending_result) = tokio::sync::oneshot::channel();
    handle
        .tx
        .send(GroupActorMessage::ClassicSync { req, reply })
        .await
        .unwrap();
    pending_result.await.unwrap()
}
/// Sends a classic Heartbeat for group "g" as `member_id` with generation id
/// 0 and returns the error code the actor replies with.
async fn classic_heartbeat(handle: &GroupActorHandle, member_id: &str) -> i16 {
    let req = HeartbeatRequest {
        group_id: "g".into(),
        member_id: member_id.into(),
        generation_id: 0,
        ..Default::default()
    };

    let (reply, pending_code) = tokio::sync::oneshot::channel();
    handle
        .tx
        .send(GroupActorMessage::ClassicHeartbeat { req, reply })
        .await
        .unwrap();
    pending_code.await.unwrap()
}
/// After the group has been upgraded, the original classic member must be
/// told to rebalance, be able to re-join as leader at the group epoch, sync a
/// non-empty assignment for topic "t", and then heartbeat cleanly.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hosted_classic_member_syncs_translated_assignment() {
let (coord, _log) = make_coordinator_with_topic_policy(
"t",
2,
crate::coordinator::unified::config::ConsumerGroupMigrationPolicy::Upgrade,
);
let handle = seed_and_upgrade(&coord, "t").await;
assert!(
classic_heartbeat(&handle, "m-classic").await == codes::REBALANCE_IN_PROGRESS,
"post-upgrade heartbeat must signal a re-sync"
);
// Re-join: the classic member keeps its id and is the leader.
let join = classic_join(&handle, "m-classic", "t").await;
assert!(join.error_code == codes::NONE);
assert!(join.leader == "m-classic");
assert!(join.member_id == "m-classic");
// The generation reported to the classic member must equal the group epoch.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Describe { reply: tx })
.await
.unwrap();
let describe = rx.await.unwrap();
assert!(join.generation_id == describe.group_epoch);
// Sync must hand back an assignment that covers topic "t".
let sync = classic_sync(&handle, "m-classic", join.generation_id).await;
assert!(sync.error_code == codes::NONE);
assert!(sync.protocol_type.as_deref() == Some("consumer"));
let asn = decode_assignment(&sync.assignment);
let t_assign = asn
.assigned_partitions
.iter()
.find(|tp| tp.topic == "t")
.expect("assignment contains topic t");
assert!(
!t_assign.partitions.is_empty(),
"m-classic must own partitions of t"
);
assert!(
classic_heartbeat(&handle, "m-classic").await == codes::NONE,
"after sync the member is in sync → NONE"
);
}
/// A second classic member joining an upgraded group must succeed, and after
/// both members sync at the group epoch they must hold disjoint partitions of
/// "t" that together are exactly {0, 1}.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn new_classic_member_joins_upgraded_group_and_gets_assignment() {
let (coord, _log) = make_coordinator_with_topic_policy(
"t",
2,
crate::coordinator::unified::config::ConsumerGroupMigrationPolicy::Upgrade,
);
let handle = seed_and_upgrade(&coord, "t").await;
// Bring the original classic member through join + sync first; its sync
// result is intentionally ignored at this point.
let join_c = classic_join(&handle, "m-classic", "t").await;
let _ = classic_sync(&handle, "m-classic", join_c.generation_id).await;
// The new member joins and is reported as leader in its own join response.
let join2 = classic_join(&handle, "m2", "t").await;
assert!(join2.error_code == codes::NONE);
assert!(join2.leader == "m2");
// Both members sync at the current group epoch.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Describe { reply: tx })
.await
.unwrap();
let epoch = rx.await.unwrap().group_epoch;
let sync_c = classic_sync(&handle, "m-classic", epoch).await;
let sync2 = classic_sync(&handle, "m2", epoch).await;
assert!(sync_c.error_code == codes::NONE);
assert!(sync2.error_code == codes::NONE);
// Partitions of topic "t" in a sync result; empty when "t" is absent.
let parts = |s: &SyncResult| -> Vec<i32> {
decode_assignment(&s.assignment)
.assigned_partitions
.iter()
.find(|tp| tp.topic == "t")
.map(|tp| tp.partitions.clone())
.unwrap_or_default()
};
let p_c = parts(&sync_c);
let p_2 = parts(&sync2);
assert!(!p_2.is_empty(), "the new member must receive an assignment");
let set_c: std::collections::HashSet<i32> = p_c.iter().copied().collect();
let set_2: std::collections::HashSet<i32> = p_2.iter().copied().collect();
assert!(
set_c.is_disjoint(&set_2),
"the two members must hold disjoint partitions"
);
// Sort the union so it can be compared against a fixed vector.
let mut union: Vec<i32> = set_c.union(&set_2).copied().collect();
union.sort_unstable();
assert!(
union == vec![0, 1],
"the union of partitions must be {{0, 1}}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn last_consumer_member_leaving_downgrades_to_classic() {
use super::super::classic_state::{Group as ClassicState, Member};
use super::super::group::{Group, GroupKind};
let (coord, log) = make_coordinator_with_topic("t", 2);
let mut cs = ClassicState::new("g");
cs.protocol_type = Some("consumer".into());
cs.generation_id = 1;
cs.add_member(Member::new(
"m-classic",
"client",
"127.0.0.1",
std::time::Duration::from_secs(30),
std::time::Duration::from_mins(1),
vec![("range".into(), subscription_blob(&["t"]))],
));
let group = Box::new(Group {
group_id: "g".into(),
kind: GroupKind::Classic(cs),
committed_offsets: HashMap::new(),
});
coord.seed_classic("g", group);
let handle = coord.find("g").expect("seeded classic actor");
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Heartbeat {
request: ConsumerGroupHeartbeatRequest {
group_id: "g".into(),
member_id: String::new(),
member_epoch: 0,
subscribed_topic_names: Some(vec!["t".into()]),
rebalance_timeout_ms: 60_000,
..Default::default()
},
client_host: String::new(),
reply: tx,
})
.await
.unwrap();
let resp = rx.await.unwrap();
assert!(resp.error_code == codes::NONE);
let native_id = resp.member_id.expect("native member id");
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::Heartbeat {
request: ConsumerGroupHeartbeatRequest {
group_id: "g".into(),
member_id: native_id,
member_epoch: -1,
..Default::default()
},
client_host: String::new(),
reply: tx,
})
.await
.unwrap();
assert!(rx.await.unwrap().error_code == codes::NONE);
let snap = coord
.describe_group("g")
.await
.expect("group downgraded to classic");
assert!(
snap.members.len() == 1,
"exactly the hosted classic member must remain after downgrade"
);
assert!(
snap.members.iter().any(|m| m.member_id == "m-classic"),
"the hosted classic member must survive the downgrade"
);
assert!(log.has_next_gen_group_metadata_tombstone("g").await);
assert!(log.has_next_gen_target_metadata_tombstone("g").await);
assert!(log_has_classic_group_metadata_write(&log, "g").await);
}
/// After `seed_and_upgrade`, `describe_group("g")` must return a snapshot that
/// lists `m-classic` with a non-empty assignment, protocol type `consumer` and
/// a generation of at least 1. The group must be absent from `list_groups` but
/// present in `consumer_group_ids`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn describe_reports_an_upgraded_consumer_group() {
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let (coord, _log) =
        make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Upgrade);
    let _handle = seed_and_upgrade(&coord, "t").await;

    let snapshot = coord
        .describe_group("g")
        .await
        .expect("describe must surface an upgraded consumer group");
    assert!(snapshot.group_id == "g");

    // Membership checks.
    let members = &snapshot.members;
    assert!(
        !members.is_empty(),
        "an upgraded consumer group must report its members"
    );
    let lists_classic_member = members.iter().any(|member| member.member_id == "m-classic");
    assert!(
        lists_classic_member,
        "the hosted classic member must appear in the describe snapshot"
    );
    assert!(snapshot.protocol_type.as_deref() == Some("consumer"));

    let classic_member_has_assignment = members
        .iter()
        .any(|member| member.member_id == "m-classic" && !member.assignment.is_empty());
    assert!(
        classic_member_has_assignment,
        "the assigned hosted classic member must carry a translated assignment"
    );
    assert!(
        snapshot.generation_id >= 1,
        "an upgraded group's generation must have advanced off 0"
    );

    // Listing checks: not in `list_groups`, but in `consumer_group_ids`.
    let classic_rows = coord.list_groups().await;
    let reported_as_classic = classic_rows.iter().any(|row| row.group_id == "g");
    assert!(
        !reported_as_classic,
        "an upgraded consumer group must not be reported as a classic row"
    );
    let wanted_id = "g".to_string();
    assert!(
        coord.consumer_group_ids().contains(&wanted_id),
        "the upgraded consumer group must be listed for the wire `consumer` pass"
    );
}
/// Returns `true` when any record in any batch of `log` has a value (so it is
/// not a tombstone) and a key that `parse_key` decodes as
/// `Key::GroupMetadata` for `group_id`.
async fn log_has_classic_group_metadata_write(
    log: &InMemoryOffsetsLog,
    group_id: &str,
) -> bool {
    use crate::coordinator::unified::persistence::{Key, parse_key};

    let batches = log.batches().await;
    batches
        .iter()
        .flat_map(|batch| batch.records.iter())
        .any(|record| {
            // Records without a value never count as a write.
            if record.value.is_none() {
                return false;
            }
            match record.key.as_ref() {
                Some(k) => matches!(
                    parse_key(k),
                    Ok(Key::GroupMetadata { group_id: ref gid }) if gid == group_id
                ),
                None => false,
            }
        })
}
/// Seeds a classic group `g` (protocol type `consumer`, generation 1) that
/// contains one member subscribed to `topic`, and returns the actor handle the
/// coordinator resolves for `g`.
///
/// `instance_id` is forwarded to the member through `with_instance_id`.
///
/// Panics if the coordinator cannot find the freshly seeded group.
fn seed_classic_member(
    coord: &Arc<GroupCoordinator>,
    member_id: &str,
    topic: &str,
    instance_id: Option<&str>,
) -> Arc<GroupActorHandle> {
    use super::super::classic_state::{Group as ClassicState, Member};
    use super::super::group::{Group, GroupKind};

    let mut classic = ClassicState::new("g");
    classic.protocol_type = Some("consumer".into());
    classic.generation_id = 1;

    let member = Member::new(
        member_id,
        "client",
        "127.0.0.1",
        std::time::Duration::from_secs(30),
        std::time::Duration::from_mins(1),
        vec![("range".into(), subscription_blob(&[topic]))],
    )
    .with_instance_id(instance_id.map(str::to_owned));
    classic.add_member(member);

    coord.seed_classic(
        "g",
        Box::new(Group {
            group_id: "g".into(),
            kind: GroupKind::Classic(classic),
            committed_offsets: HashMap::new(),
        }),
    );
    coord.find("g").expect("seeded classic actor")
}
/// Sends one consumer-protocol heartbeat for group `g` to the actor behind
/// `handle` and returns the actor's reply.
///
/// `topic`, when present, becomes a single-entry subscribed-topic list; `None`
/// leaves the subscription unset. The rebalance timeout is always 60 000 ms
/// and the client host is empty.
///
/// Panics if the message cannot be sent or the reply channel is dropped.
async fn consumer_heartbeat(
    handle: &GroupActorHandle,
    member_id: &str,
    member_epoch: i32,
    topic: Option<&str>,
) -> ConsumerGroupHeartbeatResponse {
    let request = ConsumerGroupHeartbeatRequest {
        group_id: "g".into(),
        member_id: member_id.into(),
        member_epoch,
        subscribed_topic_names: topic.map(|name| vec![name.into()]),
        rebalance_timeout_ms: 60_000,
        ..Default::default()
    };

    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let message = GroupActorMessage::Heartbeat {
        request,
        client_host: String::new(),
        reply: reply_tx,
    };
    handle.tx.send(message).await.unwrap();
    reply_rx.await.unwrap()
}
/// Asks the actor behind `handle` for its `ClassicView` via a `ClassicInspect`
/// message and returns the reply.
///
/// Panics if the message cannot be sent or the reply channel is dropped.
async fn classic_inspect(handle: &GroupActorHandle) -> ClassicView {
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let message = GroupActorMessage::ClassicInspect { reply: reply_tx };
    handle.tx.send(message).await.unwrap();
    reply_rx.await.unwrap()
}
/// Sends a version-3 classic `LeaveGroup` for group `g` naming the single
/// member `member_id` (no group instance id) and returns the per-member
/// responses from the actor.
///
/// Panics if the message cannot be sent or the reply channel is dropped.
async fn classic_leave(handle: &GroupActorHandle, member_id: &str) -> Vec<MemberResponse> {
    use crabka_protocol::owned::leave_group_request::MemberIdentity;

    let leaving = MemberIdentity {
        member_id: member_id.into(),
        group_instance_id: None,
        ..Default::default()
    };
    let req = LeaveGroupRequest {
        group_id: "g".into(),
        members: vec![leaving],
        ..Default::default()
    };

    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let message = GroupActorMessage::ClassicLeave {
        req,
        version: 3,
        reply: reply_tx,
    };
    handle.tx.send(message).await.unwrap();
    reply_rx.await.unwrap()
}
/// Sends a `ValidateCommit` for `member_id` (no group instance id) with the
/// given generation or epoch, and returns the actor's verdict: `Ok(())` or an
/// `i16` error code.
///
/// Panics if the message cannot be sent or the reply channel is dropped.
async fn validate_commit(
    handle: &GroupActorHandle,
    member_id: &str,
    generation_or_epoch: i32,
) -> Result<(), i16> {
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let message = GroupActorMessage::ValidateCommit {
        member_id: member_id.into(),
        group_instance_id: None,
        generation_or_epoch,
        reply: reply_tx,
    };
    handle.tx.send(message).await.unwrap();
    reply_rx.await.unwrap()
}
/// Requests the committed-offset map from the actor behind `handle` via a
/// `FetchCommitted` message and returns it, keyed by `(String, i32)`.
///
/// Panics if the message cannot be sent or the reply channel is dropped.
async fn fetch_committed(
    handle: &GroupActorHandle,
) -> HashMap<(String, i32), super::super::classic_state::OffsetEntry> {
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let message = GroupActorMessage::FetchCommitted { reply: reply_tx };
    handle.tx.send(message).await.unwrap();
    reply_rx.await.unwrap()
}
/// With the `Bidirectional` policy: a native member joins a seeded classic
/// group (two members are then described), leaves again, and the surviving
/// classic member `m1` must end up owning both partitions of `t`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn upgrade_then_downgrade_round_trip() {
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let (coord, _log) =
        make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Bidirectional);
    let handle = seed_classic_member(&coord, "m1", "t", None);

    // A native member joins with an empty id at epoch 0.
    let joined = consumer_heartbeat(&handle, "", 0, Some("t")).await;
    assert!(joined.error_code == codes::NONE);
    let native_id = joined.member_id.expect("native member id");

    // The actor must now describe two members.
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let describe_msg = GroupActorMessage::Describe { reply: reply_tx };
    handle.tx.send(describe_msg).await.unwrap();
    let described = reply_rx.await.unwrap();
    assert!(
        described.members.len() == 2,
        "upgraded group hosts both m1 and c1"
    );

    // The native member leaves by heartbeating with epoch -1.
    let left = consumer_heartbeat(&handle, &native_id, -1, None).await;
    assert!(left.error_code == codes::NONE);

    let snapshot = coord
        .describe_group("g")
        .await
        .expect("group downgraded back to classic");
    assert!(
        snapshot.members.iter().any(|m| m.member_id == "m1"),
        "m1 must survive the upgrade→downgrade round trip"
    );
    let survivor = snapshot
        .members
        .iter()
        .find(|m| m.member_id == "m1")
        .expect("m1 present");

    // Decode m1's assignment and pull out its partitions of `t`.
    let raw_assignment = bytes::Bytes::from(survivor.assignment.clone());
    let decoded = decode_assignment(&raw_assignment);
    let topic_entry = decoded
        .assigned_partitions
        .iter()
        .find(|tp| tp.topic == "t")
        .expect("decoded assignment must contain topic t");
    let mut parts = topic_entry.partitions.clone();
    parts.sort_unstable();
    assert!(
        parts == vec![0, 1],
        "m1 (sole surviving member) must own BOTH partitions after the downgrade re-reconcile; got {parts:?}"
    );
}
/// A classic member seeded with instance id `inst-a` must still carry that
/// instance id in the classic view after a native member has joined and left.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn static_member_identity_survives_both_flips() {
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let (coord, _log) =
        make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Bidirectional);
    let handle = seed_classic_member(&coord, "m1", "t", Some("inst-a"));

    // Native member joins ...
    let joined = consumer_heartbeat(&handle, "", 0, Some("t")).await;
    assert!(joined.error_code == codes::NONE);
    let native_id = joined.member_id.expect("native member id");

    // ... and leaves again.
    let left = consumer_heartbeat(&handle, &native_id, -1, None).await;
    assert!(left.error_code == codes::NONE);

    let classic_view = classic_inspect(&handle).await;
    let restored = classic_view
        .members
        .iter()
        .find(|member| member.member_id == "m1")
        .expect("m1 restored as a classic member");
    let instance_id = restored.group_instance_id.as_deref();
    assert!(
        instance_id == Some("inst-a"),
        "the static identity must survive both flips"
    );
}
/// With the `Disabled` policy a consumer heartbeat against a seeded classic
/// group must be answered with `GROUP_ID_NOT_FOUND`, and the classic view must
/// still contain `m1`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn policy_disabled_keeps_group_classic() {
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let (coord, _log) =
        make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Disabled);
    let handle = seed_classic_member(&coord, "m1", "t", None);

    let rejected = consumer_heartbeat(&handle, "", 0, Some("t")).await;
    let code = rejected.error_code;
    assert!(
        code != codes::NONE,
        "Disabled policy must reject the upgrade heartbeat"
    );
    assert!(
        code == codes::GROUP_ID_NOT_FOUND,
        "an un-upgradable classic group surfaces as GROUP_ID_NOT_FOUND"
    );

    // The classic membership is untouched.
    let classic_view = classic_inspect(&handle).await;
    let still_has_m1 = classic_view
        .members
        .iter()
        .any(|member| member.member_id == "m1");
    assert!(
        still_has_m1,
        "the group must remain classic with m1 intact"
    );
    assert!(handle.kind == GroupKindTag::Classic);
}
/// An offset of 99 stored for `("t", 0)` before any migration must still be
/// returned by `FetchCommitted` after a native member joins, and again after
/// that member leaves.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn committed_offsets_survive_a_flip() {
    use super::super::classic_state::OffsetEntry;
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let (coord, _log) =
        make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Bidirectional);
    let handle = seed_classic_member(&coord, "m1", "t", None);

    // Store a committed offset while the group is still in its seeded state.
    let key = ("t".to_string(), 0);
    let entry = OffsetEntry {
        offset: 99,
        leader_epoch: 3,
        metadata: String::new(),
        commit_timestamp_ms: 0,
    };
    let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
    let update = GroupActorMessage::UpdateCommitted {
        entries: vec![(key.clone(), entry)],
        reply: ack_tx,
    };
    handle.tx.send(update).await.unwrap();
    ack_rx.await.unwrap();

    // Native member joins.
    let joined = consumer_heartbeat(&handle, "", 0, Some("t")).await;
    assert!(joined.error_code == codes::NONE);
    let native_id = joined.member_id.expect("native member id");

    let after_upgrade = fetch_committed(&handle).await;
    assert!(
        after_upgrade.get(&key).map(|e| e.offset) == Some(99),
        "committed offset must survive the upgrade"
    );

    // Native member leaves.
    let left = consumer_heartbeat(&handle, &native_id, -1, None).await;
    assert!(left.error_code == codes::NONE);

    let after_downgrade = fetch_committed(&handle).await;
    assert!(
        after_downgrade.get(&key).map(|e| e.offset) == Some(99),
        "committed offset must survive the downgrade too"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawned_consumer_group_downgrade_allows_classic_offset_commit() {
use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;
let (coord, _log) =
make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Bidirectional);
let handle = coord.get_or_create_consumer("g");
assert!(
handle.kind == GroupKindTag::Consumer,
"the group must be spawned consumer-kind"
);
let up = consumer_heartbeat(&handle, "", 0, Some("t")).await;
assert!(up.error_code == codes::NONE);
let native = up.member_id.expect("native member id");
let join = classic_join(&handle, "m-classic", "t").await;
assert!(join.error_code == codes::NONE);
let leave = consumer_heartbeat(&handle, &native, -1, None).await;
assert!(leave.error_code == codes::NONE);
let view = classic_inspect(&handle).await;
assert!(
handle.kind == GroupKindTag::Consumer,
"spawn-time kind unchanged"
);
assert!(
view.members.iter().any(|m| m.member_id == "m-classic"),
"the hosted classic member must survive the downgrade"
);
let generation = view.generation_id;
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.tx
.send(GroupActorMessage::ValidateCommit {
member_id: "m-classic".into(),
group_instance_id: None,
generation_or_epoch: generation,
reply: tx,
})
.await
.unwrap();
let result = rx.await.unwrap();
assert!(
result == Ok(()),
"ValidateCommit must dispatch on the live (classic) kind and accept \
the downgraded member (got {result:?})"
);
}
/// After the native member of a consumer-spawned group leaves while a classic
/// member remains, `delete_group` must report `NonEmpty`. Once the classic
/// member has left too, `delete_group` must succeed.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn downgraded_group_is_deletable_once_empty() {
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let (coord, _log) =
        make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Bidirectional);
    let handle = coord.get_or_create_consumer("g");
    assert!(handle.kind == GroupKindTag::Consumer);

    // Native member joins, classic member joins, native member leaves.
    let joined = consumer_heartbeat(&handle, "", 0, Some("t")).await;
    assert!(joined.error_code == codes::NONE);
    let native_id = joined.member_id.expect("native member id");
    let classic_joined = classic_join(&handle, "m-classic", "t").await;
    assert!(classic_joined.error_code == codes::NONE);
    let left = consumer_heartbeat(&handle, &native_id, -1, None).await;
    assert!(left.error_code == codes::NONE);

    let occupied_view = classic_inspect(&handle).await;
    assert!(occupied_view
        .members
        .iter()
        .any(|member| member.member_id == "m-classic"));
    assert!(handle.kind == GroupKindTag::Consumer);

    // Deleting while the classic member is still present must be refused.
    let while_occupied = coord.delete_group("g").await;
    assert!(
        while_occupied == Err(crate::coordinator::DeleteGroupError::NonEmpty),
        "a downgraded non-empty group must report NonEmpty (seen as classic), \
         not the stale-handle.kind NotFound"
    );

    // Drain the last member, then delete.
    let leave_responses = classic_leave(&handle, "m-classic").await;
    assert!(!leave_responses.is_empty());
    let drained_view = classic_inspect(&handle).await;
    assert!(
        drained_view.members.is_empty(),
        "the group must be empty after the classic member leaves"
    );
    let once_empty = coord.delete_group("g").await;
    assert!(
        once_empty == Ok(()),
        "an empty downgraded group must be deletable"
    );
}
/// After a native member joins a seeded classic group, `ValidateCommit` for
/// that member must answer `STALE_MEMBER_EPOCH` for epoch - 1,
/// `FENCED_MEMBER_EPOCH` for epoch + 1, and `Ok(())` for the epoch returned by
/// the join heartbeat. The handle's `kind` tag reads `Classic` before and
/// after the join.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn upgraded_group_fences_stale_native_consumer_commit() {
    use crate::coordinator::unified::config::ConsumerGroupMigrationPolicy;

    let (coord, _log) =
        make_coordinator_with_topic_policy("t", 2, ConsumerGroupMigrationPolicy::Bidirectional);
    let handle = seed_classic_member(&coord, "m1", "t", None);
    assert!(handle.kind == GroupKindTag::Classic);

    let joined = consumer_heartbeat(&handle, "", 0, Some("t")).await;
    assert!(joined.error_code == codes::NONE);
    let native_id = joined.member_id.expect("native member id");
    let epoch_now = joined.member_epoch;
    assert!(handle.kind == GroupKindTag::Classic);

    // One epoch behind the member's current epoch.
    let stale = validate_commit(&handle, &native_id, epoch_now - 1).await;
    assert!(
        stale == Err(codes::STALE_MEMBER_EPOCH),
        "an upgraded group must run the consumer epoch fence (stale); got {stale:?}"
    );

    // One epoch ahead of the member's current epoch.
    let fenced = validate_commit(&handle, &native_id, epoch_now + 1).await;
    assert!(
        fenced == Err(codes::FENCED_MEMBER_EPOCH),
        "an upgraded group must run the consumer epoch fence (fenced); got {fenced:?}"
    );

    // Exactly the current epoch.
    let ok = validate_commit(&handle, &native_id, epoch_now).await;
    assert!(
        ok == Ok(()),
        "the current epoch must be accepted; got {ok:?}"
    );
}
}