1use std::collections::{BTreeMap, BTreeSet};
9use std::sync::Arc;
10use std::time::Duration;
11
12use base64::Engine as _;
13use crabka_client_admin::{
14 AclEntry, AclEntryFilter, AclOperation, AdminError, DEFAULT_SCRAM_ITERATIONS, PatternType,
15 PermissionType, ResourceType, ScramDeletion, ScramUpsertion,
16};
17use futures::StreamExt as _;
18use k8s_openapi::ByteString;
19use k8s_openapi::api::core::v1::Secret;
20use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
21use kube::api::{Api, Patch, PatchParams};
22use kube::runtime::controller::{Action, Controller};
23use kube::runtime::reflector::ObjectRef;
24use kube::runtime::watcher;
25use kube::{Resource, ResourceExt as _};
26use ring::rand::{SecureRandom, SystemRandom};
27use serde_json::json;
28
29use crate::context::Context;
30use crate::controller::common::{FIELD_MANAGER, ReconcileError, condition};
31use crate::controller::topic::internal_listener_bootstrap;
32use crate::controller::user_delegation_token::{self, KubeKafkaUserStatusWriter, KubeSecretWriter};
33use crate::controller::user_tls;
34use crate::crd::{
35 AclOp, AclPatternType, AclPermission, AclResourceKind, Authentication, Kafka, KafkaUser,
36 KafkaUserAuthorization as Authorization,
37};
38
39const FINALIZER: &str = "crabka.io/user-finalizer";
40
41pub async fn run(ctx: Context) -> anyhow::Result<()> {
43 let user_api: Api<KafkaUser> = Api::all(ctx.client.clone());
44 let kafka_api: Api<Kafka> = Api::all(ctx.client.clone());
45 Controller::new(user_api, watcher::Config::default())
46 .watches(kafka_api, watcher::Config::default(), |_kafka| {
47 Vec::<ObjectRef<KafkaUser>>::new().into_iter()
50 })
51 .run(reconcile, error_policy, Arc::new(ctx))
52 .for_each(|res| async move {
53 match res {
54 Ok((obj, _)) => tracing::debug!(?obj, "user reconciled"),
55 Err(e) => tracing::warn!(error = %e, "user reconcile error"),
56 }
57 })
58 .await;
59 Ok(())
60}
61
62pub fn error_policy(_obj: Arc<KafkaUser>, err: &ReconcileError, _ctx: Arc<Context>) -> Action {
63 tracing::warn!(error = %err, "user reconcile error, requeueing");
64 Action::requeue(Duration::from_secs(15))
65}
66
67#[allow(clippy::too_many_lines)] pub async fn reconcile(obj: Arc<KafkaUser>, ctx: Arc<Context>) -> Result<Action, ReconcileError> {
69 let ns = obj.namespace().unwrap_or_else(|| "default".into());
70 let name = obj.name_any();
71 let user_api: Api<KafkaUser> = Api::namespaced(ctx.client.clone(), &ns);
72 let prior_quotas_in_sync = obj.status.as_ref().is_some_and(|s| s.quotas_in_sync);
76 let prior_tls = obj.status.as_ref().is_some_and(|s| s.tls);
77 let prior_external = obj.status.as_ref().is_some_and(|s| s.external);
78 let prior_tls_not_after = obj
79 .status
80 .as_ref()
81 .and_then(|s| s.tls_cert_not_after.clone());
82 let prior_tls_principal = obj.status.as_ref().and_then(|s| s.tls_principal.clone());
83
84 let cluster = obj
86 .meta()
87 .labels
88 .as_ref()
89 .and_then(|l| l.get("crabka.io/cluster").cloned());
90 let Some(cluster) = cluster else {
91 patch_status(
92 &user_api,
93 &name,
94 StatusPatch {
95 obj: &obj,
96 status: "False",
97 reason: "MissingClusterLabel",
98 message: "metadata.labels[\"crabka.io/cluster\"] is required",
99 scram_sha512: false,
100 scram_sha256: false,
101 tls: prior_tls,
102 external: prior_external,
103 tls_cert_not_after: prior_tls_not_after.clone(),
104 tls_principal: prior_tls_principal.clone(),
105 advance_generation: false,
106 quotas_in_sync: prior_quotas_in_sync,
107 },
108 )
109 .await?;
110 return Ok(Action::requeue(Duration::from_mins(1)));
111 };
112
113 if let Err(msg) = validate_spec(&obj.spec) {
115 patch_status(
116 &user_api,
117 &name,
118 StatusPatch {
119 obj: &obj,
120 status: "False",
121 reason: "InvalidSpec",
122 message: &msg,
123 scram_sha512: false,
124 scram_sha256: false,
125 tls: prior_tls,
126 external: prior_external,
127 tls_cert_not_after: prior_tls_not_after.clone(),
128 tls_principal: prior_tls_principal.clone(),
129 advance_generation: false,
130 quotas_in_sync: prior_quotas_in_sync,
131 },
132 )
133 .await?;
134 return Ok(Action::requeue(Duration::from_mins(5)));
135 }
136
137 let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
139 let kafka = kafka_api.get_opt(&cluster).await?;
140 let bootstrap = kafka.as_ref().and_then(internal_listener_bootstrap);
141 let Some(bootstrap) = bootstrap else {
142 patch_status(
143 &user_api,
144 &name,
145 StatusPatch {
146 obj: &obj,
147 status: "False",
148 reason: "ClusterNotReady",
149 message: &format!("Kafka/{cluster} not Ready or no internal listener"),
150 scram_sha512: false,
151 scram_sha256: false,
152 tls: prior_tls,
153 external: prior_external,
154 tls_cert_not_after: prior_tls_not_after.clone(),
155 tls_principal: prior_tls_principal.clone(),
156 advance_generation: false,
157 quotas_in_sync: prior_quotas_in_sync,
158 },
159 )
160 .await?;
161 return Ok(Action::requeue(Duration::from_secs(30)));
162 };
163
164 let principal = principal_for(&name, &obj.spec.authentication);
165 let quota_username: String = principal
169 .strip_prefix("User:")
170 .unwrap_or(&principal)
171 .to_string();
172
173 if obj.meta().deletion_timestamp.is_some() {
175 if let Ok(client) = ctx.admin_client_for(&cluster, &bootstrap).await {
177 let mut admin = client.lock().await;
178 let scram_finalizer = match &obj.spec.authentication {
183 Authentication::ScramSha512(_) => Some(true),
184 Authentication::ScramSha256(_) => Some(false),
185 _ => None,
186 };
187 if let Some(is_sha512) = scram_finalizer {
188 let deletion = ScramDeletion {
189 username: name.clone(),
190 };
191 let result = if is_sha512 {
192 admin
193 .alter_user_scram_credentials_sha512(&[], &[deletion])
194 .await
195 } else {
196 admin
197 .alter_user_scram_credentials_sha256(&[], &[deletion])
198 .await
199 };
200 if let Err(e) = result {
201 tracing::warn!(error = %e, %name, "scram delete during finalizer failed");
202 }
203 }
204 if matches!(&obj.spec.authentication, Authentication::DelegationToken(_)) {
217 drop(admin);
218 if let Err(e) = user_delegation_token::expire_owned_tokens(&name, &client).await {
219 tracing::warn!(
220 error = %e,
221 %name,
222 "delegation-token finalizer expire failed",
223 );
224 }
225 admin = client.lock().await;
227 }
228 let filter = AclEntryFilter {
229 principal: Some(principal.clone()),
230 ..Default::default()
231 };
232 match admin.delete_acls(&[filter]).await {
233 Ok(_) => {}
234 Err(e) => tracing::warn!(error = %e, %name, "acl delete during finalizer failed"),
235 }
236 match admin.describe_user_quotas("a_username).await {
241 Ok(cur) if !cur.is_empty() => {
242 let ops: Vec<_> = cur
243 .into_keys()
244 .map(|key| crabka_client_admin::QuotaOp::Remove { key })
245 .collect();
246 if let Err(e) = admin.alter_user_quotas("a_username, &ops, false).await {
247 tracing::warn!(error = %e, %name, "quota delete during finalizer failed");
248 }
249 }
250 Ok(_) => {}
251 Err(e) => {
252 tracing::warn!(error = %e, %name, "quota describe during finalizer failed");
253 }
254 }
255 }
256 remove_finalizer(&user_api, &name).await?;
257 return Ok(Action::await_change());
258 }
259
260 if !has_finalizer(&obj) {
262 add_finalizer(&user_api, &name).await?;
263 return Ok(Action::requeue(Duration::ZERO));
264 }
265
266 let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &ns);
274 let tls_not_after: Option<String> = match &obj.spec.authentication {
275 Authentication::ScramSha512(_) | Authentication::ScramSha256(_) => {
276 let (password_len, iterations, is_sha512) = match &obj.spec.authentication {
281 Authentication::ScramSha512(s) => (
282 s.password_length.unwrap_or(32),
283 s.iterations.unwrap_or(DEFAULT_SCRAM_ITERATIONS),
284 true,
285 ),
286 Authentication::ScramSha256(s) => (
287 s.password_length.unwrap_or(32),
288 s.iterations.unwrap_or(DEFAULT_SCRAM_ITERATIONS),
289 false,
290 ),
291 _ => unreachable!(),
292 };
293 let password = match ensure_password_secret(&secret_api, &obj, password_len).await {
294 Ok(p) => p,
295 Err(e) => {
296 tracing::warn!(error = %e, %name, "ensure_password_secret failed");
297 return Ok(Action::requeue(Duration::from_secs(15)));
298 }
299 };
300
301 let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
306 Ok(h) => h,
307 Err(e) => {
308 tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
309 return Ok(Action::requeue(Duration::from_secs(15)));
310 }
311 };
312 let mut admin = admin_handle.lock().await;
313 let upsertions = [ScramUpsertion {
314 username: name.clone(),
315 password,
316 iterations,
317 }];
318 let outcomes = if is_sha512 {
319 admin
320 .alter_user_scram_credentials_sha512(&upsertions, &[])
321 .await
322 } else {
323 admin
324 .alter_user_scram_credentials_sha256(&upsertions, &[])
325 .await
326 };
327 let outcomes = match outcomes {
328 Ok(v) => v,
329 Err(e) => {
330 tracing::warn!(error = %e, "AlterUserScramCredentials transport failure");
331 let is_transport = matches!(e, AdminError::Transport(_));
332 drop(admin);
333 if is_transport {
334 ctx.drop_admin_client(&cluster).await;
335 }
336 return Ok(Action::requeue(Duration::from_secs(15)));
337 }
338 };
339 if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
340 drop(admin);
341 patch_status(
342 &user_api,
343 &name,
344 StatusPatch {
345 obj: &obj,
346 status: "False",
347 reason: "BrokerError",
348 message: &format!("AlterUserScramCredentials: {} ({})", err.name, err.code),
349 scram_sha512: false,
350 scram_sha256: false,
351 tls: prior_tls,
352 external: prior_external,
353 tls_cert_not_after: prior_tls_not_after.clone(),
354 tls_principal: prior_tls_principal.clone(),
355 advance_generation: false,
356 quotas_in_sync: prior_quotas_in_sync,
357 },
358 )
359 .await?;
360 return Ok(Action::requeue(Duration::from_secs(15)));
361 }
362 None
363 }
364 Authentication::Tls(tls_auth) => {
365 let kafka_ref = kafka
366 .as_ref()
367 .expect("bootstrap presence implies Kafka resource is Some");
368 let ca_outcome = match crate::controller::cluster_ca::reconcile_ca(
369 &secret_api,
370 kafka_ref,
371 crate::controller::cluster_ca::WhichCa::Clients,
372 false,
373 false,
374 true,
375 time::OffsetDateTime::now_utc(),
376 )
377 .await
378 {
379 Ok(o) => o,
380 Err(e) => {
381 tracing::warn!(error = %e, %cluster, "clients CA reconcile failed");
382 return Ok(Action::requeue(Duration::from_secs(15)));
383 }
384 };
385 let ca = ca_outcome.signing_material;
388 let cert_status =
389 match user_tls::ensure_user_cert_secret(&secret_api, &obj, &ca, tls_auth).await {
390 Ok(s) => s,
391 Err(e) => {
392 tracing::warn!(error = %e, %name, "ensure_user_cert_secret failed");
393 return Ok(Action::requeue(Duration::from_secs(15)));
394 }
395 };
396 if cert_status.issued_new {
397 tracing::info!(
398 %name,
399 not_after = %cert_status.not_after,
400 "issued new client cert",
401 );
402 }
403 Some(cert_status.not_after)
404 }
405 Authentication::TlsExternal => None,
409 Authentication::DelegationToken(dt) => {
427 let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
428 Ok(h) => h,
429 Err(e) => {
430 tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
431 return Ok(Action::requeue(Duration::from_secs(15)));
432 }
433 };
434 let secret_writer = KubeSecretWriter {
435 api: secret_api.clone(),
436 };
437 let user_writer = KubeKafkaUserStatusWriter {
438 api: user_api.clone(),
439 };
440 let now_ms = chrono::Utc::now().timestamp_millis();
441 let out = user_delegation_token::reconcile(
442 &obj,
443 dt,
444 &admin_handle,
445 &secret_writer,
446 &user_writer,
447 now_ms,
448 )
449 .await?;
450 return Ok(out.action);
457 }
458 };
459
460 let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
463 Ok(h) => h,
464 Err(e) => {
465 tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
466 return Ok(Action::requeue(Duration::from_secs(15)));
467 }
468 };
469 let mut admin = admin_handle.lock().await;
470
471 let desired: BTreeSet<AclEntry> = expand_spec_acls(obj.spec.authorization.as_ref(), &principal)
473 .into_iter()
474 .collect();
475 let filter = AclEntryFilter {
476 principal: Some(principal.clone()),
477 ..Default::default()
478 };
479 let current_vec = match admin.describe_acls(&filter).await {
480 Ok(v) => v,
481 Err(e) => {
482 tracing::warn!(error = %e, "DescribeAcls failure");
483 let is_transport = matches!(e, AdminError::Transport(_));
484 drop(admin);
485 if is_transport {
486 ctx.drop_admin_client(&cluster).await;
487 }
488 return Ok(Action::requeue(Duration::from_secs(15)));
489 }
490 };
491 let current: BTreeSet<AclEntry> = current_vec.into_iter().collect();
492 let (additions, deletions) = diff_acls(¤t, &desired);
493
494 if !additions.is_empty()
495 && let Err(e) = apply_create_acls(&mut admin, &additions).await
496 {
497 let is_transport = matches!(e, AdminError::Transport(_));
498 drop(admin);
499 if is_transport {
500 ctx.drop_admin_client(&cluster).await;
501 }
502 return user_broker_error(&user_api, &name, &obj, e, "CreateAcls").await;
503 }
504 if !deletions.is_empty()
505 && let Err(e) = apply_delete_acls(&mut admin, &deletions).await
506 {
507 let is_transport = matches!(e, AdminError::Transport(_));
508 drop(admin);
509 if is_transport {
510 ctx.drop_admin_client(&cluster).await;
511 }
512 return user_broker_error(&user_api, &name, &obj, e, "DeleteAcls").await;
513 }
514
515 let quotas_in_sync = if let Some(spec_quotas) = obj.spec.quotas.as_ref() {
522 let desired = spec_quotas.to_quota_map();
523 let current = match admin.describe_user_quotas("a_username).await {
524 Ok(c) => c,
525 Err(e) => {
526 tracing::warn!(error = %e, "DescribeClientQuotas failure");
527 let is_transport = matches!(e, AdminError::Transport(_));
528 drop(admin);
529 if is_transport {
530 ctx.drop_admin_client(&cluster).await;
531 }
532 return Ok(Action::requeue(Duration::from_secs(15)));
533 }
534 };
535 let ops = crabka_client_admin::diff_user_quotas(¤t, &desired);
536 if !ops.is_empty()
537 && let Err(e) = apply_alter_user_quotas(&mut admin, "a_username, &ops).await
538 {
539 let is_transport = matches!(e, AdminError::Transport(_));
540 drop(admin);
541 if is_transport {
542 ctx.drop_admin_client(&cluster).await;
543 }
544 return user_broker_error(&user_api, &name, &obj, e, "AlterClientQuotas").await;
545 }
546 true
547 } else {
548 false
549 };
550
551 let is_scram_sha512 = matches!(&obj.spec.authentication, Authentication::ScramSha512(_));
552 let is_scram_sha256 = matches!(&obj.spec.authentication, Authentication::ScramSha256(_));
553 let is_tls = matches!(&obj.spec.authentication, Authentication::Tls(_));
554 let is_external = matches!(&obj.spec.authentication, Authentication::TlsExternal);
555 patch_status(
556 &user_api,
557 &name,
558 StatusPatch {
559 obj: &obj,
560 status: "True",
561 reason: "Ready",
562 message: "user in sync",
563 scram_sha512: is_scram_sha512,
564 scram_sha256: is_scram_sha256,
565 tls: is_tls && tls_not_after.is_some(),
566 external: is_external,
567 tls_cert_not_after: tls_not_after.clone(),
568 tls_principal: if is_tls || is_external {
573 Some(principal.clone())
574 } else {
575 prior_tls_principal.clone()
576 },
577 advance_generation: true,
578 quotas_in_sync,
579 },
580 )
581 .await?;
582 #[allow(clippy::match_same_arms)]
594 let requeue = match &obj.spec.authentication {
595 Authentication::ScramSha512(_) | Authentication::ScramSha256(_) => Duration::from_mins(1),
596 Authentication::Tls(_) => Duration::from_hours(6),
597 Authentication::TlsExternal => Duration::from_mins(1),
598 Authentication::DelegationToken(_) => unreachable!(
599 "delegation-token arm returns early after user_delegation_token::reconcile",
600 ),
601 };
602 Ok(Action::requeue(requeue))
603}
604
605async fn apply_alter_user_quotas(
606 admin: &mut tokio::sync::MutexGuard<'_, dyn crabka_client_admin::AdminClientLike + Send>,
607 username: &str,
608 ops: &[crabka_client_admin::QuotaOp],
609) -> Result<(), AdminError> {
610 if let Some(err) = admin.alter_user_quotas(username, ops, false).await? {
611 return Err(AdminError::Broker {
612 api: "AlterClientQuotas",
613 code: err.code,
614 name: err.name,
615 message: err.message,
616 });
617 }
618 Ok(())
619}
620
621async fn apply_create_acls(
622 admin: &mut tokio::sync::MutexGuard<'_, dyn crabka_client_admin::AdminClientLike + Send>,
623 additions: &[AclEntry],
624) -> Result<(), AdminError> {
625 let outcomes = admin.create_acls(additions).await?;
626 if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
627 return Err(AdminError::Broker {
628 api: "CreateAcls",
629 code: err.code,
630 name: err.name,
631 message: err.message,
632 });
633 }
634 Ok(())
635}
636
637async fn apply_delete_acls(
638 admin: &mut tokio::sync::MutexGuard<'_, dyn crabka_client_admin::AdminClientLike + Send>,
639 deletions: &[AclEntry],
640) -> Result<(), AdminError> {
641 let filters: Vec<AclEntryFilter> = deletions.iter().map(entry_to_exact_filter).collect();
642 let outcomes = admin.delete_acls(&filters).await?;
643 if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
644 return Err(AdminError::Broker {
645 api: "DeleteAcls",
646 code: err.code,
647 name: err.name,
648 message: err.message,
649 });
650 }
651 Ok(())
652}
653
654async fn user_broker_error(
655 api: &Api<KafkaUser>,
656 name: &str,
657 obj: &KafkaUser,
658 err: AdminError,
659 op: &str,
660) -> Result<Action, ReconcileError> {
661 let detail = match err {
662 AdminError::Broker { code, name, .. } => format!("{op}: {name} ({code})"),
663 other => format!("{op}: {other}"),
664 };
665 let prior_qis = obj.status.as_ref().is_some_and(|s| s.quotas_in_sync);
666 let prior_tls = obj.status.as_ref().is_some_and(|s| s.tls);
667 let prior_external = obj.status.as_ref().is_some_and(|s| s.external);
668 let prior_tls_not_after = obj
669 .status
670 .as_ref()
671 .and_then(|s| s.tls_cert_not_after.clone());
672 let prior_tls_principal = obj.status.as_ref().and_then(|s| s.tls_principal.clone());
673 patch_status(
674 api,
675 name,
676 StatusPatch {
677 obj,
678 status: "False",
679 reason: "BrokerError",
680 message: &detail,
681 scram_sha512: false,
682 scram_sha256: false,
683 tls: prior_tls,
684 external: prior_external,
685 tls_cert_not_after: prior_tls_not_after,
686 tls_principal: prior_tls_principal,
687 advance_generation: false,
688 quotas_in_sync: prior_qis,
689 },
690 )
691 .await?;
692 Ok(Action::requeue(Duration::from_secs(15)))
693}
694
695pub(crate) fn entry_to_exact_filter(e: &AclEntry) -> AclEntryFilter {
698 AclEntryFilter {
699 resource_type: Some(e.resource_type),
700 resource_name: Some(e.resource_name.clone()),
701 pattern_type: Some(e.pattern_type),
702 principal: Some(e.principal.clone()),
703 host: Some(e.host.clone()),
704 operation: Some(e.operation),
705 permission_type: Some(e.permission_type),
706 }
707}
708
709#[allow(clippy::match_same_arms)]
714pub(crate) fn principal_for(name: &str, auth: &Authentication) -> String {
715 match auth {
716 Authentication::ScramSha512(_) | Authentication::ScramSha256(_) => format!("User:{name}"),
717 Authentication::Tls(_) => user_tls::tls_principal(name),
718 Authentication::TlsExternal => format!("User:{name}"),
723 Authentication::DelegationToken(_) => format!("User:{name}"),
727 }
728}
729
730pub(crate) fn expand_spec_acls(auth: Option<&Authorization>, principal: &str) -> Vec<AclEntry> {
734 let Some(Authorization::Simple(simple)) = auth else {
735 return Vec::new();
736 };
737 let mut out = Vec::with_capacity(simple.acls.len());
738 for rule in &simple.acls {
739 for op in &rule.operations {
740 out.push(AclEntry {
741 resource_type: resource_kind_to_admin(rule.resource.kind),
742 resource_name: rule.resource.name.clone(),
743 pattern_type: pattern_to_admin(rule.resource.pattern_type),
744 principal: principal.to_string(),
745 host: rule.host.clone(),
746 operation: op_to_admin(*op),
747 permission_type: permission_to_admin(rule.permission),
748 });
749 }
750 }
751 out
752}
753
754pub(crate) fn diff_acls(
758 current: &BTreeSet<AclEntry>,
759 desired: &BTreeSet<AclEntry>,
760) -> (Vec<AclEntry>, Vec<AclEntry>) {
761 let additions: Vec<AclEntry> = desired.difference(current).cloned().collect();
762 let deletions: Vec<AclEntry> = current.difference(desired).cloned().collect();
763 (additions, deletions)
764}
765
766fn validate_spec(spec: &crate::crd::KafkaUserSpec) -> Result<(), String> {
767 if let Some(Authorization::Simple(simple)) = &spec.authorization {
768 for (i, rule) in simple.acls.iter().enumerate() {
769 if rule.operations.is_empty() {
770 return Err(format!("acls[{i}].operations is empty"));
771 }
772 if rule.resource.name.is_empty() {
773 return Err(format!("acls[{i}].resource.name is empty"));
774 }
775 }
776 }
777 Ok(())
778}
779
780fn resource_kind_to_admin(k: AclResourceKind) -> ResourceType {
781 match k {
782 AclResourceKind::Topic => ResourceType::Topic,
783 AclResourceKind::Group => ResourceType::Group,
784 AclResourceKind::Cluster => ResourceType::Cluster,
785 AclResourceKind::TransactionalId => ResourceType::TransactionalId,
786 }
787}
788
789fn pattern_to_admin(p: AclPatternType) -> PatternType {
790 match p {
791 AclPatternType::Literal => PatternType::Literal,
792 AclPatternType::Prefixed => PatternType::Prefixed,
793 }
794}
795
796fn permission_to_admin(p: AclPermission) -> PermissionType {
797 match p {
798 AclPermission::Allow => PermissionType::Allow,
799 AclPermission::Deny => PermissionType::Deny,
800 }
801}
802
803fn op_to_admin(op: AclOp) -> AclOperation {
804 match op {
805 AclOp::All => AclOperation::All,
806 AclOp::Read => AclOperation::Read,
807 AclOp::Write => AclOperation::Write,
808 AclOp::Create => AclOperation::Create,
809 AclOp::Delete => AclOperation::Delete,
810 AclOp::Alter => AclOperation::Alter,
811 AclOp::Describe => AclOperation::Describe,
812 AclOp::ClusterAction => AclOperation::ClusterAction,
813 AclOp::DescribeConfigs => AclOperation::DescribeConfigs,
814 AclOp::AlterConfigs => AclOperation::AlterConfigs,
815 AclOp::IdempotentWrite => AclOperation::IdempotentWrite,
816 }
817}
818
819async fn ensure_password_secret(
828 api: &Api<Secret>,
829 obj: &KafkaUser,
830 password_len_bytes: u16,
831) -> Result<String, ReconcileError> {
832 let name = obj.name_any();
833 if let Some(existing) = api.get_opt(&name).await?
834 && let Some(p) = read_password(&existing)
835 {
836 return Ok(p);
837 }
838 let password = random_password(password_len_bytes);
839 let secret = render_password_secret(obj, &password)?;
840 let params = PatchParams {
841 field_manager: Some(FIELD_MANAGER.into()),
842 force: true,
843 ..Default::default()
844 };
845 api.patch(&name, ¶ms, &Patch::Apply(&secret)).await?;
846 Ok(password)
847}
848
849fn read_password(secret: &Secret) -> Option<String> {
850 let data = secret.data.as_ref()?;
851 let bs = data.get("password")?;
852 std::str::from_utf8(&bs.0).ok().map(str::to_string)
853}
854
855fn random_password(len_bytes: u16) -> String {
856 let mut buf = vec![0u8; len_bytes as usize];
857 SystemRandom::new()
858 .fill(&mut buf)
859 .expect("system RNG must succeed");
860 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(buf)
861}
862
863fn render_password_secret(obj: &KafkaUser, password: &str) -> Result<Secret, ReconcileError> {
869 let name = obj.name_any();
870 let jaas = format!(
871 "org.apache.kafka.common.security.scram.ScramLoginModule required \
872 username=\"{name}\" password=\"{password}\";"
873 );
874 let mut labels: BTreeMap<String, String> = BTreeMap::new();
875 labels.insert("app.kubernetes.io/name".into(), "crabka-broker".into());
876 labels.insert(
877 "app.kubernetes.io/managed-by".into(),
878 "crabka-operator".into(),
879 );
880 if let Some(cluster) = obj
881 .meta()
882 .labels
883 .as_ref()
884 .and_then(|l| l.get("crabka.io/cluster"))
885 {
886 labels.insert("crabka.io/cluster".into(), cluster.clone());
887 }
888 labels.insert("crabka.io/user".into(), name.clone());
889
890 let mut data: BTreeMap<String, ByteString> = BTreeMap::new();
891 data.insert("password".into(), ByteString(password.as_bytes().to_vec()));
892 data.insert("sasl.jaas.config".into(), ByteString(jaas.into_bytes()));
893
894 Ok(Secret {
895 metadata: ObjectMeta {
896 name: Some(name),
897 namespace: obj.meta().namespace.clone(),
898 labels: Some(labels),
899 owner_references: Some(vec![user_owner_ref(obj)?]),
900 ..Default::default()
901 },
902 type_: Some("Opaque".into()),
903 data: Some(data),
904 ..Default::default()
905 })
906}
907
908fn user_owner_ref(obj: &KafkaUser) -> Result<OwnerReference, ReconcileError> {
909 let uid = obj
910 .meta()
911 .uid
912 .as_deref()
913 .ok_or(ReconcileError::MissingUid)?;
914 Ok(OwnerReference {
915 api_version: <KafkaUser as Resource>::api_version(&()).to_string(),
916 kind: <KafkaUser as Resource>::kind(&()).to_string(),
917 name: obj.name_any(),
918 uid: uid.to_string(),
919 controller: Some(true),
920 block_owner_deletion: Some(true),
921 })
922}
923
924fn has_finalizer(obj: &KafkaUser) -> bool {
927 obj.meta()
928 .finalizers
929 .as_ref()
930 .is_some_and(|f| f.iter().any(|s| s == FINALIZER))
931}
932
933async fn add_finalizer(api: &Api<KafkaUser>, name: &str) -> Result<(), ReconcileError> {
934 let patch = json!({ "metadata": { "finalizers": [FINALIZER] } });
935 let params = PatchParams {
936 field_manager: Some(FIELD_MANAGER.into()),
937 ..Default::default()
938 };
939 api.patch(name, ¶ms, &Patch::Merge(&patch)).await?;
940 Ok(())
941}
942
943async fn remove_finalizer(api: &Api<KafkaUser>, name: &str) -> Result<(), ReconcileError> {
944 let patch = json!({ "metadata": { "finalizers": [] } });
945 let params = PatchParams {
946 field_manager: Some(FIELD_MANAGER.into()),
947 ..Default::default()
948 };
949 api.patch(name, ¶ms, &Patch::Merge(&patch)).await?;
950 Ok(())
951}
952
953#[allow(clippy::struct_excessive_bools)]
957struct StatusPatch<'a> {
958 obj: &'a KafkaUser,
959 status: &'a str,
960 reason: &'a str,
961 message: &'a str,
962 scram_sha512: bool,
963 scram_sha256: bool,
966 tls: bool,
967 external: bool,
970 tls_cert_not_after: Option<String>,
971 tls_principal: Option<String>,
972 advance_generation: bool,
973 quotas_in_sync: bool,
974}
975
976async fn patch_status(
977 api: &Api<KafkaUser>,
978 name: &str,
979 p: StatusPatch<'_>,
980) -> Result<(), ReconcileError> {
981 let conditions = vec![condition("Ready", p.status, p.reason, p.message)];
982 let observed_generation = if p.advance_generation {
983 p.obj.meta().generation
984 } else {
985 p.obj.status.as_ref().and_then(|s| s.observed_generation)
986 };
987 let secret_name = if p.scram_sha512 || p.scram_sha256 || p.tls {
990 Some(name.to_string())
991 } else {
992 p.obj.status.as_ref().and_then(|s| s.secret.clone())
993 };
994 let body = json!({
995 "status": {
996 "conditions": conditions,
997 "observedGeneration": observed_generation,
998 "username": name,
999 "secret": secret_name,
1000 "scramSha512": p.scram_sha512 || p.obj.status.as_ref().is_some_and(|s| s.scram_sha512),
1001 "scramSha256": p.scram_sha256 || p.obj.status.as_ref().is_some_and(|s| s.scram_sha256),
1002 "tls": p.tls || p.obj.status.as_ref().is_some_and(|s| s.tls),
1003 "external": p.external || p.obj.status.as_ref().is_some_and(|s| s.external),
1004 "tlsCertNotAfter": p.tls_cert_not_after,
1005 "tlsPrincipal": p.tls_principal,
1006 "quotasInSync": p.quotas_in_sync,
1007 }
1008 });
1009 let params = PatchParams {
1010 field_manager: Some(FIELD_MANAGER.into()),
1011 ..Default::default()
1012 };
1013 api.patch_status(name, ¶ms, &Patch::Merge(&body))
1014 .await?;
1015 Ok(())
1016}
1017
1018#[cfg(test)]
1019mod tests {
1020 use super::*;
1021 use crate::crd::{
1022 AclOp, AclPatternType, AclPermission, AclResource, AclResourceKind, AclRule,
1023 KafkaUserSimpleAuthorization as SimpleAuthorization,
1024 };
1025 use assert2::assert;
1026
1027 fn rule(kind: AclResourceKind, name: &str, ops: &[AclOp]) -> AclRule {
1028 AclRule {
1029 resource: AclResource {
1030 kind,
1031 name: name.into(),
1032 pattern_type: AclPatternType::Literal,
1033 },
1034 operations: ops.to_vec(),
1035 host: "*".into(),
1036 permission: AclPermission::Allow,
1037 }
1038 }
1039
1040 #[test]
1041 fn principal_uses_user_prefix_for_scram() {
1042 let scram = Authentication::ScramSha512(crate::crd::ScramSha512Auth::default());
1043 assert!(principal_for("alice", &scram) == "User:alice");
1044 }
1045
1046 #[test]
1047 fn principal_for_dispatches_on_auth_type() {
1048 let scram = Authentication::ScramSha512(crate::crd::ScramSha512Auth::default());
1049 let tls = Authentication::Tls(crate::crd::user::TlsAuth::default());
1050 assert!(principal_for("alice", &scram) == "User:alice");
1051 assert!(principal_for("alice", &tls) == "User:CN=alice");
1052 }
1053
1054 #[test]
1055 fn expand_spec_acls_with_no_authorization_is_empty() {
1056 assert!(expand_spec_acls(None, "User:alice").is_empty());
1057 }
1058
1059 #[test]
1060 fn expand_spec_acls_one_rule_one_op() {
1061 let auth = Authorization::Simple(SimpleAuthorization {
1062 acls: vec![rule(AclResourceKind::Topic, "orders", &[AclOp::Read])],
1063 });
1064 let entries = expand_spec_acls(Some(&auth), "User:alice");
1065 assert!(entries.len() == 1);
1066 let e = &entries[0];
1067 assert!(e.resource_type == ResourceType::Topic);
1068 assert!(e.resource_name == "orders");
1069 assert!(e.principal == "User:alice");
1070 assert!(e.operation == AclOperation::Read);
1071 assert!(e.permission_type == PermissionType::Allow);
1072 }
1073
1074 #[test]
1075 fn expand_spec_acls_one_rule_many_ops_fans_out() {
1076 let auth = Authorization::Simple(SimpleAuthorization {
1077 acls: vec![rule(
1078 AclResourceKind::Topic,
1079 "orders",
1080 &[AclOp::Read, AclOp::Describe, AclOp::Write],
1081 )],
1082 });
1083 let entries = expand_spec_acls(Some(&auth), "User:alice");
1084 assert!(entries.len() == 3);
1085 let ops: Vec<_> = entries.iter().map(|e| e.operation).collect();
1086 assert!(ops.contains(&AclOperation::Read));
1087 assert!(ops.contains(&AclOperation::Describe));
1088 assert!(ops.contains(&AclOperation::Write));
1089 }
1090
1091 #[test]
1092 fn diff_acls_additions_and_deletions() {
1093 let mut current = BTreeSet::new();
1094 let keep = AclEntry {
1095 resource_type: ResourceType::Topic,
1096 resource_name: "keep".into(),
1097 pattern_type: PatternType::Literal,
1098 principal: "User:alice".into(),
1099 host: "*".into(),
1100 operation: AclOperation::Read,
1101 permission_type: PermissionType::Allow,
1102 };
1103 let drop = AclEntry {
1104 resource_type: ResourceType::Topic,
1105 resource_name: "drop".into(),
1106 pattern_type: PatternType::Literal,
1107 principal: "User:alice".into(),
1108 host: "*".into(),
1109 operation: AclOperation::Read,
1110 permission_type: PermissionType::Allow,
1111 };
1112 current.insert(keep.clone());
1113 current.insert(drop.clone());
1114
1115 let mut desired = BTreeSet::new();
1116 let add = AclEntry {
1117 resource_type: ResourceType::Group,
1118 resource_name: "g".into(),
1119 pattern_type: PatternType::Literal,
1120 principal: "User:alice".into(),
1121 host: "*".into(),
1122 operation: AclOperation::Read,
1123 permission_type: PermissionType::Allow,
1124 };
1125 desired.insert(keep.clone());
1126 desired.insert(add.clone());
1127
1128 let (adds, dels) = diff_acls(¤t, &desired);
1129 assert!(adds == vec![add]);
1130 assert!(dels == vec![drop]);
1131 }
1132
1133 #[test]
1134 fn diff_acls_noop_when_matching() {
1135 let mut s = BTreeSet::new();
1136 let e = AclEntry {
1137 resource_type: ResourceType::Topic,
1138 resource_name: "x".into(),
1139 pattern_type: PatternType::Literal,
1140 principal: "User:alice".into(),
1141 host: "*".into(),
1142 operation: AclOperation::Read,
1143 permission_type: PermissionType::Allow,
1144 };
1145 s.insert(e);
1146 let (adds, dels) = diff_acls(&s, &s);
1147 assert!(adds.is_empty());
1148 assert!(dels.is_empty());
1149 }
1150
1151 #[test]
1152 fn validate_spec_rejects_empty_operations() {
1153 let spec = crate::crd::KafkaUserSpec {
1154 authentication: Authentication::ScramSha512(crate::crd::ScramSha512Auth::default()),
1155 authorization: Some(Authorization::Simple(SimpleAuthorization {
1156 acls: vec![AclRule {
1157 resource: AclResource {
1158 kind: AclResourceKind::Topic,
1159 name: "x".into(),
1160 pattern_type: AclPatternType::Literal,
1161 },
1162 operations: vec![],
1163 host: "*".into(),
1164 permission: AclPermission::Allow,
1165 }],
1166 })),
1167 quotas: None,
1168 };
1169 let err = validate_spec(&spec).unwrap_err();
1170 assert!(err.contains("operations is empty"), "got: {err}");
1171 }
1172
1173 #[test]
1174 fn validate_spec_rejects_empty_resource_name() {
1175 let spec = crate::crd::KafkaUserSpec {
1176 authentication: Authentication::ScramSha512(crate::crd::ScramSha512Auth::default()),
1177 authorization: Some(Authorization::Simple(SimpleAuthorization {
1178 acls: vec![rule(AclResourceKind::Topic, "", &[AclOp::Read])],
1179 })),
1180 quotas: None,
1181 };
1182 let err = validate_spec(&spec).unwrap_err();
1183 assert!(err.contains("resource.name is empty"), "got: {err}");
1184 }
1185
1186 #[test]
1187 fn entry_to_exact_filter_populates_every_axis() {
1188 let e = AclEntry {
1189 resource_type: ResourceType::Topic,
1190 resource_name: "orders".into(),
1191 pattern_type: PatternType::Literal,
1192 principal: "User:alice".into(),
1193 host: "*".into(),
1194 operation: AclOperation::Read,
1195 permission_type: PermissionType::Allow,
1196 };
1197 let f = entry_to_exact_filter(&e);
1198 assert!(f.resource_type == Some(ResourceType::Topic));
1199 assert!(f.resource_name.as_deref() == Some("orders"));
1200 assert!(f.pattern_type == Some(PatternType::Literal));
1201 assert!(f.principal.as_deref() == Some("User:alice"));
1202 assert!(f.host.as_deref() == Some("*"));
1203 assert!(f.operation == Some(AclOperation::Read));
1204 assert!(f.permission_type == Some(PermissionType::Allow));
1205 }
1206
1207 #[test]
1208 fn random_password_is_base64_and_uniform_length() {
1209 let p1 = random_password(32);
1210 let p2 = random_password(32);
1211 assert!(p1.len() == p2.len()); assert!(p1 != p2);
1213 assert!(
1214 p1.chars()
1215 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_'),
1216 "got: {p1}"
1217 );
1218 }
1219
1220 #[test]
1221 fn principal_for_tls_external_uses_bare_name() {
1222 assert!(principal_for("alice", &Authentication::TlsExternal) == "User:alice");
1226 }
1227
1228 #[test]
1229 fn validate_spec_accepts_tls_external_with_no_authorization_and_no_quotas() {
1230 let spec = crate::crd::KafkaUserSpec {
1231 authentication: Authentication::TlsExternal,
1232 authorization: None,
1233 quotas: None,
1234 };
1235 assert!(validate_spec(&spec).is_ok());
1236 }
1237
1238 #[test]
1239 fn validate_spec_accepts_tls_external_with_acls_and_quotas() {
1240 let spec = crate::crd::KafkaUserSpec {
1241 authentication: Authentication::TlsExternal,
1242 authorization: Some(Authorization::Simple(SimpleAuthorization {
1243 acls: vec![rule(AclResourceKind::Topic, "orders", &[AclOp::Read])],
1244 })),
1245 quotas: Some(crate::crd::KafkaUserQuotas {
1246 producer_byte_rate: Some(1_048_576),
1247 ..Default::default()
1248 }),
1249 };
1250 assert!(validate_spec(&spec).is_ok());
1251 }
1252}