1use crate::crd::{RivvenCluster, RivvenClusterSpec};
7use crate::error::{OperatorError, Result};
8use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
9use k8s_openapi::api::core::v1::{
10 ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
11 PersistentVolumeClaim, PersistentVolumeClaimSpec, PodSpec, PodTemplateSpec, Probe, Service,
12 ServicePort, ServiceSpec, VolumeMount,
13};
14use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
15use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
17use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
18use std::collections::BTreeMap;
19
20pub struct ResourceBuilder<'a> {
22 cluster: &'a RivvenCluster,
23 name: String,
24 namespace: String,
25}
26
27impl<'a> ResourceBuilder<'a> {
28 pub fn new(cluster: &'a RivvenCluster) -> Result<Self> {
30 let name =
31 cluster.metadata.name.clone().ok_or_else(|| {
32 OperatorError::InvalidConfig("cluster name is required".to_string())
33 })?;
34
35 let namespace = cluster
36 .metadata
37 .namespace
38 .clone()
39 .unwrap_or_else(|| "default".to_string());
40
41 Ok(Self {
42 cluster,
43 name,
44 namespace,
45 })
46 }
47
48 fn resource_name(&self) -> String {
50 format!("rivven-{}", self.name)
51 }
52
53 fn owner_reference(&self) -> OwnerReference {
55 OwnerReference {
56 api_version: "rivven.hupe1980.github.io/v1alpha1".to_string(),
57 kind: "RivvenCluster".to_string(),
58 name: self.name.clone(),
59 uid: self.cluster.metadata.uid.clone().unwrap_or_default(),
60 controller: Some(true),
61 block_owner_deletion: Some(true),
62 }
63 }
64
65 pub fn build_statefulset(&self) -> StatefulSet {
67 let spec = &self.cluster.spec;
68 let name = self.resource_name();
69 let labels = spec.get_labels(&self.name);
70 let selector_labels = spec.get_selector_labels(&self.name);
71
72 let container = self.build_container(spec);
74
75 let pvc_template = self.build_pvc_template(spec);
77
78 let mut pod_labels = selector_labels.clone();
80 pod_labels.extend(spec.pod_labels.clone());
81
82 let mut pod_annotations = BTreeMap::new();
83 if spec.metrics.enabled {
84 pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
85 pod_annotations.insert(
86 "prometheus.io/port".to_string(),
87 spec.metrics.port.to_string(),
88 );
89 }
90 pod_annotations.extend(spec.pod_annotations.clone());
91
92 let pod_security_context = spec.security_context.clone().or_else(|| {
94 Some(k8s_openapi::api::core::v1::PodSecurityContext {
95 run_as_non_root: Some(true),
97 run_as_user: Some(1000),
99 run_as_group: Some(1000),
100 fs_group: Some(1000),
101 seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
103 type_: "RuntimeDefault".to_string(),
104 ..Default::default()
105 }),
106 ..Default::default()
107 })
108 });
109
110 let volumes = if spec.tls.enabled {
112 spec.tls.cert_secret_name.as_ref().map(|secret_name| {
113 vec![k8s_openapi::api::core::v1::Volume {
114 name: "tls".to_string(),
115 secret: Some(k8s_openapi::api::core::v1::SecretVolumeSource {
116 secret_name: Some(secret_name.clone()),
117 default_mode: Some(0o400),
118 ..Default::default()
119 }),
120 ..Default::default()
121 }]
122 })
123 } else {
124 None
125 };
126
127 let pod_spec = PodSpec {
128 containers: vec![container],
129 volumes,
130 affinity: spec.affinity.clone(),
131 security_context: pod_security_context,
132 node_selector: if spec.node_selector.is_empty() {
133 None
134 } else {
135 Some(spec.node_selector.clone())
136 },
137 tolerations: if spec.tolerations.is_empty() {
138 None
139 } else {
140 Some(spec.tolerations.clone())
141 },
142 service_account_name: spec.service_account.clone(),
143 image_pull_secrets: if spec.image_pull_secrets.is_empty() {
144 None
145 } else {
146 Some(
147 spec.image_pull_secrets
148 .iter()
149 .map(|s| k8s_openapi::api::core::v1::LocalObjectReference {
150 name: s.clone(),
151 })
152 .collect(),
153 )
154 },
155 automount_service_account_token: Some(false),
157 ..Default::default()
158 };
159
160 StatefulSet {
161 metadata: ObjectMeta {
162 name: Some(name.clone()),
163 namespace: Some(self.namespace.clone()),
164 labels: Some(labels.clone()),
165 owner_references: Some(vec![self.owner_reference()]),
166 ..Default::default()
167 },
168 spec: Some(StatefulSetSpec {
169 service_name: format!("{}-headless", name),
170 replicas: Some(spec.replicas),
171 selector: LabelSelector {
172 match_labels: Some(selector_labels),
173 ..Default::default()
174 },
175 template: PodTemplateSpec {
176 metadata: Some(ObjectMeta {
177 labels: Some(pod_labels),
178 annotations: Some(pod_annotations),
179 ..Default::default()
180 }),
181 spec: Some(pod_spec),
182 },
183 volume_claim_templates: Some(vec![pvc_template]),
184 pod_management_policy: Some("Parallel".to_string()),
185 update_strategy: Some(k8s_openapi::api::apps::v1::StatefulSetUpdateStrategy {
186 type_: Some("RollingUpdate".to_string()),
187 rolling_update: Some(
188 k8s_openapi::api::apps::v1::RollingUpdateStatefulSetStrategy {
189 max_unavailable: Some(IntOrString::Int(1)),
190 partition: Some(0),
191 },
192 ),
193 }),
194 ..Default::default()
195 }),
196 ..Default::default()
197 }
198 }
199
200 fn build_container(&self, spec: &RivvenClusterSpec) -> Container {
202 let name = self.resource_name();
203
204 let mut env = vec![
206 EnvVar {
207 name: "RIVVEN_DATA_DIR".to_string(),
208 value: Some("/data".to_string()),
209 ..Default::default()
210 },
211 EnvVar {
212 name: "RIVVEN_BIND_ADDRESS".to_string(),
213 value: Some("0.0.0.0".to_string()),
214 ..Default::default()
215 },
216 EnvVar {
217 name: "RIVVEN_PORT".to_string(),
218 value: Some("9092".to_string()),
219 ..Default::default()
220 },
221 EnvVar {
222 name: "RIVVEN_CLUSTER_NAME".to_string(),
223 value: Some(self.name.clone()),
224 ..Default::default()
225 },
226 EnvVar {
227 name: "RIVVEN_POD_NAME".to_string(),
228 value_from: Some(EnvVarSource {
229 field_ref: Some(ObjectFieldSelector {
230 field_path: "metadata.name".to_string(),
231 ..Default::default()
232 }),
233 ..Default::default()
234 }),
235 ..Default::default()
236 },
237 EnvVar {
238 name: "RIVVEN_POD_NAMESPACE".to_string(),
239 value_from: Some(EnvVarSource {
240 field_ref: Some(ObjectFieldSelector {
241 field_path: "metadata.namespace".to_string(),
242 ..Default::default()
243 }),
244 ..Default::default()
245 }),
246 ..Default::default()
247 },
248 EnvVar {
249 name: "RIVVEN_SERVICE_NAME".to_string(),
250 value: Some(format!("{}-headless", name)),
251 ..Default::default()
252 },
253 EnvVar {
255 name: "RIVVEN_DEFAULT_PARTITIONS".to_string(),
256 value: Some(spec.config.default_partitions.to_string()),
257 ..Default::default()
258 },
259 EnvVar {
260 name: "RIVVEN_DEFAULT_REPLICATION_FACTOR".to_string(),
261 value: Some(spec.config.default_replication_factor.to_string()),
262 ..Default::default()
263 },
264 EnvVar {
265 name: "RIVVEN_COMPRESSION".to_string(),
266 value: Some(spec.config.compression_type.clone()),
267 ..Default::default()
268 },
269 ];
270
271 if spec.metrics.enabled {
273 env.push(EnvVar {
274 name: "RIVVEN_METRICS_ENABLED".to_string(),
275 value: Some("true".to_string()),
276 ..Default::default()
277 });
278 env.push(EnvVar {
279 name: "RIVVEN_METRICS_PORT".to_string(),
280 value: Some(spec.metrics.port.to_string()),
281 ..Default::default()
282 });
283 }
284
285 if spec.tls.enabled {
287 env.push(EnvVar {
288 name: "RIVVEN_TLS_ENABLED".to_string(),
289 value: Some("true".to_string()),
290 ..Default::default()
291 });
292 if let Some(ref _cert_secret) = spec.tls.cert_secret_name {
293 env.push(EnvVar {
294 name: "RIVVEN_TLS_CERT_PATH".to_string(),
295 value: Some("/tls/tls.crt".to_string()),
296 ..Default::default()
297 });
298 env.push(EnvVar {
299 name: "RIVVEN_TLS_KEY_PATH".to_string(),
300 value: Some("/tls/tls.key".to_string()),
301 ..Default::default()
302 });
303 }
304 }
305
306 env.extend(spec.env.clone());
308
309 let mut ports = vec![ContainerPort {
311 name: Some("broker".to_string()),
312 container_port: 9092,
313 protocol: Some("TCP".to_string()),
314 ..Default::default()
315 }];
316
317 if spec.metrics.enabled {
318 ports.push(ContainerPort {
319 name: Some("metrics".to_string()),
320 container_port: spec.metrics.port,
321 protocol: Some("TCP".to_string()),
322 ..Default::default()
323 });
324 }
325
326 let liveness_probe = if spec.liveness_probe.enabled {
328 Some(Probe {
329 http_get: Some(HTTPGetAction {
330 path: Some("/health".to_string()),
331 port: IntOrString::Int(9092),
332 scheme: Some("HTTP".to_string()),
333 ..Default::default()
334 }),
335 initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
336 period_seconds: Some(spec.liveness_probe.period_seconds),
337 timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
338 success_threshold: Some(spec.liveness_probe.success_threshold),
339 failure_threshold: Some(spec.liveness_probe.failure_threshold),
340 ..Default::default()
341 })
342 } else {
343 None
344 };
345
346 let readiness_probe = if spec.readiness_probe.enabled {
347 Some(Probe {
348 http_get: Some(HTTPGetAction {
349 path: Some("/ready".to_string()),
350 port: IntOrString::Int(9092),
351 scheme: Some("HTTP".to_string()),
352 ..Default::default()
353 }),
354 initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
355 period_seconds: Some(spec.readiness_probe.period_seconds),
356 timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
357 success_threshold: Some(spec.readiness_probe.success_threshold),
358 failure_threshold: Some(spec.readiness_probe.failure_threshold),
359 ..Default::default()
360 })
361 } else {
362 None
363 };
364
365 let mut volume_mounts = vec![VolumeMount {
367 name: "data".to_string(),
368 mount_path: "/data".to_string(),
369 ..Default::default()
370 }];
371
372 if spec.tls.enabled && spec.tls.cert_secret_name.is_some() {
374 volume_mounts.push(VolumeMount {
375 name: "tls".to_string(),
376 mount_path: "/tls".to_string(),
377 read_only: Some(true),
378 ..Default::default()
379 });
380 }
381
382 let container_security_context = spec.container_security_context.clone().or_else(|| {
384 Some(k8s_openapi::api::core::v1::SecurityContext {
385 allow_privilege_escalation: Some(false),
387 read_only_root_filesystem: Some(true),
389 run_as_non_root: Some(true),
391 run_as_user: Some(1000),
392 run_as_group: Some(1000),
393 capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
395 drop: Some(vec!["ALL".to_string()]),
396 ..Default::default()
397 }),
398 seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
400 type_: "RuntimeDefault".to_string(),
401 ..Default::default()
402 }),
403 ..Default::default()
404 })
405 });
406
407 Container {
408 name: "rivven".to_string(),
409 image: Some(spec.get_image()),
410 image_pull_policy: Some(spec.image_pull_policy.clone()),
411 command: Some(vec!["rivvend".to_string()]),
412 args: Some(vec![
413 "--config".to_string(),
414 "/etc/rivven/config.yaml".to_string(),
415 ]),
416 env: Some(env),
417 ports: Some(ports),
418 resources: spec.resources.clone(),
419 liveness_probe,
420 readiness_probe,
421 volume_mounts: Some(volume_mounts),
422 security_context: container_security_context,
423 ..Default::default()
424 }
425 }
426
427 fn build_pvc_template(&self, spec: &RivvenClusterSpec) -> PersistentVolumeClaim {
429 let mut resources = BTreeMap::new();
430 resources.insert("storage".to_string(), Quantity(spec.storage.size.clone()));
431
432 PersistentVolumeClaim {
433 metadata: ObjectMeta {
434 name: Some("data".to_string()),
435 ..Default::default()
436 },
437 spec: Some(PersistentVolumeClaimSpec {
438 access_modes: Some(spec.storage.access_modes.clone()),
439 storage_class_name: spec.storage.storage_class_name.clone(),
440 resources: Some(k8s_openapi::api::core::v1::VolumeResourceRequirements {
441 requests: Some(resources),
442 ..Default::default()
443 }),
444 ..Default::default()
445 }),
446 ..Default::default()
447 }
448 }
449
450 pub fn build_headless_service(&self) -> Service {
452 let spec = &self.cluster.spec;
453 let name = format!("{}-headless", self.resource_name());
454 let labels = spec.get_labels(&self.name);
455 let selector_labels = spec.get_selector_labels(&self.name);
456
457 Service {
458 metadata: ObjectMeta {
459 name: Some(name),
460 namespace: Some(self.namespace.clone()),
461 labels: Some(labels),
462 owner_references: Some(vec![self.owner_reference()]),
463 ..Default::default()
464 },
465 spec: Some(ServiceSpec {
466 cluster_ip: Some("None".to_string()),
467 selector: Some(selector_labels),
468 ports: Some(vec![
469 ServicePort {
470 name: Some("broker".to_string()),
471 port: 9092,
472 target_port: Some(IntOrString::Int(9092)),
473 protocol: Some("TCP".to_string()),
474 ..Default::default()
475 },
476 ServicePort {
477 name: Some("raft".to_string()),
478 port: 9093,
479 target_port: Some(IntOrString::Int(9093)),
480 protocol: Some("TCP".to_string()),
481 ..Default::default()
482 },
483 ]),
484 publish_not_ready_addresses: Some(true),
485 ..Default::default()
486 }),
487 ..Default::default()
488 }
489 }
490
491 pub fn build_client_service(&self) -> Service {
493 let spec = &self.cluster.spec;
494 let name = self.resource_name();
495 let labels = spec.get_labels(&self.name);
496 let selector_labels = spec.get_selector_labels(&self.name);
497
498 let mut ports = vec![ServicePort {
499 name: Some("broker".to_string()),
500 port: 9092,
501 target_port: Some(IntOrString::Int(9092)),
502 protocol: Some("TCP".to_string()),
503 ..Default::default()
504 }];
505
506 if spec.metrics.enabled {
507 ports.push(ServicePort {
508 name: Some("metrics".to_string()),
509 port: spec.metrics.port,
510 target_port: Some(IntOrString::Int(spec.metrics.port)),
511 protocol: Some("TCP".to_string()),
512 ..Default::default()
513 });
514 }
515
516 Service {
517 metadata: ObjectMeta {
518 name: Some(name),
519 namespace: Some(self.namespace.clone()),
520 labels: Some(labels),
521 owner_references: Some(vec![self.owner_reference()]),
522 ..Default::default()
523 },
524 spec: Some(ServiceSpec {
525 type_: Some("ClusterIP".to_string()),
526 selector: Some(selector_labels),
527 ports: Some(ports),
528 ..Default::default()
529 }),
530 ..Default::default()
531 }
532 }
533
534 pub fn build_configmap(&self) -> Result<ConfigMap> {
536 let spec = &self.cluster.spec;
537 let name = format!("{}-config", self.resource_name());
538 let labels = spec.get_labels(&self.name);
539
540 let mut config = BTreeMap::new();
542 config.insert(
543 "default_partitions".to_string(),
544 spec.config.default_partitions.to_string(),
545 );
546 config.insert(
547 "default_replication_factor".to_string(),
548 spec.config.default_replication_factor.to_string(),
549 );
550 config.insert(
551 "log_retention_hours".to_string(),
552 spec.config.log_retention_hours.to_string(),
553 );
554 config.insert(
555 "log_segment_bytes".to_string(),
556 spec.config.log_segment_bytes.to_string(),
557 );
558 config.insert(
559 "max_message_bytes".to_string(),
560 spec.config.max_message_bytes.to_string(),
561 );
562 config.insert(
563 "auto_create_topics".to_string(),
564 spec.config.auto_create_topics.to_string(),
565 );
566 config.insert(
567 "compression_enabled".to_string(),
568 spec.config.compression_enabled.to_string(),
569 );
570 config.insert(
571 "compression_type".to_string(),
572 spec.config.compression_type.clone(),
573 );
574 config.insert(
575 "raft_election_timeout_ms".to_string(),
576 spec.config.raft_election_timeout_ms.to_string(),
577 );
578 config.insert(
579 "raft_heartbeat_interval_ms".to_string(),
580 spec.config.raft_heartbeat_interval_ms.to_string(),
581 );
582
583 for (k, v) in &spec.config.raw {
585 config.insert(k.clone(), v.clone());
586 }
587
588 let config_yaml = serde_yaml::to_string(&config)?;
589
590 let mut data = BTreeMap::new();
591 data.insert("config.yaml".to_string(), config_yaml);
592
593 Ok(ConfigMap {
594 metadata: ObjectMeta {
595 name: Some(name),
596 namespace: Some(self.namespace.clone()),
597 labels: Some(labels),
598 owner_references: Some(vec![self.owner_reference()]),
599 ..Default::default()
600 },
601 data: Some(data),
602 ..Default::default()
603 })
604 }
605
606 pub fn build_pdb(&self) -> Option<PodDisruptionBudget> {
608 let spec = &self.cluster.spec;
609
610 if !spec.pod_disruption_budget.enabled {
611 return None;
612 }
613
614 let name = format!("{}-pdb", self.resource_name());
615 let labels = spec.get_labels(&self.name);
616 let selector_labels = spec.get_selector_labels(&self.name);
617
618 Some(PodDisruptionBudget {
619 metadata: ObjectMeta {
620 name: Some(name),
621 namespace: Some(self.namespace.clone()),
622 labels: Some(labels),
623 owner_references: Some(vec![self.owner_reference()]),
624 ..Default::default()
625 },
626 spec: Some(PodDisruptionBudgetSpec {
627 selector: Some(LabelSelector {
628 match_labels: Some(selector_labels),
629 ..Default::default()
630 }),
631 min_available: spec
632 .pod_disruption_budget
633 .min_available
634 .as_ref()
635 .map(|v| IntOrString::String(v.clone())),
636 max_unavailable: spec
637 .pod_disruption_budget
638 .max_unavailable
639 .as_ref()
640 .map(|v| IntOrString::String(v.clone())),
641 ..Default::default()
642 }),
643 ..Default::default()
644 })
645 }
646}
647
648#[cfg(test)]
649mod tests {
650 use super::*;
651 use crate::crd::{BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, StorageSpec, TlsSpec};
652
653 fn create_test_cluster(name: &str) -> RivvenCluster {
654 RivvenCluster {
655 metadata: ObjectMeta {
656 name: Some(name.to_string()),
657 namespace: Some("default".to_string()),
658 uid: Some("test-uid-123".to_string()),
659 ..Default::default()
660 },
661 spec: RivvenClusterSpec {
662 replicas: 3,
663 version: "0.0.1".to_string(),
664 image: None,
665 image_pull_policy: "IfNotPresent".to_string(),
666 image_pull_secrets: vec![],
667 storage: StorageSpec::default(),
668 resources: None,
669 config: BrokerConfig::default(),
670 tls: TlsSpec::default(),
671 metrics: MetricsSpec::default(),
672 affinity: None,
673 node_selector: BTreeMap::new(),
674 tolerations: vec![],
675 pod_disruption_budget: PdbSpec::default(),
676 service_account: None,
677 pod_annotations: BTreeMap::new(),
678 pod_labels: BTreeMap::new(),
679 env: vec![],
680 liveness_probe: ProbeSpec::default(),
681 readiness_probe: ProbeSpec::default(),
682 security_context: None,
683 container_security_context: None,
684 },
685 status: None,
686 }
687 }
688
689 #[test]
690 fn test_build_statefulset() {
691 let cluster = create_test_cluster("my-cluster");
692 let builder = ResourceBuilder::new(&cluster).unwrap();
693 let sts = builder.build_statefulset();
694
695 assert_eq!(sts.metadata.name, Some("rivven-my-cluster".to_string()));
696 assert_eq!(sts.spec.as_ref().unwrap().replicas, Some(3));
697 assert_eq!(
698 sts.spec.as_ref().unwrap().service_name,
699 "rivven-my-cluster-headless"
700 );
701 }
702
703 #[test]
704 fn test_build_headless_service() {
705 let cluster = create_test_cluster("my-cluster");
706 let builder = ResourceBuilder::new(&cluster).unwrap();
707 let svc = builder.build_headless_service();
708
709 assert_eq!(
710 svc.metadata.name,
711 Some("rivven-my-cluster-headless".to_string())
712 );
713 assert_eq!(
714 svc.spec.as_ref().unwrap().cluster_ip,
715 Some("None".to_string())
716 );
717 }
718
719 #[test]
720 fn test_build_client_service() {
721 let cluster = create_test_cluster("my-cluster");
722 let builder = ResourceBuilder::new(&cluster).unwrap();
723 let svc = builder.build_client_service();
724
725 assert_eq!(svc.metadata.name, Some("rivven-my-cluster".to_string()));
726 assert_eq!(
727 svc.spec.as_ref().unwrap().type_,
728 Some("ClusterIP".to_string())
729 );
730 }
731
732 #[test]
733 fn test_build_configmap() {
734 let cluster = create_test_cluster("my-cluster");
735 let builder = ResourceBuilder::new(&cluster).unwrap();
736 let cm = builder.build_configmap().unwrap();
737
738 assert_eq!(
739 cm.metadata.name,
740 Some("rivven-my-cluster-config".to_string())
741 );
742 assert!(cm.data.is_some());
743 assert!(cm.data.as_ref().unwrap().contains_key("config.yaml"));
744 }
745
746 #[test]
747 fn test_build_pdb() {
748 let cluster = create_test_cluster("my-cluster");
749 let builder = ResourceBuilder::new(&cluster).unwrap();
750 let pdb = builder.build_pdb();
751
752 assert!(pdb.is_some());
753 let pdb = pdb.unwrap();
754 assert_eq!(pdb.metadata.name, Some("rivven-my-cluster-pdb".to_string()));
755 }
756
757 #[test]
758 fn test_owner_references() {
759 let cluster = create_test_cluster("my-cluster");
760 let builder = ResourceBuilder::new(&cluster).unwrap();
761 let sts = builder.build_statefulset();
762
763 let owner_refs = sts.metadata.owner_references.as_ref().unwrap();
764 assert_eq!(owner_refs.len(), 1);
765 assert_eq!(owner_refs[0].kind, "RivvenCluster");
766 assert_eq!(owner_refs[0].name, "my-cluster");
767 }
768
769 #[test]
770 fn test_custom_labels() {
771 let mut cluster = create_test_cluster("my-cluster");
772 cluster
773 .spec
774 .pod_labels
775 .insert("custom".to_string(), "label".to_string());
776
777 let builder = ResourceBuilder::new(&cluster).unwrap();
778 let sts = builder.build_statefulset();
779
780 let pod_labels = sts
781 .spec
782 .as_ref()
783 .unwrap()
784 .template
785 .metadata
786 .as_ref()
787 .unwrap()
788 .labels
789 .as_ref()
790 .unwrap();
791
792 assert_eq!(pod_labels.get("custom"), Some(&"label".to_string()));
793 }
794}