use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use crabka_protocol::owned::common::streams_group_heartbeat_response::status::Status;
use crabka_protocol::owned::common::streams_group_heartbeat_response::task_ids::TaskIds as RespTaskIds;
use crabka_protocol::owned::streams_group_heartbeat_request::StreamsGroupHeartbeatRequest;
use crabka_protocol::owned::streams_group_heartbeat_response::StreamsGroupHeartbeatResponse;
use crate::codes;
use crate::coordinator::unified::offsets_log::OffsetsLog;
use crate::metadata_source::MetadataSource;
use super::assignor::{self, AssignorInput, AssignorMember};
use super::config::StreamsGroupConfig;
use super::persistence::{
PendingStreamsRecords, StreamsEndpoint, StreamsGroupCurrentMemberAssignmentValue,
StreamsGroupMemberMetadataValue, StreamsGroupMetadataValue, StreamsGroupPartitionMetadataValue,
StreamsGroupTargetAssignmentMemberValue, StreamsGroupTargetAssignmentMetadataValue,
StreamsGroupTopologyValue,
};
use super::state::{
StoredTopologyHandle, StreamsGroupState, StreamsGroupStatePhase, StreamsMemberAssignmentState,
StreamsMemberState, StreamsTargetAssignment,
};
use super::topology::{self, status as topo_status};
/// Messages accepted by a streams-group actor over its mpsc channel.
// NOTE(review): the size lint is silenced presumably because `Heartbeat`
// carries a whole request by value — confirm boxing was deliberately avoided.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum StreamsGroupActorMessage {
/// Process one heartbeat request; the response is sent on `reply`.
Heartbeat {
request: StreamsGroupHeartbeatRequest,
client_id: String,
client_host: String,
reply: oneshot::Sender<StreamsGroupHeartbeatResponse>,
},
/// Request a snapshot view of the group; sent back on `reply`.
Describe {
reply: oneshot::Sender<StreamsDescribeView>,
},
/// Check a `(member_id, member_epoch)` pair against the actor's member
/// table; `reply` receives `Ok(())` or an error code.
ValidateCommit {
member_id: String,
member_epoch: i32,
reply: oneshot::Sender<Result<(), i16>>,
},
/// Hydrate the actor's in-memory state from a seed.
Seed(super::super::StreamsGroupSeed),
/// Acknowledge on the sender, then stop the actor loop.
Shutdown(oneshot::Sender<()>),
}
/// Snapshot of a streams group returned in reply to
/// [`StreamsGroupActorMessage::Describe`].
#[derive(Debug, Clone)]
pub struct StreamsDescribeView {
pub group_id: String,
pub group_epoch: i32,
/// Epoch of the currently installed target assignment.
pub assignment_epoch: i32,
pub topology_epoch: i32,
/// String form of the group's phase.
pub group_state: String,
/// Stored topology, if the actor holds one.
pub topology: Option<StreamsGroupTopologyValue>,
pub members: Vec<StreamsDescribeMember>,
}
/// Per-member portion of a [`StreamsDescribeView`].
#[derive(Debug, Clone)]
pub struct StreamsDescribeMember {
pub member_id: String,
pub member_epoch: i32,
pub instance_id: Option<String>,
pub rack_id: Option<String>,
pub client_id: String,
pub client_host: String,
pub process_id: String,
/// Active tasks: subtopology id -> partitions.
pub active: BTreeMap<String, Vec<i32>>,
/// Standby tasks: subtopology id -> partitions.
pub standby: BTreeMap<String, Vec<i32>>,
/// Warm-up tasks: subtopology id -> partitions.
pub warmup: BTreeMap<String, Vec<i32>>,
}
/// Handle to a spawned streams-group actor.
#[derive(Debug)]
pub struct StreamsGroupActorHandle {
/// Sending side of the actor's message channel.
pub tx: mpsc::Sender<StreamsGroupActorMessage>,
// Join handle of the spawned actor task; stored but never read here.
_task: JoinHandle<()>,
}
impl StreamsGroupActorHandle {
    /// Spawns the actor task for `group_id` and returns a handle to it.
    ///
    /// The actor reads messages from a bounded channel (capacity 64); the
    /// sending half is exposed as [`StreamsGroupActorHandle::tx`].
    pub fn spawn(
        group_id: String,
        config: Arc<StreamsGroupConfig>,
        offsets_log: Arc<dyn OffsetsLog>,
        metadata_source: Option<Arc<dyn MetadataSource>>,
        coordinator: Arc<super::super::GroupCoordinator>,
    ) -> Self {
        let (sender, receiver) = mpsc::channel(64);
        let actor = actor_loop(
            group_id,
            config,
            offsets_log,
            metadata_source,
            coordinator,
            receiver,
        );
        Self {
            tx: sender,
            _task: tokio::spawn(actor),
        }
    }
}
/// Asks the group actor whether `(member_id, member_epoch)` may commit.
///
/// Returns `None` when the actor accepts the pair, otherwise the error code
/// to report. If the actor cannot be reached, or drops the reply channel,
/// `UNKNOWN_SERVER_ERROR` is returned.
pub(crate) async fn validate_streams_group_commit(
    handle: &StreamsGroupActorHandle,
    member_id: &str,
    member_epoch: i32,
) -> Option<i16> {
    let (reply, response) = oneshot::channel();
    let message = StreamsGroupActorMessage::ValidateCommit {
        member_id: member_id.to_owned(),
        member_epoch,
        reply,
    };
    let delivered = handle.tx.send(message).await.is_ok();
    if !delivered {
        return Some(codes::UNKNOWN_SERVER_ERROR);
    }
    match response.await {
        // `Ok(())` maps to `None`, `Err(code)` to `Some(code)`.
        Ok(verdict) => verdict.err(),
        Err(_) => Some(codes::UNKNOWN_SERVER_ERROR),
    }
}
/// In-memory state owned by one streams-group actor.
struct ActorState {
/// Group state: members, epochs, target assignment, status, phase.
state: StreamsGroupState,
/// Full stored topology, once one has been accepted or seeded.
topology: Option<StreamsGroupTopologyValue>,
/// Partition metadata derived during the last full reconcile, or seeded.
partition_metadata: Option<StreamsGroupPartitionMetadataValue>,
}
impl ActorState {
    /// Creates the state for a fresh group with no topology and no
    /// partition metadata.
    fn new(group_id: String) -> Self {
        let state = StreamsGroupState::new(group_id);
        Self {
            state,
            topology: None,
            partition_metadata: None,
        }
    }
}
/// Event loop of a single streams-group actor.
///
/// Multiplexes two event sources until the channel closes, a `Shutdown`
/// message arrives, or a log write fails:
/// * messages received on `rx`, and
/// * a periodic tick (every `config.heartbeat_interval`) that evicts expired
///   members via [`handle_session_tick`].
///
/// On a failed log write the actor logs a warning and exits. A failing
/// heartbeat is answered with `COORDINATOR_LOAD_IN_PROGRESS` first.
async fn actor_loop(
    group_id: String,
    config: Arc<StreamsGroupConfig>,
    offsets_log: Arc<dyn OffsetsLog>,
    metadata_source: Option<Arc<dyn MetadataSource>>,
    coordinator: Arc<super::super::GroupCoordinator>,
    mut rx: mpsc::Receiver<StreamsGroupActorMessage>,
) {
    let mut actor = ActorState::new(group_id);
    let mut tick = tokio::time::interval(config.heartbeat_interval);
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        tokio::select! {
            msg = rx.recv() => {
                // `None` means every sender is gone: nothing left to serve.
                let Some(msg) = msg else { break };
                match msg {
                    StreamsGroupActorMessage::Heartbeat { request, client_id, client_host, reply } => {
                        match handle_heartbeat(
                            &mut actor,
                            &config,
                            &*offsets_log,
                            metadata_source.as_ref(),
                            &coordinator,
                            &request,
                            &client_id,
                            &client_host,
                        )
                        .await
                        {
                            Ok(resp) => {
                                let _ = reply.send(resp);
                            }
                            Err(e) => {
                                tracing::warn!(
                                    group_id = %actor.state.group_id,
                                    error = %e,
                                    "streams-group actor exiting after log-write failure",
                                );
                                let _ = reply.send(StreamsGroupHeartbeatResponse {
                                    error_code: codes::COORDINATOR_LOAD_IN_PROGRESS,
                                    ..Default::default()
                                });
                                break;
                            }
                        }
                    }
                    StreamsGroupActorMessage::Describe { reply } => {
                        let _ = reply.send(build_describe(&actor.state, actor.topology.as_ref()));
                    }
                    StreamsGroupActorMessage::ValidateCommit { member_id, member_epoch, reply } => {
                        // An empty member id skips the membership check.
                        let result: Result<(), i16> = if member_id.is_empty() {
                            Ok(())
                        } else {
                            match actor.state.members.get(&member_id) {
                                None => Err(codes::UNKNOWN_MEMBER_ID),
                                Some(m) if member_epoch < m.member_epoch => Err(codes::STALE_MEMBER_EPOCH),
                                Some(m) if member_epoch > m.member_epoch => Err(codes::FENCED_MEMBER_EPOCH),
                                Some(_) => Ok(()),
                            }
                        };
                        let _ = reply.send(result);
                    }
                    StreamsGroupActorMessage::Seed(seed) => {
                        apply_seed(&mut actor, seed);
                    }
                    StreamsGroupActorMessage::Shutdown(reply) => {
                        let _ = reply.send(());
                        break;
                    }
                }
            }
            _ = tick.tick() => {
                // Log the failure before exiting, as the heartbeat path does,
                // so the actor never stops without a trace.
                if let Err(e) = handle_session_tick(
                    &mut actor,
                    &config,
                    &*offsets_log,
                    metadata_source.as_ref(),
                    &coordinator,
                )
                .await
                {
                    tracing::warn!(
                        group_id = %actor.state.group_id,
                        error = %e,
                        "streams-group actor exiting after session-tick log-write failure",
                    );
                    break;
                }
            }
        }
    }
}
/// Periodic session sweep.
///
/// Evicts members whose session expired (per `config.session_timeout`). If
/// any were evicted, recomputes the assignment and appends a batch carrying
/// the post-eviction group records plus tombstones (`None` values) for each
/// evicted member's metadata, target assignment and current assignment.
///
/// # Errors
/// Propagates the error from [`flush_pending`] when the log append fails.
async fn handle_session_tick(
    actor: &mut ActorState,
    config: &StreamsGroupConfig,
    offsets_log: &dyn OffsetsLog,
    metadata_source: Option<&Arc<dyn MetadataSource>>,
    coordinator: &super::super::GroupCoordinator,
) -> Result<(), crate::error::BrokerError> {
    let evicted = actor
        .state
        .evict_expired(Instant::now(), config.session_timeout);
    if evicted.is_empty() {
        return Ok(());
    }
    // Membership shrank, so the target must be recomputed. `reconcile` does
    // nothing unless the state is flagged dirty, and whether `evict_expired`
    // sets that flag is not visible here. Set it explicitly, as `handle_leave`
    // does after removing a member.
    actor.state.dirty = true;
    reconcile(actor, config, metadata_source).await;
    let mut pending = snapshot_pending_after_change(actor, &[]);
    for mid in &evicted {
        pending.member_metadata.push((mid.clone(), None));
        pending.target_per_member.push((mid.clone(), None));
        pending.current_per_member.push((mid.clone(), None));
    }
    let now_ms = chrono_now_ms();
    flush_pending(actor, pending, offsets_log, coordinator, now_ms).await
}
#[allow(clippy::too_many_arguments)]
async fn handle_heartbeat(
actor: &mut ActorState,
config: &StreamsGroupConfig,
offsets_log: &dyn OffsetsLog,
metadata_source: Option<&Arc<dyn MetadataSource>>,
coordinator: &super::super::GroupCoordinator,
req: &StreamsGroupHeartbeatRequest,
client_id: &str,
client_host: &str,
) -> Result<StreamsGroupHeartbeatResponse, crate::error::BrokerError> {
let now = Instant::now();
let now_ms = chrono_now_ms();
if req.member_epoch == -1 {
return handle_leave(
actor,
config,
offsets_log,
metadata_source,
coordinator,
req,
now_ms,
)
.await;
}
if req.member_epoch == 0 && !actor.state.members.contains_key(&req.member_id) {
let new_member_id = if req.member_id.is_empty() {
uuid::Uuid::new_v4().to_string()
} else {
req.member_id.clone()
};
let m = build_member(&new_member_id, req, client_id, client_host, now);
actor.state.add_or_update_member(m);
if let Some(topo) = &req.topology {
accept_topology(actor, topo);
}
apply_shutdown_application(actor, req);
reconcile(actor, config, metadata_source).await;
actor.state.advance_member_epoch(&new_member_id);
let pending = snapshot_pending_after_change(actor, std::slice::from_ref(&new_member_id));
flush_pending(actor, pending, offsets_log, coordinator, now_ms).await?;
return Ok(build_assignment_resp(&actor.state, &new_member_id, config));
}
let cur_epoch = actor
.state
.members
.get(&req.member_id)
.map_or(-2, |m| m.member_epoch);
if cur_epoch == -2 {
return Ok(error_resp(codes::UNKNOWN_MEMBER_ID, config));
}
if req.member_epoch < cur_epoch {
return Ok(error_resp(codes::STALE_MEMBER_EPOCH, config));
}
if req.member_epoch > cur_epoch {
return Ok(error_resp(codes::FENCED_MEMBER_EPOCH, config));
}
let mut changed = update_member_steady_state(actor, req, now);
if let Some(topo) = &req.topology {
let cur_topo_epoch = actor.state.topology_epoch;
if topo.epoch > cur_topo_epoch {
accept_topology(actor, topo);
changed = true;
} else if topo.epoch < cur_topo_epoch {
set_status(
actor,
topo_status::STALE_TOPOLOGY,
"member reported a stale topology",
);
}
}
if apply_shutdown_application(actor, req) {
changed = true;
}
if actor.state.dirty {
reconcile(actor, config, metadata_source).await;
changed = true;
}
if actor.state.target.epoch > cur_epoch {
actor.state.advance_member_epoch(&req.member_id);
changed = true;
}
if changed {
let pending = snapshot_pending_after_change(actor, std::slice::from_ref(&req.member_id));
flush_pending(actor, pending, offsets_log, coordinator, now_ms).await?;
}
Ok(build_assignment_resp(&actor.state, &req.member_id, config))
}
/// Merges the state reported in a steady-state heartbeat into the member.
///
/// Always refreshes `last_seen`. Each optional section of the request
/// (active/standby/warm-up tasks, task offsets, task end offsets) replaces
/// the stored value only when it is present and differs. Returns `true` if
/// any section was replaced, and `false` when the member is unknown.
fn update_member_steady_state(
    actor: &mut ActorState,
    req: &StreamsGroupHeartbeatRequest,
    now: Instant,
) -> bool {
    /// Writes `incoming` into `slot` when they differ; reports whether it did.
    fn store_if_different<T: PartialEq>(slot: &mut T, incoming: T) -> bool {
        if *slot == incoming {
            return false;
        }
        *slot = incoming;
        true
    }

    let Some(member) = actor.state.members.get_mut(&req.member_id) else {
        return false;
    };
    member.last_seen = now;

    let mut changed = false;
    if let Some(reported) = &req.active_tasks {
        changed |= store_if_different(&mut member.active, task_ids_to_map(reported));
    }
    if let Some(reported) = &req.standby_tasks {
        changed |= store_if_different(&mut member.standby, task_ids_to_map(reported));
    }
    if let Some(reported) = &req.warmup_tasks {
        changed |= store_if_different(&mut member.warmup, task_ids_to_map(reported));
    }
    if let Some(reported) = &req.task_offsets {
        changed |= store_if_different(&mut member.task_offsets, task_offsets_to_map(reported));
    }
    if let Some(reported) = &req.task_end_offsets {
        changed |= store_if_different(
            &mut member.task_end_offsets,
            task_offsets_to_map(reported),
        );
    }
    changed
}
/// Handles a leave heartbeat (`member_epoch == -1`).
///
/// For a known member: removes it, forces a reconcile, and appends a batch
/// holding the updated group records plus tombstones (`None` values) for the
/// member's metadata, target assignment and current assignment.
///
/// For an unknown member nothing is mutated or written. Previously such a
/// request still marked the group dirty, triggering a reconcile and a log
/// append although membership had not changed.
///
/// The response is `NONE` with member epoch `-1` in both cases.
///
/// # Errors
/// Propagates the error from [`flush_pending`] when the log append fails.
async fn handle_leave(
    actor: &mut ActorState,
    config: &StreamsGroupConfig,
    offsets_log: &dyn OffsetsLog,
    metadata_source: Option<&Arc<dyn MetadataSource>>,
    coordinator: &super::super::GroupCoordinator,
    req: &StreamsGroupHeartbeatRequest,
    now_ms: i64,
) -> Result<StreamsGroupHeartbeatResponse, crate::error::BrokerError> {
    if !actor.state.members.contains_key(&req.member_id) {
        // Not a member: the group is unchanged, so there is nothing to
        // reconcile or persist.
        return Ok(base_resp(codes::NONE, -1, config));
    }
    actor.state.remove_member(&req.member_id);
    // Force `reconcile` to recompute the target without the departed member.
    actor.state.dirty = true;
    reconcile(actor, config, metadata_source).await;
    let mut pending = snapshot_pending_after_change(actor, &[]);
    pending.member_metadata.push((req.member_id.clone(), None));
    pending
        .target_per_member
        .push((req.member_id.clone(), None));
    pending
        .current_per_member
        .push((req.member_id.clone(), None));
    flush_pending(actor, pending, offsets_log, coordinator, now_ms).await?;
    Ok(base_resp(codes::NONE, -1, config))
}
/// Adopts the topology carried by a heartbeat as the group's topology.
///
/// Converts the wire form to the stored form, records its epoch on the group
/// state, and flags the state dirty so the next reconcile recomputes.
fn accept_topology(
    actor: &mut ActorState,
    wire_topology: &crabka_protocol::owned::streams_group_heartbeat_request::Topology,
) {
    let converted = topology::to_stored_topology(wire_topology);
    let epoch = converted.epoch;
    actor.topology = Some(converted);

    let state = &mut actor.state;
    state.topology = Some(StoredTopologyHandle { epoch });
    state.topology_epoch = epoch;
    state.dirty = true;
}
/// Records the `SHUTDOWN_APPLICATION` status when the request asks for it.
///
/// Returns `true` only if the request set the flag and the status was not
/// already present.
fn apply_shutdown_application(actor: &mut ActorState, req: &StreamsGroupHeartbeatRequest) -> bool {
    req.shutdown_application
        && set_status(
            actor,
            topo_status::SHUTDOWN_APPLICATION,
            "a member requested application shutdown",
        )
}
/// Adds `(code, detail)` to the group's status list unless an entry with the
/// same code exists. Returns whether an entry was added.
fn set_status(actor: &mut ActorState, code: i8, detail: &str) -> bool {
    let already_present = actor
        .state
        .status
        .iter()
        .any(|(existing, _)| *existing == code);
    if !already_present {
        actor.state.status.push((code, detail.to_string()));
    }
    !already_present
}
/// Recomputes the group's target assignment if the state is flagged dirty.
///
/// * Without a metadata source or a topology, an empty target is installed
///   and the group becomes `NotReady`.
/// * Otherwise the topology is validated against the current metadata image,
///   tasks are derived, and required internal topics are ensured.
/// * The status list is replaced by the fresh validation results; an
///   existing `SHUTDOWN_APPLICATION` entry is carried over.
/// * Missing source topics, incorrectly partitioned topics or missing
///   internal topics block assignment (empty target, `NotReady`). Otherwise
///   a new target is computed and installed.
async fn reconcile(
    actor: &mut ActorState,
    config: &StreamsGroupConfig,
    metadata_source: Option<&Arc<dyn MetadataSource>>,
) {
    if !actor.state.dirty {
        return;
    }
    let Some((source, topology)) = metadata_source.zip(actor.topology.clone()) else {
        install_empty_target(&mut actor.state, StreamsGroupStatePhase::NotReady);
        return;
    };

    let image = source.current_image();
    let mut status = topology::validate_topology(&topology, &image);
    let derived = topology::derive_tasks(&topology, &image);
    actor.partition_metadata = Some(derived.partition_metadata.clone());

    let specs = topology::required_internal_topics(&topology, &derived.num_tasks);
    if !specs.is_empty() {
        // `None` means every internal topic exists; otherwise the detail text.
        let problem = match topology::ensure_internal_topics(source, &specs).await {
            Ok(still_missing) if still_missing.is_empty() => None,
            Ok(still_missing) => Some(format!(
                "internal topics not yet created: {}",
                still_missing.join(", ")
            )),
            Err(e) => Some(format!("internal-topic creation failed: {e}")),
        };
        if let Some(detail) = problem {
            status.push((topo_status::MISSING_INTERNAL_TOPICS, detail));
        }
    }

    // Decide on the fresh results only, before the carried-over entry is added.
    let blocking = status.iter().any(|(code, _)| {
        [
            topo_status::MISSING_SOURCE_TOPICS,
            topo_status::INCORRECTLY_PARTITIONED_TOPICS,
            topo_status::MISSING_INTERNAL_TOPICS,
        ]
        .contains(code)
    });

    let carried_over: Vec<(i8, String)> = actor
        .state
        .status
        .iter()
        .filter(|(code, _)| *code == topo_status::SHUTDOWN_APPLICATION)
        .cloned()
        .collect();
    status.extend(carried_over);
    actor.state.status = status;

    if blocking {
        install_empty_target(&mut actor.state, StreamsGroupStatePhase::NotReady);
    } else {
        compute_and_install_target(actor, config, &topology, &derived.num_tasks);
    }
}
/// Runs the assignor over the current members and installs its result as the
/// new target assignment.
///
/// Bumps the group epoch before installing the target, sets the phase to
/// `Reconciling` while any member is in `UnrevokedActiveTasks` (else
/// `Stable`), and clears the dirty flag.
fn compute_and_install_target(
    actor: &mut ActorState,
    config: &StreamsGroupConfig,
    topology: &StreamsGroupTopologyValue,
    num_tasks: &BTreeMap<String, i32>,
) {
    let mut members: Vec<AssignorMember> = Vec::new();
    for member in actor.state.members.values() {
        members.push(AssignorMember {
            member_id: member.member_id.clone(),
            process_id: member.process_id.clone(),
            rack_id: member.rack_id.clone(),
            current_active: member.active.clone(),
            current_standby: member.standby.clone(),
            current_warmup: member.warmup.clone(),
            task_lag: task_lag(member),
        });
    }

    // A subtopology counts as stateful when it has changelog topics.
    let mut stateful: BTreeSet<String> = BTreeSet::new();
    for sub in &topology.subtopologies {
        if !sub.state_changelog_topics.is_empty() {
            stateful.insert(sub.subtopology_id.clone());
        }
    }

    let input = AssignorInput {
        tasks: topology::task_set(num_tasks),
        stateful,
        num_standby_replicas: config.num_standby_replicas,
        num_warmup_replicas: config.num_warmup_replicas,
        acceptable_recovery_lag: config.acceptable_recovery_lag,
        kind: config.assignor,
    };
    let computed = assignor::assign(&members, &input);

    actor.state.bump_epoch();
    actor.state.install_target(StreamsTargetAssignment {
        epoch: 0,
        active: computed.active,
        standby: computed.standby,
        warmup: computed.warmup,
    });

    let awaiting_revocation = actor
        .state
        .members
        .values()
        .any(|m| m.assignment_state == StreamsMemberAssignmentState::UnrevokedActiveTasks);
    actor.state.phase = if awaiting_revocation {
        StreamsGroupStatePhase::Reconciling
    } else {
        StreamsGroupStatePhase::Stable
    };
    actor.state.dirty = false;
}
/// Bumps the group epoch, installs a default (empty) target assignment,
/// moves the group to `phase`, and clears the dirty flag.
fn install_empty_target(state: &mut StreamsGroupState, phase: StreamsGroupStatePhase) {
    let empty_target = StreamsTargetAssignment::default();
    state.bump_epoch();
    state.install_target(empty_target);
    state.dirty = false;
    state.phase = phase;
}
/// Computes per-task lag (`end offset - current offset`) for a member.
///
/// Only tasks present in both `task_end_offsets` and `task_offsets` get an
/// entry. Both maps are replaced from heartbeat contents, so the subtraction
/// saturates instead of overflowing `i64` on extreme values.
fn task_lag(m: &StreamsMemberState) -> BTreeMap<(String, i32), i64> {
    m.task_end_offsets
        .iter()
        .filter_map(|(task, &end)| {
            let &position = m.task_offsets.get(task)?;
            Some((task.clone(), end.saturating_sub(position)))
        })
        .collect()
}
/// Groups wire task ids by subtopology id.
///
/// Partitions of entries sharing a subtopology id are merged; each resulting
/// partition list is sorted ascending with duplicates removed.
fn task_ids_to_map(
    tasks: &[crabka_protocol::owned::common::streams_group_heartbeat_request::task_ids::TaskIds],
) -> BTreeMap<String, Vec<i32>> {
    let mut grouped: BTreeMap<String, Vec<i32>> = BTreeMap::new();
    for task in tasks {
        grouped
            .entry(task.subtopology_id.clone())
            .or_default()
            .extend(task.partitions.iter().copied());
    }
    grouped.values_mut().for_each(|partitions| {
        partitions.sort_unstable();
        partitions.dedup();
    });
    grouped
}
/// Indexes wire task offsets by `(subtopology id, partition)`.
///
/// When the same key occurs more than once, the last occurrence wins.
fn task_offsets_to_map(
    offsets: &[crabka_protocol::owned::common::streams_group_heartbeat_request::task_offset::TaskOffset],
) -> BTreeMap<(String, i32), i64> {
    let mut by_task = BTreeMap::new();
    for entry in offsets {
        by_task.insert((entry.subtopology_id.clone(), entry.partition), entry.offset);
    }
    by_task
}
/// Converts a `subtopology id -> partitions` map into response task ids,
/// one entry per subtopology, in the map's key order.
fn map_to_task_ids(map: &BTreeMap<String, Vec<i32>>) -> Vec<RespTaskIds> {
    let mut task_ids = Vec::new();
    for (subtopology, partitions) in map {
        task_ids.push(RespTaskIds {
            subtopology_id: subtopology.clone(),
            partitions: partitions.clone(),
            ..Default::default()
        });
    }
    task_ids
}
/// Builds the state for a newly joining member from its first heartbeat.
///
/// Starts from `StreamsMemberState::joining` and copies the request's
/// optional attributes. A process id is taken over only when present and
/// non-empty; the topology epoch only when the request carries a topology.
fn build_member(
    member_id: &str,
    req: &StreamsGroupHeartbeatRequest,
    client_id: &str,
    host: &str,
    now: Instant,
) -> StreamsMemberState {
    let mut member = StreamsMemberState::joining(member_id, client_id, host);
    member.last_seen = now;
    member.rebalance_timeout_ms = req.rebalance_timeout_ms;
    member.rack_id = req.rack_id.clone();
    member.instance_id = req.instance_id.clone();

    if let Some(process_id) = req.process_id.as_ref().filter(|pid| !pid.is_empty()) {
        member.process_id.clone_from(process_id);
    }
    member.user_endpoint = req
        .user_endpoint
        .as_ref()
        .map(|endpoint| (endpoint.host.clone(), u32::from(endpoint.port)));
    if let Some(tags) = &req.client_tags {
        member.client_tags = tags
            .iter()
            .map(|tag| (tag.key.clone(), tag.value.clone()))
            .collect();
    }
    if let Some(topo) = &req.topology {
        member.topology_epoch = topo.epoch;
    }
    member
}
/// Builds a heartbeat response carrying only the error code, member epoch
/// and the interval/lag settings from `config`; other fields keep defaults.
///
/// An `acceptable_recovery_lag` that does not fit in `i32` is reported as
/// `i32::MAX`.
fn base_resp(
    error_code: i16,
    member_epoch: i32,
    config: &StreamsGroupConfig,
) -> StreamsGroupHeartbeatResponse {
    let heartbeat_interval_ms = duration_ms(config.heartbeat_interval, 5_000);
    let task_offset_interval_ms = duration_ms(config.task_offset_interval, 30_000);
    let acceptable_recovery_lag = match i32::try_from(config.acceptable_recovery_lag) {
        Ok(lag) => lag,
        Err(_) => i32::MAX,
    };
    StreamsGroupHeartbeatResponse {
        error_code,
        member_epoch,
        heartbeat_interval_ms,
        acceptable_recovery_lag,
        task_offset_interval_ms,
        ..Default::default()
    }
}
/// Builds an error response: `error_code` with member epoch `0`.
fn error_resp(error_code: i16, config: &StreamsGroupConfig) -> StreamsGroupHeartbeatResponse {
    let member_epoch = 0;
    base_resp(error_code, member_epoch, config)
}
fn build_assignment_resp(
state: &StreamsGroupState,
member_id: &str,
config: &StreamsGroupConfig,
) -> StreamsGroupHeartbeatResponse {
let m = state
.members
.get(member_id)
.expect("member exists at build_assignment_resp");
let status = if state.status.is_empty() {
None
} else {
Some(
state
.status
.iter()
.map(|(code, detail)| Status {
status_code: *code,
status_detail: detail.clone(),
..Default::default()
})
.collect(),
)
};
StreamsGroupHeartbeatResponse {
error_code: codes::NONE,
member_id: member_id.to_string(),
member_epoch: m.member_epoch,
heartbeat_interval_ms: duration_ms(config.heartbeat_interval, 5_000),
acceptable_recovery_lag: i32::try_from(config.acceptable_recovery_lag).unwrap_or(i32::MAX),
task_offset_interval_ms: duration_ms(config.task_offset_interval, 30_000),
status,
active_tasks: Some(map_to_task_ids(&m.active)),
standby_tasks: Some(map_to_task_ids(&m.standby)),
warmup_tasks: Some(map_to_task_ids(&m.warmup)),
..Default::default()
}
}
/// Builds the describe view of the group from its state and topology.
fn build_describe(
    state: &StreamsGroupState,
    topology: Option<&StreamsGroupTopologyValue>,
) -> StreamsDescribeView {
    let mut members = Vec::new();
    for member in state.members.values() {
        members.push(StreamsDescribeMember {
            member_id: member.member_id.clone(),
            member_epoch: member.member_epoch,
            instance_id: member.instance_id.clone(),
            rack_id: member.rack_id.clone(),
            client_id: member.client_id.clone(),
            client_host: member.client_host.clone(),
            process_id: member.process_id.clone(),
            active: member.active.clone(),
            standby: member.standby.clone(),
            warmup: member.warmup.clone(),
        });
    }
    StreamsDescribeView {
        group_id: state.group_id.clone(),
        group_epoch: state.group_epoch,
        // The view's assignment epoch is the installed target's epoch.
        assignment_epoch: state.target.epoch,
        topology_epoch: state.topology_epoch,
        group_state: state.phase.as_str().to_string(),
        topology: topology.cloned(),
        members,
    }
}
/// Converts `d` to whole milliseconds as `i32`, returning `fallback` when
/// the value does not fit.
fn duration_ms(d: std::time::Duration, fallback: i32) -> i32 {
    match i32::try_from(d.as_millis()) {
        Ok(millis) => millis,
        Err(_) => fallback,
    }
}
/// Collects the records to persist after a state change.
///
/// Always includes the group metadata (epoch). The topology and partition
/// metadata are included when present, and the target-assignment metadata
/// once the target epoch is positive. For each id in `affected_members`
/// that is still a member, adds its metadata and current assignment, plus
/// its target assignment when one exists.
fn snapshot_pending_after_change(
    actor: &ActorState,
    affected_members: &[String],
) -> PendingStreamsRecords {
    let state = &actor.state;
    let target_epoch = state.target.epoch;
    let mut pending = PendingStreamsRecords {
        group_metadata: Some(StreamsGroupMetadataValue {
            epoch: state.group_epoch,
        }),
        topology: actor.topology.clone(),
        partition_metadata: actor.partition_metadata.clone(),
        target_metadata: (target_epoch > 0).then(|| StreamsGroupTargetAssignmentMetadataValue {
            assignment_epoch: target_epoch,
        }),
        ..Default::default()
    };

    for mid in affected_members {
        let Some(member) = state.members.get(mid) else {
            continue;
        };
        pending
            .member_metadata
            .push((mid.clone(), Some(member_metadata_value(member))));
        pending
            .current_per_member
            .push((mid.clone(), Some(current_assignment_value(member))));
        if let Some(target) = target_member_value(state, mid) {
            pending.target_per_member.push((mid.clone(), Some(target)));
        }
    }
    pending
}
/// Converts a member's in-memory state into its persisted metadata record.
fn member_metadata_value(m: &StreamsMemberState) -> StreamsGroupMemberMetadataValue {
    let user_endpoint = match &m.user_endpoint {
        Some((host, port)) => Some(StreamsEndpoint {
            host: host.clone(),
            port: *port,
        }),
        None => None,
    };
    StreamsGroupMemberMetadataValue {
        instance_id: m.instance_id.clone(),
        rack_id: m.rack_id.clone(),
        client_id: m.client_id.clone(),
        client_host: m.client_host.clone(),
        process_id: m.process_id.clone(),
        user_endpoint,
        client_tags: m.client_tags.clone(),
        rebalance_timeout_ms: m.rebalance_timeout_ms,
        topology_epoch: m.topology_epoch,
    }
}
/// Converts a member's in-memory assignment into its persisted
/// current-assignment record.
fn current_assignment_value(m: &StreamsMemberState) -> StreamsGroupCurrentMemberAssignmentValue {
    let state = m.assignment_state.as_i8();
    let (active, standby, warmup) = (m.active.clone(), m.standby.clone(), m.warmup.clone());
    StreamsGroupCurrentMemberAssignmentValue {
        member_epoch: m.member_epoch,
        previous_member_epoch: m.previous_member_epoch,
        state,
        active,
        standby,
        warmup,
        active_pending_revocation: m.active_pending_revocation.clone(),
    }
}
/// Returns the persisted target-assignment record for `member_id`.
///
/// `None` when the target has no active, standby or warm-up entry for the
/// member. Otherwise missing kinds are filled with empty defaults.
fn target_member_value(
    state: &StreamsGroupState,
    member_id: &str,
) -> Option<StreamsGroupTargetAssignmentMemberValue> {
    let target = &state.target;
    match (
        target.active.get(member_id),
        target.standby.get(member_id),
        target.warmup.get(member_id),
    ) {
        (None, None, None) => None,
        (active, standby, warmup) => Some(StreamsGroupTargetAssignmentMemberValue {
            active: active.cloned().unwrap_or_default(),
            standby: standby.cloned().unwrap_or_default(),
            warmup: warmup.cloned().unwrap_or_default(),
        }),
    }
}
/// Appends `pending` to the offsets log and refreshes the coordinator's
/// cached seed for this group.
///
/// Does nothing when `pending` is empty. The cache is updated only after the
/// append succeeded.
///
/// # Errors
/// Returns the error from `offsets_log.append`.
async fn flush_pending(
    actor: &ActorState,
    pending: PendingStreamsRecords,
    offsets_log: &dyn OffsetsLog,
    coordinator: &super::super::GroupCoordinator,
    now_ms: i64,
) -> Result<(), crate::error::BrokerError> {
    if !pending.is_empty() {
        let group_id = &actor.state.group_id;
        offsets_log
            .append(pending.into_batch(group_id, now_ms))
            .await?;
        coordinator.update_streams_cache(group_id, snapshot_seed(actor));
    }
    Ok(())
}
/// Captures the actor's whole state as a seed: epochs, topology, partition
/// metadata, and per-member metadata, current assignment and (when present)
/// target assignment.
fn snapshot_seed(actor: &ActorState) -> super::super::StreamsGroupSeed {
    let state = &actor.state;
    let members = state
        .members
        .iter()
        .map(|(mid, m)| (mid.clone(), member_metadata_value(m)))
        .collect();
    let current_per_member = state
        .members
        .iter()
        .map(|(mid, m)| (mid.clone(), current_assignment_value(m)))
        .collect();
    // Members without any target entry are simply absent from this map.
    let target_per_member = state
        .members
        .keys()
        .filter_map(|mid| target_member_value(state, mid).map(|tv| (mid.clone(), tv)))
        .collect();
    super::super::StreamsGroupSeed {
        group_epoch: state.group_epoch,
        assignment_epoch: state.target.epoch,
        topology: actor.topology.clone(),
        partition_metadata: actor.partition_metadata.clone(),
        members,
        target_per_member,
        current_per_member,
    }
}
/// Hydrates the actor's in-memory state from a seed.
///
/// Restores epochs, topology and partition metadata, then rebuilds every
/// member from its metadata, overlays its current assignment, and fills the
/// target assignment maps. The phase becomes `Empty` with no members,
/// `Stable` with members and a topology, `NotReady` otherwise. The dirty
/// flag is cleared.
fn apply_seed(actor: &mut ActorState, seed: super::super::StreamsGroupSeed) {
    let super::super::StreamsGroupSeed {
        group_epoch,
        assignment_epoch,
        topology,
        partition_metadata,
        members,
        target_per_member,
        current_per_member,
    } = seed;

    let state = &mut actor.state;
    state.group_epoch = group_epoch;
    state.target.epoch = assignment_epoch;
    state.assignment_epoch = assignment_epoch;
    if let Some(epoch) = topology.as_ref().map(|t| t.epoch) {
        state.topology = Some(StoredTopologyHandle { epoch });
        state.topology_epoch = epoch;
    }
    actor.topology = topology;
    actor.partition_metadata = partition_metadata;

    for (mid, meta) in members {
        let mut restored =
            StreamsMemberState::joining(mid.clone(), meta.client_id, meta.client_host);
        restored.instance_id = meta.instance_id;
        restored.rack_id = meta.rack_id;
        restored.process_id = meta.process_id;
        restored.user_endpoint = meta.user_endpoint.map(|ep| (ep.host, ep.port));
        restored.client_tags = meta.client_tags;
        restored.rebalance_timeout_ms = meta.rebalance_timeout_ms;
        restored.topology_epoch = meta.topology_epoch;
        state.members.insert(mid, restored);
    }

    // Current assignments are applied only to members restored above.
    for (mid, current) in current_per_member {
        let Some(member) = state.members.get_mut(&mid) else {
            continue;
        };
        member.member_epoch = current.member_epoch;
        member.previous_member_epoch = current.previous_member_epoch;
        member.assignment_state =
            StreamsMemberAssignmentState::from_i8(current.state).unwrap_or_default();
        member.active = current.active;
        member.standby = current.standby;
        member.warmup = current.warmup;
        member.active_pending_revocation = current.active_pending_revocation;
    }

    for (mid, target) in target_per_member {
        state.target.active.insert(mid.clone(), target.active);
        state.target.standby.insert(mid.clone(), target.standby);
        state.target.warmup.insert(mid, target.warmup);
    }

    state.phase = match (state.members.is_empty(), actor.topology.is_some()) {
        (true, _) => StreamsGroupStatePhase::Empty,
        (false, true) => StreamsGroupStatePhase::Stable,
        (false, false) => StreamsGroupStatePhase::NotReady,
    };
    state.dirty = false;
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock is before the epoch or the millisecond
/// count does not fit in `i64`.
fn chrono_now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
// Unit tests for the streams-group actor: heartbeat join / steady-state /
// epoch-rejection / leave flows driven through a real actor handle, plus a
// direct test of seed hydration.
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use crate::coordinator::unified::GroupCoordinator;
use crate::coordinator::unified::actor::MetadataProvider;
use crate::coordinator::unified::config::NextGenConfig;
use crate::coordinator::unified::offsets_log::fake::InMemoryOffsetsLog;
use crate::coordinator::unified::reconciler::ReconcileInput;
use crate::coordinator::unified::share::config::ShareGroupConfig;
/// Metadata provider whose snapshot is always the default (empty) input.
#[derive(Debug)]
struct EmptyMetadata;
impl MetadataProvider for EmptyMetadata {
fn snapshot(&self) -> ReconcileInput {
ReconcileInput::default()
}
}
/// Builds a coordinator backed by an in-memory offsets log and returns both,
/// so tests can inspect the batches the actor appended.
fn make_coordinator() -> (Arc<GroupCoordinator>, Arc<InMemoryOffsetsLog>) {
let log = Arc::new(InMemoryOffsetsLog::default());
let metadata: Arc<dyn MetadataProvider> = Arc::new(EmptyMetadata);
let coord = Arc::new(GroupCoordinator::new(
NextGenConfig::default(),
ShareGroupConfig::default(),
metadata,
log.clone(),
StreamsGroupConfig::default(),
));
(coord, log)
}
/// Sends one heartbeat to the actor and waits for its response.
async fn heartbeat(
handle: &StreamsGroupActorHandle,
req: StreamsGroupHeartbeatRequest,
) -> StreamsGroupHeartbeatResponse {
let (tx, rx) = oneshot::channel();
handle
.tx
.send(StreamsGroupActorMessage::Heartbeat {
request: req,
client_id: "client".into(),
client_host: "/127.0.0.1".into(),
reply: tx,
})
.await
.unwrap();
rx.await.unwrap()
}
// Joining with an empty member id yields a server-minted id, member epoch 1
// and empty (but present) task lists.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn first_join_mints_id_advances_epoch_not_ready() {
let (coord, _log) = make_coordinator();
let handle = coord.get_or_create_streams("g");
let resp = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: String::new(),
member_epoch: 0,
..Default::default()
},
)
.await;
assert!(resp.error_code == codes::NONE);
assert!(!resp.member_id.is_empty(), "server mints a member id");
assert!(resp.member_epoch == 1);
assert!(resp.active_tasks == Some(vec![]));
assert!(resp.standby_tasks == Some(vec![]));
assert!(resp.warmup_tasks == Some(vec![]));
}
// A follow-up heartbeat at the epoch returned by the join is accepted and
// keeps that epoch.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn second_heartbeat_at_right_epoch_accepted() {
let (coord, _log) = make_coordinator();
let handle = coord.get_or_create_streams("g");
let join = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: "m1".into(),
member_epoch: 0,
..Default::default()
},
)
.await;
assert!(join.error_code == codes::NONE);
let epoch = join.member_epoch;
let resp = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: "m1".into(),
member_epoch: epoch,
..Default::default()
},
)
.await;
assert!(resp.error_code == codes::NONE);
assert!(resp.member_epoch == epoch);
}
// A heartbeat whose epoch is below the member's current epoch is rejected
// with STALE_MEMBER_EPOCH.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stale_epoch_is_rejected() {
let (coord, _log) = make_coordinator();
let handle = coord.get_or_create_streams("g");
let join = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: "m1".into(),
member_epoch: 0,
..Default::default()
},
)
.await;
assert!(join.member_epoch == 1);
let resp = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: "m1".into(),
member_epoch: -2,
..Default::default()
},
)
.await;
assert!(resp.error_code == codes::STALE_MEMBER_EPOCH);
}
// A heartbeat whose epoch is above the member's current epoch is rejected
// with FENCED_MEMBER_EPOCH.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fenced_epoch_is_rejected() {
let (coord, _log) = make_coordinator();
let handle = coord.get_or_create_streams("g");
let join = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: "m1".into(),
member_epoch: 0,
..Default::default()
},
)
.await;
assert!(join.member_epoch == 1);
let resp = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: "m1".into(),
member_epoch: 99,
..Default::default()
},
)
.await;
assert!(resp.error_code == codes::FENCED_MEMBER_EPOCH);
}
// Leaving (epoch -1) succeeds with epoch -1 and appends exactly one batch
// that contains at least one tombstone record.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn leave_removes_member() {
let (coord, log) = make_coordinator();
let handle = coord.get_or_create_streams("g");
let join = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: String::new(),
member_epoch: 0,
..Default::default()
},
)
.await;
let mid = join.member_id.clone();
let pre_leave = log.batches().await.len();
let resp = heartbeat(
&handle,
StreamsGroupHeartbeatRequest {
group_id: "g".into(),
member_id: mid,
member_epoch: -1,
..Default::default()
},
)
.await;
assert!(resp.error_code == codes::NONE);
assert!(resp.member_epoch == -1);
let batches = log.batches().await;
assert!(batches.len() == pre_leave + 1);
let leave_batch = &batches[batches.len() - 1];
assert!(
leave_batch.records.iter().any(|r| r.value.is_none()),
"leave batch must contain at least one tombstone"
);
}
// Applying a seed restores epochs, the member with its current assignment,
// the target assignment, and a Stable phase (members + topology present).
#[test]
fn seed_hydrates_state() {
let mut actor = ActorState::new("g".into());
let mut members = std::collections::HashMap::new();
members.insert(
"m1".to_string(),
StreamsGroupMemberMetadataValue {
instance_id: Some("i1".into()),
rack_id: Some("r1".into()),
client_id: "c1".into(),
client_host: "/127.0.0.1".into(),
process_id: "p1".into(),
user_endpoint: Some(StreamsEndpoint {
host: "h".into(),
port: 9092,
}),
client_tags: vec![],
rebalance_timeout_ms: 60_000,
topology_epoch: 2,
},
);
let mut current = std::collections::HashMap::new();
current.insert(
"m1".to_string(),
StreamsGroupCurrentMemberAssignmentValue {
member_epoch: 4,
previous_member_epoch: 3,
state: 0,
active: BTreeMap::from([("0".to_string(), vec![0, 1])]),
standby: BTreeMap::new(),
warmup: BTreeMap::new(),
active_pending_revocation: BTreeMap::new(),
},
);
let mut target = std::collections::HashMap::new();
target.insert(
"m1".to_string(),
StreamsGroupTargetAssignmentMemberValue {
active: BTreeMap::from([("0".to_string(), vec![0, 1])]),
standby: BTreeMap::new(),
warmup: BTreeMap::new(),
},
);
let seed = super::super::super::StreamsGroupSeed {
group_epoch: 4,
assignment_epoch: 4,
topology: Some(StreamsGroupTopologyValue {
epoch: 2,
subtopologies: vec![],
}),
partition_metadata: None,
members,
target_per_member: target,
current_per_member: current,
};
apply_seed(&mut actor, seed);
assert!(actor.state.group_epoch == 4);
assert!(actor.state.target.epoch == 4);
assert!(actor.state.topology_epoch == 2);
let m = actor.state.members.get("m1").expect("member restored");
assert!(m.member_epoch == 4);
assert!(m.previous_member_epoch == 3);
assert!(m.process_id == "p1");
assert!(m.active == BTreeMap::from([("0".to_string(), vec![0, 1])]));
assert!(actor.state.target.active["m1"] == BTreeMap::from([("0".to_string(), vec![0, 1])]));
assert!(actor.state.phase == StreamsGroupStatePhase::Stable);
}
}