use std::collections::HashMap;
use std::sync::Arc;
use crabka_metadata::{MetadataRecord, PartitionRecord, TopicRecord};
use crabka_protocol::records::RecordBatch;
use crabka_raft::RaftError;
use crate::broker::spawn_partition;
use crate::config::BrokerConfig;
use crate::coordinator::GroupCoordinator;
use crate::coordinator::persistence::{self, GroupMetadataValue, Key, OffsetCommitValue};
use crate::coordinator::unified::classic_state::{
Group as ClassicState, GroupState as ClassicGroupState, Member, OffsetEntry,
};
use crate::coordinator::unified::group::{Group, GroupKind};
use crate::error::BrokerError;
use crate::log_dir;
use crate::partition_registry::PartitionRegistry;
/// Name of the internal topic holding group-coordinator state (offset commits and group
/// metadata records).
pub const OFFSETS_TOPIC: &str = "__consumer_offsets";
/// Declared partition count of [`OFFSETS_TOPIC`].
pub const OFFSETS_NUM_PARTITIONS: i32 = 1;
/// The partition of [`OFFSETS_TOPIC`] the group coordinator bootstraps and replays.
pub const OFFSETS_PARTITION: i32 = 0;
/// Classic-group state accumulated while replaying the offsets log.
///
/// Next-gen, share and streams records are applied to the coordinator directly during replay;
/// only classic state is buffered here and turned into groups by `finalize`.
#[derive(Default)]
struct Replayed {
    /// Classic group state, keyed by group id.
    classic: HashMap<String, ClassicState>,
    /// Committed offsets per group id, keyed by `(topic, partition)`.
    committed: HashMap<String, HashMap<(String, i32), OffsetEntry>>,
}
/// Prepares the `__consumer_offsets` partition and restores group-coordinator state from it.
///
/// 1. Places the partition directory on one of the log dirs that passed the startup
///    writability probe, creates it and opens the log there.
/// 2. Makes sure the topic exists in the controller's metadata image. If this node is the
///    current controller leader, it submits the topic and partition records itself; a
///    `TopicExists` rejection counts as success. Any other node waits up to 30 s for the
///    topic to appear in the image.
/// 3. Replays the log into `coordinator`. It then spawns the partition actor and registers
///    it in `partitions`.
///
/// # Errors
/// - `BrokerError::Io` when every configured log dir failed the writability probe.
/// - Errors from creating the partition directory, opening the log or replaying it.
/// - `BrokerError::Startup` in these cases:
///   - submitting the metadata change fails with anything other than `TopicExists`;
///   - the wait for the topic times out;
///   - the metadata image watch closes during that wait.
pub async fn bootstrap(
    config: &BrokerConfig,
    controller: &Arc<dyn crate::metadata_source::MetadataSource>,
    partitions: &Arc<PartitionRegistry>,
    coordinator: &Arc<GroupCoordinator>,
    log_dir_status: &crate::log_dir_status::LogDirRegistry,
) -> Result<(), BrokerError> {
    let placement_dirs = log_dir_status.online_subset(&config.all_log_dirs());
    if placement_dirs.is_empty() {
        return Err(BrokerError::Io(std::io::Error::other(
            "every configured log.dir failed the startup writability probe; \
             cannot bootstrap the group-coordinator partition",
        )));
    }
    let topic_dir = log_dir::place_partition_dir(&placement_dirs, OFFSETS_TOPIC, OFFSETS_PARTITION);
    std::fs::create_dir_all(&topic_dir)?;
    let log = crabka_log::Log::open(&topic_dir, config.log_config.clone())?;
    let owning_dir = topic_dir
        .parent()
        .expect("placed partition dir always has a parent log.dir")
        .to_path_buf();
    if controller.current_image().topic(OFFSETS_TOPIC).is_none() {
        // NOTE(review): leadership is sampled once. If no leader is known yet and this node is
        // elected afterwards, nobody submits the topic and the wait below times out. Confirm
        // that callers only bootstrap after an election has settled.
        let am_leader = *controller.watch_leader().borrow() == Some(config.node_id);
        if am_leader {
            let records = vec![
                MetadataRecord::V1Topic(TopicRecord {
                    name: OFFSETS_TOPIC.to_string(),
                    topic_id: uuid::Uuid::new_v4(),
                    partitions: 1,
                    replication_factor: 1,
                }),
                MetadataRecord::V1Partition(PartitionRecord {
                    topic: OFFSETS_TOPIC.to_string(),
                    partition: OFFSETS_PARTITION,
                    leader: config.node_id,
                    replicas: vec![config.node_id],
                    isr: vec![config.node_id],
                    leader_epoch: 0,
                    adding_replicas: vec![],
                    removing_replicas: vec![],
                    directories: vec![],
                    partition_epoch: 0,
                }),
            ];
            match controller.submit_change(records).await {
                // A concurrent/previous registration of the topic is fine.
                Ok(())
                | Err(RaftError::Metadata(crabka_metadata::MetadataError::TopicExists(_))) => {}
                Err(e) => return Err(BrokerError::Startup(e.to_string())),
            }
        } else {
            let mut images = controller.watch_image();
            let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
            let timed_out = || {
                BrokerError::Startup(format!(
                    "timed out waiting for the controller leader to register \
                     {OFFSETS_TOPIC} in the metadata image"
                ))
            };
            while controller.current_image().topic(OFFSETS_TOPIC).is_none() {
                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
                if remaining.is_zero() {
                    return Err(timed_out());
                }
                match tokio::time::timeout(remaining, images.changed()).await {
                    // The image changed; loop around and re-check for the topic.
                    Ok(Ok(())) => {}
                    // The sender is gone. `changed()` would now fail immediately on every
                    // iteration and turn this loop into a busy spin until the deadline, so
                    // give up right away instead.
                    Ok(Err(_)) => {
                        return Err(BrokerError::Startup(format!(
                            "metadata image watch closed while waiting for the controller \
                             leader to register {OFFSETS_TOPIC}"
                        )));
                    }
                    Err(_) => return Err(timed_out()),
                }
            }
        }
    }
    let replayed = replay_records(&log, coordinator)?;
    finalize(coordinator, replayed);
    let partition = spawn_partition(
        OFFSETS_TOPIC.to_string(),
        OFFSETS_PARTITION,
        owning_dir,
        log,
        log_dir_status.clone(),
    );
    partitions.insert(OFFSETS_TOPIC.into(), OFFSETS_PARTITION, partition);
    Ok(())
}
fn replay_records(
log: &crabka_log::Log,
coordinator: &Arc<GroupCoordinator>,
) -> Result<Replayed, BrokerError> {
let mut acc = Replayed::default();
let mut next = log.log_start_offset();
let end = log.log_end_offset();
while next < end {
let out = log.read(next, 1024 * 1024)?;
if out.batches.is_empty() {
break;
}
let mut advanced_to = next;
for batch in &out.batches {
for record in &batch.records {
let Some(key_bytes) = &record.key else {
continue;
};
let key = persistence::parse_key(key_bytes)?;
match &record.value {
Some(value_bytes) => {
apply_record(coordinator, &mut acc, key, value_bytes, batch)?;
}
None => {
apply_tombstone(coordinator, key);
}
}
}
advanced_to = batch.base_offset + i64::from(batch.last_offset_delta) + 1;
}
if advanced_to <= next {
break;
}
next = advanced_to;
}
Ok(acc)
}
/// Applies one non-tombstone record from the offsets log.
///
/// How each key type is handled:
/// - Classic offset commits are stored in `acc.committed`. A later commit for the same
///   group/topic/partition overwrites an earlier one.
/// - Classic group metadata is folded into `acc.classic` by [`apply_group_metadata`], together
///   with the batch's `max_timestamp`.
/// - Next-gen, share and streams records are decoded and handed straight to `coordinator`.
///
/// # Errors
/// Returns the decoding error when `value_bytes` does not decode as the key's value type.
fn apply_record(
    coordinator: &Arc<GroupCoordinator>,
    acc: &mut Replayed,
    key: Key,
    value_bytes: &bytes::Bytes,
    batch: &RecordBatch,
) -> Result<(), BrokerError> {
    match key {
        Key::OffsetCommit {
            group_id,
            topic,
            partition,
        } => {
            let commit = OffsetCommitValue::decode_value(value_bytes)?;
            let entry = OffsetEntry {
                offset: commit.offset,
                leader_epoch: commit.leader_epoch,
                metadata: commit.metadata,
                commit_timestamp_ms: commit.commit_timestamp_ms,
            };
            acc.committed
                .entry(group_id)
                .or_default()
                .insert((topic, partition), entry);
            Ok(())
        }
        Key::GroupMetadata { group_id } => {
            let metadata = GroupMetadataValue::decode_value(value_bytes)?;
            let state = acc
                .classic
                .entry(group_id.clone())
                .or_insert_with(|| ClassicState::new(group_id));
            apply_group_metadata(state, metadata, batch.max_timestamp);
            Ok(())
        }
        Key::NextGen(next_gen_key) => apply_next_gen_record(coordinator, next_gen_key, value_bytes),
        Key::Share(share_key) => apply_share_record(coordinator, share_key, value_bytes),
        Key::Streams(streams_key) => apply_streams_record(coordinator, streams_key, value_bytes),
    }
}
/// Decodes one next-gen consumer-group record and replays it into `coordinator`.
///
/// Each key variant selects its value type and the matching `replay_*` method on the
/// coordinator.
///
/// # Errors
/// Returns the decoding error when `value_bytes` does not decode as the variant's value type.
fn apply_next_gen_record(
    coordinator: &Arc<GroupCoordinator>,
    key: crate::coordinator::unified::persistence_next_gen::NextGenKey,
    value_bytes: &bytes::Bytes,
) -> Result<(), BrokerError> {
    use crate::coordinator::unified::persistence_next_gen as next_gen;

    match key {
        next_gen::NextGenKey::GroupMetadata { group_id } => {
            let value = next_gen::GroupMetadataValue::decode(value_bytes)?;
            coordinator.replay_group_metadata(&group_id, value);
        }
        next_gen::NextGenKey::MemberMetadata {
            group_id,
            member_id,
        } => {
            let value = next_gen::MemberMetadataValue::decode(value_bytes)?;
            coordinator.replay_member_metadata(&group_id, &member_id, value);
        }
        next_gen::NextGenKey::TargetAssignmentMetadata { group_id } => {
            let value = next_gen::TargetAssignmentMetadataValue::decode(value_bytes)?;
            coordinator.replay_target_assignment_metadata(&group_id, value);
        }
        next_gen::NextGenKey::TargetAssignmentMember {
            group_id,
            member_id,
        } => {
            let value = next_gen::TargetAssignmentMemberValue::decode(value_bytes)?;
            coordinator.replay_target_assignment_member(&group_id, &member_id, value);
        }
        next_gen::NextGenKey::CurrentMemberAssignment {
            group_id,
            member_id,
        } => {
            let value = next_gen::CurrentMemberAssignmentValue::decode(value_bytes)?;
            coordinator.replay_current_member_assignment(&group_id, &member_id, value);
        }
    }
    Ok(())
}
/// Decodes one share-group record and replays it into `coordinator`.
///
/// First, the record's group is tagged as a share group with `mark_share`. This happens
/// before the value is decoded, so the tag is applied even if decoding then fails. Next, the
/// value is decoded and passed to the matching `replay_share_*` method.
///
/// # Errors
/// Returns the decoding error when `value_bytes` does not decode as the variant's value type.
fn apply_share_record(
    coordinator: &Arc<GroupCoordinator>,
    key: crate::coordinator::unified::share::persistence::ShareGroupKey,
    value_bytes: &bytes::Bytes,
) -> Result<(), BrokerError> {
    use crate::coordinator::unified::share::persistence as share;

    match &key {
        share::ShareGroupKey::GroupMetadata { group_id }
        | share::ShareGroupKey::MemberMetadata { group_id, .. }
        | share::ShareGroupKey::TargetAssignmentMetadata { group_id }
        | share::ShareGroupKey::TargetAssignmentMember { group_id, .. }
        | share::ShareGroupKey::CurrentMemberAssignment { group_id, .. }
        | share::ShareGroupKey::StatePartitionMetadata { group_id } => {
            coordinator.mark_share(group_id);
        }
    }

    match key {
        share::ShareGroupKey::GroupMetadata { group_id } => {
            let value = share::ShareGroupMetadataValue::decode(value_bytes)?;
            coordinator.replay_share_group_metadata(&group_id, value);
        }
        share::ShareGroupKey::MemberMetadata {
            group_id,
            member_id,
        } => {
            let value = share::ShareGroupMemberMetadataValue::decode(value_bytes)?;
            coordinator.replay_share_member_metadata(&group_id, &member_id, value);
        }
        share::ShareGroupKey::TargetAssignmentMetadata { group_id } => {
            let value = share::ShareGroupTargetAssignmentMetadataValue::decode(value_bytes)?;
            coordinator.replay_share_target_assignment_metadata(&group_id, value);
        }
        share::ShareGroupKey::TargetAssignmentMember {
            group_id,
            member_id,
        } => {
            let value = share::ShareGroupTargetAssignmentMemberValue::decode(value_bytes)?;
            coordinator.replay_share_target_assignment_member(&group_id, &member_id, value);
        }
        share::ShareGroupKey::CurrentMemberAssignment {
            group_id,
            member_id,
        } => {
            let value = share::ShareGroupCurrentMemberAssignmentValue::decode(value_bytes)?;
            coordinator.replay_share_current_member_assignment(&group_id, &member_id, value);
        }
        share::ShareGroupKey::StatePartitionMetadata { group_id } => {
            let value = share::ShareGroupStatePartitionMetadataValue::decode(value_bytes)?;
            coordinator.replay_share_state_partition_metadata(&group_id, value);
        }
    }
    Ok(())
}
/// Decodes one streams-group record and replays it into `coordinator`.
///
/// First, the record's group is tagged as a streams group with `mark_streams`, before the
/// value is decoded. Next, the value is decoded and passed to the matching `replay_streams_*`
/// method. For the group-metadata and target-assignment-metadata variants, only the
/// decoded epoch is passed on.
///
/// # Errors
/// Returns the decoding error when `value_bytes` does not decode as the variant's value type.
fn apply_streams_record(
    coordinator: &Arc<GroupCoordinator>,
    key: crate::coordinator::unified::streams::persistence::StreamsGroupKey,
    value_bytes: &bytes::Bytes,
) -> Result<(), BrokerError> {
    use crate::coordinator::unified::streams::persistence as streams;

    match &key {
        streams::StreamsGroupKey::GroupMetadata { group_id }
        | streams::StreamsGroupKey::MemberMetadata { group_id, .. }
        | streams::StreamsGroupKey::Topology { group_id }
        | streams::StreamsGroupKey::PartitionMetadata { group_id }
        | streams::StreamsGroupKey::TargetAssignmentMetadata { group_id }
        | streams::StreamsGroupKey::TargetAssignmentMember { group_id, .. }
        | streams::StreamsGroupKey::CurrentMemberAssignment { group_id, .. } => {
            coordinator.mark_streams(group_id);
        }
    }

    match key {
        streams::StreamsGroupKey::GroupMetadata { group_id } => {
            let epoch = streams::StreamsGroupMetadataValue::decode(value_bytes)?.epoch;
            coordinator.replay_streams_group_metadata(&group_id, epoch);
        }
        streams::StreamsGroupKey::MemberMetadata {
            group_id,
            member_id,
        } => {
            let value = streams::StreamsGroupMemberMetadataValue::decode(value_bytes)?;
            coordinator.replay_streams_member_metadata(&group_id, &member_id, value);
        }
        streams::StreamsGroupKey::Topology { group_id } => {
            let value = streams::StreamsGroupTopologyValue::decode(value_bytes)?;
            coordinator.replay_streams_topology(&group_id, value);
        }
        streams::StreamsGroupKey::PartitionMetadata { group_id } => {
            let value = streams::StreamsGroupPartitionMetadataValue::decode(value_bytes)?;
            coordinator.replay_streams_partition_metadata(&group_id, value);
        }
        streams::StreamsGroupKey::TargetAssignmentMetadata { group_id } => {
            let assignment_epoch =
                streams::StreamsGroupTargetAssignmentMetadataValue::decode(value_bytes)?
                    .assignment_epoch;
            coordinator.replay_streams_target_assignment_metadata(&group_id, assignment_epoch);
        }
        streams::StreamsGroupKey::TargetAssignmentMember {
            group_id,
            member_id,
        } => {
            let value = streams::StreamsGroupTargetAssignmentMemberValue::decode(value_bytes)?;
            coordinator.replay_streams_target_assignment_member(&group_id, &member_id, value);
        }
        streams::StreamsGroupKey::CurrentMemberAssignment {
            group_id,
            member_id,
        } => {
            let value = streams::StreamsGroupCurrentMemberAssignmentValue::decode(value_bytes)?;
            coordinator.replay_streams_current_member_assignment(&group_id, &member_id, value);
        }
    }
    Ok(())
}
/// Applies a replayed tombstone to the coordinator.
///
/// Next-gen, share and streams tombstones are forwarded to the matching `replay_*_tombstone`
/// method. Classic offset-commit and group-metadata tombstones are ignored here: classic
/// state is accumulated in [`Replayed`], which this function does not receive.
fn apply_tombstone(coordinator: &Arc<GroupCoordinator>, key: Key) {
    match key {
        Key::OffsetCommit { .. } | Key::GroupMetadata { .. } => {}
        Key::NextGen(next_gen_key) => coordinator.replay_next_gen_tombstone(&next_gen_key),
        Key::Share(share_key) => coordinator.replay_share_tombstone(&share_key),
        Key::Streams(streams_key) => coordinator.replay_streams_tombstone(&streams_key),
    }
}
/// Turns the accumulated replay state into live groups once the log has been fully replayed.
///
/// 1. Snapshots the group ids present in `coordinator.seeds`, then calls
///    `coordinator.finalize_bootstrap()`.
/// 2. For each snapshotted group that also has replayed committed offsets, removes those
///    offsets from `replayed`. If the group's actor handle is found, the offsets are sent to
///    it as `UpdateCommitted`. This is best effort: a failed `try_send` is ignored and the
///    reply is never awaited.
/// 3. Every remaining id that has classic metadata or committed offsets, and is not in the
///    snapshot, is seeded as a classic group. Default classic state is used when only offsets
///    were replayed.
///
/// NOTE(review): only ids in `coordinator.seeds` count as non-classic here. Share and streams
/// groups have their own seed caches, so a streams group with committed offsets may end up
/// seeded as classic. Confirm whether `seeds` includes them.
fn finalize(coordinator: &Arc<GroupCoordinator>, mut replayed: Replayed) {
    use crate::coordinator::unified::actor::GroupActorMessage;
    use std::collections::HashSet;

    let next_gen_ids: HashSet<String> = coordinator
        .seeds
        .iter()
        .map(|seed| seed.key().clone())
        .collect();
    coordinator.finalize_bootstrap();

    let next_gen_with_offsets: Vec<String> = replayed
        .committed
        .keys()
        .filter(|gid| next_gen_ids.contains(*gid))
        .cloned()
        .collect();
    for gid in next_gen_with_offsets {
        let Some(offsets) = replayed.committed.remove(&gid) else {
            continue;
        };
        let Some(handle) = coordinator.find(&gid) else {
            continue;
        };
        // Keep the receiver alive across `try_send`; the reply itself is not awaited.
        let (reply, _reply_rx) = tokio::sync::oneshot::channel();
        let _ = handle.tx.try_send(GroupActorMessage::UpdateCommitted {
            entries: offsets.into_iter().collect(),
            reply,
        });
    }

    let mut classic_ids: HashSet<String> = replayed.classic.keys().cloned().collect();
    classic_ids.extend(replayed.committed.keys().cloned());
    classic_ids.retain(|gid| !next_gen_ids.contains(gid));
    for gid in classic_ids {
        let state = match replayed.classic.remove(&gid) {
            Some(existing) => existing,
            None => ClassicState::new(gid.clone()),
        };
        let committed_offsets = replayed.committed.remove(&gid).unwrap_or_default();
        let group = Group {
            group_id: gid.clone(),
            kind: GroupKind::Classic(state),
            committed_offsets,
        };
        coordinator.seed_classic(&gid, Box::new(group));
    }
}
/// Rebuilds a classic group's in-memory state from one persisted group-metadata record.
///
/// - Overwrites the protocol type/name, generation and leader.
/// - Replaces the member table and static-member (instance id → member id) table wholesale
///   with the members in `v`.
/// - Sets the state to `Empty` when no members were persisted, otherwise `Stable`.
///
/// Persisted timeouts that cannot be represented as a non-negative millisecond count fall
/// back to 30 s (session) and 60 s (rebalance). `replay_timestamp_ms` is currently unused.
fn apply_group_metadata(g: &mut ClassicState, v: GroupMetadataValue, replay_timestamp_ms: i64) {
    g.protocol_type = Some(v.protocol_type);
    g.generation_id = v.generation;
    g.leader_id = v.leader;
    g.protocol_name = v.protocol_name;
    g.members.clear();
    g.static_members.clear();
    for m in v.members {
        // A negative value fails `try_from` and takes the fallback. (Clamping with `.max(0)`
        // first would make the fallback dead and yield a zero-length timeout instead.)
        let session_timeout = std::time::Duration::from_millis(
            u64::try_from(m.session_timeout_ms).unwrap_or(30_000),
        );
        let rebalance_timeout = std::time::Duration::from_millis(
            u64::try_from(m.rebalance_timeout_ms).unwrap_or(60_000),
        );
        let mut member = Member::new(
            m.member_id.clone(),
            m.client_id,
            m.client_host,
            session_timeout,
            rebalance_timeout,
            Vec::new(),
        )
        .with_instance_id(m.group_instance_id.clone());
        member.protocol_metadata = m.subscription;
        member.assignment = Some(m.assignment);
        if let Some(iid) = m.group_instance_id {
            g.static_members.insert(iid, m.member_id.clone());
        }
        g.members.insert(m.member_id, member);
    }
    g.state = if g.members.is_empty() {
        ClassicGroupState::Empty
    } else {
        ClassicGroupState::Stable
    };
    // Accepted for signature compatibility; the batch timestamp is not used yet.
    let _ = replay_timestamp_ms;
}
#[cfg(test)]
mod tests {
    // Tests for offsets-topic bootstrap and for replaying coordinator records
    // (classic, next-gen, share and streams) into the group coordinator.
    use super::*;
    use crate::config::BrokerConfig;
    use assert2::assert;
    use crabka_raft::ControllerHandle;
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Starts a single-node test controller and waits up to 5 s for a leader to be elected.
    async fn controller_with_leader(log_dir: std::path::PathBuf) -> Arc<ControllerHandle> {
        let cfg = crabka_raft::ControllerConfig {
            election_timeout: Duration::from_millis(200),
            heartbeat_interval: Duration::from_millis(50),
            client_id: "test".into(),
            ..crabka_raft::ControllerConfig::for_tests(1, log_dir)
        };
        let handle = Arc::new(crabka_raft::Controller::start(cfg).await.unwrap());
        let mut rx = handle.watch_leader();
        let deadline = Instant::now() + Duration::from_secs(5);
        while rx.borrow().is_none() {
            assert!(Instant::now() < deadline, "no leader elected in 5s");
            let _ = tokio::time::timeout(Duration::from_millis(100), rx.changed()).await;
        }
        handle
    }

    /// Share-group records replayed through `apply_record` tag the group as Share and fill
    /// its cached seed. A member tombstone then removes that member from the seed.
    #[tokio::test]
    async fn share_group_records_replay_into_seed() {
        use crate::coordinator::unified::GroupCoordinator;
        use crate::coordinator::unified::offsets_log::fake::InMemoryOffsetsLog;
        use crate::coordinator::unified::reconciler::ReconcileInput;
        use crate::coordinator::unified::share::persistence as sp;
        use crabka_protocol::primitives::uuid::Uuid;
        #[derive(Debug)]
        struct EmptyMeta;
        impl crate::coordinator::unified::actor::MetadataProvider for EmptyMeta {
            fn snapshot(&self) -> ReconcileInput {
                ReconcileInput::default()
            }
        }
        let coord = Arc::new(GroupCoordinator::new(
            crate::coordinator::unified::config::NextGenConfig::default(),
            crate::coordinator::unified::share::config::ShareGroupConfig::default(),
            Arc::new(EmptyMeta),
            Arc::new(InMemoryOffsetsLog::default()),
            crate::coordinator::unified::streams::config::StreamsGroupConfig::default(),
        ));
        let tid = Uuid([9; 16]);
        let recs: Vec<(bytes::Bytes, bytes::Bytes)> = vec![
            (
                sp::encode_share_key(&sp::ShareGroupKey::GroupMetadata {
                    group_id: "sg".into(),
                }),
                sp::ShareGroupMetadataValue { epoch: 4 }.encode(),
            ),
            (
                sp::encode_share_key(&sp::ShareGroupKey::MemberMetadata {
                    group_id: "sg".into(),
                    member_id: "m1".into(),
                }),
                sp::ShareGroupMemberMetadataValue {
                    rack_id: None,
                    client_id: "c1".into(),
                    client_host: "/127.0.0.1".into(),
                    subscribed_topic_names: vec!["t".into()],
                }
                .encode(),
            ),
            (
                sp::encode_share_key(&sp::ShareGroupKey::CurrentMemberAssignment {
                    group_id: "sg".into(),
                    member_id: "m1".into(),
                }),
                sp::ShareGroupCurrentMemberAssignmentValue {
                    member_epoch: 4,
                    assigned_partitions: vec![(tid, vec![0, 1])],
                }
                .encode(),
            ),
        ];
        // Records are applied directly; the batch only supplies a timestamp to classic records.
        let batch = RecordBatch::default();
        let mut acc = Replayed::default();
        for (k, v) in recs {
            let key = persistence::parse_key(&k).unwrap();
            apply_record(&coord, &mut acc, key, &v, &batch).unwrap();
        }
        assert!(coord.group_type("sg") == Some(crate::coordinator::unified::GroupType::Share));
        let seed = coord.cached_share_seed("sg").expect("seed cached");
        assert!(seed.group_epoch == 4);
        assert!(seed.members.contains_key("m1"));
        assert!(seed.current_per_member["m1"].member_epoch == 4);
        let tomb_key =
            persistence::parse_key(&sp::encode_share_key(&sp::ShareGroupKey::MemberMetadata {
                group_id: "sg".into(),
                member_id: "m1".into(),
            }))
            .unwrap();
        apply_tombstone(&coord, tomb_key);
        let seed = coord.cached_share_seed("sg").expect("seed still present");
        assert!(!seed.members.contains_key("m1"), "tombstone removed member");
    }

    /// Streams-group records replayed through `apply_record` tag the group as Streams and
    /// fill its cached seed. A member tombstone then removes that member from the seed.
    #[tokio::test]
    async fn streams_group_records_replay_into_seed() {
        use crate::coordinator::unified::GroupCoordinator;
        use crate::coordinator::unified::offsets_log::fake::InMemoryOffsetsLog;
        use crate::coordinator::unified::reconciler::ReconcileInput;
        use crate::coordinator::unified::streams::persistence as sp;
        use std::collections::BTreeMap;
        #[derive(Debug)]
        struct EmptyMeta;
        impl crate::coordinator::unified::actor::MetadataProvider for EmptyMeta {
            fn snapshot(&self) -> ReconcileInput {
                ReconcileInput::default()
            }
        }
        let coord = Arc::new(GroupCoordinator::new(
            crate::coordinator::unified::config::NextGenConfig::default(),
            crate::coordinator::unified::share::config::ShareGroupConfig::default(),
            Arc::new(EmptyMeta),
            Arc::new(InMemoryOffsetsLog::default()),
            crate::coordinator::unified::streams::config::StreamsGroupConfig::default(),
        ));
        let recs: Vec<(bytes::Bytes, bytes::Bytes)> = vec![
            (
                sp::encode_streams_key(&sp::StreamsGroupKey::GroupMetadata {
                    group_id: "stg".into(),
                }),
                sp::StreamsGroupMetadataValue { epoch: 7 }.encode(),
            ),
            (
                sp::encode_streams_key(&sp::StreamsGroupKey::MemberMetadata {
                    group_id: "stg".into(),
                    member_id: "m1".into(),
                }),
                sp::StreamsGroupMemberMetadataValue {
                    instance_id: None,
                    rack_id: None,
                    client_id: "c1".into(),
                    client_host: "/127.0.0.1".into(),
                    process_id: "p1".into(),
                    user_endpoint: None,
                    client_tags: vec![],
                    rebalance_timeout_ms: 60_000,
                    topology_epoch: 2,
                }
                .encode(),
            ),
            (
                sp::encode_streams_key(&sp::StreamsGroupKey::CurrentMemberAssignment {
                    group_id: "stg".into(),
                    member_id: "m1".into(),
                }),
                sp::StreamsGroupCurrentMemberAssignmentValue {
                    member_epoch: 7,
                    previous_member_epoch: 6,
                    state: 0,
                    active: BTreeMap::from([("0".to_string(), vec![0, 1])]),
                    standby: BTreeMap::new(),
                    warmup: BTreeMap::new(),
                    active_pending_revocation: BTreeMap::new(),
                }
                .encode(),
            ),
        ];
        let batch = RecordBatch::default();
        let mut acc = Replayed::default();
        for (k, v) in recs {
            let key = persistence::parse_key(&k).unwrap();
            apply_record(&coord, &mut acc, key, &v, &batch).unwrap();
        }
        assert!(coord.group_type("stg") == Some(crate::coordinator::unified::GroupType::Streams));
        let seed = coord.cached_streams_seed("stg").expect("seed cached");
        assert!(seed.group_epoch == 7);
        assert!(seed.members.contains_key("m1"));
        assert!(seed.current_per_member["m1"].member_epoch == 7);
        let tomb_key = persistence::parse_key(&sp::encode_streams_key(
            &sp::StreamsGroupKey::MemberMetadata {
                group_id: "stg".into(),
                member_id: "m1".into(),
            },
        ))
        .unwrap();
        apply_tombstone(&coord, tomb_key);
        let seed = coord
            .cached_streams_seed("stg")
            .expect("seed still present");
        assert!(!seed.members.contains_key("m1"), "tombstone removed member");
    }

    /// Builds a coordinator backed by the production offsets log (over `partitions`) and by
    /// the controller's metadata image.
    fn test_coordinator(
        controller: &Arc<dyn crate::metadata_source::MetadataSource>,
        partitions: &Arc<PartitionRegistry>,
    ) -> Arc<GroupCoordinator> {
        let offsets_log: Arc<dyn crate::coordinator::unified::offsets_log::OffsetsLog> = Arc::new(
            crate::coordinator::unified::offsets_log::ProductionOffsetsLog::new(partitions.clone()),
        );
        Arc::new(GroupCoordinator::new(
            crate::coordinator::unified::config::NextGenConfig::default(),
            crate::coordinator::unified::share::config::ShareGroupConfig::default(),
            Arc::new(crate::coordinator::unified::ImageMetadataProvider {
                controller: controller.clone(),
            }),
            offsets_log,
            crate::coordinator::unified::streams::config::StreamsGroupConfig::default(),
        ))
    }

    /// On a leader node, bootstrap creates the partition directory, registers the partition
    /// and registers the offsets topic in the metadata image.
    #[tokio::test]
    async fn bootstrap_creates_topic_dir() {
        let dir = tempdir().unwrap();
        let config = BrokerConfig::for_tests(dir.path().to_path_buf());
        let controller: Arc<dyn crate::metadata_source::MetadataSource> =
            controller_with_leader(dir.path().join("__cluster_metadata_test")).await;
        let partitions: Arc<PartitionRegistry> = Arc::new(PartitionRegistry::new());
        let coordinator = test_coordinator(&controller, &partitions);
        let log_dir_status = crate::log_dir_status::LogDirRegistry::probe(&config.all_log_dirs());
        bootstrap(
            &config,
            &controller,
            &partitions,
            &coordinator,
            &log_dir_status,
        )
        .await
        .unwrap();
        let topic_dir = log_dir::partition_dir(&config.log_dir, OFFSETS_TOPIC, OFFSETS_PARTITION);
        assert!(topic_dir.exists());
        assert!(partitions.contains(OFFSETS_TOPIC, OFFSETS_PARTITION));
        assert!(controller.current_image().topic(OFFSETS_TOPIC).is_some());
    }

    /// Bootstrapping twice must not register a second offsets topic; the topic id must stay
    /// the same across boots.
    #[tokio::test]
    async fn second_bootstrap_does_not_duplicate_offsets_topic() {
        let dir = tempdir().unwrap();
        let config = BrokerConfig::for_tests(dir.path().to_path_buf());
        let controller: Arc<dyn crate::metadata_source::MetadataSource> =
            controller_with_leader(dir.path().join("__cluster_metadata_test")).await;
        let partitions: Arc<PartitionRegistry> = Arc::new(PartitionRegistry::new());
        let coordinator = test_coordinator(&controller, &partitions);
        let log_dir_status = crate::log_dir_status::LogDirRegistry::probe(&config.all_log_dirs());
        bootstrap(
            &config,
            &controller,
            &partitions,
            &coordinator,
            &log_dir_status,
        )
        .await
        .unwrap();
        let id_after_first = controller
            .current_image()
            .topic(OFFSETS_TOPIC)
            .expect("offsets topic registered on first boot")
            .topic_id;
        bootstrap(
            &config,
            &controller,
            &partitions,
            &coordinator,
            &log_dir_status,
        )
        .await
        .unwrap();
        let image = controller.current_image();
        let count = image.topics().filter(|t| t.name == OFFSETS_TOPIC).count();
        assert!(
            count == 1,
            "expected exactly one __consumer_offsets, got {count}"
        );
        assert!(
            image.topic(OFFSETS_TOPIC).unwrap().topic_id == id_after_first,
            "topic_id changed across boots — a duplicate TopicRecord was submitted"
        );
    }

    /// `apply_group_metadata` rebuilds members, the static-member mapping and the group state
    /// (Stable with members, Empty without).
    #[test]
    fn apply_group_metadata_rebuilds_members_and_state() {
        use crate::coordinator::persistence::MemberMetadata;
        use bytes::Bytes;
        let mut g = ClassicState::new("g");
        let v = GroupMetadataValue {
            protocol_type: "consumer".into(),
            generation: 5,
            protocol_name: Some("range".into()),
            leader: Some("m1".into()),
            current_state_timestamp_ms: 0,
            members: vec![MemberMetadata {
                member_id: "m1".into(),
                group_instance_id: Some("inst".into()),
                client_id: "c".into(),
                client_host: "h".into(),
                rebalance_timeout_ms: 60_000,
                session_timeout_ms: 30_000,
                subscription: Bytes::new(),
                assignment: Bytes::from_static(b"asn"),
            }],
        };
        apply_group_metadata(&mut g, v, 0);
        assert!(g.generation_id == 5);
        assert!(g.protocol_type.as_deref() == Some("consumer"));
        assert!(g.leader_id.as_deref() == Some("m1"));
        assert!(g.state == ClassicGroupState::Stable);
        assert!(g.members.contains_key("m1"));
        assert!(g.members["m1"].assignment.as_deref() == Some(b"asn" as &[u8]));
        assert!(g.current_member_id_for_instance("inst") == Some("m1"));
        let mut empty = ClassicState::new("g2");
        apply_group_metadata(
            &mut empty,
            GroupMetadataValue {
                protocol_type: "consumer".into(),
                generation: 0,
                protocol_name: None,
                leader: None,
                current_state_timestamp_ms: 0,
                members: vec![],
            },
            0,
        );
        assert!(empty.state == ClassicGroupState::Empty);
    }

    /// Coordinator with an empty metadata provider and an in-memory offsets log.
    fn bare_coordinator() -> Arc<GroupCoordinator> {
        use crate::coordinator::unified::offsets_log::fake::InMemoryOffsetsLog;
        use crate::coordinator::unified::reconciler::ReconcileInput;
        #[derive(Debug)]
        struct EmptyMeta;
        impl crate::coordinator::unified::actor::MetadataProvider for EmptyMeta {
            fn snapshot(&self) -> ReconcileInput {
                ReconcileInput::default()
            }
        }
        Arc::new(GroupCoordinator::new(
            crate::coordinator::unified::config::NextGenConfig::default(),
            crate::coordinator::unified::share::config::ShareGroupConfig::default(),
            Arc::new(EmptyMeta),
            Arc::new(InMemoryOffsetsLog::default()),
            crate::coordinator::unified::streams::config::StreamsGroupConfig::default(),
        ))
    }

    /// Encodes a classic group-metadata key/value pair: generation 3, with `member_id` as the
    /// sole (dynamic) member and leader.
    fn classic_group_record(group_id: &str, member_id: &str) -> (bytes::Bytes, bytes::Bytes) {
        use crate::coordinator::persistence::MemberMetadata;
        let key = GroupMetadataValue::encode_key(group_id);
        let value = GroupMetadataValue {
            protocol_type: "consumer".into(),
            generation: 3,
            protocol_name: Some("range".into()),
            leader: Some(member_id.into()),
            current_state_timestamp_ms: 0,
            members: vec![MemberMetadata {
                member_id: member_id.into(),
                group_instance_id: None,
                client_id: "c1".into(),
                client_host: "/127.0.0.1".into(),
                rebalance_timeout_ms: 60_000,
                session_timeout_ms: 30_000,
                subscription: bytes::Bytes::new(),
                assignment: bytes::Bytes::from_static(b"asn"),
            }],
        }
        .encode_value();
        (key, value)
    }

    /// Replays a group's history through classic → next-gen (upgrade) → classic (downgrade):
    /// classic record, classic tombstone, next-gen records, next-gen tombstones, classic record.
    /// The group must come back as classic with member m1.
    #[tokio::test]
    async fn downgraded_group_replays_as_classic() {
        use crate::coordinator::unified::persistence_next_gen as ng;
        use crate::coordinator::unified::{GroupType, persistence_next_gen};
        let coord = bare_coordinator();
        let ng_group_key = |gid: &str| {
            ng::encode_key(&ng::NextGenKey::GroupMetadata {
                group_id: gid.into(),
            })
        };
        let ng_member_key = |gid: &str, mid: &str| {
            ng::encode_key(&ng::NextGenKey::MemberMetadata {
                group_id: gid.into(),
                member_id: mid.into(),
            })
        };
        let (k2_key, k2_val) = classic_group_record("g", "m1");
        let (k2_key2, k2_val2) = classic_group_record("g", "m1");
        // `None` values model tombstones.
        let stream: Vec<(bytes::Bytes, Option<bytes::Bytes>)> = vec![
            (k2_key, Some(k2_val)),
            (GroupMetadataValue::encode_key("g"), None),
            (
                ng_group_key("g"),
                Some(persistence_next_gen::GroupMetadataValue { epoch: 1 }.encode()),
            ),
            (
                ng_member_key("g", "m1"),
                Some(
                    persistence_next_gen::MemberMetadataValue {
                        instance_id: None,
                        rack_id: None,
                        client_id: "c1".into(),
                        client_host: "/127.0.0.1".into(),
                        subscribed_topic_names: vec!["t".into()],
                        subscribed_topic_regex: None,
                        server_assignor: None,
                        rebalance_timeout_ms: 60_000,
                        classic: None,
                    }
                    .encode(),
                ),
            ),
            (ng_group_key("g"), None),
            (ng_member_key("g", "m1"), None),
            (k2_key2, Some(k2_val2)),
        ];
        let batch = RecordBatch::default();
        let mut acc = Replayed::default();
        for (k, v) in stream {
            let key = persistence::parse_key(&k).unwrap();
            match v {
                Some(value) => apply_record(&coord, &mut acc, key, &value, &batch).unwrap(),
                None => apply_tombstone(&coord, key),
            }
        }
        finalize(&coord, acc);
        assert!(coord.group_type("g") != Some(GroupType::NextGen));
        let snap = coord
            .describe_group("g")
            .await
            .expect("classic group present");
        assert!(snap.members.iter().any(|m| m.member_id == "m1"));
        assert!(
            coord.find("g").is_some_and(
                |h| h.kind == crate::coordinator::unified::actor::GroupKindTag::Classic
            )
        );
    }

    /// After compaction, only a next-gen tombstone may survive next to a classic record.
    /// That lone tombstone must not make the group next-gen; it replays as classic.
    #[tokio::test]
    async fn compacted_downgrade_residue_replays_as_classic() {
        use crate::coordinator::unified::GroupType;
        use crate::coordinator::unified::persistence_next_gen as ng;
        let coord = bare_coordinator();
        let (k2_key, k2_val) = classic_group_record("g", "m1");
        let stream: Vec<(bytes::Bytes, Option<bytes::Bytes>)> = vec![
            (
                ng::encode_key(&ng::NextGenKey::TargetAssignmentMetadata {
                    group_id: "g".into(),
                }),
                None,
            ),
            (k2_key, Some(k2_val)),
        ];
        let batch = RecordBatch::default();
        let mut acc = Replayed::default();
        for (k, v) in stream {
            let key = persistence::parse_key(&k).unwrap();
            match v {
                Some(value) => apply_record(&coord, &mut acc, key, &value, &batch).unwrap(),
                None => apply_tombstone(&coord, key),
            }
        }
        finalize(&coord, acc);
        assert!(coord.group_type("g") != Some(GroupType::NextGen));
        let snap = coord
            .describe_group("g")
            .await
            .expect("classic group present");
        assert!(snap.members.iter().any(|m| m.member_id == "m1"));
        assert!(
            coord.find("g").is_some_and(
                |h| h.kind == crate::coordinator::unified::actor::GroupKindTag::Classic
            )
        );
    }

    /// A surviving (non-tombstoned) next-gen target-assignment record seeds the group as
    /// next-gen, and that wins over the later classic record: the group comes back as a
    /// consumer actor.
    /// NOTE(review): the test name suggests this pins a known limitation rather than desired
    /// behavior — confirm.
    #[tokio::test]
    async fn surviving_k6_write_resurrects_as_next_gen_without_fix() {
        use crate::coordinator::unified::persistence_next_gen as ng;
        let coord = bare_coordinator();
        let (k2_key, k2_val) = classic_group_record("g", "m1");
        let stream: Vec<(bytes::Bytes, Option<bytes::Bytes>)> = vec![
            (
                ng::encode_key(&ng::NextGenKey::TargetAssignmentMetadata {
                    group_id: "g".into(),
                }),
                Some(
                    ng::TargetAssignmentMetadataValue {
                        assignment_epoch: 1,
                    }
                    .encode(),
                ),
            ),
            (k2_key, Some(k2_val)),
        ];
        let batch = RecordBatch::default();
        let mut acc = Replayed::default();
        for (k, v) in stream {
            let key = persistence::parse_key(&k).unwrap();
            match v {
                Some(value) => apply_record(&coord, &mut acc, key, &value, &batch).unwrap(),
                None => apply_tombstone(&coord, key),
            }
        }
        assert!(coord.seeds.contains_key("g"));
        finalize(&coord, acc);
        assert!(
            coord.find("g").is_some_and(
                |h| h.kind == crate::coordinator::unified::actor::GroupKindTag::Consumer
            )
        );
    }

    /// Next-gen group and member records with no tombstones replay as a consumer group.
    #[tokio::test]
    async fn upgraded_group_without_tombstone_replays_as_consumer() {
        use crate::coordinator::unified::persistence_next_gen as ng;
        use crate::coordinator::unified::{GroupType, persistence_next_gen};
        let coord = bare_coordinator();
        let stream: Vec<(bytes::Bytes, bytes::Bytes)> = vec![
            (
                ng::encode_key(&ng::NextGenKey::GroupMetadata {
                    group_id: "g".into(),
                }),
                persistence_next_gen::GroupMetadataValue { epoch: 1 }.encode(),
            ),
            (
                ng::encode_key(&ng::NextGenKey::MemberMetadata {
                    group_id: "g".into(),
                    member_id: "m1".into(),
                }),
                persistence_next_gen::MemberMetadataValue {
                    instance_id: None,
                    rack_id: None,
                    client_id: "c1".into(),
                    client_host: "/127.0.0.1".into(),
                    subscribed_topic_names: vec!["t".into()],
                    subscribed_topic_regex: None,
                    server_assignor: None,
                    rebalance_timeout_ms: 60_000,
                    classic: None,
                }
                .encode(),
            ),
        ];
        let batch = RecordBatch::default();
        let mut acc = Replayed::default();
        for (k, v) in stream {
            let key = persistence::parse_key(&k).unwrap();
            apply_record(&coord, &mut acc, key, &v, &batch).unwrap();
        }
        finalize(&coord, acc);
        assert!(coord.group_type("g") != Some(GroupType::Classic));
        let handle = coord.find("g").expect("consumer actor present");
        assert!(handle.kind == crate::coordinator::unified::actor::GroupKindTag::Consumer);
    }

    /// A next-gen member record that carries a `classic` block is described as a classic
    /// member (`is_classic`) of the resulting consumer group.
    #[tokio::test]
    async fn member_with_classic_block_replays_facade() {
        use crate::coordinator::unified::actor::GroupActorMessage;
        use crate::coordinator::unified::actor::GroupKindTag;
        use crate::coordinator::unified::persistence_next_gen as ng;
        use crate::coordinator::unified::persistence_next_gen;
        use tokio::sync::oneshot;
        let coord = bare_coordinator();
        let stream: Vec<(bytes::Bytes, bytes::Bytes)> = vec![
            (
                ng::encode_key(&ng::NextGenKey::GroupMetadata {
                    group_id: "g".into(),
                }),
                persistence_next_gen::GroupMetadataValue { epoch: 2 }.encode(),
            ),
            (
                ng::encode_key(&ng::NextGenKey::MemberMetadata {
                    group_id: "g".into(),
                    member_id: "m1".into(),
                }),
                persistence_next_gen::MemberMetadataValue {
                    instance_id: None,
                    rack_id: None,
                    client_id: "c1".into(),
                    client_host: "/127.0.0.1".into(),
                    subscribed_topic_names: vec!["t".into()],
                    subscribed_topic_regex: None,
                    server_assignor: None,
                    rebalance_timeout_ms: 60_000,
                    classic: Some(persistence_next_gen::ClassicMemberMetadata {
                        session_timeout_ms: 30_000,
                        supported_protocols: vec![(
                            "range".into(),
                            bytes::Bytes::from_static(b"meta"),
                        )],
                        last_synced_assignment: bytes::Bytes::from_static(b"asn"),
                    }),
                }
                .encode(),
            ),
        ];
        let batch = RecordBatch::default();
        let mut acc = Replayed::default();
        for (k, v) in stream {
            let key = persistence::parse_key(&k).unwrap();
            apply_record(&coord, &mut acc, key, &v, &batch).unwrap();
        }
        finalize(&coord, acc);
        let handle = coord.find("g").expect("consumer actor present");
        assert!(handle.kind == GroupKindTag::Consumer);
        // Ask the group actor for its view directly.
        let (tx, rx) = oneshot::channel();
        handle
            .tx
            .send(GroupActorMessage::Describe { reply: tx })
            .await
            .unwrap();
        let view = rx.await.unwrap();
        let m1 = view
            .members
            .iter()
            .find(|m| m.member_id == "m1")
            .expect("member m1 present");
        assert!(m1.is_classic, "facade reconstructed from k5 classic block");
    }
}