Skip to main content

rivven_operator/
topic_controller.rs

1//! RivvenTopic Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenTopic
4//! custom resources. It watches for changes and reconciles topic state with
5//! the Rivven cluster.
6
7use crate::cluster_client::ClusterClient;
8use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
9use crate::error::{OperatorError, Result};
10use chrono::Utc;
11use futures::StreamExt;
12use kube::api::{Api, Patch, PatchParams};
13use kube::runtime::controller::{Action, Controller};
14use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
15use kube::runtime::watcher::Config;
16use kube::{Client, ResourceExt};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, error, info, instrument, warn};
20use validator::Validate;
21
/// Finalizer name for topic cleanup.
///
/// `reconcile_topic` hands this name to the kube finalizer helper, which routes
/// deletions to `cleanup_topic`.
pub const TOPIC_FINALIZER: &str = "rivven.hupe1980.github.io/topic-finalizer";

/// Default requeue interval, in seconds, after a successful reconciliation
const DEFAULT_REQUEUE_SECONDS: u64 = 120; // 2 minutes

/// Requeue interval, in seconds, for error cases; also the fallback delay in
/// `topic_error_policy` when the error does not supply its own.
const ERROR_REQUEUE_SECONDS: u64 = 30;
30
/// Context passed to the topic controller.
///
/// `run_topic_controller` builds one instance, wraps it in an `Arc`, and hands
/// it to the controller runtime so every reconciliation receives it.
pub struct TopicControllerContext {
    /// Kubernetes client used to look up clusters and patch topic status
    pub client: Client,
    /// Rivven cluster client for topic operations (`ensure_topic`, `delete_topic`)
    pub cluster_client: ClusterClient,
    /// Metrics recorder; every use site skips recording when this is `None`
    pub metrics: Option<TopicControllerMetrics>,
}
40
/// Metrics for the topic controller.
///
/// Handles are created by `TopicControllerMetrics::new` via the `metrics`
/// crate macros.
#[derive(Clone)]
pub struct TopicControllerMetrics {
    /// Counter for reconciliation attempts (incremented at the start of `reconcile_topic`)
    pub reconciliations: metrics::Counter,
    /// Counter for reconciliation errors (incremented when `reconcile_topic` fails)
    pub errors: metrics::Counter,
    /// Histogram for reconciliation duration, recorded in seconds
    pub duration: metrics::Histogram,
    /// Gauge for total managed topics
    pub topics_total: metrics::Gauge,
}
53
54impl TopicControllerMetrics {
55    /// Create new topic controller metrics
56    pub fn new() -> Self {
57        Self {
58            reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
59            errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
60            duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
61            topics_total: metrics::gauge!("rivven_topic_managed_total"),
62        }
63    }
64}
65
66impl Default for TopicControllerMetrics {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72/// Start the RivvenTopic controller
73pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
74    let topics: Api<RivvenTopic> = match &namespace {
75        Some(ns) => Api::namespaced(client.clone(), ns),
76        None => Api::all(client.clone()),
77    };
78
79    let ctx = Arc::new(TopicControllerContext {
80        client: client.clone(),
81        cluster_client: ClusterClient::new(),
82        metrics: Some(TopicControllerMetrics::new()),
83    });
84
85    info!(
86        namespace = namespace.as_deref().unwrap_or("all"),
87        "Starting RivvenTopic controller"
88    );
89
90    Controller::new(topics.clone(), Config::default())
91        .run(reconcile_topic, topic_error_policy, ctx)
92        .for_each(|result| async move {
93            match result {
94                Ok((obj, action)) => {
95                    debug!(
96                        name = obj.name,
97                        namespace = obj.namespace,
98                        ?action,
99                        "Topic reconciliation completed"
100                    );
101                }
102                Err(e) => {
103                    error!(error = %e, "Topic reconciliation failed");
104                }
105            }
106        })
107        .await;
108
109    Ok(())
110}
111
112/// Main reconciliation function for RivvenTopic
113#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = topic.namespace()))]
114async fn reconcile_topic(
115    topic: Arc<RivvenTopic>,
116    ctx: Arc<TopicControllerContext>,
117) -> Result<Action> {
118    let start = std::time::Instant::now();
119
120    if let Some(ref metrics) = ctx.metrics {
121        metrics.reconciliations.increment(1);
122    }
123
124    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
125    let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);
126
127    let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
128        match event {
129            FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
130            FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
131        }
132    })
133    .await;
134
135    if let Some(ref metrics) = ctx.metrics {
136        metrics.duration.record(start.elapsed().as_secs_f64());
137    }
138
139    result.map_err(|e| {
140        if let Some(ref metrics) = ctx.metrics {
141            metrics.errors.increment(1);
142        }
143        OperatorError::ReconcileFailed(e.to_string())
144    })
145}
146
147/// Apply (create/update) the topic
148#[instrument(skip(topic, ctx))]
149async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
150    let name = topic.name_any();
151    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
152
153    info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");
154
155    // Validate the topic spec
156    if let Err(errors) = topic.spec.validate() {
157        let error_messages: Vec<String> = errors
158            .field_errors()
159            .iter()
160            .flat_map(|(field, errs)| {
161                errs.iter()
162                    .map(move |e| format!("{}: {:?}", field, e.message))
163            })
164            .collect();
165        let error_msg = error_messages.join("; ");
166        warn!(name = %name, errors = %error_msg, "Topic spec validation failed");
167
168        update_topic_status(
169            &ctx.client,
170            &namespace,
171            &name,
172            build_failed_status(&topic, &error_msg),
173        )
174        .await?;
175
176        return Err(OperatorError::InvalidConfig(error_msg));
177    }
178
179    // Verify cluster reference exists
180    let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;
181
182    // Get broker endpoints from cluster status
183    let broker_endpoints = get_cluster_endpoints(&cluster);
184
185    if broker_endpoints.is_empty() {
186        warn!(name = %name, "Cluster has no ready broker endpoints");
187        update_topic_status(
188            &ctx.client,
189            &namespace,
190            &name,
191            build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
192        )
193        .await?;
194        return Ok(Action::requeue(Duration::from_secs(30)));
195    }
196
197    // Reconcile the topic with Rivven cluster using the real cluster client
198    let topic_result =
199        reconcile_topic_with_cluster(&topic, &broker_endpoints, &ctx.cluster_client).await;
200
201    match topic_result {
202        Ok(topic_info) => {
203            // Update status to ready
204            let status = build_ready_status(&topic, topic_info);
205            update_topic_status(&ctx.client, &namespace, &name, status).await?;
206            if let Some(ref metrics) = ctx.metrics {
207                metrics.topics_total.increment(1.0);
208            }
209            info!(name = %name, "Topic reconciliation complete");
210            Ok(Action::requeue(Duration::from_secs(
211                DEFAULT_REQUEUE_SECONDS,
212            )))
213        }
214        Err(e) => {
215            // Update status with error
216            let status = build_error_status(&topic, &e.to_string());
217            update_topic_status(&ctx.client, &namespace, &name, status).await?;
218            warn!(name = %name, error = %e, "Topic reconciliation failed");
219            Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
220        }
221    }
222}
223
224/// Verify the referenced RivvenCluster exists and return it
225async fn verify_cluster_ref(
226    client: &Client,
227    namespace: &str,
228    cluster_ref: &ClusterReference,
229) -> Result<crate::crd::RivvenCluster> {
230    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
231    let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
232
233    match clusters.get(&cluster_ref.name).await {
234        Ok(cluster) => Ok(cluster),
235        Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
236            format!("{}/{}", cluster_ns, cluster_ref.name),
237        )),
238        Err(e) => Err(OperatorError::from(e)),
239    }
240}
241
242/// Get broker endpoints from cluster status
243fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
244    cluster
245        .status
246        .as_ref()
247        .map(|s| s.broker_endpoints.clone())
248        .unwrap_or_default()
249}
250
/// Information about a reconciled topic.
///
/// Produced by `reconcile_topic_with_cluster` and consumed by
/// `build_ready_status`.
struct TopicInfo {
    /// Actual partition count as reported by the cluster client
    partitions: i32,
    /// Replication factor; currently copied from the spec rather than observed
    replication_factor: i32,
    /// Whether the topic already existed
    existed: bool,
    /// Partition info (leaders, replicas, ISR)
    partition_info: Vec<PartitionInfo>,
}
262
263/// Reconcile topic state with the Rivven cluster
264///
265/// This connects to the cluster and ensures the topic exists with the correct configuration.
266async fn reconcile_topic_with_cluster(
267    topic: &RivvenTopic,
268    broker_endpoints: &[String],
269    cluster_client: &ClusterClient,
270) -> Result<TopicInfo> {
271    let topic_name = topic.name_any();
272    let spec = &topic.spec;
273
274    info!(
275        topic = %topic_name,
276        partitions = spec.partitions,
277        replication = spec.replication_factor,
278        brokers = ?broker_endpoints,
279        "Reconciling topic with cluster"
280    );
281
282    // Use the real cluster client to ensure the topic exists
283    let partitions_u32 = u32::try_from(spec.partitions).map_err(|_| {
284        OperatorError::ValidationError(format!(
285            "partitions {} is negative or exceeds u32::MAX",
286            spec.partitions
287        ))
288    })?;
289    let cluster_topic_info = cluster_client
290        .ensure_topic(broker_endpoints, &topic_name, partitions_u32)
291        .await?;
292
293    // Build partition info from actual cluster state when available.
294    // report honest unavailable state instead of fabricated topology.
295    // Real partition assignment requires describe_topic API — until then, mark
296    // leader as -1 (no leader) and leave replica/ISR empty.
297    let partition_info: Vec<PartitionInfo> = (0..i32::try_from(cluster_topic_info.partitions)
298        .unwrap_or(i32::MAX))
299        .map(|i| PartitionInfo {
300            partition: i,
301            leader: -1, // Unknown — requires describe_topic API
302            replicas: vec![],
303            isr: vec![],
304        })
305        .collect();
306
307    info!(
308        topic = %topic_name,
309        partitions = cluster_topic_info.partitions,
310        existed = cluster_topic_info.existed,
311        "Topic reconciliation successful"
312    );
313
314    Ok(TopicInfo {
315        partitions: i32::try_from(cluster_topic_info.partitions).unwrap_or(i32::MAX),
316        replication_factor: spec.replication_factor,
317        existed: cluster_topic_info.existed,
318        partition_info,
319    })
320}
321
322/// Build a ready status
323fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
324    let now = Utc::now().to_rfc3339();
325
326    let conditions = vec![
327        TopicCondition {
328            r#type: "Ready".to_string(),
329            status: "True".to_string(),
330            reason: "TopicReady".to_string(),
331            message: "Topic is ready and serving traffic".to_string(),
332            last_transition_time: now.clone(),
333        },
334        TopicCondition {
335            r#type: "Synced".to_string(),
336            status: "True".to_string(),
337            reason: "SyncSucceeded".to_string(),
338            message: if info.existed {
339                "Topic configuration synchronized".to_string()
340            } else {
341                "Topic created successfully".to_string()
342            },
343            last_transition_time: now.clone(),
344        },
345        TopicCondition {
346            r#type: "ConfigApplied".to_string(),
347            status: "True".to_string(),
348            reason: "ConfigApplied".to_string(),
349            message: "Topic configuration applied".to_string(),
350            last_transition_time: now.clone(),
351        },
352        TopicCondition {
353            r#type: "ACLsApplied".to_string(),
354            status: if topic.spec.acls.is_empty() {
355                "N/A".to_string()
356            } else {
357                "True".to_string()
358            },
359            reason: if topic.spec.acls.is_empty() {
360                "NoACLs".to_string()
361            } else {
362                "ACLsApplied".to_string()
363            },
364            message: if topic.spec.acls.is_empty() {
365                "No ACLs configured".to_string()
366            } else {
367                format!("{} ACL entries applied", topic.spec.acls.len())
368            },
369            last_transition_time: now.clone(),
370        },
371    ];
372
373    RivvenTopicStatus {
374        phase: "Ready".to_string(),
375        message: "Topic is ready".to_string(),
376        current_partitions: info.partitions,
377        current_replication_factor: info.replication_factor,
378        topic_exists: true,
379        observed_generation: topic.metadata.generation.unwrap_or(0),
380        conditions,
381        last_sync_time: Some(now),
382        partition_info: info.partition_info,
383    }
384}
385
386/// Build a pending status
387fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
388    let now = Utc::now().to_rfc3339();
389
390    RivvenTopicStatus {
391        phase: "Pending".to_string(),
392        message: message.to_string(),
393        current_partitions: 0,
394        current_replication_factor: 0,
395        topic_exists: false,
396        observed_generation: topic.metadata.generation.unwrap_or(0),
397        conditions: vec![TopicCondition {
398            r#type: "Ready".to_string(),
399            status: "False".to_string(),
400            reason: "Pending".to_string(),
401            message: message.to_string(),
402            last_transition_time: now.clone(),
403        }],
404        last_sync_time: Some(now),
405        partition_info: vec![],
406    }
407}
408
409/// Build a failed status
410fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
411    let now = Utc::now().to_rfc3339();
412
413    RivvenTopicStatus {
414        phase: "Failed".to_string(),
415        message: error_msg.to_string(),
416        current_partitions: 0,
417        current_replication_factor: 0,
418        topic_exists: false,
419        observed_generation: topic.metadata.generation.unwrap_or(0),
420        conditions: vec![TopicCondition {
421            r#type: "Ready".to_string(),
422            status: "False".to_string(),
423            reason: "ValidationFailed".to_string(),
424            message: error_msg.to_string(),
425            last_transition_time: now.clone(),
426        }],
427        last_sync_time: Some(now),
428        partition_info: vec![],
429    }
430}
431
432/// Build an error status (reconciliation failed)
433fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
434    let now = Utc::now().to_rfc3339();
435
436    // Preserve existing status where possible
437    let existing_status = topic.status.clone().unwrap_or_default();
438
439    RivvenTopicStatus {
440        phase: "Error".to_string(),
441        message: error_msg.to_string(),
442        current_partitions: existing_status.current_partitions,
443        current_replication_factor: existing_status.current_replication_factor,
444        topic_exists: existing_status.topic_exists,
445        observed_generation: topic.metadata.generation.unwrap_or(0),
446        conditions: vec![
447            TopicCondition {
448                r#type: "Ready".to_string(),
449                status: if existing_status.topic_exists {
450                    "True".to_string()
451                } else {
452                    "False".to_string()
453                },
454                reason: "ReconcileError".to_string(),
455                message: error_msg.to_string(),
456                last_transition_time: now.clone(),
457            },
458            TopicCondition {
459                r#type: "Synced".to_string(),
460                status: "False".to_string(),
461                reason: "SyncFailed".to_string(),
462                message: error_msg.to_string(),
463                last_transition_time: now.clone(),
464            },
465        ],
466        last_sync_time: Some(now),
467        partition_info: existing_status.partition_info,
468    }
469}
470
471/// Update the topic status subresource
472async fn update_topic_status(
473    client: &Client,
474    namespace: &str,
475    name: &str,
476    status: RivvenTopicStatus,
477) -> Result<()> {
478    let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);
479
480    debug!(name = %name, phase = %status.phase, "Updating topic status");
481
482    let patch = serde_json::json!({
483        "status": status
484    });
485
486    let patch_params = PatchParams::default();
487    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
488        .await
489        .map_err(OperatorError::from)?;
490
491    Ok(())
492}
493
494/// Cleanup resources when topic is deleted
495#[instrument(skip(topic, ctx))]
496async fn cleanup_topic(
497    topic: Arc<RivvenTopic>,
498    ctx: Arc<TopicControllerContext>,
499) -> Result<Action> {
500    let name = topic.name_any();
501    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
502
503    info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");
504
505    // Check if we should delete the topic from the cluster
506    if topic.spec.delete_on_remove {
507        info!(name = %name, "delete_on_remove is true, deleting topic from cluster");
508
509        // Verify cluster exists
510        match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
511            Ok(cluster) => {
512                let endpoints = get_cluster_endpoints(&cluster);
513                if !endpoints.is_empty() {
514                    // Delete topic from cluster using real cluster client
515                    if let Err(e) =
516                        delete_topic_from_cluster(&name, &endpoints, &ctx.cluster_client).await
517                    {
518                        warn!(
519                            name = %name,
520                            error = %e,
521                            "Failed to delete topic from cluster (may not exist)"
522                        );
523                    }
524                }
525            }
526            Err(e) => {
527                warn!(
528                    name = %name,
529                    error = %e,
530                    "Cluster not found during cleanup, topic may be orphaned"
531                );
532            }
533        }
534    } else {
535        info!(
536            name = %name,
537            "delete_on_remove is false, topic will remain in cluster"
538        );
539    }
540
541    info!(name = %name, "Topic cleanup complete");
542
543    if let Some(ref metrics) = ctx.metrics {
544        metrics.topics_total.decrement(1.0);
545    }
546
547    Ok(Action::await_change())
548}
549
550/// Delete a topic from the Rivven cluster
551async fn delete_topic_from_cluster(
552    topic_name: &str,
553    broker_endpoints: &[String],
554    cluster_client: &ClusterClient,
555) -> Result<()> {
556    info!(topic = %topic_name, "Deleting topic from cluster");
557    cluster_client
558        .delete_topic(broker_endpoints, topic_name)
559        .await
560}
561
562/// Error policy for the topic controller
563fn topic_error_policy(
564    _topic: Arc<RivvenTopic>,
565    error: &OperatorError,
566    _ctx: Arc<TopicControllerContext>,
567) -> Action {
568    warn!(
569        error = %error,
570        "Topic reconciliation error, will retry"
571    );
572
573    let delay = error
574        .requeue_delay()
575        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
576
577    Action::requeue(delay)
578}
579
/// Unit tests for the status builders. None of them talk to Kubernetes or to a
/// Rivven cluster; they only construct resources in memory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Baseline fixture: a topic with 6 partitions, replication factor 3,
    /// no ACLs, no status, and generation 1.
    fn create_test_topic() -> RivvenTopic {
        RivvenTopic {
            metadata: ObjectMeta {
                name: Some("test-topic".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenTopicSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                partitions: 6,
                replication_factor: 3,
                config: TopicConfig::default(),
                acls: vec![],
                delete_on_remove: true,
                topic_labels: BTreeMap::new(),
            },
            status: None,
        }
    }

    /// Baseline fixture plus two ACL entries.
    fn create_test_topic_with_acls() -> RivvenTopic {
        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            TopicAcl {
                principal: "user:app1".to_string(),
                operations: vec!["Read".to_string(), "Write".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
            TopicAcl {
                principal: "user:analytics".to_string(),
                operations: vec!["Read".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
        ];
        topic
    }

    /// A ready status copies counts from `TopicInfo` and marks `Ready=True`.
    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        };

        let status = build_ready_status(&topic, info);

        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "True");
    }

    /// With ACLs in the spec, `ACLsApplied` is `True` and reports the count.
    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: true,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        let acl_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acl_cond.status, "True");
        assert!(acl_cond.message.contains("2 ACL entries"));
    }

    /// A pending status carries the message and reports no existing topic.
    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();
        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    /// A failed status marks `Ready=False` with reason `ValidationFailed`.
    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();
        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "False");
        assert_eq!(ready_cond.reason, "ValidationFailed");
    }

    /// An error status keeps the previously recorded partition data.
    #[test]
    fn test_build_error_status_preserves_existing() {
        let mut topic = create_test_topic();
        topic.status = Some(RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: vec![],
            last_sync_time: None,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        });

        let status = build_error_status(&topic, "Sync failed");

        assert_eq!(status.phase, "Error");
        // Should preserve existing state
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    /// Checks a locally built round-robin partition layout.
    ///
    /// NOTE(review): this test does not call any production function;
    /// `reconcile_topic_with_cluster` emits leader -1 with empty replica and
    /// ISR lists, so the layout asserted here is not what the controller
    /// publishes — confirm whether this test is still wanted.
    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();

        // Simulate partition info generation
        let partition_info: Vec<PartitionInfo> = (0..topic.spec.partitions)
            .map(|i| PartitionInfo {
                partition: i,
                leader: (i % 3),
                replicas: (0..topic.spec.replication_factor).collect(),
                isr: (0..topic.spec.replication_factor).collect(),
            })
            .collect();

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0); // 3 % 3 = 0
        assert_eq!(partition_info[4].leader, 1); // 4 % 3 = 1
    }

    /// A ready status contains all four expected condition types.
    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        // Should have all 4 condition types
        let condition_types: Vec<&str> = status
            .conditions
            .iter()
            .map(|c| c.r#type.as_str())
            .collect();

        assert!(condition_types.contains(&"Ready"));
        assert!(condition_types.contains(&"Synced"));
        assert!(condition_types.contains(&"ConfigApplied"));
        assert!(condition_types.contains(&"ACLsApplied"));
    }

    /// Without ACLs in the spec, `ACLsApplied` is `N/A` with reason `NoACLs`.
    #[test]
    fn test_no_acls_condition() {
        let topic = create_test_topic(); // No ACLs
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        let acl_cond = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acl_cond.status, "N/A");
        assert_eq!(acl_cond.reason, "NoACLs");
    }
}