1use std::time::Duration;
9
10use crabka_client_core::{ClientError, Connection, ConnectionOptions};
11use thiserror::Error;
12
13pub mod configs;
14pub mod delegation_tokens;
15pub mod log_dirs;
16pub mod quotas;
17pub mod topics;
18pub mod users;
19
20pub use configs::{AlterConfigsOutcome, IncrementalAlterOp, TopicConfigOverrides};
21pub use log_dirs::{AlterReplicaLogDirOutcome, LogDirInfo, LogDirPartitionInfo, LogDirTopicInfo};
22pub use quotas::{QuotaOp, UserQuotaConfig, diff_user_quotas};
23pub use topics::{
24 CreatePartitionsOp, CreatePartitionsOutcome, CreateTopicOutcome, CreateTopicSpec,
25 DeleteTopicOutcome, TopicMetadata, TopicMetadataEntry,
26};
27pub use users::{
28 AclEntry, AclEntryFilter, AclOperation, CreateAclOutcome, DEFAULT_SCRAM_ITERATIONS,
29 DeleteAclFilterOutcome, PatternType, PermissionType, ResourceType, ScramDeletion,
30 ScramUpsertion, ScramUserOutcome,
31};
32
/// Async trait mirroring the admin operations of [`AdminClient`].
///
/// The implementation for [`AdminClient`] in this file forwards each method to
/// the inherent method of the same name. Presumably the trait exists so callers
/// can substitute another implementation (e.g. a test double) — confirm against
/// the call sites.
///
/// Every method takes `&mut self` and reports failures as [`AdminError`].
#[async_trait::async_trait]
pub trait AdminClientLike: Send {
    /// Returns [`TopicMetadata`] for the requested `topics`.
    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError>;
    /// Creates the topics described by `specs`, returning one
    /// [`CreateTopicOutcome`] list for the request.
    ///
    /// `timeout_ms` is passed through unchanged; its exact meaning is defined
    /// by the `topics` module.
    async fn create_topics(
        &mut self,
        specs: &[CreateTopicSpec],
        timeout_ms: i32,
    ) -> Result<Vec<CreateTopicOutcome>, AdminError>;
    /// Deletes the topics named in `names`; `timeout_ms` is passed through
    /// unchanged.
    async fn delete_topics(
        &mut self,
        names: &[&str],
        timeout_ms: i32,
    ) -> Result<Vec<DeleteTopicOutcome>, AdminError>;
    /// Applies the partition-count changes in `ops`; `timeout_ms` is passed
    /// through unchanged.
    async fn create_partitions(
        &mut self,
        ops: &[CreatePartitionsOp],
        timeout_ms: i32,
    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError>;
    /// Describes configuration for `topics`, returned as
    /// [`TopicConfigOverrides`] values.
    async fn describe_configs(
        &mut self,
        topics: &[&str],
    ) -> Result<Vec<TopicConfigOverrides>, AdminError>;
    /// Applies the incremental configuration changes in `ops`.
    async fn incremental_alter_configs(
        &mut self,
        ops: &[IncrementalAlterOp],
    ) -> Result<Vec<AlterConfigsOutcome>, AdminError>;
    /// Applies SCRAM credential `upsertions` and `deletions`.
    ///
    /// NOTE(review): the `sha512` suffix presumably selects the SCRAM-SHA-512
    /// mechanism — confirm in the `users` module.
    async fn alter_user_scram_credentials_sha512(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
    /// Applies SCRAM credential `upsertions` and `deletions`.
    ///
    /// NOTE(review): the `sha256` suffix presumably selects the SCRAM-SHA-256
    /// mechanism — confirm in the `users` module.
    async fn alter_user_scram_credentials_sha256(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
    /// Returns the ACL entries selected by `filter`.
    async fn describe_acls(&mut self, filter: &AclEntryFilter)
    -> Result<Vec<AclEntry>, AdminError>;
    /// Creates the ACL entries in `creations`.
    async fn create_acls(
        &mut self,
        creations: &[AclEntry],
    ) -> Result<Vec<CreateAclOutcome>, AdminError>;
    /// Deletes ACL entries selected by each filter in `filters`.
    async fn delete_acls(
        &mut self,
        filters: &[AclEntryFilter],
    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError>;
    /// Returns the quota configuration for `username`.
    async fn describe_user_quotas(&mut self, username: &str)
    -> Result<UserQuotaConfig, AdminError>;
    /// Applies the quota operations in `ops` for `username`.
    ///
    /// NOTE(review): `Ok(Some(_))` presumably carries a broker-side rejection
    /// and `validate_only` presumably requests a dry run — confirm in the
    /// `quotas` module.
    async fn alter_user_quotas(
        &mut self,
        username: &str,
        ops: &[QuotaOp],
        validate_only: bool,
    ) -> Result<Option<KafkaError>, AdminError>;

    /// Creates a delegation token for `owner_principal_name` with the given
    /// `renewers` and `max_lifetime_ms`, returning it as a
    /// [`crabka_metadata::DelegationToken`].
    async fn create_delegation_token_as_owner(
        &mut self,
        owner_principal_name: &str,
        renewers: &[String],
        max_lifetime_ms: i64,
    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
    /// Renews the delegation token identified by `hmac` and returns the token.
    async fn renew_delegation_token(
        &mut self,
        hmac: &[u8],
    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
    /// Expires the delegation token identified by `hmac`.
    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError>;
    /// Returns the delegation tokens associated with `owner_principal`.
    async fn describe_delegation_tokens_owned_by(
        &mut self,
        owner_principal: &str,
    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError>;
}
123
124#[async_trait::async_trait]
125impl AdminClientLike for AdminClient {
126 async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
127 AdminClient::metadata(self, topics).await
128 }
129 async fn create_topics(
130 &mut self,
131 specs: &[CreateTopicSpec],
132 timeout_ms: i32,
133 ) -> Result<Vec<CreateTopicOutcome>, AdminError> {
134 AdminClient::create_topics(self, specs, timeout_ms).await
135 }
136 async fn delete_topics(
137 &mut self,
138 names: &[&str],
139 timeout_ms: i32,
140 ) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
141 AdminClient::delete_topics(self, names, timeout_ms).await
142 }
143 async fn create_partitions(
144 &mut self,
145 ops: &[CreatePartitionsOp],
146 timeout_ms: i32,
147 ) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
148 AdminClient::create_partitions(self, ops, timeout_ms).await
149 }
150 async fn describe_configs(
151 &mut self,
152 topics: &[&str],
153 ) -> Result<Vec<TopicConfigOverrides>, AdminError> {
154 AdminClient::describe_configs(self, topics).await
155 }
156 async fn incremental_alter_configs(
157 &mut self,
158 ops: &[IncrementalAlterOp],
159 ) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
160 AdminClient::incremental_alter_configs(self, ops).await
161 }
162 async fn alter_user_scram_credentials_sha512(
163 &mut self,
164 upsertions: &[ScramUpsertion],
165 deletions: &[ScramDeletion],
166 ) -> Result<Vec<ScramUserOutcome>, AdminError> {
167 AdminClient::alter_user_scram_credentials_sha512(self, upsertions, deletions).await
168 }
169 async fn alter_user_scram_credentials_sha256(
170 &mut self,
171 upsertions: &[ScramUpsertion],
172 deletions: &[ScramDeletion],
173 ) -> Result<Vec<ScramUserOutcome>, AdminError> {
174 AdminClient::alter_user_scram_credentials_sha256(self, upsertions, deletions).await
175 }
176 async fn describe_acls(
177 &mut self,
178 filter: &AclEntryFilter,
179 ) -> Result<Vec<AclEntry>, AdminError> {
180 AdminClient::describe_acls(self, filter).await
181 }
182 async fn create_acls(
183 &mut self,
184 creations: &[AclEntry],
185 ) -> Result<Vec<CreateAclOutcome>, AdminError> {
186 AdminClient::create_acls(self, creations).await
187 }
188 async fn delete_acls(
189 &mut self,
190 filters: &[AclEntryFilter],
191 ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
192 AdminClient::delete_acls(self, filters).await
193 }
194 async fn describe_user_quotas(
195 &mut self,
196 username: &str,
197 ) -> Result<UserQuotaConfig, AdminError> {
198 AdminClient::describe_user_quotas(self, username).await
199 }
200 async fn alter_user_quotas(
201 &mut self,
202 username: &str,
203 ops: &[QuotaOp],
204 validate_only: bool,
205 ) -> Result<Option<KafkaError>, AdminError> {
206 AdminClient::alter_user_quotas(self, username, ops, validate_only).await
207 }
208
209 async fn create_delegation_token_as_owner(
218 &mut self,
219 owner_principal_name: &str,
220 renewers: &[String],
221 max_lifetime_ms: i64,
222 ) -> Result<crabka_metadata::DelegationToken, AdminError> {
223 let resp = AdminClient::create_delegation_token_as_owner(
229 self,
230 owner_principal_name,
231 renewers,
232 max_lifetime_ms,
233 )
234 .await?;
235 let renewers_image = renewers
236 .iter()
237 .filter_map(|s| renewer_str_to_principal(s))
238 .collect();
239 Ok(crabka_metadata::DelegationToken {
240 token_id: resp.token_id,
241 owner: crabka_security::KafkaPrincipal {
242 principal_type: resp.principal_type,
243 name: resp.principal_name,
244 },
245 hmac: resp.hmac.to_vec(),
246 issue_timestamp_ms: resp.issue_timestamp_ms,
247 expiry_timestamp_ms: resp.expiry_timestamp_ms,
248 max_timestamp_ms: resp.max_timestamp_ms,
249 renewers: renewers_image,
250 })
251 }
252
253 async fn renew_delegation_token(
254 &mut self,
255 hmac: &[u8],
256 ) -> Result<crabka_metadata::DelegationToken, AdminError> {
257 let _new_expiry = AdminClient::renew_delegation_token(self, hmac).await?;
267 let req = crabka_protocol::owned::describe_delegation_token_request::DescribeDelegationTokenRequest::default();
268 let resp = self.conn.send(req).await?;
269 if resp.error_code != 0 {
270 return Err(AdminError::Broker {
271 api: "DescribeDelegationToken",
272 code: resp.error_code,
273 name: kafka_error_name(resp.error_code),
274 message: None,
275 });
276 }
277 let matched = resp
278 .tokens
279 .into_iter()
280 .find(|t| t.hmac.as_ref() == hmac)
281 .ok_or_else(|| {
282 AdminError::Protocol(
283 "RenewDelegationToken: follow-up describe did not return the renewed token"
284 .into(),
285 )
286 })?;
287 Ok(crabka_metadata::DelegationToken {
288 token_id: matched.token_id,
289 owner: crabka_security::KafkaPrincipal {
290 principal_type: matched.principal_type,
291 name: matched.principal_name,
292 },
293 hmac: matched.hmac.to_vec(),
294 issue_timestamp_ms: matched.issue_timestamp,
295 expiry_timestamp_ms: matched.expiry_timestamp,
296 max_timestamp_ms: matched.max_timestamp,
297 renewers: matched
298 .renewers
299 .into_iter()
300 .map(|r| crabka_security::KafkaPrincipal {
301 principal_type: r.principal_type,
302 name: r.principal_name,
303 })
304 .collect(),
305 })
306 }
307
308 async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
309 AdminClient::expire_delegation_token(self, hmac).await
310 }
311
312 async fn describe_delegation_tokens_owned_by(
313 &mut self,
314 owner_principal: &str,
315 ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError> {
316 AdminClient::describe_delegation_tokens_owned_by(self, owner_principal).await
317 }
318}
319
320fn renewer_str_to_principal(s: &str) -> Option<crabka_security::KafkaPrincipal> {
324 if s.is_empty() {
325 return None;
326 }
327 let (pt, pn) = s.split_once(':').unwrap_or(("User", s));
328 Some(crabka_security::KafkaPrincipal {
329 principal_type: pt.to_string(),
330 name: pn.to_string(),
331 })
332}
333
/// Errors returned by the admin client.
#[derive(Debug, Error)]
pub enum AdminError {
    /// None of the bootstrap addresses could be connected to; `tried` is the
    /// number of addresses that were attempted.
    #[error("no bootstrap address was reachable: tried {tried}")]
    Connect { tried: usize },
    /// Controller routing still failed after a retry.
    // NOTE(review): not constructed in this part of the file; presumably
    // raised by the submodules when a NOT_CONTROLLER response persists — confirm.
    #[error("controller routing failed after retry")]
    NotControllerExhausted,
    /// The broker answered with a non-zero error code.
    ///
    /// `api` names the request, `code` is the raw error code, `name` is its
    /// symbolic name, and `message` is an optional detail string that is
    /// appended (debug-quoted) to the rendered error when present.
    #[error("broker returned error: api={api} code={code} ({name}){detail}",
        detail = .message.as_deref().map(|m| format!(" {m:?}")).unwrap_or_default())]
    Broker {
        api: &'static str,
        code: i16,
        name: &'static str,
        message: Option<String>,
    },
    /// An error from `crabka_client_core`, converted automatically via `?`.
    #[error("client-core: {0}")]
    Transport(#[from] ClientError),
    /// A free-form failure described by its message; in this file it is used
    /// for DNS lookup failures and for a renewed token missing from the
    /// follow-up describe response.
    #[error("protocol: {0}")]
    Protocol(String),
}
353
/// A broker-reported error as plain data: raw code, symbolic name, and an
/// optional message.
///
/// Appears as the `Ok(Some(_))` payload of `alter_user_quotas`.
#[derive(Debug, Clone)]
pub struct KafkaError {
    /// Raw numeric error code.
    pub code: i16,
    /// Symbolic name for `code`.
    pub name: &'static str,
    /// Optional detail message.
    pub message: Option<String>,
}
361
/// Admin client holding a single broker [`Connection`].
pub struct AdminClient {
    /// The current connection; replaced wholesale by `reconnect`.
    pub(crate) conn: Connection,
    /// Security settings supplied at connect time, kept so `reconnect` can
    /// build connection options with the same settings.
    security: Option<crabka_client_core::security::ClientSecurity>,
}
370
371impl AdminClient {
372 fn opts(security: Option<crabka_client_core::security::ClientSecurity>) -> ConnectionOptions {
375 ConnectionOptions {
376 connect_timeout: Duration::from_secs(5),
377 request_timeout: Duration::from_secs(30),
378 client_id: "crabka-operator".to_string(),
379 security: security.map(Box::new),
380 }
381 }
382
383 pub async fn connect_secured(
390 bootstrap_addrs: &[String],
391 security: Option<crabka_client_core::security::ClientSecurity>,
392 ) -> Result<Self, AdminError> {
393 let opts = Self::opts(security.clone());
394 for host_port in bootstrap_addrs {
395 match Self::connect_one(host_port, opts.clone()).await {
396 Ok(conn) => return Ok(Self { conn, security }),
397 Err(e) => {
398 tracing::debug!(
399 target: "crabka_client_admin",
400 addr = %host_port,
401 error = %e,
402 "bootstrap connect failed",
403 );
404 }
405 }
406 }
407 Err(AdminError::Connect {
408 tried: bootstrap_addrs.len(),
409 })
410 }
411
412 pub async fn connect(bootstrap_addrs: &[String]) -> Result<Self, AdminError> {
417 Self::connect_secured(bootstrap_addrs, None).await
418 }
419
420 async fn connect_one(
421 host_port: &str,
422 opts: ConnectionOptions,
423 ) -> Result<Connection, AdminError> {
424 let mut addrs = tokio::net::lookup_host(host_port)
425 .await
426 .map_err(|e| AdminError::Protocol(format!("DNS lookup {host_port}: {e}")))?;
427 let addr = addrs
428 .next()
429 .ok_or_else(|| AdminError::Protocol(format!("no addresses for {host_port}")))?;
430 Connection::connect_with_options(addr, opts)
431 .await
432 .map_err(AdminError::from)
433 }
434
435 pub(crate) async fn reconnect(&mut self, host_port: &str) -> Result<(), AdminError> {
438 let opts = Self::opts(self.security.clone());
439 self.conn = Self::connect_one(host_port, opts).await?;
440 Ok(())
441 }
442}
443
/// Error code that `kafka_error_name` maps to `"NOT_CONTROLLER"`.
// NOTE(review): presumably compared against response codes by the submodules
// to trigger controller re-routing — confirm at the use sites.
pub(crate) const NOT_CONTROLLER: i16 = 41;
447
/// Maps a Kafka protocol error code to its symbolic name.
///
/// Only the codes this crate expects to encounter are listed; anything else
/// (including negative codes) yields `"UNKNOWN"`. Names and numbers follow the
/// error-code table of the Apache Kafka protocol.
pub(crate) fn kafka_error_name(code: i16) -> &'static str {
    match code {
        0 => "NONE",
        3 => "UNKNOWN_TOPIC_OR_PARTITION",
        7 => "REQUEST_TIMED_OUT",
        17 => "INVALID_TOPIC_EXCEPTION",
        19 => "NOT_ENOUGH_REPLICAS",
        36 => "TOPIC_ALREADY_EXISTS",
        37 => "INVALID_PARTITIONS",
        38 => "INVALID_REPLICATION_FACTOR",
        39 => "INVALID_REPLICA_ASSIGNMENT",
        40 => "INVALID_CONFIG",
        41 => "NOT_CONTROLLER",
        // REASSIGNMENT_IN_PROGRESS is code 60 in the Kafka protocol, not 87.
        60 => "REASSIGNMENT_IN_PROGRESS",
        // Delegation-token errors, reported by the delegation-token APIs.
        61 => "DELEGATION_TOKEN_AUTH_DISABLED",
        62 => "DELEGATION_TOKEN_NOT_FOUND",
        63 => "DELEGATION_TOKEN_OWNER_MISMATCH",
        64 => "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED",
        65 => "DELEGATION_TOKEN_AUTHORIZATION_FAILED",
        66 => "DELEGATION_TOKEN_EXPIRED",
        87 => "INVALID_RECORD",
        _ => "UNKNOWN",
    }
}
468
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// A few codes with dedicated names resolve to those names.
    #[test]
    fn kafka_error_name_known_codes() {
        let expectations: [(i16, &str); 3] = [
            (0, "NONE"),
            (36, "TOPIC_ALREADY_EXISTS"),
            (41, "NOT_CONTROLLER"),
        ];
        for (code, expected) in expectations.iter().copied() {
            assert!(kafka_error_name(code) == expected);
        }
    }

    /// A code without a dedicated name falls back to `"UNKNOWN"`.
    #[test]
    fn kafka_error_name_unknown_returns_unknown() {
        let name = kafka_error_name(9999);
        assert!(name == "UNKNOWN");
    }

    /// `connect_secured` accepts security settings and returns an error when
    /// the only bootstrap address is a closed local port.
    #[tokio::test]
    async fn connect_secured_threads_security_and_fails_to_closed_port() {
        use crabka_client_core::security::{ClientSecurity, SaslCredentials};
        use crabka_security::ListenerProtocol;

        let sasl = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let creds = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(sasl),
            sasl_host: None,
        };
        let bootstrap = ["127.0.0.1:1".to_string()];

        let outcome = AdminClient::connect_secured(&bootstrap, Some(creds)).await;
        assert!(outcome.is_err(), "connect to closed port must fail");
    }
}