crabka-client-admin 0.3.2

Operator-side admin client for Crabka clusters
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
//! Admin client for Crabka operators and control-plane services.
//!
//! The client targets the active controller and retries selected RPCs on a
//! refreshed controller connection when the broker returns `NOT_CONTROLLER`.
//! It supports plaintext by default and the same client-side TLS/SASL security
//! surface as [`crabka_client_core`] via [`AdminClient::connect_secured`].
//!
//! Built on `crabka_client_core::Connection`'s typed
//! `send::<R: ProtocolRequest>` so request-version negotiation is
//! automatic via the `ApiVersionTable` populated at connect time. The public
//! modules cover topic CRUD, partition expansion, config changes, SCRAM user
//! credentials, ACLs, quotas, delegation tokens, and log-dir inspection.

use std::time::Duration;

use crabka_client_core::{ClientError, Connection, ConnectionOptions};
use thiserror::Error;

pub mod configs;
pub mod delegation_tokens;
pub mod log_dirs;
pub mod quotas;
pub mod topics;
pub mod users;

pub use configs::{AlterConfigsOutcome, IncrementalAlterOp, TopicConfigOverrides};
pub use log_dirs::{AlterReplicaLogDirOutcome, LogDirInfo, LogDirPartitionInfo, LogDirTopicInfo};
pub use quotas::{QuotaOp, UserQuotaConfig, diff_user_quotas};
pub use topics::{
    CreatePartitionsOp, CreatePartitionsOutcome, CreateTopicOutcome, CreateTopicSpec,
    DeleteTopicOutcome, TopicMetadata, TopicMetadataEntry,
};
pub use users::{
    AclEntry, AclEntryFilter, AclOperation, CreateAclOutcome, DEFAULT_SCRAM_ITERATIONS,
    DeleteAclFilterOutcome, PatternType, PermissionType, ResourceType, ScramDeletion,
    ScramUpsertion, ScramUserOutcome,
};

/// Test seam for `AdminClient`. The operator's reconcile only needs
/// dynamic dispatch via this trait; production code wraps a concrete
/// `AdminClient`, while tests substitute a fake.
///
/// Methods take `&mut self` because the underlying `AdminClient`'s
/// `NOT_CONTROLLER` retry path reconnects the inner `Connection` in
/// place, which requires unique access.
///
/// Every method reports failures as [`AdminError`]. Methods returning a
/// `Vec<…Outcome>` carry one outcome value per requested resource
/// (presumably in request order — confirm against the owning module).
#[async_trait::async_trait]
pub trait AdminClientLike: Send {
    /// Look up [`TopicMetadata`] for the named `topics`.
    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError>;
    /// Create the topics described by `specs`.
    ///
    /// `timeout_ms` is presumably the broker-side operation timeout in
    /// milliseconds — confirm against the `topics` module.
    async fn create_topics(
        &mut self,
        specs: &[CreateTopicSpec],
        timeout_ms: i32,
    ) -> Result<Vec<CreateTopicOutcome>, AdminError>;
    /// Delete the topics listed in `names`; `timeout_ms` as for
    /// [`Self::create_topics`].
    async fn delete_topics(
        &mut self,
        names: &[&str],
        timeout_ms: i32,
    ) -> Result<Vec<DeleteTopicOutcome>, AdminError>;
    /// Apply the partition-expansion operations in `ops`; `timeout_ms` as
    /// for [`Self::create_topics`].
    async fn create_partitions(
        &mut self,
        ops: &[CreatePartitionsOp],
        timeout_ms: i32,
    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError>;
    /// Describe the config overrides of the named `topics`.
    async fn describe_configs(
        &mut self,
        topics: &[&str],
    ) -> Result<Vec<TopicConfigOverrides>, AdminError>;
    /// Apply the incremental config changes in `ops`.
    async fn incremental_alter_configs(
        &mut self,
        ops: &[IncrementalAlterOp],
    ) -> Result<Vec<AlterConfigsOutcome>, AdminError>;
    /// Upsert and/or delete SCRAM-SHA-512 credentials in one call.
    async fn alter_user_scram_credentials_sha512(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
    /// SCRAM-SHA-256 sibling of
    /// [`Self::alter_user_scram_credentials_sha512`]. The operator
    /// calls this when a `KafkaUser.spec.authentication.type ==
    /// scram-sha-256`.
    async fn alter_user_scram_credentials_sha256(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError>;
    /// List the ACL entries matching `filter`.
    async fn describe_acls(&mut self, filter: &AclEntryFilter)
    -> Result<Vec<AclEntry>, AdminError>;
    /// Create the ACL entries in `creations`.
    async fn create_acls(
        &mut self,
        creations: &[AclEntry],
    ) -> Result<Vec<CreateAclOutcome>, AdminError>;
    /// Delete ACL entries matching each of `filters`; one outcome per filter.
    async fn delete_acls(
        &mut self,
        filters: &[AclEntryFilter],
    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError>;
    /// Read the quota configuration stored for `username`.
    async fn describe_user_quotas(&mut self, username: &str)
    -> Result<UserQuotaConfig, AdminError>;
    /// Apply the quota operations in `ops` to `username`.
    ///
    /// The `Ok(Some(_))` case carries a per-entity [`KafkaError`];
    /// presumably `Ok(None)` means the broker accepted the change and
    /// `validate_only` requests a dry run — confirm against the `quotas`
    /// module.
    async fn alter_user_quotas(
        &mut self,
        username: &str,
        ops: &[QuotaOp],
        validate_only: bool,
    ) -> Result<Option<KafkaError>, AdminError>;

    // ── delegation-token RPCs (KIP-48) ────────────────────────────────
    //
    // Trait-level return type is `crabka_metadata::DelegationToken`
    // (the image type) rather than the raw `Create/RenewDelegationToken`
    // response. The `AdminClientLike for AdminClient` impl below
    // reshapes wire responses into the image type — see the per-method
    // comments there for the trade-off on how the renew path recovers
    // the full token (the renew response carries only the new expiry).

    /// Create a delegation token owned by `owner_principal_name`, renewable
    /// by the principals named in `renewers`, and return it in image form.
    async fn create_delegation_token_as_owner(
        &mut self,
        owner_principal_name: &str,
        renewers: &[String],
        max_lifetime_ms: i64,
    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
    /// Renew the token identified by `hmac` and return the full token.
    async fn renew_delegation_token(
        &mut self,
        hmac: &[u8],
    ) -> Result<crabka_metadata::DelegationToken, AdminError>;
    /// Expire the token identified by `hmac`.
    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError>;
    /// List the delegation tokens owned by `owner_principal`.
    async fn describe_delegation_tokens_owned_by(
        &mut self,
        owner_principal: &str,
    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError>;
}

#[async_trait::async_trait]
impl AdminClientLike for AdminClient {
    async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
        AdminClient::metadata(self, topics).await
    }
    async fn create_topics(
        &mut self,
        specs: &[CreateTopicSpec],
        timeout_ms: i32,
    ) -> Result<Vec<CreateTopicOutcome>, AdminError> {
        AdminClient::create_topics(self, specs, timeout_ms).await
    }
    async fn delete_topics(
        &mut self,
        names: &[&str],
        timeout_ms: i32,
    ) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
        AdminClient::delete_topics(self, names, timeout_ms).await
    }
    async fn create_partitions(
        &mut self,
        ops: &[CreatePartitionsOp],
        timeout_ms: i32,
    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
        AdminClient::create_partitions(self, ops, timeout_ms).await
    }
    async fn describe_configs(
        &mut self,
        topics: &[&str],
    ) -> Result<Vec<TopicConfigOverrides>, AdminError> {
        AdminClient::describe_configs(self, topics).await
    }
    async fn incremental_alter_configs(
        &mut self,
        ops: &[IncrementalAlterOp],
    ) -> Result<Vec<AlterConfigsOutcome>, AdminError> {
        AdminClient::incremental_alter_configs(self, ops).await
    }
    async fn alter_user_scram_credentials_sha512(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError> {
        AdminClient::alter_user_scram_credentials_sha512(self, upsertions, deletions).await
    }
    async fn alter_user_scram_credentials_sha256(
        &mut self,
        upsertions: &[ScramUpsertion],
        deletions: &[ScramDeletion],
    ) -> Result<Vec<ScramUserOutcome>, AdminError> {
        AdminClient::alter_user_scram_credentials_sha256(self, upsertions, deletions).await
    }
    async fn describe_acls(
        &mut self,
        filter: &AclEntryFilter,
    ) -> Result<Vec<AclEntry>, AdminError> {
        AdminClient::describe_acls(self, filter).await
    }
    async fn create_acls(
        &mut self,
        creations: &[AclEntry],
    ) -> Result<Vec<CreateAclOutcome>, AdminError> {
        AdminClient::create_acls(self, creations).await
    }
    async fn delete_acls(
        &mut self,
        filters: &[AclEntryFilter],
    ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
        AdminClient::delete_acls(self, filters).await
    }
    async fn describe_user_quotas(
        &mut self,
        username: &str,
    ) -> Result<UserQuotaConfig, AdminError> {
        AdminClient::describe_user_quotas(self, username).await
    }
    async fn alter_user_quotas(
        &mut self,
        username: &str,
        ops: &[QuotaOp],
        validate_only: bool,
    ) -> Result<Option<KafkaError>, AdminError> {
        AdminClient::alter_user_quotas(self, username, ops, validate_only).await
    }

    // ── delegation-token RPCs ─────────────────────────────────────────
    //
    // The inherent `AdminClient` methods in `delegation_tokens.rs`
    // return the wire-shaped response (`CreateDelegationTokenResponse`
    // for create, `i64` new expiry for renew, `()` for expire, image
    // `DelegationToken` for describe). The trait surface is normalised
    // to `crabka_metadata::DelegationToken` so the operator's reconcile
    // path is wire-agnostic.
    async fn create_delegation_token_as_owner(
        &mut self,
        owner_principal_name: &str,
        renewers: &[String],
        max_lifetime_ms: i64,
    ) -> Result<crabka_metadata::DelegationToken, AdminError> {
        // The create-response carries every field the image type needs
        // *except* the renewer list (the broker does not echo it back),
        // so we reconstruct that from the caller's input — which is the
        // ground truth anyway (KIP-48's create accepts the renewer set
        // verbatim and the broker stores it as-is).
        let resp = AdminClient::create_delegation_token_as_owner(
            self,
            owner_principal_name,
            renewers,
            max_lifetime_ms,
        )
        .await?;
        let renewers_image = renewers
            .iter()
            .filter_map(|s| renewer_str_to_principal(s))
            .collect();
        Ok(crabka_metadata::DelegationToken {
            token_id: resp.token_id,
            owner: crabka_security::KafkaPrincipal {
                principal_type: resp.principal_type,
                name: resp.principal_name,
            },
            hmac: resp.hmac.to_vec(),
            issue_timestamp_ms: resp.issue_timestamp_ms,
            expiry_timestamp_ms: resp.expiry_timestamp_ms,
            max_timestamp_ms: resp.max_timestamp_ms,
            renewers: renewers_image,
        })
    }

    async fn renew_delegation_token(
        &mut self,
        hmac: &[u8],
    ) -> Result<crabka_metadata::DelegationToken, AdminError> {
        // The renew-response (`RenewDelegationTokenResponse`) carries
        // only the new `expiry_timestamp_ms`. To rebuild the full
        // `DelegationToken` we follow up with `DescribeDelegationToken`
        // with `owners=None` (describe all) and look up the entry by
        // hmac. This adds one RPC per renewal — acceptable because the
        // operator renews each user at most every `renew_before_expiry_ms`
        // (24h by default). An alternative would have been to thread the
        // owner principal into the trait method; keeping the surface
        // `hmac`-only matches the inherent O3 signature.
        let _new_expiry = AdminClient::renew_delegation_token(self, hmac).await?;
        let req = crabka_protocol::owned::describe_delegation_token_request::DescribeDelegationTokenRequest::default();
        let resp = self.conn.send(req).await?;
        if resp.error_code != 0 {
            return Err(AdminError::Broker {
                api: "DescribeDelegationToken",
                code: resp.error_code,
                name: kafka_error_name(resp.error_code),
                message: None,
            });
        }
        let matched = resp
            .tokens
            .into_iter()
            .find(|t| t.hmac.as_ref() == hmac)
            .ok_or_else(|| {
                AdminError::Protocol(
                    "RenewDelegationToken: follow-up describe did not return the renewed token"
                        .into(),
                )
            })?;
        Ok(crabka_metadata::DelegationToken {
            token_id: matched.token_id,
            owner: crabka_security::KafkaPrincipal {
                principal_type: matched.principal_type,
                name: matched.principal_name,
            },
            hmac: matched.hmac.to_vec(),
            issue_timestamp_ms: matched.issue_timestamp,
            expiry_timestamp_ms: matched.expiry_timestamp,
            max_timestamp_ms: matched.max_timestamp,
            renewers: matched
                .renewers
                .into_iter()
                .map(|r| crabka_security::KafkaPrincipal {
                    principal_type: r.principal_type,
                    name: r.principal_name,
                })
                .collect(),
        })
    }

    async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
        AdminClient::expire_delegation_token(self, hmac).await
    }

    async fn describe_delegation_tokens_owned_by(
        &mut self,
        owner_principal: &str,
    ) -> Result<Vec<crabka_metadata::DelegationToken>, AdminError> {
        AdminClient::describe_delegation_tokens_owned_by(self, owner_principal).await
    }
}

/// Parse a renewer string into a `KafkaPrincipal`.
///
/// The expected shape is `"Type:Name"`, split at the first `:`. A string
/// with no `:` is taken as the name of a `User` principal. An empty
/// string returns `None`, so a stray `""` in a renewer list never turns
/// into a principal.
fn renewer_str_to_principal(s: &str) -> Option<crabka_security::KafkaPrincipal> {
    (!s.is_empty()).then(|| {
        let (principal_type, name) = match s.split_once(':') {
            Some((kind, rest)) => (kind, rest),
            None => ("User", s),
        };
        crabka_security::KafkaPrincipal {
            principal_type: principal_type.to_owned(),
            name: name.to_owned(),
        }
    })
}

/// Errors returned by [`AdminClient`] and the [`AdminClientLike`] trait.
#[derive(Debug, Error)]
pub enum AdminError {
    /// No bootstrap address accepted a connection. `tried` is the number
    /// of bootstrap entries that were supplied (zero for an empty list).
    #[error("no bootstrap address was reachable: tried {tried}")]
    Connect { tried: usize },
    /// Controller routing still failed after the retry.
    // NOTE(review): not constructed in this file — presumably raised by the
    // `NOT_CONTROLLER` retry helpers in the sibling modules; confirm there.
    #[error("controller routing failed after retry")]
    NotControllerExhausted,
    /// The broker answered with a non-zero top-level error code.
    ///
    /// `api` names the RPC, `code` is the raw wire code, `name` its
    /// symbolic form from `kafka_error_name`, and `message` the optional
    /// broker-supplied text (rendered with `{:?}` quoting when present).
    #[error("broker returned error: api={api} code={code} ({name}){detail}",
            detail = .message.as_deref().map(|m| format!(" {m:?}")).unwrap_or_default())]
    Broker {
        api: &'static str,
        code: i16,
        name: &'static str,
        message: Option<String>,
    },
    /// Failure reported by `crabka_client_core`; converted automatically
    /// from `ClientError` by `?`.
    #[error("client-core: {0}")]
    Transport(#[from] ClientError),
    /// Client-side failure with a free-form description. In this file it
    /// covers DNS lookup failures and a describe response that lacks the
    /// token that was just renewed.
    #[error("protocol: {0}")]
    Protocol(String),
}

/// A Kafka-level error attached to a single per-resource outcome.
#[derive(Debug, Clone)]
pub struct KafkaError {
    /// Raw Kafka protocol error code.
    pub code: i16,
    /// Symbolic name for `code`.
    // NOTE(review): presumably filled from `kafka_error_name` — confirm at
    // the construction sites in the sibling modules.
    pub name: &'static str,
    /// Optional human-readable message (presumably the broker's own text —
    /// confirm at the construction sites).
    pub message: Option<String>,
}

/// Short-lived admin client targeting one cluster's controller.
/// Optionally negotiates TLS/SASL via [`AdminClient::connect_secured`].
pub struct AdminClient {
    /// Current broker connection; `reconnect` replaces it in place.
    pub(crate) conn: Connection,
    /// Client security carried forward to `reconnect` so a
    /// `NOT_CONTROLLER` retry re-dials the new controller the same way.
    /// `None` means plaintext.
    security: Option<crabka_client_core::security::ClientSecurity>,
}

impl AdminClient {
    /// Build the per-connect options for `client_id="crabka-operator"`,
    /// carrying the supplied security policy.
    ///
    /// Uses a 5 s connect timeout and a 30 s request timeout. `None`
    /// security leaves the connection plaintext.
    fn opts(security: Option<crabka_client_core::security::ClientSecurity>) -> ConnectionOptions {
        ConnectionOptions {
            connect_timeout: Duration::from_secs(5),
            request_timeout: Duration::from_secs(30),
            client_id: "crabka-operator".to_string(),
            security: security.map(Box::new),
        }
    }

    /// Connect, applying optional client security. `None` = plaintext
    /// (identical to [`AdminClient::connect`]).
    ///
    /// Bootstrap entries are tried in order and the first one that
    /// connects wins. Per-entry failures are logged at `debug` level and
    /// otherwise swallowed. The security policy is kept on the client so
    /// later reconnects use it too.
    ///
    /// # Errors
    /// Returns `AdminError::Connect { tried }` if no bootstrap address
    /// accepted the (optionally secured) connection.
    pub async fn connect_secured(
        bootstrap_addrs: &[String],
        security: Option<crabka_client_core::security::ClientSecurity>,
    ) -> Result<Self, AdminError> {
        let opts = Self::opts(security.clone());
        for host_port in bootstrap_addrs {
            match Self::connect_one(host_port, opts.clone()).await {
                Ok(conn) => return Ok(Self { conn, security }),
                Err(e) => {
                    tracing::debug!(
                        target: "crabka_client_admin",
                        addr = %host_port,
                        error = %e,
                        "bootstrap connect failed",
                    );
                }
            }
        }
        Err(AdminError::Connect {
            tried: bootstrap_addrs.len(),
        })
    }

    /// Try each bootstrap address in order. Each entry is `host:port`;
    /// DNS is resolved via `tokio::net::lookup_host`, and every address an
    /// entry resolves to is attempted. First successful connect wins.
    /// Returns `AdminError::Connect { tried }` if none responded.
    /// Plaintext; see [`AdminClient::connect_secured`].
    pub async fn connect(bootstrap_addrs: &[String]) -> Result<Self, AdminError> {
        Self::connect_secured(bootstrap_addrs, None).await
    }

    /// Resolve one `host:port` entry and connect to it.
    ///
    /// Every resolved address is attempted in resolver order. A name can
    /// resolve to several addresses (a dual-stack host often yields IPv6
    /// before IPv4), and the broker may listen on only one of them, so
    /// dialing just the first result would fail spuriously.
    ///
    /// # Errors
    /// - `AdminError::Protocol` if the DNS lookup fails or yields no
    ///   addresses.
    /// - Otherwise the error from the last address attempted, converted
    ///   via `AdminError::from`.
    async fn connect_one(
        host_port: &str,
        opts: ConnectionOptions,
    ) -> Result<Connection, AdminError> {
        // Collect up front so the resolver's iterator is not held across
        // the connect awaits below.
        let addrs: Vec<_> = tokio::net::lookup_host(host_port)
            .await
            .map_err(|e| AdminError::Protocol(format!("DNS lookup {host_port}: {e}")))?
            .collect();

        let mut last_err = None;
        for addr in addrs {
            match Connection::connect_with_options(addr, opts.clone()).await {
                Ok(conn) => return Ok(conn),
                Err(e) => {
                    tracing::debug!(
                        target: "crabka_client_admin",
                        host = %host_port,
                        addr = %addr,
                        error = %e,
                        "resolved address connect failed",
                    );
                    last_err = Some(AdminError::from(e));
                }
            }
        }

        // `last_err` is `None` only when the resolver returned nothing.
        Err(last_err
            .unwrap_or_else(|| AdminError::Protocol(format!("no addresses for {host_port}"))))
    }

    /// Replace the underlying connection. Used internally by the
    /// `NOT_CONTROLLER` retry path to reconnect to the current controller.
    ///
    /// The new connection uses the security policy stored at connect
    /// time. If the connect fails, the existing connection is left in
    /// place.
    pub(crate) async fn reconnect(&mut self, host_port: &str) -> Result<(), AdminError> {
        let opts = Self::opts(self.security.clone());
        self.conn = Self::connect_one(host_port, opts).await?;
        Ok(())
    }
}

/// Kafka error code: the broker is not the controller (KIP-129). The
/// admin client refreshes its controller endpoint and retries once.
// NOTE(review): the KIP-129 citation looks wrong — KIP-129 appears to be the
// Streams exactly-once KIP, while `NOT_CONTROLLER` (41) seems to have come in
// with the KIP-4 admin APIs. Confirm and correct the reference.
pub(crate) const NOT_CONTROLLER: i16 = 41;

/// Map a Kafka error code into a static name string for human-friendly
/// `AdminError::Broker` formatting.
///
/// Covers the codes the admin RPCs are expected to surface: topic and
/// config management, authorization, and the KIP-48 delegation-token
/// family. Names follow the Apache Kafka protocol error table. Any code
/// not listed serializes as `"UNKNOWN"`.
pub(crate) fn kafka_error_name(code: i16) -> &'static str {
    match code {
        -1 => "UNKNOWN_SERVER_ERROR",
        0 => "NONE",
        3 => "UNKNOWN_TOPIC_OR_PARTITION",
        7 => "REQUEST_TIMED_OUT",
        17 => "INVALID_TOPIC_EXCEPTION",
        19 => "NOT_ENOUGH_REPLICAS",
        29 => "TOPIC_AUTHORIZATION_FAILED",
        31 => "CLUSTER_AUTHORIZATION_FAILED",
        35 => "UNSUPPORTED_VERSION",
        36 => "TOPIC_ALREADY_EXISTS",
        37 => "INVALID_PARTITIONS",
        38 => "INVALID_REPLICATION_FACTOR",
        39 => "INVALID_REPLICA_ASSIGNMENT",
        40 => "INVALID_CONFIG",
        41 => "NOT_CONTROLLER",
        42 => "INVALID_REQUEST",
        44 => "POLICY_VIOLATION",
        54 => "SECURITY_DISABLED",
        58 => "SASL_AUTHENTICATION_FAILED",
        // 60 is the protocol's REASSIGNMENT_IN_PROGRESS; this name was
        // previously attached to 87 by mistake.
        60 => "REASSIGNMENT_IN_PROGRESS",
        61 => "DELEGATION_TOKEN_AUTH_DISABLED",
        62 => "DELEGATION_TOKEN_NOT_FOUND",
        63 => "DELEGATION_TOKEN_OWNER_MISMATCH",
        64 => "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED",
        65 => "DELEGATION_TOKEN_AUTHORIZATION_FAILED",
        66 => "DELEGATION_TOKEN_EXPIRED",
        67 => "INVALID_PRINCIPAL_TYPE",
        87 => "INVALID_RECORD",
        _ => "UNKNOWN",
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Spot-check a few codes that have a dedicated name.
    #[test]
    fn kafka_error_name_known_codes() {
        let expectations = [
            (0, "NONE"),
            (36, "TOPIC_ALREADY_EXISTS"),
            (41, "NOT_CONTROLLER"),
        ];
        for (code, expected) in expectations {
            assert!(kafka_error_name(code) == expected);
        }
    }

    /// A code outside the table falls through to the catch-all name.
    #[test]
    fn kafka_error_name_unknown_returns_unknown() {
        let name = kafka_error_name(9999);
        assert!(name == "UNKNOWN");
    }

    /// `connect_secured` accepts a SASL/PLAIN policy and still reports an
    /// error when nothing is listening on the target port.
    #[tokio::test]
    async fn connect_secured_threads_security_and_fails_to_closed_port() {
        use crabka_client_core::security::{ClientSecurity, SaslCredentials};
        use crabka_security::ListenerProtocol;

        let credentials = SaslCredentials::Plain {
            username: "u".into(),
            password: "p".into(),
        };
        let security = ClientSecurity {
            protocol: ListenerProtocol::SaslPlaintext,
            tls: None,
            sasl: Some(credentials),
            sasl_host: None,
        };

        // Nothing listens on 127.0.0.1:1, so the attempt has to fail. The
        // point is that a populated security argument makes it through the
        // call (it type-checks and is passed down to the dial).
        let bootstrap = ["127.0.0.1:1".to_string()];
        let outcome = AdminClient::connect_secured(&bootstrap, Some(security)).await;
        assert!(outcome.is_err(), "connect to closed port must fail");
    }
}