1use crate::crd::{
7 ClusterReference, RivvenSchemaRegistry, RivvenSchemaRegistrySpec, RivvenSchemaRegistryStatus,
8 SchemaRegistryCondition, SchemaRegistryPhase,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
14use k8s_openapi::api::core::v1::{
15 ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
16 PodSpec, PodTemplateSpec, Probe, Service, ServicePort, ServiceSpec, VolumeMount,
17};
18use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
19use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
20use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
21use kube::api::{Api, Patch, PatchParams};
22use kube::runtime::controller::{Action, Controller};
23use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
24use kube::runtime::watcher::Config;
25use kube::{Client, ResourceExt};
26use std::collections::BTreeMap;
27use std::sync::Arc;
28use std::time::Duration;
29use tracing::{debug, error, info, instrument, warn};
30use validator::Validate;
31
32pub const SCHEMA_REGISTRY_FINALIZER: &str = "rivven.hupe1980.github.io/schema-registry-finalizer";
34
35const DEFAULT_REQUEUE_SECONDS: u64 = 60; const ERROR_REQUEUE_SECONDS: u64 = 30;
40
41pub struct SchemaRegistryControllerContext {
43 pub client: Client,
45 pub metrics: Option<SchemaRegistryControllerMetrics>,
47}
48
49#[derive(Clone)]
51pub struct SchemaRegistryControllerMetrics {
52 pub reconciliations: metrics::Counter,
54 pub errors: metrics::Counter,
56 pub duration: metrics::Histogram,
58}
59
60impl SchemaRegistryControllerMetrics {
61 pub fn new() -> Self {
63 Self {
64 reconciliations: metrics::counter!("rivven_schema_registry_reconciliations_total"),
65 errors: metrics::counter!("rivven_schema_registry_reconciliation_errors_total"),
66 duration: metrics::histogram!("rivven_schema_registry_reconciliation_duration_seconds"),
67 }
68 }
69}
70
71impl Default for SchemaRegistryControllerMetrics {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77pub async fn run_schema_registry_controller(
79 client: Client,
80 namespace: Option<String>,
81) -> Result<()> {
82 let registries: Api<RivvenSchemaRegistry> = match &namespace {
83 Some(ns) => Api::namespaced(client.clone(), ns),
84 None => Api::all(client.clone()),
85 };
86
87 let ctx = Arc::new(SchemaRegistryControllerContext {
88 client: client.clone(),
89 metrics: Some(SchemaRegistryControllerMetrics::new()),
90 });
91
92 info!(
93 namespace = namespace.as_deref().unwrap_or("all"),
94 "Starting RivvenSchemaRegistry controller"
95 );
96
97 let deployments = match &namespace {
99 Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
100 None => Api::<Deployment>::all(client.clone()),
101 };
102
103 let configmaps = match &namespace {
104 Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
105 None => Api::<ConfigMap>::all(client.clone()),
106 };
107
108 Controller::new(registries.clone(), Config::default())
109 .owns(deployments, Config::default())
110 .owns(configmaps, Config::default())
111 .run(reconcile_schema_registry, schema_registry_error_policy, ctx)
112 .for_each(|result| async move {
113 match result {
114 Ok((obj, action)) => {
115 debug!(
116 name = obj.name,
117 namespace = obj.namespace,
118 ?action,
119 "Schema registry reconciliation completed"
120 );
121 }
122 Err(e) => {
123 error!(error = %e, "Schema registry reconciliation failed");
124 }
125 }
126 })
127 .await;
128
129 Ok(())
130}
131
132#[instrument(skip(registry, ctx), fields(name = %registry.name_any(), namespace = registry.namespace()))]
134async fn reconcile_schema_registry(
135 registry: Arc<RivvenSchemaRegistry>,
136 ctx: Arc<SchemaRegistryControllerContext>,
137) -> Result<Action> {
138 let start = std::time::Instant::now();
139
140 if let Some(ref metrics) = ctx.metrics {
141 metrics.reconciliations.increment(1);
142 }
143
144 let namespace = registry
145 .namespace()
146 .unwrap_or_else(|| "default".to_string());
147 let registries: Api<RivvenSchemaRegistry> = Api::namespaced(ctx.client.clone(), &namespace);
148
149 let result = finalizer(
150 ®istries,
151 SCHEMA_REGISTRY_FINALIZER,
152 registry,
153 |event| async {
154 match event {
155 FinalizerEvent::Apply(registry) => {
156 apply_schema_registry(registry, ctx.clone()).await
157 }
158 FinalizerEvent::Cleanup(registry) => {
159 cleanup_schema_registry(registry, ctx.clone()).await
160 }
161 }
162 },
163 )
164 .await;
165
166 if let Some(ref metrics) = ctx.metrics {
167 metrics.duration.record(start.elapsed().as_secs_f64());
168 }
169
170 result.map_err(|e| {
171 if let Some(ref metrics) = ctx.metrics {
172 metrics.errors.increment(1);
173 }
174 OperatorError::ReconcileFailed(e.to_string())
175 })
176}
177
178#[instrument(skip(registry, ctx))]
180async fn apply_schema_registry(
181 registry: Arc<RivvenSchemaRegistry>,
182 ctx: Arc<SchemaRegistryControllerContext>,
183) -> Result<Action> {
184 let name = registry.name_any();
185 let namespace = registry
186 .namespace()
187 .unwrap_or_else(|| "default".to_string());
188
189 info!(name = %name, namespace = %namespace, "Reconciling RivvenSchemaRegistry");
190
191 if let Err(errors) = registry.spec.validate() {
193 let error_messages: Vec<String> = errors
194 .field_errors()
195 .iter()
196 .flat_map(|(field, errs)| {
197 errs.iter()
198 .map(move |e| format!("{}: {:?}", field, e.message))
199 })
200 .collect();
201 let error_msg = error_messages.join("; ");
202 warn!(name = %name, errors = %error_msg, "Schema registry spec validation failed");
203
204 update_schema_registry_status(
206 &ctx.client,
207 &namespace,
208 &name,
209 build_failed_status(®istry, &error_msg),
210 )
211 .await?;
212
213 return Err(OperatorError::InvalidConfig(error_msg));
214 }
215
216 verify_cluster_ref(&ctx.client, &namespace, ®istry.spec.cluster_ref).await?;
218
219 let configmap = build_registry_configmap(®istry)?;
221 apply_registry_configmap(&ctx.client, &namespace, configmap).await?;
222
223 let deployment = build_registry_deployment(®istry)?;
225 let deployment_status = apply_registry_deployment(&ctx.client, &namespace, deployment).await?;
226
227 let service = build_registry_service(®istry)?;
229 apply_registry_service(&ctx.client, &namespace, service).await?;
230
231 if registry.spec.pod_disruption_budget.enabled {
233 let pdb = build_registry_pdb(®istry)?;
234 apply_registry_pdb(&ctx.client, &namespace, pdb).await?;
235 }
236
237 let status = build_running_status(®istry, &deployment_status);
239 update_schema_registry_status(&ctx.client, &namespace, &name, status).await?;
240
241 Ok(Action::requeue(Duration::from_secs(
242 DEFAULT_REQUEUE_SECONDS,
243 )))
244}
245
246#[instrument(skip(registry, _ctx))]
248async fn cleanup_schema_registry(
249 registry: Arc<RivvenSchemaRegistry>,
250 _ctx: Arc<SchemaRegistryControllerContext>,
251) -> Result<Action> {
252 let name = registry.name_any();
253 let namespace = registry
254 .namespace()
255 .unwrap_or_else(|| "default".to_string());
256
257 info!(name = %name, namespace = %namespace, "Cleaning up RivvenSchemaRegistry");
258
259 Ok(Action::await_change())
263}
264
265fn schema_registry_error_policy(
267 _registry: Arc<RivvenSchemaRegistry>,
268 error: &OperatorError,
269 _ctx: Arc<SchemaRegistryControllerContext>,
270) -> Action {
271 warn!(error = %error, "Schema registry reconciliation error, will retry");
272 Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS))
273}
274
275async fn verify_cluster_ref(
277 client: &Client,
278 namespace: &str,
279 cluster_ref: &ClusterReference,
280) -> Result<()> {
281 use crate::crd::RivvenCluster;
282
283 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
284 let clusters: Api<RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
285
286 clusters
287 .get(&cluster_ref.name)
288 .await
289 .map_err(|e| OperatorError::ClusterNotFound(format!("{}: {}", cluster_ref.name, e)))?;
290
291 Ok(())
292}
293
294fn build_registry_configmap(registry: &RivvenSchemaRegistry) -> Result<ConfigMap> {
296 let name = registry.name_any();
297 let namespace = registry
298 .namespace()
299 .unwrap_or_else(|| "default".to_string());
300 let resource_name = format!("rivven-schema-registry-{}", name);
301
302 let spec = ®istry.spec;
303 let labels = spec.get_labels(&name);
304
305 let config = build_registry_config_yaml(spec)?;
307
308 Ok(ConfigMap {
309 metadata: ObjectMeta {
310 name: Some(format!("{}-config", resource_name)),
311 namespace: Some(namespace),
312 labels: Some(labels),
313 owner_references: Some(vec![owner_reference(registry)]),
314 ..Default::default()
315 },
316 data: Some(BTreeMap::from([("config.yaml".to_string(), config)])),
317 ..Default::default()
318 })
319}
320
321fn build_registry_config_yaml(spec: &RivvenSchemaRegistrySpec) -> Result<String> {
323 let mut config = String::new();
324
325 config.push_str(&format!(
327 "server:\n port: {}\n bind_address: \"{}\"\n timeout_seconds: {}\n max_request_size: {}\n",
328 spec.server.port,
329 spec.server.bind_address,
330 spec.server.timeout_seconds,
331 spec.server.max_request_size,
332 ));
333
334 config.push_str(&format!(
336 "\nstorage:\n mode: \"{}\"\n topic: \"{}\"\n replication_factor: {}\n partitions: {}\n normalize: {}\n",
337 spec.storage.mode,
338 spec.storage.topic,
339 spec.storage.replication_factor,
340 spec.storage.partitions,
341 spec.storage.normalize,
342 ));
343
344 config.push_str(&format!(
346 "\ncompatibility:\n default_level: \"{}\"\n allow_overrides: {}\n",
347 spec.compatibility.default_level, spec.compatibility.allow_overrides,
348 ));
349
350 config.push_str(&format!(
352 "\nschemas:\n avro: {}\n json_schema: {}\n protobuf: {}\n strict_validation: {}\n",
353 spec.schemas.avro,
354 spec.schemas.json_schema,
355 spec.schemas.protobuf,
356 spec.schemas.strict_validation,
357 ));
358
359 if spec.contexts.enabled {
361 config.push_str(&format!(
362 "\ncontexts:\n enabled: true\n max_contexts: {}\n",
363 spec.contexts.max_contexts,
364 ));
365 }
366
367 if spec.metrics.enabled {
369 config.push_str(&format!(
370 "\nmetrics:\n enabled: true\n port: {}\n path: \"{}\"\n",
371 spec.metrics.port, spec.metrics.path,
372 ));
373 }
374
375 Ok(config)
376}
377
378fn build_registry_deployment(registry: &RivvenSchemaRegistry) -> Result<Deployment> {
380 let name = registry.name_any();
381 let namespace = registry
382 .namespace()
383 .unwrap_or_else(|| "default".to_string());
384 let resource_name = format!("rivven-schema-registry-{}", name);
385
386 let spec = ®istry.spec;
387 let labels = spec.get_labels(&name);
388 let selector_labels = spec.get_selector_labels(&name);
389
390 let container = build_registry_container(spec, &resource_name);
392
393 let mut pod_labels = selector_labels.clone();
395 pod_labels.extend(spec.pod_labels.clone());
396
397 let mut pod_annotations = BTreeMap::new();
399 if spec.metrics.enabled {
400 pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
401 pod_annotations.insert(
402 "prometheus.io/port".to_string(),
403 spec.metrics.port.to_string(),
404 );
405 pod_annotations.insert("prometheus.io/path".to_string(), spec.metrics.path.clone());
406 }
407 pod_annotations.extend(spec.pod_annotations.clone());
408
409 let pod_spec = PodSpec {
410 containers: vec![container],
411 node_selector: if spec.node_selector.is_empty() {
412 None
413 } else {
414 Some(spec.node_selector.clone())
415 },
416 tolerations: if spec.tolerations.is_empty() {
417 None
418 } else {
419 Some(spec.tolerations.clone())
420 },
421 service_account_name: spec.service_account.clone(),
422 image_pull_secrets: if spec.image_pull_secrets.is_empty() {
423 None
424 } else {
425 Some(
426 spec.image_pull_secrets
427 .iter()
428 .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
429 .collect(),
430 )
431 },
432 automount_service_account_token: Some(false),
434 ..Default::default()
435 };
436
437 Ok(Deployment {
438 metadata: ObjectMeta {
439 name: Some(resource_name.clone()),
440 namespace: Some(namespace),
441 labels: Some(labels.clone()),
442 owner_references: Some(vec![owner_reference(registry)]),
443 ..Default::default()
444 },
445 spec: Some(DeploymentSpec {
446 replicas: Some(spec.replicas),
447 selector: LabelSelector {
448 match_labels: Some(selector_labels),
449 ..Default::default()
450 },
451 template: PodTemplateSpec {
452 metadata: Some(ObjectMeta {
453 labels: Some(pod_labels),
454 annotations: Some(pod_annotations),
455 ..Default::default()
456 }),
457 spec: Some(pod_spec),
458 },
459 ..Default::default()
460 }),
461 ..Default::default()
462 })
463}
464
465fn build_registry_container(spec: &RivvenSchemaRegistrySpec, _resource_name: &str) -> Container {
467 let mut env = vec![
469 EnvVar {
470 name: "RIVVEN_SCHEMA_PORT".to_string(),
471 value: Some(spec.server.port.to_string()),
472 ..Default::default()
473 },
474 EnvVar {
475 name: "RIVVEN_SCHEMA_BIND_ADDRESS".to_string(),
476 value: Some(spec.server.bind_address.clone()),
477 ..Default::default()
478 },
479 EnvVar {
480 name: "RIVVEN_SCHEMA_STORAGE_MODE".to_string(),
481 value: Some(spec.storage.mode.clone()),
482 ..Default::default()
483 },
484 EnvVar {
485 name: "RIVVEN_SCHEMA_STORAGE_TOPIC".to_string(),
486 value: Some(spec.storage.topic.clone()),
487 ..Default::default()
488 },
489 EnvVar {
490 name: "RIVVEN_SCHEMA_COMPATIBILITY".to_string(),
491 value: Some(spec.compatibility.default_level.clone()),
492 ..Default::default()
493 },
494 EnvVar {
496 name: "RIVVEN_BROKERS".to_string(),
497 value: Some(format!(
498 "rivven-{}-headless.{}.svc.cluster.local:9092",
499 spec.cluster_ref.name,
500 spec.cluster_ref.namespace.as_deref().unwrap_or("default")
501 )),
502 ..Default::default()
503 },
504 EnvVar {
506 name: "POD_NAME".to_string(),
507 value_from: Some(EnvVarSource {
508 field_ref: Some(ObjectFieldSelector {
509 field_path: "metadata.name".to_string(),
510 ..Default::default()
511 }),
512 ..Default::default()
513 }),
514 ..Default::default()
515 },
516 EnvVar {
517 name: "POD_NAMESPACE".to_string(),
518 value_from: Some(EnvVarSource {
519 field_ref: Some(ObjectFieldSelector {
520 field_path: "metadata.namespace".to_string(),
521 ..Default::default()
522 }),
523 ..Default::default()
524 }),
525 ..Default::default()
526 },
527 ];
528
529 for e in &spec.env {
531 if !env.iter().any(|existing| existing.name == e.name) {
532 env.push(e.clone());
533 }
534 }
535
536 let liveness_probe = if spec.liveness_probe.enabled {
538 Some(Probe {
539 http_get: Some(HTTPGetAction {
540 path: Some("/health/live".to_string()),
541 port: IntOrString::Int(spec.server.port),
542 ..Default::default()
543 }),
544 initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
545 period_seconds: Some(spec.liveness_probe.period_seconds),
546 timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
547 success_threshold: Some(spec.liveness_probe.success_threshold),
548 failure_threshold: Some(spec.liveness_probe.failure_threshold),
549 ..Default::default()
550 })
551 } else {
552 None
553 };
554
555 let readiness_probe = if spec.readiness_probe.enabled {
556 Some(Probe {
557 http_get: Some(HTTPGetAction {
558 path: Some("/health/ready".to_string()),
559 port: IntOrString::Int(spec.server.port),
560 ..Default::default()
561 }),
562 initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
563 period_seconds: Some(spec.readiness_probe.period_seconds),
564 timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
565 success_threshold: Some(spec.readiness_probe.success_threshold),
566 failure_threshold: Some(spec.readiness_probe.failure_threshold),
567 ..Default::default()
568 })
569 } else {
570 None
571 };
572
573 let mut ports = vec![ContainerPort {
575 name: Some("http".to_string()),
576 container_port: spec.server.port,
577 protocol: Some("TCP".to_string()),
578 ..Default::default()
579 }];
580
581 if spec.metrics.enabled && spec.metrics.port != spec.server.port {
582 ports.push(ContainerPort {
583 name: Some("metrics".to_string()),
584 container_port: spec.metrics.port,
585 protocol: Some("TCP".to_string()),
586 ..Default::default()
587 });
588 }
589
590 Container {
591 name: "rivven-schema-registry".to_string(),
592 image: Some(spec.get_image()),
593 image_pull_policy: Some(spec.image_pull_policy.clone()),
594 command: Some(vec!["rivven-schema".to_string()]),
595 args: Some(vec![
596 "serve".to_string(),
597 "--config".to_string(),
598 "/etc/rivven-schema/config.yaml".to_string(),
599 ]),
600 env: Some(env),
601 ports: Some(ports),
602 liveness_probe,
603 readiness_probe,
604 volume_mounts: Some(vec![VolumeMount {
605 name: "config".to_string(),
606 mount_path: "/etc/rivven-schema".to_string(),
607 read_only: Some(true),
608 ..Default::default()
609 }]),
610 security_context: Some(k8s_openapi::api::core::v1::SecurityContext {
612 run_as_non_root: Some(true),
613 run_as_user: Some(1000),
614 run_as_group: Some(1000),
615 read_only_root_filesystem: Some(true),
616 allow_privilege_escalation: Some(false),
617 capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
618 drop: Some(vec!["ALL".to_string()]),
619 ..Default::default()
620 }),
621 ..Default::default()
622 }),
623 ..Default::default()
624 }
625}
626
627fn build_registry_service(registry: &RivvenSchemaRegistry) -> Result<Service> {
629 let name = registry.name_any();
630 let namespace = registry
631 .namespace()
632 .unwrap_or_else(|| "default".to_string());
633 let resource_name = format!("rivven-schema-registry-{}", name);
634
635 let spec = ®istry.spec;
636 let labels = spec.get_labels(&name);
637 let selector_labels = spec.get_selector_labels(&name);
638
639 let mut ports = vec![ServicePort {
640 name: Some("http".to_string()),
641 port: spec.server.port,
642 target_port: Some(IntOrString::Int(spec.server.port)),
643 protocol: Some("TCP".to_string()),
644 ..Default::default()
645 }];
646
647 if spec.metrics.enabled && spec.metrics.port != spec.server.port {
648 ports.push(ServicePort {
649 name: Some("metrics".to_string()),
650 port: spec.metrics.port,
651 target_port: Some(IntOrString::Int(spec.metrics.port)),
652 protocol: Some("TCP".to_string()),
653 ..Default::default()
654 });
655 }
656
657 Ok(Service {
658 metadata: ObjectMeta {
659 name: Some(resource_name),
660 namespace: Some(namespace),
661 labels: Some(labels),
662 owner_references: Some(vec![owner_reference(registry)]),
663 ..Default::default()
664 },
665 spec: Some(ServiceSpec {
666 selector: Some(selector_labels),
667 ports: Some(ports),
668 type_: Some("ClusterIP".to_string()),
669 ..Default::default()
670 }),
671 ..Default::default()
672 })
673}
674
675fn build_registry_pdb(registry: &RivvenSchemaRegistry) -> Result<PodDisruptionBudget> {
677 let name = registry.name_any();
678 let namespace = registry
679 .namespace()
680 .unwrap_or_else(|| "default".to_string());
681 let resource_name = format!("rivven-schema-registry-{}", name);
682
683 let spec = ®istry.spec;
684 let labels = spec.get_labels(&name);
685 let selector_labels = spec.get_selector_labels(&name);
686
687 let pdb_spec = &spec.pod_disruption_budget;
688
689 Ok(PodDisruptionBudget {
690 metadata: ObjectMeta {
691 name: Some(resource_name),
692 namespace: Some(namespace),
693 labels: Some(labels),
694 owner_references: Some(vec![owner_reference(registry)]),
695 ..Default::default()
696 },
697 spec: Some(PodDisruptionBudgetSpec {
698 selector: Some(LabelSelector {
699 match_labels: Some(selector_labels),
700 ..Default::default()
701 }),
702 min_available: pdb_spec
703 .min_available
704 .as_ref()
705 .map(|v| IntOrString::String(v.clone())),
706 max_unavailable: pdb_spec
707 .max_unavailable
708 .as_ref()
709 .map(|v| IntOrString::String(v.clone())),
710 ..Default::default()
711 }),
712 ..Default::default()
713 })
714}
715
716fn owner_reference(registry: &RivvenSchemaRegistry) -> OwnerReference {
718 OwnerReference {
719 api_version: "rivven.hupe1980.github.io/v1alpha1".to_string(),
720 kind: "RivvenSchemaRegistry".to_string(),
721 name: registry.name_any(),
722 uid: registry.metadata.uid.clone().unwrap_or_default(),
723 controller: Some(true),
724 block_owner_deletion: Some(true),
725 }
726}
727
728async fn apply_registry_configmap(
730 client: &Client,
731 namespace: &str,
732 configmap: ConfigMap,
733) -> Result<()> {
734 let configmaps: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
735 let name = configmap.metadata.name.clone().unwrap_or_default();
736
737 let patch = Patch::Apply(&configmap);
738 let params = PatchParams::apply("rivven-operator").force();
739
740 configmaps
741 .patch(&name, ¶ms, &patch)
742 .await
743 .map_err(|e| OperatorError::ReconcileFailed(format!("ConfigMap: {}", e)))?;
744
745 debug!(name = %name, "Applied schema registry ConfigMap");
746 Ok(())
747}
748
749async fn apply_registry_deployment(
751 client: &Client,
752 namespace: &str,
753 deployment: Deployment,
754) -> Result<(i32, i32)> {
755 let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);
756 let name = deployment.metadata.name.clone().unwrap_or_default();
757
758 let patch = Patch::Apply(&deployment);
759 let params = PatchParams::apply("rivven-operator").force();
760
761 let result = deployments
762 .patch(&name, ¶ms, &patch)
763 .await
764 .map_err(|e| OperatorError::ReconcileFailed(format!("Deployment: {}", e)))?;
765
766 let status = result.status.as_ref();
767 let replicas = status.and_then(|s| s.replicas).unwrap_or(0);
768 let ready_replicas = status.and_then(|s| s.ready_replicas).unwrap_or(0);
769
770 debug!(name = %name, replicas = replicas, ready = ready_replicas, "Applied schema registry Deployment");
771 Ok((replicas, ready_replicas))
772}
773
774async fn apply_registry_service(client: &Client, namespace: &str, service: Service) -> Result<()> {
776 let services: Api<Service> = Api::namespaced(client.clone(), namespace);
777 let name = service.metadata.name.clone().unwrap_or_default();
778
779 let patch = Patch::Apply(&service);
780 let params = PatchParams::apply("rivven-operator").force();
781
782 services
783 .patch(&name, ¶ms, &patch)
784 .await
785 .map_err(|e| OperatorError::ReconcileFailed(format!("Service: {}", e)))?;
786
787 debug!(name = %name, "Applied schema registry Service");
788 Ok(())
789}
790
791async fn apply_registry_pdb(
793 client: &Client,
794 namespace: &str,
795 pdb: PodDisruptionBudget,
796) -> Result<()> {
797 let pdbs: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
798 let name = pdb.metadata.name.clone().unwrap_or_default();
799
800 let patch = Patch::Apply(&pdb);
801 let params = PatchParams::apply("rivven-operator").force();
802
803 pdbs.patch(&name, ¶ms, &patch)
804 .await
805 .map_err(|e| OperatorError::ReconcileFailed(format!("PodDisruptionBudget: {}", e)))?;
806
807 debug!(name = %name, "Applied schema registry PodDisruptionBudget");
808 Ok(())
809}
810
811async fn update_schema_registry_status(
813 client: &Client,
814 namespace: &str,
815 name: &str,
816 status: RivvenSchemaRegistryStatus,
817) -> Result<()> {
818 let registries: Api<RivvenSchemaRegistry> = Api::namespaced(client.clone(), namespace);
819
820 let patch = serde_json::json!({
821 "status": status
822 });
823
824 registries
825 .patch_status(
826 name,
827 &PatchParams::apply("rivven-operator"),
828 &Patch::Merge(&patch),
829 )
830 .await
831 .map_err(|e| OperatorError::ReconcileFailed(format!("Status update: {}", e)))?;
832
833 debug!(name = %name, phase = ?status.phase, "Updated schema registry status");
834 Ok(())
835}
836
837fn build_failed_status(registry: &RivvenSchemaRegistry, error: &str) -> RivvenSchemaRegistryStatus {
839 RivvenSchemaRegistryStatus {
840 phase: SchemaRegistryPhase::Failed,
841 replicas: 0,
842 ready_replicas: 0,
843 schemas_registered: 0,
844 subjects_count: 0,
845 contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
846 observed_generation: registry.metadata.generation.unwrap_or(0),
847 conditions: vec![SchemaRegistryCondition {
848 condition_type: "Ready".to_string(),
849 status: "False".to_string(),
850 reason: Some("ValidationFailed".to_string()),
851 message: Some(error.to_string()),
852 last_transition_time: Some(Utc::now().to_rfc3339()),
853 }],
854 endpoints: vec![],
855 storage_status: None,
856 external_sync_status: None,
857 last_sync_time: None,
858 last_updated: Some(Utc::now().to_rfc3339()),
859 message: Some(error.to_string()),
860 }
861}
862
863fn build_running_status(
865 registry: &RivvenSchemaRegistry,
866 deployment_status: &(i32, i32),
867) -> RivvenSchemaRegistryStatus {
868 let (replicas, ready_replicas) = *deployment_status;
869 let name = registry.name_any();
870 let namespace = registry
871 .namespace()
872 .unwrap_or_else(|| "default".to_string());
873
874 let phase = if ready_replicas == replicas && replicas > 0 {
875 SchemaRegistryPhase::Running
876 } else if ready_replicas > 0 {
877 SchemaRegistryPhase::Degraded
878 } else {
879 SchemaRegistryPhase::Provisioning
880 };
881
882 let endpoint = format!(
883 "http://rivven-schema-registry-{}.{}.svc.cluster.local:{}",
884 name, namespace, registry.spec.server.port
885 );
886
887 RivvenSchemaRegistryStatus {
888 phase,
889 replicas,
890 ready_replicas,
891 schemas_registered: 0, subjects_count: 0, contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
894 observed_generation: registry.metadata.generation.unwrap_or(0),
895 conditions: vec![
896 SchemaRegistryCondition {
897 condition_type: "Ready".to_string(),
898 status: if ready_replicas == replicas && replicas > 0 {
899 "True"
900 } else {
901 "False"
902 }
903 .to_string(),
904 reason: Some(
905 if ready_replicas == replicas && replicas > 0 {
906 "AllReplicasReady"
907 } else {
908 "ReplicasNotReady"
909 }
910 .to_string(),
911 ),
912 message: Some(format!("{}/{} replicas ready", ready_replicas, replicas)),
913 last_transition_time: Some(Utc::now().to_rfc3339()),
914 },
915 SchemaRegistryCondition {
916 condition_type: "StorageHealthy".to_string(),
917 status: "Unknown".to_string(),
918 reason: Some("Pending".to_string()),
919 message: Some("Storage health check pending".to_string()),
920 last_transition_time: Some(Utc::now().to_rfc3339()),
921 },
922 ],
923 endpoints: vec![endpoint],
924 storage_status: Some(registry.spec.storage.mode.clone()),
925 external_sync_status: if registry.spec.external.enabled {
926 Some("Pending".to_string())
927 } else {
928 None
929 },
930 last_sync_time: None,
931 last_updated: Some(Utc::now().to_rfc3339()),
932 message: None,
933 }
934}
935
936#[cfg(test)]
937mod tests {
938 use super::*;
939 use crate::crd::{
940 ClusterReference, ExternalRegistrySpec, PdbSpec, ProbeSpec, SchemaCompatibilitySpec,
941 SchemaContextsSpec, SchemaFormatSpec, SchemaRegistryAuthSpec, SchemaRegistryMetricsSpec,
942 SchemaRegistryServerSpec, SchemaRegistryStorageSpec, SchemaRegistryTlsSpec,
943 SchemaValidationSpec,
944 };
945
946 fn create_test_registry() -> RivvenSchemaRegistry {
947 RivvenSchemaRegistry {
948 metadata: ObjectMeta {
949 name: Some("test-registry".to_string()),
950 namespace: Some("default".to_string()),
951 uid: Some("test-uid".to_string()),
952 generation: Some(1),
953 ..Default::default()
954 },
955 spec: RivvenSchemaRegistrySpec {
956 cluster_ref: ClusterReference {
957 name: "test-cluster".to_string(),
958 namespace: None,
959 },
960 replicas: 3,
961 version: "0.0.1".to_string(),
962 image: None,
963 image_pull_policy: "IfNotPresent".to_string(),
964 image_pull_secrets: vec![],
965 resources: None,
966 server: SchemaRegistryServerSpec::default(),
967 storage: SchemaRegistryStorageSpec::default(),
968 compatibility: SchemaCompatibilitySpec::default(),
969 schemas: SchemaFormatSpec::default(),
970 contexts: SchemaContextsSpec::default(),
971 validation: SchemaValidationSpec::default(),
972 auth: SchemaRegistryAuthSpec::default(),
973 tls: SchemaRegistryTlsSpec::default(),
974 metrics: SchemaRegistryMetricsSpec::default(),
975 external: ExternalRegistrySpec::default(),
976 pod_annotations: BTreeMap::new(),
977 pod_labels: BTreeMap::new(),
978 env: vec![],
979 node_selector: BTreeMap::new(),
980 tolerations: vec![],
981 affinity: None,
982 service_account: None,
983 security_context: None,
984 container_security_context: None,
985 liveness_probe: ProbeSpec::default(),
986 readiness_probe: ProbeSpec::default(),
987 pod_disruption_budget: PdbSpec::default(),
988 },
989 status: None,
990 }
991 }
992
993 #[test]
994 fn test_build_registry_configmap() {
995 let registry = create_test_registry();
996 let configmap = build_registry_configmap(®istry).unwrap();
997
998 assert_eq!(
999 configmap.metadata.name,
1000 Some("rivven-schema-registry-test-registry-config".to_string())
1001 );
1002 assert!(configmap.data.is_some());
1003 let data = configmap.data.unwrap();
1004 assert!(data.contains_key("config.yaml"));
1005 }
1006
1007 #[test]
1008 fn test_build_registry_deployment() {
1009 let registry = create_test_registry();
1010 let deployment = build_registry_deployment(®istry).unwrap();
1011
1012 assert_eq!(
1013 deployment.metadata.name,
1014 Some("rivven-schema-registry-test-registry".to_string())
1015 );
1016 assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(3));
1017 }
1018
1019 #[test]
1020 fn test_build_registry_service() {
1021 let registry = create_test_registry();
1022 let service = build_registry_service(®istry).unwrap();
1023
1024 assert_eq!(
1025 service.metadata.name,
1026 Some("rivven-schema-registry-test-registry".to_string())
1027 );
1028
1029 let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
1030 assert_eq!(ports.len(), 2); assert_eq!(ports[0].port, 8081);
1032 }
1033
1034 #[test]
1035 fn test_build_registry_pdb() {
1036 let registry = create_test_registry();
1037 let pdb = build_registry_pdb(®istry).unwrap();
1038
1039 assert_eq!(
1040 pdb.metadata.name,
1041 Some("rivven-schema-registry-test-registry".to_string())
1042 );
1043 }
1044
1045 #[test]
1046 fn test_build_failed_status() {
1047 let registry = create_test_registry();
1048 let status = build_failed_status(®istry, "Test error");
1049
1050 assert_eq!(status.phase, SchemaRegistryPhase::Failed);
1051 assert_eq!(status.message, Some("Test error".to_string()));
1052 assert_eq!(status.conditions.len(), 1);
1053 assert_eq!(status.conditions[0].status, "False");
1054 }
1055
1056 #[test]
1057 fn test_build_running_status() {
1058 let registry = create_test_registry();
1059 let status = build_running_status(®istry, &(3, 3));
1060
1061 assert_eq!(status.phase, SchemaRegistryPhase::Running);
1062 assert_eq!(status.replicas, 3);
1063 assert_eq!(status.ready_replicas, 3);
1064 assert!(!status.endpoints.is_empty());
1065 }
1066
1067 #[test]
1068 fn test_build_running_status_degraded() {
1069 let registry = create_test_registry();
1070 let status = build_running_status(®istry, &(3, 2));
1071
1072 assert_eq!(status.phase, SchemaRegistryPhase::Degraded);
1073 assert_eq!(status.replicas, 3);
1074 assert_eq!(status.ready_replicas, 2);
1075 }
1076
1077 #[test]
1078 fn test_build_running_status_provisioning() {
1079 let registry = create_test_registry();
1080 let status = build_running_status(®istry, &(3, 0));
1081
1082 assert_eq!(status.phase, SchemaRegistryPhase::Provisioning);
1083 assert_eq!(status.ready_replicas, 0);
1084 }
1085
1086 #[test]
1087 fn test_spec_get_image_default() {
1088 let registry = create_test_registry();
1089 assert_eq!(
1090 registry.spec.get_image(),
1091 "ghcr.io/hupe1980/rivven-schema:0.0.1"
1092 );
1093 }
1094
1095 #[test]
1096 fn test_spec_get_image_custom() {
1097 let mut registry = create_test_registry();
1098 registry.spec.image = Some("custom/registry:latest".to_string());
1099 assert_eq!(registry.spec.get_image(), "custom/registry:latest");
1100 }
1101
1102 #[test]
1103 fn test_spec_get_labels() {
1104 let registry = create_test_registry();
1105 let labels = registry.spec.get_labels("test-registry");
1106
1107 assert_eq!(
1108 labels.get("app.kubernetes.io/name"),
1109 Some(&"rivven-schema-registry".to_string())
1110 );
1111 assert_eq!(
1112 labels.get("app.kubernetes.io/instance"),
1113 Some(&"test-registry".to_string())
1114 );
1115 assert_eq!(
1116 labels.get("app.kubernetes.io/component"),
1117 Some(&"schema-registry".to_string())
1118 );
1119 }
1120
1121 #[test]
1122 fn test_spec_get_selector_labels() {
1123 let registry = create_test_registry();
1124 let labels = registry.spec.get_selector_labels("test-registry");
1125
1126 assert_eq!(labels.len(), 2);
1127 assert_eq!(
1128 labels.get("app.kubernetes.io/name"),
1129 Some(&"rivven-schema-registry".to_string())
1130 );
1131 assert_eq!(
1132 labels.get("app.kubernetes.io/instance"),
1133 Some(&"test-registry".to_string())
1134 );
1135 }
1136
1137 #[test]
1138 fn test_schema_registry_phase_default() {
1139 let phase = SchemaRegistryPhase::default();
1140 assert_eq!(phase, SchemaRegistryPhase::Pending);
1141 }
1142
1143 #[test]
1144 fn test_server_spec_default() {
1145 let spec = SchemaRegistryServerSpec::default();
1146 assert_eq!(spec.port, 8081);
1147 assert_eq!(spec.bind_address, "0.0.0.0");
1148 assert_eq!(spec.timeout_seconds, 30);
1149 }
1150
1151 #[test]
1152 fn test_storage_spec_default() {
1153 let spec = SchemaRegistryStorageSpec::default();
1154 assert_eq!(spec.mode, "broker");
1155 assert_eq!(spec.topic, "_schemas");
1156 assert_eq!(spec.replication_factor, 3);
1157 }
1158
1159 #[test]
1160 fn test_compatibility_spec_default() {
1161 let spec = SchemaCompatibilitySpec::default();
1162 assert_eq!(spec.default_level, "BACKWARD");
1163 assert!(spec.allow_overrides);
1164 }
1165
1166 #[test]
1167 fn test_schema_format_spec_default() {
1168 let spec = SchemaFormatSpec::default();
1169 assert!(spec.avro);
1170 assert!(spec.json_schema);
1171 assert!(spec.protobuf);
1172 }
1173
1174 #[test]
1175 fn test_metrics_spec_default() {
1176 let spec = SchemaRegistryMetricsSpec::default();
1177 assert!(spec.enabled);
1178 assert_eq!(spec.port, 9090);
1179 assert_eq!(spec.path, "/metrics");
1180 }
1181
1182 #[test]
1183 fn test_build_registry_config_yaml() {
1184 let registry = create_test_registry();
1185 let config = build_registry_config_yaml(®istry.spec).unwrap();
1186
1187 assert!(config.contains("port: 8081"));
1188 assert!(config.contains("mode: \"broker\""));
1189 assert!(config.contains("default_level: \"BACKWARD\""));
1190 assert!(config.contains("avro: true"));
1191 }
1192
1193 #[test]
1194 fn test_controller_metrics_new() {
1195 let _metrics = SchemaRegistryControllerMetrics::new();
1196 }
1198}