use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use futures::StreamExt as _;
use k8s_openapi::ByteString;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::{Secret, Service};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::Api;
use kube::runtime::controller::{Action, Controller};
use kube::runtime::watcher;
use kube::{Resource, ResourceExt as _};
use serde_json::json;
use time::OffsetDateTime;
use crabka_security::ca::{SubjectAltName, issue_broker_cert};
use crate::context::Context;
use crate::controller::cluster_ca::{cluster_ca_cert_name, cluster_ca_key_name, renew_if_expiring};
use crate::controller::common::{
ReconcileError, apply_object, condition, owner_ref, patch_status, read_pem_key,
};
use crate::crd::user::{
AclOp, AclPermission, AclResource, AclResourceKind, AclRule, Authentication, Authorization,
KafkaUser, KafkaUserSpec, SimpleAuthorization, TlsAuth,
};
use crate::crd::{Kafka, KafkaCondition, KafkaGrpcGateway, KafkaGrpcGatewayStatus};
const GATEWAY_PORT: i32 = 9500;
const DEFAULT_REPLICAS: i32 = 1;
const DEFAULT_VALIDITY_DAYS: u32 = 365;
const DEFAULT_GATEWAY_IMAGE: &str = concat!(
"ghcr.io/robot-head/crabka-grpc-gateway:",
env!("CARGO_PKG_VERSION")
);
const SERVING_DIR: &str = "/etc/crabka-gw/serving";
const BROKER_CLIENT_DIR: &str = "/etc/crabka-gw/broker-client";
const CLUSTER_CA_DIR: &str = "/etc/crabka-gw/cluster-ca";
const CLIENTS_CA_DIR: &str = "/etc/crabka-gw/clients-ca";
const CONFIG_DIR: &str = "/etc/crabka-gw/config";
fn serving_secret_name(gw_name: &str) -> String {
format!("{gw_name}-serving")
}
fn broker_user_name(gw_name: &str) -> String {
format!("{gw_name}-broker")
}
fn config_secret_name(gw_name: &str) -> String {
format!("{gw_name}-config")
}
fn gateway_labels(parent_name: &str, gw_name: &str) -> BTreeMap<String, String> {
let mut m = BTreeMap::new();
m.insert(
"app.kubernetes.io/name".into(),
"crabka-grpc-gateway".into(),
);
m.insert("app.kubernetes.io/instance".into(), parent_name.into());
m.insert(
"app.kubernetes.io/managed-by".into(),
"crabka-operator".into(),
);
m.insert("crabka.io/gateway".into(), gw_name.into());
m
}
fn field_ref_env(name: &str, field_path: &str) -> serde_json::Value {
json!({ "name": name, "valueFrom": { "fieldRef": { "fieldPath": field_path } } })
}
fn value_env(name: &str, value: impl Into<String>) -> serde_json::Value {
json!({ "name": name, "value": value.into() })
}
#[allow(clippy::too_many_lines)] fn deployment(
gw: &KafkaGrpcGateway,
parent_name: &str,
image: &str,
bootstrap: &str,
broker_sni: &str,
) -> Result<Deployment, ReconcileError> {
let gw_name = gw.name_any();
let namespace = gw.meta().namespace.clone().unwrap_or_default();
let labels = gateway_labels(parent_name, &gw_name);
let replicas = gw.spec.replicas.unwrap_or(DEFAULT_REPLICAS);
let serving_cert = format!("{SERVING_DIR}/tls.crt");
let serving_key = format!("{SERVING_DIR}/tls.key");
let broker_cert = format!("{BROKER_CLIENT_DIR}/user.crt");
let broker_key = format!("{BROKER_CLIENT_DIR}/user.key");
let cluster_ca_crt = format!("{CLUSTER_CA_DIR}/ca.crt");
let clients_ca_crt = format!("{CLIENTS_CA_DIR}/ca.crt");
let webhooks_toml = format!("{CONFIG_DIR}/webhooks.toml");
let outbound_toml = format!("{CONFIG_DIR}/outbound.toml");
let advertised_addr = format!("$(POD_IP):{GATEWAY_PORT}");
let client_auth = gw
.spec
.tls
.as_ref()
.and_then(|t| t.client_auth.clone())
.unwrap_or_else(|| "required".into());
let authz_mode = gw
.spec
.authz
.as_ref()
.and_then(|a| a.mode.clone())
.unwrap_or_else(|| "simple".into());
let super_users = gw
.spec
.authz
.as_ref()
.map(|a| a.super_users.join(","))
.unwrap_or_default();
let acl_refresh = gw
.spec
.authz
.as_ref()
.and_then(|a| a.acl_refresh_secs)
.unwrap_or(60);
let bearer_mode = gw
.spec
.authz
.as_ref()
.and_then(|a| a.bearer.as_ref())
.and_then(|b| b.mode.clone())
.unwrap_or_else(|| "off".into());
let bearer_claim = gw
.spec
.authz
.as_ref()
.and_then(|a| a.bearer.as_ref())
.and_then(|b| b.principal_claim.clone())
.unwrap_or_else(|| "sub".into());
let mut args = vec![
format!("--bootstrap-servers={bootstrap}"),
format!("--listen-addr=0.0.0.0:{GATEWAY_PORT}"),
format!("--advertised-addr={advertised_addr}"),
format!(
"--dedup-topic={}",
gw.spec
.dedup
.as_ref()
.and_then(|d| d.topic.clone())
.unwrap_or_else(|| format!("{gw_name}-dedup"))
),
format!(
"--dedup-partitions={}",
gw.spec
.dedup
.as_ref()
.and_then(|d| d.partitions)
.unwrap_or(8)
),
format!(
"--dedup-window-ms={}",
gw.spec
.dedup
.as_ref()
.and_then(|d| d.window_ms)
.unwrap_or(86_400_000)
),
format!(
"--dedup-txn-id-prefix={}",
gw.spec
.dedup
.as_ref()
.and_then(|d| d.txn_id_prefix.clone())
.unwrap_or_else(|| gw_name.clone())
),
format!("--tls-cert={serving_cert}"),
format!("--tls-key={serving_key}"),
format!("--tls-client-ca={clients_ca_crt}"),
format!("--tls-trust-roots={cluster_ca_crt}"),
format!("--tls-client-auth={client_auth}"),
format!("--broker-tls-cert={broker_cert}"),
format!("--broker-tls-key={broker_key}"),
format!("--broker-tls-ca={cluster_ca_crt}"),
format!("--broker-tls-server-name={broker_sni}"),
format!("--authz={authz_mode}"),
format!("--authz-super-users={super_users}"),
format!("--acl-refresh-secs={acl_refresh}"),
format!("--bearer={bearer_mode}"),
format!("--bearer-principal-claim={bearer_claim}"),
format!("--webhooks-config={webhooks_toml}"),
format!("--outbound-webhooks-config={outbound_toml}"),
];
args.sort_unstable();
let mut env = vec![
field_ref_env("POD_NAME", "metadata.name"),
field_ref_env("POD_IP", "status.podIP"),
field_ref_env("POD_NAMESPACE", "metadata.namespace"),
value_env("CRABKA_GATEWAY_CLIENT_ID", "$(POD_NAME)"),
];
if let Some(t) = gw.spec.telemetry.as_ref() {
if let Some(ep) = t.otlp_endpoint.as_deref() {
env.push(value_env("CRABKA_OTLP_ENABLED", "true"));
env.push(value_env("CRABKA_OTLP_ENDPOINT", ep));
}
if let Some(p) = t.otlp_protocol.as_deref() {
let proto = if p == "http" { "http/protobuf" } else { "grpc" };
env.push(value_env("CRABKA_OTLP_PROTOCOL", proto));
}
if let Some(r) = t.sample_ratio {
env.push(value_env("CRABKA_OTLP_SAMPLE_RATIO", r.to_string()));
}
}
let volumes = json!([
{
"name": "serving",
"secret": { "secretName": serving_secret_name(&gw_name), "defaultMode": 0o400_i32 }
},
{
"name": "broker-client",
"secret": { "secretName": broker_user_name(&gw_name), "defaultMode": 0o400_i32 }
},
{
"name": "cluster-ca",
"secret": { "secretName": cluster_ca_cert_name(parent_name), "defaultMode": 0o400_i32 }
},
{
"name": "clients-ca",
"secret": { "secretName": format!("{parent_name}-clients-ca-cert"), "defaultMode": 0o400_i32 }
},
{
"name": "config",
"secret": { "secretName": config_secret_name(&gw_name), "defaultMode": 0o400_i32 }
},
]);
let volume_mounts = json!([
{ "name": "serving", "mountPath": SERVING_DIR, "readOnly": true },
{ "name": "broker-client", "mountPath": BROKER_CLIENT_DIR, "readOnly": true },
{ "name": "cluster-ca", "mountPath": CLUSTER_CA_DIR, "readOnly": true },
{ "name": "clients-ca", "mountPath": CLIENTS_CA_DIR, "readOnly": true },
{ "name": "config", "mountPath": CONFIG_DIR, "readOnly": true },
]);
let resources = gw
.spec
.resources
.clone()
.map(serde_json::to_value)
.transpose()?
.unwrap_or_else(|| {
json!({
"requests": { "cpu": "100m", "memory": "128Mi" },
"limits": { "cpu": "1000m", "memory": "512Mi" }
})
});
let container = json!({
"name": "gateway",
"image": image,
"args": args,
"env": env,
"ports": [{ "containerPort": GATEWAY_PORT, "name": "grpc", "protocol": "TCP" }],
"resources": resources,
"volumeMounts": volume_mounts,
"readinessProbe": {
"httpGet": { "path": "/readyz", "port": GATEWAY_PORT },
"initialDelaySeconds": 2,
"periodSeconds": 5
},
"livenessProbe": {
"httpGet": { "path": "/healthz", "port": GATEWAY_PORT },
"initialDelaySeconds": 10,
"periodSeconds": 10
},
"securityContext": {
"allowPrivilegeEscalation": false,
"readOnlyRootFilesystem": true,
"capabilities": { "drop": ["ALL"] }
}
});
let dep: Deployment = serde_json::from_value(json!({
"metadata": {
"name": gw_name,
"namespace": namespace,
"labels": labels,
"ownerReferences": [owner_ref::<KafkaGrpcGateway>(gw)?],
},
"spec": {
"replicas": replicas,
"selector": { "matchLabels": labels },
"template": {
"metadata": { "labels": labels },
"spec": {
"securityContext": {
"runAsNonRoot": true,
"runAsUser": 65532,
"fsGroup": 65532,
"seccompProfile": { "type": "RuntimeDefault" }
},
"containers": [container],
"volumes": volumes,
}
}
}
}))?;
Ok(dep)
}
fn service(gw: &KafkaGrpcGateway, parent_name: &str) -> Result<Service, ReconcileError> {
let gw_name = gw.name_any();
let labels = gateway_labels(parent_name, &gw_name);
let svc: Service = serde_json::from_value(json!({
"metadata": {
"name": gw_name,
"namespace": gw.meta().namespace.clone(),
"labels": labels,
"ownerReferences": [owner_ref::<KafkaGrpcGateway>(gw)?],
},
"spec": {
"type": "ClusterIP",
"selector": labels,
"ports": [{
"name": "grpc",
"port": GATEWAY_PORT,
"protocol": "TCP",
"targetPort": GATEWAY_PORT,
}],
}
}))?;
Ok(svc)
}
fn config_secret(
gw: &KafkaGrpcGateway,
resolved_webhook_secrets: &BTreeMap<String, String>,
resolved_outbound_secrets: &BTreeMap<String, String>,
) -> Result<Secret, ReconcileError> {
let gw_name = gw.name_any();
let webhooks_toml = render_webhooks_toml(gw, resolved_webhook_secrets)?;
let outbound_toml = render_outbound_toml(gw, resolved_outbound_secrets)?;
let mut data: BTreeMap<String, ByteString> = BTreeMap::new();
data.insert(
"webhooks.toml".into(),
ByteString(webhooks_toml.into_bytes()),
);
data.insert(
"outbound.toml".into(),
ByteString(outbound_toml.into_bytes()),
);
Ok(Secret {
metadata: ObjectMeta {
name: Some(config_secret_name(&gw_name)),
namespace: gw.meta().namespace.clone(),
labels: Some(gateway_labels(
gw.meta()
.labels
.as_ref()
.and_then(|l| l.get("crabka.io/cluster"))
.map_or(gw_name.as_str(), String::as_str),
&gw_name,
)),
owner_references: Some(vec![owner_ref::<KafkaGrpcGateway>(gw)?]),
..Default::default()
},
type_: Some("Opaque".into()),
data: Some(data),
..Default::default()
})
}
fn render_webhooks_toml(
gw: &KafkaGrpcGateway,
resolved: &BTreeMap<String, String>,
) -> Result<String, ReconcileError> {
let endpoints: Vec<serde_json::Value> = gw
.spec
.webhooks
.iter()
.map(|w| {
let mut e = serde_json::Map::new();
e.insert("name".into(), json!(w.name));
e.insert("target_topic".into(), json!(w.target_topic));
if let Some(v) = &w.principal {
e.insert("principal".into(), json!(v));
}
if let Some(secret) = resolved.get(&w.name) {
e.insert("secret".into(), json!(secret));
}
if let Some(v) = &w.signature_header {
e.insert("signature_header".into(), json!(v));
}
if let Some(v) = &w.signature_encoding {
e.insert("signature_encoding".into(), json!(v));
}
if let Some(v) = &w.signature_prefix {
e.insert("signature_prefix".into(), json!(v));
}
if let Some(v) = &w.timestamp_header {
e.insert("timestamp_header".into(), json!(v));
}
if let Some(v) = w.timestamp_tolerance_secs {
e.insert("timestamp_tolerance_secs".into(), json!(v));
}
if let Some(v) = &w.idempotency_source {
e.insert("idempotency_source".into(), json!(v));
}
if let Some(v) = &w.key_source {
e.insert("key_source".into(), json!(v));
}
if let Some(v) = w.max_body_bytes {
e.insert("max_body_bytes".into(), json!(v));
}
serde_json::Value::Object(e)
})
.collect();
let doc = json!({ "endpoints": endpoints });
toml::to_string(&doc).map_err(|e| ReconcileError::Malformed(format!("webhooks.toml: {e}")))
}
fn render_outbound_toml(
gw: &KafkaGrpcGateway,
resolved: &BTreeMap<String, String>,
) -> Result<String, ReconcileError> {
let subscriptions: Vec<serde_json::Value> = gw
.spec
.outbound_subscriptions
.iter()
.map(|s| {
let mut e = serde_json::Map::new();
e.insert("name".into(), json!(s.name));
e.insert("source_topics".into(), json!(s.source_topics));
e.insert("target_url".into(), json!(s.target_url));
if let Some(secret) = resolved.get(&s.name) {
e.insert("signing_secret".into(), json!(secret));
}
if let Some(v) = &s.dead_letter_topic {
e.insert("dead_letter_topic".into(), json!(v));
}
if let Some(v) = s.max_attempts {
e.insert("max_attempts".into(), json!(v));
}
if let Some(v) = s.base_backoff_ms {
e.insert("base_backoff_ms".into(), json!(v));
}
if let Some(v) = s.max_backoff_ms {
e.insert("max_backoff_ms".into(), json!(v));
}
if let Some(v) = s.request_timeout_ms {
e.insert("request_timeout_ms".into(), json!(v));
}
if let Some(v) = &s.filter {
e.insert("filter".into(), json!(v));
}
if !s.headers.is_empty() {
e.insert("headers".into(), json!(s.headers));
}
serde_json::Value::Object(e)
})
.collect();
let allowed = derive_allowed_targets(gw);
let doc = json!({
"subscriptions": subscriptions,
"allowed_targets": allowed,
});
toml::to_string(&doc).map_err(|e| ReconcileError::Malformed(format!("outbound.toml: {e}")))
}
fn derive_allowed_targets(gw: &KafkaGrpcGateway) -> Vec<serde_json::Value> {
let mut out: Vec<(String, String)> = Vec::new();
let mut push = |scheme: String, host: String| {
if !out.iter().any(|(s, h)| *s == scheme && *h == host) {
out.push((scheme, host));
}
};
for s in &gw.spec.outbound_subscriptions {
if let Ok(url) = reqwest_url_parse(&s.target_url) {
push(url.0, url.1);
}
}
for a in &gw.spec.allowed_targets {
push(a.scheme.clone(), a.host.clone());
}
out.into_iter()
.map(|(scheme, host)| json!({ "scheme": scheme, "host": host }))
.collect()
}
fn reqwest_url_parse(target: &str) -> Result<(String, String), ()> {
let (scheme, rest) = target.split_once("://").ok_or(())?;
if scheme.is_empty() {
return Err(());
}
let authority = rest.split(['/', '?', '#']).next().unwrap_or_default();
let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
let host = host_port.rsplit_once(':').map_or(host_port, |(h, _)| h);
if host.is_empty() {
return Err(());
}
Ok((scheme.to_ascii_lowercase(), host.to_string()))
}
fn child_kafkauser(gw: &KafkaGrpcGateway, parent_name: &str) -> Result<KafkaUser, ReconcileError> {
let gw_name = gw.name_any();
let user_name = broker_user_name(&gw_name);
let acls = vec![
broad_acl(AclResourceKind::Topic, "*"),
broad_acl(AclResourceKind::Group, "*"),
broad_acl(AclResourceKind::TransactionalId, "*"),
broad_acl(AclResourceKind::Cluster, "kafka-cluster"),
];
let mut user = KafkaUser::new(
&user_name,
KafkaUserSpec {
authentication: Authentication::Tls(TlsAuth::default()),
authorization: Some(Authorization::Simple(SimpleAuthorization { acls })),
quotas: None,
},
);
user.metadata.namespace.clone_from(&gw.meta().namespace);
let mut labels = BTreeMap::new();
labels.insert("crabka.io/cluster".into(), parent_name.to_string());
labels.insert("crabka.io/gateway".into(), gw_name.clone());
user.metadata.labels = Some(labels);
user.metadata.owner_references = Some(vec![owner_ref::<KafkaGrpcGateway>(gw)?]);
Ok(user)
}
fn broad_acl(kind: AclResourceKind, name: &str) -> AclRule {
AclRule {
resource: AclResource {
kind,
name: name.into(),
pattern_type: crate::crd::user::AclPatternType::Literal,
},
operations: vec![AclOp::All],
host: "*".into(),
permission: AclPermission::Allow,
}
}
async fn ensure_serving_cert(
secret_api: &Api<Secret>,
gw: &KafkaGrpcGateway,
namespace: &str,
parent_name: &str,
) -> Result<(), ReconcileError> {
let gw_name = gw.name_any();
let secret_name = serving_secret_name(&gw_name);
let now = OffsetDateTime::now_utc();
if let Some(existing) = secret_api.get_opt(&secret_name).await?
&& let Some(cert_pem) = read_pem_key(&existing, "tls.crt")
&& !renew_if_expiring(&cert_pem, 30, now).unwrap_or(true)
{
return Ok(());
}
let key_name = cluster_ca_key_name(parent_name);
let cert_name = cluster_ca_cert_name(parent_name);
let key_secret =
secret_api
.get_opt(&key_name)
.await?
.ok_or_else(|| ReconcileError::CaSecretMissing {
name: key_name.clone(),
})?;
let cert_secret =
secret_api
.get_opt(&cert_name)
.await?
.ok_or_else(|| ReconcileError::CaSecretMissing {
name: cert_name.clone(),
})?;
let ca_key_pem = read_pem_key(&key_secret, "ca.key")
.ok_or_else(|| ReconcileError::CertParse(format!("{key_name} ca.key unreadable")))?;
let ca_cert_pem = read_pem_key(&cert_secret, "ca.crt")
.ok_or_else(|| ReconcileError::CertParse(format!("{cert_name} ca.crt unreadable")))?;
let validity = gw
.spec
.tls
.as_ref()
.and_then(|t| t.validity_days)
.unwrap_or(DEFAULT_VALIDITY_DAYS);
let base_sans = vec![
SubjectAltName::Dns(format!("{gw_name}.{namespace}.svc")),
SubjectAltName::Dns(format!("{gw_name}.{namespace}.svc.cluster.local")),
SubjectAltName::Dns(gw_name.clone()),
];
let leaf = issue_broker_cert(
&ca_cert_pem,
&ca_key_pem,
&gw_name,
&base_sans,
&[],
validity,
)?;
let mut data: BTreeMap<String, ByteString> = BTreeMap::new();
data.insert("tls.crt".into(), ByteString(leaf.cert_pem.into_bytes()));
data.insert("tls.key".into(), ByteString(leaf.key_pem.into_bytes()));
let secret = Secret {
metadata: ObjectMeta {
name: Some(secret_name.clone()),
namespace: Some(namespace.to_string()),
labels: Some(gateway_labels(parent_name, &gw_name)),
owner_references: Some(vec![owner_ref::<KafkaGrpcGateway>(gw)?]),
..Default::default()
},
type_: Some("Opaque".into()),
data: Some(data),
..Default::default()
};
apply_object(secret_api, &secret_name, &secret).await?;
Ok(())
}
async fn resolve_secret_ref(
secret_api: &Api<Secret>,
secret_name: &str,
key: &str,
) -> Result<String, ReconcileError> {
let secret = secret_api.get_opt(secret_name).await?.ok_or_else(|| {
ReconcileError::Malformed(format!("secretRef Secret '{secret_name}' not found"))
})?;
let bytes = secret
.data
.as_ref()
.and_then(|d| d.get(key))
.map(|b| b.0.clone())
.ok_or_else(|| {
ReconcileError::Malformed(format!(
"secretRef Secret '{secret_name}' has no key '{key}'"
))
})?;
String::from_utf8(bytes).map_err(|e| {
ReconcileError::Malformed(format!("secretRef Secret '{secret_name}' key '{key}': {e}"))
})
}
async fn resolve_all_secret_refs(
secret_api: &Api<Secret>,
gw: &KafkaGrpcGateway,
) -> Result<(BTreeMap<String, String>, BTreeMap<String, String>), ReconcileError> {
let mut webhook_secrets = BTreeMap::new();
for w in &gw.spec.webhooks {
if let Some(r) = &w.secret_ref {
let value = resolve_secret_ref(secret_api, &r.name, &r.key).await?;
webhook_secrets.insert(w.name.clone(), value);
}
}
let mut outbound_secrets = BTreeMap::new();
for s in &gw.spec.outbound_subscriptions {
if let Some(r) = &s.signing_secret_ref {
let value = resolve_secret_ref(secret_api, &r.name, &r.key).await?;
outbound_secrets.insert(s.name.clone(), value);
}
}
Ok((webhook_secrets, outbound_secrets))
}
fn version_gate(parent: &Kafka) -> Option<KafkaCondition> {
let status = parent.status.as_ref();
let version_cond =
status.and_then(|s| s.conditions.iter().find(|c| c.type_ == "KafkaVersionValid"));
let finalized = status.and_then(|s| s.metadata_version.as_deref());
let cleared = finalized.is_some() || version_cond.is_some_and(|c| c.status == "True");
if cleared {
return None;
}
let cond = match version_cond {
Some(c) => condition(
"Ready",
"False",
"KafkaVersionInvalid",
&format!(
"refusing to deploy gateway: parent Kafka '{}' KafkaVersionValid={} ({}): {}",
parent.name_any(),
c.status,
c.reason,
c.message
),
),
None => condition(
"Ready",
"False",
"WaitingForVersionValidation",
&format!(
"waiting for parent Kafka '{}' to publish a KafkaVersionValid verdict before deploying the gateway",
parent.name_any()
),
),
};
Some(cond)
}
fn resolve_broker_endpoint(parent: &Kafka, namespace: &str) -> Option<(String, String)> {
let status = parent.status.as_ref()?;
let listener = parent.spec.listeners.iter().find(|l| {
l.type_ == crate::crd::ListenerType::Internal
&& l.tls
&& matches!(
l.authentication,
Some(crate::crd::ListenerAuthentication::Tls)
)
})?;
let bootstrap = status
.listeners
.iter()
.find(|s| s.name == listener.name)
.map(|s| s.bootstrap_servers.clone())?;
let sni = format!(
"{}-broker-headless.{namespace}.svc.cluster.local",
parent.name_any()
);
Some((bootstrap, sni))
}
async fn patch_conditions(
gw_api: &Api<KafkaGrpcGateway>,
name: &str,
observed_generation: Option<i64>,
conditions: Vec<KafkaCondition>,
) -> Result<(), ReconcileError> {
let status = KafkaGrpcGatewayStatus {
conditions,
observed_generation,
ready_replicas: None,
};
patch_status::<KafkaGrpcGateway, KafkaGrpcGatewayStatus>(gw_api, name, status).await
}
#[allow(clippy::too_many_lines)] pub async fn reconcile(
gw: Arc<KafkaGrpcGateway>,
ctx: Arc<Context>,
) -> Result<Action, ReconcileError> {
let ns = gw.namespace().unwrap_or_else(|| "default".into());
let name = gw.name_any();
let observed_generation = gw.meta().generation;
let gw_api: Api<KafkaGrpcGateway> = Api::namespaced(ctx.client.clone(), &ns);
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &ns);
let Some(parent_name) = gw
.meta()
.labels
.as_ref()
.and_then(|l| l.get("crabka.io/cluster").cloned())
else {
let cond = condition(
"Ready",
"False",
"MissingClusterLabel",
"metadata.labels.\"crabka.io/cluster\" is required to link a gateway to its parent Kafka",
);
patch_conditions(&gw_api, &name, observed_generation, vec![cond]).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
};
let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
let Some(parent) = kafka_api.get_opt(&parent_name).await? else {
let cond = condition(
"Ready",
"False",
"ParentNotFound",
&format!("Kafka '{parent_name}' not found in namespace '{ns}'"),
);
patch_conditions(&gw_api, &name, observed_generation, vec![cond]).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
};
if let Some(cond) = version_gate(&parent) {
let version_valid = condition("KafkaVersionValid", "False", &cond.reason, &cond.message);
patch_conditions(
&gw_api,
&name,
observed_generation,
vec![cond, version_valid],
)
.await?;
return Ok(Action::requeue(Duration::from_secs(30)));
}
let user = child_kafkauser(&gw, &parent_name)?;
let user_name = broker_user_name(&name);
let user_api: Api<KafkaUser> = Api::namespaced(ctx.client.clone(), &ns);
apply_object(&user_api, &user_name, &user).await?;
if secret_api.get_opt(&user_name).await?.is_none() {
let cond = condition(
"CertReady",
"False",
"WaitingForBrokerCert",
&format!(
"child KafkaUser '{user_name}' has not yet been issued its broker-client cert Secret"
),
);
let ready = condition(
"Ready",
"False",
"WaitingForBrokerCert",
"waiting for the gateway's broker-mTLS client cert",
);
patch_conditions(&gw_api, &name, observed_generation, vec![ready, cond]).await?;
return Ok(Action::requeue(Duration::from_secs(15)));
}
ensure_serving_cert(&secret_api, &gw, &ns, &parent_name).await?;
let (webhook_secrets, outbound_secrets) = resolve_all_secret_refs(&secret_api, &gw).await?;
let cfg = config_secret(&gw, &webhook_secrets, &outbound_secrets)?;
apply_object(&secret_api, &config_secret_name(&name), &cfg).await?;
let Some((bootstrap, broker_sni)) = resolve_broker_endpoint(&parent, &ns) else {
let ready = condition(
"Ready",
"False",
"NoTlsListener",
&format!(
"no internal TLS listener with authentication=tls found on Kafka '{parent_name}'; the gateway requires mTLS to the broker"
),
);
let degraded = condition(
"Degraded",
"True",
"NoTlsListener",
&format!(
"Kafka '{parent_name}' exposes no internal TLS+mTLS listener for the gateway to dial"
),
);
patch_conditions(&gw_api, &name, observed_generation, vec![ready, degraded]).await?;
return Ok(Action::requeue(Duration::from_secs(30)));
};
let image = gw
.spec
.image
.clone()
.or_else(|| ctx.config.default_gateway_image.clone())
.unwrap_or_else(|| DEFAULT_GATEWAY_IMAGE.into());
let dep = deployment(&gw, &parent_name, &image, &bootstrap, &broker_sni)?;
let dep_api: Api<Deployment> = Api::namespaced(ctx.client.clone(), &ns);
apply_object(&dep_api, &name, &dep).await?;
let svc = service(&gw, &parent_name)?;
let svc_api: Api<Service> = Api::namespaced(ctx.client.clone(), &ns);
apply_object(&svc_api, &name, &svc).await?;
let live = dep_api.get_opt(&name).await?;
let desired_replicas = gw.spec.replicas.unwrap_or(DEFAULT_REPLICAS);
let ready_replicas = live
.as_ref()
.and_then(|d| d.status.as_ref())
.and_then(|s| s.ready_replicas);
let ready = ready_replicas.unwrap_or(0) == desired_replicas;
let (status_val, reason, message) = if ready {
(
"True",
"Available",
format!("{desired_replicas} gateway replica(s) ready"),
)
} else {
(
"False",
"Progressing",
format!(
"{}/{desired_replicas} gateway replica(s) ready",
ready_replicas.unwrap_or(0)
),
)
};
let status = KafkaGrpcGatewayStatus {
conditions: vec![
condition("Ready", status_val, reason, &message),
condition(
"CertReady",
"True",
"Issued",
"serving + broker-client certs present",
),
],
observed_generation,
ready_replicas,
};
patch_status::<KafkaGrpcGateway, KafkaGrpcGatewayStatus>(&gw_api, &name, status).await?;
Ok(Action::requeue(Duration::from_secs(30)))
}
pub fn error_policy(
_obj: Arc<KafkaGrpcGateway>,
err: &ReconcileError,
_ctx: Arc<Context>,
) -> Action {
tracing::warn!(error = %err, "gateway reconcile error, requeueing");
Action::requeue(Duration::from_secs(15))
}
pub async fn run(ctx: Context) -> anyhow::Result<()> {
let api: Api<KafkaGrpcGateway> = Api::all(ctx.client.clone());
let deployments: Api<Deployment> = Api::all(ctx.client.clone());
let services: Api<Service> = Api::all(ctx.client.clone());
let secrets: Api<Secret> = Api::all(ctx.client.clone());
let kafkausers: Api<KafkaUser> = Api::all(ctx.client.clone());
let kafkas: Api<Kafka> = Api::all(ctx.client.clone());
Controller::new(api, watcher::Config::default())
.owns(deployments, watcher::Config::default())
.owns(services, watcher::Config::default())
.owns(secrets, watcher::Config::default())
.owns(kafkausers, watcher::Config::default())
.watches(kafkas, watcher::Config::default(), |kafka| {
let _ = kafka;
std::iter::empty::<kube::runtime::reflector::ObjectRef<KafkaGrpcGateway>>()
})
.run(reconcile, error_policy, Arc::new(ctx))
.for_each(|res| async move {
match res {
Ok((obj, _)) => tracing::debug!(?obj, "gateway reconciled"),
Err(e) => tracing::warn!(error = %e, "gateway reconcile error"),
}
})
.await;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crd::grpc_gateway::{
AllowedTargetSpec, DedupSpec, GatewayAuthzSpec, GatewayBearerSpec, GatewayTlsSpec,
InboundWebhookSpec, KafkaGrpcGatewaySpec, OutboundSubscriptionSpec, SecretKeyRef,
TelemetrySpec,
};
use assert2::assert;
fn empty_spec() -> KafkaGrpcGatewaySpec {
KafkaGrpcGatewaySpec {
replicas: None,
image: None,
resources: None,
dedup: None,
tls: None,
authz: None,
webhooks: vec![],
outbound_subscriptions: vec![],
allowed_targets: vec![],
telemetry: None,
}
}
fn gateway_fixture(name: &str, parent: &str) -> KafkaGrpcGateway {
let mut gw = KafkaGrpcGateway::new(name, empty_spec());
gw.metadata.namespace = Some("default".into());
gw.metadata.uid = Some("gw-uid".into());
let mut labels = BTreeMap::new();
labels.insert("crabka.io/cluster".into(), parent.to_string());
gw.metadata.labels = Some(labels);
gw
}
#[test]
fn deployment_name_and_owner_ref() {
let gw = gateway_fixture("gw", "demo");
let dep = deployment(
&gw,
"demo",
"img:1",
"demo-broker-headless.default.svc.cluster.local:9092",
"demo-broker-headless.default.svc.cluster.local",
)
.unwrap();
assert!(dep.metadata.name.as_deref() == Some("gw"));
let owner = &dep.metadata.owner_references.as_ref().unwrap()[0];
assert!(owner.kind == "KafkaGrpcGateway");
assert!(owner.name == "gw");
assert!(owner.controller == Some(true));
}
#[test]
fn deployment_has_five_volume_mounts() {
let gw = gateway_fixture("gw", "demo");
let dep = deployment(&gw, "demo", "img:1", "boot:9092", "sni").unwrap();
let pod = dep.spec.unwrap().template.spec.unwrap();
let container = &pod.containers[0];
let mounts = container.volume_mounts.as_ref().expect("volume mounts");
let names: std::collections::BTreeSet<&str> =
mounts.iter().map(|m| m.name.as_str()).collect();
for want in [
"serving",
"broker-client",
"cluster-ca",
"clients-ca",
"config",
] {
assert!(names.contains(want), "missing mount {want}; got {names:?}");
}
assert!(
mounts.len() == 5,
"expected exactly 5 mounts, got {}",
mounts.len()
);
let vols = pod.volumes.as_ref().expect("volumes");
let secret_names: std::collections::BTreeSet<String> = vols
.iter()
.filter_map(|v| v.secret.as_ref().and_then(|s| s.secret_name.clone()))
.collect();
for want in [
"gw-serving",
"gw-broker",
"demo-cluster-ca-cert",
"demo-clients-ca-cert",
"gw-config",
] {
assert!(
secret_names.contains(want),
"missing backing Secret volume {want}; got {secret_names:?}"
);
}
}
#[test]
fn deployment_has_broker_tls_args_pointing_at_mounts() {
let gw = gateway_fixture("gw", "demo");
let dep = deployment(
&gw,
"demo",
"img:1",
"demo-broker-headless.default.svc.cluster.local:9092",
"demo-broker-headless.default.svc.cluster.local",
)
.unwrap();
let pod = dep.spec.unwrap().template.spec.unwrap();
let args = pod.containers[0].args.as_ref().expect("args");
let joined = args.join(" ");
assert!(
joined.contains("--broker-tls-cert=/etc/crabka-gw/broker-client/user.crt"),
"args: {joined}"
);
assert!(
joined.contains("--broker-tls-key=/etc/crabka-gw/broker-client/user.key"),
"args: {joined}"
);
assert!(
joined.contains("--broker-tls-ca=/etc/crabka-gw/cluster-ca/ca.crt"),
"args: {joined}"
);
assert!(
joined.contains(
"--broker-tls-server-name=demo-broker-headless.default.svc.cluster.local"
),
"args: {joined}"
);
assert!(
joined.contains("--tls-client-ca=/etc/crabka-gw/clients-ca/ca.crt"),
"args: {joined}"
);
assert!(
joined.contains("--tls-trust-roots=/etc/crabka-gw/cluster-ca/ca.crt"),
"args: {joined}"
);
assert!(
joined.contains("--tls-cert=/etc/crabka-gw/serving/tls.crt"),
"args: {joined}"
);
assert!(
joined.contains("--bootstrap-servers=demo-broker-headless"),
"args: {joined}"
);
}
#[test]
fn deployment_advertised_addr_uses_pod_ip_field_ref() {
let gw = gateway_fixture("gw", "demo");
let dep = deployment(&gw, "demo", "img:1", "boot:9092", "sni").unwrap();
let pod = dep.spec.unwrap().template.spec.unwrap();
let container = &pod.containers[0];
let args = container.args.as_ref().expect("args");
assert!(
args.iter().any(|a| a == "--advertised-addr=$(POD_IP):9500"),
"args: {args:?}"
);
let env = container.env.as_ref().expect("env");
let pod_ip = env.iter().find(|e| e.name == "POD_IP").expect("POD_IP env");
let fr = pod_ip
.value_from
.as_ref()
.and_then(|v| v.field_ref.as_ref())
.expect("POD_IP fieldRef");
assert!(fr.field_path == "status.podIP");
let client_id = env
.iter()
.find(|e| e.name == "CRABKA_GATEWAY_CLIENT_ID")
.expect("client id env");
assert!(client_id.value.as_deref() == Some("$(POD_NAME)"));
}
#[test]
fn deployment_default_replicas_is_one() {
let gw = gateway_fixture("gw", "demo");
let dep = deployment(&gw, "demo", "img:1", "boot:9092", "sni").unwrap();
assert!(dep.spec.unwrap().replicas == Some(1));
}
#[test]
fn deployment_honors_explicit_replicas() {
let mut gw = gateway_fixture("gw", "demo");
gw.spec.replicas = Some(3);
let dep = deployment(&gw, "demo", "img:1", "boot:9092", "sni").unwrap();
assert!(dep.spec.unwrap().replicas == Some(3));
}
#[test]
fn deployment_probes_on_gateway_port() {
let gw = gateway_fixture("gw", "demo");
let dep = deployment(&gw, "demo", "img:1", "boot:9092", "sni").unwrap();
let container = dep
.spec
.unwrap()
.template
.spec
.unwrap()
.containers
.remove(0);
let readiness = container.readiness_probe.expect("readiness probe");
let get = readiness.http_get.expect("httpGet readiness");
assert!(get.path.as_deref() == Some("/readyz"));
let liveness = container.liveness_probe.expect("liveness probe");
let get = liveness.http_get.expect("httpGet liveness");
assert!(get.path.as_deref() == Some("/healthz"));
let ports = container.ports.expect("ports");
assert!(ports.iter().any(|p| p.container_port == 9500));
}
#[test]
fn deployment_telemetry_env_present_when_configured() {
let mut gw = gateway_fixture("gw", "demo");
gw.spec.telemetry = Some(TelemetrySpec {
otlp_endpoint: Some("http://otel:4317".into()),
otlp_protocol: Some("http".into()),
sample_ratio: Some(0.5),
});
let dep = deployment(&gw, "demo", "img:1", "boot:9092", "sni").unwrap();
let env = dep
.spec
.unwrap()
.template
.spec
.unwrap()
.containers
.remove(0)
.env
.unwrap();
let by_name = |n: &str| {
env.iter()
.find(|e| e.name == n)
.and_then(|e| e.value.clone())
};
assert!(by_name("CRABKA_OTLP_ENABLED").as_deref() == Some("true"));
assert!(by_name("CRABKA_OTLP_ENDPOINT").as_deref() == Some("http://otel:4317"));
assert!(by_name("CRABKA_OTLP_PROTOCOL").as_deref() == Some("http/protobuf"));
assert!(by_name("CRABKA_OTLP_SAMPLE_RATIO").as_deref() == Some("0.5"));
}
#[test]
fn service_selector_matches_labels_and_port() {
let gw = gateway_fixture("gw", "demo");
let svc = service(&gw, "demo").unwrap();
let spec = svc.spec.expect("svc spec");
assert!(spec.type_.as_deref() == Some("ClusterIP"));
let selector = spec.selector.expect("selector");
let labels = gateway_labels("demo", "gw");
assert!(
selector == labels,
"selector {selector:?} != labels {labels:?}"
);
let port = &spec.ports.expect("ports")[0];
assert!(port.port == 9500);
assert!(svc.metadata.owner_references.as_ref().unwrap()[0].name == "gw");
}
#[test]
fn child_kafkauser_is_tls_with_broad_acls() {
let gw = gateway_fixture("gw", "demo");
let user = child_kafkauser(&gw, "demo").unwrap();
assert!(user.metadata.name.as_deref() == Some("gw-broker"));
assert!(matches!(user.spec.authentication, Authentication::Tls(_)));
assert!(
user.metadata
.labels
.as_ref()
.and_then(|l| l.get("crabka.io/cluster"))
.map(String::as_str)
== Some("demo")
);
assert!(user.metadata.owner_references.as_ref().unwrap()[0].kind == "KafkaGrpcGateway");
let Some(Authorization::Simple(authz)) = &user.spec.authorization else {
panic!("expected simple authorization");
};
let kinds: Vec<AclResourceKind> = authz.acls.iter().map(|a| a.resource.kind).collect();
for want in [
AclResourceKind::Topic,
AclResourceKind::Group,
AclResourceKind::TransactionalId,
AclResourceKind::Cluster,
] {
assert!(kinds.contains(&want), "missing ACL kind {want:?}");
}
for rule in &authz.acls {
assert!(rule.permission == AclPermission::Allow);
assert!(rule.operations.contains(&AclOp::All));
}
}
#[test]
fn config_secret_renders_webhooks_and_outbound_toml() {
let mut gw = gateway_fixture("gw", "demo");
gw.spec.webhooks = vec![InboundWebhookSpec {
name: "orders".into(),
target_topic: "raw-orders".into(),
principal: Some("User:webhook".into()),
signature_header: Some("X-Hub-Signature-256".into()),
signature_encoding: None,
signature_prefix: Some("sha256=".into()),
timestamp_header: None,
timestamp_tolerance_secs: None,
idempotency_source: Some("header:X-Idempotency-Key".into()),
key_source: None,
max_body_bytes: None,
secret_ref: Some(SecretKeyRef {
name: "orders-secret".into(),
key: "hmac".into(),
}),
}];
gw.spec.outbound_subscriptions = vec![OutboundSubscriptionSpec {
name: "processed".into(),
source_topics: vec!["processed-orders".into()],
target_url: "https://hooks.example.com/deliver".into(),
dead_letter_topic: Some("dlq".into()),
max_attempts: Some(5),
base_backoff_ms: None,
max_backoff_ms: None,
request_timeout_ms: None,
filter: Some("json:$.type".into()),
headers: BTreeMap::from([("X-Tenant".to_string(), "acme".to_string())]),
signing_secret_ref: Some(SecretKeyRef {
name: "sign-secret".into(),
key: "hmac".into(),
}),
}];
let mut webhook_secrets = BTreeMap::new();
webhook_secrets.insert("orders".to_string(), "WEBHOOK-HMAC".to_string());
let mut outbound_secrets = BTreeMap::new();
outbound_secrets.insert("processed".to_string(), "SIGN-HMAC".to_string());
let secret = config_secret(&gw, &webhook_secrets, &outbound_secrets).unwrap();
assert!(secret.metadata.name.as_deref() == Some("gw-config"));
let data = secret.data.unwrap();
let webhooks_toml = String::from_utf8(data["webhooks.toml"].0.clone()).unwrap();
assert!(
webhooks_toml.contains("name = \"orders\""),
"{webhooks_toml}"
);
assert!(
webhooks_toml.contains("target_topic = \"raw-orders\""),
"{webhooks_toml}"
);
assert!(
webhooks_toml.contains("secret = \"WEBHOOK-HMAC\""),
"{webhooks_toml}"
);
assert!(
webhooks_toml.contains("signature_prefix = \"sha256=\""),
"{webhooks_toml}"
);
let outbound_toml = String::from_utf8(data["outbound.toml"].0.clone()).unwrap();
assert!(
outbound_toml.contains("name = \"processed\""),
"{outbound_toml}"
);
assert!(
outbound_toml.contains("target_url = \"https://hooks.example.com/deliver\""),
"{outbound_toml}"
);
assert!(
outbound_toml.contains("signing_secret = \"SIGN-HMAC\""),
"{outbound_toml}"
);
assert!(
outbound_toml.contains("hooks.example.com"),
"{outbound_toml}"
);
assert!(secret.metadata.owner_references.as_ref().unwrap()[0].name == "gw");
}
#[test]
fn config_secret_round_trips_through_gateway_parsers_shape() {
let mut gw = gateway_fixture("gw", "demo");
gw.spec.outbound_subscriptions = vec![OutboundSubscriptionSpec {
name: "s".into(),
source_topics: vec!["t".into()],
target_url: "https://h.example.com/x".into(),
dead_letter_topic: None,
max_attempts: None,
base_backoff_ms: None,
max_backoff_ms: None,
request_timeout_ms: None,
filter: None,
headers: BTreeMap::new(),
signing_secret_ref: None,
}];
let secret = config_secret(&gw, &BTreeMap::new(), &BTreeMap::new()).unwrap();
let data = secret.data.unwrap();
let outbound_toml = String::from_utf8(data["outbound.toml"].0.clone()).unwrap();
let parsed: toml::Value = toml::from_str(&outbound_toml).expect("valid TOML");
assert!(parsed.get("subscriptions").is_some());
assert!(parsed.get("allowed_targets").is_some());
let subs = parsed["subscriptions"].as_array().unwrap();
assert!(subs[0]["name"].as_str() == Some("s"));
let allowed = parsed["allowed_targets"].as_array().unwrap();
assert!(allowed[0]["host"].as_str() == Some("h.example.com"));
assert!(allowed[0]["scheme"].as_str() == Some("https"));
}
#[test]
fn derive_allowed_targets_unions_explicit_and_subscription_hosts() {
let mut gw = gateway_fixture("gw", "demo");
gw.spec.outbound_subscriptions = vec![OutboundSubscriptionSpec {
name: "s".into(),
source_topics: vec!["t".into()],
target_url: "https://a.example.com:8443/x".into(),
dead_letter_topic: None,
max_attempts: None,
base_backoff_ms: None,
max_backoff_ms: None,
request_timeout_ms: None,
filter: None,
headers: BTreeMap::new(),
signing_secret_ref: None,
}];
gw.spec.allowed_targets = vec![AllowedTargetSpec {
scheme: "https".into(),
host: "b.example.com".into(),
}];
let allowed = derive_allowed_targets(&gw);
let hosts: std::collections::BTreeSet<String> = allowed
.iter()
.map(|v| v["host"].as_str().unwrap().to_string())
.collect();
assert!(hosts.contains("a.example.com"), "{hosts:?}");
assert!(hosts.contains("b.example.com"), "{hosts:?}");
}
#[test]
fn reqwest_url_parse_strips_port_and_path() {
assert!(
reqwest_url_parse("https://h.example.com:8443/a/b?x=1")
== Ok(("https".into(), "h.example.com".into()))
);
assert!(
reqwest_url_parse("http://h.example.com")
== Ok(("http".into(), "h.example.com".into()))
);
assert!(
reqwest_url_parse("https://user@h.example.com/x")
== Ok(("https".into(), "h.example.com".into()))
);
assert!(reqwest_url_parse("not-a-url").is_err());
}
#[test]
fn version_gate_blocks_when_not_validated() {
let mut parent = Kafka::new(
"demo",
crate::crd::KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: None,
listeners: vec![],
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
},
);
parent.metadata.namespace = Some("default".into());
let cond = version_gate(&parent).expect("blocked");
assert!(cond.reason == "WaitingForVersionValidation");
parent.status = Some(crate::crd::KafkaStatus {
conditions: vec![condition("KafkaVersionValid", "True", "Valid", "ok")],
..Default::default()
});
assert!(version_gate(&parent).is_none());
}
#[test]
fn deployment_authz_and_dedup_flags() {
let mut gw = gateway_fixture("gw", "demo");
gw.spec.authz = Some(GatewayAuthzSpec {
mode: Some("simple".into()),
super_users: vec!["User:admin".into(), "User:ops".into()],
acl_refresh_secs: Some(42),
bearer: Some(GatewayBearerSpec {
mode: Some("unsecured".into()),
principal_claim: Some("email".into()),
}),
});
gw.spec.dedup = Some(DedupSpec {
topic: Some("gw-dedup".into()),
partitions: Some(16),
window_ms: Some(123),
txn_id_prefix: Some("pfx".into()),
});
gw.spec.tls = Some(GatewayTlsSpec {
client_auth: Some("optional".into()),
validity_days: Some(90),
});
let dep = deployment(&gw, "demo", "img:1", "boot:9092", "sni").unwrap();
let args = dep
.spec
.unwrap()
.template
.spec
.unwrap()
.containers
.remove(0)
.args
.unwrap();
let joined = args.join(" ");
assert!(joined.contains("--authz=simple"), "{joined}");
assert!(
joined.contains("--authz-super-users=User:admin,User:ops"),
"{joined}"
);
assert!(joined.contains("--acl-refresh-secs=42"), "{joined}");
assert!(joined.contains("--bearer=unsecured"), "{joined}");
assert!(
joined.contains("--bearer-principal-claim=email"),
"{joined}"
);
assert!(joined.contains("--dedup-topic=gw-dedup"), "{joined}");
assert!(joined.contains("--dedup-partitions=16"), "{joined}");
assert!(joined.contains("--dedup-window-ms=123"), "{joined}");
assert!(joined.contains("--dedup-txn-id-prefix=pfx"), "{joined}");
assert!(joined.contains("--tls-client-auth=optional"), "{joined}");
}
use crate::crd::{
KafkaSpec, KafkaStatus, Listener, ListenerAuthentication, ListenerStatus, ListenerType,
};
fn parent_with_listeners(
spec_listeners: Vec<Listener>,
status_listeners: Vec<ListenerStatus>,
) -> Kafka {
let mut parent = Kafka::new(
"demo",
KafkaSpec {
kafka_version: "0.1.1".into(),
metadata_version: None,
config: None,
listeners: spec_listeners,
inter_broker_listener_name: None,
metrics_config: None,
network_policy: None,
cluster_ca: None,
clients_ca: None,
logging: None,
delegation_token: None,
authorization: None,
tiered_storage: None,
inter_broker_kerberos: None,
krb5_conf_secret_ref: None,
tracing: None,
},
);
parent.metadata.namespace = Some("default".into());
parent.status = Some(KafkaStatus {
listeners: status_listeners,
..Default::default()
});
parent
}
fn tls_mtls_listener(name: &str, port: i32) -> Listener {
Listener {
name: name.into(),
port,
type_: ListenerType::Internal,
tls: true,
authentication: Some(ListenerAuthentication::Tls),
configuration: None,
network_policy_peers: None,
}
}
fn plain_listener(name: &str, port: i32) -> Listener {
Listener {
name: name.into(),
port,
type_: ListenerType::Internal,
tls: false,
authentication: None,
configuration: None,
network_policy_peers: None,
}
}
fn internal_status(name: &str, port: i32) -> ListenerStatus {
ListenerStatus {
name: name.into(),
type_: ListenerType::Internal,
bootstrap_servers: format!("demo-broker-headless.default.svc.cluster.local:{port}"),
addresses: vec![],
}
}
#[test]
fn resolve_broker_endpoint_picks_tls_mtls_internal_listener() {
let parent = parent_with_listeners(
vec![
plain_listener("PLAIN", 9092),
tls_mtls_listener("secured", 9093),
],
vec![
internal_status("PLAIN", 9092),
internal_status("secured", 9093),
],
);
let (bootstrap, sni) = resolve_broker_endpoint(&parent, "default").expect("resolved");
assert!(
bootstrap == "demo-broker-headless.default.svc.cluster.local:9093",
"must resolve the secured listener bootstrap, got {bootstrap}"
);
assert!(
sni == "demo-broker-headless.default.svc.cluster.local",
"SNI must be the headless-svc SAN, got {sni}"
);
}
#[test]
fn resolve_broker_endpoint_none_without_tls_listener() {
let parent = parent_with_listeners(
vec![plain_listener("PLAIN", 9092)],
vec![internal_status("PLAIN", 9092)],
);
assert!(resolve_broker_endpoint(&parent, "default").is_none());
}
#[test]
fn resolve_broker_endpoint_none_when_tls_listener_has_no_auth() {
let anon_tls = Listener {
authentication: None,
..tls_mtls_listener("anon", 9093)
};
let parent = parent_with_listeners(vec![anon_tls], vec![internal_status("anon", 9093)]);
assert!(resolve_broker_endpoint(&parent, "default").is_none());
}
#[test]
fn resolve_broker_endpoint_none_when_bootstrap_not_in_status() {
let parent = parent_with_listeners(vec![tls_mtls_listener("secured", 9093)], vec![]);
assert!(resolve_broker_endpoint(&parent, "default").is_none());
}
}