1use crate::crd::{RivvenCluster, RivvenClusterSpec};
7use crate::error::{OperatorError, Result};
8use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
9use k8s_openapi::api::core::v1::{
10 ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
11 PersistentVolumeClaim, PersistentVolumeClaimSpec, PodSpec, PodTemplateSpec, Probe, Service,
12 ServicePort, ServiceSpec, VolumeMount,
13};
14use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
15use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
17use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
18use std::collections::BTreeMap;
19
20pub struct ResourceBuilder<'a> {
22 cluster: &'a RivvenCluster,
23 name: String,
24 namespace: String,
25}
26
27impl<'a> ResourceBuilder<'a> {
28 pub fn new(cluster: &'a RivvenCluster) -> Result<Self> {
30 let name =
31 cluster.metadata.name.clone().ok_or_else(|| {
32 OperatorError::InvalidConfig("cluster name is required".to_string())
33 })?;
34
35 let namespace = cluster
36 .metadata
37 .namespace
38 .clone()
39 .unwrap_or_else(|| "default".to_string());
40
41 Ok(Self {
42 cluster,
43 name,
44 namespace,
45 })
46 }
47
48 fn resource_name(&self) -> String {
50 format!("rivven-{}", self.name)
51 }
52
53 fn owner_reference(&self) -> OwnerReference {
55 OwnerReference {
56 api_version: "rivven.io/v1alpha1".to_string(),
57 kind: "RivvenCluster".to_string(),
58 name: self.name.clone(),
59 uid: self.cluster.metadata.uid.clone().unwrap_or_default(),
60 controller: Some(true),
61 block_owner_deletion: Some(true),
62 }
63 }
64
65 pub fn build_statefulset(&self) -> StatefulSet {
67 let spec = &self.cluster.spec;
68 let name = self.resource_name();
69 let labels = spec.get_labels(&self.name);
70 let selector_labels = spec.get_selector_labels(&self.name);
71
72 let container = self.build_container(spec);
74
75 let pvc_template = self.build_pvc_template(spec);
77
78 let mut pod_labels = selector_labels.clone();
80 pod_labels.extend(spec.pod_labels.clone());
81
82 let mut pod_annotations = BTreeMap::new();
83 if spec.metrics.enabled {
84 pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
85 pod_annotations.insert(
86 "prometheus.io/port".to_string(),
87 spec.metrics.port.to_string(),
88 );
89 }
90 pod_annotations.extend(spec.pod_annotations.clone());
91
92 let pod_security_context = spec.security_context.clone().or_else(|| {
94 Some(k8s_openapi::api::core::v1::PodSecurityContext {
95 run_as_non_root: Some(true),
97 run_as_user: Some(1000),
99 run_as_group: Some(1000),
100 fs_group: Some(1000),
101 seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
103 type_: "RuntimeDefault".to_string(),
104 ..Default::default()
105 }),
106 ..Default::default()
107 })
108 });
109
110 let pod_spec = PodSpec {
111 containers: vec![container],
112 affinity: spec.affinity.clone(),
113 security_context: pod_security_context,
114 node_selector: if spec.node_selector.is_empty() {
115 None
116 } else {
117 Some(spec.node_selector.clone())
118 },
119 tolerations: if spec.tolerations.is_empty() {
120 None
121 } else {
122 Some(spec.tolerations.clone())
123 },
124 service_account_name: spec.service_account.clone(),
125 image_pull_secrets: if spec.image_pull_secrets.is_empty() {
126 None
127 } else {
128 Some(
129 spec.image_pull_secrets
130 .iter()
131 .map(|s| k8s_openapi::api::core::v1::LocalObjectReference {
132 name: s.clone(),
133 })
134 .collect(),
135 )
136 },
137 automount_service_account_token: Some(false),
139 ..Default::default()
140 };
141
142 StatefulSet {
143 metadata: ObjectMeta {
144 name: Some(name.clone()),
145 namespace: Some(self.namespace.clone()),
146 labels: Some(labels.clone()),
147 owner_references: Some(vec![self.owner_reference()]),
148 ..Default::default()
149 },
150 spec: Some(StatefulSetSpec {
151 service_name: format!("{}-headless", name),
152 replicas: Some(spec.replicas),
153 selector: LabelSelector {
154 match_labels: Some(selector_labels),
155 ..Default::default()
156 },
157 template: PodTemplateSpec {
158 metadata: Some(ObjectMeta {
159 labels: Some(pod_labels),
160 annotations: Some(pod_annotations),
161 ..Default::default()
162 }),
163 spec: Some(pod_spec),
164 },
165 volume_claim_templates: Some(vec![pvc_template]),
166 pod_management_policy: Some("Parallel".to_string()),
167 update_strategy: Some(k8s_openapi::api::apps::v1::StatefulSetUpdateStrategy {
168 type_: Some("RollingUpdate".to_string()),
169 rolling_update: Some(
170 k8s_openapi::api::apps::v1::RollingUpdateStatefulSetStrategy {
171 max_unavailable: Some(IntOrString::Int(1)),
172 partition: Some(0),
173 },
174 ),
175 }),
176 ..Default::default()
177 }),
178 ..Default::default()
179 }
180 }
181
182 fn build_container(&self, spec: &RivvenClusterSpec) -> Container {
184 let name = self.resource_name();
185
186 let mut env = vec![
188 EnvVar {
189 name: "RIVVEN_DATA_DIR".to_string(),
190 value: Some("/data".to_string()),
191 ..Default::default()
192 },
193 EnvVar {
194 name: "RIVVEN_BIND_ADDRESS".to_string(),
195 value: Some("0.0.0.0".to_string()),
196 ..Default::default()
197 },
198 EnvVar {
199 name: "RIVVEN_PORT".to_string(),
200 value: Some("9092".to_string()),
201 ..Default::default()
202 },
203 EnvVar {
204 name: "RIVVEN_CLUSTER_NAME".to_string(),
205 value: Some(self.name.clone()),
206 ..Default::default()
207 },
208 EnvVar {
209 name: "RIVVEN_POD_NAME".to_string(),
210 value_from: Some(EnvVarSource {
211 field_ref: Some(ObjectFieldSelector {
212 field_path: "metadata.name".to_string(),
213 ..Default::default()
214 }),
215 ..Default::default()
216 }),
217 ..Default::default()
218 },
219 EnvVar {
220 name: "RIVVEN_POD_NAMESPACE".to_string(),
221 value_from: Some(EnvVarSource {
222 field_ref: Some(ObjectFieldSelector {
223 field_path: "metadata.namespace".to_string(),
224 ..Default::default()
225 }),
226 ..Default::default()
227 }),
228 ..Default::default()
229 },
230 EnvVar {
231 name: "RIVVEN_SERVICE_NAME".to_string(),
232 value: Some(format!("{}-headless", name)),
233 ..Default::default()
234 },
235 EnvVar {
237 name: "RIVVEN_DEFAULT_PARTITIONS".to_string(),
238 value: Some(spec.config.default_partitions.to_string()),
239 ..Default::default()
240 },
241 EnvVar {
242 name: "RIVVEN_DEFAULT_REPLICATION_FACTOR".to_string(),
243 value: Some(spec.config.default_replication_factor.to_string()),
244 ..Default::default()
245 },
246 EnvVar {
247 name: "RIVVEN_COMPRESSION".to_string(),
248 value: Some(spec.config.compression_type.clone()),
249 ..Default::default()
250 },
251 ];
252
253 if spec.metrics.enabled {
255 env.push(EnvVar {
256 name: "RIVVEN_METRICS_ENABLED".to_string(),
257 value: Some("true".to_string()),
258 ..Default::default()
259 });
260 env.push(EnvVar {
261 name: "RIVVEN_METRICS_PORT".to_string(),
262 value: Some(spec.metrics.port.to_string()),
263 ..Default::default()
264 });
265 }
266
267 if spec.tls.enabled {
269 env.push(EnvVar {
270 name: "RIVVEN_TLS_ENABLED".to_string(),
271 value: Some("true".to_string()),
272 ..Default::default()
273 });
274 if let Some(ref cert_secret) = spec.tls.cert_secret_name {
275 env.push(EnvVar {
276 name: "RIVVEN_TLS_CERT_PATH".to_string(),
277 value: Some("/tls/tls.crt".to_string()),
278 ..Default::default()
279 });
280 env.push(EnvVar {
281 name: "RIVVEN_TLS_KEY_PATH".to_string(),
282 value: Some("/tls/tls.key".to_string()),
283 ..Default::default()
284 });
285 let _ = cert_secret; }
288 }
289
290 env.extend(spec.env.clone());
292
293 let mut ports = vec![ContainerPort {
295 name: Some("broker".to_string()),
296 container_port: 9092,
297 protocol: Some("TCP".to_string()),
298 ..Default::default()
299 }];
300
301 if spec.metrics.enabled {
302 ports.push(ContainerPort {
303 name: Some("metrics".to_string()),
304 container_port: spec.metrics.port,
305 protocol: Some("TCP".to_string()),
306 ..Default::default()
307 });
308 }
309
310 let liveness_probe = if spec.liveness_probe.enabled {
312 Some(Probe {
313 http_get: Some(HTTPGetAction {
314 path: Some("/health".to_string()),
315 port: IntOrString::Int(9092),
316 scheme: Some("HTTP".to_string()),
317 ..Default::default()
318 }),
319 initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
320 period_seconds: Some(spec.liveness_probe.period_seconds),
321 timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
322 success_threshold: Some(spec.liveness_probe.success_threshold),
323 failure_threshold: Some(spec.liveness_probe.failure_threshold),
324 ..Default::default()
325 })
326 } else {
327 None
328 };
329
330 let readiness_probe = if spec.readiness_probe.enabled {
331 Some(Probe {
332 http_get: Some(HTTPGetAction {
333 path: Some("/ready".to_string()),
334 port: IntOrString::Int(9092),
335 scheme: Some("HTTP".to_string()),
336 ..Default::default()
337 }),
338 initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
339 period_seconds: Some(spec.readiness_probe.period_seconds),
340 timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
341 success_threshold: Some(spec.readiness_probe.success_threshold),
342 failure_threshold: Some(spec.readiness_probe.failure_threshold),
343 ..Default::default()
344 })
345 } else {
346 None
347 };
348
349 let volume_mounts = vec![VolumeMount {
351 name: "data".to_string(),
352 mount_path: "/data".to_string(),
353 ..Default::default()
354 }];
355
356 let container_security_context = spec.container_security_context.clone().or_else(|| {
358 Some(k8s_openapi::api::core::v1::SecurityContext {
359 allow_privilege_escalation: Some(false),
361 read_only_root_filesystem: Some(true),
363 run_as_non_root: Some(true),
365 run_as_user: Some(1000),
366 run_as_group: Some(1000),
367 capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
369 drop: Some(vec!["ALL".to_string()]),
370 ..Default::default()
371 }),
372 seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
374 type_: "RuntimeDefault".to_string(),
375 ..Default::default()
376 }),
377 ..Default::default()
378 })
379 });
380
381 Container {
382 name: "rivven".to_string(),
383 image: Some(spec.get_image()),
384 image_pull_policy: Some(spec.image_pull_policy.clone()),
385 command: Some(vec!["rivvend".to_string()]),
386 args: Some(vec![
387 "--config".to_string(),
388 "/etc/rivven/config.yaml".to_string(),
389 ]),
390 env: Some(env),
391 ports: Some(ports),
392 resources: spec.resources.clone(),
393 liveness_probe,
394 readiness_probe,
395 volume_mounts: Some(volume_mounts),
396 security_context: container_security_context,
397 ..Default::default()
398 }
399 }
400
401 fn build_pvc_template(&self, spec: &RivvenClusterSpec) -> PersistentVolumeClaim {
403 let mut resources = BTreeMap::new();
404 resources.insert("storage".to_string(), Quantity(spec.storage.size.clone()));
405
406 PersistentVolumeClaim {
407 metadata: ObjectMeta {
408 name: Some("data".to_string()),
409 ..Default::default()
410 },
411 spec: Some(PersistentVolumeClaimSpec {
412 access_modes: Some(spec.storage.access_modes.clone()),
413 storage_class_name: spec.storage.storage_class_name.clone(),
414 resources: Some(k8s_openapi::api::core::v1::VolumeResourceRequirements {
415 requests: Some(resources),
416 ..Default::default()
417 }),
418 ..Default::default()
419 }),
420 ..Default::default()
421 }
422 }
423
424 pub fn build_headless_service(&self) -> Service {
426 let spec = &self.cluster.spec;
427 let name = format!("{}-headless", self.resource_name());
428 let labels = spec.get_labels(&self.name);
429 let selector_labels = spec.get_selector_labels(&self.name);
430
431 Service {
432 metadata: ObjectMeta {
433 name: Some(name),
434 namespace: Some(self.namespace.clone()),
435 labels: Some(labels),
436 owner_references: Some(vec![self.owner_reference()]),
437 ..Default::default()
438 },
439 spec: Some(ServiceSpec {
440 cluster_ip: Some("None".to_string()),
441 selector: Some(selector_labels),
442 ports: Some(vec![
443 ServicePort {
444 name: Some("broker".to_string()),
445 port: 9092,
446 target_port: Some(IntOrString::Int(9092)),
447 protocol: Some("TCP".to_string()),
448 ..Default::default()
449 },
450 ServicePort {
451 name: Some("raft".to_string()),
452 port: 9093,
453 target_port: Some(IntOrString::Int(9093)),
454 protocol: Some("TCP".to_string()),
455 ..Default::default()
456 },
457 ]),
458 publish_not_ready_addresses: Some(true),
459 ..Default::default()
460 }),
461 ..Default::default()
462 }
463 }
464
465 pub fn build_client_service(&self) -> Service {
467 let spec = &self.cluster.spec;
468 let name = self.resource_name();
469 let labels = spec.get_labels(&self.name);
470 let selector_labels = spec.get_selector_labels(&self.name);
471
472 let mut ports = vec![ServicePort {
473 name: Some("broker".to_string()),
474 port: 9092,
475 target_port: Some(IntOrString::Int(9092)),
476 protocol: Some("TCP".to_string()),
477 ..Default::default()
478 }];
479
480 if spec.metrics.enabled {
481 ports.push(ServicePort {
482 name: Some("metrics".to_string()),
483 port: spec.metrics.port,
484 target_port: Some(IntOrString::Int(spec.metrics.port)),
485 protocol: Some("TCP".to_string()),
486 ..Default::default()
487 });
488 }
489
490 Service {
491 metadata: ObjectMeta {
492 name: Some(name),
493 namespace: Some(self.namespace.clone()),
494 labels: Some(labels),
495 owner_references: Some(vec![self.owner_reference()]),
496 ..Default::default()
497 },
498 spec: Some(ServiceSpec {
499 type_: Some("ClusterIP".to_string()),
500 selector: Some(selector_labels),
501 ports: Some(ports),
502 ..Default::default()
503 }),
504 ..Default::default()
505 }
506 }
507
508 pub fn build_configmap(&self) -> ConfigMap {
510 let spec = &self.cluster.spec;
511 let name = format!("{}-config", self.resource_name());
512 let labels = spec.get_labels(&self.name);
513
514 let mut config = BTreeMap::new();
516 config.insert(
517 "default_partitions".to_string(),
518 spec.config.default_partitions.to_string(),
519 );
520 config.insert(
521 "default_replication_factor".to_string(),
522 spec.config.default_replication_factor.to_string(),
523 );
524 config.insert(
525 "log_retention_hours".to_string(),
526 spec.config.log_retention_hours.to_string(),
527 );
528 config.insert(
529 "log_segment_bytes".to_string(),
530 spec.config.log_segment_bytes.to_string(),
531 );
532 config.insert(
533 "max_message_bytes".to_string(),
534 spec.config.max_message_bytes.to_string(),
535 );
536 config.insert(
537 "auto_create_topics".to_string(),
538 spec.config.auto_create_topics.to_string(),
539 );
540 config.insert(
541 "compression_enabled".to_string(),
542 spec.config.compression_enabled.to_string(),
543 );
544 config.insert(
545 "compression_type".to_string(),
546 spec.config.compression_type.clone(),
547 );
548 config.insert(
549 "raft_election_timeout_ms".to_string(),
550 spec.config.raft_election_timeout_ms.to_string(),
551 );
552 config.insert(
553 "raft_heartbeat_interval_ms".to_string(),
554 spec.config.raft_heartbeat_interval_ms.to_string(),
555 );
556
557 for (k, v) in &spec.config.raw {
559 config.insert(k.clone(), v.clone());
560 }
561
562 let config_yaml = serde_yaml::to_string(&config).unwrap_or_default();
563
564 let mut data = BTreeMap::new();
565 data.insert("config.yaml".to_string(), config_yaml);
566
567 ConfigMap {
568 metadata: ObjectMeta {
569 name: Some(name),
570 namespace: Some(self.namespace.clone()),
571 labels: Some(labels),
572 owner_references: Some(vec![self.owner_reference()]),
573 ..Default::default()
574 },
575 data: Some(data),
576 ..Default::default()
577 }
578 }
579
580 pub fn build_pdb(&self) -> Option<PodDisruptionBudget> {
582 let spec = &self.cluster.spec;
583
584 if !spec.pod_disruption_budget.enabled {
585 return None;
586 }
587
588 let name = format!("{}-pdb", self.resource_name());
589 let labels = spec.get_labels(&self.name);
590 let selector_labels = spec.get_selector_labels(&self.name);
591
592 Some(PodDisruptionBudget {
593 metadata: ObjectMeta {
594 name: Some(name),
595 namespace: Some(self.namespace.clone()),
596 labels: Some(labels),
597 owner_references: Some(vec![self.owner_reference()]),
598 ..Default::default()
599 },
600 spec: Some(PodDisruptionBudgetSpec {
601 selector: Some(LabelSelector {
602 match_labels: Some(selector_labels),
603 ..Default::default()
604 }),
605 min_available: spec
606 .pod_disruption_budget
607 .min_available
608 .as_ref()
609 .map(|v| IntOrString::String(v.clone())),
610 max_unavailable: spec
611 .pod_disruption_budget
612 .max_unavailable
613 .as_ref()
614 .map(|v| IntOrString::String(v.clone())),
615 ..Default::default()
616 }),
617 ..Default::default()
618 })
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, StorageSpec, TlsSpec};

    /// Three-replica cluster in the `default` namespace with default sub-specs
    /// and no optional overrides.
    fn create_test_cluster(name: &str) -> RivvenCluster {
        let metadata = ObjectMeta {
            name: Some(name.to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid-123".to_string()),
            ..Default::default()
        };

        let spec = RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    #[test]
    fn test_build_statefulset() {
        let cluster = create_test_cluster("my-cluster");
        let sts = ResourceBuilder::new(&cluster).unwrap().build_statefulset();

        assert_eq!(sts.metadata.name.as_deref(), Some("rivven-my-cluster"));

        let sts_spec = sts.spec.as_ref().unwrap();
        assert_eq!(sts_spec.replicas, Some(3));
        assert_eq!(sts_spec.service_name, "rivven-my-cluster-headless");
    }

    #[test]
    fn test_build_headless_service() {
        let cluster = create_test_cluster("my-cluster");
        let svc = ResourceBuilder::new(&cluster)
            .unwrap()
            .build_headless_service();

        assert_eq!(
            svc.metadata.name.as_deref(),
            Some("rivven-my-cluster-headless")
        );
        assert_eq!(svc.spec.as_ref().unwrap().cluster_ip.as_deref(), Some("None"));
    }

    #[test]
    fn test_build_client_service() {
        let cluster = create_test_cluster("my-cluster");
        let svc = ResourceBuilder::new(&cluster)
            .unwrap()
            .build_client_service();

        assert_eq!(svc.metadata.name.as_deref(), Some("rivven-my-cluster"));
        assert_eq!(svc.spec.as_ref().unwrap().type_.as_deref(), Some("ClusterIP"));
    }

    #[test]
    fn test_build_configmap() {
        let cluster = create_test_cluster("my-cluster");
        let cm = ResourceBuilder::new(&cluster).unwrap().build_configmap();

        assert_eq!(
            cm.metadata.name.as_deref(),
            Some("rivven-my-cluster-config")
        );
        let data = cm.data.as_ref().expect("configmap should carry data");
        assert!(data.contains_key("config.yaml"));
    }

    #[test]
    fn test_build_pdb() {
        let cluster = create_test_cluster("my-cluster");
        let pdb = ResourceBuilder::new(&cluster)
            .unwrap()
            .build_pdb()
            .expect("pdb should be built for the test cluster");

        assert_eq!(pdb.metadata.name.as_deref(), Some("rivven-my-cluster-pdb"));
    }

    #[test]
    fn test_owner_references() {
        let cluster = create_test_cluster("my-cluster");
        let sts = ResourceBuilder::new(&cluster).unwrap().build_statefulset();

        let owners = sts.metadata.owner_references.as_ref().unwrap();
        assert_eq!(owners.len(), 1);

        let owner = &owners[0];
        assert_eq!(owner.kind, "RivvenCluster");
        assert_eq!(owner.name, "my-cluster");
    }

    #[test]
    fn test_custom_labels() {
        let mut cluster = create_test_cluster("my-cluster");
        cluster
            .spec
            .pod_labels
            .insert("custom".to_string(), "label".to_string());

        let sts = ResourceBuilder::new(&cluster).unwrap().build_statefulset();

        let sts_spec = sts.spec.as_ref().unwrap();
        let template_meta = sts_spec.template.metadata.as_ref().unwrap();
        let pod_labels = template_meta.labels.as_ref().unwrap();

        assert_eq!(pod_labels.get("custom").map(String::as_str), Some("label"));
    }
}