Skip to main content

crabka_client_admin/
lib.rs

//! Admin client for the operator. Targets one cluster's
//! controller; plaintext by default, with optional TLS/SASL via
//! `AdminClient::connect_secured`.
3//!
4//! Built on `crabka_client_core::Connection`'s typed
5//! `send::<R: ProtocolRequest>` so request-version negotiation is
6//! automatic via the `ApiVersionTable` populated at connect time.
7
8use std::time::Duration;
9
10use crabka_client_core::{ClientError, Connection, ConnectionOptions};
11use thiserror::Error;
12
13pub mod configs;
14pub mod delegation_tokens;
15pub mod log_dirs;
16pub mod quotas;
17pub mod topics;
18pub mod users;
19
20pub use configs::{AlterConfigsOutcome, IncrementalAlterOp, TopicConfigOverrides};
21pub use log_dirs::{AlterReplicaLogDirOutcome, LogDirInfo, LogDirPartitionInfo, LogDirTopicInfo};
22pub use quotas::{QuotaOp, UserQuotaConfig, diff_user_quotas};
23pub use topics::{
24    CreatePartitionsOp, CreatePartitionsOutcome, CreateTopicOutcome, CreateTopicSpec,
25    DeleteTopicOutcome, TopicMetadata, TopicMetadataEntry,
26};
27pub use users::{
28    AclEntry, AclEntryFilter, AclOperation, CreateAclOutcome, DEFAULT_SCRAM_ITERATIONS,
29    DeleteAclFilterOutcome, PatternType, PermissionType, ResourceType, ScramDeletion,
30    ScramUpsertion, ScramUserOutcome,
31};
32
33/// Test seam for `AdminClient`. The operator's reconcile only needs
34/// dynamic dispatch via this trait; production code wraps a concrete
35/// `AdminClient`, while tests substitute a fake.
36///
37/// Methods take `&mut self` because the underlying `AdminClient`'s
38/// `NOT_CONTROLLER` retry path reconnects the inner `Connection` in
39/// place, which requires unique access.
40#[async_trait::async_trait]
41pub trait AdminClientLike: Send {
42    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError>;
43    async fn create_topics(
44        &mut self,
45        specs: &[CreateTopicSpec],
46        timeout_ms: i32,
47    ) -> Result<Vec<CreateTopicOutcome>, AdminError>;
48    async fn delete_topics(
49        &mut self,
50        names: &[&str],
51        timeout_ms: i32,
52    ) -> Result<Vec<DeleteTopicOutcome>, AdminError>;
53    async fn create_partitions(
54        &mut self,
55        ops: &[CreatePartitionsOp],
56        timeout_ms: i32,
57    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError>;
58    async fn describe_configs(
59        &mut self,
60        topics: &[&str],
61    ) -> Result<Vec<TopicConfigOverrides>, AdminError>;
62    async fn incremental_alter_configs(
63        &mut self,
64        ops: &[IncrementalAlterOp],
65    ) -> Result<Vec<AlterConfigsOutcome>, AdminError>;
66    async fn alter_user_scram_credentials_sha512(
67        &mut self,
68        upsertions: &[ScramUpsertion],
69        deletions: &[ScramDeletion],
70    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
71    /// SCRAM-SHA-256 sibling of
72    /// [`Self::alter_user_scram_credentials_sha512`]. The operator
73    /// calls this when a `KafkaUser.spec.authentication.type ==
74    /// scram-sha-256`.
75    async fn alter_user_scram_credentials_sha256(
76        &mut self,
77        upsertions: &[ScramUpsertion],
78        deletions: &[ScramDeletion],
79    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
80    async fn describe_acls(&mut self, filter: &AclEntryFilter)
81    -> Result<Vec<AclEntry>, AdminError>;
82    async fn create_acls(
83        &mut self,
84        creations: &[AclEntry],
85    ) -> Result<Vec<CreateAclOutcome>, AdminError>;
86    async fn delete_acls(
87        &mut self,
88        filters: &[AclEntryFilter],
89    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError>;
90    async fn describe_user_quotas(&mut self, username: &str)
91    -> Result<UserQuotaConfig, AdminError>;
92    async fn alter_user_quotas(
93        &mut self,
94        username: &str,
95        ops: &[QuotaOp],
96        validate_only: bool,
97    ) -> Result<Option<KafkaError>, AdminError>;
98
99    // ── delegation-token RPCs (KIP-48) ────────────────────────────────
100    //
101    // Trait-level return type is `crabka_metadata::DelegationToken`
102    // (the image type) rather than the raw `Create/RenewDelegationToken`
103    // response. The `AdminClientLike for AdminClient` impl below
104    // reshapes wire responses into the image type — see the per-method
105    // comments there for the trade-off on how the renew path recovers
106    // the full token (the renew response carries only the new expiry).
107    async fn create_delegation_token_as_owner(
108        &mut self,
109        owner_principal_name: &str,
110        renewers: &[String],
111        max_lifetime_ms: i64,
112    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
113    async fn renew_delegation_token(
114        &mut self,
115        hmac: &[u8],
116    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
117    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError>;
118    async fn describe_delegation_tokens_owned_by(
119        &mut self,
120        owner_principal: &str,
121    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError>;
122}
123
124#[async_trait::async_trait]
125impl AdminClientLike for AdminClient {
126    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
127        AdminClient::metadata(self, topics).await
128    }
129    async fn create_topics(
130        &mut self,
131        specs: &[CreateTopicSpec],
132        timeout_ms: i32,
133    ) -> Result<Vec<CreateTopicOutcome>, AdminError> {
134        AdminClient::create_topics(self, specs, timeout_ms).await
135    }
136    async fn delete_topics(
137        &mut self,
138        names: &[&str],
139        timeout_ms: i32,
140    ) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
141        AdminClient::delete_topics(self, names, timeout_ms).await
142    }
143    async fn create_partitions(
144        &mut self,
145        ops: &[CreatePartitionsOp],
146        timeout_ms: i32,
147    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
148        AdminClient::create_partitions(self, ops, timeout_ms).await
149    }
150    async fn describe_configs(
151        &mut self,
152        topics: &[&str],
153    ) -> Result<Vec<TopicConfigOverrides>, AdminError> {
154        AdminClient::describe_configs(self, topics).await
155    }
156    async fn incremental_alter_configs(
157        &mut self,
158        ops: &[IncrementalAlterOp],
159    ) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
160        AdminClient::incremental_alter_configs(self, ops).await
161    }
162    async fn alter_user_scram_credentials_sha512(
163        &mut self,
164        upsertions: &[ScramUpsertion],
165        deletions: &[ScramDeletion],
166    ) -> Result<Vec<ScramUserOutcome>, AdminError> {
167        AdminClient::alter_user_scram_credentials_sha512(self, upsertions, deletions).await
168    }
169    async fn alter_user_scram_credentials_sha256(
170        &mut self,
171        upsertions: &[ScramUpsertion],
172        deletions: &[ScramDeletion],
173    ) -> Result<Vec<ScramUserOutcome>, AdminError> {
174        AdminClient::alter_user_scram_credentials_sha256(self, upsertions, deletions).await
175    }
176    async fn describe_acls(
177        &mut self,
178        filter: &AclEntryFilter,
179    ) -> Result<Vec<AclEntry>, AdminError> {
180        AdminClient::describe_acls(self, filter).await
181    }
182    async fn create_acls(
183        &mut self,
184        creations: &[AclEntry],
185    ) -> Result<Vec<CreateAclOutcome>, AdminError> {
186        AdminClient::create_acls(self, creations).await
187    }
188    async fn delete_acls(
189        &mut self,
190        filters: &[AclEntryFilter],
191    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
192        AdminClient::delete_acls(self, filters).await
193    }
194    async fn describe_user_quotas(
195        &mut self,
196        username: &str,
197    ) -> Result<UserQuotaConfig, AdminError> {
198        AdminClient::describe_user_quotas(self, username).await
199    }
200    async fn alter_user_quotas(
201        &mut self,
202        username: &str,
203        ops: &[QuotaOp],
204        validate_only: bool,
205    ) -> Result<Option<KafkaError>, AdminError> {
206        AdminClient::alter_user_quotas(self, username, ops, validate_only).await
207    }
208
209    // ── delegation-token RPCs ─────────────────────────────────────────
210    //
211    // The inherent `AdminClient` methods in `delegation_tokens.rs`
212    // return the wire-shaped response (`CreateDelegationTokenResponse`
213    // for create, `i64` new expiry for renew, `()` for expire, image
214    // `DelegationToken` for describe). The trait surface is normalised
215    // to `crabka_metadata::DelegationToken` so the operator's reconcile
216    // path is wire-agnostic.
217    async fn create_delegation_token_as_owner(
218        &mut self,
219        owner_principal_name: &str,
220        renewers: &[String],
221        max_lifetime_ms: i64,
222    ) -> Result<crabka_metadata::DelegationToken, AdminError> {
223        // The create-response carries every field the image type needs
224        // *except* the renewer list (the broker does not echo it back),
225        // so we reconstruct that from the caller's input — which is the
226        // ground truth anyway (KIP-48's create accepts the renewer set
227        // verbatim and the broker stores it as-is).
228        let resp = AdminClient::create_delegation_token_as_owner(
229            self,
230            owner_principal_name,
231            renewers,
232            max_lifetime_ms,
233        )
234        .await?;
235        let renewers_image = renewers
236            .iter()
237            .filter_map(|s| renewer_str_to_principal(s))
238            .collect();
239        Ok(crabka_metadata::DelegationToken {
240            token_id: resp.token_id,
241            owner: crabka_security::KafkaPrincipal {
242                principal_type: resp.principal_type,
243                name: resp.principal_name,
244            },
245            hmac: resp.hmac.to_vec(),
246            issue_timestamp_ms: resp.issue_timestamp_ms,
247            expiry_timestamp_ms: resp.expiry_timestamp_ms,
248            max_timestamp_ms: resp.max_timestamp_ms,
249            renewers: renewers_image,
250        })
251    }
252
253    async fn renew_delegation_token(
254        &mut self,
255        hmac: &[u8],
256    ) -> Result<crabka_metadata::DelegationToken, AdminError> {
257        // The renew-response (`RenewDelegationTokenResponse`) carries
258        // only the new `expiry_timestamp_ms`. To rebuild the full
259        // `DelegationToken` we follow up with `DescribeDelegationToken`
260        // with `owners=None` (describe all) and look up the entry by
261        // hmac. This adds one RPC per renewal — acceptable because the
262        // operator renews each user at most every `renew_before_expiry_ms`
263        // (24h by default). An alternative would have been to thread the
264        // owner principal into the trait method; keeping the surface
265        // `hmac`-only matches the inherent O3 signature.
266        let _new_expiry = AdminClient::renew_delegation_token(self, hmac).await?;
267        let req = crabka_protocol::owned::describe_delegation_token_request::DescribeDelegationTokenRequest::default();
268        let resp = self.conn.send(req).await?;
269        if resp.error_code != 0 {
270            return Err(AdminError::Broker {
271                api: "DescribeDelegationToken",
272                code: resp.error_code,
273                name: kafka_error_name(resp.error_code),
274                message: None,
275            });
276        }
277        let matched = resp
278            .tokens
279            .into_iter()
280            .find(|t| t.hmac.as_ref() == hmac)
281            .ok_or_else(|| {
282                AdminError::Protocol(
283                    "RenewDelegationToken: follow-up describe did not return the renewed token"
284                        .into(),
285                )
286            })?;
287        Ok(crabka_metadata::DelegationToken {
288            token_id: matched.token_id,
289            owner: crabka_security::KafkaPrincipal {
290                principal_type: matched.principal_type,
291                name: matched.principal_name,
292            },
293            hmac: matched.hmac.to_vec(),
294            issue_timestamp_ms: matched.issue_timestamp,
295            expiry_timestamp_ms: matched.expiry_timestamp,
296            max_timestamp_ms: matched.max_timestamp,
297            renewers: matched
298                .renewers
299                .into_iter()
300                .map(|r| crabka_security::KafkaPrincipal {
301                    principal_type: r.principal_type,
302                    name: r.principal_name,
303                })
304                .collect(),
305        })
306    }
307
308    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
309        AdminClient::expire_delegation_token(self, hmac).await
310    }
311
312    async fn describe_delegation_tokens_owned_by(
313        &mut self,
314        owner_principal: &str,
315    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError> {
316        AdminClient::describe_delegation_tokens_owned_by(self, owner_principal).await
317    }
318}
319
320/// Split `"Type:Name"` (default type `User`) into a `KafkaPrincipal`.
321/// Empty input yields `None` so the create path doesn't manufacture a
322/// principal from a bare `""` renewer entry.
323fn renewer_str_to_principal(s: &str) -> Option<crabka_security::KafkaPrincipal> {
324    if s.is_empty() {
325        return None;
326    }
327    let (pt, pn) = s.split_once(':').unwrap_or(("User", s));
328    Some(crabka_security::KafkaPrincipal {
329        principal_type: pt.to_string(),
330        name: pn.to_string(),
331    })
332}
333
334#[derive(Debug, Error)]
335pub enum AdminError {
336    #[error("no bootstrap address was reachable: tried {tried}")]
337    Connect { tried: usize },
338    #[error("controller routing failed after retry")]
339    NotControllerExhausted,
340    #[error("broker returned error: api={api} code={code} ({name}){detail}",
341            detail = .message.as_deref().map(|m| format!(" {m:?}")).unwrap_or_default())]
342    Broker {
343        api: &'static str,
344        code: i16,
345        name: &'static str,
346        message: Option<String>,
347    },
348    #[error("client-core: {0}")]
349    Transport(#[from] ClientError),
350    #[error("protocol: {0}")]
351    Protocol(String),
352}
353
/// Kafka-level error attached to a single per-resource outcome (one
/// topic, one ACL filter, one user, ...).
#[derive(Debug, Clone)]
pub struct KafkaError {
    /// Numeric Kafka error code.
    pub code: i16,
    /// Static display name for `code`.
    pub name: &'static str,
    /// Optional human-readable error message.
    pub message: Option<String>,
}
361
362/// Short-lived admin client targeting one cluster's controller.
363/// Optionally negotiates TLS/SASL via [`AdminClient::connect_secured`].
364pub struct AdminClient {
365    pub(crate) conn: Connection,
366    /// Client security carried forward to `reconnect` so a
367    /// `NOT_CONTROLLER` retry re-dials the new controller the same way.
368    security: Option<crabka_client_core::security::ClientSecurity>,
369}
370
371impl AdminClient {
372    /// Build the per-connect options for `client_id="crabka-operator"`,
373    /// carrying the supplied security policy.
374    fn opts(security: Option<crabka_client_core::security::ClientSecurity>) -> ConnectionOptions {
375        ConnectionOptions {
376            connect_timeout: Duration::from_secs(5),
377            request_timeout: Duration::from_secs(30),
378            client_id: "crabka-operator".to_string(),
379            security: security.map(Box::new),
380        }
381    }
382
383    /// Connect, applying optional client security. `None` = plaintext
384    /// (identical to [`AdminClient::connect`]).
385    ///
386    /// # Errors
387    /// Returns `AdminError::Connect { tried }` if no bootstrap address
388    /// accepted the (optionally secured) connection.
389    pub async fn connect_secured(
390        bootstrap_addrs: &[String],
391        security: Option<crabka_client_core::security::ClientSecurity>,
392    ) -> Result<Self, AdminError> {
393        let opts = Self::opts(security.clone());
394        for host_port in bootstrap_addrs {
395            match Self::connect_one(host_port, opts.clone()).await {
396                Ok(conn) => return Ok(Self { conn, security }),
397                Err(e) => {
398                    tracing::debug!(
399                        target: "crabka_client_admin",
400                        addr = %host_port,
401                        error = %e,
402                        "bootstrap connect failed",
403                    );
404                }
405            }
406        }
407        Err(AdminError::Connect {
408            tried: bootstrap_addrs.len(),
409        })
410    }
411
412    /// Try each bootstrap address in order. Each entry is `host:port`;
413    /// DNS is resolved via `tokio::net::lookup_host`. First successful
414    /// connect wins. Returns `AdminError::Connect { tried }` if none
415    /// responded. Plaintext; see [`AdminClient::connect_secured`].
416    pub async fn connect(bootstrap_addrs: &[String]) -> Result<Self, AdminError> {
417        Self::connect_secured(bootstrap_addrs, None).await
418    }
419
420    async fn connect_one(
421        host_port: &str,
422        opts: ConnectionOptions,
423    ) -> Result<Connection, AdminError> {
424        let mut addrs = tokio::net::lookup_host(host_port)
425            .await
426            .map_err(|e| AdminError::Protocol(format!("DNS lookup {host_port}: {e}")))?;
427        let addr = addrs
428            .next()
429            .ok_or_else(|| AdminError::Protocol(format!("no addresses for {host_port}")))?;
430        Connection::connect_with_options(addr, opts)
431            .await
432            .map_err(AdminError::from)
433    }
434
435    /// Replace the underlying connection. Used internally by the
436    /// `NOT_CONTROLLER` retry path to reconnect to the current controller.
437    pub(crate) async fn reconnect(&mut self, host_port: &str) -> Result<(), AdminError> {
438        let opts = Self::opts(self.security.clone());
439        self.conn = Self::connect_one(host_port, opts).await?;
440        Ok(())
441    }
442}
443
/// Kafka error code 41, `NOT_CONTROLLER`: the request reached a node that
/// is not the active controller. The admin client refreshes its
/// controller endpoint and retries once.
pub(crate) const NOT_CONTROLLER: i16 = 41;

/// Map a Kafka error code to its canonical Apache Kafka name, for
/// human-friendly `AdminError::Broker` formatting.
///
/// Covers the codes the admin RPCs in this crate can surface (topics,
/// configs, ACLs, SCRAM, quotas, log dirs, delegation tokens). Any other
/// code maps to `"UNKNOWN"`.
pub(crate) fn kafka_error_name(code: i16) -> &'static str {
    match code {
        0 => "NONE",
        3 => "UNKNOWN_TOPIC_OR_PARTITION",
        7 => "REQUEST_TIMED_OUT",
        17 => "INVALID_TOPIC_EXCEPTION",
        19 => "NOT_ENOUGH_REPLICAS",
        29 => "TOPIC_AUTHORIZATION_FAILED",
        31 => "CLUSTER_AUTHORIZATION_FAILED",
        33 => "UNSUPPORTED_SASL_MECHANISM",
        35 => "UNSUPPORTED_VERSION",
        36 => "TOPIC_ALREADY_EXISTS",
        37 => "INVALID_PARTITIONS",
        38 => "INVALID_REPLICATION_FACTOR",
        39 => "INVALID_REPLICA_ASSIGNMENT",
        40 => "INVALID_CONFIG",
        NOT_CONTROLLER => "NOT_CONTROLLER",
        42 => "INVALID_REQUEST",
        44 => "POLICY_VIOLATION",
        54 => "SECURITY_DISABLED",
        56 => "KAFKA_STORAGE_ERROR",
        57 => "LOG_DIR_NOT_FOUND",
        // 60, not 87: code 87 is INVALID_RECORD.
        60 => "REASSIGNMENT_IN_PROGRESS",
        61 => "DELEGATION_TOKEN_AUTH_DISABLED",
        62 => "DELEGATION_TOKEN_NOT_FOUND",
        63 => "DELEGATION_TOKEN_OWNER_MISMATCH",
        64 => "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED",
        65 => "DELEGATION_TOKEN_AUTHORIZATION_FAILED",
        66 => "DELEGATION_TOKEN_EXPIRED",
        67 => "INVALID_PRINCIPAL_TYPE",
        85 => "NO_REASSIGNMENT_IN_PROGRESS",
        89 => "THROTTLING_QUOTA_EXCEEDED",
        91 => "RESOURCE_NOT_FOUND",
        92 => "DUPLICATE_RESOURCE",
        93 => "UNACCEPTABLE_CREDENTIAL",
        _ => "UNKNOWN",
    }
}
468
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    #[test]
    fn kafka_error_name_known_codes() {
        let expected: [(i16, &str); 3] = [
            (0, "NONE"),
            (36, "TOPIC_ALREADY_EXISTS"),
            (41, "NOT_CONTROLLER"),
        ];
        for (code, name) in expected {
            assert!(kafka_error_name(code) == name);
        }
    }

    #[test]
    fn kafka_error_name_unknown_returns_unknown() {
        assert!(kafka_error_name(9999) == "UNKNOWN");
    }

    #[tokio::test]
    async fn connect_secured_threads_security_and_fails_to_closed_port() {
        use crabka_client_core::security::{ClientSecurity, SaslCredentials};
        use crabka_security::ListenerProtocol;

        let credentials = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let security = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(credentials),
            sasl_host: None,
        };
        let closed_port = vec!["127.0.0.1:1".to_string()];
        // Nothing listens on 127.0.0.1:1, so the secured connect must
        // fail. The point of the test is that `security` threads through
        // `connect_secured` without a type error.
        let res = AdminClient::connect_secured(&closed_port, Some(security)).await;
        assert!(res.is_err(), "connect to closed port must fail");
    }
}