pub mod actor;
pub mod assignor;
pub(crate) mod classic_ops;
pub(crate) mod classic_state;
pub mod config;
pub(crate) mod consumer_state;
pub(crate) mod group;
pub(crate) mod migration;
pub mod offsets_log;
pub(crate) mod persistence;
pub mod persistence_next_gen;
pub mod reconciler;
pub mod share;
pub mod streams;
use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::oneshot;
use actor::{GroupActorHandle, GroupActorMessage, GroupKindTag, MetadataProvider};
use config::NextGenConfig;
use group::Group;
use offsets_log::OffsetsLog;
use share::actor::{ShareGroupActorHandle, ShareGroupActorMessage};
use share::config::ShareGroupConfig;
use streams::actor::{StreamsGroupActorHandle, StreamsGroupActorMessage};
use streams::config::StreamsGroupConfig;
use crate::coordinator::{DeleteGroupError, GroupSnapshot};
/// Kind recorded for a group id in `GroupCoordinator::group_types`.
///
/// The `mark_*` methods on `GroupCoordinator` write these values and
/// `GroupCoordinator::group_type` reads them back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupType {
/// Written by `mark_classic` and `mark_classic_after_downgrade`.
Classic,
/// Written by `mark_next_gen`.
NextGen,
/// Written by `mark_share`.
Share,
/// Written by `mark_streams` and `mark_streams_after_upgrade`.
Streams,
}
/// Registry of per-group actor handles plus the replayed seed state used to
/// initialise them.
///
/// Handles are kept in one map per actor family (`groups`, `share_groups`,
/// `streams_groups`). Each family also has a `*_seeds` map that
/// `finalize_bootstrap` drains and a `*_seeds_cache` map that is kept and
/// consulted whenever an actor is (re)created.
#[derive(Debug)]
pub struct GroupCoordinator {
/// Configuration handed to every actor spawned by `get_or_create_group`.
pub config: Arc<NextGenConfig>,
/// Configuration handed to every actor spawned by `get_or_create_share`.
pub share_config: Arc<ShareGroupConfig>,
/// Metadata provider handed to group and share actors at spawn time.
pub metadata: Arc<dyn MetadataProvider>,
/// Offsets log handed to every actor; `try_convert_classic_to_streams`
/// also appends to it directly.
pub offsets_log: Arc<dyn OffsetsLog>,
/// Actor handles created through `get_or_create_group`, keyed by group id.
pub groups: Arc<DashMap<String, Arc<GroupActorHandle>>>,
/// Actor handles created through `get_or_create_share`, keyed by group id.
pub share_groups: Arc<DashMap<String, Arc<ShareGroupActorHandle>>>,
/// Group type recorded per group id by the `mark_*` methods.
pub group_types: Arc<DashMap<String, GroupType>>,
/// Seeds accumulated by the `replay_*` methods; drained by `finalize_bootstrap`.
pub seeds: Arc<DashMap<String, GroupSeed>>,
/// Share-group seeds accumulated by replay; drained by `finalize_bootstrap`.
pub share_seeds: Arc<DashMap<String, ShareGroupSeed>>,
/// Copy of `seeds` that is not drained; read by `cached_seed`.
pub seeds_cache: Arc<DashMap<String, GroupSeed>>,
/// Copy of `share_seeds` that is not drained; read by `cached_share_seed`.
pub share_seeds_cache: Arc<DashMap<String, ShareGroupSeed>>,
/// Set-once share persister; see `set_share_persister` / `share_persister`.
pub(crate) share_persister:
std::sync::OnceLock<Arc<crate::share_coordinator::persister_client::SharePersister>>,
/// Configuration handed to every actor spawned by `get_or_create_streams`.
pub streams_config: Arc<StreamsGroupConfig>,
/// Actor handles created through `get_or_create_streams`, keyed by group id.
pub streams_groups: Arc<DashMap<String, Arc<StreamsGroupActorHandle>>>,
/// Streams-group seeds accumulated by replay; drained by `finalize_bootstrap`.
pub streams_seeds: Arc<DashMap<String, StreamsGroupSeed>>,
/// Copy of `streams_seeds` that is not drained; read by `cached_streams_seed`.
pub streams_seeds_cache: Arc<DashMap<String, StreamsGroupSeed>>,
/// Set-once metadata source; passed to streams actors when they are spawned.
pub(crate) metadata_source: std::sync::OnceLock<MetadataSourceHandle>,
}
/// Newtype around the shared metadata-source trait object.
///
/// It has a hand-written `Debug` impl that prints only the type name —
/// presumably so `GroupCoordinator` can derive `Debug` without requiring the
/// trait object to be `Debug` (TODO confirm).
#[derive(Clone)]
pub(crate) struct MetadataSourceHandle(pub(crate) Arc<dyn crate::metadata_source::MetadataSource>);
impl std::fmt::Debug for MetadataSourceHandle {
    /// Renders the type name only; the wrapped source is elided as `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("MetadataSourceHandle");
        repr.finish_non_exhaustive()
    }
}
impl GroupCoordinator {
/// Creates a coordinator with empty registries.
///
/// The three by-value configs are wrapped in `Arc`s so they can be shared
/// with spawned actors; `share_persister` and `metadata_source` start unset.
pub fn new(
    config: NextGenConfig,
    share_config: ShareGroupConfig,
    metadata: Arc<dyn MetadataProvider>,
    offsets_log: Arc<dyn OffsetsLog>,
    streams_config: StreamsGroupConfig,
) -> Self {
    let config = Arc::new(config);
    let share_config = Arc::new(share_config);
    let streams_config = Arc::new(streams_config);
    Self {
        // Shared collaborators supplied by the caller.
        config,
        share_config,
        streams_config,
        metadata,
        offsets_log,
        // Every registry starts out empty.
        group_types: Arc::default(),
        groups: Arc::default(),
        seeds: Arc::default(),
        seeds_cache: Arc::default(),
        share_groups: Arc::default(),
        share_seeds: Arc::default(),
        share_seeds_cache: Arc::default(),
        streams_groups: Arc::default(),
        streams_seeds: Arc::default(),
        streams_seeds_cache: Arc::default(),
        // Late-bound, set-once collaborators.
        share_persister: std::sync::OnceLock::new(),
        metadata_source: std::sync::OnceLock::new(),
    }
}
/// Installs the share persister if none has been installed yet.
///
/// A second call is a no-op: the first persister is kept and the new one
/// is dropped.
pub(crate) fn set_share_persister(
    &self,
    persister: Arc<crate::share_coordinator::persister_client::SharePersister>,
) {
    let _already_set = self.share_persister.set(persister).is_err();
}
/// Returns the installed share persister, or `None` if
/// `set_share_persister` has not been called yet.
#[must_use]
pub(crate) fn share_persister(
    &self,
) -> Option<&Arc<crate::share_coordinator::persister_client::SharePersister>> {
    let slot = &self.share_persister;
    slot.get()
}
/// Installs the metadata source if none has been installed yet; later calls
/// leave the first source in place.
pub(crate) fn set_metadata_source(&self, src: Arc<dyn crate::metadata_source::MetadataSource>) {
    let wrapped = MetadataSourceHandle(src);
    let _already_set = self.metadata_source.set(wrapped).is_err();
}
/// Returns a new reference to the installed metadata source, or `None` if
/// `set_metadata_source` has not been called yet.
#[must_use]
pub(crate) fn metadata_source(
    &self,
) -> Option<Arc<dyn crate::metadata_source::MetadataSource>> {
    let handle = self.metadata_source.get()?;
    Some(handle.0.clone())
}
/// Stores `seed` as the cached seed for `group_id`, replacing any previous one.
pub fn update_cache(&self, group_id: &str, seed: GroupSeed) {
    let key = group_id.to_owned();
    self.seeds_cache.insert(key, seed);
}
/// Returns a clone of the cached seed for `group_id`, if one exists.
#[must_use]
pub fn cached_seed(&self, group_id: &str) -> Option<GroupSeed> {
    let entry = self.seeds_cache.get(group_id)?;
    Some(entry.value().clone())
}
/// Returns the type recorded for `group_id`, or `None` if none was recorded.
#[must_use]
pub fn group_type(&self, group_id: &str) -> Option<GroupType> {
    let entry = self.group_types.get(group_id)?;
    Some(*entry.value())
}
/// Records `group_id` as `Classic` unless a type is already recorded for it;
/// an existing type is left untouched.
pub fn mark_classic(&self, group_id: &str) {
    let slot = self.group_types.entry(group_id.to_owned());
    slot.or_insert(GroupType::Classic);
}
/// Forces `group_id` to `Classic`, overwriting any recorded type, after
/// discarding its pending and cached consumer seeds.
pub fn mark_classic_after_downgrade(&self, group_id: &str) {
    // Pending seed first, then the cache.
    for seed_map in [&self.seeds, &self.seeds_cache] {
        seed_map.remove(group_id);
    }
    self.group_types
        .insert(group_id.to_owned(), GroupType::Classic);
}
/// Forces `group_id` to `Streams`, overwriting any recorded type, after
/// discarding its pending and cached consumer seeds.
pub fn mark_streams_after_upgrade(&self, group_id: &str) {
    // Pending seed first, then the cache.
    for seed_map in [&self.seeds, &self.seeds_cache] {
        seed_map.remove(group_id);
    }
    self.group_types
        .insert(group_id.to_owned(), GroupType::Streams);
}
/// Records `group_id` as `NextGen` unless a type is already recorded for it.
pub fn mark_next_gen(&self, group_id: &str) {
    let slot = self.group_types.entry(group_id.to_owned());
    slot.or_insert(GroupType::NextGen);
}
/// Records `group_id` as `Share` unless a type is already recorded for it.
pub fn mark_share(&self, group_id: &str) {
    let slot = self.group_types.entry(group_id.to_owned());
    slot.or_insert(GroupType::Share);
}
/// Records `group_id` as `Streams` unless a type is already recorded for it.
pub fn mark_streams(&self, group_id: &str) {
    let slot = self.group_types.entry(group_id.to_owned());
    slot.or_insert(GroupType::Streams);
}
/// Stores `seed` as the cached share-group seed for `group_id`, replacing
/// any previous one.
pub fn update_share_cache(&self, group_id: &str, seed: ShareGroupSeed) {
    let key = group_id.to_owned();
    self.share_seeds_cache.insert(key, seed);
}
/// Returns a clone of the cached share-group seed for `group_id`, if any.
#[must_use]
pub fn cached_share_seed(&self, group_id: &str) -> Option<ShareGroupSeed> {
    let entry = self.share_seeds_cache.get(group_id)?;
    Some(entry.value().clone())
}
/// Stores `seed` as the cached streams-group seed for `group_id`, replacing
/// any previous one.
pub fn update_streams_cache(&self, group_id: &str, seed: StreamsGroupSeed) {
    let key = group_id.to_owned();
    self.streams_seeds_cache.insert(key, seed);
}
/// Returns a clone of the cached streams-group seed for `group_id`, if any.
#[must_use]
pub fn cached_streams_seed(&self, group_id: &str) -> Option<StreamsGroupSeed> {
    let entry = self.streams_seeds_cache.get(group_id)?;
    Some(entry.value().clone())
}
/// Returns the actor handle registered for `group_id`, spawning one with
/// `initial_kind` if none is registered or the registered actor's channel
/// is closed.
///
/// When `initial_kind` is `Consumer` and a cached seed exists, the seed is
/// offered to the returned actor with a non-blocking `try_send`; a failed
/// send is ignored.
#[must_use]
pub fn get_or_create_group(
    self: &Arc<Self>,
    group_id: &str,
    initial_kind: GroupKindTag,
) -> Arc<GroupActorHandle> {
    // Fast path: a registered handle whose actor is still accepting messages.
    // The map guard is released at the end of this statement.
    let live = self
        .groups
        .get(group_id)
        .filter(|entry| !entry.value().tx.is_closed())
        .map(|entry| entry.value().clone());
    if let Some(handle) = live {
        return handle;
    }
    // Evict only a handle that is *still* closed. An unconditional remove
    // could throw away a live replacement installed by a concurrent caller
    // between the check above and this point.
    self.groups
        .remove_if(group_id, |_, stale| stale.tx.is_closed());
    let spawned = Arc::new(GroupActorHandle::spawn(
        group_id.into(),
        initial_kind,
        self.config.clone(),
        self.metadata.clone(),
        self.offsets_log.clone(),
        self.clone(),
    ));
    // If another caller registered a handle first, theirs wins and
    // `spawned` is dropped.
    let inserted = self
        .groups
        .entry(group_id.into())
        .or_insert(spawned)
        .value()
        .clone();
    if initial_kind == GroupKindTag::Consumer
        && let Some(seed) = self.cached_seed(group_id)
    {
        let _ = inserted.tx.try_send(GroupActorMessage::Seed(seed));
    }
    inserted
}
/// Shorthand for `get_or_create_group` with `GroupKindTag::Classic`.
#[must_use]
pub fn get_or_create_classic(self: &Arc<Self>, group_id: &str) -> Arc<GroupActorHandle> {
    let kind = GroupKindTag::Classic;
    self.get_or_create_group(group_id, kind)
}
/// Shorthand for `get_or_create_group` with `GroupKindTag::Consumer`.
#[must_use]
pub fn get_or_create_consumer(self: &Arc<Self>, group_id: &str) -> Arc<GroupActorHandle> {
    let kind = GroupKindTag::Consumer;
    self.get_or_create_group(group_id, kind)
}
/// Looks up the handle registered for `group_id` without creating one.
/// No check is made on whether the actor's channel is still open.
#[must_use]
pub fn find(&self, group_id: &str) -> Option<Arc<GroupActorHandle>> {
    let entry = self.groups.get(group_id)?;
    Some(entry.value().clone())
}
/// Returns the share-group actor handle registered for `group_id`, spawning
/// one if none is registered or the registered actor's channel is closed.
///
/// If a cached share seed exists it is offered to the returned actor with a
/// non-blocking `try_send`; a failed send is ignored.
#[must_use]
pub fn get_or_create_share(self: &Arc<Self>, group_id: &str) -> Arc<ShareGroupActorHandle> {
    // Fast path: a registered handle whose actor is still accepting messages.
    // The map guard is released at the end of this statement.
    let live = self
        .share_groups
        .get(group_id)
        .filter(|entry| !entry.value().tx.is_closed())
        .map(|entry| entry.value().clone());
    if let Some(handle) = live {
        return handle;
    }
    // Evict only a handle that is *still* closed, so a live replacement
    // installed by a concurrent caller is never thrown away.
    self.share_groups
        .remove_if(group_id, |_, stale| stale.tx.is_closed());
    let spawned = Arc::new(ShareGroupActorHandle::spawn(
        group_id.into(),
        self.share_config.clone(),
        self.metadata.clone(),
        self.offsets_log.clone(),
        self.clone(),
    ));
    // If another caller registered a handle first, theirs wins.
    let inserted = self
        .share_groups
        .entry(group_id.into())
        .or_insert(spawned)
        .value()
        .clone();
    if let Some(seed) = self.cached_share_seed(group_id) {
        let _ = inserted.tx.try_send(ShareGroupActorMessage::Seed(seed));
    }
    inserted
}
/// Looks up the share-group handle registered for `group_id` without
/// creating one.
#[must_use]
pub fn find_share(&self, group_id: &str) -> Option<Arc<ShareGroupActorHandle>> {
    let entry = self.share_groups.get(group_id)?;
    Some(entry.value().clone())
}
/// Returns the ids of all entries currently in `share_groups`.
#[must_use]
pub fn share_group_ids(&self) -> Vec<String> {
    let mut ids = Vec::new();
    for entry in self.share_groups.iter() {
        ids.push(entry.key().clone());
    }
    ids
}
/// Returns the streams-group actor handle registered for `group_id`,
/// spawning one if none is registered or the registered actor's channel is
/// closed.
///
/// The actor is spawned with whatever `metadata_source()` returns at that
/// moment (possibly `None`). If a cached streams seed exists it is offered
/// to the returned actor with a non-blocking `try_send`; a failed send is
/// ignored.
#[must_use]
pub fn get_or_create_streams(self: &Arc<Self>, group_id: &str) -> Arc<StreamsGroupActorHandle> {
    // Fast path: a registered handle whose actor is still accepting messages.
    // The map guard is released at the end of this statement.
    let live = self
        .streams_groups
        .get(group_id)
        .filter(|entry| !entry.value().tx.is_closed())
        .map(|entry| entry.value().clone());
    if let Some(handle) = live {
        return handle;
    }
    // Evict only a handle that is *still* closed, so a live replacement
    // installed by a concurrent caller is never thrown away.
    self.streams_groups
        .remove_if(group_id, |_, stale| stale.tx.is_closed());
    let spawned = Arc::new(StreamsGroupActorHandle::spawn(
        group_id.into(),
        self.streams_config.clone(),
        self.offsets_log.clone(),
        self.metadata_source(),
        self.clone(),
    ));
    // If another caller registered a handle first, theirs wins.
    let inserted = self
        .streams_groups
        .entry(group_id.into())
        .or_insert(spawned)
        .value()
        .clone();
    if let Some(seed) = self.cached_streams_seed(group_id) {
        let _ = inserted.tx.try_send(StreamsGroupActorMessage::Seed(seed));
    }
    inserted
}
/// Looks up the streams-group handle registered for `group_id` without
/// creating one.
#[must_use]
pub fn find_streams(&self, group_id: &str) -> Option<Arc<StreamsGroupActorHandle>> {
    let entry = self.streams_groups.get(group_id)?;
    Some(entry.value().clone())
}
/// Attempts to convert a group recorded as `Classic` into a `Streams` group.
///
/// Outcomes:
/// * `NotClassic` — the recorded type for `group_id` is not `Classic`.
/// * `RejectLiveMembers` — the group's actor answered a `ClassicInspect`
///   request and reported at least one member.
/// * `Converted` — a classic-metadata tombstone batch was appended to the
///   offsets log and the group was re-marked as `Streams`.
///
/// If there is no registered actor, or the actor cannot be reached (send or
/// reply fails), conversion proceeds as if the group were empty.
///
/// # Errors
/// Propagates the error from appending the tombstone batch.
pub(crate) async fn try_convert_classic_to_streams(
    self: &Arc<Self>,
    group_id: &str,
    now_ms: i64,
) -> Result<streams::migration::ConvertOutcome, crate::error::BrokerError> {
    use streams::migration::{ConvertOutcome, classic_group_metadata_tombstone_batch};
    let recorded = self.group_type(group_id);
    if recorded != Some(GroupType::Classic) {
        return Ok(ConvertOutcome::NotClassic);
    }
    if let Some(actor) = self.find(group_id) {
        let (reply, view_rx) = tokio::sync::oneshot::channel();
        let probe = GroupActorMessage::ClassicInspect { reply };
        // Only wait for the reply when the request was actually delivered.
        if actor.tx.send(probe).await.is_ok() {
            if let Ok(view) = view_rx.await {
                if !view.members.is_empty() {
                    return Ok(ConvertOutcome::RejectLiveMembers);
                }
            }
        }
    }
    let tombstones = classic_group_metadata_tombstone_batch(group_id, now_ms);
    self.offsets_log.append(tombstones).await?;
    self.mark_streams_after_upgrade(group_id);
    Ok(ConvertOutcome::Converted)
}
/// Returns the ids of all entries currently in `streams_groups`.
#[must_use]
pub fn streams_group_ids(&self) -> Vec<String> {
    let mut ids = Vec::new();
    for entry in self.streams_groups.iter() {
        ids.push(entry.key().clone());
    }
    ids
}
/// Returns the ids of all entries currently in `groups`.
///
/// `groups` holds handles created through both `get_or_create_classic` and
/// `get_or_create_consumer`, so the result is not limited to consumer-kind
/// groups.
#[must_use]
pub fn consumer_group_ids(&self) -> Vec<String> {
    self.groups.iter().map(|e| e.key().clone()).collect()
}
/// Gets or creates the classic actor for `group_id` and offers it `group`
/// as a `ClassicSeed` message. The send is non-blocking and a failure is
/// ignored.
pub fn seed_classic(self: &Arc<Self>, group_id: &str, group: Box<Group>) {
    let seed_msg = GroupActorMessage::ClassicSeed(group);
    let actor = self.get_or_create_classic(group_id);
    let _ = actor.tx.try_send(seed_msg);
}
/// Collects a snapshot from every actor in `groups` that answers a
/// `ClassicInspect` request.
///
/// Actors are queried one after another. An actor whose channel is closed,
/// or that drops the reply sender, is skipped.
pub async fn list_groups(&self) -> Vec<GroupSnapshot> {
    // Copy the handles out first so no map guard is held across an await.
    let actors: Vec<Arc<GroupActorHandle>> = self
        .groups
        .iter()
        .map(|entry| entry.value().clone())
        .collect();
    let mut snapshots = Vec::with_capacity(actors.len());
    for actor in actors {
        let (reply, view_rx) = oneshot::channel();
        let probe = GroupActorMessage::ClassicInspect { reply };
        if actor.tx.send(probe).await.is_err() {
            continue;
        }
        if let Ok(view) = view_rx.await {
            snapshots.push(view.snapshot());
        }
    }
    snapshots
}
/// Asks the actor registered for `group_id` for a snapshot via `InspectAny`.
///
/// Returns `None` when no actor is registered, the request cannot be
/// delivered, or the actor drops the reply sender.
pub async fn describe_group(&self, group_id: &str) -> Option<GroupSnapshot> {
    let actor = self.find(group_id)?;
    let (reply, snapshot_rx) = oneshot::channel();
    let probe = GroupActorMessage::InspectAny { reply };
    if actor.tx.send(probe).await.is_err() {
        return None;
    }
    snapshot_rx.await.ok()
}
pub async fn delete_group(&self, group_id: &str) -> Result<(), DeleteGroupError> {
let handle = self.find(group_id).ok_or(DeleteGroupError::NotFound)?;
let (tx, rx) = oneshot::channel();
handle
.tx
.send(GroupActorMessage::ClassicInspect { reply: tx })
.await
.map_err(|_| DeleteGroupError::NotFound)?;
let view = rx.await.map_err(|_| DeleteGroupError::NotFound)?;
if !view.members.is_empty() {
return Err(DeleteGroupError::NonEmpty);
}
self.groups.remove(group_id);
Ok(())
}
/// Sends `Shutdown` to every actor in `groups`, then to every actor in
/// `share_groups`, waiting up to five seconds for each acknowledgement.
///
/// Actors are processed sequentially. Actors whose channel is already
/// closed are skipped, and a timed-out or dropped acknowledgement is
/// ignored.
///
/// NOTE(review): actors in `streams_groups` are not signalled by this
/// method — confirm whether that is intentional.
pub async fn shutdown_all(&self) {
    const ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    // Copy handles out first so no map guard is held across an await.
    let group_actors: Vec<Arc<GroupActorHandle>> = self
        .groups
        .iter()
        .map(|entry| entry.value().clone())
        .collect();
    for actor in group_actors {
        let (ack_tx, ack_rx) = oneshot::channel();
        if actor.tx.send(GroupActorMessage::Shutdown(ack_tx)).await.is_err() {
            continue;
        }
        let _ = tokio::time::timeout(ACK_TIMEOUT, ack_rx).await;
    }

    let share_actors: Vec<Arc<ShareGroupActorHandle>> = self
        .share_groups
        .iter()
        .map(|entry| entry.value().clone())
        .collect();
    for actor in share_actors {
        let (ack_tx, ack_rx) = oneshot::channel();
        if actor
            .tx
            .send(ShareGroupActorMessage::Shutdown(ack_tx))
            .await
            .is_err()
        {
            continue;
        }
        let _ = tokio::time::timeout(ACK_TIMEOUT, ack_rx).await;
    }
}
}
impl GroupCoordinator {
/// Applies a replayed group-metadata record: sets `group_epoch` on both the
/// pending seed and the cached seed for `group_id`, creating either if absent.
pub fn replay_group_metadata(
    &self,
    group_id: &str,
    v: persistence_next_gen::GroupMetadataValue,
) {
    let epoch = v.epoch;
    // One map at a time; each entry guard is released before the next is taken.
    for seed_map in [&self.seeds, &self.seeds_cache] {
        seed_map.entry(group_id.to_owned()).or_default().group_epoch = epoch;
    }
}
/// Applies a replayed member-metadata record: stores `v` under `member_id`
/// in both the pending seed and the cached seed for `group_id`.
pub fn replay_member_metadata(
    &self,
    group_id: &str,
    member_id: &str,
    v: persistence_next_gen::MemberMetadataValue,
) {
    let member = member_id.to_owned();
    self.seeds
        .entry(group_id.to_owned())
        .or_default()
        .members
        .insert(member.clone(), v.clone());
    self.seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .members
        .insert(member, v);
}
/// Applies a replayed target-assignment-metadata record: sets `target_epoch`
/// on both the pending seed and the cached seed for `group_id`.
pub fn replay_target_assignment_metadata(
    &self,
    group_id: &str,
    v: persistence_next_gen::TargetAssignmentMetadataValue,
) {
    let epoch = v.assignment_epoch;
    // One map at a time; each entry guard is released before the next is taken.
    for seed_map in [&self.seeds, &self.seeds_cache] {
        seed_map.entry(group_id.to_owned()).or_default().target_epoch = epoch;
    }
}
/// Applies a replayed per-member target assignment: stores `v` under
/// `member_id` in `target_per_member` of both the pending and cached seed.
pub fn replay_target_assignment_member(
    &self,
    group_id: &str,
    member_id: &str,
    v: persistence_next_gen::TargetAssignmentMemberValue,
) {
    let member = member_id.to_owned();
    self.seeds
        .entry(group_id.to_owned())
        .or_default()
        .target_per_member
        .insert(member.clone(), v.clone());
    self.seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .target_per_member
        .insert(member, v);
}
/// Applies a replayed per-member current assignment: stores `v` under
/// `member_id` in `current_per_member` of both the pending and cached seed.
pub fn replay_current_member_assignment(
    &self,
    group_id: &str,
    member_id: &str,
    v: persistence_next_gen::CurrentMemberAssignmentValue,
) {
    let member = member_id.to_owned();
    self.seeds
        .entry(group_id.to_owned())
        .or_default()
        .current_per_member
        .insert(member.clone(), v.clone());
    self.seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .current_per_member
        .insert(member, v);
}
/// Applies a replayed tombstone for a next-gen consumer-group record.
///
/// * `GroupMetadata` — drops the whole pending and cached seed for the group.
/// * `MemberMetadata` — removes the member's metadata.
/// * `TargetAssignmentMetadata` — resets `target_epoch` to 0.
/// * `TargetAssignmentMember` / `CurrentMemberAssignment` — removes the
///   member's entry from the corresponding per-member map.
///
/// Apart from `GroupMetadata`, a tombstone for a group with no seed is a
/// no-op (no seed is created). The pending seed is always updated before
/// the cached one.
pub fn replay_next_gen_tombstone(&self, key: &persistence_next_gen::NextGenKey) {
    use persistence_next_gen::NextGenKey as K;
    // Runs `scrub` on the pending seed and then the cached seed, when present.
    // Each map guard is released before the next map is touched.
    let scrub_both = |group_id: &str, scrub: &dyn Fn(&mut GroupSeed)| {
        for seed_map in [&self.seeds, &self.seeds_cache] {
            if let Some(mut entry) = seed_map.get_mut(group_id) {
                scrub(entry.value_mut());
            }
        }
    };
    match key {
        K::GroupMetadata { group_id } => {
            self.seeds.remove(group_id);
            self.seeds_cache.remove(group_id);
        }
        K::MemberMetadata {
            group_id, member_id, ..
        } => scrub_both(group_id.as_str(), &|seed: &mut GroupSeed| {
            seed.members.remove(member_id);
        }),
        K::TargetAssignmentMetadata { group_id } => {
            scrub_both(group_id.as_str(), &|seed: &mut GroupSeed| {
                seed.target_epoch = 0;
            });
        }
        K::TargetAssignmentMember {
            group_id, member_id, ..
        } => scrub_both(group_id.as_str(), &|seed: &mut GroupSeed| {
            seed.target_per_member.remove(member_id);
        }),
        K::CurrentMemberAssignment {
            group_id, member_id, ..
        } => scrub_both(group_id.as_str(), &|seed: &mut GroupSeed| {
            seed.current_per_member.remove(member_id);
        }),
    }
}
/// Applies a replayed share-group metadata record: sets `group_epoch` on
/// both the pending and cached share seed, creating either if absent.
pub fn replay_share_group_metadata(
    &self,
    group_id: &str,
    v: share::persistence::ShareGroupMetadataValue,
) {
    let epoch = v.epoch;
    // One map at a time; each entry guard is released before the next is taken.
    for seed_map in [&self.share_seeds, &self.share_seeds_cache] {
        seed_map.entry(group_id.to_owned()).or_default().group_epoch = epoch;
    }
}
/// Applies a replayed share-group member record: stores `v` under
/// `member_id` in both the pending and cached share seed.
pub fn replay_share_member_metadata(
    &self,
    group_id: &str,
    member_id: &str,
    v: share::persistence::ShareGroupMemberMetadataValue,
) {
    let member = member_id.to_owned();
    self.share_seeds
        .entry(group_id.to_owned())
        .or_default()
        .members
        .insert(member.clone(), v.clone());
    self.share_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .members
        .insert(member, v);
}
/// Applies a replayed share-group target-assignment-metadata record: sets
/// `target_epoch` on both the pending and cached share seed.
pub fn replay_share_target_assignment_metadata(
    &self,
    group_id: &str,
    v: share::persistence::ShareGroupTargetAssignmentMetadataValue,
) {
    let epoch = v.assignment_epoch;
    // One map at a time; each entry guard is released before the next is taken.
    for seed_map in [&self.share_seeds, &self.share_seeds_cache] {
        seed_map.entry(group_id.to_owned()).or_default().target_epoch = epoch;
    }
}
/// Applies a replayed share-group per-member target assignment: stores `v`
/// under `member_id` in `target_per_member` of both share seeds.
pub fn replay_share_target_assignment_member(
    &self,
    group_id: &str,
    member_id: &str,
    v: share::persistence::ShareGroupTargetAssignmentMemberValue,
) {
    let member = member_id.to_owned();
    self.share_seeds
        .entry(group_id.to_owned())
        .or_default()
        .target_per_member
        .insert(member.clone(), v.clone());
    self.share_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .target_per_member
        .insert(member, v);
}
/// Applies a replayed share-group per-member current assignment: stores `v`
/// under `member_id` in `current_per_member` of both share seeds.
pub fn replay_share_current_member_assignment(
    &self,
    group_id: &str,
    member_id: &str,
    v: share::persistence::ShareGroupCurrentMemberAssignmentValue,
) {
    let member = member_id.to_owned();
    self.share_seeds
        .entry(group_id.to_owned())
        .or_default()
        .current_per_member
        .insert(member.clone(), v.clone());
    self.share_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .current_per_member
        .insert(member, v);
}
/// Applies a replayed share-group state-partition record: replaces
/// `state_partition_metadata` on both the pending and cached share seed.
pub fn replay_share_state_partition_metadata(
    &self,
    group_id: &str,
    v: share::persistence::ShareGroupStatePartitionMetadataValue,
) {
    self.share_seeds
        .entry(group_id.to_owned())
        .or_default()
        .state_partition_metadata = v.clone();
    self.share_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .state_partition_metadata = v;
}
/// Returns a clone of the cached state-partition metadata for `group_id`,
/// or `None` when the group has no cached share seed.
#[must_use]
pub fn share_state_partition_metadata(
    &self,
    group_id: &str,
) -> Option<share::persistence::ShareGroupStatePartitionMetadataValue> {
    let cached = self.share_seeds_cache.get(group_id)?;
    Some(cached.value().state_partition_metadata.clone())
}
/// Applies a replayed tombstone for a share-group record by undoing the
/// matching field on the pending seed and then on the cached seed.
///
/// Epoch records reset their epoch to 0, per-member records remove the
/// member's entry, and a state-partition record resets the metadata to its
/// default value. Seeds are never created or removed here; a group without
/// a seed is left untouched.
pub fn replay_share_tombstone(&self, key: &share::persistence::ShareGroupKey) {
    use share::persistence::ShareGroupKey as K;
    // What the tombstone erases from a single seed.
    let erase = |seed: &mut ShareGroupSeed| match key {
        K::GroupMetadata { .. } => seed.group_epoch = 0,
        K::TargetAssignmentMetadata { .. } => seed.target_epoch = 0,
        K::StatePartitionMetadata { .. } => {
            seed.state_partition_metadata =
                share::persistence::ShareGroupStatePartitionMetadataValue::default();
        }
        K::MemberMetadata { member_id, .. } => {
            seed.members.remove(member_id);
        }
        K::TargetAssignmentMember { member_id, .. } => {
            seed.target_per_member.remove(member_id);
        }
        K::CurrentMemberAssignment { member_id, .. } => {
            seed.current_per_member.remove(member_id);
        }
    };
    let group_id: &str = match key {
        K::GroupMetadata { group_id }
        | K::MemberMetadata { group_id, .. }
        | K::TargetAssignmentMetadata { group_id }
        | K::TargetAssignmentMember { group_id, .. }
        | K::CurrentMemberAssignment { group_id, .. }
        | K::StatePartitionMetadata { group_id } => group_id.as_str(),
    };
    // Pending seed first, then the cache; guards never overlap.
    for seed_map in [&self.share_seeds, &self.share_seeds_cache] {
        if let Some(mut entry) = seed_map.get_mut(group_id) {
            erase(entry.value_mut());
        }
    }
}
/// Applies a replayed streams-group metadata record: sets `group_epoch` on
/// both the pending and cached streams seed, creating either if absent.
pub fn replay_streams_group_metadata(&self, group_id: &str, epoch: i32) {
    // One map at a time; each entry guard is released before the next is taken.
    for seed_map in [&self.streams_seeds, &self.streams_seeds_cache] {
        seed_map.entry(group_id.to_owned()).or_default().group_epoch = epoch;
    }
}
/// Applies a replayed streams-group member record: stores `v` under
/// `member_id` in both the pending and cached streams seed.
pub fn replay_streams_member_metadata(
    &self,
    group_id: &str,
    member_id: &str,
    v: streams::persistence::StreamsGroupMemberMetadataValue,
) {
    let member = member_id.to_owned();
    self.streams_seeds
        .entry(group_id.to_owned())
        .or_default()
        .members
        .insert(member.clone(), v.clone());
    self.streams_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .members
        .insert(member, v);
}
/// Applies a replayed streams topology record: sets `topology` to `Some(v)`
/// on both the pending and cached streams seed.
pub fn replay_streams_topology(
    &self,
    group_id: &str,
    v: streams::persistence::StreamsGroupTopologyValue,
) {
    self.streams_seeds
        .entry(group_id.to_owned())
        .or_default()
        .topology = Some(v.clone());
    self.streams_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .topology = Some(v);
}
/// Applies a replayed streams partition-metadata record: sets
/// `partition_metadata` to `Some(v)` on both the pending and cached seed.
pub fn replay_streams_partition_metadata(
    &self,
    group_id: &str,
    v: streams::persistence::StreamsGroupPartitionMetadataValue,
) {
    self.streams_seeds
        .entry(group_id.to_owned())
        .or_default()
        .partition_metadata = Some(v.clone());
    self.streams_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .partition_metadata = Some(v);
}
/// Applies a replayed streams target-assignment-metadata record: sets
/// `assignment_epoch` on both the pending and cached streams seed.
pub fn replay_streams_target_assignment_metadata(&self, group_id: &str, assignment_epoch: i32) {
    // One map at a time; each entry guard is released before the next is taken.
    for seed_map in [&self.streams_seeds, &self.streams_seeds_cache] {
        seed_map
            .entry(group_id.to_owned())
            .or_default()
            .assignment_epoch = assignment_epoch;
    }
}
/// Applies a replayed streams per-member target assignment: stores `v`
/// under `member_id` in `target_per_member` of both streams seeds.
pub fn replay_streams_target_assignment_member(
    &self,
    group_id: &str,
    member_id: &str,
    v: streams::persistence::StreamsGroupTargetAssignmentMemberValue,
) {
    let member = member_id.to_owned();
    self.streams_seeds
        .entry(group_id.to_owned())
        .or_default()
        .target_per_member
        .insert(member.clone(), v.clone());
    self.streams_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .target_per_member
        .insert(member, v);
}
/// Applies a replayed streams per-member current assignment: stores `v`
/// under `member_id` in `current_per_member` of both streams seeds.
pub fn replay_streams_current_member_assignment(
    &self,
    group_id: &str,
    member_id: &str,
    v: streams::persistence::StreamsGroupCurrentMemberAssignmentValue,
) {
    let member = member_id.to_owned();
    self.streams_seeds
        .entry(group_id.to_owned())
        .or_default()
        .current_per_member
        .insert(member.clone(), v.clone());
    self.streams_seeds_cache
        .entry(group_id.to_owned())
        .or_default()
        .current_per_member
        .insert(member, v);
}
/// Applies a replayed tombstone for a streams-group record by undoing the
/// matching field on the pending seed and then on the cached seed.
///
/// Epoch records reset their epoch to 0, topology and partition-metadata
/// records clear the field to `None`, and per-member records remove the
/// member's entry. Seeds are never created or removed here; a group without
/// a seed is left untouched.
pub fn replay_streams_tombstone(&self, key: &streams::persistence::StreamsGroupKey) {
    use streams::persistence::StreamsGroupKey as K;
    // What the tombstone erases from a single seed.
    let erase = |seed: &mut StreamsGroupSeed| match key {
        K::GroupMetadata { .. } => seed.group_epoch = 0,
        K::TargetAssignmentMetadata { .. } => seed.assignment_epoch = 0,
        K::Topology { .. } => seed.topology = None,
        K::PartitionMetadata { .. } => seed.partition_metadata = None,
        K::MemberMetadata { member_id, .. } => {
            seed.members.remove(member_id);
        }
        K::TargetAssignmentMember { member_id, .. } => {
            seed.target_per_member.remove(member_id);
        }
        K::CurrentMemberAssignment { member_id, .. } => {
            seed.current_per_member.remove(member_id);
        }
    };
    let group_id: &str = match key {
        K::GroupMetadata { group_id }
        | K::MemberMetadata { group_id, .. }
        | K::Topology { group_id }
        | K::PartitionMetadata { group_id }
        | K::TargetAssignmentMetadata { group_id }
        | K::TargetAssignmentMember { group_id, .. }
        | K::CurrentMemberAssignment { group_id, .. } => group_id.as_str(),
    };
    // Pending seed first, then the cache; guards never overlap.
    for seed_map in [&self.streams_seeds, &self.streams_seeds_cache] {
        if let Some(mut entry) = seed_map.get_mut(group_id) {
            erase(entry.value_mut());
        }
    }
}
/// Drains the pending seed maps, creating one actor per seeded group and
/// offering it its seed.
///
/// Consumer seeds are processed first, then share seeds, then streams
/// seeds. Each seed is delivered with a non-blocking `try_send`; a failed
/// send is ignored. The `*_seeds_cache` maps are left intact.
pub fn finalize_bootstrap(self: &Arc<Self>) {
    // Keys are snapshotted up front because entries are removed while we go.
    let consumer_ids: Vec<String> = self
        .seeds
        .iter()
        .map(|entry| entry.key().clone())
        .collect();
    for group_id in &consumer_ids {
        let Some((_, seed)) = self.seeds.remove(group_id) else {
            continue;
        };
        let actor = self.get_or_create_consumer(group_id);
        let _ = actor.tx.try_send(actor::GroupActorMessage::Seed(seed));
    }

    let share_ids: Vec<String> = self
        .share_seeds
        .iter()
        .map(|entry| entry.key().clone())
        .collect();
    for group_id in &share_ids {
        let Some((_, seed)) = self.share_seeds.remove(group_id) else {
            continue;
        };
        let actor = self.get_or_create_share(group_id);
        let _ = actor.tx.try_send(ShareGroupActorMessage::Seed(seed));
    }

    let streams_ids: Vec<String> = self
        .streams_seeds
        .iter()
        .map(|entry| entry.key().clone())
        .collect();
    for group_id in &streams_ids {
        let Some((_, seed)) = self.streams_seeds.remove(group_id) else {
            continue;
        };
        let actor = self.get_or_create_streams(group_id);
        let _ = actor.tx.try_send(StreamsGroupActorMessage::Seed(seed));
    }
}
}
/// `MetadataProvider` implementation that builds each snapshot from the
/// current image of a metadata source.
pub struct ImageMetadataProvider {
/// Source whose `current_image()` is read on every `snapshot()` call.
pub controller: Arc<dyn crate::metadata_source::MetadataSource>,
}
impl std::fmt::Debug for ImageMetadataProvider {
    /// Renders the type name only; the `controller` field is elided as `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ImageMetadataProvider");
        repr.finish_non_exhaustive()
    }
}
impl MetadataProvider for ImageMetadataProvider {
    /// Builds a `ReconcileInput` from the source's current image.
    ///
    /// For every topic it records the name → id mapping and the topic's
    /// `partitions` value. For every partition it records the sorted,
    /// de-duplicated racks of its replicas; partitions whose replicas have
    /// no known rack get no entry.
    fn snapshot(&self) -> reconciler::ReconcileInput {
        use crabka_protocol::primitives::uuid::Uuid as ProtoUuid;
        use std::collections::HashMap;

        let image = self.controller.current_image();
        let mut topic_id_by_name = HashMap::new();
        let mut partitions_per_topic = HashMap::new();
        let mut partition_racks: HashMap<(ProtoUuid, i32), Vec<String>> = HashMap::new();

        for topic in image.topics() {
            let proto_id = ProtoUuid(*topic.topic_id.as_bytes());
            topic_id_by_name.insert(topic.name.clone(), proto_id);
            partitions_per_topic.insert(proto_id, topic.partitions);

            for pr in image.partitions_of(&topic.name) {
                let mut racks: Vec<String> = Vec::new();
                for &node_id in pr.replicas.iter() {
                    // Replicas on unknown brokers, or brokers without a rack, are skipped.
                    if let Some(rack) = image.broker(node_id).and_then(|b| b.rack.clone()) {
                        racks.push(rack);
                    }
                }
                if racks.is_empty() {
                    continue;
                }
                // Equal strings are indistinguishable, so an unstable sort
                // yields the same order as a stable one.
                racks.sort_unstable();
                racks.dedup();
                partition_racks.insert((proto_id, pr.partition), racks);
            }
        }

        reconciler::ReconcileInput {
            topic_id_by_name,
            partitions_per_topic,
            partition_racks,
        }
    }
}
/// Replayed state for one consumer group, delivered to its actor as
/// `GroupActorMessage::Seed`.
#[derive(Debug, Default, Clone)]
pub struct GroupSeed {
/// Set by `replay_group_metadata`; reset to 0 only if the seed is rebuilt.
pub group_epoch: i32,
/// Set by `replay_target_assignment_metadata`; reset to 0 by its tombstone.
pub target_epoch: i32,
/// Member metadata keyed by member id.
pub members: std::collections::HashMap<String, persistence_next_gen::MemberMetadataValue>,
/// Target assignment keyed by member id.
pub target_per_member:
std::collections::HashMap<String, persistence_next_gen::TargetAssignmentMemberValue>,
/// Current assignment keyed by member id.
pub current_per_member:
std::collections::HashMap<String, persistence_next_gen::CurrentMemberAssignmentValue>,
}
/// Replayed state for one share group, delivered to its actor as
/// `ShareGroupActorMessage::Seed`.
#[derive(Debug, Default, Clone)]
pub struct ShareGroupSeed {
/// Set by `replay_share_group_metadata`; reset to 0 by its tombstone.
pub group_epoch: i32,
/// Set by `replay_share_target_assignment_metadata`; reset to 0 by its tombstone.
pub target_epoch: i32,
/// Member metadata keyed by member id.
pub members:
std::collections::HashMap<String, share::persistence::ShareGroupMemberMetadataValue>,
/// Target assignment keyed by member id.
pub target_per_member: std::collections::HashMap<
String,
share::persistence::ShareGroupTargetAssignmentMemberValue,
>,
/// Current assignment keyed by member id.
pub current_per_member: std::collections::HashMap<
String,
share::persistence::ShareGroupCurrentMemberAssignmentValue,
>,
/// Set by `replay_share_state_partition_metadata`; reset to the default
/// value by its tombstone.
pub state_partition_metadata: share::persistence::ShareGroupStatePartitionMetadataValue,
}
/// Replayed state for one streams group, delivered to its actor as
/// `StreamsGroupActorMessage::Seed`.
#[derive(Debug, Default, Clone)]
pub struct StreamsGroupSeed {
/// Set by `replay_streams_group_metadata`; reset to 0 by its tombstone.
pub group_epoch: i32,
/// Set by `replay_streams_target_assignment_metadata`; reset to 0 by its tombstone.
pub assignment_epoch: i32,
/// Set by `replay_streams_topology`; cleared to `None` by its tombstone.
pub topology: Option<streams::persistence::StreamsGroupTopologyValue>,
/// Set by `replay_streams_partition_metadata`; cleared to `None` by its tombstone.
pub partition_metadata: Option<streams::persistence::StreamsGroupPartitionMetadataValue>,
/// Member metadata keyed by member id.
pub members:
std::collections::HashMap<String, streams::persistence::StreamsGroupMemberMetadataValue>,
/// Target assignment keyed by member id.
pub target_per_member: std::collections::HashMap<
String,
streams::persistence::StreamsGroupTargetAssignmentMemberValue,
>,
/// Current assignment keyed by member id.
pub current_per_member: std::collections::HashMap<
String,
streams::persistence::StreamsGroupCurrentMemberAssignmentValue,
>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Metadata provider stub that always reports a default reconcile input.
    #[derive(Debug)]
    struct ImageMetadatalessProvider;

    impl MetadataProvider for ImageMetadatalessProvider {
        fn snapshot(&self) -> reconciler::ReconcileInput {
            Default::default()
        }
    }

    /// Builds a coordinator with default configs, the stub metadata provider
    /// and an in-memory offsets log.
    fn make_coord() -> Arc<GroupCoordinator> {
        use crate::coordinator::unified::offsets_log::fake::InMemoryOffsetsLog;
        let metadata: Arc<dyn MetadataProvider> = Arc::new(ImageMetadatalessProvider);
        let offsets_log = Arc::new(InMemoryOffsetsLog::default());
        let coordinator = GroupCoordinator::new(
            NextGenConfig::default(),
            ShareGroupConfig::default(),
            metadata,
            offsets_log,
            StreamsGroupConfig::default(),
        );
        Arc::new(coordinator)
    }

    #[test]
    fn group_type_has_share_variant() {
        let share = GroupType::Share;
        assert!(share == GroupType::Share);
        assert!(share != GroupType::Classic);
        assert!(share != GroupType::NextGen);
    }

    /// The first `mark_*` call wins; later non-forcing marks are ignored.
    #[test]
    fn mark_share_locks_group_type() {
        let coordinator = make_coord();
        coordinator.mark_share("sg");
        assert!(coordinator.group_type("sg") == Some(GroupType::Share));
        coordinator.mark_classic("sg");
        assert!(coordinator.group_type("sg") == Some(GroupType::Share));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn get_or_create_group_returns_the_one_actor_regardless_of_kind() {
        let coordinator = make_coord();
        let as_classic = coordinator.get_or_create_group("g", GroupKindTag::Classic);
        let as_consumer = coordinator.get_or_create_group("g", GroupKindTag::Consumer);
        assert!(Arc::ptr_eq(&as_classic, &as_consumer));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn get_or_create_share_is_idempotent() {
        let coordinator = make_coord();
        let first = coordinator.get_or_create_share("sg");
        let second = coordinator.get_or_create_share("sg");
        assert!(Arc::ptr_eq(&first, &second));
        assert!(coordinator.find_share("sg").is_some());
    }

    /// `mark_streams` respects an existing type; the `_after_upgrade`
    /// variant overwrites it.
    #[test]
    fn mark_streams_after_upgrade_forces_streams_over_classic() {
        let coordinator = make_coord();
        coordinator.mark_classic("g");
        assert!(coordinator.group_type("g") == Some(GroupType::Classic));
        coordinator.mark_streams("g");
        assert!(coordinator.group_type("g") == Some(GroupType::Classic));
        coordinator.mark_streams_after_upgrade("g");
        assert!(coordinator.group_type("g") == Some(GroupType::Streams));
    }

    #[test]
    fn share_state_partition_metadata_none_then_some() {
        let coordinator = make_coord();
        assert!(coordinator.share_state_partition_metadata("sg").is_none());
        let topic_id = uuid::Uuid::from_u128(1);
        let replayed = share::persistence::ShareGroupStatePartitionMetadataValue {
            initialized: vec![(topic_id, vec![0, 1])],
            deleting: vec![],
        };
        coordinator.replay_share_state_partition_metadata("sg", replayed.clone());
        assert!(coordinator.share_state_partition_metadata("sg") == Some(replayed));
    }
}