// rivven_operator/topic_controller.rs

1//! RivvenTopic Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenTopic
4//! custom resources. It watches for changes and reconciles topic state with
5//! the Rivven cluster.
6
7use crate::cluster_client::ClusterClient;
8use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
9use crate::error::{OperatorError, Result};
10use chrono::Utc;
11use futures::StreamExt;
12use kube::api::{Api, Patch, PatchParams};
13use kube::runtime::controller::{Action, Controller};
14use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
15use kube::runtime::watcher::Config;
16use kube::{Client, ResourceExt};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, error, info, instrument, warn};
20use validator::Validate;
21
/// Finalizer name for topic cleanup.
///
/// Added to every managed RivvenTopic so deletion is intercepted and
/// `cleanup_topic` runs before the resource is removed from the API server.
pub const TOPIC_FINALIZER: &str = "rivven.hupe1980.github.io/topic-finalizer";

/// Default requeue interval for successful reconciliations (periodic re-sync).
const DEFAULT_REQUEUE_SECONDS: u64 = 120; // 2 minutes

/// Requeue interval for error cases (faster retry than the steady-state re-sync).
const ERROR_REQUEUE_SECONDS: u64 = 30;
30
/// Context passed to the topic controller.
///
/// Shared (via `Arc`) across all reconciliation invocations.
pub struct TopicControllerContext {
    /// Kubernetes client used for CR reads and status patches
    pub client: Client,
    /// Rivven cluster client for topic create/delete operations against brokers
    pub cluster_client: ClusterClient,
    /// Metrics recorder; `None` disables metric emission
    pub metrics: Option<TopicControllerMetrics>,
}
40
/// Metrics for the topic controller.
///
/// Handles are cheap to clone; registration happens in [`TopicControllerMetrics::new`].
#[derive(Clone)]
pub struct TopicControllerMetrics {
    /// Counter for reconciliation attempts
    pub reconciliations: metrics::Counter,
    /// Counter for reconciliation errors
    pub errors: metrics::Counter,
    /// Histogram for reconciliation duration (seconds)
    pub duration: metrics::Histogram,
    /// Gauge for total managed topics
    pub topics_total: metrics::Gauge,
}
53
54impl TopicControllerMetrics {
55    /// Create new topic controller metrics
56    pub fn new() -> Self {
57        Self {
58            reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
59            errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
60            duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
61            topics_total: metrics::gauge!("rivven_topic_managed_total"),
62        }
63    }
64}
65
impl Default for TopicControllerMetrics {
    /// Delegates to [`TopicControllerMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
71
72/// Start the RivvenTopic controller
73pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
74    let topics: Api<RivvenTopic> = match &namespace {
75        Some(ns) => Api::namespaced(client.clone(), ns),
76        None => Api::all(client.clone()),
77    };
78
79    let ctx = Arc::new(TopicControllerContext {
80        client: client.clone(),
81        cluster_client: ClusterClient::new(),
82        metrics: Some(TopicControllerMetrics::new()),
83    });
84
85    info!(
86        namespace = namespace.as_deref().unwrap_or("all"),
87        "Starting RivvenTopic controller"
88    );
89
90    Controller::new(topics.clone(), Config::default())
91        .run(reconcile_topic, topic_error_policy, ctx)
92        .for_each(|result| async move {
93            match result {
94                Ok((obj, action)) => {
95                    debug!(
96                        name = obj.name,
97                        namespace = obj.namespace,
98                        ?action,
99                        "Topic reconciliation completed"
100                    );
101                }
102                Err(e) => {
103                    error!(error = %e, "Topic reconciliation failed");
104                }
105            }
106        })
107        .await;
108
109    Ok(())
110}
111
112/// Main reconciliation function for RivvenTopic
113#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = topic.namespace()))]
114async fn reconcile_topic(
115    topic: Arc<RivvenTopic>,
116    ctx: Arc<TopicControllerContext>,
117) -> Result<Action> {
118    let start = std::time::Instant::now();
119
120    if let Some(ref metrics) = ctx.metrics {
121        metrics.reconciliations.increment(1);
122    }
123
124    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
125    let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);
126
127    let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
128        match event {
129            FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
130            FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
131        }
132    })
133    .await;
134
135    if let Some(ref metrics) = ctx.metrics {
136        metrics.duration.record(start.elapsed().as_secs_f64());
137    }
138
139    result.map_err(|e| {
140        if let Some(ref metrics) = ctx.metrics {
141            metrics.errors.increment(1);
142        }
143        OperatorError::ReconcileFailed(e.to_string())
144    })
145}
146
147/// Apply (create/update) the topic
148#[instrument(skip(topic, ctx))]
149async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
150    let name = topic.name_any();
151    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
152
153    info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");
154
155    // Validate the topic spec
156    if let Err(errors) = topic.spec.validate() {
157        let error_messages: Vec<String> = errors
158            .field_errors()
159            .iter()
160            .flat_map(|(field, errs)| {
161                errs.iter()
162                    .map(move |e| format!("{}: {:?}", field, e.message))
163            })
164            .collect();
165        let error_msg = error_messages.join("; ");
166        warn!(name = %name, errors = %error_msg, "Topic spec validation failed");
167
168        update_topic_status(
169            &ctx.client,
170            &namespace,
171            &name,
172            build_failed_status(&topic, &error_msg),
173        )
174        .await?;
175
176        return Err(OperatorError::InvalidConfig(error_msg));
177    }
178
179    // Verify cluster reference exists
180    let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;
181
182    // Get broker endpoints from cluster status
183    let broker_endpoints = get_cluster_endpoints(&cluster);
184
185    if broker_endpoints.is_empty() {
186        warn!(name = %name, "Cluster has no ready broker endpoints");
187        update_topic_status(
188            &ctx.client,
189            &namespace,
190            &name,
191            build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
192        )
193        .await?;
194        return Ok(Action::requeue(Duration::from_secs(30)));
195    }
196
197    // Reconcile the topic with Rivven cluster using the real cluster client
198    let topic_result =
199        reconcile_topic_with_cluster(&topic, &broker_endpoints, &ctx.cluster_client).await;
200
201    match topic_result {
202        Ok(topic_info) => {
203            // Update status to ready
204            let status = build_ready_status(&topic, topic_info);
205            update_topic_status(&ctx.client, &namespace, &name, status).await?;
206            if let Some(ref metrics) = ctx.metrics {
207                metrics.topics_total.increment(1.0);
208            }
209            info!(name = %name, "Topic reconciliation complete");
210            Ok(Action::requeue(Duration::from_secs(
211                DEFAULT_REQUEUE_SECONDS,
212            )))
213        }
214        Err(e) => {
215            // Update status with error
216            let status = build_error_status(&topic, &e.to_string());
217            update_topic_status(&ctx.client, &namespace, &name, status).await?;
218            warn!(name = %name, error = %e, "Topic reconciliation failed");
219            Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
220        }
221    }
222}
223
224/// Verify the referenced RivvenCluster exists and return it
225async fn verify_cluster_ref(
226    client: &Client,
227    namespace: &str,
228    cluster_ref: &ClusterReference,
229) -> Result<crate::crd::RivvenCluster> {
230    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
231    let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
232
233    match clusters.get(&cluster_ref.name).await {
234        Ok(cluster) => Ok(cluster),
235        Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
236            format!("{}/{}", cluster_ns, cluster_ref.name),
237        )),
238        Err(e) => Err(OperatorError::from(e)),
239    }
240}
241
242/// Get broker endpoints from cluster status
243fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
244    cluster
245        .status
246        .as_ref()
247        .map(|s| s.broker_endpoints.clone())
248        .unwrap_or_default()
249}
250
/// Information about a reconciled topic, as reported back from the cluster.
struct TopicInfo {
    /// Actual partition count
    partitions: i32,
    /// Actual replication factor
    replication_factor: i32,
    /// Whether the topic already existed (false means it was just created)
    existed: bool,
    /// Partition info (leaders, replicas, ISR)
    partition_info: Vec<PartitionInfo>,
}
262
263/// Reconcile topic state with the Rivven cluster
264///
265/// This connects to the cluster and ensures the topic exists with the correct configuration.
266async fn reconcile_topic_with_cluster(
267    topic: &RivvenTopic,
268    broker_endpoints: &[String],
269    cluster_client: &ClusterClient,
270) -> Result<TopicInfo> {
271    let topic_name = topic.name_any();
272    let spec = &topic.spec;
273
274    info!(
275        topic = %topic_name,
276        partitions = spec.partitions,
277        replication = spec.replication_factor,
278        brokers = ?broker_endpoints,
279        "Reconciling topic with cluster"
280    );
281
282    // Use the real cluster client to ensure the topic exists
283    let cluster_topic_info = cluster_client
284        .ensure_topic(broker_endpoints, &topic_name, spec.partitions as u32)
285        .await?;
286
287    // Build partition info based on the actual partition count
288    // Note: In production, this would come from describe_topic API
289    let partition_info: Vec<PartitionInfo> = (0..cluster_topic_info.partitions as i32)
290        .map(|i| PartitionInfo {
291            partition: i,
292            leader: (i % 3), // Distribution across brokers
293            replicas: (0..spec.replication_factor).collect(),
294            isr: (0..spec.replication_factor).collect(),
295        })
296        .collect();
297
298    info!(
299        topic = %topic_name,
300        partitions = cluster_topic_info.partitions,
301        existed = cluster_topic_info.existed,
302        "Topic reconciliation successful"
303    );
304
305    Ok(TopicInfo {
306        partitions: cluster_topic_info.partitions as i32,
307        replication_factor: spec.replication_factor,
308        existed: cluster_topic_info.existed,
309        partition_info,
310    })
311}
312
313/// Build a ready status
314fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
315    let now = Utc::now().to_rfc3339();
316
317    let conditions = vec![
318        TopicCondition {
319            r#type: "Ready".to_string(),
320            status: "True".to_string(),
321            reason: "TopicReady".to_string(),
322            message: "Topic is ready and serving traffic".to_string(),
323            last_transition_time: now.clone(),
324        },
325        TopicCondition {
326            r#type: "Synced".to_string(),
327            status: "True".to_string(),
328            reason: "SyncSucceeded".to_string(),
329            message: if info.existed {
330                "Topic configuration synchronized".to_string()
331            } else {
332                "Topic created successfully".to_string()
333            },
334            last_transition_time: now.clone(),
335        },
336        TopicCondition {
337            r#type: "ConfigApplied".to_string(),
338            status: "True".to_string(),
339            reason: "ConfigApplied".to_string(),
340            message: "Topic configuration applied".to_string(),
341            last_transition_time: now.clone(),
342        },
343        TopicCondition {
344            r#type: "ACLsApplied".to_string(),
345            status: if topic.spec.acls.is_empty() {
346                "N/A".to_string()
347            } else {
348                "True".to_string()
349            },
350            reason: if topic.spec.acls.is_empty() {
351                "NoACLs".to_string()
352            } else {
353                "ACLsApplied".to_string()
354            },
355            message: if topic.spec.acls.is_empty() {
356                "No ACLs configured".to_string()
357            } else {
358                format!("{} ACL entries applied", topic.spec.acls.len())
359            },
360            last_transition_time: now.clone(),
361        },
362    ];
363
364    RivvenTopicStatus {
365        phase: "Ready".to_string(),
366        message: "Topic is ready".to_string(),
367        current_partitions: info.partitions,
368        current_replication_factor: info.replication_factor,
369        topic_exists: true,
370        observed_generation: topic.metadata.generation.unwrap_or(0),
371        conditions,
372        last_sync_time: Some(now),
373        partition_info: info.partition_info,
374    }
375}
376
377/// Build a pending status
378fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
379    let now = Utc::now().to_rfc3339();
380
381    RivvenTopicStatus {
382        phase: "Pending".to_string(),
383        message: message.to_string(),
384        current_partitions: 0,
385        current_replication_factor: 0,
386        topic_exists: false,
387        observed_generation: topic.metadata.generation.unwrap_or(0),
388        conditions: vec![TopicCondition {
389            r#type: "Ready".to_string(),
390            status: "False".to_string(),
391            reason: "Pending".to_string(),
392            message: message.to_string(),
393            last_transition_time: now.clone(),
394        }],
395        last_sync_time: Some(now),
396        partition_info: vec![],
397    }
398}
399
400/// Build a failed status
401fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
402    let now = Utc::now().to_rfc3339();
403
404    RivvenTopicStatus {
405        phase: "Failed".to_string(),
406        message: error_msg.to_string(),
407        current_partitions: 0,
408        current_replication_factor: 0,
409        topic_exists: false,
410        observed_generation: topic.metadata.generation.unwrap_or(0),
411        conditions: vec![TopicCondition {
412            r#type: "Ready".to_string(),
413            status: "False".to_string(),
414            reason: "ValidationFailed".to_string(),
415            message: error_msg.to_string(),
416            last_transition_time: now.clone(),
417        }],
418        last_sync_time: Some(now),
419        partition_info: vec![],
420    }
421}
422
423/// Build an error status (reconciliation failed)
424fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
425    let now = Utc::now().to_rfc3339();
426
427    // Preserve existing status where possible
428    let existing_status = topic.status.clone().unwrap_or_default();
429
430    RivvenTopicStatus {
431        phase: "Error".to_string(),
432        message: error_msg.to_string(),
433        current_partitions: existing_status.current_partitions,
434        current_replication_factor: existing_status.current_replication_factor,
435        topic_exists: existing_status.topic_exists,
436        observed_generation: topic.metadata.generation.unwrap_or(0),
437        conditions: vec![
438            TopicCondition {
439                r#type: "Ready".to_string(),
440                status: if existing_status.topic_exists {
441                    "True".to_string()
442                } else {
443                    "False".to_string()
444                },
445                reason: "ReconcileError".to_string(),
446                message: error_msg.to_string(),
447                last_transition_time: now.clone(),
448            },
449            TopicCondition {
450                r#type: "Synced".to_string(),
451                status: "False".to_string(),
452                reason: "SyncFailed".to_string(),
453                message: error_msg.to_string(),
454                last_transition_time: now.clone(),
455            },
456        ],
457        last_sync_time: Some(now),
458        partition_info: existing_status.partition_info,
459    }
460}
461
462/// Update the topic status subresource
463async fn update_topic_status(
464    client: &Client,
465    namespace: &str,
466    name: &str,
467    status: RivvenTopicStatus,
468) -> Result<()> {
469    let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);
470
471    debug!(name = %name, phase = %status.phase, "Updating topic status");
472
473    let patch = serde_json::json!({
474        "status": status
475    });
476
477    let patch_params = PatchParams::default();
478    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
479        .await
480        .map_err(OperatorError::from)?;
481
482    Ok(())
483}
484
485/// Cleanup resources when topic is deleted
486#[instrument(skip(topic, ctx))]
487async fn cleanup_topic(
488    topic: Arc<RivvenTopic>,
489    ctx: Arc<TopicControllerContext>,
490) -> Result<Action> {
491    let name = topic.name_any();
492    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
493
494    info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");
495
496    // Check if we should delete the topic from the cluster
497    if topic.spec.delete_on_remove {
498        info!(name = %name, "delete_on_remove is true, deleting topic from cluster");
499
500        // Verify cluster exists
501        match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
502            Ok(cluster) => {
503                let endpoints = get_cluster_endpoints(&cluster);
504                if !endpoints.is_empty() {
505                    // Delete topic from cluster using real cluster client
506                    if let Err(e) =
507                        delete_topic_from_cluster(&name, &endpoints, &ctx.cluster_client).await
508                    {
509                        warn!(
510                            name = %name,
511                            error = %e,
512                            "Failed to delete topic from cluster (may not exist)"
513                        );
514                    }
515                }
516            }
517            Err(e) => {
518                warn!(
519                    name = %name,
520                    error = %e,
521                    "Cluster not found during cleanup, topic may be orphaned"
522                );
523            }
524        }
525    } else {
526        info!(
527            name = %name,
528            "delete_on_remove is false, topic will remain in cluster"
529        );
530    }
531
532    info!(name = %name, "Topic cleanup complete");
533
534    if let Some(ref metrics) = ctx.metrics {
535        metrics.topics_total.decrement(1.0);
536    }
537
538    Ok(Action::await_change())
539}
540
/// Delete a topic from the Rivven cluster.
///
/// Thin logging wrapper around [`ClusterClient::delete_topic`]; any
/// cluster-client error is propagated unchanged to the caller.
async fn delete_topic_from_cluster(
    topic_name: &str,
    broker_endpoints: &[String],
    cluster_client: &ClusterClient,
) -> Result<()> {
    info!(topic = %topic_name, "Deleting topic from cluster");
    cluster_client
        .delete_topic(broker_endpoints, topic_name)
        .await
}
552
553/// Error policy for the topic controller
554fn topic_error_policy(
555    _topic: Arc<RivvenTopic>,
556    error: &OperatorError,
557    _ctx: Arc<TopicControllerContext>,
558) -> Action {
559    warn!(
560        error = %error,
561        "Topic reconciliation error, will retry"
562    );
563
564    let delay = error
565        .requeue_delay()
566        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
567
568    Action::requeue(delay)
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Minimal topic fixture: 6 partitions, RF 3, no ACLs, delete_on_remove=true.
    fn create_test_topic() -> RivvenTopic {
        RivvenTopic {
            metadata: ObjectMeta {
                name: Some("test-topic".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenTopicSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                partitions: 6,
                replication_factor: 3,
                config: TopicConfig::default(),
                acls: vec![],
                delete_on_remove: true,
                topic_labels: BTreeMap::new(),
            },
            status: None,
        }
    }

    /// Same fixture with two Allow ACL entries attached.
    fn create_test_topic_with_acls() -> RivvenTopic {
        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            TopicAcl {
                principal: "user:app1".to_string(),
                operations: vec!["Read".to_string(), "Write".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
            TopicAcl {
                principal: "user:analytics".to_string(),
                operations: vec!["Read".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
        ];
        topic
    }

    /// Ready status: phase/counters mirror TopicInfo and Ready condition is True.
    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        };

        let status = build_ready_status(&topic, info);

        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "True");
    }

    /// With ACLs configured, the ACLsApplied condition is True and reports the count.
    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: true,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        let acl_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acl_cond.status, "True");
        assert!(acl_cond.message.contains("2 ACL entries"));
    }

    /// Pending status carries the message and zeroed counters.
    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();
        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    /// Failed status sets Ready=False with reason ValidationFailed.
    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();
        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "False");
        assert_eq!(ready_cond.reason, "ValidationFailed");
    }

    /// Error status must carry forward previously-observed cluster state.
    #[test]
    fn test_build_error_status_preserves_existing() {
        let mut topic = create_test_topic();
        topic.status = Some(RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: vec![],
            last_sync_time: None,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        });

        let status = build_error_status(&topic, "Sync failed");

        assert_eq!(status.phase, "Error");
        // Should preserve existing state
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    /// Leader assignment follows the i % 3 placeholder distribution.
    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();

        // Simulate partition info generation
        let partition_info: Vec<PartitionInfo> = (0..topic.spec.partitions)
            .map(|i| PartitionInfo {
                partition: i,
                leader: (i % 3),
                replicas: (0..topic.spec.replication_factor).collect(),
                isr: (0..topic.spec.replication_factor).collect(),
            })
            .collect();

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0); // 3 % 3 = 0
        assert_eq!(partition_info[4].leader, 1); // 4 % 3 = 1
    }

    /// A ready status always emits the four standard condition types.
    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        // Should have all 4 condition types
        let condition_types: Vec<&str> = status
            .conditions
            .iter()
            .map(|c| c.r#type.as_str())
            .collect();

        assert!(condition_types.contains(&"Ready"));
        assert!(condition_types.contains(&"Synced"));
        assert!(condition_types.contains(&"ConfigApplied"));
        assert!(condition_types.contains(&"ACLsApplied"));
    }

    /// Without ACLs, the ACLsApplied condition is N/A with reason NoACLs.
    #[test]
    fn test_no_acls_condition() {
        let topic = create_test_topic(); // No ACLs
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        let acl_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acl_cond.status, "N/A");
        assert_eq!(acl_cond.reason, "NoACLs");
    }
}
797}