Skip to main content

crabka_operator/controller/
user.rs

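//! `KafkaUser` reconciler — unidirectional (CRD wins).
//!
//! Provisions SCRAM-SHA-256 / SCRAM-SHA-512 credentials via
//! `AlterUserScramCredentials`, issues per-user client certificates for
//! TLS users, and hands delegation-token users to
//! `user_delegation_token`. Keeps the ACL set in sync via `CreateAcls` /
//! `DeleteAcls` and, when `spec.quotas` is set, the user quotas via
//! `AlterClientQuotas`. The generated password lives in a Kubernetes
//! Secret owner-referenced to the `KafkaUser`, so cluster delete
//! cascades automatically.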
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::sync::Arc;
10use std::time::Duration;
11
12use base64::Engine as _;
13use crabka_client_admin::{
14    AclEntry, AclEntryFilter, AclOperation, AdminError, DEFAULT_SCRAM_ITERATIONS, PatternType,
15    PermissionType, ResourceType, ScramDeletion, ScramUpsertion,
16};
17use futures::StreamExt as _;
18use k8s_openapi::ByteString;
19use k8s_openapi::api::core::v1::Secret;
20use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
21use kube::api::{Api, Patch, PatchParams};
22use kube::runtime::controller::{Action, Controller};
23use kube::runtime::reflector::ObjectRef;
24use kube::runtime::watcher;
25use kube::{Resource, ResourceExt as _};
26use ring::rand::{SecureRandom, SystemRandom};
27use serde_json::json;
28
29use crate::context::Context;
30use crate::controller::common::{FIELD_MANAGER, ReconcileError, condition};
31use crate::controller::topic::internal_listener_bootstrap;
32use crate::controller::user_delegation_token::{self, KubeKafkaUserStatusWriter, KubeSecretWriter};
33use crate::controller::user_tls;
34use crate::crd::{
35    AclOp, AclPatternType, AclPermission, AclResourceKind, Authentication, Kafka, KafkaUser,
36    KafkaUserAuthorization as Authorization,
37};
38
39const FINALIZER: &str = "crabka.io/user-finalizer";
40
41/// Run the controller forever.
42pub async fn run(ctx: Context) -> anyhow::Result<()> {
43    let user_api: Api<KafkaUser> = Api::all(ctx.client.clone());
44    let kafka_api: Api<Kafka> = Api::all(ctx.client.clone());
45    Controller::new(user_api, watcher::Config::default())
46        .watches(kafka_api, watcher::Config::default(), |_kafka| {
47            // Empty mapper — periodic requeue picks up the transition.
48            // Same posture as `topic.rs`.
49            Vec::<ObjectRef<KafkaUser>>::new().into_iter()
50        })
51        .run(reconcile, error_policy, Arc::new(ctx))
52        .for_each(|res| async move {
53            match res {
54                Ok((obj, _)) => tracing::debug!(?obj, "user reconciled"),
55                Err(e) => tracing::warn!(error = %e, "user reconcile error"),
56            }
57        })
58        .await;
59    Ok(())
60}
61
62pub fn error_policy(_obj: Arc<KafkaUser>, err: &ReconcileError, _ctx: Arc<Context>) -> Action {
63    tracing::warn!(error = %err, "user reconcile error, requeueing");
64    Action::requeue(Duration::from_secs(15))
65}
66
67#[allow(clippy::too_many_lines)] // linear pipeline; extraction hurts more than helps
68pub async fn reconcile(obj: Arc<KafkaUser>, ctx: Arc<Context>) -> Result<Action, ReconcileError> {
69    let ns = obj.namespace().unwrap_or_else(|| "default".into());
70    let name = obj.name_any();
71    let user_api: Api<KafkaUser> = Api::namespaced(ctx.client.clone(), &ns);
72    // Failure paths below preserve whatever `quotasInSync` / `tls*` was
73    // already published — they haven't touched broker state, so the
74    // prior value is still the truth.
75    let prior_quotas_in_sync = obj.status.as_ref().is_some_and(|s| s.quotas_in_sync);
76    let prior_tls = obj.status.as_ref().is_some_and(|s| s.tls);
77    let prior_external = obj.status.as_ref().is_some_and(|s| s.external);
78    let prior_tls_not_after = obj
79        .status
80        .as_ref()
81        .and_then(|s| s.tls_cert_not_after.clone());
82    let prior_tls_principal = obj.status.as_ref().and_then(|s| s.tls_principal.clone());
83
84    // 1. Cluster label
85    let cluster = obj
86        .meta()
87        .labels
88        .as_ref()
89        .and_then(|l| l.get("crabka.io/cluster").cloned());
90    let Some(cluster) = cluster else {
91        patch_status(
92            &user_api,
93            &name,
94            StatusPatch {
95                obj: &obj,
96                status: "False",
97                reason: "MissingClusterLabel",
98                message: "metadata.labels[\"crabka.io/cluster\"] is required",
99                scram_sha512: false,
100                scram_sha256: false,
101                tls: prior_tls,
102                external: prior_external,
103                tls_cert_not_after: prior_tls_not_after.clone(),
104                tls_principal: prior_tls_principal.clone(),
105                advance_generation: false,
106                quotas_in_sync: prior_quotas_in_sync,
107            },
108        )
109        .await?;
110        return Ok(Action::requeue(Duration::from_mins(1)));
111    };
112
113    // 2. Validate spec
114    if let Err(msg) = validate_spec(&obj.spec) {
115        patch_status(
116            &user_api,
117            &name,
118            StatusPatch {
119                obj: &obj,
120                status: "False",
121                reason: "InvalidSpec",
122                message: &msg,
123                scram_sha512: false,
124                scram_sha256: false,
125                tls: prior_tls,
126                external: prior_external,
127                tls_cert_not_after: prior_tls_not_after.clone(),
128                tls_principal: prior_tls_principal.clone(),
129                advance_generation: false,
130                quotas_in_sync: prior_quotas_in_sync,
131            },
132        )
133        .await?;
134        return Ok(Action::requeue(Duration::from_mins(5)));
135    }
136
137    // 3. Look up the Kafka + bootstrap
138    let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
139    let kafka = kafka_api.get_opt(&cluster).await?;
140    let bootstrap = kafka.as_ref().and_then(internal_listener_bootstrap);
141    let Some(bootstrap) = bootstrap else {
142        patch_status(
143            &user_api,
144            &name,
145            StatusPatch {
146                obj: &obj,
147                status: "False",
148                reason: "ClusterNotReady",
149                message: &format!("Kafka/{cluster} not Ready or no internal listener"),
150                scram_sha512: false,
151                scram_sha256: false,
152                tls: prior_tls,
153                external: prior_external,
154                tls_cert_not_after: prior_tls_not_after.clone(),
155                tls_principal: prior_tls_principal.clone(),
156                advance_generation: false,
157                quotas_in_sync: prior_quotas_in_sync,
158            },
159        )
160        .await?;
161        return Ok(Action::requeue(Duration::from_secs(30)));
162    };
163
164    let principal = principal_for(&name, &obj.spec.authentication);
165    // For TLS users the broker stores quotas keyed by the cert DN
166    // (i.e. `principal` without the `User:` prefix).  For SCRAM users
167    // this is just `name`.
168    let quota_username: String = principal
169        .strip_prefix("User:")
170        .unwrap_or(&principal)
171        .to_string();
172
173    // 4. Finalizer / delete path
174    if obj.meta().deletion_timestamp.is_some() {
175        // Best-effort cleanup; errors are logged but don't block finalizer removal.
176        if let Ok(client) = ctx.admin_client_for(&cluster, &bootstrap).await {
177            let mut admin = client.lock().await;
178            // SCRAM-SHA-256 + SCRAM-SHA-512 both reach
179            // `alter_user_scram_credentials_*` for the finalizer
180            // deletion — the broker stores credentials per mechanism,
181            // so we tear down whichever mechanism this user used.
182            let scram_finalizer = match &obj.spec.authentication {
183                Authentication::ScramSha512(_) => Some(true),
184                Authentication::ScramSha256(_) => Some(false),
185                _ => None,
186            };
187            if let Some(is_sha512) = scram_finalizer {
188                let deletion = ScramDeletion {
189                    username: name.clone(),
190                };
191                let result = if is_sha512 {
192                    admin
193                        .alter_user_scram_credentials_sha512(&[], &[deletion])
194                        .await
195                } else {
196                    admin
197                        .alter_user_scram_credentials_sha256(&[], &[deletion])
198                        .await
199                };
200                if let Err(e) = result {
201                    tracing::warn!(error = %e, %name, "scram delete during finalizer failed");
202                }
203            }
204            // Delegation-token finalizer — expire every token
205            // owned by `User:<name>` via `ExpireDelegationToken` (period
206            // -1 = immediate tombstone). Best-effort: errors are logged
207            // and never block finalizer removal.
208            //
209            // The `expire_owned_tokens` helper Describe-lists tokens owned
210            // by `User:<name>` then calls `ExpireDelegationToken` for each.
211            // It takes a `&dyn DelegationTokenAdmin` — the
212            // `AdminClientHandle` itself implements that trait (see
213            // `user_delegation_token.rs`), but we already hold the mutex
214            // guard `admin` here, so we drop it briefly so the trait impl
215            // can re-acquire.
216            if matches!(&obj.spec.authentication, Authentication::DelegationToken(_)) {
217                drop(admin);
218                if let Err(e) = user_delegation_token::expire_owned_tokens(&name, &client).await {
219                    tracing::warn!(
220                        error = %e,
221                        %name,
222                        "delegation-token finalizer expire failed",
223                    );
224                }
225                // Re-acquire for the ACL + quota tombstones below.
226                admin = client.lock().await;
227            }
228            let filter = AclEntryFilter {
229                principal: Some(principal.clone()),
230                ..Default::default()
231            };
232            match admin.delete_acls(&[filter]).await {
233                Ok(_) => {}
234                Err(e) => tracing::warn!(error = %e, %name, "acl delete during finalizer failed"),
235            }
236            // Quotas: best-effort tombstone every key the broker
237            // currently has for this user. Same posture as the SCRAM
238            // and ACL paths above — log on error, never block the
239            // finalizer.  Keyed by DN-minus-`User:` for TLS users.
240            match admin.describe_user_quotas(&quota_username).await {
241                Ok(cur) if !cur.is_empty() => {
242                    let ops: Vec<_> = cur
243                        .into_keys()
244                        .map(|key| crabka_client_admin::QuotaOp::Remove { key })
245                        .collect();
246                    if let Err(e) = admin.alter_user_quotas(&quota_username, &ops, false).await {
247                        tracing::warn!(error = %e, %name, "quota delete during finalizer failed");
248                    }
249                }
250                Ok(_) => {}
251                Err(e) => {
252                    tracing::warn!(error = %e, %name, "quota describe during finalizer failed");
253                }
254            }
255        }
256        remove_finalizer(&user_api, &name).await?;
257        return Ok(Action::await_change());
258    }
259
260    // 5. Ensure finalizer
261    if !has_finalizer(&obj) {
262        add_finalizer(&user_api, &name).await?;
263        return Ok(Action::requeue(Duration::ZERO));
264    }
265
266    // 6. Provision credentials per auth type.
267    //
268    // SCRAM arm: ensure password Secret, then issue
269    // `AlterUserScramCredentials` upsert.
270    // TLS arm: ensure clients-CA, then issue (or reuse) the per-user
271    // cert Secret. No broker call needed — the broker learns the user
272    // from the certificate at mTLS handshake time.
273    let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &ns);
274    let tls_not_after: Option<String> = match &obj.spec.authentication {
275        Authentication::ScramSha512(_) | Authentication::ScramSha256(_) => {
276            // SCRAM-SHA-256 + SCRAM-SHA-512 share the
277            // password-secret + admin-client + status-patch flow; the
278            // only difference is which `_sha256` / `_sha512` admin
279            // method we call. Pluck the per-variant knobs here.
280            let (password_len, iterations, is_sha512) = match &obj.spec.authentication {
281                Authentication::ScramSha512(s) => (
282                    s.password_length.unwrap_or(32),
283                    s.iterations.unwrap_or(DEFAULT_SCRAM_ITERATIONS),
284                    true,
285                ),
286                Authentication::ScramSha256(s) => (
287                    s.password_length.unwrap_or(32),
288                    s.iterations.unwrap_or(DEFAULT_SCRAM_ITERATIONS),
289                    false,
290                ),
291                _ => unreachable!(),
292            };
293            let password = match ensure_password_secret(&secret_api, &obj, password_len).await {
294                Ok(p) => p,
295                Err(e) => {
296                    tracing::warn!(error = %e, %name, "ensure_password_secret failed");
297                    return Ok(Action::requeue(Duration::from_secs(15)));
298                }
299            };
300
301            // 7. Connect + upsert SCRAM (within the SCRAM arm). The
302            // admin connection is opened again below for ACL + quota
303            // work; the brief duplication is intentional and keeps
304            // each arm self-contained.
305            let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
306                Ok(h) => h,
307                Err(e) => {
308                    tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
309                    return Ok(Action::requeue(Duration::from_secs(15)));
310                }
311            };
312            let mut admin = admin_handle.lock().await;
313            let upsertions = [ScramUpsertion {
314                username: name.clone(),
315                password,
316                iterations,
317            }];
318            let outcomes = if is_sha512 {
319                admin
320                    .alter_user_scram_credentials_sha512(&upsertions, &[])
321                    .await
322            } else {
323                admin
324                    .alter_user_scram_credentials_sha256(&upsertions, &[])
325                    .await
326            };
327            let outcomes = match outcomes {
328                Ok(v) => v,
329                Err(e) => {
330                    tracing::warn!(error = %e, "AlterUserScramCredentials transport failure");
331                    let is_transport = matches!(e, AdminError::Transport(_));
332                    drop(admin);
333                    if is_transport {
334                        ctx.drop_admin_client(&cluster).await;
335                    }
336                    return Ok(Action::requeue(Duration::from_secs(15)));
337                }
338            };
339            if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
340                drop(admin);
341                patch_status(
342                    &user_api,
343                    &name,
344                    StatusPatch {
345                        obj: &obj,
346                        status: "False",
347                        reason: "BrokerError",
348                        message: &format!("AlterUserScramCredentials: {} ({})", err.name, err.code),
349                        scram_sha512: false,
350                        scram_sha256: false,
351                        tls: prior_tls,
352                        external: prior_external,
353                        tls_cert_not_after: prior_tls_not_after.clone(),
354                        tls_principal: prior_tls_principal.clone(),
355                        advance_generation: false,
356                        quotas_in_sync: prior_quotas_in_sync,
357                    },
358                )
359                .await?;
360                return Ok(Action::requeue(Duration::from_secs(15)));
361            }
362            None
363        }
364        Authentication::Tls(tls_auth) => {
365            let kafka_ref = kafka
366                .as_ref()
367                .expect("bootstrap presence implies Kafka resource is Some");
368            let ca_outcome = match crate::controller::cluster_ca::reconcile_ca(
369                &secret_api,
370                kafka_ref,
371                crate::controller::cluster_ca::WhichCa::Clients,
372                false,
373                false,
374                true,
375                time::OffsetDateTime::now_utc(),
376            )
377            .await
378            {
379                Ok(o) => o,
380                Err(e) => {
381                    tracing::warn!(error = %e, %cluster, "clients CA reconcile failed");
382                    return Ok(Action::requeue(Duration::from_secs(15)));
383                }
384            };
385            // Sign user certs with the clients CA's active signer (the
386            // first block of the trust bundle, paired with the active key).
387            let ca = ca_outcome.signing_material;
388            let cert_status =
389                match user_tls::ensure_user_cert_secret(&secret_api, &obj, &ca, tls_auth).await {
390                    Ok(s) => s,
391                    Err(e) => {
392                        tracing::warn!(error = %e, %name, "ensure_user_cert_secret failed");
393                        return Ok(Action::requeue(Duration::from_secs(15)));
394                    }
395                };
396            if cert_status.issued_new {
397                tracing::info!(
398                    %name,
399                    not_after = %cert_status.not_after,
400                    "issued new client cert",
401                );
402            }
403            Some(cert_status.not_after)
404        }
405        // Credential-less user: the operator does not create a Secret
406        // or issue a cert. ACL + quota reconciliation below is
407        // principal-driven and Just Works for this arm.
408        Authentication::TlsExternal => None,
409        // Dispatch the delegation-token reconcile.
410        //
411        // The token lifecycle (Describe → decide → Create/Renew/NoOp/Cycle
412        // → Secret + status patch) lives in
413        // `controller::user_delegation_token`. The `AdminClientHandle`
414        // now implements `DelegationTokenAdmin` directly (see the
415        // `impl` block in `user_delegation_token.rs`), so we hand the
416        // handle in as `&dyn DelegationTokenAdmin`.
417        //
418        // The module's `reconcile` returns an `Action` on its own —
419        // computed from the live token's `expiry_timestamp_ms` minus
420        // the spec's `renew_before_expiry_ms` — so this arm bypasses
421        // the trailing ACL/quota reconciliation block by returning
422        // early. (ACLs + quotas for delegation-token users are reached
423        // on the next requeue: `reconcile` here returns just the
424        // credential-side `Action`; the ACL/quota side would otherwise
425        // need a second admin-client lock under the held mutex.)
426        Authentication::DelegationToken(dt) => {
427            let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
428                Ok(h) => h,
429                Err(e) => {
430                    tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
431                    return Ok(Action::requeue(Duration::from_secs(15)));
432                }
433            };
434            let secret_writer = KubeSecretWriter {
435                api: secret_api.clone(),
436            };
437            let user_writer = KubeKafkaUserStatusWriter {
438                api: user_api.clone(),
439            };
440            let now_ms = chrono::Utc::now().timestamp_millis();
441            let out = user_delegation_token::reconcile(
442                &obj,
443                dt,
444                &admin_handle,
445                &secret_writer,
446                &user_writer,
447                now_ms,
448            )
449            .await?;
450            // ACL + quota reconciliation for delegation-token users
451            // happens in a follow-up reconcile pass — the token
452            // module's `reconcile` already wrote the credential Secret
453            // and patched status, and the operator's per-user requeue
454            // will pick up ACL drift on the next pass. Return early
455            // with the renew-driven `Action`.
456            return Ok(out.action);
457        }
458    };
459
460    // Open admin for ACL + quota reconciliation (steps 8 + 9). Common
461    // to both auth arms.
462    let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
463        Ok(h) => h,
464        Err(e) => {
465            tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
466            return Ok(Action::requeue(Duration::from_secs(15)));
467        }
468    };
469    let mut admin = admin_handle.lock().await;
470
471    // 8. Reconcile ACLs
472    let desired: BTreeSet<AclEntry> = expand_spec_acls(obj.spec.authorization.as_ref(), &principal)
473        .into_iter()
474        .collect();
475    let filter = AclEntryFilter {
476        principal: Some(principal.clone()),
477        ..Default::default()
478    };
479    let current_vec = match admin.describe_acls(&filter).await {
480        Ok(v) => v,
481        Err(e) => {
482            tracing::warn!(error = %e, "DescribeAcls failure");
483            let is_transport = matches!(e, AdminError::Transport(_));
484            drop(admin);
485            if is_transport {
486                ctx.drop_admin_client(&cluster).await;
487            }
488            return Ok(Action::requeue(Duration::from_secs(15)));
489        }
490    };
491    let current: BTreeSet<AclEntry> = current_vec.into_iter().collect();
492    let (additions, deletions) = diff_acls(&current, &desired);
493
494    if !additions.is_empty()
495        && let Err(e) = apply_create_acls(&mut admin, &additions).await
496    {
497        let is_transport = matches!(e, AdminError::Transport(_));
498        drop(admin);
499        if is_transport {
500            ctx.drop_admin_client(&cluster).await;
501        }
502        return user_broker_error(&user_api, &name, &obj, e, "CreateAcls").await;
503    }
504    if !deletions.is_empty()
505        && let Err(e) = apply_delete_acls(&mut admin, &deletions).await
506    {
507        let is_transport = matches!(e, AdminError::Transport(_));
508        drop(admin);
509        if is_transport {
510            ctx.drop_admin_client(&cluster).await;
511        }
512        return user_broker_error(&user_api, &name, &obj, e, "DeleteAcls").await;
513    }
514
515    // 9. Reconcile quotas. `spec.quotas == None` leaves the
516    // broker's quota state untouched — a deliberate opt-in posture
517    // matching Strimzi (the quotas section being absent ≠ "wipe all
518    // quotas"). To clear quotas via the CRD, set `quotas: {}` (empty
519    // object) — that desired-state map is empty and the diff
520    // tombstones whatever the broker has.
521    let quotas_in_sync = if let Some(spec_quotas) = obj.spec.quotas.as_ref() {
522        let desired = spec_quotas.to_quota_map();
523        let current = match admin.describe_user_quotas(&quota_username).await {
524            Ok(c) => c,
525            Err(e) => {
526                tracing::warn!(error = %e, "DescribeClientQuotas failure");
527                let is_transport = matches!(e, AdminError::Transport(_));
528                drop(admin);
529                if is_transport {
530                    ctx.drop_admin_client(&cluster).await;
531                }
532                return Ok(Action::requeue(Duration::from_secs(15)));
533            }
534        };
535        let ops = crabka_client_admin::diff_user_quotas(&current, &desired);
536        if !ops.is_empty()
537            && let Err(e) = apply_alter_user_quotas(&mut admin, &quota_username, &ops).await
538        {
539            let is_transport = matches!(e, AdminError::Transport(_));
540            drop(admin);
541            if is_transport {
542                ctx.drop_admin_client(&cluster).await;
543            }
544            return user_broker_error(&user_api, &name, &obj, e, "AlterClientQuotas").await;
545        }
546        true
547    } else {
548        false
549    };
550
551    let is_scram_sha512 = matches!(&obj.spec.authentication, Authentication::ScramSha512(_));
552    let is_scram_sha256 = matches!(&obj.spec.authentication, Authentication::ScramSha256(_));
553    let is_tls = matches!(&obj.spec.authentication, Authentication::Tls(_));
554    let is_external = matches!(&obj.spec.authentication, Authentication::TlsExternal);
555    patch_status(
556        &user_api,
557        &name,
558        StatusPatch {
559            obj: &obj,
560            status: "True",
561            reason: "Ready",
562            message: "user in sync",
563            scram_sha512: is_scram_sha512,
564            scram_sha256: is_scram_sha256,
565            tls: is_tls && tls_not_after.is_some(),
566            external: is_external,
567            tls_cert_not_after: tls_not_after.clone(),
568            // `tls_principal` is the principal the operator pinned in ACLs.
569            // For TLS users that's `User:CN=<name>`; for `tls-external`
570            // users it's `User:<name>`. SCRAM users skip this — `username`
571            // already carries the same string for them.
572            tls_principal: if is_tls || is_external {
573                Some(principal.clone())
574            } else {
575                prior_tls_principal.clone()
576            },
577            advance_generation: true,
578            quotas_in_sync,
579        },
580    )
581    .await?;
582    // TLS certs live ~365d by default with a 30d renewal window — once
583    // a minute requeue is overkill. SCRAM stays at the existing
584    // cadence (password rotation is operator-driven, not time-driven,
585    // but the per-minute requeue covers external drift in ACLs /
586    // quotas).
587    // `tls-external` users have no operator-owned credential to rotate,
588    // but ACLs + quotas can drift externally — keep the per-minute
589    // requeue to detect that (same cadence as SCRAM, different reason).
590    // Delegation-token users do not reach this match — that
591    // arm returns its renew-driven `Action` earlier (see the
592    // `Authentication::DelegationToken(dt)` arm above).
593    #[allow(clippy::match_same_arms)]
594    let requeue = match &obj.spec.authentication {
595        Authentication::ScramSha512(_) | Authentication::ScramSha256(_) => Duration::from_mins(1),
596        Authentication::Tls(_) => Duration::from_hours(6),
597        Authentication::TlsExternal => Duration::from_mins(1),
598        Authentication::DelegationToken(_) => unreachable!(
599            "delegation-token arm returns early after user_delegation_token::reconcile",
600        ),
601    };
602    Ok(Action::requeue(requeue))
603}
604
605async fn apply_alter_user_quotas(
606    admin: &mut tokio::sync::MutexGuard<'_, dyn crabka_client_admin::AdminClientLike + Send>,
607    username: &str,
608    ops: &[crabka_client_admin::QuotaOp],
609) -> Result<(), AdminError> {
610    if let Some(err) = admin.alter_user_quotas(username, ops, false).await? {
611        return Err(AdminError::Broker {
612            api: "AlterClientQuotas",
613            code: err.code,
614            name: err.name,
615            message: err.message,
616        });
617    }
618    Ok(())
619}
620
621async fn apply_create_acls(
622    admin: &mut tokio::sync::MutexGuard<'_, dyn crabka_client_admin::AdminClientLike + Send>,
623    additions: &[AclEntry],
624) -> Result<(), AdminError> {
625    let outcomes = admin.create_acls(additions).await?;
626    if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
627        return Err(AdminError::Broker {
628            api: "CreateAcls",
629            code: err.code,
630            name: err.name,
631            message: err.message,
632        });
633    }
634    Ok(())
635}
636
637async fn apply_delete_acls(
638    admin: &mut tokio::sync::MutexGuard<'_, dyn crabka_client_admin::AdminClientLike + Send>,
639    deletions: &[AclEntry],
640) -> Result<(), AdminError> {
641    let filters: Vec<AclEntryFilter> = deletions.iter().map(entry_to_exact_filter).collect();
642    let outcomes = admin.delete_acls(&filters).await?;
643    if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
644        return Err(AdminError::Broker {
645            api: "DeleteAcls",
646            code: err.code,
647            name: err.name,
648            message: err.message,
649        });
650    }
651    Ok(())
652}
653
654async fn user_broker_error(
655    api: &Api<KafkaUser>,
656    name: &str,
657    obj: &KafkaUser,
658    err: AdminError,
659    op: &str,
660) -> Result<Action, ReconcileError> {
661    let detail = match err {
662        AdminError::Broker { code, name, .. } => format!("{op}: {name} ({code})"),
663        other => format!("{op}: {other}"),
664    };
665    let prior_qis = obj.status.as_ref().is_some_and(|s| s.quotas_in_sync);
666    let prior_tls = obj.status.as_ref().is_some_and(|s| s.tls);
667    let prior_external = obj.status.as_ref().is_some_and(|s| s.external);
668    let prior_tls_not_after = obj
669        .status
670        .as_ref()
671        .and_then(|s| s.tls_cert_not_after.clone());
672    let prior_tls_principal = obj.status.as_ref().and_then(|s| s.tls_principal.clone());
673    patch_status(
674        api,
675        name,
676        StatusPatch {
677            obj,
678            status: "False",
679            reason: "BrokerError",
680            message: &detail,
681            scram_sha512: false,
682            scram_sha256: false,
683            tls: prior_tls,
684            external: prior_external,
685            tls_cert_not_after: prior_tls_not_after,
686            tls_principal: prior_tls_principal,
687            advance_generation: false,
688            quotas_in_sync: prior_qis,
689        },
690    )
691    .await?;
692    Ok(Action::requeue(Duration::from_secs(15)))
693}
694
695/// Build an `AclEntryFilter` that matches exactly one `AclEntry`.
696/// Used to scope `DeleteAcls` to a single tuple at a time.
697pub(crate) fn entry_to_exact_filter(e: &AclEntry) -> AclEntryFilter {
698    AclEntryFilter {
699        resource_type: Some(e.resource_type),
700        resource_name: Some(e.resource_name.clone()),
701        pattern_type: Some(e.pattern_type),
702        principal: Some(e.principal.clone()),
703        host: Some(e.host.clone()),
704        operation: Some(e.operation),
705        permission_type: Some(e.permission_type),
706    }
707}
708
709/// Kafka principal for the user. SCRAM and `tls-external` users use
710/// bare `User:<name>`; TLS users use `User:CN=<name>` (matches the
711/// cert's Subject DN, which is what the broker's
712/// `extract_principal_from_cert` returns at runtime).
713#[allow(clippy::match_same_arms)]
714pub(crate) fn principal_for(name: &str, auth: &Authentication) -> String {
715    match auth {
716        Authentication::ScramSha512(_) | Authentication::ScramSha256(_) => format!("User:{name}"),
717        Authentication::Tls(_) => user_tls::tls_principal(name),
718        // `tls-external` credentials are managed out-of-band (OIDC for
719        // OAUTHBEARER, or an external CA whose certs carry the bare
720        // metadata.name as the principal). Same string shape as SCRAM
721        // but different rationale — kept as a distinct arm.
722        Authentication::TlsExternal => format!("User:{name}"),
723        // Delegation tokens carry the owner principal
724        // (`User:<metadata.name>`); ACLs continue to be authored
725        // against the owner, not the token-id.
726        Authentication::DelegationToken(_) => format!("User:{name}"),
727    }
728}
729
730/// Expand the CRD's `acls` list (one rule may carry many operations)
731/// into the per-tuple `AclEntry` set the broker stores. Empty when
732/// `authorization` is absent.
733pub(crate) fn expand_spec_acls(auth: Option<&Authorization>, principal: &str) -> Vec<AclEntry> {
734    let Some(Authorization::Simple(simple)) = auth else {
735        return Vec::new();
736    };
737    let mut out = Vec::with_capacity(simple.acls.len());
738    for rule in &simple.acls {
739        for op in &rule.operations {
740            out.push(AclEntry {
741                resource_type: resource_kind_to_admin(rule.resource.kind),
742                resource_name: rule.resource.name.clone(),
743                pattern_type: pattern_to_admin(rule.resource.pattern_type),
744                principal: principal.to_string(),
745                host: rule.host.clone(),
746                operation: op_to_admin(*op),
747                permission_type: permission_to_admin(rule.permission),
748            });
749        }
750    }
751    out
752}
753
754/// Pure: split `(desired, current)` into `(additions, deletions)`.
755/// `additions` are entries in `desired` but not in `current`; `deletions`
756/// are entries in `current` but not in `desired`.
757pub(crate) fn diff_acls(
758    current: &BTreeSet<AclEntry>,
759    desired: &BTreeSet<AclEntry>,
760) -> (Vec<AclEntry>, Vec<AclEntry>) {
761    let additions: Vec<AclEntry> = desired.difference(current).cloned().collect();
762    let deletions: Vec<AclEntry> = current.difference(desired).cloned().collect();
763    (additions, deletions)
764}
765
766fn validate_spec(spec: &crate::crd::KafkaUserSpec) -> Result<(), String> {
767    if let Some(Authorization::Simple(simple)) = &spec.authorization {
768        for (i, rule) in simple.acls.iter().enumerate() {
769            if rule.operations.is_empty() {
770                return Err(format!("acls[{i}].operations is empty"));
771            }
772            if rule.resource.name.is_empty() {
773                return Err(format!("acls[{i}].resource.name is empty"));
774            }
775        }
776    }
777    Ok(())
778}
779
780fn resource_kind_to_admin(k: AclResourceKind) -> ResourceType {
781    match k {
782        AclResourceKind::Topic => ResourceType::Topic,
783        AclResourceKind::Group => ResourceType::Group,
784        AclResourceKind::Cluster => ResourceType::Cluster,
785        AclResourceKind::TransactionalId => ResourceType::TransactionalId,
786    }
787}
788
789fn pattern_to_admin(p: AclPatternType) -> PatternType {
790    match p {
791        AclPatternType::Literal => PatternType::Literal,
792        AclPatternType::Prefixed => PatternType::Prefixed,
793    }
794}
795
796fn permission_to_admin(p: AclPermission) -> PermissionType {
797    match p {
798        AclPermission::Allow => PermissionType::Allow,
799        AclPermission::Deny => PermissionType::Deny,
800    }
801}
802
803fn op_to_admin(op: AclOp) -> AclOperation {
804    match op {
805        AclOp::All => AclOperation::All,
806        AclOp::Read => AclOperation::Read,
807        AclOp::Write => AclOperation::Write,
808        AclOp::Create => AclOperation::Create,
809        AclOp::Delete => AclOperation::Delete,
810        AclOp::Alter => AclOperation::Alter,
811        AclOp::Describe => AclOperation::Describe,
812        AclOp::ClusterAction => AclOperation::ClusterAction,
813        AclOp::DescribeConfigs => AclOperation::DescribeConfigs,
814        AclOp::AlterConfigs => AclOperation::AlterConfigs,
815        AclOp::IdempotentWrite => AclOperation::IdempotentWrite,
816    }
817}
818
819// --- Secret management ---------------------------------------------------
820
821/// Get-or-create the password Secret. If a Secret already exists with a
822/// `password` key, reuse it (the SCRAM credential is regenerated each
823/// reconcile from the same plaintext so the operator-managed password
824/// is stable). On first reconcile the operator allocates
825/// `password_len_bytes` of cryptographically-random data and stores its
826/// URL-safe base64 encoding.
827async fn ensure_password_secret(
828    api: &Api<Secret>,
829    obj: &KafkaUser,
830    password_len_bytes: u16,
831) -> Result<String, ReconcileError> {
832    let name = obj.name_any();
833    if let Some(existing) = api.get_opt(&name).await?
834        && let Some(p) = read_password(&existing)
835    {
836        return Ok(p);
837    }
838    let password = random_password(password_len_bytes);
839    let secret = render_password_secret(obj, &password)?;
840    let params = PatchParams {
841        field_manager: Some(FIELD_MANAGER.into()),
842        force: true,
843        ..Default::default()
844    };
845    api.patch(&name, &params, &Patch::Apply(&secret)).await?;
846    Ok(password)
847}
848
849fn read_password(secret: &Secret) -> Option<String> {
850    let data = secret.data.as_ref()?;
851    let bs = data.get("password")?;
852    std::str::from_utf8(&bs.0).ok().map(str::to_string)
853}
854
855fn random_password(len_bytes: u16) -> String {
856    let mut buf = vec![0u8; len_bytes as usize];
857    SystemRandom::new()
858        .fill(&mut buf)
859        .expect("system RNG must succeed");
860    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(buf)
861}
862
863/// Render the user-credential Secret. Owner-ref'd to the `KafkaUser`
864/// so cluster delete cascades. Keys:
865///   - `password` (raw plaintext, used by `KafkaUser` admin RPCs and
866///     pasted into application configuration)
867///   - `sasl.jaas.config` (ready-to-paste JAAS line for JVM clients)
868fn render_password_secret(obj: &KafkaUser, password: &str) -> Result<Secret, ReconcileError> {
869    let name = obj.name_any();
870    let jaas = format!(
871        "org.apache.kafka.common.security.scram.ScramLoginModule required \
872         username=\"{name}\" password=\"{password}\";"
873    );
874    let mut labels: BTreeMap<String, String> = BTreeMap::new();
875    labels.insert("app.kubernetes.io/name".into(), "crabka-broker".into());
876    labels.insert(
877        "app.kubernetes.io/managed-by".into(),
878        "crabka-operator".into(),
879    );
880    if let Some(cluster) = obj
881        .meta()
882        .labels
883        .as_ref()
884        .and_then(|l| l.get("crabka.io/cluster"))
885    {
886        labels.insert("crabka.io/cluster".into(), cluster.clone());
887    }
888    labels.insert("crabka.io/user".into(), name.clone());
889
890    let mut data: BTreeMap<String, ByteString> = BTreeMap::new();
891    data.insert("password".into(), ByteString(password.as_bytes().to_vec()));
892    data.insert("sasl.jaas.config".into(), ByteString(jaas.into_bytes()));
893
894    Ok(Secret {
895        metadata: ObjectMeta {
896            name: Some(name),
897            namespace: obj.meta().namespace.clone(),
898            labels: Some(labels),
899            owner_references: Some(vec![user_owner_ref(obj)?]),
900            ..Default::default()
901        },
902        type_: Some("Opaque".into()),
903        data: Some(data),
904        ..Default::default()
905    })
906}
907
908fn user_owner_ref(obj: &KafkaUser) -> Result<OwnerReference, ReconcileError> {
909    let uid = obj
910        .meta()
911        .uid
912        .as_deref()
913        .ok_or(ReconcileError::MissingUid)?;
914    Ok(OwnerReference {
915        api_version: <KafkaUser as Resource>::api_version(&()).to_string(),
916        kind: <KafkaUser as Resource>::kind(&()).to_string(),
917        name: obj.name_any(),
918        uid: uid.to_string(),
919        controller: Some(true),
920        block_owner_deletion: Some(true),
921    })
922}
923
924// --- Finalizer + status helpers -----------------------------------------
925
926fn has_finalizer(obj: &KafkaUser) -> bool {
927    obj.meta()
928        .finalizers
929        .as_ref()
930        .is_some_and(|f| f.iter().any(|s| s == FINALIZER))
931}
932
933async fn add_finalizer(api: &Api<KafkaUser>, name: &str) -> Result<(), ReconcileError> {
934    let patch = json!({ "metadata": { "finalizers": [FINALIZER] } });
935    let params = PatchParams {
936        field_manager: Some(FIELD_MANAGER.into()),
937        ..Default::default()
938    };
939    api.patch(name, &params, &Patch::Merge(&patch)).await?;
940    Ok(())
941}
942
943async fn remove_finalizer(api: &Api<KafkaUser>, name: &str) -> Result<(), ReconcileError> {
944    let patch = json!({ "metadata": { "finalizers": [] } });
945    let params = PatchParams {
946        field_manager: Some(FIELD_MANAGER.into()),
947        ..Default::default()
948    };
949    api.patch(name, &params, &Patch::Merge(&patch)).await?;
950    Ok(())
951}
952
953// Five bools = "status flags" — each is an independent axis of the
954// reconcile outcome. A state machine here would just rename the
955// problem; keep the flat shape.
956#[allow(clippy::struct_excessive_bools)]
957struct StatusPatch<'a> {
958    obj: &'a KafkaUser,
959    status: &'a str,
960    reason: &'a str,
961    message: &'a str,
962    scram_sha512: bool,
963    /// `true` iff SCRAM-SHA-256 credentials are
964    /// provisioned. Mirrors `scram_sha512`.
965    scram_sha256: bool,
966    tls: bool,
967    /// True iff this is a credential-less (`tls-external`) user that
968    /// has reached Ready. Sticky in `patch_status`.
969    external: bool,
970    tls_cert_not_after: Option<String>,
971    tls_principal: Option<String>,
972    advance_generation: bool,
973    quotas_in_sync: bool,
974}
975
976async fn patch_status(
977    api: &Api<KafkaUser>,
978    name: &str,
979    p: StatusPatch<'_>,
980) -> Result<(), ReconcileError> {
981    let conditions = vec![condition("Ready", p.status, p.reason, p.message)];
982    let observed_generation = if p.advance_generation {
983        p.obj.meta().generation
984    } else {
985        p.obj.status.as_ref().and_then(|s| s.observed_generation)
986    };
987    // Once any credential (SCRAM password or TLS cert) is provisioned,
988    // the secret name == metadata.name.
989    let secret_name = if p.scram_sha512 || p.scram_sha256 || p.tls {
990        Some(name.to_string())
991    } else {
992        p.obj.status.as_ref().and_then(|s| s.secret.clone())
993    };
994    let body = json!({
995        "status": {
996            "conditions": conditions,
997            "observedGeneration": observed_generation,
998            "username": name,
999            "secret": secret_name,
1000            "scramSha512": p.scram_sha512 || p.obj.status.as_ref().is_some_and(|s| s.scram_sha512),
1001            "scramSha256": p.scram_sha256 || p.obj.status.as_ref().is_some_and(|s| s.scram_sha256),
1002            "tls": p.tls || p.obj.status.as_ref().is_some_and(|s| s.tls),
1003            "external": p.external || p.obj.status.as_ref().is_some_and(|s| s.external),
1004            "tlsCertNotAfter": p.tls_cert_not_after,
1005            "tlsPrincipal": p.tls_principal,
1006            "quotasInSync": p.quotas_in_sync,
1007        }
1008    });
1009    let params = PatchParams {
1010        field_manager: Some(FIELD_MANAGER.into()),
1011        ..Default::default()
1012    };
1013    api.patch_status(name, &params, &Patch::Merge(&body))
1014        .await?;
1015    Ok(())
1016}
1017
#[cfg(test)]
mod tests {
    //! Unit tests for the pure helpers of the `KafkaUser` reconciler:
    //! principal rendering, ACL expansion and diffing, spec validation,
    //! exact-filter construction, and password generation.

    use super::*;
    use crate::crd::{
        AclOp, AclPatternType, AclPermission, AclResource, AclResourceKind, AclRule,
        KafkaUserSimpleAuthorization as SimpleAuthorization,
    };
    use assert2::assert;

    // --- Fixtures ---------------------------------------------------------

    /// Literal-pattern `Allow` rule from any host (`*`).
    fn rule(kind: AclResourceKind, name: &str, ops: &[AclOp]) -> AclRule {
        AclRule {
            resource: AclResource {
                kind,
                name: name.into(),
                pattern_type: AclPatternType::Literal,
            },
            operations: ops.to_vec(),
            host: "*".into(),
            permission: AclPermission::Allow,
        }
    }

    /// Simple authorization wrapping the given rules.
    fn simple_auth(acls: Vec<AclRule>) -> Authorization {
        Authorization::Simple(SimpleAuthorization { acls })
    }

    /// Default SCRAM-SHA-512 authentication.
    fn scram_sha512() -> Authentication {
        Authentication::ScramSha512(crate::crd::ScramSha512Auth::default())
    }

    /// SCRAM-SHA-512 spec with the given ACL rules and no quotas.
    fn scram_spec(acls: Vec<AclRule>) -> crate::crd::KafkaUserSpec {
        crate::crd::KafkaUserSpec {
            authentication: scram_sha512(),
            authorization: Some(simple_auth(acls)),
            quotas: None,
        }
    }

    /// `Allow Read` entry for `User:alice` from any host on a literal
    /// resource; only the resource type and name vary between tests.
    fn read_entry(resource_type: ResourceType, resource_name: &str) -> AclEntry {
        AclEntry {
            resource_type,
            resource_name: resource_name.into(),
            pattern_type: PatternType::Literal,
            principal: "User:alice".into(),
            host: "*".into(),
            operation: AclOperation::Read,
            permission_type: PermissionType::Allow,
        }
    }

    // --- principal_for ----------------------------------------------------

    #[test]
    fn principal_uses_user_prefix_for_scram() {
        assert!(principal_for("alice", &scram_sha512()) == "User:alice");
    }

    #[test]
    fn principal_for_dispatches_on_auth_type() {
        let tls = Authentication::Tls(crate::crd::user::TlsAuth::default());
        assert!(principal_for("alice", &scram_sha512()) == "User:alice");
        assert!(principal_for("alice", &tls) == "User:CN=alice");
    }

    #[test]
    fn principal_for_tls_external_uses_bare_name() {
        // `tls-external` users share SCRAM's bare-name principal shape:
        // credentials are managed out-of-band but the operator still
        // pins ACLs / quotas under `User:<metadata.name>`.
        assert!(principal_for("alice", &Authentication::TlsExternal) == "User:alice");
    }

    // --- expand_spec_acls -------------------------------------------------

    #[test]
    fn expand_spec_acls_with_no_authorization_is_empty() {
        assert!(expand_spec_acls(None, "User:alice").is_empty());
    }

    #[test]
    fn expand_spec_acls_one_rule_one_op() {
        let auth = simple_auth(vec![rule(AclResourceKind::Topic, "orders", &[AclOp::Read])]);
        let entries = expand_spec_acls(Some(&auth), "User:alice");
        assert!(entries.len() == 1);
        let e = &entries[0];
        assert!(e.resource_type == ResourceType::Topic);
        assert!(e.resource_name == "orders");
        assert!(e.principal == "User:alice");
        assert!(e.operation == AclOperation::Read);
        assert!(e.permission_type == PermissionType::Allow);
    }

    #[test]
    fn expand_spec_acls_one_rule_many_ops_fans_out() {
        let auth = simple_auth(vec![rule(
            AclResourceKind::Topic,
            "orders",
            &[AclOp::Read, AclOp::Describe, AclOp::Write],
        )]);
        let entries = expand_spec_acls(Some(&auth), "User:alice");
        assert!(entries.len() == 3);
        let ops: Vec<_> = entries.iter().map(|e| e.operation).collect();
        assert!(ops.contains(&AclOperation::Read));
        assert!(ops.contains(&AclOperation::Describe));
        assert!(ops.contains(&AclOperation::Write));
    }

    // --- diff_acls --------------------------------------------------------

    #[test]
    fn diff_acls_additions_and_deletions() {
        let keep = read_entry(ResourceType::Topic, "keep");
        let stale = read_entry(ResourceType::Topic, "drop");
        let add = read_entry(ResourceType::Group, "g");

        let current = BTreeSet::from([keep.clone(), stale.clone()]);
        let desired = BTreeSet::from([keep, add.clone()]);

        let (adds, dels) = diff_acls(&current, &desired);
        assert!(adds == vec![add]);
        assert!(dels == vec![stale]);
    }

    #[test]
    fn diff_acls_noop_when_matching() {
        let s = BTreeSet::from([read_entry(ResourceType::Topic, "x")]);
        let (adds, dels) = diff_acls(&s, &s);
        assert!(adds.is_empty());
        assert!(dels.is_empty());
    }

    // --- validate_spec ----------------------------------------------------

    #[test]
    fn validate_spec_rejects_empty_operations() {
        let spec = scram_spec(vec![rule(AclResourceKind::Topic, "x", &[])]);
        let err = validate_spec(&spec).unwrap_err();
        assert!(err.contains("operations is empty"), "got: {err}");
    }

    #[test]
    fn validate_spec_rejects_empty_resource_name() {
        let spec = scram_spec(vec![rule(AclResourceKind::Topic, "", &[AclOp::Read])]);
        let err = validate_spec(&spec).unwrap_err();
        assert!(err.contains("resource.name is empty"), "got: {err}");
    }

    #[test]
    fn validate_spec_accepts_tls_external_with_no_authorization_and_no_quotas() {
        let spec = crate::crd::KafkaUserSpec {
            authentication: Authentication::TlsExternal,
            authorization: None,
            quotas: None,
        };
        assert!(validate_spec(&spec).is_ok());
    }

    #[test]
    fn validate_spec_accepts_tls_external_with_acls_and_quotas() {
        let spec = crate::crd::KafkaUserSpec {
            authentication: Authentication::TlsExternal,
            authorization: Some(simple_auth(vec![rule(
                AclResourceKind::Topic,
                "orders",
                &[AclOp::Read],
            )])),
            quotas: Some(crate::crd::KafkaUserQuotas {
                producer_byte_rate: Some(1_048_576),
                ..Default::default()
            }),
        };
        assert!(validate_spec(&spec).is_ok());
    }

    // --- entry_to_exact_filter --------------------------------------------

    #[test]
    fn entry_to_exact_filter_populates_every_axis() {
        let e = read_entry(ResourceType::Topic, "orders");
        let f = entry_to_exact_filter(&e);
        assert!(f.resource_type == Some(ResourceType::Topic));
        assert!(f.resource_name.as_deref() == Some("orders"));
        assert!(f.pattern_type == Some(PatternType::Literal));
        assert!(f.principal.as_deref() == Some("User:alice"));
        assert!(f.host.as_deref() == Some("*"));
        assert!(f.operation == Some(AclOperation::Read));
        assert!(f.permission_type == Some(PermissionType::Allow));
    }

    // --- random_password --------------------------------------------------

    #[test]
    fn random_password_is_base64_and_uniform_length() {
        let p1 = random_password(32);
        let p2 = random_password(32);
        // Unpadded base64 of 32 bytes is exactly 43 characters. Pin the
        // value itself: two equally wrong lengths would still be equal.
        assert!(p1.len() == 43, "got: {p1}");
        assert!(p2.len() == 43, "got: {p2}");
        assert!(p1 != p2);
        assert!(
            p1.chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_'),
            "got: {p1}"
        );
    }
}