use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Desired state of a `KafkaTopic` custom resource.
// NOTE(review): `///` docs on this type and its fields are picked up by the `JsonSchema`
// derive and end up as `description` entries in the generated CRD schema, so keep them
// user-facing; reviewer notes belong in plain `//` comments like this one.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[kube(
    group = "crabka.io",
    version = "v1alpha1",
    kind = "KafkaTopic",
    plural = "kafkatopics",
    singular = "kafkatopic",
    shortname = "kt",
    namespaced,
    status = "KafkaTopicStatus",
    derive = "PartialEq"
)]
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicSpec {
    /// Optional explicit topic name.
    // NOTE(review): presumably overrides the Kafka topic name, with the resource's
    // `metadata.name` used when unset -- confirm against the reconciler.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub topic_name: Option<String>,
    /// Number of partitions for the topic. Required; must be between 1 and 1,000,000.
    #[schemars(range(min = 1, max = 1_000_000))]
    pub partitions: i32,
    /// Number of replicas for the topic. Required; must be between 1 and 1,000.
    #[schemars(range(min = 1, max = 1_000))]
    pub replicas: i32,
    /// Optional configuration entries as string key/value pairs, for example
    /// `retention.ms: "60000"`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config: Option<BTreeMap<String, String>>,
    /// Whether the topic should be preserved. Defaults to `false` when omitted.
    // NOTE(review): presumably means "keep the Kafka topic when this resource is
    // deleted" -- confirm against the finalizer logic before documenting it for users.
    #[serde(default)]
    pub preserve_topic: bool,
}
/// Observed state of a `KafkaTopic` custom resource.
// NOTE(review): `///` docs here are copied into the generated CRD schema as `description`
// entries by the `JsonSchema` derive; unverified assumptions are kept in `//` comments.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicStatus {
    /// Conditions reported for this topic. Treated as empty when absent.
    #[serde(default)]
    pub conditions: Vec<crate::crd::KafkaCondition>,
    /// Resource generation that this status was observed for, if recorded.
    // NOTE(review): presumably mirrors `metadata.generation` of the last reconciled
    // spec -- confirm against the controller.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub observed_generation: Option<i64>,
    /// Topic name recorded for this resource, if known.
    // NOTE(review): presumably the resolved Kafka topic name (spec `topicName` or the
    // resource name) -- TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub topic_name: Option<String>,
    /// Topic identifier recorded for this resource, if known.
    // NOTE(review): looks like the broker-assigned topic ID -- TODO confirm format.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub topic_id: Option<String>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use kube::CustomResourceExt as _;

    /// The generated CRD exposes the expected group, names, short name and single version.
    #[test]
    fn crd_metadata_is_correct() {
        let definition = KafkaTopic::crd();
        let names = &definition.spec.names;

        assert!(definition.spec.group == "crabka.io");
        assert!(names.kind == "KafkaTopic");
        assert!(names.plural == "kafkatopics");

        let has_short_name = names
            .short_names
            .as_deref()
            .is_some_and(|all| all.iter().any(|name| name == "kt"));
        assert!(has_short_name, "expected shortname `kt`");

        let versions = &definition.spec.versions;
        assert!(versions.len() == 1);
        assert!(versions[0].name == "v1alpha1");
    }

    /// A fully populated resource serializes with camelCase keys and deserializes
    /// back to an equal spec.
    #[test]
    fn spec_round_trips_through_json() {
        let mut config = BTreeMap::new();
        config.insert(String::from("retention.ms"), String::from("60000"));
        let spec = KafkaTopicSpec {
            topic_name: Some(String::from("Demo.Topic")),
            partitions: 3,
            replicas: 2,
            config: Some(config),
            preserve_topic: true,
        };
        let kt = KafkaTopic::new("demo-topic", spec);

        let json = serde_json::to_string(&kt).unwrap();
        for fragment in [
            "\"topicName\":\"Demo.Topic\"",
            "\"partitions\":3",
            "\"preserveTopic\":true",
        ] {
            assert!(json.contains(fragment), "got: {json}");
        }

        let back = serde_json::from_str::<KafkaTopic>(&json).unwrap();
        assert!(back.spec == kt.spec);
    }

    /// Unset optional fields are left out of the JSON, while `preserveTopic` is
    /// always written.
    #[test]
    fn spec_omits_optional_fields_when_default() {
        let spec = KafkaTopicSpec {
            topic_name: None,
            partitions: 1,
            replicas: 1,
            config: None,
            preserve_topic: false,
        };
        let kt = KafkaTopic::new("demo", spec);

        let j = serde_json::to_string(&kt.spec).unwrap();
        for absent in ["topicName", "config"] {
            assert!(!j.contains(absent), "got: {j}");
        }
        assert!(j.contains("\"preserveTopic\":false"), "got: {j}");
    }

    /// A status without a topic id does not emit the `topicId` key.
    #[test]
    fn status_topic_id_omitted_when_none() {
        let status = KafkaTopicStatus {
            observed_generation: Some(1),
            topic_name: Some(String::from("foo")),
            ..KafkaTopicStatus::default()
        };

        let j = serde_json::to_string(&status).unwrap();
        assert!(!j.contains("topicId"), "got: {j}");
        assert!(j.contains("\"observedGeneration\":1"), "got: {j}");
    }

    /// Only `partitions` and `replicas` are required; everything else falls back
    /// to its default.
    #[test]
    fn minimum_required_spec_parses() {
        let KafkaTopicSpec {
            topic_name,
            partitions,
            replicas,
            config,
            preserve_topic,
        } = serde_json::from_str(r#"{"partitions":1,"replicas":1}"#).unwrap();

        assert!(partitions == 1);
        assert!(replicas == 1);
        assert!(topic_name.is_none());
        assert!(config.is_none());
        assert!(!preserve_topic);
    }
}