crabka_operator/crd/
topic.rs1use std::collections::BTreeMap;
5
6use kube::CustomResource;
7use schemars::JsonSchema;
8use serde::{Deserialize, Serialize};
9
10#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
11#[kube(
12 group = "crabka.io",
13 version = "v1alpha1",
14 kind = "KafkaTopic",
15 plural = "kafkatopics",
16 singular = "kafkatopic",
17 shortname = "kt",
18 namespaced,
19 status = "KafkaTopicStatus",
20 derive = "PartialEq"
21)]
22#[serde(rename_all = "camelCase")]
23pub struct KafkaTopicSpec {
24 #[serde(default, skip_serializing_if = "Option::is_none")]
28 pub topic_name: Option<String>,
29
30 #[schemars(range(min = 1, max = 1_000_000))]
33 pub partitions: i32,
34
35 #[schemars(range(min = 1, max = 1_000))]
38 pub replicas: i32,
39
40 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub config: Option<BTreeMap<String, String>>,
45
46 #[serde(default)]
50 pub preserve_topic: bool,
51}
52
53#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
54#[serde(rename_all = "camelCase")]
55pub struct KafkaTopicStatus {
56 #[serde(default)]
58 pub conditions: Vec<crate::crd::KafkaCondition>,
59
60 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub observed_generation: Option<i64>,
64
65 #[serde(default, skip_serializing_if = "Option::is_none")]
67 pub topic_name: Option<String>,
68
69 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub topic_id: Option<String>,
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use kube::CustomResourceExt as _;

    /// The generated CRD reports the group, names, short name, and single
    /// version declared in the `#[kube(...)]` attribute on `KafkaTopicSpec`.
    #[test]
    fn crd_metadata_is_correct() {
        let definition = KafkaTopic::crd();
        let names = &definition.spec.names;
        let versions = &definition.spec.versions;

        assert!(definition.spec.group == "crabka.io");
        assert!(names.kind == "KafkaTopic");
        assert!(names.plural == "kafkatopics");
        assert!(
            // `Option::iter` + `flatten` walks the short names when present
            // and yields nothing when the list is absent.
            names.short_names.iter().flatten().any(|short| short == "kt"),
            "expected shortname `kt`",
        );
        assert!(versions.len() == 1);
        assert!(versions[0].name == "v1alpha1");
    }

    /// A fully populated resource serializes with camelCase keys and
    /// deserializes back to an equal spec.
    #[test]
    fn spec_round_trips_through_json() {
        let mut config = BTreeMap::new();
        config.insert(String::from("retention.ms"), String::from("60000"));
        let spec = KafkaTopicSpec {
            topic_name: Some(String::from("Demo.Topic")),
            partitions: 3,
            replicas: 2,
            config: Some(config),
            preserve_topic: true,
        };
        let original = KafkaTopic::new("demo-topic", spec);

        // `serde_json::to_string` emits compact JSON, so the needles contain
        // no whitespace around `:`.
        let json = serde_json::to_string(&original).unwrap();
        for needle in [
            "\"topicName\":\"Demo.Topic\"",
            "\"partitions\":3",
            "\"preserveTopic\":true",
        ] {
            assert!(json.contains(needle), "got: {json}");
        }

        let decoded: KafkaTopic = serde_json::from_str(&json).unwrap();
        assert!(decoded.spec == original.spec);
    }

    /// Unset optional fields are dropped from the serialized spec, while the
    /// `preserveTopic` flag is written even when it is `false`.
    #[test]
    fn spec_omits_optional_fields_when_default() {
        let spec = KafkaTopicSpec {
            topic_name: None,
            partitions: 1,
            replicas: 1,
            config: None,
            preserve_topic: false,
        };
        let resource = KafkaTopic::new("demo", spec);

        let j = serde_json::to_string(&resource.spec).unwrap();
        for absent in ["topicName", "config"] {
            assert!(!j.contains(absent), "got: {j}");
        }
        assert!(j.contains("\"preserveTopic\":false"), "got: {j}");
    }

    /// A status without a topic id serializes without the `topicId` key but
    /// still carries the observed generation.
    #[test]
    fn status_topic_id_omitted_when_none() {
        let observed = KafkaTopicStatus {
            conditions: Vec::new(),
            observed_generation: Some(1),
            topic_name: Some(String::from("foo")),
            topic_id: None,
        };

        let j = serde_json::to_string(&observed).unwrap();
        assert!(!j.contains("topicId"), "got: {j}");
        assert!(j.contains("\"observedGeneration\":1"), "got: {j}");
    }

    /// Only `partitions` and `replicas` are required; everything else falls
    /// back to its serde default.
    #[test]
    fn minimum_required_spec_parses() {
        let json = r#"{"partitions":1,"replicas":1}"#;
        // The struct pattern fixes the target type for `from_str` and binds
        // every field in one step.
        let KafkaTopicSpec {
            topic_name,
            partitions,
            replicas,
            config,
            preserve_topic,
        } = serde_json::from_str(json).unwrap();

        assert!(partitions == 1);
        assert!(replicas == 1);
        assert!(topic_name.is_none());
        assert!(config.is_none());
        assert!(!preserve_topic);
    }
}