1use crate::crd::{
7 ClusterReference, RivvenSchemaRegistry, RivvenSchemaRegistrySpec, RivvenSchemaRegistryStatus,
8 SchemaRegistryCondition, SchemaRegistryPhase,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
14use k8s_openapi::api::core::v1::{
15 ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
16 PodSpec, PodTemplateSpec, Probe, Service, ServicePort, ServiceSpec, VolumeMount,
17};
18use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
19use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
20use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
21use kube::api::{Api, Patch, PatchParams};
22use kube::runtime::controller::{Action, Controller};
23use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
24use kube::runtime::watcher::Config;
25use kube::{Client, Resource, ResourceExt};
26use std::collections::BTreeMap;
27use std::sync::Arc;
28use std::time::Duration;
29use tracing::{debug, error, info, instrument, warn};
30use validator::Validate;
31
32pub const SCHEMA_REGISTRY_FINALIZER: &str = "rivven.hupe1980.github.io/schema-registry-finalizer";
34
35const DEFAULT_REQUEUE_SECONDS: u64 = 60; const ERROR_REQUEUE_SECONDS: u64 = 30;
40
41pub struct SchemaRegistryControllerContext {
43 pub client: Client,
45 pub metrics: Option<SchemaRegistryControllerMetrics>,
47}
48
49#[derive(Clone)]
51pub struct SchemaRegistryControllerMetrics {
52 pub reconciliations: metrics::Counter,
54 pub errors: metrics::Counter,
56 pub duration: metrics::Histogram,
58}
59
60impl SchemaRegistryControllerMetrics {
61 pub fn new() -> Self {
63 Self {
64 reconciliations: metrics::counter!("rivven_schema_registry_reconciliations_total"),
65 errors: metrics::counter!("rivven_schema_registry_reconciliation_errors_total"),
66 duration: metrics::histogram!("rivven_schema_registry_reconciliation_duration_seconds"),
67 }
68 }
69}
70
71impl Default for SchemaRegistryControllerMetrics {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77pub async fn run_schema_registry_controller(
79 client: Client,
80 namespace: Option<String>,
81) -> Result<()> {
82 let registries: Api<RivvenSchemaRegistry> = match &namespace {
83 Some(ns) => Api::namespaced(client.clone(), ns),
84 None => Api::all(client.clone()),
85 };
86
87 let ctx = Arc::new(SchemaRegistryControllerContext {
88 client: client.clone(),
89 metrics: Some(SchemaRegistryControllerMetrics::new()),
90 });
91
92 info!(
93 namespace = namespace.as_deref().unwrap_or("all"),
94 "Starting RivvenSchemaRegistry controller"
95 );
96
97 let deployments = match &namespace {
99 Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
100 None => Api::<Deployment>::all(client.clone()),
101 };
102
103 let configmaps = match &namespace {
104 Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
105 None => Api::<ConfigMap>::all(client.clone()),
106 };
107
108 Controller::new(registries.clone(), Config::default())
109 .owns(deployments, Config::default())
110 .owns(configmaps, Config::default())
111 .run(reconcile_schema_registry, schema_registry_error_policy, ctx)
112 .for_each(|result| async move {
113 match result {
114 Ok((obj, action)) => {
115 debug!(
116 name = obj.name,
117 namespace = obj.namespace,
118 ?action,
119 "Schema registry reconciliation completed"
120 );
121 }
122 Err(e) => {
123 error!(error = %e, "Schema registry reconciliation failed");
124 }
125 }
126 })
127 .await;
128
129 Ok(())
130}
131
132#[instrument(skip(registry, ctx), fields(name = %registry.name_any(), namespace = registry.namespace()))]
134async fn reconcile_schema_registry(
135 registry: Arc<RivvenSchemaRegistry>,
136 ctx: Arc<SchemaRegistryControllerContext>,
137) -> Result<Action> {
138 let start = std::time::Instant::now();
139
140 if let Some(ref metrics) = ctx.metrics {
141 metrics.reconciliations.increment(1);
142 }
143
144 let namespace = registry
145 .namespace()
146 .unwrap_or_else(|| "default".to_string());
147 let registries: Api<RivvenSchemaRegistry> = Api::namespaced(ctx.client.clone(), &namespace);
148
149 let result = finalizer(
150 ®istries,
151 SCHEMA_REGISTRY_FINALIZER,
152 registry,
153 |event| async {
154 match event {
155 FinalizerEvent::Apply(registry) => {
156 apply_schema_registry(registry, ctx.clone()).await
157 }
158 FinalizerEvent::Cleanup(registry) => {
159 cleanup_schema_registry(registry, ctx.clone()).await
160 }
161 }
162 },
163 )
164 .await;
165
166 if let Some(ref metrics) = ctx.metrics {
167 metrics.duration.record(start.elapsed().as_secs_f64());
168 }
169
170 result.map_err(|e| {
171 if let Some(ref metrics) = ctx.metrics {
172 metrics.errors.increment(1);
173 }
174 OperatorError::ReconcileFailed(e.to_string())
175 })
176}
177
178#[instrument(skip(registry, ctx))]
180async fn apply_schema_registry(
181 registry: Arc<RivvenSchemaRegistry>,
182 ctx: Arc<SchemaRegistryControllerContext>,
183) -> Result<Action> {
184 let name = registry.name_any();
185 let namespace = registry
186 .namespace()
187 .unwrap_or_else(|| "default".to_string());
188
189 info!(name = %name, namespace = %namespace, "Reconciling RivvenSchemaRegistry");
190
191 if let Err(errors) = registry.spec.validate() {
193 let error_messages: Vec<String> = errors
194 .field_errors()
195 .iter()
196 .flat_map(|(field, errs)| {
197 errs.iter()
198 .map(move |e| format!("{}: {:?}", field, e.message))
199 })
200 .collect();
201 let error_msg = error_messages.join("; ");
202 warn!(name = %name, errors = %error_msg, "Schema registry spec validation failed");
203
204 update_schema_registry_status(
206 &ctx.client,
207 &namespace,
208 &name,
209 build_failed_status(®istry, &error_msg),
210 )
211 .await?;
212
213 return Err(OperatorError::InvalidConfig(error_msg));
214 }
215
216 verify_cluster_ref(&ctx.client, &namespace, ®istry.spec.cluster_ref).await?;
218
219 let configmap = build_registry_configmap(®istry)?;
221 apply_registry_configmap(&ctx.client, &namespace, configmap).await?;
222
223 let deployment = build_registry_deployment(®istry)?;
225 let deployment_status = apply_registry_deployment(&ctx.client, &namespace, deployment).await?;
226
227 let service = build_registry_service(®istry)?;
229 apply_registry_service(&ctx.client, &namespace, service).await?;
230
231 if registry.spec.pod_disruption_budget.enabled {
233 let pdb = build_registry_pdb(®istry)?;
234 apply_registry_pdb(&ctx.client, &namespace, pdb).await?;
235 }
236
237 let status = build_running_status(®istry, &deployment_status);
239 update_schema_registry_status(&ctx.client, &namespace, &name, status).await?;
240
241 Ok(Action::requeue(Duration::from_secs(
242 DEFAULT_REQUEUE_SECONDS,
243 )))
244}
245
246#[instrument(skip(registry, _ctx))]
248async fn cleanup_schema_registry(
249 registry: Arc<RivvenSchemaRegistry>,
250 _ctx: Arc<SchemaRegistryControllerContext>,
251) -> Result<Action> {
252 let name = registry.name_any();
253 let namespace = registry
254 .namespace()
255 .unwrap_or_else(|| "default".to_string());
256
257 info!(name = %name, namespace = %namespace, "Cleaning up RivvenSchemaRegistry");
258
259 Ok(Action::await_change())
263}
264
265fn schema_registry_error_policy(
267 _registry: Arc<RivvenSchemaRegistry>,
268 error: &OperatorError,
269 _ctx: Arc<SchemaRegistryControllerContext>,
270) -> Action {
271 warn!(error = %error, "Schema registry reconciliation error, will retry");
272 Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS))
273}
274
275async fn verify_cluster_ref(
277 client: &Client,
278 namespace: &str,
279 cluster_ref: &ClusterReference,
280) -> Result<()> {
281 use crate::crd::RivvenCluster;
282
283 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
284 let clusters: Api<RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
285
286 clusters
287 .get(&cluster_ref.name)
288 .await
289 .map_err(|e| OperatorError::ClusterNotFound(format!("{}: {}", cluster_ref.name, e)))?;
290
291 Ok(())
292}
293
294fn build_registry_configmap(registry: &RivvenSchemaRegistry) -> Result<ConfigMap> {
296 let name = registry.name_any();
297 let namespace = registry
298 .namespace()
299 .unwrap_or_else(|| "default".to_string());
300 let resource_name = format!("rivven-schema-registry-{}", name);
301
302 let spec = ®istry.spec;
303 let labels = spec.get_labels(&name);
304
305 let config = build_registry_config_yaml(spec)?;
307
308 Ok(ConfigMap {
309 metadata: ObjectMeta {
310 name: Some(format!("{}-config", resource_name)),
311 namespace: Some(namespace),
312 labels: Some(labels),
313 owner_references: Some(vec![owner_reference(registry)]),
314 ..Default::default()
315 },
316 data: Some(BTreeMap::from([("config.yaml".to_string(), config)])),
317 ..Default::default()
318 })
319}
320
321fn build_registry_config_yaml(spec: &RivvenSchemaRegistrySpec) -> Result<String> {
323 let mut config = String::new();
324
325 config.push_str(&format!(
327 "server:\n port: {}\n bind_address: \"{}\"\n timeout_seconds: {}\n max_request_size: {}\n",
328 spec.server.port,
329 spec.server.bind_address,
330 spec.server.timeout_seconds,
331 spec.server.max_request_size,
332 ));
333
334 config.push_str(&format!(
336 "\nstorage:\n mode: \"{}\"\n topic: \"{}\"\n replication_factor: {}\n partitions: {}\n normalize: {}\n",
337 spec.storage.mode,
338 spec.storage.topic,
339 spec.storage.replication_factor,
340 spec.storage.partitions,
341 spec.storage.normalize,
342 ));
343
344 config.push_str(&format!(
346 "\ncompatibility:\n default_level: \"{}\"\n allow_overrides: {}\n",
347 spec.compatibility.default_level, spec.compatibility.allow_overrides,
348 ));
349
350 config.push_str(&format!(
352 "\nschemas:\n avro: {}\n json_schema: {}\n protobuf: {}\n strict_validation: {}\n",
353 spec.schemas.avro,
354 spec.schemas.json_schema,
355 spec.schemas.protobuf,
356 spec.schemas.strict_validation,
357 ));
358
359 if spec.contexts.enabled {
361 config.push_str(&format!(
362 "\ncontexts:\n enabled: true\n max_contexts: {}\n",
363 spec.contexts.max_contexts,
364 ));
365 }
366
367 if spec.metrics.enabled {
369 config.push_str(&format!(
370 "\nmetrics:\n enabled: true\n port: {}\n path: \"{}\"\n",
371 spec.metrics.port, spec.metrics.path,
372 ));
373 }
374
375 Ok(config)
376}
377
378fn build_registry_deployment(registry: &RivvenSchemaRegistry) -> Result<Deployment> {
380 let name = registry.name_any();
381 let namespace = registry
382 .namespace()
383 .unwrap_or_else(|| "default".to_string());
384 let resource_name = format!("rivven-schema-registry-{}", name);
385
386 let spec = ®istry.spec;
387 let labels = spec.get_labels(&name);
388 let selector_labels = spec.get_selector_labels(&name);
389
390 let container = build_registry_container(spec, &resource_name);
392
393 let mut pod_labels = selector_labels.clone();
395 pod_labels.extend(spec.pod_labels.clone());
396
397 let mut pod_annotations = BTreeMap::new();
399 if spec.metrics.enabled {
400 pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
401 pod_annotations.insert(
402 "prometheus.io/port".to_string(),
403 spec.metrics.port.to_string(),
404 );
405 pod_annotations.insert("prometheus.io/path".to_string(), spec.metrics.path.clone());
406 }
407 pod_annotations.extend(spec.pod_annotations.clone());
408
409 let pod_spec = PodSpec {
410 containers: vec![container],
411 node_selector: if spec.node_selector.is_empty() {
412 None
413 } else {
414 Some(spec.node_selector.clone())
415 },
416 tolerations: if spec.tolerations.is_empty() {
417 None
418 } else {
419 Some(spec.tolerations.clone())
420 },
421 service_account_name: spec.service_account.clone(),
422 image_pull_secrets: if spec.image_pull_secrets.is_empty() {
423 None
424 } else {
425 Some(
426 spec.image_pull_secrets
427 .iter()
428 .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
429 .collect(),
430 )
431 },
432 automount_service_account_token: Some(false),
434 ..Default::default()
435 };
436
437 Ok(Deployment {
438 metadata: ObjectMeta {
439 name: Some(resource_name.clone()),
440 namespace: Some(namespace),
441 labels: Some(labels.clone()),
442 owner_references: Some(vec![owner_reference(registry)]),
443 ..Default::default()
444 },
445 spec: Some(DeploymentSpec {
446 replicas: Some(spec.replicas),
447 selector: LabelSelector {
448 match_labels: Some(selector_labels),
449 ..Default::default()
450 },
451 template: PodTemplateSpec {
452 metadata: Some(ObjectMeta {
453 labels: Some(pod_labels),
454 annotations: Some(pod_annotations),
455 ..Default::default()
456 }),
457 spec: Some(pod_spec),
458 },
459 ..Default::default()
460 }),
461 ..Default::default()
462 })
463}
464
465fn build_registry_container(spec: &RivvenSchemaRegistrySpec, _resource_name: &str) -> Container {
467 let mut env = vec![
469 EnvVar {
470 name: "RIVVEN_SCHEMA_PORT".to_string(),
471 value: Some(spec.server.port.to_string()),
472 ..Default::default()
473 },
474 EnvVar {
475 name: "RIVVEN_SCHEMA_BIND_ADDRESS".to_string(),
476 value: Some(spec.server.bind_address.clone()),
477 ..Default::default()
478 },
479 EnvVar {
480 name: "RIVVEN_SCHEMA_STORAGE_MODE".to_string(),
481 value: Some(spec.storage.mode.clone()),
482 ..Default::default()
483 },
484 EnvVar {
485 name: "RIVVEN_SCHEMA_STORAGE_TOPIC".to_string(),
486 value: Some(spec.storage.topic.clone()),
487 ..Default::default()
488 },
489 EnvVar {
490 name: "RIVVEN_SCHEMA_COMPATIBILITY".to_string(),
491 value: Some(spec.compatibility.default_level.clone()),
492 ..Default::default()
493 },
494 EnvVar {
496 name: "RIVVEN_BROKERS".to_string(),
497 value: Some(format!(
498 "rivven-{}-headless.{}.svc.cluster.local:9092",
499 spec.cluster_ref.name,
500 spec.cluster_ref.namespace.as_deref().unwrap_or("default")
501 )),
502 ..Default::default()
503 },
504 EnvVar {
506 name: "POD_NAME".to_string(),
507 value_from: Some(EnvVarSource {
508 field_ref: Some(ObjectFieldSelector {
509 field_path: "metadata.name".to_string(),
510 ..Default::default()
511 }),
512 ..Default::default()
513 }),
514 ..Default::default()
515 },
516 EnvVar {
517 name: "POD_NAMESPACE".to_string(),
518 value_from: Some(EnvVarSource {
519 field_ref: Some(ObjectFieldSelector {
520 field_path: "metadata.namespace".to_string(),
521 ..Default::default()
522 }),
523 ..Default::default()
524 }),
525 ..Default::default()
526 },
527 ];
528
529 for e in &spec.env {
531 if !env.iter().any(|existing| existing.name == e.name) {
532 env.push(e.clone());
533 }
534 }
535
536 let liveness_probe = if spec.liveness_probe.enabled {
538 Some(Probe {
539 http_get: Some(HTTPGetAction {
540 path: Some("/health/live".to_string()),
541 port: IntOrString::Int(spec.server.port),
542 ..Default::default()
543 }),
544 initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
545 period_seconds: Some(spec.liveness_probe.period_seconds),
546 timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
547 success_threshold: Some(spec.liveness_probe.success_threshold),
548 failure_threshold: Some(spec.liveness_probe.failure_threshold),
549 ..Default::default()
550 })
551 } else {
552 None
553 };
554
555 let readiness_probe = if spec.readiness_probe.enabled {
556 Some(Probe {
557 http_get: Some(HTTPGetAction {
558 path: Some("/health/ready".to_string()),
559 port: IntOrString::Int(spec.server.port),
560 ..Default::default()
561 }),
562 initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
563 period_seconds: Some(spec.readiness_probe.period_seconds),
564 timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
565 success_threshold: Some(spec.readiness_probe.success_threshold),
566 failure_threshold: Some(spec.readiness_probe.failure_threshold),
567 ..Default::default()
568 })
569 } else {
570 None
571 };
572
573 let mut ports = vec![ContainerPort {
575 name: Some("http".to_string()),
576 container_port: spec.server.port,
577 protocol: Some("TCP".to_string()),
578 ..Default::default()
579 }];
580
581 if spec.metrics.enabled && spec.metrics.port != spec.server.port {
582 ports.push(ContainerPort {
583 name: Some("metrics".to_string()),
584 container_port: spec.metrics.port,
585 protocol: Some("TCP".to_string()),
586 ..Default::default()
587 });
588 }
589
590 Container {
591 name: "rivven-schema-registry".to_string(),
592 image: Some(spec.get_image()),
593 image_pull_policy: Some(spec.image_pull_policy.clone()),
594 command: Some(vec!["rivven-schema".to_string()]),
595 args: Some(vec![
596 "serve".to_string(),
597 "--config".to_string(),
598 "/etc/rivven-schema/config.yaml".to_string(),
599 ]),
600 env: Some(env),
601 ports: Some(ports),
602 liveness_probe,
603 readiness_probe,
604 volume_mounts: Some(vec![VolumeMount {
605 name: "config".to_string(),
606 mount_path: "/etc/rivven-schema".to_string(),
607 read_only: Some(true),
608 ..Default::default()
609 }]),
610 security_context: Some(k8s_openapi::api::core::v1::SecurityContext {
612 run_as_non_root: Some(true),
613 run_as_user: Some(1000),
614 run_as_group: Some(1000),
615 read_only_root_filesystem: Some(true),
616 allow_privilege_escalation: Some(false),
617 capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
618 drop: Some(vec!["ALL".to_string()]),
619 ..Default::default()
620 }),
621 ..Default::default()
622 }),
623 ..Default::default()
624 }
625}
626
627fn build_registry_service(registry: &RivvenSchemaRegistry) -> Result<Service> {
629 let name = registry.name_any();
630 let namespace = registry
631 .namespace()
632 .unwrap_or_else(|| "default".to_string());
633 let resource_name = format!("rivven-schema-registry-{}", name);
634
635 let spec = ®istry.spec;
636 let labels = spec.get_labels(&name);
637 let selector_labels = spec.get_selector_labels(&name);
638
639 let mut ports = vec![ServicePort {
640 name: Some("http".to_string()),
641 port: spec.server.port,
642 target_port: Some(IntOrString::Int(spec.server.port)),
643 protocol: Some("TCP".to_string()),
644 ..Default::default()
645 }];
646
647 if spec.metrics.enabled && spec.metrics.port != spec.server.port {
648 ports.push(ServicePort {
649 name: Some("metrics".to_string()),
650 port: spec.metrics.port,
651 target_port: Some(IntOrString::Int(spec.metrics.port)),
652 protocol: Some("TCP".to_string()),
653 ..Default::default()
654 });
655 }
656
657 Ok(Service {
658 metadata: ObjectMeta {
659 name: Some(resource_name),
660 namespace: Some(namespace),
661 labels: Some(labels),
662 owner_references: Some(vec![owner_reference(registry)]),
663 ..Default::default()
664 },
665 spec: Some(ServiceSpec {
666 selector: Some(selector_labels),
667 ports: Some(ports),
668 type_: Some("ClusterIP".to_string()),
669 ..Default::default()
670 }),
671 ..Default::default()
672 })
673}
674
675fn build_registry_pdb(registry: &RivvenSchemaRegistry) -> Result<PodDisruptionBudget> {
677 let name = registry.name_any();
678 let namespace = registry
679 .namespace()
680 .unwrap_or_else(|| "default".to_string());
681 let resource_name = format!("rivven-schema-registry-{}", name);
682
683 let spec = ®istry.spec;
684 let labels = spec.get_labels(&name);
685 let selector_labels = spec.get_selector_labels(&name);
686
687 let pdb_spec = &spec.pod_disruption_budget;
688
689 Ok(PodDisruptionBudget {
690 metadata: ObjectMeta {
691 name: Some(resource_name),
692 namespace: Some(namespace),
693 labels: Some(labels),
694 owner_references: Some(vec![owner_reference(registry)]),
695 ..Default::default()
696 },
697 spec: Some(PodDisruptionBudgetSpec {
698 selector: Some(LabelSelector {
699 match_labels: Some(selector_labels),
700 ..Default::default()
701 }),
702 min_available: pdb_spec
703 .min_available
704 .as_ref()
705 .map(|v| IntOrString::String(v.clone())),
706 max_unavailable: pdb_spec
707 .max_unavailable
708 .as_ref()
709 .map(|v| IntOrString::String(v.clone())),
710 ..Default::default()
711 }),
712 ..Default::default()
713 })
714}
715
716fn owner_reference(registry: &RivvenSchemaRegistry) -> OwnerReference {
718 OwnerReference {
719 api_version: "rivven.hupe1980.github.io/v1alpha1".to_string(),
720 kind: "RivvenSchemaRegistry".to_string(),
721 name: registry.name_any(),
722 uid: registry.metadata.uid.clone().unwrap_or_default(),
723 controller: Some(true),
724 block_owner_deletion: Some(true),
725 }
726}
727
728fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
730 let labels = existing.meta().labels.as_ref();
731 let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
732 match managed_by {
733 Some(manager) if manager != "rivven-operator" => {
734 let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
735 Err(OperatorError::InvalidConfig(format!(
736 "resource '{}' is managed by '{}', not rivven-operator; \
737 refusing to force-apply to avoid ownership conflict",
738 name, manager
739 )))
740 }
741 _ => Ok(()),
742 }
743}
744
745async fn apply_registry_configmap(
747 client: &Client,
748 namespace: &str,
749 configmap: ConfigMap,
750) -> Result<()> {
751 let configmaps: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
752 let name = configmap.metadata.name.clone().unwrap_or_default();
753
754 if let Ok(existing) = configmaps.get(&name).await {
756 verify_ownership(&existing)?;
757 }
758
759 let patch = Patch::Apply(&configmap);
760 let params = PatchParams::apply("rivven-operator").force();
761
762 configmaps
763 .patch(&name, ¶ms, &patch)
764 .await
765 .map_err(|e| OperatorError::ReconcileFailed(format!("ConfigMap: {}", e)))?;
766
767 debug!(name = %name, "Applied schema registry ConfigMap");
768 Ok(())
769}
770
771async fn apply_registry_deployment(
773 client: &Client,
774 namespace: &str,
775 deployment: Deployment,
776) -> Result<(i32, i32)> {
777 let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);
778 let name = deployment.metadata.name.clone().unwrap_or_default();
779
780 if let Ok(existing) = deployments.get(&name).await {
782 verify_ownership(&existing)?;
783 }
784
785 let patch = Patch::Apply(&deployment);
786 let params = PatchParams::apply("rivven-operator").force();
787
788 let result = deployments
789 .patch(&name, ¶ms, &patch)
790 .await
791 .map_err(|e| OperatorError::ReconcileFailed(format!("Deployment: {}", e)))?;
792
793 let status = result.status.as_ref();
794 let replicas = status.and_then(|s| s.replicas).unwrap_or(0);
795 let ready_replicas = status.and_then(|s| s.ready_replicas).unwrap_or(0);
796
797 debug!(name = %name, replicas = replicas, ready = ready_replicas, "Applied schema registry Deployment");
798 Ok((replicas, ready_replicas))
799}
800
801async fn apply_registry_service(client: &Client, namespace: &str, service: Service) -> Result<()> {
803 let services: Api<Service> = Api::namespaced(client.clone(), namespace);
804 let name = service.metadata.name.clone().unwrap_or_default();
805
806 if let Ok(existing) = services.get(&name).await {
808 verify_ownership(&existing)?;
809 }
810
811 let patch = Patch::Apply(&service);
812 let params = PatchParams::apply("rivven-operator").force();
813
814 services
815 .patch(&name, ¶ms, &patch)
816 .await
817 .map_err(|e| OperatorError::ReconcileFailed(format!("Service: {}", e)))?;
818
819 debug!(name = %name, "Applied schema registry Service");
820 Ok(())
821}
822
823async fn apply_registry_pdb(
825 client: &Client,
826 namespace: &str,
827 pdb: PodDisruptionBudget,
828) -> Result<()> {
829 let pdbs: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
830 let name = pdb.metadata.name.clone().unwrap_or_default();
831
832 if let Ok(existing) = pdbs.get(&name).await {
834 verify_ownership(&existing)?;
835 }
836
837 let patch = Patch::Apply(&pdb);
838 let params = PatchParams::apply("rivven-operator").force();
839
840 pdbs.patch(&name, ¶ms, &patch)
841 .await
842 .map_err(|e| OperatorError::ReconcileFailed(format!("PodDisruptionBudget: {}", e)))?;
843
844 debug!(name = %name, "Applied schema registry PodDisruptionBudget");
845 Ok(())
846}
847
848async fn update_schema_registry_status(
850 client: &Client,
851 namespace: &str,
852 name: &str,
853 status: RivvenSchemaRegistryStatus,
854) -> Result<()> {
855 let registries: Api<RivvenSchemaRegistry> = Api::namespaced(client.clone(), namespace);
856
857 let patch = serde_json::json!({
858 "status": status
859 });
860
861 registries
862 .patch_status(
863 name,
864 &PatchParams::apply("rivven-operator"),
865 &Patch::Merge(&patch),
866 )
867 .await
868 .map_err(|e| OperatorError::ReconcileFailed(format!("Status update: {}", e)))?;
869
870 debug!(name = %name, phase = ?status.phase, "Updated schema registry status");
871 Ok(())
872}
873
874fn build_failed_status(registry: &RivvenSchemaRegistry, error: &str) -> RivvenSchemaRegistryStatus {
876 RivvenSchemaRegistryStatus {
877 phase: SchemaRegistryPhase::Failed,
878 replicas: 0,
879 ready_replicas: 0,
880 schemas_registered: 0,
881 subjects_count: 0,
882 contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
883 observed_generation: registry.metadata.generation.unwrap_or(0),
884 conditions: vec![SchemaRegistryCondition {
885 condition_type: "Ready".to_string(),
886 status: "False".to_string(),
887 reason: Some("ValidationFailed".to_string()),
888 message: Some(error.to_string()),
889 last_transition_time: Some(Utc::now().to_rfc3339()),
890 }],
891 endpoints: vec![],
892 storage_status: None,
893 external_sync_status: None,
894 last_sync_time: None,
895 last_updated: Some(Utc::now().to_rfc3339()),
896 message: Some(error.to_string()),
897 }
898}
899
900fn build_running_status(
902 registry: &RivvenSchemaRegistry,
903 deployment_status: &(i32, i32),
904) -> RivvenSchemaRegistryStatus {
905 let (replicas, ready_replicas) = *deployment_status;
906 let name = registry.name_any();
907 let namespace = registry
908 .namespace()
909 .unwrap_or_else(|| "default".to_string());
910
911 let phase = if ready_replicas == replicas && replicas > 0 {
912 SchemaRegistryPhase::Running
913 } else if ready_replicas > 0 {
914 SchemaRegistryPhase::Degraded
915 } else {
916 SchemaRegistryPhase::Provisioning
917 };
918
919 let endpoint = format!(
920 "http://rivven-schema-registry-{}.{}.svc.cluster.local:{}",
921 name, namespace, registry.spec.server.port
922 );
923
924 RivvenSchemaRegistryStatus {
925 phase,
926 replicas,
927 ready_replicas,
928 schemas_registered: 0, subjects_count: 0, contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
931 observed_generation: registry.metadata.generation.unwrap_or(0),
932 conditions: vec![
933 SchemaRegistryCondition {
934 condition_type: "Ready".to_string(),
935 status: if ready_replicas == replicas && replicas > 0 {
936 "True"
937 } else {
938 "False"
939 }
940 .to_string(),
941 reason: Some(
942 if ready_replicas == replicas && replicas > 0 {
943 "AllReplicasReady"
944 } else {
945 "ReplicasNotReady"
946 }
947 .to_string(),
948 ),
949 message: Some(format!("{}/{} replicas ready", ready_replicas, replicas)),
950 last_transition_time: Some(Utc::now().to_rfc3339()),
951 },
952 SchemaRegistryCondition {
953 condition_type: "StorageHealthy".to_string(),
954 status: "Unknown".to_string(),
955 reason: Some("Pending".to_string()),
956 message: Some("Storage health check pending".to_string()),
957 last_transition_time: Some(Utc::now().to_rfc3339()),
958 },
959 ],
960 endpoints: vec![endpoint],
961 storage_status: Some(registry.spec.storage.mode.clone()),
962 external_sync_status: if registry.spec.external.enabled {
963 Some("Pending".to_string())
964 } else {
965 None
966 },
967 last_sync_time: None,
968 last_updated: Some(Utc::now().to_rfc3339()),
969 message: None,
970 }
971}
972
973#[cfg(test)]
974mod tests {
975 use super::*;
976 use crate::crd::{
977 ClusterReference, ExternalRegistrySpec, PdbSpec, ProbeSpec, SchemaCompatibilitySpec,
978 SchemaContextsSpec, SchemaFormatSpec, SchemaRegistryAuthSpec, SchemaRegistryMetricsSpec,
979 SchemaRegistryServerSpec, SchemaRegistryStorageSpec, SchemaRegistryTlsSpec,
980 SchemaValidationSpec,
981 };
982
983 fn create_test_registry() -> RivvenSchemaRegistry {
984 RivvenSchemaRegistry {
985 metadata: ObjectMeta {
986 name: Some("test-registry".to_string()),
987 namespace: Some("default".to_string()),
988 uid: Some("test-uid".to_string()),
989 generation: Some(1),
990 ..Default::default()
991 },
992 spec: RivvenSchemaRegistrySpec {
993 cluster_ref: ClusterReference {
994 name: "test-cluster".to_string(),
995 namespace: None,
996 },
997 replicas: 3,
998 version: "0.0.1".to_string(),
999 image: None,
1000 image_pull_policy: "IfNotPresent".to_string(),
1001 image_pull_secrets: vec![],
1002 resources: None,
1003 server: SchemaRegistryServerSpec::default(),
1004 storage: SchemaRegistryStorageSpec::default(),
1005 compatibility: SchemaCompatibilitySpec::default(),
1006 schemas: SchemaFormatSpec::default(),
1007 contexts: SchemaContextsSpec::default(),
1008 validation: SchemaValidationSpec::default(),
1009 auth: SchemaRegistryAuthSpec::default(),
1010 tls: SchemaRegistryTlsSpec::default(),
1011 metrics: SchemaRegistryMetricsSpec::default(),
1012 external: ExternalRegistrySpec::default(),
1013 pod_annotations: BTreeMap::new(),
1014 pod_labels: BTreeMap::new(),
1015 env: vec![],
1016 node_selector: BTreeMap::new(),
1017 tolerations: vec![],
1018 affinity: None,
1019 service_account: None,
1020 security_context: None,
1021 container_security_context: None,
1022 liveness_probe: ProbeSpec::default(),
1023 readiness_probe: ProbeSpec::default(),
1024 pod_disruption_budget: PdbSpec::default(),
1025 },
1026 status: None,
1027 }
1028 }
1029
1030 #[test]
1031 fn test_build_registry_configmap() {
1032 let registry = create_test_registry();
1033 let configmap = build_registry_configmap(®istry).unwrap();
1034
1035 assert_eq!(
1036 configmap.metadata.name,
1037 Some("rivven-schema-registry-test-registry-config".to_string())
1038 );
1039 assert!(configmap.data.is_some());
1040 let data = configmap.data.unwrap();
1041 assert!(data.contains_key("config.yaml"));
1042 }
1043
1044 #[test]
1045 fn test_build_registry_deployment() {
1046 let registry = create_test_registry();
1047 let deployment = build_registry_deployment(®istry).unwrap();
1048
1049 assert_eq!(
1050 deployment.metadata.name,
1051 Some("rivven-schema-registry-test-registry".to_string())
1052 );
1053 assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(3));
1054 }
1055
1056 #[test]
1057 fn test_build_registry_service() {
1058 let registry = create_test_registry();
1059 let service = build_registry_service(®istry).unwrap();
1060
1061 assert_eq!(
1062 service.metadata.name,
1063 Some("rivven-schema-registry-test-registry".to_string())
1064 );
1065
1066 let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
1067 assert_eq!(ports.len(), 2); assert_eq!(ports[0].port, 8081);
1069 }
1070
1071 #[test]
1072 fn test_build_registry_pdb() {
1073 let registry = create_test_registry();
1074 let pdb = build_registry_pdb(®istry).unwrap();
1075
1076 assert_eq!(
1077 pdb.metadata.name,
1078 Some("rivven-schema-registry-test-registry".to_string())
1079 );
1080 }
1081
1082 #[test]
1083 fn test_build_failed_status() {
1084 let registry = create_test_registry();
1085 let status = build_failed_status(®istry, "Test error");
1086
1087 assert_eq!(status.phase, SchemaRegistryPhase::Failed);
1088 assert_eq!(status.message, Some("Test error".to_string()));
1089 assert_eq!(status.conditions.len(), 1);
1090 assert_eq!(status.conditions[0].status, "False");
1091 }
1092
1093 #[test]
1094 fn test_build_running_status() {
1095 let registry = create_test_registry();
1096 let status = build_running_status(®istry, &(3, 3));
1097
1098 assert_eq!(status.phase, SchemaRegistryPhase::Running);
1099 assert_eq!(status.replicas, 3);
1100 assert_eq!(status.ready_replicas, 3);
1101 assert!(!status.endpoints.is_empty());
1102 }
1103
1104 #[test]
1105 fn test_build_running_status_degraded() {
1106 let registry = create_test_registry();
1107 let status = build_running_status(®istry, &(3, 2));
1108
1109 assert_eq!(status.phase, SchemaRegistryPhase::Degraded);
1110 assert_eq!(status.replicas, 3);
1111 assert_eq!(status.ready_replicas, 2);
1112 }
1113
1114 #[test]
1115 fn test_build_running_status_provisioning() {
1116 let registry = create_test_registry();
1117 let status = build_running_status(®istry, &(3, 0));
1118
1119 assert_eq!(status.phase, SchemaRegistryPhase::Provisioning);
1120 assert_eq!(status.ready_replicas, 0);
1121 }
1122
1123 #[test]
1124 fn test_spec_get_image_default() {
1125 let registry = create_test_registry();
1126 assert_eq!(
1127 registry.spec.get_image(),
1128 "ghcr.io/hupe1980/rivven-schema:0.0.1"
1129 );
1130 }
1131
1132 #[test]
1133 fn test_spec_get_image_custom() {
1134 let mut registry = create_test_registry();
1135 registry.spec.image = Some("custom/registry:latest".to_string());
1136 assert_eq!(registry.spec.get_image(), "custom/registry:latest");
1137 }
1138
1139 #[test]
1140 fn test_spec_get_labels() {
1141 let registry = create_test_registry();
1142 let labels = registry.spec.get_labels("test-registry");
1143
1144 assert_eq!(
1145 labels.get("app.kubernetes.io/name"),
1146 Some(&"rivven-schema-registry".to_string())
1147 );
1148 assert_eq!(
1149 labels.get("app.kubernetes.io/instance"),
1150 Some(&"test-registry".to_string())
1151 );
1152 assert_eq!(
1153 labels.get("app.kubernetes.io/component"),
1154 Some(&"schema-registry".to_string())
1155 );
1156 }
1157
1158 #[test]
1159 fn test_spec_get_selector_labels() {
1160 let registry = create_test_registry();
1161 let labels = registry.spec.get_selector_labels("test-registry");
1162
1163 assert_eq!(labels.len(), 2);
1164 assert_eq!(
1165 labels.get("app.kubernetes.io/name"),
1166 Some(&"rivven-schema-registry".to_string())
1167 );
1168 assert_eq!(
1169 labels.get("app.kubernetes.io/instance"),
1170 Some(&"test-registry".to_string())
1171 );
1172 }
1173
1174 #[test]
1175 fn test_schema_registry_phase_default() {
1176 let phase = SchemaRegistryPhase::default();
1177 assert_eq!(phase, SchemaRegistryPhase::Pending);
1178 }
1179
1180 #[test]
1181 fn test_server_spec_default() {
1182 let spec = SchemaRegistryServerSpec::default();
1183 assert_eq!(spec.port, 8081);
1184 assert_eq!(spec.bind_address, "0.0.0.0");
1185 assert_eq!(spec.timeout_seconds, 30);
1186 }
1187
1188 #[test]
1189 fn test_storage_spec_default() {
1190 let spec = SchemaRegistryStorageSpec::default();
1191 assert_eq!(spec.mode, "broker");
1192 assert_eq!(spec.topic, "_schemas");
1193 assert_eq!(spec.replication_factor, 3);
1194 }
1195
1196 #[test]
1197 fn test_compatibility_spec_default() {
1198 let spec = SchemaCompatibilitySpec::default();
1199 assert_eq!(spec.default_level, "BACKWARD");
1200 assert!(spec.allow_overrides);
1201 }
1202
1203 #[test]
1204 fn test_schema_format_spec_default() {
1205 let spec = SchemaFormatSpec::default();
1206 assert!(spec.avro);
1207 assert!(spec.json_schema);
1208 assert!(spec.protobuf);
1209 }
1210
1211 #[test]
1212 fn test_metrics_spec_default() {
1213 let spec = SchemaRegistryMetricsSpec::default();
1214 assert!(spec.enabled);
1215 assert_eq!(spec.port, 9090);
1216 assert_eq!(spec.path, "/metrics");
1217 }
1218
1219 #[test]
1220 fn test_build_registry_config_yaml() {
1221 let registry = create_test_registry();
1222 let config = build_registry_config_yaml(®istry.spec).unwrap();
1223
1224 assert!(config.contains("port: 8081"));
1225 assert!(config.contains("mode: \"broker\""));
1226 assert!(config.contains("default_level: \"BACKWARD\""));
1227 assert!(config.contains("avro: true"));
1228 }
1229
1230 #[test]
1231 fn test_controller_metrics_new() {
1232 let _metrics = SchemaRegistryControllerMetrics::new();
1233 }
1235}