use std::collections::BTreeMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use futures::StreamExt as _;
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::core::v1::{PersistentVolumeClaim, ResourceRequirements};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use kube::api::Api;
use kube::runtime::controller::{Action, Controller};
use kube::runtime::watcher;
use kube::{Resource, ResourceExt as _};
use serde_json::json;
use crate::context::Context;
use crate::controller::common::{
self, APP_LABEL, BROKER_PORT, DEFAULT_BROKER_IMAGE, ReconcileError, apply_object,
common_labels, condition, derive_status, owner_ref,
};
use crate::crd::{
JbodVolume, Kafka, KafkaCondition, KafkaNodePool, KafkaNodePoolStatus, NodeRole, Storage,
};
pub(crate) const METRICS_PORT: i32 = 9404;
#[derive(Debug, thiserror::Error)]
pub enum PoolValidationError {
#[error("spec.roles must equal {{Controller, Broker}}; got {0:?}")]
RolesNotMixed(Vec<NodeRole>),
#[error("spec.replicas={0} is unsupported (only 1 allowed)")]
ReplicasNotOne(i32),
#[error("spec.nodeIdStart={0} is out of range 0..=999999")]
NodeIdOutOfRange(i32),
#[error("metadata.labels.\"crabka.io/cluster\" missing")]
MissingClusterLabel,
#[error("spec.storage.size={0:?} is not a valid positive Quantity ({1})")]
StorageSizeInvalid(String, &'static str),
#[error("spec.storage.type changed from {from} to {to}: immutable")]
StorageTypeChanged {
from: &'static str,
to: &'static str,
},
#[error("spec.storage.class changed from {from:?} to {to:?}: immutable")]
StorageClassChanged {
from: Option<String>,
to: Option<String>,
},
#[error("spec.storage.size decrease from {current} to {desired}: shrink not allowed")]
StorageShrinkNotAllowed { current: String, desired: String },
#[error(
"spec.storage.volumes must list at least 2 disks for Jbod (use PersistentClaim for one disk)"
)]
JbodNeedsTwoVolumes(usize),
#[error("spec.storage.volumes has a duplicate id {0}")]
JbodDuplicateVolumeId(i32),
#[error("spec.storage.volumes set changed: adding/removing JBOD disks is not yet supported")]
JbodVolumesImmutable,
}
pub(crate) fn validate(pool: &KafkaNodePool) -> Result<(), PoolValidationError> {
let roles: HashSet<NodeRole> = pool.spec.roles.iter().copied().collect();
let expected: HashSet<NodeRole> = [NodeRole::Controller, NodeRole::Broker]
.into_iter()
.collect();
if roles != expected {
return Err(PoolValidationError::RolesNotMixed(pool.spec.roles.clone()));
}
if pool.spec.replicas != 1 {
return Err(PoolValidationError::ReplicasNotOne(pool.spec.replicas));
}
if !(0..=999_999).contains(&pool.spec.node_id_start) {
return Err(PoolValidationError::NodeIdOutOfRange(
pool.spec.node_id_start,
));
}
match pool.spec.storage.as_ref() {
Some(Storage::PersistentClaim(pc)) => {
common::parse_quantity(&pc.size)
.map_err(|why| PoolValidationError::StorageSizeInvalid(pc.size.clone(), why))?;
}
Some(Storage::Jbod(j)) => {
if j.volumes.len() < 2 {
return Err(PoolValidationError::JbodNeedsTwoVolumes(j.volumes.len()));
}
let mut seen = HashSet::new();
for v in &j.volumes {
if !seen.insert(v.id) {
return Err(PoolValidationError::JbodDuplicateVolumeId(v.id));
}
common::parse_quantity(&v.size)
.map_err(|why| PoolValidationError::StorageSizeInvalid(v.size.clone(), why))?;
}
}
None | Some(Storage::Ephemeral) => {}
}
Ok(())
}
fn jbod_volumes_sorted(storage: Option<&Storage>) -> Vec<JbodVolume> {
match storage {
Some(Storage::Jbod(j)) => {
let mut v = j.volumes.clone();
v.sort_by_key(|vol| vol.id);
v
}
_ => Vec::new(),
}
}
fn jbod_mount(volume_id: i32, is_primary: bool) -> (String, String) {
if is_primary {
("data".to_string(), "/var/lib/crabka/data".to_string())
} else {
(
format!("data-{volume_id}"),
format!("/var/lib/crabka/data-{volume_id}"),
)
}
}
fn jbod_extra_mounts(storage: Option<&Storage>) -> Vec<(String, String)> {
jbod_volumes_sorted(storage)
.iter()
.enumerate()
.filter(|(i, _)| *i != 0)
.map(|(_, v)| jbod_mount(v.id, false))
.collect()
}
const INIT_SCRIPT: &str = "set -eu\n\
ORDINAL=\"${HOSTNAME##*-}\"\n\
NODE_ID=$((NODE_ID_START + ORDINAL))\n\
mkdir -p /var/lib/crabka/data\n\
if [ ! -f /var/lib/crabka/data/.formatted ]; then\n\
/usr/bin/crabka format --log-dir /var/lib/crabka/data --cluster-id \"$CRABKA_CLUSTER_ID\" --release-version \"$CRABKA_METADATA_VERSION\"\n\
touch /var/lib/crabka/data/.formatted\n\
fi\n\
printf '%s' \"$NODE_ID\" > /var/lib/crabka/data/.node-id\n";
const MAIN_SCRIPT: &str = "set -eu\n\
NODE_ID=\"$(cat /var/lib/crabka/data/.node-id)\"\n\
cp /etc/crabka/config/broker-${NODE_ID}.toml /run/crabka/broker.toml\n\
exec /usr/bin/crabka-broker \\\n --config-file=/run/crabka/broker.toml \\\n --broker-id=\"${NODE_ID}\"\n";
fn build_main_script(metrics_enabled: bool) -> String {
if !metrics_enabled {
return MAIN_SCRIPT.to_string();
}
"set -eu\n\
NODE_ID=\"$(cat /var/lib/crabka/data/.node-id)\"\n\
cp /etc/crabka/config/broker-${NODE_ID}.toml /run/crabka/broker.toml\n\
exec /usr/bin/crabka-broker \\\n \
--config-file=/run/crabka/broker.toml \\\n \
--broker-id=\"${NODE_ID}\" \\\n \
--metrics-listen-addr=0.0.0.0:9404\n"
.to_string()
}
fn render_init_container(
broker_image: &str,
secret_name: &str,
node_id_start: i32,
metadata_version: &str,
) -> serde_json::Value {
json!({
"name": "format",
"image": broker_image,
"command": ["/bin/sh", "-c"],
"args": [INIT_SCRIPT],
"env": [
{ "name": "NODE_ID_START", "value": node_id_start.to_string() },
{ "name": "CRABKA_CLUSTER_ID", "valueFrom": { "secretKeyRef": { "name": secret_name, "key": "clusterId" } } },
{ "name": "CRABKA_METADATA_VERSION", "value": metadata_version.to_string() }
],
"volumeMounts": [{ "name": "data", "mountPath": "/var/lib/crabka/data" }],
"securityContext": {
"allowPrivilegeEscalation": false,
"readOnlyRootFilesystem": true,
"capabilities": { "drop": ["ALL"] }
}
})
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::fn_params_excessive_bools)] #[allow(clippy::too_many_lines)] fn render_broker_container(
broker_image: &str,
secret_name: &str,
cm_name: &str,
resources: &ResourceRequirements,
metrics_enabled: bool,
logging_enabled: bool,
jbod_extra_mounts: &[(String, String)],
oauth_jwks_trust_mount: Option<&str>,
oauth_introspection_mount_path: Option<&str>,
gssapi_keytab: bool,
krb5_conf: bool,
delegation_token: Option<&crate::crd::kafka::DelegationTokenConfig>,
tiered_storage: Option<&crate::crd::kafka::TieredStorage>,
tracing: Option<&crate::crd::kafka::Tracing>,
) -> serde_json::Value {
use crate::crd::kafka::TieredStorageType;
let tier_storage_local = matches!(
tiered_storage.map(|t| t.kind),
Some(TieredStorageType::Local)
);
let mut ports = vec![json!({
"containerPort": BROKER_PORT, "name": "kafka-internal", "protocol": "TCP"
})];
if metrics_enabled {
ports.push(json!({
"containerPort": METRICS_PORT, "name": "metrics", "protocol": "TCP"
}));
}
let mut env = vec![
json!({ "name": "POD_NAME", "valueFrom": { "fieldRef": { "fieldPath": "metadata.name" } } }),
json!({ "name": "POD_NAMESPACE", "valueFrom": { "fieldRef": { "fieldPath": "metadata.namespace" } } }),
json!({ "name": "CRABKA_CLUSTER_ID", "valueFrom": { "secretKeyRef": { "name": secret_name, "key": "clusterId" } } }),
];
if logging_enabled {
env.push(json!({
"name": "RUST_LOG",
"valueFrom": {
"configMapKeyRef": { "name": cm_name, "key": "rust.log", "optional": true }
}
}));
}
if !jbod_extra_mounts.is_empty() {
let value = jbod_extra_mounts
.iter()
.map(|(_, path)| path.as_str())
.collect::<Vec<_>>()
.join(",");
env.push(json!({ "name": "CRABKA_EXTRA_LOG_DIRS", "value": value }));
}
if let Some(dt) = delegation_token {
let key = dt.secret_key_ref.key.as_deref().unwrap_or("secret-key");
env.push(json!({
"name": "CRABKA_DELEGATION_TOKEN_SECRET_KEY",
"valueFrom": {
"secretKeyRef": {
"name": dt.secret_key_ref.name,
"key": key,
}
}
}));
}
if let Some(s3) = tiered_storage
.filter(|t| matches!(t.kind, TieredStorageType::S3))
.and_then(|t| t.s3.as_ref())
&& let Some(creds) = &s3.credentials
{
let ak_key = creds
.access_key_id
.key
.as_deref()
.unwrap_or("access-key-id");
let sk_key = creds
.secret_access_key
.key
.as_deref()
.unwrap_or("secret-access-key");
env.push(json!({
"name": "AWS_ACCESS_KEY_ID",
"valueFrom": {
"secretKeyRef": {
"name": creds.access_key_id.name,
"key": ak_key,
}
}
}));
env.push(json!({
"name": "AWS_SECRET_ACCESS_KEY",
"valueFrom": {
"secretKeyRef": {
"name": creds.secret_access_key.name,
"key": sk_key,
}
}
}));
}
if let Some(t) = tracing
&& let crate::crd::kafka::TracingType::Otlp = t.kind
&& let Some(otlp) = t.otlp.as_ref()
{
env.push(json!({ "name": "CRABKA_OTLP_ENABLED", "value": "true" }));
env.push(json!({ "name": "CRABKA_OTLP_ENDPOINT", "value": otlp.endpoint }));
if let Some(p) = otlp.protocol {
env.push(json!({
"name": "CRABKA_OTLP_PROTOCOL",
"value": p.as_env_value(),
}));
}
if let Some(r) = otlp.sample_ratio {
env.push(json!({
"name": "CRABKA_OTLP_SAMPLE_RATIO",
"value": r.to_string(),
}));
}
if let Some(name) = otlp.service_name.as_deref() {
env.push(json!({ "name": "OTEL_SERVICE_NAME", "value": name }));
}
if let Some(t) = otlp.timeout_secs {
env.push(json!({
"name": "CRABKA_OTLP_TIMEOUT_SECS",
"value": t.to_string(),
}));
}
}
let main_script = build_main_script(metrics_enabled);
let mut volume_mounts = vec![
json!({ "name": "data", "mountPath": "/var/lib/crabka/data" }),
json!({ "name": "broker-config", "mountPath": "/etc/crabka/config", "readOnly": true }),
json!({ "name": "broker-runtime", "mountPath": "/run/crabka" }),
json!({ "name": "cluster-ca-cert", "mountPath": "/etc/crabka/cluster-ca", "readOnly": true }),
json!({ "name": "broker-tls", "mountPath": "/etc/crabka/broker-tls", "readOnly": true }),
json!({ "name": "clients-ca-cert", "mountPath": "/etc/crabka/clients-ca", "readOnly": true }),
];
for (name, path) in jbod_extra_mounts {
volume_mounts.push(json!({ "name": name, "mountPath": path }));
}
if let Some(mount_path) = oauth_jwks_trust_mount {
volume_mounts.push(json!({
"name": "oauth-jwks-trust",
"mountPath": mount_path,
"readOnly": true,
}));
}
if let Some(mount_path) = oauth_introspection_mount_path {
volume_mounts.push(json!({
"name": "oauth-introspection-secret",
"mountPath": mount_path,
"readOnly": true,
}));
}
if gssapi_keytab {
volume_mounts.push(json!({
"name": "gssapi-keytab",
"mountPath": crate::controller::listeners::GSSAPI_KEYTAB_DIR,
"readOnly": true,
}));
}
if krb5_conf {
volume_mounts.push(json!({
"name": "krb5-conf",
"mountPath": "/etc/crabka/krb5",
"readOnly": true,
}));
env.push(json!({ "name": "KRB5_CONFIG", "value": "/etc/crabka/krb5/krb5.conf" }));
}
if tier_storage_local {
volume_mounts.push(json!({
"name": "tier-storage",
"mountPath": crate::controller::listeners::TIER_STORAGE_PATH,
}));
}
json!({
"name": "broker",
"image": broker_image,
"command": ["/bin/sh", "-c"],
"args": [main_script],
"env": env,
"ports": ports,
"readinessProbe": {
"tcpSocket": { "port": BROKER_PORT },
"initialDelaySeconds": 2,
"periodSeconds": 5
},
"livenessProbe": {
"tcpSocket": { "port": BROKER_PORT },
"initialDelaySeconds": 30,
"periodSeconds": 10
},
"resources": resources,
"volumeMounts": volume_mounts,
"securityContext": {
"allowPrivilegeEscalation": false,
"readOnlyRootFilesystem": true,
"capabilities": { "drop": ["ALL"] }
}
})
}
fn pvc_template(
name: &str,
size: &str,
class: Option<&str>,
pod_labels: &BTreeMap<String, String>,
) -> serde_json::Value {
let mut template = json!({
"metadata": {
"name": name,
"labels": pod_labels,
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"resources": {
"requests": { "storage": size }
}
}
});
if let Some(class) = class {
template["spec"]["storageClassName"] = serde_json::Value::String(class.to_string());
}
template
}
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_arguments)] fn render_storage(
storage: Option<&Storage>,
pod_labels: &BTreeMap<String, String>,
parent_name: &str,
oauth_jwks_trust_secret: Option<&str>,
oauth_introspection_mount: Option<&crate::controller::kafka::OauthIntrospectionMount>,
gssapi_keytab: Option<&crate::controller::kafka::GssapiKeytabMount>,
krb5_conf: Option<(&str, &str)>,
tier_storage_local: bool,
tier_storage_persistence: Option<&crate::crd::kafka::TieredStoragePersistence>,
) -> (serde_json::Value, Vec<serde_json::Value>) {
let broker_config_vol = json!({
"name": "broker-config",
"configMap": { "name": format!("{parent_name}-broker-config") }
});
let runtime_vol = json!({ "name": "broker-runtime", "emptyDir": {} });
let cluster_ca_cert_vol = json!({
"name": "cluster-ca-cert",
"secret": {
"secretName": format!("{parent_name}-cluster-ca-cert"),
"defaultMode": 0o400_i32,
}
});
let broker_tls_vol = json!({
"name": "broker-tls",
"secret": {
"secretName": format!("{parent_name}-kafka-brokers"),
"defaultMode": 0o400_i32,
}
});
let clients_ca_cert_vol = json!({
"name": "clients-ca-cert",
"secret": {
"secretName": format!("{parent_name}-clients-ca-cert"),
"defaultMode": 0o400_i32,
}
});
let (mut volumes, mut templates) = match storage {
None | Some(Storage::Ephemeral) => {
let volumes = json!([
{ "name": "data", "emptyDir": {} },
broker_config_vol,
runtime_vol,
cluster_ca_cert_vol,
broker_tls_vol,
clients_ca_cert_vol,
]);
(volumes, Vec::new())
}
Some(Storage::PersistentClaim(pc)) => {
let template = pvc_template("data", &pc.size, pc.class.as_deref(), pod_labels);
(
json!([
broker_config_vol,
runtime_vol,
cluster_ca_cert_vol,
broker_tls_vol,
clients_ca_cert_vol,
]),
vec![template],
)
}
Some(Storage::Jbod(_)) => {
let jbod_vols = jbod_volumes_sorted(storage);
let templates = jbod_vols
.iter()
.enumerate()
.map(|(i, v)| {
let (name, _) = jbod_mount(v.id, i == 0);
pvc_template(&name, &v.size, v.class.as_deref(), pod_labels)
})
.collect();
(
json!([
broker_config_vol,
runtime_vol,
cluster_ca_cert_vol,
broker_tls_vol,
clients_ca_cert_vol,
]),
templates,
)
}
};
if let Some(secret_name) = oauth_jwks_trust_secret {
volumes
.as_array_mut()
.expect("render_storage built `volumes` via json!([...])")
.push(json!({
"name": "oauth-jwks-trust",
"secret": {
"secretName": secret_name,
"defaultMode": 0o400_i32,
}
}));
}
if let Some(mount) = oauth_introspection_mount {
volumes
.as_array_mut()
.expect("render_storage built `volumes` via json!([...])")
.push(json!({
"name": "oauth-introspection-secret",
"secret": {
"secretName": mount.secret_name,
"items": [{ "key": mount.key, "path": "client-secret" }],
"defaultMode": 0o400_i32,
}
}));
}
if let Some(m) = gssapi_keytab {
volumes
.as_array_mut()
.expect("render_storage built `volumes` via json!([...])")
.push(json!({
"name": "gssapi-keytab",
"secret": {
"secretName": m.secret_name,
"items": [{ "key": m.key, "path": "keytab" }],
"defaultMode": 0o400_i32,
}
}));
}
if let Some((secret_name, key)) = krb5_conf {
volumes
.as_array_mut()
.expect("render_storage built `volumes` via json!([...])")
.push(json!({
"name": "krb5-conf",
"secret": {
"secretName": secret_name,
"items": [{ "key": key, "path": "krb5.conf" }],
"defaultMode": 0o400_i32,
}
}));
}
if tier_storage_local {
if let Some(p) = tier_storage_persistence {
templates.push(pvc_template(
"tier-storage",
&p.size,
p.class.as_deref(),
pod_labels,
));
} else {
volumes
.as_array_mut()
.expect("render_storage built `volumes` via json!([...])")
.push(json!({
"name": "tier-storage",
"emptyDir": {}
}));
}
}
(volumes, templates)
}
fn render_pvc_retention_policy(
storage: Option<&Storage>,
tier_persistence: Option<&crate::crd::kafka::TieredStoragePersistence>,
) -> Option<serde_json::Value> {
let delete_claim = match storage {
Some(Storage::PersistentClaim(pc)) => pc.delete_claim,
Some(Storage::Jbod(j)) => j.delete_claim,
_ => match tier_persistence {
Some(p) => p.delete_claim,
None => return None,
},
};
Some(json!({
"whenDeleted": if delete_claim { "Delete" } else { "Retain" },
"whenScaled": "Retain",
}))
}
fn pod_spec_with_data_volume(
mut pod_spec: serde_json::Value,
pod_volumes: serde_json::Value,
) -> serde_json::Value {
pod_spec["volumes"] = pod_volumes;
pod_spec
}
#[allow(clippy::too_many_lines)] pub(crate) fn render_statefulset(
parent: &Kafka,
pool: &KafkaNodePool,
broker_image: &str,
) -> Result<StatefulSet, ReconcileError> {
let parent_name = parent.meta().name.clone().unwrap_or_default();
let pool_name = pool.meta().name.clone().unwrap_or_default();
let namespace = pool.meta().namespace.clone().unwrap_or_default();
let labels = common_labels(&parent_name, &parent.spec.kafka_version, Some(&pool_name));
let mut selector: BTreeMap<String, String> = BTreeMap::new();
selector.insert("app.kubernetes.io/name".into(), APP_LABEL.into());
selector.insert("app.kubernetes.io/instance".into(), parent_name.clone());
selector.insert("crabka.io/pool".into(), pool_name.clone());
let resources = pool
.spec
.resources
.clone()
.unwrap_or_else(default_resources);
let secret_name = format!("{parent_name}-cluster-id");
let service_name = format!("{parent_name}-broker-headless");
let sts_name = format!("{parent_name}-{pool_name}");
let chosen = parent
.status
.as_ref()
.and_then(|s| s.metadata_version.as_deref())
.or(parent.spec.metadata_version.as_deref())
.unwrap_or(&parent.spec.kafka_version);
let normalized = crate::version::KafkaVersion::parse(chosen)
.map_or_else(|_| chosen.to_string(), |v| v.short());
let resolved_metadata_version =
if crabka_metadata::metadata_version::from_version_string(&normalized).is_some() {
normalized
} else {
crabka_metadata::metadata_version::from_feature_level(
crabka_metadata::metadata_version::METADATA_VERSION_MAX,
)
.expect("MAX level is in the table")
.short()
.to_string()
};
let init = render_init_container(
broker_image,
&secret_name,
pool.spec.node_id_start,
&resolved_metadata_version,
);
let metrics_enabled = parent.spec.metrics_config.is_some();
let logging_enabled = parent.spec.logging.is_some();
let cm_name = format!("{parent_name}-broker-config");
let jbod_extra = jbod_extra_mounts(pool.spec.storage.as_ref());
let oauth_jwks_trust_secret = crate::controller::kafka::oauth_jwks_trust_secret_name(parent);
let oauth_jwks_trust_mount = oauth_jwks_trust_secret
.as_deref()
.map(|_| "/etc/crabka/oauth-jwks-trust");
let oauth_introspection_mount =
crate::controller::kafka::oauth_introspection_secret_mount(parent);
let oauth_introspection_mount_path = oauth_introspection_mount
.as_ref()
.map(|_| "/etc/crabka/oauth-introspection");
let gssapi_keytab_mount = crate::controller::kafka::gssapi_keytab_mount(parent);
let krb5_conf_mount = crate::controller::kafka::krb5_conf_mount(parent);
let tiered_storage = parent.spec.tiered_storage.as_ref();
let tier_storage_local = matches!(
tiered_storage.map(|t| t.kind),
Some(crate::crd::kafka::TieredStorageType::Local)
);
let main = render_broker_container(
broker_image,
&secret_name,
&cm_name,
&resources,
metrics_enabled,
logging_enabled,
&jbod_extra,
oauth_jwks_trust_mount,
oauth_introspection_mount_path,
gssapi_keytab_mount.is_some(),
krb5_conf_mount.is_some(),
parent.spec.delegation_token.as_ref(),
tiered_storage,
parent.spec.tracing.as_ref(),
);
let mut pod_labels = labels.clone();
let mut pod_annotations: BTreeMap<String, String> = BTreeMap::new();
if let Some(meta) = pool
.spec
.template
.as_ref()
.and_then(|t| t.metadata.as_ref())
{
for (k, v) in &meta.labels {
pod_labels.entry(k.clone()).or_insert_with(|| v.clone());
}
for (k, v) in &meta.annotations {
pod_annotations.insert(k.clone(), v.clone());
}
}
if let Some(hash) = pool
.metadata
.labels
.as_ref()
.and_then(|l| l.get("crabka.io/config-hash"))
{
pod_annotations.insert("crabka.io/config-hash".into(), hash.clone());
}
let mut template_meta = json!({ "labels": pod_labels });
if !pod_annotations.is_empty() {
template_meta["annotations"] = serde_json::to_value(&pod_annotations)?;
}
let mut pod_spec = json!({
"securityContext": {
"runAsNonRoot": true,
"runAsUser": 65532,
"fsGroup": 65532,
"seccompProfile": { "type": "RuntimeDefault" }
},
"initContainers": [init],
"containers": [main],
"volumes": [{ "name": "data", "emptyDir": {} }],
});
if let Some(tpl) = pool.spec.template.as_ref() {
if let Some(affinity) = tpl.affinity.as_ref() {
pod_spec["affinity"] = serde_json::to_value(affinity)?;
}
if !tpl.tolerations.is_empty() {
pod_spec["tolerations"] = serde_json::to_value(&tpl.tolerations)?;
}
if let Some(ns) = tpl.node_selector.as_ref()
&& !ns.is_empty()
{
pod_spec["nodeSelector"] = serde_json::to_value(ns)?;
}
}
let tier_storage_persistence = tiered_storage.and_then(|t| t.persistence.as_ref());
if let Some(tp) = tier_storage_persistence {
let pool_data_delete_claim = match pool.spec.storage.as_ref() {
Some(Storage::PersistentClaim(pc)) => Some(pc.delete_claim),
Some(Storage::Jbod(j)) => Some(j.delete_claim),
_ => None,
};
if let Some(dc) = pool_data_delete_claim
&& dc != tp.delete_claim
{
return Err(ReconcileError::TieredStorageInvalid(format!(
"tiered storage persistence.deleteClaim={} but pool '{}' storage.deleteClaim={}; \
K8s StatefulSets have a single set-wide PVC retention policy — these must match",
tp.delete_claim, pool_name, dc,
)));
}
}
let (pod_volumes, volume_claim_templates) = render_storage(
pool.spec.storage.as_ref(),
&pod_labels,
&parent_name,
oauth_jwks_trust_secret.as_deref(),
oauth_introspection_mount.as_ref(),
gssapi_keytab_mount.as_ref(),
krb5_conf_mount
.as_ref()
.map(|(s, k)| (s.as_str(), k.as_str())),
tier_storage_local,
tier_storage_persistence,
);
let retention_policy =
render_pvc_retention_policy(pool.spec.storage.as_ref(), tier_storage_persistence);
let mut sts_spec = json!({
"serviceName": service_name,
"replicas": pool.spec.replicas,
"podManagementPolicy": "Parallel",
"selector": { "matchLabels": selector },
"template": {
"metadata": template_meta,
"spec": pod_spec_with_data_volume(pod_spec, pod_volumes),
}
});
if !volume_claim_templates.is_empty() {
sts_spec["volumeClaimTemplates"] = serde_json::Value::Array(volume_claim_templates);
}
if let Some(policy) = retention_policy {
sts_spec["persistentVolumeClaimRetentionPolicy"] = policy;
}
let sts: StatefulSet = serde_json::from_value(json!({
"metadata": {
"name": sts_name,
"namespace": namespace,
"labels": labels,
"ownerReferences": [owner_ref::<KafkaNodePool>(pool)?],
},
"spec": sts_spec,
}))?;
Ok(sts)
}
fn default_resources() -> ResourceRequirements {
let mut requests = BTreeMap::new();
requests.insert("cpu".into(), Quantity("100m".into()));
requests.insert("memory".into(), Quantity("256Mi".into()));
let mut limits = BTreeMap::new();
limits.insert("cpu".into(), Quantity("1000m".into()));
limits.insert("memory".into(), Quantity("1Gi".into()));
ResourceRequirements {
requests: Some(requests),
limits: Some(limits),
..Default::default()
}
}
fn validate_storage_change(
desired: Option<&Storage>,
observed: Option<&[PersistentVolumeClaim]>,
) -> Result<(), PoolValidationError> {
let Some(observed) = observed else {
return Ok(());
};
let desired_kind = storage_kind(desired);
let observed_kind = observed_storage_kind(observed);
if desired_kind != observed_kind {
return Err(PoolValidationError::StorageTypeChanged {
from: observed_kind,
to: desired_kind,
});
}
match desired {
Some(Storage::PersistentClaim(desired_pc)) => {
let (observed_size, observed_class) = size_class_from_pvc(&observed[0]);
check_class_and_shrink(
&desired_pc.size,
desired_pc.class.as_deref(),
&observed_size,
observed_class.as_deref(),
)
}
Some(Storage::Jbod(desired_jbod)) => validate_jbod_change(desired_jbod, observed),
None | Some(Storage::Ephemeral) => Ok(()),
}
}
fn observed_storage_kind(templates: &[PersistentVolumeClaim]) -> &'static str {
match templates.len() {
0 => "Ephemeral",
1 => "PersistentClaim",
_ => "Jbod",
}
}
fn size_class_from_pvc(pvc: &PersistentVolumeClaim) -> (String, Option<String>) {
let Some(spec) = pvc.spec.as_ref() else {
return (String::new(), None);
};
let size = spec
.resources
.as_ref()
.and_then(|r| r.requests.as_ref())
.and_then(|m| m.get("storage"))
.map(|q| q.0.clone())
.unwrap_or_default();
(size, spec.storage_class_name.clone())
}
fn check_class_and_shrink(
desired_size: &str,
desired_class: Option<&str>,
observed_size: &str,
observed_class: Option<&str>,
) -> Result<(), PoolValidationError> {
if desired_class != observed_class {
return Err(PoolValidationError::StorageClassChanged {
from: observed_class.map(String::from),
to: desired_class.map(String::from),
});
}
let observed_bytes = common::parse_quantity(observed_size).unwrap_or(0);
let desired_bytes = common::parse_quantity(desired_size).unwrap_or(0);
if desired_bytes < observed_bytes {
return Err(PoolValidationError::StorageShrinkNotAllowed {
current: observed_size.to_string(),
desired: desired_size.to_string(),
});
}
Ok(())
}
fn validate_jbod_change(
desired: &crate::crd::JbodSpec,
observed: &[PersistentVolumeClaim],
) -> Result<(), PoolValidationError> {
use std::collections::{BTreeMap, BTreeSet};
let mut observed_primary: Option<(String, Option<String>)> = None;
let mut observed_extra: BTreeMap<i32, (String, Option<String>)> = BTreeMap::new();
for t in observed {
let name = t.metadata.name.as_deref().unwrap_or_default();
if name == "data" {
observed_primary = Some(size_class_from_pvc(t));
} else if let Some(n) = name
.strip_prefix("data-")
.and_then(|s| s.parse::<i32>().ok())
{
observed_extra.insert(n, size_class_from_pvc(t));
}
}
let mut volumes = desired.volumes.clone();
volumes.sort_by_key(|v| v.id);
let Some((primary, extras)) = volumes.split_first() else {
return Ok(()); };
let desired_extra_ids: BTreeSet<i32> = extras.iter().map(|v| v.id).collect();
let observed_extra_ids: BTreeSet<i32> = observed_extra.keys().copied().collect();
if desired_extra_ids != observed_extra_ids {
return Err(PoolValidationError::JbodVolumesImmutable);
}
if let Some((obs_size, obs_class)) = observed_primary.as_ref() {
check_class_and_shrink(
&primary.size,
primary.class.as_deref(),
obs_size,
obs_class.as_deref(),
)?;
}
for v in extras {
if let Some((obs_size, obs_class)) = observed_extra.get(&v.id) {
check_class_and_shrink(&v.size, v.class.as_deref(), obs_size, obs_class.as_deref())?;
}
}
Ok(())
}
fn storage_kind(s: Option<&Storage>) -> &'static str {
match s {
None | Some(Storage::Ephemeral) => "Ephemeral",
Some(Storage::PersistentClaim(_)) => "PersistentClaim",
Some(Storage::Jbod(_)) => "Jbod",
}
}
fn condition_for_validation_error(err: &PoolValidationError) -> KafkaCondition {
let (reason, message) = match err {
PoolValidationError::RolesNotMixed(roles) => (
"RolesNotMixed",
format!("spec.roles must equal {{Controller, Broker}}; got {roles:?}"),
),
PoolValidationError::ReplicasNotOne(n) => (
"UnsupportedReplicaCount",
format!("spec.replicas={n} is unsupported (only 1 allowed)"),
),
PoolValidationError::NodeIdOutOfRange(n) => (
"NodeIdOutOfRange",
format!("spec.nodeIdStart={n} is out of range 0..=999999"),
),
PoolValidationError::MissingClusterLabel => (
"MissingClusterLabel",
"metadata.labels.\"crabka.io/cluster\" missing".to_string(),
),
PoolValidationError::StorageSizeInvalid(value, why) => (
"StorageSizeInvalid",
format!("spec.storage.size={value:?} ({why})"),
),
PoolValidationError::StorageTypeChanged { from, to } => (
"StorageImmutable",
format!("spec.storage.type changed from {from} to {to}"),
),
PoolValidationError::StorageClassChanged { from, to } => (
"StorageImmutable",
format!("spec.storage.class changed from {from:?} to {to:?}"),
),
PoolValidationError::StorageShrinkNotAllowed { current, desired } => (
"StorageImmutable",
format!("spec.storage.size {current} -> {desired} (shrink rejected)"),
),
PoolValidationError::JbodNeedsTwoVolumes(n) => (
"JbodNeedsTwoVolumes",
format!(
"spec.storage.volumes has {n} disk(s); Jbod needs >= 2 (use PersistentClaim for one)"
),
),
PoolValidationError::JbodDuplicateVolumeId(id) => (
"JbodDuplicateVolumeId",
format!("spec.storage.volumes has a duplicate id {id}"),
),
PoolValidationError::JbodVolumesImmutable => (
"StorageImmutable",
"spec.storage.volumes set changed: adding/removing JBOD disks is not yet supported"
.to_string(),
),
};
condition("Ready", "False", reason, &message)
}
async fn patch_status_for_pool(
pool_api: &Api<KafkaNodePool>,
name: &str,
cond: KafkaCondition,
) -> Result<(), ReconcileError> {
let status = KafkaNodePoolStatus {
conditions: vec![cond],
replicas: None,
ready_replicas: None,
};
common::patch_status::<KafkaNodePool, KafkaNodePoolStatus>(pool_api, name, status).await
}
#[derive(Debug)]
enum VersionGate {
Cleared,
Blocked(KafkaCondition),
}
fn version_gate(parent: &Kafka) -> VersionGate {
let status = parent.status.as_ref();
let version_cond =
status.and_then(|s| s.conditions.iter().find(|c| c.type_ == "KafkaVersionValid"));
let finalized = status.and_then(|s| s.metadata_version.as_deref());
let cleared = finalized.is_some() || version_cond.is_some_and(|c| c.status == "True");
if cleared {
return VersionGate::Cleared;
}
let cond = match version_cond {
Some(c) => condition(
"Ready",
"False",
"KafkaVersionInvalid",
&format!(
"refusing to format brokers: parent Kafka '{}' KafkaVersionValid={} ({}): {}",
parent.name_any(),
c.status,
c.reason,
c.message
),
),
None => condition(
"Ready",
"False",
"WaitingForVersionValidation",
&format!(
"waiting for parent Kafka '{}' to publish a KafkaVersionValid verdict before formatting brokers",
parent.name_any()
),
),
};
VersionGate::Blocked(cond)
}
pub async fn run(ctx: Context) -> anyhow::Result<()> {
let api: Api<KafkaNodePool> = Api::all(ctx.client.clone());
let sts_api: Api<StatefulSet> = Api::all(ctx.client.clone());
Controller::new(api, watcher::Config::default())
.owns(sts_api, watcher::Config::default())
.run(reconcile, error_policy, Arc::new(ctx))
.for_each(|res| async move {
match res {
Ok((obj, _)) => tracing::debug!(?obj, "pool reconciled"),
Err(e) => tracing::warn!(error = %e, "pool reconcile error"),
}
})
.await;
Ok(())
}
pub async fn reconcile(
pool: Arc<KafkaNodePool>,
ctx: Arc<Context>,
) -> Result<Action, ReconcileError> {
let ns = pool.namespace().unwrap_or_else(|| "default".into());
let name = pool.name_any();
let pool_api: Api<KafkaNodePool> = Api::namespaced(ctx.client.clone(), &ns);
if let Err(e) = validate(&pool) {
let cond = condition_for_validation_error(&e);
patch_status_for_pool(&pool_api, &name, cond).await?;
return Ok(Action::await_change());
}
let Some(kafka_name) = pool
.meta()
.labels
.as_ref()
.and_then(|l| l.get("crabka.io/cluster").cloned())
else {
let cond = condition(
"Ready",
"False",
"MissingClusterLabel",
"metadata.labels.\"crabka.io/cluster\" is required to link a pool to its parent Kafka",
);
patch_status_for_pool(&pool_api, &name, cond).await?;
return Ok(Action::await_change());
};
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
let Some(parent) = kafka_api.get_opt(&kafka_name).await? else {
let cond = condition(
"Ready",
"False",
"ParentNotFound",
&format!("Kafka '{kafka_name}' not found in namespace '{ns}'"),
);
patch_status_for_pool(&pool_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
};
if let VersionGate::Blocked(cond) = version_gate(&parent) {
patch_status_for_pool(&pool_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
}
let image = pool
.spec
.image
.clone()
.or_else(|| ctx.config.default_broker_image.clone())
.unwrap_or_else(|| DEFAULT_BROKER_IMAGE.into());
let sts_api: Api<StatefulSet> = Api::namespaced(ctx.client.clone(), &ns);
let sts_name = format!("{kafka_name}-{name}");
let observed_sts = sts_api.get_opt(&sts_name).await?;
let observed_pvc_templates: Option<Vec<PersistentVolumeClaim>> =
observed_sts.as_ref().map(|s| {
s.spec
.as_ref()
.and_then(|spec| spec.volume_claim_templates.clone())
.unwrap_or_default()
});
if let Err(e) = validate_storage_change(
pool.spec.storage.as_ref(),
observed_pvc_templates.as_deref(),
) {
let cond = condition_for_validation_error(&e);
patch_status_for_pool(&pool_api, &name, cond).await?;
return Ok(Action::await_change());
}
let sts = render_statefulset(&parent, &pool, &image)?;
apply_object(&sts_api, &sts_name, &sts).await?;
let live = sts_api.get_opt(&sts_name).await?;
let (replicas, ready_replicas, reason, message) =
derive_status(live.as_ref(), pool.spec.replicas);
let status_value = if reason == "Available" {
"True"
} else {
"False"
};
let status = KafkaNodePoolStatus {
conditions: vec![condition("Ready", status_value, reason, &message)],
replicas,
ready_replicas,
};
common::patch_status::<KafkaNodePool, KafkaNodePoolStatus>(&pool_api, &name, status).await?;
Ok(Action::requeue(Duration::from_secs(30)))
}
pub fn error_policy(_obj: Arc<KafkaNodePool>, err: &ReconcileError, _ctx: Arc<Context>) -> Action {
tracing::warn!(error = %err, "pool reconcile error, requeueing");
Action::requeue(Duration::from_secs(15))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crd::{
KafkaNodePoolSpec, KafkaSpec, MetadataTemplate, PersistentClaimSpec, PodTemplate, Storage,
};
use assert2::assert;
use std::collections::BTreeMap;
fn parent_fixture(name: &str) -> Kafka {
let mut k = Kafka::new(
name,
KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
},
);
k.metadata.namespace = Some("default".into());
k.metadata.uid = Some("parent-u".into());
k
}
fn pool_fixture(name: &str, parent: &str, replicas: i32) -> KafkaNodePool {
let mut p = KafkaNodePool::new(
name,
KafkaNodePoolSpec {
roles: vec![NodeRole::Controller, NodeRole::Broker],
replicas,
node_id_start: 0,
image: None,
resources: None,
template: None,
storage: None,
},
);
p.metadata.namespace = Some("default".into());
p.metadata.uid = Some("pool-u".into());
let mut labels = BTreeMap::new();
labels.insert("crabka.io/cluster".into(), parent.to_string());
p.metadata.labels = Some(labels);
p
}
#[test]
fn render_statefulset_name_is_kafka_dash_pool() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
assert!(sts.metadata.name.as_deref() == Some("demo-brokers"));
}
#[test]
fn render_statefulset_service_name_is_shared_headless() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let spec = sts.spec.expect("sts spec");
assert!(spec.service_name.as_deref() == Some("demo-broker-headless"));
}
#[test]
fn render_statefulset_pod_labels_include_kafka_instance_and_pool_name() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let spec = sts.spec.expect("sts spec");
let pod_labels = spec
.template
.metadata
.as_ref()
.and_then(|m| m.labels.as_ref())
.expect("pod template labels");
assert!(
pod_labels
.get("app.kubernetes.io/instance")
.map(String::as_str)
== Some("demo")
);
assert!(pod_labels.get("crabka.io/pool").map(String::as_str) == Some("brokers"));
}
#[test]
fn render_statefulset_init_script_uses_nodeidstart() {
let parent = parent_fixture("demo");
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.node_id_start = 42;
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod = sts.spec.unwrap().template.spec.unwrap();
let init = &pod.init_containers.expect("init containers")[0];
let env = init.env.as_ref().expect("init env");
let node_id_start = env
.iter()
.find(|e| e.name == "NODE_ID_START")
.expect("NODE_ID_START env");
assert!(node_id_start.value.as_deref() == Some("42"));
let args = init.args.as_ref().expect("init args");
let script = args.iter().find(|s| s.contains("NODE_ID_START"));
let script = script.expect("init script references NODE_ID_START");
assert!(
script.contains("NODE_ID_START + ORDINAL"),
"expected the init script to compute NODE_ID = NODE_ID_START + ORDINAL, got: {script}"
);
let format_pos = script
.find("crabka format")
.expect("init script must invoke `crabka format`");
let node_id_write_pos = script
.find(".node-id")
.expect("init script must write .node-id");
assert!(
node_id_write_pos > format_pos,
"init script must write .node-id AFTER crabka format. \
Otherwise `crabka format` refuses to overwrite a non-empty \
log_dir on the first boot of an empty PVC. \
format at byte {format_pos}, .node-id at byte {node_id_write_pos}",
);
}
#[test]
fn init_script_passes_release_version() {
assert!(
INIT_SCRIPT.contains("--release-version \"$CRABKA_METADATA_VERSION\""),
"init script must pass the resolved metadata.version to crabka format"
);
}
#[test]
fn init_container_wires_metadata_version_env() {
let c = render_init_container("img:tag", "sec", 0, "4.0");
let env = c["env"].as_array().expect("env array");
let mv = env
.iter()
.find(|e| e["name"] == "CRABKA_METADATA_VERSION")
.expect("CRABKA_METADATA_VERSION env present");
assert!(mv["value"] == "4.0");
}
#[test]
fn statefulset_init_normalizes_metadata_version_to_short() {
let parent = parent_fixture("demo"); let mut parent37 = parent.clone();
parent37.spec.kafka_version = "3.7.1".into();
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent37, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod = sts.spec.unwrap().template.spec.unwrap();
let init = &pod.init_containers.expect("init containers")[0];
let env = init.env.as_ref().expect("init env");
let mv = env
.iter()
.find(|e| e.name == "CRABKA_METADATA_VERSION")
.expect("CRABKA_METADATA_VERSION env present");
assert!(
mv.value.as_deref() == Some("3.7"),
"init container must receive short major.minor form, not the 3-part kafka_version"
);
}
#[test]
fn statefulset_init_clamps_out_of_range_version_to_max() {
let mut parent = parent_fixture("demo");
parent.spec.kafka_version = "4.1.0".into();
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod = sts.spec.unwrap().template.spec.unwrap();
let init = &pod.init_containers.expect("init containers")[0];
let env = init.env.as_ref().expect("init env");
let mv = env
.iter()
.find(|e| e.name == "CRABKA_METADATA_VERSION")
.expect("CRABKA_METADATA_VERSION env present");
let max_short = crabka_metadata::metadata_version::from_feature_level(
crabka_metadata::metadata_version::METADATA_VERSION_MAX,
)
.unwrap()
.short();
assert!(
mv.value.as_deref() == Some(max_short),
"out-of-range kafka_version must clamp to MAX short form ({max_short}), \
not the unsupported \"4.1\""
);
}
#[test]
fn validate_rejects_replicas_two() {
let pool = pool_fixture("brokers", "demo", 2);
let err = validate(&pool).unwrap_err();
assert!(
matches!(err, PoolValidationError::ReplicasNotOne(2)),
"expected ReplicasNotOne(2), got {err:?}"
);
}
#[test]
fn validate_rejects_controller_only_roles() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.roles = vec![NodeRole::Controller];
let err = validate(&pool).unwrap_err();
assert!(
matches!(err, PoolValidationError::RolesNotMixed(_)),
"expected RolesNotMixed, got {err:?}"
);
}
#[test]
fn validate_rejects_broker_only_roles() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.roles = vec![NodeRole::Broker];
let err = validate(&pool).unwrap_err();
assert!(
matches!(err, PoolValidationError::RolesNotMixed(_)),
"expected RolesNotMixed, got {err:?}"
);
}
#[test]
fn validate_rejects_negative_nodeidstart() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.node_id_start = -1;
let err = validate(&pool).unwrap_err();
assert!(
matches!(err, PoolValidationError::NodeIdOutOfRange(-1)),
"expected NodeIdOutOfRange(-1), got {err:?}"
);
}
fn pool_with_template(template: PodTemplate) -> KafkaNodePool {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.template = Some(template);
pool
}
#[test]
fn render_statefulset_template_labels_merge_under_operator_labels() {
let mut user_labels = BTreeMap::new();
user_labels.insert("team".into(), "platform".into());
user_labels.insert("app.kubernetes.io/name".into(), "hijack".into());
let pool = pool_with_template(PodTemplate {
metadata: Some(MetadataTemplate {
labels: user_labels,
annotations: BTreeMap::new(),
}),
..Default::default()
});
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let pod_labels = sts.spec.unwrap().template.metadata.unwrap().labels.unwrap();
assert!(pod_labels.get("team").map(String::as_str) == Some("platform"));
assert!(pod_labels.get("app.kubernetes.io/name").map(String::as_str) == Some(APP_LABEL));
}
#[test]
fn render_statefulset_template_annotations_apply() {
let mut annos = BTreeMap::new();
annos.insert("crabka.io/test-anno".into(), "yes".into());
let pool = pool_with_template(PodTemplate {
metadata: Some(MetadataTemplate {
labels: BTreeMap::new(),
annotations: annos,
}),
..Default::default()
});
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let anno = sts
.spec
.unwrap()
.template
.metadata
.unwrap()
.annotations
.unwrap();
assert!(anno.get("crabka.io/test-anno").map(String::as_str) == Some("yes"));
}
#[test]
fn render_statefulset_affinity_passes_through() {
use k8s_openapi::api::core::v1::{Affinity, NodeAffinity, NodeSelector, NodeSelectorTerm};
let affinity = Affinity {
node_affinity: Some(NodeAffinity {
required_during_scheduling_ignored_during_execution: Some(NodeSelector {
node_selector_terms: vec![NodeSelectorTerm::default()],
}),
preferred_during_scheduling_ignored_during_execution: None,
}),
..Default::default()
};
let pool = pool_with_template(PodTemplate {
affinity: Some(affinity.clone()),
..Default::default()
});
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let rendered = sts.spec.unwrap().template.spec.unwrap().affinity;
assert!(rendered == Some(affinity));
}
#[test]
fn render_statefulset_tolerations_passes_through() {
use k8s_openapi::api::core::v1::Toleration;
let tol = Toleration {
key: Some("dedicated".into()),
operator: Some("Exists".into()),
effect: Some("NoSchedule".into()),
..Default::default()
};
let pool = pool_with_template(PodTemplate {
tolerations: vec![tol.clone()],
..Default::default()
});
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let tols = sts
.spec
.unwrap()
.template
.spec
.unwrap()
.tolerations
.unwrap();
assert!(tols == vec![tol]);
}
#[test]
fn render_statefulset_node_selector_passes_through() {
let mut ns = BTreeMap::new();
ns.insert("disktype".into(), "ssd".into());
let pool = pool_with_template(PodTemplate {
node_selector: Some(ns.clone()),
..Default::default()
});
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let rendered = sts
.spec
.unwrap()
.template
.spec
.unwrap()
.node_selector
.unwrap();
assert!(rendered.get("disktype").map(String::as_str) == Some("ssd"));
}
#[test]
fn render_statefulset_no_template_no_extra_fields() {
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let spec = sts.spec.unwrap().template.spec.unwrap();
assert!(spec.affinity.is_none());
assert!(spec.tolerations.is_none() || spec.tolerations.as_ref().unwrap().is_empty());
assert!(spec.node_selector.is_none() || spec.node_selector.as_ref().unwrap().is_empty());
}
#[test]
fn render_statefulset_propagates_config_hash_from_label() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.metadata
.labels
.get_or_insert_with(BTreeMap::new)
.insert("crabka.io/config-hash".into(), "abc123".into());
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let anno = sts
.spec
.unwrap()
.template
.metadata
.unwrap()
.annotations
.unwrap();
assert!(anno.get("crabka.io/config-hash").map(String::as_str) == Some("abc123"));
}
#[test]
fn render_statefulset_no_config_hash_when_label_absent() {
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
if let Some(anno) = sts.spec.unwrap().template.metadata.unwrap().annotations {
assert!(!anno.contains_key("crabka.io/config-hash"));
}
}
#[test]
fn render_statefulset_emptydir_when_storage_none() {
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let spec = sts.spec.unwrap();
assert!(
spec.volume_claim_templates.is_none()
|| spec.volume_claim_templates.as_ref().unwrap().is_empty()
);
let volumes = spec.template.spec.unwrap().volumes.unwrap();
let data_vol = volumes
.iter()
.find(|v| v.name == "data")
.expect("data volume present");
assert!(
data_vol.empty_dir.is_some(),
"data volume must be emptyDir; got {data_vol:?}"
);
}
#[test]
fn render_statefulset_emptydir_when_storage_ephemeral() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::Ephemeral);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let spec = sts.spec.unwrap();
assert!(
spec.volume_claim_templates.is_none()
|| spec.volume_claim_templates.as_ref().unwrap().is_empty()
);
let volumes = spec.template.spec.unwrap().volumes.unwrap();
let data_vol = volumes.iter().find(|v| v.name == "data").unwrap();
assert!(data_vol.empty_dir.is_some());
}
#[test]
fn render_statefulset_volume_claim_template_when_persistent() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::PersistentClaim(PersistentClaimSpec {
size: "10Gi".into(),
class: Some("fast-ssd".into()),
delete_claim: false,
}));
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let spec = sts.spec.unwrap();
let volumes = spec.template.spec.as_ref().unwrap().volumes.as_ref();
if let Some(vols) = volumes {
assert!(
vols.iter()
.all(|v| v.name != "data" || v.empty_dir.is_none()),
"expected no emptyDir for data; got {vols:?}"
);
}
let vct = spec.volume_claim_templates.unwrap();
assert!(vct.len() == 1);
let data_pvc = &vct[0];
assert!(data_pvc.metadata.name.as_deref() == Some("data"));
let pvc_spec = data_pvc.spec.as_ref().unwrap();
assert!(pvc_spec.access_modes.as_deref() == Some(["ReadWriteOnce".to_string()].as_slice()));
let req = pvc_spec
.resources
.as_ref()
.unwrap()
.requests
.as_ref()
.unwrap();
assert!(req.get("storage").map(|q| q.0.as_str()) == Some("10Gi"));
assert!(pvc_spec.storage_class_name.as_deref() == Some("fast-ssd"));
}
#[test]
fn render_statefulset_no_storage_class_when_class_absent() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::PersistentClaim(PersistentClaimSpec {
size: "1Gi".into(),
class: None,
delete_claim: false,
}));
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let pvc_spec = sts.spec.unwrap().volume_claim_templates.unwrap()[0]
.spec
.clone()
.unwrap();
assert!(
pvc_spec.storage_class_name.is_none(),
"must omit storageClassName when class is None"
);
}
#[test]
fn render_statefulset_pvc_labels_inherit_pod_labels() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::PersistentClaim(PersistentClaimSpec {
size: "1Gi".into(),
class: None,
delete_claim: false,
}));
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let labels = sts.spec.unwrap().volume_claim_templates.unwrap()[0]
.metadata
.labels
.clone()
.expect("PVC has labels");
assert!(labels.get("app.kubernetes.io/instance").map(String::as_str) == Some("demo"));
assert!(labels.get("crabka.io/pool").map(String::as_str) == Some("brokers"));
}
#[test]
fn render_statefulset_retention_policy_delete_when_delete_claim_true() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::PersistentClaim(PersistentClaimSpec {
size: "1Gi".into(),
class: None,
delete_claim: true,
}));
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let policy = sts
.spec
.unwrap()
.persistent_volume_claim_retention_policy
.unwrap();
assert!(policy.when_deleted.as_deref() == Some("Delete"));
assert!(policy.when_scaled.as_deref() == Some("Retain"));
}
#[test]
fn render_statefulset_retention_policy_retain_when_delete_claim_false() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::PersistentClaim(PersistentClaimSpec {
size: "1Gi".into(),
class: None,
delete_claim: false,
}));
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let policy = sts
.spec
.unwrap()
.persistent_volume_claim_retention_policy
.unwrap();
assert!(policy.when_deleted.as_deref() == Some("Retain"));
assert!(policy.when_scaled.as_deref() == Some("Retain"));
}
#[test]
fn render_statefulset_no_retention_policy_when_ephemeral() {
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
assert!(
sts.spec
.unwrap()
.persistent_volume_claim_retention_policy
.is_none()
);
}
fn pvc_template(size: &str, class: Option<&str>) -> PersistentVolumeClaim {
let mut map = std::collections::BTreeMap::new();
map.insert(
"storage".to_string(),
k8s_openapi::apimachinery::pkg::api::resource::Quantity(size.into()),
);
PersistentVolumeClaim {
metadata: kube::core::ObjectMeta {
name: Some("data".into()),
..Default::default()
},
spec: Some(k8s_openapi::api::core::v1::PersistentVolumeClaimSpec {
access_modes: Some(vec!["ReadWriteOnce".into()]),
resources: Some(k8s_openapi::api::core::v1::VolumeResourceRequirements {
requests: Some(map),
..Default::default()
}),
storage_class_name: class.map(String::from),
..Default::default()
}),
status: None,
}
}
fn pc(size: &str, class: Option<&str>) -> Storage {
Storage::PersistentClaim(PersistentClaimSpec {
size: size.into(),
class: class.map(String::from),
delete_claim: false,
})
}
#[test]
fn validate_storage_change_first_reconcile_accepts_any() {
assert!(validate_storage_change(None, None).is_ok());
assert!(validate_storage_change(Some(&Storage::Ephemeral), None).is_ok());
assert!(validate_storage_change(Some(&pc("10Gi", None)), None).is_ok());
}
#[test]
fn validate_storage_change_rejects_type_switch() {
let observed = pvc_template("10Gi", None);
let err = validate_storage_change(
Some(&Storage::Ephemeral),
Some(std::slice::from_ref(&observed)),
)
.unwrap_err();
assert!(matches!(
err,
PoolValidationError::StorageTypeChanged { .. }
));
}
#[test]
fn validate_storage_change_rejects_class_change() {
let observed = pvc_template("10Gi", Some("class-a"));
let err = validate_storage_change(
Some(&pc("10Gi", Some("class-b"))),
Some(std::slice::from_ref(&observed)),
)
.unwrap_err();
assert!(matches!(
err,
PoolValidationError::StorageClassChanged { .. }
));
}
#[test]
fn validate_storage_change_rejects_shrink() {
let observed = pvc_template("10Gi", None);
let err = validate_storage_change(
Some(&pc("5Gi", None)),
Some(std::slice::from_ref(&observed)),
)
.unwrap_err();
assert!(matches!(
err,
PoolValidationError::StorageShrinkNotAllowed { .. }
));
}
#[test]
fn validate_storage_change_allows_grow() {
let observed = pvc_template("10Gi", None);
assert!(
validate_storage_change(
Some(&pc("20Gi", None)),
Some(std::slice::from_ref(&observed))
)
.is_ok()
);
}
#[test]
fn validate_storage_change_allows_delete_claim_flip() {
let observed = pvc_template("10Gi", None);
let mut desired = pc("10Gi", None);
if let Storage::PersistentClaim(ref mut p) = desired {
p.delete_claim = true;
}
assert!(
validate_storage_change(Some(&desired), Some(std::slice::from_ref(&observed))).is_ok()
);
}
#[test]
fn validate_static_rejects_unparseable_size() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(pc("banana", None));
let err = validate(&pool).unwrap_err();
assert!(matches!(err, PoolValidationError::StorageSizeInvalid(_, _)));
}
fn pvc_template_named(name: &str, size: &str, class: Option<&str>) -> PersistentVolumeClaim {
let mut t = pvc_template(size, class);
t.metadata.name = Some(name.into());
t
}
fn jbod(volumes: &[(i32, &str, Option<&str>)], delete_claim: bool) -> Storage {
Storage::Jbod(crate::crd::JbodSpec {
volumes: volumes
.iter()
.map(|(id, size, class)| JbodVolume {
id: *id,
size: (*size).into(),
class: class.map(String::from),
})
.collect(),
delete_claim,
})
}
fn jbod_pool(volumes: &[(i32, &str, Option<&str>)], delete_claim: bool) -> KafkaNodePool {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(jbod(volumes, delete_claim));
pool
}
#[test]
fn validate_rejects_jbod_single_volume() {
let pool = jbod_pool(&[(0, "1Gi", None)], false);
let err = validate(&pool).unwrap_err();
assert!(matches!(err, PoolValidationError::JbodNeedsTwoVolumes(1)));
}
#[test]
fn validate_rejects_jbod_duplicate_ids() {
let pool = jbod_pool(&[(0, "1Gi", None), (0, "2Gi", None)], false);
let err = validate(&pool).unwrap_err();
assert!(matches!(err, PoolValidationError::JbodDuplicateVolumeId(0)));
}
#[test]
fn validate_rejects_jbod_bad_size() {
let pool = jbod_pool(&[(0, "1Gi", None), (1, "banana", None)], false);
let err = validate(&pool).unwrap_err();
assert!(matches!(err, PoolValidationError::StorageSizeInvalid(_, _)));
}
#[test]
fn validate_accepts_valid_jbod() {
let pool = jbod_pool(&[(0, "1Gi", None), (1, "2Gi", Some("fast"))], true);
assert!(validate(&pool).is_ok());
}
#[test]
fn render_statefulset_jbod_renders_one_pvc_per_volume() {
let pool = jbod_pool(&[(0, "10Gi", None), (1, "20Gi", Some("fast-ssd"))], false);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let vct = sts.spec.unwrap().volume_claim_templates.unwrap();
assert!(vct.len() == 2);
assert!(vct[0].metadata.name.as_deref() == Some("data"));
assert!(vct[1].metadata.name.as_deref() == Some("data-1"));
let req0 = vct[0]
.spec
.as_ref()
.unwrap()
.resources
.as_ref()
.unwrap()
.requests
.as_ref()
.unwrap();
assert!(req0.get("storage").map(|q| q.0.as_str()) == Some("10Gi"));
let req1 = vct[1]
.spec
.as_ref()
.unwrap()
.resources
.as_ref()
.unwrap()
.requests
.as_ref()
.unwrap();
assert!(req1.get("storage").map(|q| q.0.as_str()) == Some("20Gi"));
assert!(vct[0].spec.as_ref().unwrap().storage_class_name.as_deref() == None);
assert!(vct[1].spec.as_ref().unwrap().storage_class_name.as_deref() == Some("fast-ssd"));
}
#[test]
fn render_statefulset_jbod_sorts_volumes_by_id() {
let pool = jbod_pool(
&[(2, "1Gi", None), (0, "1Gi", None), (1, "1Gi", None)],
false,
);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let vct = sts.spec.unwrap().volume_claim_templates.unwrap();
let names: Vec<&str> = vct
.iter()
.map(|t| t.metadata.name.as_deref().unwrap())
.collect();
assert!(names == vec!["data", "data-1", "data-2"]);
}
#[test]
fn render_statefulset_jbod_sets_extra_log_dirs_env() {
let pool = jbod_pool(
&[(0, "1Gi", None), (1, "1Gi", None), (2, "1Gi", None)],
false,
);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let env = sts.spec.unwrap().template.spec.unwrap().containers[0]
.env
.clone()
.unwrap();
let extra = env
.iter()
.find(|e| e.name == "CRABKA_EXTRA_LOG_DIRS")
.expect("CRABKA_EXTRA_LOG_DIRS env present for JBOD");
assert!(extra.value.as_deref() == Some("/var/lib/crabka/data-1,/var/lib/crabka/data-2"));
}
#[test]
fn render_statefulset_jbod_mounts_extra_volumes() {
let pool = jbod_pool(&[(0, "1Gi", None), (1, "1Gi", None)], false);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let mounts = sts.spec.unwrap().template.spec.unwrap().containers[0]
.volume_mounts
.clone()
.unwrap();
let by_name: Vec<(&str, &str)> = mounts
.iter()
.map(|m| (m.name.as_str(), m.mount_path.as_str()))
.collect();
assert!(
by_name.contains(&("data", "/var/lib/crabka/data")),
"primary data mount; got {by_name:?}"
);
assert!(
by_name.contains(&("data-1", "/var/lib/crabka/data-1")),
"extra disk mount; got {by_name:?}"
);
}
#[test]
fn render_statefulset_jbod_no_extra_log_dirs_env_for_non_jbod() {
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(pc("1Gi", None));
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let env = sts.spec.unwrap().template.spec.unwrap().containers[0]
.env
.clone()
.unwrap();
assert!(env.iter().all(|e| e.name != "CRABKA_EXTRA_LOG_DIRS"));
}
#[test]
fn render_statefulset_jbod_retention_policy_delete_when_delete_claim_true() {
let pool = jbod_pool(&[(0, "1Gi", None), (1, "1Gi", None)], true);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let policy = sts
.spec
.unwrap()
.persistent_volume_claim_retention_policy
.unwrap();
assert!(policy.when_deleted.as_deref() == Some("Delete"));
assert!(policy.when_scaled.as_deref() == Some("Retain"));
}
#[test]
fn render_statefulset_jbod_pvc_labels_inherit_pod_labels() {
let pool = jbod_pool(&[(0, "1Gi", None), (1, "1Gi", None)], false);
let sts = render_statefulset(&parent_fixture("demo"), &pool, "img:1").unwrap();
let vct = sts.spec.unwrap().volume_claim_templates.unwrap();
for t in &vct {
let labels = t.metadata.labels.clone().expect("PVC labels");
assert!(
labels.get("app.kubernetes.io/instance").map(String::as_str) == Some("demo"),
"every JBOD PVC inherits the GC instance label"
);
}
}
fn jbod_observed(volumes: &[(&str, &str, Option<&str>)]) -> Vec<PersistentVolumeClaim> {
volumes
.iter()
.map(|(name, size, class)| pvc_template_named(name, size, *class))
.collect()
}
#[test]
fn validate_storage_change_rejects_switch_into_jbod() {
let observed = jbod_observed(&[("data", "10Gi", None)]);
let desired = jbod(&[(0, "10Gi", None), (1, "10Gi", None)], false);
let err = validate_storage_change(Some(&desired), Some(&observed)).unwrap_err();
assert!(matches!(
err,
PoolValidationError::StorageTypeChanged { .. }
));
}
#[test]
fn validate_storage_change_rejects_switch_out_of_jbod() {
let observed = jbod_observed(&[("data", "10Gi", None), ("data-1", "10Gi", None)]);
let err = validate_storage_change(Some(&pc("10Gi", None)), Some(&observed)).unwrap_err();
assert!(matches!(
err,
PoolValidationError::StorageTypeChanged { .. }
));
}
#[test]
fn validate_storage_change_jbod_allows_grow() {
let observed = jbod_observed(&[("data", "10Gi", None), ("data-1", "10Gi", None)]);
let desired = jbod(&[(0, "20Gi", None), (1, "30Gi", None)], false);
assert!(validate_storage_change(Some(&desired), Some(&observed)).is_ok());
}
#[test]
fn validate_storage_change_jbod_rejects_shrink() {
let observed = jbod_observed(&[("data", "10Gi", None), ("data-1", "10Gi", None)]);
let desired = jbod(&[(0, "10Gi", None), (1, "5Gi", None)], false);
let err = validate_storage_change(Some(&desired), Some(&observed)).unwrap_err();
assert!(matches!(
err,
PoolValidationError::StorageShrinkNotAllowed { .. }
));
}
#[test]
fn validate_storage_change_jbod_rejects_class_change() {
let observed = jbod_observed(&[("data", "10Gi", None), ("data-1", "10Gi", Some("a"))]);
let desired = jbod(&[(0, "10Gi", None), (1, "10Gi", Some("b"))], false);
let err = validate_storage_change(Some(&desired), Some(&observed)).unwrap_err();
assert!(matches!(
err,
PoolValidationError::StorageClassChanged { .. }
));
}
#[test]
fn validate_storage_change_jbod_rejects_adding_disk() {
let observed = jbod_observed(&[("data", "10Gi", None), ("data-1", "10Gi", None)]);
let desired = jbod(
&[(0, "10Gi", None), (1, "10Gi", None), (2, "10Gi", None)],
false,
);
let err = validate_storage_change(Some(&desired), Some(&observed)).unwrap_err();
assert!(matches!(err, PoolValidationError::JbodVolumesImmutable));
}
#[test]
fn validate_storage_change_jbod_rejects_removing_disk() {
let observed = jbod_observed(&[
("data", "10Gi", None),
("data-1", "10Gi", None),
("data-2", "10Gi", None),
]);
let desired = jbod(&[(0, "10Gi", None), (1, "10Gi", None)], false);
let err = validate_storage_change(Some(&desired), Some(&observed)).unwrap_err();
assert!(matches!(err, PoolValidationError::JbodVolumesImmutable));
}
#[test]
fn validate_storage_change_jbod_unchanged_is_ok() {
let observed = jbod_observed(&[("data", "10Gi", None), ("data-1", "20Gi", Some("fast"))]);
let desired = jbod(&[(0, "10Gi", None), (1, "20Gi", Some("fast"))], false);
assert!(validate_storage_change(Some(&desired), Some(&observed)).is_ok());
}
#[test]
fn build_main_script_disabled_matches_constant() {
assert!(build_main_script(false) == MAIN_SCRIPT);
}
#[test]
fn build_main_script_enabled_appends_metrics_flag() {
let s = build_main_script(true);
assert!(
s.contains("--metrics-listen-addr=0.0.0.0:9404"),
"got: {s:?}"
);
assert!(s.contains("--config-file=/run/crabka/broker.toml"));
assert!(s.ends_with('\n'));
}
#[test]
fn render_statefulset_mounts_cluster_ca_and_broker_tls_secrets() {
let parent = parent_fixture("mycluster");
let pool = pool_fixture("brokers", "mycluster", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
let mounts: Vec<&str> = pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.map(|m| m.mount_path.as_str())
.collect();
assert!(
mounts.contains(&"/etc/crabka/cluster-ca"),
"missing /etc/crabka/cluster-ca; got {mounts:?}"
);
assert!(
mounts.contains(&"/etc/crabka/broker-tls"),
"missing /etc/crabka/broker-tls; got {mounts:?}"
);
assert!(
mounts.contains(&"/etc/crabka/clients-ca"),
"missing /etc/crabka/clients-ca; got {mounts:?}"
);
}
#[test]
fn render_statefulset_mounts_gssapi_keytab() {
let mut parent = parent_fixture("kerb");
parent.spec.listeners = vec![crate::crd::Listener {
name: "gss".into(),
port: 9092,
type_: crate::crd::ListenerType::Internal,
tls: true,
authentication: Some(crate::crd::ListenerAuthentication::Gssapi(
crate::crd::ListenerAuthenticationGssapi {
keytab_secret_ref: crate::crd::KeytabSecretRef {
secret_name: "broker-keytab".into(),
key: "krb5.keytab".into(),
},
service_name: None,
principal_to_local_rules: vec![],
realm: None,
kdc: None,
},
)),
configuration: None,
network_policy_peers: None,
}];
let pool = pool_fixture("brokers", "kerb", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
let mounts: Vec<&str> = pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.map(|m| m.mount_path.as_str())
.collect();
assert!(
mounts.contains(&"/etc/crabka/gssapi-keytab"),
"missing /etc/crabka/gssapi-keytab; got {mounts:?}"
);
let volumes = pod_spec.volumes.unwrap_or_default();
let kt = volumes
.iter()
.find(|v| v.name == "gssapi-keytab")
.expect("gssapi-keytab volume present");
let secret = kt.secret.as_ref().expect("keytab volume is a Secret");
assert!(secret.secret_name.as_deref() == Some("broker-keytab"));
let items = secret.items.as_ref().expect("projected items");
assert!(items.len() == 1);
assert!(items[0].key == "krb5.keytab");
assert!(items[0].path == "keytab");
}
#[test]
fn render_statefulset_mounts_krb5_conf_and_sets_env() {
let mut parent = parent_fixture("kerb");
parent.spec.krb5_conf_secret_ref = Some(crate::crd::Krb5ConfSecretRef {
secret_name: "krb5-conf".into(),
key: "config".into(),
});
let pool = pool_fixture("brokers", "kerb", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
let mounts: Vec<&str> = pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.map(|m| m.mount_path.as_str())
.collect();
assert!(
mounts.contains(&"/etc/crabka/krb5"),
"missing /etc/crabka/krb5; got {mounts:?}"
);
let env = pod_spec.containers[0].env.as_ref().expect("env present");
let krb5_config = env
.iter()
.find(|e| e.name == "KRB5_CONFIG")
.expect("KRB5_CONFIG env present");
assert!(krb5_config.value.as_deref() == Some("/etc/crabka/krb5/krb5.conf"));
let volumes = pod_spec.volumes.unwrap_or_default();
let kc = volumes
.iter()
.find(|v| v.name == "krb5-conf")
.expect("krb5-conf volume present");
let secret = kc.secret.as_ref().expect("krb5-conf volume is a Secret");
assert!(secret.secret_name.as_deref() == Some("krb5-conf"));
let items = secret.items.as_ref().expect("projected items");
assert!(items[0].key == "config");
assert!(items[0].path == "krb5.conf");
}
#[test]
fn render_statefulset_volume_secret_names_match_cluster() {
let parent = parent_fixture("mycluster");
let pool = pool_fixture("brokers", "mycluster", 1);
let cluster = parent.metadata.name.clone().unwrap();
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let volumes = ss
.spec
.unwrap()
.template
.spec
.unwrap()
.volumes
.unwrap_or_default();
let names: Vec<String> = volumes
.iter()
.filter_map(|v| v.secret.as_ref().and_then(|s| s.secret_name.clone()))
.collect();
assert!(
names.contains(&format!("{cluster}-cluster-ca-cert")),
"missing {cluster}-cluster-ca-cert; got {names:?}"
);
assert!(
names.contains(&format!("{cluster}-kafka-brokers")),
"missing {cluster}-kafka-brokers; got {names:?}"
);
assert!(
names.contains(&format!("{cluster}-clients-ca-cert")),
"missing {cluster}-clients-ca-cert; got {names:?}"
);
}
#[test]
fn render_statefulset_metrics_off_no_port() {
let parent = parent_fixture("demo");
assert!(parent.spec.metrics_config.is_none());
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, "img:latest").unwrap();
let ports = sts.spec.unwrap().template.spec.unwrap().containers[0]
.ports
.clone()
.unwrap();
assert!(ports.len() == 1);
assert!(ports[0].name.as_deref() == Some("kafka-internal"));
}
#[test]
fn render_statefulset_logging_off_no_rust_log_env() {
let parent = parent_fixture("demo");
assert!(parent.spec.logging.is_none());
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, "img:latest").unwrap();
let env = sts.spec.unwrap().template.spec.unwrap().containers[0]
.env
.clone()
.unwrap();
assert!(
env.iter().all(|e| e.name != "RUST_LOG"),
"logging-off cluster must not set RUST_LOG; got {env:?}"
);
}
#[test]
fn render_statefulset_logging_on_adds_rust_log_env_from_configmap() {
use crate::crd::Logging;
let mut parent = parent_fixture("demo");
parent.spec.logging = Some(Logging::default());
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, "img:latest").unwrap();
let env = sts.spec.unwrap().template.spec.unwrap().containers[0]
.env
.clone()
.unwrap();
let rust_log = env
.iter()
.find(|e| e.name == "RUST_LOG")
.expect("RUST_LOG env present when logging set");
let cm_ref = rust_log
.value_from
.as_ref()
.and_then(|vf| vf.config_map_key_ref.as_ref())
.expect("RUST_LOG sourced from configMapKeyRef");
assert!(cm_ref.name == "demo-broker-config");
assert!(cm_ref.key == "rust.log");
assert!(cm_ref.optional == Some(true));
assert!(rust_log.value.is_none());
}
#[test]
fn render_statefulset_omits_dt_master_key_env_when_unset() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, "img:latest").unwrap();
let env = sts.spec.unwrap().template.spec.unwrap().containers[0]
.env
.clone()
.unwrap();
assert!(
env.iter()
.all(|e| e.name != "CRABKA_DELEGATION_TOKEN_SECRET_KEY"),
"env: {env:#?}"
);
}
#[test]
fn render_statefulset_dt_master_key_env_from_secret_default_key() {
use crate::crd::kafka::{DelegationTokenConfig, SecretKeyRef};
let mut parent = parent_fixture("demo");
parent.spec.delegation_token = Some(DelegationTokenConfig {
secret_key_ref: SecretKeyRef {
name: "dt-master".into(),
key: None,
},
});
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, "img:latest").unwrap();
let env = sts.spec.unwrap().template.spec.unwrap().containers[0]
.env
.clone()
.unwrap();
let dt_env = env
.iter()
.find(|e| e.name == "CRABKA_DELEGATION_TOKEN_SECRET_KEY")
.expect("dt env present when spec.delegationToken set");
assert!(
dt_env.value.is_none(),
"literal value must not be set; valueFrom only"
);
let secret_ref = dt_env
.value_from
.as_ref()
.and_then(|vf| vf.secret_key_ref.as_ref())
.expect("secretKeyRef present");
assert!(secret_ref.name == "dt-master");
assert!(secret_ref.key == "secret-key");
}
#[test]
fn render_statefulset_dt_master_key_env_honors_explicit_key() {
use crate::crd::kafka::{DelegationTokenConfig, SecretKeyRef};
let mut parent = parent_fixture("demo");
parent.spec.delegation_token = Some(DelegationTokenConfig {
secret_key_ref: SecretKeyRef {
name: "dt-master".into(),
key: Some("hmac".into()),
},
});
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, "img:latest").unwrap();
let env = sts.spec.unwrap().template.spec.unwrap().containers[0]
.env
.clone()
.unwrap();
let secret_ref = env
.iter()
.find(|e| e.name == "CRABKA_DELEGATION_TOKEN_SECRET_KEY")
.and_then(|e| e.value_from.as_ref())
.and_then(|vf| vf.secret_key_ref.as_ref())
.expect("secretKeyRef present");
assert!(secret_ref.name == "dt-master");
assert!(secret_ref.key == "hmac");
}
#[test]
fn render_statefulset_metrics_on_adds_port() {
use crate::crd::{MetricsConfig, PodMonitorSpec};
let mut parent = parent_fixture("demo");
parent.spec.metrics_config = Some(MetricsConfig {
pod_monitor: Some(PodMonitorSpec::default()),
..Default::default()
});
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, "img:latest").unwrap();
let ports = sts.spec.unwrap().template.spec.unwrap().containers[0]
.ports
.clone()
.unwrap();
assert!(ports.len() == 2);
assert!(
ports
.iter()
.any(|p| p.name.as_deref() == Some("metrics") && p.container_port == 9404)
);
}
fn parent_with_oauth_trust(name: &str) -> Kafka {
use crate::crd::{
Listener, ListenerAuthentication, ListenerAuthenticationOAuth, ListenerType,
TlsTrustedCertificate,
};
let mut k = parent_fixture(name);
k.spec.listeners = vec![Listener {
name: "oauth".into(),
port: 9094,
type_: ListenerType::Internal,
tls: true,
authentication: Some(ListenerAuthentication::OAuth(ListenerAuthenticationOAuth {
valid_issuer_uri: "https://iss.example/".into(),
jwks_endpoint_uri: Some("https://iss.example/jwks".into()),
valid_audience: None,
user_name_claim: None,
custom_claim_check: None,
jwks_refresh_seconds: None,
max_clock_skew_seconds: None,
enable_oauth_bearer: true,
tls_trusted_certificates: vec![TlsTrustedCertificate {
secret_name: "my-idp-ca".into(),
certificate: "ca.crt".into(),
}],
access_token_is_jwt: true,
introspection_endpoint_uri: None,
user_info_endpoint_uri: None,
client_id: None,
client_secret: None,
introspection_http_timeout_seconds: None,
max_seconds_without_reauthentication: None,
valid_token_type: None,
fallback_user_name_claim: None,
fallback_user_name_prefix: None,
groups_claim: None,
groups_claim_delimiter: None,
jwks_min_refresh_pause_seconds: None,
jwks_expiry_seconds: None,
jwks_ignore_key_use: None,
})),
configuration: None,
network_policy_peers: None,
}];
k
}
#[test]
fn render_statefulset_mounts_oauth_jwks_trust_secret_when_some() {
let parent = parent_with_oauth_trust("demo");
let pool = pool_fixture("brokers", "demo", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
let mount = pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.find(|m| m.name == "oauth-jwks-trust")
.expect("oauth-jwks-trust mount present");
assert!(mount.mount_path == "/etc/crabka/oauth-jwks-trust");
assert!(mount.read_only == Some(true));
let volume = pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.find(|v| v.name == "oauth-jwks-trust")
.expect("oauth-jwks-trust volume present");
let secret = volume.secret.as_ref().expect("secret volume source");
assert!(secret.secret_name.as_deref() == Some("demo-oauth-jwks-trust"));
assert!(secret.default_mode == Some(0o400));
}
#[test]
fn render_statefulset_omits_oauth_jwks_trust_volume_when_none() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
assert!(
pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.all(|m| m.name != "oauth-jwks-trust"),
"no OAuth listener → no oauth-jwks-trust mount",
);
assert!(
pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.all(|v| v.name != "oauth-jwks-trust"),
"no OAuth listener → no oauth-jwks-trust pod volume",
);
}
#[test]
fn render_statefulset_omits_oauth_jwks_trust_volume_when_certs_empty() {
use crate::crd::{
Listener, ListenerAuthentication, ListenerAuthenticationOAuth, ListenerType,
};
let mut parent = parent_fixture("demo");
parent.spec.listeners = vec![Listener {
name: "oauth".into(),
port: 9094,
type_: ListenerType::Internal,
tls: true,
authentication: Some(ListenerAuthentication::OAuth(ListenerAuthenticationOAuth {
valid_issuer_uri: "https://iss.example/".into(),
jwks_endpoint_uri: Some("https://iss.example/jwks".into()),
valid_audience: None,
user_name_claim: None,
custom_claim_check: None,
jwks_refresh_seconds: None,
max_clock_skew_seconds: None,
enable_oauth_bearer: true,
tls_trusted_certificates: vec![],
access_token_is_jwt: true,
introspection_endpoint_uri: None,
user_info_endpoint_uri: None,
client_id: None,
client_secret: None,
introspection_http_timeout_seconds: None,
max_seconds_without_reauthentication: None,
valid_token_type: None,
fallback_user_name_claim: None,
fallback_user_name_prefix: None,
groups_claim: None,
groups_claim_delimiter: None,
jwks_min_refresh_pause_seconds: None,
jwks_expiry_seconds: None,
jwks_ignore_key_use: None,
})),
configuration: None,
network_policy_peers: None,
}];
let pool = pool_fixture("brokers", "demo", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
assert!(
pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.all(|m| m.name != "oauth-jwks-trust"),
);
assert!(
pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.all(|v| v.name != "oauth-jwks-trust"),
);
}
#[test]
fn render_statefulset_mounts_oauth_introspection_secret_when_introspection_mode() {
use crate::crd::{
Listener, ListenerAuthentication, ListenerAuthenticationOAuth, ListenerType,
OauthClientSecretRef,
};
let mut parent = parent_fixture("demo");
parent.spec.listeners = vec![Listener {
name: "oauth".into(),
port: 9094,
type_: ListenerType::Internal,
tls: true,
authentication: Some(ListenerAuthentication::OAuth(ListenerAuthenticationOAuth {
valid_issuer_uri: "https://iss.example/".into(),
jwks_endpoint_uri: None,
valid_audience: None,
user_name_claim: None,
custom_claim_check: None,
jwks_refresh_seconds: None,
max_clock_skew_seconds: None,
enable_oauth_bearer: true,
tls_trusted_certificates: vec![],
access_token_is_jwt: false,
introspection_endpoint_uri: Some("https://iss.example/introspect".into()),
user_info_endpoint_uri: None,
client_id: Some("kafka-broker".into()),
client_secret: Some(OauthClientSecretRef {
secret_name: "my-oauth-secret".into(),
key: "my-key".into(),
}),
introspection_http_timeout_seconds: None,
max_seconds_without_reauthentication: None,
valid_token_type: None,
fallback_user_name_claim: None,
fallback_user_name_prefix: None,
groups_claim: None,
groups_claim_delimiter: None,
jwks_min_refresh_pause_seconds: None,
jwks_expiry_seconds: None,
jwks_ignore_key_use: None,
})),
configuration: None,
network_policy_peers: None,
}];
let pool = pool_fixture("brokers", "demo", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
let mount = pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.find(|m| m.name == "oauth-introspection-secret")
.expect("oauth-introspection-secret mount present");
assert!(mount.mount_path == "/etc/crabka/oauth-introspection");
assert!(mount.read_only == Some(true));
let volume = pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.find(|v| v.name == "oauth-introspection-secret")
.expect("oauth-introspection-secret volume present");
let secret = volume.secret.as_ref().expect("secret volume source");
assert!(secret.secret_name.as_deref() == Some("my-oauth-secret"));
assert!(secret.default_mode == Some(0o400));
let items = secret.items.as_ref().expect("projected items present");
assert!(items.len() == 1);
assert!(items[0].key == "my-key");
assert!(items[0].path == "client-secret");
}
#[test]
fn render_statefulset_omits_oauth_introspection_volume_when_jwt_mode() {
let parent = parent_with_oauth_trust("demo");
let pool = pool_fixture("brokers", "demo", 1);
let ss = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).expect("render");
let pod_spec = ss.spec.unwrap().template.spec.unwrap();
assert!(
pod_spec.containers[0]
.volume_mounts
.as_ref()
.unwrap()
.iter()
.all(|m| m.name != "oauth-introspection-secret"),
"JWT-mode OAuth must not produce an oauth-introspection-secret mount",
);
assert!(
pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.all(|v| v.name != "oauth-introspection-secret"),
"JWT-mode OAuth must not produce an oauth-introspection-secret volume",
);
}
fn parent_with_tiered_storage(name: &str) -> Kafka {
let mut k = parent_fixture(name);
k.spec.tiered_storage = Some(crate::crd::kafka::TieredStorage {
kind: crate::crd::kafka::TieredStorageType::Local,
s3: None,
metadata_manager: None,
persistence: None,
});
k
}
fn parent_with_s3_tiered_storage(name: &str, with_creds: bool) -> Kafka {
let mut k = parent_fixture(name);
let credentials = with_creds.then(|| crate::crd::kafka::S3Credentials {
access_key_id: crate::crd::kafka::SecretKeyRef {
name: "crabka-s3-creds".into(),
key: Some("access-key-id".into()),
},
secret_access_key: crate::crd::kafka::SecretKeyRef {
name: "crabka-s3-creds".into(),
key: Some("secret-access-key".into()),
},
});
k.spec.tiered_storage = Some(crate::crd::kafka::TieredStorage {
kind: crate::crd::kafka::TieredStorageType::S3,
s3: Some(crate::crd::kafka::S3StorageSpec {
bucket: "crabka-tier".into(),
region: "us-east-1".into(),
credentials,
..Default::default()
}),
metadata_manager: None,
persistence: None,
});
k
}
#[test]
fn pod_template_mounts_tier_storage_emptydir_when_tiered_set() {
let parent = parent_with_tiered_storage("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod_spec = sts.spec.as_ref().unwrap().template.spec.as_ref().unwrap();
let volumes = pod_spec.volumes.as_ref().expect("pod volumes");
let tier = volumes
.iter()
.find(|v| v.name == "tier-storage")
.expect("tier-storage volume present");
assert!(
tier.empty_dir.is_some(),
"tier-storage must be an emptyDir, got: {tier:?}"
);
let broker = pod_spec
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container");
let mount = broker
.volume_mounts
.as_ref()
.expect("broker volumeMounts")
.iter()
.find(|m| m.name == "tier-storage")
.expect("tier-storage mount present");
assert!(mount.mount_path == crate::controller::listeners::TIER_STORAGE_PATH);
assert!(
mount.read_only.is_none() || mount.read_only == Some(false),
"tier-storage mount must be writable, got read_only={:?}",
mount.read_only
);
}
#[test]
fn pod_template_injects_aws_credentials_env_from_secret_when_s3() {
let parent = parent_with_s3_tiered_storage("demo", true);
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod_spec = sts.spec.as_ref().unwrap().template.spec.as_ref().unwrap();
let broker = pod_spec
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container");
let env = broker.env.as_ref().expect("env present");
let ak = env
.iter()
.find(|e| e.name == "AWS_ACCESS_KEY_ID")
.expect("AWS_ACCESS_KEY_ID env present");
assert!(
ak.value.is_none(),
"literal value must not be set; valueFrom only"
);
let ak_ref = ak
.value_from
.as_ref()
.and_then(|v| v.secret_key_ref.as_ref())
.expect("secretKeyRef present");
assert!(ak_ref.name == "crabka-s3-creds");
assert!(ak_ref.key == "access-key-id");
let sk = env
.iter()
.find(|e| e.name == "AWS_SECRET_ACCESS_KEY")
.expect("AWS_SECRET_ACCESS_KEY env present");
assert!(sk.value.is_none());
let sk_ref = sk
.value_from
.as_ref()
.and_then(|v| v.secret_key_ref.as_ref())
.expect("secretKeyRef present");
assert!(sk_ref.name == "crabka-s3-creds");
assert!(sk_ref.key == "secret-access-key");
}
#[test]
fn pod_template_omits_aws_credentials_env_when_s3_credentials_absent() {
let parent = parent_with_s3_tiered_storage("demo", false);
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod_spec = sts.spec.as_ref().unwrap().template.spec.as_ref().unwrap();
let broker = pod_spec
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container");
let env = broker.env.as_ref().expect("env present");
assert!(
env.iter()
.all(|e| e.name != "AWS_ACCESS_KEY_ID" && e.name != "AWS_SECRET_ACCESS_KEY"),
"credentialless S3 must not inject AWS env, got: {env:?}",
);
}
#[test]
fn pod_template_omits_tier_storage_volume_when_s3() {
let parent = parent_with_s3_tiered_storage("demo", true);
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod_spec = sts.spec.as_ref().unwrap().template.spec.as_ref().unwrap();
assert!(
pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.all(|v| v.name != "tier-storage"),
"S3 must not allocate the Local tier-storage emptyDir",
);
let broker = pod_spec
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container");
assert!(
broker
.volume_mounts
.as_ref()
.is_none_or(|m| m.iter().all(|x| x.name != "tier-storage")),
"S3 must not mount the Local tier-storage path",
);
}
#[test]
fn pod_template_omits_tier_storage_when_tiered_none() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod_spec = sts.spec.as_ref().unwrap().template.spec.as_ref().unwrap();
assert!(
pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.all(|v| v.name != "tier-storage"),
"non-tiered cluster must not have a tier-storage volume",
);
let broker = pod_spec
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container");
assert!(
broker
.volume_mounts
.as_ref()
.is_none_or(|m| m.iter().all(|x| x.name != "tier-storage")),
"non-tiered cluster must not mount tier-storage",
);
}
fn parent_with_tier_storage_pvc(name: &str, size: &str, class: Option<&str>) -> Kafka {
let mut k = parent_fixture(name);
k.spec.tiered_storage = Some(crate::crd::kafka::TieredStorage {
kind: crate::crd::kafka::TieredStorageType::Local,
s3: None,
metadata_manager: None,
persistence: Some(crate::crd::kafka::TieredStoragePersistence {
size: size.into(),
class: class.map(str::to_string),
delete_claim: false,
}),
});
k
}
#[test]
fn pod_template_emits_pvc_template_when_tier_persistence_set() {
let parent = parent_with_tier_storage_pvc("demo", "50Gi", Some("fast-ssd"));
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let pod_spec = sts.spec.as_ref().unwrap().template.spec.as_ref().unwrap();
assert!(
pod_spec
.volumes
.as_ref()
.unwrap()
.iter()
.all(|v| v.name != "tier-storage"),
"explicit pod-volume `tier-storage` must not exist when PVC-backed"
);
let tmpls = sts
.spec
.as_ref()
.unwrap()
.volume_claim_templates
.as_ref()
.expect("volumeClaimTemplates");
let tier = tmpls
.iter()
.find(|t| t.metadata.name.as_deref() == Some("tier-storage"))
.expect("tier-storage volumeClaimTemplate");
let spec = tier.spec.as_ref().expect("template spec");
let req = spec
.resources
.as_ref()
.and_then(|r| r.requests.as_ref())
.expect("resources.requests");
assert!(req.get("storage").map(|q| q.0.as_str()) == Some("50Gi"));
assert!(spec.storage_class_name.as_deref() == Some("fast-ssd"));
let broker = pod_spec
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container");
let mount = broker
.volume_mounts
.as_ref()
.unwrap()
.iter()
.find(|m| m.name == "tier-storage")
.expect("tier-storage mount");
assert!(mount.mount_path == crate::controller::listeners::TIER_STORAGE_PATH);
}
#[test]
fn pod_template_pvc_template_omits_storage_class_when_unset() {
let parent = parent_with_tier_storage_pvc("demo", "25Gi", None);
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let tier = sts
.spec
.as_ref()
.unwrap()
.volume_claim_templates
.as_ref()
.unwrap()
.iter()
.find(|t| t.metadata.name.as_deref() == Some("tier-storage"))
.expect("tier-storage volumeClaimTemplate");
assert!(
tier.spec.as_ref().unwrap().storage_class_name.is_none(),
"storageClassName must be omitted when class is None"
);
}
#[test]
fn tier_persistence_delete_claim_mismatch_fails_validation() {
use crate::crd::kafka::{TieredStorage, TieredStoragePersistence, TieredStorageType};
use crate::crd::kafka_node_pool::{PersistentClaimSpec, Storage};
let mut parent = parent_fixture("demo");
parent.spec.tiered_storage = Some(TieredStorage {
kind: TieredStorageType::Local,
s3: None,
metadata_manager: None,
persistence: Some(TieredStoragePersistence {
size: "20Gi".into(),
class: None,
delete_claim: true,
}),
});
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::PersistentClaim(PersistentClaimSpec {
size: "100Gi".into(),
class: None,
delete_claim: false,
}));
let err = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE)
.expect_err("must reject deleteClaim mismatch");
let msg = format!("{err:?}");
assert!(msg.contains("TieredStorageInvalid"), "got: {msg}");
assert!(msg.contains("deleteClaim"), "got: {msg}");
}
#[test]
fn tier_persistence_delete_claim_matching_pool_passes() {
use crate::crd::kafka::{TieredStorage, TieredStoragePersistence, TieredStorageType};
use crate::crd::kafka_node_pool::{PersistentClaimSpec, Storage};
let mut parent = parent_fixture("demo");
parent.spec.tiered_storage = Some(TieredStorage {
kind: TieredStorageType::Local,
s3: None,
metadata_manager: None,
persistence: Some(TieredStoragePersistence {
size: "20Gi".into(),
class: None,
delete_claim: false,
}),
});
let mut pool = pool_fixture("brokers", "demo", 1);
pool.spec.storage = Some(Storage::PersistentClaim(PersistentClaimSpec {
size: "100Gi".into(),
class: None,
delete_claim: false,
}));
let _sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE)
.expect("matching deleteClaim must pass");
}
#[test]
fn tier_persistence_with_ephemeral_pool_storage_passes() {
use crate::crd::kafka::{TieredStorage, TieredStoragePersistence, TieredStorageType};
let mut parent = parent_fixture("demo");
parent.spec.tiered_storage = Some(TieredStorage {
kind: TieredStorageType::Local,
s3: None,
metadata_manager: None,
persistence: Some(TieredStoragePersistence {
size: "20Gi".into(),
class: None,
delete_claim: true,
}),
});
let pool = pool_fixture("brokers", "demo", 1);
let _sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE)
.expect("ephemeral pool + tier persistence must pass regardless of tier deleteClaim");
}
#[test]
fn ephemeral_pool_with_tier_persistence_emits_retention_policy() {
use crate::crd::kafka::{TieredStorage, TieredStoragePersistence, TieredStorageType};
let mut parent = parent_fixture("demo");
parent.spec.tiered_storage = Some(TieredStorage {
kind: TieredStorageType::Local,
s3: None,
metadata_manager: None,
persistence: Some(TieredStoragePersistence {
size: "20Gi".into(),
class: None,
delete_claim: true,
}),
});
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let policy = sts
.spec
.as_ref()
.unwrap()
.persistent_volume_claim_retention_policy
.as_ref()
.expect("policy must exist when tier PVC is present");
assert!(
policy.when_deleted.as_deref() == Some("Delete"),
"delete_claim=true should map to whenDeleted=Delete"
);
assert!(policy.when_scaled.as_deref() == Some("Retain"));
}
#[test]
fn ephemeral_pool_without_tier_persistence_emits_no_retention_policy() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
assert!(
sts.spec
.as_ref()
.unwrap()
.persistent_volume_claim_retention_policy
.is_none(),
"no PVCs => no retention policy"
);
}
fn parent_with_tracing(name: &str, otlp: crate::crd::kafka::OtlpTracing) -> Kafka {
let mut k = parent_fixture(name);
k.spec.tracing = Some(crate::crd::kafka::Tracing {
kind: crate::crd::kafka::TracingType::Otlp,
otlp: Some(otlp),
});
k
}
#[test]
fn pod_template_emits_otlp_env_when_tracing_set() {
let parent = parent_with_tracing(
"demo",
crate::crd::kafka::OtlpTracing {
endpoint: "http://otel:4317".into(),
protocol: Some(crate::crd::kafka::OtlpProtocol::HttpProtobuf),
sample_ratio: Some(0.25),
service_name: Some("svc".into()),
timeout_secs: Some(7),
},
);
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let env = sts
.spec
.as_ref()
.unwrap()
.template
.spec
.as_ref()
.unwrap()
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container")
.env
.as_ref()
.expect("env present")
.clone();
let by_name = |needle: &str| {
env.iter()
.find(|e| e.name == needle)
.unwrap_or_else(|| panic!("{needle} env missing"))
.value
.clone()
.unwrap_or_default()
};
assert!(by_name("CRABKA_OTLP_ENABLED") == "true");
assert!(by_name("CRABKA_OTLP_ENDPOINT") == "http://otel:4317");
assert!(by_name("CRABKA_OTLP_PROTOCOL") == "http/protobuf");
assert!(by_name("CRABKA_OTLP_SAMPLE_RATIO") == "0.25");
assert!(by_name("OTEL_SERVICE_NAME") == "svc");
assert!(by_name("CRABKA_OTLP_TIMEOUT_SECS") == "7");
}
#[test]
fn pod_template_omits_optional_otlp_env_when_unset() {
let parent = parent_with_tracing(
"demo",
crate::crd::kafka::OtlpTracing {
endpoint: "http://otel:4317".into(),
protocol: None,
sample_ratio: None,
service_name: None,
timeout_secs: None,
},
);
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let env = sts
.spec
.as_ref()
.unwrap()
.template
.spec
.as_ref()
.unwrap()
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container")
.env
.as_ref()
.expect("env present")
.clone();
assert!(env.iter().any(|e| e.name == "CRABKA_OTLP_ENABLED"));
assert!(env.iter().any(|e| e.name == "CRABKA_OTLP_ENDPOINT"));
for unset in [
"CRABKA_OTLP_PROTOCOL",
"CRABKA_OTLP_SAMPLE_RATIO",
"OTEL_SERVICE_NAME",
"CRABKA_OTLP_TIMEOUT_SECS",
] {
assert!(
env.iter().all(|e| e.name != unset),
"{unset} should not be emitted when unset"
);
}
}
#[test]
fn pod_template_omits_all_otlp_env_when_tracing_none() {
let parent = parent_fixture("demo");
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE).unwrap();
let env = sts
.spec
.as_ref()
.unwrap()
.template
.spec
.as_ref()
.unwrap()
.containers
.iter()
.find(|c| c.name == "broker")
.expect("broker container")
.env
.as_ref()
.expect("env present")
.clone();
for never in [
"CRABKA_OTLP_ENABLED",
"CRABKA_OTLP_ENDPOINT",
"CRABKA_OTLP_PROTOCOL",
"CRABKA_OTLP_SAMPLE_RATIO",
"OTEL_SERVICE_NAME",
"CRABKA_OTLP_TIMEOUT_SECS",
] {
assert!(
env.iter().all(|e| e.name != never),
"{never} must not leak when tracing is None"
);
}
}
fn parent_with_version_status(
name: &str,
version_valid: Option<bool>,
finalized_metadata: Option<&str>,
) -> Kafka {
let mut parent = parent_fixture(name);
let mut conditions = Vec::new();
if let Some(valid) = version_valid {
let (status, reason, message) = if valid {
("True", "Valid", "kafkaVersion 3.7.0 metadata.version 3.7")
} else {
(
"False",
"InvalidVersion",
"spec.kafkaVersion \"99.9\" is not a valid version",
)
};
conditions.push(condition("KafkaVersionValid", status, reason, message));
}
parent.status = Some(crate::crd::KafkaStatus {
conditions,
metadata_version: finalized_metadata.map(str::to_string),
..Default::default()
});
parent
}
#[test]
fn version_gate_blocks_fresh_cluster_with_invalid_version() {
let parent = parent_with_version_status("demo", Some(false), None);
match version_gate(&parent) {
VersionGate::Blocked(cond) => {
assert!(cond.type_ == "Ready");
assert!(cond.status == "False");
assert!(cond.reason == "KafkaVersionInvalid");
assert!(
cond.message.contains("KafkaVersionValid=False"),
"pool condition should surface the parent's verdict, got: {}",
cond.message
);
}
VersionGate::Cleared => {
panic!("invalid parent version must block pod creation")
}
}
}
#[test]
fn version_gate_blocks_when_parent_has_no_version_status_yet() {
let parent = parent_fixture("demo");
assert!(parent.status.is_none(), "fixture precondition");
match version_gate(&parent) {
VersionGate::Blocked(cond) => {
assert!(cond.type_ == "Ready");
assert!(cond.status == "False");
assert!(cond.reason == "WaitingForVersionValidation");
}
VersionGate::Cleared => {
panic!("missing parent version status must block pod creation")
}
}
}
#[test]
fn version_gate_clears_when_kafkaversionvalid_true() {
let parent = parent_with_version_status("demo", Some(true), Some("3.7"));
assert!(
matches!(version_gate(&parent), VersionGate::Cleared),
"a valid parent version must clear the gate"
);
let pool = pool_fixture("brokers", "demo", 1);
let sts = render_statefulset(&parent, &pool, DEFAULT_BROKER_IMAGE)
.expect("pods are created as today when the version is valid");
assert!(sts.metadata.name.as_deref() == Some("demo-brokers"));
}
#[test]
fn version_gate_clears_when_metadata_version_finalized() {
let parent = parent_with_version_status("demo", Some(false), Some("3.7"));
assert!(
matches!(version_gate(&parent), VersionGate::Cleared),
"a finalized metadata version keeps a running cluster's pods"
);
}
}