Skip to main content

rivven_operator/
resources.rs

1//! Kubernetes Resource Builders
2//!
3//! This module generates Kubernetes manifests (StatefulSet, Service, ConfigMap, etc.)
4//! from RivvenCluster specifications.
5
6use crate::crd::{RivvenCluster, RivvenClusterSpec};
7use crate::error::{OperatorError, Result};
8use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
9use k8s_openapi::api::core::v1::{
10    ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
11    PersistentVolumeClaim, PersistentVolumeClaimSpec, PodSpec, PodTemplateSpec, Probe, Service,
12    ServicePort, ServiceSpec, VolumeMount,
13};
14use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
15use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
17use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
18use std::collections::BTreeMap;
19
20/// Builder for generating Kubernetes resources from a RivvenCluster
21pub struct ResourceBuilder<'a> {
22    cluster: &'a RivvenCluster,
23    name: String,
24    namespace: String,
25}
26
27impl<'a> ResourceBuilder<'a> {
28    /// Create a new resource builder
29    pub fn new(cluster: &'a RivvenCluster) -> Result<Self> {
30        let name =
31            cluster.metadata.name.clone().ok_or_else(|| {
32                OperatorError::InvalidConfig("cluster name is required".to_string())
33            })?;
34
35        let namespace = cluster
36            .metadata
37            .namespace
38            .clone()
39            .unwrap_or_else(|| "default".to_string());
40
41        Ok(Self {
42            cluster,
43            name,
44            namespace,
45        })
46    }
47
48    /// Get the resource name prefix
49    fn resource_name(&self) -> String {
50        format!("rivven-{}", self.name)
51    }
52
53    /// Get owner reference for managed resources
54    fn owner_reference(&self) -> OwnerReference {
55        OwnerReference {
56            api_version: "rivven.hupe1980.github.io/v1alpha1".to_string(),
57            kind: "RivvenCluster".to_string(),
58            name: self.name.clone(),
59            uid: self.cluster.metadata.uid.clone().unwrap_or_default(),
60            controller: Some(true),
61            block_owner_deletion: Some(true),
62        }
63    }
64
65    /// Build the StatefulSet for the Rivven cluster
66    pub fn build_statefulset(&self) -> StatefulSet {
67        let spec = &self.cluster.spec;
68        let name = self.resource_name();
69        let labels = spec.get_labels(&self.name);
70        let selector_labels = spec.get_selector_labels(&self.name);
71
72        // Build container
73        let container = self.build_container(spec);
74
75        // Build PVC template
76        let pvc_template = self.build_pvc_template(spec);
77
78        // Build pod template
79        let mut pod_labels = selector_labels.clone();
80        pod_labels.extend(spec.pod_labels.clone());
81
82        let mut pod_annotations = BTreeMap::new();
83        if spec.metrics.enabled {
84            pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
85            pod_annotations.insert(
86                "prometheus.io/port".to_string(),
87                spec.metrics.port.to_string(),
88            );
89        }
90        pod_annotations.extend(spec.pod_annotations.clone());
91
92        // Apply secure defaults for pod security context if not specified
93        let pod_security_context = spec.security_context.clone().or_else(|| {
94            Some(k8s_openapi::api::core::v1::PodSecurityContext {
95                // Run as non-root by default
96                run_as_non_root: Some(true),
97                // Use rivven user (UID 1000)
98                run_as_user: Some(1000),
99                run_as_group: Some(1000),
100                fs_group: Some(1000),
101                // Use RuntimeDefault seccomp profile
102                seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
103                    type_: "RuntimeDefault".to_string(),
104                    ..Default::default()
105                }),
106                ..Default::default()
107            })
108        });
109
110        let pod_spec = PodSpec {
111            containers: vec![container],
112            affinity: spec.affinity.clone(),
113            security_context: pod_security_context,
114            node_selector: if spec.node_selector.is_empty() {
115                None
116            } else {
117                Some(spec.node_selector.clone())
118            },
119            tolerations: if spec.tolerations.is_empty() {
120                None
121            } else {
122                Some(spec.tolerations.clone())
123            },
124            service_account_name: spec.service_account.clone(),
125            image_pull_secrets: if spec.image_pull_secrets.is_empty() {
126                None
127            } else {
128                Some(
129                    spec.image_pull_secrets
130                        .iter()
131                        .map(|s| k8s_openapi::api::core::v1::LocalObjectReference {
132                            name: s.clone(),
133                        })
134                        .collect(),
135                )
136            },
137            // Disable service account token auto-mounting for security
138            automount_service_account_token: Some(false),
139            ..Default::default()
140        };
141
142        StatefulSet {
143            metadata: ObjectMeta {
144                name: Some(name.clone()),
145                namespace: Some(self.namespace.clone()),
146                labels: Some(labels.clone()),
147                owner_references: Some(vec![self.owner_reference()]),
148                ..Default::default()
149            },
150            spec: Some(StatefulSetSpec {
151                service_name: format!("{}-headless", name),
152                replicas: Some(spec.replicas),
153                selector: LabelSelector {
154                    match_labels: Some(selector_labels),
155                    ..Default::default()
156                },
157                template: PodTemplateSpec {
158                    metadata: Some(ObjectMeta {
159                        labels: Some(pod_labels),
160                        annotations: Some(pod_annotations),
161                        ..Default::default()
162                    }),
163                    spec: Some(pod_spec),
164                },
165                volume_claim_templates: Some(vec![pvc_template]),
166                pod_management_policy: Some("Parallel".to_string()),
167                update_strategy: Some(k8s_openapi::api::apps::v1::StatefulSetUpdateStrategy {
168                    type_: Some("RollingUpdate".to_string()),
169                    rolling_update: Some(
170                        k8s_openapi::api::apps::v1::RollingUpdateStatefulSetStrategy {
171                            max_unavailable: Some(IntOrString::Int(1)),
172                            partition: Some(0),
173                        },
174                    ),
175                }),
176                ..Default::default()
177            }),
178            ..Default::default()
179        }
180    }
181
182    /// Build the main container for the broker pod
183    fn build_container(&self, spec: &RivvenClusterSpec) -> Container {
184        let name = self.resource_name();
185
186        // Build environment variables
187        let mut env = vec![
188            EnvVar {
189                name: "RIVVEN_DATA_DIR".to_string(),
190                value: Some("/data".to_string()),
191                ..Default::default()
192            },
193            EnvVar {
194                name: "RIVVEN_BIND_ADDRESS".to_string(),
195                value: Some("0.0.0.0".to_string()),
196                ..Default::default()
197            },
198            EnvVar {
199                name: "RIVVEN_PORT".to_string(),
200                value: Some("9092".to_string()),
201                ..Default::default()
202            },
203            EnvVar {
204                name: "RIVVEN_CLUSTER_NAME".to_string(),
205                value: Some(self.name.clone()),
206                ..Default::default()
207            },
208            EnvVar {
209                name: "RIVVEN_POD_NAME".to_string(),
210                value_from: Some(EnvVarSource {
211                    field_ref: Some(ObjectFieldSelector {
212                        field_path: "metadata.name".to_string(),
213                        ..Default::default()
214                    }),
215                    ..Default::default()
216                }),
217                ..Default::default()
218            },
219            EnvVar {
220                name: "RIVVEN_POD_NAMESPACE".to_string(),
221                value_from: Some(EnvVarSource {
222                    field_ref: Some(ObjectFieldSelector {
223                        field_path: "metadata.namespace".to_string(),
224                        ..Default::default()
225                    }),
226                    ..Default::default()
227                }),
228                ..Default::default()
229            },
230            EnvVar {
231                name: "RIVVEN_SERVICE_NAME".to_string(),
232                value: Some(format!("{}-headless", name)),
233                ..Default::default()
234            },
235            // Broker config
236            EnvVar {
237                name: "RIVVEN_DEFAULT_PARTITIONS".to_string(),
238                value: Some(spec.config.default_partitions.to_string()),
239                ..Default::default()
240            },
241            EnvVar {
242                name: "RIVVEN_DEFAULT_REPLICATION_FACTOR".to_string(),
243                value: Some(spec.config.default_replication_factor.to_string()),
244                ..Default::default()
245            },
246            EnvVar {
247                name: "RIVVEN_COMPRESSION".to_string(),
248                value: Some(spec.config.compression_type.clone()),
249                ..Default::default()
250            },
251        ];
252
253        // Add metrics config
254        if spec.metrics.enabled {
255            env.push(EnvVar {
256                name: "RIVVEN_METRICS_ENABLED".to_string(),
257                value: Some("true".to_string()),
258                ..Default::default()
259            });
260            env.push(EnvVar {
261                name: "RIVVEN_METRICS_PORT".to_string(),
262                value: Some(spec.metrics.port.to_string()),
263                ..Default::default()
264            });
265        }
266
267        // Add TLS config
268        if spec.tls.enabled {
269            env.push(EnvVar {
270                name: "RIVVEN_TLS_ENABLED".to_string(),
271                value: Some("true".to_string()),
272                ..Default::default()
273            });
274            if let Some(ref cert_secret) = spec.tls.cert_secret_name {
275                env.push(EnvVar {
276                    name: "RIVVEN_TLS_CERT_PATH".to_string(),
277                    value: Some("/tls/tls.crt".to_string()),
278                    ..Default::default()
279                });
280                env.push(EnvVar {
281                    name: "RIVVEN_TLS_KEY_PATH".to_string(),
282                    value: Some("/tls/tls.key".to_string()),
283                    ..Default::default()
284                });
285                // Note: Volume mount for TLS secret would be added here
286                let _ = cert_secret; // Suppress unused warning
287            }
288        }
289
290        // Add user-defined env vars
291        env.extend(spec.env.clone());
292
293        // Build ports
294        let mut ports = vec![ContainerPort {
295            name: Some("broker".to_string()),
296            container_port: 9092,
297            protocol: Some("TCP".to_string()),
298            ..Default::default()
299        }];
300
301        if spec.metrics.enabled {
302            ports.push(ContainerPort {
303                name: Some("metrics".to_string()),
304                container_port: spec.metrics.port,
305                protocol: Some("TCP".to_string()),
306                ..Default::default()
307            });
308        }
309
310        // Build probes
311        let liveness_probe = if spec.liveness_probe.enabled {
312            Some(Probe {
313                http_get: Some(HTTPGetAction {
314                    path: Some("/health".to_string()),
315                    port: IntOrString::Int(9092),
316                    scheme: Some("HTTP".to_string()),
317                    ..Default::default()
318                }),
319                initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
320                period_seconds: Some(spec.liveness_probe.period_seconds),
321                timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
322                success_threshold: Some(spec.liveness_probe.success_threshold),
323                failure_threshold: Some(spec.liveness_probe.failure_threshold),
324                ..Default::default()
325            })
326        } else {
327            None
328        };
329
330        let readiness_probe = if spec.readiness_probe.enabled {
331            Some(Probe {
332                http_get: Some(HTTPGetAction {
333                    path: Some("/ready".to_string()),
334                    port: IntOrString::Int(9092),
335                    scheme: Some("HTTP".to_string()),
336                    ..Default::default()
337                }),
338                initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
339                period_seconds: Some(spec.readiness_probe.period_seconds),
340                timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
341                success_threshold: Some(spec.readiness_probe.success_threshold),
342                failure_threshold: Some(spec.readiness_probe.failure_threshold),
343                ..Default::default()
344            })
345        } else {
346            None
347        };
348
349        // Build volume mounts
350        let volume_mounts = vec![VolumeMount {
351            name: "data".to_string(),
352            mount_path: "/data".to_string(),
353            ..Default::default()
354        }];
355
356        // Apply secure defaults for container security context if not specified
357        let container_security_context = spec.container_security_context.clone().or_else(|| {
358            Some(k8s_openapi::api::core::v1::SecurityContext {
359                // Prevent privilege escalation
360                allow_privilege_escalation: Some(false),
361                // Read-only root filesystem (data volume is writable)
362                read_only_root_filesystem: Some(true),
363                // Run as non-root
364                run_as_non_root: Some(true),
365                run_as_user: Some(1000),
366                run_as_group: Some(1000),
367                // Drop all capabilities (Rivven doesn't need any)
368                capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
369                    drop: Some(vec!["ALL".to_string()]),
370                    ..Default::default()
371                }),
372                // Use RuntimeDefault seccomp profile
373                seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
374                    type_: "RuntimeDefault".to_string(),
375                    ..Default::default()
376                }),
377                ..Default::default()
378            })
379        });
380
381        Container {
382            name: "rivven".to_string(),
383            image: Some(spec.get_image()),
384            image_pull_policy: Some(spec.image_pull_policy.clone()),
385            command: Some(vec!["rivvend".to_string()]),
386            args: Some(vec![
387                "--config".to_string(),
388                "/etc/rivven/config.yaml".to_string(),
389            ]),
390            env: Some(env),
391            ports: Some(ports),
392            resources: spec.resources.clone(),
393            liveness_probe,
394            readiness_probe,
395            volume_mounts: Some(volume_mounts),
396            security_context: container_security_context,
397            ..Default::default()
398        }
399    }
400
401    /// Build PVC template for StatefulSet
402    fn build_pvc_template(&self, spec: &RivvenClusterSpec) -> PersistentVolumeClaim {
403        let mut resources = BTreeMap::new();
404        resources.insert("storage".to_string(), Quantity(spec.storage.size.clone()));
405
406        PersistentVolumeClaim {
407            metadata: ObjectMeta {
408                name: Some("data".to_string()),
409                ..Default::default()
410            },
411            spec: Some(PersistentVolumeClaimSpec {
412                access_modes: Some(spec.storage.access_modes.clone()),
413                storage_class_name: spec.storage.storage_class_name.clone(),
414                resources: Some(k8s_openapi::api::core::v1::VolumeResourceRequirements {
415                    requests: Some(resources),
416                    ..Default::default()
417                }),
418                ..Default::default()
419            }),
420            ..Default::default()
421        }
422    }
423
424    /// Build the headless service for pod discovery
425    pub fn build_headless_service(&self) -> Service {
426        let spec = &self.cluster.spec;
427        let name = format!("{}-headless", self.resource_name());
428        let labels = spec.get_labels(&self.name);
429        let selector_labels = spec.get_selector_labels(&self.name);
430
431        Service {
432            metadata: ObjectMeta {
433                name: Some(name),
434                namespace: Some(self.namespace.clone()),
435                labels: Some(labels),
436                owner_references: Some(vec![self.owner_reference()]),
437                ..Default::default()
438            },
439            spec: Some(ServiceSpec {
440                cluster_ip: Some("None".to_string()),
441                selector: Some(selector_labels),
442                ports: Some(vec![
443                    ServicePort {
444                        name: Some("broker".to_string()),
445                        port: 9092,
446                        target_port: Some(IntOrString::Int(9092)),
447                        protocol: Some("TCP".to_string()),
448                        ..Default::default()
449                    },
450                    ServicePort {
451                        name: Some("raft".to_string()),
452                        port: 9093,
453                        target_port: Some(IntOrString::Int(9093)),
454                        protocol: Some("TCP".to_string()),
455                        ..Default::default()
456                    },
457                ]),
458                publish_not_ready_addresses: Some(true),
459                ..Default::default()
460            }),
461            ..Default::default()
462        }
463    }
464
465    /// Build the client-facing service
466    pub fn build_client_service(&self) -> Service {
467        let spec = &self.cluster.spec;
468        let name = self.resource_name();
469        let labels = spec.get_labels(&self.name);
470        let selector_labels = spec.get_selector_labels(&self.name);
471
472        let mut ports = vec![ServicePort {
473            name: Some("broker".to_string()),
474            port: 9092,
475            target_port: Some(IntOrString::Int(9092)),
476            protocol: Some("TCP".to_string()),
477            ..Default::default()
478        }];
479
480        if spec.metrics.enabled {
481            ports.push(ServicePort {
482                name: Some("metrics".to_string()),
483                port: spec.metrics.port,
484                target_port: Some(IntOrString::Int(spec.metrics.port)),
485                protocol: Some("TCP".to_string()),
486                ..Default::default()
487            });
488        }
489
490        Service {
491            metadata: ObjectMeta {
492                name: Some(name),
493                namespace: Some(self.namespace.clone()),
494                labels: Some(labels),
495                owner_references: Some(vec![self.owner_reference()]),
496                ..Default::default()
497            },
498            spec: Some(ServiceSpec {
499                type_: Some("ClusterIP".to_string()),
500                selector: Some(selector_labels),
501                ports: Some(ports),
502                ..Default::default()
503            }),
504            ..Default::default()
505        }
506    }
507
508    /// Build ConfigMap for broker configuration
509    pub fn build_configmap(&self) -> ConfigMap {
510        let spec = &self.cluster.spec;
511        let name = format!("{}-config", self.resource_name());
512        let labels = spec.get_labels(&self.name);
513
514        // Build configuration YAML
515        let mut config = BTreeMap::new();
516        config.insert(
517            "default_partitions".to_string(),
518            spec.config.default_partitions.to_string(),
519        );
520        config.insert(
521            "default_replication_factor".to_string(),
522            spec.config.default_replication_factor.to_string(),
523        );
524        config.insert(
525            "log_retention_hours".to_string(),
526            spec.config.log_retention_hours.to_string(),
527        );
528        config.insert(
529            "log_segment_bytes".to_string(),
530            spec.config.log_segment_bytes.to_string(),
531        );
532        config.insert(
533            "max_message_bytes".to_string(),
534            spec.config.max_message_bytes.to_string(),
535        );
536        config.insert(
537            "auto_create_topics".to_string(),
538            spec.config.auto_create_topics.to_string(),
539        );
540        config.insert(
541            "compression_enabled".to_string(),
542            spec.config.compression_enabled.to_string(),
543        );
544        config.insert(
545            "compression_type".to_string(),
546            spec.config.compression_type.clone(),
547        );
548        config.insert(
549            "raft_election_timeout_ms".to_string(),
550            spec.config.raft_election_timeout_ms.to_string(),
551        );
552        config.insert(
553            "raft_heartbeat_interval_ms".to_string(),
554            spec.config.raft_heartbeat_interval_ms.to_string(),
555        );
556
557        // Add raw overrides
558        for (k, v) in &spec.config.raw {
559            config.insert(k.clone(), v.clone());
560        }
561
562        let config_yaml = serde_yaml::to_string(&config).unwrap_or_default();
563
564        let mut data = BTreeMap::new();
565        data.insert("config.yaml".to_string(), config_yaml);
566
567        ConfigMap {
568            metadata: ObjectMeta {
569                name: Some(name),
570                namespace: Some(self.namespace.clone()),
571                labels: Some(labels),
572                owner_references: Some(vec![self.owner_reference()]),
573                ..Default::default()
574            },
575            data: Some(data),
576            ..Default::default()
577        }
578    }
579
580    /// Build PodDisruptionBudget
581    pub fn build_pdb(&self) -> Option<PodDisruptionBudget> {
582        let spec = &self.cluster.spec;
583
584        if !spec.pod_disruption_budget.enabled {
585            return None;
586        }
587
588        let name = format!("{}-pdb", self.resource_name());
589        let labels = spec.get_labels(&self.name);
590        let selector_labels = spec.get_selector_labels(&self.name);
591
592        Some(PodDisruptionBudget {
593            metadata: ObjectMeta {
594                name: Some(name),
595                namespace: Some(self.namespace.clone()),
596                labels: Some(labels),
597                owner_references: Some(vec![self.owner_reference()]),
598                ..Default::default()
599            },
600            spec: Some(PodDisruptionBudgetSpec {
601                selector: Some(LabelSelector {
602                    match_labels: Some(selector_labels),
603                    ..Default::default()
604                }),
605                min_available: spec
606                    .pod_disruption_budget
607                    .min_available
608                    .as_ref()
609                    .map(|v| IntOrString::String(v.clone())),
610                max_unavailable: spec
611                    .pod_disruption_budget
612                    .max_unavailable
613                    .as_ref()
614                    .map(|v| IntOrString::String(v.clone())),
615                ..Default::default()
616            }),
617            ..Default::default()
618        })
619    }
620}
621
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, StorageSpec, TlsSpec};

    /// Three-replica cluster in the `default` namespace with a fixed UID and
    /// default values for every nested spec section.
    fn create_test_cluster(name: &str) -> RivvenCluster {
        let metadata = ObjectMeta {
            name: Some(String::from(name)),
            namespace: Some(String::from("default")),
            uid: Some(String::from("test-uid-123")),
            ..Default::default()
        };

        let spec = RivvenClusterSpec {
            replicas: 3,
            version: String::from("0.0.1"),
            image: None,
            image_pull_policy: String::from("IfNotPresent"),
            image_pull_secrets: Vec::new(),
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: Vec::new(),
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: Vec::new(),
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    /// The StatefulSet is named after the cluster, keeps the replica count
    /// and points at the headless service.
    #[test]
    fn test_build_statefulset() {
        let cluster = create_test_cluster("my-cluster");
        let builder = ResourceBuilder::new(&cluster).expect("builder for named cluster");

        let statefulset = builder.build_statefulset();
        let sts_spec = statefulset.spec.as_ref().expect("statefulset spec");

        assert_eq!(statefulset.metadata.name.as_deref(), Some("rivven-my-cluster"));
        assert_eq!(sts_spec.replicas, Some(3));
        assert_eq!(sts_spec.service_name, "rivven-my-cluster-headless");
    }

    /// The headless service carries the `-headless` suffix and no cluster IP.
    #[test]
    fn test_build_headless_service() {
        let cluster = create_test_cluster("my-cluster");
        let builder = ResourceBuilder::new(&cluster).expect("builder for named cluster");

        let service = builder.build_headless_service();
        let service_spec = service.spec.as_ref().expect("service spec");

        assert_eq!(
            service.metadata.name.as_deref(),
            Some("rivven-my-cluster-headless")
        );
        assert_eq!(service_spec.cluster_ip.as_deref(), Some("None"));
    }

    /// The client service uses the plain resource name and type `ClusterIP`.
    #[test]
    fn test_build_client_service() {
        let cluster = create_test_cluster("my-cluster");
        let builder = ResourceBuilder::new(&cluster).expect("builder for named cluster");

        let service = builder.build_client_service();
        let service_spec = service.spec.as_ref().expect("service spec");

        assert_eq!(service.metadata.name.as_deref(), Some("rivven-my-cluster"));
        assert_eq!(service_spec.type_.as_deref(), Some("ClusterIP"));
    }

    /// The ConfigMap carries the `-config` suffix and a `config.yaml` entry.
    #[test]
    fn test_build_configmap() {
        let cluster = create_test_cluster("my-cluster");
        let builder = ResourceBuilder::new(&cluster).expect("builder for named cluster");

        let config_map = builder.build_configmap();

        assert_eq!(
            config_map.metadata.name.as_deref(),
            Some("rivven-my-cluster-config")
        );
        assert!(config_map.data.is_some());
        let entries = config_map.data.as_ref().expect("configmap data");
        assert!(entries.contains_key("config.yaml"));
    }

    /// With the default PDB spec a budget with the `-pdb` suffix is produced.
    #[test]
    fn test_build_pdb() {
        let cluster = create_test_cluster("my-cluster");
        let builder = ResourceBuilder::new(&cluster).expect("builder for named cluster");

        let budget = builder.build_pdb();

        assert!(budget.is_some());
        let budget = budget.expect("pod disruption budget");
        assert_eq!(budget.metadata.name.as_deref(), Some("rivven-my-cluster-pdb"));
    }

    /// Generated objects are owned by exactly one `RivvenCluster` reference.
    #[test]
    fn test_owner_references() {
        let cluster = create_test_cluster("my-cluster");
        let builder = ResourceBuilder::new(&cluster).expect("builder for named cluster");

        let statefulset = builder.build_statefulset();
        let owners = statefulset
            .metadata
            .owner_references
            .as_ref()
            .expect("owner references");

        assert_eq!(owners.len(), 1);
        assert_eq!(owners[0].kind, "RivvenCluster");
        assert_eq!(owners[0].name, "my-cluster");
    }

    /// Pod labels from the spec show up on the pod template.
    #[test]
    fn test_custom_labels() {
        let mut cluster = create_test_cluster("my-cluster");
        cluster
            .spec
            .pod_labels
            .insert(String::from("custom"), String::from("label"));

        let builder = ResourceBuilder::new(&cluster).expect("builder for named cluster");
        let statefulset = builder.build_statefulset();

        let sts_spec = statefulset.spec.as_ref().expect("statefulset spec");
        let template_meta = sts_spec
            .template
            .metadata
            .as_ref()
            .expect("pod template metadata");
        let pod_labels = template_meta.labels.as_ref().expect("pod template labels");

        assert_eq!(pod_labels.get("custom").map(String::as_str), Some("label"));
    }
}