Skip to main content

rivven_operator/
topic_controller.rs

1//! RivvenTopic Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenTopic
4//! custom resources. It watches for changes and reconciles topic state with
5//! the Rivven cluster.
6
7use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
8use crate::error::{OperatorError, Result};
9use chrono::Utc;
10use futures::StreamExt;
11use kube::api::{Api, Patch, PatchParams};
12use kube::runtime::controller::{Action, Controller};
13use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
14use kube::runtime::watcher::Config;
15use kube::{Client, ResourceExt};
16use std::sync::Arc;
17use std::time::Duration;
18use tracing::{debug, error, info, instrument, warn};
19use validator::Validate;
20
/// Finalizer string this controller registers on `RivvenTopic` resources so
/// that cleanup runs before the object is removed.
pub const TOPIC_FINALIZER: &str = "rivven.io/topic-finalizer";

/// Seconds to wait before re-reconciling a topic after a successful pass (2 minutes).
const DEFAULT_REQUEUE_SECONDS: u64 = 2 * 60;

/// Seconds to wait before retrying a topic after a failed pass.
const ERROR_REQUEUE_SECONDS: u64 = 30;
29
30/// Context passed to the topic controller
31pub struct TopicControllerContext {
32    /// Kubernetes client
33    pub client: Client,
34    /// Metrics recorder
35    pub metrics: Option<TopicControllerMetrics>,
36}
37
38/// Metrics for the topic controller
39#[derive(Clone)]
40pub struct TopicControllerMetrics {
41    /// Counter for reconciliation attempts
42    pub reconciliations: metrics::Counter,
43    /// Counter for reconciliation errors
44    pub errors: metrics::Counter,
45    /// Histogram for reconciliation duration
46    pub duration: metrics::Histogram,
47    /// Gauge for total managed topics
48    pub topics_total: metrics::Gauge,
49}
50
51impl TopicControllerMetrics {
52    /// Create new topic controller metrics
53    pub fn new() -> Self {
54        Self {
55            reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
56            errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
57            duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
58            topics_total: metrics::gauge!("rivven_topic_managed_total"),
59        }
60    }
61}
62
63impl Default for TopicControllerMetrics {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69/// Start the RivvenTopic controller
70pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
71    let topics: Api<RivvenTopic> = match &namespace {
72        Some(ns) => Api::namespaced(client.clone(), ns),
73        None => Api::all(client.clone()),
74    };
75
76    let ctx = Arc::new(TopicControllerContext {
77        client: client.clone(),
78        metrics: Some(TopicControllerMetrics::new()),
79    });
80
81    info!(
82        namespace = namespace.as_deref().unwrap_or("all"),
83        "Starting RivvenTopic controller"
84    );
85
86    Controller::new(topics.clone(), Config::default())
87        .run(reconcile_topic, topic_error_policy, ctx)
88        .for_each(|result| async move {
89            match result {
90                Ok((obj, action)) => {
91                    debug!(
92                        name = obj.name,
93                        namespace = obj.namespace,
94                        ?action,
95                        "Topic reconciliation completed"
96                    );
97                }
98                Err(e) => {
99                    error!(error = %e, "Topic reconciliation failed");
100                }
101            }
102        })
103        .await;
104
105    Ok(())
106}
107
108/// Main reconciliation function for RivvenTopic
109#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = topic.namespace()))]
110async fn reconcile_topic(
111    topic: Arc<RivvenTopic>,
112    ctx: Arc<TopicControllerContext>,
113) -> Result<Action> {
114    let start = std::time::Instant::now();
115
116    if let Some(ref metrics) = ctx.metrics {
117        metrics.reconciliations.increment(1);
118    }
119
120    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
121    let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);
122
123    let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
124        match event {
125            FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
126            FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
127        }
128    })
129    .await;
130
131    if let Some(ref metrics) = ctx.metrics {
132        metrics.duration.record(start.elapsed().as_secs_f64());
133    }
134
135    result.map_err(|e| {
136        if let Some(ref metrics) = ctx.metrics {
137            metrics.errors.increment(1);
138        }
139        OperatorError::ReconcileFailed(e.to_string())
140    })
141}
142
143/// Apply (create/update) the topic
144#[instrument(skip(topic, ctx))]
145async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
146    let name = topic.name_any();
147    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
148
149    info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");
150
151    // Validate the topic spec
152    if let Err(errors) = topic.spec.validate() {
153        let error_messages: Vec<String> = errors
154            .field_errors()
155            .iter()
156            .flat_map(|(field, errs)| {
157                errs.iter()
158                    .map(move |e| format!("{}: {:?}", field, e.message))
159            })
160            .collect();
161        let error_msg = error_messages.join("; ");
162        warn!(name = %name, errors = %error_msg, "Topic spec validation failed");
163
164        update_topic_status(
165            &ctx.client,
166            &namespace,
167            &name,
168            build_failed_status(&topic, &error_msg),
169        )
170        .await?;
171
172        return Err(OperatorError::InvalidConfig(error_msg));
173    }
174
175    // Verify cluster reference exists
176    let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;
177
178    // Get broker endpoints from cluster status
179    let broker_endpoints = get_cluster_endpoints(&cluster);
180
181    if broker_endpoints.is_empty() {
182        warn!(name = %name, "Cluster has no ready broker endpoints");
183        update_topic_status(
184            &ctx.client,
185            &namespace,
186            &name,
187            build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
188        )
189        .await?;
190        return Ok(Action::requeue(Duration::from_secs(30)));
191    }
192
193    // Reconcile the topic with Rivven cluster
194    let topic_result = reconcile_topic_with_cluster(&topic, &broker_endpoints).await;
195
196    match topic_result {
197        Ok(topic_info) => {
198            // Update status to ready
199            let status = build_ready_status(&topic, topic_info);
200            update_topic_status(&ctx.client, &namespace, &name, status).await?;
201            info!(name = %name, "Topic reconciliation complete");
202            Ok(Action::requeue(Duration::from_secs(
203                DEFAULT_REQUEUE_SECONDS,
204            )))
205        }
206        Err(e) => {
207            // Update status with error
208            let status = build_error_status(&topic, &e.to_string());
209            update_topic_status(&ctx.client, &namespace, &name, status).await?;
210            warn!(name = %name, error = %e, "Topic reconciliation failed");
211            Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
212        }
213    }
214}
215
216/// Verify the referenced RivvenCluster exists and return it
217async fn verify_cluster_ref(
218    client: &Client,
219    namespace: &str,
220    cluster_ref: &ClusterReference,
221) -> Result<crate::crd::RivvenCluster> {
222    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
223    let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
224
225    match clusters.get(&cluster_ref.name).await {
226        Ok(cluster) => Ok(cluster),
227        Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
228            format!("{}/{}", cluster_ns, cluster_ref.name),
229        )),
230        Err(e) => Err(OperatorError::from(e)),
231    }
232}
233
234/// Get broker endpoints from cluster status
235fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
236    cluster
237        .status
238        .as_ref()
239        .map(|s| s.broker_endpoints.clone())
240        .unwrap_or_default()
241}
242
243/// Information about a reconciled topic
244struct TopicInfo {
245    /// Actual partition count
246    partitions: i32,
247    /// Actual replication factor
248    replication_factor: i32,
249    /// Whether the topic already existed
250    existed: bool,
251    /// Partition info (leaders, replicas, ISR)
252    partition_info: Vec<PartitionInfo>,
253}
254
255/// Reconcile topic state with the Rivven cluster
256///
257/// This connects to the cluster and ensures the topic exists with the correct configuration.
258async fn reconcile_topic_with_cluster(
259    topic: &RivvenTopic,
260    broker_endpoints: &[String],
261) -> Result<TopicInfo> {
262    let topic_name = topic.name_any();
263    let spec = &topic.spec;
264
265    // In a real implementation, this would use the Rivven client to:
266    // 1. Check if topic exists
267    // 2. Create topic if it doesn't exist
268    // 3. Update topic config if it exists and config changed
269    // 4. Update ACLs
270    // 5. Return current topic state
271
272    // For now, simulate successful topic creation/verification
273    // TODO: Integrate with rivven-client when admin API is available
274
275    info!(
276        topic = %topic_name,
277        partitions = spec.partitions,
278        replication = spec.replication_factor,
279        brokers = ?broker_endpoints,
280        "Reconciling topic with cluster"
281    );
282
283    // Simulate topic info - in production, this would come from the cluster
284    let partition_info: Vec<PartitionInfo> = (0..spec.partitions)
285        .map(|i| PartitionInfo {
286            partition: i,
287            leader: (i % 3), // Simulated leader distribution
288            replicas: (0..spec.replication_factor).collect(),
289            isr: (0..spec.replication_factor).collect(),
290        })
291        .collect();
292
293    Ok(TopicInfo {
294        partitions: spec.partitions,
295        replication_factor: spec.replication_factor,
296        existed: false, // Would be true if topic already existed
297        partition_info,
298    })
299}
300
301/// Build a ready status
302fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
303    let now = Utc::now().to_rfc3339();
304
305    let conditions = vec![
306        TopicCondition {
307            r#type: "Ready".to_string(),
308            status: "True".to_string(),
309            reason: "TopicReady".to_string(),
310            message: "Topic is ready and serving traffic".to_string(),
311            last_transition_time: now.clone(),
312        },
313        TopicCondition {
314            r#type: "Synced".to_string(),
315            status: "True".to_string(),
316            reason: "SyncSucceeded".to_string(),
317            message: if info.existed {
318                "Topic configuration synchronized".to_string()
319            } else {
320                "Topic created successfully".to_string()
321            },
322            last_transition_time: now.clone(),
323        },
324        TopicCondition {
325            r#type: "ConfigApplied".to_string(),
326            status: "True".to_string(),
327            reason: "ConfigApplied".to_string(),
328            message: "Topic configuration applied".to_string(),
329            last_transition_time: now.clone(),
330        },
331        TopicCondition {
332            r#type: "ACLsApplied".to_string(),
333            status: if topic.spec.acls.is_empty() {
334                "N/A".to_string()
335            } else {
336                "True".to_string()
337            },
338            reason: if topic.spec.acls.is_empty() {
339                "NoACLs".to_string()
340            } else {
341                "ACLsApplied".to_string()
342            },
343            message: if topic.spec.acls.is_empty() {
344                "No ACLs configured".to_string()
345            } else {
346                format!("{} ACL entries applied", topic.spec.acls.len())
347            },
348            last_transition_time: now.clone(),
349        },
350    ];
351
352    RivvenTopicStatus {
353        phase: "Ready".to_string(),
354        message: "Topic is ready".to_string(),
355        current_partitions: info.partitions,
356        current_replication_factor: info.replication_factor,
357        topic_exists: true,
358        observed_generation: topic.metadata.generation.unwrap_or(0),
359        conditions,
360        last_sync_time: Some(now),
361        partition_info: info.partition_info,
362    }
363}
364
365/// Build a pending status
366fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
367    let now = Utc::now().to_rfc3339();
368
369    RivvenTopicStatus {
370        phase: "Pending".to_string(),
371        message: message.to_string(),
372        current_partitions: 0,
373        current_replication_factor: 0,
374        topic_exists: false,
375        observed_generation: topic.metadata.generation.unwrap_or(0),
376        conditions: vec![TopicCondition {
377            r#type: "Ready".to_string(),
378            status: "False".to_string(),
379            reason: "Pending".to_string(),
380            message: message.to_string(),
381            last_transition_time: now.clone(),
382        }],
383        last_sync_time: Some(now),
384        partition_info: vec![],
385    }
386}
387
388/// Build a failed status
389fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
390    let now = Utc::now().to_rfc3339();
391
392    RivvenTopicStatus {
393        phase: "Failed".to_string(),
394        message: error_msg.to_string(),
395        current_partitions: 0,
396        current_replication_factor: 0,
397        topic_exists: false,
398        observed_generation: topic.metadata.generation.unwrap_or(0),
399        conditions: vec![TopicCondition {
400            r#type: "Ready".to_string(),
401            status: "False".to_string(),
402            reason: "ValidationFailed".to_string(),
403            message: error_msg.to_string(),
404            last_transition_time: now.clone(),
405        }],
406        last_sync_time: Some(now),
407        partition_info: vec![],
408    }
409}
410
411/// Build an error status (reconciliation failed)
412fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
413    let now = Utc::now().to_rfc3339();
414
415    // Preserve existing status where possible
416    let existing_status = topic.status.clone().unwrap_or_default();
417
418    RivvenTopicStatus {
419        phase: "Error".to_string(),
420        message: error_msg.to_string(),
421        current_partitions: existing_status.current_partitions,
422        current_replication_factor: existing_status.current_replication_factor,
423        topic_exists: existing_status.topic_exists,
424        observed_generation: topic.metadata.generation.unwrap_or(0),
425        conditions: vec![
426            TopicCondition {
427                r#type: "Ready".to_string(),
428                status: if existing_status.topic_exists {
429                    "True".to_string()
430                } else {
431                    "False".to_string()
432                },
433                reason: "ReconcileError".to_string(),
434                message: error_msg.to_string(),
435                last_transition_time: now.clone(),
436            },
437            TopicCondition {
438                r#type: "Synced".to_string(),
439                status: "False".to_string(),
440                reason: "SyncFailed".to_string(),
441                message: error_msg.to_string(),
442                last_transition_time: now.clone(),
443            },
444        ],
445        last_sync_time: Some(now),
446        partition_info: existing_status.partition_info,
447    }
448}
449
450/// Update the topic status subresource
451async fn update_topic_status(
452    client: &Client,
453    namespace: &str,
454    name: &str,
455    status: RivvenTopicStatus,
456) -> Result<()> {
457    let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);
458
459    debug!(name = %name, phase = %status.phase, "Updating topic status");
460
461    let patch = serde_json::json!({
462        "status": status
463    });
464
465    let patch_params = PatchParams::default();
466    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
467        .await
468        .map_err(OperatorError::from)?;
469
470    Ok(())
471}
472
473/// Cleanup resources when topic is deleted
474#[instrument(skip(topic, ctx))]
475async fn cleanup_topic(
476    topic: Arc<RivvenTopic>,
477    ctx: Arc<TopicControllerContext>,
478) -> Result<Action> {
479    let name = topic.name_any();
480    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
481
482    info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");
483
484    // Check if we should delete the topic from the cluster
485    if topic.spec.delete_on_remove {
486        info!(name = %name, "delete_on_remove is true, deleting topic from cluster");
487
488        // Verify cluster exists
489        match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
490            Ok(cluster) => {
491                let endpoints = get_cluster_endpoints(&cluster);
492                if !endpoints.is_empty() {
493                    // Delete topic from cluster
494                    if let Err(e) = delete_topic_from_cluster(&name, &endpoints).await {
495                        warn!(
496                            name = %name,
497                            error = %e,
498                            "Failed to delete topic from cluster (may not exist)"
499                        );
500                    }
501                }
502            }
503            Err(e) => {
504                warn!(
505                    name = %name,
506                    error = %e,
507                    "Cluster not found during cleanup, topic may be orphaned"
508                );
509            }
510        }
511    } else {
512        info!(
513            name = %name,
514            "delete_on_remove is false, topic will remain in cluster"
515        );
516    }
517
518    info!(name = %name, "Topic cleanup complete");
519
520    Ok(Action::await_change())
521}
522
523/// Delete a topic from the Rivven cluster
524async fn delete_topic_from_cluster(topic_name: &str, _broker_endpoints: &[String]) -> Result<()> {
525    // TODO: Integrate with rivven-client admin API
526    info!(topic = %topic_name, "Would delete topic from cluster");
527    Ok(())
528}
529
530/// Error policy for the topic controller
531fn topic_error_policy(
532    _topic: Arc<RivvenTopic>,
533    error: &OperatorError,
534    _ctx: Arc<TopicControllerContext>,
535) -> Action {
536    warn!(
537        error = %error,
538        "Topic reconciliation error, will retry"
539    );
540
541    let delay = error
542        .requeue_delay()
543        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
544
545    Action::requeue(delay)
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Baseline fixture: 6 partitions, replication factor 3, no ACLs, no status.
    fn create_test_topic() -> RivvenTopic {
        let metadata = ObjectMeta {
            name: Some("test-topic".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenTopicSpec {
            cluster_ref: ClusterReference {
                name: "test-cluster".to_string(),
                namespace: None,
            },
            partitions: 6,
            replication_factor: 3,
            config: TopicConfig::default(),
            acls: vec![],
            delete_on_remove: true,
            topic_labels: BTreeMap::new(),
        };

        RivvenTopic {
            metadata,
            spec,
            status: None,
        }
    }

    /// Baseline fixture plus two `Allow` ACL entries.
    fn create_test_topic_with_acls() -> RivvenTopic {
        let allow = |principal: &str, operations: &[&str]| TopicAcl {
            principal: principal.to_string(),
            operations: operations.iter().map(|op| op.to_string()).collect(),
            permission_type: "Allow".to_string(),
            host: "*".to_string(),
        };

        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            allow("user:app1", &["Read", "Write"]),
            allow("user:analytics", &["Read"]),
        ];
        topic
    }

    /// Partition 0 led by broker 0 and fully in sync on brokers 0-2.
    fn sample_partition() -> PartitionInfo {
        PartitionInfo {
            partition: 0,
            leader: 0,
            replicas: vec![0, 1, 2],
            isr: vec![0, 1, 2],
        }
    }

    /// `TopicInfo` matching the baseline fixture's partition/replication settings.
    fn topic_info(existed: bool, partition_info: Vec<PartitionInfo>) -> TopicInfo {
        TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed,
            partition_info,
        }
    }

    /// Look up a condition by type, panicking when it is absent.
    fn condition<'a>(status: &'a RivvenTopicStatus, kind: &str) -> &'a TopicCondition {
        status
            .conditions
            .iter()
            .find(|c| c.r#type == kind)
            .expect("condition should be present")
    }

    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let status = build_ready_status(&topic, topic_info(false, vec![sample_partition()]));

        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);
        assert_eq!(condition(&status, "Ready").status, "True");
    }

    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();
        let status = build_ready_status(&topic, topic_info(true, vec![]));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "True");
        assert!(acl_cond.message.contains("2 ACL entries"));
    }

    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();
        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();
        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready_cond = condition(&status, "Ready");
        assert_eq!(ready_cond.status, "False");
        assert_eq!(ready_cond.reason, "ValidationFailed");
    }

    #[test]
    fn test_build_error_status_preserves_existing() {
        let mut topic = create_test_topic();
        topic.status = Some(RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: vec![],
            last_sync_time: None,
            partition_info: vec![sample_partition()],
        });

        let status = build_error_status(&topic, "Sync failed");

        assert_eq!(status.phase, "Error");
        // Knowledge from the previous status must survive the error.
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();
        let replication_factor = topic.spec.replication_factor;

        // Simulate partition info generation
        let mut partition_info: Vec<PartitionInfo> = Vec::new();
        for i in 0..topic.spec.partitions {
            partition_info.push(PartitionInfo {
                partition: i,
                leader: (i % 3),
                replicas: (0..replication_factor).collect(),
                isr: (0..replication_factor).collect(),
            });
        }

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0); // 3 % 3 = 0
        assert_eq!(partition_info[4].leader, 1); // 4 % 3 = 1
    }

    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();
        let status = build_ready_status(&topic, topic_info(false, vec![]));

        // Should have all 4 condition types
        for expected in ["Ready", "Synced", "ConfigApplied", "ACLsApplied"] {
            assert!(status.conditions.iter().any(|c| c.r#type == expected));
        }
    }

    #[test]
    fn test_no_acls_condition() {
        let topic = create_test_topic(); // No ACLs
        let status = build_ready_status(&topic, topic_info(false, vec![]));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "N/A");
        assert_eq!(acl_cond.reason, "NoACLs");
    }
}