use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use tracing::{info, warn};
use crate::auth::{AuthConfig, ScramMechanism};
use crate::error::{KrafkaError, ProtocolErrorKind, Result};
use crate::metadata::{ClusterMetadata, MetadataRecoveryStrategy, TopicInfo};
use crate::metrics::ConnectionMetrics;
use crate::network::{BrokerConnection, ConnectionPool};
use crate::protocol::{
AclBinding, AclOperation, AclPatternType, AclPermissionType, AclResourceType, ApiKey,
DeleteGroupsRequest, DeleteGroupsResponse, DescribeTopicPartitionsCursor,
DescribeTopicPartitionsRequest, DescribeTopicPartitionsResponse, FinalizedFeature,
FindCoordinatorRequest, FindCoordinatorResponse, SupportedFeature, VersionedDecode,
VersionedEncode, validate_topic_name, validate_topic_names, versions,
};
pub use crate::protocol::{ConfigResourceType, DescribeConfigsRequest, DescribeConfigsResource};
mod acls;
mod builder;
mod configs;
mod features;
mod group_offsets;
mod groups;
mod offsets;
mod partitions;
mod quotas;
mod scram;
mod tokens;
mod topics;
mod transactions;
pub use builder::AdminClientBuilder;
const DEFAULT_RESPONSE_PARTITION_LIMIT: i32 = 2000;
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NewTopic {
pub name: String,
pub num_partitions: i32,
pub replication_factor: i16,
pub configs: HashMap<String, String>,
}
impl NewTopic {
pub fn new(
name: impl Into<String>,
num_partitions: i32,
replication_factor: i16,
) -> Result<Self> {
let name = name.into();
validate_topic_name(&name)?;
if num_partitions == 0 || num_partitions < -1 {
return Err(KrafkaError::config(format!(
"num_partitions must be positive or -1, got {num_partitions}"
)));
}
if replication_factor == 0 || replication_factor < -1 {
return Err(KrafkaError::config(format!(
"replication_factor must be positive or -1, got {replication_factor}"
)));
}
Ok(Self {
name,
num_partitions,
replication_factor,
configs: HashMap::new(),
})
}
pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.configs.insert(key.into(), value.into());
self
}
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateTopicResult {
pub name: String,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteTopicResult {
pub name: String,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreatePartitionsResult {
pub topic: String,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConfigEntry {
pub name: String,
pub value: Option<String>,
pub read_only: bool,
pub is_default: bool,
pub is_sensitive: bool,
pub config_source: i8,
pub synonyms: Vec<ConfigSynonymEntry>,
pub config_type: i8,
pub documentation: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConfigSynonymEntry {
pub name: String,
pub value: Option<String>,
pub source: i8,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterConfigResult {
pub resource_name: String,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeAclsResult {
pub error: Option<String>,
pub bindings: Vec<AclBinding>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateAclsResult {
pub results: Vec<CreateAclResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateAclResult {
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteAclsResult {
pub filter_results: Vec<DeleteAclFilterResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteAclFilterResult {
pub error: Option<String>,
pub deleted_count: usize,
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupType {
Classic,
Consumer,
Unknown(String),
}
impl std::fmt::Display for GroupType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Classic => f.write_str("classic"),
Self::Consumer => f.write_str("consumer"),
Self::Unknown(s) => f.write_str(s),
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConsumerGroupDescription {
pub group_id: String,
pub group_type: GroupType,
pub state: String,
pub protocol_type: Option<String>,
pub assignor: Option<String>,
pub group_epoch: Option<i32>,
pub assignment_epoch: Option<i32>,
pub members: Vec<ConsumerGroupMember>,
pub authorized_operations: Option<i32>,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConsumerGroupMember {
pub member_id: String,
pub instance_id: Option<String>,
pub rack_id: Option<String>,
pub member_epoch: Option<i32>,
pub client_id: String,
pub client_host: String,
pub subscribed_topic_names: Option<Vec<String>>,
pub subscribed_topic_regex: Option<String>,
pub assignment: Option<Vec<TopicPartitionAssignment>>,
pub target_assignment: Option<Vec<TopicPartitionAssignment>>,
pub member_type: Option<i8>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TopicPartitionAssignment {
pub topic_id: [u8; 16],
pub topic_name: String,
pub partitions: Vec<i32>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConsumerGroupListing {
pub group_id: String,
pub protocol_type: String,
pub group_type: Option<GroupType>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeTopicPartitionsResult {
pub topics: Vec<TopicPartitionDescription>,
pub next_cursor_topic: Option<String>,
pub next_cursor_partition: Option<i32>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TopicPartitionDescription {
pub name: Option<String>,
pub topic_id: [u8; 16],
pub is_internal: bool,
pub partitions: Vec<PartitionDescription>,
pub topic_authorized_operations: i32,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionDescription {
pub partition_index: i32,
pub leader_id: i32,
pub leader_epoch: i32,
pub replica_nodes: Vec<i32>,
pub isr_nodes: Vec<i32>,
pub eligible_leader_replicas: Option<Vec<i32>>,
pub last_known_elr: Option<Vec<i32>>,
pub offline_replicas: Vec<i32>,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteRecordResult {
pub topic: String,
pub partition: i32,
pub low_watermark: i64,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LeaderEpochResult {
pub topic: String,
pub partition: i32,
pub leader_epoch: i32,
pub end_offset: i64,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DelegationTokenRenewer {
pub principal_type: String,
pub principal_name: String,
}
#[non_exhaustive]
#[derive(Clone)]
pub struct DelegationToken {
pub principal_type: String,
pub principal_name: String,
pub issue_timestamp_ms: i64,
pub expiry_timestamp_ms: i64,
pub max_timestamp_ms: i64,
pub token_id: String,
pub hmac: Bytes,
pub renewers: Vec<DelegationTokenRenewer>,
}
impl std::fmt::Debug for DelegationToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DelegationToken")
.field("principal_type", &self.principal_type)
.field("principal_name", &self.principal_name)
.field("issue_timestamp_ms", &self.issue_timestamp_ms)
.field("expiry_timestamp_ms", &self.expiry_timestamp_ms)
.field("max_timestamp_ms", &self.max_timestamp_ms)
.field("token_id", &self.token_id)
.field("hmac", &"[REDACTED]")
.field("renewers", &self.renewers)
.finish()
}
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateDelegationTokenResult {
pub token: Option<DelegationToken>,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct RenewDelegationTokenResult {
pub expiry_timestamp_ms: i64,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ExpireDelegationTokenResult {
pub expiry_timestamp_ms: i64,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaEntityComponent {
pub entity_type: String,
pub entity_name: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaConfig {
pub key: String,
pub value: f64,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaDescription {
pub entity: Vec<QuotaEntityComponent>,
pub values: Vec<QuotaConfig>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeClientQuotasResult {
pub entries: Vec<QuotaDescription>,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterClientQuotaResult {
pub entity: Vec<QuotaEntityComponent>,
pub error: Option<String>,
}
#[derive(Debug, Clone)]
pub struct QuotaAlteration<'a> {
pub entity: Vec<(&'a str, Option<&'a str>)>,
pub ops: Vec<(&'a str, Option<f64>)>,
}
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct AclFilter {
pub resource_type: AclResourceType,
pub resource_name: Option<String>,
pub pattern_type: AclPatternType,
pub principal: Option<String>,
pub host: Option<String>,
pub operation: AclOperation,
pub permission_type: AclPermissionType,
}
impl AclFilter {
pub fn all() -> Self {
Self::default()
}
pub fn for_resource(resource_type: AclResourceType, resource_name: impl Into<String>) -> Self {
Self {
resource_type,
resource_name: Some(resource_name.into()),
..Default::default()
}
}
pub fn for_principal(principal: impl Into<String>) -> Self {
Self {
principal: Some(principal.into()),
..Default::default()
}
}
pub fn resource_type(mut self, resource_type: AclResourceType) -> Self {
self.resource_type = resource_type;
self
}
pub fn resource_name(mut self, name: impl Into<String>) -> Self {
self.resource_name = Some(name.into());
self
}
pub fn pattern_type(mut self, pattern_type: AclPatternType) -> Self {
self.pattern_type = pattern_type;
self
}
pub fn principal(mut self, principal: impl Into<String>) -> Self {
self.principal = Some(principal.into());
self
}
pub fn host(mut self, host: impl Into<String>) -> Self {
self.host = Some(host.into());
self
}
pub fn operation(mut self, operation: AclOperation) -> Self {
self.operation = operation;
self
}
pub fn permission_type(mut self, permission_type: AclPermissionType) -> Self {
self.permission_type = permission_type;
self
}
}
#[derive(Debug, Clone)]
pub struct AdminConfig {
pub(crate) bootstrap_servers: String,
pub(crate) client_id: String,
pub(crate) request_timeout: Duration,
pub(crate) metadata_recovery_strategy: MetadataRecoveryStrategy,
pub(crate) metadata_recovery_rebootstrap_trigger: Duration,
pub(crate) auth: Option<AuthConfig>,
#[cfg(feature = "socks5")]
pub(crate) proxy: Option<crate::network::ProxyConfig>,
}
impl Default for AdminConfig {
fn default() -> Self {
Self {
bootstrap_servers: String::new(),
client_id: "krafka-admin".to_string(),
request_timeout: Duration::from_secs(30),
metadata_recovery_strategy: MetadataRecoveryStrategy::Rebootstrap,
metadata_recovery_rebootstrap_trigger: Duration::from_secs(300),
auth: None,
#[cfg(feature = "socks5")]
proxy: None,
}
}
}
impl AdminConfig {
pub fn builder() -> AdminConfigBuilder {
AdminConfigBuilder::default()
}
#[inline]
pub fn bootstrap_servers(&self) -> &str {
&self.bootstrap_servers
}
#[inline]
pub fn client_id(&self) -> &str {
&self.client_id
}
#[inline]
pub fn request_timeout(&self) -> Duration {
self.request_timeout
}
#[inline]
pub fn metadata_recovery_strategy(&self) -> MetadataRecoveryStrategy {
self.metadata_recovery_strategy
}
#[inline]
pub fn metadata_recovery_rebootstrap_trigger(&self) -> Duration {
self.metadata_recovery_rebootstrap_trigger
}
#[inline]
pub fn auth(&self) -> Option<&AuthConfig> {
self.auth.as_ref()
}
#[cfg(feature = "socks5")]
#[inline]
pub fn proxy(&self) -> Option<&crate::network::ProxyConfig> {
self.proxy.as_ref()
}
}
#[must_use = "builders do nothing until .build() is called"]
#[derive(Debug, Default)]
pub struct AdminConfigBuilder {
config: AdminConfig,
}
impl AdminConfigBuilder {
pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
self.config.bootstrap_servers = servers.into();
self
}
pub fn client_id(mut self, id: impl Into<String>) -> Self {
self.config.client_id = id.into();
self
}
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.config.request_timeout = timeout;
self
}
pub fn metadata_recovery_strategy(mut self, strategy: MetadataRecoveryStrategy) -> Self {
self.config.metadata_recovery_strategy = strategy;
self
}
pub fn metadata_recovery_rebootstrap_trigger(mut self, duration: Duration) -> Self {
self.config.metadata_recovery_rebootstrap_trigger = duration;
self
}
pub fn auth(mut self, auth: AuthConfig) -> Self {
self.config.auth = Some(auth);
self
}
#[cfg(feature = "socks5")]
pub fn proxy(mut self, proxy: crate::network::ProxyConfig) -> Self {
self.config.proxy = Some(proxy);
self
}
pub fn build(self) -> AdminConfig {
self.config
}
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteGroupResult {
pub group_id: String,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeClusterResult {
pub cluster_id: String,
pub controller_id: i32,
pub brokers: Vec<DescribeClusterBrokerInfo>,
pub cluster_authorized_operations: i32,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeClusterBrokerInfo {
pub broker_id: i32,
pub host: String,
pub port: i32,
pub rack: Option<String>,
}
pub struct AdminClient {
config: AdminConfig,
metadata: Arc<ClusterMetadata>,
pool: Arc<ConnectionPool>,
closed: std::sync::atomic::AtomicBool,
}
impl Drop for AdminClient {
fn drop(&mut self) {
if !self.closed.load(std::sync::atomic::Ordering::SeqCst) && !std::thread::panicking() {
warn!(
"AdminClient dropped without close(); in-flight RPCs may fail abruptly. \
Call `AdminClient::close()` before drop."
);
}
}
}
impl AdminClient {
pub fn builder() -> AdminClientBuilder {
AdminClientBuilder::default()
}
#[inline]
fn check_not_closed(&self) -> Result<()> {
if self.is_closed() {
return Err(KrafkaError::invalid_state("AdminClient is closed"));
}
Ok(())
}
async fn get_any_broker_connection(&self) -> Result<Arc<BrokerConnection>> {
self.check_not_closed()?;
let brokers = self.metadata.brokers();
if brokers.is_empty() {
return Err(KrafkaError::broker(
crate::error::ErrorCode::UnknownServerError,
"no brokers available",
));
}
let broker = &brokers[0];
self.pool
.get_connection_by_id(broker.id(), broker.address())
.await
}
async fn find_group_coordinator(&self, group_id: &str) -> Result<Arc<BrokerConnection>> {
let any_conn = self.get_any_broker_connection().await?;
let coord_request = FindCoordinatorRequest::for_group(group_id);
let coord_version = any_conn
.negotiate_api_version(
ApiKey::FindCoordinator,
versions::FIND_COORDINATOR_MAX,
versions::FIND_COORDINATOR_MIN,
)
.await
.ok_or_else(|| {
KrafkaError::protocol_kind(
ProtocolErrorKind::UnknownApiVersion,
"no mutually supported FindCoordinator API version",
)
})?;
let coord_response_bytes = any_conn
.send_request(ApiKey::FindCoordinator, coord_version, |buf| {
coord_request.encode_versioned(coord_version, buf)
})
.await?;
let mut coord_buf = coord_response_bytes;
let coord_response =
FindCoordinatorResponse::decode_versioned(coord_version, &mut coord_buf)?;
if coord_response.error_code.is_ok() {
let addr = format!("{}:{}", coord_response.host, coord_response.port);
self.pool
.get_connection_by_id(coord_response.node_id, &addr)
.await
} else {
warn!(
"FindCoordinator failed for group '{}': {:?}, using any broker",
group_id, coord_response.error_code
);
Ok(any_conn)
}
}
pub async fn delete_consumer_groups(
&self,
group_ids: Vec<String>,
) -> Result<Vec<DeleteGroupResult>> {
self.check_not_closed()?;
let conn = self.get_any_broker_connection().await?;
let request = DeleteGroupsRequest::new(group_ids);
let version = conn
.negotiate_api_version(
ApiKey::DeleteGroups,
versions::DELETE_GROUPS_MAX,
versions::DELETE_GROUPS_MIN,
)
.await
.ok_or_else(|| {
KrafkaError::protocol_kind(
ProtocolErrorKind::UnknownApiVersion,
"no mutually supported DeleteGroups API version",
)
})?;
let response_bytes = conn
.send_request(ApiKey::DeleteGroups, version, |buf| {
request.encode_versioned(version, buf)
})
.await?;
let mut buf = response_bytes;
let response = DeleteGroupsResponse::decode_versioned(version, &mut buf)?;
let results = response
.results
.into_iter()
.map(|r| DeleteGroupResult {
group_id: r.group_id,
error: if r.error_code.is_ok() {
None
} else {
Some(format!("{:?}", r.error_code))
},
})
.collect();
Ok(results)
}
pub async fn describe_topic_partitions(
&self,
topics: Vec<String>,
) -> Result<DescribeTopicPartitionsResult> {
self.check_not_closed()?;
validate_topic_names(topics.iter().map(String::as_str))?;
let conn = self.get_any_broker_connection().await?;
let version = conn
.negotiate_api_version(
ApiKey::DescribeTopicPartitions,
versions::DESCRIBE_TOPIC_PARTITIONS_MAX,
versions::DESCRIBE_TOPIC_PARTITIONS_MIN,
)
.await
.ok_or_else(|| {
KrafkaError::protocol_kind(
ProtocolErrorKind::UnknownApiVersion,
"no mutually supported DescribeTopicPartitions API version",
)
})?;
let mut all_topics: Vec<TopicPartitionDescription> = Vec::new();
let mut cursor = None;
loop {
let request = DescribeTopicPartitionsRequest {
topics: topics.clone(),
response_partition_limit: DEFAULT_RESPONSE_PARTITION_LIMIT,
cursor,
};
let response_bytes = conn
.send_request(ApiKey::DescribeTopicPartitions, version, |buf| {
request.encode_versioned(version, buf)
})
.await?;
let mut buf = response_bytes;
let response = DescribeTopicPartitionsResponse::decode_versioned(version, &mut buf)?;
for t in response.topics {
let null_uuid = [0u8; 16];
let existing = if t.topic_id != null_uuid {
all_topics.iter_mut().find(|e| e.topic_id == t.topic_id)
} else {
all_topics.iter_mut().find(|e| e.name == t.name)
};
let partitions: Vec<PartitionDescription> = t
.partitions
.into_iter()
.map(|p| PartitionDescription {
partition_index: p.partition_index,
leader_id: p.leader_id,
leader_epoch: p.leader_epoch,
replica_nodes: p.replica_nodes,
isr_nodes: p.isr_nodes,
eligible_leader_replicas: p.eligible_leader_replicas,
last_known_elr: p.last_known_elr,
offline_replicas: p.offline_replicas,
error: if p.error_code.is_ok() {
None
} else {
Some(format!("{:?}", p.error_code))
},
})
.collect();
if let Some(entry) = existing {
entry.partitions.extend(partitions);
} else {
all_topics.push(TopicPartitionDescription {
name: t.name,
topic_id: t.topic_id,
is_internal: t.is_internal,
partitions,
topic_authorized_operations: t.topic_authorized_operations,
error: if t.error_code.is_ok() {
None
} else {
Some(format!("{:?}", t.error_code))
},
});
}
}
match response.next_cursor {
Some(c) => {
cursor = Some(DescribeTopicPartitionsCursor {
topic_name: c.topic_name,
partition_index: c.partition_index,
});
}
None => break,
}
}
info!("Described partitions for {} topics", all_topics.len());
Ok(DescribeTopicPartitionsResult {
topics: all_topics,
next_cursor_topic: None,
next_cursor_partition: None,
})
}
pub fn pool(&self) -> &Arc<ConnectionPool> {
&self.pool
}
pub fn update_seed_brokers(&self, servers: Vec<String>) -> Result<()> {
self.metadata.update_seed_brokers(servers)
}
pub async fn rebootstrap(&self) {
self.metadata.rebootstrap().await;
}
pub async fn close(&self) {
if self.closed.swap(true, std::sync::atomic::Ordering::SeqCst) {
return;
}
self.pool.close_all().await;
info!("AdminClient closed");
}
#[inline]
pub fn is_closed(&self) -> bool {
self.closed.load(std::sync::atomic::Ordering::SeqCst)
}
#[inline]
pub fn connection_metrics(&self) -> Arc<ConnectionMetrics> {
self.pool.metrics()
}
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeFeaturesResult {
pub supported_features: Vec<SupportedFeature>,
pub finalized_features: Vec<FinalizedFeature>,
pub finalized_features_epoch: i64,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct UpdateFeaturesResult {
pub results: Vec<UpdateFeatureResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct UpdateFeatureResult {
pub feature: String,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirInfo {
pub broker_id: i32,
pub log_dir: String,
pub error: Option<String>,
pub topics: Vec<LogDirTopicInfo>,
pub total_bytes: i64,
pub usable_bytes: i64,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirTopicInfo {
pub name: String,
pub partitions: Vec<LogDirPartitionInfo>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirPartitionInfo {
pub partition_index: i32,
pub partition_size: i64,
pub offset_lag: i64,
pub is_future_key: bool,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ElectLeadersResult {
pub topic: String,
pub partitions: Vec<ElectLeadersPartitionInfo>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ElectLeadersPartitionInfo {
pub partition_id: i32,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReassignmentsResult {
pub error: Option<String>,
pub topics: Vec<ReassignmentTopicResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ReassignmentTopicResult {
pub name: String,
pub partitions: Vec<ReassignmentPartitionResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ReassignmentPartitionResult {
pub partition_index: i32,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionReassignmentInfo {
pub name: String,
pub partitions: Vec<PartitionReassignmentPartitionInfo>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionReassignmentPartitionInfo {
pub partition_index: i32,
pub replicas: Vec<i32>,
pub adding_replicas: Vec<i32>,
pub removing_replicas: Vec<i32>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsResult {
pub broker_id: i32,
pub topic_name: String,
pub partitions: Vec<AlterReplicaLogDirsPartitionResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsPartitionResult {
pub partition_index: i32,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeleteResult {
pub error: Option<String>,
pub topics: Vec<OffsetDeleteTopicResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeleteTopicResult {
pub name: String,
pub partitions: Vec<OffsetDeletePartitionResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeletePartitionResult {
pub partition_index: i32,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeUserScramCredentialsResult {
pub error: Option<String>,
pub users: Vec<ScramCredentialUserResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ScramCredentialUserResult {
pub name: String,
pub error: Option<String>,
pub credential_infos: Vec<ScramCredentialInfoResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ScramCredentialInfoResult {
pub mechanism: ScramMechanism,
pub iterations: i32,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterScramCredentialResult {
pub user: String,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeProducersTopicResult {
pub name: String,
pub partitions: Vec<DescribeProducersPartitionInfo>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeProducersPartitionInfo {
pub partition_index: i32,
pub error: Option<String>,
pub active_producers: Vec<ProducerStateInfo>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ProducerStateInfo {
pub producer_id: i64,
pub producer_epoch: i32,
pub last_sequence: i32,
pub last_timestamp: i64,
pub coordinator_epoch: i32,
pub current_txn_start_offset: i64,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionDescription {
pub transactional_id: String,
pub error: Option<String>,
pub state: String,
pub timeout_ms: i32,
pub start_time_ms: i64,
pub producer_id: i64,
pub producer_epoch: i16,
pub topics: Vec<TransactionTopicInfo>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionTopicInfo {
pub topic: String,
pub partitions: Vec<i32>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ListTransactionsResult {
pub error: Option<String>,
pub unknown_state_filters: Vec<String>,
pub transactions: Vec<TransactionListEntry>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionListEntry {
pub transactional_id: String,
pub producer_id: i64,
pub state: String,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersPartitionResult {
pub partition_index: i32,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersTopicResult {
pub name: String,
pub partitions: Vec<WriteTxnMarkersPartitionResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersResult {
pub producer_id: i64,
pub topics: Vec<WriteTxnMarkersTopicResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumReplicaInfo {
pub replica_id: i32,
pub log_end_offset: i64,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumPartitionResult {
pub partition_index: i32,
pub error: Option<String>,
pub leader_id: i32,
pub leader_epoch: i32,
pub high_watermark: i64,
pub current_voters: Vec<QuorumReplicaInfo>,
pub observers: Vec<QuorumReplicaInfo>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumTopicResult {
pub topic_name: String,
pub partitions: Vec<QuorumPartitionResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeQuorumResult {
pub error: Option<String>,
pub topics: Vec<QuorumTopicResult>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ListOffsetResult {
pub topic: String,
pub partition: i32,
pub offset: i64,
pub timestamp: i64,
pub error: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetSpec {
Earliest,
Latest,
Timestamp(i64),
}
impl OffsetSpec {
fn as_timestamp(self) -> i64 {
match self {
OffsetSpec::Earliest => -2,
OffsetSpec::Latest => -1,
OffsetSpec::Timestamp(ts) => ts,
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConsumerGroupLag {
pub topic: String,
pub partition: i32,
pub committed_offset: Option<i64>,
pub end_offset: i64,
pub lag: Option<i64>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct GroupOffsetEntry {
pub topic: String,
pub partition: i32,
pub committed_offset: i64,
pub metadata: Option<String>,
pub error: Option<String>,
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterGroupOffsetResult {
pub topic: String,
pub partition: i32,
pub error: Option<String>,
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn test_new_topic() {
let topic = NewTopic::new("test-topic", 3, 2)
.unwrap()
.with_config("cleanup.policy", "compact")
.with_config("retention.ms", "86400000");
assert_eq!(topic.name, "test-topic");
assert_eq!(topic.num_partitions, 3);
assert_eq!(topic.replication_factor, 2);
assert_eq!(topic.configs.len(), 2);
}
#[test]
fn test_new_topic_validation() {
assert!(NewTopic::new("t", 1, 1).is_ok());
assert!(NewTopic::new("t", -1, -1).is_ok());
assert!(NewTopic::new("t", 0, 1).is_err());
assert!(NewTopic::new("t", -2, 1).is_err());
assert!(NewTopic::new("t", 1, 0).is_err());
assert!(NewTopic::new("t", 1, -2).is_err());
}
#[test]
fn test_new_topic_name_validation_rejects_empty_and_oversize() {
let empty = NewTopic::new("", 1, 1).unwrap_err().to_string();
assert!(
empty.contains("topic name cannot be empty"),
"expected empty-name error, got: {empty}"
);
let oversize = "x".repeat(250);
let err = NewTopic::new(oversize, 1, 1).unwrap_err().to_string();
assert!(
err.contains("exceeds maximum of 249"),
"expected topic-name-length error, got: {err}"
);
let max_ok = "x".repeat(249);
assert!(NewTopic::new(max_ok, 1, 1).is_ok());
}
#[test]
fn test_admin_config_default() {
let config = AdminConfig::default();
assert_eq!(config.client_id, "krafka-admin");
assert_eq!(config.request_timeout, Duration::from_secs(30));
assert_eq!(
config.metadata_recovery_strategy,
MetadataRecoveryStrategy::Rebootstrap
);
}
#[test]
fn test_describe_acls_result() {
let result = DescribeAclsResult {
error: None,
bindings: vec![
AclBinding::allow_read_topic("my-topic", "User:alice"),
AclBinding::allow_write_topic("my-topic", "User:bob"),
],
};
assert!(result.error.is_none());
assert_eq!(result.bindings.len(), 2);
}
#[test]
fn test_create_acls_result() {
let result = CreateAclsResult {
results: vec![
CreateAclResult { error: None },
CreateAclResult {
error: Some("ACL already exists".to_string()),
},
],
};
assert!(result.results[0].error.is_none());
assert!(result.results[1].error.is_some());
}
#[test]
fn test_delete_acls_result() {
let result = DeleteAclsResult {
filter_results: vec![
DeleteAclFilterResult {
error: None,
deleted_count: 3,
},
DeleteAclFilterResult {
error: None,
deleted_count: 0,
},
],
};
assert_eq!(result.filter_results[0].deleted_count, 3);
assert_eq!(result.filter_results[1].deleted_count, 0);
}
#[test]
fn test_acl_filter_builder() {
use crate::protocol::{AclOperation, AclPatternType, AclPermissionType, AclResourceType};
let filter = AclFilter::all();
assert_eq!(filter.resource_type, AclResourceType::Any);
assert_eq!(filter.pattern_type, AclPatternType::Any);
assert_eq!(filter.operation, AclOperation::Any);
assert_eq!(filter.permission_type, AclPermissionType::Any);
assert!(filter.resource_name.is_none());
assert!(filter.principal.is_none());
assert!(filter.host.is_none());
let filter = AclFilter::for_resource(AclResourceType::Topic, "my-topic");
assert_eq!(filter.resource_type, AclResourceType::Topic);
assert_eq!(filter.resource_name, Some("my-topic".to_string()));
let filter = AclFilter::for_principal("User:alice");
assert_eq!(filter.principal, Some("User:alice".to_string()));
let filter = AclFilter::all()
.resource_type(AclResourceType::Group)
.resource_name("my-group")
.pattern_type(AclPatternType::Literal)
.principal("User:bob")
.host("localhost")
.operation(AclOperation::Read)
.permission_type(AclPermissionType::Allow);
assert_eq!(filter.resource_type, AclResourceType::Group);
assert_eq!(filter.resource_name, Some("my-group".to_string()));
assert_eq!(filter.pattern_type, AclPatternType::Literal);
assert_eq!(filter.principal, Some("User:bob".to_string()));
assert_eq!(filter.host, Some("localhost".to_string()));
assert_eq!(filter.operation, AclOperation::Read);
assert_eq!(filter.permission_type, AclPermissionType::Allow);
}
#[test]
fn test_consumer_group_description() {
let desc = ConsumerGroupDescription {
group_id: "my-group".to_string(),
group_type: GroupType::Classic,
state: "Stable".to_string(),
protocol_type: Some("consumer".to_string()),
assignor: Some("range".to_string()),
group_epoch: None,
assignment_epoch: None,
members: vec![
ConsumerGroupMember {
member_id: "member-1".to_string(),
instance_id: Some("instance-1".to_string()),
rack_id: None,
member_epoch: None,
client_id: "my-client".to_string(),
client_host: "/127.0.0.1".to_string(),
subscribed_topic_names: None,
subscribed_topic_regex: None,
assignment: None,
target_assignment: None,
member_type: None,
},
ConsumerGroupMember {
member_id: "member-2".to_string(),
instance_id: None,
rack_id: None,
member_epoch: None,
client_id: "other-client".to_string(),
client_host: "/192.168.1.1".to_string(),
subscribed_topic_names: None,
subscribed_topic_regex: None,
assignment: None,
target_assignment: None,
member_type: None,
},
],
authorized_operations: None,
error: None,
};
assert_eq!(desc.group_id, "my-group");
assert_eq!(desc.group_type, GroupType::Classic);
assert_eq!(desc.state, "Stable");
assert_eq!(desc.members.len(), 2);
assert!(desc.members[0].instance_id.is_some());
assert!(desc.members[1].instance_id.is_none());
assert!(desc.error.is_none());
}
#[test]
fn test_consumer_group_listing() {
let listing = ConsumerGroupListing {
group_id: "my-group".to_string(),
protocol_type: "consumer".to_string(),
group_type: Some(GroupType::Consumer),
};
assert_eq!(listing.group_id, "my-group");
assert_eq!(listing.protocol_type, "consumer");
assert_eq!(listing.group_type, Some(GroupType::Consumer));
}
#[test]
fn test_delete_record_result() {
let result = DeleteRecordResult {
topic: "my-topic".to_string(),
partition: 0,
low_watermark: 100,
error: None,
};
assert_eq!(result.topic, "my-topic");
assert_eq!(result.partition, 0);
assert_eq!(result.low_watermark, 100);
assert!(result.error.is_none());
let result_err = DeleteRecordResult {
topic: "my-topic".to_string(),
partition: 1,
low_watermark: -1,
error: Some("NotLeaderOrFollower".to_string()),
};
assert!(result_err.error.is_some());
}
#[test]
fn test_leader_epoch_result() {
let result = LeaderEpochResult {
topic: "my-topic".to_string(),
partition: 0,
leader_epoch: 5,
end_offset: 1000,
error: None,
};
assert_eq!(result.topic, "my-topic");
assert_eq!(result.leader_epoch, 5);
assert_eq!(result.end_offset, 1000);
assert!(result.error.is_none());
}
#[test]
fn test_admin_client_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<AdminClient>();
}
#[cfg(feature = "socks5")]
#[test]
fn test_admin_config_builder_proxy_round_trip() {
let config = AdminConfig::builder()
.proxy(crate::network::ProxyConfig::new("proxy:1080"))
.build();
let proxy = config.proxy().expect("proxy should be set");
assert_eq!(proxy.address(), "proxy:1080");
}
}