Skip to main content

crabka_operator/crd/
topic.rs

1//! `KafkaTopic` CRD. Strimzi-shaped; unidirectional
2//! reconciliation (CRD wins).
3
4use std::collections::BTreeMap;
5
6use kube::CustomResource;
7use schemars::JsonSchema;
8use serde::{Deserialize, Serialize};
9
// Desired state of a Kafka topic, declared as a namespaced Kubernetes custom
// resource. The `CustomResource` derive generates the `KafkaTopic` wrapper
// type (the tests in this file use `KafkaTopic::new` and `KafkaTopic::crd`).
//
// NOTE(review): `JsonSchema` is derived here, and schemars is documented to
// copy `///` doc comments into schema `description` fields — so editing the
// `///` text below presumably changes the generated CRD; confirm before
// rewording.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
// API coordinates of the resource: `kafkatopics.crabka.io/v1alpha1`, short
// name `kt`, with a status subresource typed as `KafkaTopicStatus`.
// `derive = "PartialEq"` asks kube to derive `PartialEq` on the generated
// `KafkaTopic` type as well (per kube's derive docs).
#[kube(
    group = "crabka.io",
    version = "v1alpha1",
    kind = "KafkaTopic",
    plural = "kafkatopics",
    singular = "kafkatopic",
    shortname = "kt",
    namespaced,
    status = "KafkaTopicStatus",
    derive = "PartialEq"
)]
// Wire names are camelCase: `topic_name` -> `topicName`,
// `preserve_topic` -> `preserveTopic`.
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicSpec {
    // Optional on the wire: absent input deserializes to `None`, and `None`
    // is left out of the serialized output.
    /// Optional override for the Kafka topic name. Defaults to
    /// `metadata.name`. Validated at reconcile time against Kafka's
    /// rules (length ≤ 249, chars `[A-Za-z0-9._-]`, not `.` or `..`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub topic_name: Option<String>,

    // Required: there is no `serde(default)`, so a spec without `partitions`
    // fails to deserialize. The `range` attribute only annotates the
    // generated schema; nothing in this file enforces it at runtime.
    /// Number of partitions. Increases honored via `CreatePartitions`;
    /// decreases rejected with `ImmutableFieldChanged`.
    #[schemars(range(min = 1, max = 1_000_000))]
    pub partitions: i32,

    // Required, same as `partitions`; the bound is schema-level only.
    /// Replication factor. Changes rejected with
    /// `ImmutableFieldChanged` until partition reassignment lands.
    #[schemars(range(min = 1, max = 1_000))]
    pub replicas: i32,

    // Optional on the wire, like `topic_name`. A `BTreeMap` keeps keys in
    // sorted order, so serialized output is deterministic.
    /// Opaque topic-level config (`retention.ms`, `cleanup.policy`,
    /// etc.). Reconciled via `IncrementalAlterConfigs` SET/DELETE diff
    /// against the cluster's current dynamic-topic overrides.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config: Option<BTreeMap<String, String>>,

    // Defaults to `false` when absent, but has no `skip_serializing_if`, so
    // it is always written out (`"preserveTopic":false` included).
    /// When `true`, CRD delete still removes the finalizer but skips
    /// the `DeleteTopics` call so the Kafka topic survives. Default
    /// `false`.
    #[serde(default)]
    pub preserve_topic: bool,
}
52
// Observed state reported back on a `KafkaTopic`; referenced by name from the
// `status = "KafkaTopicStatus"` argument of the `#[kube(...)]` attribute on
// `KafkaTopicSpec`. Every field carries `serde(default)`, so an empty object
// `{}` deserializes to `KafkaTopicStatus::default()`.
//
// NOTE(review): `JsonSchema` is derived here too, so the `///` text below is
// presumably emitted as schema descriptions — confirm before rewording.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
// Wire names are camelCase: `observedGeneration`, `topicName`, `topicId`.
#[serde(rename_all = "camelCase")]
pub struct KafkaTopicStatus {
    // No `skip_serializing_if`: an empty list is still serialized as `[]`.
    /// Standard Kubernetes-style condition list. Surfaces `Ready`.
    #[serde(default)]
    pub conditions: Vec<crate::crd::KafkaCondition>,

    // The three `Option` fields below are omitted from the serialized output
    // while they are `None`.
    /// `metadata.generation` of the last successfully-reconciled spec
    /// (i.e. last time we wrote `Ready=True reason=Ready`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub observed_generation: Option<i64>,

    /// Effective topic name (defaulted if `spec.topicName` unset).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub topic_name: Option<String>,

    /// Cluster-assigned topic UUID, populated once the topic exists.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub topic_id: Option<String>,
}
73
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use kube::CustomResourceExt as _;

    /// The generated CRD exposes the group, names, short name and single
    /// version declared in the `#[kube(...)]` attribute.
    #[test]
    fn crd_metadata_is_correct() {
        let definition = KafkaTopic::crd();
        let names = &definition.spec.names;

        assert!(definition.spec.group == "crabka.io");
        assert!(names.kind == "KafkaTopic");
        assert!(names.plural == "kafkatopics");

        let has_short_name = names
            .short_names
            .as_deref()
            .is_some_and(|list| list.iter().any(|name| name == "kt"));
        assert!(has_short_name, "expected shortname `kt`");

        let versions = &definition.spec.versions;
        assert!(versions.len() == 1);
        assert!(versions[0].name == "v1alpha1");
    }

    /// A fully-populated resource serializes with camelCase keys and
    /// deserializes back to an equal spec.
    #[test]
    fn spec_round_trips_through_json() {
        let mut overrides = BTreeMap::new();
        overrides.insert("retention.ms".to_string(), "60000".to_string());

        let spec = KafkaTopicSpec {
            topic_name: Some("Demo.Topic".into()),
            partitions: 3,
            replicas: 2,
            config: Some(overrides),
            preserve_topic: true,
        };
        let kt = KafkaTopic::new("demo-topic", spec);

        let json = serde_json::to_string(&kt).unwrap();
        let expected_fragments = [
            "\"topicName\":\"Demo.Topic\"",
            "\"partitions\":3",
            "\"preserveTopic\":true",
        ];
        for fragment in expected_fragments {
            assert!(json.contains(fragment), "got: {json}");
        }

        let back: KafkaTopic = serde_json::from_str(&json).unwrap();
        assert!(back.spec == kt.spec);
    }

    /// Unset optional fields are left out of the serialized spec, while the
    /// plain `bool` is always written.
    #[test]
    fn spec_omits_optional_fields_when_default() {
        let minimal = KafkaTopicSpec {
            topic_name: None,
            partitions: 1,
            replicas: 1,
            config: None,
            preserve_topic: false,
        };
        let kt = KafkaTopic::new("demo", minimal);

        let j = serde_json::to_string(&kt.spec).unwrap();
        for absent_key in ["topicName", "config"] {
            assert!(!j.contains(absent_key), "got: {j}");
        }
        // `preserveTopic` is a plain bool — serde emits it.
        assert!(j.contains("\"preserveTopic\":false"), "got: {j}");
    }

    /// A status without a topic id serializes without the `topicId` key but
    /// keeps the fields that are set.
    #[test]
    fn status_topic_id_omitted_when_none() {
        // `conditions` and `topic_id` take their defaults: empty list / `None`.
        let status = KafkaTopicStatus {
            observed_generation: Some(1),
            topic_name: Some("foo".into()),
            ..KafkaTopicStatus::default()
        };

        let j = serde_json::to_string(&status).unwrap();
        assert!(!j.contains("topicId"), "got: {j}");
        assert!(j.contains("\"observedGeneration\":1"), "got: {j}");
    }

    /// Only `partitions` and `replicas` are needed; everything else falls
    /// back to its default.
    #[test]
    fn minimum_required_spec_parses() {
        let parsed: KafkaTopicSpec =
            serde_json::from_str(r#"{"partitions":1,"replicas":1}"#).unwrap();
        let KafkaTopicSpec {
            topic_name,
            partitions,
            replicas,
            config,
            preserve_topic,
        } = parsed;

        assert!(partitions == 1);
        assert!(replicas == 1);
        assert!(topic_name.is_none());
        assert!(config.is_none());
        assert!(!preserve_topic);
    }
}