use std::collections::BTreeMap;
use anyhow::anyhow;
use bytes::{Buf, Bytes};
use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
use kafka_protocol::messages::create_partitions_request::{
CreatePartitionsAssignment, CreatePartitionsTopic,
};
use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
use kafka_protocol::messages::incremental_alter_configs_request::{
AlterConfigsResource, AlterableConfig,
};
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::update_features_request::FeatureUpdateKey;
use kafka_protocol::messages::{
AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerProtocolAssignment,
ConsumerProtocolSubscription, CreatePartitionsRequest, CreatePartitionsResponse,
CreateTopicsRequest, CreateTopicsResponse, DeleteGroupsRequest, DeleteGroupsResponse,
DeleteTopicsRequest, DeleteTopicsResponse, DescribeClusterRequest, DescribeClusterResponse,
DescribeConfigsRequest, DescribeConfigsResponse, DescribeGroupsRequest, DescribeGroupsResponse,
IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
ListGroupsResponse, MetadataRequest, MetadataResponse, ShareGroupDescribeRequest,
ShareGroupDescribeResponse, UpdateFeaturesRequest, UpdateFeaturesResponse,
};
use kafka_protocol::protocol::{Decodable, Request, StrBytes};
use tracing::{debug, instrument};
use uuid::Uuid;
use crate::config::{AdminConfig, SaslMechanism};
use crate::constants::{
ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP,
DELETE_TOPICS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
DESCRIBE_GROUPS_VERSION_CAP, INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP,
METADATA_VERSION_CAP, SHARE_GROUP_DESCRIBE_VERSION_CAP, UPDATE_FEATURES_VERSION_CAP,
};
use crate::network::scram;
use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
use crate::{AdminError, Result};
/// Specification of a topic to create via [`KafkaAdmin::create_topics`].
#[derive(Debug, Clone)]
pub struct NewTopic {
/// Topic name; checked by `validate_topic_name` when the request is built.
pub name: String,
/// Number of partitions; values `<= 0` are rejected when the request is built.
pub num_partitions: i32,
/// Replication factor; values `<= 0` are rejected when the request is built.
pub replication_factor: i16,
/// Topic-level config overrides, sent to the broker as name/value pairs.
pub configs: BTreeMap<String, String>,
}
impl NewTopic {
pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
Self {
name: name.into(),
num_partitions,
replication_factor,
configs: BTreeMap::new(),
}
}
pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.configs.insert(key.into(), value.into());
self
}
fn into_request_topic(self) -> Result<CreatableTopic> {
let name = validate_topic_name(self.name)?;
if self.num_partitions <= 0 {
return Err(AdminError::InvalidPartitionCount {
partitions: self.num_partitions,
}
.into());
}
if self.replication_factor <= 0 {
return Err(AdminError::InvalidReplicationFactor {
replication_factor: self.replication_factor,
}
.into());
}
let configs = self
.configs
.into_iter()
.map(|(key, value)| {
CreatableTopicConfig::default()
.with_name(StrBytes::from_string(key))
.with_value(Some(StrBytes::from_string(value)))
})
.collect();
Ok(CreatableTopic::default()
.with_name(StrBytes::from_string(name).into())
.with_num_partitions(self.num_partitions)
.with_replication_factor(self.replication_factor)
.with_configs(configs))
}
}
/// Target partition count (and optional replica placement) used by
/// [`KafkaAdmin::create_partitions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewPartitions {
/// Partition count sent as the request's `count`; values `<= 0` are rejected
/// when the request is built.
pub total_count: i32,
/// Broker-id lists, one per assignment entry. When empty, no assignments are
/// sent with the request.
pub assignments: Vec<Vec<i32>>,
}
impl NewPartitions {
pub fn increase_to(total_count: i32) -> Self {
Self {
total_count,
assignments: Vec::new(),
}
}
pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
where
I: IntoIterator<Item = i32>,
{
self.assignments.push(broker_ids.into_iter().collect());
self
}
fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
let name = validate_topic_name(topic_name)?;
if self.total_count <= 0 {
return Err(AdminError::InvalidPartitionCount {
partitions: self.total_count,
}
.into());
}
let assignments = (!self.assignments.is_empty()).then(|| {
self.assignments
.into_iter()
.map(|broker_ids| {
CreatePartitionsAssignment::default()
.with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
})
.collect()
});
Ok(CreatePartitionsTopic::default()
.with_name(StrBytes::from_string(name).into())
.with_count(self.total_count)
.with_assignments(assignments))
}
}
/// One entry returned by [`KafkaAdmin::list_topics`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicListing {
/// Topic name as reported in the metadata response.
pub name: String,
/// Topic id, or `None` when the response carried a nil UUID.
pub topic_id: Option<Uuid>,
/// The `is_internal` flag copied from the metadata response.
pub is_internal: bool,
}
/// Per-partition details copied from a metadata response by
/// [`KafkaAdmin::describe_topics`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicPartitionDescription {
/// Partition index.
pub partition: i32,
/// Broker id of the partition leader as reported by the response.
pub leader_id: i32,
/// Leader epoch as reported by the response.
pub leader_epoch: i32,
/// Broker ids from the response's `replica_nodes`.
pub replica_nodes: Vec<i32>,
/// Broker ids from the response's `isr_nodes`.
pub isr_nodes: Vec<i32>,
/// Broker ids from the response's `offline_replicas`.
pub offline_replicas: Vec<i32>,
}
/// Description of a single topic returned by [`KafkaAdmin::describe_topics`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicDescription {
/// Topic name as reported in the metadata response.
pub name: String,
/// Topic id, or `None` when the response carried a nil UUID.
pub topic_id: Option<Uuid>,
/// The `is_internal` flag copied from the metadata response.
pub is_internal: bool,
/// Partition details, sorted by partition index.
pub partitions: Vec<TopicPartitionDescription>,
}
/// One broker entry returned by [`KafkaAdmin::describe_cluster`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrokerDescription {
/// Broker id.
pub broker_id: i32,
/// Host reported for the broker.
pub host: String,
/// Port reported for the broker.
pub port: i32,
/// Rack reported for the broker, if any.
pub rack: Option<String>,
/// The `is_fenced` flag copied from the response.
pub is_fenced: bool,
}
/// Result of [`KafkaAdmin::describe_cluster`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterDescription {
/// Cluster id reported by the broker.
pub cluster_id: String,
/// Controller id reported by the broker.
pub controller_id: i32,
/// Brokers reported by the response, sorted by broker id.
pub brokers: Vec<BrokerDescription>,
}
/// A feature name paired with its level, as returned by
/// [`KafkaAdmin::finalized_feature_levels`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrokerFeatureLevel {
/// Feature name.
pub name: String,
/// Level reported for the feature.
pub level: i16,
}
/// A requested change to one feature's maximum version level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FeatureUpdate {
/// Feature name; checked by `validate_feature_name` when the request is built.
pub name: String,
/// Value sent as the request's `max_version_level`.
pub max_version_level: i16,
/// Whether this is an upgrade or a (safe/unsafe) downgrade.
pub upgrade_type: FeatureUpgradeType,
}
impl FeatureUpdate {
pub fn upgrade(name: impl Into<String>, max_version_level: i16) -> Self {
Self {
name: name.into(),
max_version_level,
upgrade_type: FeatureUpgradeType::Upgrade,
}
}
pub fn safe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
Self {
name: name.into(),
max_version_level,
upgrade_type: FeatureUpgradeType::SafeDowngrade,
}
}
pub fn unsafe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
Self {
name: name.into(),
max_version_level,
upgrade_type: FeatureUpgradeType::UnsafeDowngrade,
}
}
fn into_request_update(self, version: i16) -> Result<FeatureUpdateKey> {
let name = validate_feature_name(self.name)?;
let allow_downgrade = self.upgrade_type.allows_downgrade();
let mut update = FeatureUpdateKey::default()
.with_feature(StrBytes::from_string(name))
.with_max_version_level(self.max_version_level);
if version == 0 {
update = update.with_allow_downgrade(allow_downgrade);
} else {
update = update.with_upgrade_type(self.upgrade_type.as_protocol_value());
}
Ok(update)
}
}
/// Kind of change requested by a feature update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeatureUpgradeType {
    /// Encoded as `1`; does not allow a downgrade.
    Upgrade,
    /// Encoded as `2`; allows a downgrade.
    SafeDowngrade,
    /// Encoded as `3`; allows a downgrade.
    UnsafeDowngrade,
}

impl FeatureUpgradeType {
    /// Numeric code placed in the request's `upgrade_type` field.
    fn as_protocol_value(self) -> i8 {
        let code: i8 = match self {
            Self::Upgrade => 1,
            Self::SafeDowngrade => 2,
            Self::UnsafeDowngrade => 3,
        };
        code
    }

    /// Returns `true` for both downgrade variants and `false` for `Upgrade`.
    fn allows_downgrade(self) -> bool {
        match self {
            Self::Upgrade => false,
            Self::SafeDowngrade | Self::UnsafeDowngrade => true,
        }
    }
}
/// One entry returned by [`KafkaAdmin::list_groups`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroupListing {
/// Group id.
pub group_id: String,
/// Protocol type reported for the group.
pub protocol_type: String,
/// Group state, or `None` when the response carried an empty string.
pub state: Option<String>,
/// Group type, or `None` when the response carried an empty string.
pub group_type: Option<String>,
}
/// Description of a group, shared by the consumer, share and classic describe calls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroupDescription {
/// Group id.
pub group_id: String,
/// Group state as reported by the response.
pub state: String,
/// `"consumer"` or `"share"` for those describe calls; the response's
/// `protocol_type` for classic groups.
pub protocol_type: String,
/// The response's `assignor_name` for consumer and share groups; the
/// response's `protocol_data` for classic groups.
pub protocol_data: String,
/// Members reported for the group.
pub members: Vec<ConsumerGroupMemberDescription>,
/// Authorized operations, or `None` when the response carried `i32::MIN`.
pub authorized_operations: Option<i32>,
}
/// Description of a single group member.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroupMemberDescription {
/// Member id.
pub member_id: String,
/// Static instance id, if any. Always `None` for share-group members.
pub group_instance_id: Option<String>,
/// Client id reported for the member.
pub client_id: String,
/// Client host reported for the member.
pub client_host: String,
/// NOTE(review): despite the `_bytes` suffix, the describe methods fill this
/// with a count: the number of subscribed topic names for consumer and share
/// groups, and the result of `classic_subscription_topic_count` for classic groups.
pub member_metadata_bytes: usize,
/// NOTE(review): filled from `assignment_partition_count`,
/// `share_assignment_partition_count` or `classic_assignment_partition_count`;
/// presumably a partition count rather than a byte size — confirm against those helpers.
pub member_assignment_bytes: usize,
}
/// Alias of [`ConsumerGroupListing`].
pub type GroupListing = ConsumerGroupListing;
/// Alias of [`ConsumerGroupDescription`].
pub type GroupDescription = ConsumerGroupDescription;
/// Alias of [`ConsumerGroupMemberDescription`].
pub type GroupMemberDescription = ConsumerGroupMemberDescription;
/// Kind of resource a config operation targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConfigResourceType {
    /// Encoded as `0`; also the result of decoding any unrecognised code.
    Unknown,
    /// Encoded as `2`.
    Topic,
    /// Encoded as `4`.
    Broker,
    /// Encoded as `8`.
    BrokerLogger,
    /// Encoded as `16`.
    ClientMetrics,
    /// Encoded as `32`.
    Group,
}

impl ConfigResourceType {
    /// Numeric code placed in the request's `resource_type` field.
    fn as_protocol_value(self) -> i8 {
        let code: i8 = match self {
            Self::Unknown => 0,
            Self::Topic => 2,
            Self::Broker => 4,
            Self::BrokerLogger => 8,
            Self::ClientMetrics => 16,
            Self::Group => 32,
        };
        code
    }

    /// Decodes a numeric resource type; anything that is not a known code
    /// (including `0`) maps to [`ConfigResourceType::Unknown`].
    fn from_protocol_value(value: i8) -> Self {
        const KNOWN: [ConfigResourceType; 5] = [
            ConfigResourceType::Topic,
            ConfigResourceType::Broker,
            ConfigResourceType::BrokerLogger,
            ConfigResourceType::ClientMetrics,
            ConfigResourceType::Group,
        ];
        KNOWN
            .into_iter()
            .find(|kind| kind.as_protocol_value() == value)
            .unwrap_or(Self::Unknown)
    }
}
/// Identifies the resource a config operation applies to.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConfigResource {
/// Kind of resource.
pub resource_type: ConfigResourceType,
/// Resource name, sent to the broker as-is.
pub resource_name: String,
}
impl ConfigResource {
    /// Builds a resource identifier from a type and a name.
    pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
        let resource_name = resource_name.into();
        Self {
            resource_type,
            resource_name,
        }
    }

    /// Shorthand for a [`ConfigResourceType::Topic`] resource.
    pub fn topic(resource_name: impl Into<String>) -> Self {
        Self {
            resource_type: ConfigResourceType::Topic,
            resource_name: resource_name.into(),
        }
    }

    /// Shorthand for a [`ConfigResourceType::Group`] resource.
    pub fn group(resource_name: impl Into<String>) -> Self {
        Self {
            resource_type: ConfigResourceType::Group,
            resource_name: resource_name.into(),
        }
    }
}
/// A single config entry returned by [`KafkaAdmin::describe_configs`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigEntry {
/// Config name.
pub name: String,
/// Config value, if the response carried one.
pub value: Option<String>,
/// The `read_only` flag copied from the response.
pub read_only: bool,
/// Raw `config_source` code copied from the response.
pub config_source: i8,
/// The `is_sensitive` flag copied from the response.
pub is_sensitive: bool,
/// Raw `config_type` code, kept only when `response_supported_config_type`
/// accepts it; `None` otherwise.
pub config_type: Option<i8>,
/// Documentation text, if the response carried one.
pub documentation: Option<String>,
}
/// All config entries described for one resource.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigResourceConfig {
/// The resource these entries belong to.
pub resource: ConfigResource,
/// Entries keyed by config name.
pub entries: BTreeMap<String, ConfigEntry>,
}
impl ConfigResourceConfig {
    /// Looks up a config entry by name, returning `None` when it is absent.
    pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
        let Self { entries, .. } = self;
        entries.get(name)
    }
}
/// Operation applied to a single config key by an incremental alter request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlterConfigOpType {
    /// Encoded as `0`.
    Set,
    /// Encoded as `1`.
    Delete,
    /// Encoded as `2`.
    Append,
    /// Encoded as `3`.
    Subtract,
}

impl AlterConfigOpType {
    /// Numeric code placed in the request's `config_operation` field.
    fn as_protocol_value(self) -> i8 {
        let code: i8 = match self {
            Self::Set => 0,
            Self::Delete => 1,
            Self::Append => 2,
            Self::Subtract => 3,
        };
        code
    }
}
/// One change to a single config key, used by
/// [`KafkaAdmin::incremental_alter_configs`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AlterConfigOp {
/// Config name.
pub name: String,
/// Operation to apply.
pub op_type: AlterConfigOpType,
/// Value sent with the operation; `None` for operations built with `delete`.
pub value: Option<String>,
}
impl AlterConfigOp {
    /// Builds a `Set` operation assigning `value` to `name`.
    pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
        let name = name.into();
        let value = Some(value.into());
        Self {
            name,
            op_type: AlterConfigOpType::Set,
            value,
        }
    }

    /// Builds a `Delete` operation for `name`, carrying no value.
    pub fn delete(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            op_type: AlterConfigOpType::Delete,
            value: None,
        }
    }
}
/// Admin client for a Kafka cluster; holds only the configuration it was built with.
#[derive(Debug, Clone)]
pub struct KafkaAdmin {
// Configuration supplied to `connect`; exposed read-only through `config()`.
config: AdminConfig,
}
impl KafkaAdmin {
/// Builds an admin client from `config` and runs `warm_up` before returning it.
///
/// # Errors
///
/// Propagates any error returned by `warm_up`.
#[instrument(
    name = "admin.connect",
    level = "debug",
    skip(config),
    fields(
        bootstrap_server_count = config.bootstrap_servers.len(),
        client_id = %config.client_id
    )
)]
pub async fn connect(config: AdminConfig) -> Result<Self> {
    let client = Self { config };
    // Fail fast: the client is only handed out once the warm-up succeeded.
    client.warm_up().await?;
    debug!("admin client connected");
    Ok(client)
}
/// Creates the given topics.
///
/// Returns `Ok(())` without contacting the broker when `topics` is empty.
///
/// # Errors
///
/// Fails when any specification is invalid, when the request cannot be sent,
/// or when the broker reports an error for a topic that
/// `is_ignorable_create_topic_error` does not accept. Only the first such
/// error is reported. The broker-supplied error message is used when it is
/// present and non-empty, falling back to the error code's description.
#[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
pub async fn create_topics<I>(&self, topics: I) -> Result<()>
where
    I: IntoIterator<Item = NewTopic>,
{
    let topics = topics
        .into_iter()
        .map(NewTopic::into_request_topic)
        .collect::<Result<Vec<_>>>()?;
    if topics.is_empty() {
        return Ok(());
    }
    let request = CreateTopicsRequest::default()
        .with_topics(topics)
        .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
        .with_validate_only(false);
    let response: CreateTopicsResponse = self
        .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
        .await?;
    for topic in response.topics {
        let Some(error) = topic
            .error_code
            .err()
            .filter(|error| !is_ignorable_create_topic_error(*error))
        else {
            continue;
        };
        let name = topic.name.0.to_string();
        // Prefer the broker's own explanation (e.g. which config was invalid),
        // matching how the other admin calls in this file report failures.
        let detail = topic
            .error_message
            .as_ref()
            .map(|message| message.to_string())
            .filter(|message| !message.is_empty())
            .unwrap_or_else(|| error.to_string());
        return Err(anyhow!("create topic '{name}' failed: {detail}").into());
    }
    Ok(())
}
/// Deletes the named topics.
///
/// Returns `Ok(())` without contacting the broker when `topics` is empty.
///
/// # Errors
///
/// Fails when any name is rejected by `validate_topic_name`, when the request
/// cannot be sent, or when the broker reports an error for any topic. Only
/// the first such error is reported. The broker-supplied error message is
/// used when it is present and non-empty, falling back to the error code's
/// description.
#[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let topic_names = topics
        .into_iter()
        .map(|topic| validate_topic_name(topic.into()))
        .collect::<Result<Vec<_>>>()?;
    if topic_names.is_empty() {
        return Ok(());
    }
    // The validated names are not needed afterwards, so move them into the
    // request instead of cloning each one.
    let request = DeleteTopicsRequest::default()
        .with_topic_names(
            topic_names
                .into_iter()
                .map(StrBytes::from_string)
                .map(Into::into)
                .collect(),
        )
        .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
    let response: DeleteTopicsResponse = self
        .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
        .await?;
    for topic in response.responses {
        let Some(error) = topic.error_code.err() else {
            continue;
        };
        let name = topic
            .name
            .as_ref()
            .map(|name| name.0.to_string())
            .unwrap_or_else(|| "<unknown>".to_owned());
        // Prefer the broker's own explanation, matching the other admin calls.
        let detail = topic
            .error_message
            .as_ref()
            .map(|message| message.to_string())
            .filter(|message| !message.is_empty())
            .unwrap_or_else(|| error.to_string());
        return Err(anyhow!("delete topic '{name}' failed: {detail}").into());
    }
    Ok(())
}
/// Changes the partition count of the given topics.
///
/// Returns `Ok(())` without contacting the broker when `partitions` is empty.
///
/// # Errors
///
/// Fails when any entry is invalid, when the request cannot be sent, or when
/// the broker reports an error for any topic (first error only). The broker's
/// error message is preferred over the error code's description when it is
/// present and non-empty.
#[instrument(
    name = "admin.create_partitions",
    level = "debug",
    skip(self, partitions)
)]
pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
where
    I: IntoIterator<Item = (S, NewPartitions)>,
    S: Into<String>,
{
    let requested = partitions
        .into_iter()
        .map(|(topic, spec)| spec.into_request_topic(topic.into()))
        .collect::<Result<Vec<_>>>()?;
    if requested.is_empty() {
        return Ok(());
    }

    let timeout_ms = duration_to_i32_ms(self.config.request_timeout)?;
    let request = CreatePartitionsRequest::default()
        .with_topics(requested)
        .with_timeout_ms(timeout_ms)
        .with_validate_only(false);
    let response: CreatePartitionsResponse = self
        .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
        .await?;

    for result in response.results {
        let Some(error) = result.error_code.err() else {
            continue;
        };
        let name = result.name.0.to_string();
        let detail = result
            .error_message
            .as_ref()
            .map(|message| message.to_string())
            .filter(|message| !message.is_empty())
            .unwrap_or_else(|| error.to_string());
        return Err(anyhow!(
            "create partitions for topic '{name}' failed: {}",
            detail
        )
        .into());
    }
    Ok(())
}
#[instrument(name = "admin.list_topics", level = "debug", skip(self))]
pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
let response = self.fetch_metadata(None).await?;
let mut topics = Vec::new();
for topic in response.topics {
let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
continue;
};
if let Some(error) = topic.error_code.err() {
return Err(anyhow!("list topics failed for '{name}': {error}").into());
}
topics.push(TopicListing {
name,
topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
is_internal: topic.is_internal,
});
}
topics.sort_by(|left, right| left.name.cmp(&right.name));
Ok(topics)
}
/// Describes the named topics, returning one description per requested name
/// in request order.
///
/// Returns an empty vector without contacting the broker when `topics` is
/// empty. A name that appears more than once in `topics` yields the same
/// description once per occurrence.
///
/// # Errors
///
/// Fails when any name is rejected by `validate_topic_name`, when the
/// metadata fetch fails, when any topic or partition in the response carries
/// an error code, or when the response lacks a requested topic.
#[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let requested_topics = topics
        .into_iter()
        .map(|topic| validate_topic_name(topic.into()))
        .collect::<Result<Vec<_>>>()?;
    if requested_topics.is_empty() {
        return Ok(Vec::new());
    }
    let response = self.fetch_metadata(Some(&requested_topics)).await?;
    let mut descriptions = BTreeMap::new();
    for topic in response.topics {
        let name = topic
            .name
            .as_ref()
            .map(|name| name.0.to_string())
            .unwrap_or_default();
        if let Some(error) = topic.error_code.err() {
            let label = if name.is_empty() { "<unknown>" } else { &name };
            return Err(anyhow!("describe topic '{label}' failed: {error}").into());
        }
        let mut partitions = topic
            .partitions
            .into_iter()
            .map(|partition| {
                if let Some(error) = partition.error_code.err() {
                    return Err(anyhow!(
                        "describe topic '{name}' partition {} failed: {error}",
                        partition.partition_index
                    ));
                }
                Ok(TopicPartitionDescription {
                    partition: partition.partition_index,
                    leader_id: partition.leader_id.0,
                    leader_epoch: partition.leader_epoch,
                    replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
                    isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
                    offline_replicas: partition
                        .offline_replicas
                        .into_iter()
                        .map(|id| id.0)
                        .collect(),
                })
            })
            .collect::<std::result::Result<Vec<_>, _>>()?;
        partitions.sort_by_key(|partition| partition.partition);
        descriptions.insert(
            name.clone(),
            TopicDescription {
                name,
                topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
                is_internal: topic.is_internal,
                partitions,
            },
        );
    }
    // Look up (rather than remove) so that a topic requested more than once
    // is not reported as missing on its second occurrence.
    requested_topics
        .into_iter()
        .map(|topic| {
            descriptions.get(&topic).cloned().ok_or_else(|| {
                anyhow!("metadata response did not include topic '{topic}'").into()
            })
        })
        .collect()
}
/// Describes the cluster: its id, controller id and brokers (sorted by broker id).
///
/// The request never asks for cluster authorized operations. When the
/// negotiated version is at least 1 the endpoint type is set to `1`
/// (presumably "brokers" — confirm against the protocol definition), and when
/// it is at least 2 fenced brokers are included.
///
/// # Errors
///
/// Fails when connecting or sending fails, or when the response carries an
/// error code.
#[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
    let (mut connection, version) = self
        .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
        .await?;

    let request =
        DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
    let request = if version >= 1 {
        request.with_endpoint_type(1)
    } else {
        request
    };
    let request = if version >= 2 {
        request.with_include_fenced_brokers(true)
    } else {
        request
    };

    let response: DescribeClusterResponse = connection
        .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
        .await?;
    if let Some(error) = response.error_code.err() {
        return Err(anyhow!("describe cluster failed: {error}").into());
    }

    let mut brokers = Vec::new();
    for broker in response.brokers {
        brokers.push(BrokerDescription {
            broker_id: broker.broker_id.0,
            host: broker.host.to_string(),
            port: broker.port,
            rack: broker.rack.map(|rack| rack.to_string()),
            is_fenced: broker.is_fenced,
        });
    }
    brokers.sort_by_key(|broker| broker.broker_id);

    let cluster_id = response.cluster_id.to_string();
    let controller_id = response.controller_id.0;
    Ok(ClusterDescription {
        cluster_id,
        controller_id,
        brokers,
    })
}
/// Lists the groups reported by a default `ListGroupsRequest`, sorted by group id.
///
/// Empty `group_state` / `group_type` strings in the response become `None`.
///
/// # Errors
///
/// Fails when the request cannot be sent or the response carries an error code.
#[instrument(name = "admin.list_groups", level = "debug", skip(self))]
pub async fn list_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
    let request = ListGroupsRequest::default();
    let response: ListGroupsResponse = self
        .send_request::<ListGroupsRequest>(LIST_GROUPS_VERSION_CAP, &request)
        .await?;
    if let Some(error) = response.error_code.err() {
        return Err(anyhow!("list groups failed: {error}").into());
    }

    let mut listings = Vec::new();
    for group in response.groups {
        let state = if group.group_state.is_empty() {
            None
        } else {
            Some(group.group_state.to_string())
        };
        let group_type = if group.group_type.is_empty() {
            None
        } else {
            Some(group.group_type.to_string())
        };
        listings.push(ConsumerGroupListing {
            group_id: group.group_id.to_string(),
            protocol_type: group.protocol_type.to_string(),
            state,
            group_type,
        });
    }
    listings.sort_by(|a, b| a.group_id.cmp(&b.group_id));
    Ok(listings)
}
/// Delegates to [`KafkaAdmin::list_groups`] and returns its result unchanged.
#[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
    let listings = self.list_groups().await?;
    Ok(listings)
}
/// Describes groups of any type, returning one description per requested id
/// in request order.
///
/// The group type reported by [`KafkaAdmin::list_groups`] selects the describe
/// call: `"share"` and `"classic"` (ASCII case-insensitive) go to the share
/// and classic calls; every other group — including groups that are not
/// listed or have no type — goes to the consumer-group call.
///
/// Returns an empty vector without contacting the broker when `groups` is
/// empty. A group id that appears more than once in `groups` is described
/// once and its description is returned once per occurrence.
///
/// # Errors
///
/// Fails when any id is rejected by `validate_group_id`, when listing or any
/// describe call fails, or when a requested group is missing from the results.
#[instrument(name = "admin.describe_groups", level = "debug", skip(self, groups))]
pub async fn describe_groups<I, S>(&self, groups: I) -> Result<Vec<GroupDescription>>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let group_ids = groups
        .into_iter()
        .map(|group| validate_group_id(group.into()))
        .collect::<Result<Vec<_>>>()?;
    if group_ids.is_empty() {
        return Ok(Vec::new());
    }
    let listed_group_types = self
        .list_groups()
        .await?
        .into_iter()
        .map(|group| (group.group_id, group.group_type))
        .collect::<BTreeMap<_, _>>();
    let mut consumer_groups = Vec::new();
    let mut share_groups = Vec::new();
    let mut classic_groups = Vec::new();
    {
        // Each distinct id is dispatched once, so a repeated id neither sends a
        // duplicate to the broker nor trips the "did not include" checks below.
        let mut seen = std::collections::BTreeSet::new();
        for group_id in &group_ids {
            if !seen.insert(group_id.as_str()) {
                continue;
            }
            match listed_group_types
                .get(group_id)
                .and_then(|group_type| group_type.as_deref())
            {
                Some(group_type) if group_type.eq_ignore_ascii_case("share") => {
                    share_groups.push(group_id.clone());
                }
                Some(group_type) if group_type.eq_ignore_ascii_case("classic") => {
                    classic_groups.push(group_id.clone());
                }
                _ => consumer_groups.push(group_id.clone()),
            }
        }
    }
    let mut descriptions = BTreeMap::new();
    for group in self.describe_consumer_groups(consumer_groups).await? {
        descriptions.insert(group.group_id.clone(), group);
    }
    for group in self.describe_share_groups(share_groups).await? {
        descriptions.insert(group.group_id.clone(), group);
    }
    for group in self.describe_classic_groups(classic_groups).await? {
        descriptions.insert(group.group_id.clone(), group);
    }
    // Look up (rather than remove) so repeated ids each receive a description.
    group_ids
        .into_iter()
        .map(|group_id| {
            descriptions.get(&group_id).cloned().ok_or_else(|| {
                anyhow!("describe groups response did not include group '{group_id}'").into()
            })
        })
        .collect()
}
/// Describes groups via `ConsumerGroupDescribeRequest`, returning one
/// description per requested id in request order.
///
/// `protocol_type` is always `"consumer"` and `protocol_data` carries the
/// response's assignor name. Note that each member's `member_metadata_bytes`
/// is filled with the number of subscribed topic names, and
/// `member_assignment_bytes` with the result of `assignment_partition_count`.
///
/// Returns an empty vector without contacting the broker when `groups` is
/// empty. A group id that appears more than once in `groups` yields the same
/// description once per occurrence.
///
/// # Errors
///
/// Fails when any id is rejected by `validate_group_id`, when the request
/// cannot be sent, when any group in the response carries an error code
/// (broker message preferred when non-empty), or when the response lacks a
/// requested group.
#[instrument(
    name = "admin.describe_consumer_groups",
    level = "debug",
    skip(self, groups)
)]
pub async fn describe_consumer_groups<I, S>(
    &self,
    groups: I,
) -> Result<Vec<ConsumerGroupDescription>>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let group_ids = groups
        .into_iter()
        .map(|group| validate_group_id(group.into()))
        .collect::<Result<Vec<_>>>()?;
    if group_ids.is_empty() {
        return Ok(Vec::new());
    }
    let request = ConsumerGroupDescribeRequest::default()
        .with_group_ids(
            group_ids
                .iter()
                .cloned()
                .map(StrBytes::from_string)
                .map(Into::into)
                .collect(),
        )
        .with_include_authorized_operations(false);
    let response: ConsumerGroupDescribeResponse = self
        .send_request::<ConsumerGroupDescribeRequest>(
            CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
            &request,
        )
        .await?;
    let mut descriptions = BTreeMap::new();
    for group in response.groups {
        let group_id = group.group_id.to_string();
        if let Some(error) = group.error_code.err() {
            let message = group
                .error_message
                .as_ref()
                .map(ToString::to_string)
                .filter(|message| !message.is_empty())
                .unwrap_or_else(|| error.to_string());
            return Err(
                anyhow!("describe consumer group '{group_id}' failed: {message}").into(),
            );
        }
        descriptions.insert(
            group_id.clone(),
            ConsumerGroupDescription {
                group_id,
                state: group.group_state.to_string(),
                protocol_type: "consumer".to_owned(),
                protocol_data: group.assignor_name.to_string(),
                members: group
                    .members
                    .into_iter()
                    .map(|member| ConsumerGroupMemberDescription {
                        member_id: member.member_id.to_string(),
                        group_instance_id: member
                            .instance_id
                            .map(|instance_id| instance_id.to_string()),
                        client_id: member.client_id.to_string(),
                        client_host: member.client_host.to_string(),
                        member_metadata_bytes: member.subscribed_topic_names.len(),
                        member_assignment_bytes: assignment_partition_count(&member.assignment),
                    })
                    .collect(),
                // `i32::MIN` marks "not provided" in the response.
                authorized_operations: (group.authorized_operations != i32::MIN)
                    .then_some(group.authorized_operations),
            },
        );
    }
    // Look up (rather than remove) so that a group requested more than once
    // is not reported as missing on its second occurrence.
    group_ids
        .into_iter()
        .map(|group_id| {
            descriptions.get(&group_id).cloned().ok_or_else(|| {
                anyhow!("describe groups response did not include group '{group_id}'").into()
            })
        })
        .collect()
}
/// Describes groups via `DescribeGroupsRequest`, returning one description
/// per requested id in request order.
///
/// `protocol_type` and `protocol_data` are copied from the response. Note that
/// each member's `member_metadata_bytes` is filled with the result of
/// `classic_subscription_topic_count`, and `member_assignment_bytes` with the
/// result of `classic_assignment_partition_count`.
///
/// Returns an empty vector without contacting the broker when `groups` is
/// empty. A group id that appears more than once in `groups` yields the same
/// description once per occurrence.
///
/// # Errors
///
/// Fails when any id is rejected by `validate_group_id`, when the request
/// cannot be sent, when any group in the response carries an error code
/// (broker message preferred when non-empty), or when the response lacks a
/// requested group.
#[instrument(
    name = "admin.describe_classic_groups",
    level = "debug",
    skip(self, groups)
)]
pub async fn describe_classic_groups<I, S>(
    &self,
    groups: I,
) -> Result<Vec<ConsumerGroupDescription>>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let group_ids = groups
        .into_iter()
        .map(|group| validate_group_id(group.into()))
        .collect::<Result<Vec<_>>>()?;
    if group_ids.is_empty() {
        return Ok(Vec::new());
    }
    let request = DescribeGroupsRequest::default()
        .with_groups(
            group_ids
                .iter()
                .cloned()
                .map(StrBytes::from_string)
                .map(Into::into)
                .collect(),
        )
        .with_include_authorized_operations(false);
    let response: DescribeGroupsResponse = self
        .send_request::<DescribeGroupsRequest>(DESCRIBE_GROUPS_VERSION_CAP, &request)
        .await?;
    let mut descriptions = BTreeMap::new();
    for group in response.groups {
        let group_id = group.group_id.to_string();
        if let Some(error) = group.error_code.err() {
            let message = group
                .error_message
                .as_ref()
                .map(ToString::to_string)
                .filter(|message| !message.is_empty())
                .unwrap_or_else(|| error.to_string());
            return Err(
                anyhow!("describe classic group '{group_id}' failed: {message}").into(),
            );
        }
        descriptions.insert(
            group_id.clone(),
            ConsumerGroupDescription {
                group_id,
                state: group.group_state.to_string(),
                protocol_type: group.protocol_type.to_string(),
                protocol_data: group.protocol_data.to_string(),
                members: group
                    .members
                    .into_iter()
                    .map(|member| {
                        let member_metadata_bytes =
                            classic_subscription_topic_count(&member.member_metadata);
                        let member_assignment_bytes =
                            classic_assignment_partition_count(&member.member_assignment);
                        ConsumerGroupMemberDescription {
                            member_id: member.member_id.to_string(),
                            group_instance_id: member
                                .group_instance_id
                                .map(|instance_id| instance_id.to_string()),
                            client_id: member.client_id.to_string(),
                            client_host: member.client_host.to_string(),
                            member_metadata_bytes,
                            member_assignment_bytes,
                        }
                    })
                    .collect(),
                // `i32::MIN` marks "not provided" in the response.
                authorized_operations: (group.authorized_operations != i32::MIN)
                    .then_some(group.authorized_operations),
            },
        );
    }
    // Look up (rather than remove) so that a group requested more than once
    // is not reported as missing on its second occurrence.
    group_ids
        .into_iter()
        .map(|group_id| {
            descriptions.get(&group_id).cloned().ok_or_else(|| {
                anyhow!("describe classic groups response did not include group '{group_id}'")
                    .into()
            })
        })
        .collect()
}
/// Describes groups via `ShareGroupDescribeRequest`, returning one
/// description per requested id in request order.
///
/// `protocol_type` is always `"share"`, `protocol_data` carries the response's
/// assignor name, and members never have a `group_instance_id`. Note that each
/// member's `member_metadata_bytes` is filled with the number of subscribed
/// topic names, and `member_assignment_bytes` with the result of
/// `share_assignment_partition_count`.
///
/// Returns an empty vector without contacting the broker when `groups` is
/// empty. A group id that appears more than once in `groups` yields the same
/// description once per occurrence.
///
/// # Errors
///
/// Fails when any id is rejected by `validate_group_id`, when the request
/// cannot be sent, when any group in the response carries an error code
/// (broker message preferred when non-empty), or when the response lacks a
/// requested group.
#[instrument(
    name = "admin.describe_share_groups",
    level = "debug",
    skip(self, groups)
)]
pub async fn describe_share_groups<I, S>(
    &self,
    groups: I,
) -> Result<Vec<ConsumerGroupDescription>>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let group_ids = groups
        .into_iter()
        .map(|group| validate_group_id(group.into()))
        .collect::<Result<Vec<_>>>()?;
    if group_ids.is_empty() {
        return Ok(Vec::new());
    }
    let request = ShareGroupDescribeRequest::default()
        .with_group_ids(
            group_ids
                .iter()
                .cloned()
                .map(StrBytes::from_string)
                .map(Into::into)
                .collect(),
        )
        .with_include_authorized_operations(false);
    let response: ShareGroupDescribeResponse = self
        .send_request::<ShareGroupDescribeRequest>(SHARE_GROUP_DESCRIBE_VERSION_CAP, &request)
        .await?;
    let mut descriptions = BTreeMap::new();
    for group in response.groups {
        let group_id = group.group_id.to_string();
        if let Some(error) = group.error_code.err() {
            let message = group
                .error_message
                .as_ref()
                .map(ToString::to_string)
                .filter(|message| !message.is_empty())
                .unwrap_or_else(|| error.to_string());
            return Err(anyhow!("describe share group '{group_id}' failed: {message}").into());
        }
        descriptions.insert(
            group_id.clone(),
            ConsumerGroupDescription {
                group_id,
                state: group.group_state.to_string(),
                protocol_type: "share".to_owned(),
                protocol_data: group.assignor_name.to_string(),
                members: group
                    .members
                    .into_iter()
                    .map(|member| ConsumerGroupMemberDescription {
                        member_id: member.member_id.to_string(),
                        group_instance_id: None,
                        client_id: member.client_id.to_string(),
                        client_host: member.client_host.to_string(),
                        member_metadata_bytes: member.subscribed_topic_names.len(),
                        member_assignment_bytes: share_assignment_partition_count(
                            &member.assignment,
                        ),
                    })
                    .collect(),
                // `i32::MIN` marks "not provided" in the response.
                authorized_operations: (group.authorized_operations != i32::MIN)
                    .then_some(group.authorized_operations),
            },
        );
    }
    // Look up (rather than remove) so that a group requested more than once
    // is not reported as missing on its second occurrence.
    group_ids
        .into_iter()
        .map(|group_id| {
            descriptions.get(&group_id).cloned().ok_or_else(|| {
                anyhow!("describe share groups response did not include group '{group_id}'")
                    .into()
            })
        })
        .collect()
}
#[instrument(name = "admin.delete_groups", level = "debug", skip(self, groups))]
pub async fn delete_groups<I, S>(&self, groups: I) -> Result<()>
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let group_ids = groups
.into_iter()
.map(|group| validate_group_id(group.into()))
.collect::<Result<Vec<_>>>()?;
if group_ids.is_empty() {
return Ok(());
}
let request = DeleteGroupsRequest::default().with_groups_names(
group_ids
.iter()
.cloned()
.map(StrBytes::from_string)
.map(Into::into)
.collect(),
);
let response: DeleteGroupsResponse = self
.send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
.await?;
for result in response.results {
if let Some(error) = result.error_code.err() {
return Err(anyhow!("delete group '{}' failed: {error}", &*result.group_id).into());
}
}
Ok(())
}
/// Delegates to [`KafkaAdmin::delete_groups`] and returns its result unchanged.
#[instrument(
    name = "admin.delete_consumer_groups",
    level = "debug",
    skip(self, groups)
)]
pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let outcome = self.delete_groups(groups).await;
    outcome
}
/// Describes the configs of the given resources, returning one result per
/// requested resource in request order.
///
/// All config keys are requested (`configuration_keys` is `None`) and
/// synonyms are not included.
///
/// Returns an empty vector without contacting the broker when `resources` is
/// empty. A resource that appears more than once in `resources` yields the
/// same result once per occurrence.
///
/// # Errors
///
/// Fails when the request cannot be sent, when any resource in the response
/// carries an error code (broker message preferred when non-empty), or when
/// the response lacks a requested resource.
#[instrument(
    name = "admin.describe_configs",
    level = "debug",
    skip(self, resources)
)]
pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
where
    I: IntoIterator<Item = ConfigResource>,
{
    let resources = resources.into_iter().collect::<Vec<_>>();
    if resources.is_empty() {
        return Ok(Vec::new());
    }
    let request = DescribeConfigsRequest::default()
        .with_resources(
            resources
                .iter()
                .map(|resource| {
                    DescribeConfigsResource::default()
                        .with_resource_type(resource.resource_type.as_protocol_value())
                        .with_resource_name(StrBytes::from_string(
                            resource.resource_name.clone(),
                        ))
                        .with_configuration_keys(None)
                })
                .collect(),
        )
        .with_include_synonyms(false);
    let response: DescribeConfigsResponse = self
        .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
        .await?;
    let mut described = BTreeMap::new();
    for resource in response.results {
        let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
        let resource_name = resource.resource_name.to_string();
        if let Some(error) = resource.error_code.err() {
            return Err(anyhow!(
                "describe configs for {:?} '{}' failed: {}",
                resource_type,
                resource_name,
                resource
                    .error_message
                    .as_ref()
                    .map(|message| message.to_string())
                    .filter(|message| !message.is_empty())
                    .unwrap_or_else(|| error.to_string())
            )
            .into());
        }
        let entries = resource
            .configs
            .into_iter()
            .map(|entry| {
                let name = entry.name.to_string();
                let config_entry = ConfigEntry {
                    name: name.clone(),
                    value: entry.value.map(|value| value.to_string()),
                    read_only: entry.read_only,
                    config_source: entry.config_source,
                    is_sensitive: entry.is_sensitive,
                    config_type: (response_supported_config_type(entry.config_type))
                        .then_some(entry.config_type),
                    documentation: entry.documentation.map(|doc| doc.to_string()),
                };
                (name, config_entry)
            })
            .collect();
        described.insert(
            (resource_type, resource_name.clone()),
            ConfigResourceConfig {
                resource: ConfigResource::new(resource_type, resource_name),
                entries,
            },
        );
    }
    // Look up (rather than remove) so that a resource requested more than
    // once is not reported as missing on its second occurrence.
    resources
        .into_iter()
        .map(|resource| {
            described
                .get(&(resource.resource_type, resource.resource_name.clone()))
                .cloned()
                .ok_or_else(|| {
                    anyhow!("describe configs response did not include {:?}", resource).into()
                })
        })
        .collect()
}
/// Applies the given config operations to each resource (not validate-only).
///
/// Returns `Ok(())` without contacting the broker when `resources` is empty.
///
/// # Errors
///
/// Fails when the request cannot be sent or when any resource in the response
/// carries an error code (first error only; broker message preferred when
/// non-empty).
#[instrument(
    name = "admin.incremental_alter_configs",
    level = "debug",
    skip(self, resources)
)]
pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
where
    I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
{
    let mut alterations = Vec::new();
    for (resource, ops) in resources {
        let configs = ops
            .into_iter()
            .map(|op| {
                AlterableConfig::default()
                    .with_name(StrBytes::from_string(op.name))
                    .with_config_operation(op.op_type.as_protocol_value())
                    .with_value(op.value.map(StrBytes::from_string))
            })
            .collect();
        alterations.push(
            AlterConfigsResource::default()
                .with_resource_type(resource.resource_type.as_protocol_value())
                .with_resource_name(StrBytes::from_string(resource.resource_name))
                .with_configs(configs),
        );
    }
    if alterations.is_empty() {
        return Ok(());
    }

    let request = IncrementalAlterConfigsRequest::default()
        .with_resources(alterations)
        .with_validate_only(false);
    let response: IncrementalAlterConfigsResponse = self
        .send_request::<IncrementalAlterConfigsRequest>(
            INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
            &request,
        )
        .await?;

    for outcome in response.responses {
        let Some(error) = outcome.error_code.err() else {
            continue;
        };
        let detail = outcome
            .error_message
            .as_ref()
            .map(|message| message.to_string())
            .filter(|message| !message.is_empty())
            .unwrap_or_else(|| error.to_string());
        return Err(anyhow!(
            "incremental alter configs for {:?} '{}' failed: {}",
            ConfigResourceType::from_protocol_value(outcome.resource_type),
            outcome.resource_name,
            detail
        )
        .into());
    }
    Ok(())
}
/// Upserts a SCRAM credential using `scram::MIN_ITERATIONS` as the iteration
/// count; see [`KafkaAdmin::upsert_scram_credential_with_iterations`].
#[instrument(
    name = "admin.upsert_scram_credential",
    level = "debug",
    skip(self, user, password)
)]
pub async fn upsert_scram_credential(
    &self,
    user: impl Into<String>,
    mechanism: SaslMechanism,
    password: impl AsRef<[u8]>,
) -> Result<()> {
    let iterations = scram::MIN_ITERATIONS;
    self.upsert_scram_credential_with_iterations(user, mechanism, password, iterations)
        .await
}
/// Upserts a SCRAM credential for `user` with an explicit iteration count.
///
/// A fresh salt is obtained from `scram::secure_random_bytes`, the salted
/// password is derived with `scram::salted_password`, and both are sent in an
/// `AlterUserScramCredentialsRequest`. The user name and password are kept
/// out of the tracing span.
///
/// # Errors
///
/// Fails when `mechanism` has no SCRAM type, when salt generation or password
/// salting fails, when the request cannot be sent, or when any result in the
/// response carries an error code (first error only; broker message preferred
/// when non-empty).
#[instrument(
    name = "admin.upsert_scram_credential_with_iterations",
    level = "debug",
    skip(self, user, password)
)]
pub async fn upsert_scram_credential_with_iterations(
    &self,
    user: impl Into<String>,
    mechanism: SaslMechanism,
    password: impl AsRef<[u8]>,
    iterations: i32,
) -> Result<()> {
    let user_name: String = user.into();
    let Some(mechanism_type) = mechanism.scram_type() else {
        return Err(anyhow!("SCRAM credential upsertion requires a SCRAM mechanism").into());
    };

    let salt = scram::secure_random_bytes()?;
    let salted_password =
        scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;

    let upsertion = ScramCredentialUpsertion::default()
        .with_name(StrBytes::from_string(user_name))
        .with_mechanism(mechanism_type)
        .with_iterations(iterations)
        .with_salt(Bytes::from(salt))
        .with_salted_password(Bytes::from(salted_password));
    let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![upsertion]);

    let response: AlterUserScramCredentialsResponse = self
        .send_request::<AlterUserScramCredentialsRequest>(
            ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
            &request,
        )
        .await?;

    for outcome in response.results {
        let Some(error) = outcome.error_code.err() else {
            continue;
        };
        let detail = outcome
            .error_message
            .as_ref()
            .map(|message| message.to_string())
            .filter(|message| !message.is_empty())
            .unwrap_or_else(|| error.to_string());
        return Err(anyhow!(
            "alter SCRAM credential for user '{}' failed: {}",
            outcome.user,
            detail
        )
        .into());
    }
    Ok(())
}
pub fn config(&self) -> &AdminConfig {
&self.config
}
/// Opens a connection through `connect_to_any_bootstrap` and returns the
/// finalized feature levels that connection reports, one
/// `BrokerFeatureLevel` per `(name, level)` pair.
///
/// # Errors
///
/// Returns an error when the connection cannot be established.
pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
    let cfg = &self.config;
    let broker = connect_to_any_bootstrap(
        &cfg.bootstrap_servers,
        &cfg.client_id,
        cfg.request_timeout,
        cfg.security_protocol,
        &cfg.tls,
        &cfg.sasl,
        &cfg.tcp_connector,
    )
    .await?;
    let levels: Vec<BrokerFeatureLevel> = broker
        .finalized_feature_levels()
        .into_iter()
        .map(|(name, level)| BrokerFeatureLevel { name, level })
        .collect();
    Ok(levels)
}
/// Submits `updates` through `update_features_inner` with the
/// validate-only flag turned off.
pub async fn update_features<I>(&self, updates: I) -> Result<()>
where
    I: IntoIterator<Item = FeatureUpdate>,
{
    const VALIDATE_ONLY: bool = false;
    self.update_features_inner(updates, VALIDATE_ONLY).await
}
/// Submits `updates` through `update_features_inner` with the
/// validate-only flag turned on.
pub async fn validate_feature_updates<I>(&self, updates: I) -> Result<()>
where
    I: IntoIterator<Item = FeatureUpdate>,
{
    const VALIDATE_ONLY: bool = true;
    self.update_features_inner(updates, VALIDATE_ONLY).await
}
/// Shared implementation behind `update_features` and
/// `validate_feature_updates`.
///
/// Connects first because the negotiated `UpdateFeaturesRequest` version is
/// needed to convert each `FeatureUpdate` into its request form. An empty
/// update list returns `Ok(())` without sending anything.
///
/// # Errors
///
/// Returns an error when connecting fails, when an update cannot be
/// converted, when the request timeout does not fit the request field, when
/// `validate_only` is requested but the negotiated version is below 1, when
/// sending fails, or when the response (top level or any per-feature result)
/// carries an error code.
async fn update_features_inner<I>(&self, updates: I, validate_only: bool) -> Result<()>
where
    I: IntoIterator<Item = FeatureUpdate>,
{
    let (mut broker, negotiated) = self
        .connect_with_version::<UpdateFeaturesRequest>(UPDATE_FEATURES_VERSION_CAP)
        .await?;

    // Convert every update, stopping at the first one that is invalid.
    let mut wire_updates = Vec::new();
    for update in updates {
        wire_updates.push(update.into_request_update(negotiated)?);
    }
    if wire_updates.is_empty() {
        return Ok(());
    }

    let timeout_ms = duration_to_i32_ms(self.config.request_timeout)?;
    let base = UpdateFeaturesRequest::default()
        .with_timeout_ms(timeout_ms)
        .with_feature_updates(wire_updates);
    // The validate-only flag is only set from version 1 onwards.
    let request = match (negotiated >= 1, validate_only) {
        (true, _) => base.with_validate_only(validate_only),
        (false, true) => {
            return Err(anyhow!("validate-only feature updates require UpdateFeatures v1+").into());
        }
        (false, false) => base,
    };

    let response: UpdateFeaturesResponse = broker
        .send_request::<UpdateFeaturesRequest>(&self.config.client_id, negotiated, &request)
        .await?;

    // Top-level failure: prefer the broker's message, fall back to the code.
    if let Some(error) = response.error_code.err() {
        let detail = match response
            .error_message
            .as_ref()
            .map(|message| message.to_string())
        {
            Some(text) if !text.is_empty() => text,
            _ => error.to_string(),
        };
        return Err(anyhow!("update features failed: {}", detail).into());
    }

    // Per-feature failures: report the first one encountered.
    for outcome in response.results {
        let Some(error) = outcome.error_code.err() else {
            continue;
        };
        let feature = outcome.feature.to_string();
        let detail = match outcome
            .error_message
            .as_ref()
            .map(|message| message.to_string())
        {
            Some(text) if !text.is_empty() => text,
            _ => error.to_string(),
        };
        return Err(anyhow!("update feature '{feature}' failed: {}", detail).into());
    }
    Ok(())
}
/// Establishes a connection through `connect_to_any_bootstrap` and discards
/// it immediately; only a connection failure is reported to the caller.
async fn warm_up(&self) -> Result<()> {
    let cfg = &self.config;
    drop(
        connect_to_any_bootstrap(
            &cfg.bootstrap_servers,
            &cfg.client_id,
            cfg.request_timeout,
            cfg.security_protocol,
            &cfg.tls,
            &cfg.sasl,
            &cfg.tcp_connector,
        )
        .await?,
    );
    Ok(())
}
/// Sends a `MetadataRequest` and returns the broker's response.
///
/// When `topics` is `Some`, one `MetadataRequestTopic` is built per name;
/// when it is `None`, `None` is passed to the request as-is. Auto topic
/// creation and both authorized-operations flags are turned off.
async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
    let (mut broker, negotiated) = self
        .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
        .await?;
    let topic_filter = topics.map(|names| {
        names
            .iter()
            .map(|name| {
                let topic_name = StrBytes::from_string(name.clone());
                MetadataRequestTopic::default().with_name(Some(topic_name.into()))
            })
            .collect()
    });
    let request = MetadataRequest::default()
        .with_topics(topic_filter)
        .with_allow_auto_topic_creation(false)
        .with_include_cluster_authorized_operations(false)
        .with_include_topic_authorized_operations(false);
    let response = broker
        .send_request::<MetadataRequest>(&self.config.client_id, negotiated, &request)
        .await?;
    Ok(response)
}
/// Connects, negotiates a version for `Req` no higher than `version_cap`
/// (via `connect_with_version`), sends `request` on that connection and
/// returns the decoded response.
async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
where
    Req: Request,
{
    let (mut broker, negotiated) = self.connect_with_version::<Req>(version_cap).await?;
    let response = broker
        .send_request::<Req>(&self.config.client_id, negotiated, request)
        .await?;
    Ok(response)
}
/// Opens a connection through `connect_to_any_bootstrap` and asks it for the
/// version to use for `Req`, passing `version_cap` to `version_with_cap`.
///
/// Returns the connection together with the chosen version.
async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
where
    Req: Request,
{
    let cfg = &self.config;
    let broker = connect_to_any_bootstrap(
        &cfg.bootstrap_servers,
        &cfg.client_id,
        cfg.request_timeout,
        cfg.security_protocol,
        &cfg.tls,
        &cfg.sasl,
        &cfg.tcp_connector,
    )
    .await?;
    let negotiated = broker.version_with_cap::<Req>(version_cap)?;
    Ok((broker, negotiated))
}
}
/// Trims surrounding whitespace from `topic` and returns the result.
///
/// # Errors
///
/// Returns `AdminError::EmptyTopicName` when nothing remains after trimming.
fn validate_topic_name(topic: String) -> Result<String> {
    match topic.trim() {
        "" => Err(AdminError::EmptyTopicName.into()),
        trimmed => Ok(trimmed.to_owned()),
    }
}
/// Trims surrounding whitespace from `group_id` and returns the result.
///
/// # Errors
///
/// Returns an error when nothing remains after trimming.
fn validate_group_id(group_id: String) -> Result<String> {
    match group_id.trim() {
        "" => Err(anyhow!("consumer group id cannot be empty").into()),
        trimmed => Ok(trimmed.to_owned()),
    }
}
/// Trims surrounding whitespace from `feature` and returns the result.
///
/// # Errors
///
/// Returns an error when nothing remains after trimming.
fn validate_feature_name(feature: String) -> Result<String> {
    match feature.trim() {
        "" => Err(anyhow!("feature names must be non-empty").into()),
        trimmed => Ok(trimmed.to_owned()),
    }
}
/// Totals the partitions across every topic entry of a consumer-group
/// describe `Assignment`.
fn assignment_partition_count(
    assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
) -> usize {
    let mut total = 0;
    for topic in assignment.topic_partitions.iter() {
        total += topic.partitions.len();
    }
    total
}
/// Totals the partitions across every topic entry of a share-group describe
/// `Assignment`.
fn share_assignment_partition_count(
    assignment: &kafka_protocol::messages::share_group_describe_response::Assignment,
) -> usize {
    let mut total = 0;
    for topic in assignment.topic_partitions.iter() {
        total += topic.partitions.len();
    }
    total
}
/// Counts the topics in a classic-protocol subscription blob.
///
/// Returns 0 when the blob is too short to hold the version prefix or when
/// decoding it as a `ConsumerProtocolSubscription` fails.
fn classic_subscription_topic_count(metadata: &Bytes) -> usize {
    match classic_protocol_body(metadata) {
        Some((version, mut payload)) => {
            match ConsumerProtocolSubscription::decode(&mut payload, version) {
                Ok(subscription) => subscription.topics.len(),
                Err(_) => 0,
            }
        }
        None => 0,
    }
}
/// Counts the partitions in a classic-protocol assignment blob, summed over
/// all assigned topics.
///
/// Returns 0 when the blob is too short to hold the version prefix or when
/// decoding it as a `ConsumerProtocolAssignment` fails.
fn classic_assignment_partition_count(assignment: &Bytes) -> usize {
    match classic_protocol_body(assignment) {
        Some((version, mut payload)) => {
            match ConsumerProtocolAssignment::decode(&mut payload, version) {
                Ok(decoded) => decoded
                    .assigned_partitions
                    .iter()
                    .map(|topic| topic.partitions.len())
                    .sum(),
                Err(_) => 0,
            }
        }
        None => 0,
    }
}
/// Splits a classic-protocol blob into its leading `i16` version and the
/// bytes that follow it.
///
/// Works on a clone of `bytes`, so the caller's value is left untouched.
/// Returns `None` when fewer than two bytes are available.
fn classic_protocol_body(bytes: &Bytes) -> Option<(i16, Bytes)> {
    let mut payload = bytes.clone();
    match payload.remaining() {
        0 | 1 => None,
        _ => {
            let version = payload.get_i16();
            Some((version, payload))
        }
    }
}
fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
error == ResponseError::TopicAlreadyExists
}
/// Returns `true` when `config_type` is zero or positive.
fn response_supported_config_type(config_type: i8) -> bool {
    !config_type.is_negative()
}
#[cfg(test)]
mod tests {
    use super::*;

    // A valid `NewTopic` carries its name, counts and configs into the request topic.
    #[test]
    fn new_topic_maps_to_create_topics_request() {
        let topic = NewTopic::new("orders", 3, 2)
            .with_config("cleanup.policy", "compact")
            .into_request_topic()
            .expect("topic should be valid");
        assert_eq!(topic.name.0.to_string(), "orders");
        assert_eq!(topic.num_partitions, 3);
        assert_eq!(topic.replication_factor, 2);
        assert_eq!(topic.configs.len(), 1);
        assert_eq!(topic.configs[0].name.to_string(), "cleanup.policy");
    }

    // A partition count of zero is rejected.
    #[test]
    fn new_topic_rejects_invalid_partition_count() {
        let error = NewTopic::new("orders", 0, 1)
            .into_request_topic()
            .expect_err("invalid topic should fail");
        assert!(
            error
                .to_string()
                .contains("topic partition count must be positive")
        );
    }

    // Whitespace-only names and a zero replication factor map to typed admin errors.
    #[test]
    fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
        let error = NewTopic::new(" ", 1, 1).into_request_topic().unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::EmptyTopicName)
        ));
        let error = NewTopic::new("orders", 1, 0)
            .into_request_topic()
            .unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::InvalidReplicationFactor {
                replication_factor: 0
            })
        ));
    }

    // Assignments are carried over; a zero count and a blank topic name are rejected.
    #[test]
    fn new_partitions_maps_assignments_and_rejects_invalid_input() {
        let topic = NewPartitions::increase_to(4)
            .with_assignment([1, 2])
            .with_assignment([2, 3])
            .into_request_topic("orders".to_owned())
            .unwrap();
        assert_eq!(topic.name.to_string(), "orders");
        assert_eq!(topic.count, 4);
        assert_eq!(topic.assignments.unwrap().len(), 2);
        let error = NewPartitions::increase_to(0)
            .into_request_topic("orders".to_owned())
            .unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::InvalidPartitionCount { partitions: 0 })
        ));
        let error = NewPartitions::increase_to(1)
            .into_request_topic(" ".to_owned())
            .unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::EmptyTopicName)
        ));
    }

    // Pins the numeric protocol values so accidental renumbering is caught.
    #[test]
    fn config_resource_and_operation_protocol_values_are_stable() {
        assert_eq!(ConfigResourceType::Unknown.as_protocol_value(), 0);
        assert_eq!(ConfigResourceType::Topic.as_protocol_value(), 2);
        assert_eq!(ConfigResourceType::Broker.as_protocol_value(), 4);
        assert_eq!(ConfigResourceType::BrokerLogger.as_protocol_value(), 8);
        assert_eq!(ConfigResourceType::ClientMetrics.as_protocol_value(), 16);
        assert_eq!(ConfigResourceType::Group.as_protocol_value(), 32);
        assert_eq!(
            ConfigResourceType::from_protocol_value(2),
            ConfigResourceType::Topic
        );
        assert_eq!(
            ConfigResourceType::from_protocol_value(99),
            ConfigResourceType::Unknown
        );
        assert_eq!(
            AlterConfigOp::set("cleanup.policy", "compact")
                .op_type
                .as_protocol_value(),
            0
        );
        assert_eq!(
            AlterConfigOp::delete("retention.ms")
                .op_type
                .as_protocol_value(),
            1
        );
        assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
        assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);
        assert!(response_supported_config_type(0));
        assert!(!response_supported_config_type(-1));
    }

    // `entry` finds a config by name and returns `None` for unknown names.
    #[test]
    fn config_resource_config_entry_lookup_returns_named_entry() {
        let mut entries = BTreeMap::new();
        entries.insert(
            "cleanup.policy".to_owned(),
            ConfigEntry {
                name: "cleanup.policy".to_owned(),
                value: Some("compact".to_owned()),
                read_only: false,
                config_source: 1,
                is_sensitive: false,
                config_type: Some(2),
                documentation: None,
            },
        );
        let config = ConfigResourceConfig {
            resource: ConfigResource::topic("orders"),
            entries,
        };
        assert_eq!(
            config.entry("cleanup.policy").unwrap().value.as_deref(),
            Some("compact")
        );
        assert!(config.entry("missing").is_none());
    }

    // At request version 2 an upgrade sets `upgrade_type` and leaves `allow_downgrade` off.
    #[test]
    fn feature_update_maps_to_modern_update_features_request() {
        let update = FeatureUpdate::upgrade("share.version", 1)
            .into_request_update(2)
            .unwrap();
        assert_eq!(update.feature.to_string(), "share.version");
        assert_eq!(update.max_version_level, 1);
        assert_eq!(update.upgrade_type, 1);
        assert!(!update.allow_downgrade);
    }

    // At request version 0 a safe downgrade sets `allow_downgrade`.
    #[test]
    fn feature_update_maps_to_legacy_downgrade_flag() {
        let update = FeatureUpdate::safe_downgrade("metadata.version", 0)
            .into_request_update(0)
            .unwrap();
        assert_eq!(update.feature.to_string(), "metadata.version");
        assert_eq!(update.max_version_level, 0);
        assert_eq!(update.upgrade_type, 1);
        assert!(update.allow_downgrade);
    }

    // A whitespace-only feature name is rejected.
    #[test]
    fn feature_update_rejects_empty_name() {
        let error = FeatureUpdate::upgrade(" ", 1)
            .into_request_update(2)
            .unwrap_err();
        assert!(
            error
                .to_string()
                .contains("feature names must be non-empty")
        );
    }

    // Only `TopicAlreadyExists` is treated as ignorable.
    #[test]
    fn topic_already_exists_is_ignorable() {
        assert!(is_ignorable_create_topic_error(
            ResponseError::TopicAlreadyExists
        ));
        assert!(!is_ignorable_create_topic_error(
            ResponseError::UnknownTopicOrPartition
        ));
    }

    // Group ids are trimmed, and a whitespace-only id is rejected.
    #[test]
    fn validate_group_id_trims_and_rejects_blank_ids() {
        assert_eq!(validate_group_id(" orders ".to_owned()).unwrap(), "orders");
        let error = validate_group_id("   ".to_owned()).unwrap_err();
        assert!(
            error
                .to_string()
                .contains("consumer group id cannot be empty")
        );
    }

    // The two-byte big-endian version prefix is split off; shorter input yields `None`.
    #[test]
    fn classic_protocol_body_splits_version_prefix_from_payload() {
        assert!(classic_protocol_body(&Bytes::new()).is_none());
        assert!(classic_protocol_body(&Bytes::from_static(&[0])).is_none());
        let (version, body) = classic_protocol_body(&Bytes::from_static(&[0, 3, 7, 9]))
            .expect("two-byte version prefix should be accepted");
        assert_eq!(version, 3);
        assert_eq!(&body[..], &[7u8, 9][..]);
    }

    // Blobs too short to hold a version prefix count as zero topics/partitions.
    #[test]
    fn classic_member_counts_default_to_zero_without_version_prefix() {
        assert_eq!(classic_subscription_topic_count(&Bytes::new()), 0);
        assert_eq!(
            classic_assignment_partition_count(&Bytes::from_static(&[0])),
            0
        );
    }
}