Skip to main content

rivven_operator/
schema_registry_controller.rs

1//! RivvenSchemaRegistry Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenSchemaRegistry
4//! custom resources. It watches for changes and reconciles Schema Registry deployments.
5
6use crate::crd::{
7    ClusterReference, RivvenSchemaRegistry, RivvenSchemaRegistrySpec, RivvenSchemaRegistryStatus,
8    SchemaRegistryCondition, SchemaRegistryPhase,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
14use k8s_openapi::api::core::v1::{
15    ConfigMap, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, ObjectFieldSelector,
16    PodSpec, PodTemplateSpec, Probe, Service, ServicePort, ServiceSpec, VolumeMount,
17};
18use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
19use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta, OwnerReference};
20use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
21use kube::api::{Api, Patch, PatchParams};
22use kube::runtime::controller::{Action, Controller};
23use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
24use kube::runtime::watcher::Config;
25use kube::{Client, Resource, ResourceExt};
26use std::collections::BTreeMap;
27use std::sync::Arc;
28use std::time::Duration;
29use tracing::{debug, error, info, instrument, warn};
30use validator::Validate;
31
/// Finalizer string handed to `kube::runtime::finalizer` for every
/// `RivvenSchemaRegistry` reconciled by this controller.
pub const SCHEMA_REGISTRY_FINALIZER: &str = "rivven.hupe1980.github.io/schema-registry-finalizer";

/// Seconds to wait before reconciling again after a successful apply (one minute).
const DEFAULT_REQUEUE_SECONDS: u64 = 60;

/// Seconds to wait before retrying after a reconciliation error.
const ERROR_REQUEUE_SECONDS: u64 = 30;
40
41/// Context passed to the schema registry controller
42pub struct SchemaRegistryControllerContext {
43    /// Kubernetes client
44    pub client: Client,
45    /// Metrics recorder
46    pub metrics: Option<SchemaRegistryControllerMetrics>,
47}
48
49/// Metrics for the schema registry controller
50#[derive(Clone)]
51pub struct SchemaRegistryControllerMetrics {
52    /// Counter for reconciliation attempts
53    pub reconciliations: metrics::Counter,
54    /// Counter for reconciliation errors
55    pub errors: metrics::Counter,
56    /// Histogram for reconciliation duration
57    pub duration: metrics::Histogram,
58}
59
60impl SchemaRegistryControllerMetrics {
61    /// Create new schema registry controller metrics
62    pub fn new() -> Self {
63        Self {
64            reconciliations: metrics::counter!("rivven_schema_registry_reconciliations_total"),
65            errors: metrics::counter!("rivven_schema_registry_reconciliation_errors_total"),
66            duration: metrics::histogram!("rivven_schema_registry_reconciliation_duration_seconds"),
67        }
68    }
69}
70
71impl Default for SchemaRegistryControllerMetrics {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77/// Start the RivvenSchemaRegistry controller
78pub async fn run_schema_registry_controller(
79    client: Client,
80    namespace: Option<String>,
81) -> Result<()> {
82    let registries: Api<RivvenSchemaRegistry> = match &namespace {
83        Some(ns) => Api::namespaced(client.clone(), ns),
84        None => Api::all(client.clone()),
85    };
86
87    let ctx = Arc::new(SchemaRegistryControllerContext {
88        client: client.clone(),
89        metrics: Some(SchemaRegistryControllerMetrics::new()),
90    });
91
92    info!(
93        namespace = namespace.as_deref().unwrap_or("all"),
94        "Starting RivvenSchemaRegistry controller"
95    );
96
97    // Watch related resources for changes
98    let deployments = match &namespace {
99        Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
100        None => Api::<Deployment>::all(client.clone()),
101    };
102
103    let configmaps = match &namespace {
104        Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
105        None => Api::<ConfigMap>::all(client.clone()),
106    };
107
108    Controller::new(registries.clone(), Config::default())
109        .owns(deployments, Config::default())
110        .owns(configmaps, Config::default())
111        .run(reconcile_schema_registry, schema_registry_error_policy, ctx)
112        .for_each(|result| async move {
113            match result {
114                Ok((obj, action)) => {
115                    debug!(
116                        name = obj.name,
117                        namespace = obj.namespace,
118                        ?action,
119                        "Schema registry reconciliation completed"
120                    );
121                }
122                Err(e) => {
123                    error!(error = %e, "Schema registry reconciliation failed");
124                }
125            }
126        })
127        .await;
128
129    Ok(())
130}
131
132/// Main reconciliation function for RivvenSchemaRegistry
133#[instrument(skip(registry, ctx), fields(name = %registry.name_any(), namespace = registry.namespace()))]
134async fn reconcile_schema_registry(
135    registry: Arc<RivvenSchemaRegistry>,
136    ctx: Arc<SchemaRegistryControllerContext>,
137) -> Result<Action> {
138    let start = std::time::Instant::now();
139
140    if let Some(ref metrics) = ctx.metrics {
141        metrics.reconciliations.increment(1);
142    }
143
144    let namespace = registry
145        .namespace()
146        .unwrap_or_else(|| "default".to_string());
147    let registries: Api<RivvenSchemaRegistry> = Api::namespaced(ctx.client.clone(), &namespace);
148
149    let result = finalizer(
150        &registries,
151        SCHEMA_REGISTRY_FINALIZER,
152        registry,
153        |event| async {
154            match event {
155                FinalizerEvent::Apply(registry) => {
156                    apply_schema_registry(registry, ctx.clone()).await
157                }
158                FinalizerEvent::Cleanup(registry) => {
159                    cleanup_schema_registry(registry, ctx.clone()).await
160                }
161            }
162        },
163    )
164    .await;
165
166    if let Some(ref metrics) = ctx.metrics {
167        metrics.duration.record(start.elapsed().as_secs_f64());
168    }
169
170    result.map_err(|e| {
171        if let Some(ref metrics) = ctx.metrics {
172            metrics.errors.increment(1);
173        }
174        OperatorError::ReconcileFailed(e.to_string())
175    })
176}
177
178/// Apply (create/update) the schema registry resources
179#[instrument(skip(registry, ctx))]
180async fn apply_schema_registry(
181    registry: Arc<RivvenSchemaRegistry>,
182    ctx: Arc<SchemaRegistryControllerContext>,
183) -> Result<Action> {
184    let name = registry.name_any();
185    let namespace = registry
186        .namespace()
187        .unwrap_or_else(|| "default".to_string());
188
189    info!(name = %name, namespace = %namespace, "Reconciling RivvenSchemaRegistry");
190
191    // Validate the registry spec
192    if let Err(errors) = registry.spec.validate() {
193        let error_messages: Vec<String> = errors
194            .field_errors()
195            .iter()
196            .flat_map(|(field, errs)| {
197                errs.iter()
198                    .map(move |e| format!("{}: {:?}", field, e.message))
199            })
200            .collect();
201        let error_msg = error_messages.join("; ");
202        warn!(name = %name, errors = %error_msg, "Schema registry spec validation failed");
203
204        // Update status to failed
205        update_schema_registry_status(
206            &ctx.client,
207            &namespace,
208            &name,
209            build_failed_status(&registry, &error_msg),
210        )
211        .await?;
212
213        return Err(OperatorError::InvalidConfig(error_msg));
214    }
215
216    // Verify cluster reference exists
217    verify_cluster_ref(&ctx.client, &namespace, &registry.spec.cluster_ref).await?;
218
219    // Build and apply ConfigMap with registry configuration
220    let configmap = build_registry_configmap(&registry)?;
221    apply_registry_configmap(&ctx.client, &namespace, configmap).await?;
222
223    // Build and apply Deployment
224    let deployment = build_registry_deployment(&registry)?;
225    let deployment_status = apply_registry_deployment(&ctx.client, &namespace, deployment).await?;
226
227    // Build and apply Service
228    let service = build_registry_service(&registry)?;
229    apply_registry_service(&ctx.client, &namespace, service).await?;
230
231    // Apply PDB if enabled
232    if registry.spec.pod_disruption_budget.enabled {
233        let pdb = build_registry_pdb(&registry)?;
234        apply_registry_pdb(&ctx.client, &namespace, pdb).await?;
235    }
236
237    // Update status
238    let status = build_running_status(&registry, &deployment_status);
239    update_schema_registry_status(&ctx.client, &namespace, &name, status).await?;
240
241    Ok(Action::requeue(Duration::from_secs(
242        DEFAULT_REQUEUE_SECONDS,
243    )))
244}
245
246/// Cleanup the schema registry resources
247#[instrument(skip(registry, _ctx))]
248async fn cleanup_schema_registry(
249    registry: Arc<RivvenSchemaRegistry>,
250    _ctx: Arc<SchemaRegistryControllerContext>,
251) -> Result<Action> {
252    let name = registry.name_any();
253    let namespace = registry
254        .namespace()
255        .unwrap_or_else(|| "default".to_string());
256
257    info!(name = %name, namespace = %namespace, "Cleaning up RivvenSchemaRegistry");
258
259    // Resources with owner references will be garbage collected automatically
260    // Additional cleanup logic can be added here if needed
261
262    Ok(Action::await_change())
263}
264
265/// Error policy for the controller
266fn schema_registry_error_policy(
267    _registry: Arc<RivvenSchemaRegistry>,
268    error: &OperatorError,
269    _ctx: Arc<SchemaRegistryControllerContext>,
270) -> Action {
271    warn!(error = %error, "Schema registry reconciliation error, will retry");
272    Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS))
273}
274
275/// Verify that the cluster reference exists
276async fn verify_cluster_ref(
277    client: &Client,
278    namespace: &str,
279    cluster_ref: &ClusterReference,
280) -> Result<()> {
281    use crate::crd::RivvenCluster;
282
283    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
284    let clusters: Api<RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
285
286    clusters
287        .get(&cluster_ref.name)
288        .await
289        .map_err(|e| OperatorError::ClusterNotFound(format!("{}: {}", cluster_ref.name, e)))?;
290
291    Ok(())
292}
293
294/// Build the ConfigMap for the schema registry
295fn build_registry_configmap(registry: &RivvenSchemaRegistry) -> Result<ConfigMap> {
296    let name = registry.name_any();
297    let namespace = registry
298        .namespace()
299        .unwrap_or_else(|| "default".to_string());
300    let resource_name = format!("rivven-schema-registry-{}", name);
301
302    let spec = &registry.spec;
303    let labels = spec.get_labels(&name);
304
305    // Build configuration YAML
306    let config = build_registry_config_yaml(spec)?;
307
308    Ok(ConfigMap {
309        metadata: ObjectMeta {
310            name: Some(format!("{}-config", resource_name)),
311            namespace: Some(namespace),
312            labels: Some(labels),
313            owner_references: Some(vec![owner_reference(registry)]),
314            ..Default::default()
315        },
316        data: Some(BTreeMap::from([("config.yaml".to_string(), config)])),
317        ..Default::default()
318    })
319}
320
321/// Build the registry configuration YAML
322fn build_registry_config_yaml(spec: &RivvenSchemaRegistrySpec) -> Result<String> {
323    let mut config = String::new();
324
325    // Server configuration
326    config.push_str(&format!(
327        "server:\n  port: {}\n  bind_address: \"{}\"\n  timeout_seconds: {}\n  max_request_size: {}\n",
328        spec.server.port,
329        spec.server.bind_address,
330        spec.server.timeout_seconds,
331        spec.server.max_request_size,
332    ));
333
334    // Storage configuration
335    config.push_str(&format!(
336        "\nstorage:\n  mode: \"{}\"\n  topic: \"{}\"\n  replication_factor: {}\n  partitions: {}\n  normalize: {}\n",
337        spec.storage.mode,
338        spec.storage.topic,
339        spec.storage.replication_factor,
340        spec.storage.partitions,
341        spec.storage.normalize,
342    ));
343
344    // Compatibility configuration
345    config.push_str(&format!(
346        "\ncompatibility:\n  default_level: \"{}\"\n  allow_overrides: {}\n",
347        spec.compatibility.default_level, spec.compatibility.allow_overrides,
348    ));
349
350    // Schema format configuration
351    config.push_str(&format!(
352        "\nschemas:\n  avro: {}\n  json_schema: {}\n  protobuf: {}\n  strict_validation: {}\n",
353        spec.schemas.avro,
354        spec.schemas.json_schema,
355        spec.schemas.protobuf,
356        spec.schemas.strict_validation,
357    ));
358
359    // Contexts configuration
360    if spec.contexts.enabled {
361        config.push_str(&format!(
362            "\ncontexts:\n  enabled: true\n  max_contexts: {}\n",
363            spec.contexts.max_contexts,
364        ));
365    }
366
367    // Metrics configuration
368    if spec.metrics.enabled {
369        config.push_str(&format!(
370            "\nmetrics:\n  enabled: true\n  port: {}\n  path: \"{}\"\n",
371            spec.metrics.port, spec.metrics.path,
372        ));
373    }
374
375    Ok(config)
376}
377
378/// Build the Deployment for the schema registry
379fn build_registry_deployment(registry: &RivvenSchemaRegistry) -> Result<Deployment> {
380    let name = registry.name_any();
381    let namespace = registry
382        .namespace()
383        .unwrap_or_else(|| "default".to_string());
384    let resource_name = format!("rivven-schema-registry-{}", name);
385
386    let spec = &registry.spec;
387    let labels = spec.get_labels(&name);
388    let selector_labels = spec.get_selector_labels(&name);
389
390    // Build container
391    let container = build_registry_container(spec, &resource_name);
392
393    // Build pod labels
394    let mut pod_labels = selector_labels.clone();
395    pod_labels.extend(spec.pod_labels.clone());
396
397    // Build pod annotations
398    let mut pod_annotations = BTreeMap::new();
399    if spec.metrics.enabled {
400        pod_annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
401        pod_annotations.insert(
402            "prometheus.io/port".to_string(),
403            spec.metrics.port.to_string(),
404        );
405        pod_annotations.insert("prometheus.io/path".to_string(), spec.metrics.path.clone());
406    }
407    pod_annotations.extend(spec.pod_annotations.clone());
408
409    let pod_spec = PodSpec {
410        containers: vec![container],
411        node_selector: if spec.node_selector.is_empty() {
412            None
413        } else {
414            Some(spec.node_selector.clone())
415        },
416        tolerations: if spec.tolerations.is_empty() {
417            None
418        } else {
419            Some(spec.tolerations.clone())
420        },
421        service_account_name: spec.service_account.clone(),
422        image_pull_secrets: if spec.image_pull_secrets.is_empty() {
423            None
424        } else {
425            Some(
426                spec.image_pull_secrets
427                    .iter()
428                    .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
429                    .collect(),
430            )
431        },
432        // Disable service account token auto-mounting for security
433        automount_service_account_token: Some(false),
434        ..Default::default()
435    };
436
437    Ok(Deployment {
438        metadata: ObjectMeta {
439            name: Some(resource_name.clone()),
440            namespace: Some(namespace),
441            labels: Some(labels.clone()),
442            owner_references: Some(vec![owner_reference(registry)]),
443            ..Default::default()
444        },
445        spec: Some(DeploymentSpec {
446            replicas: Some(spec.replicas),
447            selector: LabelSelector {
448                match_labels: Some(selector_labels),
449                ..Default::default()
450            },
451            template: PodTemplateSpec {
452                metadata: Some(ObjectMeta {
453                    labels: Some(pod_labels),
454                    annotations: Some(pod_annotations),
455                    ..Default::default()
456                }),
457                spec: Some(pod_spec),
458            },
459            ..Default::default()
460        }),
461        ..Default::default()
462    })
463}
464
465/// Build the container for the schema registry
466fn build_registry_container(spec: &RivvenSchemaRegistrySpec, _resource_name: &str) -> Container {
467    // Build environment variables
468    let mut env = vec![
469        EnvVar {
470            name: "RIVVEN_SCHEMA_PORT".to_string(),
471            value: Some(spec.server.port.to_string()),
472            ..Default::default()
473        },
474        EnvVar {
475            name: "RIVVEN_SCHEMA_BIND_ADDRESS".to_string(),
476            value: Some(spec.server.bind_address.clone()),
477            ..Default::default()
478        },
479        EnvVar {
480            name: "RIVVEN_SCHEMA_STORAGE_MODE".to_string(),
481            value: Some(spec.storage.mode.clone()),
482            ..Default::default()
483        },
484        EnvVar {
485            name: "RIVVEN_SCHEMA_STORAGE_TOPIC".to_string(),
486            value: Some(spec.storage.topic.clone()),
487            ..Default::default()
488        },
489        EnvVar {
490            name: "RIVVEN_SCHEMA_COMPATIBILITY".to_string(),
491            value: Some(spec.compatibility.default_level.clone()),
492            ..Default::default()
493        },
494        // Broker connection (from cluster reference)
495        EnvVar {
496            name: "RIVVEN_BROKERS".to_string(),
497            value: Some(format!(
498                "rivven-{}-headless.{}.svc.cluster.local:9092",
499                spec.cluster_ref.name,
500                spec.cluster_ref.namespace.as_deref().unwrap_or("default")
501            )),
502            ..Default::default()
503        },
504        // Pod info
505        EnvVar {
506            name: "POD_NAME".to_string(),
507            value_from: Some(EnvVarSource {
508                field_ref: Some(ObjectFieldSelector {
509                    field_path: "metadata.name".to_string(),
510                    ..Default::default()
511                }),
512                ..Default::default()
513            }),
514            ..Default::default()
515        },
516        EnvVar {
517            name: "POD_NAMESPACE".to_string(),
518            value_from: Some(EnvVarSource {
519                field_ref: Some(ObjectFieldSelector {
520                    field_path: "metadata.namespace".to_string(),
521                    ..Default::default()
522                }),
523                ..Default::default()
524            }),
525            ..Default::default()
526        },
527    ];
528
529    // Add custom env vars
530    for e in &spec.env {
531        if !env.iter().any(|existing| existing.name == e.name) {
532            env.push(e.clone());
533        }
534    }
535
536    // Build probes
537    let liveness_probe = if spec.liveness_probe.enabled {
538        Some(Probe {
539            http_get: Some(HTTPGetAction {
540                path: Some("/health/live".to_string()),
541                port: IntOrString::Int(spec.server.port),
542                ..Default::default()
543            }),
544            initial_delay_seconds: Some(spec.liveness_probe.initial_delay_seconds),
545            period_seconds: Some(spec.liveness_probe.period_seconds),
546            timeout_seconds: Some(spec.liveness_probe.timeout_seconds),
547            success_threshold: Some(spec.liveness_probe.success_threshold),
548            failure_threshold: Some(spec.liveness_probe.failure_threshold),
549            ..Default::default()
550        })
551    } else {
552        None
553    };
554
555    let readiness_probe = if spec.readiness_probe.enabled {
556        Some(Probe {
557            http_get: Some(HTTPGetAction {
558                path: Some("/health/ready".to_string()),
559                port: IntOrString::Int(spec.server.port),
560                ..Default::default()
561            }),
562            initial_delay_seconds: Some(spec.readiness_probe.initial_delay_seconds),
563            period_seconds: Some(spec.readiness_probe.period_seconds),
564            timeout_seconds: Some(spec.readiness_probe.timeout_seconds),
565            success_threshold: Some(spec.readiness_probe.success_threshold),
566            failure_threshold: Some(spec.readiness_probe.failure_threshold),
567            ..Default::default()
568        })
569    } else {
570        None
571    };
572
573    // Container ports
574    let mut ports = vec![ContainerPort {
575        name: Some("http".to_string()),
576        container_port: spec.server.port,
577        protocol: Some("TCP".to_string()),
578        ..Default::default()
579    }];
580
581    if spec.metrics.enabled && spec.metrics.port != spec.server.port {
582        ports.push(ContainerPort {
583            name: Some("metrics".to_string()),
584            container_port: spec.metrics.port,
585            protocol: Some("TCP".to_string()),
586            ..Default::default()
587        });
588    }
589
590    Container {
591        name: "rivven-schema-registry".to_string(),
592        image: Some(spec.get_image()),
593        image_pull_policy: Some(spec.image_pull_policy.clone()),
594        command: Some(vec!["rivven-schema".to_string()]),
595        args: Some(vec![
596            "serve".to_string(),
597            "--config".to_string(),
598            "/etc/rivven-schema/config.yaml".to_string(),
599        ]),
600        env: Some(env),
601        ports: Some(ports),
602        liveness_probe,
603        readiness_probe,
604        volume_mounts: Some(vec![VolumeMount {
605            name: "config".to_string(),
606            mount_path: "/etc/rivven-schema".to_string(),
607            read_only: Some(true),
608            ..Default::default()
609        }]),
610        // Secure defaults
611        security_context: Some(k8s_openapi::api::core::v1::SecurityContext {
612            run_as_non_root: Some(true),
613            run_as_user: Some(1000),
614            run_as_group: Some(1000),
615            read_only_root_filesystem: Some(true),
616            allow_privilege_escalation: Some(false),
617            capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
618                drop: Some(vec!["ALL".to_string()]),
619                ..Default::default()
620            }),
621            ..Default::default()
622        }),
623        ..Default::default()
624    }
625}
626
627/// Build the Service for the schema registry
628fn build_registry_service(registry: &RivvenSchemaRegistry) -> Result<Service> {
629    let name = registry.name_any();
630    let namespace = registry
631        .namespace()
632        .unwrap_or_else(|| "default".to_string());
633    let resource_name = format!("rivven-schema-registry-{}", name);
634
635    let spec = &registry.spec;
636    let labels = spec.get_labels(&name);
637    let selector_labels = spec.get_selector_labels(&name);
638
639    let mut ports = vec![ServicePort {
640        name: Some("http".to_string()),
641        port: spec.server.port,
642        target_port: Some(IntOrString::Int(spec.server.port)),
643        protocol: Some("TCP".to_string()),
644        ..Default::default()
645    }];
646
647    if spec.metrics.enabled && spec.metrics.port != spec.server.port {
648        ports.push(ServicePort {
649            name: Some("metrics".to_string()),
650            port: spec.metrics.port,
651            target_port: Some(IntOrString::Int(spec.metrics.port)),
652            protocol: Some("TCP".to_string()),
653            ..Default::default()
654        });
655    }
656
657    Ok(Service {
658        metadata: ObjectMeta {
659            name: Some(resource_name),
660            namespace: Some(namespace),
661            labels: Some(labels),
662            owner_references: Some(vec![owner_reference(registry)]),
663            ..Default::default()
664        },
665        spec: Some(ServiceSpec {
666            selector: Some(selector_labels),
667            ports: Some(ports),
668            type_: Some("ClusterIP".to_string()),
669            ..Default::default()
670        }),
671        ..Default::default()
672    })
673}
674
675/// Build the PodDisruptionBudget for the schema registry
676fn build_registry_pdb(registry: &RivvenSchemaRegistry) -> Result<PodDisruptionBudget> {
677    let name = registry.name_any();
678    let namespace = registry
679        .namespace()
680        .unwrap_or_else(|| "default".to_string());
681    let resource_name = format!("rivven-schema-registry-{}", name);
682
683    let spec = &registry.spec;
684    let labels = spec.get_labels(&name);
685    let selector_labels = spec.get_selector_labels(&name);
686
687    let pdb_spec = &spec.pod_disruption_budget;
688
689    Ok(PodDisruptionBudget {
690        metadata: ObjectMeta {
691            name: Some(resource_name),
692            namespace: Some(namespace),
693            labels: Some(labels),
694            owner_references: Some(vec![owner_reference(registry)]),
695            ..Default::default()
696        },
697        spec: Some(PodDisruptionBudgetSpec {
698            selector: Some(LabelSelector {
699                match_labels: Some(selector_labels),
700                ..Default::default()
701            }),
702            min_available: pdb_spec
703                .min_available
704                .as_ref()
705                .map(|v| IntOrString::String(v.clone())),
706            max_unavailable: pdb_spec
707                .max_unavailable
708                .as_ref()
709                .map(|v| IntOrString::String(v.clone())),
710            ..Default::default()
711        }),
712        ..Default::default()
713    })
714}
715
716/// Create owner reference for managed resources
717fn owner_reference(registry: &RivvenSchemaRegistry) -> OwnerReference {
718    OwnerReference {
719        api_version: "rivven.hupe1980.github.io/v1alpha1".to_string(),
720        kind: "RivvenSchemaRegistry".to_string(),
721        name: registry.name_any(),
722        uid: registry.metadata.uid.clone().unwrap_or_default(),
723        controller: Some(true),
724        block_owner_deletion: Some(true),
725    }
726}
727
728/// Verify the operator still owns a resource before force-applying.
729fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
730    let labels = existing.meta().labels.as_ref();
731    let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
732    match managed_by {
733        Some(manager) if manager != "rivven-operator" => {
734            let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
735            Err(OperatorError::InvalidConfig(format!(
736                "resource '{}' is managed by '{}', not rivven-operator; \
737                 refusing to force-apply to avoid ownership conflict",
738                name, manager
739            )))
740        }
741        _ => Ok(()),
742    }
743}
744
745/// Apply ConfigMap to cluster
746async fn apply_registry_configmap(
747    client: &Client,
748    namespace: &str,
749    configmap: ConfigMap,
750) -> Result<()> {
751    let configmaps: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
752    let name = configmap.metadata.name.clone().unwrap_or_default();
753
754    // Verify ownership before force-applying
755    if let Ok(existing) = configmaps.get(&name).await {
756        verify_ownership(&existing)?;
757    }
758
759    let patch = Patch::Apply(&configmap);
760    let params = PatchParams::apply("rivven-operator").force();
761
762    configmaps
763        .patch(&name, &params, &patch)
764        .await
765        .map_err(|e| OperatorError::ReconcileFailed(format!("ConfigMap: {}", e)))?;
766
767    debug!(name = %name, "Applied schema registry ConfigMap");
768    Ok(())
769}
770
771/// Apply Deployment to cluster
772async fn apply_registry_deployment(
773    client: &Client,
774    namespace: &str,
775    deployment: Deployment,
776) -> Result<(i32, i32)> {
777    let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);
778    let name = deployment.metadata.name.clone().unwrap_or_default();
779
780    // Verify ownership before force-applying
781    if let Ok(existing) = deployments.get(&name).await {
782        verify_ownership(&existing)?;
783    }
784
785    let patch = Patch::Apply(&deployment);
786    let params = PatchParams::apply("rivven-operator").force();
787
788    let result = deployments
789        .patch(&name, &params, &patch)
790        .await
791        .map_err(|e| OperatorError::ReconcileFailed(format!("Deployment: {}", e)))?;
792
793    let status = result.status.as_ref();
794    let replicas = status.and_then(|s| s.replicas).unwrap_or(0);
795    let ready_replicas = status.and_then(|s| s.ready_replicas).unwrap_or(0);
796
797    debug!(name = %name, replicas = replicas, ready = ready_replicas, "Applied schema registry Deployment");
798    Ok((replicas, ready_replicas))
799}
800
801/// Apply Service to cluster
802async fn apply_registry_service(client: &Client, namespace: &str, service: Service) -> Result<()> {
803    let services: Api<Service> = Api::namespaced(client.clone(), namespace);
804    let name = service.metadata.name.clone().unwrap_or_default();
805
806    // Verify ownership before force-applying
807    if let Ok(existing) = services.get(&name).await {
808        verify_ownership(&existing)?;
809    }
810
811    let patch = Patch::Apply(&service);
812    let params = PatchParams::apply("rivven-operator").force();
813
814    services
815        .patch(&name, &params, &patch)
816        .await
817        .map_err(|e| OperatorError::ReconcileFailed(format!("Service: {}", e)))?;
818
819    debug!(name = %name, "Applied schema registry Service");
820    Ok(())
821}
822
823/// Apply PodDisruptionBudget to cluster
824async fn apply_registry_pdb(
825    client: &Client,
826    namespace: &str,
827    pdb: PodDisruptionBudget,
828) -> Result<()> {
829    let pdbs: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
830    let name = pdb.metadata.name.clone().unwrap_or_default();
831
832    // Verify ownership before force-applying
833    if let Ok(existing) = pdbs.get(&name).await {
834        verify_ownership(&existing)?;
835    }
836
837    let patch = Patch::Apply(&pdb);
838    let params = PatchParams::apply("rivven-operator").force();
839
840    pdbs.patch(&name, &params, &patch)
841        .await
842        .map_err(|e| OperatorError::ReconcileFailed(format!("PodDisruptionBudget: {}", e)))?;
843
844    debug!(name = %name, "Applied schema registry PodDisruptionBudget");
845    Ok(())
846}
847
848/// Update the schema registry status
849async fn update_schema_registry_status(
850    client: &Client,
851    namespace: &str,
852    name: &str,
853    status: RivvenSchemaRegistryStatus,
854) -> Result<()> {
855    let registries: Api<RivvenSchemaRegistry> = Api::namespaced(client.clone(), namespace);
856
857    let patch = serde_json::json!({
858        "status": status
859    });
860
861    registries
862        .patch_status(
863            name,
864            &PatchParams::apply("rivven-operator"),
865            &Patch::Merge(&patch),
866        )
867        .await
868        .map_err(|e| OperatorError::ReconcileFailed(format!("Status update: {}", e)))?;
869
870    debug!(name = %name, phase = ?status.phase, "Updated schema registry status");
871    Ok(())
872}
873
874/// Build status for a failed registry
875fn build_failed_status(registry: &RivvenSchemaRegistry, error: &str) -> RivvenSchemaRegistryStatus {
876    RivvenSchemaRegistryStatus {
877        phase: SchemaRegistryPhase::Failed,
878        replicas: 0,
879        ready_replicas: 0,
880        schemas_registered: 0,
881        subjects_count: 0,
882        contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
883        observed_generation: registry.metadata.generation.unwrap_or(0),
884        conditions: vec![SchemaRegistryCondition {
885            condition_type: "Ready".to_string(),
886            status: "False".to_string(),
887            reason: Some("ValidationFailed".to_string()),
888            message: Some(error.to_string()),
889            last_transition_time: Some(Utc::now().to_rfc3339()),
890        }],
891        endpoints: vec![],
892        storage_status: None,
893        external_sync_status: None,
894        last_sync_time: None,
895        last_updated: Some(Utc::now().to_rfc3339()),
896        message: Some(error.to_string()),
897    }
898}
899
900/// Build status for a running registry
901fn build_running_status(
902    registry: &RivvenSchemaRegistry,
903    deployment_status: &(i32, i32),
904) -> RivvenSchemaRegistryStatus {
905    let (replicas, ready_replicas) = *deployment_status;
906    let name = registry.name_any();
907    let namespace = registry
908        .namespace()
909        .unwrap_or_else(|| "default".to_string());
910
911    let phase = if ready_replicas == replicas && replicas > 0 {
912        SchemaRegistryPhase::Running
913    } else if ready_replicas > 0 {
914        SchemaRegistryPhase::Degraded
915    } else {
916        SchemaRegistryPhase::Provisioning
917    };
918
919    let endpoint = format!(
920        "http://rivven-schema-registry-{}.{}.svc.cluster.local:{}",
921        name, namespace, registry.spec.server.port
922    );
923
924    RivvenSchemaRegistryStatus {
925        phase,
926        replicas,
927        ready_replicas,
928        schemas_registered: 0, // Updated by registry itself
929        subjects_count: 0,     // Updated by registry itself
930        contexts_count: if registry.spec.contexts.enabled { 1 } else { 0 },
931        observed_generation: registry.metadata.generation.unwrap_or(0),
932        conditions: vec![
933            SchemaRegistryCondition {
934                condition_type: "Ready".to_string(),
935                status: if ready_replicas == replicas && replicas > 0 {
936                    "True"
937                } else {
938                    "False"
939                }
940                .to_string(),
941                reason: Some(
942                    if ready_replicas == replicas && replicas > 0 {
943                        "AllReplicasReady"
944                    } else {
945                        "ReplicasNotReady"
946                    }
947                    .to_string(),
948                ),
949                message: Some(format!("{}/{} replicas ready", ready_replicas, replicas)),
950                last_transition_time: Some(Utc::now().to_rfc3339()),
951            },
952            SchemaRegistryCondition {
953                condition_type: "StorageHealthy".to_string(),
954                status: "Unknown".to_string(),
955                reason: Some("Pending".to_string()),
956                message: Some("Storage health check pending".to_string()),
957                last_transition_time: Some(Utc::now().to_rfc3339()),
958            },
959        ],
960        endpoints: vec![endpoint],
961        storage_status: Some(registry.spec.storage.mode.clone()),
962        external_sync_status: if registry.spec.external.enabled {
963            Some("Pending".to_string())
964        } else {
965            None
966        },
967        last_sync_time: None,
968        last_updated: Some(Utc::now().to_rfc3339()),
969        message: None,
970    }
971}
972
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        ClusterReference, ExternalRegistrySpec, PdbSpec, ProbeSpec, SchemaCompatibilitySpec,
        SchemaContextsSpec, SchemaFormatSpec, SchemaRegistryAuthSpec, SchemaRegistryMetricsSpec,
        SchemaRegistryServerSpec, SchemaRegistryStorageSpec, SchemaRegistryTlsSpec,
        SchemaValidationSpec,
    };

    /// Fixture: a three-replica registry named `test-registry` in the
    /// `default` namespace, with every nested spec left at its default.
    fn create_test_registry() -> RivvenSchemaRegistry {
        let metadata = ObjectMeta {
            name: Some(String::from("test-registry")),
            namespace: Some(String::from("default")),
            uid: Some(String::from("test-uid")),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenSchemaRegistrySpec {
            cluster_ref: ClusterReference {
                name: String::from("test-cluster"),
                namespace: None,
            },
            replicas: 3,
            version: String::from("0.0.1"),
            image: None,
            image_pull_policy: String::from("IfNotPresent"),
            image_pull_secrets: Vec::new(),
            resources: None,
            server: SchemaRegistryServerSpec::default(),
            storage: SchemaRegistryStorageSpec::default(),
            compatibility: SchemaCompatibilitySpec::default(),
            schemas: SchemaFormatSpec::default(),
            contexts: SchemaContextsSpec::default(),
            validation: SchemaValidationSpec::default(),
            auth: SchemaRegistryAuthSpec::default(),
            tls: SchemaRegistryTlsSpec::default(),
            metrics: SchemaRegistryMetricsSpec::default(),
            external: ExternalRegistrySpec::default(),
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: Vec::new(),
            node_selector: BTreeMap::new(),
            tolerations: Vec::new(),
            affinity: None,
            service_account: None,
            security_context: None,
            container_security_context: None,
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            pod_disruption_budget: PdbSpec::default(),
        };

        RivvenSchemaRegistry {
            metadata,
            spec,
            status: None,
        }
    }

    // ---- resource builders -------------------------------------------------

    #[test]
    fn test_build_registry_configmap() {
        let cm = build_registry_configmap(&create_test_registry()).unwrap();

        assert_eq!(
            cm.metadata.name.as_deref(),
            Some("rivven-schema-registry-test-registry-config")
        );

        let data = cm.data;
        assert!(data.is_some());
        assert!(data.unwrap().contains_key("config.yaml"));
    }

    #[test]
    fn test_build_registry_deployment() {
        let deploy = build_registry_deployment(&create_test_registry()).unwrap();

        assert_eq!(
            deploy.metadata.name.as_deref(),
            Some("rivven-schema-registry-test-registry")
        );

        let deploy_spec = deploy.spec.as_ref().unwrap();
        assert_eq!(deploy_spec.replicas, Some(3));
    }

    #[test]
    fn test_build_registry_service() {
        let svc = build_registry_service(&create_test_registry()).unwrap();

        assert_eq!(
            svc.metadata.name.as_deref(),
            Some("rivven-schema-registry-test-registry")
        );

        let svc_spec = svc.spec.as_ref().unwrap();
        let ports = svc_spec.ports.as_ref().unwrap();
        assert_eq!(ports.len(), 2); // http + metrics
        assert_eq!(ports[0].port, 8081);
    }

    #[test]
    fn test_build_registry_pdb() {
        let budget = build_registry_pdb(&create_test_registry()).unwrap();

        assert_eq!(
            budget.metadata.name.as_deref(),
            Some("rivven-schema-registry-test-registry")
        );
    }

    // ---- status builders ---------------------------------------------------

    #[test]
    fn test_build_failed_status() {
        let failed = build_failed_status(&create_test_registry(), "Test error");

        assert_eq!(failed.phase, SchemaRegistryPhase::Failed);
        assert_eq!(failed.message.as_deref(), Some("Test error"));
        assert_eq!(failed.conditions.len(), 1);
        assert_eq!(failed.conditions[0].status, "False");
    }

    #[test]
    fn test_build_running_status() {
        let running = build_running_status(&create_test_registry(), &(3, 3));

        assert_eq!(running.phase, SchemaRegistryPhase::Running);
        assert_eq!(running.replicas, 3);
        assert_eq!(running.ready_replicas, 3);
        assert!(!running.endpoints.is_empty());
    }

    #[test]
    fn test_build_running_status_degraded() {
        let degraded = build_running_status(&create_test_registry(), &(3, 2));

        assert_eq!(degraded.phase, SchemaRegistryPhase::Degraded);
        assert_eq!(degraded.replicas, 3);
        assert_eq!(degraded.ready_replicas, 2);
    }

    #[test]
    fn test_build_running_status_provisioning() {
        let provisioning = build_running_status(&create_test_registry(), &(3, 0));

        assert_eq!(provisioning.phase, SchemaRegistryPhase::Provisioning);
        assert_eq!(provisioning.ready_replicas, 0);
    }

    // ---- spec helpers ------------------------------------------------------

    #[test]
    fn test_spec_get_image_default() {
        let spec = create_test_registry().spec;
        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven-schema:0.0.1");
    }

    #[test]
    fn test_spec_get_image_custom() {
        let mut spec = create_test_registry().spec;
        spec.image = Some(String::from("custom/registry:latest"));
        assert_eq!(spec.get_image(), "custom/registry:latest");
    }

    #[test]
    fn test_spec_get_labels() {
        let labels = create_test_registry().spec.get_labels("test-registry");

        let expected = [
            ("app.kubernetes.io/name", "rivven-schema-registry"),
            ("app.kubernetes.io/instance", "test-registry"),
            ("app.kubernetes.io/component", "schema-registry"),
        ];
        for (key, value) in expected {
            assert_eq!(
                labels.get(key).map(String::as_str),
                Some(value),
                "label {}",
                key
            );
        }
    }

    #[test]
    fn test_spec_get_selector_labels() {
        let selector = create_test_registry()
            .spec
            .get_selector_labels("test-registry");

        assert_eq!(selector.len(), 2);

        let expected = [
            ("app.kubernetes.io/name", "rivven-schema-registry"),
            ("app.kubernetes.io/instance", "test-registry"),
        ];
        for (key, value) in expected {
            assert_eq!(
                selector.get(key).map(String::as_str),
                Some(value),
                "selector label {}",
                key
            );
        }
    }

    // ---- CRD defaults ------------------------------------------------------

    #[test]
    fn test_schema_registry_phase_default() {
        assert_eq!(SchemaRegistryPhase::default(), SchemaRegistryPhase::Pending);
    }

    #[test]
    fn test_server_spec_default() {
        let server = SchemaRegistryServerSpec::default();
        assert_eq!(server.port, 8081);
        assert_eq!(server.bind_address, "0.0.0.0");
        assert_eq!(server.timeout_seconds, 30);
    }

    #[test]
    fn test_storage_spec_default() {
        let storage = SchemaRegistryStorageSpec::default();
        assert_eq!(storage.mode, "broker");
        assert_eq!(storage.topic, "_schemas");
        assert_eq!(storage.replication_factor, 3);
    }

    #[test]
    fn test_compatibility_spec_default() {
        let compat = SchemaCompatibilitySpec::default();
        assert_eq!(compat.default_level, "BACKWARD");
        assert!(compat.allow_overrides);
    }

    #[test]
    fn test_schema_format_spec_default() {
        let formats = SchemaFormatSpec::default();
        assert!(formats.avro);
        assert!(formats.json_schema);
        assert!(formats.protobuf);
    }

    #[test]
    fn test_metrics_spec_default() {
        let metrics = SchemaRegistryMetricsSpec::default();
        assert!(metrics.enabled);
        assert_eq!(metrics.port, 9090);
        assert_eq!(metrics.path, "/metrics");
    }

    // ---- rendered config and metrics ---------------------------------------

    #[test]
    fn test_build_registry_config_yaml() {
        let yaml = build_registry_config_yaml(&create_test_registry().spec).unwrap();

        let fragments = [
            "port: 8081",
            "mode: \"broker\"",
            "default_level: \"BACKWARD\"",
            "avro: true",
        ];
        for fragment in fragments {
            assert!(yaml.contains(fragment), "missing fragment: {}", fragment);
        }
    }

    #[test]
    fn test_controller_metrics_new() {
        // Test passes if it doesn't panic during construction
        let _metrics = SchemaRegistryControllerMetrics::new();
    }
}