Skip to main content

rivven_operator/
crd.rs

1//! Custom Resource Definitions for Rivven Kubernetes Operator
2//!
3//! This module defines the `RivvenCluster` CRD that represents a Rivven
4//! distributed streaming cluster in Kubernetes.
5
6use k8s_openapi::api::core::v1::ResourceRequirements;
7use kube::CustomResource;
8use regex::Regex;
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11use std::collections::BTreeMap;
12use std::sync::LazyLock;
13use validator::{Validate, ValidationError};
14
/// Regex for validating Kubernetes resource quantities (e.g., "10Gi", "100Mi")
///
/// Matches an unsigned decimal number (optional fractional part) followed by
/// at most one suffix from `Ki Mi Gi Ti Pi Ei k M G T P E`. The pattern does
/// not match the milli suffix (`m`), signs, or exponent notation.
static QUANTITY_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[0-9]+(\.[0-9]+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?$").unwrap());

/// Regex for validating Kubernetes names (RFC 1123 label)
///
/// Matches lowercase alphanumerics and `-`, starting and ending with an
/// alphanumeric character. Dots are not part of the pattern, so dotted
/// subdomain-style names are rejected.
static NAME_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
22
23/// Validate a Kubernetes resource quantity string
24fn validate_quantity(value: &str) -> Result<(), ValidationError> {
25    if QUANTITY_REGEX.is_match(value) {
26        Ok(())
27    } else {
28        Err(ValidationError::new("invalid_quantity")
29            .with_message(format!("'{}' is not a valid Kubernetes quantity", value).into()))
30    }
31}
32
33/// Validate a container image reference
34fn validate_image(value: &str) -> Result<(), ValidationError> {
35    if value.is_empty() {
36        return Ok(()); // Empty is allowed (uses default)
37    }
38    if value.len() > 255 {
39        return Err(ValidationError::new("image_too_long")
40            .with_message("image reference exceeds 255 characters".into()));
41    }
42    // Basic format check - not overly strict to allow various registries
43    if value.contains("..") || value.starts_with('/') || value.starts_with('-') {
44        return Err(ValidationError::new("invalid_image")
45            .with_message(format!("'{}' is not a valid container image", value).into()));
46    }
47    Ok(())
48}
49
50/// Validate a Kubernetes name (RFC 1123 subdomain)
51fn validate_k8s_name(value: &str) -> Result<(), ValidationError> {
52    if value.is_empty() {
53        return Ok(()); // Empty is allowed for optional fields
54    }
55    if value.len() > 63 {
56        return Err(
57            ValidationError::new("name_too_long").with_message("name exceeds 63 characters".into())
58        );
59    }
60    if !NAME_REGEX.is_match(value) {
61        return Err(ValidationError::new("invalid_name").with_message(
62            format!("'{}' is not a valid Kubernetes name (RFC 1123)", value).into(),
63        ));
64    }
65    Ok(())
66}
67
68/// Validate environment variable name (POSIX)
69fn validate_env_vars(vars: &[k8s_openapi::api::core::v1::EnvVar]) -> Result<(), ValidationError> {
70    // Limit number of env vars to prevent resource exhaustion
71    const MAX_ENV_VARS: usize = 100;
72    if vars.len() > MAX_ENV_VARS {
73        return Err(ValidationError::new("too_many_env_vars").with_message(
74            format!("maximum {} environment variables allowed", MAX_ENV_VARS).into(),
75        ));
76    }
77    for var in vars {
78        // Validate env var name format
79        if var.name.is_empty() || var.name.len() > 256 {
80            return Err(ValidationError::new("invalid_env_name")
81                .with_message("environment variable name must be 1-256 characters".into()));
82        }
83        // Check for dangerous env var names that could override security settings
84        let forbidden_prefixes = ["LD_", "DYLD_", "PATH=", "HOME=", "USER="];
85        for prefix in forbidden_prefixes {
86            if var.name.starts_with(prefix) && var.value.is_some() {
87                return Err(ValidationError::new("forbidden_env_var").with_message(
88                    format!(
89                        "environment variable '{}' is not allowed for security",
90                        var.name
91                    )
92                    .into(),
93                ));
94            }
95        }
96    }
97    Ok(())
98}
99
/// RivvenCluster custom resource definition
///
/// Represents a Rivven distributed event streaming cluster deployment.
/// The operator watches these resources and reconciles the actual cluster
/// state to match the desired specification.
//
// Review notes (plain `//` comments, kept separate from the `///` docs):
// - NOTE(review): `shortname = "rc"` is also declared by the RivvenConnect
//   resource later in this file, and `rc` is kubectl's built-in short name
//   for ReplicationController — confirm the short name is unambiguous.
// - Custom validators attached to `Option<String>` fields are declared with
//   `&str` parameters, i.e. they operate on the inner value.
// - `resources`, `affinity`, `tolerations`, `security_context` and
//   `container_security_context` carry no `#[validate]` attribute, so they
//   are not checked by this struct's `Validate` implementation.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.hupe1980.github.io",
    version = "v1alpha1",
    kind = "RivvenCluster",
    plural = "rivvenclusters",
    shortname = "rc",
    namespaced,
    status = "RivvenClusterStatus",
    printcolumn = r#"{"name":"Replicas", "type":"integer", "jsonPath":".spec.replicas"}"#,
    printcolumn = r#"{"name":"Ready", "type":"integer", "jsonPath":".status.readyReplicas"}"#,
    printcolumn = r#"{"name":"Phase", "type":"string", "jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age", "type":"date", "jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenClusterSpec {
    /// Number of broker replicas (1-100)
    #[serde(default = "default_replicas")]
    #[validate(range(min = 1, max = 100, message = "replicas must be between 1 and 100"))]
    pub replicas: i32,

    /// Rivven version to deploy (1-64 characters)
    // Only the length is validated here; no semver pattern is enforced.
    #[serde(default = "default_version")]
    #[validate(length(min = 1, max = 64, message = "version must be 1-64 characters"))]
    pub version: String,

    /// Container image (overrides version-based default)
    /// Must be a valid container image reference
    #[serde(default)]
    #[validate(custom(function = "validate_optional_image"))]
    pub image: Option<String>,

    /// Image pull policy (Always, IfNotPresent, Never)
    #[serde(default = "default_image_pull_policy")]
    #[validate(custom(function = "validate_pull_policy"))]
    pub image_pull_policy: String,

    /// Image pull secrets (max 10 secrets)
    // Only the number of entries is validated, not the individual names.
    #[serde(default)]
    #[validate(length(max = 10, message = "maximum 10 image pull secrets allowed"))]
    pub image_pull_secrets: Vec<String>,

    /// Storage configuration
    #[serde(default)]
    #[validate(nested)]
    pub storage: StorageSpec,

    /// Resource requirements (CPU, memory)
    #[serde(default)]
    #[schemars(skip)]
    pub resources: Option<ResourceRequirements>,

    /// Broker configuration parameters
    #[serde(default)]
    #[validate(nested)]
    pub config: BrokerConfig,

    /// TLS configuration
    #[serde(default)]
    #[validate(nested)]
    pub tls: TlsSpec,

    /// Metrics configuration
    #[serde(default)]
    #[validate(nested)]
    pub metrics: MetricsSpec,

    /// Pod affinity/anti-affinity rules
    #[serde(default)]
    #[schemars(skip)]
    pub affinity: Option<k8s_openapi::api::core::v1::Affinity>,

    /// Node selector for pod scheduling (max 20 selectors)
    #[serde(default)]
    #[validate(custom(function = "validate_node_selector"))]
    pub node_selector: BTreeMap<String, String>,

    /// Tolerations for pod scheduling
    #[serde(default)]
    #[schemars(skip)]
    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,

    /// Pod disruption budget configuration
    #[serde(default)]
    #[validate(nested)]
    pub pod_disruption_budget: PdbSpec,

    /// Service account name (must be valid K8s name)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub service_account: Option<String>,

    /// Additional pod annotations (max 50)
    #[serde(default)]
    #[validate(custom(function = "validate_annotations"))]
    pub pod_annotations: BTreeMap<String, String>,

    /// Additional pod labels (max 20)
    // Keys under the `app.kubernetes.io/` prefix are rejected by
    // `validate_labels`.
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub pod_labels: BTreeMap<String, String>,

    /// Environment variables (validated for security)
    #[serde(default)]
    #[schemars(skip)]
    #[validate(custom(function = "validate_env_vars"))]
    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,

    /// Liveness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub liveness_probe: ProbeSpec,

    /// Readiness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub readiness_probe: ProbeSpec,

    /// Security context for pods
    #[serde(default)]
    #[schemars(skip)]
    pub security_context: Option<k8s_openapi::api::core::v1::PodSecurityContext>,

    /// Container security context
    #[serde(default)]
    #[schemars(skip)]
    pub container_security_context: Option<k8s_openapi::api::core::v1::SecurityContext>,
}
233
/// Validate optional image reference
///
/// Thin adapter used by the `image: Option<String>` field; it forwards the
/// inner string unchanged to `validate_image` and returns its result.
fn validate_optional_image(image: &str) -> Result<(), ValidationError> {
    validate_image(image)
}
238
239/// Validate image pull policy
240fn validate_pull_policy(policy: &str) -> Result<(), ValidationError> {
241    match policy {
242        "Always" | "IfNotPresent" | "Never" => Ok(()),
243        _ => Err(ValidationError::new("invalid_pull_policy")
244            .with_message("imagePullPolicy must be Always, IfNotPresent, or Never".into())),
245    }
246}
247
248/// Validate node selector map
249fn validate_node_selector(selectors: &BTreeMap<String, String>) -> Result<(), ValidationError> {
250    if selectors.len() > 20 {
251        return Err(ValidationError::new("too_many_selectors")
252            .with_message("maximum 20 node selectors allowed".into()));
253    }
254    for (key, value) in selectors {
255        if key.len() > 253 || value.len() > 63 {
256            return Err(ValidationError::new("selector_too_long")
257                .with_message("selector key max 253 chars, value max 63 chars".into()));
258        }
259    }
260    Ok(())
261}
262
263/// Validate optional Kubernetes name (for use with Option<String> fields)
264fn validate_optional_k8s_name(name: &str) -> Result<(), ValidationError> {
265    if name.is_empty() {
266        return Ok(()); // Empty is allowed for optional fields
267    }
268    validate_k8s_name(name)
269}
270
271/// Validate annotations map
272fn validate_annotations(annotations: &BTreeMap<String, String>) -> Result<(), ValidationError> {
273    if annotations.len() > 50 {
274        return Err(ValidationError::new("too_many_annotations")
275            .with_message("maximum 50 annotations allowed".into()));
276    }
277    for (key, value) in annotations {
278        // Annotation keys can be up to 253 chars (with optional prefix)
279        if key.len() > 253 {
280            return Err(ValidationError::new("annotation_key_too_long")
281                .with_message(format!("annotation key '{}' exceeds 253 characters", key).into()));
282        }
283        // Annotation values can be up to 256KB
284        if value.len() > 262144 {
285            return Err(ValidationError::new("annotation_value_too_long")
286                .with_message(format!("annotation '{}' value exceeds 256KB", key).into()));
287        }
288    }
289    Ok(())
290}
291
292/// Validate labels map
293fn validate_labels(labels: &BTreeMap<String, String>) -> Result<(), ValidationError> {
294    if labels.len() > 20 {
295        return Err(ValidationError::new("too_many_labels")
296            .with_message("maximum 20 labels allowed".into()));
297    }
298    for (key, value) in labels {
299        if key.len() > 253 || value.len() > 63 {
300            return Err(ValidationError::new("label_too_long")
301                .with_message("label key max 253 chars, value max 63 chars".into()));
302        }
303        // Labels must not override managed labels
304        if key.starts_with("app.kubernetes.io/") {
305            return Err(ValidationError::new("reserved_label").with_message(
306                format!("label '{}' uses reserved prefix app.kubernetes.io/", key).into(),
307            ));
308        }
309    }
310    Ok(())
311}
312
/// Storage specification for broker data
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct StorageSpec {
    /// Storage size (e.g., "100Gi") - must be valid Kubernetes quantity
    #[serde(default = "default_storage_size")]
    #[validate(custom(function = "validate_quantity"))]
    pub size: String,

    /// Storage class name (empty uses default)
    // NOTE(review): validated with `validate_optional_k8s_name`, whose regex
    // does not accept dots — confirm no dotted storage class names are needed.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub storage_class_name: Option<String>,

    /// Access modes for the PVC
    // Both the entry count (1-3) and each entry's value are validated.
    #[serde(default = "default_access_modes")]
    #[validate(length(min = 1, max = 3, message = "access modes must have 1-3 entries"))]
    #[validate(custom(function = "validate_access_modes"))]
    pub access_modes: Vec<String>,
}
333
334/// Validate PVC access modes
335fn validate_access_modes(modes: &[String]) -> Result<(), ValidationError> {
336    let valid_modes = [
337        "ReadWriteOnce",
338        "ReadOnlyMany",
339        "ReadWriteMany",
340        "ReadWriteOncePod",
341    ];
342    for mode in modes {
343        if !valid_modes.contains(&mode.as_str()) {
344            return Err(ValidationError::new("invalid_access_mode")
345                .with_message(format!("'{}' is not a valid access mode", mode).into()));
346        }
347    }
348    Ok(())
349}
350
351impl Default for StorageSpec {
352    fn default() -> Self {
353        Self {
354            size: default_storage_size(),
355            storage_class_name: None,
356            access_modes: default_access_modes(),
357        }
358    }
359}
360
/// Broker configuration parameters
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct BrokerConfig {
    /// Default number of partitions for new topics (1-1000)
    #[serde(default = "default_partitions")]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub default_partitions: i32,

    /// Default replication factor for new topics (1-10)
    #[serde(default = "default_replication_factor")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub default_replication_factor: i32,

    /// Log retention period in hours (1-8760, i.e., 1 hour to 1 year)
    #[serde(default = "default_log_retention_hours")]
    #[validate(range(
        min = 1,
        max = 8760,
        message = "retention hours must be between 1 and 8760"
    ))]
    pub log_retention_hours: i32,

    /// Log segment size in bytes (1MB to 10GB)
    // Bounds are enforced by `validate_segment_size` (1_048_576..=10_737_418_240).
    #[serde(default = "default_log_segment_bytes")]
    #[validate(custom(function = "validate_segment_size"))]
    pub log_segment_bytes: i64,

    /// Maximum message size in bytes (1KB to 100MB)
    // Bounds are enforced by `validate_message_size` (1_024..=104_857_600).
    #[serde(default = "default_max_message_bytes")]
    #[validate(custom(function = "validate_message_size"))]
    pub max_message_bytes: i64,

    /// Enable auto topic creation
    #[serde(default = "default_true")]
    pub auto_create_topics: bool,

    /// Enable compression
    #[serde(default = "default_true")]
    pub compression_enabled: bool,

    /// Compression algorithm (lz4, zstd, snappy, gzip, none)
    #[serde(default = "default_compression")]
    #[validate(custom(function = "validate_compression_type"))]
    pub compression_type: String,

    /// Raft election timeout in milliseconds (100-60000)
    #[serde(default = "default_election_timeout")]
    #[validate(range(
        min = 100,
        max = 60000,
        message = "election timeout must be between 100ms and 60s"
    ))]
    pub raft_election_timeout_ms: i32,

    /// Raft heartbeat interval in milliseconds (10-10000)
    // NOTE(review): nothing here relates the heartbeat interval to the
    // election timeout; each field is range-checked independently.
    #[serde(default = "default_heartbeat_interval")]
    #[validate(range(
        min = 10,
        max = 10000,
        message = "heartbeat interval must be between 10ms and 10s"
    ))]
    pub raft_heartbeat_interval_ms: i32,

    /// Additional raw configuration overrides (max 50 entries)
    #[serde(default)]
    #[validate(custom(function = "validate_raw_config"))]
    pub raw: BTreeMap<String, String>,
}
434
435/// Validate compression type
436fn validate_compression_type(compression: &str) -> Result<(), ValidationError> {
437    match compression {
438        "lz4" | "zstd" | "none" | "snappy" | "gzip" => Ok(()),
439        _ => Err(ValidationError::new("invalid_compression")
440            .with_message("compression must be one of: lz4, zstd, none, snappy, gzip".into())),
441    }
442}
443
444/// Validate log segment size (1MB to 10GB)
445fn validate_segment_size(size: i64) -> Result<(), ValidationError> {
446    const MIN_SEGMENT_SIZE: i64 = 1_048_576; // 1MB
447    const MAX_SEGMENT_SIZE: i64 = 10_737_418_240; // 10GB
448    if !(MIN_SEGMENT_SIZE..=MAX_SEGMENT_SIZE).contains(&size) {
449        return Err(ValidationError::new("invalid_segment_size")
450            .with_message("segment size must be between 1MB and 10GB".into()));
451    }
452    Ok(())
453}
454
455/// Validate max message size (1KB to 100MB)
456fn validate_message_size(size: i64) -> Result<(), ValidationError> {
457    const MIN_MESSAGE_SIZE: i64 = 1_024; // 1KB
458    const MAX_MESSAGE_SIZE: i64 = 104_857_600; // 100MB
459    if !(MIN_MESSAGE_SIZE..=MAX_MESSAGE_SIZE).contains(&size) {
460        return Err(ValidationError::new("invalid_message_size")
461            .with_message("max message size must be between 1KB and 100MB".into()));
462    }
463    Ok(())
464}
465
466/// Validate raw config map
467fn validate_raw_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
468    if config.len() > 50 {
469        return Err(ValidationError::new("too_many_raw_configs")
470            .with_message("maximum 50 raw configuration entries allowed".into()));
471    }
472    for (key, value) in config {
473        if key.len() > 128 || value.len() > 4096 {
474            return Err(ValidationError::new("raw_config_too_long")
475                .with_message("raw config key max 128 chars, value max 4096 chars".into()));
476        }
477        // Prevent injection of dangerous config keys
478        let forbidden_keys = ["command", "args", "image", "securityContext", "volumes"];
479        if forbidden_keys.contains(&key.as_str()) {
480            return Err(ValidationError::new("forbidden_raw_config")
481                .with_message(format!("raw config key '{}' is not allowed", key).into()));
482        }
483    }
484    Ok(())
485}
486
487impl Default for BrokerConfig {
488    fn default() -> Self {
489        Self {
490            default_partitions: default_partitions(),
491            default_replication_factor: default_replication_factor(),
492            log_retention_hours: default_log_retention_hours(),
493            log_segment_bytes: default_log_segment_bytes(),
494            max_message_bytes: default_max_message_bytes(),
495            auto_create_topics: default_true(),
496            compression_enabled: default_true(),
497            compression_type: default_compression(),
498            raft_election_timeout_ms: default_election_timeout(),
499            raft_heartbeat_interval_ms: default_heartbeat_interval(),
500            raw: BTreeMap::new(),
501        }
502    }
503}
504
/// TLS configuration
// The derived `Default` yields TLS and mTLS disabled with no secret names.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TlsSpec {
    /// Enable TLS
    #[serde(default)]
    pub enabled: bool,

    /// Name of the secret containing TLS certificates (required if enabled)
    // NOTE(review): the "required if enabled" rule is not enforced by any
    // validator on this struct; only the name format is checked.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// Name of the secret containing CA certificate for mTLS
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,
}
527
/// Metrics configuration
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MetricsSpec {
    /// Enable Prometheus metrics
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Metrics port (1024-65535, must be unprivileged)
    #[serde(default = "default_metrics_port")]
    #[validate(range(
        min = 1024,
        max = 65535,
        message = "metrics port must be between 1024 and 65535"
    ))]
    pub port: i32,

    /// ServiceMonitor configuration for Prometheus Operator
    // When omitted, `ServiceMonitorSpec::default()` is used and is then
    // validated as a nested struct.
    #[serde(default)]
    #[validate(nested)]
    pub service_monitor: ServiceMonitorSpec,
}
550
551impl Default for MetricsSpec {
552    fn default() -> Self {
553        Self {
554            enabled: true,
555            port: 9090,
556            service_monitor: ServiceMonitorSpec::default(),
557        }
558    }
559}
560
561/// ServiceMonitor configuration for Prometheus Operator integration
562#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
563#[serde(rename_all = "camelCase")]
564pub struct ServiceMonitorSpec {
565    /// Create a ServiceMonitor resource
566    #[serde(default)]
567    pub enabled: bool,
568
569    /// Namespace for the ServiceMonitor (defaults to cluster namespace)
570    #[serde(default)]
571    #[validate(custom(function = "validate_optional_k8s_name"))]
572    pub namespace: Option<String>,
573
574    /// Scrape interval (must match Prometheus duration format)
575    #[serde(default = "default_scrape_interval")]
576    #[validate(custom(function = "validate_duration"))]
577    pub interval: String,
578
579    /// Additional labels for the ServiceMonitor (max 10)
580    #[serde(default)]
581    #[validate(custom(function = "validate_service_monitor_labels"))]
582    pub labels: BTreeMap<String, String>,
583}
584
585/// Validate Prometheus duration format (e.g., "30s", "1m", "5m30s")
586fn validate_duration(duration: &str) -> Result<(), ValidationError> {
587    static DURATION_REGEX: LazyLock<Regex> =
588        LazyLock::new(|| Regex::new(r"^([0-9]+[smh])+$").unwrap());
589    if !DURATION_REGEX.is_match(duration) {
590        return Err(ValidationError::new("invalid_duration").with_message(
591            format!("'{}' is not a valid duration (e.g., 30s, 1m)", duration).into(),
592        ));
593    }
594    Ok(())
595}
596
597/// Validate ServiceMonitor labels
598fn validate_service_monitor_labels(
599    labels: &BTreeMap<String, String>,
600) -> Result<(), ValidationError> {
601    if labels.len() > 10 {
602        return Err(ValidationError::new("too_many_labels")
603            .with_message("maximum 10 ServiceMonitor labels allowed".into()));
604    }
605    Ok(())
606}
607
/// Pod Disruption Budget configuration
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PdbSpec {
    /// Enable PDB creation
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Minimum available pods (mutually exclusive with maxUnavailable)
    /// Can be an integer or percentage (e.g., "50%")
    // NOTE(review): mutual exclusivity is not enforced by validation here,
    // and `max_unavailable` defaults to `Some("1")` when omitted — so a spec
    // that sets only `minAvailable` ends up with both fields populated.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub min_available: Option<String>,

    /// Maximum unavailable pods
    /// Can be an integer or percentage (e.g., "25%")
    #[serde(default = "default_max_unavailable")]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub max_unavailable: Option<String>,
}
628
629/// Validate integer or percentage string (for Option<String> fields)
630fn validate_optional_int_or_percent(value: &str) -> Result<(), ValidationError> {
631    if value.is_empty() {
632        return Ok(());
633    }
634    // Allow integers or percentages
635    static INT_OR_PERCENT_REGEX: LazyLock<Regex> =
636        LazyLock::new(|| Regex::new(r"^([0-9]+|[0-9]+%)$").unwrap());
637    if !INT_OR_PERCENT_REGEX.is_match(value) {
638        return Err(ValidationError::new("invalid_int_or_percent").with_message(
639            format!(
640                "'{}' must be an integer or percentage (e.g., '1' or '25%')",
641                value
642            )
643            .into(),
644        ));
645    }
646    Ok(())
647}
648
649impl Default for PdbSpec {
650    fn default() -> Self {
651        Self {
652            enabled: true,
653            min_available: None,
654            max_unavailable: Some("1".to_string()),
655        }
656    }
657}
658
/// Probe configuration for liveness/readiness
// Each numeric field is range-checked independently; no relationship
// between fields (e.g. timeout vs. period) is validated here.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ProbeSpec {
    /// Enable the probe
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Initial delay before first probe (0-3600 seconds)
    #[serde(default = "default_initial_delay")]
    #[validate(range(min = 0, max = 3600, message = "initial delay must be 0-3600 seconds"))]
    pub initial_delay_seconds: i32,

    /// Period between probes (1-300 seconds)
    #[serde(default = "default_period")]
    #[validate(range(min = 1, max = 300, message = "period must be 1-300 seconds"))]
    pub period_seconds: i32,

    /// Timeout for probe (1-60 seconds)
    #[serde(default = "default_timeout")]
    #[validate(range(min = 1, max = 60, message = "timeout must be 1-60 seconds"))]
    pub timeout_seconds: i32,

    /// Success threshold (1-10)
    #[serde(default = "default_one")]
    #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
    pub success_threshold: i32,

    /// Failure threshold (1-30)
    #[serde(default = "default_three")]
    #[validate(range(min = 1, max = 30, message = "failure threshold must be 1-30"))]
    pub failure_threshold: i32,
}
692
693impl Default for ProbeSpec {
694    fn default() -> Self {
695        Self {
696            enabled: true,
697            initial_delay_seconds: 30,
698            period_seconds: 10,
699            timeout_seconds: 5,
700            success_threshold: 1,
701            failure_threshold: 3,
702        }
703    }
704}
705
/// Status of a RivvenCluster resource
// Referenced from the `RivvenCluster` resource via `status = "RivvenClusterStatus"`.
// NOTE(review): `phase`, the replica counters and `observed_generation` have
// no `#[serde(default)]`, so they are expected to be present whenever a
// status object is deserialized — confirm partial statuses never occur.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenClusterStatus {
    /// Current phase of the cluster
    pub phase: ClusterPhase,

    /// Total number of replicas
    pub replicas: i32,

    /// Number of ready replicas
    pub ready_replicas: i32,

    /// Number of updated replicas
    pub updated_replicas: i32,

    /// Current observed generation
    pub observed_generation: i64,

    /// Conditions describing cluster state
    #[serde(default)]
    pub conditions: Vec<ClusterCondition>,

    /// Broker endpoints
    #[serde(default)]
    pub broker_endpoints: Vec<String>,

    /// Current leader broker (if known)
    pub leader: Option<String>,

    /// Last time the status was updated
    // Stored as a plain string; the timestamp format is not fixed by this type.
    pub last_updated: Option<String>,

    /// Error message if any
    pub message: Option<String>,
}
742
/// Phase of the cluster lifecycle
// No serde rename is applied, so variants serialize under their names as
// written (e.g. `Pending`). `Pending` is the `Default` variant.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum ClusterPhase {
    /// Cluster is being created
    #[default]
    Pending,
    /// Cluster is being provisioned
    Provisioning,
    /// Cluster is running and healthy
    Running,
    /// Cluster is updating/rolling
    Updating,
    /// Cluster is in degraded state
    Degraded,
    /// Cluster has failed
    Failed,
    /// Cluster is being deleted
    Terminating,
}
762
/// Condition describing an aspect of cluster state
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterCondition {
    /// Type of condition
    // Serialized as `type`; the Rust field is renamed because `type` is a keyword.
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Status of the condition (True, False, Unknown)
    // Held as a free-form string; the three values are not enforced by the type.
    pub status: String,

    /// Reason for the condition
    pub reason: Option<String>,

    /// Human-readable message
    pub message: Option<String>,

    /// Last transition time
    pub last_transition_time: Option<String>,
}
783
// Default value functions
//
// Each helper below is referenced by name from a `#[serde(default = "...")]`
// attribute and/or from a hand-written `Default` impl in this file.

/// Default number of broker replicas.
fn default_replicas() -> i32 {
    3_i32
}

/// Default Rivven version.
fn default_version() -> String {
    String::from("0.0.1")
}

/// Default image pull policy.
fn default_image_pull_policy() -> String {
    String::from("IfNotPresent")
}

/// Default storage size for broker data.
fn default_storage_size() -> String {
    String::from("10Gi")
}

/// Default PVC access modes (a single `ReadWriteOnce` entry).
fn default_access_modes() -> Vec<String> {
    let mut modes = Vec::with_capacity(1);
    modes.push(String::from("ReadWriteOnce"));
    modes
}

/// Default partition count for new topics.
fn default_partitions() -> i32 {
    3_i32
}

/// Default replication factor for new topics.
fn default_replication_factor() -> i32 {
    2_i32
}

/// Default log retention in hours (7 days).
fn default_log_retention_hours() -> i32 {
    7 * 24
}

/// Default log segment size in bytes (1 GiB).
fn default_log_segment_bytes() -> i64 {
    1_073_741_824_i64
}

/// Default maximum message size in bytes (1 MiB).
fn default_max_message_bytes() -> i64 {
    1_048_576_i64
}

/// Default compression algorithm.
fn default_compression() -> String {
    String::from("lz4")
}

/// Default Raft election timeout in milliseconds.
fn default_election_timeout() -> i32 {
    1_000_i32
}

/// Default Raft heartbeat interval in milliseconds.
fn default_heartbeat_interval() -> i32 {
    100_i32
}

/// Default metrics port.
fn default_metrics_port() -> i32 {
    9_090_i32
}

/// Default ServiceMonitor scrape interval.
fn default_scrape_interval() -> String {
    String::from("30s")
}

/// Default `maxUnavailable` for the pod disruption budget.
fn default_max_unavailable() -> Option<String> {
    Some(String::from("1"))
}

/// Default probe initial delay in seconds.
fn default_initial_delay() -> i32 {
    30_i32
}

/// Default probe period in seconds.
fn default_period() -> i32 {
    10_i32
}

/// Default probe timeout in seconds.
fn default_timeout() -> i32 {
    5_i32
}

/// Constant `1`, used as a serde default.
fn default_one() -> i32 {
    1_i32
}

/// Constant `3`, used as a serde default.
fn default_three() -> i32 {
    3_i32
}

/// Constant `true`, used as a serde default for opt-out flags.
fn default_true() -> bool {
    true
}
872
873impl RivvenClusterSpec {
874    /// Get the full container image including version
875    pub fn get_image(&self) -> String {
876        if let Some(ref image) = self.image {
877            image.clone()
878        } else {
879            format!("ghcr.io/hupe1980/rivven:{}", self.version)
880        }
881    }
882
883    /// Get labels for managed resources
884    pub fn get_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
885        let mut labels = BTreeMap::new();
886        labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
887        labels.insert(
888            "app.kubernetes.io/instance".to_string(),
889            cluster_name.to_string(),
890        );
891        labels.insert(
892            "app.kubernetes.io/component".to_string(),
893            "broker".to_string(),
894        );
895        labels.insert(
896            "app.kubernetes.io/managed-by".to_string(),
897            "rivven-operator".to_string(),
898        );
899        labels.insert(
900            "app.kubernetes.io/version".to_string(),
901            self.version.clone(),
902        );
903        labels
904    }
905
906    /// Get selector labels for managed resources
907    pub fn get_selector_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
908        let mut labels = BTreeMap::new();
909        labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
910        labels.insert(
911            "app.kubernetes.io/instance".to_string(),
912            cluster_name.to_string(),
913        );
914        labels
915    }
916}
917
918// ============================================================================
919// RivvenConnect CRD - Connector Framework for Rivven
920// ============================================================================
921// Note: The RivvenConnect CRD is defined here but the controller is not yet
922// implemented. The #[allow(dead_code)] attributes are temporary until the
923// connect_controller module is added.
924
// Reviewer notes use `//` rather than `///` in this type: it derives
// `JsonSchema`, and schemars turns doc comments into schema descriptions.
/// RivvenConnect custom resource for managing connectors
///
/// This CRD allows declarative management of Rivven Connect pipelines,
/// including source connectors (CDC, HTTP, etc.) and sink connectors
/// (S3, stdout, HTTP webhooks, etc.).
#[allow(dead_code)]
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.hupe1980.github.io",
    version = "v1alpha1",
    kind = "RivvenConnect",
    plural = "rivvenconnects",
    shortname = "rc",
    namespaced,
    status = "RivvenConnectStatus",
    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
    printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
    printcolumn = r#"{"name":"Sources","type":"integer","jsonPath":".status.sourcesRunning"}"#,
    printcolumn = r#"{"name":"Sinks","type":"integer","jsonPath":".status.sinksRunning"}"#,
    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenConnectSpec {
    /// Reference to the RivvenCluster this connect instance connects to
    // Required: the only field here without a serde default.
    #[validate(nested)]
    pub cluster_ref: ClusterReference,

    /// Number of connect worker replicas (1-10)
    #[serde(default = "default_connect_replicas")]
    #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
    pub replicas: i32,

    /// Connect image version
    #[serde(default = "default_version")]
    pub version: String,

    /// Custom container image (overrides version-based default)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_image"))]
    pub image: Option<String>,

    /// Image pull policy
    #[serde(default = "default_image_pull_policy")]
    #[validate(custom(function = "validate_pull_policy"))]
    pub image_pull_policy: String,

    /// Image pull secrets
    #[serde(default)]
    pub image_pull_secrets: Vec<String>,

    /// Resource requests/limits (following k8s ResourceRequirements schema)
    // Kept as untyped JSON; nothing in this struct validates its shape.
    #[serde(default)]
    pub resources: Option<serde_json::Value>,

    /// Global connect configuration
    // When the key is omitted this falls back to `ConnectConfigSpec::default()`,
    // not to the per-field serde defaults declared inside `ConnectConfigSpec`.
    #[serde(default)]
    #[validate(nested)]
    pub config: ConnectConfigSpec,

    /// Source connectors (read from external systems, publish to Rivven)
    // NOTE(review): only the list length is checked here. There is no
    // `#[validate(nested)]`, so the derive does not run `SourceConnectorSpec`'s
    // own field validators through this field - presumably each connector is
    // validated elsewhere; confirm.
    #[serde(default)]
    #[validate(length(max = 50, message = "maximum 50 source connectors allowed"))]
    pub sources: Vec<SourceConnectorSpec>,

    /// Sink connectors (consume from Rivven, write to external systems)
    // NOTE(review): same as `sources` - length check only, no nested validation.
    #[serde(default)]
    #[validate(length(max = 50, message = "maximum 50 sink connectors allowed"))]
    pub sinks: Vec<SinkConnectorSpec>,

    /// Global settings for all connectors
    #[serde(default)]
    #[validate(nested)]
    pub settings: GlobalConnectSettings,

    /// TLS configuration for broker connection
    #[serde(default)]
    #[validate(nested)]
    pub tls: ConnectTlsSpec,

    /// Pod annotations
    #[serde(default)]
    #[validate(custom(function = "validate_annotations"))]
    pub pod_annotations: BTreeMap<String, String>,

    /// Pod labels (cannot override app.kubernetes.io/* labels)
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub pod_labels: BTreeMap<String, String>,

    /// Environment variables for the container
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,

    /// Node selector for pod scheduling
    // No validator attribute: keys and values are accepted as given.
    #[serde(default)]
    pub node_selector: BTreeMap<String, String>,

    /// Pod tolerations
    #[serde(default)]
    #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,

    /// Pod affinity rules
    // Untyped JSON, not validated in this struct.
    #[serde(default)]
    pub affinity: Option<serde_json::Value>,

    /// Service account name
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub service_account: Option<String>,

    /// Pod security context
    // Untyped JSON, not validated in this struct.
    #[serde(default)]
    pub security_context: Option<serde_json::Value>,

    /// Container security context
    // Untyped JSON, not validated in this struct.
    #[serde(default)]
    pub container_security_context: Option<serde_json::Value>,
}
1046
// Shared by `RivvenConnectSpec::cluster_ref` and `RivvenTopicSpec::cluster_ref`.
/// Reference to a RivvenCluster
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ClusterReference {
    /// Name of the RivvenCluster
    // Required; checked for length (1-63) and by `validate_k8s_name`.
    #[validate(length(min = 1, max = 63, message = "cluster name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Namespace of the RivvenCluster (defaults to same namespace)
    // `None` when omitted; the "same namespace" fallback is not implemented in
    // this struct, so it must be applied by whoever resolves the reference.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub namespace: Option<String>,
}
1062
1063// ============================================================================
1064// RivvenTopic CRD - Declarative Topic Management
1065// ============================================================================
1066
// Reviewer notes use `//` rather than `///` in this type: it derives
// `JsonSchema`, and schemars turns doc comments into schema descriptions.
/// RivvenTopic custom resource for declarative topic management
///
/// This CRD allows users to define topics as Kubernetes resources,
/// enabling GitOps workflows for topic lifecycle management.
///
/// # Example
///
/// ```yaml
/// apiVersion: rivven.hupe1980.github.io/v1alpha1
/// kind: RivvenTopic
/// metadata:
///   name: orders-events
///   namespace: production
/// spec:
///   clusterRef:
///     name: my-rivven-cluster
///   partitions: 12
///   replicationFactor: 3
///   config:
///     retentionMs: 604800000   # 7 days
///     cleanupPolicy: delete
///     compressionType: lz4
///   acls:
///     - principal: "user:order-service"
///       operations: ["Read", "Write"]
///     - principal: "user:analytics"
///       operations: ["Read"]
/// ```
// Note: These types are defined for CRD generation and future controller use.
// They are intentionally not yet constructed - the controller will use them.
#[allow(dead_code)]
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.hupe1980.github.io",
    version = "v1alpha1",
    kind = "RivvenTopic",
    plural = "rivventopics",
    shortname = "rt",
    namespaced,
    status = "RivvenTopicStatus",
    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
    printcolumn = r#"{"name":"Partitions","type":"integer","jsonPath":".spec.partitions"}"#,
    printcolumn = r#"{"name":"Replication","type":"integer","jsonPath":".spec.replicationFactor"}"#,
    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenTopicSpec {
    /// Reference to the RivvenCluster this topic belongs to
    // Required: the only field here without a serde default.
    #[validate(nested)]
    pub cluster_ref: ClusterReference,

    /// Number of partitions for the topic (1-10000)
    /// Cannot be decreased after creation
    // Only the 1-10000 range is enforced here; the "cannot be decreased" rule
    // needs the previous value and is not checked by this validator.
    #[serde(default = "default_rivven_topic_partitions")]
    #[validate(range(
        min = 1,
        max = 10000,
        message = "partitions must be between 1 and 10000"
    ))]
    pub partitions: i32,

    /// Replication factor for the topic (1-10)
    /// Must not exceed the number of brokers in the cluster
    // Only the 1-10 range is enforced here; the broker-count comparison is not
    // part of this validator.
    #[serde(default = "default_rivven_topic_replication")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub replication_factor: i32,

    /// Topic configuration parameters
    // When the key is omitted this falls back to `TopicConfig::default()`, not
    // to the per-field serde defaults declared inside `TopicConfig`.
    #[serde(default)]
    #[validate(nested)]
    pub config: TopicConfig,

    /// Access control list entries for the topic
    // NOTE(review): validated for list length and for duplicate
    // principal/operation pairs only. There is no `#[validate(nested)]`, so
    // the derive does not run `TopicAcl`'s own field validators through this
    // field - presumably entries are validated elsewhere; confirm.
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 ACL entries allowed"))]
    #[validate(custom(function = "validate_topic_acls"))]
    pub acls: Vec<TopicAcl>,

    /// Whether to delete the topic from Rivven when the CRD is deleted
    /// Default: true (topic is deleted when CRD is removed)
    #[serde(default = "default_true")]
    pub delete_on_remove: bool,

    /// Labels to apply to the topic metadata
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub topic_labels: BTreeMap<String, String>,
}
1160
/// Serde default for `RivvenTopicSpec::partitions`; always returns `3`.
#[allow(dead_code)]
fn default_rivven_topic_partitions() -> i32 {
    const PARTITIONS: i32 = 3;
    PARTITIONS
}
1165
/// Serde default for `RivvenTopicSpec::replication_factor`; always returns `1`.
#[allow(dead_code)]
fn default_rivven_topic_replication() -> i32 {
    const REPLICATION_FACTOR: i32 = 1;
    REPLICATION_FACTOR
}
1170
1171/// Topic configuration parameters
1172#[allow(dead_code)]
1173#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1174#[serde(rename_all = "camelCase")]
1175pub struct TopicConfig {
1176    /// Retention time in milliseconds (1 hour to 10 years)
1177    /// Default: 604800000 (7 days)
1178    #[serde(default = "default_topic_retention_ms")]
1179    #[validate(range(
1180        min = 3600000,
1181        max = 315360000000_i64,
1182        message = "retention must be between 1 hour and 10 years"
1183    ))]
1184    pub retention_ms: i64,
1185
1186    /// Retention size in bytes per partition (-1 for unlimited)
1187    /// Default: -1 (unlimited)
1188    #[serde(default = "default_topic_retention_bytes")]
1189    #[validate(custom(function = "validate_topic_retention_bytes"))]
1190    pub retention_bytes: i64,
1191
1192    /// Segment size in bytes (1MB to 10GB)
1193    #[serde(default = "default_topic_segment_bytes")]
1194    #[validate(custom(function = "validate_segment_size"))]
1195    pub segment_bytes: i64,
1196
1197    /// Cleanup policy: "delete", "compact", or "delete,compact"
1198    #[serde(default = "default_topic_cleanup_policy")]
1199    #[validate(custom(function = "validate_topic_cleanup_policy"))]
1200    pub cleanup_policy: String,
1201
1202    /// Compression type: "none", "gzip", "snappy", "lz4", "zstd"
1203    #[serde(default = "default_topic_compression")]
1204    #[validate(custom(function = "validate_topic_compression"))]
1205    pub compression_type: String,
1206
1207    /// Minimum number of in-sync replicas for writes (1-10)
1208    #[serde(default = "default_topic_min_isr")]
1209    #[validate(range(min = 1, max = 10, message = "min ISR must be between 1 and 10"))]
1210    pub min_insync_replicas: i32,
1211
1212    /// Maximum message size in bytes (1KB to 100MB)
1213    #[serde(default = "default_max_message_bytes")]
1214    #[validate(custom(function = "validate_message_size"))]
1215    pub max_message_bytes: i64,
1216
1217    /// Whether to enable message timestamps
1218    #[serde(default = "default_true")]
1219    pub message_timestamp_enabled: bool,
1220
1221    /// Timestamp type: "CreateTime" or "LogAppendTime"
1222    #[serde(default = "default_topic_timestamp_type")]
1223    #[validate(custom(function = "validate_topic_timestamp_type"))]
1224    pub message_timestamp_type: String,
1225
1226    /// Whether to enable idempotent writes
1227    #[serde(default = "default_true")]
1228    pub idempotent_writes: bool,
1229
1230    /// Flush interval in milliseconds (0 for no scheduled flush)
1231    #[serde(default)]
1232    #[validate(range(min = 0, max = 86400000, message = "flush interval must be 0-24 hours"))]
1233    pub flush_interval_ms: i64,
1234
1235    /// Custom configuration key-value pairs
1236    #[serde(default)]
1237    #[validate(custom(function = "validate_topic_custom_config"))]
1238    pub custom: BTreeMap<String, String>,
1239}
1240
/// Serde default for `TopicConfig::retention_ms`: seven days in milliseconds.
#[allow(dead_code)]
fn default_topic_retention_ms() -> i64 {
    const SEVEN_DAYS_MS: i64 = 7 * 24 * 60 * 60 * 1000;
    SEVEN_DAYS_MS
}
1245
/// Serde default for `TopicConfig::retention_bytes`: `-1`, the "unlimited"
/// sentinel accepted by `validate_topic_retention_bytes`.
#[allow(dead_code)]
fn default_topic_retention_bytes() -> i64 {
    const UNLIMITED: i64 = -1;
    UNLIMITED
}
1250
/// Serde default for `TopicConfig::segment_bytes`: 1 GiB (2^30 bytes).
#[allow(dead_code)]
fn default_topic_segment_bytes() -> i64 {
    const ONE_GIB: i64 = 1 << 30;
    ONE_GIB
}
1255
/// Serde default for `TopicConfig::cleanup_policy`: `"delete"`.
#[allow(dead_code)]
fn default_topic_cleanup_policy() -> String {
    String::from("delete")
}
1260
/// Serde default for `TopicConfig::compression_type`: `"lz4"`.
#[allow(dead_code)]
fn default_topic_compression() -> String {
    String::from("lz4")
}
1265
/// Serde default for `TopicConfig::min_insync_replicas`: `1`.
#[allow(dead_code)]
fn default_topic_min_isr() -> i32 {
    const MIN_ISR: i32 = 1;
    MIN_ISR
}
1270
/// Serde default for `TopicConfig::message_timestamp_type`: `"CreateTime"`.
#[allow(dead_code)]
fn default_topic_timestamp_type() -> String {
    String::from("CreateTime")
}
1275
1276#[allow(dead_code)]
1277fn validate_topic_retention_bytes(value: i64) -> Result<(), ValidationError> {
1278    if value == -1 || (1048576..=10995116277760).contains(&value) {
1279        Ok(())
1280    } else {
1281        Err(ValidationError::new("invalid_retention_bytes")
1282            .with_message("retention_bytes must be -1 (unlimited) or 1MB-10TB".into()))
1283    }
1284}
1285
1286#[allow(dead_code)]
1287fn validate_topic_cleanup_policy(policy: &str) -> Result<(), ValidationError> {
1288    match policy {
1289        "delete" | "compact" | "delete,compact" | "compact,delete" => Ok(()),
1290        _ => Err(ValidationError::new("invalid_cleanup_policy").with_message(
1291            "cleanup_policy must be 'delete', 'compact', or 'delete,compact'".into(),
1292        )),
1293    }
1294}
1295
1296#[allow(dead_code)]
1297fn validate_topic_compression(compression: &str) -> Result<(), ValidationError> {
1298    match compression {
1299        "none" | "gzip" | "snappy" | "lz4" | "zstd" | "producer" => Ok(()),
1300        _ => Err(ValidationError::new("invalid_compression")
1301            .with_message("compression must be none, gzip, snappy, lz4, zstd, or producer".into())),
1302    }
1303}
1304
1305#[allow(dead_code)]
1306fn validate_topic_timestamp_type(ts_type: &str) -> Result<(), ValidationError> {
1307    match ts_type {
1308        "CreateTime" | "LogAppendTime" => Ok(()),
1309        _ => Err(ValidationError::new("invalid_timestamp_type")
1310            .with_message("timestamp type must be 'CreateTime' or 'LogAppendTime'".into())),
1311    }
1312}
1313
1314#[allow(dead_code)]
1315fn validate_topic_custom_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
1316    if config.len() > 50 {
1317        return Err(ValidationError::new("too_many_custom_configs")
1318            .with_message("maximum 50 custom config entries allowed".into()));
1319    }
1320    for (key, value) in config {
1321        if key.len() > 128 || value.len() > 4096 {
1322            return Err(ValidationError::new("config_too_long")
1323                .with_message("config key max 128 chars, value max 4096 chars".into()));
1324        }
1325        // Prevent overriding protected configs
1326        let protected = [
1327            "retention.ms",
1328            "retention.bytes",
1329            "segment.bytes",
1330            "cleanup.policy",
1331        ];
1332        if protected.contains(&key.as_str()) {
1333            return Err(ValidationError::new("protected_config").with_message(
1334                format!(
1335                    "'{}' must be set via dedicated field, not custom config",
1336                    key
1337                )
1338                .into(),
1339            ));
1340        }
1341    }
1342    Ok(())
1343}
1344
// Reviewer notes use `//` rather than `///` in this type: it derives
// `JsonSchema`, and schemars turns doc comments into schema descriptions.
/// Topic ACL entry
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TopicAcl {
    /// Principal (e.g., "user:myuser", "group:mygroup", "*")
    // Required. `validate_principal` additionally accepts the capitalised
    // prefixes `User:` and `Group:`.
    #[validate(length(min = 1, max = 256, message = "principal must be 1-256 characters"))]
    #[validate(custom(function = "validate_principal"))]
    pub principal: String,

    /// Allowed operations: Read, Write, Create, Delete, Alter, Describe, All
    // NOTE(review): `validate_operations` also accepts `DescribeConfigs` and
    // `AlterConfigs` (nine names in total), while the length bound below caps
    // one entry at seven operations - confirm which of the two is intended.
    #[validate(length(min = 1, max = 7, message = "must specify 1-7 operations"))]
    #[validate(custom(function = "validate_operations"))]
    pub operations: Vec<String>,

    /// Permission type: Allow or Deny (default: Allow)
    #[serde(default = "default_permission_type")]
    #[validate(custom(function = "validate_permission_type"))]
    pub permission_type: String,

    /// Host restriction (default: "*" for any host)
    // Only the maximum length is checked; the host format is not validated here.
    #[serde(default = "default_acl_host")]
    #[validate(length(max = 256, message = "host must be max 256 characters"))]
    pub host: String,
}
1370
/// Serde default for `TopicAcl::permission_type`: `"Allow"`.
#[allow(dead_code)]
fn default_permission_type() -> String {
    String::from("Allow")
}
1375
/// Serde default for `TopicAcl::host`: `"*"`, meaning any host.
#[allow(dead_code)]
fn default_acl_host() -> String {
    String::from("*")
}
1380
1381#[allow(dead_code)]
1382fn validate_principal(principal: &str) -> Result<(), ValidationError> {
1383    if principal == "*" {
1384        return Ok(());
1385    }
1386    if let Some((prefix, name)) = principal.split_once(':') {
1387        if !["user", "group", "User", "Group"].contains(&prefix) {
1388            return Err(ValidationError::new("invalid_principal_prefix")
1389                .with_message("principal prefix must be 'user:' or 'group:'".into()));
1390        }
1391        if name.is_empty() || name.len() > 128 {
1392            return Err(ValidationError::new("invalid_principal_name")
1393                .with_message("principal name must be 1-128 characters".into()));
1394        }
1395        Ok(())
1396    } else {
1397        Err(ValidationError::new("invalid_principal_format")
1398            .with_message("principal must be '*' or 'user:name' or 'group:name'".into()))
1399    }
1400}
1401
1402fn validate_operations(ops: &[String]) -> Result<(), ValidationError> {
1403    let valid_ops = [
1404        "Read",
1405        "Write",
1406        "Create",
1407        "Delete",
1408        "Alter",
1409        "Describe",
1410        "All",
1411        "DescribeConfigs",
1412        "AlterConfigs",
1413    ];
1414    for op in ops {
1415        if !valid_ops.contains(&op.as_str()) {
1416            return Err(ValidationError::new("invalid_operation").with_message(
1417                format!("'{}' is not a valid operation. Valid: {:?}", op, valid_ops).into(),
1418            ));
1419        }
1420    }
1421    Ok(())
1422}
1423
1424#[allow(dead_code)]
1425fn validate_permission_type(perm: &str) -> Result<(), ValidationError> {
1426    match perm {
1427        "Allow" | "Deny" => Ok(()),
1428        _ => Err(ValidationError::new("invalid_permission_type")
1429            .with_message("permission_type must be 'Allow' or 'Deny'".into())),
1430    }
1431}
1432
1433#[allow(dead_code)]
1434fn validate_topic_acls(acls: &[TopicAcl]) -> Result<(), ValidationError> {
1435    // Check for duplicate principal+operation combinations
1436    let mut seen = std::collections::HashSet::new();
1437    for acl in acls {
1438        for op in &acl.operations {
1439            let key = format!("{}:{}", acl.principal, op);
1440            if !seen.insert(key.clone()) {
1441                return Err(ValidationError::new("duplicate_acl")
1442                    .with_message(format!("duplicate ACL entry for {}", key).into()));
1443            }
1444        }
1445    }
1446    Ok(())
1447}
1448
// Every field is `#[serde(default)]` and the struct derives `Default`, so a
// partially populated (or empty) status object deserializes successfully with
// the missing fields set to their type's default value.
/// Status of the RivvenTopic resource
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenTopicStatus {
    /// Current phase: Pending, Creating, Ready, Updating, Deleting, Failed
    // Stored as a free-form string; the listed phase names are not enforced by
    // this type, and the derived default is the empty string.
    #[serde(default)]
    pub phase: String,

    /// Human-readable message about current state
    #[serde(default)]
    pub message: String,

    /// Actual number of partitions (may differ during scaling)
    #[serde(default)]
    pub current_partitions: i32,

    /// Actual replication factor
    #[serde(default)]
    pub current_replication_factor: i32,

    /// Whether the topic exists in Rivven
    #[serde(default)]
    pub topic_exists: bool,

    /// Observed generation for tracking spec changes
    #[serde(default)]
    pub observed_generation: i64,

    /// Conditions for detailed status tracking
    #[serde(default)]
    pub conditions: Vec<TopicCondition>,

    /// Last time the topic was successfully synced
    // Kept as a plain string; the timestamp format is not fixed by this type.
    #[serde(default)]
    pub last_sync_time: Option<String>,

    /// Partition leader information
    #[serde(default)]
    pub partition_info: Vec<PartitionInfo>,
}
1490
// Unlike `RivvenTopicStatus`, no field here has a serde default, so all five
// keys must be present when a condition is deserialized.
/// Condition for tracking topic status
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TopicCondition {
    /// Type of condition: Ready, Synced, ACLsApplied, ConfigApplied
    // Raw identifier because `type` is a Rust keyword; free-form string.
    pub r#type: String,

    /// Status: True, False, Unknown
    // Free-form string; the three listed values are not enforced by this type.
    pub status: String,

    /// Machine-readable reason
    pub reason: String,

    /// Human-readable message
    pub message: String,

    /// Last transition time
    // Kept as a plain string; the timestamp format is not fixed by this type.
    pub last_transition_time: String,
}
1511
// Element type of `RivvenTopicStatus::partition_info`. No field has a serde
// default, so all four keys must be present when an entry is deserialized.
/// Information about a topic partition
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PartitionInfo {
    /// Partition ID
    pub partition: i32,

    /// Leader broker ID
    pub leader: i32,

    /// Replica broker IDs
    pub replicas: Vec<i32>,

    /// In-sync replica broker IDs
    pub isr: Vec<i32>,
}
1529
1530/// Global connect configuration
1531#[allow(dead_code)]
1532#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1533#[serde(rename_all = "camelCase")]
1534pub struct ConnectConfigSpec {
1535    /// State directory path inside container
1536    #[serde(default = "default_state_dir")]
1537    pub state_dir: String,
1538
1539    /// Log level (trace, debug, info, warn, error)
1540    #[serde(default = "default_log_level")]
1541    #[validate(custom(function = "validate_log_level"))]
1542    pub log_level: String,
1543}
1544
/// Serde default for `ConnectConfigSpec::state_dir`: `"/data/connect-state"`.
#[allow(dead_code)]
fn default_state_dir() -> String {
    String::from("/data/connect-state")
}
1549
/// Serde default for `ConnectConfigSpec::log_level`: `"info"`.
#[allow(dead_code)]
fn default_log_level() -> String {
    String::from("info")
}
1554
1555#[allow(dead_code)]
1556fn validate_log_level(level: &str) -> Result<(), ValidationError> {
1557    match level {
1558        "trace" | "debug" | "info" | "warn" | "error" => Ok(()),
1559        _ => Err(ValidationError::new("invalid_log_level")
1560            .with_message("log level must be one of: trace, debug, info, warn, error".into())),
1561    }
1562}
1563
// Reviewer notes use `//` rather than `///` in this type: it derives
// `JsonSchema`, and schemars turns doc comments into schema descriptions.
/// Source connector specification (Kafka Connect style - generic config)
///
/// All connector-specific configuration goes in the generic `config` field.
/// Connector validation happens at runtime, not at CRD schema level.
/// This design scales to 300+ connectors without CRD changes.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SourceConnectorSpec {
    /// Unique name for this source connector
    // Required. Uniqueness across connectors is not checked by this struct;
    // only the length and `validate_k8s_name` apply.
    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Connector type (postgres-cdc, mysql-cdc, http, datagen, kafka, mqtt, etc.)
    // Required. `validate_connector_type` checks the shape of the identifier
    // only, not whether such a connector exists.
    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
    #[validate(custom(function = "validate_connector_type"))]
    pub connector: String,

    /// Target topic to publish events to (fallback/default)
    // Required; only the length is validated.
    #[validate(length(min = 1, max = 255, message = "topic must be 1-255 characters"))]
    pub topic: String,

    /// Topic routing pattern for CDC connectors
    /// Enables dynamic topic selection based on CDC event metadata.
    /// Supported placeholders: {database}, {schema}, {table}
    /// Example: "cdc.{schema}.{table}" → "cdc.public.users"
    // No validator attribute: the pattern and its placeholders are not checked here.
    #[serde(default)]
    pub topic_routing: Option<String>,

    /// Whether this source is enabled
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Connector-specific configuration (Kafka Connect style)
    /// All connector parameters go here. Validated at runtime by the controller.
    /// Example for postgres-cdc: {"host": "...", "slot": "...", "tables": [...]}
    // Defaults to JSON `null` (`serde_json::Value::default()`) when omitted.
    #[serde(default)]
    pub config: serde_json::Value,

    /// Secret reference for sensitive configuration (passwords, keys, tokens)
    /// The referenced Secret's data will be merged into config at runtime.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub config_secret_ref: Option<String>,

    /// Topic configuration (partitions, replication)
    #[serde(default)]
    #[validate(nested)]
    pub topic_config: SourceTopicConfigSpec,
}
1615
1616#[allow(dead_code)]
1617fn validate_connector_type(connector: &str) -> Result<(), ValidationError> {
1618    // Allow standard connectors and custom ones (must be alphanumeric with hyphens)
1619    static CONNECTOR_REGEX: LazyLock<Regex> =
1620        LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
1621    if !CONNECTOR_REGEX.is_match(connector) {
1622        return Err(ValidationError::new("invalid_connector_type")
1623            .with_message("connector type must be lowercase alphanumeric with hyphens".into()));
1624    }
1625    Ok(())
1626}
1627
// Not referenced by any other type in this part of the file; presumably it
// describes the entries of a CDC connector's `tables` config - confirm.
/// Table specification for CDC sources
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TableSpec {
    /// Schema/namespace (e.g., "public" for PostgreSQL)
    #[serde(default)]
    pub schema: Option<String>,

    /// Table name
    // The only required field.
    #[validate(length(min = 1, max = 128, message = "table name must be 1-128 characters"))]
    pub table: String,

    /// Override topic for this specific table
    #[serde(default)]
    pub topic: Option<String>,

    /// Columns to include (empty = all columns)
    #[serde(default)]
    #[validate(length(max = 500, message = "maximum 500 columns per table"))]
    pub columns: Vec<String>,

    /// Columns to exclude
    // Nothing here checks for overlap between `columns` and `exclude_columns`.
    #[serde(default)]
    #[validate(length(max = 500, message = "maximum 500 excluded columns per table"))]
    pub exclude_columns: Vec<String>,

    /// Column masking rules (column -> mask pattern)
    // No validator attribute: the map size and mask patterns are unchecked.
    #[serde(default)]
    pub column_masks: std::collections::BTreeMap<String, String>,
}
1659
// All fields are optional, so the derived `Default` (every field `None`)
// matches what an empty object deserializes to.
/// Source topic configuration
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SourceTopicConfigSpec {
    /// Number of partitions for auto-created topics
    // Upper bound is 1000 here, whereas `RivvenTopicSpec::partitions` allows
    // up to 10000.
    #[serde(default)]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub partitions: Option<i32>,

    /// Replication factor for auto-created topics
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub replication_factor: Option<i32>,

    /// Auto-create topics if they don't exist
    // `None` leaves the decision to whatever consumes this spec.
    #[serde(default)]
    pub auto_create: Option<bool>,
}
1683
// Reviewer notes use `//` rather than `///` in this type: it derives
// `JsonSchema`, and schemars turns doc comments into schema descriptions.
/// Sink connector specification (Kafka Connect style - generic config)
///
/// All connector-specific configuration goes in the generic `config` field.
/// Connector validation happens at runtime, not at CRD schema level.
/// This design scales to 300+ connectors without CRD changes.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SinkConnectorSpec {
    /// Unique name for this sink connector
    // Required. Uniqueness across connectors is not checked by this struct;
    // only the length and `validate_k8s_name` apply.
    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Connector type (stdout, s3, http, iceberg, delta, elasticsearch, etc.)
    // Required. `validate_connector_type` checks the shape of the identifier
    // only, not whether such a connector exists.
    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
    #[validate(custom(function = "validate_connector_type"))]
    pub connector: String,

    /// Topics to consume from (supports wildcards like "cdc.*")
    // Required. The length bound applies to the list (1-100 entries); the
    // individual topic strings are not validated here.
    #[validate(length(min = 1, max = 100, message = "must have 1-100 topics"))]
    pub topics: Vec<String>,

    /// Consumer group for offset tracking
    // Required; only the length is validated.
    #[validate(length(
        min = 1,
        max = 128,
        message = "consumer group must be 1-128 characters"
    ))]
    pub consumer_group: String,

    /// Whether this sink is enabled
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Starting offset (earliest, latest, or timestamp)
    // Defaults to "latest" via `default_start_offset`.
    #[serde(default = "default_start_offset")]
    #[validate(custom(function = "validate_start_offset"))]
    pub start_offset: String,

    /// Connector-specific configuration (Kafka Connect style)
    /// All connector parameters go here. Validated at runtime by the controller.
    /// Example for iceberg: {"catalog": {...}, "namespace": "...", "table": "..."}
    // Defaults to JSON `null` (`serde_json::Value::default()`) when omitted.
    #[serde(default)]
    pub config: serde_json::Value,

    /// Secret reference for sensitive configuration (passwords, keys, tokens)
    /// The referenced Secret's data will be merged into config at runtime.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub config_secret_ref: Option<String>,

    /// Rate limiting configuration
    #[serde(default)]
    #[validate(nested)]
    pub rate_limit: RateLimitSpec,
}
1741
/// Serde default for `SinkConnectorSpec::start_offset`: `"latest"`.
#[allow(dead_code)]
fn default_start_offset() -> String {
    String::from("latest")
}
1746
1747#[allow(dead_code)]
1748fn validate_start_offset(offset: &str) -> Result<(), ValidationError> {
1749    match offset {
1750        "earliest" | "latest" => Ok(()),
1751        s if s.contains('T') && s.contains(':') => Ok(()), // ISO 8601 timestamp
1752        _ => Err(ValidationError::new("invalid_start_offset").with_message(
1753            "start offset must be 'earliest', 'latest', or ISO 8601 timestamp".into(),
1754        )),
1755    }
1756}
1757
/// Rate limiting configuration for sinks
///
/// Both fields are optional in the manifest (serialized as `eventsPerSecond`
/// and `burstCapacity`). The derived `Default` yields `events_per_second = 0`,
/// which is documented below as "unlimited", and no burst capacity.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RateLimitSpec {
    /// Maximum events per second (0 = unlimited)
    // `min = 0` can never fail for an unsigned field; only the upper bound
    // actually constrains the input.
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 1_000_000,
        message = "events per second must be 0-1000000"
    ))]
    pub events_per_second: u64,

    /// Burst capacity (extra events above steady rate)
    ///
    /// `None` when omitted from the manifest.
    #[serde(default)]
    #[validate(range(min = 0, max = 100_000, message = "burst capacity must be 0-100000"))]
    pub burst_capacity: Option<u64>,
}
1777
/// Global settings for all connectors
///
/// Every section is optional: an omitted section falls back to that section
/// type's `Default` implementation. Each section is validated recursively via
/// `#[validate(nested)]`.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct GlobalConnectSettings {
    /// Topic auto-creation settings
    #[serde(default)]
    #[validate(nested)]
    pub topic: TopicSettingsSpec,

    /// Retry configuration
    #[serde(default)]
    #[validate(nested)]
    pub retry: RetryConfigSpec,

    /// Health check configuration
    #[serde(default)]
    #[validate(nested)]
    pub health: HealthConfigSpec,

    /// Metrics configuration
    #[serde(default)]
    #[validate(nested)]
    pub metrics: ConnectMetricsSpec,
}
1803
/// Topic settings for auto-creation
///
/// `Default` is implemented by hand for this type (not derived) so that
/// `TopicSettingsSpec::default()` carries the same values as the per-field
/// serde defaults declared here.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TopicSettingsSpec {
    /// Enable automatic topic creation
    #[serde(default = "default_true")]
    pub auto_create: bool,

    /// Default partitions for new topics (1-1000; defaults to 1)
    #[serde(default = "default_topic_partitions")]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub default_partitions: i32,

    /// Default replication factor (1-10; defaults to 1)
    #[serde(default = "default_topic_replication")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub default_replication_factor: i32,

    /// Fail if topic doesn't exist and auto_create is false
    #[serde(default = "default_true")]
    pub require_topic_exists: bool,
}
1831
/// Serde default for `TopicSettingsSpec::default_partitions`.
#[allow(dead_code)]
fn default_topic_partitions() -> i32 {
    const PARTITIONS: i32 = 1;
    PARTITIONS
}
1836
/// Serde default for `TopicSettingsSpec::default_replication_factor`.
#[allow(dead_code)]
fn default_topic_replication() -> i32 {
    const REPLICATION_FACTOR: i32 = 1;
    REPLICATION_FACTOR
}
1841
1842impl Default for TopicSettingsSpec {
1843    fn default() -> Self {
1844        Self {
1845            auto_create: true,
1846            default_partitions: 1,
1847            default_replication_factor: 1,
1848            require_topic_exists: true,
1849        }
1850    }
1851}
1852
/// Retry configuration
///
/// `Default` is implemented by hand for this type so that
/// `RetryConfigSpec::default()` matches the per-field serde defaults.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RetryConfigSpec {
    /// Maximum retry attempts (0-100; defaults to 10)
    #[serde(default = "default_max_retries")]
    #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
    pub max_retries: i32,

    /// Initial backoff in milliseconds (10-60000; defaults to 100)
    #[serde(default = "default_initial_backoff_ms")]
    #[validate(range(min = 10, max = 60000, message = "initial backoff must be 10-60000ms"))]
    pub initial_backoff_ms: i64,

    /// Maximum backoff in milliseconds (100-3600000; defaults to 30000)
    // NOTE(review): nothing here checks that `max_backoff_ms` is at least
    // `initial_backoff_ms` - confirm whether the controller enforces that.
    #[serde(default = "default_max_backoff_ms")]
    #[validate(range(
        min = 100,
        max = 3600000,
        message = "max backoff must be 100-3600000ms"
    ))]
    pub max_backoff_ms: i64,

    /// Backoff multiplier (defaults to 2.0)
    // NOTE(review): this field carries no range validation, so zero, negative
    // or non-finite values pass CRD validation - confirm that is intended.
    #[serde(default = "default_backoff_multiplier")]
    pub backoff_multiplier: f64,
}
1881
/// Serde default for `RetryConfigSpec::max_retries`.
#[allow(dead_code)]
fn default_max_retries() -> i32 {
    const MAX_RETRIES: i32 = 10;
    MAX_RETRIES
}
1886
/// Serde default for `RetryConfigSpec::initial_backoff_ms` (milliseconds).
#[allow(dead_code)]
fn default_initial_backoff_ms() -> i64 {
    const INITIAL_BACKOFF_MS: i64 = 100;
    INITIAL_BACKOFF_MS
}
1891
/// Serde default for `RetryConfigSpec::max_backoff_ms` (milliseconds).
#[allow(dead_code)]
fn default_max_backoff_ms() -> i64 {
    const MAX_BACKOFF_MS: i64 = 30_000;
    MAX_BACKOFF_MS
}
1896
/// Serde default for `RetryConfigSpec::backoff_multiplier`.
#[allow(dead_code)]
fn default_backoff_multiplier() -> f64 {
    const MULTIPLIER: f64 = 2.0;
    MULTIPLIER
}
1901
1902impl Default for RetryConfigSpec {
1903    fn default() -> Self {
1904        Self {
1905            max_retries: 10,
1906            initial_backoff_ms: 100,
1907            max_backoff_ms: 30000,
1908            backoff_multiplier: 2.0,
1909        }
1910    }
1911}
1912
/// Health check configuration
///
/// `Default` is implemented by hand for this type so that
/// `HealthConfigSpec::default()` matches the per-field serde defaults
/// (disabled, port 8080, path `/health`).
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct HealthConfigSpec {
    /// Enable health check HTTP endpoint (off unless set)
    #[serde(default)]
    pub enabled: bool,

    /// Health check port (1024-65535; defaults to 8080)
    #[serde(default = "default_health_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,

    /// Health check path (defaults to `/health`)
    // NOTE(review): the path is not validated here (e.g. leading `/`, length)
    // - confirm whether the controller checks it.
    #[serde(default = "default_health_path")]
    pub path: String,
}
1931
/// Serde default for `HealthConfigSpec::port`.
#[allow(dead_code)]
fn default_health_port() -> i32 {
    const HEALTH_PORT: i32 = 8080;
    HEALTH_PORT
}
1936
/// Serde default for `HealthConfigSpec::path`.
#[allow(dead_code)]
fn default_health_path() -> String {
    String::from("/health")
}
1941
1942impl Default for HealthConfigSpec {
1943    fn default() -> Self {
1944        Self {
1945            enabled: false,
1946            port: 8080,
1947            path: "/health".to_string(),
1948        }
1949    }
1950}
1951
/// Metrics configuration for connect
///
/// `Default` is implemented by hand for this type so that
/// `ConnectMetricsSpec::default()` matches the per-field serde defaults
/// (disabled, port 9091).
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ConnectMetricsSpec {
    /// Enable metrics endpoint (off unless set)
    #[serde(default)]
    pub enabled: bool,

    /// Metrics port (1024-65535; defaults to 9091)
    #[serde(default = "default_connect_metrics_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,
}
1966
/// Serde default for `ConnectMetricsSpec::port`.
#[allow(dead_code)]
fn default_connect_metrics_port() -> i32 {
    const METRICS_PORT: i32 = 9091;
    METRICS_PORT
}
1971
1972impl Default for ConnectMetricsSpec {
1973    fn default() -> Self {
1974        Self {
1975            enabled: false,
1976            port: 9091,
1977        }
1978    }
1979}
1980
/// TLS configuration for connect broker connection
///
/// All fields are optional. The derived `Default` leaves every flag `false`
/// and every secret reference `None`.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ConnectTlsSpec {
    /// Enable TLS for broker connection
    #[serde(default)]
    pub enabled: bool,

    /// Secret containing TLS certificates
    ///
    /// Checked with `validate_optional_k8s_name` when present.
    // NOTE(review): nothing here requires a secret name when `enabled` is
    // true - confirm whether the controller enforces that.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// CA secret name for mTLS
    ///
    /// Checked with `validate_optional_k8s_name` when present.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,

    /// Skip server certificate verification (DANGEROUS - testing only)
    #[serde(default)]
    pub insecure: bool,
}
2008
/// Default replica count for a connect deployment.
// Presumably referenced as a serde default by a field outside this part of the
// file; verify against the struct that uses it.
#[allow(dead_code)]
fn default_connect_replicas() -> i32 {
    const REPLICAS: i32 = 1;
    REPLICAS
}
2013
/// Status of a RivvenConnect resource
///
/// Serialized with camelCase keys. The derived `Default` gives phase
/// `Pending`, zeroed counters, empty lists and no timestamp/message.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenConnectStatus {
    /// Current phase of the connect instance
    pub phase: ConnectPhase,

    /// Total replicas
    pub replicas: i32,

    /// Ready replicas
    pub ready_replicas: i32,

    /// Number of source connectors running
    pub sources_running: i32,

    /// Number of sink connectors running
    pub sinks_running: i32,

    /// Total number of sources configured
    pub sources_total: i32,

    /// Total number of sinks configured
    pub sinks_total: i32,

    /// Current observed generation
    pub observed_generation: i64,

    /// Conditions describing connect state (empty when absent)
    #[serde(default)]
    pub conditions: Vec<ConnectCondition>,

    /// Individual connector statuses (empty when absent)
    #[serde(default)]
    pub connector_statuses: Vec<ConnectorStatus>,

    /// Last time the status was updated
    // Stored as a plain string; the timestamp format is not fixed by this type.
    pub last_updated: Option<String>,

    /// Error message if any
    pub message: Option<String>,
}
2057
/// Phase of the connect lifecycle
///
/// No serde renaming is applied, so each variant is (de)serialized under its
/// Rust name (e.g. `Running`). `Pending` is the `Default` variant.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum ConnectPhase {
    /// Connect is being created
    #[default]
    Pending,
    /// Connect is starting up
    Starting,
    /// Connect is running and healthy
    Running,
    /// Connect is partially healthy (some connectors failed)
    Degraded,
    /// Connect has failed
    Failed,
    /// Connect is being deleted
    Terminating,
}
2076
/// Condition describing an aspect of connect state
///
/// Serialized with camelCase keys; `condition_type` is written as `type`.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectCondition {
    /// Type of condition (Ready, BrokerConnected, SourcesHealthy, SinksHealthy)
    // Explicit rename because `type` is a Rust keyword and cannot be a field name.
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Status of the condition (True, False, Unknown)
    // Kept as a free-form string; the listed values are not enforced by this type.
    pub status: String,

    /// Reason for the condition
    pub reason: Option<String>,

    /// Human-readable message
    pub message: Option<String>,

    /// Last transition time
    pub last_transition_time: Option<String>,
}
2098
/// Status of an individual connector
///
/// Serialized with camelCase keys. The string fields are free-form; the values
/// listed in the field docs are not enforced by this type.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectorStatus {
    /// Connector name
    pub name: String,

    /// Connector type (source or sink)
    pub connector_type: String,

    /// Connector kind (postgres-cdc, stdout, etc.)
    pub kind: String,

    /// Current state (running, stopped, failed)
    pub state: String,

    /// Number of events processed
    pub events_processed: i64,

    /// Last error message
    pub last_error: Option<String>,

    /// Last successful operation time
    pub last_success_time: Option<String>,
}
2125
2126#[allow(dead_code)]
2127impl RivvenConnectSpec {
2128    /// Get the full container image including version
2129    pub fn get_image(&self) -> String {
2130        if let Some(ref image) = self.image {
2131            image.clone()
2132        } else {
2133            format!("ghcr.io/hupe1980/rivven-connect:{}", self.version)
2134        }
2135    }
2136
2137    /// Get labels for managed resources
2138    pub fn get_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2139        let mut labels = BTreeMap::new();
2140        labels.insert(
2141            "app.kubernetes.io/name".to_string(),
2142            "rivven-connect".to_string(),
2143        );
2144        labels.insert(
2145            "app.kubernetes.io/instance".to_string(),
2146            connect_name.to_string(),
2147        );
2148        labels.insert(
2149            "app.kubernetes.io/component".to_string(),
2150            "connector".to_string(),
2151        );
2152        labels.insert(
2153            "app.kubernetes.io/managed-by".to_string(),
2154            "rivven-operator".to_string(),
2155        );
2156        labels.insert(
2157            "app.kubernetes.io/version".to_string(),
2158            self.version.clone(),
2159        );
2160        labels
2161    }
2162
2163    /// Get selector labels for managed resources
2164    pub fn get_selector_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2165        let mut labels = BTreeMap::new();
2166        labels.insert(
2167            "app.kubernetes.io/name".to_string(),
2168            "rivven-connect".to_string(),
2169        );
2170        labels.insert(
2171            "app.kubernetes.io/instance".to_string(),
2172            connect_name.to_string(),
2173        );
2174        labels
2175    }
2176
2177    /// Get enabled sources count
2178    pub fn enabled_sources_count(&self) -> usize {
2179        self.sources.iter().filter(|s| s.enabled).count()
2180    }
2181
2182    /// Get enabled sinks count
2183    pub fn enabled_sinks_count(&self) -> usize {
2184        self.sinks.iter().filter(|s| s.enabled).count()
2185    }
2186}
2187
2188// ============================================================================
2189// Advanced CDC Configuration Types (Shared between PostgreSQL and MySQL CDC)
2190// ============================================================================
2191// These types map to rivven-cdc features and provide enterprise-grade CDC capabilities.
2192
2193/// Snapshot configuration for initial data load
2194#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2195#[serde(rename_all = "camelCase")]
2196pub struct SnapshotCdcConfigSpec {
2197    /// Batch size for SELECT queries (rows per batch)
2198    #[serde(default = "default_snapshot_batch_size")]
2199    #[validate(range(min = 100, max = 1000000, message = "batch size must be 100-1000000"))]
2200    pub batch_size: i32,
2201
2202    /// Number of tables to snapshot in parallel
2203    #[serde(default = "default_snapshot_parallel_tables")]
2204    #[validate(range(min = 1, max = 32, message = "parallel tables must be 1-32"))]
2205    pub parallel_tables: i32,
2206
2207    /// Query timeout in seconds
2208    #[serde(default = "default_snapshot_query_timeout")]
2209    #[validate(range(min = 10, max = 3600, message = "query timeout must be 10-3600s"))]
2210    pub query_timeout_secs: i32,
2211
2212    /// Delay between batches for backpressure (ms)
2213    #[serde(default)]
2214    #[validate(range(min = 0, max = 60000, message = "throttle delay must be 0-60000ms"))]
2215    pub throttle_delay_ms: i32,
2216
2217    /// Maximum retries per batch on failure
2218    #[serde(default = "default_snapshot_max_retries")]
2219    #[validate(range(min = 0, max = 10, message = "max retries must be 0-10"))]
2220    pub max_retries: i32,
2221
2222    /// Tables to include in snapshot (empty = all tables)
2223    #[serde(default)]
2224    pub include_tables: Vec<String>,
2225
2226    /// Tables to exclude from snapshot
2227    #[serde(default)]
2228    pub exclude_tables: Vec<String>,
2229}
2230
/// Serde default for `SnapshotCdcConfigSpec::batch_size` (rows per batch).
fn default_snapshot_batch_size() -> i32 {
    const ROWS_PER_BATCH: i32 = 10_000;
    ROWS_PER_BATCH
}
/// Serde default for `SnapshotCdcConfigSpec::parallel_tables`.
fn default_snapshot_parallel_tables() -> i32 {
    const PARALLEL_TABLES: i32 = 4;
    PARALLEL_TABLES
}
/// Serde default for `SnapshotCdcConfigSpec::query_timeout_secs` (seconds).
fn default_snapshot_query_timeout() -> i32 {
    const QUERY_TIMEOUT_SECS: i32 = 300;
    QUERY_TIMEOUT_SECS
}
/// Serde default for `SnapshotCdcConfigSpec::max_retries`.
fn default_snapshot_max_retries() -> i32 {
    const MAX_RETRIES: i32 = 3;
    MAX_RETRIES
}
2243
2244/// Incremental snapshot configuration (non-blocking re-snapshots)
2245#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2246#[serde(rename_all = "camelCase")]
2247pub struct IncrementalSnapshotSpec {
2248    /// Enable incremental snapshots
2249    #[serde(default)]
2250    pub enabled: bool,
2251
2252    /// Chunk size (rows per chunk)
2253    #[serde(default = "default_incremental_chunk_size")]
2254    #[validate(range(min = 100, max = 100000, message = "chunk size must be 100-100000"))]
2255    pub chunk_size: i32,
2256
2257    /// Watermark strategy: insert, update_and_insert
2258    #[serde(default)]
2259    #[validate(custom(function = "validate_watermark_strategy"))]
2260    pub watermark_strategy: String,
2261
2262    /// Signal table for watermark signals
2263    #[serde(default)]
2264    pub watermark_signal_table: Option<String>,
2265
2266    /// Maximum concurrent chunks
2267    #[serde(default = "default_incremental_max_chunks")]
2268    #[validate(range(min = 1, max = 16, message = "max concurrent chunks must be 1-16"))]
2269    pub max_concurrent_chunks: i32,
2270
2271    /// Delay between chunks (ms)
2272    #[serde(default)]
2273    #[validate(range(min = 0, max = 60000, message = "chunk delay must be 0-60000ms"))]
2274    pub chunk_delay_ms: i32,
2275}
2276
/// Serde default for `IncrementalSnapshotSpec::chunk_size` (rows per chunk).
fn default_incremental_chunk_size() -> i32 {
    const ROWS_PER_CHUNK: i32 = 1024;
    ROWS_PER_CHUNK
}
/// Serde default for `IncrementalSnapshotSpec::max_concurrent_chunks`.
fn default_incremental_max_chunks() -> i32 {
    const MAX_CONCURRENT_CHUNKS: i32 = 1;
    MAX_CONCURRENT_CHUNKS
}
2283
2284fn validate_watermark_strategy(strategy: &str) -> Result<(), ValidationError> {
2285    match strategy {
2286        "" | "insert" | "update_and_insert" => Ok(()),
2287        _ => Err(ValidationError::new("invalid_watermark_strategy")
2288            .with_message("watermark strategy must be: insert or update_and_insert".into())),
2289    }
2290}
2291
2292/// Signal table configuration for ad-hoc snapshots and control
2293#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2294#[serde(rename_all = "camelCase")]
2295pub struct SignalTableSpec {
2296    /// Enable signal processing
2297    #[serde(default)]
2298    pub enabled: bool,
2299
2300    /// Fully-qualified signal table name (schema.table)
2301    #[serde(default)]
2302    pub data_collection: Option<String>,
2303
2304    /// Topic for signal messages (alternative to table)
2305    #[serde(default)]
2306    pub topic: Option<String>,
2307
2308    /// Enabled signal channels: source, topic, file, api
2309    #[serde(default)]
2310    pub enabled_channels: Vec<String>,
2311
2312    /// Poll interval for file channel (ms)
2313    #[serde(default = "default_signal_poll_interval")]
2314    pub poll_interval_ms: i32,
2315}
2316
/// Serde default for `SignalTableSpec::poll_interval_ms` (milliseconds).
fn default_signal_poll_interval() -> i32 {
    const POLL_INTERVAL_MS: i32 = 1000;
    POLL_INTERVAL_MS
}
2320
2321/// Heartbeat monitoring configuration
2322#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2323#[serde(rename_all = "camelCase")]
2324pub struct HeartbeatCdcSpec {
2325    /// Enable heartbeat monitoring
2326    #[serde(default)]
2327    pub enabled: bool,
2328
2329    /// Heartbeat interval in seconds
2330    #[serde(default = "default_heartbeat_cdc_interval")]
2331    #[validate(range(min = 1, max = 3600, message = "heartbeat interval must be 1-3600s"))]
2332    pub interval_secs: i32,
2333
2334    /// Maximum allowed lag before marking as unhealthy (seconds)
2335    #[serde(default = "default_heartbeat_max_lag")]
2336    #[validate(range(min = 10, max = 86400, message = "max lag must be 10-86400s"))]
2337    pub max_lag_secs: i32,
2338
2339    /// Emit heartbeat events to a topic
2340    #[serde(default)]
2341    pub emit_events: bool,
2342
2343    /// Topic name for heartbeat events
2344    #[serde(default)]
2345    pub topic: Option<String>,
2346
2347    /// SQL query to execute on each heartbeat (keeps connections alive)
2348    #[serde(default)]
2349    pub action_query: Option<String>,
2350}
2351
/// Serde default for `HeartbeatCdcSpec::interval_secs` (seconds).
fn default_heartbeat_cdc_interval() -> i32 {
    const INTERVAL_SECS: i32 = 10;
    INTERVAL_SECS
}
/// Serde default for `HeartbeatCdcSpec::max_lag_secs` (seconds).
fn default_heartbeat_max_lag() -> i32 {
    const MAX_LAG_SECS: i32 = 300;
    MAX_LAG_SECS
}
2358
2359/// Event deduplication configuration
2360#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2361#[serde(rename_all = "camelCase")]
2362pub struct DeduplicationCdcSpec {
2363    /// Enable deduplication
2364    #[serde(default)]
2365    pub enabled: bool,
2366
2367    /// Bloom filter expected insertions
2368    #[serde(default = "default_bloom_expected")]
2369    #[validate(range(
2370        min = 1000,
2371        max = 10000000,
2372        message = "bloom expected must be 1000-10M"
2373    ))]
2374    pub bloom_expected_insertions: i64,
2375
2376    /// Bloom filter false positive rate (0.001-0.1)
2377    #[serde(default = "default_bloom_fpp")]
2378    pub bloom_fpp: f64,
2379
2380    /// LRU cache size
2381    #[serde(default = "default_lru_size")]
2382    #[validate(range(min = 1000, max = 1000000, message = "LRU size must be 1000-1M"))]
2383    pub lru_size: i64,
2384
2385    /// Deduplication window (seconds)
2386    #[serde(default = "default_dedup_window")]
2387    #[validate(range(min = 60, max = 604800, message = "window must be 60-604800s"))]
2388    pub window_secs: i64,
2389}
2390
/// Serde default for `DeduplicationCdcSpec::bloom_expected_insertions`.
fn default_bloom_expected() -> i64 {
    const EXPECTED_INSERTIONS: i64 = 100_000;
    EXPECTED_INSERTIONS
}
/// Serde default for `DeduplicationCdcSpec::bloom_fpp` (false positive rate).
fn default_bloom_fpp() -> f64 {
    const FALSE_POSITIVE_RATE: f64 = 0.01;
    FALSE_POSITIVE_RATE
}
/// Serde default for `DeduplicationCdcSpec::lru_size`.
fn default_lru_size() -> i64 {
    const LRU_ENTRIES: i64 = 10_000;
    LRU_ENTRIES
}
/// Serde default for `DeduplicationCdcSpec::window_secs` (seconds).
fn default_dedup_window() -> i64 {
    const WINDOW_SECS: i64 = 3600;
    WINDOW_SECS
}
2403
2404/// Transaction topic configuration
2405#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2406#[serde(rename_all = "camelCase")]
2407pub struct TransactionTopicSpec {
2408    /// Enable transaction topic
2409    #[serde(default)]
2410    pub enabled: bool,
2411
2412    /// Transaction topic name (default: {database}.transaction)
2413    #[serde(default)]
2414    pub topic_name: Option<String>,
2415
2416    /// Include transaction data collections in events
2417    #[serde(default = "default_true_cdc")]
2418    pub include_data_collections: bool,
2419
2420    /// Minimum events in transaction to emit (filter small txns)
2421    #[serde(default)]
2422    #[validate(range(min = 0, max = 10000, message = "min events must be 0-10000"))]
2423    pub min_events_threshold: i32,
2424}
2425
/// Serde default for CDC boolean options that are on unless explicitly disabled.
fn default_true_cdc() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
2429
2430/// Schema change topic configuration
2431#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2432#[serde(rename_all = "camelCase")]
2433pub struct SchemaChangeTopicSpec {
2434    /// Enable schema change topic
2435    #[serde(default)]
2436    pub enabled: bool,
2437
2438    /// Schema change topic name (default: {database}.schema_changes)
2439    #[serde(default)]
2440    pub topic_name: Option<String>,
2441
2442    /// Include table columns in change events
2443    #[serde(default = "default_true_cdc")]
2444    pub include_columns: bool,
2445
2446    /// Schemas to monitor for changes (empty = all)
2447    #[serde(default)]
2448    pub schemas: Vec<String>,
2449}
2450
2451/// Tombstone configuration for log compaction
2452#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2453#[serde(rename_all = "camelCase")]
2454pub struct TombstoneCdcSpec {
2455    /// Enable tombstone events for deletes
2456    #[serde(default)]
2457    pub enabled: bool,
2458
2459    /// Emit tombstone after delete event
2460    #[serde(default = "default_true_cdc")]
2461    pub after_delete: bool,
2462
2463    /// Tombstone behavior: emit_null, emit_with_key
2464    #[serde(default)]
2465    #[validate(custom(function = "validate_tombstone_behavior"))]
2466    pub behavior: String,
2467}
2468
2469fn validate_tombstone_behavior(behavior: &str) -> Result<(), ValidationError> {
2470    match behavior {
2471        "" | "emit_null" | "emit_with_key" => Ok(()),
2472        _ => Err(ValidationError::new("invalid_tombstone_behavior")
2473            .with_message("tombstone behavior must be: emit_null or emit_with_key".into())),
2474    }
2475}
2476
2477/// Field-level encryption configuration
2478#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2479#[serde(rename_all = "camelCase")]
2480pub struct FieldEncryptionSpec {
2481    /// Enable field encryption
2482    #[serde(default)]
2483    pub enabled: bool,
2484
2485    /// Secret reference containing encryption key
2486    #[serde(default)]
2487    #[validate(custom(function = "validate_optional_k8s_name"))]
2488    pub key_secret_ref: Option<String>,
2489
2490    /// Fields to encrypt (table.column format)
2491    #[serde(default)]
2492    pub fields: Vec<String>,
2493
2494    /// Encryption algorithm (default: aes-256-gcm)
2495    #[serde(default = "default_encryption_algorithm")]
2496    pub algorithm: String,
2497}
2498
/// Serde default for `FieldEncryptionSpec::algorithm`.
fn default_encryption_algorithm() -> String {
    String::from("aes-256-gcm")
}
2502
2503/// Read-only replica configuration
2504#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2505#[serde(rename_all = "camelCase")]
2506pub struct ReadOnlyReplicaSpec {
2507    /// Enable read-only mode (for connecting to replicas)
2508    #[serde(default)]
2509    pub enabled: bool,
2510
2511    /// Replication lag threshold before warnings (ms)
2512    #[serde(default = "default_lag_threshold")]
2513    #[validate(range(
2514        min = 100,
2515        max = 300000,
2516        message = "lag threshold must be 100-300000ms"
2517    ))]
2518    pub lag_threshold_ms: i64,
2519
2520    /// Enable deduplication (required for replicas)
2521    #[serde(default = "default_true_cdc")]
2522    pub deduplicate: bool,
2523
2524    /// Watermark source: primary, replica
2525    #[serde(default)]
2526    #[validate(custom(function = "validate_watermark_source"))]
2527    pub watermark_source: String,
2528}
2529
/// Serde default for `ReadOnlyReplicaSpec::lag_threshold_ms` (milliseconds).
fn default_lag_threshold() -> i64 {
    const LAG_THRESHOLD_MS: i64 = 5000;
    LAG_THRESHOLD_MS
}
2533
2534fn validate_watermark_source(source: &str) -> Result<(), ValidationError> {
2535    match source {
2536        "" | "primary" | "replica" => Ok(()),
2537        _ => Err(ValidationError::new("invalid_watermark_source")
2538            .with_message("watermark source must be: primary or replica".into())),
2539    }
2540}
2541
/// Event router configuration with dead letter queue support
///
/// All fields are optional. The derived `Default` matches the serde defaults:
/// routing disabled, no destinations, and an empty rule list.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct EventRouterSpec {
    /// Enable event routing
    #[serde(default)]
    pub enabled: bool,

    /// Default destination when no rules match
    #[serde(default)]
    pub default_destination: Option<String>,

    /// Dead letter queue for unroutable events
    #[serde(default)]
    pub dead_letter_queue: Option<String>,

    /// Drop unroutable events instead of sending to DLQ
    #[serde(default)]
    pub drop_unroutable: bool,

    /// Routing rules (evaluated in priority order)
    ///
    /// Each rule is validated individually via `#[validate(nested)]`.
    #[serde(default)]
    #[validate(nested)]
    pub rules: Vec<RouteRuleSpec>,
}
2567
/// Routing rule specification
///
/// `name`, `condition_type` and `destinations` carry no serde default, so they
/// must be present in the manifest; the remaining fields are optional.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RouteRuleSpec {
    /// Rule name (for logging/debugging)
    #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
    pub name: String,

    /// Priority (higher = evaluated first)
    #[serde(default)]
    pub priority: i32,

    /// Routing condition type: always, table, table_pattern, schema, operation, field_equals, field_exists
    #[validate(custom(function = "validate_route_condition_type"))]
    pub condition_type: String,

    /// Condition value (table name, pattern, field, etc.)
    // NOTE(review): not validated against `condition_type` here - presumably
    // checked when the rule is evaluated; confirm.
    #[serde(default)]
    pub condition_value: Option<String>,

    /// Second condition value (for field_equals: expected value)
    #[serde(default)]
    pub condition_value2: Option<String>,

    /// Destination(s) for matching events
    // NOTE(review): an empty list passes validation - confirm whether a rule
    // without destinations is meaningful.
    pub destinations: Vec<String>,

    /// Continue evaluating rules after match (fan-out)
    #[serde(default)]
    pub continue_matching: bool,
}
2599
2600fn validate_route_condition_type(condition: &str) -> Result<(), ValidationError> {
2601    match condition {
2602        "always" | "table" | "table_pattern" | "schema" | "operation" | "field_equals"
2603        | "field_exists" => Ok(()),
2604        _ => Err(ValidationError::new("invalid_route_condition_type").with_message(
2605            "condition type must be: always, table, table_pattern, schema, operation, field_equals, or field_exists".into(),
2606        )),
2607    }
2608}
2609
2610/// Partitioner configuration for Kafka-style partitioning
2611#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2612#[serde(rename_all = "camelCase")]
2613pub struct PartitionerSpec {
2614    /// Enable partitioning
2615    #[serde(default)]
2616    pub enabled: bool,
2617
2618    /// Number of partitions
2619    #[serde(default = "default_num_partitions")]
2620    #[validate(range(min = 1, max = 10000, message = "num partitions must be 1-10000"))]
2621    pub num_partitions: i32,
2622
2623    /// Partitioning strategy: round_robin, key_hash, table_hash, full_table_hash, sticky
2624    #[serde(default = "default_partition_strategy")]
2625    #[validate(custom(function = "validate_partition_strategy"))]
2626    pub strategy: String,
2627
2628    /// Key columns for key_hash strategy
2629    #[serde(default)]
2630    pub key_columns: Vec<String>,
2631
2632    /// Fixed partition for sticky strategy
2633    #[serde(default)]
2634    pub sticky_partition: Option<i32>,
2635}
2636
/// Serde default for `PartitionerSpec::num_partitions`.
fn default_num_partitions() -> i32 {
    const NUM_PARTITIONS: i32 = 16;
    NUM_PARTITIONS
}
/// Serde default for `PartitionerSpec::strategy`.
fn default_partition_strategy() -> String {
    String::from("key_hash")
}
2643
2644fn validate_partition_strategy(strategy: &str) -> Result<(), ValidationError> {
2645    match strategy {
2646        "round_robin" | "key_hash" | "table_hash" | "full_table_hash" | "sticky" => Ok(()),
2647        _ => Err(ValidationError::new("invalid_partition_strategy").with_message(
2648            "partition strategy must be: round_robin, key_hash, table_hash, full_table_hash, or sticky".into(),
2649        )),
2650    }
2651}
2652
/// Single Message Transform (SMT) configuration
///
/// `transform_type` carries no serde default, so it must be present in the
/// manifest; `name` and `config` are optional.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SmtTransformSpec {
    /// Transform type (see SMT documentation for available transforms)
    ///
    /// Must be one of the identifiers accepted by `validate_smt_transform_type`.
    #[validate(custom(function = "validate_smt_transform_type"))]
    pub transform_type: String,

    /// Transform name (optional, for logging)
    #[serde(default)]
    pub name: Option<String>,

    /// Transform-specific configuration
    // Free-form JSON; an omitted value deserializes to JSON `null`. The
    // contents are not validated by this type.
    #[serde(default)]
    pub config: serde_json::Value,
}
2669
2670fn validate_smt_transform_type(transform: &str) -> Result<(), ValidationError> {
2671    match transform {
2672        "extract_new_record_state" | "value_to_key" | "timestamp_converter"
2673        | "timezone_converter" | "mask_field" | "filter" | "flatten" | "insert_field"
2674        | "rename_field" | "replace_field" | "cast" | "regex_router" | "content_router"
2675        | "header_to_value" | "unwrap" | "set_null" | "compute_field" | "conditional_smt" => Ok(()),
2676        _ => Err(ValidationError::new("invalid_smt_transform_type").with_message(
2677            "transform type must be a valid SMT (extract_new_record_state, mask_field, filter, etc.)".into(),
2678        )),
2679    }
2680}
2681
2682/// Parallel CDC processing configuration
2683#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2684#[serde(rename_all = "camelCase")]
2685pub struct ParallelCdcSpec {
2686    /// Enable parallel processing
2687    #[serde(default)]
2688    pub enabled: bool,
2689
2690    /// Maximum concurrent table workers
2691    #[serde(default = "default_parallel_concurrency")]
2692    #[validate(range(min = 1, max = 64, message = "concurrency must be 1-64"))]
2693    pub concurrency: i32,
2694
2695    /// Buffer size per table
2696    #[serde(default = "default_per_table_buffer")]
2697    #[validate(range(
2698        min = 100,
2699        max = 100000,
2700        message = "per table buffer must be 100-100000"
2701    ))]
2702    pub per_table_buffer: i32,
2703
2704    /// Output channel buffer size
2705    #[serde(default = "default_output_buffer")]
2706    #[validate(range(min = 1000, max = 1000000, message = "output buffer must be 1000-1M"))]
2707    pub output_buffer: i32,
2708
2709    /// Enable work stealing between workers
2710    #[serde(default = "default_true_cdc")]
2711    pub work_stealing: bool,
2712
2713    /// Maximum events per second per table (None = unlimited)
2714    #[serde(default)]
2715    pub per_table_rate_limit: Option<i64>,
2716
2717    /// Shutdown timeout in seconds
2718    #[serde(default = "default_shutdown_timeout")]
2719    #[validate(range(min = 1, max = 300, message = "shutdown timeout must be 1-300s"))]
2720    pub shutdown_timeout_secs: i32,
2721}
2722
2723fn default_parallel_concurrency() -> i32 {
2724    4
2725}
2726fn default_per_table_buffer() -> i32 {
2727    1000
2728}
2729fn default_output_buffer() -> i32 {
2730    10_000
2731}
2732fn default_shutdown_timeout() -> i32 {
2733    30
2734}
2735
2736/// Outbox pattern configuration
2737#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2738#[serde(rename_all = "camelCase")]
2739pub struct OutboxSpec {
2740    /// Enable outbox pattern
2741    #[serde(default)]
2742    pub enabled: bool,
2743
2744    /// Outbox table name
2745    #[serde(default = "default_outbox_table")]
2746    #[validate(length(min = 1, max = 128, message = "outbox table must be 1-128 characters"))]
2747    pub table_name: String,
2748
2749    /// Poll interval in milliseconds
2750    #[serde(default = "default_outbox_poll_interval")]
2751    #[validate(range(min = 10, max = 60000, message = "poll interval must be 10-60000ms"))]
2752    pub poll_interval_ms: i32,
2753
2754    /// Maximum events per batch
2755    #[serde(default = "default_outbox_batch_size")]
2756    #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
2757    pub batch_size: i32,
2758
2759    /// Maximum delivery retries before DLQ
2760    #[serde(default = "default_outbox_max_retries")]
2761    #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
2762    pub max_retries: i32,
2763
2764    /// Delivery timeout in seconds
2765    #[serde(default = "default_outbox_timeout")]
2766    #[validate(range(min = 1, max = 300, message = "timeout must be 1-300s"))]
2767    pub delivery_timeout_secs: i32,
2768
2769    /// Enable ordered delivery per aggregate
2770    #[serde(default = "default_true_cdc")]
2771    pub ordered_delivery: bool,
2772
2773    /// Retention period for delivered events in seconds
2774    #[serde(default = "default_outbox_retention")]
2775    #[validate(range(min = 60, max = 604800, message = "retention must be 60-604800s"))]
2776    pub retention_secs: i64,
2777
2778    /// Maximum concurrent deliveries
2779    #[serde(default = "default_outbox_concurrency")]
2780    #[validate(range(min = 1, max = 100, message = "concurrency must be 1-100"))]
2781    pub max_concurrency: i32,
2782}
2783
2784fn default_outbox_table() -> String {
2785    "outbox".to_string()
2786}
2787fn default_outbox_poll_interval() -> i32 {
2788    100
2789}
2790fn default_outbox_batch_size() -> i32 {
2791    100
2792}
2793fn default_outbox_max_retries() -> i32 {
2794    3
2795}
2796fn default_outbox_timeout() -> i32 {
2797    30
2798}
2799fn default_outbox_retention() -> i64 {
2800    86400
2801}
2802fn default_outbox_concurrency() -> i32 {
2803    10
2804}
2805
2806/// Health monitoring configuration for CDC connectors
2807#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2808#[serde(rename_all = "camelCase")]
2809pub struct HealthMonitorSpec {
2810    /// Enable health monitoring
2811    #[serde(default)]
2812    pub enabled: bool,
2813
2814    /// Interval between health checks in seconds
2815    #[serde(default = "default_health_check_interval")]
2816    #[validate(range(min = 1, max = 300, message = "check interval must be 1-300s"))]
2817    pub check_interval_secs: i32,
2818
2819    /// Maximum allowed replication lag in milliseconds
2820    #[serde(default = "default_health_max_lag")]
2821    #[validate(range(min = 1000, max = 3600000, message = "max lag must be 1000-3600000ms"))]
2822    pub max_lag_ms: i64,
2823
2824    /// Number of failed checks before marking unhealthy
2825    #[serde(default = "default_health_failure_threshold")]
2826    #[validate(range(min = 1, max = 10, message = "failure threshold must be 1-10"))]
2827    pub failure_threshold: i32,
2828
2829    /// Number of successful checks to recover
2830    #[serde(default = "default_health_success_threshold")]
2831    #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
2832    pub success_threshold: i32,
2833
2834    /// Timeout for individual health checks in seconds
2835    #[serde(default = "default_health_check_timeout")]
2836    #[validate(range(min = 1, max = 60, message = "check timeout must be 1-60s"))]
2837    pub check_timeout_secs: i32,
2838
2839    /// Enable automatic recovery on failure
2840    #[serde(default = "default_true_cdc")]
2841    pub auto_recovery: bool,
2842
2843    /// Initial recovery delay in seconds
2844    #[serde(default = "default_health_recovery_delay")]
2845    #[validate(range(min = 1, max = 300, message = "recovery delay must be 1-300s"))]
2846    pub recovery_delay_secs: i32,
2847
2848    /// Maximum recovery delay in seconds (for exponential backoff)
2849    #[serde(default = "default_health_max_recovery_delay")]
2850    #[validate(range(min = 1, max = 3600, message = "max recovery delay must be 1-3600s"))]
2851    pub max_recovery_delay_secs: i32,
2852}
2853
2854fn default_health_check_interval() -> i32 {
2855    10
2856}
2857fn default_health_max_lag() -> i64 {
2858    30_000
2859}
2860fn default_health_failure_threshold() -> i32 {
2861    3
2862}
2863fn default_health_success_threshold() -> i32 {
2864    2
2865}
2866fn default_health_check_timeout() -> i32 {
2867    5
2868}
2869fn default_health_recovery_delay() -> i32 {
2870    1
2871}
2872fn default_health_max_recovery_delay() -> i32 {
2873    60
2874}
2875
2876// ============================================================================
2877// RivvenSchemaRegistry CRD - Schema Registry for Rivven
2878// ============================================================================
2879
/// RivvenSchemaRegistry custom resource for managing Schema Registry instances
///
/// This CRD allows declarative management of Rivven Schema Registry deployments,
/// a Confluent-compatible schema registry supporting Avro, JSON Schema, and Protobuf.
///
/// Only `clusterRef` is required; every other field falls back to a default
/// when omitted.
///
/// # Example
///
/// ```yaml
/// apiVersion: rivven.hupe1980.github.io/v1alpha1
/// kind: RivvenSchemaRegistry
/// metadata:
///   name: production-registry
///   namespace: rivven
/// spec:
///   clusterRef:
///     name: my-rivven-cluster
///   replicas: 3
///   version: "0.0.1"
///   storage:
///     mode: broker
///     topic: _schemas
///   compatibility:
///     defaultLevel: BACKWARD
///   schemas:
///     avro: true
///     jsonSchema: true
///     protobuf: true
///   contexts:
///     enabled: true
///   tls:
///     enabled: true
///     certSecretName: schema-registry-tls
/// ```
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.hupe1980.github.io",
    version = "v1alpha1",
    kind = "RivvenSchemaRegistry",
    plural = "rivvenschemaregistries",
    shortname = "rsr",
    namespaced,
    status = "RivvenSchemaRegistryStatus",
    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
    printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
    printcolumn = r#"{"name":"Ready","type":"integer","jsonPath":".status.readyReplicas"}"#,
    printcolumn = r#"{"name":"Schemas","type":"integer","jsonPath":".status.schemasRegistered"}"#,
    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenSchemaRegistrySpec {
    /// Reference to the RivvenCluster this registry connects to
    // The only field without a serde default, so it must be present in the manifest.
    #[validate(nested)]
    pub cluster_ref: ClusterReference,

    /// Number of registry replicas (1-10)
    #[serde(default = "default_schema_registry_replicas")]
    #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
    pub replicas: i32,

    /// Schema Registry version
    #[serde(default = "default_version")]
    pub version: String,

    /// Custom container image (overrides version-based default)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_image"))]
    pub image: Option<String>,

    /// Image pull policy
    #[serde(default = "default_image_pull_policy")]
    #[validate(custom(function = "validate_pull_policy"))]
    pub image_pull_policy: String,

    /// Image pull secrets
    #[serde(default)]
    pub image_pull_secrets: Vec<String>,

    /// Resource requests/limits
    // Stored as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub resources: Option<serde_json::Value>,

    /// HTTP server configuration
    #[serde(default)]
    #[validate(nested)]
    pub server: SchemaRegistryServerSpec,

    /// Storage backend configuration
    #[serde(default)]
    #[validate(nested)]
    pub storage: SchemaRegistryStorageSpec,

    /// Compatibility configuration
    #[serde(default)]
    #[validate(nested)]
    pub compatibility: SchemaCompatibilitySpec,

    /// Schema format support configuration
    #[serde(default)]
    #[validate(nested)]
    pub schemas: SchemaFormatSpec,

    /// Schema contexts (multi-tenant) configuration
    #[serde(default)]
    #[validate(nested)]
    pub contexts: SchemaContextsSpec,

    /// Validation rules configuration
    #[serde(default)]
    #[validate(nested)]
    pub validation: SchemaValidationSpec,

    /// Authentication configuration
    #[serde(default)]
    #[validate(nested)]
    pub auth: SchemaRegistryAuthSpec,

    /// TLS configuration
    #[serde(default)]
    #[validate(nested)]
    pub tls: SchemaRegistryTlsSpec,

    /// Metrics configuration
    #[serde(default)]
    #[validate(nested)]
    pub metrics: SchemaRegistryMetricsSpec,

    /// External registry configuration (for mirroring/sync)
    // When omitted this is filled from `ExternalRegistrySpec::default()` and
    // then validated, so that `Default` value must itself pass validation.
    #[serde(default)]
    #[validate(nested)]
    pub external: ExternalRegistrySpec,

    /// Pod annotations
    #[serde(default)]
    #[validate(custom(function = "validate_annotations"))]
    pub pod_annotations: BTreeMap<String, String>,

    /// Pod labels
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub pod_labels: BTreeMap<String, String>,

    /// Environment variables
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,

    /// Node selector for pod scheduling
    #[serde(default)]
    pub node_selector: BTreeMap<String, String>,

    /// Pod tolerations
    #[serde(default)]
    #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,

    /// Pod affinity rules
    // Stored as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub affinity: Option<serde_json::Value>,

    /// Service account name
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub service_account: Option<String>,

    /// Pod security context
    // Stored as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub security_context: Option<serde_json::Value>,

    /// Container security context
    // Stored as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub container_security_context: Option<serde_json::Value>,

    /// Liveness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub liveness_probe: ProbeSpec,

    /// Readiness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub readiness_probe: ProbeSpec,

    /// Pod disruption budget configuration
    #[serde(default)]
    #[validate(nested)]
    pub pod_disruption_budget: PdbSpec,
}
3068
/// Serde default for `RivvenSchemaRegistrySpec::replicas`: a single replica.
fn default_schema_registry_replicas() -> i32 {
    const SINGLE_REPLICA: i32 = 1;
    SINGLE_REPLICA
}
3072
3073/// HTTP server configuration for Schema Registry
3074#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3075#[serde(rename_all = "camelCase")]
3076pub struct SchemaRegistryServerSpec {
3077    /// HTTP server port (1024-65535)
3078    #[serde(default = "default_schema_registry_port")]
3079    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
3080    pub port: i32,
3081
3082    /// Bind address (default: 0.0.0.0)
3083    #[serde(default = "default_bind_address")]
3084    pub bind_address: String,
3085
3086    /// Request timeout in seconds
3087    #[serde(default = "default_request_timeout")]
3088    #[validate(range(min = 1, max = 300, message = "timeout must be 1-300 seconds"))]
3089    pub timeout_seconds: i32,
3090
3091    /// Maximum request body size in bytes (default: 10MB)
3092    #[serde(default = "default_max_request_size")]
3093    #[validate(range(
3094        min = 1024,
3095        max = 104857600,
3096        message = "max request size must be 1KB-100MB"
3097    ))]
3098    pub max_request_size: i64,
3099
3100    /// Enable CORS (for web clients)
3101    #[serde(default)]
3102    pub cors_enabled: bool,
3103
3104    /// CORS allowed origins (empty = all)
3105    #[serde(default)]
3106    pub cors_allowed_origins: Vec<String>,
3107}
3108
3109fn default_schema_registry_port() -> i32 {
3110    8081
3111}
3112
3113fn default_bind_address() -> String {
3114    "0.0.0.0".to_string()
3115}
3116
3117fn default_request_timeout() -> i32 {
3118    30
3119}
3120
3121fn default_max_request_size() -> i64 {
3122    10_485_760 // 10MB
3123}
3124
3125impl Default for SchemaRegistryServerSpec {
3126    fn default() -> Self {
3127        Self {
3128            port: 8081,
3129            bind_address: "0.0.0.0".to_string(),
3130            timeout_seconds: 30,
3131            max_request_size: 10_485_760,
3132            cors_enabled: false,
3133            cors_allowed_origins: vec![],
3134        }
3135    }
3136}
3137
3138/// Storage configuration for Schema Registry
3139#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3140#[serde(rename_all = "camelCase")]
3141pub struct SchemaRegistryStorageSpec {
3142    /// Storage mode: memory, broker
3143    #[serde(default = "default_storage_mode")]
3144    #[validate(custom(function = "validate_storage_mode"))]
3145    pub mode: String,
3146
3147    /// Rivven topic for broker-backed storage (required if mode=broker)
3148    #[serde(default = "default_schema_topic")]
3149    #[validate(length(max = 255, message = "topic name max 255 characters"))]
3150    pub topic: String,
3151
3152    /// Topic replication factor
3153    #[serde(default = "default_schema_topic_replication")]
3154    #[validate(range(min = 1, max = 10, message = "replication factor must be 1-10"))]
3155    pub replication_factor: i32,
3156
3157    /// Topic partitions (usually 1 for ordered processing)
3158    #[serde(default = "default_schema_topic_partitions")]
3159    #[validate(range(min = 1, max = 100, message = "partitions must be 1-100"))]
3160    pub partitions: i32,
3161
3162    /// Enable schema normalization
3163    #[serde(default = "default_true")]
3164    pub normalize: bool,
3165}
3166
3167fn default_storage_mode() -> String {
3168    "broker".to_string()
3169}
3170
3171fn default_schema_topic() -> String {
3172    "_schemas".to_string()
3173}
3174
3175fn default_schema_topic_replication() -> i32 {
3176    3
3177}
3178
3179fn default_schema_topic_partitions() -> i32 {
3180    1
3181}
3182
3183fn validate_storage_mode(mode: &str) -> Result<(), ValidationError> {
3184    match mode {
3185        "memory" | "broker" => Ok(()),
3186        _ => Err(ValidationError::new("invalid_storage_mode")
3187            .with_message("storage mode must be 'memory' or 'broker'".into())),
3188    }
3189}
3190
3191impl Default for SchemaRegistryStorageSpec {
3192    fn default() -> Self {
3193        Self {
3194            mode: "broker".to_string(),
3195            topic: "_schemas".to_string(),
3196            replication_factor: 3,
3197            partitions: 1,
3198            normalize: true,
3199        }
3200    }
3201}
3202
/// Schema compatibility configuration
// `Default` is implemented manually further down rather than derived, so that
// the default level is "BACKWARD" and overrides are allowed.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaCompatibilitySpec {
    /// Default compatibility level for new subjects
    // Serialized as `defaultLevel` because of `rename_all = "camelCase"`.
    #[serde(default = "default_compatibility_level")]
    #[validate(custom(function = "validate_compatibility_level"))]
    pub default_level: String,

    /// Allow per-subject compatibility overrides
    #[serde(default = "default_true")]
    pub allow_overrides: bool,

    /// Per-subject compatibility levels
    // Keys are subject names, values are compatibility levels; both are
    // checked by `validate_subject_compatibility_map`.
    #[serde(default)]
    #[validate(custom(function = "validate_subject_compatibility_map"))]
    pub subjects: BTreeMap<String, String>,
}
3221
/// Serde default for `SchemaCompatibilitySpec::default_level`.
fn default_compatibility_level() -> String {
    String::from("BACKWARD")
}
3225
3226fn validate_compatibility_level(level: &str) -> Result<(), ValidationError> {
3227    let valid_levels = [
3228        "BACKWARD",
3229        "BACKWARD_TRANSITIVE",
3230        "FORWARD",
3231        "FORWARD_TRANSITIVE",
3232        "FULL",
3233        "FULL_TRANSITIVE",
3234        "NONE",
3235    ];
3236    if valid_levels.contains(&level) {
3237        Ok(())
3238    } else {
3239        Err(
3240            ValidationError::new("invalid_compatibility_level").with_message(
3241                format!(
3242                    "'{}' is not valid. Must be one of: {:?}",
3243                    level, valid_levels
3244                )
3245                .into(),
3246            ),
3247        )
3248    }
3249}
3250
3251fn validate_subject_compatibility_map(
3252    subjects: &BTreeMap<String, String>,
3253) -> Result<(), ValidationError> {
3254    if subjects.len() > 1000 {
3255        return Err(ValidationError::new("too_many_subjects")
3256            .with_message("maximum 1000 per-subject compatibility entries".into()));
3257    }
3258    for (subject, level) in subjects {
3259        if subject.len() > 255 {
3260            return Err(ValidationError::new("subject_name_too_long")
3261                .with_message(format!("subject '{}' exceeds 255 characters", subject).into()));
3262        }
3263        validate_compatibility_level(level)?;
3264    }
3265    Ok(())
3266}
3267
3268impl Default for SchemaCompatibilitySpec {
3269    fn default() -> Self {
3270        Self {
3271            default_level: "BACKWARD".to_string(),
3272            allow_overrides: true,
3273            subjects: BTreeMap::new(),
3274        }
3275    }
3276}
3277
/// Schema format support configuration
// Every flag defaults to `true`, both per field (via `default_true`) and for
// the whole section (via the manual `Default` impl below).
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaFormatSpec {
    /// Enable Avro schema support
    #[serde(default = "default_true")]
    pub avro: bool,

    /// Enable JSON Schema support
    // Serialized as `jsonSchema` because of `rename_all = "camelCase"`.
    #[serde(default = "default_true")]
    pub json_schema: bool,

    /// Enable Protobuf schema support
    #[serde(default = "default_true")]
    pub protobuf: bool,

    /// Strict schema validation
    #[serde(default = "default_true")]
    pub strict_validation: bool,
}

// Manual impl: a derived `Default` would set every flag to `false`.
impl Default for SchemaFormatSpec {
    /// Value used when the whole `schemas` section is omitted: all formats
    /// enabled and strict validation on.
    fn default() -> Self {
        Self {
            avro: true,
            json_schema: true,
            protobuf: true,
            strict_validation: true,
        }
    }
}
3309
3310/// Schema contexts (multi-tenant) configuration
3311#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3312#[serde(rename_all = "camelCase")]
3313pub struct SchemaContextsSpec {
3314    /// Enable schema contexts for multi-tenant isolation
3315    #[serde(default)]
3316    pub enabled: bool,
3317
3318    /// Maximum number of contexts (0 = unlimited)
3319    #[serde(default)]
3320    #[validate(range(min = 0, max = 10000, message = "max contexts must be 0-10000"))]
3321    pub max_contexts: i32,
3322
3323    /// Pre-defined contexts to create on startup
3324    #[serde(default)]
3325    #[validate(length(max = 100, message = "maximum 100 pre-defined contexts"))]
3326    pub predefined: Vec<SchemaContextDefinition>,
3327}
3328
/// Pre-defined schema context definition
// Used as the element type of `SchemaContextsSpec::predefined`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaContextDefinition {
    /// Context name
    // Required: this field has no serde default.
    #[validate(length(min = 1, max = 128, message = "context name must be 1-128 characters"))]
    pub name: String,

    /// Context description
    #[serde(default)]
    #[validate(length(max = 512, message = "description max 512 characters"))]
    pub description: Option<String>,

    /// Whether this context is active
    #[serde(default = "default_true")]
    pub active: bool,
}
3346
3347/// Validation rules configuration for Schema Registry
3348#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3349#[serde(rename_all = "camelCase")]
3350pub struct SchemaValidationSpec {
3351    /// Enable content validation rules
3352    #[serde(default)]
3353    pub enabled: bool,
3354
3355    /// Maximum schema size in bytes
3356    #[serde(default = "default_max_schema_size")]
3357    #[validate(range(
3358        min = 1024,
3359        max = 10485760,
3360        message = "max schema size must be 1KB-10MB"
3361    ))]
3362    pub max_schema_size: i64,
3363
3364    /// Pre-defined validation rules
3365    #[serde(default)]
3366    #[validate(length(max = 100, message = "maximum 100 validation rules"))]
3367    pub rules: Vec<SchemaValidationRule>,
3368}
3369
3370fn default_max_schema_size() -> i64 {
3371    1_048_576 // 1MB
3372}
3373
3374impl Default for SchemaValidationSpec {
3375    fn default() -> Self {
3376        Self {
3377            enabled: false,
3378            max_schema_size: 1_048_576,
3379            rules: vec![],
3380        }
3381    }
3382}
3383
/// Validation rule definition
// Used as the element type of `SchemaValidationSpec::rules`. `name`,
// `rule_type` and `pattern` have no serde default and are therefore required.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaValidationRule {
    /// Rule name
    #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
    pub name: String,

    /// Rule type: regex, field_exists, field_type, naming_convention, documentation
    #[validate(custom(function = "validate_rule_type"))]
    pub rule_type: String,

    /// Rule configuration/pattern
    // Only the length is checked here; the content is not parsed.
    #[validate(length(min = 1, max = 4096, message = "pattern must be 1-4096 characters"))]
    pub pattern: String,

    /// Subjects to apply this rule to (empty = all)
    #[serde(default)]
    pub subjects: Vec<String>,

    /// Schema types to apply this rule to
    #[serde(default)]
    pub schema_types: Vec<String>,

    /// Validation level: error, warning, info
    #[serde(default = "default_validation_level")]
    #[validate(custom(function = "validate_validation_level"))]
    pub level: String,

    /// Rule description
    #[serde(default)]
    #[validate(length(max = 512, message = "description max 512 characters"))]
    pub description: Option<String>,
}
3418
3419fn validate_rule_type(rule_type: &str) -> Result<(), ValidationError> {
3420    let valid_types = [
3421        "regex",
3422        "field_exists",
3423        "field_type",
3424        "naming_convention",
3425        "documentation",
3426    ];
3427    if valid_types.contains(&rule_type) {
3428        Ok(())
3429    } else {
3430        Err(ValidationError::new("invalid_rule_type").with_message(
3431            format!(
3432                "'{}' is not valid. Must be one of: {:?}",
3433                rule_type, valid_types
3434            )
3435            .into(),
3436        ))
3437    }
3438}
3439
/// Serde default for `SchemaValidationRule::level`.
fn default_validation_level() -> String {
    String::from("error")
}
3443
3444fn validate_validation_level(level: &str) -> Result<(), ValidationError> {
3445    match level {
3446        "error" | "warning" | "info" => Ok(()),
3447        _ => Err(ValidationError::new("invalid_validation_level")
3448            .with_message("validation level must be 'error', 'warning', or 'info'".into())),
3449    }
3450}
3451
3452/// Authentication configuration for Schema Registry
3453#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3454#[serde(rename_all = "camelCase")]
3455pub struct SchemaRegistryAuthSpec {
3456    /// Enable authentication
3457    #[serde(default)]
3458    pub enabled: bool,
3459
3460    /// Authentication method: basic, jwt, cedar
3461    #[serde(default)]
3462    #[validate(custom(function = "validate_auth_method"))]
3463    pub method: Option<String>,
3464
3465    /// Secret containing authentication credentials
3466    #[serde(default)]
3467    #[validate(custom(function = "validate_optional_k8s_name"))]
3468    pub credentials_secret_ref: Option<String>,
3469
3470    /// JWT/OIDC configuration (if method=jwt)
3471    #[serde(default)]
3472    #[validate(nested)]
3473    pub jwt: JwtAuthSpec,
3474
3475    /// Basic auth users (for method=basic, max 100)
3476    #[serde(default)]
3477    #[validate(length(max = 100, message = "maximum 100 users"))]
3478    pub users: Vec<SchemaRegistryUser>,
3479}
3480
3481fn validate_auth_method(method: &str) -> Result<(), ValidationError> {
3482    match method {
3483        "" | "basic" | "jwt" | "cedar" => Ok(()),
3484        _ => Err(ValidationError::new("invalid_auth_method")
3485            .with_message("auth method must be 'basic', 'jwt', or 'cedar'".into())),
3486    }
3487}
3488
3489/// JWT/OIDC authentication configuration
3490#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3491#[serde(rename_all = "camelCase")]
3492pub struct JwtAuthSpec {
3493    /// OIDC issuer URL
3494    #[serde(default)]
3495    pub issuer_url: Option<String>,
3496
3497    /// JWKS URL (for key validation)
3498    #[serde(default)]
3499    pub jwks_url: Option<String>,
3500
3501    /// Required audience claim
3502    #[serde(default)]
3503    pub audience: Option<String>,
3504
3505    /// Claim to use for username
3506    #[serde(default = "default_username_claim")]
3507    pub username_claim: String,
3508
3509    /// Claim to use for roles
3510    #[serde(default = "default_roles_claim")]
3511    pub roles_claim: String,
3512}
3513
3514fn default_username_claim() -> String {
3515    "sub".to_string()
3516}
3517
3518fn default_roles_claim() -> String {
3519    "roles".to_string()
3520}
3521
/// Schema Registry user definition (for basic auth)
// Used as the element type of `SchemaRegistryAuthSpec::users`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryUser {
    /// Username
    // Required: this field has no serde default.
    #[validate(length(min = 1, max = 128, message = "username must be 1-128 characters"))]
    pub username: String,

    /// Secret key containing password
    // NOTE(review): validated with the Kubernetes-name validator; confirm that
    // its rules are the intended ones for a key inside a Secret.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub password_secret_key: Option<String>,

    /// User role: admin, writer, reader
    #[serde(default = "default_user_role")]
    #[validate(custom(function = "validate_user_role"))]
    pub role: String,

    /// Allowed subjects (empty = all, for reader/writer roles)
    #[serde(default)]
    pub allowed_subjects: Vec<String>,
}
3544
/// Serde default for `SchemaRegistryUser::role`: the `"reader"` role.
fn default_user_role() -> String {
    String::from("reader")
}
3548
3549fn validate_user_role(role: &str) -> Result<(), ValidationError> {
3550    match role {
3551        "admin" | "writer" | "reader" => Ok(()),
3552        _ => Err(ValidationError::new("invalid_user_role")
3553            .with_message("role must be 'admin', 'writer', or 'reader'".into())),
3554    }
3555}
3556
/// TLS configuration for Schema Registry
// The derived `Default` (everything disabled, no secrets) matches the per-field
// serde defaults, since every field uses plain `#[serde(default)]`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryTlsSpec {
    /// Enable TLS for HTTP server
    #[serde(default)]
    pub enabled: bool,

    /// Secret containing TLS certificates
    // NOTE(review): nothing here requires this to be set when `enabled` is true.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// CA secret name for mTLS
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,

    /// Enable TLS for broker connection
    #[serde(default)]
    pub broker_tls_enabled: bool,

    /// Secret for broker TLS certificates
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub broker_cert_secret_name: Option<String>,
}
3588
3589/// Metrics configuration for Schema Registry
3590#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3591#[serde(rename_all = "camelCase")]
3592pub struct SchemaRegistryMetricsSpec {
3593    /// Enable Prometheus metrics endpoint
3594    #[serde(default = "default_true")]
3595    pub enabled: bool,
3596
3597    /// Metrics endpoint port
3598    #[serde(default = "default_schema_metrics_port")]
3599    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
3600    pub port: i32,
3601
3602    /// Metrics endpoint path
3603    #[serde(default = "default_metrics_path")]
3604    pub path: String,
3605
3606    /// Create ServiceMonitor for Prometheus Operator
3607    #[serde(default)]
3608    pub service_monitor_enabled: bool,
3609
3610    /// ServiceMonitor scrape interval
3611    #[serde(default = "default_scrape_interval")]
3612    #[validate(custom(function = "validate_duration"))]
3613    pub scrape_interval: String,
3614}
3615
3616fn default_schema_metrics_port() -> i32 {
3617    9090
3618}
3619
3620fn default_metrics_path() -> String {
3621    "/metrics".to_string()
3622}
3623
3624impl Default for SchemaRegistryMetricsSpec {
3625    fn default() -> Self {
3626        Self {
3627            enabled: true,
3628            port: 9090,
3629            path: "/metrics".to_string(),
3630            service_monitor_enabled: false,
3631            scrape_interval: "30s".to_string(),
3632        }
3633    }
3634}
3635
3636/// External registry configuration for mirroring/sync
3637#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3638#[serde(rename_all = "camelCase")]
3639pub struct ExternalRegistrySpec {
3640    /// Enable external registry integration
3641    #[serde(default)]
3642    pub enabled: bool,
3643
3644    /// External registry type: confluent, glue
3645    #[serde(default)]
3646    #[validate(custom(function = "validate_external_registry_type"))]
3647    pub registry_type: Option<String>,
3648
3649    /// Confluent Schema Registry URL
3650    #[serde(default)]
3651    pub confluent_url: Option<String>,
3652
3653    /// AWS Glue registry ARN
3654    #[serde(default)]
3655    pub glue_registry_arn: Option<String>,
3656
3657    /// AWS region for Glue
3658    #[serde(default)]
3659    pub aws_region: Option<String>,
3660
3661    /// Sync mode: mirror (read from external), push (write to external), bidirectional
3662    #[serde(default)]
3663    #[validate(custom(function = "validate_sync_mode"))]
3664    pub sync_mode: Option<String>,
3665
3666    /// Subjects to sync (empty = all)
3667    #[serde(default)]
3668    pub sync_subjects: Vec<String>,
3669
3670    /// Sync interval in seconds
3671    #[serde(default = "default_sync_interval")]
3672    #[validate(range(
3673        min = 10,
3674        max = 86400,
3675        message = "sync interval must be 10-86400 seconds"
3676    ))]
3677    pub sync_interval_seconds: i32,
3678
3679    /// Credentials secret reference
3680    #[serde(default)]
3681    #[validate(custom(function = "validate_optional_k8s_name"))]
3682    pub credentials_secret_ref: Option<String>,
3683}
3684
3685fn validate_external_registry_type(reg_type: &str) -> Result<(), ValidationError> {
3686    match reg_type {
3687        "" | "confluent" | "glue" => Ok(()),
3688        _ => Err(ValidationError::new("invalid_external_registry_type")
3689            .with_message("registry type must be 'confluent' or 'glue'".into())),
3690    }
3691}
3692
3693fn validate_sync_mode(mode: &str) -> Result<(), ValidationError> {
3694    match mode {
3695        "" | "mirror" | "push" | "bidirectional" => Ok(()),
3696        _ => Err(ValidationError::new("invalid_sync_mode")
3697            .with_message("sync mode must be 'mirror', 'push', or 'bidirectional'".into())),
3698    }
3699}
3700
/// Serde default for `ExternalRegistrySpec::sync_interval_seconds`: five minutes, in seconds.
fn default_sync_interval() -> i32 {
    const FIVE_MINUTES_IN_SECONDS: i32 = 5 * 60;
    FIVE_MINUTES_IN_SECONDS
}
3704
3705/// Status of a RivvenSchemaRegistry resource
3706#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
3707#[serde(rename_all = "camelCase")]
3708pub struct RivvenSchemaRegistryStatus {
3709    /// Current phase of the registry
3710    #[serde(default)]
3711    pub phase: SchemaRegistryPhase,
3712
3713    /// Total number of replicas
3714    pub replicas: i32,
3715
3716    /// Number of ready replicas
3717    pub ready_replicas: i32,
3718
3719    /// Number of schemas registered
3720    pub schemas_registered: i32,
3721
3722    /// Number of subjects
3723    pub subjects_count: i32,
3724
3725    /// Number of active contexts (if contexts enabled)
3726    pub contexts_count: i32,
3727
3728    /// Current observed generation
3729    pub observed_generation: i64,
3730
3731    /// Conditions describing registry state
3732    #[serde(default)]
3733    pub conditions: Vec<SchemaRegistryCondition>,
3734
3735    /// Registry endpoints (URLs)
3736    #[serde(default)]
3737    pub endpoints: Vec<String>,
3738
3739    /// Storage backend status
3740    pub storage_status: Option<String>,
3741
3742    /// External registry sync status
3743    pub external_sync_status: Option<String>,
3744
3745    /// Last successful external sync time
3746    pub last_sync_time: Option<String>,
3747
3748    /// Last time the status was updated
3749    pub last_updated: Option<String>,
3750
3751    /// Error message if any
3752    pub message: Option<String>,
3753}
3754
3755/// Phase of the Schema Registry lifecycle
3756#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
3757pub enum SchemaRegistryPhase {
3758    /// Registry is being created
3759    #[default]
3760    Pending,
3761    /// Registry is being provisioned
3762    Provisioning,
3763    /// Registry is running and healthy
3764    Running,
3765    /// Registry is updating
3766    Updating,
3767    /// Registry is in degraded state
3768    Degraded,
3769    /// Registry has failed
3770    Failed,
3771    /// Registry is being deleted
3772    Terminating,
3773}
3774
3775/// Condition describing an aspect of Schema Registry state
3776#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
3777#[serde(rename_all = "camelCase")]
3778pub struct SchemaRegistryCondition {
3779    /// Type of condition (Ready, BrokerConnected, StorageHealthy, ExternalSyncHealthy)
3780    #[serde(rename = "type")]
3781    pub condition_type: String,
3782
3783    /// Status of the condition (True, False, Unknown)
3784    pub status: String,
3785
3786    /// Reason for the condition
3787    pub reason: Option<String>,
3788
3789    /// Human-readable message
3790    pub message: Option<String>,
3791
3792    /// Last transition time
3793    pub last_transition_time: Option<String>,
3794}
3795
3796impl RivvenSchemaRegistrySpec {
3797    /// Get the full container image including version
3798    pub fn get_image(&self) -> String {
3799        if let Some(ref image) = self.image {
3800            image.clone()
3801        } else {
3802            format!("ghcr.io/hupe1980/rivven-schema:{}", self.version)
3803        }
3804    }
3805
3806    /// Get labels for managed resources
3807    pub fn get_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3808        let mut labels = BTreeMap::new();
3809        labels.insert(
3810            "app.kubernetes.io/name".to_string(),
3811            "rivven-schema-registry".to_string(),
3812        );
3813        labels.insert(
3814            "app.kubernetes.io/instance".to_string(),
3815            registry_name.to_string(),
3816        );
3817        labels.insert(
3818            "app.kubernetes.io/component".to_string(),
3819            "schema-registry".to_string(),
3820        );
3821        labels.insert(
3822            "app.kubernetes.io/managed-by".to_string(),
3823            "rivven-operator".to_string(),
3824        );
3825        labels.insert(
3826            "app.kubernetes.io/version".to_string(),
3827            self.version.clone(),
3828        );
3829        labels
3830    }
3831
3832    /// Get selector labels for managed resources
3833    pub fn get_selector_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3834        let mut labels = BTreeMap::new();
3835        labels.insert(
3836            "app.kubernetes.io/name".to_string(),
3837            "rivven-schema-registry".to_string(),
3838        );
3839        labels.insert(
3840            "app.kubernetes.io/instance".to_string(),
3841            registry_name.to_string(),
3842        );
3843        labels
3844    }
3845}
3846
3847#[cfg(test)]
3848mod tests {
3849    use super::*;
3850
3851    #[test]
3852    fn test_default_spec() {
3853        let spec = RivvenClusterSpec {
3854            replicas: 3,
3855            version: "0.0.1".to_string(),
3856            image: None,
3857            image_pull_policy: "IfNotPresent".to_string(),
3858            image_pull_secrets: vec![],
3859            storage: StorageSpec::default(),
3860            resources: None,
3861            config: BrokerConfig::default(),
3862            tls: TlsSpec::default(),
3863            metrics: MetricsSpec::default(),
3864            affinity: None,
3865            node_selector: BTreeMap::new(),
3866            tolerations: vec![],
3867            pod_disruption_budget: PdbSpec::default(),
3868            service_account: None,
3869            pod_annotations: BTreeMap::new(),
3870            pod_labels: BTreeMap::new(),
3871            env: vec![],
3872            liveness_probe: ProbeSpec::default(),
3873            readiness_probe: ProbeSpec::default(),
3874            security_context: None,
3875            container_security_context: None,
3876        };
3877
3878        assert_eq!(spec.replicas, 3);
3879        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven:0.0.1");
3880    }
3881
3882    #[test]
3883    fn test_get_labels() {
3884        let spec = RivvenClusterSpec {
3885            replicas: 3,
3886            version: "0.0.1".to_string(),
3887            image: None,
3888            image_pull_policy: "IfNotPresent".to_string(),
3889            image_pull_secrets: vec![],
3890            storage: StorageSpec::default(),
3891            resources: None,
3892            config: BrokerConfig::default(),
3893            tls: TlsSpec::default(),
3894            metrics: MetricsSpec::default(),
3895            affinity: None,
3896            node_selector: BTreeMap::new(),
3897            tolerations: vec![],
3898            pod_disruption_budget: PdbSpec::default(),
3899            service_account: None,
3900            pod_annotations: BTreeMap::new(),
3901            pod_labels: BTreeMap::new(),
3902            env: vec![],
3903            liveness_probe: ProbeSpec::default(),
3904            readiness_probe: ProbeSpec::default(),
3905            security_context: None,
3906            container_security_context: None,
3907        };
3908
3909        let labels = spec.get_labels("my-cluster");
3910        assert_eq!(
3911            labels.get("app.kubernetes.io/name"),
3912            Some(&"rivven".to_string())
3913        );
3914        assert_eq!(
3915            labels.get("app.kubernetes.io/instance"),
3916            Some(&"my-cluster".to_string())
3917        );
3918    }
3919
3920    #[test]
3921    fn test_custom_image() {
3922        let spec = RivvenClusterSpec {
3923            replicas: 1,
3924            version: "0.0.1".to_string(),
3925            image: Some("my-registry/rivven:custom".to_string()),
3926            image_pull_policy: "Always".to_string(),
3927            image_pull_secrets: vec![],
3928            storage: StorageSpec::default(),
3929            resources: None,
3930            config: BrokerConfig::default(),
3931            tls: TlsSpec::default(),
3932            metrics: MetricsSpec::default(),
3933            affinity: None,
3934            node_selector: BTreeMap::new(),
3935            tolerations: vec![],
3936            pod_disruption_budget: PdbSpec::default(),
3937            service_account: None,
3938            pod_annotations: BTreeMap::new(),
3939            pod_labels: BTreeMap::new(),
3940            env: vec![],
3941            liveness_probe: ProbeSpec::default(),
3942            readiness_probe: ProbeSpec::default(),
3943            security_context: None,
3944            container_security_context: None,
3945        };
3946
3947        assert_eq!(spec.get_image(), "my-registry/rivven:custom");
3948    }
3949
3950    #[test]
3951    fn test_cluster_phase_default() {
3952        let phase = ClusterPhase::default();
3953        assert_eq!(phase, ClusterPhase::Pending);
3954    }
3955
3956    #[test]
3957    fn test_storage_spec_default() {
3958        let storage = StorageSpec::default();
3959        assert_eq!(storage.size, "10Gi");
3960        assert!(storage.storage_class_name.is_none());
3961    }
3962
3963    #[test]
3964    fn test_broker_config_defaults() {
3965        let config = BrokerConfig::default();
3966        assert_eq!(config.default_partitions, 3);
3967        assert_eq!(config.default_replication_factor, 2);
3968        assert!(config.auto_create_topics);
3969    }
3970
3971    #[test]
3972    fn test_probe_spec_defaults() {
3973        let probe = ProbeSpec::default();
3974        assert!(probe.enabled);
3975        assert_eq!(probe.initial_delay_seconds, 30);
3976        assert_eq!(probe.period_seconds, 10);
3977    }
3978
3979    #[test]
3980    fn test_validate_quantity_valid() {
3981        assert!(validate_quantity("10Gi").is_ok());
3982        assert!(validate_quantity("100Mi").is_ok());
3983        assert!(validate_quantity("1Ti").is_ok());
3984        assert!(validate_quantity("500").is_ok());
3985        assert!(validate_quantity("1.5Gi").is_ok());
3986    }
3987
3988    #[test]
3989    fn test_validate_quantity_invalid() {
3990        assert!(validate_quantity("10GB").is_err()); // Wrong suffix
3991        assert!(validate_quantity("abc").is_err()); // Not a number
3992        assert!(validate_quantity("-10Gi").is_err()); // Negative
3993        assert!(validate_quantity("").is_err()); // Empty
3994    }
3995
3996    #[test]
3997    fn test_validate_k8s_name_valid() {
3998        assert!(validate_k8s_name("my-cluster").is_ok());
3999        assert!(validate_k8s_name("cluster123").is_ok());
4000        assert!(validate_k8s_name("a").is_ok());
4001    }
4002
4003    #[test]
4004    fn test_validate_k8s_name_invalid() {
4005        assert!(validate_k8s_name("My-Cluster").is_err()); // Uppercase
4006        assert!(validate_k8s_name("-cluster").is_err()); // Starts with dash
4007        assert!(validate_k8s_name("cluster-").is_err()); // Ends with dash
4008        assert!(validate_k8s_name("cluster_name").is_err()); // Underscore
4009    }
4010
4011    #[test]
4012    fn test_validate_compression_type() {
4013        assert!(validate_compression_type("lz4").is_ok());
4014        assert!(validate_compression_type("zstd").is_ok());
4015        assert!(validate_compression_type("none").is_ok());
4016        assert!(validate_compression_type("invalid").is_err());
4017    }
4018
4019    #[test]
4020    fn test_validate_segment_size() {
4021        assert!(validate_segment_size(1_048_576).is_ok()); // 1MB - minimum
4022        assert!(validate_segment_size(10_737_418_240).is_ok()); // 10GB - maximum
4023        assert!(validate_segment_size(1_073_741_824).is_ok()); // 1GB - valid
4024        assert!(validate_segment_size(1_000).is_err()); // Too small
4025        assert!(validate_segment_size(20_000_000_000).is_err()); // Too large
4026    }
4027
4028    #[test]
4029    fn test_validate_message_size() {
4030        assert!(validate_message_size(1_024).is_ok()); // 1KB - minimum
4031        assert!(validate_message_size(104_857_600).is_ok()); // 100MB - maximum
4032        assert!(validate_message_size(1_048_576).is_ok()); // 1MB - valid
4033        assert!(validate_message_size(100).is_err()); // Too small
4034        assert!(validate_message_size(200_000_000).is_err()); // Too large
4035    }
4036
4037    #[test]
4038    fn test_validate_pull_policy() {
4039        assert!(validate_pull_policy("Always").is_ok());
4040        assert!(validate_pull_policy("IfNotPresent").is_ok());
4041        assert!(validate_pull_policy("Never").is_ok());
4042        assert!(validate_pull_policy("always").is_err()); // Wrong case
4043        assert!(validate_pull_policy("Invalid").is_err());
4044    }
4045
4046    #[test]
4047    fn test_validate_duration() {
4048        assert!(validate_duration("30s").is_ok());
4049        assert!(validate_duration("1m").is_ok());
4050        assert!(validate_duration("5m30s").is_ok());
4051        assert!(validate_duration("1h").is_ok());
4052        assert!(validate_duration("invalid").is_err());
4053        assert!(validate_duration("30").is_err()); // Missing unit
4054    }
4055
4056    #[test]
4057    fn test_validate_access_modes() {
4058        assert!(validate_access_modes(&["ReadWriteOnce".to_string()]).is_ok());
4059        assert!(
4060            validate_access_modes(&["ReadWriteOnce".to_string(), "ReadOnlyMany".to_string()])
4061                .is_ok()
4062        );
4063        assert!(validate_access_modes(&["Invalid".to_string()]).is_err());
4064    }
4065
4066    // RivvenConnect CRD tests
4067    #[test]
4068    fn test_connect_spec_defaults() {
4069        let spec = RivvenConnectSpec {
4070            cluster_ref: ClusterReference {
4071                name: "my-cluster".to_string(),
4072                namespace: None,
4073            },
4074            replicas: 1,
4075            version: "0.0.1".to_string(),
4076            image: None,
4077            image_pull_policy: "IfNotPresent".to_string(),
4078            image_pull_secrets: vec![],
4079            resources: None,
4080            config: ConnectConfigSpec::default(),
4081            sources: vec![],
4082            sinks: vec![],
4083            settings: GlobalConnectSettings::default(),
4084            tls: ConnectTlsSpec::default(),
4085            pod_annotations: BTreeMap::new(),
4086            pod_labels: BTreeMap::new(),
4087            env: vec![],
4088            node_selector: BTreeMap::new(),
4089            tolerations: vec![],
4090            affinity: None,
4091            service_account: None,
4092            security_context: None,
4093            container_security_context: None,
4094        };
4095        assert_eq!(spec.replicas, 1);
4096    }
4097
4098    #[test]
4099    fn test_connect_phase_default() {
4100        let phase = ConnectPhase::default();
4101        assert_eq!(phase, ConnectPhase::Pending);
4102    }
4103
4104    #[test]
4105    fn test_validate_connector_type() {
4106        assert!(validate_connector_type("postgres-cdc").is_ok());
4107        assert!(validate_connector_type("mysql-cdc").is_ok());
4108        assert!(validate_connector_type("http").is_ok());
4109        assert!(validate_connector_type("stdout").is_ok());
4110        assert!(validate_connector_type("s3").is_ok());
4111        assert!(validate_connector_type("datagen").is_ok());
4112        assert!(validate_connector_type("custom-connector").is_ok());
4113    }
4114
4115    #[test]
4116    fn test_validate_start_offset() {
4117        assert!(validate_start_offset("earliest").is_ok());
4118        assert!(validate_start_offset("latest").is_ok());
4119        assert!(validate_start_offset("2024-01-01T00:00:00Z").is_ok());
4120        assert!(validate_start_offset("invalid").is_err());
4121    }
4122
4123    #[test]
4124    fn test_validate_image_valid() {
4125        assert!(validate_image("nginx").is_ok());
4126        assert!(validate_image("nginx:latest").is_ok());
4127        assert!(validate_image("ghcr.io/hupe1980/rivven:0.0.1").is_ok());
4128        assert!(validate_image("my-registry.io:5000/image:tag").is_ok());
4129        assert!(validate_image("localhost:5000/myimage").is_ok());
4130        assert!(validate_image("").is_ok()); // Empty allowed, uses default
4131    }
4132
4133    #[test]
4134    fn test_validate_image_invalid() {
4135        assert!(validate_image("/absolute/path").is_err()); // Starts with /
4136        assert!(validate_image("-invalid").is_err()); // Starts with -
4137        assert!(validate_image("image..path").is_err()); // Contains ..
4138                                                         // Very long image name
4139        let long_name = "a".repeat(300);
4140        assert!(validate_image(&long_name).is_err());
4141    }
4142
4143    #[test]
4144    fn test_validate_node_selector() {
4145        let mut selectors = BTreeMap::new();
4146        selectors.insert("node-type".to_string(), "compute".to_string());
4147        assert!(validate_node_selector(&selectors).is_ok());
4148
4149        // Too many selectors
4150        let mut many = BTreeMap::new();
4151        for i in 0..25 {
4152            many.insert(format!("key-{}", i), "value".to_string());
4153        }
4154        assert!(validate_node_selector(&many).is_err());
4155    }
4156
4157    #[test]
4158    fn test_validate_annotations() {
4159        let mut annotations = BTreeMap::new();
4160        annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
4161        assert!(validate_annotations(&annotations).is_ok());
4162
4163        // Too many annotations
4164        let mut many = BTreeMap::new();
4165        for i in 0..55 {
4166            many.insert(format!("annotation-{}", i), "value".to_string());
4167        }
4168        assert!(validate_annotations(&many).is_err());
4169    }
4170
4171    #[test]
4172    fn test_validate_labels() {
4173        let mut labels = BTreeMap::new();
4174        labels.insert("team".to_string(), "platform".to_string());
4175        assert!(validate_labels(&labels).is_ok());
4176
4177        // Reserved prefix
4178        let mut reserved = BTreeMap::new();
4179        reserved.insert("app.kubernetes.io/custom".to_string(), "value".to_string());
4180        assert!(validate_labels(&reserved).is_err());
4181    }
4182
4183    #[test]
4184    fn test_validate_raw_config() {
4185        let mut config = BTreeMap::new();
4186        config.insert("custom.setting".to_string(), "value".to_string());
4187        assert!(validate_raw_config(&config).is_ok());
4188
4189        // Forbidden key
4190        let mut forbidden = BTreeMap::new();
4191        forbidden.insert("command".to_string(), "/bin/sh".to_string());
4192        assert!(validate_raw_config(&forbidden).is_err());
4193
4194        // Too many entries
4195        let mut many = BTreeMap::new();
4196        for i in 0..55 {
4197            many.insert(format!("config-{}", i), "value".to_string());
4198        }
4199        assert!(validate_raw_config(&many).is_err());
4200    }
4201
4202    #[test]
4203    fn test_validate_int_or_percent() {
4204        assert!(validate_optional_int_or_percent("1").is_ok());
4205        assert!(validate_optional_int_or_percent("25%").is_ok());
4206        assert!(validate_optional_int_or_percent("100%").is_ok());
4207        assert!(validate_optional_int_or_percent("").is_ok()); // Empty allowed
4208        assert!(validate_optional_int_or_percent("abc").is_err());
4209        assert!(validate_optional_int_or_percent("25%%").is_err());
4210    }
4211
4212    #[test]
4213    fn test_tls_spec_default() {
4214        let tls = TlsSpec::default();
4215        assert!(!tls.enabled);
4216        assert!(tls.cert_secret_name.is_none());
4217        assert!(!tls.mtls_enabled);
4218    }
4219
4220    #[test]
4221    fn test_metrics_spec_default() {
4222        let metrics = MetricsSpec::default();
4223        assert!(metrics.enabled);
4224        assert_eq!(metrics.port, 9090);
4225    }
4226
4227    #[test]
4228    fn test_pdb_spec_default() {
4229        let pdb = PdbSpec::default();
4230        assert!(pdb.enabled);
4231        assert!(pdb.min_available.is_none());
4232        assert_eq!(pdb.max_unavailable, Some("1".to_string()));
4233    }
4234
4235    #[test]
4236    fn test_service_monitor_labels() {
4237        let mut labels = BTreeMap::new();
4238        labels.insert("release".to_string(), "prometheus".to_string());
4239        assert!(validate_service_monitor_labels(&labels).is_ok());
4240
4241        // Too many labels
4242        let mut many = BTreeMap::new();
4243        for i in 0..15 {
4244            many.insert(format!("label-{}", i), "value".to_string());
4245        }
4246        assert!(validate_service_monitor_labels(&many).is_err());
4247    }
4248
4249    #[test]
4250    fn test_cluster_condition_time_format() {
4251        let condition = ClusterCondition {
4252            condition_type: "Ready".to_string(),
4253            status: "True".to_string(),
4254            last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
4255            reason: Some("AllReplicasReady".to_string()),
4256            message: Some("All replicas are ready".to_string()),
4257        };
4258        assert!(condition.last_transition_time.unwrap().contains('T'));
4259    }
4260
    // ========================================================================
    // RivvenSchemaRegistry CRD Tests
    // ========================================================================
4264
4265    #[test]
4266    fn test_schema_registry_phase_default() {
4267        let phase = SchemaRegistryPhase::default();
4268        assert_eq!(phase, SchemaRegistryPhase::Pending);
4269    }
4270
4271    #[test]
4272    fn test_schema_registry_server_spec_default() {
4273        let spec = SchemaRegistryServerSpec::default();
4274        assert_eq!(spec.port, 8081);
4275        assert_eq!(spec.bind_address, "0.0.0.0");
4276        assert_eq!(spec.timeout_seconds, 30);
4277        assert_eq!(spec.max_request_size, 10_485_760);
4278        assert!(!spec.cors_enabled);
4279    }
4280
4281    #[test]
4282    fn test_schema_registry_storage_spec_default() {
4283        let spec = SchemaRegistryStorageSpec::default();
4284        assert_eq!(spec.mode, "broker");
4285        assert_eq!(spec.topic, "_schemas");
4286        assert_eq!(spec.replication_factor, 3);
4287        assert_eq!(spec.partitions, 1);
4288        assert!(spec.normalize);
4289    }
4290
4291    #[test]
4292    fn test_validate_storage_mode() {
4293        assert!(validate_storage_mode("memory").is_ok());
4294        assert!(validate_storage_mode("broker").is_ok());
4295        assert!(validate_storage_mode("invalid").is_err());
4296    }
4297
4298    #[test]
4299    fn test_schema_compatibility_spec_default() {
4300        let spec = SchemaCompatibilitySpec::default();
4301        assert_eq!(spec.default_level, "BACKWARD");
4302        assert!(spec.allow_overrides);
4303        assert!(spec.subjects.is_empty());
4304    }
4305
4306    #[test]
4307    fn test_validate_compatibility_level() {
4308        assert!(validate_compatibility_level("BACKWARD").is_ok());
4309        assert!(validate_compatibility_level("BACKWARD_TRANSITIVE").is_ok());
4310        assert!(validate_compatibility_level("FORWARD").is_ok());
4311        assert!(validate_compatibility_level("FORWARD_TRANSITIVE").is_ok());
4312        assert!(validate_compatibility_level("FULL").is_ok());
4313        assert!(validate_compatibility_level("FULL_TRANSITIVE").is_ok());
4314        assert!(validate_compatibility_level("NONE").is_ok());
4315        assert!(validate_compatibility_level("invalid").is_err());
4316        assert!(validate_compatibility_level("backward").is_err()); // Case sensitive
4317    }
4318
4319    #[test]
4320    fn test_schema_format_spec_default() {
4321        let spec = SchemaFormatSpec::default();
4322        assert!(spec.avro);
4323        assert!(spec.json_schema);
4324        assert!(spec.protobuf);
4325        assert!(spec.strict_validation);
4326    }
4327
4328    #[test]
4329    fn test_schema_contexts_spec_default() {
4330        let spec = SchemaContextsSpec::default();
4331        assert!(!spec.enabled);
4332        assert_eq!(spec.max_contexts, 0);
4333        assert!(spec.predefined.is_empty());
4334    }
4335
4336    #[test]
4337    fn test_schema_validation_spec_default() {
4338        let spec = SchemaValidationSpec::default();
4339        assert!(!spec.enabled);
4340        assert_eq!(spec.max_schema_size, 1_048_576);
4341        assert!(spec.rules.is_empty());
4342    }
4343
4344    #[test]
4345    fn test_validate_rule_type() {
4346        assert!(validate_rule_type("regex").is_ok());
4347        assert!(validate_rule_type("field_exists").is_ok());
4348        assert!(validate_rule_type("field_type").is_ok());
4349        assert!(validate_rule_type("naming_convention").is_ok());
4350        assert!(validate_rule_type("documentation").is_ok());
4351        assert!(validate_rule_type("invalid").is_err());
4352    }
4353
4354    #[test]
4355    fn test_validate_validation_level() {
4356        assert!(validate_validation_level("error").is_ok());
4357        assert!(validate_validation_level("warning").is_ok());
4358        assert!(validate_validation_level("info").is_ok());
4359        assert!(validate_validation_level("invalid").is_err());
4360    }
4361
4362    #[test]
4363    fn test_schema_registry_auth_spec_default() {
4364        let spec = SchemaRegistryAuthSpec::default();
4365        assert!(!spec.enabled);
4366        assert!(spec.method.is_none());
4367        assert!(spec.credentials_secret_ref.is_none());
4368    }
4369
4370    #[test]
4371    fn test_validate_auth_method() {
4372        assert!(validate_auth_method("basic").is_ok());
4373        assert!(validate_auth_method("jwt").is_ok());
4374        assert!(validate_auth_method("cedar").is_ok());
4375        assert!(validate_auth_method("").is_ok());
4376        assert!(validate_auth_method("invalid").is_err());
4377    }
4378
4379    #[test]
4380    fn test_jwt_auth_spec_default() {
4381        let spec = JwtAuthSpec::default();
4382        assert!(spec.issuer_url.is_none());
4383        assert!(spec.jwks_url.is_none());
4384        // Note: Default::default() gives empty strings; serde defaults apply during deserialization
4385        assert!(spec.username_claim.is_empty());
4386        assert!(spec.roles_claim.is_empty());
4387    }
4388
4389    #[test]
4390    fn test_validate_user_role() {
4391        assert!(validate_user_role("admin").is_ok());
4392        assert!(validate_user_role("writer").is_ok());
4393        assert!(validate_user_role("reader").is_ok());
4394        assert!(validate_user_role("invalid").is_err());
4395    }
4396
4397    #[test]
4398    fn test_schema_registry_tls_spec_default() {
4399        let spec = SchemaRegistryTlsSpec::default();
4400        assert!(!spec.enabled);
4401        assert!(spec.cert_secret_name.is_none());
4402        assert!(!spec.mtls_enabled);
4403        assert!(!spec.broker_tls_enabled);
4404    }
4405
4406    #[test]
4407    fn test_schema_registry_metrics_spec_default() {
4408        let spec = SchemaRegistryMetricsSpec::default();
4409        assert!(spec.enabled);
4410        assert_eq!(spec.port, 9090);
4411        assert_eq!(spec.path, "/metrics");
4412        assert!(!spec.service_monitor_enabled);
4413        assert_eq!(spec.scrape_interval, "30s");
4414    }
4415
4416    #[test]
4417    fn test_external_registry_spec_default() {
4418        let spec = ExternalRegistrySpec::default();
4419        assert!(!spec.enabled);
4420        assert!(spec.registry_type.is_none());
4421        assert!(spec.confluent_url.is_none());
4422        assert!(spec.glue_registry_arn.is_none());
4423        // Note: Default::default() gives 0; serde default (300) applies during deserialization
4424        assert_eq!(spec.sync_interval_seconds, 0);
4425    }
4426
4427    #[test]
4428    fn test_validate_external_registry_type() {
4429        assert!(validate_external_registry_type("confluent").is_ok());
4430        assert!(validate_external_registry_type("glue").is_ok());
4431        assert!(validate_external_registry_type("").is_ok());
4432        assert!(validate_external_registry_type("invalid").is_err());
4433    }
4434
4435    #[test]
4436    fn test_validate_sync_mode() {
4437        assert!(validate_sync_mode("mirror").is_ok());
4438        assert!(validate_sync_mode("push").is_ok());
4439        assert!(validate_sync_mode("bidirectional").is_ok());
4440        assert!(validate_sync_mode("").is_ok());
4441        assert!(validate_sync_mode("invalid").is_err());
4442    }
4443
4444    #[test]
4445    fn test_schema_registry_spec_get_image_default() {
4446        let spec = RivvenSchemaRegistrySpec {
4447            cluster_ref: ClusterReference {
4448                name: "test".to_string(),
4449                namespace: None,
4450            },
4451            replicas: 1,
4452            version: "0.0.1".to_string(),
4453            image: None,
4454            image_pull_policy: "IfNotPresent".to_string(),
4455            image_pull_secrets: vec![],
4456            resources: None,
4457            server: SchemaRegistryServerSpec::default(),
4458            storage: SchemaRegistryStorageSpec::default(),
4459            compatibility: SchemaCompatibilitySpec::default(),
4460            schemas: SchemaFormatSpec::default(),
4461            contexts: SchemaContextsSpec::default(),
4462            validation: SchemaValidationSpec::default(),
4463            auth: SchemaRegistryAuthSpec::default(),
4464            tls: SchemaRegistryTlsSpec::default(),
4465            metrics: SchemaRegistryMetricsSpec::default(),
4466            external: ExternalRegistrySpec::default(),
4467            pod_annotations: BTreeMap::new(),
4468            pod_labels: BTreeMap::new(),
4469            env: vec![],
4470            node_selector: BTreeMap::new(),
4471            tolerations: vec![],
4472            affinity: None,
4473            service_account: None,
4474            security_context: None,
4475            container_security_context: None,
4476            liveness_probe: ProbeSpec::default(),
4477            readiness_probe: ProbeSpec::default(),
4478            pod_disruption_budget: PdbSpec::default(),
4479        };
4480        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven-schema:0.0.1");
4481    }
4482
4483    #[test]
4484    fn test_schema_registry_spec_get_image_custom() {
4485        let spec = RivvenSchemaRegistrySpec {
4486            cluster_ref: ClusterReference {
4487                name: "test".to_string(),
4488                namespace: None,
4489            },
4490            replicas: 1,
4491            version: "0.0.1".to_string(),
4492            image: Some("custom/schema-registry:latest".to_string()),
4493            image_pull_policy: "IfNotPresent".to_string(),
4494            image_pull_secrets: vec![],
4495            resources: None,
4496            server: SchemaRegistryServerSpec::default(),
4497            storage: SchemaRegistryStorageSpec::default(),
4498            compatibility: SchemaCompatibilitySpec::default(),
4499            schemas: SchemaFormatSpec::default(),
4500            contexts: SchemaContextsSpec::default(),
4501            validation: SchemaValidationSpec::default(),
4502            auth: SchemaRegistryAuthSpec::default(),
4503            tls: SchemaRegistryTlsSpec::default(),
4504            metrics: SchemaRegistryMetricsSpec::default(),
4505            external: ExternalRegistrySpec::default(),
4506            pod_annotations: BTreeMap::new(),
4507            pod_labels: BTreeMap::new(),
4508            env: vec![],
4509            node_selector: BTreeMap::new(),
4510            tolerations: vec![],
4511            affinity: None,
4512            service_account: None,
4513            security_context: None,
4514            container_security_context: None,
4515            liveness_probe: ProbeSpec::default(),
4516            readiness_probe: ProbeSpec::default(),
4517            pod_disruption_budget: PdbSpec::default(),
4518        };
4519        assert_eq!(spec.get_image(), "custom/schema-registry:latest");
4520    }
4521
4522    #[test]
4523    fn test_schema_registry_spec_get_labels() {
4524        let spec = RivvenSchemaRegistrySpec {
4525            cluster_ref: ClusterReference {
4526                name: "test".to_string(),
4527                namespace: None,
4528            },
4529            replicas: 1,
4530            version: "0.0.1".to_string(),
4531            image: None,
4532            image_pull_policy: "IfNotPresent".to_string(),
4533            image_pull_secrets: vec![],
4534            resources: None,
4535            server: SchemaRegistryServerSpec::default(),
4536            storage: SchemaRegistryStorageSpec::default(),
4537            compatibility: SchemaCompatibilitySpec::default(),
4538            schemas: SchemaFormatSpec::default(),
4539            contexts: SchemaContextsSpec::default(),
4540            validation: SchemaValidationSpec::default(),
4541            auth: SchemaRegistryAuthSpec::default(),
4542            tls: SchemaRegistryTlsSpec::default(),
4543            metrics: SchemaRegistryMetricsSpec::default(),
4544            external: ExternalRegistrySpec::default(),
4545            pod_annotations: BTreeMap::new(),
4546            pod_labels: BTreeMap::new(),
4547            env: vec![],
4548            node_selector: BTreeMap::new(),
4549            tolerations: vec![],
4550            affinity: None,
4551            service_account: None,
4552            security_context: None,
4553            container_security_context: None,
4554            liveness_probe: ProbeSpec::default(),
4555            readiness_probe: ProbeSpec::default(),
4556            pod_disruption_budget: PdbSpec::default(),
4557        };
4558
4559        let labels = spec.get_labels("my-registry");
4560        assert_eq!(
4561            labels.get("app.kubernetes.io/name"),
4562            Some(&"rivven-schema-registry".to_string())
4563        );
4564        assert_eq!(
4565            labels.get("app.kubernetes.io/instance"),
4566            Some(&"my-registry".to_string())
4567        );
4568        assert_eq!(
4569            labels.get("app.kubernetes.io/component"),
4570            Some(&"schema-registry".to_string())
4571        );
4572    }
4573
/// A condition stamped with `Utc::now().to_rfc3339()` must carry a timestamp
/// string containing the 'T' separator.
#[test]
fn test_schema_registry_condition_time_format() {
    let ready = SchemaRegistryCondition {
        condition_type: String::from("Ready"),
        status: String::from("True"),
        last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
        reason: Some(String::from("AllReplicasReady")),
        message: Some(String::from("All replicas are ready")),
    };
    let stamped_at = ready.last_transition_time.unwrap();
    assert!(stamped_at.contains('T'));
}
4585
/// Exercises `validate_subject_compatibility_map` with one accepted map and
/// two rejected ones.
#[test]
fn test_validate_subject_compatibility_map() {
    // Two subjects with the levels "BACKWARD" and "FULL" are accepted.
    let accepted = BTreeMap::from([
        ("orders-value".to_string(), "BACKWARD".to_string()),
        ("users-value".to_string(), "FULL".to_string()),
    ]);
    assert!(validate_subject_compatibility_map(&accepted).is_ok());

    // Invalid compatibility level
    let bad_level = BTreeMap::from([("test".to_string(), "invalid".to_string())]);
    assert!(validate_subject_compatibility_map(&bad_level).is_err());

    // Subject name too long (300 characters)
    let oversized_subject = BTreeMap::from([("a".repeat(300), "BACKWARD".to_string())]);
    assert!(validate_subject_compatibility_map(&oversized_subject).is_err());
}
4603
4604    // ========================================================================
4605    // Advanced CDC Configuration Tests
4606    // ========================================================================
4607
/// Deserializing an empty JSON object into `SnapshotCdcConfigSpec` must
/// produce the default values asserted below.
#[test]
fn test_snapshot_cdc_config_spec_serde_defaults() {
    let defaults = serde_json::from_str::<SnapshotCdcConfigSpec>("{}").unwrap();

    // Numeric settings
    assert_eq!(defaults.batch_size, 10_000);
    assert_eq!(defaults.parallel_tables, 4);
    assert_eq!(defaults.query_timeout_secs, 300);
    assert_eq!(defaults.throttle_delay_ms, 0);
    assert_eq!(defaults.max_retries, 3);

    // Table filters start out empty
    assert!(defaults.include_tables.is_empty());
    assert!(defaults.exclude_tables.is_empty());
}
4621
/// Each payload below deserializes successfully but must then fail
/// `validate()`.
// NOTE(review): presumably the values fall outside the allowed ranges declared
// on the spec; the bounds themselves are not visible from this test.
#[test]
fn test_snapshot_cdc_config_spec_validation() {
    let rejected_payloads = [
        r#"{"batchSize": 50}"#,
        r#"{"batchSize": 5000, "parallelTables": 50}"#,
    ];
    for payload in rejected_payloads {
        let spec: SnapshotCdcConfigSpec = serde_json::from_str(payload).unwrap();
        assert!(spec.validate().is_err());
    }
}
4632
/// Deserializing an empty JSON object into `IncrementalSnapshotSpec` must
/// produce the default values asserted below.
#[test]
fn test_incremental_snapshot_spec_serde_defaults() {
    let defaults = serde_json::from_str::<IncrementalSnapshotSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert_eq!(defaults.chunk_size, 1024);
    assert!(defaults.watermark_strategy.is_empty());
    assert_eq!(defaults.max_concurrent_chunks, 1);
}
4642
/// `validate_watermark_strategy` must accept "insert", "update_and_insert" and
/// the empty string, and must reject an unrecognized value.
#[test]
fn test_validate_watermark_strategy() {
    for accepted in ["insert", "update_and_insert", ""] {
        assert!(validate_watermark_strategy(accepted).is_ok());
    }
    assert!(validate_watermark_strategy("invalid").is_err());
}
4650
/// Deserializing an empty JSON object into `HeartbeatCdcSpec` must produce
/// the default values asserted below.
#[test]
fn test_heartbeat_cdc_spec_serde_defaults() {
    let defaults = serde_json::from_str::<HeartbeatCdcSpec>("{}").unwrap();

    // Both boolean switches default to off.
    assert!(!defaults.enabled);
    assert!(!defaults.emit_events);

    assert_eq!(defaults.interval_secs, 10);
    assert_eq!(defaults.max_lag_secs, 300);
}
4660
/// Deserializing an empty JSON object into `DeduplicationCdcSpec` must
/// produce the default values asserted below.
#[test]
fn test_deduplication_cdc_spec_serde_defaults() {
    let defaults = serde_json::from_str::<DeduplicationCdcSpec>("{}").unwrap();

    assert!(!defaults.enabled);

    // Bloom filter sizing
    assert_eq!(defaults.bloom_expected_insertions, 100_000);
    assert_eq!(defaults.bloom_fpp, 0.01);

    assert_eq!(defaults.lru_size, 10_000);
    assert_eq!(defaults.window_secs, 3600);
}
4671
/// Deserializing an empty JSON object into `TransactionTopicSpec` must
/// produce the default values asserted below.
#[test]
fn test_transaction_topic_spec_serde_defaults() {
    let defaults = serde_json::from_str::<TransactionTopicSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.topic_name.is_none());
    assert!(defaults.include_data_collections);
    assert_eq!(defaults.min_events_threshold, 0);
}
4681
/// Deserializing an empty JSON object into `SchemaChangeTopicSpec` must
/// produce the default values asserted below.
#[test]
fn test_schema_change_topic_spec_serde_defaults() {
    let defaults = serde_json::from_str::<SchemaChangeTopicSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.topic_name.is_none());
    assert!(defaults.include_columns);
    assert!(defaults.schemas.is_empty());
}
4691
/// Deserializing an empty JSON object into `TombstoneCdcSpec` must produce
/// the default values asserted below.
#[test]
fn test_tombstone_cdc_spec_serde_defaults() {
    let defaults = serde_json::from_str::<TombstoneCdcSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.after_delete);
    assert!(defaults.behavior.is_empty());
}
4700
/// `validate_tombstone_behavior` must accept "emit_null", "emit_with_key" and
/// the empty string, and must reject an unrecognized value.
#[test]
fn test_validate_tombstone_behavior() {
    for accepted in ["emit_null", "emit_with_key", ""] {
        assert!(validate_tombstone_behavior(accepted).is_ok());
    }
    assert!(validate_tombstone_behavior("invalid").is_err());
}
4708
/// Deserializing an empty JSON object into `FieldEncryptionSpec` must produce
/// the default values asserted below.
#[test]
fn test_field_encryption_spec_serde_defaults() {
    let defaults = serde_json::from_str::<FieldEncryptionSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.key_secret_ref.is_none());
    assert!(defaults.fields.is_empty());
    assert_eq!(defaults.algorithm, "aes-256-gcm");
}
4718
/// Deserializing an empty JSON object into `ReadOnlyReplicaSpec` must produce
/// the default values asserted below.
#[test]
fn test_read_only_replica_spec_serde_defaults() {
    let defaults = serde_json::from_str::<ReadOnlyReplicaSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert_eq!(defaults.lag_threshold_ms, 5000);
    assert!(defaults.deduplicate);
    assert!(defaults.watermark_source.is_empty());
}
4728
/// `validate_watermark_source` must accept "primary", "replica" and the empty
/// string, and must reject an unrecognized value.
#[test]
fn test_validate_watermark_source() {
    for accepted in ["primary", "replica", ""] {
        assert!(validate_watermark_source(accepted).is_ok());
    }
    assert!(validate_watermark_source("invalid").is_err());
}
4736
/// Deserializing an empty JSON object into `EventRouterSpec` must produce the
/// default values asserted below.
#[test]
fn test_event_router_spec_serde_defaults() {
    let defaults = serde_json::from_str::<EventRouterSpec>("{}").unwrap();

    // Both boolean switches default to off.
    assert!(!defaults.enabled);
    assert!(!defaults.drop_unroutable);

    // No destinations or rules are configured by default.
    assert!(defaults.default_destination.is_none());
    assert!(defaults.dead_letter_queue.is_none());
    assert!(defaults.rules.is_empty());
}
4747
/// `validate_route_condition_type` must accept each condition type listed
/// below and must reject an unrecognized value.
#[test]
fn test_validate_route_condition_type() {
    let accepted_types = [
        "always",
        "table",
        "table_pattern",
        "schema",
        "operation",
        "field_equals",
        "field_exists",
    ];
    for condition_type in accepted_types {
        assert!(validate_route_condition_type(condition_type).is_ok());
    }
    assert!(validate_route_condition_type("invalid").is_err());
}
4759
/// Deserializing an empty JSON object into `PartitionerSpec` must produce the
/// default values asserted below.
#[test]
fn test_partitioner_spec_serde_defaults() {
    let defaults = serde_json::from_str::<PartitionerSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert_eq!(defaults.num_partitions, 16);
    assert_eq!(defaults.strategy, "key_hash");
    assert!(defaults.key_columns.is_empty());
}
4769
/// `validate_partition_strategy` must accept each strategy listed below and
/// must reject an unrecognized value.
#[test]
fn test_validate_partition_strategy() {
    let accepted_strategies = [
        "round_robin",
        "key_hash",
        "table_hash",
        "full_table_hash",
        "sticky",
    ];
    for strategy in accepted_strategies {
        assert!(validate_partition_strategy(strategy).is_ok());
    }
    assert!(validate_partition_strategy("invalid").is_err());
}
4779
/// `validate_smt_transform_type` must accept each transform type listed below
/// and must reject an unrecognized value.
#[test]
fn test_validate_smt_transform_type() {
    let accepted_transforms = [
        "extract_new_record_state",
        "mask_field",
        "filter",
        "flatten",
        "cast",
        "regex_router",
        "content_router",
    ];
    for transform in accepted_transforms {
        assert!(validate_smt_transform_type(transform).is_ok());
    }
    assert!(validate_smt_transform_type("invalid").is_err());
}
4791
/// Deserializing an empty JSON object into `ParallelCdcSpec` must produce the
/// default values asserted below.
#[test]
fn test_parallel_cdc_spec_serde_defaults() {
    let defaults = serde_json::from_str::<ParallelCdcSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.work_stealing);
    assert!(defaults.per_table_rate_limit.is_none());

    assert_eq!(defaults.concurrency, 4);
    assert_eq!(defaults.per_table_buffer, 1000);
    assert_eq!(defaults.output_buffer, 10_000);
    assert_eq!(defaults.shutdown_timeout_secs, 30);
}
4804
/// Deserializing an empty JSON object into `OutboxSpec` must produce the
/// default values asserted below.
#[test]
fn test_outbox_spec_serde_defaults() {
    let defaults = serde_json::from_str::<OutboxSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.ordered_delivery);
    assert_eq!(defaults.table_name, "outbox");

    assert_eq!(defaults.poll_interval_ms, 100);
    assert_eq!(defaults.batch_size, 100);
    assert_eq!(defaults.max_retries, 3);
    assert_eq!(defaults.delivery_timeout_secs, 30);
    assert_eq!(defaults.retention_secs, 86400);
    assert_eq!(defaults.max_concurrency, 10);
}
4819
/// Deserializing an empty JSON object into `HealthMonitorSpec` must produce
/// the default values asserted below.
#[test]
fn test_health_monitor_spec_serde_defaults() {
    let defaults = serde_json::from_str::<HealthMonitorSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.auto_recovery);

    // Check cadence and thresholds
    assert_eq!(defaults.check_interval_secs, 10);
    assert_eq!(defaults.check_timeout_secs, 5);
    assert_eq!(defaults.max_lag_ms, 30_000);
    assert_eq!(defaults.failure_threshold, 3);
    assert_eq!(defaults.success_threshold, 2);

    // Recovery delays
    assert_eq!(defaults.recovery_delay_secs, 1);
    assert_eq!(defaults.max_recovery_delay_secs, 60);
}
4834
/// Deserializing an empty JSON object into `SignalTableSpec` must produce the
/// default values asserted below.
#[test]
fn test_signal_table_spec_serde_defaults() {
    let defaults = serde_json::from_str::<SignalTableSpec>("{}").unwrap();

    assert!(!defaults.enabled);
    assert!(defaults.data_collection.is_none());
    assert!(defaults.topic.is_none());
    assert!(defaults.enabled_channels.is_empty());
    assert_eq!(defaults.poll_interval_ms, 1000);
}
4845}