1use std::collections::BTreeMap;
11use std::fmt::Debug;
12
13use k8s_openapi::ByteString;
14use k8s_openapi::api::apps::v1::StatefulSet;
15use k8s_openapi::api::core::v1::{ConfigMap, Secret, Service};
16use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
17use kube::Resource;
18use kube::api::{Api, DynamicObject, Patch, PatchParams, PostParams};
19use kube::core::{ApiResource, GroupVersionKind};
20use serde::Serialize;
21use serde::de::DeserializeOwned;
22use serde_json::json;
23use uuid::Uuid;
24
25use crate::crd::{Kafka, KafkaCondition};
26
27pub(crate) const FIELD_MANAGER: &str = "crabka-operator";
28
29pub(crate) const BROKER_PORT: i32 = 9092;
30pub(crate) const APP_LABEL: &str = "crabka-broker";
31pub(crate) const DEFAULT_BROKER_IMAGE: &str = concat!(
32 "ghcr.io/robot-head/crabka-broker:",
33 env!("CARGO_PKG_VERSION")
34);
35
36#[derive(Debug, thiserror::Error)]
38pub enum ReconcileError {
39 #[error("kube error: {0}")]
40 Kube(#[from] kube::Error),
41 #[error("resource missing uid (not yet admitted)")]
42 MissingUid,
43 #[error("serde error: {0}")]
44 Serde(#[from] serde_json::Error),
45 #[error("spec.replicas={0} is unsupported (only 1 allowed)")]
46 UnsupportedReplicas(i32),
47 #[error("cluster-id secret malformed: {0}")]
48 MalformedSecret(String),
49 #[error("metricsConfig: podMonitor and serviceMonitor are mutually exclusive")]
50 MetricsMutuallyExclusive,
51 #[error("monitoring.coreos.com/v1 is not served by the API server")]
52 PrometheusOperatorCrdsMissing,
53 #[error("malformed input: {0}")]
54 Malformed(String),
55 #[error("CA: {0}")]
56 Ca(#[from] crabka_security::ca::CaError),
57 #[error("cert parse: {0}")]
58 CertParse(String),
59 #[error(
60 "BYO CA missing: {which} requires pre-existing Secret pair (generateCertificateAuthority=false)"
61 )]
62 ByoCaMissing { which: String },
63 #[allow(dead_code)] #[error("BYO CA malformed: {which}: {reason}")]
65 ByoCaMalformed { which: String, reason: String },
66 #[error("CA Secret missing: {name}")]
67 CaSecretMissing { name: String },
68 #[error("oauth trust Secret '{0}' not found")]
69 MissingOauthTrustSecret(String),
70 #[error("oauth trust Secret '{secret}' has no key '{key}'")]
71 MissingOauthTrustKey { secret: String, key: String },
72 #[error("oauth trust Secret '{secret}' key '{key}' is empty")]
73 EmptyOauthTrustValue { secret: String, key: String },
74 #[error("listener OAuth: {0}")]
80 InvalidListenerOauthAccessTokenIsJwt(String),
81 #[error("oauth introspection Secret '{0}' not found")]
84 MissingOauthIntrospectionSecret(String),
85 #[error("oauth introspection Secret '{secret}' has no key '{key}'")]
88 MissingOauthIntrospectionKey { secret: String, key: String },
89 #[error("oauth introspection Secret '{secret}' key '{key}' is empty")]
92 EmptyOauthIntrospectionValue { secret: String, key: String },
93 #[error("gssapi keytab Secret '{0}' not found")]
95 MissingGssapiKeytabSecret(String),
96 #[error("gssapi keytab Secret '{secret}' has no key '{key}'")]
98 MissingGssapiKeytabKey { secret: String, key: String },
99 #[error("krb5.conf Secret '{0}' not found")]
101 MissingKrb5ConfSecret(String),
102 #[error("krb5.conf Secret {secret:?} is missing key {key:?}")]
104 MissingKrb5ConfKey { secret: String, key: String },
105 #[error("tieredStorage: {0}")]
112 TieredStorageInvalid(String),
113
114 #[error("tracing: {0}")]
120 TracingInvalid(String),
121}
122
123pub(crate) fn condition(type_: &str, status: &str, reason: &str, message: &str) -> KafkaCondition {
126 KafkaCondition {
127 type_: type_.into(),
128 status: status.into(),
129 reason: reason.into(),
130 message: message.into(),
131 last_transition_time: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
132 }
133}
134
135pub(crate) async fn apply_object<K>(api: &Api<K>, name: &str, obj: &K) -> Result<(), ReconcileError>
140where
141 K: Resource + Clone + Serialize + DeserializeOwned + Debug,
142{
143 let params = PatchParams {
144 field_manager: Some(FIELD_MANAGER.into()),
145 force: true,
146 ..Default::default()
147 };
148 api.patch(name, ¶ms, &Patch::Apply(obj)).await?;
149 Ok(())
150}
151
152pub(crate) async fn apply_dynamic(
157 client: &kube::Client,
158 namespace: &str,
159 api_version: &str,
160 kind: &str,
161 plural: &str,
162 name: &str,
163 body: &serde_json::Value,
164) -> Result<(), ReconcileError> {
165 let (group, version) = api_version
166 .split_once('/')
167 .ok_or_else(|| ReconcileError::Malformed("apiVersion missing '/'".into()))?;
168 let gvk = GroupVersionKind::gvk(group, version, kind);
169 let ar = ApiResource::from_gvk_with_plural(&gvk, plural);
170 let api: Api<DynamicObject> = Api::namespaced_with(client.clone(), namespace, &ar);
171 let obj: DynamicObject = serde_json::from_value(body.clone())?;
172 let pp = PatchParams::apply(FIELD_MANAGER).force();
173 api.patch(name, &pp, &Patch::Apply(&obj)).await?;
174 Ok(())
175}
176
177pub(crate) async fn patch_status<K, S>(
182 api: &Api<K>,
183 name: &str,
184 status: S,
185) -> Result<(), ReconcileError>
186where
187 K: Resource + Clone + Serialize + DeserializeOwned + Debug,
188 <K as Resource>::DynamicType: Default,
189 S: Serialize,
190{
191 let patch = json!({ "status": status });
192 let params = PatchParams {
193 field_manager: Some(FIELD_MANAGER.into()),
194 ..Default::default()
195 };
196 api.patch_status(name, ¶ms, &Patch::Merge(&patch))
197 .await?;
198 Ok(())
199}
200
201pub(crate) fn common_labels(
206 kafka_name: &str,
207 kafka_version: &str,
208 pool: Option<&str>,
209) -> BTreeMap<String, String> {
210 let mut m = BTreeMap::new();
211 m.insert("app.kubernetes.io/name".into(), APP_LABEL.into());
212 m.insert("app.kubernetes.io/instance".into(), kafka_name.into());
213 m.insert("app.kubernetes.io/version".into(), kafka_version.into());
214 m.insert(
215 "app.kubernetes.io/managed-by".into(),
216 "crabka-operator".into(),
217 );
218 if let Some(p) = pool {
219 m.insert("crabka.io/pool".into(), p.into());
220 }
221 m
222}
223
224pub(crate) fn owner_ref<T>(obj: &T) -> Result<OwnerReference, ReconcileError>
228where
229 T: Resource<DynamicType = ()>,
230{
231 let uid = obj
232 .meta()
233 .uid
234 .as_deref()
235 .ok_or(ReconcileError::MissingUid)?;
236 Ok(OwnerReference {
237 api_version: T::api_version(&()).to_string(),
238 kind: T::kind(&()).to_string(),
239 name: obj.meta().name.clone().unwrap_or_default(),
240 uid: uid.to_string(),
241 controller: Some(true),
242 block_owner_deletion: Some(true),
243 })
244}
245
246pub(crate) fn render_service(owner: &Kafka) -> Result<Service, ReconcileError> {
250 let name = owner.meta().name.clone().unwrap_or_default();
251 let labels = common_labels(&name, &owner.spec.kafka_version, None);
252 let mut selector: BTreeMap<String, String> = BTreeMap::new();
253 selector.insert("app.kubernetes.io/name".into(), APP_LABEL.into());
254 selector.insert("app.kubernetes.io/instance".into(), name.clone());
255
256 let svc: Service = serde_json::from_value(json!({
257 "metadata": {
258 "name": format!("{name}-broker-headless"),
259 "namespace": owner.meta().namespace.clone(),
260 "labels": labels,
261 "ownerReferences": [owner_ref::<Kafka>(owner)?],
262 },
263 "spec": {
264 "clusterIP": "None",
265 "selector": selector,
266 "ports": [{
267 "name": "kafka-internal",
268 "port": BROKER_PORT,
269 "protocol": "TCP",
270 "targetPort": BROKER_PORT,
271 }],
272 }
273 }))?;
274 Ok(svc)
275}
276
277#[allow(clippy::too_many_arguments)] pub(crate) fn render_configmap(
283 owner: &Kafka,
284 listeners: &[crate::crd::Listener],
285 addresses_per_broker: &std::collections::BTreeMap<
286 i32,
287 std::collections::BTreeMap<String, crate::controller::listeners::AdvertisedAddress>,
288 >,
289 inter_broker_listener_name: &str,
290 tls_per_broker: Option<
291 &std::collections::BTreeMap<i32, crate::controller::listeners::BrokerTlsRender>,
292 >,
293 clients_ca_path: Option<&str>,
294 logging_filter: Option<&str>,
295) -> Result<ConfigMap, ReconcileError> {
296 let name = owner.meta().name.clone().unwrap_or_default();
297 let labels = common_labels(&name, &owner.spec.kafka_version, None);
298
299 let mut data = BTreeMap::new();
300 if let Some(filter) = logging_filter {
303 data.insert("rust.log".to_string(), filter.to_string());
304 }
305 let server_properties = owner.spec.config.clone().unwrap_or_default();
311 let delegation_token_enabled = owner.spec.delegation_token.is_some();
317 let authorization = owner.spec.authorization.as_ref();
322 let tiered_storage = owner.spec.tiered_storage.as_ref();
326 let inter_broker_kerberos = owner.spec.inter_broker_kerberos.as_ref();
327 for (broker_id, addrs) in addresses_per_broker {
328 let tls_for_broker = tls_per_broker.and_then(|m| m.get(broker_id));
329 let toml = crate::controller::listeners::render_broker_toml(
330 *broker_id,
331 listeners,
332 addrs,
333 inter_broker_listener_name,
334 &server_properties,
335 tls_for_broker,
336 clients_ca_path,
337 delegation_token_enabled,
338 authorization,
339 tiered_storage,
340 inter_broker_kerberos,
341 );
342 data.insert(format!("broker-{broker_id}.toml"), toml);
343 }
344
345 Ok(ConfigMap {
346 metadata: ObjectMeta {
347 name: Some(format!("{name}-broker-config")),
348 namespace: owner.meta().namespace.clone(),
349 labels: Some(labels),
350 owner_references: Some(vec![owner_ref::<Kafka>(owner)?]),
351 ..Default::default()
352 },
353 data: Some(data),
354 ..Default::default()
355 })
356}
357
358pub(crate) fn render_secret(owner: &Kafka, cluster_id: Uuid) -> Result<Secret, ReconcileError> {
362 let name = owner.meta().name.clone().unwrap_or_default();
363 let labels = common_labels(&name, &owner.spec.kafka_version, None);
364
365 let mut data = BTreeMap::new();
366 data.insert(
367 "clusterId".to_string(),
368 ByteString(cluster_id.to_string().into_bytes()),
369 );
370
371 Ok(Secret {
372 metadata: ObjectMeta {
373 name: Some(format!("{name}-cluster-id")),
374 namespace: owner.meta().namespace.clone(),
375 labels: Some(labels),
376 owner_references: Some(vec![owner_ref::<Kafka>(owner)?]),
377 ..Default::default()
378 },
379 type_: Some("Opaque".into()),
380 data: Some(data),
381 ..Default::default()
382 })
383}
384
385pub(crate) fn uuid_from_secret(secret: &Secret) -> Result<Uuid, ReconcileError> {
390 let data = secret
391 .data
392 .as_ref()
393 .ok_or_else(|| ReconcileError::MalformedSecret("Secret.data is empty".into()))?;
394 let bytes = &data
395 .get("clusterId")
396 .ok_or_else(|| ReconcileError::MalformedSecret("missing clusterId key".into()))?
397 .0;
398 let s = std::str::from_utf8(bytes)
399 .map_err(|e| ReconcileError::MalformedSecret(format!("clusterId not UTF-8: {e}")))?;
400 Uuid::parse_str(s)
401 .map_err(|e| ReconcileError::MalformedSecret(format!("clusterId not a UUID: {e}")))
402}
403
404pub(crate) fn read_pem_key(secret: &Secret, key: &str) -> Option<String> {
407 let data = secret.data.as_ref()?;
408 let bytes = &data.get(key)?.0;
409 String::from_utf8(bytes.clone()).ok()
410}
411
412pub(crate) async fn ensure_cluster_id_secret(
419 secret_api: &Api<Secret>,
420 parent: &Kafka,
421) -> Result<Uuid, ReconcileError> {
422 let name = parent.meta().name.clone().unwrap_or_default();
423 let secret_name = format!("{name}-cluster-id");
424 if let Some(existing) = secret_api.get_opt(&secret_name).await? {
425 return uuid_from_secret(&existing);
426 }
427 let id = Uuid::new_v4();
428 let secret = render_secret(parent, id)?;
429 secret_api.create(&PostParams::default(), &secret).await?;
430 Ok(id)
431}
432
433pub(crate) fn derive_status(
437 live: Option<&StatefulSet>,
438 desired_replicas: i32,
439) -> (Option<i32>, Option<i32>, &'static str, String) {
440 let (replicas, ready_replicas) = live
441 .and_then(|s| s.status.as_ref())
442 .map_or((None, None), |st| (Some(st.replicas), st.ready_replicas));
443
444 let ready_count = ready_replicas.unwrap_or(0);
445 if ready_count == desired_replicas {
446 (
447 replicas,
448 ready_replicas,
449 "Available",
450 format!("{desired_replicas} broker(s) ready"),
451 )
452 } else if ready_count == 0 {
453 (
454 replicas,
455 ready_replicas,
456 "NoBrokersReady",
457 format!("0/{desired_replicas} brokers ready"),
458 )
459 } else {
460 (
461 replicas,
462 ready_replicas,
463 "PartiallyReady",
464 format!("{ready_count}/{desired_replicas} brokers ready"),
465 )
466 }
467}
468
469#[must_use]
478pub fn config_hash(content: &str) -> String {
479 use std::fmt::Write;
480
481 use sha2::{Digest, Sha256};
482 let mut h = Sha256::new();
483 h.update(content.as_bytes());
484 let digest = h.finalize();
485 let mut out = String::with_capacity(16);
486 for byte in digest.iter().take(8) {
487 write!(&mut out, "{byte:02x}").expect("writing to a String never fails");
488 }
489 out
490}
491
492#[must_use]
524pub fn combined_config_hash(
525 spec: &crate::crd::KafkaSpec,
526 cluster_ca_cert_pem: Option<&str>,
527 metadata_version_pin: Option<&str>,
528 logging_filter: Option<&str>,
529) -> String {
530 let config_part = spec
531 .config
532 .as_ref()
533 .map(|m| {
534 let mut s = String::new();
535 for (k, v) in m {
536 s.push_str(k);
537 s.push('=');
538 s.push_str(v);
539 s.push('\n');
540 }
541 s
542 })
543 .unwrap_or_default();
544 let intent = crate::controller::listeners::canonical_listener_intent(
545 &spec.listeners,
546 spec.inter_broker_listener_name.as_deref(),
547 );
548 let metrics_part = if spec.metrics_config.is_some() {
549 "metrics=on"
550 } else {
551 ""
552 };
553 let ca_part = cluster_ca_cert_pem.unwrap_or("");
554 let metadata_part = metadata_version_pin.unwrap_or("");
555 let logging_part = logging_filter.unwrap_or("");
556 if intent.is_empty()
563 && metrics_part.is_empty()
564 && ca_part.is_empty()
565 && metadata_part.is_empty()
566 && logging_part.is_empty()
567 {
568 return config_hash(&config_part);
569 }
570 let mut buf = String::with_capacity(
571 config_part.len()
572 + 5
573 + intent.len()
574 + metrics_part.len()
575 + ca_part.len()
576 + metadata_part.len()
577 + logging_part.len(),
578 );
579 buf.push_str(&config_part);
580 buf.push('\x1F'); buf.push_str(&intent);
582 buf.push('\x1F');
583 buf.push_str(metrics_part);
584 buf.push('\x1F');
585 buf.push_str(ca_part);
586 buf.push('\x1F');
587 buf.push_str(metadata_part);
588 buf.push('\x1F');
589 buf.push_str(logging_part);
590 config_hash(&buf)
591}
592
593#[derive(Debug, Clone, PartialEq, Eq)]
597pub(crate) struct PoolRolloutState {
598 pub name: String,
599 pub current_hash: Option<String>,
600 pub ready: bool,
601}
602
603pub(crate) fn plan_rollout(pools: &[PoolRolloutState], desired: &str) -> Vec<(String, String)> {
623 let all_have_hash = pools.iter().all(|p| p.current_hash.is_some());
624 let distinct_non_desired: std::collections::BTreeSet<&str> = pools
625 .iter()
626 .filter_map(|p| p.current_hash.as_deref())
627 .filter(|h| *h != desired)
628 .collect();
629
630 if !all_have_hash || distinct_non_desired.len() > 1 {
632 return pools
633 .iter()
634 .map(|p| (p.name.clone(), desired.to_string()))
635 .collect();
636 }
637
638 let mut gate_open = true;
640 let mut out = Vec::with_capacity(pools.len());
641 for p in pools {
642 if gate_open {
643 let converged = p.current_hash.as_deref() == Some(desired) && p.ready;
644 out.push((p.name.clone(), desired.to_string()));
646 if !converged {
647 gate_open = false;
650 }
651 } else {
652 let keep = p
654 .current_hash
655 .clone()
656 .unwrap_or_else(|| desired.to_string());
657 out.push((p.name.clone(), keep));
658 }
659 }
660 out
661}
662
663#[allow(
685 clippy::cast_precision_loss,
686 clippy::cast_possible_truncation,
687 clippy::cast_sign_loss
688)]
689pub(crate) fn parse_quantity(s: &str) -> Result<i128, &'static str> {
690 if s.is_empty() {
691 return Err("empty quantity string");
692 }
693
694 let (mantissa_str, multiplier): (&str, i128) = if let Some(rest) = s.strip_suffix("Ki") {
695 (rest, 1_024)
696 } else if let Some(rest) = s.strip_suffix("Mi") {
697 (rest, 1_024_i128.pow(2))
698 } else if let Some(rest) = s.strip_suffix("Gi") {
699 (rest, 1_024_i128.pow(3))
700 } else if let Some(rest) = s.strip_suffix("Ti") {
701 (rest, 1_024_i128.pow(4))
702 } else if let Some(rest) = s.strip_suffix("Pi") {
703 (rest, 1_024_i128.pow(5))
704 } else if let Some(rest) = s.strip_suffix("Ei") {
705 (rest, 1_024_i128.pow(6))
706 } else if let Some(rest) = s.strip_suffix('K') {
707 (rest, 1_000)
708 } else if let Some(rest) = s.strip_suffix('M') {
709 (rest, 1_000_000)
710 } else if let Some(rest) = s.strip_suffix('G') {
711 (rest, 1_000_000_000)
712 } else if let Some(rest) = s.strip_suffix('T') {
713 (rest, 1_000_000_000_000)
714 } else if let Some(rest) = s.strip_suffix('P') {
715 (rest, 1_000_000_000_000_000)
716 } else if let Some(rest) = s.strip_suffix('E') {
717 (rest, 1_000_000_000_000_000_000)
718 } else {
719 (s, 1)
720 };
721
722 if mantissa_str.is_empty() {
723 return Err("missing numeric mantissa before suffix");
724 }
725 if mantissa_str.contains(['e', 'E']) {
726 return Err("scientific notation not supported");
727 }
728 if mantissa_str.starts_with('-') {
729 return Err("negative quantity rejected");
730 }
731
732 let mantissa: f64 = mantissa_str
733 .parse()
734 .map_err(|_| "mantissa is not a valid number")?;
735 if !mantissa.is_finite() {
736 return Err("mantissa is not finite");
737 }
738 if mantissa <= 0.0 {
739 return Err("quantity must be strictly positive");
740 }
741
742 let bytes = mantissa * multiplier as f64;
743 if bytes > i128::MAX as f64 {
744 return Err("quantity overflows i128");
745 }
746 Ok(bytes as i128)
747}
748
749#[cfg(test)]
750mod config_hash_tests {
751 use super::*;
752 use assert2::assert;
753
754 #[test]
755 fn config_hash_is_truncated_sha256_hex() {
756 let h = config_hash("hello");
760 assert!(h == "2cf24dba5fb0a30e");
761 assert!(h.len() == 16, "must fit within K8s 63-char label limit");
762 }
763
764 #[test]
765 fn config_hash_empty_string() {
766 let h = config_hash("");
768 assert!(h == "e3b0c44298fc1c14");
769 }
770
771 #[test]
772 fn config_hash_fits_in_kubernetes_label_value() {
773 let h = config_hash("any content at all");
776 assert!(h.len() <= 63, "hash {h} exceeds K8s label limit");
777 }
778
779 #[test]
780 fn combined_hash_unchanged_when_listeners_empty() {
781 use crate::crd::KafkaSpec;
782
783 let spec_a = KafkaSpec {
784 kafka_version: "0.1.1".into(),
785 metadata_version: None,
786 config: Some({
787 let mut m = std::collections::BTreeMap::new();
788 m.insert("log.retention.hours".into(), "24".into());
789 m
790 }),
791 listeners: vec![],
792 inter_broker_listener_name: None,
793 metrics_config: None,
794 network_policy: None,
795 cluster_ca: None,
796 clients_ca: None,
797 logging: None,
798 delegation_token: None,
799 authorization: None,
800 tiered_storage: None,
801 inter_broker_kerberos: None,
802 krb5_conf_secret_ref: None,
803 tracing: None,
804 };
805 let h = combined_config_hash(&spec_a, None, None, None);
806 let h_again = combined_config_hash(&spec_a, None, None, None);
807 assert!(h == h_again);
808
809 let config_only_form = "log.retention.hours=24\n";
815 assert!(
816 h == config_hash(config_only_form),
817 "combined hash for empty listeners must equal config_hash(spec.config)"
818 );
819
820 let mut spec_b = spec_a.clone();
821 spec_b.listeners = vec![crate::controller::listeners::synthesized_default_listener()];
822 spec_b.inter_broker_listener_name = Some("PLAIN".into());
823 let h_with_listener = combined_config_hash(&spec_b, None, None, None);
824 assert!(
825 h != h_with_listener,
826 "non-empty listener intent must change hash"
827 );
828 }
829
830 #[test]
831 fn combined_hash_flips_when_metrics_config_toggles() {
832 use crate::crd::{KafkaSpec, MetricsConfig, PodMonitorSpec};
833
834 let spec_off = KafkaSpec {
835 kafka_version: "0.1.1".into(),
836 metadata_version: None,
837 config: None,
838 listeners: vec![],
839 inter_broker_listener_name: None,
840 metrics_config: None,
841 network_policy: None,
842 cluster_ca: None,
843 clients_ca: None,
844 logging: None,
845 delegation_token: None,
846 authorization: None,
847 tiered_storage: None,
848 inter_broker_kerberos: None,
849 krb5_conf_secret_ref: None,
850 tracing: None,
851 };
852 let h_off = combined_config_hash(&spec_off, None, None, None);
853
854 let mut spec_on = spec_off.clone();
855 spec_on.metrics_config = Some(MetricsConfig {
856 pod_monitor: Some(PodMonitorSpec::default()),
857 ..Default::default()
858 });
859 let h_on = combined_config_hash(&spec_on, None, None, None);
860 assert!(
861 h_off != h_on,
862 "enabling metrics_config must bump the hash (triggers pool reconcile + StatefulSet re-render)"
863 );
864
865 let mut spec_on_diff_interval = spec_on.clone();
869 if let Some(cfg) = spec_on_diff_interval.metrics_config.as_mut() {
870 cfg.pod_monitor = Some(PodMonitorSpec {
871 interval: Some("60s".into()),
872 ..Default::default()
873 });
874 }
875 assert!(
876 h_on == combined_config_hash(&spec_on_diff_interval, None, None, None),
877 "PodMonitor interval change must NOT roll the broker pod"
878 );
879 }
880
881 #[test]
882 fn combined_hash_changes_when_cluster_ca_cert_changes() {
883 let spec = crate::crd::KafkaSpec {
884 kafka_version: "0.1.1".into(),
885 metadata_version: None,
886 config: None,
887 listeners: vec![],
888 inter_broker_listener_name: None,
889 metrics_config: None,
890 network_policy: None,
891 cluster_ca: None,
892 clients_ca: None,
893 logging: None,
894 delegation_token: None,
895 authorization: None,
896 tiered_storage: None,
897 inter_broker_kerberos: None,
898 krb5_conf_secret_ref: None,
899 tracing: None,
900 };
901 let h_none = combined_config_hash(&spec, None, None, None);
902 let h_a = combined_config_hash(
903 &spec,
904 Some("-----BEGIN CERTIFICATE-----\nA\n-----END CERTIFICATE-----\n"),
905 None,
906 None,
907 );
908 let h_b = combined_config_hash(
909 &spec,
910 Some("-----BEGIN CERTIFICATE-----\nB\n-----END CERTIFICATE-----\n"),
911 None,
912 None,
913 );
914 assert!(h_none != h_a, "absent vs present CA must differ");
915 assert!(h_a != h_b, "different CA PEM must differ");
916 }
917
918 #[test]
919 fn combined_hash_stable_under_broker_keystore_changes() {
920 let spec = crate::crd::KafkaSpec {
925 kafka_version: "0.1.1".into(),
926 metadata_version: None,
927 config: None,
928 listeners: vec![],
929 inter_broker_listener_name: None,
930 metrics_config: None,
931 network_policy: None,
932 cluster_ca: None,
933 clients_ca: None,
934 logging: None,
935 delegation_token: None,
936 authorization: None,
937 tiered_storage: None,
938 inter_broker_kerberos: None,
939 krb5_conf_secret_ref: None,
940 tracing: None,
941 };
942 let h1 = combined_config_hash(&spec, Some("ca-pem"), None, None);
943 let h2 = combined_config_hash(&spec, Some("ca-pem"), None, None);
944 assert!(h1 == h2);
945 }
946
947 #[test]
948 fn configmap_has_one_toml_key_per_broker() {
949 use crate::controller::listeners::{AdvertisedAddress, synthesized_default_listener};
950 use crate::crd::KafkaSpec;
951
952 let mut k = Kafka::new(
953 "demo",
954 KafkaSpec {
955 kafka_version: "0.1.1".into(),
956 metadata_version: None,
957 config: None,
958 listeners: vec![],
959 inter_broker_listener_name: None,
960 metrics_config: None,
961 network_policy: None,
962 cluster_ca: None,
963 clients_ca: None,
964 logging: None,
965 delegation_token: None,
966 authorization: None,
967 tiered_storage: None,
968 inter_broker_kerberos: None,
969 krb5_conf_secret_ref: None,
970 tracing: None,
971 },
972 );
973 k.meta_mut().namespace = Some("default".into());
974 k.meta_mut().uid = Some("uid".into());
975
976 let listeners = vec![synthesized_default_listener()];
977 let mut per_broker = std::collections::BTreeMap::new();
978 let mut addrs0 = std::collections::BTreeMap::new();
979 addrs0.insert(
980 "PLAIN".into(),
981 AdvertisedAddress {
982 host: "demo-0.svc".into(),
983 port: 9092,
984 },
985 );
986 let mut addrs1 = std::collections::BTreeMap::new();
987 addrs1.insert(
988 "PLAIN".into(),
989 AdvertisedAddress {
990 host: "demo-1.svc".into(),
991 port: 9092,
992 },
993 );
994 per_broker.insert(0i32, addrs0);
995 per_broker.insert(1i32, addrs1);
996
997 let cm = render_configmap(&k, &listeners, &per_broker, "PLAIN", None, None, None).unwrap();
998 let data = cm.data.unwrap();
999 assert!(data.contains_key("broker-0.toml"));
1000 assert!(data.contains_key("broker-1.toml"));
1001 assert!(data["broker-0.toml"].contains("demo-0.svc"));
1002 assert!(data["broker-1.toml"].contains("demo-1.svc"));
1003 assert!(!data.contains_key("broker.env"));
1005 assert!(!data.contains_key("broker.properties"));
1006 }
1007
1008 #[test]
1009 fn combined_hash_changes_when_metadata_version_pin_set() {
1010 use crate::crd::KafkaSpec;
1011
1012 let spec = KafkaSpec {
1013 kafka_version: "3.7.0".into(),
1014 metadata_version: None,
1015 config: None,
1016 listeners: vec![],
1017 inter_broker_listener_name: None,
1018 metrics_config: None,
1019 network_policy: None,
1020 cluster_ca: None,
1021 clients_ca: None,
1022 logging: None,
1023 delegation_token: None,
1024 authorization: None,
1025 tiered_storage: None,
1026 inter_broker_kerberos: None,
1027 krb5_conf_secret_ref: None,
1028 tracing: None,
1029 };
1030 let h_default = combined_config_hash(&spec, None, None, None);
1033 assert!(h_default == config_hash(""));
1034
1035 let h_pin = combined_config_hash(&spec, None, Some("3.6"), None);
1037 assert!(h_default != h_pin, "explicit metadata pin must change hash");
1038 let h_pin2 = combined_config_hash(&spec, None, Some("3.7"), None);
1040 assert!(h_pin != h_pin2, "different metadata pin must differ");
1041 }
1042
1043 #[test]
1044 fn configmap_never_injects_metadata_version_into_server_properties() {
1045 use crate::controller::listeners::{AdvertisedAddress, synthesized_default_listener};
1046 use crate::crd::KafkaSpec;
1047
1048 let mut k = Kafka::new(
1052 "demo",
1053 KafkaSpec {
1054 kafka_version: "3.7.0".into(),
1055 metadata_version: Some("3.6".into()),
1056 config: None,
1057 listeners: vec![],
1058 inter_broker_listener_name: None,
1059 metrics_config: None,
1060 network_policy: None,
1061 cluster_ca: None,
1062 clients_ca: None,
1063 logging: None,
1064 delegation_token: None,
1065 authorization: None,
1066 tiered_storage: None,
1067 inter_broker_kerberos: None,
1068 krb5_conf_secret_ref: None,
1069 tracing: None,
1070 },
1071 );
1072 k.meta_mut().namespace = Some("default".into());
1073 k.meta_mut().uid = Some("uid".into());
1074
1075 let listeners = vec![synthesized_default_listener()];
1076 let mut per_broker = std::collections::BTreeMap::new();
1077 let mut addrs0 = std::collections::BTreeMap::new();
1078 addrs0.insert(
1079 "PLAIN".into(),
1080 AdvertisedAddress {
1081 host: "demo-0.svc".into(),
1082 port: 9092,
1083 },
1084 );
1085 per_broker.insert(0i32, addrs0);
1086
1087 let cm = render_configmap(&k, &listeners, &per_broker, "PLAIN", None, None, None).unwrap();
1088 let toml = &cm.data.unwrap()["broker-0.toml"];
1089 assert!(
1090 !toml.contains("metadata.version"),
1091 "metadata.version must never be injected into broker config, got:\n{toml}"
1092 );
1093 }
1094}
1095
1096#[cfg(test)]
1097mod rollout_tests {
1098 use super::{PoolRolloutState, plan_rollout};
1099 use assert2::assert;
1100
1101 fn st(name: &str, hash: Option<&str>, ready: bool) -> PoolRolloutState {
1102 PoolRolloutState {
1103 name: name.into(),
1104 current_hash: hash.map(str::to_string),
1105 ready,
1106 }
1107 }
1108
1109 fn targets(plan: &[(String, String)]) -> Vec<(&str, &str)> {
1110 plan.iter().map(|(n, h)| (n.as_str(), h.as_str())).collect()
1111 }
1112
1113 #[test]
1114 fn bring_up_all_get_desired_when_no_hash() {
1115 let pools = vec![
1118 st("a", None, false),
1119 st("b", None, false),
1120 st("c", None, false),
1121 ];
1122 let plan = plan_rollout(&pools, "H1");
1123 assert!(targets(&plan) == vec![("a", "H1"), ("b", "H1"), ("c", "H1")]);
1124 }
1125
1126 #[test]
1127 fn single_pool_first_reconcile_gets_desired() {
1128 let pools = vec![st("only", None, false)];
1129 assert!(targets(&plan_rollout(&pools, "H1")) == vec![("only", "H1")]);
1130 }
1131
1132 #[test]
1133 fn single_pool_roll_advances() {
1134 let pools = vec![st("only", Some("H0"), true)];
1136 assert!(targets(&plan_rollout(&pools, "H1")) == vec![("only", "H1")]);
1137 }
1138
1139 #[test]
1140 fn steady_state_all_desired_is_noop() {
1141 let pools = vec![st("a", Some("H1"), true), st("b", Some("H1"), true)];
1142 assert!(targets(&plan_rollout(&pools, "H1")) == vec![("a", "H1"), ("b", "H1")]);
1143 }
1144
1145 #[test]
1146 fn established_roll_advances_first_pool_only() {
1147 let pools = vec![
1150 st("a", Some("H0"), true),
1151 st("b", Some("H0"), true),
1152 st("c", Some("H0"), true),
1153 ];
1154 let plan = plan_rollout(&pools, "H1");
1155 assert!(targets(&plan) == vec![("a", "H1"), ("b", "H0"), ("c", "H0")]);
1156 }
1157
1158 #[test]
1159 fn established_roll_holds_later_pools_until_first_ready() {
1160 let pools = vec![
1162 st("a", Some("H1"), false),
1163 st("b", Some("H0"), true),
1164 st("c", Some("H0"), true),
1165 ];
1166 let plan = plan_rollout(&pools, "H1");
1167 assert!(targets(&plan) == vec![("a", "H1"), ("b", "H0"), ("c", "H0")]);
1168 }
1169
1170 #[test]
1171 fn established_roll_advances_next_after_prefix_converges() {
1172 let pools = vec![
1174 st("a", Some("H1"), true),
1175 st("b", Some("H0"), true),
1176 st("c", Some("H0"), true),
1177 ];
1178 let plan = plan_rollout(&pools, "H1");
1179 assert!(targets(&plan) == vec![("a", "H1"), ("b", "H1"), ("c", "H0")]);
1180 }
1181
1182 #[test]
1183 fn messy_multiple_old_hashes_falls_back_to_all_desired() {
1184 let pools = vec![st("a", Some("H0"), true), st("b", Some("HX"), true)];
1187 let plan = plan_rollout(&pools, "H1");
1188 assert!(targets(&plan) == vec![("a", "H1"), ("b", "H1")]);
1189 }
1190}
1191
1192#[cfg(test)]
1193mod parse_quantity_tests {
1194 use super::parse_quantity;
1195 use assert2::assert;
1196
1197 #[test]
1198 fn quantity_parse_binary_suffixes() {
1199 assert!(parse_quantity("1Ki").unwrap() == 1024);
1200 assert!(parse_quantity("512Mi").unwrap() == 512 * 1024 * 1024);
1201 assert!(parse_quantity("10Gi").unwrap() == 10 * 1024 * 1024 * 1024);
1202 }
1203
1204 #[test]
1205 fn quantity_parse_decimal_suffixes() {
1206 assert!(parse_quantity("1K").unwrap() == 1_000);
1207 assert!(parse_quantity("500M").unwrap() == 500_000_000);
1208 assert!(parse_quantity("10G").unwrap() == 10_000_000_000);
1209 }
1210
1211 #[test]
1212 fn quantity_parse_decimal_mantissa() {
1213 assert!(parse_quantity("1.5Gi").unwrap() == 1_610_612_736);
1215 }
1216
1217 #[test]
1218 fn quantity_parse_no_suffix_is_bytes() {
1219 assert!(parse_quantity("1024").unwrap() == 1024);
1220 }
1221
1222 #[test]
1223 fn quantity_parse_rejects_garbage() {
1224 assert!(parse_quantity("").is_err());
1225 assert!(parse_quantity("banana").is_err());
1226 assert!(parse_quantity("1.5x").is_err());
1227 assert!(parse_quantity("Gi").is_err());
1228 assert!(parse_quantity("1e3").is_err());
1230 }
1231
1232 #[test]
1233 fn quantity_parse_zero_and_negative_are_errors() {
1234 assert!(parse_quantity("0").is_err());
1235 assert!(parse_quantity("0Gi").is_err());
1236 assert!(parse_quantity("-10Gi").is_err());
1237 }
1238}