1use std::time::Duration;
15
16use crabka_client_core::{ClientError, Connection, ConnectionOptions};
17use thiserror::Error;
18
19pub mod configs;
20pub mod delegation_tokens;
21pub mod log_dirs;
22pub mod quotas;
23pub mod topics;
24pub mod users;
25
26pub use configs::{AlterConfigsOutcome, IncrementalAlterOp, TopicConfigOverrides};
27pub use log_dirs::{AlterReplicaLogDirOutcome, LogDirInfo, LogDirPartitionInfo, LogDirTopicInfo};
28pub use quotas::{QuotaOp, UserQuotaConfig, diff_user_quotas};
29pub use topics::{
30 CreatePartitionsOp, CreatePartitionsOutcome, CreateTopicOutcome, CreateTopicSpec,
31 DeleteTopicOutcome, TopicMetadata, TopicMetadataEntry,
32};
33pub use users::{
34 AclEntry, AclEntryFilter, AclOperation, CreateAclOutcome, DEFAULT_SCRAM_ITERATIONS,
35 DeleteAclFilterOutcome, PatternType, PermissionType, ResourceType, ScramDeletion,
36 ScramUpsertion, ScramUserOutcome,
37};
38
/// Object-safe-style abstraction over the admin operations exposed by [`AdminClient`].
///
/// The production implementation in this file forwards each method to the
/// inherent `AdminClient` method of the same name (the delegation-token
/// methods additionally reshape the response into a
/// `crabka_metadata::DelegationToken`).
///
/// NOTE(review): presumably exists so callers can substitute a test double
/// for the real client — confirm against the crate's consumers.
///
/// Every method takes `&mut self` and reports failures as [`AdminError`].
#[async_trait::async_trait]
pub trait AdminClientLike: Send {
    /// Looks up metadata for the named `topics`.
    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError>;
    /// Submits one creation request per entry in `specs`; yields a
    /// `CreateTopicOutcome` list.
    ///
    /// NOTE(review): `timeout_ms` is presumably the broker-side operation
    /// timeout in milliseconds — confirm in the `topics` module.
    async fn create_topics(
        &mut self,
        specs: &[CreateTopicSpec],
        timeout_ms: i32,
    ) -> Result<Vec<CreateTopicOutcome>, AdminError>;
    /// Requests deletion of the topics in `names`; yields a
    /// `DeleteTopicOutcome` list.
    async fn delete_topics(
        &mut self,
        names: &[&str],
        timeout_ms: i32,
    ) -> Result<Vec<DeleteTopicOutcome>, AdminError>;
    /// Applies the partition-count changes described by `ops`.
    async fn create_partitions(
        &mut self,
        ops: &[CreatePartitionsOp],
        timeout_ms: i32,
    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError>;
    /// Describes configuration for the named `topics`, returned as
    /// `TopicConfigOverrides` values.
    async fn describe_configs(
        &mut self,
        topics: &[&str],
    ) -> Result<Vec<TopicConfigOverrides>, AdminError>;
    /// Applies the incremental configuration operations in `ops`.
    async fn incremental_alter_configs(
        &mut self,
        ops: &[IncrementalAlterOp],
    ) -> Result<Vec<AlterConfigsOutcome>, AdminError>;
    /// Upserts and/or deletes SCRAM credentials (SHA-512 variant).
    async fn alter_user_scram_credentials_sha512(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
    /// Upserts and/or deletes SCRAM credentials (SHA-256 variant).
    async fn alter_user_scram_credentials_sha256(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
    /// Lists the ACL entries selected by `filter`.
    async fn describe_acls(&mut self, filter: &AclEntryFilter)
    -> Result<Vec<AclEntry>, AdminError>;
    /// Creates the ACL entries in `creations`.
    async fn create_acls(
        &mut self,
        creations: &[AclEntry],
    ) -> Result<Vec<CreateAclOutcome>, AdminError>;
    /// Deletes ACL entries selected by `filters`; yields a
    /// `DeleteAclFilterOutcome` list.
    async fn delete_acls(
        &mut self,
        filters: &[AclEntryFilter],
    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError>;
    /// Reads the quota configuration for `username`.
    async fn describe_user_quotas(&mut self, username: &str)
    -> Result<UserQuotaConfig, AdminError>;
    /// Applies quota operations `ops` for `username`.
    ///
    /// NOTE(review): `Ok(Some(_))` presumably carries a broker-side rejection
    /// while `Err(_)` is a transport/protocol failure, and `validate_only`
    /// presumably requests a dry run — confirm in the `quotas` module.
    async fn alter_user_quotas(
        &mut self,
        username: &str,
        ops: &[QuotaOp],
        validate_only: bool,
    ) -> Result<Option<KafkaError>, AdminError>;

    /// Creates a delegation token for `owner_principal_name` and returns it as
    /// a `crabka_metadata::DelegationToken`.
    ///
    /// In the [`AdminClient`] implementation, the returned token's `renewers`
    /// list is rebuilt from the `renewers` strings passed in, not read from
    /// the broker response.
    async fn create_delegation_token_as_owner(
        &mut self,
        owner_principal_name: &str,
        renewers: &[String],
        max_lifetime_ms: i64,
    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
    /// Renews the token identified by `hmac` and returns its current state.
    ///
    /// The [`AdminClient`] implementation follows the renew call with a
    /// describe request and returns the entry whose HMAC matches.
    async fn renew_delegation_token(
        &mut self,
        hmac: &[u8],
    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
    /// Expires the token identified by `hmac`.
    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError>;
    /// Lists delegation tokens for `owner_principal`.
    async fn describe_delegation_tokens_owned_by(
        &mut self,
        owner_principal: &str,
    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError>;
}
129
130#[async_trait::async_trait]
131impl AdminClientLike for AdminClient {
132 async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
133 AdminClient::metadata(self, topics).await
134 }
135 async fn create_topics(
136 &mut self,
137 specs: &[CreateTopicSpec],
138 timeout_ms: i32,
139 ) -> Result<Vec<CreateTopicOutcome>, AdminError> {
140 AdminClient::create_topics(self, specs, timeout_ms).await
141 }
142 async fn delete_topics(
143 &mut self,
144 names: &[&str],
145 timeout_ms: i32,
146 ) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
147 AdminClient::delete_topics(self, names, timeout_ms).await
148 }
149 async fn create_partitions(
150 &mut self,
151 ops: &[CreatePartitionsOp],
152 timeout_ms: i32,
153 ) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
154 AdminClient::create_partitions(self, ops, timeout_ms).await
155 }
156 async fn describe_configs(
157 &mut self,
158 topics: &[&str],
159 ) -> Result<Vec<TopicConfigOverrides>, AdminError> {
160 AdminClient::describe_configs(self, topics).await
161 }
162 async fn incremental_alter_configs(
163 &mut self,
164 ops: &[IncrementalAlterOp],
165 ) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
166 AdminClient::incremental_alter_configs(self, ops).await
167 }
168 async fn alter_user_scram_credentials_sha512(
169 &mut self,
170 upsertions: &[ScramUpsertion],
171 deletions: &[ScramDeletion],
172 ) -> Result<Vec<ScramUserOutcome>, AdminError> {
173 AdminClient::alter_user_scram_credentials_sha512(self, upsertions, deletions).await
174 }
175 async fn alter_user_scram_credentials_sha256(
176 &mut self,
177 upsertions: &[ScramUpsertion],
178 deletions: &[ScramDeletion],
179 ) -> Result<Vec<ScramUserOutcome>, AdminError> {
180 AdminClient::alter_user_scram_credentials_sha256(self, upsertions, deletions).await
181 }
182 async fn describe_acls(
183 &mut self,
184 filter: &AclEntryFilter,
185 ) -> Result<Vec<AclEntry>, AdminError> {
186 AdminClient::describe_acls(self, filter).await
187 }
188 async fn create_acls(
189 &mut self,
190 creations: &[AclEntry],
191 ) -> Result<Vec<CreateAclOutcome>, AdminError> {
192 AdminClient::create_acls(self, creations).await
193 }
194 async fn delete_acls(
195 &mut self,
196 filters: &[AclEntryFilter],
197 ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
198 AdminClient::delete_acls(self, filters).await
199 }
200 async fn describe_user_quotas(
201 &mut self,
202 username: &str,
203 ) -> Result<UserQuotaConfig, AdminError> {
204 AdminClient::describe_user_quotas(self, username).await
205 }
206 async fn alter_user_quotas(
207 &mut self,
208 username: &str,
209 ops: &[QuotaOp],
210 validate_only: bool,
211 ) -> Result<Option<KafkaError>, AdminError> {
212 AdminClient::alter_user_quotas(self, username, ops, validate_only).await
213 }
214
215 async fn create_delegation_token_as_owner(
224 &mut self,
225 owner_principal_name: &str,
226 renewers: &[String],
227 max_lifetime_ms: i64,
228 ) -> Result<crabka_metadata::DelegationToken, AdminError> {
229 let resp = AdminClient::create_delegation_token_as_owner(
235 self,
236 owner_principal_name,
237 renewers,
238 max_lifetime_ms,
239 )
240 .await?;
241 let renewers_image = renewers
242 .iter()
243 .filter_map(|s| renewer_str_to_principal(s))
244 .collect();
245 Ok(crabka_metadata::DelegationToken {
246 token_id: resp.token_id,
247 owner: crabka_security::KafkaPrincipal {
248 principal_type: resp.principal_type,
249 name: resp.principal_name,
250 },
251 hmac: resp.hmac.to_vec(),
252 issue_timestamp_ms: resp.issue_timestamp_ms,
253 expiry_timestamp_ms: resp.expiry_timestamp_ms,
254 max_timestamp_ms: resp.max_timestamp_ms,
255 renewers: renewers_image,
256 })
257 }
258
259 async fn renew_delegation_token(
260 &mut self,
261 hmac: &[u8],
262 ) -> Result<crabka_metadata::DelegationToken, AdminError> {
263 let _new_expiry = AdminClient::renew_delegation_token(self, hmac).await?;
273 let req = crabka_protocol::owned::describe_delegation_token_request::DescribeDelegationTokenRequest::default();
274 let resp = self.conn.send(req).await?;
275 if resp.error_code != 0 {
276 return Err(AdminError::Broker {
277 api: "DescribeDelegationToken",
278 code: resp.error_code,
279 name: kafka_error_name(resp.error_code),
280 message: None,
281 });
282 }
283 let matched = resp
284 .tokens
285 .into_iter()
286 .find(|t| t.hmac.as_ref() == hmac)
287 .ok_or_else(|| {
288 AdminError::Protocol(
289 "RenewDelegationToken: follow-up describe did not return the renewed token"
290 .into(),
291 )
292 })?;
293 Ok(crabka_metadata::DelegationToken {
294 token_id: matched.token_id,
295 owner: crabka_security::KafkaPrincipal {
296 principal_type: matched.principal_type,
297 name: matched.principal_name,
298 },
299 hmac: matched.hmac.to_vec(),
300 issue_timestamp_ms: matched.issue_timestamp,
301 expiry_timestamp_ms: matched.expiry_timestamp,
302 max_timestamp_ms: matched.max_timestamp,
303 renewers: matched
304 .renewers
305 .into_iter()
306 .map(|r| crabka_security::KafkaPrincipal {
307 principal_type: r.principal_type,
308 name: r.principal_name,
309 })
310 .collect(),
311 })
312 }
313
314 async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
315 AdminClient::expire_delegation_token(self, hmac).await
316 }
317
318 async fn describe_delegation_tokens_owned_by(
319 &mut self,
320 owner_principal: &str,
321 ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError> {
322 AdminClient::describe_delegation_tokens_owned_by(self, owner_principal).await
323 }
324}
325
326fn renewer_str_to_principal(s: &str) -> Option<crabka_security::KafkaPrincipal> {
330 if s.is_empty() {
331 return None;
332 }
333 let (pt, pn) = s.split_once(':').unwrap_or(("User", s));
334 Some(crabka_security::KafkaPrincipal {
335 principal_type: pt.to_string(),
336 name: pn.to_string(),
337 })
338}
339
/// Errors returned by [`AdminClient`] and [`AdminClientLike`] operations.
#[derive(Debug, Error)]
pub enum AdminError {
    /// None of the bootstrap addresses could be connected to. `tried` is the
    /// number of addresses that were supplied (and therefore attempted).
    #[error("no bootstrap address was reachable: tried {tried}")]
    Connect { tried: usize },
    /// NOTE(review): not constructed in this file; presumably raised by the
    /// controller-routing retry path elsewhere in the crate — confirm.
    #[error("controller routing failed after retry")]
    NotControllerExhausted,
    /// A broker response carried a non-zero error code.
    ///
    /// `api` names the request kind, `code` is the raw error code, `name` is
    /// its symbolic name, and `message` is an optional detail string. When
    /// `message` is present, `Display` appends it debug-quoted after a space;
    /// when absent, nothing is appended.
    #[error("broker returned error: api={api} code={code} ({name}){detail}",
        detail = .message.as_deref().map(|m| format!(" {m:?}")).unwrap_or_default())]
    Broker {
        api: &'static str,
        code: i16,
        name: &'static str,
        message: Option<String>,
    },
    /// A failure reported by `crabka_client_core`; converted automatically
    /// from `ClientError` via `?`.
    #[error("client-core: {0}")]
    Transport(#[from] ClientError),
    /// A free-form failure description. In this file it is used for DNS
    /// resolution problems and for a describe response that lacks the
    /// expected delegation token.
    #[error("protocol: {0}")]
    Protocol(String),
}
359
/// A broker error described by its numeric code, symbolic name, and an
/// optional message.
///
/// Appears as the `Ok(Some(_))` payload of `alter_user_quotas`.
/// NOTE(review): presumably used where a broker-side rejection should be
/// reported as data rather than as an [`AdminError`] — confirm in the
/// `quotas` module.
#[derive(Debug, Clone)]
pub struct KafkaError {
    /// Raw Kafka error code.
    pub code: i16,
    /// Symbolic name for `code`.
    pub name: &'static str,
    /// Optional detail text accompanying the error.
    pub message: Option<String>,
}
367
/// Admin client holding a single broker connection.
pub struct AdminClient {
    /// The active connection; replaced wholesale by `reconnect`.
    pub(crate) conn: Connection,
    /// Security settings supplied at connect time, kept so `reconnect` can
    /// build connection options with the same settings.
    security: Option<crabka_client_core::security::ClientSecurity>,
}
376
377impl AdminClient {
378 fn opts(security: Option<crabka_client_core::security::ClientSecurity>) -> ConnectionOptions {
381 ConnectionOptions {
382 connect_timeout: Duration::from_secs(5),
383 request_timeout: Duration::from_secs(30),
384 client_id: "crabka-operator".to_string(),
385 security: security.map(Box::new),
386 }
387 }
388
389 pub async fn connect_secured(
396 bootstrap_addrs: &[String],
397 security: Option<crabka_client_core::security::ClientSecurity>,
398 ) -> Result<Self, AdminError> {
399 let opts = Self::opts(security.clone());
400 for host_port in bootstrap_addrs {
401 match Self::connect_one(host_port, opts.clone()).await {
402 Ok(conn) => return Ok(Self { conn, security }),
403 Err(e) => {
404 tracing::debug!(
405 target: "crabka_client_admin",
406 addr = %host_port,
407 error = %e,
408 "bootstrap connect failed",
409 );
410 }
411 }
412 }
413 Err(AdminError::Connect {
414 tried: bootstrap_addrs.len(),
415 })
416 }
417
418 pub async fn connect(bootstrap_addrs: &[String]) -> Result<Self, AdminError> {
423 Self::connect_secured(bootstrap_addrs, None).await
424 }
425
426 async fn connect_one(
427 host_port: &str,
428 opts: ConnectionOptions,
429 ) -> Result<Connection, AdminError> {
430 let mut addrs = tokio::net::lookup_host(host_port)
431 .await
432 .map_err(|e| AdminError::Protocol(format!("DNS lookup {host_port}: {e}")))?;
433 let addr = addrs
434 .next()
435 .ok_or_else(|| AdminError::Protocol(format!("no addresses for {host_port}")))?;
436 Connection::connect_with_options(addr, opts)
437 .await
438 .map_err(AdminError::from)
439 }
440
441 pub(crate) async fn reconnect(&mut self, host_port: &str) -> Result<(), AdminError> {
444 let opts = Self::opts(self.security.clone());
445 self.conn = Self::connect_one(host_port, opts).await?;
446 Ok(())
447 }
448}
449
/// Kafka error code `NOT_CONTROLLER` (41), matching the entry in
/// `kafka_error_name`.
///
/// NOTE(review): not referenced in this file; presumably consumed by the
/// controller-routing retry logic in the submodules — confirm.
pub(crate) const NOT_CONTROLLER: i16 = 41;
453
/// Returns the symbolic Kafka protocol name for a numeric error `code`.
///
/// Only a subset of codes is listed; any code not in the table yields
/// `"UNKNOWN"`. The result is intended for diagnostics (it is what
/// `AdminError::Broker` carries in its `name` field).
pub(crate) fn kafka_error_name(code: i16) -> &'static str {
    match code {
        0 => "NONE",
        3 => "UNKNOWN_TOPIC_OR_PARTITION",
        7 => "REQUEST_TIMED_OUT",
        17 => "INVALID_TOPIC_EXCEPTION",
        19 => "NOT_ENOUGH_REPLICAS",
        36 => "TOPIC_ALREADY_EXISTS",
        37 => "INVALID_PARTITIONS",
        38 => "INVALID_REPLICATION_FACTOR",
        39 => "INVALID_REPLICA_ASSIGNMENT",
        40 => "INVALID_CONFIG",
        41 => "NOT_CONTROLLER",
        // In the Kafka protocol, REASSIGNMENT_IN_PROGRESS is code 60; code 87
        // is INVALID_RECORD. Labelling 87 as a reassignment error would send
        // anyone reading the logs after the wrong problem.
        60 => "REASSIGNMENT_IN_PROGRESS",
        87 => "INVALID_RECORD",
        _ => "UNKNOWN",
    }
}
474
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Spot-checks a few entries of the code-to-name table.
    #[test]
    fn kafka_error_name_known_codes() {
        let expected = [
            (0, "NONE"),
            (36, "TOPIC_ALREADY_EXISTS"),
            (41, "NOT_CONTROLLER"),
        ];
        for (code, name) in expected {
            assert!(kafka_error_name(code) == name);
        }
    }

    /// Codes outside the table fall back to the catch-all name.
    #[test]
    fn kafka_error_name_unknown_returns_unknown() {
        let unlisted_code = 9999;
        assert!(kafka_error_name(unlisted_code) == "UNKNOWN");
    }

    /// `connect_secured` accepts security settings and reports an error when
    /// the only bootstrap address is `127.0.0.1:1`, which the test expects to
    /// be closed.
    #[tokio::test]
    async fn connect_secured_threads_security_and_fails_to_closed_port() {
        use crabka_client_core::security::{ClientSecurity, SaslCredentials};
        use crabka_security::ListenerProtocol;

        let credentials = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let security = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(credentials),
            sasl_host: None,
        };
        let bootstrap = ["127.0.0.1:1".to_string()];
        let outcome = AdminClient::connect_secured(&bootstrap, Some(security)).await;
        assert!(outcome.is_err(), "connect to closed port must fail");
    }
}