use std::time::Duration;
use crabka_client_core::{ClientError, Connection, ConnectionOptions};
use thiserror::Error;
pub mod configs;
pub mod delegation_tokens;
pub mod log_dirs;
pub mod quotas;
pub mod topics;
pub mod users;
pub use configs::{AlterConfigsOutcome, IncrementalAlterOp, TopicConfigOverrides};
pub use log_dirs::{AlterReplicaLogDirOutcome, LogDirInfo, LogDirPartitionInfo, LogDirTopicInfo};
pub use quotas::{QuotaOp, UserQuotaConfig, diff_user_quotas};
pub use topics::{
CreatePartitionsOp, CreatePartitionsOutcome, CreateTopicOutcome, CreateTopicSpec,
DeleteTopicOutcome, TopicMetadata, TopicMetadataEntry,
};
pub use users::{
AclEntry, AclEntryFilter, AclOperation, CreateAclOutcome, DEFAULT_SCRAM_ITERATIONS,
DeleteAclFilterOutcome, PatternType, PermissionType, ResourceType, ScramDeletion,
ScramUpsertion, ScramUserOutcome,
};
#[async_trait::async_trait]
pub trait AdminClientLike: Send {
async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError>;
async fn create_topics(
&mut self,
specs: &[CreateTopicSpec],
timeout_ms: i32,
) -> Result<Vec<CreateTopicOutcome>, AdminError>;
async fn delete_topics(
&mut self,
names: &[&str],
timeout_ms: i32,
) -> Result<Vec<DeleteTopicOutcome>, AdminError>;
async fn create_partitions(
&mut self,
ops: &[CreatePartitionsOp],
timeout_ms: i32,
) -> Result<Vec<CreatePartitionsOutcome>, AdminError>;
async fn describe_configs(
&mut self,
topics: &[&str],
) -> Result<Vec<TopicConfigOverrides>, AdminError>;
async fn incremental_alter_configs(
&mut self,
ops: &[IncrementalAlterOp],
) -> Result<Vec<AlterConfigsOutcome>, AdminError>;
async fn alter_user_scram_credentials_sha512(
&mut self,
upsertions: &[ScramUpsertion],
deletions: &[ScramDeletion],
) -> Result<Vec<ScramUserOutcome>, AdminError>;
async fn alter_user_scram_credentials_sha256(
&mut self,
upsertions: &[ScramUpsertion],
deletions: &[ScramDeletion],
) -> Result<Vec<ScramUserOutcome>, AdminError>;
async fn describe_acls(&mut self, filter: &AclEntryFilter)
-> Result<Vec<AclEntry>, AdminError>;
async fn create_acls(
&mut self,
creations: &[AclEntry],
) -> Result<Vec<CreateAclOutcome>, AdminError>;
async fn delete_acls(
&mut self,
filters: &[AclEntryFilter],
) -> Result<Vec<DeleteAclFilterOutcome>, AdminError>;
async fn describe_user_quotas(&mut self, username: &str)
-> Result<UserQuotaConfig, AdminError>;
async fn alter_user_quotas(
&mut self,
username: &str,
ops: &[QuotaOp],
validate_only: bool,
) -> Result<Option<KafkaError>, AdminError>;
async fn create_delegation_token_as_owner(
&mut self,
owner_principal_name: &str,
renewers: &[String],
max_lifetime_ms: i64,
) -> Result<crabka_metadata::DelegationToken, AdminError>;
async fn renew_delegation_token(
&mut self,
hmac: &[u8],
) -> Result<crabka_metadata::DelegationToken, AdminError>;
async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError>;
async fn describe_delegation_tokens_owned_by(
&mut self,
owner_principal: &str,
) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError>;
}
#[async_trait::async_trait]
impl AdminClientLike for AdminClient {
async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
AdminClient::metadata(self, topics).await
}
async fn create_topics(
&mut self,
specs: &[CreateTopicSpec],
timeout_ms: i32,
) -> Result<Vec<CreateTopicOutcome>, AdminError> {
AdminClient::create_topics(self, specs, timeout_ms).await
}
async fn delete_topics(
&mut self,
names: &[&str],
timeout_ms: i32,
) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
AdminClient::delete_topics(self, names, timeout_ms).await
}
async fn create_partitions(
&mut self,
ops: &[CreatePartitionsOp],
timeout_ms: i32,
) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
AdminClient::create_partitions(self, ops, timeout_ms).await
}
async fn describe_configs(
&mut self,
topics: &[&str],
) -> Result<Vec<TopicConfigOverrides>, AdminError> {
AdminClient::describe_configs(self, topics).await
}
async fn incremental_alter_configs(
&mut self,
ops: &[IncrementalAlterOp],
) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
AdminClient::incremental_alter_configs(self, ops).await
}
async fn alter_user_scram_credentials_sha512(
&mut self,
upsertions: &[ScramUpsertion],
deletions: &[ScramDeletion],
) -> Result<Vec<ScramUserOutcome>, AdminError> {
AdminClient::alter_user_scram_credentials_sha512(self, upsertions, deletions).await
}
async fn alter_user_scram_credentials_sha256(
&mut self,
upsertions: &[ScramUpsertion],
deletions: &[ScramDeletion],
) -> Result<Vec<ScramUserOutcome>, AdminError> {
AdminClient::alter_user_scram_credentials_sha256(self, upsertions, deletions).await
}
async fn describe_acls(
&mut self,
filter: &AclEntryFilter,
) -> Result<Vec<AclEntry>, AdminError> {
AdminClient::describe_acls(self, filter).await
}
async fn create_acls(
&mut self,
creations: &[AclEntry],
) -> Result<Vec<CreateAclOutcome>, AdminError> {
AdminClient::create_acls(self, creations).await
}
async fn delete_acls(
&mut self,
filters: &[AclEntryFilter],
) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
AdminClient::delete_acls(self, filters).await
}
async fn describe_user_quotas(
&mut self,
username: &str,
) -> Result<UserQuotaConfig, AdminError> {
AdminClient::describe_user_quotas(self, username).await
}
async fn alter_user_quotas(
&mut self,
username: &str,
ops: &[QuotaOp],
validate_only: bool,
) -> Result<Option<KafkaError>, AdminError> {
AdminClient::alter_user_quotas(self, username, ops, validate_only).await
}
async fn create_delegation_token_as_owner(
&mut self,
owner_principal_name: &str,
renewers: &[String],
max_lifetime_ms: i64,
) -> Result<crabka_metadata::DelegationToken, AdminError> {
let resp = AdminClient::create_delegation_token_as_owner(
self,
owner_principal_name,
renewers,
max_lifetime_ms,
)
.await?;
let renewers_image = renewers
.iter()
.filter_map(|s| renewer_str_to_principal(s))
.collect();
Ok(crabka_metadata::DelegationToken {
token_id: resp.token_id,
owner: crabka_security::KafkaPrincipal {
principal_type: resp.principal_type,
name: resp.principal_name,
},
hmac: resp.hmac.to_vec(),
issue_timestamp_ms: resp.issue_timestamp_ms,
expiry_timestamp_ms: resp.expiry_timestamp_ms,
max_timestamp_ms: resp.max_timestamp_ms,
renewers: renewers_image,
})
}
async fn renew_delegation_token(
&mut self,
hmac: &[u8],
) -> Result<crabka_metadata::DelegationToken, AdminError> {
let _new_expiry = AdminClient::renew_delegation_token(self, hmac).await?;
let req = crabka_protocol::owned::describe_delegation_token_request::DescribeDelegationTokenRequest::default();
let resp = self.conn.send(req).await?;
if resp.error_code != 0 {
return Err(AdminError::Broker {
api: "DescribeDelegationToken",
code: resp.error_code,
name: kafka_error_name(resp.error_code),
message: None,
});
}
let matched = resp
.tokens
.into_iter()
.find(|t| t.hmac.as_ref() == hmac)
.ok_or_else(|| {
AdminError::Protocol(
"RenewDelegationToken: follow-up describe did not return the renewed token"
.into(),
)
})?;
Ok(crabka_metadata::DelegationToken {
token_id: matched.token_id,
owner: crabka_security::KafkaPrincipal {
principal_type: matched.principal_type,
name: matched.principal_name,
},
hmac: matched.hmac.to_vec(),
issue_timestamp_ms: matched.issue_timestamp,
expiry_timestamp_ms: matched.expiry_timestamp,
max_timestamp_ms: matched.max_timestamp,
renewers: matched
.renewers
.into_iter()
.map(|r| crabka_security::KafkaPrincipal {
principal_type: r.principal_type,
name: r.principal_name,
})
.collect(),
})
}
async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
AdminClient::expire_delegation_token(self, hmac).await
}
async fn describe_delegation_tokens_owned_by(
&mut self,
owner_principal: &str,
) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError> {
AdminClient::describe_delegation_tokens_owned_by(self, owner_principal).await
}
}
fn renewer_str_to_principal(s: &str) -> Option<crabka_security::KafkaPrincipal> {
if s.is_empty() {
return None;
}
let (pt, pn) = s.split_once(':').unwrap_or(("User", s));
Some(crabka_security::KafkaPrincipal {
principal_type: pt.to_string(),
name: pn.to_string(),
})
}
#[derive(Debug, Error)]
pub enum AdminError {
#[error("no bootstrap address was reachable: tried {tried}")]
Connect { tried: usize },
#[error("controller routing failed after retry")]
NotControllerExhausted,
#[error("broker returned error: api={api} code={code} ({name}){detail}",
detail = .message.as_deref().map(|m| format!(" {m:?}")).unwrap_or_default())]
Broker {
api: &'static str,
code: i16,
name: &'static str,
message: Option<String>,
},
#[error("client-core: {0}")]
Transport(#[from] ClientError),
#[error("protocol: {0}")]
Protocol(String),
}
#[derive(Debug, Clone)]
pub struct KafkaError {
pub code: i16,
pub name: &'static str,
pub message: Option<String>,
}
pub struct AdminClient {
pub(crate) conn: Connection,
security: Option<crabka_client_core::security::ClientSecurity>,
}
impl AdminClient {
fn opts(security: Option<crabka_client_core::security::ClientSecurity>) -> ConnectionOptions {
ConnectionOptions {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(30),
client_id: "crabka-operator".to_string(),
security: security.map(Box::new),
}
}
pub async fn connect_secured(
bootstrap_addrs: &[String],
security: Option<crabka_client_core::security::ClientSecurity>,
) -> Result<Self, AdminError> {
let opts = Self::opts(security.clone());
for host_port in bootstrap_addrs {
match Self::connect_one(host_port, opts.clone()).await {
Ok(conn) => return Ok(Self { conn, security }),
Err(e) => {
tracing::debug!(
target: "crabka_client_admin",
addr = %host_port,
error = %e,
"bootstrap connect failed",
);
}
}
}
Err(AdminError::Connect {
tried: bootstrap_addrs.len(),
})
}
pub async fn connect(bootstrap_addrs: &[String]) -> Result<Self, AdminError> {
Self::connect_secured(bootstrap_addrs, None).await
}
async fn connect_one(
host_port: &str,
opts: ConnectionOptions,
) -> Result<Connection, AdminError> {
let mut addrs = tokio::net::lookup_host(host_port)
.await
.map_err(|e| AdminError::Protocol(format!("DNS lookup {host_port}: {e}")))?;
let addr = addrs
.next()
.ok_or_else(|| AdminError::Protocol(format!("no addresses for {host_port}")))?;
Connection::connect_with_options(addr, opts)
.await
.map_err(AdminError::from)
}
pub(crate) async fn reconnect(&mut self, host_port: &str) -> Result<(), AdminError> {
let opts = Self::opts(self.security.clone());
self.conn = Self::connect_one(host_port, opts).await?;
Ok(())
}
}
pub(crate) const NOT_CONTROLLER: i16 = 41;
pub(crate) fn kafka_error_name(code: i16) -> &'static str {
match code {
0 => "NONE",
3 => "UNKNOWN_TOPIC_OR_PARTITION",
7 => "REQUEST_TIMED_OUT",
17 => "INVALID_TOPIC_EXCEPTION",
19 => "NOT_ENOUGH_REPLICAS",
36 => "TOPIC_ALREADY_EXISTS",
37 => "INVALID_PARTITIONS",
38 => "INVALID_REPLICATION_FACTOR",
39 => "INVALID_REPLICA_ASSIGNMENT",
40 => "INVALID_CONFIG",
41 => "NOT_CONTROLLER",
87 => "REASSIGNMENT_IN_PROGRESS",
_ => "UNKNOWN",
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn kafka_error_name_known_codes() {
assert!(kafka_error_name(0) == "NONE");
assert!(kafka_error_name(36) == "TOPIC_ALREADY_EXISTS");
assert!(kafka_error_name(41) == "NOT_CONTROLLER");
}
#[test]
fn kafka_error_name_unknown_returns_unknown() {
assert!(kafka_error_name(9999) == "UNKNOWN");
}
#[tokio::test]
async fn connect_secured_threads_security_and_fails_to_closed_port() {
use crabka_client_core::security::{ClientSecurity, SaslCredentials};
use crabka_security::ListenerProtocol;
let security = ClientSecurity {
protocol: ListenerProtocol::SaslPlaintext,
tls: None,
sasl: Some(SaslCredentials::Plain {
username: "u".into(),
password: "p".into(),
}),
sasl_host: None,
};
let res = AdminClient::connect_secured(&["127.0.0.1:1".to_string()], Some(security)).await;
assert!(res.is_err(), "connect to closed port must fail");
}
}