Skip to main content

crabka_client_admin/
lib.rs

1//! Admin client for Crabka operators and control-plane services.
2//!
3//! The client targets the active controller and retries selected RPCs on a
4//! refreshed controller connection when the broker returns `NOT_CONTROLLER`.
5//! It supports plaintext by default and the same client-side TLS/SASL security
6//! surface as [`crabka_client_core`] via [`AdminClient::connect_secured`].
7//!
8//! Built on `crabka_client_core::Connection`'s typed
9//! `send::<R: ProtocolRequest>` so request-version negotiation is
10//! automatic via the `ApiVersionTable` populated at connect time. The public
11//! modules cover topic CRUD, partition expansion, config changes, SCRAM user
12//! credentials, ACLs, quotas, delegation tokens, and log-dir inspection.
13
14use std::time::Duration;
15
16use crabka_client_core::{ClientError, Connection, ConnectionOptions};
17use thiserror::Error;
18
19pub mod configs;
20pub mod delegation_tokens;
21pub mod log_dirs;
22pub mod quotas;
23pub mod topics;
24pub mod users;
25
26pub use configs::{AlterConfigsOutcome, IncrementalAlterOp, TopicConfigOverrides};
27pub use log_dirs::{AlterReplicaLogDirOutcome, LogDirInfo, LogDirPartitionInfo, LogDirTopicInfo};
28pub use quotas::{QuotaOp, UserQuotaConfig, diff_user_quotas};
29pub use topics::{
30    CreatePartitionsOp, CreatePartitionsOutcome, CreateTopicOutcome, CreateTopicSpec,
31    DeleteTopicOutcome, TopicMetadata, TopicMetadataEntry,
32};
33pub use users::{
34    AclEntry, AclEntryFilter, AclOperation, CreateAclOutcome, DEFAULT_SCRAM_ITERATIONS,
35    DeleteAclFilterOutcome, PatternType, PermissionType, ResourceType, ScramDeletion,
36    ScramUpsertion, ScramUserOutcome,
37};
38
39/// Test seam for `AdminClient`. The operator's reconcile only needs
40/// dynamic dispatch via this trait; production code wraps a concrete
41/// `AdminClient`, while tests substitute a fake.
42///
43/// Methods take `&mut self` because the underlying `AdminClient`'s
44/// `NOT_CONTROLLER` retry path reconnects the inner `Connection` in
45/// place, which requires unique access.
46#[async_trait::async_trait]
47pub trait AdminClientLike: Send {
48    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError>;
49    async fn create_topics(
50        &mut self,
51        specs: &[CreateTopicSpec],
52        timeout_ms: i32,
53    ) -> Result<Vec<CreateTopicOutcome>, AdminError>;
54    async fn delete_topics(
55        &mut self,
56        names: &[&str],
57        timeout_ms: i32,
58    ) -> Result<Vec<DeleteTopicOutcome>, AdminError>;
59    async fn create_partitions(
60        &mut self,
61        ops: &[CreatePartitionsOp],
62        timeout_ms: i32,
63    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError>;
64    async fn describe_configs(
65        &mut self,
66        topics: &[&str],
67    ) -> Result<Vec<TopicConfigOverrides>, AdminError>;
68    async fn incremental_alter_configs(
69        &mut self,
70        ops: &[IncrementalAlterOp],
71    ) -> Result<Vec<AlterConfigsOutcome>, AdminError>;
72    async fn alter_user_scram_credentials_sha512(
73        &mut self,
74        upsertions: &[ScramUpsertion],
75        deletions: &[ScramDeletion],
76    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
77    /// SCRAM-SHA-256 sibling of
78    /// [`Self::alter_user_scram_credentials_sha512`]. The operator
79    /// calls this when a `KafkaUser.spec.authentication.type ==
80    /// scram-sha-256`.
81    async fn alter_user_scram_credentials_sha256(
82        &mut self,
83        upsertions: &[ScramUpsertion],
84        deletions: &[ScramDeletion],
85    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
86    async fn describe_acls(&mut self, filter: &AclEntryFilter)
87    -> Result<Vec<AclEntry>, AdminError>;
88    async fn create_acls(
89        &mut self,
90        creations: &[AclEntry],
91    ) -> Result<Vec<CreateAclOutcome>, AdminError>;
92    async fn delete_acls(
93        &mut self,
94        filters: &[AclEntryFilter],
95    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError>;
96    async fn describe_user_quotas(&mut self, username: &str)
97    -> Result<UserQuotaConfig, AdminError>;
98    async fn alter_user_quotas(
99        &mut self,
100        username: &str,
101        ops: &[QuotaOp],
102        validate_only: bool,
103    ) -> Result<Option<KafkaError>, AdminError>;
104
105    // ── delegation-token RPCs (KIP-48) ────────────────────────────────
106    //
107    // Trait-level return type is `crabka_metadata::DelegationToken`
108    // (the image type) rather than the raw `Create/RenewDelegationToken`
109    // response. The `AdminClientLike for AdminClient` impl below
110    // reshapes wire responses into the image type — see the per-method
111    // comments there for the trade-off on how the renew path recovers
112    // the full token (the renew response carries only the new expiry).
113    async fn create_delegation_token_as_owner(
114        &mut self,
115        owner_principal_name: &str,
116        renewers: &[String],
117        max_lifetime_ms: i64,
118    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
119    async fn renew_delegation_token(
120        &mut self,
121        hmac: &[u8],
122    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
123    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError>;
124    async fn describe_delegation_tokens_owned_by(
125        &mut self,
126        owner_principal: &str,
127    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError>;
128}
129
130#[async_trait::async_trait]
131impl AdminClientLike for AdminClient {
132    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
133        AdminClient::metadata(self, topics).await
134    }
135    async fn create_topics(
136        &mut self,
137        specs: &[CreateTopicSpec],
138        timeout_ms: i32,
139    ) -> Result<Vec<CreateTopicOutcome>, AdminError> {
140        AdminClient::create_topics(self, specs, timeout_ms).await
141    }
142    async fn delete_topics(
143        &mut self,
144        names: &[&str],
145        timeout_ms: i32,
146    ) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
147        AdminClient::delete_topics(self, names, timeout_ms).await
148    }
149    async fn create_partitions(
150        &mut self,
151        ops: &[CreatePartitionsOp],
152        timeout_ms: i32,
153    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
154        AdminClient::create_partitions(self, ops, timeout_ms).await
155    }
156    async fn describe_configs(
157        &mut self,
158        topics: &[&str],
159    ) -> Result<Vec<TopicConfigOverrides>, AdminError> {
160        AdminClient::describe_configs(self, topics).await
161    }
162    async fn incremental_alter_configs(
163        &mut self,
164        ops: &[IncrementalAlterOp],
165    ) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
166        AdminClient::incremental_alter_configs(self, ops).await
167    }
168    async fn alter_user_scram_credentials_sha512(
169        &mut self,
170        upsertions: &[ScramUpsertion],
171        deletions: &[ScramDeletion],
172    ) -> Result<Vec<ScramUserOutcome>, AdminError> {
173        AdminClient::alter_user_scram_credentials_sha512(self, upsertions, deletions).await
174    }
175    async fn alter_user_scram_credentials_sha256(
176        &mut self,
177        upsertions: &[ScramUpsertion],
178        deletions: &[ScramDeletion],
179    ) -> Result<Vec<ScramUserOutcome>, AdminError> {
180        AdminClient::alter_user_scram_credentials_sha256(self, upsertions, deletions).await
181    }
182    async fn describe_acls(
183        &mut self,
184        filter: &AclEntryFilter,
185    ) -> Result<Vec<AclEntry>, AdminError> {
186        AdminClient::describe_acls(self, filter).await
187    }
188    async fn create_acls(
189        &mut self,
190        creations: &[AclEntry],
191    ) -> Result<Vec<CreateAclOutcome>, AdminError> {
192        AdminClient::create_acls(self, creations).await
193    }
194    async fn delete_acls(
195        &mut self,
196        filters: &[AclEntryFilter],
197    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
198        AdminClient::delete_acls(self, filters).await
199    }
200    async fn describe_user_quotas(
201        &mut self,
202        username: &str,
203    ) -> Result<UserQuotaConfig, AdminError> {
204        AdminClient::describe_user_quotas(self, username).await
205    }
206    async fn alter_user_quotas(
207        &mut self,
208        username: &str,
209        ops: &[QuotaOp],
210        validate_only: bool,
211    ) -> Result<Option<KafkaError>, AdminError> {
212        AdminClient::alter_user_quotas(self, username, ops, validate_only).await
213    }
214
215    // ── delegation-token RPCs ─────────────────────────────────────────
216    //
217    // The inherent `AdminClient` methods in `delegation_tokens.rs`
218    // return the wire-shaped response (`CreateDelegationTokenResponse`
219    // for create, `i64` new expiry for renew, `()` for expire, image
220    // `DelegationToken` for describe). The trait surface is normalised
221    // to `crabka_metadata::DelegationToken` so the operator's reconcile
222    // path is wire-agnostic.
223    async fn create_delegation_token_as_owner(
224        &mut self,
225        owner_principal_name: &str,
226        renewers: &[String],
227        max_lifetime_ms: i64,
228    ) -> Result<crabka_metadata::DelegationToken, AdminError> {
229        // The create-response carries every field the image type needs
230        // *except* the renewer list (the broker does not echo it back),
231        // so we reconstruct that from the caller's input — which is the
232        // ground truth anyway (KIP-48's create accepts the renewer set
233        // verbatim and the broker stores it as-is).
234        let resp = AdminClient::create_delegation_token_as_owner(
235            self,
236            owner_principal_name,
237            renewers,
238            max_lifetime_ms,
239        )
240        .await?;
241        let renewers_image = renewers
242            .iter()
243            .filter_map(|s| renewer_str_to_principal(s))
244            .collect();
245        Ok(crabka_metadata::DelegationToken {
246            token_id: resp.token_id,
247            owner: crabka_security::KafkaPrincipal {
248                principal_type: resp.principal_type,
249                name: resp.principal_name,
250            },
251            hmac: resp.hmac.to_vec(),
252            issue_timestamp_ms: resp.issue_timestamp_ms,
253            expiry_timestamp_ms: resp.expiry_timestamp_ms,
254            max_timestamp_ms: resp.max_timestamp_ms,
255            renewers: renewers_image,
256        })
257    }
258
259    async fn renew_delegation_token(
260        &mut self,
261        hmac: &[u8],
262    ) -> Result<crabka_metadata::DelegationToken, AdminError> {
263        // The renew-response (`RenewDelegationTokenResponse`) carries
264        // only the new `expiry_timestamp_ms`. To rebuild the full
265        // `DelegationToken` we follow up with `DescribeDelegationToken`
266        // with `owners=None` (describe all) and look up the entry by
267        // hmac. This adds one RPC per renewal — acceptable because the
268        // operator renews each user at most every `renew_before_expiry_ms`
269        // (24h by default). An alternative would have been to thread the
270        // owner principal into the trait method; keeping the surface
271        // `hmac`-only matches the inherent O3 signature.
272        let _new_expiry = AdminClient::renew_delegation_token(self, hmac).await?;
273        let req = crabka_protocol::owned::describe_delegation_token_request::DescribeDelegationTokenRequest::default();
274        let resp = self.conn.send(req).await?;
275        if resp.error_code != 0 {
276            return Err(AdminError::Broker {
277                api: "DescribeDelegationToken",
278                code: resp.error_code,
279                name: kafka_error_name(resp.error_code),
280                message: None,
281            });
282        }
283        let matched = resp
284            .tokens
285            .into_iter()
286            .find(|t| t.hmac.as_ref() == hmac)
287            .ok_or_else(|| {
288                AdminError::Protocol(
289                    "RenewDelegationToken: follow-up describe did not return the renewed token"
290                        .into(),
291                )
292            })?;
293        Ok(crabka_metadata::DelegationToken {
294            token_id: matched.token_id,
295            owner: crabka_security::KafkaPrincipal {
296                principal_type: matched.principal_type,
297                name: matched.principal_name,
298            },
299            hmac: matched.hmac.to_vec(),
300            issue_timestamp_ms: matched.issue_timestamp,
301            expiry_timestamp_ms: matched.expiry_timestamp,
302            max_timestamp_ms: matched.max_timestamp,
303            renewers: matched
304                .renewers
305                .into_iter()
306                .map(|r| crabka_security::KafkaPrincipal {
307                    principal_type: r.principal_type,
308                    name: r.principal_name,
309                })
310                .collect(),
311        })
312    }
313
314    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
315        AdminClient::expire_delegation_token(self, hmac).await
316    }
317
318    async fn describe_delegation_tokens_owned_by(
319        &mut self,
320        owner_principal: &str,
321    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError> {
322        AdminClient::describe_delegation_tokens_owned_by(self, owner_principal).await
323    }
324}
325
326/// Split `"Type:Name"` (default type `User`) into a `KafkaPrincipal`.
327/// Empty input yields `None` so the create path doesn't manufacture a
328/// principal from a bare `""` renewer entry.
329fn renewer_str_to_principal(s: &str) -> Option<crabka_security::KafkaPrincipal> {
330    if s.is_empty() {
331        return None;
332    }
333    let (pt, pn) = s.split_once(':').unwrap_or(("User", s));
334    Some(crabka_security::KafkaPrincipal {
335        principal_type: pt.to_string(),
336        name: pn.to_string(),
337    })
338}
339
340#[derive(Debug, Error)]
341pub enum AdminError {
342    #[error("no bootstrap address was reachable: tried {tried}")]
343    Connect { tried: usize },
344    #[error("controller routing failed after retry")]
345    NotControllerExhausted,
346    #[error("broker returned error: api={api} code={code} ({name}){detail}",
347            detail = .message.as_deref().map(|m| format!(" {m:?}")).unwrap_or_default())]
348    Broker {
349        api: &'static str,
350        code: i16,
351        name: &'static str,
352        message: Option<String>,
353    },
354    #[error("client-core: {0}")]
355    Transport(#[from] ClientError),
356    #[error("protocol: {0}")]
357    Protocol(String),
358}
359
/// Kafka-level error attached to one per-resource outcome. In contrast,
/// an [`AdminError`] fails the entire call.
#[derive(Clone, Debug)]
pub struct KafkaError {
    /// Raw Kafka protocol error code.
    pub code: i16,
    /// Symbolic name of `code`.
    pub name: &'static str,
    /// Optional human-readable detail supplied by the broker.
    pub message: Option<String>,
}
367
368/// Short-lived admin client targeting one cluster's controller.
369/// Optionally negotiates TLS/SASL via [`AdminClient::connect_secured`].
370pub struct AdminClient {
371    pub(crate) conn: Connection,
372    /// Client security carried forward to `reconnect` so a
373    /// `NOT_CONTROLLER` retry re-dials the new controller the same way.
374    security: Option<crabka_client_core::security::ClientSecurity>,
375}
376
377impl AdminClient {
378    /// Build the per-connect options for `client_id="crabka-operator"`,
379    /// carrying the supplied security policy.
380    fn opts(security: Option<crabka_client_core::security::ClientSecurity>) -> ConnectionOptions {
381        ConnectionOptions {
382            connect_timeout: Duration::from_secs(5),
383            request_timeout: Duration::from_secs(30),
384            client_id: "crabka-operator".to_string(),
385            security: security.map(Box::new),
386        }
387    }
388
389    /// Connect, applying optional client security. `None` = plaintext
390    /// (identical to [`AdminClient::connect`]).
391    ///
392    /// # Errors
393    /// Returns `AdminError::Connect { tried }` if no bootstrap address
394    /// accepted the (optionally secured) connection.
395    pub async fn connect_secured(
396        bootstrap_addrs: &[String],
397        security: Option<crabka_client_core::security::ClientSecurity>,
398    ) -> Result<Self, AdminError> {
399        let opts = Self::opts(security.clone());
400        for host_port in bootstrap_addrs {
401            match Self::connect_one(host_port, opts.clone()).await {
402                Ok(conn) => return Ok(Self { conn, security }),
403                Err(e) => {
404                    tracing::debug!(
405                        target: "crabka_client_admin",
406                        addr = %host_port,
407                        error = %e,
408                        "bootstrap connect failed",
409                    );
410                }
411            }
412        }
413        Err(AdminError::Connect {
414            tried: bootstrap_addrs.len(),
415        })
416    }
417
418    /// Try each bootstrap address in order. Each entry is `host:port`;
419    /// DNS is resolved via `tokio::net::lookup_host`. First successful
420    /// connect wins. Returns `AdminError::Connect { tried }` if none
421    /// responded. Plaintext; see [`AdminClient::connect_secured`].
422    pub async fn connect(bootstrap_addrs: &[String]) -> Result<Self, AdminError> {
423        Self::connect_secured(bootstrap_addrs, None).await
424    }
425
426    async fn connect_one(
427        host_port: &str,
428        opts: ConnectionOptions,
429    ) -> Result<Connection, AdminError> {
430        let mut addrs = tokio::net::lookup_host(host_port)
431            .await
432            .map_err(|e| AdminError::Protocol(format!("DNS lookup {host_port}: {e}")))?;
433        let addr = addrs
434            .next()
435            .ok_or_else(|| AdminError::Protocol(format!("no addresses for {host_port}")))?;
436        Connection::connect_with_options(addr, opts)
437            .await
438            .map_err(AdminError::from)
439    }
440
441    /// Replace the underlying connection. Used internally by the
442    /// `NOT_CONTROLLER` retry path to reconnect to the current controller.
443    pub(crate) async fn reconnect(&mut self, host_port: &str) -> Result<(), AdminError> {
444        let opts = Self::opts(self.security.clone());
445        self.conn = Self::connect_one(host_port, opts).await?;
446        Ok(())
447    }
448}
449
/// Kafka protocol error code 41 (`NOT_CONTROLLER`): the broker that
/// received a controller-bound request is not the active controller.
/// The admin client reacts by refreshing its controller connection and
/// retrying the affected RPC.
pub(crate) const NOT_CONTROLLER: i16 = 41;
453
/// Map a Kafka protocol error code to its canonical name (as listed in the
/// Apache Kafka protocol error-code table) for human-friendly
/// `AdminError::Broker` formatting.
///
/// Covers the codes the admin RPCs in this crate can surface: topics and
/// partitions, configs, ACLs and authorization, SCRAM credentials, quotas,
/// delegation tokens and log dirs. Any other code maps to `"UNKNOWN"`.
pub(crate) fn kafka_error_name(code: i16) -> &'static str {
    match code {
        -1 => "UNKNOWN_SERVER_ERROR",
        0 => "NONE",
        3 => "UNKNOWN_TOPIC_OR_PARTITION",
        7 => "REQUEST_TIMED_OUT",
        17 => "INVALID_TOPIC_EXCEPTION",
        19 => "NOT_ENOUGH_REPLICAS",
        29 => "TOPIC_AUTHORIZATION_FAILED",
        30 => "GROUP_AUTHORIZATION_FAILED",
        31 => "CLUSTER_AUTHORIZATION_FAILED",
        35 => "UNSUPPORTED_VERSION",
        36 => "TOPIC_ALREADY_EXISTS",
        37 => "INVALID_PARTITIONS",
        38 => "INVALID_REPLICATION_FACTOR",
        39 => "INVALID_REPLICA_ASSIGNMENT",
        40 => "INVALID_CONFIG",
        41 => "NOT_CONTROLLER",
        42 => "INVALID_REQUEST",
        44 => "POLICY_VIOLATION",
        54 => "SECURITY_DISABLED",
        56 => "KAFKA_STORAGE_ERROR",
        57 => "LOG_DIR_NOT_FOUND",
        58 => "SASL_AUTHENTICATION_FAILED",
        // REASSIGNMENT_IN_PROGRESS is 60 in the Kafka protocol (87 is
        // INVALID_RECORD).
        60 => "REASSIGNMENT_IN_PROGRESS",
        61 => "DELEGATION_TOKEN_AUTH_DISABLED",
        62 => "DELEGATION_TOKEN_NOT_FOUND",
        63 => "DELEGATION_TOKEN_OWNER_MISMATCH",
        64 => "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED",
        65 => "DELEGATION_TOKEN_AUTHORIZATION_FAILED",
        66 => "DELEGATION_TOKEN_EXPIRED",
        67 => "INVALID_PRINCIPAL_TYPE",
        73 => "TOPIC_DELETION_DISABLED",
        85 => "NO_REASSIGNMENT_IN_PROGRESS",
        87 => "INVALID_RECORD",
        89 => "THROTTLING_QUOTA_EXCEEDED",
        91 => "RESOURCE_NOT_FOUND",
        92 => "DUPLICATE_RESOURCE",
        93 => "UNACCEPTABLE_CREDENTIAL",
        _ => "UNKNOWN",
    }
}
474
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Well-known codes resolve to their canonical names.
    #[test]
    fn kafka_error_name_known_codes() {
        let cases: [(i16, &str); 3] = [
            (0, "NONE"),
            (36, "TOPIC_ALREADY_EXISTS"),
            (41, "NOT_CONTROLLER"),
        ];
        for (code, expected) in cases {
            assert!(kafka_error_name(code) == expected);
        }
    }

    /// Codes outside the table fall back to the generic name.
    #[test]
    fn kafka_error_name_unknown_returns_unknown() {
        let unmapped: i16 = 9999;
        assert!(kafka_error_name(unmapped) == "UNKNOWN");
    }

    /// `connect_secured` accepts a security policy and still reports a
    /// failure when nothing is listening.
    #[tokio::test]
    async fn connect_secured_threads_security_and_fails_to_closed_port() {
        use crabka_client_core::security::{ClientSecurity, SaslCredentials};
        use crabka_security::ListenerProtocol;

        let credentials = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let security = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(credentials),
            sasl_host: None,
        };
        // Nothing listens on 127.0.0.1:1, so the secured connect has to
        // fail; compiling this call proves the security argument is
        // threaded through.
        let bootstrap = ["127.0.0.1:1".to_string()];
        let outcome = AdminClient::connect_secured(&bootstrap, Some(security)).await;
        assert!(outcome.is_err(), "connect to closed port must fail");
    }
}