// rivven_operator/controller.rs
1//! RivvenCluster Controller
2//!
3//! This module implements the Kubernetes controller pattern for managing
4//! RivvenCluster custom resources. It watches for changes and reconciles
5//! the actual cluster state to match the desired specification.
6
7use crate::crd::{ClusterCondition, ClusterPhase, RivvenCluster, RivvenClusterStatus};
8use crate::error::{OperatorError, Result};
9use crate::resources::ResourceBuilder;
10use chrono::Utc;
11use futures::StreamExt;
12use k8s_openapi::api::apps::v1::StatefulSet;
13use k8s_openapi::api::core::v1::{ConfigMap, PersistentVolumeClaim, Service};
14use k8s_openapi::api::policy::v1::PodDisruptionBudget;
15use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams};
16use kube::runtime::controller::{Action, Controller};
17use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
18use kube::runtime::watcher::Config;
19use kube::{Client, Resource, ResourceExt};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, error, info, instrument, warn};
23use validator::Validate;
24
/// Finalizer name for cleanup operations.
///
/// Attached to every RivvenCluster so the controller gets a chance to run
/// `cleanup_cluster` (deleting orphaned PVCs) before Kubernetes removes the CR.
pub const FINALIZER_NAME: &str = "rivven.hupe1980.github.io/cluster-finalizer";

/// Default requeue interval for successful reconciliations
const DEFAULT_REQUEUE_SECONDS: u64 = 300; // 5 minutes

/// Requeue interval for error cases (base for exponential backoff)
const ERROR_REQUEUE_SECONDS: u64 = 30;

/// Maximum requeue delay for error backoff (caps the doubling in `error_policy`)
const MAX_ERROR_REQUEUE_SECONDS: u64 = 600; // 10 minutes
36
/// Context passed to the controller.
///
/// Shared via `Arc` across all concurrent reconciliation invocations.
pub struct ControllerContext {
    /// Kubernetes client
    pub client: Client,
    /// Metrics recorder (optional)
    pub metrics: Option<ControllerMetrics>,
    /// Per-cluster error retry counts for exponential backoff.
    /// Incremented by `error_policy`, cleared by `reconcile` on success.
    /// NOTE(review): keyed by `name_any()` only — two same-named clusters in
    /// different namespaces would share a counter; confirm whether the key
    /// should include the namespace.
    pub error_counts: dashmap::DashMap<String, u32>,
}
46
/// Metrics for the controller.
///
/// Handles registered with the global `metrics` recorder; cheap to clone.
#[derive(Clone)]
pub struct ControllerMetrics {
    /// Counter for reconciliation attempts
    pub reconciliations: metrics::Counter,
    /// Counter for reconciliation errors
    pub errors: metrics::Counter,
    /// Histogram for reconciliation duration
    pub duration: metrics::Histogram,
}
57
58impl ControllerMetrics {
59    /// Create new controller metrics
60    pub fn new() -> Self {
61        Self {
62            reconciliations: metrics::counter!("rivven_operator_reconciliations_total"),
63            errors: metrics::counter!("rivven_operator_reconciliation_errors_total"),
64            duration: metrics::histogram!("rivven_operator_reconciliation_duration_seconds"),
65        }
66    }
67}
68
impl Default for ControllerMetrics {
    /// Equivalent to [`ControllerMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
74
/// Start the RivvenCluster controller.
///
/// Watches RivvenCluster resources — in a single namespace, or cluster-wide
/// when `namespace` is `None` — and also requeues a cluster when one of its
/// owned StatefulSets or Services changes. Runs until the watch stream ends.
pub async fn run_controller(client: Client, namespace: Option<String>) -> Result<()> {
    // Scope the primary watch to one namespace if requested.
    let clusters: Api<RivvenCluster> = match &namespace {
        Some(ns) => Api::namespaced(client.clone(), ns),
        None => Api::all(client.clone()),
    };

    // Shared context handed to every reconcile/error_policy invocation.
    let ctx = Arc::new(ControllerContext {
        client: client.clone(),
        metrics: Some(ControllerMetrics::new()),
        error_counts: dashmap::DashMap::new(),
    });

    info!(
        namespace = namespace.as_deref().unwrap_or("all"),
        "Starting RivvenCluster controller"
    );

    // Watch related resources for changes
    let statefulsets = match &namespace {
        Some(ns) => Api::<StatefulSet>::namespaced(client.clone(), ns),
        None => Api::<StatefulSet>::all(client.clone()),
    };

    let services = match &namespace {
        Some(ns) => Api::<Service>::namespaced(client.clone(), ns),
        None => Api::<Service>::all(client.clone()),
    };

    // `owns` wires owner-reference-based triggering: a change to a StatefulSet
    // or Service owned by a RivvenCluster requeues that cluster.
    Controller::new(clusters.clone(), Config::default())
        .owns(statefulsets, Config::default())
        .owns(services, Config::default())
        .run(reconcile, error_policy, ctx)
        .for_each(|result| async move {
            match result {
                // `obj` is the ObjectRef of the reconciled cluster.
                Ok((obj, action)) => {
                    debug!(
                        name = obj.name,
                        namespace = obj.namespace,
                        ?action,
                        "Reconciliation completed"
                    );
                }
                Err(e) => {
                    error!(error = %e, "Reconciliation failed");
                }
            }
        })
        .await;

    Ok(())
}
127
/// Main reconciliation function.
///
/// Wraps the apply/cleanup logic in the kube finalizer helper: `Apply` runs
/// while the CR exists, `Cleanup` runs when it is being deleted, and the
/// finalizer is added/removed automatically. Records metrics around the call
/// and resets the per-cluster error-backoff counter on success.
#[instrument(skip(cluster, ctx), fields(name = %cluster.name_any(), namespace = cluster.namespace()))]
async fn reconcile(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
    let start = std::time::Instant::now();

    if let Some(ref metrics) = ctx.metrics {
        metrics.reconciliations.increment(1);
    }

    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
    // Captured before `cluster` is moved into the finalizer below.
    // NOTE(review): name only, no namespace — must stay in sync with the key
    // used by `error_policy`; confirm whether it should be `namespace/name`.
    let cluster_name = cluster.name_any();
    let clusters: Api<RivvenCluster> = Api::namespaced(ctx.client.clone(), &namespace);

    let result = finalizer(&clusters, FINALIZER_NAME, cluster, |event| async {
        match event {
            FinalizerEvent::Apply(cluster) => apply_cluster(cluster, ctx.clone()).await,
            FinalizerEvent::Cleanup(cluster) => cleanup_cluster(cluster, ctx.clone()).await,
        }
    })
    .await;

    if let Some(ref metrics) = ctx.metrics {
        metrics.duration.record(start.elapsed().as_secs_f64());
    }

    // Reset error backoff counter on success
    if result.is_ok() {
        ctx.error_counts.remove(&cluster_name);
    }

    // Flatten the finalizer's error wrapper into our own error type and count
    // the failure exactly once.
    result.map_err(|e| {
        if let Some(ref metrics) = ctx.metrics {
            metrics.errors.increment(1);
        }
        OperatorError::ReconcileFailed(e.to_string())
    })
}
165
/// Apply (create/update) the cluster resources.
///
/// Validates the spec, then server-side-applies every derived resource in
/// dependency order: ConfigMap, headless Service, client Service,
/// StatefulSet, optional PodDisruptionBudget. Finally writes the observed
/// state back to the CR's status subresource.
///
/// # Errors
///
/// Returns `InvalidConfig` on validation failure, or the underlying API
/// error from any apply step. On success, requeues after
/// `DEFAULT_REQUEUE_SECONDS` as a periodic drift check.
#[instrument(skip(cluster, ctx))]
async fn apply_cluster(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
    let name = cluster.name_any();
    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());

    info!(name = %name, namespace = %namespace, "Reconciling RivvenCluster");

    // Validate the cluster spec before reconciliation; collect all field
    // errors into a single human-readable message.
    if let Err(errors) = cluster.spec.validate() {
        let error_messages: Vec<String> = errors
            .field_errors()
            .iter()
            .flat_map(|(field, errs)| {
                errs.iter()
                    .map(move |e| format!("{}: {:?}", field, e.message))
            })
            .collect();
        let error_msg = error_messages.join("; ");
        warn!(name = %name, errors = %error_msg, "Cluster spec validation failed");
        return Err(OperatorError::InvalidConfig(error_msg));
    }

    // Additional security validations (cross-field invariants the derive
    // macro cannot express).
    validate_cluster_security(&cluster)?;

    // Build all resources
    let builder = ResourceBuilder::new(&cluster)?;

    // Apply ConfigMap
    let configmap = builder.build_configmap()?;
    apply_configmap(&ctx.client, &namespace, configmap).await?;

    // Apply headless service
    let headless_svc = builder.build_headless_service();
    apply_service(&ctx.client, &namespace, headless_svc).await?;

    // Apply client service
    let client_svc = builder.build_client_service();
    apply_service(&ctx.client, &namespace, client_svc).await?;

    // Apply StatefulSet; its reported status drives the CR phase below.
    let statefulset = builder.build_statefulset();
    let sts_status = apply_statefulset(&ctx.client, &namespace, statefulset).await?;

    // Apply PDB if enabled
    if let Some(pdb) = builder.build_pdb() {
        apply_pdb(&ctx.client, &namespace, pdb).await?;
    }

    // Update cluster status
    let status = build_status(&cluster, sts_status);
    update_status(&ctx.client, &namespace, &name, status).await?;

    info!(name = %name, "Reconciliation complete");

    Ok(Action::requeue(Duration::from_secs(
        DEFAULT_REQUEUE_SECONDS,
    )))
}
226
227/// Validate cluster for security best practices
228fn validate_cluster_security(cluster: &RivvenCluster) -> Result<()> {
229    let spec = &cluster.spec;
230
231    // Validate TLS configuration consistency
232    if spec.tls.enabled && spec.tls.cert_secret_name.is_none() {
233        return Err(OperatorError::InvalidConfig(
234            "TLS is enabled but no certificate secret is specified".to_string(),
235        ));
236    }
237
238    if spec.tls.mtls_enabled && spec.tls.ca_secret_name.is_none() {
239        return Err(OperatorError::InvalidConfig(
240            "mTLS is enabled but no CA secret is specified".to_string(),
241        ));
242    }
243
244    // Validate replication factor vs replicas
245    if spec.config.default_replication_factor > spec.replicas {
246        return Err(OperatorError::InvalidConfig(format!(
247            "Replication factor ({}) cannot exceed replica count ({})",
248            spec.config.default_replication_factor, spec.replicas
249        )));
250    }
251
252    // Warn if running with single replica in production
253    if spec.replicas == 1 {
254        warn!(
255            cluster = cluster.name_any(),
256            "Running with single replica - not recommended for production"
257        );
258    }
259
260    // Validate Raft timing constraints (heartbeat should be < election timeout / 3)
261    let min_election_timeout = spec.config.raft_heartbeat_interval_ms * 3;
262    if spec.config.raft_election_timeout_ms < min_election_timeout {
263        return Err(OperatorError::InvalidConfig(format!(
264            "Raft election timeout ({}) should be at least 3x heartbeat interval ({})",
265            spec.config.raft_election_timeout_ms, spec.config.raft_heartbeat_interval_ms
266        )));
267    }
268
269    Ok(())
270}
271
/// Cleanup resources when cluster is deleted.
///
/// Runs under the finalizer before Kubernetes removes the CR. Deletion of
/// individual PVCs is best-effort (logged, not fatal) so a partially cleaned
/// cluster cannot wedge the finalizer forever.
#[instrument(skip(cluster, ctx))]
async fn cleanup_cluster(
    cluster: Arc<RivvenCluster>,
    ctx: Arc<ControllerContext>,
) -> Result<Action> {
    let name = cluster.name_any();
    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());

    info!(name = %name, namespace = %namespace, "Cleaning up RivvenCluster resources");

    // Resources with owner references (StatefulSet, Service, ConfigMap, PDB)
    // are garbage-collected automatically by Kubernetes when the CR is deleted.
    // PVCs created by StatefulSet volumeClaimTemplates do NOT get owner refs
    // and must be deleted explicitly.

    let pvcs: Api<PersistentVolumeClaim> = Api::namespaced(ctx.client.clone(), &namespace);
    // Select only this cluster's PVCs via the standard instance label.
    let lp = ListParams::default().labels(&format!("app.kubernetes.io/instance={}", name));

    match pvcs.list(&lp).await {
        Ok(pvc_list) => {
            for pvc in pvc_list.items {
                if let Some(pvc_name) = pvc.metadata.name.as_deref() {
                    info!(name = %name, pvc = %pvc_name, "Deleting PVC");
                    // Best-effort: a failed delete (e.g. already gone) is
                    // logged but does not block CR deletion.
                    if let Err(e) = pvcs.delete(pvc_name, &DeleteParams::default()).await {
                        warn!(
                            name = %name,
                            pvc = %pvc_name,
                            error = %e,
                            "Failed to delete PVC (may have already been removed)"
                        );
                    }
                }
            }
        }
        Err(e) => {
            warn!(
                name = %name,
                error = %e,
                "Failed to list PVCs for cleanup"
            );
        }
    }

    info!(name = %name, "Cleanup complete");

    // No requeue needed: the CR is going away.
    Ok(Action::await_change())
}
320
321/// Verify the operator still owns a resource before force-applying.
322///
323/// Checks whether an existing resource was created by the rivven-operator by
324/// inspecting the `app.kubernetes.io/managed-by` label. If the resource
325/// exists and is managed by a different controller (e.g. Helm, another
326/// operator), the apply is rejected to prevent silently hijacking ownership.
327/// If the resource does not yet exist (404), ownership is trivially valid.
328fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
329    let labels = existing.meta().labels.as_ref();
330    let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
331    match managed_by {
332        Some(manager) if manager != "rivven-operator" => {
333            let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
334            Err(OperatorError::InvalidConfig(format!(
335                "resource '{}' is managed by '{}', not rivven-operator; \
336                 refusing to force-apply to avoid ownership conflict",
337                name, manager
338            )))
339        }
340        _ => Ok(()),
341    }
342}
343
344/// Apply a ConfigMap using server-side apply.
345///
346/// # Force-apply and field ownership
347///
348/// Uses `PatchParams::apply("rivven-operator").force()` which takes
349/// ownership of **all** fields in the patch, even if they were previously
350/// managed by another field manager (e.g. kubectl, Helm, or another
351/// controller). Before force-applying, verifies the operator owns the
352/// resource via the `app.kubernetes.io/managed-by` label to prevent
353/// silently hijacking resources managed by other controllers.
354async fn apply_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
355    let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
356    let name =
357        cm.metadata.name.as_ref().ok_or_else(|| {
358            OperatorError::InvalidConfig("ConfigMap missing metadata.name".into())
359        })?;
360
361    debug!(name = %name, "Applying ConfigMap");
362
363    // Verify ownership before force-applying
364    if let Ok(existing) = api.get(name).await {
365        verify_ownership(&existing)?;
366    }
367
368    let patch_params = PatchParams::apply("rivven-operator").force();
369    api.patch(name, &patch_params, &Patch::Apply(&cm))
370        .await
371        .map_err(OperatorError::from)?;
372
373    Ok(())
374}
375
376/// Apply a Service using server-side apply
377async fn apply_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
378    let api: Api<Service> = Api::namespaced(client.clone(), namespace);
379    let name = svc
380        .metadata
381        .name
382        .as_ref()
383        .ok_or_else(|| OperatorError::InvalidConfig("Service missing metadata.name".into()))?;
384
385    debug!(name = %name, "Applying Service");
386
387    // Verify ownership before force-applying
388    if let Ok(existing) = api.get(name).await {
389        verify_ownership(&existing)?;
390    }
391
392    let patch_params = PatchParams::apply("rivven-operator").force();
393    api.patch(name, &patch_params, &Patch::Apply(&svc))
394        .await
395        .map_err(OperatorError::from)?;
396
397    Ok(())
398}
399
400/// Apply a StatefulSet using server-side apply
401async fn apply_statefulset(
402    client: &Client,
403    namespace: &str,
404    sts: StatefulSet,
405) -> Result<Option<k8s_openapi::api::apps::v1::StatefulSetStatus>> {
406    let api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
407    let name =
408        sts.metadata.name.as_ref().ok_or_else(|| {
409            OperatorError::InvalidConfig("StatefulSet missing metadata.name".into())
410        })?;
411
412    debug!(name = %name, "Applying StatefulSet");
413
414    // Verify ownership before force-applying
415    if let Ok(existing) = api.get(name).await {
416        verify_ownership(&existing)?;
417    }
418
419    let patch_params = PatchParams::apply("rivven-operator").force();
420    let result = api
421        .patch(name, &patch_params, &Patch::Apply(&sts))
422        .await
423        .map_err(OperatorError::from)?;
424
425    Ok(result.status)
426}
427
428/// Apply a PodDisruptionBudget using server-side apply
429async fn apply_pdb(client: &Client, namespace: &str, pdb: PodDisruptionBudget) -> Result<()> {
430    let api: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
431    let name = pdb
432        .metadata
433        .name
434        .as_ref()
435        .ok_or_else(|| OperatorError::InvalidConfig("PDB missing metadata.name".into()))?;
436
437    debug!(name = %name, "Applying PodDisruptionBudget");
438
439    // Verify ownership before force-applying
440    if let Ok(existing) = api.get(name).await {
441        verify_ownership(&existing)?;
442    }
443
444    let patch_params = PatchParams::apply("rivven-operator").force();
445    api.patch(name, &patch_params, &Patch::Apply(&pdb))
446        .await
447        .map_err(OperatorError::from)?;
448
449    Ok(())
450}
451
452/// Build cluster status from StatefulSet status
453fn build_status(
454    cluster: &RivvenCluster,
455    sts_status: Option<k8s_openapi::api::apps::v1::StatefulSetStatus>,
456) -> RivvenClusterStatus {
457    let now = Utc::now().to_rfc3339();
458
459    let (replicas, ready_replicas, updated_replicas) = sts_status
460        .map(|s| {
461            (
462                s.replicas,
463                s.ready_replicas.unwrap_or(0),
464                s.updated_replicas.unwrap_or(0),
465            )
466        })
467        .unwrap_or((0, 0, 0));
468
469    let desired_replicas = cluster.spec.replicas;
470
471    // Determine phase based on state
472    let phase = if ready_replicas == 0 {
473        ClusterPhase::Provisioning
474    } else if ready_replicas < desired_replicas {
475        if updated_replicas < desired_replicas {
476            ClusterPhase::Updating
477        } else {
478            ClusterPhase::Degraded
479        }
480    } else if ready_replicas == desired_replicas {
481        ClusterPhase::Running
482    } else {
483        ClusterPhase::Degraded
484    };
485
486    // Build conditions
487    let mut conditions = vec![];
488
489    // Ready condition
490    conditions.push(ClusterCondition {
491        condition_type: "Ready".to_string(),
492        status: if ready_replicas >= desired_replicas {
493            "True".to_string()
494        } else {
495            "False".to_string()
496        },
497        reason: Some(format!(
498            "{}/{} replicas ready",
499            ready_replicas, desired_replicas
500        )),
501        message: None,
502        last_transition_time: Some(now.clone()),
503    });
504
505    // Available condition
506    conditions.push(ClusterCondition {
507        condition_type: "Available".to_string(),
508        status: if ready_replicas > 0 {
509            "True".to_string()
510        } else {
511            "False".to_string()
512        },
513        reason: Some(
514            if ready_replicas > 0 {
515                "AtLeastOneReplicaReady"
516            } else {
517                "NoReplicasReady"
518            }
519            .to_string(),
520        ),
521        message: None,
522        last_transition_time: Some(now.clone()),
523    });
524
525    // Build broker endpoints
526    let name = cluster
527        .metadata
528        .name
529        .as_ref()
530        .map(|n| format!("rivven-{}", n));
531    let namespace = cluster
532        .metadata
533        .namespace
534        .as_ref()
535        .cloned()
536        .unwrap_or_else(|| "default".to_string());
537
538    let broker_endpoints: Vec<String> = (0..ready_replicas)
539        .map(|i| {
540            format!(
541                "{}-{}.{}-headless.{}.svc.cluster.local:9092",
542                name.as_deref().unwrap_or("rivven"),
543                i,
544                name.as_deref().unwrap_or("rivven"),
545                namespace
546            )
547        })
548        .collect();
549
550    RivvenClusterStatus {
551        phase,
552        replicas,
553        ready_replicas,
554        updated_replicas,
555        observed_generation: cluster.metadata.generation.unwrap_or(0),
556        conditions,
557        broker_endpoints,
558        leader: None, // Would need to query the cluster to determine leader
559        last_updated: Some(now),
560        message: None,
561    }
562}
563
564/// Update the cluster status subresource
565async fn update_status(
566    client: &Client,
567    namespace: &str,
568    name: &str,
569    status: RivvenClusterStatus,
570) -> Result<()> {
571    let api: Api<RivvenCluster> = Api::namespaced(client.clone(), namespace);
572
573    debug!(name = %name, phase = ?status.phase, "Updating cluster status");
574
575    let patch = serde_json::json!({
576        "status": status
577    });
578
579    let patch_params = PatchParams::default();
580    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
581        .await
582        .map_err(OperatorError::from)?;
583
584    Ok(())
585}
586
/// Error policy for the controller — exponential backoff.
///
/// Invoked by the controller runtime whenever `reconcile` returns an error.
/// Tracks a per-cluster retry count in `ctx.error_counts` (cleared by the
/// next successful `reconcile`) and derives the requeue delay from it.
///
/// NOTE(review): no randomized jitter is actually applied here — the delay is
/// a deterministic capped doubling; confirm whether jitter was intended.
/// NOTE(review): the key is `name_any()` without the namespace, so same-named
/// clusters in different namespaces share a counter; a fix must also change
/// the reset key in `reconcile`.
fn error_policy(
    cluster: Arc<RivvenCluster>,
    error: &OperatorError,
    ctx: Arc<ControllerContext>,
) -> Action {
    let key = cluster.name_any();
    // Increment (or initialize) the counter in a tight scope so the DashMap
    // entry guard is dropped before any further work.
    let retries = {
        let mut entry = ctx.error_counts.entry(key.clone()).or_insert(0);
        *entry += 1;
        *entry
    };

    // Use the error's suggested delay OR exponential backoff:
    // 30s → 60s → 120s → 240s → 480s → 600s (capped)
    let delay = error.requeue_delay().unwrap_or_else(|| {
        let base = Duration::from_secs(ERROR_REQUEUE_SECONDS);
        // retries starts at 1, so the exponent is 0 on the first failure;
        // min(5) bounds the doubling before the Duration cap below.
        let backoff = base * 2u32.saturating_pow((retries - 1).min(5));
        backoff.min(Duration::from_secs(MAX_ERROR_REQUEUE_SECONDS))
    });

    warn!(
        error = %error,
        retry = retries,
        delay_secs = delay.as_secs(),
        "Reconciliation error for '{}', will retry",
        key
    );

    Action::requeue(delay)
}
618
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, RivvenClusterSpec, StorageSpec, TlsSpec,
    };
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Fixture: a minimal 3-replica cluster with default sub-specs, used by
    /// every `build_status` test below.
    fn create_test_cluster() -> RivvenCluster {
        RivvenCluster {
            metadata: ObjectMeta {
                name: Some("test-cluster".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenClusterSpec {
                replicas: 3,
                version: "0.0.1".to_string(),
                image: None,
                image_pull_policy: "IfNotPresent".to_string(),
                image_pull_secrets: vec![],
                storage: StorageSpec::default(),
                resources: None,
                config: BrokerConfig::default(),
                tls: TlsSpec::default(),
                metrics: MetricsSpec::default(),
                affinity: None,
                node_selector: BTreeMap::new(),
                tolerations: vec![],
                pod_disruption_budget: PdbSpec::default(),
                service_account: None,
                pod_annotations: BTreeMap::new(),
                pod_labels: BTreeMap::new(),
                env: vec![],
                liveness_probe: ProbeSpec::default(),
                readiness_probe: ProbeSpec::default(),
                security_context: None,
                container_security_context: None,
            },
            status: None,
        }
    }

    /// No StatefulSet status at all → Provisioning with zeroed counts.
    #[test]
    fn test_build_status_provisioning() {
        let cluster = create_test_cluster();
        let status = build_status(&cluster, None);

        assert_eq!(status.phase, ClusterPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
    }

    /// All 3 desired replicas ready and updated → Running.
    #[test]
    fn test_build_status_running() {
        let cluster = create_test_cluster();
        let sts_status = k8s_openapi::api::apps::v1::StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(3),
            updated_replicas: Some(3),
            ..Default::default()
        };

        let status = build_status(&cluster, Some(sts_status));

        assert_eq!(status.phase, ClusterPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
    }

    /// Rollout complete (updated == desired) but a pod is unready → Degraded.
    #[test]
    fn test_build_status_degraded() {
        let cluster = create_test_cluster();
        let sts_status = k8s_openapi::api::apps::v1::StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(2),
            updated_replicas: Some(3),
            ..Default::default()
        };

        let status = build_status(&cluster, Some(sts_status));

        assert_eq!(status.phase, ClusterPhase::Degraded);
    }

    /// Rollout in progress (updated < desired, some ready) → Updating.
    #[test]
    fn test_build_status_updating() {
        let cluster = create_test_cluster();
        let sts_status = k8s_openapi::api::apps::v1::StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(2),
            updated_replicas: Some(1),
            ..Default::default()
        };

        let status = build_status(&cluster, Some(sts_status));

        assert_eq!(status.phase, ClusterPhase::Updating);
    }

    /// One endpoint per ready replica, named "rivven-<cr-name>-<ordinal>".
    #[test]
    fn test_broker_endpoints() {
        let cluster = create_test_cluster();
        let sts_status = k8s_openapi::api::apps::v1::StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(3),
            updated_replicas: Some(3),
            ..Default::default()
        };

        let status = build_status(&cluster, Some(sts_status));

        assert_eq!(status.broker_endpoints.len(), 3);
        assert!(status.broker_endpoints[0].contains("rivven-test-cluster-0"));
    }

    /// Exactly two conditions are emitted (Ready, Available), both True when
    /// the cluster is fully up.
    #[test]
    fn test_conditions() {
        let cluster = create_test_cluster();
        let sts_status = k8s_openapi::api::apps::v1::StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(3),
            updated_replicas: Some(3),
            ..Default::default()
        };

        let status = build_status(&cluster, Some(sts_status));

        assert_eq!(status.conditions.len(), 2);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "True");

        let available_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "Available")
            .unwrap();
        assert_eq!(available_cond.status, "True");
    }
}