use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[kube(
group = "crabka.io",
version = "v1alpha1",
kind = "KafkaUser",
plural = "kafkausers",
singular = "kafkauser",
shortname = "ku",
namespaced,
status = "KafkaUserStatus",
derive = "PartialEq"
)]
#[serde(rename_all = "camelCase")]
pub struct KafkaUserSpec {
pub authentication: Authentication,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub authorization: Option<Authorization>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quotas: Option<KafkaUserQuotas>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaUserQuotas {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 0))]
pub producer_byte_rate: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 0))]
pub consumer_byte_rate: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 0, max = 100))]
pub request_percentage: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub controller_mutation_rate: Option<f64>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(tag = "type", rename_all = "kebab-case")]
#[schemars(schema_with = "authentication_schema")]
pub enum Authentication {
#[serde(rename = "scram-sha-512")]
ScramSha512(ScramSha512Auth),
#[serde(rename = "scram-sha-256")]
ScramSha256(ScramSha256Auth),
#[serde(rename = "tls")]
Tls(TlsAuth),
#[serde(rename = "tls-external")]
TlsExternal,
#[serde(rename = "delegation-token")]
DelegationToken(DelegationTokenAuth),
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct DelegationTokenAuth {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub renewers: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1))]
pub max_lifetime_ms: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 60_000))]
pub renew_before_expiry_ms: Option<i64>,
}
fn authentication_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
schemars::json_schema!({
"type": "object",
"required": ["type"],
"properties": {
"type": {
"type": "string",
"enum": ["scram-sha-512", "scram-sha-256", "tls", "tls-external", "delegation-token"],
},
"iterations": { "type": "integer", "minimum": 4096, "maximum": 1_000_000 },
"passwordLength": { "type": "integer", "minimum": 16, "maximum": 256 },
"validityDays": { "type": "integer", "minimum": 1, "maximum": 36500 },
"renewalDays": { "type": "integer", "minimum": 1, "maximum": 3650 },
"renewers": {
"type": "array",
"items": { "type": "string", "pattern": "^User:.+$" },
},
"maxLifetimeMs": { "type": "integer", "minimum": 1 },
"renewBeforeExpiryMs": { "type": "integer", "minimum": 60000 },
},
})
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ScramSha512Auth {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 4096, max = 1_000_000))]
pub iterations: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 16, max = 256))]
pub password_length: Option<u16>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ScramSha256Auth {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 4096, max = 1_000_000))]
pub iterations: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 16, max = 256))]
pub password_length: Option<u16>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct TlsAuth {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1, max = 36500))]
pub validity_days: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1, max = 3650))]
pub renewal_days: Option<u32>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum Authorization {
Simple(SimpleAuthorization),
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SimpleAuthorization {
#[serde(default)]
pub acls: Vec<AclRule>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct AclRule {
pub resource: AclResource,
pub operations: Vec<AclOp>,
#[serde(default = "default_host", skip_serializing_if = "is_default_host")]
pub host: String,
#[serde(default, rename = "type")]
pub permission: AclPermission,
}
fn default_host() -> String {
"*".into()
}
fn is_default_host(h: &String) -> bool {
h == "*"
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct AclResource {
#[serde(rename = "type")]
pub kind: AclResourceKind,
pub name: String,
#[serde(default)]
pub pattern_type: AclPatternType,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub enum AclResourceKind {
Topic,
Group,
Cluster,
TransactionalId,
}
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AclPatternType {
#[default]
Literal,
Prefixed,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash)]
pub enum AclOp {
All,
Read,
Write,
Create,
Delete,
Alter,
Describe,
ClusterAction,
DescribeConfigs,
AlterConfigs,
IdempotentWrite,
}
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AclPermission {
#[default]
Allow,
Deny,
}
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaUserStatus {
#[serde(default)]
pub conditions: Vec<crate::crd::KafkaCondition>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub observed_generation: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub username: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub secret: Option<String>,
#[serde(default)]
pub scram_sha512: bool,
#[serde(default)]
pub scram_sha256: bool,
#[serde(default)]
pub quotas_in_sync: bool,
#[serde(default)]
pub tls: bool,
#[serde(default)]
pub external: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tls_cert_not_after: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tls_principal: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delegation_token_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delegation_token_expiry_timestamp_ms: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delegation_token_max_timestamp_ms: Option<i64>,
}
impl KafkaUserQuotas {
#[must_use]
pub fn to_quota_map(&self) -> std::collections::BTreeMap<String, f64> {
let mut out = std::collections::BTreeMap::new();
if let Some(v) = self.producer_byte_rate {
out.insert("producer_byte_rate".into(), f64::from(v));
}
if let Some(v) = self.consumer_byte_rate {
out.insert("consumer_byte_rate".into(), f64::from(v));
}
if let Some(v) = self.request_percentage {
out.insert("request_percentage".into(), f64::from(v));
}
if let Some(v) = self.controller_mutation_rate {
out.insert("controller_mutation_rate".into(), v);
}
out
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use kube::CustomResourceExt as _;
#[test]
fn crd_metadata_is_correct() {
let crd = KafkaUser::crd();
assert!(crd.spec.group == "crabka.io");
assert!(crd.spec.names.kind == "KafkaUser");
assert!(crd.spec.names.plural == "kafkausers");
assert!(
crd.spec
.names
.short_names
.as_ref()
.is_some_and(|v| v.contains(&"ku".to_string())),
"expected shortname `ku`",
);
assert!(crd.spec.versions.len() == 1);
assert!(crd.spec.versions[0].name == "v1alpha1");
}
#[test]
fn full_spec_round_trips_through_json() {
let ku = KafkaUser::new(
"alice",
KafkaUserSpec {
authentication: Authentication::ScramSha512(ScramSha512Auth {
iterations: Some(16384),
password_length: Some(48),
}),
authorization: Some(Authorization::Simple(SimpleAuthorization {
acls: vec![AclRule {
resource: AclResource {
kind: AclResourceKind::Topic,
name: "orders".into(),
pattern_type: AclPatternType::Literal,
},
operations: vec![AclOp::Read, AclOp::Describe],
host: "*".into(),
permission: AclPermission::Allow,
}],
})),
quotas: Some(KafkaUserQuotas {
producer_byte_rate: Some(1_048_576),
consumer_byte_rate: Some(2_097_152),
request_percentage: Some(55),
controller_mutation_rate: Some(10.0),
}),
},
);
let json = serde_json::to_string(&ku).unwrap();
assert!(json.contains("\"type\":\"scram-sha-512\""), "got: {json}");
assert!(json.contains("\"iterations\":16384"), "got: {json}");
assert!(json.contains("\"type\":\"simple\""), "got: {json}");
assert!(json.contains("\"name\":\"orders\""), "got: {json}");
let back: KafkaUser = serde_json::from_str(&json).unwrap();
assert!(back.spec == ku.spec);
}
#[test]
fn minimum_spec_parses() {
let json = r#"{"authentication":{"type":"scram-sha-512"}}"#;
let spec: KafkaUserSpec = serde_json::from_str(json).unwrap();
assert!(matches!(
spec.authentication,
Authentication::ScramSha512(ScramSha512Auth {
iterations: None,
password_length: None,
})
));
assert!(spec.authorization.is_none());
}
#[test]
fn acl_rule_defaults_host_and_permission() {
let json = r#"{
"resource": {"type":"topic","name":"orders"},
"operations":["Read"]
}"#;
let rule: AclRule = serde_json::from_str(json).unwrap();
assert!(rule.host == "*");
assert!(rule.permission == AclPermission::Allow);
assert!(rule.resource.pattern_type == AclPatternType::Literal);
}
#[test]
fn acl_rule_omits_default_host_on_serialize() {
let rule = AclRule {
resource: AclResource {
kind: AclResourceKind::Topic,
name: "orders".into(),
pattern_type: AclPatternType::Literal,
},
operations: vec![AclOp::Read],
host: "*".into(),
permission: AclPermission::Allow,
};
let j = serde_json::to_string(&rule).unwrap();
assert!(!j.contains("host"), "default host should be omitted: {j}");
}
#[test]
fn acl_rule_emits_non_default_host() {
let rule = AclRule {
resource: AclResource {
kind: AclResourceKind::Topic,
name: "orders".into(),
pattern_type: AclPatternType::Literal,
},
operations: vec![AclOp::Read],
host: "10.0.0.0".into(),
permission: AclPermission::Allow,
};
let j = serde_json::to_string(&rule).unwrap();
assert!(j.contains("\"host\":\"10.0.0.0\""), "got: {j}");
}
#[test]
fn status_omits_optional_fields_when_unset() {
let status = KafkaUserStatus::default();
let j = serde_json::to_string(&status).unwrap();
assert!(!j.contains("observedGeneration"), "got: {j}");
assert!(!j.contains("username"), "got: {j}");
assert!(!j.contains("secret"), "got: {j}");
assert!(j.contains("\"scramSha512\":false"), "got: {j}");
assert!(j.contains("\"quotasInSync\":false"), "got: {j}");
}
#[test]
fn quotas_empty_serializes_as_empty_object() {
let q = KafkaUserQuotas::default();
let j = serde_json::to_string(&q).unwrap();
assert!(j == "{}");
assert!(q.to_quota_map().is_empty());
}
#[test]
fn quotas_to_map_only_emits_set_fields() {
let q = KafkaUserQuotas {
producer_byte_rate: Some(1_048_576),
consumer_byte_rate: None,
request_percentage: Some(25),
controller_mutation_rate: None,
};
let m = q.to_quota_map();
assert!(m.len() == 2);
assert!((m["producer_byte_rate"] - 1_048_576.0).abs() < f64::EPSILON);
assert!((m["request_percentage"] - 25.0).abs() < f64::EPSILON);
}
#[test]
fn quotas_to_map_carries_controller_mutation_rate_as_double() {
let q = KafkaUserQuotas {
controller_mutation_rate: Some(2.5),
..Default::default()
};
let m = q.to_quota_map();
assert!(m.len() == 1);
assert!((m["controller_mutation_rate"] - 2.5).abs() < f64::EPSILON);
}
#[test]
fn quotas_parse_from_strimzi_shape() {
let json = r#"{
"producerByteRate": 1048576,
"consumerByteRate": 2097152,
"requestPercentage": 55,
"controllerMutationRate": 10.5
}"#;
let q: KafkaUserQuotas = serde_json::from_str(json).unwrap();
assert!(q.producer_byte_rate == Some(1_048_576));
assert!(q.consumer_byte_rate == Some(2_097_152));
assert!(q.request_percentage == Some(55));
assert!(q.controller_mutation_rate == Some(10.5));
}
#[test]
fn empty_quotas_object_parses_and_is_a_clear_signal() {
let json = r#"{"authentication":{"type":"scram-sha-512"},"quotas":{}}"#;
let spec: KafkaUserSpec = serde_json::from_str(json).unwrap();
let q = spec.quotas.expect("quotas section present");
assert!(q.to_quota_map().is_empty());
}
#[test]
fn omitted_quotas_means_operator_does_not_manage() {
let json = r#"{"authentication":{"type":"scram-sha-512"}}"#;
let spec: KafkaUserSpec = serde_json::from_str(json).unwrap();
assert!(spec.quotas.is_none());
}
#[test]
fn tls_auth_round_trips() {
let auth = Authentication::Tls(TlsAuth::default());
let v = serde_json::to_value(&auth).unwrap();
assert!(v == serde_json::json!({"type": "tls"}));
let back: Authentication = serde_json::from_value(v).unwrap();
assert!(back == auth);
}
#[test]
fn tls_auth_with_validity_days_round_trips() {
let auth = Authentication::Tls(TlsAuth {
validity_days: Some(180),
renewal_days: Some(14),
});
let v = serde_json::to_value(&auth).unwrap();
assert!(
v == serde_json::json!({
"type": "tls",
"validityDays": 180,
"renewalDays": 14,
})
);
let back: Authentication = serde_json::from_value(v).unwrap();
assert!(back == auth);
}
#[test]
fn authentication_scram_round_trips_unchanged() {
let auth = Authentication::ScramSha512(ScramSha512Auth::default());
let v = serde_json::to_value(&auth).unwrap();
assert!(v == serde_json::json!({"type": "scram-sha-512"}));
let back: Authentication = serde_json::from_value(v).unwrap();
assert!(back == auth);
}
#[test]
fn authentication_scram_sha256_round_trips_with_overrides() {
let auth_default = Authentication::ScramSha256(ScramSha256Auth::default());
let v = serde_json::to_value(&auth_default).unwrap();
assert!(v == serde_json::json!({"type": "scram-sha-256"}));
let back: Authentication = serde_json::from_value(v).unwrap();
assert!(back == auth_default);
let auth_overrides = Authentication::ScramSha256(ScramSha256Auth {
iterations: Some(16_384),
password_length: Some(64),
});
let v = serde_json::to_value(&auth_overrides).unwrap();
assert!(
v == serde_json::json!({
"type": "scram-sha-256",
"iterations": 16_384,
"passwordLength": 64,
})
);
let back: Authentication = serde_json::from_value(v).unwrap();
assert!(back == auth_overrides);
}
#[test]
fn status_tls_fields_omit_when_unset() {
let status = KafkaUserStatus {
tls: false,
tls_cert_not_after: None,
tls_principal: None,
..Default::default()
};
let j = serde_json::to_string(&status).unwrap();
assert!(!j.contains("tlsCertNotAfter"), "got: {j}");
assert!(!j.contains("tlsPrincipal"), "got: {j}");
assert!(j.contains("\"tls\":false"), "got: {j}");
}
#[test]
fn status_tls_fields_emit_when_set() {
let status = KafkaUserStatus {
tls: true,
tls_cert_not_after: Some("2027-05-19T00:00:00Z".into()),
tls_principal: Some("User:CN=alice".into()),
..Default::default()
};
let v = serde_json::to_value(&status).unwrap();
assert!(v.get("tls") == Some(&serde_json::Value::Bool(true)));
assert!(v.get("tlsCertNotAfter").and_then(|x| x.as_str()) == Some("2027-05-19T00:00:00Z"));
assert!(v.get("tlsPrincipal").and_then(|x| x.as_str()) == Some("User:CN=alice"));
}
#[test]
fn tls_external_round_trips() {
let auth = Authentication::TlsExternal;
let j = serde_json::to_string(&auth).unwrap();
assert!(j == r#"{"type":"tls-external"}"#);
let back: Authentication = serde_json::from_str(&j).unwrap();
assert!(back == auth);
}
#[test]
fn tls_external_with_quotas_and_acls_round_trips() {
let spec = KafkaUserSpec {
authentication: Authentication::TlsExternal,
authorization: Some(Authorization::Simple(SimpleAuthorization {
acls: vec![AclRule {
resource: AclResource {
kind: AclResourceKind::Topic,
name: "orders".into(),
pattern_type: AclPatternType::Literal,
},
operations: vec![AclOp::Read],
host: "*".into(),
permission: AclPermission::Allow,
}],
})),
quotas: Some(KafkaUserQuotas {
producer_byte_rate: Some(1_048_576),
..Default::default()
}),
};
let j = serde_json::to_string(&spec).unwrap();
assert!(j.contains("\"type\":\"tls-external\""), "got: {j}");
assert!(j.contains("\"name\":\"orders\""), "got: {j}");
assert!(j.contains("\"producerByteRate\":1048576"), "got: {j}");
let back: KafkaUserSpec = serde_json::from_str(&j).unwrap();
assert!(back == spec);
}
#[test]
fn tls_external_minimum_spec_parses() {
let json = r#"{"authentication":{"type":"tls-external"}}"#;
let spec: KafkaUserSpec = serde_json::from_str(json).unwrap();
assert!(spec.authentication == Authentication::TlsExternal);
assert!(spec.authorization.is_none());
assert!(spec.quotas.is_none());
}
#[test]
fn status_external_field_emits_when_true() {
let status = KafkaUserStatus {
external: true,
..Default::default()
};
let v = serde_json::to_value(&status).unwrap();
assert!(v.get("external") == Some(&serde_json::Value::Bool(true)));
}
#[test]
fn status_external_field_emits_default_false() {
let status = KafkaUserStatus::default();
let j = serde_json::to_string(&status).unwrap();
assert!(j.contains("\"external\":false"), "got: {j}");
}
#[test]
fn delegation_token_authentication_round_trip() {
let yaml = r#"
apiVersion: crabka.io/v1alpha1
kind: KafkaUser
metadata:
name: alice
spec:
authentication:
type: delegation-token
renewers: ["User:bob", "User:carol"]
maxLifetimeMs: 86400000
renewBeforeExpiryMs: 7200000
"#;
let user: KafkaUser = serde_yaml::from_str(yaml).unwrap();
let Authentication::DelegationToken(dt) = user.spec.authentication else {
panic!("expected DelegationToken variant");
};
assert!(dt.renewers == vec!["User:bob", "User:carol"]);
assert!(dt.max_lifetime_ms == Some(86_400_000));
assert!(dt.renew_before_expiry_ms == Some(7_200_000));
}
#[test]
fn delegation_token_authentication_minimal_omits_optional_fields() {
let yaml = "
apiVersion: crabka.io/v1alpha1
kind: KafkaUser
metadata:
name: alice
spec:
authentication:
type: delegation-token
";
let user: KafkaUser = serde_yaml::from_str(yaml).unwrap();
let Authentication::DelegationToken(dt) = user.spec.authentication else {
panic!("expected DelegationToken variant");
};
assert!(dt.renewers.is_empty());
assert!(dt.max_lifetime_ms.is_none());
assert!(dt.renew_before_expiry_ms.is_none());
}
}