Skip to main content

rivven_operator/
resources.rs

1//! Kubernetes Resource Builders
2//!
3//! This module generates Kubernetes manifests (StatefulSet, Service, ConfigMap, etc.)
4//! from RivvenCluster specifications.
5
6use crate::crd::{RivvenCluster, RivvenClusterSpec};
7use crate::error::{OperatorError, Result};
8use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
9use k8s_openapi::api::core::v1::{
10    ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
11    PersistentVolumeClaim, PersistentVolumeClaimSpec, PodSpec, PodTemplateSpec, Probe, Service,
12    ServicePort, ServiceSpec, VolumeMount,
13};
14use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
15use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
17use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
18use std::collections::BTreeMap;
19
20/// Builder for generating Kubernetes resources from a RivvenCluster
21pub struct ResourceBuilder<'a> {
22    cluster: &'a RivvenCluster,
23    name: String,
24    namespace: String,
25}
26
27impl<'a> ResourceBuilder<'a> {
28    /// Create a new resource builder
29    pub fn new(cluster: &'a RivvenCluster) -> Result<Self> {
30        let name =
31            cluster.metadata.name.clone().ok_or_else(|| {
32                OperatorError::InvalidConfig("cluster name is required".to_string())
33            })?;
34
35        let namespace = cluster
36            .metadata
37            .namespace
38            .clone()
39            .unwrap_or_else(|| "default".to_string());
40
41        Ok(Self {
42            cluster,
43            name,
44            namespace,
45        })
46    }
47
48    /// Get the resource name prefix
49    fn resource_name(&self) -> String {
50        format!("rivven-{}", self.name)
51    }
52
53    /// Get owner reference for managed resources
54    fn owner_reference(&self) -> OwnerReference {
55        OwnerReference {
56            api_version: "rivven.hupe1980.github.io/v1alpha1".to_string(),
57            kind: "RivvenCluster".to_string(),
58            name: self.name.clone(),
59            uid: self.cluster.metadata.uid.clone().unwrap_or_default(),
60            controller: Some(true),
61            block_owner_deletion: Some(true),
62        }
63    }
64
65    /// Build the StatefulSet for the Rivven cluster
66    pub fn build_statefulset(&self) -> StatefulSet {
67        let spec = &self.cluster.spec;
68        let name = self.resource_name();
69        let labels = spec.get_labels(&self.name);
70        let selector_labels = spec.get_selector_labels(&self.name);
71
72        // Build container
73        let container = self.build_container(spec);
74
75        // Build PVC template
76        let pvc_template = self.build_pvc_template(spec);
77
78        // Build pod template
79        let mut pod_labels = selector_labels.clone();
80        pod_labels.extend(spec.pod_labels.clone());
81
82        let mut pod_annotations = BTreeMap::new();
83        if spec.metrics.enabled {
84            pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
85            pod_annotations.insert(
86                "prometheus.io/port".to_string(),
87                spec.metrics.port.to_string(),
88            );
89        }
90        pod_annotations.extend(spec.pod_annotations.clone());
91
92        // Apply secure defaults for pod security context if not specified
93        let pod_security_context = spec.security_context.clone().or_else(|| {
94            Some(k8s_openapi::api::core::v1::PodSecurityContext {
95                // Run as non-root by default
96                run_as_non_root: Some(true),
97                // Use rivven user (UID 1000)
98                run_as_user: Some(1000),
99                run_as_group: Some(1000),
100                fs_group: Some(1000),
101                // Use RuntimeDefault seccomp profile
102                seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
103                    type_: "RuntimeDefault".to_string(),
104                    ..Default::default()
105                }),
106                ..Default::default()
107            })
108        });
109
110        // Build volumes for TLS if enabled
111        let volumes = if spec.tls.enabled {
112            spec.tls.cert_secret_name.as_ref().map(|secret_name| {
113                vec![k8s_openapi::api::core::v1::Volume {
114                    name: "tls".to_string(),
115                    secret: Some(k8s_openapi::api::core::v1::SecretVolumeSource {
116                        secret_name: Some(secret_name.clone()),
117                        default_mode: Some(0o400),
118                        ..Default::default()
119                    }),
120                    ..Default::default()
121                }]
122            })
123        } else {
124            None
125        };
126
127        let pod_spec = PodSpec {
128            containers: vec![container],
129            volumes,
130            affinity: spec.affinity.clone(),
131            security_context: pod_security_context,
132            node_selector: if spec.node_selector.is_empty() {
133                None
134            } else {
135                Some(spec.node_selector.clone())
136            },
137            tolerations: if spec.tolerations.is_empty() {
138                None
139            } else {
140                Some(spec.tolerations.clone())
141            },
142            service_account_name: spec.service_account.clone(),
143            image_pull_secrets: if spec.image_pull_secrets.is_empty() {
144                None
145            } else {
146                Some(
147                    spec.image_pull_secrets
148                        .iter()
149                        .map(|s| k8s_openapi::api::core::v1::LocalObjectReference {
150                            name: s.clone(),
151                        })
152                        .collect(),
153                )
154            },
155            // Disable service account token auto-mounting for security
156            automount_service_account_token: Some(false),
157            ..Default::default()
158        };
159
160        StatefulSet {
161            metadata: ObjectMeta {
162                name: Some(name.clone()),
163                namespace: Some(self.namespace.clone()),
164                labels: Some(labels.clone()),
165                owner_references: Some(vec![self.owner_reference()]),
166                ..Default::default()
167            },
168            spec: Some(StatefulSetSpec {
169                service_name: format!("{}-headless", name),
170                replicas: Some(spec.replicas),
171                selector: LabelSelector {
172                    match_labels: Some(selector_labels),
173                    ..Default::default()
174                },
175                template: PodTemplateSpec {
176                    metadata: Some(ObjectMeta {
177                        labels: Some(pod_labels),
178                        annotations: Some(pod_annotations),
179                        ..Default::default()
180                    }),
181                    spec: Some(pod_spec),
182                },
183                volume_claim_templates: Some(vec![pvc_template]),
184                pod_management_policy: Some("Parallel".to_string()),
185                update_strategy: Some(k8s_openapi::api::apps::v1::StatefulSetUpdateStrategy {
186                    type_: Some("RollingUpdate".to_string()),
187                    rolling_update: Some(
188                        k8s_openapi::api::apps::v1::RollingUpdateStatefulSetStrategy {
189                            max_unavailable: Some(IntOrString::Int(1)),
190                            partition: Some(0),
191                        },
192                    ),
193                }),
194                ..Default::default()
195            }),
196            ..Default::default()
197        }
198    }
199
200    /// Build the main container for the broker pod
201    fn build_container(&self, spec: &RivvenClusterSpec) -> Container {
202        let name = self.resource_name();
203
204        // Build environment variables
205        let mut env = vec![
206            EnvVar {
207                name: "RIVVEN_DATA_DIR".to_string(),
208                value: Some("/data".to_string()),
209                ..Default::default()
210            },
211            EnvVar {
212                name: "RIVVEN_BIND_ADDRESS".to_string(),
213                value: Some("0.0.0.0".to_string()),
214                ..Default::default()
215            },
216            EnvVar {
217                name: "RIVVEN_PORT".to_string(),
218                value: Some("9092".to_string()),
219                ..Default::default()
220            },
221            EnvVar {
222                name: "RIVVEN_CLUSTER_NAME".to_string(),
223                value: Some(self.name.clone()),
224                ..Default::default()
225            },
226            EnvVar {
227                name: "RIVVEN_POD_NAME".to_string(),
228                value_from: Some(EnvVarSource {
229                    field_ref: Some(ObjectFieldSelector {
230                        field_path: "metadata.name".to_string(),
231                        ..Default::default()
232                    }),
233                    ..Default::default()
234                }),
235                ..Default::default()
236            },
237            EnvVar {
238                name: "RIVVEN_POD_NAMESPACE".to_string(),
239                value_from: Some(EnvVarSource {
240                    field_ref: Some(ObjectFieldSelector {
241                        field_path: "metadata.namespace".to_string(),
242                        ..Default::default()
243                    }),
244                    ..Default::default()
245                }),
246                ..Default::default()
247            },
248            EnvVar {
249                name: "RIVVEN_SERVICE_NAME".to_string(),
250                value: Some(format!("{}-headless", name)),
251                ..Default::default()
252            },
253            // Broker config
254            EnvVar {
255                name: "RIVVEN_DEFAULT_PARTITIONS".to_string(),
256                value: Some(spec.config.default_partitions.to_string()),
257                ..Default::default()
258            },
259            EnvVar {
260                name: "RIVVEN_DEFAULT_REPLICATION_FACTOR".to_string(),
261                value: Some(spec.config.default_replication_factor.to_string()),
262                ..Default::default()
263            },
264            EnvVar {
265                name: "RIVVEN_COMPRESSION".to_string(),
266                value: Some(spec.config.compression_type.clone()),
267                ..Default::default()
268            },
269        ];
270
271        // Add metrics config
272        if spec.metrics.enabled {
273            env.push(EnvVar {
274                name: "RIVVEN_METRICS_ENABLED".to_string(),
275                value: Some("true".to_string()),
276                ..Default::default()
277            });
278            env.push(EnvVar {
279                name: "RIVVEN_METRICS_PORT".to_string(),
280                value: Some(spec.metrics.port.to_string()),
281                ..Default::default()
282            });
283        }
284
285        // Add TLS config
286        if spec.tls.enabled {
287            env.push(EnvVar {
288                name: "RIVVEN_TLS_ENABLED".to_string(),
289                value: Some("true".to_string()),
290                ..Default::default()
291            });
292            if let Some(ref _cert_secret) = spec.tls.cert_secret_name {
293                env.push(EnvVar {
294                    name: "RIVVEN_TLS_CERT_PATH".to_string(),
295                    value: Some("/tls/tls.crt".to_string()),
296                    ..Default::default()
297                });
298                env.push(EnvVar {
299                    name: "RIVVEN_TLS_KEY_PATH".to_string(),
300                    value: Some("/tls/tls.key".to_string()),
301                    ..Default::default()
302                });
303            }
304        }
305
306        // Add user-defined env vars
307        env.extend(spec.env.clone());
308
309        // Build ports
310        let mut ports = vec![ContainerPort {
311            name: Some("broker".to_string()),
312            container_port: 9092,
313            protocol: Some("TCP".to_string()),
314            ..Default::default()
315        }];
316
317        if spec.metrics.enabled {
318            ports.push(ContainerPort {
319                name: Some("metrics".to_string()),
320                container_port: spec.metrics.port,
321                protocol: Some("TCP".to_string()),
322                ..Default::default()
323            });
324        }
325
326        // Build probes
327        let liveness_probe = if spec.liveness_probe.enabled {
328            Some(Probe {
329                http_get: Some(HTTPGetAction {
330                    path: Some("/health".to_string()),
331                    port: IntOrString::Int(9092),
332                    scheme: Some("HTTP".to_string()),
333                    ..Default::default()
334                }),
335                initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
336                period_seconds: Some(spec.liveness_probe.period_seconds),
337                timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
338                success_threshold: Some(spec.liveness_probe.success_threshold),
339                failure_threshold: Some(spec.liveness_probe.failure_threshold),
340                ..Default::default()
341            })
342        } else {
343            None
344        };
345
346        let readiness_probe = if spec.readiness_probe.enabled {
347            Some(Probe {
348                http_get: Some(HTTPGetAction {
349                    path: Some("/ready".to_string()),
350                    port: IntOrString::Int(9092),
351                    scheme: Some("HTTP".to_string()),
352                    ..Default::default()
353                }),
354                initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
355                period_seconds: Some(spec.readiness_probe.period_seconds),
356                timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
357                success_threshold: Some(spec.readiness_probe.success_threshold),
358                failure_threshold: Some(spec.readiness_probe.failure_threshold),
359                ..Default::default()
360            })
361        } else {
362            None
363        };
364
365        // Build volume mounts
366        let mut volume_mounts = vec![VolumeMount {
367            name: "data".to_string(),
368            mount_path: "/data".to_string(),
369            ..Default::default()
370        }];
371
372        // Add TLS volume mount if TLS is enabled with a cert secret
373        if spec.tls.enabled && spec.tls.cert_secret_name.is_some() {
374            volume_mounts.push(VolumeMount {
375                name: "tls".to_string(),
376                mount_path: "/tls".to_string(),
377                read_only: Some(true),
378                ..Default::default()
379            });
380        }
381
382        // Apply secure defaults for container security context if not specified
383        let container_security_context = spec.container_security_context.clone().or_else(|| {
384            Some(k8s_openapi::api::core::v1::SecurityContext {
385                // Prevent privilege escalation
386                allow_privilege_escalation: Some(false),
387                // Read-only root filesystem (data volume is writable)
388                read_only_root_filesystem: Some(true),
389                // Run as non-root
390                run_as_non_root: Some(true),
391                run_as_user: Some(1000),
392                run_as_group: Some(1000),
393                // Drop all capabilities (Rivven doesn't need any)
394                capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
395                    drop: Some(vec!["ALL".to_string()]),
396                    ..Default::default()
397                }),
398                // Use RuntimeDefault seccomp profile
399                seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
400                    type_: "RuntimeDefault".to_string(),
401                    ..Default::default()
402                }),
403                ..Default::default()
404            })
405        });
406
407        Container {
408            name: "rivven".to_string(),
409            image: Some(spec.get_image()),
410            image_pull_policy: Some(spec.image_pull_policy.clone()),
411            command: Some(vec!["rivvend".to_string()]),
412            args: Some(vec![
413                "--config".to_string(),
414                "/etc/rivven/config.yaml".to_string(),
415            ]),
416            env: Some(env),
417            ports: Some(ports),
418            resources: spec.resources.clone(),
419            liveness_probe,
420            readiness_probe,
421            volume_mounts: Some(volume_mounts),
422            security_context: container_security_context,
423            ..Default::default()
424        }
425    }
426
427    /// Build PVC template for StatefulSet
428    fn build_pvc_template(&self, spec: &RivvenClusterSpec) -> PersistentVolumeClaim {
429        let mut resources = BTreeMap::new();
430        resources.insert("storage".to_string(), Quantity(spec.storage.size.clone()));
431
432        PersistentVolumeClaim {
433            metadata: ObjectMeta {
434                name: Some("data".to_string()),
435                ..Default::default()
436            },
437            spec: Some(PersistentVolumeClaimSpec {
438                access_modes: Some(spec.storage.access_modes.clone()),
439                storage_class_name: spec.storage.storage_class_name.clone(),
440                resources: Some(k8s_openapi::api::core::v1::VolumeResourceRequirements {
441                    requests: Some(resources),
442                    ..Default::default()
443                }),
444                ..Default::default()
445            }),
446            ..Default::default()
447        }
448    }
449
450    /// Build the headless service for pod discovery
451    pub fn build_headless_service(&self) -> Service {
452        let spec = &self.cluster.spec;
453        let name = format!("{}-headless", self.resource_name());
454        let labels = spec.get_labels(&self.name);
455        let selector_labels = spec.get_selector_labels(&self.name);
456
457        Service {
458            metadata: ObjectMeta {
459                name: Some(name),
460                namespace: Some(self.namespace.clone()),
461                labels: Some(labels),
462                owner_references: Some(vec![self.owner_reference()]),
463                ..Default::default()
464            },
465            spec: Some(ServiceSpec {
466                cluster_ip: Some("None".to_string()),
467                selector: Some(selector_labels),
468                ports: Some(vec![
469                    ServicePort {
470                        name: Some("broker".to_string()),
471                        port: 9092,
472                        target_port: Some(IntOrString::Int(9092)),
473                        protocol: Some("TCP".to_string()),
474                        ..Default::default()
475                    },
476                    ServicePort {
477                        name: Some("raft".to_string()),
478                        port: 9093,
479                        target_port: Some(IntOrString::Int(9093)),
480                        protocol: Some("TCP".to_string()),
481                        ..Default::default()
482                    },
483                ]),
484                publish_not_ready_addresses: Some(true),
485                ..Default::default()
486            }),
487            ..Default::default()
488        }
489    }
490
491    /// Build the client-facing service
492    pub fn build_client_service(&self) -> Service {
493        let spec = &self.cluster.spec;
494        let name = self.resource_name();
495        let labels = spec.get_labels(&self.name);
496        let selector_labels = spec.get_selector_labels(&self.name);
497
498        let mut ports = vec![ServicePort {
499            name: Some("broker".to_string()),
500            port: 9092,
501            target_port: Some(IntOrString::Int(9092)),
502            protocol: Some("TCP".to_string()),
503            ..Default::default()
504        }];
505
506        if spec.metrics.enabled {
507            ports.push(ServicePort {
508                name: Some("metrics".to_string()),
509                port: spec.metrics.port,
510                target_port: Some(IntOrString::Int(spec.metrics.port)),
511                protocol: Some("TCP".to_string()),
512                ..Default::default()
513            });
514        }
515
516        Service {
517            metadata: ObjectMeta {
518                name: Some(name),
519                namespace: Some(self.namespace.clone()),
520                labels: Some(labels),
521                owner_references: Some(vec![self.owner_reference()]),
522                ..Default::default()
523            },
524            spec: Some(ServiceSpec {
525                type_: Some("ClusterIP".to_string()),
526                selector: Some(selector_labels),
527                ports: Some(ports),
528                ..Default::default()
529            }),
530            ..Default::default()
531        }
532    }
533
534    /// Build ConfigMap for broker configuration
535    pub fn build_configmap(&self) -> Result<ConfigMap> {
536        let spec = &self.cluster.spec;
537        let name = format!("{}-config", self.resource_name());
538        let labels = spec.get_labels(&self.name);
539
540        // Build configuration YAML
541        let mut config = BTreeMap::new();
542        config.insert(
543            "default_partitions".to_string(),
544            spec.config.default_partitions.to_string(),
545        );
546        config.insert(
547            "default_replication_factor".to_string(),
548            spec.config.default_replication_factor.to_string(),
549        );
550        config.insert(
551            "log_retention_hours".to_string(),
552            spec.config.log_retention_hours.to_string(),
553        );
554        config.insert(
555            "log_segment_bytes".to_string(),
556            spec.config.log_segment_bytes.to_string(),
557        );
558        config.insert(
559            "max_message_bytes".to_string(),
560            spec.config.max_message_bytes.to_string(),
561        );
562        config.insert(
563            "auto_create_topics".to_string(),
564            spec.config.auto_create_topics.to_string(),
565        );
566        config.insert(
567            "compression_enabled".to_string(),
568            spec.config.compression_enabled.to_string(),
569        );
570        config.insert(
571            "compression_type".to_string(),
572            spec.config.compression_type.clone(),
573        );
574        config.insert(
575            "raft_election_timeout_ms".to_string(),
576            spec.config.raft_election_timeout_ms.to_string(),
577        );
578        config.insert(
579            "raft_heartbeat_interval_ms".to_string(),
580            spec.config.raft_heartbeat_interval_ms.to_string(),
581        );
582
583        // Add raw overrides
584        for (k, v) in &spec.config.raw {
585            config.insert(k.clone(), v.clone());
586        }
587
588        let config_yaml = serde_yaml::to_string(&config)?;
589
590        let mut data = BTreeMap::new();
591        data.insert("config.yaml".to_string(), config_yaml);
592
593        Ok(ConfigMap {
594            metadata: ObjectMeta {
595                name: Some(name),
596                namespace: Some(self.namespace.clone()),
597                labels: Some(labels),
598                owner_references: Some(vec![self.owner_reference()]),
599                ..Default::default()
600            },
601            data: Some(data),
602            ..Default::default()
603        })
604    }
605
606    /// Build PodDisruptionBudget
607    pub fn build_pdb(&self) -> Option<PodDisruptionBudget> {
608        let spec = &self.cluster.spec;
609
610        if !spec.pod_disruption_budget.enabled {
611            return None;
612        }
613
614        let name = format!("{}-pdb", self.resource_name());
615        let labels = spec.get_labels(&self.name);
616        let selector_labels = spec.get_selector_labels(&self.name);
617
618        Some(PodDisruptionBudget {
619            metadata: ObjectMeta {
620                name: Some(name),
621                namespace: Some(self.namespace.clone()),
622                labels: Some(labels),
623                owner_references: Some(vec![self.owner_reference()]),
624                ..Default::default()
625            },
626            spec: Some(PodDisruptionBudgetSpec {
627                selector: Some(LabelSelector {
628                    match_labels: Some(selector_labels),
629                    ..Default::default()
630                }),
631                min_available: spec
632                    .pod_disruption_budget
633                    .min_available
634                    .as_ref()
635                    .map(|v| IntOrString::String(v.clone())),
636                max_unavailable: spec
637                    .pod_disruption_budget
638                    .max_unavailable
639                    .as_ref()
640                    .map(|v| IntOrString::String(v.clone())),
641                ..Default::default()
642            }),
643            ..Default::default()
644        })
645    }
646}
647
648#[cfg(test)]
649mod tests {
650    use super::*;
651    use crate::crd::{BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, StorageSpec, TlsSpec};
652
653    fn create_test_cluster(name: &str) -> RivvenCluster {
654        RivvenCluster {
655            metadata: ObjectMeta {
656                name: Some(name.to_string()),
657                namespace: Some("default".to_string()),
658                uid: Some("test-uid-123".to_string()),
659                ..Default::default()
660            },
661            spec: RivvenClusterSpec {
662                replicas: 3,
663                version: "0.0.1".to_string(),
664                image: None,
665                image_pull_policy: "IfNotPresent".to_string(),
666                image_pull_secrets: vec![],
667                storage: StorageSpec::default(),
668                resources: None,
669                config: BrokerConfig::default(),
670                tls: TlsSpec::default(),
671                metrics: MetricsSpec::default(),
672                affinity: None,
673                node_selector: BTreeMap::new(),
674                tolerations: vec![],
675                pod_disruption_budget: PdbSpec::default(),
676                service_account: None,
677                pod_annotations: BTreeMap::new(),
678                pod_labels: BTreeMap::new(),
679                env: vec![],
680                liveness_probe: ProbeSpec::default(),
681                readiness_probe: ProbeSpec::default(),
682                security_context: None,
683                container_security_context: None,
684            },
685            status: None,
686        }
687    }
688
689    #[test]
690    fn test_build_statefulset() {
691        let cluster = create_test_cluster("my-cluster");
692        let builder = ResourceBuilder::new(&cluster).unwrap();
693        let sts = builder.build_statefulset();
694
695        assert_eq!(sts.metadata.name, Some("rivven-my-cluster".to_string()));
696        assert_eq!(sts.spec.as_ref().unwrap().replicas, Some(3));
697        assert_eq!(
698            sts.spec.as_ref().unwrap().service_name,
699            "rivven-my-cluster-headless"
700        );
701    }
702
703    #[test]
704    fn test_build_headless_service() {
705        let cluster = create_test_cluster("my-cluster");
706        let builder = ResourceBuilder::new(&cluster).unwrap();
707        let svc = builder.build_headless_service();
708
709        assert_eq!(
710            svc.metadata.name,
711            Some("rivven-my-cluster-headless".to_string())
712        );
713        assert_eq!(
714            svc.spec.as_ref().unwrap().cluster_ip,
715            Some("None".to_string())
716        );
717    }
718
719    #[test]
720    fn test_build_client_service() {
721        let cluster = create_test_cluster("my-cluster");
722        let builder = ResourceBuilder::new(&cluster).unwrap();
723        let svc = builder.build_client_service();
724
725        assert_eq!(svc.metadata.name, Some("rivven-my-cluster".to_string()));
726        assert_eq!(
727            svc.spec.as_ref().unwrap().type_,
728            Some("ClusterIP".to_string())
729        );
730    }
731
732    #[test]
733    fn test_build_configmap() {
734        let cluster = create_test_cluster("my-cluster");
735        let builder = ResourceBuilder::new(&cluster).unwrap();
736        let cm = builder.build_configmap().unwrap();
737
738        assert_eq!(
739            cm.metadata.name,
740            Some("rivven-my-cluster-config".to_string())
741        );
742        assert!(cm.data.is_some());
743        assert!(cm.data.as_ref().unwrap().contains_key("config.yaml"));
744    }
745
746    #[test]
747    fn test_build_pdb() {
748        let cluster = create_test_cluster("my-cluster");
749        let builder = ResourceBuilder::new(&cluster).unwrap();
750        let pdb = builder.build_pdb();
751
752        assert!(pdb.is_some());
753        let pdb = pdb.unwrap();
754        assert_eq!(pdb.metadata.name, Some("rivven-my-cluster-pdb".to_string()));
755    }
756
757    #[test]
758    fn test_owner_references() {
759        let cluster = create_test_cluster("my-cluster");
760        let builder = ResourceBuilder::new(&cluster).unwrap();
761        let sts = builder.build_statefulset();
762
763        let owner_refs = sts.metadata.owner_references.as_ref().unwrap();
764        assert_eq!(owner_refs.len(), 1);
765        assert_eq!(owner_refs[0].kind, "RivvenCluster");
766        assert_eq!(owner_refs[0].name, "my-cluster");
767    }
768
769    #[test]
770    fn test_custom_labels() {
771        let mut cluster = create_test_cluster("my-cluster");
772        cluster
773            .spec
774            .pod_labels
775            .insert("custom".to_string(), "label".to_string());
776
777        let builder = ResourceBuilder::new(&cluster).unwrap();
778        let sts = builder.build_statefulset();
779
780        let pod_labels = sts
781            .spec
782            .as_ref()
783            .unwrap()
784            .template
785            .metadata
786            .as_ref()
787            .unwrap()
788            .labels
789            .as_ref()
790            .unwrap();
791
792        assert_eq!(pod_labels.get("custom"), Some(&"label".to_string()));
793    }
794}