use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
// Desired state of a `Kafka` custom resource (group `crabka.io`, version `v1alpha1`).
//
// The `CustomResource` derive generates the namespaced `Kafka` wrapper type around this
// spec and pairs it with `KafkaStatus` as its status type. On the wire every field is
// camelCase; `kafkaVersion` is the only required key, all other fields default when absent
// and are left out of the serialized form when `None`/empty.
//
// NOTE(review): plain `//` comments are used here rather than `///` because the
// `JsonSchema` derive copies doc comments into schema descriptions, which would change
// the generated CRD.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[kube(
group = "crabka.io",
version = "v1alpha1",
kind = "Kafka",
plural = "kafkas",
singular = "kafka",
shortname = "kk",
namespaced,
status = "KafkaStatus",
derive = "PartialEq"
)]
#[serde(rename_all = "camelCase")]
pub struct KafkaSpec {
// Required version string (`kafkaVersion`); the only field without a serde default.
pub kafka_version: String,
// Optional metadata version string (`metadataVersion`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata_version: Option<String>,
// Optional string-to-string settings map; `BTreeMap` keeps keys in sorted order.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config: Option<std::collections::BTreeMap<String, String>>,
// Listener definitions; defaults to an empty list and is omitted when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub listeners: Vec<crate::crd::Listener>,
// Optional listener name (`interBrokerListenerName`).
// NOTE(review): presumably must name one of `listeners` — not checked in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub inter_broker_listener_name: Option<String>,
// Optional metrics settings (`metricsConfig`); type lives in `crate::crd`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metrics_config: Option<crate::crd::MetricsConfig>,
// Optional network-policy settings (`networkPolicy`); type lives in `crate::crd`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network_policy: Option<crate::crd::NetworkPolicySpec>,
// Optional certificate-authority settings under `clusterCa`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cluster_ca: Option<crate::crd::CertificateAuthority>,
// Optional certificate-authority settings under `clientsCa`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub clients_ca: Option<crate::crd::CertificateAuthority>,
// Optional logging settings; type lives in `crate::crd`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub logging: Option<crate::crd::Logging>,
// Optional delegation-token settings (`delegationToken`), a reference to a secret key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delegation_token: Option<DelegationTokenConfig>,
// Optional authorizer selection, tagged by `type` (`simple` or `opa`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub authorization: Option<Authorization>,
// Optional tiered-storage settings; cross-field rules live in `TieredStorage::validate`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tiered_storage: Option<TieredStorage>,
// Optional Kerberos settings under `interBrokerKerberos`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub inter_broker_kerberos: Option<InterBrokerKerberos>,
// Optional secret name + key pair under `krb5ConfSecretRef`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub krb5_conf_secret_ref: Option<Krb5ConfSecretRef>,
// Optional tracing settings; cross-field rules live in `Tracing::validate`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tracing: Option<Tracing>,
}
// Kerberos settings carried by `KafkaSpec::inter_broker_kerberos`.
// Serialized in camelCase; `clientPrincipal` and `kdcUrl` are required on input.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct InterBrokerKerberos {
// Required principal string (`clientPrincipal`).
pub client_principal: String,
// Optional service name (`serviceName`); omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub service_name: Option<String>,
// Required KDC location string (`kdcUrl`); its format is not validated in this file.
pub kdc_url: String,
}
// Secret name + key pair carried by `KafkaSpec::krb5_conf_secret_ref`.
// NOTE(review): presumably points at the Secret entry holding a krb5.conf — confirm.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Krb5ConfSecretRef {
// Required secret name (`secretName`).
pub secret_name: String,
// Required key; unlike `SecretKeyRef::key` this one is not optional.
pub key: String,
}
// Tiered-storage settings carried by `KafkaSpec::tiered_storage`.
// `type` is required on input; which of the optional blocks may accompany it is
// enforced at runtime by `TieredStorage::validate`, not by serde.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct TieredStorage {
// Backend selector; appears on the wire as `type` (`kind` avoids the Rust keyword).
#[serde(rename = "type")]
pub kind: TieredStorageType,
// S3 settings; `validate` requires them for `S3` and rejects them for `Local`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub s3: Option<S3StorageSpec>,
// Optional metadata-manager settings (`metadataManager`), validated when present.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata_manager: Option<MetadataManagerSpec>,
// Optional persistence settings; `validate` only accepts them with `Local`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub persistence: Option<TieredStoragePersistence>,
}
// Persistence settings carried by `TieredStorage::persistence`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct TieredStoragePersistence {
// Required size string (the tests use values such as `100Gi`); only non-emptiness
// is checked, by `TieredStorage::validate`.
pub size: String,
// Optional class name; omitted from output when `None`.
// NOTE(review): presumably a Kubernetes StorageClass name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub class: Option<String>,
// Defaults to `false` when absent; always serialized (no `skip_serializing_if`).
#[serde(default)]
pub delete_claim: bool,
}
// Backend selector for `TieredStorage`. There is no serde rename, so the wire values
// are exactly `Local` and `S3`; any other string fails deserialization.
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum TieredStorageType {
// `Default::default()` value.
#[default]
Local,
// Requires `TieredStorage::s3` to be set (see `TieredStorage::validate`).
S3,
}
// S3 settings carried by `TieredStorage::s3`. `bucket` and `region` are required keys;
// `TieredStorage::validate` additionally rejects blank values for both.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct S3StorageSpec {
// Required bucket name.
pub bucket: String,
// Required region name.
pub region: String,
// Optional prefix string; omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub prefix: Option<String>,
// Optional endpoint string (the tests use `http://m:9000`); omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub endpoint: Option<String>,
// Optional pair of secret references for the access key id / secret access key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub credentials: Option<S3Credentials>,
// Defaults to `false`. `Not::not` yields `true` for `false`, so the field is only
// written out when it is `true`.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub allow_http: bool,
// Optional multipart threshold; units are not stated in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub multipart_threshold: Option<u64>,
// Optional multipart chunk size; units are not stated in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub multipart_chunk_size: Option<u64>,
}
impl TieredStorage {
    /// Checks the cross-field rules that serde alone cannot express.
    ///
    /// Rules are evaluated in this order and the first violation wins:
    /// 1. `type=Local` must not carry `s3`; `type=S3` must carry `s3` with a
    ///    non-blank `bucket` and `region`.
    /// 2. `metadata_manager`, when present, must pass its own validation.
    /// 3. `persistence`, when present, requires `type=Local` and a non-blank `size`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message describing the first rule that failed.
    pub fn validate(&self) -> Result<(), String> {
        // Matching on the kind alone keeps this exhaustive: a new backend variant
        // will not compile until its rules are written down here.
        match self.kind {
            TieredStorageType::Local => {
                if self.s3.is_some() {
                    return Err("type=Local must not set `s3`".into());
                }
            }
            TieredStorageType::S3 => {
                let spec = self.s3.as_ref().ok_or_else(|| {
                    String::from("type=S3 requires `s3` (bucket + region at minimum)")
                })?;
                if spec.bucket.trim().is_empty() {
                    return Err("s3.bucket is required and must be non-empty".into());
                }
                if spec.region.trim().is_empty() {
                    return Err("s3.region is required and must be non-empty".into());
                }
            }
        }

        if let Some(manager) = &self.metadata_manager {
            manager.validate()?;
        }

        // Guards are tried top to bottom, so the kind check precedes the size check.
        match &self.persistence {
            None => Ok(()),
            Some(_) if self.kind != TieredStorageType::Local => {
                Err("persistence is only valid with type=Local".into())
            }
            Some(claim) if claim.size.trim().is_empty() => {
                Err("persistence.size is required and must be non-empty".into())
            }
            Some(_) => Ok(()),
        }
    }
}
// Metadata-manager settings carried by `TieredStorage::metadata_manager`.
// `type` is required on input; `MetadataManagerSpec::validate` decides whether the
// `topic` block is allowed alongside it.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MetadataManagerSpec {
// Manager selector; appears on the wire as `type`.
#[serde(rename = "type")]
pub kind: MetadataManagerType,
// Topic settings; optional for `Topic`, rejected by `validate` for `InMemory`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub topic: Option<TopicMetadataManagerSpec>,
}
impl MetadataManagerSpec {
    /// Checks that the optional `topic` block is consistent with `type`.
    ///
    /// A missing `topic` block is always accepted. When it is present, `InMemory`
    /// rejects it outright and `Topic` delegates to the block's own validation.
    ///
    /// # Errors
    ///
    /// Returns a message naming the offending field.
    pub fn validate(&self) -> Result<(), String> {
        match self.topic.as_ref() {
            // Nothing to cross-check for either manager type.
            None => Ok(()),
            Some(topic) => match self.kind {
                MetadataManagerType::InMemory => {
                    Err("metadataManager.type=InMemory must not set `topic`".into())
                }
                MetadataManagerType::Topic => topic.validate(),
            },
        }
    }
}
// Manager selector for `MetadataManagerSpec`. There is no serde rename, so the wire
// values are exactly `InMemory` and `Topic`.
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum MetadataManagerType {
// Must not be combined with a `topic` block (see `MetadataManagerSpec::validate`).
InMemory,
// `Default::default()` value.
#[default]
Topic,
}
// Topic settings carried by `MetadataManagerSpec::topic`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct TopicMetadataManagerSpec {
// Required bootstrap string (the tests use `host:port` values); must be non-blank.
pub bootstrap: String,
// Optional partition count (`numPartitions`); must be > 0 when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub num_partitions: Option<i32>,
// Optional replication value; must be > 0 when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replication: Option<i32>,
}
impl TopicMetadataManagerSpec {
pub fn validate(&self) -> Result<(), String> {
if self.bootstrap.trim().is_empty() {
return Err("metadataManager.topic.bootstrap is required and must be non-empty".into());
}
if let Some(p) = self.num_partitions
&& p <= 0
{
return Err(format!(
"metadataManager.topic.numPartitions must be > 0 (got {p})"
));
}
if let Some(r) = self.replication
&& r <= 0
{
return Err(format!(
"metadataManager.topic.replication must be > 0 (got {r})"
));
}
Ok(())
}
}
// Tracing settings carried by `KafkaSpec::tracing`.
// `type` is required on input; `Tracing::validate` requires the matching block.
// Only `PartialEq` is derived (no `Eq`) because `OtlpTracing` holds an `f64`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Tracing {
// Exporter selector; appears on the wire as `type`.
#[serde(rename = "type")]
pub kind: TracingType,
// OTLP settings; `validate` requires them when `type` is `Otlp`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub otlp: Option<OtlpTracing>,
}
// Exporter selector for `Tracing`. There is no serde rename, so the only wire value
// is exactly `Otlp`.
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum TracingType {
// Sole variant and `Default::default()` value.
#[default]
Otlp,
}
// OTLP settings carried by `Tracing::otlp`. Value constraints on these fields are
// enforced by `Tracing::validate`, not by serde.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct OtlpTracing {
// Required endpoint string; must be non-blank.
pub endpoint: String,
// Optional protocol (`grpc` or `http_protobuf` on the wire).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub protocol: Option<OtlpProtocol>,
// Optional sampling ratio (`sampleRatio`); must lie in [0.0, 1.0] when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sample_ratio: Option<f64>,
// Optional service name (`serviceName`); must be non-blank when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub service_name: Option<String>,
// Optional timeout (`timeoutSecs`); must be > 0 when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout_secs: Option<u64>,
}
// Protocol choice for `OtlpTracing::protocol`. `rename_all = "snake_case"` makes the
// wire values `grpc` and `http_protobuf`; note these differ from the strings returned
// by `OtlpProtocol::as_env_value`.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpProtocol {
// Wire value `grpc`.
Grpc,
// Wire value `http_protobuf`.
HttpProtobuf,
}
impl OtlpProtocol {
#[must_use]
pub fn as_env_value(self) -> &'static str {
match self {
Self::Grpc => "grpc",
Self::HttpProtobuf => "http/protobuf",
}
}
}
impl Tracing {
pub fn validate(&self) -> Result<(), String> {
match (self.kind, &self.otlp) {
(TracingType::Otlp, None) => {
Err("type=Otlp requires `otlp` (endpoint at minimum)".into())
}
(TracingType::Otlp, Some(otlp)) => {
if otlp.endpoint.trim().is_empty() {
return Err("otlp.endpoint is required and must be non-empty".into());
}
if let Some(r) = otlp.sample_ratio
&& !(0.0..=1.0).contains(&r)
{
return Err(format!("otlp.sampleRatio must be in [0.0, 1.0] (got {r})"));
}
if let Some(s) = otlp.service_name.as_deref()
&& s.trim().is_empty()
{
return Err("otlp.serviceName, when set, must be non-empty".into());
}
if otlp.timeout_secs == Some(0) {
return Err("otlp.timeoutSecs, when set, must be > 0".into());
}
Ok(())
}
}
}
}
// Pair of secret references carried by `S3StorageSpec::credentials`.
// Both keys (`accessKeyId`, `secretAccessKey`) are required on input.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct S3Credentials {
// Reference for the access key id.
pub access_key_id: SecretKeyRef,
// Reference for the secret access key.
pub secret_access_key: SecretKeyRef,
}
// Delegation-token settings carried by `KafkaSpec::delegation_token`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DelegationTokenConfig {
// Required secret reference (`secretKeyRef`).
// NOTE(review): the tests name it `dt-master`, presumably the token master key — confirm.
pub secret_key_ref: SecretKeyRef,
}
// Reference to a named secret with an optional key inside it. Used by `S3Credentials`
// and `DelegationTokenConfig`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SecretKeyRef {
// Required secret name.
pub name: String,
// Optional key; omitted from output when `None`. The fallback used when it is
// absent is decided by the consumer and is not visible in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
}
// Authorizer selection carried by `KafkaSpec::authorization`.
// Internally tagged: the `type` key (`simple` or `opa`) sits next to the chosen
// variant's own fields in a single object. The JSON schema is not derived from the
// enum shape; it comes from the hand-written `authorization_schema`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(tag = "type", rename_all = "kebab-case")]
#[schemars(schema_with = "authorization_schema")]
pub enum Authorization {
// Selected by `type: simple`.
#[serde(rename = "simple")]
Simple(SimpleAuthorization),
// Selected by `type: opa`.
#[serde(rename = "opa")]
Opa(OpaAuthorization),
}
// Payload of `Authorization::Simple`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SimpleAuthorization {
// Principal strings (`superUsers`); the tests use entries such as `User:admin`.
// Defaults to empty and is omitted from output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub super_users: Vec<String>,
}
// Payload of `Authorization::Opa`. Only `url` is required on input. The numeric
// bounds declared via `schemars(range)` are repeated by hand in `authorization_schema`;
// keep the two in sync.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct OpaAuthorization {
// Required URL string; its format is not validated in this file.
pub url: String,
// Optional flag (`allowOnError`); omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub allow_on_error: Option<bool>,
// Optional cache capacity (`initialCacheCapacity`); schema minimum 0.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 0))]
pub initial_cache_capacity: Option<u32>,
// Optional cache size limit (`maximumCacheSize`); schema minimum 1.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1))]
pub maximum_cache_size: Option<u32>,
// Optional expiry in milliseconds (`expireAfterMs`); schema minimum 1000.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(range(min = 1000))]
pub expire_after_ms: Option<i64>,
// Principal strings (`superUsers`); defaults to empty and is omitted when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub super_users: Vec<String>,
}
// Hand-written JSON schema for `Authorization`, wired in through
// `#[schemars(schema_with = "authorization_schema")]`.
//
// It describes one flat object: a required `type` discriminator limited to `simple`
// or `opa`, plus the union of both variants' fields as optional properties. The
// numeric minimums repeat the `schemars(range)` bounds on `OpaAuthorization`. `url`
// is not listed under `required`, although `OpaAuthorization::url` has no serde
// default, so a missing `url` is only caught at deserialization time.
//
// The generator argument is unused because the schema is a fixed literal.
// NOTE(review): presumably flattened because the derived schema for an internally
// tagged enum is not accepted as a Kubernetes structural schema — confirm.
fn authorization_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
schemars::json_schema!({
"type": "object",
"required": ["type"],
"properties": {
"type": {
"type": "string",
"enum": ["simple", "opa"],
},
"superUsers": {
"type": "array",
"items": { "type": "string" },
},
"url": { "type": "string" },
"allowOnError": { "type": "boolean" },
"initialCacheCapacity": { "type": "integer", "minimum": 0 },
"maximumCacheSize": { "type": "integer", "minimum": 1 },
"expireAfterMs": { "type": "integer", "minimum": 1000 },
},
})
}
// Status type of the `Kafka` resource (named by `status = "KafkaStatus"` on
// `KafkaSpec`). Every field has a serde default, so an empty object deserializes.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaStatus {
// Condition list; defaults to empty but, unlike the other fields, is always
// serialized (no `skip_serializing_if`).
#[serde(default)]
pub conditions: Vec<KafkaCondition>,
// Optional replica count; omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replicas: Option<i32>,
// Optional ready-replica count (`readyReplicas`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ready_replicas: Option<i32>,
// Per-listener status entries; omitted from output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub listeners: Vec<crate::crd::ListenerStatus>,
// Optional certificate-authority status under `clusterCa`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cluster_ca: Option<crate::crd::CertificateAuthorityStatus>,
// Optional certificate-authority status under `clientsCa`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub clients_ca: Option<crate::crd::CertificateAuthorityStatus>,
// Optional version string (`kafkaVersion`).
// NOTE(review): presumably the version actually running, as opposed to the spec — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub kafka_version: Option<String>,
// Optional metadata version string (`metadataVersion`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata_version: Option<String>,
}
// One entry of `KafkaStatus::conditions`. All five fields are required strings.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaCondition {
// Condition type; appears on the wire as `type` (`type_` avoids the Rust keyword).
#[serde(rename = "type")]
pub type_: String,
// Condition status string; allowed values are not constrained in this file.
pub status: String,
// Reason string.
pub reason: String,
// Message string.
pub message: String,
// Timestamp kept as a plain string (`lastTransitionTime`); its format is not
// enforced in this file.
pub last_transition_time: String,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use kube::CustomResourceExt as _;

    /// Builds a spec with only `kafka_version` set and every optional field empty.
    ///
    /// Kept as an explicit literal (no `..Default::default()`, which `KafkaSpec` does
    /// not implement) so that adding a spec field only needs a change in this one place.
    fn minimal_spec(kafka_version: &str) -> KafkaSpec {
        KafkaSpec {
            kafka_version: kafka_version.into(),
            metadata_version: None,
            config: None,
            listeners: vec![],
            inter_broker_listener_name: None,
            metrics_config: None,
            network_policy: None,
            cluster_ca: None,
            clients_ca: None,
            logging: None,
            delegation_token: None,
            authorization: None,
            tiered_storage: None,
            inter_broker_kerberos: None,
            krb5_conf_secret_ref: None,
            tracing: None,
        }
    }

    // ---- CRD metadata and basic spec/status wire shape ----

    #[test]
    fn crd_metadata_is_correct() {
        let crd = Kafka::crd();
        assert!(crd.spec.group == "crabka.io");
        assert!(crd.spec.names.kind == "Kafka");
        assert!(crd.spec.names.plural == "kafkas");
        assert!(crd.spec.versions.len() == 1);
        assert!(crd.spec.versions[0].name == "v1alpha1");
    }

    #[test]
    fn round_trips_through_json() {
        let k = Kafka::new("demo", minimal_spec("0.1.1"));
        let json = serde_json::to_string(&k).unwrap();
        assert!(
            json.contains("\"kafkaVersion\""),
            "expected camelCase wire shape, got: {json}"
        );
        let back: Kafka = serde_json::from_str(&json).unwrap();
        assert!(back.spec == k.spec);
    }

    #[test]
    fn spec_omits_metrics_config_when_none() {
        let k = Kafka::new("demo", minimal_spec("0.1.1"));
        let j = serde_json::to_string(&k.spec).unwrap();
        assert!(!j.contains("metricsConfig"), "got: {j}");
    }

    #[test]
    fn spec_carries_metrics_config_pod_monitor() {
        use crate::crd::{MetricsConfig, PodMonitorSpec};
        let json = r#"{"kafkaVersion":"0.1.1","metricsConfig":{"podMonitor":{"interval":"30s"}}}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        let cfg: MetricsConfig = spec.metrics_config.expect("metricsConfig present");
        let pm: PodMonitorSpec = cfg.pod_monitor.expect("podMonitor present");
        assert!(pm.interval.as_deref() == Some("30s"));
    }

    #[test]
    fn spec_only_carries_kafka_version() {
        let json = r#"{"kafkaVersion":"0.1.1"}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.kafka_version == "0.1.1");
        assert!(spec.config.is_none());
    }

    #[test]
    fn spec_carries_config() {
        let json = r#"{"kafkaVersion":"0.1.1","config":{"log.retention.hours":"24"}}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        let cfg = spec.config.expect("config present");
        assert!(cfg.get("log.retention.hours").map(String::as_str) == Some("24"));
    }

    #[test]
    fn spec_carries_listeners() {
        use crate::crd::ListenerType;
        let json = r#"{
            "kafkaVersion":"0.1.1",
            "listeners":[{"name":"PLAIN","port":9092,"type":"internal","tls":false}],
            "interBrokerListenerName":"PLAIN"
        }"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.listeners.len() == 1);
        assert!(spec.listeners[0].name == "PLAIN");
        assert!(spec.listeners[0].type_ == ListenerType::Internal);
        assert!(spec.inter_broker_listener_name.as_deref() == Some("PLAIN"));
    }

    #[test]
    fn spec_defaults_listeners_to_empty() {
        let json = r#"{"kafkaVersion":"0.1.1"}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.listeners.is_empty());
        assert!(spec.inter_broker_listener_name.is_none());
    }

    #[test]
    fn status_carries_listener_status() {
        use crate::crd::{ListenerAddress, ListenerStatus, ListenerType};
        let status = KafkaStatus {
            conditions: vec![],
            replicas: Some(1),
            ready_replicas: Some(1),
            listeners: vec![ListenerStatus {
                name: "PLAIN".into(),
                type_: ListenerType::Internal,
                bootstrap_servers: "demo-broker-headless.default.svc.cluster.local:9092".into(),
                addresses: vec![ListenerAddress {
                    host: "demo-broker-headless.default.svc.cluster.local".into(),
                    port: 9092,
                }],
            }],
            cluster_ca: None,
            clients_ca: None,
            kafka_version: None,
            metadata_version: None,
        };
        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"bootstrapServers\""), "got: {json}");
        let back: KafkaStatus = serde_json::from_str(&json).unwrap();
        assert!(back == status);
    }

    #[test]
    fn spec_carries_metadata_version() {
        let json = r#"{"kafkaVersion":"3.7.0","metadataVersion":"3.6"}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.metadata_version.as_deref() == Some("3.6"));
    }

    #[test]
    fn spec_omits_metadata_version_when_none() {
        let k = Kafka::new("demo", minimal_spec("3.7.0"));
        let j = serde_json::to_string(&k.spec).unwrap();
        assert!(!j.contains("metadataVersion"), "got: {j}");
    }

    #[test]
    fn status_carries_version_fields() {
        let status = KafkaStatus {
            kafka_version: Some("3.7.0".into()),
            metadata_version: Some("3.7".into()),
            ..Default::default()
        };
        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"metadataVersion\":\"3.7\""), "got: {json}");
        let back: KafkaStatus = serde_json::from_str(&json).unwrap();
        assert!(back == status);
    }

    #[test]
    fn spec_omits_network_policy_when_none() {
        let k = Kafka::new("demo", minimal_spec("0.1.1"));
        let j = serde_json::to_string(&k.spec).unwrap();
        assert!(!j.contains("networkPolicy"), "got: {j}");
    }

    #[test]
    fn spec_carries_network_policy_when_set() {
        let json = r#"{"kafkaVersion":"0.1.1","networkPolicy":{}}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.network_policy.is_some(), "networkPolicy parsed");
    }

    #[test]
    fn spec_omits_logging_when_none() {
        let json = r#"{"kafkaVersion":"0.1.1"}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.logging.is_none());
        let j = serde_json::to_string(&spec).unwrap();
        assert!(!j.contains("logging"), "got: {j}");
    }

    #[test]
    fn spec_carries_inline_logging() {
        use crate::crd::LoggingType;
        let json = r#"{"kafkaVersion":"0.1.1","logging":{"loggers":{"root":"info","crabka_broker":"debug"}}}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        let lg = spec.logging.expect("logging present");
        assert!(lg.r#type == LoggingType::Inline);
        assert!(lg.loggers.get("crabka_broker").map(String::as_str) == Some("debug"));
    }

    #[test]
    fn kafka_spec_parses_without_ca_fields() {
        let v: KafkaSpec = serde_json::from_value(serde_json::json!({
            "kafkaVersion": "3.7.0",
        }))
        .expect("parse minimal spec");
        assert!(v.cluster_ca.is_none());
        assert!(v.clients_ca.is_none());
    }

    #[test]
    fn spec_omits_delegation_token_when_none() {
        let json = r#"{"kafkaVersion":"0.1.1"}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.delegation_token.is_none());
        let j = serde_json::to_string(&spec).unwrap();
        assert!(!j.contains("delegationToken"), "got: {j}");
    }

    #[test]
    fn spec_carries_delegation_token_with_default_key() {
        let json = r#"{
            "kafkaVersion":"0.1.1",
            "delegationToken":{"secretKeyRef":{"name":"dt-master"}}
        }"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        let dt = spec.delegation_token.expect("delegationToken present");
        assert!(dt.secret_key_ref.name == "dt-master");
        assert!(dt.secret_key_ref.key.is_none());
    }

    #[test]
    fn spec_carries_delegation_token_with_explicit_key() {
        let json = r#"{
            "kafkaVersion":"0.1.1",
            "delegationToken":{"secretKeyRef":{"name":"dt-master","key":"hmac"}}
        }"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        let dt = spec.delegation_token.expect("delegationToken present");
        assert!(dt.secret_key_ref.name == "dt-master");
        assert!(dt.secret_key_ref.key.as_deref() == Some("hmac"));
    }

    #[test]
    fn kafka_spec_parses_with_ca_fields() {
        let v: KafkaSpec = serde_json::from_value(serde_json::json!({
            "kafkaVersion": "3.7.0",
            "clusterCa": { "validityDays": 30 },
            "clientsCa": { "generateCertificateAuthority": false },
        }))
        .expect("parse with CAs");
        assert!(v.cluster_ca.as_ref().unwrap().validity_days == 30);
        assert!(
            !v.clients_ca
                .as_ref()
                .unwrap()
                .generate_certificate_authority
        );
    }

    // ---- Authorization ----
    //
    // The YAML documents below start at column 0 inside the raw strings and nest the
    // authorizer fields under `authorization:`. Without that nesting the keys would
    // parse as siblings of `authorization`, leaving it null.

    #[test]
    fn simple_authorization_round_trip() {
        let yaml = r"
kafkaVersion: 0.1.1
authorization:
  type: simple
  superUsers:
    - User:admin
    - ANONYMOUS
";
        let spec: KafkaSpec = serde_yaml::from_str(yaml).expect("yaml must parse");
        let Some(Authorization::Simple(simple)) = spec.authorization.clone() else {
            panic!("expected Simple variant, got {:?}", spec.authorization);
        };
        assert!(simple.super_users == vec!["User:admin".to_string(), "ANONYMOUS".to_string()]);
        let json = serde_json::to_string(&spec).unwrap();
        assert!(json.contains("\"type\":\"simple\""), "got: {json}");
        assert!(
            json.contains("\"superUsers\":[\"User:admin\",\"ANONYMOUS\"]"),
            "got: {json}"
        );
        let back: KafkaSpec = serde_json::from_str(&json).unwrap();
        assert!(back == spec);
    }

    #[test]
    fn opa_authorization_round_trip_full_fields() {
        let yaml = r"
kafkaVersion: 0.1.1
authorization:
  type: opa
  url: http://opa.opa.svc:8181/v1/data/kafka/authz/allow
  allowOnError: true
  initialCacheCapacity: 1000
  maximumCacheSize: 50000
  expireAfterMs: 60000
  superUsers:
    - User:admin
    - ANONYMOUS
";
        let spec: KafkaSpec = serde_yaml::from_str(yaml).expect("yaml must parse");
        let Some(Authorization::Opa(opa)) = spec.authorization.clone() else {
            panic!("expected Opa variant, got {:?}", spec.authorization);
        };
        assert!(opa.url == "http://opa.opa.svc:8181/v1/data/kafka/authz/allow");
        assert!(opa.allow_on_error == Some(true));
        assert!(opa.initial_cache_capacity == Some(1000));
        assert!(opa.maximum_cache_size == Some(50_000));
        assert!(opa.expire_after_ms == Some(60_000));
        assert!(opa.super_users == vec!["User:admin".to_string(), "ANONYMOUS".to_string()]);
        let json = serde_json::to_string(&spec).unwrap();
        assert!(json.contains("\"type\":\"opa\""), "got: {json}");
        assert!(json.contains("\"allowOnError\":true"), "got: {json}");
        assert!(
            json.contains("\"initialCacheCapacity\":1000"),
            "got: {json}"
        );
        assert!(json.contains("\"maximumCacheSize\":50000"), "got: {json}");
        assert!(json.contains("\"expireAfterMs\":60000"), "got: {json}");
        let back: KafkaSpec = serde_json::from_str(&json).unwrap();
        assert!(back == spec);
    }

    #[test]
    fn opa_authorization_minimal_omits_optional_fields() {
        let yaml = r"
kafkaVersion: 0.1.1
authorization:
  type: opa
  url: http://opa.opa.svc:8181/v1/data/kafka/authz/allow
";
        let spec: KafkaSpec = serde_yaml::from_str(yaml).expect("yaml must parse");
        let Some(Authorization::Opa(opa)) = spec.authorization.clone() else {
            panic!("expected Opa variant, got {:?}", spec.authorization);
        };
        assert!(opa.url == "http://opa.opa.svc:8181/v1/data/kafka/authz/allow");
        assert!(opa.allow_on_error == None);
        assert!(opa.initial_cache_capacity == None);
        assert!(opa.maximum_cache_size == None);
        assert!(opa.expire_after_ms == None);
        assert!(opa.super_users.is_empty());
        let json = serde_json::to_string(&spec).unwrap();
        for absent in [
            "allowOnError",
            "initialCacheCapacity",
            "maximumCacheSize",
            "expireAfterMs",
            "superUsers",
        ] {
            assert!(
                !json.contains(absent),
                "{absent} must be omitted when None/empty; got: {json}"
            );
        }
        let back: KafkaSpec = serde_json::from_str(&json).unwrap();
        assert!(back == spec);
    }

    // ---- Tiered storage: wire shape ----

    #[test]
    fn tiered_storage_round_trips_through_json() {
        let json = r#"{"kafkaVersion":"0.1.1","tieredStorage":{"type":"Local"}}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        let ts = spec.tiered_storage.as_ref().expect("tieredStorage parsed");
        assert!(ts.kind == TieredStorageType::Local);
        let serialized = serde_json::to_string(&spec).unwrap();
        assert!(
            serialized.contains("\"tieredStorage\":{\"type\":\"Local\"}"),
            "round-trip JSON: {serialized}"
        );
    }

    #[test]
    fn tiered_storage_omitted_when_none() {
        let json = r#"{"kafkaVersion":"0.1.1"}"#;
        let spec: KafkaSpec = serde_json::from_str(json).unwrap();
        assert!(spec.tiered_storage.is_none());
        let j = serde_json::to_string(&spec).unwrap();
        assert!(!j.contains("tieredStorage"), "got: {j}");
    }

    #[test]
    fn tiered_storage_rejects_unknown_type() {
        let json = r#"{"kafkaVersion":"0.1.1","tieredStorage":{"type":"Bogus"}}"#;
        let res: Result<KafkaSpec, _> = serde_json::from_str(json);
        assert!(res.is_err(), "unknown TieredStorageType must fail");
    }

    #[test]
    fn tiered_storage_s3_round_trips_through_json() {
        let ts = TieredStorage {
            kind: TieredStorageType::S3,
            s3: Some(S3StorageSpec {
                bucket: "b".into(),
                region: "r".into(),
                prefix: Some("p".into()),
                endpoint: Some("http://m:9000".into()),
                credentials: Some(S3Credentials {
                    access_key_id: SecretKeyRef {
                        name: "creds".into(),
                        key: Some("ak".into()),
                    },
                    secret_access_key: SecretKeyRef {
                        name: "creds".into(),
                        key: Some("sk".into()),
                    },
                }),
                allow_http: true,
                multipart_threshold: Some(1024),
                multipart_chunk_size: Some(512),
            }),
            metadata_manager: None,
            persistence: None,
        };
        let j = serde_json::to_string(&ts).unwrap();
        assert!(j.contains("\"type\":\"S3\""), "got: {j}");
        assert!(j.contains("\"s3\""), "got: {j}");
        assert!(j.contains("\"accessKeyId\""), "got: {j}");
        assert!(j.contains("\"secretAccessKey\""), "got: {j}");
        assert!(j.contains("\"allowHttp\":true"), "got: {j}");
        assert!(j.contains("\"multipartThreshold\":1024"), "got: {j}");
        let back: TieredStorage = serde_json::from_str(&j).unwrap();
        assert!(back == ts);
    }

    // ---- Tiered storage: validation ----

    #[test]
    fn tiered_storage_validate_local_ok_only_without_s3() {
        let ok = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: None,
            persistence: None,
        };
        assert!(ok.validate().is_ok());
        let bad = TieredStorage {
            kind: TieredStorageType::Local,
            s3: Some(S3StorageSpec::default()),
            metadata_manager: None,
            persistence: None,
        };
        assert!(
            bad.validate().is_err(),
            "type=Local with s3 must be rejected",
        );
    }

    #[test]
    fn tiered_storage_validate_s3_requires_s3_and_non_empty_bucket_region() {
        let missing_s3 = TieredStorage {
            kind: TieredStorageType::S3,
            s3: None,
            metadata_manager: None,
            persistence: None,
        };
        assert!(missing_s3.validate().is_err());
        let missing_bucket = TieredStorage {
            kind: TieredStorageType::S3,
            s3: Some(S3StorageSpec {
                bucket: String::new(),
                region: "r".into(),
                ..Default::default()
            }),
            metadata_manager: None,
            persistence: None,
        };
        assert!(missing_bucket.validate().is_err());
        let missing_region = TieredStorage {
            kind: TieredStorageType::S3,
            s3: Some(S3StorageSpec {
                bucket: "b".into(),
                region: " ".into(),
                ..Default::default()
            }),
            metadata_manager: None,
            persistence: None,
        };
        assert!(missing_region.validate().is_err());
        let ok = TieredStorage {
            kind: TieredStorageType::S3,
            s3: Some(S3StorageSpec {
                bucket: "b".into(),
                region: "r".into(),
                ..Default::default()
            }),
            metadata_manager: None,
            persistence: None,
        };
        assert!(ok.validate().is_ok());
    }

    #[test]
    fn metadata_manager_inmemory_with_topic_is_rejected() {
        let ts = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: Some(MetadataManagerSpec {
                kind: MetadataManagerType::InMemory,
                topic: Some(TopicMetadataManagerSpec {
                    bootstrap: "127.0.0.1:9092".into(),
                    num_partitions: None,
                    replication: None,
                }),
            }),
            persistence: None,
        };
        let err = ts.validate().unwrap_err();
        assert!(err.contains("must not set `topic`"), "got: {err}");
    }

    #[test]
    fn metadata_manager_topic_without_topic_is_valid() {
        let ts = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: Some(MetadataManagerSpec {
                kind: MetadataManagerType::Topic,
                topic: None,
            }),
            persistence: None,
        };
        assert!(ts.validate().is_ok());
    }

    #[test]
    fn metadata_manager_topic_requires_non_empty_bootstrap() {
        let ts = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: Some(MetadataManagerSpec {
                kind: MetadataManagerType::Topic,
                topic: Some(TopicMetadataManagerSpec {
                    bootstrap: " ".into(),
                    num_partitions: None,
                    replication: None,
                }),
            }),
            persistence: None,
        };
        let err = ts.validate().unwrap_err();
        assert!(err.contains("bootstrap is required"), "got: {err}");
    }

    #[test]
    fn metadata_manager_topic_rejects_non_positive_partition_count() {
        let ts = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: Some(MetadataManagerSpec {
                kind: MetadataManagerType::Topic,
                topic: Some(TopicMetadataManagerSpec {
                    bootstrap: "127.0.0.1:9094".into(),
                    num_partitions: Some(0),
                    replication: None,
                }),
            }),
            persistence: None,
        };
        let err = ts.validate().unwrap_err();
        assert!(err.contains("numPartitions"), "got: {err}");
    }

    #[test]
    fn metadata_manager_topic_with_defaults_validates() {
        let ts = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: Some(MetadataManagerSpec {
                kind: MetadataManagerType::Topic,
                topic: Some(TopicMetadataManagerSpec {
                    bootstrap: "127.0.0.1:9094".into(),
                    num_partitions: None,
                    replication: None,
                }),
            }),
            persistence: None,
        };
        assert!(ts.validate().is_ok());
    }

    #[test]
    fn persistence_requires_local_kind() {
        let ts = TieredStorage {
            kind: TieredStorageType::S3,
            s3: Some(S3StorageSpec {
                bucket: "b".into(),
                region: "r".into(),
                ..Default::default()
            }),
            metadata_manager: None,
            persistence: Some(TieredStoragePersistence {
                size: "50Gi".into(),
                class: None,
                delete_claim: false,
            }),
        };
        let err = ts.validate().unwrap_err();
        assert!(err.contains("persistence is only valid with type=Local"));
    }

    #[test]
    fn persistence_size_must_be_non_empty() {
        let ts = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: None,
            persistence: Some(TieredStoragePersistence {
                size: " ".into(),
                class: None,
                delete_claim: false,
            }),
        };
        let err = ts.validate().unwrap_err();
        assert!(err.contains("persistence.size is required"));
    }

    #[test]
    fn persistence_with_local_validates() {
        let ts = TieredStorage {
            kind: TieredStorageType::Local,
            s3: None,
            metadata_manager: None,
            persistence: Some(TieredStoragePersistence {
                size: "100Gi".into(),
                class: Some("fast-ssd".into()),
                delete_claim: false,
            }),
        };
        assert!(ts.validate().is_ok());
    }

    #[test]
    fn persistence_delete_claim_round_trips() {
        let p = TieredStoragePersistence {
            size: "10Gi".into(),
            class: None,
            delete_claim: true,
        };
        let yaml = serde_yaml::to_string(&p).unwrap();
        assert!(yaml.contains("deleteClaim: true"));
        let back: TieredStoragePersistence = serde_yaml::from_str(&yaml).unwrap();
        assert!(back == p);
    }

    #[test]
    fn persistence_delete_claim_defaults_false() {
        let yaml = "size: 5Gi\n";
        let p: TieredStoragePersistence = serde_yaml::from_str(yaml).unwrap();
        assert!(!p.delete_claim);
    }

    // ---- Tracing: validation ----

    #[test]
    fn tracing_otlp_without_otlp_block_is_rejected() {
        let t = Tracing {
            kind: TracingType::Otlp,
            otlp: None,
        };
        let err = t.validate().unwrap_err();
        assert!(err.contains("type=Otlp requires `otlp`"), "got: {err}");
    }

    #[test]
    fn tracing_otlp_requires_non_empty_endpoint() {
        let t = Tracing {
            kind: TracingType::Otlp,
            otlp: Some(OtlpTracing {
                endpoint: " ".into(),
                protocol: None,
                sample_ratio: None,
                service_name: None,
                timeout_secs: None,
            }),
        };
        let err = t.validate().unwrap_err();
        assert!(err.contains("otlp.endpoint is required"), "got: {err}");
    }

    #[test]
    fn tracing_otlp_rejects_out_of_range_sample_ratio() {
        let t = Tracing {
            kind: TracingType::Otlp,
            otlp: Some(OtlpTracing {
                endpoint: "http://otel:4317".into(),
                protocol: None,
                sample_ratio: Some(1.5),
                service_name: None,
                timeout_secs: None,
            }),
        };
        let err = t.validate().unwrap_err();
        assert!(err.contains("otlp.sampleRatio"), "got: {err}");
    }

    #[test]
    fn tracing_otlp_rejects_zero_timeout() {
        let t = Tracing {
            kind: TracingType::Otlp,
            otlp: Some(OtlpTracing {
                endpoint: "http://otel:4317".into(),
                protocol: None,
                sample_ratio: None,
                service_name: None,
                timeout_secs: Some(0),
            }),
        };
        let err = t.validate().unwrap_err();
        assert!(err.contains("otlp.timeoutSecs"), "got: {err}");
    }

    #[test]
    fn tracing_otlp_with_full_spec_validates() {
        let t = Tracing {
            kind: TracingType::Otlp,
            otlp: Some(OtlpTracing {
                endpoint: "http://otel-collector.observability:4317".into(),
                protocol: Some(OtlpProtocol::Grpc),
                sample_ratio: Some(0.1),
                service_name: Some("prod-cluster".into()),
                timeout_secs: Some(5),
            }),
        };
        assert!(t.validate().is_ok());
    }

    #[test]
    fn otlp_protocol_env_value_matches_broker_parse() {
        assert!(OtlpProtocol::Grpc.as_env_value() == "grpc");
        assert!(OtlpProtocol::HttpProtobuf.as_env_value() == "http/protobuf");
    }
}