1use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use crabka_client_admin::{CreatePartitionsOp, CreateTopicSpec, IncrementalAlterOp};
12use futures::StreamExt as _;
13use kube::api::{Api, Patch, PatchParams};
14use kube::runtime::controller::{Action, Controller};
15use kube::runtime::reflector::ObjectRef;
16use kube::runtime::watcher;
17use kube::{Resource, ResourceExt as _};
18use serde_json::json;
19
20use crate::context::Context;
21use crate::controller::common::{FIELD_MANAGER, ReconcileError, condition};
22use crate::crd::{Kafka, KafkaTopic};
23
24const FINALIZER: &str = "crabka.io/topic-finalizer";
25
26pub async fn run(ctx: Context) -> anyhow::Result<()> {
28 let topic_api: Api<KafkaTopic> = Api::all(ctx.client.clone());
29 let kafka_api: Api<Kafka> = Api::all(ctx.client.clone());
30 Controller::new(topic_api, watcher::Config::default())
31 .watches(kafka_api, watcher::Config::default(), |_kafka| {
38 Vec::<ObjectRef<KafkaTopic>>::new().into_iter()
39 })
40 .run(reconcile, error_policy, Arc::new(ctx))
41 .for_each(|res| async move {
42 match res {
43 Ok((obj, _)) => tracing::debug!(?obj, "topic reconciled"),
44 Err(e) => tracing::warn!(error = %e, "topic reconcile error"),
45 }
46 })
47 .await;
48 Ok(())
49}
50
51pub fn error_policy(_obj: Arc<KafkaTopic>, err: &ReconcileError, _ctx: Arc<Context>) -> Action {
52 tracing::warn!(error = %err, "topic reconcile error, requeueing");
53 Action::requeue(Duration::from_secs(15))
54}
55
56#[allow(clippy::too_many_lines)] pub async fn reconcile(obj: Arc<KafkaTopic>, ctx: Arc<Context>) -> Result<Action, ReconcileError> {
58 let ns = obj.namespace().unwrap_or_else(|| "default".into());
59 let name = obj.name_any();
60 let topic_api: Api<KafkaTopic> = Api::namespaced(ctx.client.clone(), &ns);
61
62 let cluster = obj
64 .meta()
65 .labels
66 .as_ref()
67 .and_then(|l| l.get("crabka.io/cluster").cloned());
68 let Some(cluster) = cluster else {
69 patch_status(
70 &topic_api,
71 &name,
72 &obj,
73 "False",
74 "MissingClusterLabel",
75 "metadata.labels[\"crabka.io/cluster\"] is required",
76 None,
77 false,
78 )
79 .await?;
80 return Ok(Action::requeue(Duration::from_mins(1)));
81 };
82
83 let topic_name = obj.spec.topic_name.clone().unwrap_or_else(|| name.clone());
85 if let Err(msg) = validate_kafka_topic_name(&topic_name) {
86 patch_status(
87 &topic_api,
88 &name,
89 &obj,
90 "False",
91 "InvalidTopicName",
92 &msg,
93 None,
94 false,
95 )
96 .await?;
97 return Ok(Action::requeue(Duration::from_mins(5)));
98 }
99
100 let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
102 let kafka = kafka_api.get_opt(&cluster).await?;
103 let bootstrap = kafka.as_ref().and_then(internal_listener_bootstrap);
104 let Some(bootstrap) = bootstrap else {
105 patch_status(
106 &topic_api,
107 &name,
108 &obj,
109 "False",
110 "ClusterNotReady",
111 &format!("Kafka/{cluster} not Ready or no internal listener"),
112 None,
113 false,
114 )
115 .await?;
116 return Ok(Action::requeue(Duration::from_secs(30)));
117 };
118
119 if obj.meta().deletion_timestamp.is_some() {
121 if !obj.spec.preserve_topic {
122 let client = ctx.admin_client_for(&cluster, &bootstrap).await;
126 if let Ok(client) = client {
127 let mut admin = client.lock().await;
128 match admin.delete_topics(&[&topic_name], 30_000).await {
129 Ok(_) => {}
130 Err(e) => {
131 tracing::warn!(error = %e, %topic_name, "DeleteTopics failed during finalizer");
132 }
133 }
134 }
135 }
136 remove_finalizer(&topic_api, &name).await?;
137 return Ok(Action::await_change());
138 }
139
140 if !has_finalizer(&obj) {
142 add_finalizer(&topic_api, &name).await?;
143 return Ok(Action::requeue(Duration::ZERO)); }
145
146 let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
148 Ok(h) => h,
149 Err(e) => {
150 tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
151 return Ok(Action::requeue(Duration::from_secs(15)));
152 }
153 };
154 let mut admin = admin_handle.lock().await;
155
156 let md = match admin.metadata(&[&topic_name]).await {
157 Ok(m) => m,
158 Err(e) => {
159 tracing::warn!(error = %e, %topic_name, "Metadata failed");
160 let is_transport = matches!(e, crabka_client_admin::AdminError::Transport(_));
161 drop(admin);
162 if is_transport {
163 ctx.drop_admin_client(&cluster).await;
164 }
165 return Ok(Action::requeue(Duration::from_secs(15)));
166 }
167 };
168 let current = md.topics.iter().find(|t| t.name == topic_name);
169
170 let current = match current {
172 Some(t) if t.error.is_none() => Some(t.clone()),
173 _ => None,
174 };
175 match current {
176 None => {
177 let outcome_vec = admin
179 .create_topics(
180 &[CreateTopicSpec {
181 name: topic_name.clone(),
182 partitions: obj.spec.partitions,
183 replicas: obj.spec.replicas,
184 configs: obj.spec.config.clone().unwrap_or_default(),
185 }],
186 30_000,
187 )
188 .await;
189 let outcome = match outcome_vec {
190 Ok(mut v) => v.pop().expect("one spec → one outcome"),
191 Err(e) => {
192 tracing::warn!(error = %e, "CreateTopics transport failure");
193 let is_transport = matches!(e, crabka_client_admin::AdminError::Transport(_));
194 drop(admin);
195 if is_transport {
196 ctx.drop_admin_client(&cluster).await;
197 }
198 return Ok(Action::requeue(Duration::from_secs(15)));
199 }
200 };
201 if let Some(err) = outcome.error {
202 patch_status(
203 &topic_api,
204 &name,
205 &obj,
206 "False",
207 "BrokerError",
208 &format!("CreateTopics: {} ({})", err.name, err.code),
209 None,
210 false,
211 )
212 .await?;
213 return Ok(Action::requeue(Duration::from_secs(15)));
214 }
215 patch_status(
216 &topic_api,
217 &name,
218 &obj,
219 "True",
220 "Ready",
221 "topic created",
222 outcome.topic_id.map(|u| u.to_string()),
223 true,
224 )
225 .await?;
226 Ok(Action::requeue(Duration::from_mins(1)))
227 }
228 Some(cur) => {
229 if cur.replication_factor != obj.spec.replicas {
231 patch_status(
232 &topic_api,
233 &name,
234 &obj,
235 "False",
236 "ImmutableFieldChanged",
237 "spec.replicas change requires partition reassignment",
238 cur.topic_id.map(|u| u.to_string()),
239 false,
240 )
241 .await?;
242 return Ok(Action::requeue(Duration::from_mins(5)));
243 }
244 if cur.partition_count > obj.spec.partitions {
245 patch_status(
246 &topic_api,
247 &name,
248 &obj,
249 "False",
250 "ImmutableFieldChanged",
251 "spec.partitions decrease is not supported by Kafka",
252 cur.topic_id.map(|u| u.to_string()),
253 false,
254 )
255 .await?;
256 return Ok(Action::requeue(Duration::from_mins(5)));
257 }
258
259 if cur.partition_count < obj.spec.partitions {
261 let outcomes = admin
262 .create_partitions(
263 &[CreatePartitionsOp {
264 name: topic_name.clone(),
265 new_total_count: obj.spec.partitions,
266 }],
267 30_000,
268 )
269 .await;
270 match outcomes {
271 Ok(mut v) => {
272 let o = v.pop().expect("one op → one outcome");
273 if let Some(err) = o.error {
274 patch_status(
275 &topic_api,
276 &name,
277 &obj,
278 "False",
279 "BrokerError",
280 &format!("CreatePartitions: {} ({})", err.name, err.code),
281 cur.topic_id.map(|u| u.to_string()),
282 false,
283 )
284 .await?;
285 return Ok(Action::requeue(Duration::from_secs(15)));
286 }
287 }
288 Err(e) => {
289 tracing::warn!(error = %e, "CreatePartitions transport failure");
290 let is_transport =
291 matches!(e, crabka_client_admin::AdminError::Transport(_));
292 drop(admin);
293 if is_transport {
294 ctx.drop_admin_client(&cluster).await;
295 }
296 return Ok(Action::requeue(Duration::from_secs(15)));
297 }
298 }
299 }
300
301 let desired = obj.spec.config.clone().unwrap_or_default();
303 let overrides = match admin.describe_configs(&[&topic_name]).await {
304 Ok(v) => v
305 .into_iter()
306 .next()
307 .map(|o| o.overrides)
308 .unwrap_or_default(),
309 Err(e) => {
310 tracing::warn!(error = %e, "DescribeConfigs failed");
311 let is_transport = matches!(e, crabka_client_admin::AdminError::Transport(_));
312 drop(admin);
313 if is_transport {
314 ctx.drop_admin_client(&cluster).await;
315 }
316 return Ok(Action::requeue(Duration::from_secs(15)));
317 }
318 };
319 let ops = diff_configs(&overrides, &desired, &topic_name);
320 if !ops.is_empty() {
321 match admin.incremental_alter_configs(&ops).await {
322 Ok(outcomes) => {
323 if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
324 patch_status(
325 &topic_api,
326 &name,
327 &obj,
328 "False",
329 "BrokerError",
330 &format!("IncrementalAlterConfigs: {} ({})", err.name, err.code),
331 cur.topic_id.map(|u| u.to_string()),
332 false,
333 )
334 .await?;
335 return Ok(Action::requeue(Duration::from_secs(15)));
336 }
337 }
338 Err(e) => {
339 tracing::warn!(error = %e, "IncrementalAlterConfigs failure");
340 let is_transport =
341 matches!(e, crabka_client_admin::AdminError::Transport(_));
342 drop(admin);
343 if is_transport {
344 ctx.drop_admin_client(&cluster).await;
345 }
346 return Ok(Action::requeue(Duration::from_secs(15)));
347 }
348 }
349 }
350
351 patch_status(
352 &topic_api,
353 &name,
354 &obj,
355 "True",
356 "Ready",
357 "topic in sync",
358 cur.topic_id.map(|u| u.to_string()),
359 true,
360 )
361 .await?;
362 Ok(Action::requeue(Duration::from_mins(1)))
363 }
364 }
365}
366
367pub(crate) fn diff_configs(
370 current: &BTreeMap<String, String>,
371 desired: &BTreeMap<String, String>,
372 topic: &str,
373) -> Vec<IncrementalAlterOp> {
374 let mut ops = Vec::new();
375 for (k, v) in desired {
376 if current.get(k) != Some(v) {
377 ops.push(IncrementalAlterOp::Set {
378 topic: topic.to_string(),
379 key: k.clone(),
380 value: v.clone(),
381 });
382 }
383 }
384 for k in current.keys() {
385 if !desired.contains_key(k) {
386 ops.push(IncrementalAlterOp::Delete {
387 topic: topic.to_string(),
388 key: k.clone(),
389 });
390 }
391 }
392 ops
393}
394
395pub(crate) fn internal_listener_bootstrap(kafka: &Kafka) -> Option<String> {
398 let ready_true = kafka
399 .status
400 .as_ref()
401 .and_then(|s| s.conditions.iter().find(|c| c.type_ == "Ready"))
402 .is_some_and(|c| c.status == "True");
403 if !ready_true {
404 return None;
405 }
406 let inter_broker = kafka
407 .spec
408 .inter_broker_listener_name
409 .as_deref()
410 .unwrap_or("PLAIN");
411 let listeners = &kafka.status.as_ref()?.listeners;
412 listeners
413 .iter()
414 .find(|l| l.name == inter_broker)
415 .map(|l| l.bootstrap_servers.clone())
416 .filter(|s| !s.is_empty())
417}
418
419pub(crate) fn validate_kafka_topic_name(name: &str) -> Result<(), String> {
421 if name.is_empty() {
422 return Err("topic name is empty".into());
423 }
424 if name.len() > 249 {
425 return Err(format!("topic name length {} exceeds 249", name.len()));
426 }
427 if name == "." || name == ".." {
428 return Err("topic name cannot be \".\" or \"..\"".into());
429 }
430 if !name
431 .chars()
432 .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-')
433 {
434 return Err(format!("topic name {name:?} contains invalid characters"));
435 }
436 Ok(())
437}
438
439fn has_finalizer(obj: &KafkaTopic) -> bool {
440 obj.meta()
441 .finalizers
442 .as_ref()
443 .is_some_and(|f| f.iter().any(|s| s == FINALIZER))
444}
445
446async fn add_finalizer(api: &Api<KafkaTopic>, name: &str) -> Result<(), ReconcileError> {
447 let patch = json!({ "metadata": { "finalizers": [FINALIZER] } });
448 let params = PatchParams {
449 field_manager: Some(FIELD_MANAGER.into()),
450 ..Default::default()
451 };
452 api.patch(name, ¶ms, &Patch::Merge(&patch)).await?;
453 Ok(())
454}
455
456async fn remove_finalizer(api: &Api<KafkaTopic>, name: &str) -> Result<(), ReconcileError> {
457 let patch = json!({ "metadata": { "finalizers": [] } });
458 let params = PatchParams {
459 field_manager: Some(FIELD_MANAGER.into()),
460 ..Default::default()
461 };
462 api.patch(name, ¶ms, &Patch::Merge(&patch)).await?;
463 Ok(())
464}
465
466#[allow(clippy::too_many_arguments)] async fn patch_status(
471 api: &Api<KafkaTopic>,
472 name: &str,
473 obj: &KafkaTopic,
474 status: &str,
475 reason: &str,
476 message: &str,
477 topic_id: Option<String>,
478 advance_generation: bool,
479) -> Result<(), ReconcileError> {
480 let topic_name = obj
481 .spec
482 .topic_name
483 .clone()
484 .unwrap_or_else(|| name.to_string());
485 let conditions = vec![condition("Ready", status, reason, message)];
486 let observed_generation = if advance_generation {
487 obj.meta().generation
488 } else {
489 obj.status.as_ref().and_then(|s| s.observed_generation)
490 };
491
492 let body = json!({
493 "status": {
494 "conditions": conditions,
495 "observedGeneration": observed_generation,
496 "topicName": topic_name,
497 "topicId": topic_id,
498 }
499 });
500 let params = PatchParams {
501 field_manager: Some(FIELD_MANAGER.into()),
502 ..Default::default()
503 };
504 api.patch_status(name, ¶ms, &Patch::Merge(&body))
505 .await?;
506 Ok(())
507}
508
509#[cfg(test)]
510mod tests {
511 use super::*;
512 use crate::crd::{KafkaCondition, KafkaSpec, KafkaStatus, ListenerStatus, ListenerType};
513 use assert2::assert;
514
515 fn kafka_ready(name: &str, namespace: &str, listener_port: i32) -> Kafka {
516 let mut k = Kafka::new(
517 name,
518 KafkaSpec {
519 kafka_version: "0.1.1".into(),
520 metadata_version: None,
521 config: None,
522 listeners: vec![],
523 inter_broker_listener_name: Some("PLAIN".into()),
524 metrics_config: None,
525 network_policy: None,
526 cluster_ca: None,
527 clients_ca: None,
528 logging: None,
529 delegation_token: None,
530 authorization: None,
531 tiered_storage: None,
532 inter_broker_kerberos: None,
533 krb5_conf_secret_ref: None,
534 tracing: None,
535 },
536 );
537 k.metadata.namespace = Some(namespace.into());
538 k.status = Some(KafkaStatus {
539 conditions: vec![KafkaCondition {
540 type_: "Ready".into(),
541 status: "True".into(),
542 reason: "Available".into(),
543 message: String::new(),
544 last_transition_time: "2026-05-18T00:00:00Z".into(),
545 }],
546 replicas: Some(1),
547 ready_replicas: Some(1),
548 listeners: vec![ListenerStatus {
549 name: "PLAIN".into(),
550 type_: ListenerType::Internal,
551 bootstrap_servers: format!(
552 "{name}-broker-headless.{namespace}.svc.cluster.local:{listener_port}"
553 ),
554 addresses: vec![],
555 }],
556 cluster_ca: None,
557 clients_ca: None,
558 kafka_version: None,
559 metadata_version: None,
560 });
561 k
562 }
563
564 #[test]
565 fn validate_topic_name_accepts_typical() {
566 assert!(validate_kafka_topic_name("demo-topic").is_ok());
567 assert!(validate_kafka_topic_name("My.Topic_1").is_ok());
568 }
569
570 #[test]
571 fn validate_topic_name_rejects_empty() {
572 assert!(validate_kafka_topic_name("").is_err());
573 }
574
575 #[test]
576 fn validate_topic_name_rejects_dot_and_dotdot() {
577 assert!(validate_kafka_topic_name(".").is_err());
578 assert!(validate_kafka_topic_name("..").is_err());
579 }
580
581 #[test]
582 fn validate_topic_name_rejects_too_long() {
583 let n = "a".repeat(250);
584 assert!(validate_kafka_topic_name(&n).is_err());
585 }
586
587 #[test]
588 fn validate_topic_name_rejects_invalid_chars() {
589 assert!(validate_kafka_topic_name("has space").is_err());
590 assert!(validate_kafka_topic_name("has/slash").is_err());
591 assert!(validate_kafka_topic_name("has@at").is_err());
592 }
593
594 #[test]
595 fn diff_configs_set_adds_missing_key() {
596 let current = BTreeMap::new();
597 let desired = BTreeMap::from([("retention.ms".to_string(), "60000".to_string())]);
598 let ops = diff_configs(¤t, &desired, "foo");
599 assert!(ops.len() == 1);
600 assert!(matches!(&ops[0], IncrementalAlterOp::Set { key, value, .. }
601 if key == "retention.ms" && value == "60000"));
602 }
603
604 #[test]
605 fn diff_configs_set_updates_changed_value() {
606 let current = BTreeMap::from([("retention.ms".to_string(), "30000".to_string())]);
607 let desired = BTreeMap::from([("retention.ms".to_string(), "60000".to_string())]);
608 let ops = diff_configs(¤t, &desired, "foo");
609 assert!(ops.len() == 1);
610 assert!(matches!(&ops[0], IncrementalAlterOp::Set { value, .. } if value == "60000"));
611 }
612
613 #[test]
614 fn diff_configs_delete_removes_extra_key() {
615 let current = BTreeMap::from([("cleanup.policy".to_string(), "delete".to_string())]);
616 let desired = BTreeMap::new();
617 let ops = diff_configs(¤t, &desired, "foo");
618 assert!(ops.len() == 1);
619 assert!(
620 matches!(&ops[0], IncrementalAlterOp::Delete { key, .. } if key == "cleanup.policy")
621 );
622 }
623
624 #[test]
625 fn diff_configs_noop_when_matching() {
626 let m = BTreeMap::from([("retention.ms".to_string(), "60000".to_string())]);
627 assert!(diff_configs(&m, &m, "foo").is_empty());
628 }
629
630 #[test]
631 fn diff_configs_combines_set_and_delete() {
632 let current = BTreeMap::from([
633 ("retention.ms".to_string(), "30000".to_string()),
634 ("cleanup.policy".to_string(), "delete".to_string()),
635 ]);
636 let desired = BTreeMap::from([
637 ("retention.ms".to_string(), "60000".to_string()),
638 ("segment.bytes".to_string(), "1048576".to_string()),
639 ]);
640 let ops = diff_configs(¤t, &desired, "foo");
641 assert!(
642 ops.len() == 3,
643 "expected SET(retention.ms), SET(segment.bytes), DELETE(cleanup.policy)"
644 );
645 }
646
647 #[test]
648 fn internal_listener_bootstrap_returns_listener_when_ready() {
649 let k = kafka_ready("demo", "default", 9092);
650 assert!(
651 internal_listener_bootstrap(&k).as_deref()
652 == Some("demo-broker-headless.default.svc.cluster.local:9092")
653 );
654 }
655
656 #[test]
657 fn internal_listener_bootstrap_returns_none_when_not_ready() {
658 let mut k = kafka_ready("demo", "default", 9092);
659 if let Some(s) = k.status.as_mut() {
660 s.conditions[0].status = "False".into();
661 }
662 assert!(internal_listener_bootstrap(&k).is_none());
663 }
664}