rivven_operator/topic_controller.rs

//! RivvenTopic Controller
//!
//! This module implements the Kubernetes controller for managing RivvenTopic
//! custom resources. It watches for changes and reconciles topic state with
//! the Rivven cluster.
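//!
//! A minimal startup sketch (illustrative; assumes a kubeconfig- or
//! in-cluster-derived [`kube::Client`], with error handling elided):
//!
//! ```ignore
//! let client = kube::Client::try_default().await?;
//! // Passing `None` watches RivvenTopics in all namespaces.
//! run_topic_controller(client, None).await?;
//! ```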

use crate::cluster_client::ClusterClient;
use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
use crate::error::{OperatorError, Result};
use chrono::Utc;
use futures::StreamExt;
use kube::api::{Api, Patch, PatchParams};
use kube::runtime::controller::{Action, Controller};
use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
use kube::runtime::watcher::Config;
use kube::{Client, ResourceExt};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, instrument, warn};
use validator::Validate;

/// Finalizer name for topic cleanup
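/// (custom finalizers must be domain-qualified; this one uses the operator's API group)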
pub const TOPIC_FINALIZER: &str = "rivven.hupe1980.github.io/topic-finalizer";

/// Default requeue interval for successful reconciliations
const DEFAULT_REQUEUE_SECONDS: u64 = 120; // 2 minutes

/// Requeue interval for error cases
const ERROR_REQUEUE_SECONDS: u64 = 30;

/// Context passed to the topic controller
pub struct TopicControllerContext {
    /// Kubernetes client
    pub client: Client,
    /// Rivven cluster client for topic operations
    pub cluster_client: ClusterClient,
    /// Metrics recorder
    pub metrics: Option<TopicControllerMetrics>,
}

/// Metrics for the topic controller
#[derive(Clone)]
pub struct TopicControllerMetrics {
    /// Counter for reconciliation attempts
    pub reconciliations: metrics::Counter,
    /// Counter for reconciliation errors
    pub errors: metrics::Counter,
    /// Histogram for reconciliation duration
    pub duration: metrics::Histogram,
    /// Gauge for total managed topics
    pub topics_total: metrics::Gauge,
}

impl TopicControllerMetrics {
    /// Create new topic controller metrics
    pub fn new() -> Self {
        Self {
            reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
            errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
            duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
            topics_total: metrics::gauge!("rivven_topic_managed_total"),
        }
    }
}

impl Default for TopicControllerMetrics {
    fn default() -> Self {
        Self::new()
    }
}

/// Start the RivvenTopic controller
pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
    let topics: Api<RivvenTopic> = match &namespace {
        Some(ns) => Api::namespaced(client.clone(), ns),
        None => Api::all(client.clone()),
    };

    let ctx = Arc::new(TopicControllerContext {
        client: client.clone(),
        cluster_client: ClusterClient::new(),
        metrics: Some(TopicControllerMetrics::new()),
    });

    info!(
        namespace = namespace.as_deref().unwrap_or("all"),
        "Starting RivvenTopic controller"
    );

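    // Run the watch loop: each event goes through reconcile_topic, and
    // failures are handed to topic_error_policy to schedule the retry.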
    Controller::new(topics.clone(), Config::default())
        .run(reconcile_topic, topic_error_policy, ctx)
        .for_each(|result| async move {
            match result {
                Ok((obj, action)) => {
                    debug!(
                        name = %obj.name,
                        namespace = ?obj.namespace,
                        ?action,
                        "Topic reconciliation completed"
                    );
                }
                Err(e) => {
                    error!(error = %e, "Topic reconciliation failed");
                }
            }
        })
        .await;

    Ok(())
}

/// Main reconciliation function for RivvenTopic
#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = ?topic.namespace()))]
async fn reconcile_topic(
    topic: Arc<RivvenTopic>,
    ctx: Arc<TopicControllerContext>,
) -> Result<Action> {
    let start = std::time::Instant::now();

    if let Some(ref metrics) = ctx.metrics {
        metrics.reconciliations.increment(1);
    }

    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
    let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);

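    // The finalizer helper adds TOPIC_FINALIZER on first reconcile and only
    // removes it once cleanup_topic succeeds, so Kubernetes cannot finish
    // deleting the resource before cluster-side cleanup has run.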
    let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
        match event {
            FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
            FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
        }
    })
    .await;

    if let Some(ref metrics) = ctx.metrics {
        metrics.duration.record(start.elapsed().as_secs_f64());
    }

    result.map_err(|e| {
        if let Some(ref metrics) = ctx.metrics {
            metrics.errors.increment(1);
        }
        OperatorError::ReconcileFailed(e.to_string())
    })
}

/// Apply (create/update) the topic
#[instrument(skip(topic, ctx))]
async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
    let name = topic.name_any();
    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());

    info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");

    // Validate the topic spec
    if let Err(errors) = topic.spec.validate() {
        let error_messages: Vec<String> = errors
            .field_errors()
            .iter()
            .flat_map(|(field, errs)| {
                errs.iter().map(move |e| {
                    format!(
                        "{}: {}",
                        field,
                        e.message.as_deref().unwrap_or("invalid value")
                    )
                })
            })
            .collect();
        let error_msg = error_messages.join("; ");
        warn!(name = %name, errors = %error_msg, "Topic spec validation failed");

        update_topic_status(
            &ctx.client,
            &namespace,
            &name,
            build_failed_status(&topic, &error_msg),
        )
        .await?;

        return Err(OperatorError::InvalidConfig(error_msg));
    }

    // Verify cluster reference exists
    let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;

    // Get broker endpoints from cluster status
    let broker_endpoints = get_cluster_endpoints(&cluster);

    if broker_endpoints.is_empty() {
        warn!(name = %name, "Cluster has no ready broker endpoints");
        update_topic_status(
            &ctx.client,
            &namespace,
            &name,
            build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
        )
        .await?;
        return Ok(Action::requeue(Duration::from_secs(30)));
    }

    // Reconcile the topic with the Rivven cluster using the real cluster client
    let topic_result =
        reconcile_topic_with_cluster(&topic, &broker_endpoints, &ctx.cluster_client).await;

    match topic_result {
        Ok(topic_info) => {
            // Update status to ready
            let status = build_ready_status(&topic, topic_info);
            update_topic_status(&ctx.client, &namespace, &name, status).await?;
            info!(name = %name, "Topic reconciliation complete");
            Ok(Action::requeue(Duration::from_secs(
                DEFAULT_REQUEUE_SECONDS,
            )))
        }
        Err(e) => {
            // Update status with error
            let status = build_error_status(&topic, &e.to_string());
            update_topic_status(&ctx.client, &namespace, &name, status).await?;
            warn!(name = %name, error = %e, "Topic reconciliation failed");
            Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
        }
    }
}

/// Verify the referenced RivvenCluster exists and return it
async fn verify_cluster_ref(
    client: &Client,
    namespace: &str,
    cluster_ref: &ClusterReference,
) -> Result<crate::crd::RivvenCluster> {
    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
    let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);

    match clusters.get(&cluster_ref.name).await {
        Ok(cluster) => Ok(cluster),
        Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
            format!("{}/{}", cluster_ns, cluster_ref.name),
        )),
        Err(e) => Err(OperatorError::from(e)),
    }
}

/// Get broker endpoints from cluster status
fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
    cluster
        .status
        .as_ref()
        .map(|s| s.broker_endpoints.clone())
        .unwrap_or_default()
}

/// Information about a reconciled topic
struct TopicInfo {
    /// Actual partition count
    partitions: i32,
    /// Actual replication factor
    replication_factor: i32,
    /// Whether the topic already existed
    existed: bool,
    /// Partition info (leaders, replicas, ISR)
    partition_info: Vec<PartitionInfo>,
}

/// Reconcile topic state with the Rivven cluster
///
/// This connects to the cluster and ensures the topic exists with the correct configuration.
async fn reconcile_topic_with_cluster(
    topic: &RivvenTopic,
    broker_endpoints: &[String],
    cluster_client: &ClusterClient,
) -> Result<TopicInfo> {
    let topic_name = topic.name_any();
    let spec = &topic.spec;

    info!(
        topic = %topic_name,
        partitions = spec.partitions,
        replication = spec.replication_factor,
        brokers = ?broker_endpoints,
        "Reconciling topic with cluster"
    );

    // Use the real cluster client to ensure the topic exists
    let cluster_topic_info = cluster_client
        .ensure_topic(broker_endpoints, &topic_name, spec.partitions as u32)
        .await?;

    // Build partition info based on the actual partition count.
    // Note: in production this would come from a describe_topic API; the
    // layout synthesized here is a placeholder that assumes three brokers.
    let partition_info: Vec<PartitionInfo> = (0..cluster_topic_info.partitions as i32)
        .map(|i| PartitionInfo {
            partition: i,
            leader: i % 3, // placeholder round-robin over an assumed 3 brokers
            replicas: (0..spec.replication_factor).collect(),
            isr: (0..spec.replication_factor).collect(),
        })
        .collect();

    info!(
        topic = %topic_name,
        partitions = cluster_topic_info.partitions,
        existed = cluster_topic_info.existed,
        "Topic reconciliation successful"
    );

    Ok(TopicInfo {
        partitions: cluster_topic_info.partitions as i32,
        replication_factor: spec.replication_factor,
        existed: cluster_topic_info.existed,
        partition_info,
    })
}

/// Build a ready status
fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
    let now = Utc::now().to_rfc3339();

    let conditions = vec![
        TopicCondition {
            r#type: "Ready".to_string(),
            status: "True".to_string(),
            reason: "TopicReady".to_string(),
            message: "Topic is ready and serving traffic".to_string(),
            last_transition_time: now.clone(),
        },
        TopicCondition {
            r#type: "Synced".to_string(),
            status: "True".to_string(),
            reason: "SyncSucceeded".to_string(),
            message: if info.existed {
                "Topic configuration synchronized".to_string()
            } else {
                "Topic created successfully".to_string()
            },
            last_transition_time: now.clone(),
        },
        TopicCondition {
            r#type: "ConfigApplied".to_string(),
            status: "True".to_string(),
            reason: "ConfigApplied".to_string(),
            message: "Topic configuration applied".to_string(),
            last_transition_time: now.clone(),
        },
        TopicCondition {
            r#type: "ACLsApplied".to_string(),
            status: if topic.spec.acls.is_empty() {
                "N/A".to_string()
            } else {
                "True".to_string()
            },
            reason: if topic.spec.acls.is_empty() {
                "NoACLs".to_string()
            } else {
                "ACLsApplied".to_string()
            },
            message: if topic.spec.acls.is_empty() {
                "No ACLs configured".to_string()
            } else {
                format!("{} ACL entries applied", topic.spec.acls.len())
            },
            last_transition_time: now.clone(),
        },
    ];

    RivvenTopicStatus {
        phase: "Ready".to_string(),
        message: "Topic is ready".to_string(),
        current_partitions: info.partitions,
        current_replication_factor: info.replication_factor,
        topic_exists: true,
        observed_generation: topic.metadata.generation.unwrap_or(0),
        conditions,
        last_sync_time: Some(now),
        partition_info: info.partition_info,
    }
}

/// Build a pending status
fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
    let now = Utc::now().to_rfc3339();

    RivvenTopicStatus {
        phase: "Pending".to_string(),
        message: message.to_string(),
        current_partitions: 0,
        current_replication_factor: 0,
        topic_exists: false,
        observed_generation: topic.metadata.generation.unwrap_or(0),
        conditions: vec![TopicCondition {
            r#type: "Ready".to_string(),
            status: "False".to_string(),
            reason: "Pending".to_string(),
            message: message.to_string(),
            last_transition_time: now.clone(),
        }],
        last_sync_time: Some(now),
        partition_info: vec![],
    }
}

/// Build a failed status
fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
    let now = Utc::now().to_rfc3339();

    RivvenTopicStatus {
        phase: "Failed".to_string(),
        message: error_msg.to_string(),
        current_partitions: 0,
        current_replication_factor: 0,
        topic_exists: false,
        observed_generation: topic.metadata.generation.unwrap_or(0),
        conditions: vec![TopicCondition {
            r#type: "Ready".to_string(),
            status: "False".to_string(),
            reason: "ValidationFailed".to_string(),
            message: error_msg.to_string(),
            last_transition_time: now.clone(),
        }],
        last_sync_time: Some(now),
        partition_info: vec![],
    }
}

/// Build an error status (reconciliation failed)
fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
    let now = Utc::now().to_rfc3339();

    // Preserve existing status where possible
    let existing_status = topic.status.clone().unwrap_or_default();

    RivvenTopicStatus {
        phase: "Error".to_string(),
        message: error_msg.to_string(),
        current_partitions: existing_status.current_partitions,
        current_replication_factor: existing_status.current_replication_factor,
        topic_exists: existing_status.topic_exists,
        observed_generation: topic.metadata.generation.unwrap_or(0),
        conditions: vec![
            TopicCondition {
                r#type: "Ready".to_string(),
                status: if existing_status.topic_exists {
                    "True".to_string()
                } else {
                    "False".to_string()
                },
                reason: "ReconcileError".to_string(),
                message: error_msg.to_string(),
                last_transition_time: now.clone(),
            },
            TopicCondition {
                r#type: "Synced".to_string(),
                status: "False".to_string(),
                reason: "SyncFailed".to_string(),
                message: error_msg.to_string(),
                last_transition_time: now.clone(),
            },
        ],
        last_sync_time: Some(now),
        partition_info: existing_status.partition_info,
    }
}

/// Update the topic status subresource
async fn update_topic_status(
    client: &Client,
    namespace: &str,
    name: &str,
    status: RivvenTopicStatus,
) -> Result<()> {
    let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);

    debug!(name = %name, phase = %status.phase, "Updating topic status");

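    // Merge-patch the status subresource with the freshly built status
    // (last write wins, which is fine while this controller is the sole status writer).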
    let patch = serde_json::json!({
        "status": status
    });

    let patch_params = PatchParams::default();
    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
        .await
        .map_err(OperatorError::from)?;

    Ok(())
}

/// Cleanup resources when topic is deleted
#[instrument(skip(topic, ctx))]
async fn cleanup_topic(
    topic: Arc<RivvenTopic>,
    ctx: Arc<TopicControllerContext>,
) -> Result<Action> {
    let name = topic.name_any();
    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());

    info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");

    // Check if we should delete the topic from the cluster
    if topic.spec.delete_on_remove {
        info!(name = %name, "delete_on_remove is true, deleting topic from cluster");

        // Verify cluster exists
        match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
            Ok(cluster) => {
                let endpoints = get_cluster_endpoints(&cluster);
                if !endpoints.is_empty() {
                    // Delete the topic from the cluster using the real cluster client
                    if let Err(e) =
                        delete_topic_from_cluster(&name, &endpoints, &ctx.cluster_client).await
                    {
                        warn!(
                            name = %name,
                            error = %e,
                            "Failed to delete topic from cluster (may not exist)"
                        );
                    }
                }
            }
            Err(e) => {
                warn!(
                    name = %name,
                    error = %e,
                    "Cluster not found during cleanup, topic may be orphaned"
                );
            }
        }
    } else {
        info!(
            name = %name,
            "delete_on_remove is false, topic will remain in cluster"
        );
    }

    info!(name = %name, "Topic cleanup complete");

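    // Returning Ok lets the finalizer helper strip TOPIC_FINALIZER;
    // await_change avoids pointless requeues of an object that is going away.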
    Ok(Action::await_change())
}

/// Delete a topic from the Rivven cluster
async fn delete_topic_from_cluster(
    topic_name: &str,
    broker_endpoints: &[String],
    cluster_client: &ClusterClient,
) -> Result<()> {
    info!(topic = %topic_name, "Deleting topic from cluster");
    cluster_client
        .delete_topic(broker_endpoints, topic_name)
        .await
}

/// Error policy for the topic controller
fn topic_error_policy(
    _topic: Arc<RivvenTopic>,
    error: &OperatorError,
    _ctx: Arc<TopicControllerContext>,
) -> Action {
    warn!(
        error = %error,
        "Topic reconciliation error, will retry"
    );

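    // Prefer an error-specific backoff if the error type suggests one,
    // falling back to the flat error requeue interval.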
    let delay = error
        .requeue_delay()
        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));

    Action::requeue(delay)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    fn create_test_topic() -> RivvenTopic {
        RivvenTopic {
            metadata: ObjectMeta {
                name: Some("test-topic".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenTopicSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                partitions: 6,
                replication_factor: 3,
                config: TopicConfig::default(),
                acls: vec![],
                delete_on_remove: true,
                topic_labels: BTreeMap::new(),
            },
            status: None,
        }
    }

    fn create_test_topic_with_acls() -> RivvenTopic {
        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            TopicAcl {
                principal: "user:app1".to_string(),
                operations: vec!["Read".to_string(), "Write".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
            TopicAcl {
                principal: "user:analytics".to_string(),
                operations: vec!["Read".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
        ];
        topic
    }

    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        };

        let status = build_ready_status(&topic, info);

        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "True");
    }

    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: true,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        let acl_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acl_cond.status, "True");
        assert!(acl_cond.message.contains("2 ACL entries"));
    }

    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();
        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();
        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "False");
        assert_eq!(ready_cond.reason, "ValidationFailed");
    }

    #[test]
    fn test_build_error_status_preserves_existing() {
        let mut topic = create_test_topic();
        topic.status = Some(RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: vec![],
            last_sync_time: None,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        });

        let status = build_error_status(&topic, "Sync failed");

        assert_eq!(status.phase, "Error");
        // Should preserve existing state
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();

        // Simulate partition info generation
        let partition_info: Vec<PartitionInfo> = (0..topic.spec.partitions)
            .map(|i| PartitionInfo {
                partition: i,
                leader: (i % 3),
                replicas: (0..topic.spec.replication_factor).collect(),
                isr: (0..topic.spec.replication_factor).collect(),
            })
            .collect();

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0); // 3 % 3 = 0
        assert_eq!(partition_info[4].leader, 1); // 4 % 3 = 1
    }

    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        // Should have all 4 condition types
        let condition_types: Vec<&str> = status
            .conditions
            .iter()
            .map(|c| c.r#type.as_str())
            .collect();

        assert!(condition_types.contains(&"Ready"));
        assert!(condition_types.contains(&"Synced"));
        assert!(condition_types.contains(&"ConfigApplied"));
        assert!(condition_types.contains(&"ACLsApplied"));
    }

    #[test]
    fn test_no_acls_condition() {
        let topic = create_test_topic(); // No ACLs
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        let acl_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acl_cond.status, "N/A");
        assert_eq!(acl_cond.reason, "NoACLs");
789    }
790}