Skip to main content

rivven_operator/
schema_registry_controller.rs

1//! RivvenSchemaRegistry Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenSchemaRegistry
4//! custom resources. It watches for changes and reconciles Schema Registry deployments.
5
6use crate::crd::{
7    ClusterReference, RivvenSchemaRegistry, RivvenSchemaRegistrySpec, RivvenSchemaRegistryStatus,
8    SchemaRegistryCondition, SchemaRegistryPhase,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
14use k8s_openapi::api::core::v1::{
15    ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
16    PodSpec, PodTemplateSpec, Probe, Service, ServicePort, ServiceSpec, VolumeMount,
17};
18use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
19use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
20use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
21use kube::api::{Api, Patch, PatchParams};
22use kube::runtime::controller::{Action, Controller};
23use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
24use kube::runtime::watcher::Config;
25use kube::{Client, ResourceExt};
26use std::collections::BTreeMap;
27use std::sync::Arc;
28use std::time::Duration;
29use tracing::{debug, error, info, instrument, warn};
30use validator::Validate;
31
/// Finalizer name for cleanup operations.
///
/// Passed to the `finalizer` helper in `reconcile_schema_registry`.
pub const SCHEMA_REGISTRY_FINALIZER: &str = "rivven.hupe1980.github.io/schema-registry-finalizer";

/// Requeue interval, in seconds, returned by `apply_schema_registry` after a
/// successful reconciliation.
const DEFAULT_REQUEUE_SECONDS: u64 = 60; // 1 minute

/// Requeue interval, in seconds, returned by `schema_registry_error_policy`
/// after a failed reconciliation.
const ERROR_REQUEUE_SECONDS: u64 = 30;
40
/// Context passed to the schema registry controller.
///
/// Wrapped in an `Arc` by `run_schema_registry_controller` and handed to every
/// reconcile and error-policy invocation.
pub struct SchemaRegistryControllerContext {
    /// Kubernetes client used for the API calls made during reconciliation
    pub client: Client,
    /// Metrics recorder; when `None`, reconciliation records no metrics
    pub metrics: Option<SchemaRegistryControllerMetrics>,
}
48
/// Metrics for the schema registry controller.
///
/// All three handles are updated by `reconcile_schema_registry`.
#[derive(Clone)]
pub struct SchemaRegistryControllerMetrics {
    /// Counter for reconciliation attempts (incremented at the start of each reconcile)
    pub reconciliations: metrics::Counter,
    /// Counter for reconciliation errors (incremented when a reconcile returns an error)
    pub errors: metrics::Counter,
    /// Histogram for reconciliation duration, recorded in seconds
    pub duration: metrics::Histogram,
}
59
60impl SchemaRegistryControllerMetrics {
61    /// Create new schema registry controller metrics
62    pub fn new() -> Self {
63        Self {
64            reconciliations: metrics::counter!("rivven_schema_registry_reconciliations_total"),
65            errors: metrics::counter!("rivven_schema_registry_reconciliation_errors_total"),
66            duration: metrics::histogram!("rivven_schema_registry_reconciliation_duration_seconds"),
67        }
68    }
69}
70
/// `Default` delegates to `SchemaRegistryControllerMetrics::new`.
impl Default for SchemaRegistryControllerMetrics {
    /// Equivalent to calling `SchemaRegistryControllerMetrics::new()`.
    fn default() -> Self {
        Self::new()
    }
}
76
77/// Start the RivvenSchemaRegistry controller
78pub async fn run_schema_registry_controller(
79    client: Client,
80    namespace: Option<String>,
81) -> Result<()> {
82    let registries: Api<RivvenSchemaRegistry> = match &namespace {
83        Some(ns) => Api::namespaced(client.clone(), ns),
84        None => Api::all(client.clone()),
85    };
86
87    let ctx = Arc::new(SchemaRegistryControllerContext {
88        client: client.clone(),
89        metrics: Some(SchemaRegistryControllerMetrics::new()),
90    });
91
92    info!(
93        namespace = namespace.as_deref().unwrap_or("all"),
94        "Starting RivvenSchemaRegistry controller"
95    );
96
97    // Watch related resources for changes
98    let deployments = match &namespace {
99        Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
100        None => Api::<Deployment>::all(client.clone()),
101    };
102
103    let configmaps = match &namespace {
104        Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
105        None => Api::<ConfigMap>::all(client.clone()),
106    };
107
108    Controller::new(registries.clone(), Config::default())
109        .owns(deployments, Config::default())
110        .owns(configmaps, Config::default())
111        .run(reconcile_schema_registry, schema_registry_error_policy, ctx)
112        .for_each(|result| async move {
113            match result {
114                Ok((obj, action)) => {
115                    debug!(
116                        name = obj.name,
117                        namespace = obj.namespace,
118                        ?action,
119                        "Schema registry reconciliation completed"
120                    );
121                }
122                Err(e) => {
123                    error!(error = %e, "Schema registry reconciliation failed");
124                }
125            }
126        })
127        .await;
128
129    Ok(())
130}
131
132/// Main reconciliation function for RivvenSchemaRegistry
133#[instrument(skip(registry, ctx), fields(name = %registry.name_any(), namespace = registry.namespace()))]
134async fn reconcile_schema_registry(
135    registry: Arc<RivvenSchemaRegistry>,
136    ctx: Arc<SchemaRegistryControllerContext>,
137) -> Result<Action> {
138    let start = std::time::Instant::now();
139
140    if let Some(ref metrics) = ctx.metrics {
141        metrics.reconciliations.increment(1);
142    }
143
144    let namespace = registry
145        .namespace()
146        .unwrap_or_else(|| "default".to_string());
147    let registries: Api<RivvenSchemaRegistry> = Api::namespaced(ctx.client.clone(), &namespace);
148
149    let result = finalizer(
150        &registries,
151        SCHEMA_REGISTRY_FINALIZER,
152        registry,
153        |event| async {
154            match event {
155                FinalizerEvent::Apply(registry) => {
156                    apply_schema_registry(registry, ctx.clone()).await
157                }
158                FinalizerEvent::Cleanup(registry) => {
159                    cleanup_schema_registry(registry, ctx.clone()).await
160                }
161            }
162        },
163    )
164    .await;
165
166    if let Some(ref metrics) = ctx.metrics {
167        metrics.duration.record(start.elapsed().as_secs_f64());
168    }
169
170    result.map_err(|e| {
171        if let Some(ref metrics) = ctx.metrics {
172            metrics.errors.increment(1);
173        }
174        OperatorError::ReconcileFailed(e.to_string())
175    })
176}
177
178/// Apply (create/update) the schema registry resources
179#[instrument(skip(registry, ctx))]
180async fn apply_schema_registry(
181    registry: Arc<RivvenSchemaRegistry>,
182    ctx: Arc<SchemaRegistryControllerContext>,
183) -> Result<Action> {
184    let name = registry.name_any();
185    let namespace = registry
186        .namespace()
187        .unwrap_or_else(|| "default".to_string());
188
189    info!(name = %name, namespace = %namespace, "Reconciling RivvenSchemaRegistry");
190
191    // Validate the registry spec
192    if let Err(errors) = registry.spec.validate() {
193        let error_messages: Vec<String> = errors
194            .field_errors()
195            .iter()
196            .flat_map(|(field, errs)| {
197                errs.iter()
198                    .map(move |e| format!("{}: {:?}", field, e.message))
199            })
200            .collect();
201        let error_msg = error_messages.join("; ");
202        warn!(name = %name, errors = %error_msg, "Schema registry spec validation failed");
203
204        // Update status to failed
205        update_schema_registry_status(
206            &ctx.client,
207            &namespace,
208            &name,
209            build_failed_status(&registry, &error_msg),
210        )
211        .await?;
212
213        return Err(OperatorError::InvalidConfig(error_msg));
214    }
215
216    // Verify cluster reference exists
217    verify_cluster_ref(&ctx.client, &namespace, &registry.spec.cluster_ref).await?;
218
219    // Build and apply ConfigMap with registry configuration
220    let configmap = build_registry_configmap(&registry)?;
221    apply_registry_configmap(&ctx.client, &namespace, configmap).await?;
222
223    // Build and apply Deployment
224    let deployment = build_registry_deployment(&registry)?;
225    let deployment_status = apply_registry_deployment(&ctx.client, &namespace, deployment).await?;
226
227    // Build and apply Service
228    let service = build_registry_service(&registry)?;
229    apply_registry_service(&ctx.client, &namespace, service).await?;
230
231    // Apply PDB if enabled
232    if registry.spec.pod_disruption_budget.enabled {
233        let pdb = build_registry_pdb(&registry)?;
234        apply_registry_pdb(&ctx.client, &namespace, pdb).await?;
235    }
236
237    // Update status
238    let status = build_running_status(&registry, &deployment_status);
239    update_schema_registry_status(&ctx.client, &namespace, &name, status).await?;
240
241    Ok(Action::requeue(Duration::from_secs(
242        DEFAULT_REQUEUE_SECONDS,
243    )))
244}
245
246/// Cleanup the schema registry resources
247#[instrument(skip(registry, _ctx))]
248async fn cleanup_schema_registry(
249    registry: Arc<RivvenSchemaRegistry>,
250    _ctx: Arc<SchemaRegistryControllerContext>,
251) -> Result<Action> {
252    let name = registry.name_any();
253    let namespace = registry
254        .namespace()
255        .unwrap_or_else(|| "default".to_string());
256
257    info!(name = %name, namespace = %namespace, "Cleaning up RivvenSchemaRegistry");
258
259    // Resources with owner references will be garbage collected automatically
260    // Additional cleanup logic can be added here if needed
261
262    Ok(Action::await_change())
263}
264
265/// Error policy for the controller
266fn schema_registry_error_policy(
267    _registry: Arc<RivvenSchemaRegistry>,
268    error: &OperatorError,
269    _ctx: Arc<SchemaRegistryControllerContext>,
270) -> Action {
271    warn!(error = %error, "Schema registry reconciliation error, will retry");
272    Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS))
273}
274
275/// Verify that the cluster reference exists
276async fn verify_cluster_ref(
277    client: &Client,
278    namespace: &str,
279    cluster_ref: &ClusterReference,
280) -> Result<()> {
281    use crate::crd::RivvenCluster;
282
283    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
284    let clusters: Api<RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
285
286    clusters
287        .get(&cluster_ref.name)
288        .await
289        .map_err(|e| OperatorError::ClusterNotFound(format!("{}: {}", cluster_ref.name, e)))?;
290
291    Ok(())
292}
293
294/// Build the ConfigMap for the schema registry
295fn build_registry_configmap(registry: &RivvenSchemaRegistry) -> Result<ConfigMap> {
296    let name = registry.name_any();
297    let namespace = registry
298        .namespace()
299        .unwrap_or_else(|| "default".to_string());
300    let resource_name = format!("rivven-schema-registry-{}", name);
301
302    let spec = &registry.spec;
303    let labels = spec.get_labels(&name);
304
305    // Build configuration YAML
306    let config = build_registry_config_yaml(spec)?;
307
308    Ok(ConfigMap {
309        metadata: ObjectMeta {
310            name: Some(format!("{}-config", resource_name)),
311            namespace: Some(namespace),
312            labels: Some(labels),
313            owner_references: Some(vec![owner_reference(registry)]),
314            ..Default::default()
315        },
316        data: Some(BTreeMap::from([("config.yaml".to_string(), config)])),
317        ..Default::default()
318    })
319}
320
321/// Build the registry configuration YAML
322fn build_registry_config_yaml(spec: &RivvenSchemaRegistrySpec) -> Result<String> {
323    let mut config = String::new();
324
325    // Server configuration
326    config.push_str(&format!(
327        "server:\n  port: {}\n  bind_address: \"{}\"\n  timeout_seconds: {}\n  max_request_size: {}\n",
328        spec.server.port,
329        spec.server.bind_address,
330        spec.server.timeout_seconds,
331        spec.server.max_request_size,
332    ));
333
334    // Storage configuration
335    config.push_str(&format!(
336        "\nstorage:\n  mode: \"{}\"\n  topic: \"{}\"\n  replication_factor: {}\n  partitions: {}\n  normalize: {}\n",
337        spec.storage.mode,
338        spec.storage.topic,
339        spec.storage.replication_factor,
340        spec.storage.partitions,
341        spec.storage.normalize,
342    ));
343
344    // Compatibility configuration
345    config.push_str(&format!(
346        "\ncompatibility:\n  default_level: \"{}\"\n  allow_overrides: {}\n",
347        spec.compatibility.default_level, spec.compatibility.allow_overrides,
348    ));
349
350    // Schema format configuration
351    config.push_str(&format!(
352        "\nschemas:\n  avro: {}\n  json_schema: {}\n  protobuf: {}\n  strict_validation: {}\n",
353        spec.schemas.avro,
354        spec.schemas.json_schema,
355        spec.schemas.protobuf,
356        spec.schemas.strict_validation,
357    ));
358
359    // Contexts configuration
360    if spec.contexts.enabled {
361        config.push_str(&format!(
362            "\ncontexts:\n  enabled: true\n  max_contexts: {}\n",
363            spec.contexts.max_contexts,
364        ));
365    }
366
367    // Metrics configuration
368    if spec.metrics.enabled {
369        config.push_str(&format!(
370            "\nmetrics:\n  enabled: true\n  port: {}\n  path: \"{}\"\n",
371            spec.metrics.port, spec.metrics.path,
372        ));
373    }
374
375    Ok(config)
376}
377
378/// Build the Deployment for the schema registry
379fn build_registry_deployment(registry: &RivvenSchemaRegistry) -> Result<Deployment> {
380    let name = registry.name_any();
381    let namespace = registry
382        .namespace()
383        .unwrap_or_else(|| "default".to_string());
384    let resource_name = format!("rivven-schema-registry-{}", name);
385
386    let spec = &registry.spec;
387    let labels = spec.get_labels(&name);
388    let selector_labels = spec.get_selector_labels(&name);
389
390    // Build container
391    let container = build_registry_container(spec, &resource_name);
392
393    // Build pod labels
394    let mut pod_labels = selector_labels.clone();
395    pod_labels.extend(spec.pod_labels.clone());
396
397    // Build pod annotations
398    let mut pod_annotations = BTreeMap::new();
399    if spec.metrics.enabled {
400        pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
401        pod_annotations.insert(
402            "prometheus.io/port".to_string(),
403            spec.metrics.port.to_string(),
404        );
405        pod_annotations.insert("prometheus.io/path".to_string(), spec.metrics.path.clone());
406    }
407    pod_annotations.extend(spec.pod_annotations.clone());
408
409    let pod_spec = PodSpec {
410        containers: vec![container],
411        node_selector: if spec.node_selector.is_empty() {
412            None
413        } else {
414            Some(spec.node_selector.clone())
415        },
416        tolerations: if spec.tolerations.is_empty() {
417            None
418        } else {
419            Some(spec.tolerations.clone())
420        },
421        service_account_name: spec.service_account.clone(),
422        image_pull_secrets: if spec.image_pull_secrets.is_empty() {
423            None
424        } else {
425            Some(
426                spec.image_pull_secrets
427                    .iter()
428                    .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
429                    .collect(),
430            )
431        },
432        // Disable service account token auto-mounting for security
433        automount_service_account_token: Some(false),
434        ..Default::default()
435    };
436
437    Ok(Deployment {
438        metadata: ObjectMeta {
439            name: Some(resource_name.clone()),
440            namespace: Some(namespace),
441            labels: Some(labels.clone()),
442            owner_references: Some(vec![owner_reference(registry)]),
443            ..Default::default()
444        },
445        spec: Some(DeploymentSpec {
446            replicas: Some(spec.replicas),
447            selector: LabelSelector {
448                match_labels: Some(selector_labels),
449                ..Default::default()
450            },
451            template: PodTemplateSpec {
452                metadata: Some(ObjectMeta {
453                    labels: Some(pod_labels),
454                    annotations: Some(pod_annotations),
455                    ..Default::default()
456                }),
457                spec: Some(pod_spec),
458            },
459            ..Default::default()
460        }),
461        ..Default::default()
462    })
463}
464
465/// Build the container for the schema registry
466fn build_registry_container(spec: &RivvenSchemaRegistrySpec, _resource_name: &str) -> Container {
467    // Build environment variables
468    let mut env = vec![
469        EnvVar {
470            name: "RIVVEN_SCHEMA_PORT".to_string(),
471            value: Some(spec.server.port.to_string()),
472            ..Default::default()
473        },
474        EnvVar {
475            name: "RIVVEN_SCHEMA_BIND_ADDRESS".to_string(),
476            value: Some(spec.server.bind_address.clone()),
477            ..Default::default()
478        },
479        EnvVar {
480            name: "RIVVEN_SCHEMA_STORAGE_MODE".to_string(),
481            value: Some(spec.storage.mode.clone()),
482            ..Default::default()
483        },
484        EnvVar {
485            name: "RIVVEN_SCHEMA_STORAGE_TOPIC".to_string(),
486            value: Some(spec.storage.topic.clone()),
487            ..Default::default()
488        },
489        EnvVar {
490            name: "RIVVEN_SCHEMA_COMPATIBILITY".to_string(),
491            value: Some(spec.compatibility.default_level.clone()),
492            ..Default::default()
493        },
494        // Broker connection (from cluster reference)
495        EnvVar {
496            name: "RIVVEN_BROKERS".to_string(),
497            value: Some(format!(
498                "rivven-{}-headless.{}.svc.cluster.local:9092",
499                spec.cluster_ref.name,
500                spec.cluster_ref.namespace.as_deref().unwrap_or("default")
501            )),
502            ..Default::default()
503        },
504        // Pod info
505        EnvVar {
506            name: "POD_NAME".to_string(),
507            value_from: Some(EnvVarSource {
508                field_ref: Some(ObjectFieldSelector {
509                    field_path: "metadata.name".to_string(),
510                    ..Default::default()
511                }),
512                ..Default::default()
513            }),
514            ..Default::default()
515        },
516        EnvVar {
517            name: "POD_NAMESPACE".to_string(),
518            value_from: Some(EnvVarSource {
519                field_ref: Some(ObjectFieldSelector {
520                    field_path: "metadata.namespace".to_string(),
521                    ..Default::default()
522                }),
523                ..Default::default()
524            }),
525            ..Default::default()
526        },
527    ];
528
529    // Add custom env vars
530    for e in &spec.env {
531        if !env.iter().any(|existing| existing.name == e.name) {
532            env.push(e.clone());
533        }
534    }
535
536    // Build probes
537    let liveness_probe = if spec.liveness_probe.enabled {
538        Some(Probe {
539            http_get: Some(HTTPGetAction {
540                path: Some("/health/live".to_string()),
541                port: IntOrString::Int(spec.server.port),
542                ..Default::default()
543            }),
544            initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
545            period_seconds: Some(spec.liveness_probe.period_seconds),
546            timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
547            success_threshold: Some(spec.liveness_probe.success_threshold),
548            failure_threshold: Some(spec.liveness_probe.failure_threshold),
549            ..Default::default()
550        })
551    } else {
552        None
553    };
554
555    let readiness_probe = if spec.readiness_probe.enabled {
556        Some(Probe {
557            http_get: Some(HTTPGetAction {
558                path: Some("/health/ready".to_string()),
559                port: IntOrString::Int(spec.server.port),
560                ..Default::default()
561            }),
562            initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
563            period_seconds: Some(spec.readiness_probe.period_seconds),
564            timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
565            success_threshold: Some(spec.readiness_probe.success_threshold),
566            failure_threshold: Some(spec.readiness_probe.failure_threshold),
567            ..Default::default()
568        })
569    } else {
570        None
571    };
572
573    // Container ports
574    let mut ports = vec![ContainerPort {
575        name: Some("http".to_string()),
576        container_port: spec.server.port,
577        protocol: Some("TCP".to_string()),
578        ..Default::default()
579    }];
580
581    if spec.metrics.enabled && spec.metrics.port != spec.server.port {
582        ports.push(ContainerPort {
583            name: Some("metrics".to_string()),
584            container_port: spec.metrics.port,
585            protocol: Some("TCP".to_string()),
586            ..Default::default()
587        });
588    }
589
590    Container {
591        name: "rivven-schema-registry".to_string(),
592        image: Some(spec.get_image()),
593        image_pull_policy: Some(spec.image_pull_policy.clone()),
594        command: Some(vec!["rivven-schema".to_string()]),
595        args: Some(vec![
596            "serve".to_string(),
597            "--config".to_string(),
598            "/etc/rivven-schema/config.yaml".to_string(),
599        ]),
600        env: Some(env),
601        ports: Some(ports),
602        liveness_probe,
603        readiness_probe,
604        volume_mounts: Some(vec![VolumeMount {
605            name: "config".to_string(),
606            mount_path: "/etc/rivven-schema".to_string(),
607            read_only: Some(true),
608            ..Default::default()
609        }]),
610        // Secure defaults
611        security_context: Some(k8s_openapi::api::core::v1::SecurityContext {
612            run_as_non_root: Some(true),
613            run_as_user: Some(1000),
614            run_as_group: Some(1000),
615            read_only_root_filesystem: Some(true),
616            allow_privilege_escalation: Some(false),
617            capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
618                drop: Some(vec!["ALL".to_string()]),
619                ..Default::default()
620            }),
621            ..Default::default()
622        }),
623        ..Default::default()
624    }
625}
626
627/// Build the Service for the schema registry
628fn build_registry_service(registry: &RivvenSchemaRegistry) -> Result<Service> {
629    let name = registry.name_any();
630    let namespace = registry
631        .namespace()
632        .unwrap_or_else(|| "default".to_string());
633    let resource_name = format!("rivven-schema-registry-{}", name);
634
635    let spec = &registry.spec;
636    let labels = spec.get_labels(&name);
637    let selector_labels = spec.get_selector_labels(&name);
638
639    let mut ports = vec![ServicePort {
640        name: Some("http".to_string()),
641        port: spec.server.port,
642        target_port: Some(IntOrString::Int(spec.server.port)),
643        protocol: Some("TCP".to_string()),
644        ..Default::default()
645    }];
646
647    if spec.metrics.enabled && spec.metrics.port != spec.server.port {
648        ports.push(ServicePort {
649            name: Some("metrics".to_string()),
650            port: spec.metrics.port,
651            target_port: Some(IntOrString::Int(spec.metrics.port)),
652            protocol: Some("TCP".to_string()),
653            ..Default::default()
654        });
655    }
656
657    Ok(Service {
658        metadata: ObjectMeta {
659            name: Some(resource_name),
660            namespace: Some(namespace),
661            labels: Some(labels),
662            owner_references: Some(vec![owner_reference(registry)]),
663            ..Default::default()
664        },
665        spec: Some(ServiceSpec {
666            selector: Some(selector_labels),
667            ports: Some(ports),
668            type_: Some("ClusterIP".to_string()),
669            ..Default::default()
670        }),
671        ..Default::default()
672    })
673}
674
675/// Build the PodDisruptionBudget for the schema registry
676fn build_registry_pdb(registry: &RivvenSchemaRegistry) -> Result<PodDisruptionBudget> {
677    let name = registry.name_any();
678    let namespace = registry
679        .namespace()
680        .unwrap_or_else(|| "default".to_string());
681    let resource_name = format!("rivven-schema-registry-{}", name);
682
683    let spec = &registry.spec;
684    let labels = spec.get_labels(&name);
685    let selector_labels = spec.get_selector_labels(&name);
686
687    let pdb_spec = &spec.pod_disruption_budget;
688
689    Ok(PodDisruptionBudget {
690        metadata: ObjectMeta {
691            name: Some(resource_name),
692            namespace: Some(namespace),
693            labels: Some(labels),
694            owner_references: Some(vec![owner_reference(registry)]),
695            ..Default::default()
696        },
697        spec: Some(PodDisruptionBudgetSpec {
698            selector: Some(LabelSelector {
699                match_labels: Some(selector_labels),
700                ..Default::default()
701            }),
702            min_available: pdb_spec
703                .min_available
704                .as_ref()
705                .map(|v| IntOrString::String(v.clone())),
706            max_unavailable: pdb_spec
707                .max_unavailable
708                .as_ref()
709                .map(|v| IntOrString::String(v.clone())),
710            ..Default::default()
711        }),
712        ..Default::default()
713    })
714}
715
716/// Create owner reference for managed resources
717fn owner_reference(registry: &RivvenSchemaRegistry) -> OwnerReference {
718    OwnerReference {
719        api_version: "rivven.hupe1980.github.io/v1alpha1".to_string(),
720        kind: "RivvenSchemaRegistry".to_string(),
721        name: registry.name_any(),
722        uid: registry.metadata.uid.clone().unwrap_or_default(),
723        controller: Some(true),
724        block_owner_deletion: Some(true),
725    }
726}
727
728/// Apply ConfigMap to cluster
729async fn apply_registry_configmap(
730    client: &Client,
731    namespace: &str,
732    configmap: ConfigMap,
733) -> Result<()> {
734    let configmaps: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
735    let name = configmap.metadata.name.clone().unwrap_or_default();
736
737    let patch = Patch::Apply(&configmap);
738    let params = PatchParams::apply("rivven-operator").force();
739
740    configmaps
741        .patch(&name, &params, &patch)
742        .await
743        .map_err(|e| OperatorError::ReconcileFailed(format!("ConfigMap: {}", e)))?;
744
745    debug!(name = %name, "Applied schema registry ConfigMap");
746    Ok(())
747}
748
749/// Apply Deployment to cluster
750async fn apply_registry_deployment(
751    client: &Client,
752    namespace: &str,
753    deployment: Deployment,
754) -> Result<(i32, i32)> {
755    let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);
756    let name = deployment.metadata.name.clone().unwrap_or_default();
757
758    let patch = Patch::Apply(&deployment);
759    let params = PatchParams::apply("rivven-operator").force();
760
761    let result = deployments
762        .patch(&name, &params, &patch)
763        .await
764        .map_err(|e| OperatorError::ReconcileFailed(format!("Deployment: {}", e)))?;
765
766    let status = result.status.as_ref();
767    let replicas = status.and_then(|s| s.replicas).unwrap_or(0);
768    let ready_replicas = status.and_then(|s| s.ready_replicas).unwrap_or(0);
769
770    debug!(name = %name, replicas = replicas, ready = ready_replicas, "Applied schema registry Deployment");
771    Ok((replicas, ready_replicas))
772}
773
774/// Apply Service to cluster
775async fn apply_registry_service(client: &Client, namespace: &str, service: Service) -> Result<()> {
776    let services: Api<Service> = Api::namespaced(client.clone(), namespace);
777    let name = service.metadata.name.clone().unwrap_or_default();
778
779    let patch = Patch::Apply(&service);
780    let params = PatchParams::apply("rivven-operator").force();
781
782    services
783        .patch(&name, &params, &patch)
784        .await
785        .map_err(|e| OperatorError::ReconcileFailed(format!("Service: {}", e)))?;
786
787    debug!(name = %name, "Applied schema registry Service");
788    Ok(())
789}
790
791/// Apply PodDisruptionBudget to cluster
792async fn apply_registry_pdb(
793    client: &Client,
794    namespace: &str,
795    pdb: PodDisruptionBudget,
796) -> Result<()> {
797    let pdbs: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
798    let name = pdb.metadata.name.clone().unwrap_or_default();
799
800    let patch = Patch::Apply(&pdb);
801    let params = PatchParams::apply("rivven-operator").force();
802
803    pdbs.patch(&name, &params, &patch)
804        .await
805        .map_err(|e| OperatorError::ReconcileFailed(format!("PodDisruptionBudget: {}", e)))?;
806
807    debug!(name = %name, "Applied schema registry PodDisruptionBudget");
808    Ok(())
809}
810
811/// Update the schema registry status
812async fn update_schema_registry_status(
813    client: &Client,
814    namespace: &str,
815    name: &str,
816    status: RivvenSchemaRegistryStatus,
817) -> Result<()> {
818    let registries: Api<RivvenSchemaRegistry> = Api::namespaced(client.clone(), namespace);
819
820    let patch = serde_json::json!({
821        "status": status
822    });
823
824    registries
825        .patch_status(
826            name,
827            &PatchParams::apply("rivven-operator"),
828            &Patch::Merge(&patch),
829        )
830        .await
831        .map_err(|e| OperatorError::ReconcileFailed(format!("Status update: {}", e)))?;
832
833    debug!(name = %name, phase = ?status.phase, "Updated schema registry status");
834    Ok(())
835}
836
837/// Build status for a failed registry
838fn build_failed_status(registry: &RivvenSchemaRegistry, error: &str) -> RivvenSchemaRegistryStatus {
839    RivvenSchemaRegistryStatus {
840        phase: SchemaRegistryPhase::Failed,
841        replicas: 0,
842        ready_replicas: 0,
843        schemas_registered: 0,
844        subjects_count: 0,
845        contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
846        observed_generation: registry.metadata.generation.unwrap_or(0),
847        conditions: vec![SchemaRegistryCondition {
848            condition_type: "Ready".to_string(),
849            status: "False".to_string(),
850            reason: Some("ValidationFailed".to_string()),
851            message: Some(error.to_string()),
852            last_transition_time: Some(Utc::now().to_rfc3339()),
853        }],
854        endpoints: vec![],
855        storage_status: None,
856        external_sync_status: None,
857        last_sync_time: None,
858        last_updated: Some(Utc::now().to_rfc3339()),
859        message: Some(error.to_string()),
860    }
861}
862
863/// Build status for a running registry
864fn build_running_status(
865    registry: &RivvenSchemaRegistry,
866    deployment_status: &(i32, i32),
867) -> RivvenSchemaRegistryStatus {
868    let (replicas, ready_replicas) = *deployment_status;
869    let name = registry.name_any();
870    let namespace = registry
871        .namespace()
872        .unwrap_or_else(|| "default".to_string());
873
874    let phase = if ready_replicas == replicas && replicas > 0 {
875        SchemaRegistryPhase::Running
876    } else if ready_replicas > 0 {
877        SchemaRegistryPhase::Degraded
878    } else {
879        SchemaRegistryPhase::Provisioning
880    };
881
882    let endpoint = format!(
883        "http://rivven-schema-registry-{}.{}.svc.cluster.local:{}",
884        name, namespace, registry.spec.server.port
885    );
886
887    RivvenSchemaRegistryStatus {
888        phase,
889        replicas,
890        ready_replicas,
891        schemas_registered: 0, // Updated by registry itself
892        subjects_count: 0,     // Updated by registry itself
893        contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
894        observed_generation: registry.metadata.generation.unwrap_or(0),
895        conditions: vec![
896            SchemaRegistryCondition {
897                condition_type: "Ready".to_string(),
898                status: if ready_replicas == replicas && replicas > 0 {
899                    "True"
900                } else {
901                    "False"
902                }
903                .to_string(),
904                reason: Some(
905                    if ready_replicas == replicas && replicas > 0 {
906                        "AllReplicasReady"
907                    } else {
908                        "ReplicasNotReady"
909                    }
910                    .to_string(),
911                ),
912                message: Some(format!("{}/{} replicas ready", ready_replicas, replicas)),
913                last_transition_time: Some(Utc::now().to_rfc3339()),
914            },
915            SchemaRegistryCondition {
916                condition_type: "StorageHealthy".to_string(),
917                status: "Unknown".to_string(),
918                reason: Some("Pending".to_string()),
919                message: Some("Storage health check pending".to_string()),
920                last_transition_time: Some(Utc::now().to_rfc3339()),
921            },
922        ],
923        endpoints: vec![endpoint],
924        storage_status: Some(registry.spec.storage.mode.clone()),
925        external_sync_status: if registry.spec.external.enabled {
926            Some("Pending".to_string())
927        } else {
928            None
929        },
930        last_sync_time: None,
931        last_updated: Some(Utc::now().to_rfc3339()),
932        message: None,
933    }
934}
935
/// Unit tests for the resource builders, status builders, and spec defaults.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        ClusterReference, ExternalRegistrySpec, PdbSpec, ProbeSpec, SchemaCompatibilitySpec,
        SchemaContextsSpec, SchemaFormatSpec, SchemaRegistryAuthSpec, SchemaRegistryMetricsSpec,
        SchemaRegistryServerSpec, SchemaRegistryStorageSpec, SchemaRegistryTlsSpec,
        SchemaValidationSpec,
    };

    /// A three-replica registry named `test-registry` in the `default`
    /// namespace, with every nested spec left at its default.
    fn create_test_registry() -> RivvenSchemaRegistry {
        RivvenSchemaRegistry {
            metadata: ObjectMeta {
                name: Some("test-registry".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenSchemaRegistrySpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                replicas: 3,
                version: "0.0.1".to_string(),
                image: None,
                image_pull_policy: "IfNotPresent".to_string(),
                image_pull_secrets: vec![],
                resources: None,
                server: SchemaRegistryServerSpec::default(),
                storage: SchemaRegistryStorageSpec::default(),
                compatibility: SchemaCompatibilitySpec::default(),
                schemas: SchemaFormatSpec::default(),
                contexts: SchemaContextsSpec::default(),
                validation: SchemaValidationSpec::default(),
                auth: SchemaRegistryAuthSpec::default(),
                tls: SchemaRegistryTlsSpec::default(),
                metrics: SchemaRegistryMetricsSpec::default(),
                external: ExternalRegistrySpec::default(),
                pod_annotations: BTreeMap::new(),
                pod_labels: BTreeMap::new(),
                env: vec![],
                node_selector: BTreeMap::new(),
                tolerations: vec![],
                affinity: None,
                service_account: None,
                security_context: None,
                container_security_context: None,
                liveness_probe: ProbeSpec::default(),
                readiness_probe: ProbeSpec::default(),
                pod_disruption_budget: PdbSpec::default(),
            },
            status: None,
        }
    }

    #[test]
    fn test_build_registry_configmap() {
        let registry = create_test_registry();
        let configmap = build_registry_configmap(&registry).unwrap();

        assert_eq!(
            configmap.metadata.name,
            Some("rivven-schema-registry-test-registry-config".to_string())
        );
        assert!(configmap.data.is_some());
        let data = configmap.data.unwrap();
        assert!(data.contains_key("config.yaml"));
    }

    #[test]
    fn test_build_registry_deployment() {
        let registry = create_test_registry();
        let deployment = build_registry_deployment(&registry).unwrap();

        assert_eq!(
            deployment.metadata.name,
            Some("rivven-schema-registry-test-registry".to_string())
        );
        assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(3));
    }

    #[test]
    fn test_build_registry_service() {
        let registry = create_test_registry();
        let service = build_registry_service(&registry).unwrap();

        assert_eq!(
            service.metadata.name,
            Some("rivven-schema-registry-test-registry".to_string())
        );

        let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
        assert_eq!(ports.len(), 2); // http + metrics
        assert_eq!(ports[0].port, 8081);
    }

    #[test]
    fn test_build_registry_pdb() {
        let registry = create_test_registry();
        let pdb = build_registry_pdb(&registry).unwrap();

        assert_eq!(
            pdb.metadata.name,
            Some("rivven-schema-registry-test-registry".to_string())
        );
    }

    #[test]
    fn test_build_failed_status() {
        let registry = create_test_registry();
        let status = build_failed_status(&registry, "Test error");

        assert_eq!(status.phase, SchemaRegistryPhase::Failed);
        assert_eq!(status.message, Some("Test error".to_string()));
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.observed_generation, 1);
        assert!(status.endpoints.is_empty());
        assert!(status.last_updated.is_some());

        assert_eq!(status.conditions.len(), 1);
        let ready = &status.conditions[0];
        assert_eq!(ready.condition_type, "Ready");
        assert_eq!(ready.status, "False");
        assert_eq!(ready.reason.as_deref(), Some("ValidationFailed"));
        assert_eq!(ready.message.as_deref(), Some("Test error"));
        assert!(ready.last_transition_time.is_some());
    }

    #[test]
    fn test_build_running_status() {
        let registry = create_test_registry();
        let status = build_running_status(&registry, &(3, 3));

        assert_eq!(status.phase, SchemaRegistryPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
        assert!(!status.endpoints.is_empty());
        assert_eq!(
            status.endpoints,
            vec![
                "http://rivven-schema-registry-test-registry.default.svc.cluster.local:8081"
                    .to_string()
            ]
        );
        assert!(status.message.is_none());

        assert_eq!(status.conditions.len(), 2);
        let ready = &status.conditions[0];
        assert_eq!(ready.condition_type, "Ready");
        assert_eq!(ready.status, "True");
        assert_eq!(ready.reason.as_deref(), Some("AllReplicasReady"));
        assert_eq!(ready.message.as_deref(), Some("3/3 replicas ready"));

        let storage = &status.conditions[1];
        assert_eq!(storage.condition_type, "StorageHealthy");
        assert_eq!(storage.status, "Unknown");
    }

    #[test]
    fn test_build_running_status_degraded() {
        let registry = create_test_registry();
        let status = build_running_status(&registry, &(3, 2));

        assert_eq!(status.phase, SchemaRegistryPhase::Degraded);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 2);
        assert_eq!(status.conditions[0].status, "False");
        assert_eq!(
            status.conditions[0].reason.as_deref(),
            Some("ReplicasNotReady")
        );
        assert_eq!(
            status.conditions[0].message.as_deref(),
            Some("2/3 replicas ready")
        );
    }

    #[test]
    fn test_build_running_status_provisioning() {
        let registry = create_test_registry();
        let status = build_running_status(&registry, &(3, 0));

        assert_eq!(status.phase, SchemaRegistryPhase::Provisioning);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.conditions[0].status, "False");
    }

    #[test]
    fn test_build_running_status_zero_replicas() {
        // 0/0 satisfies `ready == replicas` but must not be reported as Running.
        let registry = create_test_registry();
        let status = build_running_status(&registry, &(0, 0));

        assert_eq!(status.phase, SchemaRegistryPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.conditions[0].status, "False");
        assert_eq!(
            status.conditions[0].reason.as_deref(),
            Some("ReplicasNotReady")
        );
    }

    #[test]
    fn test_spec_get_image_default() {
        let registry = create_test_registry();
        assert_eq!(
            registry.spec.get_image(),
            "ghcr.io/hupe1980/rivven-schema:0.0.1"
        );
    }

    #[test]
    fn test_spec_get_image_custom() {
        let mut registry = create_test_registry();
        registry.spec.image = Some("custom/registry:latest".to_string());
        assert_eq!(registry.spec.get_image(), "custom/registry:latest");
    }

    #[test]
    fn test_spec_get_labels() {
        let registry = create_test_registry();
        let labels = registry.spec.get_labels("test-registry");

        assert_eq!(
            labels.get("app.kubernetes.io/name"),
            Some(&"rivven-schema-registry".to_string())
        );
        assert_eq!(
            labels.get("app.kubernetes.io/instance"),
            Some(&"test-registry".to_string())
        );
        assert_eq!(
            labels.get("app.kubernetes.io/component"),
            Some(&"schema-registry".to_string())
        );
    }

    #[test]
    fn test_spec_get_selector_labels() {
        let registry = create_test_registry();
        let labels = registry.spec.get_selector_labels("test-registry");

        assert_eq!(labels.len(), 2);
        assert_eq!(
            labels.get("app.kubernetes.io/name"),
            Some(&"rivven-schema-registry".to_string())
        );
        assert_eq!(
            labels.get("app.kubernetes.io/instance"),
            Some(&"test-registry".to_string())
        );
    }

    #[test]
    fn test_schema_registry_phase_default() {
        let phase = SchemaRegistryPhase::default();
        assert_eq!(phase, SchemaRegistryPhase::Pending);
    }

    #[test]
    fn test_server_spec_default() {
        let spec = SchemaRegistryServerSpec::default();
        assert_eq!(spec.port, 8081);
        assert_eq!(spec.bind_address, "0.0.0.0");
        assert_eq!(spec.timeout_seconds, 30);
    }

    #[test]
    fn test_storage_spec_default() {
        let spec = SchemaRegistryStorageSpec::default();
        assert_eq!(spec.mode, "broker");
        assert_eq!(spec.topic, "_schemas");
        assert_eq!(spec.replication_factor, 3);
    }

    #[test]
    fn test_compatibility_spec_default() {
        let spec = SchemaCompatibilitySpec::default();
        assert_eq!(spec.default_level, "BACKWARD");
        assert!(spec.allow_overrides);
    }

    #[test]
    fn test_schema_format_spec_default() {
        let spec = SchemaFormatSpec::default();
        assert!(spec.avro);
        assert!(spec.json_schema);
        assert!(spec.protobuf);
    }

    #[test]
    fn test_metrics_spec_default() {
        let spec = SchemaRegistryMetricsSpec::default();
        assert!(spec.enabled);
        assert_eq!(spec.port, 9090);
        assert_eq!(spec.path, "/metrics");
    }

    #[test]
    fn test_build_registry_config_yaml() {
        let registry = create_test_registry();
        let config = build_registry_config_yaml(&registry.spec).unwrap();

        assert!(config.contains("port: 8081"));
        assert!(config.contains("mode: \"broker\""));
        assert!(config.contains("default_level: \"BACKWARD\""));
        assert!(config.contains("avro: true"));
    }

    #[test]
    fn test_controller_metrics_new() {
        let _metrics = SchemaRegistryControllerMetrics::new();
        // Test passes if it doesn't panic during construction
    }
}