use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
use futures::StreamExt as _;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::{ConfigMap, Node, Pod, Secret, Service};
use k8s_openapi::api::networking::v1::Ingress;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::{Api, ListParams, Patch, PatchParams};
use kube::runtime::controller::{Action, Controller};
use kube::runtime::reflector::ObjectRef;
use kube::runtime::watcher;
use kube::{Resource, ResourceExt as _};
use serde_json::json;
use crate::context::Context;
use crate::controller::cluster_ca;
use crate::controller::common::{
self, FIELD_MANAGER, ReconcileError, apply_dynamic, apply_object, condition,
ensure_cluster_id_secret, owner_ref, patch_status, render_service,
};
use crate::controller::listeners::{
self, AdvertisedAddress, INGRESS_PORT, compute_advertised,
effective_inter_broker_listener_name, ingress_bootstrap_host, render_bootstrap_ingress,
render_bootstrap_route, render_bootstrap_service, render_broker_ingress, render_broker_route,
render_broker_service, synthesized_default_listener, validate_listeners,
};
use crate::controller::logging;
use crate::controller::network_policy;
use crate::crd::{
Kafka, KafkaCondition, KafkaNodePool, KafkaStatus, Listener, ListenerAddress,
ListenerAuthentication, ListenerAuthenticationOAuth, ListenerStatus, ListenerType,
};
/// Replica totals aggregated over every `KafkaNodePool` belonging to one cluster.
///
/// Built by `aggregate_pool_status` and consumed by the condition helpers that derive the
/// `Ready` / `Rolling` conditions from it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ClusterRollup {
    /// Sum of the pools' reported `replicas` (a pool without a value contributes 0).
    pub replicas: i32,
    /// Sum of the pools' reported `ready_replicas` (a pool without a value contributes 0).
    pub ready_replicas: i32,
    /// Number of pools that were folded into this rollup.
    pub pool_count: usize,
}
/// Folds the status of every pool in `pools` into a single [`ClusterRollup`].
///
/// Every pool increments `pool_count`. A pool with no status, or with an unset `replicas` /
/// `ready_replicas`, adds 0 to the corresponding sum.
pub(crate) fn aggregate_pool_status<'a>(
    pools: impl IntoIterator<Item = &'a KafkaNodePool>,
) -> ClusterRollup {
    let empty = ClusterRollup {
        replicas: 0,
        ready_replicas: 0,
        pool_count: 0,
    };
    pools.into_iter().fold(empty, |mut acc, pool| {
        let (replicas, ready) = match &pool.status {
            Some(status) => (
                status.replicas.unwrap_or(0),
                status.ready_replicas.unwrap_or(0),
            ),
            None => (0, 0),
        };
        acc.pool_count += 1;
        acc.replicas += replicas;
        acc.ready_replicas += ready;
        acc
    })
}
/// Derives the `(status, reason, message)` triple for the "Rolling" condition.
///
/// A roll is reported as in progress when at least one pool exists and fewer replicas are ready
/// than exist in total; in every other case the cluster is reported as `Stable`.
pub(crate) fn rolling_condition_from_rollup(
    rollup: &ClusterRollup,
) -> (bool, &'static str, String) {
    let roll_in_progress = rollup.pool_count > 0 && rollup.ready_replicas < rollup.replicas;
    if !roll_in_progress {
        return (
            false,
            "Stable",
            String::from("all brokers on current revision"),
        );
    }
    let message = format!(
        "{}/{} brokers ready (roll in progress)",
        rollup.ready_replicas, rollup.replicas
    );
    (true, "RollingUpdate", message)
}
/// Derives the `(status, reason, message)` triple for the cluster-level "Ready" condition.
///
/// * no pools at all -> `NoNodePools`
/// * at least one replica and all of them ready -> `Available`
/// * anything else (including zero replicas) -> `PartiallyReady`
pub(crate) fn rollup_condition(rollup: &ClusterRollup) -> (bool, &'static str, String) {
    let ready = rollup.ready_replicas;
    let total = rollup.replicas;
    let pools = rollup.pool_count;
    match pools {
        0 => (
            false,
            "NoNodePools",
            String::from("no KafkaNodePool with label crabka.io/cluster=<name>"),
        ),
        _ if total > 0 && ready == total => (
            true,
            "Available",
            format!("{ready}/{total} brokers ready across {pools} pool(s)"),
        ),
        _ => (
            false,
            "PartiallyReady",
            format!("{ready}/{total} brokers ready"),
        ),
    }
}
/// Runs the `Kafka` controller until its event stream ends.
///
/// The controller watches `Kafka` objects in all namespaces. `KafkaNodePool` events are mapped
/// back to the `Kafka` named by the pool's `crabka.io/cluster` label in the pool's namespace;
/// pools lacking either the label or a namespace trigger nothing. Each reconcile result is
/// logged (`debug` on success, `warn` on error).
pub async fn run(ctx: Context) -> anyhow::Result<()> {
    let kafkas: Api<Kafka> = Api::all(ctx.client.clone());
    let node_pools: Api<KafkaNodePool> = Api::all(ctx.client.clone());
    let cluster_nodes: Api<Node> = Api::all(ctx.client.clone());
    Controller::new(kafkas, watcher::Config::default())
        .watches(node_pools, watcher::Config::default(), |pool| {
            let cluster = pool.labels().get("crabka.io/cluster").cloned();
            cluster
                .zip(pool.namespace())
                .map(|(cluster, ns)| ObjectRef::<Kafka>::new(&cluster).within(&ns))
                .into_iter()
        })
        // The Node mapper never yields an object reference, so Node events do not enqueue any
        // Kafka. NOTE(review): presumably a placeholder for a future node -> cluster mapping.
        .watches(cluster_nodes, watcher::Config::default(), |_node| {
            Vec::<ObjectRef<Kafka>>::new().into_iter()
        })
        .run(reconcile, error_policy, Arc::new(ctx))
        .for_each(|res| async move {
            match res {
                Err(e) => tracing::warn!(error = %e, "reconcile error"),
                Ok((obj, _)) => tracing::debug!(?obj, "reconciled"),
            }
        })
        .await;
    Ok(())
}
/// Identity of a single broker pod, as produced by `enumerate_brokers`.
#[derive(Debug, Clone)]
pub(crate) struct BrokerInfo {
/// Broker id; `enumerate_brokers` fills it from the pool's `spec.node_id_start`.
pub broker_id: i32,
/// Pod name; `enumerate_brokers` builds it as `{cluster}-{pool}-0`.
pub pod_name: String,
/// DNS name of the pod under the `{cluster}-broker-headless` service
/// (`{pod}.{svc}.{namespace}.svc.cluster.local`).
pub pod_fqdn: String,
}
/// Lists one [`BrokerInfo`] per pool, ordered by pool name.
///
/// For each pool the entry describes pod `{cluster_name}-{pool}-0`, uses the pool's
/// `spec.node_id_start` as broker id, and resolves the pod FQDN under the
/// `{cluster_name}-broker-headless` service in `namespace`.
///
/// NOTE(review): exactly one entry is produced per pool (ordinal 0) — confirm this is the
/// intended behaviour for pools with more than one replica.
pub(crate) fn enumerate_brokers(
    cluster_name: &str,
    namespace: &str,
    pools: &[KafkaNodePool],
) -> Vec<BrokerInfo> {
    let headless = format!("{cluster_name}-broker-headless");
    // Resolve each pool name once, then order by it (stable sort, so ties keep input order).
    let mut named: Vec<(String, &KafkaNodePool)> =
        pools.iter().map(|pool| (pool.name_any(), pool)).collect();
    named.sort_by(|a, b| a.0.cmp(&b.0));
    named
        .into_iter()
        .map(|(pool_name, pool)| {
            let pod_name = format!("{cluster_name}-{pool_name}-0");
            let pod_fqdn = format!("{pod_name}.{headless}.{namespace}.svc.cluster.local");
            BrokerInfo {
                broker_id: pool.spec.node_id_start,
                pod_name,
                pod_fqdn,
            }
        })
        .collect()
}
/// Builds the per-listener status entries published on the `Kafka` resource.
///
/// A listener only gets an entry when `resolve_bootstrap_servers` can produce a bootstrap
/// address for it. Its `addresses` are the advertised addresses of every broker that has one
/// for that listener name, sorted by host and then port.
pub(crate) fn build_listener_status(
    effective_listeners: &[Listener],
    addresses_per_broker: &BTreeMap<i32, BTreeMap<String, AdvertisedAddress>>,
    bootstrap_services: &HashMap<String, Service>,
    nodes: &HashMap<String, Node>,
    cluster_name: &str,
    namespace: &str,
) -> Vec<ListenerStatus> {
    effective_listeners
        .iter()
        .filter_map(|listener| {
            // No bootstrap address yet -> the listener is left out of the status entirely.
            let bootstrap_servers = resolve_bootstrap_servers(
                listener,
                bootstrap_services,
                nodes,
                cluster_name,
                namespace,
            )?;
            let mut addresses: Vec<ListenerAddress> = Vec::new();
            for per_listener in addresses_per_broker.values() {
                if let Some(advertised) = per_listener.get(&listener.name) {
                    addresses.push(ListenerAddress {
                        host: advertised.host.clone(),
                        port: advertised.port,
                    });
                }
            }
            addresses.sort_by(|a, b| (&a.host, &a.port).cmp(&(&b.host, &b.port)));
            Some(ListenerStatus {
                name: listener.name.clone(),
                type_: listener.type_,
                bootstrap_servers,
                addresses,
            })
        })
        .collect()
}
/// Computes the `host:port` bootstrap address advertised for `listener`, if it is known yet.
///
/// * `Internal`: the cluster's headless service DNS name plus the listener port.
/// * `Nodeport`: the address of a cluster node plus the node port of the first port on the
///   `{cluster_name}-{listener}-bootstrap` service. Returns `None` while the service, its node
///   port, or a usable node address is missing.
/// * `Loadbalancer`: hostname (or, failing that, IP) of the first load-balancer ingress entry of
///   the bootstrap service plus the listener port. Returns `None` until one is assigned.
/// * `Ingress` / `Route`: the host from `ingress_bootstrap_host` plus `INGRESS_PORT`.
fn resolve_bootstrap_servers(
    listener: &Listener,
    bootstrap_services: &HashMap<String, Service>,
    nodes: &HashMap<String, Node>,
    cluster_name: &str,
    namespace: &str,
) -> Option<String> {
    match listener.type_ {
        ListenerType::Internal => Some(format!(
            "{cluster_name}-broker-headless.{namespace}.svc.cluster.local:{}",
            listener.port
        )),
        ListenerType::Nodeport => {
            let svc_name = format!("{cluster_name}-{}-bootstrap", listener.name);
            let svc = bootstrap_services.get(&svc_name)?;
            let node_port = svc
                .spec
                .as_ref()
                .and_then(|s| s.ports.as_ref())
                .and_then(|ps| ps.first())
                .and_then(|p| p.node_port)?;
            // `HashMap` iteration order is arbitrary, so walking `nodes.values()` directly could
            // pick a different node on every call and make the published address flap. Visit
            // the nodes in key order so the same node wins as long as the node set is unchanged.
            let mut candidates: Vec<(&String, &Node)> = nodes.iter().collect();
            candidates.sort_unstable_by(|a, b| a.0.cmp(b.0));
            let host = candidates.into_iter().find_map(|(_, node)| {
                let addrs = node.status.as_ref().and_then(|s| s.addresses.as_ref())?;
                // Per node, an ExternalIP is preferred over an InternalIP.
                addrs
                    .iter()
                    .find(|a| a.type_ == "ExternalIP")
                    .or_else(|| addrs.iter().find(|a| a.type_ == "InternalIP"))
                    .map(|a| a.address.clone())
            })?;
            Some(format!("{host}:{node_port}"))
        }
        ListenerType::Loadbalancer => {
            let svc_name = format!("{cluster_name}-{}-bootstrap", listener.name);
            let svc = bootstrap_services.get(&svc_name)?;
            let ingress = svc
                .status
                .as_ref()
                .and_then(|st| st.load_balancer.as_ref())
                .and_then(|lb| lb.ingress.as_ref())
                .and_then(|ig| ig.first())?;
            let host = ingress.hostname.clone().or_else(|| ingress.ip.clone())?;
            Some(format!("{host}:{}", listener.port))
        }
        ListenerType::Ingress | ListenerType::Route => {
            let host = ingress_bootstrap_host(listener)?;
            Some(format!("{host}:{INGRESS_PORT}"))
        }
    }
}
/// Applies the Kubernetes objects that expose every non-internal listener.
///
/// For each such listener this applies, in order: the bootstrap `Service`
/// (`{cluster_name}-{listener}-bootstrap`), one `Service` per broker
/// (`{cluster_name}-{listener}-{broker_id}`), and — for `Ingress` / `Route` listeners — the
/// matching bootstrap and per-broker `Ingress` objects or OpenShift `Route`s, reusing the same
/// names. An `Ingress` / `Route` object is only applied when the corresponding host lookup
/// returns `Some`.
///
/// # Errors
/// Returns the first rendering or apply error encountered; later objects are not attempted.
async fn apply_external_services(
    ctx: &Context,
    svc_api: &Api<Service>,
    owner: &Kafka,
    namespace: &str,
    cluster_name: &str,
    effective_listeners: &[Listener],
    brokers: &[BrokerInfo],
) -> Result<(), ReconcileError> {
    let ingress_api: Api<Ingress> = Api::namespaced(ctx.client.clone(), namespace);
    let external = effective_listeners
        .iter()
        .filter(|listener| listener.type_ != ListenerType::Internal);
    for listener in external {
        let bootstrap_svc = render_bootstrap_service(owner, listener)?;
        let bootstrap_name = format!("{cluster_name}-{}-bootstrap", listener.name);
        apply_object(svc_api, &bootstrap_name, &bootstrap_svc).await?;

        for broker in brokers {
            let broker_svc =
                render_broker_service(owner, listener, broker.broker_id, &broker.pod_name)?;
            let broker_svc_name =
                format!("{cluster_name}-{}-{}", listener.name, broker.broker_id);
            apply_object(svc_api, &broker_svc_name, &broker_svc).await?;
        }

        match listener.type_ {
            ListenerType::Ingress => {
                if let Some(host) = ingress_bootstrap_host(listener) {
                    let ingress = render_bootstrap_ingress(owner, listener, &host)?;
                    apply_object(&ingress_api, &bootstrap_name, &ingress).await?;
                }
                for broker in brokers {
                    let Some(host) = listeners::ingress_broker_host(listener, broker.broker_id)
                    else {
                        continue;
                    };
                    let ingress =
                        render_broker_ingress(owner, listener, broker.broker_id, &host)?;
                    let ingress_name =
                        format!("{cluster_name}-{}-{}", listener.name, broker.broker_id);
                    apply_object(&ingress_api, &ingress_name, &ingress).await?;
                }
            }
            ListenerType::Route => {
                if let Some(host) = ingress_bootstrap_host(listener) {
                    let route = render_bootstrap_route(owner, listener, &host)?;
                    apply_route(ctx, namespace, &bootstrap_name, &route).await?;
                }
                for broker in brokers {
                    let Some(host) = listeners::ingress_broker_host(listener, broker.broker_id)
                    else {
                        continue;
                    };
                    let route = render_broker_route(owner, listener, broker.broker_id, &host)?;
                    let route_name =
                        format!("{cluster_name}-{}-{}", listener.name, broker.broker_id);
                    apply_route(ctx, namespace, &route_name, &route).await?;
                }
            }
            // Services alone are enough for these kinds (Internal never reaches this point
            // because of the filter above).
            ListenerType::Internal | ListenerType::Nodeport | ListenerType::Loadbalancer => {}
        }
    }
    Ok(())
}
/// Returns a copy of the OAuth configuration of the first listener that uses OAuth
/// authentication, or `None` when no listener does.
fn canonical_oauth_config(listeners: &[Listener]) -> Option<ListenerAuthenticationOAuth> {
    for listener in listeners {
        if let Some(ListenerAuthentication::OAuth(cfg)) = &listener.authentication {
            return Some(cfg.clone());
        }
    }
    None
}
/// Name of the managed JWKS trust `Secret` (`{kafka-name}-oauth-jwks-trust`).
///
/// Returns `None` when no listener in `spec.listeners` uses OAuth, or when the first OAuth
/// listener lists no trusted certificates.
pub(crate) fn oauth_jwks_trust_secret_name(kafka: &Kafka) -> Option<String> {
    canonical_oauth_config(&kafka.spec.listeners)
        .filter(|oauth| !oauth.tls_trusted_certificates.is_empty())
        .map(|_| format!("{}-oauth-jwks-trust", kafka.name_any()))
}
/// Reference to the Secret entry holding the OAuth client secret used in introspection mode
/// (i.e. when `access_token_is_jwt` is false).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct OauthIntrospectionMount {
/// Name of the Secret, copied from the listener's `client_secret.secret_name`.
pub secret_name: String,
/// Key inside that Secret, copied from the listener's `client_secret.key`.
pub key: String,
}
/// Returns the client-secret reference to mount for OAuth token introspection.
///
/// Yields `None` when no listener uses OAuth, when the first OAuth listener treats access
/// tokens as JWTs, or when it has no `client_secret` configured.
pub(crate) fn oauth_introspection_secret_mount(kafka: &Kafka) -> Option<OauthIntrospectionMount> {
    let oauth = canonical_oauth_config(&kafka.spec.listeners)?;
    match (oauth.access_token_is_jwt, oauth.client_secret.as_ref()) {
        (false, Some(secret_ref)) => Some(OauthIntrospectionMount {
            secret_name: secret_ref.secret_name.clone(),
            key: secret_ref.key.clone(),
        }),
        _ => None,
    }
}
/// Reference to the Secret entry holding the Kerberos keytab of a GSSAPI listener.
///
/// Derives the same traits as the sibling `OauthIntrospectionMount` so values can be logged,
/// copied and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct GssapiKeytabMount {
    /// Name of the Secret, copied from the listener's `keytab_secret_ref.secret_name`.
    pub secret_name: String,
    /// Key inside that Secret, copied from the listener's `keytab_secret_ref.key`.
    pub key: String,
}
/// Returns the keytab Secret reference of the first listener in `spec.listeners` that uses
/// GSSAPI authentication, or `None` when no listener does.
pub(crate) fn gssapi_keytab_mount(kafka: &Kafka) -> Option<GssapiKeytabMount> {
    for listener in &kafka.spec.listeners {
        if let Some(ListenerAuthentication::Gssapi(gssapi)) = &listener.authentication {
            let keytab = &gssapi.keytab_secret_ref;
            return Some(GssapiKeytabMount {
                secret_name: keytab.secret_name.clone(),
                key: keytab.key.clone(),
            });
        }
    }
    None
}
/// Returns `(secret_name, key)` from `spec.krb5_conf_secret_ref`, or `None` when the reference
/// is not set.
pub(crate) fn krb5_conf_mount(kafka: &Kafka) -> Option<(String, String)> {
    let secret_ref = kafka.spec.krb5_conf_secret_ref.as_ref()?;
    Some((secret_ref.secret_name.clone(), secret_ref.key.clone()))
}
/// Assembles the OAuth JWKS trust bundle and stores it in the managed Secret.
///
/// Every entry of `tls_trusted_certificates` is read from its source Secret and the values are
/// concatenated in order, inserting a newline between two values when the bundle so far does
/// not already end with one. The result is written to `{kafka-name}-oauth-jwks-trust` via
/// `upsert_oauth_trust_secret`.
///
/// Returns `Ok(None)` when there is no OAuth configuration or it lists no trusted certificates,
/// otherwise `Ok(Some(managed_secret_name))`.
///
/// # Errors
/// * `MissingOauthTrustSecret` – a referenced Secret does not exist.
/// * `MissingOauthTrustKey` – the Secret has no data under the referenced key.
/// * `EmptyOauthTrustValue` – the referenced value is empty.
/// * Any error from reading the source Secrets or applying the managed one.
async fn reconcile_oauth_jwks_trust(
    secret_api: &Api<Secret>,
    kafka: &Kafka,
    canonical: Option<&ListenerAuthenticationOAuth>,
) -> Result<Option<String>, ReconcileError> {
    let trusted = match canonical {
        Some(oauth) if !oauth.tls_trusted_certificates.is_empty() => {
            &oauth.tls_trusted_certificates
        }
        _ => return Ok(None),
    };

    let mut pem_bundle: Vec<u8> = Vec::new();
    for cert_ref in trusted {
        let Some(source) = secret_api.get_opt(&cert_ref.secret_name).await? else {
            return Err(ReconcileError::MissingOauthTrustSecret(
                cert_ref.secret_name.clone(),
            ));
        };
        let Some(value) = source
            .data
            .as_ref()
            .and_then(|data| data.get(&cert_ref.certificate))
        else {
            return Err(ReconcileError::MissingOauthTrustKey {
                secret: cert_ref.secret_name.clone(),
                key: cert_ref.certificate.clone(),
            });
        };
        if value.0.is_empty() {
            return Err(ReconcileError::EmptyOauthTrustValue {
                secret: cert_ref.secret_name.clone(),
                key: cert_ref.certificate.clone(),
            });
        }
        // Keep consecutive values on separate lines: add a newline only when the bundle is
        // non-empty and does not already end with one.
        if pem_bundle.last().is_some_and(|last| *last != b'\n') {
            pem_bundle.push(b'\n');
        }
        pem_bundle.extend_from_slice(&value.0);
    }

    let managed_name = format!("{}-oauth-jwks-trust", kafka.name_any());
    upsert_oauth_trust_secret(secret_api, kafka, &managed_name, pem_bundle).await?;
    Ok(Some(managed_name))
}
/// Applies the managed trust Secret `managed_name` in the `Kafka`'s namespace.
///
/// The Secret is of type `Opaque`, carries the common labels, is owned by `kafka`, and stores
/// `bundle` under the single key `ca.crt`.
///
/// # Errors
/// Fails when the owner reference cannot be built or the apply call fails.
async fn upsert_oauth_trust_secret(
    secret_api: &Api<Secret>,
    kafka: &Kafka,
    managed_name: &str,
    bundle: Vec<u8>,
) -> Result<(), ReconcileError> {
    let owner = owner_ref::<Kafka>(kafka)?;
    let metadata = ObjectMeta {
        name: Some(managed_name.to_owned()),
        namespace: kafka.meta().namespace.clone(),
        labels: Some(common::common_labels(
            &kafka.name_any(),
            &kafka.spec.kafka_version,
            None,
        )),
        owner_references: Some(vec![owner]),
        ..Default::default()
    };
    let payload = BTreeMap::from([(String::from("ca.crt"), ByteString(bundle))]);
    let secret = Secret {
        metadata,
        type_: Some(String::from("Opaque")),
        data: Some(payload),
        ..Default::default()
    };
    apply_object(secret_api, managed_name, &secret).await
}
/// Validates the client secret needed for OAuth token introspection.
///
/// Returns `Ok(None)` when there is no OAuth configuration or access tokens are JWTs.
/// Otherwise the referenced Secret must exist and hold a non-empty value under the referenced
/// key, in which case the reference is returned as an [`OauthIntrospectionMount`]. Nothing is
/// written; `_kafka` is unused.
///
/// # Errors
/// * `InvalidListenerOauthAccessTokenIsJwt` – introspection mode without a `client_secret`.
/// * `MissingOauthIntrospectionSecret` – the referenced Secret does not exist.
/// * `MissingOauthIntrospectionKey` – the Secret has no data under the referenced key.
/// * `EmptyOauthIntrospectionValue` – the referenced value is empty.
/// * Any error from reading the Secret.
async fn reconcile_oauth_introspection_secret(
    secret_api: &Api<Secret>,
    _kafka: &Kafka,
    canonical: Option<&ListenerAuthenticationOAuth>,
) -> Result<Option<OauthIntrospectionMount>, ReconcileError> {
    let oauth = match canonical {
        Some(oauth) if !oauth.access_token_is_jwt => oauth,
        _ => return Ok(None),
    };
    let Some(secret_ref) = oauth.client_secret.as_ref() else {
        return Err(ReconcileError::InvalidListenerOauthAccessTokenIsJwt(
            "introspection mode requires clientSecret".into(),
        ));
    };
    let Some(source) = secret_api.get_opt(&secret_ref.secret_name).await? else {
        return Err(ReconcileError::MissingOauthIntrospectionSecret(
            secret_ref.secret_name.clone(),
        ));
    };
    let stored = source
        .data
        .as_ref()
        .and_then(|data| data.get(&secret_ref.key));
    match stored {
        None => Err(ReconcileError::MissingOauthIntrospectionKey {
            secret: secret_ref.secret_name.clone(),
            key: secret_ref.key.clone(),
        }),
        Some(value) if value.0.is_empty() => Err(ReconcileError::EmptyOauthIntrospectionValue {
            secret: secret_ref.secret_name.clone(),
            key: secret_ref.key.clone(),
        }),
        Some(_) => Ok(Some(OauthIntrospectionMount {
            secret_name: secret_ref.secret_name.clone(),
            key: secret_ref.key.clone(),
        })),
    }
}
/// Applies an OpenShift `Route` named `name` in `namespace` from the JSON `body`.
///
/// Delegates to `apply_dynamic` with the `route.openshift.io/v1` API version, kind `Route` and
/// plural `routes`.
async fn apply_route(
    ctx: &Context,
    namespace: &str,
    name: &str,
    body: &serde_json::Value,
) -> Result<(), ReconcileError> {
    const ROUTE_API_VERSION: &str = "route.openshift.io/v1";
    const ROUTE_KIND: &str = "Route";
    const ROUTE_PLURAL: &str = "routes";

    apply_dynamic(
        &ctx.client,
        namespace,
        ROUTE_API_VERSION,
        ROUTE_KIND,
        ROUTE_PLURAL,
        name,
        body,
    )
    .await
}
/// Reads the cluster state needed to compute addresses for NodePort / LoadBalancer listeners.
///
/// Returns, in order:
/// 1. all cluster `Node`s keyed by name,
/// 2. the pods in `namespace` labelled `app.kubernetes.io/instance={cluster_name}`, keyed by
///    name,
/// 3. the existing bootstrap `Service`s keyed by service name, and
/// 4. the existing per-broker `Service`s keyed by `(listener name, broker id)`.
///
/// When no listener is of type `Nodeport` or `Loadbalancer` nothing is queried and all four
/// maps are empty. Services that do not exist (yet) are simply absent from the maps.
// The four-map tuple is what trips the lint; the single caller destructures it immediately.
#[allow(clippy::type_complexity)]
async fn read_external_state(
    ctx: &Context,
    svc_api: &Api<Service>,
    namespace: &str,
    cluster_name: &str,
    effective_listeners: &[Listener],
    brokers: &[BrokerInfo],
) -> Result<
    (
        HashMap<String, Node>,
        HashMap<String, Pod>,
        HashMap<String, Service>,
        HashMap<(String, i32), Service>,
    ),
    ReconcileError,
> {
    let node_exposed: Vec<&Listener> = effective_listeners
        .iter()
        .filter(|listener| {
            matches!(
                listener.type_,
                ListenerType::Nodeport | ListenerType::Loadbalancer
            )
        })
        .collect();
    if node_exposed.is_empty() {
        return Ok((
            HashMap::new(),
            HashMap::new(),
            HashMap::new(),
            HashMap::new(),
        ));
    }

    let node_api: Api<Node> = Api::all(ctx.client.clone());
    let nodes: HashMap<String, Node> = node_api
        .list(&ListParams::default())
        .await?
        .items
        .into_iter()
        .filter_map(|node| node.metadata.name.clone().map(|node_name| (node_name, node)))
        .collect();

    let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), namespace);
    let selector = format!("app.kubernetes.io/instance={cluster_name}");
    let pods_by_name: HashMap<String, Pod> = pod_api
        .list(&ListParams::default().labels(&selector))
        .await?
        .items
        .into_iter()
        .filter_map(|pod| pod.metadata.name.clone().map(|pod_name| (pod_name, pod)))
        .collect();

    let mut bootstrap_services = HashMap::new();
    let mut broker_services = HashMap::new();
    for listener in node_exposed {
        let bootstrap_name = format!("{cluster_name}-{}-bootstrap", listener.name);
        if let Some(found) = svc_api.get_opt(&bootstrap_name).await? {
            bootstrap_services.insert(bootstrap_name, found);
        }
        for broker in brokers {
            let broker_svc_name =
                format!("{cluster_name}-{}-{}", listener.name, broker.broker_id);
            if let Some(found) = svc_api.get_opt(&broker_svc_name).await? {
                broker_services.insert((listener.name.clone(), broker.broker_id), found);
            }
        }
    }
    Ok((nodes, pods_by_name, bootstrap_services, broker_services))
}
/// Computes the advertised address of every listener for every broker.
///
/// The result maps broker id -> listener name -> advertised address. For each broker the node
/// its pod is scheduled on (if the pod is known and has a node name) and the per-broker
/// `Service` for the listener (if present) are handed to `compute_advertised`.
///
/// # Errors
/// Returns the first error reported by `compute_advertised`.
fn resolve_addresses_per_broker(
    effective_listeners: &[Listener],
    brokers: &[BrokerInfo],
    pods_by_name: &HashMap<String, Pod>,
    nodes: &HashMap<String, Node>,
    broker_services: &HashMap<(String, i32), Service>,
) -> Result<BTreeMap<i32, BTreeMap<String, AdvertisedAddress>>, listeners::AdvertisedError> {
    brokers
        .iter()
        .map(|broker| -> Result<_, listeners::AdvertisedError> {
            // The pod's node does not depend on the listener, so look it up once per broker.
            let pod_node = pods_by_name
                .get(&broker.pod_name)
                .and_then(|pod| pod.spec.as_ref())
                .and_then(|spec| spec.node_name.as_deref());
            let mut per_listener: BTreeMap<String, AdvertisedAddress> = BTreeMap::new();
            for listener in effective_listeners {
                let broker_svc =
                    broker_services.get(&(listener.name.clone(), broker.broker_id));
                let advertised = compute_advertised(
                    listener,
                    broker.broker_id,
                    &broker.pod_fqdn,
                    pod_node,
                    nodes,
                    broker_svc,
                )?;
                per_listener.insert(listener.name.clone(), advertised);
            }
            Ok((broker.broker_id, per_listener))
        })
        .collect()
}
/// Sets a single condition on the `Kafka` status, leaving the rest of the status untouched.
///
/// The current status is fetched (a missing status is treated as the default), any existing
/// condition with the same `type_` as `new_cond` is dropped, `new_cond` is appended, and the
/// resulting status is written back with `patch_status`.
///
/// # Errors
/// Fails when reading the current status or patching the new one fails.
async fn patch_status_with_condition(
    kafka_api: &Api<Kafka>,
    name: &str,
    new_cond: KafkaCondition,
) -> Result<(), ReconcileError> {
    let mut status: KafkaStatus = kafka_api.get_status(name).await?.status.unwrap_or_default();
    status
        .conditions
        .retain(|existing| existing.type_ != new_cond.type_);
    status.conditions.push(new_cond);
    patch_status::<Kafka, KafkaStatus>(kafka_api, name, status).await
}
#[allow(clippy::too_many_lines)] pub async fn reconcile(obj: Arc<Kafka>, ctx: Arc<Context>) -> Result<Action, ReconcileError> {
let ns = obj.namespace().unwrap_or_else(|| "default".into());
let name = obj.name_any();
tracing::info!(%ns, %name, "reconciling Kafka");
let svc_api: Api<Service> = Api::namespaced(ctx.client.clone(), &ns);
let svc = render_service(&obj)?;
apply_object(&svc_api, &svc_name(&name), &svc).await?;
let validation = validate_listeners(
&obj.spec.listeners,
obj.spec.inter_broker_listener_name.as_deref(),
);
let effective_listeners: Vec<Listener> = if obj.spec.listeners.is_empty() {
vec![synthesized_default_listener()]
} else {
obj.spec.listeners.clone()
};
let inter_broker_name = effective_inter_broker_listener_name(
&obj.spec.listeners,
obj.spec.inter_broker_listener_name.as_deref(),
);
for msg in listeners::weak_auth_warnings(&effective_listeners) {
emit_weak_auth_event(&ctx.client, &ns, &obj, &msg)
.await
.ok();
}
let cm_api: Api<ConfigMap> = Api::namespaced(ctx.client.clone(), &ns);
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &ns);
let _cluster_id = ensure_cluster_id_secret(&secret_api, &obj).await?;
let kafka_api_for_ts: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
if let Some(ts) = &obj.spec.tiered_storage {
match ts.validate() {
Ok(()) => {
let cond = condition(
"TieredStorageReady",
"True",
"Validated",
"tieredStorage spec is well-formed",
);
patch_status_with_condition(&kafka_api_for_ts, &name, cond).await?;
}
Err(why) => {
let cond = condition(
"TieredStorageReady",
"False",
"TieredStorageInvalid",
&format!("tieredStorage: {why}"),
);
patch_status_with_condition(&kafka_api_for_ts, &name, cond).await?;
return Err(ReconcileError::TieredStorageInvalid(why));
}
}
}
if let Some(tr) = &obj.spec.tracing {
match tr.validate() {
Ok(()) => {
let cond = condition(
"TracingReady",
"True",
"Validated",
"tracing spec is well-formed",
);
patch_status_with_condition(&kafka_api_for_ts, &name, cond).await?;
}
Err(why) => {
let cond = condition(
"TracingReady",
"False",
"TracingInvalid",
&format!("tracing: {why}"),
);
patch_status_with_condition(&kafka_api_for_ts, &name, cond).await?;
return Err(ReconcileError::TracingInvalid(why));
}
}
}
let finalized_metadata = obj
.status
.as_ref()
.and_then(|s| s.metadata_version.as_deref());
let version_outcome = crate::version::evaluate(
&obj.spec.kafka_version,
obj.spec.metadata_version.as_deref(),
finalized_metadata,
);
let (version_cond, resolved_metadata): (KafkaCondition, Option<String>) = match &version_outcome
{
crate::version::VersionOutcome::Valid { resolved_metadata } => (
condition(
"KafkaVersionValid",
"True",
"Valid",
&format!(
"kafkaVersion {} metadata.version {resolved_metadata}",
obj.spec.kafka_version
),
),
Some(resolved_metadata.clone()),
),
crate::version::VersionOutcome::Invalid { reason, message } => (
condition("KafkaVersionValid", "False", reason.as_str(), message),
None,
),
};
let explicit_pin: Option<&str> = if resolved_metadata.is_some() {
obj.spec.metadata_version.as_deref()
} else {
None
};
let logging_outcome = logging::resolve_logging(&ctx, &obj, &ns).await?;
let logging_filter = logging_outcome.filter().map(str::to_string);
let logging_condition = logging::condition_for(&logging_outcome);
let pool_api: Api<KafkaNodePool> = Api::namespaced(ctx.client.clone(), &ns);
let lp = ListParams::default().labels(&format!("crabka.io/cluster={name}"));
let pools = pool_api.list(&lp).await?;
let rollout_converged = pools_converged(pools.iter());
let cr_anns = obj.meta().annotations.clone().unwrap_or_default();
let force_renew = cr_anns.contains_key(cluster_ca::ANN_FORCE_RENEW)
|| cr_anns.contains_key(cluster_ca::ANN_RENEW_AFTER);
let force_replace_key = cr_anns.contains_key(cluster_ca::ANN_FORCE_REPLACE_KEY);
let now = time::OffsetDateTime::now_utc();
let (cluster_ca_outcome, clients_ca_outcome, cluster_ca_cond, clients_ca_cond) = {
let cluster_result = cluster_ca::reconcile_ca(
&secret_api,
&obj,
cluster_ca::WhichCa::Cluster,
force_renew,
force_replace_key,
rollout_converged,
now,
)
.await;
let cluster_outcome = match cluster_result {
Ok(o) => o,
Err(ReconcileError::ByoCaMissing { ref which }) => {
let cond = condition(
which,
"False",
"ByoCaMissing",
"spec.clusterCa.generateCertificateAuthority=false but the CA Secret pair is absent",
);
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
patch_status_with_condition(&kafka_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_mins(1)));
}
Err(e) => return Err(e),
};
let clients_result = cluster_ca::reconcile_ca(
&secret_api,
&obj,
cluster_ca::WhichCa::Clients,
false,
false,
true,
now,
)
.await;
let clients_outcome = match clients_result {
Ok(o) => o,
Err(ReconcileError::ByoCaMissing { ref which }) => {
let cond = condition(
which,
"False",
"ByoCaMissing",
"spec.clientsCa.generateCertificateAuthority=false but the CA Secret pair is absent",
);
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
patch_status_with_condition(&kafka_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_mins(1)));
}
Err(e) => return Err(e),
};
let cc = condition(
"ClusterCaReady",
"True",
"CaReady",
"cluster CA Secret pair present and parseable",
);
let clic = condition(
"ClientsCaReady",
"True",
"CaReady",
"clients CA Secret pair present and parseable",
);
(cluster_outcome, clients_outcome, cc, clic)
};
let strip_keys: Vec<&str> = [
cluster_ca::ANN_FORCE_RENEW,
cluster_ca::ANN_FORCE_REPLACE_KEY,
cluster_ca::ANN_RENEW_AFTER,
]
.into_iter()
.filter(|k| cr_anns.contains_key(*k))
.collect();
if !strip_keys.is_empty() {
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
strip_annotations(&kafka_api, &name, &strip_keys).await?;
}
if cluster_ca_outcome.refused.is_some() {
emit_ca_rotation_refused_event(
&ctx.client,
&ns,
&obj,
&cluster_ca_outcome.rotation_message,
)
.await
.ok();
}
let ca_rotation_cond = condition(
"CaRotation",
if cluster_ca_outcome.rotation_in_progress {
"True"
} else {
"False"
},
cluster_ca_outcome.rotation_reason,
&cluster_ca_outcome.rotation_message,
);
let cfg_hash = common::combined_config_hash(
&obj.spec,
Some(&cluster_ca_outcome.trust_bundle_pem),
explicit_pin,
logging_filter.as_deref(),
);
let listener_status: Vec<ListenerStatus>;
let (listeners_valid_cond, listeners_ready_cond);
let mut lb_pending: Vec<(i32, String)> = Vec::new();
if let Err(e) = validation {
adopt_pools(&pool_api, &obj, pools.iter(), &cfg_hash).await?;
listener_status = vec![];
listeners_valid_cond = condition("ListenersValid", "False", e.reason(), &e.message());
listeners_ready_cond =
condition("ListenersReady", "False", "ListenersInvalid", &e.message());
} else {
let oauth_canonical = canonical_oauth_config(&effective_listeners);
match reconcile_oauth_jwks_trust(&secret_api, &obj, oauth_canonical.as_ref()).await {
Ok(_) => {}
Err(
e @ (ReconcileError::MissingOauthTrustSecret(_)
| ReconcileError::MissingOauthTrustKey { .. }
| ReconcileError::EmptyOauthTrustValue { .. }),
) => {
let reason = match &e {
ReconcileError::MissingOauthTrustSecret(_) => "MissingOauthTrustSecret",
ReconcileError::MissingOauthTrustKey { .. } => "MissingOauthTrustKey",
ReconcileError::EmptyOauthTrustValue { .. } => "EmptyOauthTrustValue",
_ => unreachable!(),
};
let cond = condition("Ready", "False", reason, &e.to_string());
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
patch_status_with_condition(&kafka_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
}
Err(e) => return Err(e),
}
match reconcile_oauth_introspection_secret(&secret_api, &obj, oauth_canonical.as_ref())
.await
{
Ok(_) => {}
Err(
e @ (ReconcileError::InvalidListenerOauthAccessTokenIsJwt(_)
| ReconcileError::MissingOauthIntrospectionSecret(_)
| ReconcileError::MissingOauthIntrospectionKey { .. }
| ReconcileError::EmptyOauthIntrospectionValue { .. }),
) => {
let reason = match &e {
ReconcileError::InvalidListenerOauthAccessTokenIsJwt(_) => {
"InvalidListenerOauthAccessTokenIsJwt"
}
ReconcileError::MissingOauthIntrospectionSecret(_) => {
"MissingOauthIntrospectionSecret"
}
ReconcileError::MissingOauthIntrospectionKey { .. } => {
"MissingOauthIntrospectionKey"
}
ReconcileError::EmptyOauthIntrospectionValue { .. } => {
"EmptyOauthIntrospectionValue"
}
_ => unreachable!(),
};
let cond = condition("Ready", "False", reason, &e.to_string());
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
patch_status_with_condition(&kafka_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
}
Err(e) => return Err(e),
}
let gssapi_secret_check: Result<(), ReconcileError> = async {
if let Some(m) = gssapi_keytab_mount(&obj) {
let secret = secret_api.get_opt(&m.secret_name).await?.ok_or_else(|| {
ReconcileError::MissingGssapiKeytabSecret(m.secret_name.clone())
})?;
let has_key = secret.data.as_ref().is_some_and(|d| d.contains_key(&m.key))
|| secret
.string_data
.as_ref()
.is_some_and(|d| d.contains_key(&m.key));
if !has_key {
return Err(ReconcileError::MissingGssapiKeytabKey {
secret: m.secret_name,
key: m.key,
});
}
}
if let Some((secret_name, key)) = krb5_conf_mount(&obj) {
let secret = secret_api
.get_opt(&secret_name)
.await?
.ok_or_else(|| ReconcileError::MissingKrb5ConfSecret(secret_name.clone()))?;
let has_key = secret.data.as_ref().is_some_and(|d| d.contains_key(&key))
|| secret
.string_data
.as_ref()
.is_some_and(|d| d.contains_key(&key));
if !has_key {
return Err(ReconcileError::MissingKrb5ConfKey {
secret: secret_name,
key,
});
}
}
Ok(())
}
.await;
match gssapi_secret_check {
Ok(()) => {}
Err(
e @ (ReconcileError::MissingGssapiKeytabSecret(_)
| ReconcileError::MissingGssapiKeytabKey { .. }
| ReconcileError::MissingKrb5ConfSecret(_)
| ReconcileError::MissingKrb5ConfKey { .. }),
) => {
let reason = match &e {
ReconcileError::MissingGssapiKeytabSecret(_) => "MissingGssapiKeytabSecret",
ReconcileError::MissingGssapiKeytabKey { .. } => "MissingGssapiKeytabKey",
ReconcileError::MissingKrb5ConfSecret(_) => "MissingKrb5ConfSecret",
ReconcileError::MissingKrb5ConfKey { .. } => "MissingKrb5ConfKey",
_ => unreachable!(),
};
let cond = condition("Ready", "False", reason, &e.to_string());
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
patch_status_with_condition(&kafka_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
}
Err(e) => return Err(e),
}
if let Err(e) = listeners::validate_inter_broker_gssapi(
&effective_listeners,
&inter_broker_name,
obj.spec.inter_broker_kerberos.is_some(),
) {
let cond = condition("ListenersValid", "False", e.reason(), &e.message());
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
patch_status_with_condition(&kafka_api, &name, cond).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
}
let pool_items: Vec<KafkaNodePool> = pools.items.clone();
let brokers = enumerate_brokers(&name, &ns, &pool_items);
let broker_ids: Vec<i32> = brokers.iter().map(|b| b.broker_id).collect();
let observed = listeners::observe_listener_addresses(
&ctx,
&ns,
&name,
&effective_listeners,
&broker_ids,
)
.await?;
let extra_sans_per_broker: BTreeMap<i32, Vec<crabka_security::ca::SubjectAltName>> =
brokers
.iter()
.filter_map(|b| {
match listeners::compute_extra_sans(
b.broker_id,
&effective_listeners,
&observed,
) {
Ok(sans) => Some((b.broker_id, sans)),
Err(listeners::SanComputationError::SansNotReady {
broker_id,
listener,
}) => {
tracing::warn!(
broker_id,
%listener,
"LB ingress not ready; skipping cert SAN extension for this broker"
);
lb_pending.push((broker_id, listener));
None
}
}
})
.collect();
let keystore_requests: Vec<cluster_ca::BrokerCertRequest> = brokers
.iter()
.map(|b| {
let id = b.broker_id;
let cn = b.pod_name.clone();
let sans = vec![
crabka_security::ca::SubjectAltName::Dns(b.pod_fqdn.clone()),
crabka_security::ca::SubjectAltName::Dns(b.pod_name.clone()),
crabka_security::ca::SubjectAltName::Dns(format!(
"{name}-broker-headless.{ns}.svc.cluster.local"
)),
crabka_security::ca::SubjectAltName::Ip(std::net::IpAddr::V4(
std::net::Ipv4Addr::LOCALHOST,
)),
];
let extra = extra_sans_per_broker.get(&id).cloned().unwrap_or_default();
cluster_ca::BrokerCertRequest {
broker_id: id,
cn,
sans,
extra_sans: extra,
}
})
.collect();
cluster_ca::ensure_broker_keystore(
&secret_api,
&obj,
&keystore_requests,
&cluster_ca_outcome.signing_material,
cluster_ca_outcome.force_reissue_leafs,
)
.await?;
let tls_per_broker: std::collections::BTreeMap<i32, listeners::BrokerTlsRender> = brokers
.iter()
.map(|b| {
let id = b.broker_id;
(
id,
listeners::BrokerTlsRender {
controller_listener_protocol: "Ssl".into(),
cert_path: format!("/etc/crabka/broker-tls/{id}.crt"),
key_path: format!("/etc/crabka/broker-tls/{id}.key"),
client_ca_path: "/etc/crabka/cluster-ca/ca.crt".into(),
client_auth: "Required".into(),
},
)
})
.collect();
let clients_ca_path: Option<&str> = if effective_listeners
.iter()
.any(|l| matches!(l.authentication, Some(ListenerAuthentication::Tls)))
{
Some("/etc/crabka/clients-ca/ca.crt")
} else {
None
};
let apply_cm = async |listeners_for_cm: &[Listener],
addresses: &BTreeMap<i32, BTreeMap<String, AdvertisedAddress>>|
-> Result<(), ReconcileError> {
let cm = common::render_configmap(
&obj,
listeners_for_cm,
addresses,
&inter_broker_name,
Some(&tls_per_broker),
clients_ca_path,
logging_filter.as_deref(),
)?;
apply_object(&cm_api, &cm_name(&name), &cm).await?;
Ok(())
};
let has_external = effective_listeners
.iter()
.any(|l| l.type_ != ListenerType::Internal);
let (nodes, pods_by_name, bootstrap_services, broker_services) = if has_external {
apply_external_services(
&ctx,
&svc_api,
&obj,
&ns,
&name,
&effective_listeners,
&brokers,
)
.await?;
read_external_state(&ctx, &svc_api, &ns, &name, &effective_listeners, &brokers).await?
} else {
(
HashMap::new(),
HashMap::new(),
HashMap::new(),
HashMap::new(),
)
};
match resolve_addresses_per_broker(
&effective_listeners,
&brokers,
&pods_by_name,
&nodes,
&broker_services,
) {
Err(err) => {
adopt_pools(&pool_api, &obj, pools.iter(), &cfg_hash).await?;
listener_status = vec![];
listeners_valid_cond =
condition("ListenersValid", "True", "Valid", "listeners validated");
listeners_ready_cond = condition(
"ListenersReady",
"False",
"PendingExternalAddresses",
&err.message(),
);
}
Ok(addresses_per_broker) => {
apply_cm(&effective_listeners, &addresses_per_broker).await?;
adopt_pools(&pool_api, &obj, pools.iter(), &cfg_hash).await?;
listener_status = build_listener_status(
&effective_listeners,
&addresses_per_broker,
&bootstrap_services,
&nodes,
&name,
&ns,
);
listeners_valid_cond =
condition("ListenersValid", "True", "Valid", "listeners validated");
let msg = format!("{} listener(s) ready", effective_listeners.len());
listeners_ready_cond = condition("ListenersReady", "True", "Ready", &msg);
}
}
}
let metrics_outcome =
crate::controller::metrics::reconcile_metrics(&ctx, &obj, &name, &ns).await;
let metrics_condition = match &metrics_outcome {
None => condition(
"MetricsReady",
"False",
"Disabled",
"spec.metricsConfig is not set",
),
Some(Ok(())) => condition(
"MetricsReady",
"True",
"Available",
"metrics resources reconciled",
),
Some(Err(ReconcileError::MetricsMutuallyExclusive)) => condition(
"MetricsReady",
"False",
"MutuallyExclusive",
"podMonitor and serviceMonitor are mutually exclusive",
),
Some(Err(ReconcileError::PrometheusOperatorCrdsMissing)) => condition(
"MetricsReady",
"False",
"PrometheusOperatorCrdsMissing",
"monitoring.coreos.com/v1 is not served by the API server",
),
Some(Err(_)) => condition("MetricsReady", "False", "Error", "metrics reconcile failed"),
};
let inter_broker_port = effective_listeners
.iter()
.find(|l| l.name == inter_broker_name)
.map_or(common::BROKER_PORT, |l| l.port);
let np_outcome = network_policy::reconcile_network_policy(
&ctx,
&obj,
&name,
&ns,
&effective_listeners,
inter_broker_port,
)
.await;
let np_condition = match &np_outcome {
None => condition(
"NetworkPolicyReady",
"False",
"Disabled",
"spec.networkPolicy is not set",
),
Some(Ok(())) => condition(
"NetworkPolicyReady",
"True",
"Available",
"network policy reconciled",
),
Some(Err(_)) => condition(
"NetworkPolicyReady",
"False",
"Error",
"network policy reconcile failed",
),
};
let rollup = aggregate_pool_status(pools.iter());
let (ready, reason, message) = rollup_condition(&rollup);
let (rolling, rolling_reason, rolling_message) = rolling_condition_from_rollup(&rollup);
let mut conditions = vec![
condition(
"Ready",
if ready { "True" } else { "False" },
reason,
&message,
),
condition(
"Rolling",
if rolling { "True" } else { "False" },
rolling_reason,
&rolling_message,
),
listeners_valid_cond,
listeners_ready_cond,
metrics_condition,
np_condition,
cluster_ca_cond,
clients_ca_cond,
ca_rotation_cond,
version_cond,
logging_condition,
];
let has_lb_tls_listener = effective_listeners
.iter()
.any(|l| l.type_ == ListenerType::Loadbalancer && l.tls);
if has_lb_tls_listener {
if lb_pending.is_empty() {
conditions.push(condition(
"WaitingForLoadBalancerIp",
"False",
"LoadBalancerReady",
"all broker LB ingress addresses assigned",
));
} else {
let detail: Vec<String> = lb_pending
.iter()
.map(|(id, l)| format!("broker {id} listener '{l}'"))
.collect();
conditions.push(condition(
"WaitingForLoadBalancerIp",
"True",
"LoadBalancerPending",
&format!("LB ingress not ready for: {}", detail.join(", ")),
));
}
}
let status = KafkaStatus {
conditions,
replicas: Some(rollup.replicas),
ready_replicas: Some(rollup.ready_replicas),
listeners: listener_status,
cluster_ca: Some(crate::crd::CertificateAuthorityStatus {
not_after: cluster_ca_outcome.not_after.clone(),
generated: cluster_ca_outcome.generated,
cert_generation: cluster_ca_outcome.cert_generation,
key_generation: cluster_ca_outcome.key_generation,
rotation_phase: Some(cluster_ca_outcome.phase.as_str().to_string()),
trust_anchors: Some(cluster_ca_outcome.trust_anchors),
}),
clients_ca: Some(crate::crd::CertificateAuthorityStatus {
not_after: clients_ca_outcome.not_after.clone(),
generated: clients_ca_outcome.generated,
cert_generation: clients_ca_outcome.cert_generation,
key_generation: clients_ca_outcome.key_generation,
rotation_phase: Some(clients_ca_outcome.phase.as_str().to_string()),
trust_anchors: Some(clients_ca_outcome.trust_anchors),
}),
kafka_version: Some(obj.spec.kafka_version.clone()),
metadata_version: resolved_metadata
.clone()
.or_else(|| finalized_metadata.map(str::to_string)),
};
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
patch_status::<Kafka, KafkaStatus>(&kafka_api, &name, status).await?;
if let Some(Err(e)) = metrics_outcome
&& !matches!(
e,
ReconcileError::MetricsMutuallyExclusive
| ReconcileError::PrometheusOperatorCrdsMissing
)
{
return Err(e);
}
if let Some(Err(e)) = np_outcome {
return Err(e);
}
Ok(Action::requeue(Duration::from_secs(30)))
}
/// Stamps the parent owner reference and a config-hash label onto node pools.
///
/// The pools are ordered by `spec.node_id_start` (ties broken by name), summarised
/// as [`common::PoolRolloutState`] values and handed to [`common::plan_rollout`]
/// together with `config_hash`. Each `(pool name, target hash)` pair in the
/// returned plan is then written with a forced server-side apply under
/// [`FIELD_MANAGER`], setting `metadata.ownerReferences` to the parent `Kafka`
/// and the `crabka.io/config-hash` label to the planned hash.
///
/// A pool is reported to the planner as ready when its status shows at least one
/// ready replica; a missing status or count is treated as zero.
///
/// # Errors
///
/// Fails if the owner reference cannot be built for `parent` or if any patch
/// request fails. Patches are issued one at a time, so pools patched before the
/// failing one keep their update.
async fn adopt_pools<'a>(
    pool_api: &Api<KafkaNodePool>,
    parent: &Kafka,
    pools: impl IntoIterator<Item = &'a KafkaNodePool>,
    config_hash: &str,
) -> Result<(), ReconcileError> {
    let owner = owner_ref::<Kafka>(parent)?;
    let params = PatchParams {
        field_manager: Some(FIELD_MANAGER.into()),
        force: true,
        ..Default::default()
    };

    // Fixed ordering so the planner sees the pools in the same sequence on every
    // reconcile, regardless of the order the caller supplied them in.
    let mut ordered: Vec<&KafkaNodePool> = pools.into_iter().collect();
    ordered.sort_by(|a, b| {
        a.spec
            .node_id_start
            .cmp(&b.spec.node_id_start)
            .then_with(|| a.name_any().cmp(&b.name_any()))
    });

    let states: Vec<common::PoolRolloutState> = ordered
        .iter()
        .map(|p| common::PoolRolloutState {
            name: p.name_any(),
            current_hash: p
                .metadata
                .labels
                .as_ref()
                .and_then(|l| l.get("crabka.io/config-hash").cloned()),
            ready: p
                .status
                .as_ref()
                .and_then(|s| s.ready_replicas)
                .unwrap_or(0)
                >= 1,
        })
        .collect();

    let plan = common::plan_rollout(&states, config_hash);
    for (pool_name, target_hash) in plan {
        let patch_body = json!({
            "apiVersion": KafkaNodePool::api_version(&()),
            "kind": KafkaNodePool::kind(&()),
            "metadata": {
                "ownerReferences": [owner],
                "labels": { "crabka.io/config-hash": target_hash },
            }
        });
        pool_api
            .patch(&pool_name, &params, &Patch::Apply(&patch_body))
            .await?;
    }
    Ok(())
}
/// Logs a failed reconcile at warn level and asks for a retry.
///
/// The object and context arguments are ignored; every error results in the same
/// 15-second requeue.
pub fn error_policy(_obj: Arc<Kafka>, err: &ReconcileError, _ctx: Arc<Context>) -> Action {
    const RETRY_DELAY: Duration = Duration::from_secs(15);

    tracing::warn!(error = %err, "reconcile error, requeueing");
    Action::requeue(RETRY_DELAY)
}
/// Forwards `message` to `cluster_ca::emit_event` with the fixed weak-auth strings.
///
/// Only `client`, `namespace`, `kafka` and `message` vary per call. The remaining
/// literals are constants of this wrapper.
/// NOTE(review): the literals presumably map to event type, reason, name prefix,
/// action and reporting component, in that order — confirm against
/// `emit_event`'s signature.
///
/// # Errors
///
/// Returns whatever error `emit_event` returns.
async fn emit_weak_auth_event(
    client: &kube::Client,
    namespace: &str,
    kafka: &Kafka,
    message: &str,
) -> Result<(), ReconcileError> {
    let pending = crate::controller::cluster_ca::emit_event(
        client,
        namespace,
        kafka,
        "Warning",
        "WeakAuth",
        message,
        "crabka-listener-auth-",
        "ListenerValidation",
        "crabka-operator/listener-auth-check",
    );
    pending.await
}
/// Forwards `message` to `cluster_ca::emit_event` with the fixed CA-rotation-refused strings.
///
/// Only `client`, `namespace`, `kafka` and `message` vary per call. The remaining
/// literals are constants of this wrapper.
/// NOTE(review): the literals presumably map to event type, reason, name prefix,
/// action and reporting component, in that order — confirm against
/// `emit_event`'s signature.
///
/// # Errors
///
/// Returns whatever error `emit_event` returns.
async fn emit_ca_rotation_refused_event(
    client: &kube::Client,
    namespace: &str,
    kafka: &Kafka,
    message: &str,
) -> Result<(), ReconcileError> {
    let pending = crate::controller::cluster_ca::emit_event(
        client,
        namespace,
        kafka,
        "Warning",
        "CaRotationRefused",
        message,
        "crabka-ca-rotation-",
        "CaRotation",
        "crabka-operator/ca-rotation",
    );
    pending.await
}
/// Reports whether every pool has settled on one config hash and is serving.
///
/// Returns `true` when `pools` is empty. Otherwise returns `true` only if all
/// pools carry the same `crabka.io/config-hash` label value (a pool without the
/// label never counts as converged) and each pool's status reports at least one
/// ready replica. A missing status or ready count is treated as zero.
pub(crate) fn pools_converged<'a>(pools: impl IntoIterator<Item = &'a KafkaNodePool>) -> bool {
    // One (hash label, has a ready replica) observation per pool.
    let observed: Vec<(Option<&String>, bool)> = pools
        .into_iter()
        .map(|pool| {
            let hash = pool
                .metadata
                .labels
                .as_ref()
                .and_then(|labels| labels.get("crabka.io/config-hash"));
            let ready = pool
                .status
                .as_ref()
                .and_then(|status| status.ready_replicas)
                .is_some_and(|count| count >= 1);
            (hash, ready)
        })
        .collect();

    let Some(&(reference_hash, _)) = observed.first() else {
        // No pools at all: nothing can be out of step.
        return true;
    };

    reference_hash.is_some()
        && observed
            .iter()
            .all(|&(hash, ready)| ready && hash == reference_hash)
}
/// Removes the given annotation keys from the named `Kafka` object.
///
/// Sends a single merge patch in which every key in `keys` is set to JSON `null`
/// under `metadata.annotations`; under merge-patch semantics a `null` value
/// deletes the key. An empty `keys` slice still sends a patch, with an empty
/// annotations object.
///
/// # Errors
///
/// Fails if the patch request fails.
async fn strip_annotations(
    kafka_api: &Api<Kafka>,
    name: &str,
    keys: &[&str],
) -> Result<(), ReconcileError> {
    let removals: serde_json::Map<String, serde_json::Value> = keys
        .iter()
        .map(|key| ((*key).to_string(), serde_json::Value::Null))
        .collect();
    let body = json!({ "metadata": { "annotations": serde_json::Value::Object(removals) } });
    let params = PatchParams::default();
    kafka_api.patch(name, &params, &Patch::Merge(&body)).await?;
    Ok(())
}
/// Returns `kafka` with the `-broker-headless` suffix appended.
///
/// NOTE(review): presumably the name of the cluster's headless broker Service —
/// confirm against the callers.
fn svc_name(kafka: &str) -> String {
    [kafka, "-broker-headless"].concat()
}
/// Returns `kafka` with the `-broker-config` suffix appended.
///
/// NOTE(review): presumably the name of the broker ConfigMap — confirm against
/// the callers.
fn cm_name(kafka: &str) -> String {
    [kafka, "-broker-config"].concat()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{KafkaNodePoolSpec, KafkaNodePoolStatus, NodeRole};
    use assert2::assert;

    /// Builds a controller+broker pool whose status reports the given replica counts.
    fn pool_with_status(name: &str, replicas: i32, ready: i32) -> KafkaNodePool {
        let spec = KafkaNodePoolSpec {
            roles: vec![NodeRole::Controller, NodeRole::Broker],
            replicas: 1,
            node_id_start: 0,
            image: None,
            resources: None,
            template: None,
            storage: None,
        };
        let status = KafkaNodePoolStatus {
            conditions: vec![],
            replicas: Some(replicas),
            ready_replicas: Some(ready),
        };
        let mut pool = KafkaNodePool::new(name, spec);
        pool.status = Some(status);
        pool
    }

    #[test]
    fn aggregate_status_no_pools_is_no_node_pools() {
        let rollup = aggregate_pool_status(std::iter::empty::<&KafkaNodePool>());
        let (ready, reason, _message) = rollup_condition(&rollup);
        assert!(!ready);
        assert!(reason == "NoNodePools");
    }

    #[test]
    fn aggregate_status_partial_pool_is_partially_ready() {
        let pool = pool_with_status("brokers", 3, 1);
        let rollup = aggregate_pool_status([&pool]);
        let (ready, reason, _message) = rollup_condition(&rollup);
        assert!(!ready);
        assert!(reason == "PartiallyReady");
    }

    #[test]
    fn aggregate_status_all_ready_pools_is_available() {
        let pool = pool_with_status("brokers", 1, 1);
        let rollup = aggregate_pool_status([&pool]);
        let (ready, reason, _message) = rollup_condition(&rollup);
        assert!(ready);
        assert!(reason == "Available");
    }

    #[test]
    fn rolling_condition_when_pool_partial() {
        let rollup = ClusterRollup {
            replicas: 3,
            ready_replicas: 1,
            pool_count: 1,
        };
        let (rolling, reason, _message) = rolling_condition_from_rollup(&rollup);
        assert!(rolling);
        assert!(reason == "RollingUpdate");
    }

    #[test]
    fn rolling_condition_when_pool_stable() {
        let rollup = ClusterRollup {
            replicas: 1,
            ready_replicas: 1,
            pool_count: 1,
        };
        let (rolling, reason, _message) = rolling_condition_from_rollup(&rollup);
        assert!(!rolling);
        assert!(reason == "Stable");
    }

    /// Internal TLS listener on port 9092 with the given authentication.
    fn listener_with_auth(name: &str, auth: Option<ListenerAuthentication>) -> Listener {
        Listener {
            name: name.into(),
            port: 9092,
            type_: ListenerType::Internal,
            tls: true,
            authentication: auth,
            configuration: None,
            network_policy_peers: None,
        }
    }

    /// JWT-mode OAuth settings (JWKS endpoint set, no introspection) with the given trust certs.
    fn sample_oauth_cfg(
        certs: Vec<crate::crd::TlsTrustedCertificate>,
    ) -> ListenerAuthenticationOAuth {
        ListenerAuthenticationOAuth {
            valid_issuer_uri: "https://iss.example/".into(),
            jwks_endpoint_uri: Some("https://iss.example/jwks".into()),
            valid_audience: None,
            user_name_claim: None,
            custom_claim_check: None,
            jwks_refresh_seconds: None,
            max_clock_skew_seconds: None,
            enable_oauth_bearer: true,
            tls_trusted_certificates: certs,
            access_token_is_jwt: true,
            introspection_endpoint_uri: None,
            user_info_endpoint_uri: None,
            client_id: None,
            client_secret: None,
            introspection_http_timeout_seconds: None,
            max_seconds_without_reauthentication: None,
            valid_token_type: None,
            fallback_user_name_claim: None,
            fallback_user_name_prefix: None,
            groups_claim: None,
            groups_claim_delimiter: None,
            jwks_min_refresh_pause_seconds: None,
            jwks_expiry_seconds: None,
            jwks_ignore_key_use: None,
        }
    }

    /// Introspection-mode OAuth settings whose client secret points at `secret_name`/`key`.
    fn sample_oauth_cfg_introspection(secret_name: &str, key: &str) -> ListenerAuthenticationOAuth {
        let client_secret = crate::crd::OauthClientSecretRef {
            secret_name: secret_name.into(),
            key: key.into(),
        };
        ListenerAuthenticationOAuth {
            valid_issuer_uri: "https://iss.example/".into(),
            jwks_endpoint_uri: None,
            valid_audience: None,
            user_name_claim: None,
            custom_claim_check: None,
            jwks_refresh_seconds: None,
            max_clock_skew_seconds: None,
            enable_oauth_bearer: true,
            tls_trusted_certificates: vec![],
            access_token_is_jwt: false,
            introspection_endpoint_uri: Some("https://iss.example/introspect".into()),
            user_info_endpoint_uri: None,
            client_id: Some("broker-client".into()),
            client_secret: Some(client_secret),
            introspection_http_timeout_seconds: None,
            max_seconds_without_reauthentication: None,
            valid_token_type: None,
            fallback_user_name_claim: None,
            fallback_user_name_prefix: None,
            groups_claim: None,
            groups_claim_delimiter: None,
            jwks_min_refresh_pause_seconds: None,
            jwks_expiry_seconds: None,
            jwks_ignore_key_use: None,
        }
    }

    /// `Kafka` named `c1` in namespace `ns` with the given listeners and every optional field unset.
    fn kafka_with_listeners(listeners: Vec<Listener>) -> Kafka {
        use crate::crd::KafkaSpec;

        let spec = KafkaSpec {
            kafka_version: "3.7.0".into(),
            metadata_version: None,
            config: None,
            listeners,
            inter_broker_listener_name: None,
            metrics_config: None,
            network_policy: None,
            cluster_ca: None,
            clients_ca: None,
            logging: None,
            delegation_token: None,
            authorization: None,
            tiered_storage: None,
            inter_broker_kerberos: None,
            krb5_conf_secret_ref: None,
            tracing: None,
        };
        let mut kafka = Kafka::new("c1", spec);
        kafka.metadata.namespace = Some("ns".into());
        kafka
    }

    /// `Kafka` fixture whose only listener is named `oauth` and uses `cfg`.
    fn kafka_with_oauth_listener(cfg: ListenerAuthenticationOAuth) -> Kafka {
        kafka_with_listeners(vec![listener_with_auth(
            "oauth",
            Some(ListenerAuthentication::OAuth(cfg)),
        )])
    }

    #[test]
    fn canonical_oauth_config_none_when_no_oauth_listener() {
        let listeners = vec![
            listener_with_auth("plain", None),
            listener_with_auth("scram", Some(ListenerAuthentication::ScramSha512)),
        ];
        assert!(canonical_oauth_config(&listeners).is_none());
    }

    #[test]
    fn canonical_oauth_config_picks_first_oauth() {
        let cfg = sample_oauth_cfg(vec![]);
        let listeners = vec![
            listener_with_auth("plain", None),
            listener_with_auth("oauth", Some(ListenerAuthentication::OAuth(cfg.clone()))),
        ];
        assert!(canonical_oauth_config(&listeners) == Some(cfg));
    }

    #[test]
    fn canonical_oauth_config_with_empty_trust_certs_is_some_but_empty() {
        let cfg = sample_oauth_cfg(vec![]);
        let listeners = vec![listener_with_auth(
            "oauth",
            Some(ListenerAuthentication::OAuth(cfg)),
        )];
        let resolved = canonical_oauth_config(&listeners).expect("OAuth listener present");
        assert!(resolved.tls_trusted_certificates.is_empty());
    }

    #[test]
    fn oauth_introspection_secret_mount_returns_none_when_no_oauth_listener() {
        let kafka = kafka_with_listeners(vec![
            listener_with_auth("plain", None),
            listener_with_auth("scram", Some(ListenerAuthentication::ScramSha512)),
        ]);
        assert!(oauth_introspection_secret_mount(&kafka).is_none());
    }

    #[test]
    fn oauth_introspection_secret_mount_returns_none_when_access_token_is_jwt_true() {
        let kafka = kafka_with_oauth_listener(sample_oauth_cfg(vec![]));
        assert!(oauth_introspection_secret_mount(&kafka).is_none());
    }

    #[test]
    fn oauth_introspection_secret_mount_returns_none_when_client_secret_absent_introspection_mode()
    {
        let mut cfg = sample_oauth_cfg_introspection("oauth-cs", "client-secret");
        cfg.client_secret = None;
        let kafka = kafka_with_oauth_listener(cfg);
        assert!(oauth_introspection_secret_mount(&kafka).is_none());
    }

    #[test]
    fn oauth_introspection_secret_mount_returns_some_for_introspection_config() {
        let cfg = sample_oauth_cfg_introspection("oauth-cs", "client-secret");
        let kafka = kafka_with_oauth_listener(cfg);
        let mount = oauth_introspection_secret_mount(&kafka).expect("mount derived");
        assert!(mount.secret_name == "oauth-cs");
        assert!(mount.key == "client-secret");
    }

    #[test]
    fn gssapi_keytab_mount_extracted_from_listener() {
        let gssapi = crate::crd::ListenerAuthenticationGssapi {
            keytab_secret_ref: crate::crd::KeytabSecretRef {
                secret_name: "kt".into(),
                key: "krb5.keytab".into(),
            },
            service_name: None,
            principal_to_local_rules: vec!["DEFAULT".into()],
            realm: None,
            kdc: None,
        };
        let kafka = kafka_with_listeners(vec![listener_with_auth(
            "gss",
            Some(ListenerAuthentication::Gssapi(gssapi)),
        )]);
        let mount = gssapi_keytab_mount(&kafka).expect("keytab mount present");
        assert!(mount.secret_name == "kt");
        assert!(mount.key == "krb5.keytab");
    }

    #[test]
    fn no_keytab_mount_without_gssapi_listener() {
        let kafka = kafka_with_listeners(vec![listener_with_auth("plain", None)]);
        assert!(gssapi_keytab_mount(&kafka).is_none());
    }

    #[test]
    fn krb5_conf_mount_extracted_from_spec() {
        let mut kafka = kafka_with_listeners(vec![listener_with_auth("plain", None)]);
        // Without a secret reference on the spec there is nothing to mount.
        assert!(krb5_conf_mount(&kafka).is_none());

        kafka.spec.krb5_conf_secret_ref = Some(crate::crd::Krb5ConfSecretRef {
            secret_name: "krb5".into(),
            key: "krb5.conf".into(),
        });
        let (secret_name, key) = krb5_conf_mount(&kafka).expect("krb5.conf mount present");
        assert!(secret_name == "krb5");
        assert!(key == "krb5.conf");
    }
}