Skip to main content

crabka_operator/controller/
topic.rs

1//! `KafkaTopic` reconciler — unidirectional (CRD wins).
2//!
//! Watches `KafkaTopic` (primary) and `Kafka` (secondary). The `Kafka`
//! mapper currently enqueues no topics, so a cluster becoming Ready is
//! picked up by each topic's own timed requeue rather than by the watch.
//! Diff-and-apply against the live cluster via
//! `crabka_client_admin::AdminClient`.
6
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use crabka_client_admin::{CreatePartitionsOp, CreateTopicSpec, IncrementalAlterOp};
12use futures::StreamExt as _;
13use kube::api::{Api, Patch, PatchParams};
14use kube::runtime::controller::{Action, Controller};
15use kube::runtime::reflector::ObjectRef;
16use kube::runtime::watcher;
17use kube::{Resource, ResourceExt as _};
18use serde_json::json;
19
20use crate::context::Context;
21use crate::controller::common::{FIELD_MANAGER, ReconcileError, condition};
22use crate::crd::{Kafka, KafkaTopic};
23
/// Finalizer entry this controller places on `KafkaTopic` objects (see
/// `add_finalizer`) and looks for in `has_finalizer`.
const FINALIZER: &str = "crabka.io/topic-finalizer";
25
26/// Run the controller forever.
27pub async fn run(ctx: Context) -> anyhow::Result<()> {
28    let topic_api: Api<KafkaTopic> = Api::all(ctx.client.clone());
29    let kafka_api: Api<Kafka> = Api::all(ctx.client.clone());
30    Controller::new(topic_api, watcher::Config::default())
31        // Kafka watch wakes the reconcile loop on cluster status changes.
32        // We return empty here (rather than listing matching topics) so
33        // the mapper stays sync-safe — listing would require an async
34        // call that the kube-rs `mapper` signature doesn't allow. The
35        // 60-second periodic requeue on each `KafkaTopic` catches the
36        // transition (matches how `kafka.rs` handles its Node watch).
37        .watches(kafka_api, watcher::Config::default(), |_kafka| {
38            Vec::<ObjectRef<KafkaTopic>>::new().into_iter()
39        })
40        .run(reconcile, error_policy, Arc::new(ctx))
41        .for_each(|res| async move {
42            match res {
43                Ok((obj, _)) => tracing::debug!(?obj, "topic reconciled"),
44                Err(e) => tracing::warn!(error = %e, "topic reconcile error"),
45            }
46        })
47        .await;
48    Ok(())
49}
50
51pub fn error_policy(_obj: Arc<KafkaTopic>, err: &ReconcileError, _ctx: Arc<Context>) -> Action {
52    tracing::warn!(error = %err, "topic reconcile error, requeueing");
53    Action::requeue(Duration::from_secs(15))
54}
55
56#[allow(clippy::too_many_lines)] // linear pipeline; extraction hurts more than helps
57pub async fn reconcile(obj: Arc<KafkaTopic>, ctx: Arc<Context>) -> Result<Action, ReconcileError> {
58    let ns = obj.namespace().unwrap_or_else(|| "default".into());
59    let name = obj.name_any();
60    let topic_api: Api<KafkaTopic> = Api::namespaced(ctx.client.clone(), &ns);
61
62    // 1. Cluster label
63    let cluster = obj
64        .meta()
65        .labels
66        .as_ref()
67        .and_then(|l| l.get("crabka.io/cluster").cloned());
68    let Some(cluster) = cluster else {
69        patch_status(
70            &topic_api,
71            &name,
72            &obj,
73            "False",
74            "MissingClusterLabel",
75            "metadata.labels[\"crabka.io/cluster\"] is required",
76            None,
77            false,
78        )
79        .await?;
80        return Ok(Action::requeue(Duration::from_mins(1)));
81    };
82
83    // 2. Effective topic name
84    let topic_name = obj.spec.topic_name.clone().unwrap_or_else(|| name.clone());
85    if let Err(msg) = validate_kafka_topic_name(&topic_name) {
86        patch_status(
87            &topic_api,
88            &name,
89            &obj,
90            "False",
91            "InvalidTopicName",
92            &msg,
93            None,
94            false,
95        )
96        .await?;
97        return Ok(Action::requeue(Duration::from_mins(5)));
98    }
99
100    // 3. Look up the Kafka + bootstrap
101    let kafka_api: Api<Kafka> = Api::namespaced(ctx.client.clone(), &ns);
102    let kafka = kafka_api.get_opt(&cluster).await?;
103    let bootstrap = kafka.as_ref().and_then(internal_listener_bootstrap);
104    let Some(bootstrap) = bootstrap else {
105        patch_status(
106            &topic_api,
107            &name,
108            &obj,
109            "False",
110            "ClusterNotReady",
111            &format!("Kafka/{cluster} not Ready or no internal listener"),
112            None,
113            false,
114        )
115        .await?;
116        return Ok(Action::requeue(Duration::from_secs(30)));
117    };
118
119    // 4. Finalizer / delete path
120    if obj.meta().deletion_timestamp.is_some() {
121        if !obj.spec.preserve_topic {
122            // Best-effort: log non-UNKNOWN_TOPIC errors but don't propagate
123            // (we want the finalizer removal to succeed even if the cluster
124            // is gone).
125            let client = ctx.admin_client_for(&cluster, &bootstrap).await;
126            if let Ok(client) = client {
127                let mut admin = client.lock().await;
128                match admin.delete_topics(&[&topic_name], 30_000).await {
129                    Ok(_) => {}
130                    Err(e) => {
131                        tracing::warn!(error = %e, %topic_name, "DeleteTopics failed during finalizer");
132                    }
133                }
134            }
135        }
136        remove_finalizer(&topic_api, &name).await?;
137        return Ok(Action::await_change());
138    }
139
140    // 5. Ensure finalizer
141    if !has_finalizer(&obj) {
142        add_finalizer(&topic_api, &name).await?;
143        return Ok(Action::requeue(Duration::ZERO)); // re-enter
144    }
145
146    // 6. Connect and fetch current state
147    let admin_handle = match ctx.admin_client_for(&cluster, &bootstrap).await {
148        Ok(h) => h,
149        Err(e) => {
150            tracing::warn!(error = %e, %cluster, "AdminClient connect failed");
151            return Ok(Action::requeue(Duration::from_secs(15)));
152        }
153    };
154    let mut admin = admin_handle.lock().await;
155
156    let md = match admin.metadata(&[&topic_name]).await {
157        Ok(m) => m,
158        Err(e) => {
159            tracing::warn!(error = %e, %topic_name, "Metadata failed");
160            let is_transport = matches!(e, crabka_client_admin::AdminError::Transport(_));
161            drop(admin);
162            if is_transport {
163                ctx.drop_admin_client(&cluster).await;
164            }
165            return Ok(Action::requeue(Duration::from_secs(15)));
166        }
167    };
168    let current = md.topics.iter().find(|t| t.name == topic_name);
169
170    // 7. Diff and apply
171    let current = match current {
172        Some(t) if t.error.is_none() => Some(t.clone()),
173        _ => None,
174    };
175    match current {
176        None => {
177            // CreateTopics
178            let outcome_vec = admin
179                .create_topics(
180                    &[CreateTopicSpec {
181                        name: topic_name.clone(),
182                        partitions: obj.spec.partitions,
183                        replicas: obj.spec.replicas,
184                        configs: obj.spec.config.clone().unwrap_or_default(),
185                    }],
186                    30_000,
187                )
188                .await;
189            let outcome = match outcome_vec {
190                Ok(mut v) => v.pop().expect("one spec → one outcome"),
191                Err(e) => {
192                    tracing::warn!(error = %e, "CreateTopics transport failure");
193                    let is_transport = matches!(e, crabka_client_admin::AdminError::Transport(_));
194                    drop(admin);
195                    if is_transport {
196                        ctx.drop_admin_client(&cluster).await;
197                    }
198                    return Ok(Action::requeue(Duration::from_secs(15)));
199                }
200            };
201            if let Some(err) = outcome.error {
202                patch_status(
203                    &topic_api,
204                    &name,
205                    &obj,
206                    "False",
207                    "BrokerError",
208                    &format!("CreateTopics: {} ({})", err.name, err.code),
209                    None,
210                    false,
211                )
212                .await?;
213                return Ok(Action::requeue(Duration::from_secs(15)));
214            }
215            patch_status(
216                &topic_api,
217                &name,
218                &obj,
219                "True",
220                "Ready",
221                "topic created",
222                outcome.topic_id.map(|u| u.to_string()),
223                true,
224            )
225            .await?;
226            Ok(Action::requeue(Duration::from_mins(1)))
227        }
228        Some(cur) => {
229            // Immutable fields
230            if cur.replication_factor != obj.spec.replicas {
231                patch_status(
232                    &topic_api,
233                    &name,
234                    &obj,
235                    "False",
236                    "ImmutableFieldChanged",
237                    "spec.replicas change requires partition reassignment",
238                    cur.topic_id.map(|u| u.to_string()),
239                    false,
240                )
241                .await?;
242                return Ok(Action::requeue(Duration::from_mins(5)));
243            }
244            if cur.partition_count > obj.spec.partitions {
245                patch_status(
246                    &topic_api,
247                    &name,
248                    &obj,
249                    "False",
250                    "ImmutableFieldChanged",
251                    "spec.partitions decrease is not supported by Kafka",
252                    cur.topic_id.map(|u| u.to_string()),
253                    false,
254                )
255                .await?;
256                return Ok(Action::requeue(Duration::from_mins(5)));
257            }
258
259            // Partition increase
260            if cur.partition_count < obj.spec.partitions {
261                let outcomes = admin
262                    .create_partitions(
263                        &[CreatePartitionsOp {
264                            name: topic_name.clone(),
265                            new_total_count: obj.spec.partitions,
266                        }],
267                        30_000,
268                    )
269                    .await;
270                match outcomes {
271                    Ok(mut v) => {
272                        let o = v.pop().expect("one op → one outcome");
273                        if let Some(err) = o.error {
274                            patch_status(
275                                &topic_api,
276                                &name,
277                                &obj,
278                                "False",
279                                "BrokerError",
280                                &format!("CreatePartitions: {} ({})", err.name, err.code),
281                                cur.topic_id.map(|u| u.to_string()),
282                                false,
283                            )
284                            .await?;
285                            return Ok(Action::requeue(Duration::from_secs(15)));
286                        }
287                    }
288                    Err(e) => {
289                        tracing::warn!(error = %e, "CreatePartitions transport failure");
290                        let is_transport =
291                            matches!(e, crabka_client_admin::AdminError::Transport(_));
292                        drop(admin);
293                        if is_transport {
294                            ctx.drop_admin_client(&cluster).await;
295                        }
296                        return Ok(Action::requeue(Duration::from_secs(15)));
297                    }
298                }
299            }
300
301            // Config diff
302            let desired = obj.spec.config.clone().unwrap_or_default();
303            let overrides = match admin.describe_configs(&[&topic_name]).await {
304                Ok(v) => v
305                    .into_iter()
306                    .next()
307                    .map(|o| o.overrides)
308                    .unwrap_or_default(),
309                Err(e) => {
310                    tracing::warn!(error = %e, "DescribeConfigs failed");
311                    let is_transport = matches!(e, crabka_client_admin::AdminError::Transport(_));
312                    drop(admin);
313                    if is_transport {
314                        ctx.drop_admin_client(&cluster).await;
315                    }
316                    return Ok(Action::requeue(Duration::from_secs(15)));
317                }
318            };
319            let ops = diff_configs(&overrides, &desired, &topic_name);
320            if !ops.is_empty() {
321                match admin.incremental_alter_configs(&ops).await {
322                    Ok(outcomes) => {
323                        if let Some(err) = outcomes.into_iter().find_map(|o| o.error) {
324                            patch_status(
325                                &topic_api,
326                                &name,
327                                &obj,
328                                "False",
329                                "BrokerError",
330                                &format!("IncrementalAlterConfigs: {} ({})", err.name, err.code),
331                                cur.topic_id.map(|u| u.to_string()),
332                                false,
333                            )
334                            .await?;
335                            return Ok(Action::requeue(Duration::from_secs(15)));
336                        }
337                    }
338                    Err(e) => {
339                        tracing::warn!(error = %e, "IncrementalAlterConfigs failure");
340                        let is_transport =
341                            matches!(e, crabka_client_admin::AdminError::Transport(_));
342                        drop(admin);
343                        if is_transport {
344                            ctx.drop_admin_client(&cluster).await;
345                        }
346                        return Ok(Action::requeue(Duration::from_secs(15)));
347                    }
348                }
349            }
350
351            patch_status(
352                &topic_api,
353                &name,
354                &obj,
355                "True",
356                "Ready",
357                "topic in sync",
358                cur.topic_id.map(|u| u.to_string()),
359                true,
360            )
361            .await?;
362            Ok(Action::requeue(Duration::from_mins(1)))
363        }
364    }
365}
366
367/// Diff `desired` against `current` overrides; produce a `Vec` of
368/// `IncrementalAlterOps`. Pure function — covered by tests below.
369pub(crate) fn diff_configs(
370    current: &BTreeMap<String, String>,
371    desired: &BTreeMap<String, String>,
372    topic: &str,
373) -> Vec<IncrementalAlterOp> {
374    let mut ops = Vec::new();
375    for (k, v) in desired {
376        if current.get(k) != Some(v) {
377            ops.push(IncrementalAlterOp::Set {
378                topic: topic.to_string(),
379                key: k.clone(),
380                value: v.clone(),
381            });
382        }
383    }
384    for k in current.keys() {
385        if !desired.contains_key(k) {
386            ops.push(IncrementalAlterOp::Delete {
387                topic: topic.to_string(),
388                key: k.clone(),
389            });
390        }
391    }
392    ops
393}
394
395/// Bootstrap address from `Kafka.status.listeners[<inter_broker>]`.
396/// Returns `None` if `Kafka.status.conditions[Ready].status != "True"`.
397pub(crate) fn internal_listener_bootstrap(kafka: &Kafka) -> Option<String> {
398    let ready_true = kafka
399        .status
400        .as_ref()
401        .and_then(|s| s.conditions.iter().find(|c| c.type_ == "Ready"))
402        .is_some_and(|c| c.status == "True");
403    if !ready_true {
404        return None;
405    }
406    let inter_broker = kafka
407        .spec
408        .inter_broker_listener_name
409        .as_deref()
410        .unwrap_or("PLAIN");
411    let listeners = &kafka.status.as_ref()?.listeners;
412    listeners
413        .iter()
414        .find(|l| l.name == inter_broker)
415        .map(|l| l.bootstrap_servers.clone())
416        .filter(|s| !s.is_empty())
417}
418
/// Check `name` against Kafka's topic naming rules (intended to mirror the
/// JVM client's `Topic.validate`).
///
/// Checks run in this order, and the first failure is returned as a
/// human-readable message: empty name, length over 249 bytes, the reserved
/// names `.` and `..`, then any character outside `[A-Za-z0-9._-]`.
pub(crate) fn validate_kafka_topic_name(name: &str) -> Result<(), String> {
    const MAX_LEN: usize = 249;
    // Byte-wise check is sufficient: every byte of a non-ASCII character is
    // >= 0x80 and therefore rejected.
    let is_legal = |b: u8| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'_' | b'-');

    match name {
        "" => Err("topic name is empty".into()),
        n if n.len() > MAX_LEN => Err(format!("topic name length {} exceeds 249", n.len())),
        "." | ".." => Err("topic name cannot be \".\" or \"..\"".into()),
        n if !n.bytes().all(is_legal) => {
            Err(format!("topic name {name:?} contains invalid characters"))
        }
        _ => Ok(()),
    }
}
438
439fn has_finalizer(obj: &KafkaTopic) -> bool {
440    obj.meta()
441        .finalizers
442        .as_ref()
443        .is_some_and(|f| f.iter().any(|s| s == FINALIZER))
444}
445
446async fn add_finalizer(api: &Api<KafkaTopic>, name: &str) -> Result<(), ReconcileError> {
447    let patch = json!({ "metadata": { "finalizers": [FINALIZER] } });
448    let params = PatchParams {
449        field_manager: Some(FIELD_MANAGER.into()),
450        ..Default::default()
451    };
452    api.patch(name, &params, &Patch::Merge(&patch)).await?;
453    Ok(())
454}
455
456async fn remove_finalizer(api: &Api<KafkaTopic>, name: &str) -> Result<(), ReconcileError> {
457    let patch = json!({ "metadata": { "finalizers": [] } });
458    let params = PatchParams {
459        field_manager: Some(FIELD_MANAGER.into()),
460        ..Default::default()
461    };
462    api.patch(name, &params, &Patch::Merge(&patch)).await?;
463    Ok(())
464}
465
466/// Build + patch status. `advance_generation = true` writes
467/// `observedGeneration` to the current generation (only on successful
468/// True/Ready landings).
469#[allow(clippy::too_many_arguments)] // pure status helper; arity reflects the condition contract
470async fn patch_status(
471    api: &Api<KafkaTopic>,
472    name: &str,
473    obj: &KafkaTopic,
474    status: &str,
475    reason: &str,
476    message: &str,
477    topic_id: Option<String>,
478    advance_generation: bool,
479) -> Result<(), ReconcileError> {
480    let topic_name = obj
481        .spec
482        .topic_name
483        .clone()
484        .unwrap_or_else(|| name.to_string());
485    let conditions = vec![condition("Ready", status, reason, message)];
486    let observed_generation = if advance_generation {
487        obj.meta().generation
488    } else {
489        obj.status.as_ref().and_then(|s| s.observed_generation)
490    };
491
492    let body = json!({
493        "status": {
494            "conditions": conditions,
495            "observedGeneration": observed_generation,
496            "topicName": topic_name,
497            "topicId": topic_id,
498        }
499    });
500    let params = PatchParams {
501        field_manager: Some(FIELD_MANAGER.into()),
502        ..Default::default()
503    };
504    api.patch_status(name, &params, &Patch::Merge(&body))
505        .await?;
506    Ok(())
507}
508
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{KafkaCondition, KafkaSpec, KafkaStatus, ListenerStatus, ListenerType};
    use assert2::assert;

    /// Build a topic-config map from string pairs.
    fn cfg(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// A `Kafka` fixture whose `Ready` condition is `True` and which exposes
    /// one internal `PLAIN` listener on `listener_port`.
    fn kafka_ready(name: &str, namespace: &str, listener_port: i32) -> Kafka {
        let spec = KafkaSpec {
            kafka_version: "0.1.1".into(),
            metadata_version: None,
            config: None,
            listeners: vec![],
            inter_broker_listener_name: Some("PLAIN".into()),
            metrics_config: None,
            network_policy: None,
            cluster_ca: None,
            clients_ca: None,
            logging: None,
            delegation_token: None,
            authorization: None,
            tiered_storage: None,
            inter_broker_kerberos: None,
            krb5_conf_secret_ref: None,
            tracing: None,
        };
        let ready = KafkaCondition {
            type_: "Ready".into(),
            status: "True".into(),
            reason: "Available".into(),
            message: String::new(),
            last_transition_time: "2026-05-18T00:00:00Z".into(),
        };
        let plain = ListenerStatus {
            name: "PLAIN".into(),
            type_: ListenerType::Internal,
            bootstrap_servers: format!(
                "{name}-broker-headless.{namespace}.svc.cluster.local:{listener_port}"
            ),
            addresses: vec![],
        };
        let status = KafkaStatus {
            conditions: vec![ready],
            replicas: Some(1),
            ready_replicas: Some(1),
            listeners: vec![plain],
            cluster_ca: None,
            clients_ca: None,
            kafka_version: None,
            metadata_version: None,
        };

        let mut kafka = Kafka::new(name, spec);
        kafka.metadata.namespace = Some(namespace.into());
        kafka.status = Some(status);
        kafka
    }

    #[test]
    fn validate_topic_name_accepts_typical() {
        for name in ["demo-topic", "My.Topic_1"] {
            assert!(validate_kafka_topic_name(name).is_ok());
        }
    }

    #[test]
    fn validate_topic_name_rejects_empty() {
        assert!(validate_kafka_topic_name("").is_err());
    }

    #[test]
    fn validate_topic_name_rejects_dot_and_dotdot() {
        for name in [".", ".."] {
            assert!(validate_kafka_topic_name(name).is_err());
        }
    }

    #[test]
    fn validate_topic_name_rejects_too_long() {
        let too_long = "a".repeat(250);
        assert!(validate_kafka_topic_name(&too_long).is_err());
    }

    #[test]
    fn validate_topic_name_rejects_invalid_chars() {
        for name in ["has space", "has/slash", "has@at"] {
            assert!(validate_kafka_topic_name(name).is_err());
        }
    }

    #[test]
    fn diff_configs_set_adds_missing_key() {
        let ops = diff_configs(&cfg(&[]), &cfg(&[("retention.ms", "60000")]), "foo");
        assert!(ops.len() == 1);
        assert!(matches!(&ops[0], IncrementalAlterOp::Set { key, value, .. }
            if key == "retention.ms" && value == "60000"));
    }

    #[test]
    fn diff_configs_set_updates_changed_value() {
        let before = cfg(&[("retention.ms", "30000")]);
        let after = cfg(&[("retention.ms", "60000")]);
        let ops = diff_configs(&before, &after, "foo");
        assert!(ops.len() == 1);
        assert!(matches!(&ops[0], IncrementalAlterOp::Set { value, .. } if value == "60000"));
    }

    #[test]
    fn diff_configs_delete_removes_extra_key() {
        let ops = diff_configs(&cfg(&[("cleanup.policy", "delete")]), &cfg(&[]), "foo");
        assert!(ops.len() == 1);
        assert!(
            matches!(&ops[0], IncrementalAlterOp::Delete { key, .. } if key == "cleanup.policy")
        );
    }

    #[test]
    fn diff_configs_noop_when_matching() {
        let same = cfg(&[("retention.ms", "60000")]);
        assert!(diff_configs(&same, &same, "foo").is_empty());
    }

    #[test]
    fn diff_configs_combines_set_and_delete() {
        let before = cfg(&[("retention.ms", "30000"), ("cleanup.policy", "delete")]);
        let after = cfg(&[("retention.ms", "60000"), ("segment.bytes", "1048576")]);
        let ops = diff_configs(&before, &after, "foo");
        assert!(
            ops.len() == 3,
            "expected SET(retention.ms), SET(segment.bytes), DELETE(cleanup.policy)"
        );
    }

    #[test]
    fn internal_listener_bootstrap_returns_listener_when_ready() {
        let kafka = kafka_ready("demo", "default", 9092);
        let bootstrap = internal_listener_bootstrap(&kafka);
        assert!(bootstrap.as_deref() == Some("demo-broker-headless.default.svc.cluster.local:9092"));
    }

    #[test]
    fn internal_listener_bootstrap_returns_none_when_not_ready() {
        let mut kafka = kafka_ready("demo", "default", 9092);
        let status = kafka.status.as_mut().expect("fixture always has a status");
        status.conditions[0].status = "False".into();
        assert!(internal_listener_bootstrap(&kafka).is_none());
    }
}