use std::collections::BTreeMap;
use std::fmt::Debug;
use k8s_openapi::ByteString;
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::core::v1::{ConfigMap, Secret, Service};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
use kube::Resource;
use kube::api::{Api, DynamicObject, Patch, PatchParams, PostParams};
use kube::core::{ApiResource, GroupVersionKind};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::json;
use uuid::Uuid;
use crate::crd::{Kafka, KafkaCondition};
pub(crate) const FIELD_MANAGER: &str = "crabka-operator";
pub(crate) const BROKER_PORT: i32 = 9092;
pub(crate) const APP_LABEL: &str = "crabka-broker";
pub(crate) const DEFAULT_BROKER_IMAGE: &str = concat!(
"ghcr.io/robot-head/crabka-broker:",
env!("CARGO_PKG_VERSION")
);
#[derive(Debug, thiserror::Error)]
pub enum ReconcileError {
#[error("kube error: {0}")]
Kube(#[from] kube::Error),
#[error("resource missing uid (not yet admitted)")]
MissingUid,
#[error("serde error: {0}")]
Serde(#[from] serde_json::Error),
#[error("spec.replicas={0} is unsupported (only 1 allowed)")]
UnsupportedReplicas(i32),
#[error("cluster-id secret malformed: {0}")]
MalformedSecret(String),
#[error("metricsConfig: podMonitor and serviceMonitor are mutually exclusive")]
MetricsMutuallyExclusive,
#[error("monitoring.coreos.com/v1 is not served by the API server")]
PrometheusOperatorCrdsMissing,
#[error("malformed input: {0}")]
Malformed(String),
#[error("CA: {0}")]
Ca(#[from] crabka_security::ca::CaError),
#[error("cert parse: {0}")]
CertParse(String),
#[error(
"BYO CA missing: {which} requires pre-existing Secret pair (generateCertificateAuthority=false)"
)]
ByoCaMissing { which: String },
#[allow(dead_code)] #[error("BYO CA malformed: {which}: {reason}")]
ByoCaMalformed { which: String, reason: String },
#[error("CA Secret missing: {name}")]
CaSecretMissing { name: String },
#[error("oauth trust Secret '{0}' not found")]
MissingOauthTrustSecret(String),
#[error("oauth trust Secret '{secret}' has no key '{key}'")]
MissingOauthTrustKey { secret: String, key: String },
#[error("oauth trust Secret '{secret}' key '{key}' is empty")]
EmptyOauthTrustValue { secret: String, key: String },
#[error("listener OAuth: {0}")]
InvalidListenerOauthAccessTokenIsJwt(String),
#[error("oauth introspection Secret '{0}' not found")]
MissingOauthIntrospectionSecret(String),
#[error("oauth introspection Secret '{secret}' has no key '{key}'")]
MissingOauthIntrospectionKey { secret: String, key: String },
#[error("oauth introspection Secret '{secret}' key '{key}' is empty")]
EmptyOauthIntrospectionValue { secret: String, key: String },
#[error("gssapi keytab Secret '{0}' not found")]
MissingGssapiKeytabSecret(String),
#[error("gssapi keytab Secret '{secret}' has no key '{key}'")]
MissingGssapiKeytabKey { secret: String, key: String },
#[error("krb5.conf Secret '{0}' not found")]
MissingKrb5ConfSecret(String),
#[error("krb5.conf Secret {secret:?} is missing key {key:?}")]
MissingKrb5ConfKey { secret: String, key: String },
#[error("tieredStorage: {0}")]
TieredStorageInvalid(String),
#[error("tracing: {0}")]
TracingInvalid(String),
}
pub(crate) fn condition(type_: &str, status: &str, reason: &str, message: &str) -> KafkaCondition {
KafkaCondition {
type_: type_.into(),
status: status.into(),
reason: reason.into(),
message: message.into(),
last_transition_time: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
}
}
pub(crate) async fn apply_object<K>(api: &Api<K>, name: &str, obj: &K) -> Result<(), ReconcileError>
where
K: Resource + Clone + Serialize + DeserializeOwned + Debug,
{
let params = PatchParams {
field_manager: Some(FIELD_MANAGER.into()),
force: true,
..Default::default()
};
api.patch(name, ¶ms, &Patch::Apply(obj)).await?;
Ok(())
}
pub(crate) async fn apply_dynamic(
client: &kube::Client,
namespace: &str,
api_version: &str,
kind: &str,
plural: &str,
name: &str,
body: &serde_json::Value,
) -> Result<(), ReconcileError> {
let (group, version) = api_version
.split_once('/')
.ok_or_else(|| ReconcileError::Malformed("apiVersion missing '/'".into()))?;
let gvk = GroupVersionKind::gvk(group, version, kind);
let ar = ApiResource::from_gvk_with_plural(&gvk, plural);
let api: Api<DynamicObject> = Api::namespaced_with(client.clone(), namespace, &ar);
let obj: DynamicObject = serde_json::from_value(body.clone())?;
let pp = PatchParams::apply(FIELD_MANAGER).force();
api.patch(name, &pp, &Patch::Apply(&obj)).await?;
Ok(())
}
pub(crate) async fn patch_status<K, S>(
api: &Api<K>,
name: &str,
status: S,
) -> Result<(), ReconcileError>
where
K: Resource + Clone + Serialize + DeserializeOwned + Debug,
<K as Resource>::DynamicType: Default,
S: Serialize,
{
let patch = json!({ "status": status });
let params = PatchParams {
field_manager: Some(FIELD_MANAGER.into()),
..Default::default()
};
api.patch_status(name, ¶ms, &Patch::Merge(&patch))
.await?;
Ok(())
}
pub(crate) fn common_labels(
kafka_name: &str,
kafka_version: &str,
pool: Option<&str>,
) -> BTreeMap<String, String> {
let mut m = BTreeMap::new();
m.insert("app.kubernetes.io/name".into(), APP_LABEL.into());
m.insert("app.kubernetes.io/instance".into(), kafka_name.into());
m.insert("app.kubernetes.io/version".into(), kafka_version.into());
m.insert(
"app.kubernetes.io/managed-by".into(),
"crabka-operator".into(),
);
if let Some(p) = pool {
m.insert("crabka.io/pool".into(), p.into());
}
m
}
pub(crate) fn owner_ref<T>(obj: &T) -> Result<OwnerReference, ReconcileError>
where
T: Resource<DynamicType = ()>,
{
let uid = obj
.meta()
.uid
.as_deref()
.ok_or(ReconcileError::MissingUid)?;
Ok(OwnerReference {
api_version: T::api_version(&()).to_string(),
kind: T::kind(&()).to_string(),
name: obj.meta().name.clone().unwrap_or_default(),
uid: uid.to_string(),
controller: Some(true),
block_owner_deletion: Some(true),
})
}
pub(crate) fn render_service(owner: &Kafka) -> Result<Service, ReconcileError> {
let name = owner.meta().name.clone().unwrap_or_default();
let labels = common_labels(&name, &owner.spec.kafka_version, None);
let mut selector: BTreeMap<String, String> = BTreeMap::new();
selector.insert("app.kubernetes.io/name".into(), APP_LABEL.into());
selector.insert("app.kubernetes.io/instance".into(), name.clone());
let svc: Service = serde_json::from_value(json!({
"metadata": {
"name": format!("{name}-broker-headless"),
"namespace": owner.meta().namespace.clone(),
"labels": labels,
"ownerReferences": [owner_ref::<Kafka>(owner)?],
},
"spec": {
"clusterIP": "None",
"selector": selector,
"ports": [{
"name": "kafka-internal",
"port": BROKER_PORT,
"protocol": "TCP",
"targetPort": BROKER_PORT,
}],
}
}))?;
Ok(svc)
}
#[allow(clippy::too_many_arguments)] pub(crate) fn render_configmap(
owner: &Kafka,
listeners: &[crate::crd::Listener],
addresses_per_broker: &std::collections::BTreeMap<
i32,
std::collections::BTreeMap<String, crate::controller::listeners::AdvertisedAddress>,
>,
inter_broker_listener_name: &str,
tls_per_broker: Option<
&std::collections::BTreeMap<i32, crate::controller::listeners::BrokerTlsRender>,
>,
clients_ca_path: Option<&str>,
logging_filter: Option<&str>,
) -> Result<ConfigMap, ReconcileError> {
let name = owner.meta().name.clone().unwrap_or_default();
let labels = common_labels(&name, &owner.spec.kafka_version, None);
let mut data = BTreeMap::new();
if let Some(filter) = logging_filter {
data.insert("rust.log".to_string(), filter.to_string());
}
let server_properties = owner.spec.config.clone().unwrap_or_default();
let delegation_token_enabled = owner.spec.delegation_token.is_some();
let authorization = owner.spec.authorization.as_ref();
let tiered_storage = owner.spec.tiered_storage.as_ref();
let inter_broker_kerberos = owner.spec.inter_broker_kerberos.as_ref();
for (broker_id, addrs) in addresses_per_broker {
let tls_for_broker = tls_per_broker.and_then(|m| m.get(broker_id));
let toml = crate::controller::listeners::render_broker_toml(
*broker_id,
listeners,
addrs,
inter_broker_listener_name,
&server_properties,
tls_for_broker,
clients_ca_path,
delegation_token_enabled,
authorization,
tiered_storage,
inter_broker_kerberos,
);
data.insert(format!("broker-{broker_id}.toml"), toml);
}
Ok(ConfigMap {
metadata: ObjectMeta {
name: Some(format!("{name}-broker-config")),
namespace: owner.meta().namespace.clone(),
labels: Some(labels),
owner_references: Some(vec![owner_ref::<Kafka>(owner)?]),
..Default::default()
},
data: Some(data),
..Default::default()
})
}
pub(crate) fn render_secret(owner: &Kafka, cluster_id: Uuid) -> Result<Secret, ReconcileError> {
let name = owner.meta().name.clone().unwrap_or_default();
let labels = common_labels(&name, &owner.spec.kafka_version, None);
let mut data = BTreeMap::new();
data.insert(
"clusterId".to_string(),
ByteString(cluster_id.to_string().into_bytes()),
);
Ok(Secret {
metadata: ObjectMeta {
name: Some(format!("{name}-cluster-id")),
namespace: owner.meta().namespace.clone(),
labels: Some(labels),
owner_references: Some(vec![owner_ref::<Kafka>(owner)?]),
..Default::default()
},
type_: Some("Opaque".into()),
data: Some(data),
..Default::default()
})
}
pub(crate) fn uuid_from_secret(secret: &Secret) -> Result<Uuid, ReconcileError> {
let data = secret
.data
.as_ref()
.ok_or_else(|| ReconcileError::MalformedSecret("Secret.data is empty".into()))?;
let bytes = &data
.get("clusterId")
.ok_or_else(|| ReconcileError::MalformedSecret("missing clusterId key".into()))?
.0;
let s = std::str::from_utf8(bytes)
.map_err(|e| ReconcileError::MalformedSecret(format!("clusterId not UTF-8: {e}")))?;
Uuid::parse_str(s)
.map_err(|e| ReconcileError::MalformedSecret(format!("clusterId not a UUID: {e}")))
}
pub(crate) fn read_pem_key(secret: &Secret, key: &str) -> Option<String> {
let data = secret.data.as_ref()?;
let bytes = &data.get(key)?.0;
String::from_utf8(bytes.clone()).ok()
}
pub(crate) async fn ensure_cluster_id_secret(
secret_api: &Api<Secret>,
parent: &Kafka,
) -> Result<Uuid, ReconcileError> {
let name = parent.meta().name.clone().unwrap_or_default();
let secret_name = format!("{name}-cluster-id");
if let Some(existing) = secret_api.get_opt(&secret_name).await? {
return uuid_from_secret(&existing);
}
let id = Uuid::new_v4();
let secret = render_secret(parent, id)?;
secret_api.create(&PostParams::default(), &secret).await?;
Ok(id)
}
pub(crate) fn derive_status(
live: Option<&StatefulSet>,
desired_replicas: i32,
) -> (Option<i32>, Option<i32>, &'static str, String) {
let (replicas, ready_replicas) = live
.and_then(|s| s.status.as_ref())
.map_or((None, None), |st| (Some(st.replicas), st.ready_replicas));
let ready_count = ready_replicas.unwrap_or(0);
if ready_count == desired_replicas {
(
replicas,
ready_replicas,
"Available",
format!("{desired_replicas} broker(s) ready"),
)
} else if ready_count == 0 {
(
replicas,
ready_replicas,
"NoBrokersReady",
format!("0/{desired_replicas} brokers ready"),
)
} else {
(
replicas,
ready_replicas,
"PartiallyReady",
format!("{ready_count}/{desired_replicas} brokers ready"),
)
}
}
#[must_use]
pub fn config_hash(content: &str) -> String {
use std::fmt::Write;
use sha2::{Digest, Sha256};
let mut h = Sha256::new();
h.update(content.as_bytes());
let digest = h.finalize();
let mut out = String::with_capacity(16);
for byte in digest.iter().take(8) {
write!(&mut out, "{byte:02x}").expect("writing to a String never fails");
}
out
}
#[must_use]
pub fn combined_config_hash(
spec: &crate::crd::KafkaSpec,
cluster_ca_cert_pem: Option<&str>,
metadata_version_pin: Option<&str>,
logging_filter: Option<&str>,
) -> String {
let config_part = spec
.config
.as_ref()
.map(|m| {
let mut s = String::new();
for (k, v) in m {
s.push_str(k);
s.push('=');
s.push_str(v);
s.push('\n');
}
s
})
.unwrap_or_default();
let intent = crate::controller::listeners::canonical_listener_intent(
&spec.listeners,
spec.inter_broker_listener_name.as_deref(),
);
let metrics_part = if spec.metrics_config.is_some() {
"metrics=on"
} else {
""
};
let ca_part = cluster_ca_cert_pem.unwrap_or("");
let metadata_part = metadata_version_pin.unwrap_or("");
let logging_part = logging_filter.unwrap_or("");
if intent.is_empty()
&& metrics_part.is_empty()
&& ca_part.is_empty()
&& metadata_part.is_empty()
&& logging_part.is_empty()
{
return config_hash(&config_part);
}
let mut buf = String::with_capacity(
config_part.len()
+ 5
+ intent.len()
+ metrics_part.len()
+ ca_part.len()
+ metadata_part.len()
+ logging_part.len(),
);
buf.push_str(&config_part);
buf.push('\x1F'); buf.push_str(&intent);
buf.push('\x1F');
buf.push_str(metrics_part);
buf.push('\x1F');
buf.push_str(ca_part);
buf.push('\x1F');
buf.push_str(metadata_part);
buf.push('\x1F');
buf.push_str(logging_part);
config_hash(&buf)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PoolRolloutState {
pub name: String,
pub current_hash: Option<String>,
pub ready: bool,
}
pub(crate) fn plan_rollout(pools: &[PoolRolloutState], desired: &str) -> Vec<(String, String)> {
let all_have_hash = pools.iter().all(|p| p.current_hash.is_some());
let distinct_non_desired: std::collections::BTreeSet<&str> = pools
.iter()
.filter_map(|p| p.current_hash.as_deref())
.filter(|h| *h != desired)
.collect();
if !all_have_hash || distinct_non_desired.len() > 1 {
return pools
.iter()
.map(|p| (p.name.clone(), desired.to_string()))
.collect();
}
let mut gate_open = true;
let mut out = Vec::with_capacity(pools.len());
for p in pools {
if gate_open {
let converged = p.current_hash.as_deref() == Some(desired) && p.ready;
out.push((p.name.clone(), desired.to_string()));
if !converged {
gate_open = false;
}
} else {
let keep = p
.current_hash
.clone()
.unwrap_or_else(|| desired.to_string());
out.push((p.name.clone(), keep));
}
}
out
}
#[allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
pub(crate) fn parse_quantity(s: &str) -> Result<i128, &'static str> {
if s.is_empty() {
return Err("empty quantity string");
}
let (mantissa_str, multiplier): (&str, i128) = if let Some(rest) = s.strip_suffix("Ki") {
(rest, 1_024)
} else if let Some(rest) = s.strip_suffix("Mi") {
(rest, 1_024_i128.pow(2))
} else if let Some(rest) = s.strip_suffix("Gi") {
(rest, 1_024_i128.pow(3))
} else if let Some(rest) = s.strip_suffix("Ti") {
(rest, 1_024_i128.pow(4))
} else if let Some(rest) = s.strip_suffix("Pi") {
(rest, 1_024_i128.pow(5))
} else if let Some(rest) = s.strip_suffix("Ei") {
(rest, 1_024_i128.pow(6))
} else if let Some(rest) = s.strip_suffix('K') {
(rest, 1_000)
} else if let Some(rest) = s.strip_suffix('M') {
(rest, 1_000_000)
} else if let Some(rest) = s.strip_suffix('G') {
(rest, 1_000_000_000)
} else if let Some(rest) = s.strip_suffix('T') {
(rest, 1_000_000_000_000)
} else if let Some(rest) = s.strip_suffix('P') {
(rest, 1_000_000_000_000_000)
} else if let Some(rest) = s.strip_suffix('E') {
(rest, 1_000_000_000_000_000_000)
} else {
(s, 1)
};
if mantissa_str.is_empty() {
return Err("missing numeric mantissa before suffix");
}
if mantissa_str.contains(['e', 'E']) {
return Err("scientific notation not supported");
}
if mantissa_str.starts_with('-') {
return Err("negative quantity rejected");
}
let mantissa: f64 = mantissa_str
.parse()
.map_err(|_| "mantissa is not a valid number")?;
if !mantissa.is_finite() {
return Err("mantissa is not finite");
}
if mantissa <= 0.0 {
return Err("quantity must be strictly positive");
}
let bytes = mantissa * multiplier as f64;
if bytes > i128::MAX as f64 {
return Err("quantity overflows i128");
}
Ok(bytes as i128)
}
#[cfg(test)]
mod config_hash_tests {
use super::*;
use assert2::assert;
#[test]
fn config_hash_is_truncated_sha256_hex() {
let h = config_hash("hello");
assert!(h == "2cf24dba5fb0a30e");
assert!(h.len() == 16, "must fit within K8s 63-char label limit");
}
#[test]
fn config_hash_empty_string() {
let h = config_hash("");
assert!(h == "e3b0c44298fc1c14");
}
#[test]
fn config_hash_fits_in_kubernetes_label_value() {
let h = config_hash("any content at all");
assert!(h.len() <= 63, "hash {h} exceeds K8s label limit");
}
#[test]
fn combined_hash_unchanged_when_listeners_empty() {
use crate::crd::KafkaSpec;
let spec_a = KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: Some({
let mut m = std::collections::BTreeMap::new();
m.insert("log.retention.hours".into(), "24".into());
m
}),
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
};
let h = combined_config_hash(&spec_a, None, None, None);
let h_again = combined_config_hash(&spec_a, None, None, None);
assert!(h == h_again);
let config_only_form = "log.retention.hours=24\n";
assert!(
h == config_hash(config_only_form),
"combined hash for empty listeners must equal config_hash(spec.config)"
);
let mut spec_b = spec_a.clone();
spec_b.listeners = vec![crate::controller::listeners::synthesized_default_listener()];
spec_b.inter_broker_listener_name = Some("PLAIN".into());
let h_with_listener = combined_config_hash(&spec_b, None, None, None);
assert!(
h != h_with_listener,
"non-empty listener intent must change hash"
);
}
#[test]
fn combined_hash_flips_when_metrics_config_toggles() {
use crate::crd::{KafkaSpec, MetricsConfig, PodMonitorSpec};
let spec_off = KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
};
let h_off = combined_config_hash(&spec_off, None, None, None);
let mut spec_on = spec_off.clone();
spec_on.metrics_config = Some(MetricsConfig {
pod_monitor: Some(PodMonitorSpec::default()),
..Default::default()
});
let h_on = combined_config_hash(&spec_on, None, None, None);
assert!(
h_off != h_on,
"enabling metrics_config must bump the hash (triggers pool reconcile + StatefulSet re-render)"
);
let mut spec_on_diff_interval = spec_on.clone();
if let Some(cfg) = spec_on_diff_interval.metrics_config.as_mut() {
cfg.pod_monitor = Some(PodMonitorSpec {
interval: Some("60s".into()),
..Default::default()
});
}
assert!(
h_on == combined_config_hash(&spec_on_diff_interval, None, None, None),
"PodMonitor interval change must NOT roll the broker pod"
);
}
#[test]
fn combined_hash_changes_when_cluster_ca_cert_changes() {
let spec = crate::crd::KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
};
let h_none = combined_config_hash(&spec, None, None, None);
let h_a = combined_config_hash(
&spec,
Some("-----BEGIN CERTIFICATE-----\nA\n-----END CERTIFICATE-----\n"),
None,
None,
);
let h_b = combined_config_hash(
&spec,
Some("-----BEGIN CERTIFICATE-----\nB\n-----END CERTIFICATE-----\n"),
None,
None,
);
assert!(h_none != h_a, "absent vs present CA must differ");
assert!(h_a != h_b, "different CA PEM must differ");
}
#[test]
fn combined_hash_stable_under_broker_keystore_changes() {
let spec = crate::crd::KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
};
let h1 = combined_config_hash(&spec, Some("ca-pem"), None, None);
let h2 = combined_config_hash(&spec, Some("ca-pem"), None, None);
assert!(h1 == h2);
}
#[test]
fn configmap_has_one_toml_key_per_broker() {
use crate::controller::listeners::{AdvertisedAddress, synthesized_default_listener};
use crate::crd::KafkaSpec;
let mut k = Kafka::new(
"demo",
KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
},
);
k.meta_mut().namespace = Some("default".into());
k.meta_mut().uid = Some("uid".into());
let listeners = vec![synthesized_default_listener()];
let mut per_broker = std::collections::BTreeMap::new();
let mut addrs0 = std::collections::BTreeMap::new();
addrs0.insert(
"PLAIN".into(),
AdvertisedAddress {
host: "demo-0.svc".into(),
port: 9092,
},
);
let mut addrs1 = std::collections::BTreeMap::new();
addrs1.insert(
"PLAIN".into(),
AdvertisedAddress {
host: "demo-1.svc".into(),
port: 9092,
},
);
per_broker.insert(0i32, addrs0);
per_broker.insert(1i32, addrs1);
let cm = render_configmap(&k, &listeners, &per_broker, "PLAIN", None, None, None).unwrap();
let data = cm.data.unwrap();
assert!(data.contains_key("broker-0.toml"));
assert!(data.contains_key("broker-1.toml"));
assert!(data["broker-0.toml"].contains("demo-0.svc"));
assert!(data["broker-1.toml"].contains("demo-1.svc"));
assert!(!data.contains_key("broker.env"));
assert!(!data.contains_key("broker.properties"));
}
#[test]
fn combined_hash_changes_when_metadata_version_pin_set() {
use crate::crd::KafkaSpec;
let spec = KafkaSpec {
kafka_version: "3.7.0".into(),
metadata_version: None,
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
};
let h_default = combined_config_hash(&spec, None, None, None);
assert!(h_default == config_hash(""));
let h_pin = combined_config_hash(&spec, None, Some("3.6"), None);
assert!(h_default != h_pin, "explicit metadata pin must change hash");
let h_pin2 = combined_config_hash(&spec, None, Some("3.7"), None);
assert!(h_pin != h_pin2, "different metadata pin must differ");
}
#[test]
fn configmap_never_injects_metadata_version_into_server_properties() {
use crate::controller::listeners::{AdvertisedAddress, synthesized_default_listener};
use crate::crd::KafkaSpec;
let mut k = Kafka::new(
"demo",
KafkaSpec {
kafka_version: "3.7.0".into(),
metadata_version: Some("3.6".into()),
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
},
);
k.meta_mut().namespace = Some("default".into());
k.meta_mut().uid = Some("uid".into());
let listeners = vec![synthesized_default_listener()];
let mut per_broker = std::collections::BTreeMap::new();
let mut addrs0 = std::collections::BTreeMap::new();
addrs0.insert(
"PLAIN".into(),
AdvertisedAddress {
host: "demo-0.svc".into(),
port: 9092,
},
);
per_broker.insert(0i32, addrs0);
let cm = render_configmap(&k, &listeners, &per_broker, "PLAIN", None, None, None).unwrap();
let toml = &cm.data.unwrap()["broker-0.toml"];
assert!(
!toml.contains("metadata.version"),
"metadata.version must never be injected into broker config, got:\n{toml}"
);
}
}
#[cfg(test)]
mod rollout_tests {
use super::{PoolRolloutState, plan_rollout};
use assert2::assert;
fn st(name: &str, hash: Option<&str>, ready: bool) -> PoolRolloutState {
PoolRolloutState {
name: name.into(),
current_hash: hash.map(str::to_string),
ready,
}
}
fn targets(plan: &[(String, String)]) -> Vec<(&str, &str)> {
plan.iter().map(|(n, h)| (n.as_str(), h.as_str())).collect()
}
#[test]
fn bring_up_all_get_desired_when_no_hash() {
let pools = vec![
st("a", None, false),
st("b", None, false),
st("c", None, false),
];
let plan = plan_rollout(&pools, "H1");
assert!(targets(&plan) == vec![("a", "H1"), ("b", "H1"), ("c", "H1")]);
}
#[test]
fn single_pool_first_reconcile_gets_desired() {
let pools = vec![st("only", None, false)];
assert!(targets(&plan_rollout(&pools, "H1")) == vec![("only", "H1")]);
}
#[test]
fn single_pool_roll_advances() {
let pools = vec![st("only", Some("H0"), true)];
assert!(targets(&plan_rollout(&pools, "H1")) == vec![("only", "H1")]);
}
#[test]
fn steady_state_all_desired_is_noop() {
let pools = vec![st("a", Some("H1"), true), st("b", Some("H1"), true)];
assert!(targets(&plan_rollout(&pools, "H1")) == vec![("a", "H1"), ("b", "H1")]);
}
#[test]
fn established_roll_advances_first_pool_only() {
let pools = vec![
st("a", Some("H0"), true),
st("b", Some("H0"), true),
st("c", Some("H0"), true),
];
let plan = plan_rollout(&pools, "H1");
assert!(targets(&plan) == vec![("a", "H1"), ("b", "H0"), ("c", "H0")]);
}
#[test]
fn established_roll_holds_later_pools_until_first_ready() {
let pools = vec![
st("a", Some("H1"), false),
st("b", Some("H0"), true),
st("c", Some("H0"), true),
];
let plan = plan_rollout(&pools, "H1");
assert!(targets(&plan) == vec![("a", "H1"), ("b", "H0"), ("c", "H0")]);
}
#[test]
fn established_roll_advances_next_after_prefix_converges() {
let pools = vec![
st("a", Some("H1"), true),
st("b", Some("H0"), true),
st("c", Some("H0"), true),
];
let plan = plan_rollout(&pools, "H1");
assert!(targets(&plan) == vec![("a", "H1"), ("b", "H1"), ("c", "H0")]);
}
#[test]
fn messy_multiple_old_hashes_falls_back_to_all_desired() {
let pools = vec![st("a", Some("H0"), true), st("b", Some("HX"), true)];
let plan = plan_rollout(&pools, "H1");
assert!(targets(&plan) == vec![("a", "H1"), ("b", "H1")]);
}
}
#[cfg(test)]
mod parse_quantity_tests {
use super::parse_quantity;
use assert2::assert;
#[test]
fn quantity_parse_binary_suffixes() {
assert!(parse_quantity("1Ki").unwrap() == 1024);
assert!(parse_quantity("512Mi").unwrap() == 512 * 1024 * 1024);
assert!(parse_quantity("10Gi").unwrap() == 10 * 1024 * 1024 * 1024);
}
#[test]
fn quantity_parse_decimal_suffixes() {
assert!(parse_quantity("1K").unwrap() == 1_000);
assert!(parse_quantity("500M").unwrap() == 500_000_000);
assert!(parse_quantity("10G").unwrap() == 10_000_000_000);
}
#[test]
fn quantity_parse_decimal_mantissa() {
assert!(parse_quantity("1.5Gi").unwrap() == 1_610_612_736);
}
#[test]
fn quantity_parse_no_suffix_is_bytes() {
assert!(parse_quantity("1024").unwrap() == 1024);
}
#[test]
fn quantity_parse_rejects_garbage() {
assert!(parse_quantity("").is_err());
assert!(parse_quantity("banana").is_err());
assert!(parse_quantity("1.5x").is_err());
assert!(parse_quantity("Gi").is_err());
assert!(parse_quantity("1e3").is_err());
}
#[test]
fn quantity_parse_zero_and_negative_are_errors() {
assert!(parse_quantity("0").is_err());
assert!(parse_quantity("0Gi").is_err());
assert!(parse_quantity("-10Gi").is_err());
}
}